Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce RingTokenGenerator configuration in LifecyclerConfig and BasicLifecyclerConfig #323

Merged
merged 3 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@
* [ENHANCEMENT] Ring: add `ReplicationSet.ZoneCount()` method. #298
* [ENHANCEMENT] Ring: add request minimization to `DoUntilQuorum` method. #306
* [ENHANCEMENT] grpcclient: add `<prefix>.initial-stream-window-size` and `<prefix>.initial-connection-window-size` configuration flags to alter HTTP flow control options for a gRPC client. #312
* [ENHANCEMENT] Added `TokenGenerator` interface with `RandomTokenGenerator` (generating random tokens), and `SpreadMinimizingTokenGenerator` (generating tokens with almost even distribution) implementation. By default `RandomTokenGenerator` is used. #321
* [ENHANCEMENT] Lifecycler: Added `RingTokenGenerator` configuration that specifies the `TokenGenerator` implementation that is used for token generation. Default value is nil, meaning that `RandomTokenGenerator` is used. #323
* [ENHANCEMENT] BasicLifecycler: Added `RingTokenGenerator` configuration that specifies the `TokenGenerator` implementation that is used for token generation. Default value is nil, meaning that `RandomTokenGenerator` is used. #323
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
Expand Down
10 changes: 9 additions & 1 deletion ring/basic_lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type BasicLifecyclerConfig struct {
// If true lifecycler doesn't unregister instance from the ring when it's stopping. Default value is false,
// which means unregistering.
KeepInstanceInTheRingOnShutdown bool

// If set, specifies the TokenGenerator implementation that will be used for generating tokens.
// Default value is nil, which means that RandomTokenGenerator is used.
RingTokenGenerator TokenGenerator
}

