blob: 1b8c59d23953bbba99a42d939ad6867628b931cf [file] [log] [blame]
// Copyright 2019 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package redisconn implements integration with a Redis connection pool.
//
// Usage as a server module:
//
// func main() {
// modules := []module.Module{
// redisconn.NewModuleFromFlags(),
// }
// server.Main(nil, modules, func(srv *server.Server) error {
// srv.Routes.GET("/", ..., func(c *router.Context) {
// conn, err := redisconn.Get(c.Context)
// if err != nil {
// // handle error
// }
// defer conn.Close()
// // use Redis API via `conn`
// })
// return nil
// })
// }
//
// When used that way, Redis is also installed as the default implementation
// of caching.BlobCache (which basically speeds up various internal guts of
// the LUCI server framework).
//
// Can also be used as a low-level Redis connection pool library, see
// NewPool(...)
package redisconn
import (
"context"
"time"
"github.com/gomodule/redigo/redis"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/tsmon/field"
"go.chromium.org/luci/common/tsmon/metric"
"go.chromium.org/luci/common/tsmon/types"
)
// ErrNotConfigured is returned by Get if the context has no Redis pool inside.
var ErrNotConfigured = errors.New("Redis connection pool is not configured")
// Per-pool metrics derived from redis.Pool.Stats() by ReportStats.
var (
connsMetric = metric.NewInt(
"redis/pool/conns",
"The number of connections in the pool (idle or in-use depending if state field)",
&types.MetricMetadata{},
field.String("pool"), // e.g. "default"
field.String("state"), // either "idle" or "in-use"
)
waitCountMetric = metric.NewCounter(
"redis/pool/wait_count",
"The total number of connections waited for.",
&types.MetricMetadata{},
field.String("pool"), // e.g. "default"
)
waitDurationMetric = metric.NewCounter(
"redis/pool/wait_duration",
"The total time blocked waiting for a new connection.",
&types.MetricMetadata{Units: types.Microseconds},
field.String("pool"), // e.g. "default"
)
)
// NewPool returns a new pool configured with default parameters.
//
// "addr" is TCP "host:port" of a Redis server to connect to. No actual
// connection is established yet (this happens first time the pool is used).
//
// "db" is a index of a logical DB to SELECT in the connection by default,
// see https://redis.io/commands/select. It can be used as a weak form of
// namespacing. It is easy to bypass though, so please do not depend on it
// for anything critical (better to setup multiple Redis instances in this
// case).
//
// Doesn't use any authentication or encryption.
func NewPool(addr string, db int) *redis.Pool {
// TODO(vadimsh): Tune the parameters or make them configurable. The values
// below were picked somewhat arbitrarily.
return &redis.Pool{
MaxIdle: 64,
MaxActive: 512,
IdleTimeout: 3 * time.Minute,
Wait: true, // if all connections are busy, wait for an available one
DialContext: func(ctx context.Context) (redis.Conn, error) {
logging.Debugf(ctx, "Opening new Redis connection to %q...", addr)
conn, err := redis.Dial("tcp", addr,
redis.DialDatabase(db),
redis.DialConnectTimeout(5*time.Second),
redis.DialReadTimeout(5*time.Second),
redis.DialWriteTimeout(5*time.Second),
)
if err != nil {
return nil, errors.Annotate(err, "redis").Err()
}
return conn, nil
},
// If the connection was idle for more than a minute, verify it is still
// alive by pinging the server.
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return err
},
}
}
var contextKey = "redisconn.Pool"
// UsePool installs a connection pool into the context, to be used by Get.
func UsePool(ctx context.Context, pool *redis.Pool) context.Context {
return context.WithValue(ctx, &contextKey, pool)
}
// GetPool returns a connection pool in the context or nil if not there.
func GetPool(ctx context.Context) *redis.Pool {
p, _ := ctx.Value(&contextKey).(*redis.Pool)
return p
}
// ReportStats reports the connection pool stats as tsmon metrics.
//
// For best results should be called once a minute or right before tsmon flush.
//
// "name" is used as "pool" metric field, to distinguish pools between each
// other.
func ReportStats(ctx context.Context, pool *redis.Pool, name string) {
stats := pool.Stats()
connsMetric.Set(ctx, int64(stats.IdleCount), name, "idle")
connsMetric.Set(ctx, int64(stats.ActiveCount-stats.IdleCount), name, "in-use")
waitCountMetric.Set(ctx, int64(stats.WaitCount), name)
waitDurationMetric.Set(ctx, int64(stats.WaitDuration.Nanoseconds()/1000), name)
}
// Get returns a Redis connection using the pool installed in the context.
//
// May block until such connection is available. Returns an error if the
// context expires before that. The returned connection itself is not associated
// with the context and can outlive it.
//
// The connection MUST be explicitly closed as soon as it's no longer needed,
// otherwise leaks and slow downs are eminent.
func Get(ctx context.Context) (redis.Conn, error) {
if p := GetPool(ctx); p != nil {
return p.GetContext(ctx)
}
return nil, ErrNotConfigured
}