Skip to content

Commit

Permalink
Merge pull request #2244 from go-redis/feat/metrics
Browse files Browse the repository at this point in the history
Add OpenTelemetry Metrics using new hooks
  • Loading branch information
vmihailenco authored Oct 12, 2022
2 parents d07457b + 0dff3d1 commit 180f107
Show file tree
Hide file tree
Showing 25 changed files with 1,134 additions and 719 deletions.
96 changes: 30 additions & 66 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,70 +1,34 @@
# [9.0.0-beta.2](https://github.com/go-redis/redis/compare/v9.0.0-beta.1...v9.0.0-beta.2) (2022-07-28)


### Bug Fixes

* [#2114](https://github.com/go-redis/redis/issues/2114) for redis-server not support Hello ([b6d2a92](https://github.com/go-redis/redis/commit/b6d2a925297e3e516eb5c76c114c1c9fcd5b68c5))
* additional node failures in clustered pipelined reads ([03376a5](https://github.com/go-redis/redis/commit/03376a5d9c7dfd7197b14ce13b24a0431a07a663))
* disregard failed pings in updateLatency() for cluster nodes ([64f972f](https://github.com/go-redis/redis/commit/64f972fbeae401e52a2c066a0e1c922af617e15c))
* don't panic when test cannot start ([9e16c79](https://github.com/go-redis/redis/commit/9e16c79951e7769621b7320f1ecdf04baf539b82))
* handle panic in ringShards Hash function when Ring got closed ([a80b84f](https://github.com/go-redis/redis/commit/a80b84f01f9fc0d3e6f08445ba21f7e07880775e)), closes [#2126](https://github.com/go-redis/redis/issues/2126)
* ignore Nil error when reading EntriesRead ([89d6dfe](https://github.com/go-redis/redis/commit/89d6dfe09a88321d445858c1c5b24d2757b95a3e))
* log errors from cmdsInfoCache ([fa4d1ea](https://github.com/go-redis/redis/commit/fa4d1ea8398cd729ad5cbaaff88e4b8805393945))
* provide a signal channel to end heartbeat goroutine ([f032c12](https://github.com/go-redis/redis/commit/f032c126db3e2c1a239ce1790b0ab81994df75cf))
* remove conn reaper from the pool and uptrace option names ([f6a8adc](https://github.com/go-redis/redis/commit/f6a8adc50cdaec30527f50d06468f9176ee674fe))
* replace heartbeat signal channel with context.WithCancel ([20d0ca2](https://github.com/go-redis/redis/commit/20d0ca235efff48ad48cc05b98790b825d4ba979))



# [9.0.0-beta.1](https://github.com/go-redis/redis/compare/v8.11.5...v9.0.0-beta.1) (2022-06-04)

### Bug Fixes

- **#1943:** xInfoConsumer.Idle should be time.Duration instead of int64
([#2052](https://github.com/go-redis/redis/issues/2052))
([997ab5e](https://github.com/go-redis/redis/commit/997ab5e7e3ddf53837917013a4babbded73e944f)),
closes [#1943](https://github.com/go-redis/redis/issues/1943)
- add XInfoConsumers test
([6f1a1ac](https://github.com/go-redis/redis/commit/6f1a1ac284ea3f683eeb3b06a59969e8424b6376))
- fix tests
([3a722be](https://github.com/go-redis/redis/commit/3a722be81180e4d2a9cf0a29dc9a1ee1421f5859))
- remove test(XInfoConsumer.idle), not a stable return value when tested.
([f5fbb36](https://github.com/go-redis/redis/commit/f5fbb367e7d9dfd7f391fc535a7387002232fa8a))
- update ChannelWithSubscriptions to accept options
([c98c5f0](https://github.com/go-redis/redis/commit/c98c5f0eebf8d254307183c2ce702a48256b718d))
- update COMMAND parser for Redis 7
([b0bb514](https://github.com/go-redis/redis/commit/b0bb514059249e01ed7328c9094e5b8a439dfb12))
- use redis over ssh channel([#2057](https://github.com/go-redis/redis/issues/2057))
([#2060](https://github.com/go-redis/redis/issues/2060))
([3961b95](https://github.com/go-redis/redis/commit/3961b9577f622a3079fe74f8fc8da12ba67a77ff))

### Features

- add ClientUnpause
([91171f5](https://github.com/go-redis/redis/commit/91171f5e19a261dc4cfbf8706626d461b6ba03e4))
- add NewXPendingResult for unit testing XPending
([#2066](https://github.com/go-redis/redis/issues/2066))
([b7fd09e](https://github.com/go-redis/redis/commit/b7fd09e59479bc6ed5b3b13c4645a3620fd448a3))
- add WriteArg and Scan net.IP([#2062](https://github.com/go-redis/redis/issues/2062))
([7d5167e](https://github.com/go-redis/redis/commit/7d5167e8624ac1515e146ed183becb97dadb3d1a))
- **pool:** add check for badConnection
([a8a7665](https://github.com/go-redis/redis/commit/a8a7665ddf8cc657c5226b1826a8ee83dab4b8c1)),
closes [#2053](https://github.com/go-redis/redis/issues/2053)
- provide a username and password callback method, so that the plaintext username and password will
not be stored in the memory, and the username and password will only be generated once when the
CredentialsProvider is called. After the method is executed, the username and password strings on
the stack will be released. ([#2097](https://github.com/go-redis/redis/issues/2097))
([56a3dbc](https://github.com/go-redis/redis/commit/56a3dbc7b656525eb88e0735e239d56e04a23bee))
- upgrade to Redis 7
([d09c27e](https://github.com/go-redis/redis/commit/d09c27e6046129fd27b1d275e5a13a477bd7f778))

## v9 UNRELEASED

### Added

- Added support for [RESP3](https://github.com/antirez/RESP3/blob/master/spec.md) protocol.
- Removed `Pipeline.Close` since there is no real need to explicitly manage pipeline resources.
`Pipeline.Discard` is still available if you want to reset commands for some reason.
- Replaced `*redis.Z` with `redis.Z` since it is small enough to be passed as value.
- Renamed `MaxConnAge` to `ConnMaxLifetime`.
- Renamed `IdleTimeout` to `ConnMaxIdleTime`.
Contributed by @monkey92t who has done a lot of work recently.
- Added `ContextTimeoutEnabled` option that controls whether the client respects context timeouts
and deadlines. See
[Redis Timeouts](https://redis.uptrace.dev/guide/go-redis-debugging.html#timeouts) for details.
- Added `ParseClusterURL` to parse URLs into `ClusterOptions`, for example,
`redis://user:password@localhost:6789?dial_timeout=3&read_timeout=6s&addr=localhost:6790&addr=localhost:6791`.
- Added metrics instrumentation using `redisotel.IstrumentMetrics`. See
[documentation](https://redis.uptrace.dev/guide/go-redis-monitoring.html)

### Changed

- Reworked hook interface and added `DialHook`.
- Replaced `redisotel.NewTracingHook` with `redisotel.InstrumentTracing`. See
[example](example/otel) and
[documentation](https://redis.uptrace.dev/guide/go-redis-monitoring.html).
- Replaced `*redis.Z` with `redis.Z` since it is small enough to be passed as value without making
an allocation.
- Renamed the option `MaxConnAge` to `ConnMaxLifetime`.
- Renamed the option `IdleTimeout` to `ConnMaxIdleTime`.
- Removed connection reaper in favor of `MaxIdleConns`.
- Removed `WithContext`.
- Removed `WithContext` since `context.Context` can be passed directly as an arg.
- Removed `Pipeline.Close` since there is no real need to explicitly manage pipeline resources and
it can be safely reused via `sync.Pool` etc. `Pipeline.Discard` is still available if you want to
reset commands for some reason.

### Fixed

- Improved and fixed pipeline retries.
- As usual, added more commands and fixed some bugs.
121 changes: 64 additions & 57 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,8 @@ type ClusterOptions struct {
WriteTimeout time.Duration
ContextTimeoutEnabled bool

// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
PoolFIFO bool

// PoolSize applies per cluster node and not for the whole cluster.
PoolSize int
PoolFIFO bool
PoolSize int // applies per cluster node and not for the whole cluster
PoolTimeout time.Duration
MinIdleConns int
MaxIdleConns int
Expand Down Expand Up @@ -391,6 +388,7 @@ type clusterNodes struct {
nodes map[string]*clusterNode
activeAddrs []string
closed bool
onNewNode []func(rdb *Client)

_generation uint32 // atomic
}
Expand Down Expand Up @@ -426,6 +424,12 @@ func (c *clusterNodes) Close() error {
return firstErr
}

func (c *clusterNodes) OnNewNode(fn func(rdb *Client)) {
c.mu.Lock()
c.onNewNode = append(c.onNewNode, fn)
c.mu.Unlock()
}

func (c *clusterNodes) Addrs() ([]string, error) {
var addrs []string

Expand Down Expand Up @@ -503,6 +507,9 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
}

node = newClusterNode(c.opt, addr)
for _, fn := range c.onNewNode {
fn(node.Client)
}

c.addrs = appendIfNotExists(c.addrs, addr)
c.nodes[addr] = node
Expand Down Expand Up @@ -812,18 +819,14 @@ func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, er

//------------------------------------------------------------------------------

type clusterClient struct {
opt *ClusterOptions
nodes *clusterNodes
state *clusterStateHolder //nolint:structcheck
cmdsInfoCache *cmdsInfoCache //nolint:structcheck
}

// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
*clusterClient
opt *ClusterOptions
nodes *clusterNodes
state *clusterStateHolder
cmdsInfoCache *cmdsInfoCache
cmdable
hooks
}
Expand All @@ -834,15 +837,18 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
opt.init()

c := &ClusterClient{
clusterClient: &clusterClient{
opt: opt,
nodes: newClusterNodes(opt),
},
opt: opt,
nodes: newClusterNodes(opt),
}

c.state = newClusterStateHolder(c.loadState)
c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
c.cmdable = c.Process

c.hooks.process = c.process
c.hooks.processPipeline = c._processPipeline
c.hooks.processTxPipeline = c._processTxPipeline

return c
}

Expand Down Expand Up @@ -873,13 +879,14 @@ func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd {
}

func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
return c.hooks.process(ctx, cmd, c.process)
err := c.hooks.process(ctx, cmd)
cmd.SetErr(err)
return err
}

func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
cmdInfo := c.cmdInfo(ctx, cmd.Name())
slot := c.cmdSlot(ctx, cmd)

var node *clusterNode
var ask bool
var lastErr error
Expand All @@ -899,11 +906,12 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
}

if ask {
ask = false

pipe := node.Client.Pipeline()
_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
_ = pipe.Process(ctx, cmd)
_, lastErr = pipe.Exec(ctx)
ask = false
} else {
lastErr = node.Client.Process(ctx, cmd)
}
Expand Down Expand Up @@ -958,6 +966,10 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
return lastErr
}

func (c *ClusterClient) OnNewNode(fn func(rdb *Client)) {
c.nodes.OnNewNode(fn)
}

// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(
Expand Down Expand Up @@ -1165,7 +1177,7 @@ func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {

func (c *ClusterClient) Pipeline() Pipeliner {
pipe := Pipeline{
exec: c.processPipeline,
exec: pipelineExecer(c.hooks.processPipeline),
}
pipe.init()
return &pipe
Expand All @@ -1175,10 +1187,6 @@ func (c *ClusterClient) Pipelined(ctx context.Context, fn func(Pipeliner) error)
return c.Pipeline().Pipelined(ctx, fn)
}

func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error {
return c.hooks.processPipeline(ctx, cmds, c._processPipeline)
}

func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error {
cmdsMap := newCmdsMap()

Expand Down Expand Up @@ -1258,7 +1266,7 @@ func (c *ClusterClient) cmdsAreReadOnly(ctx context.Context, cmds []Cmder) bool
func (c *ClusterClient) _processPipelineNode(
ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) {
_ = node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
_ = node.Client.hooks.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds)
Expand Down Expand Up @@ -1344,7 +1352,10 @@ func (c *ClusterClient) checkMovedErr(
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() Pipeliner {
pipe := Pipeline{
exec: c.processTxPipeline,
exec: func(ctx context.Context, cmds []Cmder) error {
cmds = wrapMultiExec(ctx, cmds)
return c.hooks.processTxPipeline(ctx, cmds)
},
}
pipe.init()
return &pipe
Expand All @@ -1354,10 +1365,6 @@ func (c *ClusterClient) TxPipelined(ctx context.Context, fn func(Pipeliner) erro
return c.TxPipeline().Pipelined(ctx, fn)
}

func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
return c.hooks.processTxPipeline(ctx, cmds, c._processTxPipeline)
}

func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error {
// Trim multi .. exec.
cmds = cmds[1 : len(cmds)-1]
Expand Down Expand Up @@ -1419,38 +1426,38 @@ func (c *ClusterClient) mapCmdsBySlot(ctx context.Context, cmds []Cmder) map[int
func (c *ClusterClient) _processTxPipelineNode(
ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) {
_ = node.Client.hooks.processTxPipeline(
ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds)
}); err != nil {
setCmdsErr(cmds, err)
return err
}

return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
statusCmd := cmds[0].(*StatusCmd)
// Trim multi and exec.
trimmedCmds := cmds[1 : len(cmds)-1]
cmds = wrapMultiExec(ctx, cmds)
_ = node.Client.hooks.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds)
}); err != nil {
setCmdsErr(cmds, err)
return err
}

if err := c.txPipelineReadQueued(
ctx, rd, statusCmd, trimmedCmds, failedCmds,
); err != nil {
setCmdsErr(cmds, err)
return cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
statusCmd := cmds[0].(*StatusCmd)
// Trim multi and exec.
trimmedCmds := cmds[1 : len(cmds)-1]

moved, ask, addr := isMovedError(err)
if moved || ask {
return c.cmdsMoved(ctx, trimmedCmds, moved, ask, addr, failedCmds)
}
if err := c.txPipelineReadQueued(
ctx, rd, statusCmd, trimmedCmds, failedCmds,
); err != nil {
setCmdsErr(cmds, err)

return err
moved, ask, addr := isMovedError(err)
if moved || ask {
return c.cmdsMoved(ctx, trimmedCmds, moved, ask, addr, failedCmds)
}

return pipelineReadCmds(rd, trimmedCmds)
})
return err
}

return pipelineReadCmds(rd, trimmedCmds)
})
})
})
}

func (c *ClusterClient) txPipelineReadQueued(
Expand Down Expand Up @@ -1742,7 +1749,7 @@ func (c *ClusterClient) cmdNode(
return state.slotMasterNode(slot)
}

func (c *clusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
func (c *ClusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
if c.opt.RouteByLatency {
return state.slotClosestNode(slot)
}
Expand Down
10 changes: 5 additions & 5 deletions cluster_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func (c *ClusterClient) DBSize(ctx context.Context) *IntCmd {
cmd := NewIntCmd(ctx, "dbsize")
_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
_ = c.hooks.withProcessHook(ctx, cmd, func(ctx context.Context, _ Cmder) error {
var size int64
err := c.ForEachMaster(ctx, func(ctx context.Context, master *Client) error {
n, err := master.DBSize(ctx).Result()
Expand All @@ -30,7 +30,7 @@ func (c *ClusterClient) DBSize(ctx context.Context) *IntCmd {

func (c *ClusterClient) ScriptLoad(ctx context.Context, script string) *StringCmd {
cmd := NewStringCmd(ctx, "script", "load", script)
_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
_ = c.hooks.withProcessHook(ctx, cmd, func(ctx context.Context, _ Cmder) error {
mu := &sync.Mutex{}
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
val, err := shard.ScriptLoad(ctx, script).Result()
Expand All @@ -56,7 +56,7 @@ func (c *ClusterClient) ScriptLoad(ctx context.Context, script string) *StringCm

func (c *ClusterClient) ScriptFlush(ctx context.Context) *StatusCmd {
cmd := NewStatusCmd(ctx, "script", "flush")
_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
_ = c.hooks.withProcessHook(ctx, cmd, func(ctx context.Context, _ Cmder) error {
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
return shard.ScriptFlush(ctx).Err()
})
Expand All @@ -82,8 +82,8 @@ func (c *ClusterClient) ScriptExists(ctx context.Context, hashes ...string) *Boo
result[i] = true
}

_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
mu := &sync.Mutex{}
_ = c.hooks.withProcessHook(ctx, cmd, func(ctx context.Context, _ Cmder) error {
var mu sync.Mutex
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
val, err := shard.ScriptExists(ctx, hashes...).Result()
if err != nil {
Expand Down
Loading

0 comments on commit 180f107

Please sign in to comment.