// BasicLifecycler is a basic ring lifecycler which allows to hook custom
Expand Down Expand Up @@ -93,7 +97,11 @@ type BasicLifecycler struct {

// NewBasicLifecycler makes a new BasicLifecycler.
func NewBasicLifecycler(cfg BasicLifecyclerConfig, ringName, ringKey string, store kv.Client, delegate BasicLifecyclerDelegate, logger log.Logger, reg prometheus.Registerer) (*BasicLifecycler, error) {
tokenGenerator := NewRandomTokenGenerator()
tokenGenerator := cfg.RingTokenGenerator
if tokenGenerator == nil {
tokenGenerator = NewRandomTokenGenerator()
}

l := &BasicLifecycler{
cfg: cfg,
ringName: ringName,
Expand Down
29 changes: 27 additions & 2 deletions ring/basic_lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,34 @@ import (
const (
testRingKey = "test"
testRingName = "test"
testInstanceID = "test-id"
testInstanceID = "test-id-1"
)

func TestBasicLifecycler_GetTokenGenerator(t *testing.T) {
cfg := prepareBasicLifecyclerConfig()

spreadMinimizingTokenGenerator, err := NewSpreadMinimizingTokenGenerator(cfg.ID, cfg.Zone, []string{zone(1), zone(2), zone(3)}, log.NewNopLogger())
require.NoError(t, err)

tests := []TokenGenerator{nil, NewRandomTokenGenerator(), spreadMinimizingTokenGenerator}

for _, testData := range tests {
cfg.RingTokenGenerator = testData
lifecycler, _, _, err := prepareBasicLifecycler(t, cfg)
require.NoError(t, err)
if testData == nil {
// If cfg.RingTokenGenerator is empty, RandomTokenGenerator is used
tokenGenerator, ok := lifecycler.tokenGenerator.(*RandomTokenGenerator)
require.True(t, ok)
require.NotNil(t, tokenGenerator)
} else {
// If cfg.RingTokenGenerator is not empty, it is used
require.NotNil(t, lifecycler.tokenGenerator)
require.Equal(t, testData, lifecycler.tokenGenerator)
}
}
}

func TestBasicLifecycler_RegisterOnStart(t *testing.T) {
tests := map[string]struct {
initialInstanceID string
Expand Down Expand Up @@ -489,7 +514,7 @@ func prepareBasicLifecyclerConfig() BasicLifecyclerConfig {
return BasicLifecyclerConfig{
ID: testInstanceID,
Addr: "127.0.0.1:12345",
Zone: "test-zone",
Zone: zone(1),
HeartbeatPeriod: time.Minute,
TokensObservePeriod: 0,
NumTokens: 5,
Expand Down
10 changes: 9 additions & 1 deletion ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type LifecyclerConfig struct {

// Injected internally
ListenPort int `yaml:"-"`

// If set, specifies the TokenGenerator implementation that will be used for generating tokens.
// Default value is nil, which means that RandomTokenGenerator is used.
RingTokenGenerator TokenGenerator `yaml:"-"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
Expand Down Expand Up @@ -170,7 +174,11 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
flushTransferer = NewNoopFlushTransferer()
}

tokenGenerator := NewRandomTokenGenerator()
tokenGenerator := cfg.RingTokenGenerator
if tokenGenerator == nil {
tokenGenerator = NewRandomTokenGenerator()
}

l := &Lifecycler{
cfg: cfg,
flushTransferer: flushTransferer,
Expand Down
34 changes: 33 additions & 1 deletion ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func testLifecyclerConfig(ringConfig Config, id string) LifecyclerConfig {
lifecyclerConfig.RingConfig = ringConfig
lifecyclerConfig.NumTokens = 1
lifecyclerConfig.ID = id
lifecyclerConfig.Zone = "zone1"
lifecyclerConfig.Zone = zone(1)
lifecyclerConfig.FinalSleep = 0
lifecyclerConfig.HeartbeatPeriod = 100 * time.Millisecond

Expand All @@ -48,6 +48,38 @@ func checkNormalised(d interface{}, id string) bool {
len(desc.Ingesters[id].Tokens) == 1
}

func TestLifecycler_TokenGenerator(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore

cfg := testLifecyclerConfig(ringConfig, "instance-1")

spreadMinimizingTokenGenerator, err := NewSpreadMinimizingTokenGenerator(cfg.ID, cfg.Zone, []string{zone(1), zone(2), zone(3)}, log.NewNopLogger())
require.NoError(t, err)

tests := []TokenGenerator{nil, NewRandomTokenGenerator(), spreadMinimizingTokenGenerator}

for _, testData := range tests {
cfg.RingTokenGenerator = testData
lifecycler, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
if testData == nil {
// If cfg.RingTokenGenerator is empty, RandomTokenGenerator is used
tokenGenerator, ok := lifecycler.tokenGenerator.(*RandomTokenGenerator)
require.True(t, ok)
require.NotNil(t, tokenGenerator)
} else {
// If cfg.RingTokenGenerator is not empty, it is used
require.NotNil(t, lifecycler.tokenGenerator)
require.Equal(t, testData, lifecycler.tokenGenerator)
}
}
}

func TestLifecycler_HealthyInstancesCount(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
Expand Down
46 changes: 20 additions & 26 deletions ring/spread_minimizing_token_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"

"golang.org/x/exp/slices"
)

Expand All @@ -24,8 +25,8 @@ var (
errorBadInstanceIDFormat = func(instanceID string) error {
return fmt.Errorf("unable to extract instance id from \"%s\"", instanceID)
}
errorZoneCountTooBig = func(zonesCount int) error {
return fmt.Errorf("number of zones %d is too big: it should be higher than %d", zonesCount, maxZonesCount)
errorZoneCountOutOfBound = func(zonesCount int) error {
return fmt.Errorf("number of zones %d is not correct: it must be greater than 0 and less or equal than %d", zonesCount, maxZonesCount)
}
errorZoneNotValid = func(zone string) error {
return fmt.Errorf("zone %s is not valid", zone)
Expand All @@ -41,43 +42,36 @@ var (
}
)

type SpreadMinimizingConfig struct {
InstanceID string
Zone string
}

type SpreadMinimizingTokenGenerator struct {
cfg SpreadMinimizingConfig
instanceID int
zoneID int
zones []string
logger log.Logger
instanceID int
zoneID int
spreadMinimizingZones []string
logger log.Logger
}

func NewSpreadMinimizingTokenGenerator(cfg SpreadMinimizingConfig, zones []string, logger log.Logger) (*SpreadMinimizingTokenGenerator, error) {
if len(zones) > maxZonesCount {
return nil, errorZoneCountTooBig(len(zones))
func NewSpreadMinimizingTokenGenerator(instance, zone string, spreadMinimizingZones []string, logger log.Logger) (*SpreadMinimizingTokenGenerator, error) {
if len(spreadMinimizingZones) <= 0 || len(spreadMinimizingZones) > maxZonesCount {
return nil, errorZoneCountOutOfBound(len(spreadMinimizingZones))
}
sortedZones := make([]string, len(zones))
copy(sortedZones, zones)
sortedZones := make([]string, len(spreadMinimizingZones))
copy(sortedZones, spreadMinimizingZones)
if !slices.IsSorted(sortedZones) {
sort.Strings(sortedZones)
}
instanceID, err := parseInstanceID(cfg.InstanceID)
instanceID, err := parseInstanceID(instance)
if err != nil {
return nil, err
}
zoneID, err := findZoneID(cfg.Zone, sortedZones)
zoneID, err := findZoneID(zone, sortedZones)
if err != nil {
return nil, err
}

tokenGenerator := &SpreadMinimizingTokenGenerator{
cfg: cfg,
instanceID: instanceID,
zoneID: zoneID,
zones: sortedZones,
logger: logger,
instanceID: instanceID,
zoneID: zoneID,
spreadMinimizingZones: sortedZones,
logger: logger,
}
return tokenGenerator, nil
}
Expand Down Expand Up @@ -250,7 +244,7 @@ func (t *SpreadMinimizingTokenGenerator) generateTokensByInstanceID() map[int]To
if highestOwnershipInstance == nil || highestOwnershipInstance.ownership <= float64(optimalTokenOwnership) {
level.Warn(t.logger).Log("msg", "it was impossible to add a token because the instance with the highest ownership cannot satisfy the request", "added tokens", addedTokens+1, "highest ownership", highestOwnershipInstance.ownership, "requested ownership", optimalTokenOwnership)
// if this happens, it means that we cannot accommodate other tokens, so we panic
err := fmt.Errorf("it was impossible to add %dth token for instance with id %d in zone %s because the instance with the highest ownership cannot satisfy the requested ownership %d", addedTokens+1, i, t.cfg.Zone, optimalTokenOwnership)
err := fmt.Errorf("it was impossible to add %dth token for instance with id %d in zone %s because the instance with the highest ownership cannot satisfy the requested ownership %d", addedTokens+1, i, t.spreadMinimizingZones[t.zoneID], optimalTokenOwnership)
panic(err)
}
tokensQueue := tokensQueues[highestOwnershipInstance.item.instanceID]
Expand All @@ -266,7 +260,7 @@ func (t *SpreadMinimizingTokenGenerator) generateTokensByInstanceID() map[int]To
if err != nil {
level.Error(t.logger).Log("msg", "it was impossible to calculate a new token because an error occurred", "err", err)
// if this happens, it means that we cannot accommodate additional tokens, so we panic
err := fmt.Errorf("it was impossible to calculate the %dth token for instance with id %d in zone %s", addedTokens+1, i, t.cfg.Zone)
err := fmt.Errorf("it was impossible to calculate the %dth token for instance with id %d in zone %s", addedTokens+1, i, t.spreadMinimizingZones[t.zoneID])
panic(err)
}
tokens = append(tokens, newToken)
Expand Down
Loading