From 7f0dfe010d9fbcf8e280feebefecb9fb0e7e756f Mon Sep 17 00:00:00 2001 From: Bernd Verst <4535280+berndverst@users.noreply.github.com> Date: Thu, 15 Sep 2022 14:36:05 -0700 Subject: [PATCH 1/3] Add support for multiple redis versions Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> --- bindings/redis/redis.go | 44 ++- bindings/redis/redis_test.go | 58 +++- go.mod | 1 + go.sum | 4 +- internal/component/redis/metadata.go | 49 +-- internal/component/redis/redis.go | 156 ++++++++- lock/redis/standalone.go | 129 ++++++-- pubsub/redis/redis.go | 429 ++++++++++++++++++------- pubsub/redis/redis_test.go | 2 +- state/redis/redis.go | 207 +++++++++--- state/redis/redis_query.go | 20 +- state/redis/redis_test.go | 414 +++++++++++++++++++++--- tests/certification/state/redis/go.mod | 1 + tests/certification/state/redis/go.sum | 4 +- 14 files changed, 1226 insertions(+), 292 deletions(-) diff --git a/bindings/redis/redis.go b/bindings/redis/redis.go index aa70213ab2..5296678588 100644 --- a/bindings/redis/redis.go +++ b/bindings/redis/redis.go @@ -18,7 +18,8 @@ import ( "errors" "fmt" - "github.com/go-redis/redis/v8" + v8 "github.com/go-redis/redis/v8" + v9 "github.com/go-redis/redis/v9" "github.com/dapr/components-contrib/bindings" rediscomponent "github.com/dapr/components-contrib/internal/component/redis" @@ -27,9 +28,11 @@ import ( // Redis is a redis output binding. type Redis struct { - client redis.UniversalClient + clientv8 v8.UniversalClient + clientv9 v9.UniversalClient clientSettings *rediscomponent.Settings logger logger.Logger + legacyRedis bool ctx context.Context cancel context.CancelFunc @@ -42,14 +45,24 @@ func NewRedis(logger logger.Logger) bindings.OutputBinding { // Init performs metadata parsing and connection creation. 
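The binding above now keeps one client field per go-redis major release plus a legacyRedis flag, and every method branches on that flag (see Init, Ping, Invoke, and Close below). A hypothetical alternative — not what this patch does — is to adapt both clients to one narrow interface once, at Init time. This only suits components that need Ping/Do/Close; the typed streams APIs used by pubsub would not fit through such a seam. A minimal sketch, assuming the canonical go-redis module paths:

import (
	"context"

	v8 "github.com/go-redis/redis/v8"
	v9 "github.com/go-redis/redis/v9"
)

// universalClient is a hypothetical seam covering the few calls this binding
// needs; both go-redis clients satisfy it through thin adapters.
type universalClient interface {
	Ping(ctx context.Context) error
	Do(ctx context.Context, args ...interface{}) (interface{}, error)
	Close() error
}

type v8Adapter struct{ c v8.UniversalClient }

func (a v8Adapter) Ping(ctx context.Context) error { return a.c.Ping(ctx).Err() }
func (a v8Adapter) Do(ctx context.Context, args ...interface{}) (interface{}, error) {
	return a.c.Do(ctx, args...).Result()
}
func (a v8Adapter) Close() error { return a.c.Close() }

type v9Adapter struct{ c v9.UniversalClient }

func (a v9Adapter) Ping(ctx context.Context) error { return a.c.Ping(ctx).Err() }
func (a v9Adapter) Do(ctx context.Context, args ...interface{}) (interface{}, error) {
	return a.c.Do(ctx, args...).Result()
}
func (a v9Adapter) Close() error { return a.c.Close() }

The flag-based approach taken by the patch trades some repetition for keeping the full typed client APIs (XAddArgs, XClaimArgs, and so on) available at every call site.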
func (r *Redis) Init(meta bindings.Metadata) (err error) { - r.client, r.clientSettings, err = rediscomponent.ParseClientFromProperties(meta.Properties, nil) + if rediscomponent.IsLegacyRedisVersion(meta.Properties) { + r.legacyRedis = true + r.clientv8, r.clientSettings, err = rediscomponent.ParseClientv8FromProperties(meta.Properties, nil) + } else { + r.legacyRedis = false + r.clientv9, r.clientSettings, err = rediscomponent.ParseClientv9FromProperties(meta.Properties, nil) + } if err != nil { return err } r.ctx, r.cancel = context.WithCancel(context.Background()) - _, err = r.client.Ping(r.ctx).Result() + if r.legacyRedis { + _, err = r.clientv8.Ping(r.ctx).Result() + } else { + _, err = r.clientv9.Ping(r.ctx).Result() + } if err != nil { return fmt.Errorf("redis binding: error connecting to redis at %s: %s", r.clientSettings.Host, err) } @@ -58,8 +71,14 @@ func (r *Redis) Init(meta bindings.Metadata) (err error) { } func (r *Redis) Ping() error { - if _, err := r.client.Ping(r.ctx).Result(); err != nil { - return fmt.Errorf("redis binding: error connecting to redis at %s: %s", r.clientSettings.Host, err) + if r.legacyRedis { + if _, err := r.clientv8.Ping(r.ctx).Result(); err != nil { + return fmt.Errorf("redis binding: error connecting to redis at %s: %s", r.clientSettings.Host, err) + } + } else { + if _, err := r.clientv9.Ping(r.ctx).Result(); err != nil { + return fmt.Errorf("redis binding: error connecting to redis at %s: %s", r.clientSettings.Host, err) + } } return nil @@ -72,7 +91,12 @@ func (r *Redis) Operations() []bindings.OperationKind { func (r *Redis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { if val, ok := req.Metadata["key"]; ok && val != "" { key := val - _, err := r.client.Do(ctx, "SET", key, req.Data).Result() + var err error + if r.legacyRedis { + _, err = r.clientv8.Do(ctx, "SET", key, req.Data).Result() + } else { + _, err = r.clientv9.Do(ctx, "SET", key, req.Data).Result() + } if err != nil { return nil, err } @@ -86,5 +110,9 @@ func (r *Redis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindi func (r *Redis) Close() error { r.cancel() - return r.client.Close() + if r.legacyRedis { + return r.clientv8.Close() + } else { + return r.clientv9.Close() + } } diff --git a/bindings/redis/redis_test.go b/bindings/redis/redis_test.go index f57d819931..d3d4a784e0 100644 --- a/bindings/redis/redis_test.go +++ b/bindings/redis/redis_test.go @@ -18,7 +18,8 @@ import ( "testing" miniredis "github.com/alicebob/miniredis/v2" - "github.com/go-redis/redis/v8" + v8 "github.com/go-redis/redis/v8" + v9 "github.com/go-redis/redis/v9" "github.com/stretchr/testify/assert" "github.com/dapr/components-contrib/bindings" @@ -30,18 +31,19 @@ const ( testKey = "test" ) -func TestInvoke(t *testing.T) { - s, c := setupMiniredis() +func TestInvokev8(t *testing.T) { + s, c := setupMiniredisv8() defer s.Close() bind := &Redis{ - client: c, - logger: logger.NewLogger("test"), + legacyRedis: true, + clientv8: c, + logger: logger.NewLogger("test"), } bind.ctx, bind.cancel = context.WithCancel(context.Background()) _, err := c.Do(context.Background(), "GET", testKey).Result() - assert.Equal(t, redis.Nil, err) + assert.Equal(t, v8.Nil, err) bindingRes, err := bind.Invoke(context.TODO(), &bindings.InvokeRequest{ Data: []byte(testData), @@ -55,15 +57,53 @@ func TestInvoke(t *testing.T) { assert.Equal(t, true, getRes == testData) } -func setupMiniredis() (*miniredis.Miniredis, *redis.Client) { +func TestInvokev9(t 
*testing.T) { + s, c := setupMiniredisv9() + defer s.Close() + + bind := &Redis{ + clientv9: c, + logger: logger.NewLogger("test"), + } + bind.ctx, bind.cancel = context.WithCancel(context.Background()) + + _, err := c.Do(context.Background(), "GET", testKey).Result() + assert.Equal(t, v9.Nil, err) + + bindingRes, err := bind.Invoke(context.TODO(), &bindings.InvokeRequest{ + Data: []byte(testData), + Metadata: map[string]string{"key": testKey}, + }) + assert.Equal(t, nil, err) + assert.Equal(t, true, bindingRes == nil) + + getRes, err := c.Do(context.Background(), "GET", testKey).Result() + assert.Equal(t, nil, err) + assert.Equal(t, true, getRes == testData) +} + +func setupMiniredisv8() (*miniredis.Miniredis, *v8.Client) { + s, err := miniredis.Run() + if err != nil { + panic(err) + } + opts := &v8.Options{ + Addr: s.Addr(), + DB: 0, + } + + return s, v8.NewClient(opts) +} + +func setupMiniredisv9() (*miniredis.Miniredis, *v9.Client) { s, err := miniredis.Run() if err != nil { panic(err) } - opts := &redis.Options{ + opts := &v9.Options{ Addr: s.Addr(), DB: 0, } - return s, redis.NewClient(opts) + return s, v9.NewClient(opts) } diff --git a/go.mod b/go.mod index b0e55db135..36843d7598 100644 --- a/go.mod +++ b/go.mod @@ -447,6 +447,7 @@ require ( github.com/eapache/queue v1.1.0 // indirect github.com/emirpasic/gods v1.12.0 // indirect github.com/go-ozzo/ozzo-validation/v4 v4.3.0 // indirect + github.com/go-redis/redis/v9 v9.0.0-beta.2 github.com/go-stack/stack v1.8.0 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect diff --git a/go.sum b/go.sum index 52605a20ef..4d9ca5cabe 100644 --- a/go.sum +++ b/go.sum @@ -936,6 +936,8 @@ github.com/go-playground/validator/v10 v10.11.0 h1:0W+xRM511GY47Yy3bZUbJVitCNg2B github.com/go-playground/validator/v10 v10.11.0/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-redis/redis/v9 v9.0.0-beta.2 h1:ZSr84TsnQyKMAg8gnV+oawuQezeJR11/09THcWCQzr4= +github.com/go-redis/redis/v9 v9.0.0-beta.2/go.mod h1:Bldcd/M/bm9HbnNPi/LUtYBSD8ttcZYBMupwMXhdU0o= github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= @@ -2437,7 +2439,7 @@ github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je4 github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0= github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= -github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/onsi/gomega v1.20.0 h1:8W0cWlwFkflGPLltQvLRB7ZVD5HuP6ng320w2IS245Q= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/open-policy-agent/opa v0.42.0 h1:CTJ240+A+sZEYSuLDYiT5l8Q3lcQf2eZc53jCbWNjZE= github.com/open-policy-agent/opa v0.42.0/go.mod h1:MrmoTi/BsKWT58kXlVayBb+rYVeaMwuBm3nYAN3923s= diff --git a/internal/component/redis/metadata.go b/internal/component/redis/metadata.go index 
5c4fd02a8e..07a2c086eb 100644 --- a/internal/component/redis/metadata.go +++ b/internal/component/redis/metadata.go @@ -14,63 +14,30 @@ limitations under the License. package redis import ( - "fmt" - "strconv" "time" + + "github.com/dapr/components-contrib/metadata" ) const ( - maxRetries = "maxRetries" - maxRetryBackoff = "maxRetryBackoff" - ttlInSeconds = "ttlInSeconds" - queryIndexes = "queryIndexes" - defaultBase = 10 - defaultBitSize = 0 defaultMaxRetries = 3 defaultMaxRetryBackoff = time.Second * 2 ) type Metadata struct { - MaxRetries int - MaxRetryBackoff time.Duration - TTLInSeconds *int - QueryIndexes string + MaxRetries int `json:"maxRetries,string,omitempty"` + MaxRetryBackoff time.Duration `json:"maxRetryBackoff,string,omitempty"` + TTLInSeconds *int `json:"ttlInSeconds,string,omitempty"` + QueryIndexes string `json:"queryIndexes,omitempty"` + RedisVersion string `json:"redisVersion,omitempty"` } func ParseRedisMetadata(properties map[string]string) (Metadata, error) { m := Metadata{} m.MaxRetries = defaultMaxRetries - if val, ok := properties[maxRetries]; ok && val != "" { - parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize) - if err != nil { - return m, fmt.Errorf("redis store error: can't parse maxRetries field: %s", err) - } - m.MaxRetries = int(parsedVal) - } - m.MaxRetryBackoff = defaultMaxRetryBackoff - if val, ok := properties[maxRetryBackoff]; ok && val != "" { - parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize) - if err != nil { - return m, fmt.Errorf("redis store error: can't parse maxRetryBackoff field: %s", err) - } - m.MaxRetryBackoff = time.Duration(parsedVal) - } - - if val, ok := properties[ttlInSeconds]; ok && val != "" { - parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize) - if err != nil { - return m, fmt.Errorf("redis store error: can't parse ttlInSeconds field: %s", err) - } - intVal := int(parsedVal) - m.TTLInSeconds = &intVal - } else { - m.TTLInSeconds = nil - } + metadata.DecodeMetadata(properties, &m) - if val, ok := properties[queryIndexes]; ok && val != "" { - m.QueryIndexes = val - } return m, nil } diff --git a/internal/component/redis/redis.go b/internal/component/redis/redis.go index 733caed925..5f06c7d02c 100644 --- a/internal/component/redis/redis.go +++ b/internal/component/redis/redis.go @@ -19,15 +19,17 @@ import ( "strings" "time" - "github.com/go-redis/redis/v8" + v8 "github.com/go-redis/redis/v8" + v9 "github.com/go-redis/redis/v9" ) const ( - ClusterType = "cluster" - NodeType = "node" + ClusterType = "cluster" + NodeType = "node" + RedisVersionKey = "redisVersion" ) -func ParseClientFromProperties(properties map[string]string, defaultSettings *Settings) (client redis.UniversalClient, settings *Settings, err error) { +func ParseClientv8FromProperties(properties map[string]string, defaultSettings *Settings) (client v8.UniversalClient, settings *Settings, err error) { if defaultSettings == nil { settings = &Settings{} } else { @@ -38,17 +40,32 @@ func ParseClientFromProperties(properties map[string]string, defaultSettings *Se return nil, nil, fmt.Errorf("redis client configuration error: %w", err) } if settings.Failover { - return newFailoverClient(settings), settings, nil + return newFailoverClientv8(settings), settings, nil } + return newClientv8(settings), settings, nil +} - return newClient(settings), settings, nil +func ParseClientv9FromProperties(properties map[string]string, defaultSettings *Settings) (client v9.UniversalClient, settings *Settings, err error) { + 
if defaultSettings == nil { + settings = &Settings{} + } else { + settings = defaultSettings + } + err = settings.Decode(properties) + if err != nil { + return nil, nil, fmt.Errorf("redis client configuration error: %w", err) + } + if settings.Failover { + return newFailoverClientv9(settings), settings, nil + } + return newClientv9(settings), settings, nil } -func newFailoverClient(s *Settings) redis.UniversalClient { +func newFailoverClientv8(s *Settings) v8.UniversalClient { if s == nil { return nil } - opts := &redis.FailoverOptions{ + opts := &v8.FailoverOptions{ DB: s.DB, MasterName: s.SentinelMasterName, SentinelAddrs: []string{s.Host}, @@ -78,18 +95,57 @@ func newFailoverClient(s *Settings) redis.UniversalClient { if s.RedisType == ClusterType { opts.SentinelAddrs = strings.Split(s.Host, ",") - return redis.NewFailoverClusterClient(opts) + return v8.NewFailoverClusterClient(opts) + } + + return v8.NewFailoverClient(opts) +} + +func newFailoverClientv9(s *Settings) v9.UniversalClient { + if s == nil { + return nil + } + opts := &v9.FailoverOptions{ + DB: s.DB, + MasterName: s.SentinelMasterName, + SentinelAddrs: []string{s.Host}, + Password: s.Password, + Username: s.Username, + MaxRetries: s.RedisMaxRetries, + MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval), + MinRetryBackoff: time.Duration(s.RedisMinRetryInterval), + DialTimeout: time.Duration(s.DialTimeout), + ReadTimeout: time.Duration(s.ReadTimeout), + WriteTimeout: time.Duration(s.WriteTimeout), + PoolSize: s.PoolSize, + ConnMaxLifetime: time.Duration(s.MaxConnAge), + ConnMaxIdleTime: time.Duration(s.IdleTimeout), + MinIdleConns: s.MinIdleConns, + PoolTimeout: time.Duration(s.PoolTimeout), + } + + /* #nosec */ + if s.EnableTLS { + opts.TLSConfig = &tls.Config{ + InsecureSkipVerify: s.EnableTLS, + } + } + + if s.RedisType == ClusterType { + opts.SentinelAddrs = strings.Split(s.Host, ",") + + return v9.NewFailoverClusterClient(opts) } - return redis.NewFailoverClient(opts) + return v9.NewFailoverClient(opts) } -func newClient(s *Settings) redis.UniversalClient { +func newClientv8(s *Settings) v8.UniversalClient { if s == nil { return nil } if s.RedisType == ClusterType { - options := &redis.ClusterOptions{ + options := &v8.ClusterOptions{ Addrs: strings.Split(s.Host, ","), Password: s.Password, Username: s.Username, @@ -113,10 +169,10 @@ func newClient(s *Settings) redis.UniversalClient { } } - return redis.NewClusterClient(options) + return v8.NewClusterClient(options) } - options := &redis.Options{ + options := &v8.Options{ Addr: s.Host, Password: s.Password, Username: s.Username, @@ -142,5 +198,75 @@ func newClient(s *Settings) redis.UniversalClient { } } - return redis.NewClient(options) + return v8.NewClient(options) +} + +func newClientv9(s *Settings) v9.UniversalClient { + if s == nil { + return nil + } + if s.RedisType == ClusterType { + options := &v9.ClusterOptions{ + Addrs: strings.Split(s.Host, ","), + Password: s.Password, + Username: s.Username, + MaxRetries: s.RedisMaxRetries, + MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval), + MinRetryBackoff: time.Duration(s.RedisMinRetryInterval), + DialTimeout: time.Duration(s.DialTimeout), + ReadTimeout: time.Duration(s.ReadTimeout), + WriteTimeout: time.Duration(s.WriteTimeout), + PoolSize: s.PoolSize, + ConnMaxLifetime: time.Duration(s.MaxConnAge), + MinIdleConns: s.MinIdleConns, + PoolTimeout: time.Duration(s.PoolTimeout), + ConnMaxIdleTime: time.Duration(s.IdleTimeout), + } + /* #nosec */ + if s.EnableTLS { + options.TLSConfig = &tls.Config{ + 
InsecureSkipVerify: s.EnableTLS,
+			}
+		}
+
+		return v9.NewClusterClient(options)
+	}
+
+	options := &v9.Options{
+		Addr:            s.Host,
+		Password:        s.Password,
+		Username:        s.Username,
+		DB:              s.DB,
+		MaxRetries:      s.RedisMaxRetries,
+		MaxRetryBackoff: time.Duration(s.RedisMaxRetryInterval),
+		MinRetryBackoff: time.Duration(s.RedisMinRetryInterval),
+		DialTimeout:     time.Duration(s.DialTimeout),
+		ReadTimeout:     time.Duration(s.ReadTimeout),
+		WriteTimeout:    time.Duration(s.WriteTimeout),
+		PoolSize:        s.PoolSize,
+		ConnMaxLifetime: time.Duration(s.MaxConnAge),
+		MinIdleConns:    s.MinIdleConns,
+		PoolTimeout:     time.Duration(s.PoolTimeout),
+		ConnMaxIdleTime: time.Duration(s.IdleTimeout),
+	}
+
+	/* #nosec */
+	if s.EnableTLS {
+		options.TLSConfig = &tls.Config{
+			InsecureSkipVerify: s.EnableTLS,
+		}
+	}
+
+	return v9.NewClient(options)
+}
+
+func IsLegacyRedisVersion(props map[string]string) bool {
+	val, ok := props[RedisVersionKey]
+	if !ok || val == "" {
+		return true
+	}
+	// Compare the major version numerically rather than lexicographically,
+	// so that a major version such as "10" is not treated as older than "7".
+	major := strings.SplitN(strings.Trim(val, "vV "), ".", 2)[0]
+	if major == "" {
+		return true
+	}
+	return len(major) == 1 && major < "7"
+}
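A quick table test makes the version gate above easy to verify. This sketch is illustrative only and not part of the patch; it assumes the numeric major-version comparison shown above, lives alongside the package's tests, and uses the `redisVersion` metadata key that `RedisVersionKey` resolves to:

func TestIsLegacyRedisVersion(t *testing.T) {
	cases := map[string]bool{
		"":      true,  // unset or empty falls back to the v8 client
		"6.2":   true,  // Redis 6 and below stay on the legacy client
		"v6":    true,  // a leading "v" is trimmed before comparison
		"7.0.5": false, // Redis 7+ selects the v9 client
		"10":    false, // multi-digit majors must not compare as < "7"
	}
	for version, want := range cases {
		got := IsLegacyRedisVersion(map[string]string{"redisVersion": version})
		if got != want {
			t.Errorf("IsLegacyRedisVersion(%q) = %v, want %v", version, got, want)
		}
	}
}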
diff --git a/lock/redis/standalone.go b/lock/redis/standalone.go
index 8a4a0c68dc..88885a4e81 100644
--- a/lock/redis/standalone.go
+++ b/lock/redis/standalone.go
@@ -20,7 +20,8 @@ import (
 	"strings"
 	"time"
 
-	"github.com/go-redis/redis/v8"
+	v8 "github.com/go-redis/redis/v8"
+	v9 "github.com/go-redis/redis/v9"
 
 	rediscomponent "github.com/dapr/components-contrib/internal/component/redis"
 	"github.com/dapr/components-contrib/lock"
@@ -35,9 +36,11 @@ const (
 
 // Standalone Redis lock store.Any fail-over related features are not supported,such as Sentinel and Redis Cluster.
 type StandaloneRedisLock struct {
-	client         redis.UniversalClient
+	clientv8       v8.UniversalClient
+	clientv9       v9.UniversalClient
 	clientSettings *rediscomponent.Settings
 	metadata       rediscomponent.Metadata
+	legacyRedis    bool
 
 	logger logger.Logger
 
@@ -63,6 +66,11 @@ func (r *StandaloneRedisLock) InitLockStore(metadata lock.Metadata) error {
 		return err
 	}
 	r.metadata = m
+	if rediscomponent.IsLegacyRedisVersion(metadata.Properties) {
+		r.legacyRedis = true
+	} else {
+		r.legacyRedis = false
+	}
 	// must have `redisHost`
 	if metadata.Properties["redisHost"] == "" {
 		return fmt.Errorf("[standaloneRedisLock]: InitLockStore error. redisHost is empty")
@@ -73,14 +81,24 @@ func (r *StandaloneRedisLock) InitLockStore(metadata lock.Metadata) error {
 	}
 	// 2. construct client
 	defaultSettings := rediscomponent.Settings{RedisMaxRetries: m.MaxRetries, RedisMaxRetryInterval: rediscomponent.Duration(m.MaxRetryBackoff)}
-	r.client, r.clientSettings, err = rediscomponent.ParseClientFromProperties(metadata.Properties, &defaultSettings)
+	if r.legacyRedis {
+		r.clientv8, r.clientSettings, err = rediscomponent.ParseClientv8FromProperties(metadata.Properties, &defaultSettings)
+	} else {
+		r.clientv9, r.clientSettings, err = rediscomponent.ParseClientv9FromProperties(metadata.Properties, &defaultSettings)
+	}
 	if err != nil {
 		return err
 	}
 	r.ctx, r.cancel = context.WithCancel(context.Background())
 	// 3. connect to redis
-	if _, err = r.client.Ping(r.ctx).Result(); err != nil {
-		return fmt.Errorf("[standaloneRedisLock]: error connecting to redis at %s: %s", r.clientSettings.Host, err)
+	if r.legacyRedis {
+		if _, err = r.clientv8.Ping(r.ctx).Result(); err != nil {
+			return fmt.Errorf("[standaloneRedisLock]: error connecting to redis at %s: %s", r.clientSettings.Host, err)
+		}
+	} else {
+		if _, err = r.clientv9.Ping(r.ctx).Result(); err != nil {
+			return fmt.Errorf("[standaloneRedisLock]: error connecting to redis at %s: %s", r.clientSettings.Host, err)
+		}
 	}
 	// no replica
 	replicas, err := r.getConnectedSlaves()
@@ -104,7 +122,13 @@ func needFailover(properties map[string]string) bool {
 }
 
 func (r *StandaloneRedisLock) getConnectedSlaves() (int, error) {
-	res, err := r.client.Do(r.ctx, "INFO", "replication").Result()
+	var res interface{}
+	var err error
+	if r.legacyRedis {
+		res, err = r.clientv8.Do(r.ctx, "INFO", "replication").Result()
+	} else {
+		res, err = r.clientv9.Do(r.ctx, "INFO", "replication").Result()
+	}
 	if err != nil {
 		return 0, err
 	}
@@ -134,36 +158,71 @@ func (r *StandaloneRedisLock) parseConnectedSlaves(res string) int {
 
 // Try to acquire a redis lock.
 func (r *StandaloneRedisLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
-	// 1.Setting redis expiration time
-	nx := r.client.SetNX(r.ctx, req.ResourceID, req.LockOwner, time.Second*time.Duration(req.ExpiryInSeconds))
-	if nx == nil {
-		return &lock.TryLockResponse{}, fmt.Errorf("[standaloneRedisLock]: SetNX returned nil.ResourceID: %s", req.ResourceID)
-	}
-	// 2. check error
-	err := nx.Err()
-	if err != nil {
-		return &lock.TryLockResponse{}, err
-	}
+	if r.legacyRedis {
+		// 1. set the redis expiration time
+		nx := r.clientv8.SetNX(r.ctx, req.ResourceID, req.LockOwner, time.Second*time.Duration(req.ExpiryInSeconds))
+		if nx == nil {
+			return &lock.TryLockResponse{}, fmt.Errorf("[standaloneRedisLock]: SetNX returned nil. ResourceID: %s", req.ResourceID)
+		}
+		// 2. check error
+		err := nx.Err()
+		if err != nil {
+			return &lock.TryLockResponse{}, err
+		}
 
-	return &lock.TryLockResponse{
-		Success: nx.Val(),
-	}, nil
+		return &lock.TryLockResponse{
+			Success: nx.Val(),
+		}, nil
+	} else {
+		// 1. set the redis expiration time
+		nx := r.clientv9.SetNX(r.ctx, req.ResourceID, req.LockOwner, time.Second*time.Duration(req.ExpiryInSeconds))
+		if nx == nil {
+			return &lock.TryLockResponse{}, fmt.Errorf("[standaloneRedisLock]: SetNX returned nil. ResourceID: %s", req.ResourceID)
+		}
+		// 2. check error
+		err := nx.Err()
+		if err != nil {
+			return &lock.TryLockResponse{}, err
+		}
+
+		return &lock.TryLockResponse{
+			Success: nx.Val(),
+		}, nil
+	}
 }
 
 // Try to release a redis lock.
 func (r *StandaloneRedisLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
-	// 1. delegate to client.eval lua script
-	eval := r.client.Eval(r.ctx, unlockScript, []string{req.ResourceID}, req.LockOwner)
-	// 2. check error
-	if eval == nil {
-		return newInternalErrorUnlockResponse(), fmt.Errorf("[standaloneRedisLock]: Eval unlock script returned nil.ResourceID: %s", req.ResourceID)
-	}
-	err := eval.Err()
-	if err != nil {
-		return newInternalErrorUnlockResponse(), err
+	var i int
+	var err error
+	if r.legacyRedis {
+		// 1. delegate to client.eval lua script
+		eval := r.clientv8.Eval(r.ctx, unlockScript, []string{req.ResourceID}, req.LockOwner)
+		// 2. check error
+		if eval == nil {
+			return newInternalErrorUnlockResponse(), fmt.Errorf("[standaloneRedisLock]: Eval unlock script returned nil. ResourceID: %s", req.ResourceID)
+		}
+		// plain assignment: `err :=` would shadow the declaration above and hide
+		// the eval error from the status handling below
+		err = eval.Err()
+		if err != nil {
+			return newInternalErrorUnlockResponse(), err
+		}
+		// 3. parse result
+		i, err = eval.Int()
+	} else {
+		// 1. delegate to client.eval lua script
+		eval := r.clientv9.Eval(r.ctx, unlockScript, []string{req.ResourceID}, req.LockOwner)
+		// 2. check error
+		if eval == nil {
+			return newInternalErrorUnlockResponse(), fmt.Errorf("[standaloneRedisLock]: Eval unlock script returned nil. ResourceID: %s", req.ResourceID)
+		}
+		err = eval.Err()
+		if err != nil {
+			return newInternalErrorUnlockResponse(), err
+		}
+		// 3. parse result
+		i, err = eval.Int()
 	}
-	// 3. parse result
-	i, err := eval.Int()
+
 	status := lock.InternalError
 	if err != nil {
 		return &lock.UnlockResponse{
@@ -193,8 +252,14 @@ func (r *StandaloneRedisLock) Close() error {
 	if r.cancel != nil {
 		r.cancel()
 	}
-	if r.client != nil {
-		return r.client.Close()
+	if r.legacyRedis {
+		if r.clientv8 != nil {
+			return r.clientv8.Close()
+		}
+	} else {
+		if r.clientv9 != nil {
+			return r.clientv9.Close()
+		}
 	}
 	return nil
 }
diff --git a/pubsub/redis/redis.go b/pubsub/redis/redis.go
index 15ea43cc53..8fe2b404c1 100644
--- a/pubsub/redis/redis.go
+++ b/pubsub/redis/redis.go
@@ -21,6 +21,8 @@ import (
 	"time"
 
 	"github.com/go-redis/redis/v8"
+	v8 "github.com/go-redis/redis/v8"
+	v9 "github.com/go-redis/redis/v9"
 
 	rediscomponent "github.com/dapr/components-contrib/internal/component/redis"
 	"github.com/dapr/components-contrib/pubsub"
@@ -45,7 +47,9 @@ const (
 // on the mechanics of Redis Streams.
 type redisStreams struct {
 	metadata       metadata
-	client         redis.UniversalClient
+	clientv8       v8.UniversalClient
+	clientv9       v9.UniversalClient
+	legacyRedis    bool
 	clientSettings *rediscomponent.Settings
 
 	logger logger.Logger
@@ -137,15 +141,29 @@ func (r *redisStreams) Init(metadata pubsub.Metadata) error {
 		return err
 	}
 	r.metadata = m
-	r.client, r.clientSettings, err = rediscomponent.ParseClientFromProperties(metadata.Properties, nil)
+	if rediscomponent.IsLegacyRedisVersion(metadata.Properties) {
+		r.legacyRedis = true
+		r.clientv8, r.clientSettings, err = rediscomponent.ParseClientv8FromProperties(metadata.Properties, nil)
+	} else {
+		r.legacyRedis = false
+		r.clientv9, r.clientSettings, err = rediscomponent.ParseClientv9FromProperties(metadata.Properties, nil)
+	}
+
 	if err != nil {
 		return err
 	}
 	r.ctx, r.cancel = context.WithCancel(context.Background())
 
-	if _, err = r.client.Ping(r.ctx).Result(); err != nil {
-		return fmt.Errorf("redis streams: error connecting to redis at %s: %s", r.clientSettings.Host, err)
+	// redis 6 and below
+	if r.legacyRedis {
+		if _, err = r.clientv8.Ping(r.ctx).Result(); err != nil {
+			return fmt.Errorf("redis streams: error connecting to redis at %s: %s", r.clientSettings.Host, err)
+		}
+	} else {
+		if _, err = r.clientv9.Ping(r.ctx).Result(); err != nil {
+			return fmt.Errorf("redis streams: error connecting to redis at %s: %s", r.clientSettings.Host, err)
+		}
 	}
 
 	r.queue = make(chan redisMessageWrapper, int(r.metadata.queueDepth))
@@ -157,20 +175,40 @@ func (r *redisStreams) Init(metadata pubsub.Metadata) error {
 }
 
 func (r *redisStreams) Publish(req *pubsub.PublishRequest) error {
-	_, err := r.client.XAdd(r.ctx, &redis.XAddArgs{
-		Stream:       req.Topic,
-		MaxLenApprox: r.metadata.maxLenApprox,
-		Values:       map[string]interface{}{"data": req.Data},
-	}).Result()
-	if err != nil {
-		return fmt.Errorf("redis streams:
error from publish: %s", err) + var err error + // redis 6 and below + if r.legacyRedis { + _, err = r.clientv8.XAdd(r.ctx, &v8.XAddArgs{ + Stream: req.Topic, + MaxLenApprox: r.metadata.maxLenApprox, + Values: map[string]interface{}{"data": req.Data}, + }).Result() + if err != nil { + return fmt.Errorf("redis streams: error from publish: %s", err) + } + } else { + _, err = r.clientv9.XAdd(r.ctx, &v9.XAddArgs{ + Stream: req.Topic, + MaxLen: r.metadata.maxLenApprox, + Approx: true, + Values: map[string]interface{}{"data": req.Data}, + }).Result() + if err != nil { + return fmt.Errorf("redis streams: error from publish: %s", err) + } } return nil } func (r *redisStreams) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { - err := r.client.XGroupCreateMkStream(ctx, req.Topic, r.metadata.consumerID, "0").Err() + var err error + // redis 6 and below + if r.legacyRedis { + err = r.clientv8.XGroupCreateMkStream(ctx, req.Topic, r.metadata.consumerID, "0").Err() + } else { + err = r.clientv9.XGroupCreateMkStream(ctx, req.Topic, r.metadata.consumerID, "0").Err() + } // Ignore BUSYGROUP errors if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { r.logger.Errorf("redis streams: %s", err) @@ -183,12 +221,12 @@ func (r *redisStreams) Subscribe(ctx context.Context, req pubsub.SubscribeReques return nil } -// enqueueMessages is a shared function that funnels new messages (via polling) +// enqueueMessagesv8 is a shared function that funnels new messages (via polling) // and redelivered messages (via reclaiming) to a channel where workers can // pick them up for processing. -func (r *redisStreams) enqueueMessages(ctx context.Context, stream string, handler pubsub.Handler, msgs []redis.XMessage) { +func (r *redisStreams) enqueueMessagesv8(ctx context.Context, stream string, handler pubsub.Handler, msgs []v8.XMessage) { for _, msg := range msgs { - rmsg := createRedisMessageWrapper(ctx, stream, handler, msg) + rmsg := createRedisMessageWrapperv8(ctx, stream, handler, msg) select { // Might block if the queue is full so we need the ctx.Done below. @@ -201,9 +239,51 @@ func (r *redisStreams) enqueueMessages(ctx context.Context, stream string, handl } } -// createRedisMessageWrapper encapsulates the Redis message, message identifier, and handler +// enqueueMessagesv9 is a shared function that funnels new messages (via polling) +// and redelivered messages (via reclaiming) to a channel where workers can +// pick them up for processing. +func (r *redisStreams) enqueueMessagesv9(ctx context.Context, stream string, handler pubsub.Handler, msgs []v9.XMessage) { + for _, msg := range msgs { + rmsg := createRedisMessageWrapperv9(ctx, stream, handler, msg) + + select { + // Might block if the queue is full so we need the ctx.Done below. + case r.queue <- rmsg: + + // Handle cancelation + case <-ctx.Done(): + return + } + } +} + +// createRedisMessageWrapperv8 encapsulates the Redis message, message identifier, and handler +// in `redisMessage` for processing. 
+func createRedisMessageWrapperv8(ctx context.Context, stream string, handler pubsub.Handler, msg v8.XMessage) redisMessageWrapper {
+	var data []byte
+	if dataValue, exists := msg.Values["data"]; exists && dataValue != nil {
+		switch v := dataValue.(type) {
+		case string:
+			data = []byte(v)
+		case []byte:
+			data = v
+		}
+	}
+
+	return redisMessageWrapper{
+		ctx: ctx,
+		message: pubsub.NewMessage{
+			Topic: stream,
+			Data:  data,
+		},
+		messageID: msg.ID,
+		handler:   handler,
+	}
+}
+
+// createRedisMessageWrapperv9 encapsulates the Redis message, message identifier, and handler
 // in `redisMessage` for processing.
-func createRedisMessageWrapper(ctx context.Context, stream string, handler pubsub.Handler, msg redis.XMessage) redisMessageWrapper {
+func createRedisMessageWrapperv9(ctx context.Context, stream string, handler pubsub.Handler, msg v9.XMessage) redisMessageWrapper {
 	var data []byte
 	if dataValue, exists := msg.Values["data"]; exists && dataValue != nil {
 		switch v := dataValue.(type) {
@@ -259,10 +339,18 @@ func (r *redisStreams) processMessage(msg redisMessageWrapper) error {
 	}
 
 	// Use the background context in case subscriptionCtx is already closed
-	if err := r.client.XAck(context.Background(), msg.message.Topic, r.metadata.consumerID, msg.messageID).Err(); err != nil {
-		r.logger.Errorf("Error acknowledging Redis message %s: %v", msg.messageID, err)
+	if r.legacyRedis {
+		if err := r.clientv8.XAck(context.Background(), msg.message.Topic, r.metadata.consumerID, msg.messageID).Err(); err != nil {
+			r.logger.Errorf("Error acknowledging Redis message %s: %v", msg.messageID, err)
 
-		return err
+			return err
+		}
+	} else {
+		if err := r.clientv9.XAck(context.Background(), msg.message.Topic, r.metadata.consumerID, msg.messageID).Err(); err != nil {
+			r.logger.Errorf("Error acknowledging Redis message %s: %v", msg.messageID, err)
+
+			return err
+		}
 	}
 
 	return nil
@@ -278,24 +366,46 @@ func (r *redisStreams) pollNewMessagesLoop(ctx context.Context, stream string, h
 	}
 
 	// Read messages
-	streams, err := r.client.XReadGroup(ctx, &redis.XReadGroupArgs{
-		Group:    r.metadata.consumerID,
-		Consumer: r.metadata.consumerID,
-		Streams:  []string{stream, ">"},
-		Count:    int64(r.metadata.queueDepth),
-		Block:    time.Duration(r.clientSettings.ReadTimeout),
-	}).Result()
-	if err != nil {
-		if !errors.Is(err, redis.Nil) && err != context.Canceled {
-			r.logger.Errorf("redis streams: error reading from stream %s: %s", stream, err)
+	if r.legacyRedis {
+		streams, err := r.clientv8.XReadGroup(ctx, &v8.XReadGroupArgs{
+			Group:    r.metadata.consumerID,
+			Consumer: r.metadata.consumerID,
+			Streams:  []string{stream, ">"},
+			Count:    int64(r.metadata.queueDepth),
+			Block:    time.Duration(r.clientSettings.ReadTimeout),
+		}).Result()
+		if err != nil {
+			if !errors.Is(err, redis.Nil) && err != context.Canceled {
+				r.logger.Errorf("redis streams: error reading from stream %s: %s", stream, err)
+			}
+			continue
+		}
+
+		// Enqueue messages for the returned streams
+		for _, s := range streams {
+			r.enqueueMessagesv8(ctx, s.Stream, handler, s.Messages)
+		}
+	} else {
+		streams, err := r.clientv9.XReadGroup(ctx, &v9.XReadGroupArgs{
+			Group:    r.metadata.consumerID,
+			Consumer: r.metadata.consumerID,
+			Streams:  []string{stream, ">"},
+			Count:    int64(r.metadata.queueDepth),
+			Block:    time.Duration(r.clientSettings.ReadTimeout),
+		}).Result()
+		if err != nil {
+			// v9 errors must be compared against the v9 sentinel; v8's
+			// redis.Nil is a distinct value and would never match here
+			if !errors.Is(err, v9.Nil) && err != context.Canceled {
+				r.logger.Errorf("redis streams: error reading from stream %s: %s", stream, err)
+			}
+			continue
 		}
-		continue
-	}
 
-	// Enqueue messages for the returned streams
-	for _, s := range streams {
-		r.enqueueMessages(ctx, s.Stream, handler, s.Messages)
+		// Enqueue messages for the returned streams
+		for _, s := range streams {
+			r.enqueueMessagesv9(ctx, s.Stream, handler, s.Messages)
+		}
+	}
 	}
 }
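One go-redis API difference surfaces in the Publish hunk earlier in this file: v8 expresses approximate stream trimming through XAddArgs.MaxLenApprox, while v9 splits it into MaxLen plus an Approx flag. A minimal side-by-side sketch — hypothetical helpers, not part of this patch; both issue an XADD with "MAXLEN ~" trimming when maxLen is positive:

func publishV8(ctx context.Context, c v8.UniversalClient, topic string, payload []byte, maxLen int64) error {
	return c.XAdd(ctx, &v8.XAddArgs{
		Stream:       topic,
		MaxLenApprox: maxLen, // v8: approximate trimming is a dedicated field
		Values:       map[string]interface{}{"data": payload},
	}).Err()
}

func publishV9(ctx context.Context, c v9.UniversalClient, topic string, payload []byte, maxLen int64) error {
	return c.XAdd(ctx, &v9.XAddArgs{
		Stream: topic,
		MaxLen: maxLen,
		Approx: true, // v9: the "~" modifier is a separate flag
		Values: map[string]interface{}{"data": payload},
	}).Err()
}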
@@ -327,66 +437,131 @@ func (r *redisStreams) reclaimPendingMessagesLoop(ctx context.Context, stream st
 
 // reclaimPendingMessages handles reclaiming messages that previously failed to process and
 // funneling them to the message channel by calling `enqueueMessages`.
 func (r *redisStreams) reclaimPendingMessages(ctx context.Context, stream string, handler pubsub.Handler) {
-	for {
-		// Retrieve pending messages for this stream and consumer
-		pendingResult, err := r.client.XPendingExt(ctx, &redis.XPendingExtArgs{
-			Stream: stream,
-			Group:  r.metadata.consumerID,
-			Start:  "-",
-			End:    "+",
-			Count:  int64(r.metadata.queueDepth),
-		}).Result()
-		if err != nil && !errors.Is(err, redis.Nil) {
-			r.logger.Errorf("error retrieving pending Redis messages: %v", err)
-
-			break
-		}
-
-		// Filter out messages that have not timed out yet
-		msgIDs := make([]string, 0, len(pendingResult))
-		for _, msg := range pendingResult {
-			if msg.Idle >= r.metadata.processingTimeout {
-				msgIDs = append(msgIDs, msg.ID)
-			}
-		}
-
-		// Nothing to claim
-		if len(msgIDs) == 0 {
-			break
-		}
-
-		// Attempt to claim the messages for the filtered IDs
-		claimResult, err := r.client.XClaim(ctx, &redis.XClaimArgs{
-			Stream:   stream,
-			Group:    r.metadata.consumerID,
-			Consumer: r.metadata.consumerID,
-			MinIdle:  r.metadata.processingTimeout,
-			Messages: msgIDs,
-		}).Result()
-		if err != nil && !errors.Is(err, redis.Nil) {
-			r.logger.Errorf("error claiming pending Redis messages: %v", err)
-
-			break
-		}
-
-		// Enqueue claimed messages
-		r.enqueueMessages(ctx, stream, handler, claimResult)
-
-		// If the Redis nil error is returned, it means somes message in the pending
-		// state no longer exist. We need to acknowledge these messages to
-		// remove them from the pending list.
-		if errors.Is(err, redis.Nil) {
-			// Build a set of message IDs that were not returned
-			// that potentially no longer exist.
-			expectedMsgIDs := make(map[string]struct{}, len(msgIDs))
-			for _, id := range msgIDs {
-				expectedMsgIDs[id] = struct{}{}
-			}
-			for _, claimed := range claimResult {
-				delete(expectedMsgIDs, claimed.ID)
-			}
-
-			r.removeMessagesThatNoLongerExistFromPending(ctx, stream, expectedMsgIDs, handler)
-		}
-	}
+	if r.legacyRedis {
+		for {
+			// Retrieve pending messages for this stream and consumer
+			pendingResult, err := r.clientv8.XPendingExt(ctx, &redis.XPendingExtArgs{
+				Stream: stream,
+				Group:  r.metadata.consumerID,
+				Start:  "-",
+				End:    "+",
+				Count:  int64(r.metadata.queueDepth),
+			}).Result()
+			if err != nil && !errors.Is(err, redis.Nil) {
+				r.logger.Errorf("error retrieving pending Redis messages: %v", err)
+
+				break
+			}
+
+			// Filter out messages that have not timed out yet
+			msgIDs := make([]string, 0, len(pendingResult))
+			for _, msg := range pendingResult {
+				if msg.Idle >= r.metadata.processingTimeout {
+					msgIDs = append(msgIDs, msg.ID)
+				}
+			}
+
+			// Nothing to claim
+			if len(msgIDs) == 0 {
+				break
+			}
+
+			// Attempt to claim the messages for the filtered IDs
+			claimResult, err := r.clientv8.XClaim(ctx, &redis.XClaimArgs{
+				Stream:   stream,
+				Group:    r.metadata.consumerID,
+				Consumer: r.metadata.consumerID,
+				MinIdle:  r.metadata.processingTimeout,
+				Messages: msgIDs,
+			}).Result()
+			if err != nil && !errors.Is(err, redis.Nil) {
+				r.logger.Errorf("error claiming pending Redis messages: %v", err)
+
+				break
+			}
+
+			// Enqueue claimed messages
+			r.enqueueMessagesv8(ctx, stream, handler, claimResult)
+
+			// If the Redis nil error is returned, it means some messages in the pending
+			// state no longer exist. We need to acknowledge these messages to
+			// remove them from the pending list.
+			if errors.Is(err, redis.Nil) {
+				// Build a set of message IDs that were not returned
+				// that potentially no longer exist.
+				expectedMsgIDs := make(map[string]struct{}, len(msgIDs))
+				for _, id := range msgIDs {
+					expectedMsgIDs[id] = struct{}{}
+				}
+				for _, claimed := range claimResult {
+					delete(expectedMsgIDs, claimed.ID)
+				}
+
+				r.removeMessagesThatNoLongerExistFromPending(ctx, stream, expectedMsgIDs, handler)
+			}
+		}
+	} else {
+		for {
+			// Retrieve pending messages for this stream and consumer
+			pendingResult, err := r.clientv9.XPendingExt(ctx, &v9.XPendingExtArgs{
+				Stream: stream,
+				Group:  r.metadata.consumerID,
+				Start:  "-",
+				End:    "+",
+				Count:  int64(r.metadata.queueDepth),
+			}).Result()
+			// v9 errors must be matched against v9.Nil; v8's redis.Nil never matches here
+			if err != nil && !errors.Is(err, v9.Nil) {
+				r.logger.Errorf("error retrieving pending Redis messages: %v", err)
+
+				break
+			}
+
+			// Filter out messages that have not timed out yet
+			msgIDs := make([]string, 0, len(pendingResult))
+			for _, msg := range pendingResult {
+				if msg.Idle >= r.metadata.processingTimeout {
+					msgIDs = append(msgIDs, msg.ID)
+				}
+			}
+
+			// Nothing to claim
+			if len(msgIDs) == 0 {
+				break
+			}
+
+			// Attempt to claim the messages for the filtered IDs
+			claimResult, err := r.clientv9.XClaim(ctx, &v9.XClaimArgs{
+				Stream:   stream,
+				Group:    r.metadata.consumerID,
+				Consumer: r.metadata.consumerID,
+				MinIdle:  r.metadata.processingTimeout,
+				Messages: msgIDs,
+			}).Result()
+			if err != nil && !errors.Is(err, v9.Nil) {
+				r.logger.Errorf("error claiming pending Redis messages: %v", err)
+
+				break
+			}
+
+			// Enqueue claimed messages
+			r.enqueueMessagesv9(ctx, stream, handler, claimResult)
+
+			// If the Redis nil error is returned, it means some messages in the pending
+			// state no longer exist. We need to acknowledge these messages to
+			// remove them from the pending list.
+			if errors.Is(err, v9.Nil) {
+				// Build a set of message IDs that were not returned
+				// that potentially no longer exist.
+				expectedMsgIDs := make(map[string]struct{}, len(msgIDs))
+				for _, id := range msgIDs {
+					expectedMsgIDs[id] = struct{}{}
+				}
+				for _, claimed := range claimResult {
+					delete(expectedMsgIDs, claimed.ID)
+				}
+
+				r.removeMessagesThatNoLongerExistFromPending(ctx, stream, expectedMsgIDs, handler)
+			}
+		}
+	}
 }
@@ -394,30 +569,59 @@ func (r *redisStreams) reclaimPendingMessages(ctx context.Context, stream string
 
 // removeMessagesThatNoLongerExistFromPending attempts to claim messages individually so that messages in the pending list
 // that no longer exist can be removed from the pending list. This is done by calling `XACK`.
 func (r *redisStreams) removeMessagesThatNoLongerExistFromPending(ctx context.Context, stream string, messageIDs map[string]struct{}, handler pubsub.Handler) {
-	// Check each message ID individually.
-	for pendingID := range messageIDs {
-		claimResultSingleMsg, err := r.client.XClaim(ctx, &redis.XClaimArgs{
-			Stream:   stream,
-			Group:    r.metadata.consumerID,
-			Consumer: r.metadata.consumerID,
-			MinIdle:  0,
-			Messages: []string{pendingID},
-		}).Result()
-		if err != nil && !errors.Is(err, redis.Nil) {
-			r.logger.Errorf("error claiming pending Redis message %s: %v", pendingID, err)
-
-			continue
-		}
-
-		// Ack the message to remove it from the pending list.
-		if errors.Is(err, redis.Nil) {
-			// Use the background context in case subscriptionCtx is already closed
-			if err = r.client.XAck(context.Background(), stream, r.metadata.consumerID, pendingID).Err(); err != nil {
-				r.logger.Errorf("error acknowledging Redis message %s after failed claim for %s: %v", pendingID, stream, err)
-			}
-		} else {
-			// This should not happen but if it does the message should be processed.
+	if r.legacyRedis {
+		// Check each message ID individually.
+		for pendingID := range messageIDs {
+			claimResultSingleMsg, err := r.clientv8.XClaim(ctx, &v8.XClaimArgs{
+				Stream:   stream,
+				Group:    r.metadata.consumerID,
+				Consumer: r.metadata.consumerID,
+				MinIdle:  0,
+				Messages: []string{pendingID},
+			}).Result()
+			if err != nil && !errors.Is(err, redis.Nil) {
+				r.logger.Errorf("error claiming pending Redis message %s: %v", pendingID, err)
+
+				continue
+			}
+
+			// Ack the message to remove it from the pending list.
+			if errors.Is(err, redis.Nil) {
+				// Use the background context in case subscriptionCtx is already closed
+				if err = r.clientv8.XAck(context.Background(), stream, r.metadata.consumerID, pendingID).Err(); err != nil {
+					r.logger.Errorf("error acknowledging Redis message %s after failed claim for %s: %v", pendingID, stream, err)
+				}
+			} else {
+				// This should not happen but if it does the message should be processed.
+				r.enqueueMessagesv8(ctx, stream, handler, claimResultSingleMsg)
+			}
+		}
+	} else {
+		// Check each message ID individually.
+		for pendingID := range messageIDs {
+			claimResultSingleMsg, err := r.clientv9.XClaim(ctx, &v9.XClaimArgs{
+				Stream:   stream,
+				Group:    r.metadata.consumerID,
+				Consumer: r.metadata.consumerID,
+				MinIdle:  0,
+				Messages: []string{pendingID},
+			}).Result()
+			if err != nil && !errors.Is(err, v9.Nil) {
+				r.logger.Errorf("error claiming pending Redis message %s: %v", pendingID, err)
+
+				continue
+			}
+
+			// Ack the message to remove it from the pending list.
+			if errors.Is(err, v9.Nil) {
+				// Use the background context in case subscriptionCtx is already closed
+				if err = r.clientv9.XAck(context.Background(), stream, r.metadata.consumerID, pendingID).Err(); err != nil {
+					r.logger.Errorf("error acknowledging Redis message %s after failed claim for %s: %v", pendingID, stream, err)
+				}
+			} else {
+				// This should not happen but if it does the message should be processed.
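Since the v9 code path implies a Redis server new enough to support it, the XPENDING-filter-XCLAIM sequence above could in principle collapse into a single XAUTOCLAIM call (Redis 6.2+), which claims idle entries in one round trip and, on Redis 7, also clears dead entries from the pending list. A hypothetical sketch of that alternative, not something this patch implements:

func reclaimWithAutoClaim(ctx context.Context, c v9.UniversalClient, stream, group string, minIdle time.Duration, count int64) ([]v9.XMessage, error) {
	// XAUTOCLAIM scans the pending entries list and claims entries that have
	// been idle for at least minIdle, starting from ID "0-0".
	msgs, _, err := c.XAutoClaim(ctx, &v9.XAutoClaimArgs{
		Stream:   stream,
		Group:    group,
		Consumer: group, // this component uses the consumer ID as both group and consumer
		MinIdle:  minIdle,
		Start:    "0-0",
		Count:    count,
	}).Result()
	return msgs, err
}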
- r.enqueueMessages(ctx, stream, handler, claimResultSingleMsg) } } } @@ -425,7 +629,10 @@ func (r *redisStreams) removeMessagesThatNoLongerExistFromPending(ctx context.Co func (r *redisStreams) Close() error { r.cancel() - return r.client.Close() + if r.legacyRedis { + return r.clientv8.Close() + } + return r.clientv9.Close() } func (r *redisStreams) Features() []pubsub.Feature { @@ -433,8 +640,14 @@ func (r *redisStreams) Features() []pubsub.Feature { } func (r *redisStreams) Ping() error { - if _, err := r.client.Ping(context.Background()).Result(); err != nil { - return fmt.Errorf("redis pubsub: error connecting to redis at %s: %s", r.clientSettings.Host, err) + if r.legacyRedis { + if _, err := r.clientv8.Ping(context.Background()).Result(); err != nil { + return fmt.Errorf("redis pubsub: error connecting to redis at %s: %s", r.clientSettings.Host, err) + } + } else { + if _, err := r.clientv9.Ping(context.Background()).Result(); err != nil { + return fmt.Errorf("redis pubsub: error connecting to redis at %s: %s", r.clientSettings.Host, err) + } } return nil diff --git a/pubsub/redis/redis_test.go b/pubsub/redis/redis_test.go index 2ad82cd0c7..bbd93ac154 100644 --- a/pubsub/redis/redis_test.go +++ b/pubsub/redis/redis_test.go @@ -98,7 +98,7 @@ func TestProcessStreams(t *testing.T) { testRedisStream.ctx, testRedisStream.cancel = context.WithCancel(context.Background()) testRedisStream.queue = make(chan redisMessageWrapper, 10) go testRedisStream.worker() - testRedisStream.enqueueMessages(context.Background(), fakeConsumerID, fakeHandler, generateRedisStreamTestData(2, 3, expectedData)) + testRedisStream.enqueueMessagesv8(context.Background(), fakeConsumerID, fakeHandler, generateRedisStreamTestData(2, 3, expectedData)) // Wait for the handler to finish processing wg.Wait() diff --git a/state/redis/redis.go b/state/redis/redis.go index 3ff7c55fe6..6f9a0b03a2 100644 --- a/state/redis/redis.go +++ b/state/redis/redis.go @@ -20,7 +20,8 @@ import ( "strings" "github.com/agrea/ptr" - "github.com/go-redis/redis/v8" + redisClientv8 "github.com/go-redis/redis/v8" + redisClientv9 "github.com/go-redis/redis/v9" jsoniter "github.com/json-iterator/go" "github.com/dapr/components-contrib/contenttype" @@ -91,7 +92,9 @@ const ( // StateStore is a Redis state store. 
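Further down in this file, the state store's Multi method stores the transaction pipeline as `any` and type-asserts to the v8 or v9 Pipeliner before every call. A hypothetical closure-based wrapper — not part of this patch — would pin the version branch once per transaction instead:

// txPipe captures the version-specific pipeline behind two closures.
type txPipe struct {
	do   func(ctx context.Context, args ...interface{})
	exec func(ctx context.Context) error
}

func (r *StateStore) newTxPipe() txPipe {
	if r.legacyRedis {
		p := r.clientv8.TxPipeline()
		return txPipe{
			do:   func(ctx context.Context, args ...interface{}) { p.Do(ctx, args...) },
			exec: func(ctx context.Context) error { _, err := p.Exec(ctx); return err },
		}
	}
	p := r.clientv9.TxPipeline()
	return txPipe{
		do:   func(ctx context.Context, args ...interface{}) { p.Do(ctx, args...) },
		exec: func(ctx context.Context) error { _, err := p.Exec(ctx); return err },
	}
}

Callers would then queue EVAL/EXPIRE/PERSIST commands through pipe.do and finish with pipe.exec, with no type assertions at the call sites.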
type StateStore struct { state.DefaultBulkStore - client redis.UniversalClient + clientv8 redisClientv8.UniversalClient + clientv9 redisClientv9.UniversalClient + legacyRedis bool clientSettings *rediscomponent.Settings json jsoniter.API metadata rediscomponent.Metadata @@ -118,8 +121,15 @@ func NewRedisStateStore(logger logger.Logger) state.Store { } func (r *StateStore) Ping() error { - if _, err := r.client.Ping(context.Background()).Result(); err != nil { - return fmt.Errorf("redis store: error connecting to redis at %s: %s", r.clientSettings.Host, err) + // redis 6 + if r.legacyRedis { + if _, err := r.clientv8.Ping(context.Background()).Result(); err != nil { + return fmt.Errorf("redis store: error connecting to redis at %s: %s", r.clientSettings.Host, err) + } + } else { + if _, err := r.clientv9.Ping(context.Background()).Result(); err != nil { + return fmt.Errorf("redis store: error connecting to redis at %s: %s", r.clientSettings.Host, err) + } } return nil @@ -132,9 +142,17 @@ func (r *StateStore) Init(metadata state.Metadata) error { return err } r.metadata = m + if rediscomponent.IsLegacyRedisVersion(metadata.Properties) { + r.legacyRedis = true + } defaultSettings := rediscomponent.Settings{RedisMaxRetries: m.MaxRetries, RedisMaxRetryInterval: rediscomponent.Duration(m.MaxRetryBackoff)} - r.client, r.clientSettings, err = rediscomponent.ParseClientFromProperties(metadata.Properties, &defaultSettings) + // redis 6 and below + if r.legacyRedis { + r.clientv8, r.clientSettings, err = rediscomponent.ParseClientv8FromProperties(metadata.Properties, &defaultSettings) + } else { + r.clientv9, r.clientSettings, err = rediscomponent.ParseClientv9FromProperties(metadata.Properties, &defaultSettings) + } if err != nil { return err } @@ -146,8 +164,15 @@ func (r *StateStore) Init(metadata state.Metadata) error { r.ctx, r.cancel = context.WithCancel(context.Background()) - if _, err = r.client.Ping(r.ctx).Result(); err != nil { - return fmt.Errorf("redis store: error connecting to redis at %s: %v", r.clientSettings.Host, err) + // redis 6 and below + if r.legacyRedis { + if _, err = r.clientv8.Ping(r.ctx).Result(); err != nil { + return fmt.Errorf("redis store: error connecting to redis at %s: %v", r.clientSettings.Host, err) + } + } else { + if _, err = r.clientv9.Ping(r.ctx).Result(); err != nil { + return fmt.Errorf("redis store: error connecting to redis at %s: %v", r.clientSettings.Host, err) + } } if r.replicas, err = r.getConnectedSlaves(); err != nil { @@ -167,7 +192,15 @@ func (r *StateStore) Features() []state.Feature { } func (r *StateStore) getConnectedSlaves() (int, error) { - res, err := r.client.Do(r.ctx, "INFO", "replication").Result() + var res interface{} + var err error + + // redis 6 and below + if r.legacyRedis { + res, err = r.clientv8.Do(r.ctx, "INFO", "replication").Result() + } else { + res, err = r.clientv9.Do(r.ctx, "INFO", "replication").Result() + } if err != nil { return 0, err } @@ -207,7 +240,13 @@ func (r *StateStore) deleteValue(req *state.DeleteRequest) error { } else { delQuery = delDefaultQuery } - _, err := r.client.Do(r.ctx, "EVAL", delQuery, 1, req.Key, *req.ETag).Result() + var err error + // redis 6 and below + if r.legacyRedis { + _, err = r.clientv8.Do(r.ctx, "EVAL", delQuery, 1, req.Key, *req.ETag).Result() + } else { + _, err = r.clientv9.Do(r.ctx, "EVAL", delQuery, 1, req.Key, *req.ETag).Result() + } if err != nil { return state.NewETagError(state.ETagMismatch, err) } @@ -226,7 +265,15 @@ func (r *StateStore) Delete(req *state.DeleteRequest) 
error { } func (r *StateStore) directGet(req *state.GetRequest) (*state.GetResponse, error) { - res, err := r.client.Do(r.ctx, "GET", req.Key).Result() + var res interface{} + var err error + + // redis 6 and below + if r.legacyRedis { + res, err = r.clientv8.Do(r.ctx, "GET", req.Key).Result() + } else { + res, err = r.clientv9.Do(r.ctx, "GET", req.Key).Result() + } if err != nil { return nil, err } @@ -243,7 +290,15 @@ func (r *StateStore) directGet(req *state.GetRequest) (*state.GetResponse, error } func (r *StateStore) getDefault(req *state.GetRequest) (*state.GetResponse, error) { - res, err := r.client.Do(r.ctx, "HGETALL", req.Key).Result() // Prefer values with ETags + var res interface{} + var err error + + // redis 6 and below + if r.legacyRedis { + res, err = r.clientv8.Do(r.ctx, "HGETALL", req.Key).Result() // Prefer values with ETags + } else { + res, err = r.clientv9.Do(r.ctx, "HGETALL", req.Key).Result() // Prefer values with ETags + } if err != nil { return r.directGet(req) // Falls back to original get for backward compats. } @@ -267,7 +322,15 @@ func (r *StateStore) getDefault(req *state.GetRequest) (*state.GetResponse, erro } func (r *StateStore) getJSON(req *state.GetRequest) (*state.GetResponse, error) { - res, err := r.client.Do(r.ctx, "JSON.GET", req.Key).Result() + var res interface{} + var err error + + // redis 6 and below + if r.legacyRedis { + res, err = r.clientv8.Do(r.ctx, "JSON.GET", req.Key).Result() + } else { + res, err = r.clientv9.Do(r.ctx, "JSON.GET", req.Key).Result() + } if err != nil { return nil, err } @@ -350,7 +413,12 @@ func (r *StateStore) setValue(req *state.SetRequest) error { bt, _ = utils.Marshal(req.Value, r.json.Marshal) } - err = r.client.Do(r.ctx, "EVAL", setQuery, 1, req.Key, ver, bt, firstWrite).Err() + // redis 6 and below + if r.legacyRedis { + err = r.clientv8.Do(r.ctx, "EVAL", setQuery, 1, req.Key, ver, bt, firstWrite).Err() + } else { + err = r.clientv9.Do(r.ctx, "EVAL", setQuery, 1, req.Key, ver, bt, firstWrite).Err() + } if err != nil { if req.ETag != nil { return state.NewETagError(state.ETagMismatch, err) @@ -360,21 +428,36 @@ func (r *StateStore) setValue(req *state.SetRequest) error { } if ttl != nil && *ttl > 0 { - _, err = r.client.Do(r.ctx, "EXPIRE", req.Key, *ttl).Result() + // redis 6 and below + if r.legacyRedis { + _, err = r.clientv8.Do(r.ctx, "EXPIRE", req.Key, *ttl).Result() + } else { + _, err = r.clientv9.Do(r.ctx, "EXPIRE", req.Key, *ttl).Result() + } if err != nil { return fmt.Errorf("failed to set key %s ttl: %s", req.Key, err) } } if ttl != nil && *ttl <= 0 { - _, err = r.client.Do(r.ctx, "PERSIST", req.Key).Result() + // redis 6 and below + if r.legacyRedis { + _, err = r.clientv8.Do(r.ctx, "PERSIST", req.Key).Result() + } else { + _, err = r.clientv9.Do(r.ctx, "PERSIST", req.Key).Result() + } if err != nil { return fmt.Errorf("failed to persist key %s: %s", req.Key, err) } } if req.Options.Consistency == state.Strong && r.replicas > 0 { - _, err = r.client.Do(r.ctx, "WAIT", r.replicas, 1000).Result() + // redis 6 and below + if r.legacyRedis { + _, err = r.clientv8.Do(r.ctx, "WAIT", r.replicas, 1000).Result() + } else { + _, err = r.clientv9.Do(r.ctx, "WAIT", r.replicas, 1000).Result() + } if err != nil { return fmt.Errorf("redis waiting for %v replicas to acknowledge write, err: %s", r.replicas, err.Error()) } @@ -401,7 +484,13 @@ func (r *StateStore) Multi(request *state.TransactionalStateRequest) error { delQuery = delDefaultQuery } - pipe := r.client.TxPipeline() + var pipe any + // redis 6 and below + if 
r.legacyRedis { + pipe = r.clientv8.TxPipeline() + } else { + pipe = r.clientv9.TxPipeline() + } for _, o := range request.Operations { if o.Operation == state.Upsert { req := o.Request.(state.SetRequest) @@ -423,24 +512,46 @@ func (r *StateStore) Multi(request *state.TransactionalStateRequest) error { } else { bt, _ = utils.Marshal(req.Value, r.json.Marshal) } - pipe.Do(r.ctx, "EVAL", setQuery, 1, req.Key, ver, bt) - if ttl != nil && *ttl > 0 { - pipe.Do(r.ctx, "EXPIRE", req.Key, *ttl) - } - if ttl != nil && *ttl <= 0 { - pipe.Do(r.ctx, "PERSIST", req.Key) + + if r.legacyRedis { + pipe.(redisClientv8.Pipeliner).Do(r.ctx, "EVAL", setQuery, 1, req.Key, ver, bt) + if ttl != nil && *ttl > 0 { + pipe.(redisClientv8.Pipeliner).Do(r.ctx, "EXPIRE", req.Key, *ttl) + } + if ttl != nil && *ttl <= 0 { + pipe.(redisClientv8.Pipeliner).Do(r.ctx, "PERSIST", req.Key) + } + } else { + pipe.(redisClientv9.Pipeliner).Do(r.ctx, "EVAL", setQuery, 1, req.Key, ver, bt) + if ttl != nil && *ttl > 0 { + pipe.(redisClientv9.Pipeliner).Do(r.ctx, "EXPIRE", req.Key, *ttl) + } + if ttl != nil && *ttl <= 0 { + pipe.(redisClientv9.Pipeliner).Do(r.ctx, "PERSIST", req.Key) + } } + } else if o.Operation == state.Delete { req := o.Request.(state.DeleteRequest) if req.ETag == nil { etag := "0" req.ETag = &etag } - pipe.Do(r.ctx, "EVAL", delQuery, 1, req.Key, *req.ETag) + if r.legacyRedis { + pipe.(redisClientv8.Pipeliner).Do(r.ctx, "EVAL", delQuery, 1, req.Key, *req.ETag) + } else { + pipe.(redisClientv9.Pipeliner).Do(r.ctx, "EVAL", delQuery, 1, req.Key, *req.ETag) + } } } - _, err := pipe.Exec(r.ctx) + var err error + // redis 6 and below + if r.legacyRedis { + _, err = pipe.(redisClientv8.Pipeliner).Exec(r.ctx) + } else { + _, err = pipe.(redisClientv9.Pipeliner).Exec(r.ctx) + } return err } @@ -448,16 +559,32 @@ func (r *StateStore) Multi(request *state.TransactionalStateRequest) error { func (r *StateStore) registerSchemas() error { for name, elem := range r.querySchemas { r.logger.Infof("redis: create query index %s", name) - if err := r.client.Do(r.ctx, elem.schema...).Err(); err != nil { - if err.Error() != "Index already exists" { - return err + // redis 6 and below + if r.legacyRedis { + if err := r.clientv8.Do(r.ctx, elem.schema...).Err(); err != nil { + if err.Error() != "Index already exists" { + return err + } + r.logger.Infof("redis: drop stale query index %s", name) + if err = r.clientv8.Do(r.ctx, "FT.DROPINDEX", name).Err(); err != nil { + return err + } + if err = r.clientv8.Do(r.ctx, elem.schema...).Err(); err != nil { + return err + } } - r.logger.Infof("redis: drop stale query index %s", name) - if err = r.client.Do(r.ctx, "FT.DROPINDEX", name).Err(); err != nil { - return err - } - if err = r.client.Do(r.ctx, elem.schema...).Err(); err != nil { - return err + } else { + if err := r.clientv9.Do(r.ctx, elem.schema...).Err(); err != nil { + if err.Error() != "Index already exists" { + return err + } + r.logger.Infof("redis: drop stale query index %s", name) + if err = r.clientv9.Do(r.ctx, "FT.DROPINDEX", name).Err(); err != nil { + return err + } + if err = r.clientv9.Do(r.ctx, elem.schema...).Err(); err != nil { + return err + } } } } @@ -529,7 +656,15 @@ func (r *StateStore) Query(req *state.QueryRequest) (*state.QueryResponse, error if err := qbuilder.BuildQuery(&req.Query); err != nil { return &state.QueryResponse{}, err } - data, token, err := q.execute(r.ctx, r.client) + var data []state.QueryItem + var token string + var err error + // redis 6 and below + if r.legacyRedis { + data, token, err = 
q.executev8(r.ctx, r.clientv8)
+	} else {
+		data, token, err = q.executev9(r.ctx, r.clientv9)
+	}
 	if err != nil {
 		return &state.QueryResponse{}, err
 	}
@@ -543,5 +678,5 @@ func (r *StateStore) Query(req *state.QueryRequest) (*state.QueryResponse, error
 
 func (r *StateStore) Close() error {
 	r.cancel()
 
-	return r.client.Close()
+	// close whichever client was created in Init
+	if r.legacyRedis {
+		return r.clientv8.Close()
+	}
+	return r.clientv9.Close()
 }
diff --git a/state/redis/redis_query.go b/state/redis/redis_query.go
index d539dfece9..1cae28ba8c 100644
--- a/state/redis/redis_query.go
+++ b/state/redis/redis_query.go
@@ -23,7 +23,8 @@ import (
 	"github.com/dapr/components-contrib/state"
 	"github.com/dapr/components-contrib/state/query"
 
-	"github.com/go-redis/redis/v8"
+	v8 "github.com/go-redis/redis/v8"
+	v9 "github.com/go-redis/redis/v9"
 )
 
 var ErrMultipleSortBy error = errors.New("multiple SORTBY steps are not allowed. Sort multiple fields in a single step")
@@ -190,12 +191,25 @@ func (q *Query) Finalize(filters string, qq *query.Query) error {
 	return nil
 }
 
-func (q *Query) execute(ctx context.Context, client redis.UniversalClient) ([]state.QueryItem, string, error) {
+func (q *Query) executev8(ctx context.Context, client v8.UniversalClient) ([]state.QueryItem, string, error) {
 	query := append(append([]interface{}{"FT.SEARCH", q.schemaName}, q.query...), "RETURN", "2", "$.data", "$.version")
 	ret, err := client.Do(ctx, query...).Result()
 	if err != nil {
 		return nil, "", err
 	}
+	return parseQueryResult(q, ret)
+}
+
+func (q *Query) executev9(ctx context.Context, client v9.UniversalClient) ([]state.QueryItem, string, error) {
+	query := append(append([]interface{}{"FT.SEARCH", q.schemaName}, q.query...), "RETURN", "2", "$.data", "$.version")
+	ret, err := client.Do(ctx, query...).Result()
+	if err != nil {
+		return nil, "", err
+	}
+	return parseQueryResult(q, ret)
+}
+
+func parseQueryResult(q *Query, ret interface{}) ([]state.QueryItem, string, error) {
 	arr, ok := ret.([]interface{})
 	if !ok {
 		return nil, "", fmt.Errorf("invalid output")
@@ -229,5 +243,5 @@ func (q *Query) execute(ctx context.Context, client redis.UniversalClient) ([]st
 		token = strconv.FormatInt(q.offset+int64(len(res)), 10)
 	}
 
-	return res, token, err
+	return res, token, nil
 }
diff --git a/state/redis/redis_test.go b/state/redis/redis_test.go
index fc99bd7a4f..0a8e8c48b7 100644
--- a/state/redis/redis_test.go
+++ b/state/redis/redis_test.go
@@ -21,7 +21,8 @@ import (
 
 	"github.com/agrea/ptr"
 	miniredis "github.com/alicebob/miniredis/v2"
-	redis "github.com/go-redis/redis/v8"
+	v8 "github.com/go-redis/redis/v8"
+	v9 "github.com/go-redis/redis/v9"
 	jsoniter "github.com/json-iterator/go"
 	"github.com/stretchr/testify/assert"
 
@@ -195,14 +196,15 @@ func TestParseConnectedSlavs(t *testing.T) {
 	})
 }
 
-func TestTransactionalUpsert(t *testing.T) {
-	s, c := setupMiniredis()
+func TestTransactionalUpsertv8(t *testing.T) {
+	s, c := setupMiniredisv8()
 	defer s.Close()
 
 	ss := &StateStore{
-		client: c,
-		json:   jsoniter.ConfigFastest,
-		logger: logger.NewLogger("test"),
+		legacyRedis: true,
+		clientv8:    c,
+		json:        jsoniter.ConfigFastest,
+		logger:      logger.NewLogger("test"),
 	}
 	ss.ctx, ss.cancel = context.WithCancel(context.Background())
 
@@ -261,14 +263,15 @@ 
 	assert.Equal(t, int64(-1), res)
 }
 
-func TestTransactionalDelete(t *testing.T) {
-	s, c := setupMiniredis()
+func TestTransactionalDeletev8(t *testing.T) {
+	s, c := setupMiniredisv8()
 	defer s.Close()
 
 	ss := &StateStore{
+		legacyRedis:
true, + clientv8: c, + json: jsoniter.ConfigFastest, + logger: logger.NewLogger("test"), } ss.ctx, ss.cancel = context.WithCancel(context.Background()) @@ -297,11 +300,12 @@ func TestTransactionalDelete(t *testing.T) { assert.Equal(t, 0, len(vals)) } -func TestPing(t *testing.T) { - s, c := setupMiniredis() +func TestPingv8(t *testing.T) { + s, c := setupMiniredisv8() ss := &StateStore{ - client: c, + legacyRedis: true, + clientv8: c, json: jsoniter.ConfigFastest, logger: logger.NewLogger("test"), clientSettings: &rediscomponent.Settings{}, @@ -316,14 +320,337 @@ func TestPing(t *testing.T) { assert.Error(t, err) } -func TestRequestsWithGlobalTTL(t *testing.T) { - s, c := setupMiniredis() +func TestRequestsWithGlobalTTLv8(t *testing.T) { + s, c := setupMiniredisv8() defer s.Close() globalTTLInSeconds := 100 ss := &StateStore{ - client: c, + legacyRedis: true, + clientv8: c, + json: jsoniter.ConfigFastest, + logger: logger.NewLogger("test"), + metadata: rediscomponent.Metadata{TTLInSeconds: &globalTTLInSeconds}, + } + ss.ctx, ss.cancel = context.WithCancel(context.Background()) + + t.Run("TTL: Only global specified", func(t *testing.T) { + ss.Set(&state.SetRequest{ + Key: "weapon100", + Value: "deathstar100", + }) + ttl, _ := ss.clientv8.TTL(ss.ctx, "weapon100").Result() + + assert.Equal(t, time.Duration(globalTTLInSeconds)*time.Second, ttl) + }) + + t.Run("TTL: Global and Request specified", func(t *testing.T) { + requestTTL := 200 + ss.Set(&state.SetRequest{ + Key: "weapon100", + Value: "deathstar100", + Metadata: map[string]string{ + "ttlInSeconds": strconv.Itoa(requestTTL), + }, + }) + ttl, _ := ss.clientv8.TTL(ss.ctx, "weapon100").Result() + + assert.Equal(t, time.Duration(requestTTL)*time.Second, ttl) + }) + + t.Run("TTL: Global and Request specified", func(t *testing.T) { + err := ss.Multi(&state.TransactionalStateRequest{ + Operations: []state.TransactionalStateOperation{ + { + Operation: state.Upsert, + Request: state.SetRequest{ + Key: "weapon", + Value: "deathstar", + }, + }, + { + Operation: state.Upsert, + Request: state.SetRequest{ + Key: "weapon2", + Value: "deathstar2", + Metadata: map[string]string{ + "ttlInSeconds": "123", + }, + }, + }, + { + Operation: state.Upsert, + Request: state.SetRequest{ + Key: "weapon3", + Value: "deathstar3", + Metadata: map[string]string{ + "ttlInSeconds": "-1", + }, + }, + }, + }, + }) + assert.Equal(t, nil, err) + + res, err := c.Do(context.Background(), "HGETALL", "weapon").Result() + assert.Equal(t, nil, err) + + vals := res.([]interface{}) + data, version, err := ss.getKeyVersion(vals) + assert.Equal(t, nil, err) + assert.Equal(t, ptr.String("1"), version) + assert.Equal(t, `"deathstar"`, data) + + res, err = c.Do(context.Background(), "TTL", "weapon").Result() + assert.Equal(t, nil, err) + assert.Equal(t, int64(globalTTLInSeconds), res) + + res, err = c.Do(context.Background(), "TTL", "weapon2").Result() + assert.Equal(t, nil, err) + assert.Equal(t, int64(123), res) + + res, err = c.Do(context.Background(), "TTL", "weapon3").Result() + assert.Equal(t, nil, err) + assert.Equal(t, int64(-1), res) + }) +} + +func TestSetRequestWithTTLv8(t *testing.T) { + s, c := setupMiniredisv8() + defer s.Close() + + ss := &StateStore{ + legacyRedis: true, + clientv8: c, + json: jsoniter.ConfigFastest, + logger: logger.NewLogger("test"), + } + ss.ctx, ss.cancel = context.WithCancel(context.Background()) + + t.Run("TTL specified", func(t *testing.T) { + ttlInSeconds := 100 + ss.Set(&state.SetRequest{ + Key: "weapon100", + Value: "deathstar100", + Metadata: 
map[string]string{ + "ttlInSeconds": strconv.Itoa(ttlInSeconds), + }, + }) + + ttl, _ := ss.clientv8.TTL(ss.ctx, "weapon100").Result() + + assert.Equal(t, time.Duration(ttlInSeconds)*time.Second, ttl) + }) + + t.Run("TTL not specified", func(t *testing.T) { + ss.Set(&state.SetRequest{ + Key: "weapon200", + Value: "deathstar200", + }) + + ttl, _ := ss.clientv8.TTL(ss.ctx, "weapon200").Result() + + assert.Equal(t, time.Duration(-1), ttl) + }) + + t.Run("TTL Changed for Existing Key", func(t *testing.T) { + ss.Set(&state.SetRequest{ + Key: "weapon300", + Value: "deathstar300", + }) + ttl, _ := ss.clientv8.TTL(ss.ctx, "weapon300").Result() + assert.Equal(t, time.Duration(-1), ttl) + + // make the key no longer persistent + ttlInSeconds := 123 + ss.Set(&state.SetRequest{ + Key: "weapon300", + Value: "deathstar300", + Metadata: map[string]string{ + "ttlInSeconds": strconv.Itoa(ttlInSeconds), + }, + }) + ttl, _ = ss.clientv8.TTL(ss.ctx, "weapon300").Result() + assert.Equal(t, time.Duration(ttlInSeconds)*time.Second, ttl) + + // make the key persistent again + ss.Set(&state.SetRequest{ + Key: "weapon300", + Value: "deathstar301", + Metadata: map[string]string{ + "ttlInSeconds": strconv.Itoa(-1), + }, + }) + ttl, _ = ss.clientv8.TTL(ss.ctx, "weapon300").Result() + assert.Equal(t, time.Duration(-1), ttl) + }) +} + +func TestTransactionalDeleteNoEtagv8(t *testing.T) { + s, c := setupMiniredisv8() + defer s.Close() + + ss := &StateStore{ + legacyRedis: true, + clientv8: c, + json: jsoniter.ConfigFastest, + logger: logger.NewLogger("test"), + } + ss.ctx, ss.cancel = context.WithCancel(context.Background()) + + // Insert a record first. + ss.Set(&state.SetRequest{ + Key: "weapon100", + Value: "deathstar100", + }) + + err := ss.Multi(&state.TransactionalStateRequest{ + Operations: []state.TransactionalStateOperation{{ + Operation: state.Delete, + Request: state.DeleteRequest{ + Key: "weapon100", + }, + }}, + }) + assert.Equal(t, nil, err) + + res, err := c.Do(context.Background(), "HGETALL", "weapon100").Result() + assert.Equal(t, nil, err) + + vals := res.([]interface{}) + assert.Equal(t, 0, len(vals)) +} + +func TestTransactionalUpsertv9(t *testing.T) { + s, c := setupMiniredisv9() + defer s.Close() + + ss := &StateStore{ + clientv9: c, + json: jsoniter.ConfigFastest, + logger: logger.NewLogger("test"), + } + ss.ctx, ss.cancel = context.WithCancel(context.Background()) + + err := ss.Multi(&state.TransactionalStateRequest{ + Operations: []state.TransactionalStateOperation{ + { + Operation: state.Upsert, + Request: state.SetRequest{ + Key: "weapon", + Value: "deathstar", + }, + }, + { + Operation: state.Upsert, + Request: state.SetRequest{ + Key: "weapon2", + Value: "deathstar2", + Metadata: map[string]string{ + "ttlInSeconds": "123", + }, + }, + }, + { + Operation: state.Upsert, + Request: state.SetRequest{ + Key: "weapon3", + Value: "deathstar3", + Metadata: map[string]string{ + "ttlInSeconds": "-1", + }, + }, + }, + }, + }) + assert.Equal(t, nil, err) + + res, err := c.Do(context.Background(), "HGETALL", "weapon").Result() + assert.Equal(t, nil, err) + + vals := res.([]interface{}) + data, version, err := ss.getKeyVersion(vals) + assert.Equal(t, nil, err) + assert.Equal(t, ptr.String("1"), version) + assert.Equal(t, `"deathstar"`, data) + + res, err = c.Do(context.Background(), "TTL", "weapon").Result() + assert.Equal(t, nil, err) + assert.Equal(t, int64(-1), res) + + res, err = c.Do(context.Background(), "TTL", "weapon2").Result() + assert.Equal(t, nil, err) + assert.Equal(t, int64(123), res) + + 
res, err = c.Do(context.Background(), "TTL", "weapon3").Result() + assert.Equal(t, nil, err) + assert.Equal(t, int64(-1), res) +} + +func TestTransactionalDeletev9(t *testing.T) { + s, c := setupMiniredisv9() + defer s.Close() + + ss := &StateStore{ + clientv9: c, + json: jsoniter.ConfigFastest, + logger: logger.NewLogger("test"), + } + ss.ctx, ss.cancel = context.WithCancel(context.Background()) + + // Insert a record first. + ss.Set(&state.SetRequest{ + Key: "weapon", + Value: "deathstar", + }) + + etag := "1" + err := ss.Multi(&state.TransactionalStateRequest{ + Operations: []state.TransactionalStateOperation{{ + Operation: state.Delete, + Request: state.DeleteRequest{ + Key: "weapon", + ETag: &etag, + }, + }}, + }) + assert.Equal(t, nil, err) + + res, err := c.Do(context.Background(), "HGETALL", "weapon").Result() + assert.Equal(t, nil, err) + + vals := res.([]interface{}) + assert.Equal(t, 0, len(vals)) +} + +func TestPingv9(t *testing.T) { + s, c := setupMiniredisv9() + + ss := &StateStore{ + clientv9: c, + json: jsoniter.ConfigFastest, + logger: logger.NewLogger("test"), + clientSettings: &rediscomponent.Settings{}, + } + + err := state.Ping(ss) + assert.NoError(t, err) + + s.Close() + + err = state.Ping(ss) + assert.Error(t, err) +} + +func TestRequestsWithGlobalTTLv9(t *testing.T) { + s, c := setupMiniredisv9() + defer s.Close() + + globalTTLInSeconds := 100 + + ss := &StateStore{ + clientv9: c, json: jsoniter.ConfigFastest, logger: logger.NewLogger("test"), metadata: rediscomponent.Metadata{TTLInSeconds: &globalTTLInSeconds}, @@ -335,7 +662,7 @@ func TestRequestsWithGlobalTTL(t *testing.T) { Key: "weapon100", Value: "deathstar100", }) - ttl, _ := ss.client.TTL(ss.ctx, "weapon100").Result() + ttl, _ := ss.clientv9.TTL(ss.ctx, "weapon100").Result() assert.Equal(t, time.Duration(globalTTLInSeconds)*time.Second, ttl) }) @@ -349,7 +676,7 @@ func TestRequestsWithGlobalTTL(t *testing.T) { "ttlInSeconds": strconv.Itoa(requestTTL), }, }) - ttl, _ := ss.client.TTL(ss.ctx, "weapon100").Result() + ttl, _ := ss.clientv9.TTL(ss.ctx, "weapon100").Result() assert.Equal(t, time.Duration(requestTTL)*time.Second, ttl) }) @@ -411,14 +738,14 @@ func TestRequestsWithGlobalTTL(t *testing.T) { }) } -func TestSetRequestWithTTL(t *testing.T) { - s, c := setupMiniredis() +func TestSetRequestWithTTLv9(t *testing.T) { + s, c := setupMiniredisv9() defer s.Close() ss := &StateStore{ - client: c, - json: jsoniter.ConfigFastest, - logger: logger.NewLogger("test"), + clientv9: c, + json: jsoniter.ConfigFastest, + logger: logger.NewLogger("test"), } ss.ctx, ss.cancel = context.WithCancel(context.Background()) @@ -432,7 +759,7 @@ func TestSetRequestWithTTL(t *testing.T) { }, }) - ttl, _ := ss.client.TTL(ss.ctx, "weapon100").Result() + ttl, _ := ss.clientv9.TTL(ss.ctx, "weapon100").Result() assert.Equal(t, time.Duration(ttlInSeconds)*time.Second, ttl) }) @@ -443,7 +770,7 @@ func TestSetRequestWithTTL(t *testing.T) { Value: "deathstar200", }) - ttl, _ := ss.client.TTL(ss.ctx, "weapon200").Result() + ttl, _ := ss.clientv9.TTL(ss.ctx, "weapon200").Result() assert.Equal(t, time.Duration(-1), ttl) }) @@ -453,7 +780,7 @@ func TestSetRequestWithTTL(t *testing.T) { Key: "weapon300", Value: "deathstar300", }) - ttl, _ := ss.client.TTL(ss.ctx, "weapon300").Result() + ttl, _ := ss.clientv9.TTL(ss.ctx, "weapon300").Result() assert.Equal(t, time.Duration(-1), ttl) // make the key no longer persistent @@ -465,7 +792,7 @@ func TestSetRequestWithTTL(t *testing.T) { "ttlInSeconds": strconv.Itoa(ttlInSeconds), }, }) - ttl, _ = 
ss.client.TTL(ss.ctx, "weapon300").Result() + ttl, _ = ss.clientv9.TTL(ss.ctx, "weapon300").Result() assert.Equal(t, time.Duration(ttlInSeconds)*time.Second, ttl) // make the key persistent again @@ -476,19 +803,19 @@ func TestSetRequestWithTTL(t *testing.T) { "ttlInSeconds": strconv.Itoa(-1), }, }) - ttl, _ = ss.client.TTL(ss.ctx, "weapon300").Result() + ttl, _ = ss.clientv9.TTL(ss.ctx, "weapon300").Result() assert.Equal(t, time.Duration(-1), ttl) }) } -func TestTransactionalDeleteNoEtag(t *testing.T) { - s, c := setupMiniredis() +func TestTransactionalDeleteNoEtagv9(t *testing.T) { + s, c := setupMiniredisv9() defer s.Close() ss := &StateStore{ - client: c, - json: jsoniter.ConfigFastest, - logger: logger.NewLogger("test"), + clientv9: c, + json: jsoniter.ConfigFastest, + logger: logger.NewLogger("test"), } ss.ctx, ss.cancel = context.WithCancel(context.Background()) @@ -515,15 +842,28 @@ func TestTransactionalDeleteNoEtag(t *testing.T) { assert.Equal(t, 0, len(vals)) } -func setupMiniredis() (*miniredis.Miniredis, *redis.Client) { +func setupMiniredisv8() (*miniredis.Miniredis, *v8.Client) { + s, err := miniredis.Run() + if err != nil { + panic(err) + } + opts := &v8.Options{ + Addr: s.Addr(), + DB: defaultDB, + } + + return s, v8.NewClient(opts) +} + +func setupMiniredisv9() (*miniredis.Miniredis, *v9.Client) { s, err := miniredis.Run() if err != nil { panic(err) } - opts := &redis.Options{ + opts := &v9.Options{ Addr: s.Addr(), DB: defaultDB, } - return s, redis.NewClient(opts) + return s, v9.NewClient(opts) } diff --git a/tests/certification/state/redis/go.mod b/tests/certification/state/redis/go.mod index e876c24186..c7a2bd33a4 100644 --- a/tests/certification/state/redis/go.mod +++ b/tests/certification/state/redis/go.mod @@ -15,6 +15,7 @@ require ( github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-redis/redis/v9 v9.0.0-beta.2 // indirect github.com/golang/mock v1.6.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect github.com/tylertreat/comcast v1.0.1 // indirect diff --git a/tests/certification/state/redis/go.sum b/tests/certification/state/redis/go.sum index 507f20ba64..515838827f 100644 --- a/tests/certification/state/redis/go.sum +++ b/tests/certification/state/redis/go.sum @@ -216,6 +216,8 @@ github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh github.com/go-openapi/swag v0.19.14/go.mod h1:QYRuS/SOXUCsnplDa677K7+DxSOj6IPNl/eQntq43wQ= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-redis/redis/v9 v9.0.0-beta.2 h1:ZSr84TsnQyKMAg8gnV+oawuQezeJR11/09THcWCQzr4= +github.com/go-redis/redis/v9 v9.0.0-beta.2/go.mod h1:Bldcd/M/bm9HbnNPi/LUtYBSD8ttcZYBMupwMXhdU0o= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= @@ -505,7 +507,7 @@ github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGV github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega 
v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= -github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/onsi/gomega v1.20.0 h1:8W0cWlwFkflGPLltQvLRB7ZVD5HuP6ng320w2IS245Q= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/openzipkin/zipkin-go v0.4.0 h1:CtfRrOVZtbDj8rt1WXjklw0kqqJQwICrCKmlfUuBUUw= github.com/openzipkin/zipkin-go v0.4.0/go.mod h1:4c3sLeE8xjNqehmF5RpAFLPLJxXscc0R4l6Zg0P1tTQ= From 00d849260c1aa97e005157a4b08f946a964d656c Mon Sep 17 00:00:00 2001 From: Bernd Verst <4535280+berndverst@users.noreply.github.com> Date: Thu, 15 Sep 2022 14:47:18 -0700 Subject: [PATCH 2/3] Add redis beta SDK warning Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> --- bindings/redis/redis.go | 1 + lock/redis/standalone.go | 24 ++++++++++++++++-------- pubsub/redis/redis.go | 31 +++++++++++++++---------------- state/redis/redis.go | 4 +++- 4 files changed, 35 insertions(+), 25 deletions(-) diff --git a/bindings/redis/redis.go b/bindings/redis/redis.go index 5296678588..9975bfc03a 100644 --- a/bindings/redis/redis.go +++ b/bindings/redis/redis.go @@ -51,6 +51,7 @@ func (r *Redis) Init(meta bindings.Metadata) (err error) { } else { r.legacyRedis = false r.clientv9, r.clientSettings, err = rediscomponent.ParseClientv9FromProperties(meta.Properties, nil) + r.logger.Warnf("Redis version 7 and above uses a Beta SDK at this time. Please use caution.") } if err != nil { return err diff --git a/lock/redis/standalone.go b/lock/redis/standalone.go index 88885a4e81..173d419674 100644 --- a/lock/redis/standalone.go +++ b/lock/redis/standalone.go @@ -70,6 +70,7 @@ func (r *StandaloneRedisLock) InitLockStore(metadata lock.Metadata) error { r.legacyRedis = true } else { r.legacyRedis = false + r.logger.Warnf("Redis version 7 and above uses a Beta SDK at this time. Please use caution.") } // must have `redisHost` if metadata.Properties["redisHost"] == "" { @@ -195,6 +196,7 @@ func (r *StandaloneRedisLock) TryLock(req *lock.TryLockRequest) (*lock.TryLockRe func (r *StandaloneRedisLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockResponse, error) { var i int var err error + var status lock.Status if r.legacyRedis { // 1. delegate to client.eval lua script eval := r.clientv8.Eval(r.ctx, unlockScript, []string{req.ResourceID}, req.LockOwner) @@ -202,12 +204,18 @@ func (r *StandaloneRedisLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockRespo if eval == nil { return newInternalErrorUnlockResponse(), fmt.Errorf("[standaloneRedisLock]: Eval unlock script returned nil.ResourceID: %s", req.ResourceID) } - err := eval.Err() + err = eval.Err() if err != nil { return newInternalErrorUnlockResponse(), err } // 3. parse result i, err = eval.Int() + status = lock.InternalError + if err != nil { + return &lock.UnlockResponse{ + Status: status, + }, err + } } else { // 1. delegate to client.eval lua script eval := r.clientv9.Eval(r.ctx, unlockScript, []string{req.ResourceID}, req.LockOwner) @@ -215,20 +223,20 @@ func (r *StandaloneRedisLock) Unlock(req *lock.UnlockRequest) (*lock.UnlockRespo if eval == nil { return newInternalErrorUnlockResponse(), fmt.Errorf("[standaloneRedisLock]: Eval unlock script returned nil.ResourceID: %s", req.ResourceID) } - err := eval.Err() + err = eval.Err() if err != nil { return newInternalErrorUnlockResponse(), err } // 3. 
parse result i, err = eval.Int() + status = lock.InternalError + if err != nil { + return &lock.UnlockResponse{ + Status: status, + }, err + } } - status := lock.InternalError - if err != nil { - return &lock.UnlockResponse{ - Status: status, - }, err - } if i >= 0 { status = lock.Success } else if i == -1 { diff --git a/pubsub/redis/redis.go b/pubsub/redis/redis.go index 8fe2b404c1..50079ca0ad 100644 --- a/pubsub/redis/redis.go +++ b/pubsub/redis/redis.go @@ -20,7 +20,6 @@ import ( "strconv" "time" - "github.com/go-redis/redis/v8" v8 "github.com/go-redis/redis/v8" v9 "github.com/go-redis/redis/v9" @@ -147,6 +146,7 @@ func (r *redisStreams) Init(metadata pubsub.Metadata) error { } else { r.legacyRedis = false r.clientv9, r.clientSettings, err = rediscomponent.ParseClientv9FromProperties(metadata.Properties, nil) + r.logger.Warnf("Redis version 7 and above uses a Beta SDK at this time. Please use caution.") } if err != nil { @@ -375,7 +375,7 @@ func (r *redisStreams) pollNewMessagesLoop(ctx context.Context, stream string, h Block: time.Duration(r.clientSettings.ReadTimeout), }).Result() if err != nil { - if !errors.Is(err, redis.Nil) && err != context.Canceled { + if !errors.Is(err, v8.Nil) && err != context.Canceled { r.logger.Errorf("redis streams: error reading from stream %s: %s", stream, err) } continue @@ -394,7 +394,7 @@ func (r *redisStreams) pollNewMessagesLoop(ctx context.Context, stream string, h Block: time.Duration(r.clientSettings.ReadTimeout), }).Result() if err != nil { - if !errors.Is(err, redis.Nil) && err != context.Canceled { + if !errors.Is(err, v9.Nil) && err != context.Canceled { r.logger.Errorf("redis streams: error reading from stream %s: %s", stream, err) } continue @@ -405,7 +405,6 @@ func (r *redisStreams) pollNewMessagesLoop(ctx context.Context, stream string, h r.enqueueMessagesv9(ctx, s.Stream, handler, s.Messages) } } - } } @@ -440,14 +439,14 @@ func (r *redisStreams) reclaimPendingMessages(ctx context.Context, stream string if r.legacyRedis { for { // Retrieve pending messages for this stream and consumer - pendingResult, err := r.clientv8.XPendingExt(ctx, &redis.XPendingExtArgs{ + pendingResult, err := r.clientv8.XPendingExt(ctx, &v8.XPendingExtArgs{ Stream: stream, Group: r.metadata.consumerID, Start: "-", End: "+", Count: int64(r.metadata.queueDepth), }).Result() - if err != nil && !errors.Is(err, redis.Nil) { + if err != nil && !errors.Is(err, v8.Nil) { r.logger.Errorf("error retrieving pending Redis messages: %v", err) break @@ -467,14 +466,14 @@ func (r *redisStreams) reclaimPendingMessages(ctx context.Context, stream string } // Attempt to claim the messages for the filtered IDs - claimResult, err := r.clientv8.XClaim(ctx, &redis.XClaimArgs{ + claimResult, err := r.clientv8.XClaim(ctx, &v8.XClaimArgs{ Stream: stream, Group: r.metadata.consumerID, Consumer: r.metadata.consumerID, MinIdle: r.metadata.processingTimeout, Messages: msgIDs, }).Result() - if err != nil && !errors.Is(err, redis.Nil) { + if err != nil && !errors.Is(err, v8.Nil) { r.logger.Errorf("error claiming pending Redis messages: %v", err) break @@ -486,7 +485,7 @@ func (r *redisStreams) reclaimPendingMessages(ctx context.Context, stream string // If the Redis nil error is returned, it means somes message in the pending // state no longer exist. We need to acknowledge these messages to // remove them from the pending list. - if errors.Is(err, redis.Nil) { + if errors.Is(err, v8.Nil) { // Build a set of message IDs that were not returned // that potentially no longer exist. 
expectedMsgIDs := make(map[string]struct{}, len(msgIDs)) @@ -510,7 +509,7 @@ func (r *redisStreams) reclaimPendingMessages(ctx context.Context, stream string End: "+", Count: int64(r.metadata.queueDepth), }).Result() - if err != nil && !errors.Is(err, redis.Nil) { + if err != nil && !errors.Is(err, v9.Nil) { r.logger.Errorf("error retrieving pending Redis messages: %v", err) break @@ -537,7 +536,7 @@ func (r *redisStreams) reclaimPendingMessages(ctx context.Context, stream string MinIdle: r.metadata.processingTimeout, Messages: msgIDs, }).Result() - if err != nil && !errors.Is(err, redis.Nil) { + if err != nil && !errors.Is(err, v9.Nil) { r.logger.Errorf("error claiming pending Redis messages: %v", err) break @@ -549,7 +548,7 @@ func (r *redisStreams) reclaimPendingMessages(ctx context.Context, stream string // If the Redis nil error is returned, it means somes message in the pending // state no longer exist. We need to acknowledge these messages to // remove them from the pending list. - if errors.Is(err, redis.Nil) { + if errors.Is(err, v9.Nil) { // Build a set of message IDs that were not returned // that potentially no longer exist. expectedMsgIDs := make(map[string]struct{}, len(msgIDs)) @@ -579,14 +578,14 @@ func (r *redisStreams) removeMessagesThatNoLongerExistFromPending(ctx context.Co MinIdle: 0, Messages: []string{pendingID}, }).Result() - if err != nil && !errors.Is(err, redis.Nil) { + if err != nil && !errors.Is(err, v8.Nil) { r.logger.Errorf("error claiming pending Redis message %s: %v", pendingID, err) continue } // Ack the message to remove it from the pending list. - if errors.Is(err, redis.Nil) { + if errors.Is(err, v8.Nil) { // Use the background context in case subscriptionCtx is already closed if err = r.clientv8.XAck(context.Background(), stream, r.metadata.consumerID, pendingID).Err(); err != nil { r.logger.Errorf("error acknowledging Redis message %s after failed claim for %s: %v", pendingID, stream, err) @@ -606,14 +605,14 @@ func (r *redisStreams) removeMessagesThatNoLongerExistFromPending(ctx context.Co MinIdle: 0, Messages: []string{pendingID}, }).Result() - if err != nil && !errors.Is(err, redis.Nil) { + if err != nil && !errors.Is(err, v9.Nil) { r.logger.Errorf("error claiming pending Redis message %s: %v", pendingID, err) continue } // Ack the message to remove it from the pending list. - if errors.Is(err, redis.Nil) { + if errors.Is(err, v9.Nil) { // Use the background context in case subscriptionCtx is already closed if err = r.clientv9.XAck(context.Background(), stream, r.metadata.consumerID, pendingID).Err(); err != nil { r.logger.Errorf("error acknowledging Redis message %s after failed claim for %s: %v", pendingID, stream, err) diff --git a/state/redis/redis.go b/state/redis/redis.go index 6f9a0b03a2..ffb1c6e585 100644 --- a/state/redis/redis.go +++ b/state/redis/redis.go @@ -144,6 +144,9 @@ func (r *StateStore) Init(metadata state.Metadata) error { r.metadata = m if rediscomponent.IsLegacyRedisVersion(metadata.Properties) { r.legacyRedis = true + } else { + r.legacyRedis = false + r.logger.Warnf("Redis version 7 and above uses a Beta SDK at this time. 
Please use caution.") } defaultSettings := rediscomponent.Settings{RedisMaxRetries: m.MaxRetries, RedisMaxRetryInterval: rediscomponent.Duration(m.MaxRetryBackoff)} @@ -530,7 +533,6 @@ func (r *StateStore) Multi(request *state.TransactionalStateRequest) error { pipe.(redisClientv9.Pipeliner).Do(r.ctx, "PERSIST", req.Key) } } - } else if o.Operation == state.Delete { req := o.Request.(state.DeleteRequest) if req.ETag == nil { From 6acc590dd5195bd9f9a43ed24574176e1b2bc743 Mon Sep 17 00:00:00 2001 From: Bernd Verst <4535280+berndverst@users.noreply.github.com> Date: Thu, 15 Sep 2022 15:08:17 -0700 Subject: [PATCH 3/3] run cert test with redis6 Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com> --- .../components/docker/enableTLSConf/redisstatestore.yaml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/certification/state/redis/components/docker/enableTLSConf/redisstatestore.yaml b/tests/certification/state/redis/components/docker/enableTLSConf/redisstatestore.yaml index 99f3d4f31a..ffd96690f9 100644 --- a/tests/certification/state/redis/components/docker/enableTLSConf/redisstatestore.yaml +++ b/tests/certification/state/redis/components/docker/enableTLSConf/redisstatestore.yaml @@ -15,4 +15,6 @@ spec: - name: timeout value: 100s - name: enableTLS - value: true \ No newline at end of file + value: true + - name: redisVersion + value: "6" \ No newline at end of file
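
The executev8/executev9 pair in state/redis/redis_query.go differ only in the client type; parseQueryResult already carries the shared logic. One way to confine the remaining duplication is a thin adapter. A minimal sketch, assuming only the public Do(...).Result() signatures of the two SDKs (the redisDoer name and the wrapper types are hypothetical, not part of this patch):

package redis

import (
	"context"

	v8 "github.com/go-redis/redis/v8"
	v9 "github.com/go-redis/redis/v9"
)

// redisDoer is the one capability the query path needs from either SDK.
// Wrapping both generations behind it would let executev8/executev9
// collapse into a single execute implementation.
type redisDoer interface {
	DoResult(ctx context.Context, args ...interface{}) (interface{}, error)
}

type v8Doer struct{ c v8.UniversalClient }

// DoResult forwards to the v8 client's generic Do and unwraps the result.
func (d v8Doer) DoResult(ctx context.Context, args ...interface{}) (interface{}, error) {
	return d.c.Do(ctx, args...).Result()
}

type v9Doer struct{ c v9.UniversalClient }

// DoResult forwards to the v9 client's generic Do and unwraps the result.
func (d v9Doer) DoResult(ctx context.Context, args ...interface{}) (interface{}, error) {
	return d.c.Do(ctx, args...).Result()
}

A single execute(ctx context.Context, client redisDoer) could then build the FT.SEARCH arguments once and call parseQueryResult unchanged for both generations.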
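The TTL assertions in the state store tests follow from how ttlInSeconds maps onto Redis expirations: a positive value sets an expiry on the key, and -1 clears it (PERSIST), after which go-redis reports the TTL as time.Duration(-1). A self-contained sketch against miniredis with the v9 client; the key and value names are illustrative:

package main

import (
	"context"
	"fmt"
	"time"

	miniredis "github.com/alicebob/miniredis/v2"
	v9 "github.com/go-redis/redis/v9"
)

func main() {
	s, err := miniredis.Run()
	if err != nil {
		panic(err)
	}
	defer s.Close()

	c := v9.NewClient(&v9.Options{Addr: s.Addr()})
	ctx := context.Background()

	// A positive ttlInSeconds becomes a key expiration.
	c.Set(ctx, "weapon", "deathstar", 123*time.Second)
	ttl, _ := c.TTL(ctx, "weapon").Result()
	fmt.Println(ttl) // 2m3s

	// ttlInSeconds = -1 maps to PERSIST, clearing the expiration;
	// TTL then reports time.Duration(-1), as the tests assert.
	c.Persist(ctx, "weapon")
	ttl, _ = c.TTL(ctx, "weapon").Result()
	fmt.Println(ttl) // -1ns
}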
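With the split setupMiniredisv8/setupMiniredisv9 helpers, exercising the same behavior against both client generations is cheap. A usage sketch, assuming it sits in the same test package as the helpers (the test name is hypothetical):

package redis

import (
	"context"
	"testing"
)

// TestPingBothClients runs the same command path against one miniredis
// per client generation.
func TestPingBothClients(t *testing.T) {
	s8, c8 := setupMiniredisv8()
	defer s8.Close()
	if err := c8.Ping(context.Background()).Err(); err != nil {
		t.Fatal(err)
	}

	s9, c9 := setupMiniredisv9()
	defer s9.Close()
	if err := c9.Ping(context.Background()).Err(); err != nil {
		t.Fatal(err)
	}
}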
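In lock/redis/standalone.go, Unlock evaluates unlockScript and maps the integer reply onto lock.Status: a non-negative value means the lock was released, -1 that the caller does not own it. The script body is not part of this diff; the following is a representative compare-and-delete script consistent with that parsing, not necessarily the one the component ships:

// exampleUnlockScript is a representative compare-and-delete unlock script:
// DEL returns the number of keys removed (>= 0) when the caller owns the
// lock, and the script returns -1 otherwise. The component's actual
// unlockScript may differ.
const exampleUnlockScript = `
if redis.call("get", KEYS[1]) == ARGV[1] then
	return redis.call("del", KEYS[1])
else
	return -1
end`

Either client generation evaluates it the same way, e.g. clientv9.Eval(ctx, exampleUnlockScript, []string{resourceID}, owner).Int().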
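The pending-message reclaim in pubsub/redis/redis.go is easier to read outside diff form. The sketch below condenses one pass for the v9 client: list pending entries for the consumer group, claim those idle past the processing timeout, and when redis.Nil reveals entries that no longer exist, ack them off the pending list. The function name and the count of 100 are illustrative; the real component feeds claimed messages into its processing loop and uses its configured queueDepth:

package redis

import (
	"context"
	"errors"
	"time"

	v9 "github.com/go-redis/redis/v9"
)

// reclaimOnce condenses one pass of the pending-message reclaim.
func reclaimOnce(ctx context.Context, c v9.UniversalClient, stream, group, consumer string, minIdle time.Duration) error {
	// 1. List pending entries for this stream and consumer group.
	pending, err := c.XPendingExt(ctx, &v9.XPendingExtArgs{
		Stream: stream,
		Group:  group,
		Start:  "-",
		End:    "+",
		Count:  100, // queueDepth in the real component
	}).Result()
	if err != nil && !errors.Is(err, v9.Nil) {
		return err
	}

	msgIDs := make([]string, 0, len(pending))
	for _, p := range pending {
		msgIDs = append(msgIDs, p.ID)
	}
	if len(msgIDs) == 0 {
		return nil
	}

	// 2. Claim messages idle past the processing timeout.
	claimed, err := c.XClaim(ctx, &v9.XClaimArgs{
		Stream:   stream,
		Group:    group,
		Consumer: consumer,
		MinIdle:  minIdle,
		Messages: msgIDs,
	}).Result()
	if err != nil && !errors.Is(err, v9.Nil) {
		return err
	}
	// Claimed messages would be handed to the normal processing path here.

	// 3. redis.Nil from XCLAIM means some pending IDs no longer exist;
	// ack whatever was not actually claimed so it leaves the pending list.
	if errors.Is(err, v9.Nil) {
		gone := make(map[string]struct{}, len(msgIDs))
		for _, id := range msgIDs {
			gone[id] = struct{}{}
		}
		for _, m := range claimed {
			delete(gone, m.ID) // anything claimed still exists
		}
		for id := range gone {
			if ackErr := c.XAck(ctx, stream, group, id).Err(); ackErr != nil {
				return ackErr
			}
		}
	}
	return nil
}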
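Finally, the redisVersion metadata added to the certification component in patch 3 is what rediscomponent.IsLegacyRedisVersion inspects to pick a client generation. That helper is defined in internal/component/redis, outside the hunks shown here, so the following is only a plausible sketch of its semantics, under the assumption that anything below major version 7, or no declared version at all, stays on the stable v8 SDK:

package redis

import (
	"strconv"
	"strings"
)

// isLegacyRedisVersion sketches the assumed behavior of
// rediscomponent.IsLegacyRedisVersion: the v9 beta SDK is only selected
// when the component explicitly declares a Redis server at major
// version 7 or above.
func isLegacyRedisVersion(properties map[string]string) bool {
	v, ok := properties["redisVersion"]
	if !ok || v == "" {
		return true // no declared version: stay on the stable v8 client
	}
	major, err := strconv.Atoi(strings.SplitN(v, ".", 2)[0])
	if err != nil {
		return true // unparsable version: be conservative
	}
	return major < 7
}

This is also why the certification YAML pins value: "6" — it forces the TLS test run onto the v8 code path.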