Skip to content

Commit

Permalink
Introduce RingTokenGenerator configuration in LifecyclerConfig and Ba…
Browse files Browse the repository at this point in the history
…sicLifecyclerConfig (#323)

* Introduce SpreadMinimizingZones config in LifecyclerConfig and BasicLifecyclerConfig

Signed-off-by: Yuri Nikolic <[email protected]>

* Implementing review findings

Signed-off-by: Yuri Nikolic <[email protected]>

* Implementing review findings 2

Signed-off-by: Yuri Nikolic <[email protected]>

---------

Signed-off-by: Yuri Nikolic <[email protected]>
  • Loading branch information
duricanikolic authored Jun 20, 2023
1 parent ded3750 commit 3dc2113
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 68 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@
* [ENHANCEMENT] Ring: add `ReplicationSet.ZoneCount()` method. #298
* [ENHANCEMENT] Ring: add request minimization to `DoUntilQuorum` method. #306
* [ENHANCEMENT] grpcclient: add `<prefix>.initial-stream-window-size` and `<prefix>.initial-connection-window-size` configuration flags to alter HTTP flow control options for a gRPC client. #312
* [ENHANCEMENT] Added `TokenGenerator` interface with `RandomTokenGenerator` (generating random tokens), and `SpreadMinimizingTokenGenerator` (generating tokens with almost even distribution) implementation. By default `RandomTokenGenerator` is used. #321
* [ENHANCEMENT] Lifecycler: Added `RingTokenGenerator` configuration that specifies the `TokenGenerator` implementation that is used for token generation. Default value is nil, meaning that `RandomTokenGenerator` is used. #323
* [ENHANCEMENT] BasicLifecycler: Added `RingTokenGenerator` configuration that specifies the `TokenGenerator` implementation that is used for token generation. Default value is nil, meaning that `RandomTokenGenerator` is used. #323
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
Expand Down
10 changes: 9 additions & 1 deletion ring/basic_lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type BasicLifecyclerConfig struct {
// If true lifecycler doesn't unregister instance from the ring when it's stopping. Default value is false,
// which means unregistering.
KeepInstanceInTheRingOnShutdown bool

// If set, specifies the TokenGenerator implementation that will be used for generating tokens.
// Default value is nil, which means that RandomTokenGenerator is used.
RingTokenGenerator TokenGenerator
}

// BasicLifecycler is a basic ring lifecycler which allows to hook custom
Expand Down Expand Up @@ -93,7 +97,11 @@ type BasicLifecycler struct {

// NewBasicLifecycler makes a new BasicLifecycler.
func NewBasicLifecycler(cfg BasicLifecyclerConfig, ringName, ringKey string, store kv.Client, delegate BasicLifecyclerDelegate, logger log.Logger, reg prometheus.Registerer) (*BasicLifecycler, error) {
tokenGenerator := NewRandomTokenGenerator()
tokenGenerator := cfg.RingTokenGenerator
if tokenGenerator == nil {
tokenGenerator = NewRandomTokenGenerator()
}

l := &BasicLifecycler{
cfg: cfg,
ringName: ringName,
Expand Down
29 changes: 27 additions & 2 deletions ring/basic_lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,34 @@ import (
const (
testRingKey = "test"
testRingName = "test"
testInstanceID = "test-id"
testInstanceID = "test-id-1"
)

func TestBasicLifecycler_GetTokenGenerator(t *testing.T) {
cfg := prepareBasicLifecyclerConfig()

spreadMinimizingTokenGenerator, err := NewSpreadMinimizingTokenGenerator(cfg.ID, cfg.Zone, []string{zone(1), zone(2), zone(3)}, log.NewNopLogger())
require.NoError(t, err)

tests := []TokenGenerator{nil, NewRandomTokenGenerator(), spreadMinimizingTokenGenerator}

for _, testData := range tests {
cfg.RingTokenGenerator = testData
lifecycler, _, _, err := prepareBasicLifecycler(t, cfg)
require.NoError(t, err)
if testData == nil {
// If cfg.RingTokenGenerator is empty, RandomTokenGenerator is used
tokenGenerator, ok := lifecycler.tokenGenerator.(*RandomTokenGenerator)
require.True(t, ok)
require.NotNil(t, tokenGenerator)
} else {
// If cfg.RingTokenGenerator is not empty, it is used
require.NotNil(t, lifecycler.tokenGenerator)
require.Equal(t, testData, lifecycler.tokenGenerator)
}
}
}

func TestBasicLifecycler_RegisterOnStart(t *testing.T) {
tests := map[string]struct {
initialInstanceID string
Expand Down Expand Up @@ -489,7 +514,7 @@ func prepareBasicLifecyclerConfig() BasicLifecyclerConfig {
return BasicLifecyclerConfig{
ID: testInstanceID,
Addr: "127.0.0.1:12345",
Zone: "test-zone",
Zone: zone(1),
HeartbeatPeriod: time.Minute,
TokensObservePeriod: 0,
NumTokens: 5,
Expand Down
10 changes: 9 additions & 1 deletion ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type LifecyclerConfig struct {

// Injected internally
ListenPort int `yaml:"-"`

// If set, specifies the TokenGenerator implementation that will be used for generating tokens.
// Default value is nil, which means that RandomTokenGenerator is used.
RingTokenGenerator TokenGenerator `yaml:"-"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
Expand Down Expand Up @@ -170,7 +174,11 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
flushTransferer = NewNoopFlushTransferer()
}

tokenGenerator := NewRandomTokenGenerator()
tokenGenerator := cfg.RingTokenGenerator
if tokenGenerator == nil {
tokenGenerator = NewRandomTokenGenerator()
}

l := &Lifecycler{
cfg: cfg,
flushTransferer: flushTransferer,
Expand Down
34 changes: 33 additions & 1 deletion ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func testLifecyclerConfig(ringConfig Config, id string) LifecyclerConfig {
lifecyclerConfig.RingConfig = ringConfig
lifecyclerConfig.NumTokens = 1
lifecyclerConfig.ID = id
lifecyclerConfig.Zone = "zone1"
lifecyclerConfig.Zone = zone(1)
lifecyclerConfig.FinalSleep = 0
lifecyclerConfig.HeartbeatPeriod = 100 * time.Millisecond

Expand All @@ -48,6 +48,38 @@ func checkNormalised(d interface{}, id string) bool {
len(desc.Ingesters[id].Tokens) == 1
}

func TestLifecycler_TokenGenerator(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore

cfg := testLifecyclerConfig(ringConfig, "instance-1")

spreadMinimizingTokenGenerator, err := NewSpreadMinimizingTokenGenerator(cfg.ID, cfg.Zone, []string{zone(1), zone(2), zone(3)}, log.NewNopLogger())
require.NoError(t, err)

tests := []TokenGenerator{nil, NewRandomTokenGenerator(), spreadMinimizingTokenGenerator}

for _, testData := range tests {
cfg.RingTokenGenerator = testData
lifecycler, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
if testData == nil {
// If cfg.RingTokenGenerator is empty, RandomTokenGenerator is used
tokenGenerator, ok := lifecycler.tokenGenerator.(*RandomTokenGenerator)
require.True(t, ok)
require.NotNil(t, tokenGenerator)
} else {
// If cfg.RingTokenGenerator is not empty, it is used
require.NotNil(t, lifecycler.tokenGenerator)
require.Equal(t, testData, lifecycler.tokenGenerator)
}
}
}

func TestLifecycler_HealthyInstancesCount(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
Expand Down
46 changes: 20 additions & 26 deletions ring/spread_minimizing_token_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"

"golang.org/x/exp/slices"
)

Expand All @@ -24,8 +25,8 @@ var (
errorBadInstanceIDFormat = func(instanceID string) error {
return fmt.Errorf("unable to extract instance id from \"%s\"", instanceID)
}
errorZoneCountTooBig = func(zonesCount int) error {
return fmt.Errorf("number of zones %d is too big: it should be higher than %d", zonesCount, maxZonesCount)
errorZoneCountOutOfBound = func(zonesCount int) error {
return fmt.Errorf("number of zones %d is not correct: it must be greater than 0 and less or equal than %d", zonesCount, maxZonesCount)
}
errorZoneNotValid = func(zone string) error {
return fmt.Errorf("zone %s is not valid", zone)
Expand All @@ -41,43 +42,36 @@ var (
}
)

type SpreadMinimizingConfig struct {
InstanceID string
Zone string
}

type SpreadMinimizingTokenGenerator struct {
cfg SpreadMinimizingConfig
instanceID int
zoneID int
zones []string
logger log.Logger
instanceID int
zoneID int
spreadMinimizingZones []string
logger log.Logger
}

func NewSpreadMinimizingTokenGenerator(cfg SpreadMinimizingConfig, zones []string, logger log.Logger) (*SpreadMinimizingTokenGenerator, error) {
if len(zones) > maxZonesCount {
return nil, errorZoneCountTooBig(len(zones))
func NewSpreadMinimizingTokenGenerator(instance, zone string, spreadMinimizingZones []string, logger log.Logger) (*SpreadMinimizingTokenGenerator, error) {
if len(spreadMinimizingZones) <= 0 || len(spreadMinimizingZones) > maxZonesCount {
return nil, errorZoneCountOutOfBound(len(spreadMinimizingZones))
}
sortedZones := make([]string, len(zones))
copy(sortedZones, zones)
sortedZones := make([]string, len(spreadMinimizingZones))
copy(sortedZones, spreadMinimizingZones)
if !slices.IsSorted(sortedZones) {
sort.Strings(sortedZones)
}
instanceID, err := parseInstanceID(cfg.InstanceID)
instanceID, err := parseInstanceID(instance)
if err != nil {
return nil, err
}
zoneID, err := findZoneID(cfg.Zone, sortedZones)
zoneID, err := findZoneID(zone, sortedZones)
if err != nil {
return nil, err
}

tokenGenerator := &SpreadMinimizingTokenGenerator{
cfg: cfg,
instanceID: instanceID,
zoneID: zoneID,
zones: sortedZones,
logger: logger,
instanceID: instanceID,
zoneID: zoneID,
spreadMinimizingZones: sortedZones,
logger: logger,
}
return tokenGenerator, nil
}
Expand Down Expand Up @@ -250,7 +244,7 @@ func (t *SpreadMinimizingTokenGenerator) generateTokensByInstanceID() map[int]To
if highestOwnershipInstance == nil || highestOwnershipInstance.ownership <= float64(optimalTokenOwnership) {
level.Warn(t.logger).Log("msg", "it was impossible to add a token because the instance with the highest ownership cannot satisfy the request", "added tokens", addedTokens+1, "highest ownership", highestOwnershipInstance.ownership, "requested ownership", optimalTokenOwnership)
// if this happens, it means that we cannot accommodate other tokens, so we panic
err := fmt.Errorf("it was impossible to add %dth token for instance with id %d in zone %s because the instance with the highest ownership cannot satisfy the requested ownership %d", addedTokens+1, i, t.cfg.Zone, optimalTokenOwnership)
err := fmt.Errorf("it was impossible to add %dth token for instance with id %d in zone %s because the instance with the highest ownership cannot satisfy the requested ownership %d", addedTokens+1, i, t.spreadMinimizingZones[t.zoneID], optimalTokenOwnership)
panic(err)
}
tokensQueue := tokensQueues[highestOwnershipInstance.item.instanceID]
Expand All @@ -266,7 +260,7 @@ func (t *SpreadMinimizingTokenGenerator) generateTokensByInstanceID() map[int]To
if err != nil {
level.Error(t.logger).Log("msg", "it was impossible to calculate a new token because an error occurred", "err", err)
// if this happens, it means that we cannot accommodate additional tokens, so we panic
err := fmt.Errorf("it was impossible to calculate the %dth token for instance with id %d in zone %s", addedTokens+1, i, t.cfg.Zone)
err := fmt.Errorf("it was impossible to calculate the %dth token for instance with id %d in zone %s", addedTokens+1, i, t.spreadMinimizingZones[t.zoneID])
panic(err)
}
tokens = append(tokens, newToken)
Expand Down
Loading

0 comments on commit 3dc2113

Please sign in to comment.