Skip to content

Commit

Permalink
[gocql] Allow to set gocql HostSelectionPolicy (#6533)
Browse files Browse the repository at this point in the history
* [gocql] Allow to set gocql HostSelectionPolicy
  • Loading branch information
mantas-sidlauskas authored Dec 6, 2024
1 parent e5e553f commit d33ce74
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 9 deletions.
3 changes: 3 additions & 0 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,9 @@ type (
// Otherwise please add new fields to the struct for better documentation
// If being used in any database, update this comment here to make it clear
ConnectAttributes map[string]string `yaml:"connectAttributes"`
// HostSelectionPolicy sets gocql policy for selecting host for a query
// Available selections are: "tokenaware,roundrobin", "hostpool-epsilon-greedy", "roundrobin"
HostSelectionPolicy string `yaml:"hostSelectionPolicy"`
}

// ShardedNoSQL contains configuration to connect to a set of NoSQL Database clusters in a sharded manner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,12 @@ func newCassandraCluster(cfg ClusterConfig) *gocql.ClusterConfig {
cluster.NumConns = cfg.MaxConns
}

cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
if cfg.HostSelectionPolicy != nil {
cluster.PoolConfig.HostSelectionPolicy = cfg.HostSelectionPolicy
} else {
// set default option if configuration was not provided
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
}

return cluster
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"context"
"time"

"github.com/gocql/gocql"

"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
)
Expand Down Expand Up @@ -116,5 +118,6 @@ type (
SerialConsistency SerialConsistency
Timeout time.Duration
ConnectTimeout time.Duration
HostSelectionPolicy gocql.HostSelectionPolicy
}
)
25 changes: 25 additions & 0 deletions common/persistence/nosql/nosqlplugin/cassandra/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
package cassandra

import (
"fmt"
"time"

gogocql "github.com/gocql/gocql"
"github.com/hailocab/go-hostpool"

"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -116,8 +120,13 @@ func toGoCqlConfig(cfg *config.NoSQL) (gocql.ClusterConfig, error) {
if err != nil {
return gocql.ClusterConfig{}, err
}

serialConsistency, err := gocql.ParseSerialConsistency(cfg.SerialConsistency)
if err != nil {
return gocql.ClusterConfig{}, err
}

hostSelection, err := toHostSelectionPolicy(cfg.HostSelectionPolicy)
if err != nil {
return gocql.ClusterConfig{}, err
}
Expand All @@ -138,5 +147,21 @@ func toGoCqlConfig(cfg *config.NoSQL) (gocql.ClusterConfig, error) {
SerialConsistency: serialConsistency,
Timeout: cfg.Timeout,
ConnectTimeout: cfg.ConnectTimeout,
HostSelectionPolicy: hostSelection,
}, nil
}

func toHostSelectionPolicy(policy string) (gogocql.HostSelectionPolicy, error) {
switch policy {
case "", "tokenaware,roundrobin":
return gogocql.TokenAwareHostPolicy(gogocql.RoundRobinHostPolicy()), nil
case "hostpool-epsilon-greedy":
return gogocql.HostPoolHostPolicy(
hostpool.NewEpsilonGreedy(nil, 0, &hostpool.LinearEpsilonValueCalculator{}),
), nil
case "roundrobin":
return gogocql.RoundRobinHostPolicy(), nil
default:
return nil, fmt.Errorf("unknown gocql host selection policy: %q", policy)
}
}
39 changes: 32 additions & 7 deletions common/persistence/nosql/nosqlplugin/cassandra/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"testing"
"time"

gogocql "github.com/gocql/gocql"
"github.com/stretchr/testify/assert"

"github.com/uber/cadence/common/config"
Expand All @@ -46,13 +47,14 @@ func Test_toGoCqlConfig(t *testing.T) {
"empty config will be filled with defaults",
&config.NoSQL{},
gocql.ClusterConfig{
Hosts: environment.Localhost,
Port: 9042,
ProtoVersion: 4,
Timeout: time.Second * 10,
Consistency: gocql.LocalQuorum,
SerialConsistency: gocql.LocalSerial,
ConnectTimeout: time.Second * 2,
Hosts: environment.Localhost,
Port: 9042,
ProtoVersion: 4,
Timeout: time.Second * 10,
Consistency: gocql.LocalQuorum,
SerialConsistency: gocql.LocalSerial,
ConnectTimeout: time.Second * 2,
HostSelectionPolicy: gogocql.TokenAwareHostPolicy(gogocql.RoundRobinHostPolicy()),
},
assert.NoError,
},
Expand All @@ -67,3 +69,26 @@ func Test_toGoCqlConfig(t *testing.T) {
})
}
}

func Test_toHostSelectionPolicy(t *testing.T) {
tests := []struct {
name string
policy string
want gogocql.HostSelectionPolicy
wantErr assert.ErrorAssertionFunc
}{
{name: "not supported policy", policy: "non-existing", want: nil, wantErr: assert.Error},
{name: "Default policy", policy: "", want: gogocql.TokenAwareHostPolicy(gogocql.RoundRobinHostPolicy()), wantErr: assert.NoError},
{name: "Round robin policy", policy: "roundrobin", want: gogocql.RoundRobinHostPolicy(), wantErr: assert.NoError},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := toHostSelectionPolicy(tt.policy)
if !tt.wantErr(t, err, fmt.Sprintf("toHostSelectionPolicy(%v)", tt.policy)) {
return
}

assert.Equal(t, tt.want, got, "toHostSelectionPolicy(%v)", tt.policy)
})
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ require (
github.com/gogo/status v1.1.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
Expand Down

0 comments on commit d33ce74

Please sign in to comment.