Skip to content

Commit

Permalink
Introduce SpreadMinimizingZones config in LifecyclerConfig and BasicL…
Browse files Browse the repository at this point in the history
…ifecyclerConfig

Signed-off-by: Yuri Nikolic <[email protected]>
  • Loading branch information
duricanikolic committed Jun 20, 2023
1 parent ded3750 commit 4d259ed
Show file tree
Hide file tree
Showing 9 changed files with 217 additions and 56 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@
* [ENHANCEMENT] Ring: add `ReplicationSet.ZoneCount()` method. #298
* [ENHANCEMENT] Ring: add request minimization to `DoUntilQuorum` method. #306
* [ENHANCEMENT] grpcclient: add `<prefix>.initial-stream-window-size` and `<prefix>.initial-connection-window-size` configuration flags to alter HTTP flow control options for a gRPC client. #312
* [ENHANCEMENT] Added `TokenGenerator` interface with `RandomTokenGenerator` (generating random tokens), and `SpreadMinimizingTokenGenerator` (generating tokens with almost even distribution) implementation. By default `RandomTokenGenerator` is used. #321
* [ENHANCEMENT] Lifecycler: Added `SpreadMinimizingZones` configuration for configuration of zones in which tokens are generated by using `SpreadMinimizingTokenGenerator`. The former is empty by default, meaning that `RandomTokenGenerator` is used. #323
* [ENHANCEMENT] BasicLifecycler: Added `SpreadMinimizingZones` configuration for configuration of zones in which tokens are generated by using `SpreadMinimizingTokenGenerator`. The former is empty by default, meaning that `RandomTokenGenerator` is used. #323
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
Expand Down
10 changes: 9 additions & 1 deletion ring/basic_lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type BasicLifecyclerConfig struct {
// If true lifecycler doesn't unregister instance from the ring when it's stopping. Default value is false,
// which means unregistering.
KeepInstanceInTheRingOnShutdown bool

// If not empty, specifies in which zones tokens will be generated by SpreadMinimizingTokenGenerator.
// By default it is empty, which means that RandomTokenGenerator is used for all zones.
SpreadMinimizingConfig SpreadMinimizingConfig
}

// BasicLifecycler is a basic ring lifecycler which allows to hook custom
Expand Down Expand Up @@ -93,7 +97,11 @@ type BasicLifecycler struct {

// NewBasicLifecycler makes a new BasicLifecycler.
func NewBasicLifecycler(cfg BasicLifecyclerConfig, ringName, ringKey string, store kv.Client, delegate BasicLifecyclerDelegate, logger log.Logger, reg prometheus.Registerer) (*BasicLifecycler, error) {
tokenGenerator := NewRandomTokenGenerator()
tokenGenerator, err := newTokenGenerator(cfg.SpreadMinimizingConfig, cfg.ID, cfg.Zone, logger)
if err != nil {
return nil, err
}

l := &BasicLifecycler{
cfg: cfg,
ringName: ringName,
Expand Down
33 changes: 31 additions & 2 deletions ring/basic_lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,38 @@ import (
const (
testRingKey = "test"
testRingName = "test"
testInstanceID = "test-id"
testInstanceID = "test-id-1"
)

func TestBasicLifecycler_GetTokenGenerator(t *testing.T) {
cfg := prepareBasicLifecyclerConfig()

spreadMinimizingCfg := SpreadMinimizingConfig{}
cfg.SpreadMinimizingConfig = spreadMinimizingCfg

// If SpreadMinimizingZones is empty, RandomTokenGenerator is used
lifecycler, _, _, err := prepareBasicLifecycler(t, cfg)
require.NoError(t, err)
tokenGenerator1, ok := lifecycler.tokenGenerator.(*RandomTokenGenerator)
require.True(t, ok)
require.NotNil(t, tokenGenerator1)

// If SpreadMinimizingZones is not empty, SpreadMinimizingTokenGenerator is used
spreadMinimizingCfg.SpreadMinimizingZones = []string{zone(1), zone(2), zone(3)}
cfg.SpreadMinimizingConfig = spreadMinimizingCfg
lifecycler, _, _, err = prepareBasicLifecycler(t, cfg)
require.NoError(t, err)
tokenGenerator2, ok := lifecycler.tokenGenerator.(*SpreadMinimizingTokenGenerator)
require.True(t, ok)
require.NotNil(t, tokenGenerator2)

// If SpreadMinimizingZones is not empty, but configured bad, an error is returned
spreadMinimizingCfg.SpreadMinimizingZones = []string{zone(2), zone(3)}
cfg.SpreadMinimizingConfig = spreadMinimizingCfg
_, _, _, err = prepareBasicLifecycler(t, cfg)
require.Error(t, err)
}

func TestBasicLifecycler_RegisterOnStart(t *testing.T) {
tests := map[string]struct {
initialInstanceID string
Expand Down Expand Up @@ -489,7 +518,7 @@ func prepareBasicLifecyclerConfig() BasicLifecyclerConfig {
return BasicLifecyclerConfig{
ID: testInstanceID,
Addr: "127.0.0.1:12345",
Zone: "test-zone",
Zone: zone(1),
HeartbeatPeriod: time.Minute,
TokensObservePeriod: 0,
NumTokens: 5,
Expand Down
8 changes: 7 additions & 1 deletion ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type LifecyclerConfig struct {

// Injected internally
ListenPort int `yaml:"-"`

SpreadMinimizingConfig SpreadMinimizingConfig `yaml:"spread_minimizing_config" category:"experimental"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
Expand All @@ -66,6 +68,7 @@ func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
// The default values of some flags can be changed; see docs of LifecyclerConfig.
func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet, logger log.Logger) {
cfg.RingConfig.RegisterFlagsWithPrefix(prefix, f)
cfg.SpreadMinimizingConfig.RegisterFlagsWithPrefix(prefix, f)

// In order to keep backwards compatibility all of these need to be prefixed
// with "ingester."
Expand Down Expand Up @@ -170,7 +173,10 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
flushTransferer = NewNoopFlushTransferer()
}

tokenGenerator := NewRandomTokenGenerator()
tokenGenerator, err := newTokenGenerator(cfg.SpreadMinimizingConfig, cfg.ID, cfg.Zone, logger)
if err != nil {
return nil, err
}
l := &Lifecycler{
cfg: cfg,
flushTransferer: flushTransferer,
Expand Down
36 changes: 35 additions & 1 deletion ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func testLifecyclerConfig(ringConfig Config, id string) LifecyclerConfig {
lifecyclerConfig.RingConfig = ringConfig
lifecyclerConfig.NumTokens = 1
lifecyclerConfig.ID = id
lifecyclerConfig.Zone = "zone1"
lifecyclerConfig.Zone = zone(1)
lifecyclerConfig.FinalSleep = 0
lifecyclerConfig.HeartbeatPeriod = 100 * time.Millisecond

Expand All @@ -48,6 +48,40 @@ func checkNormalised(d interface{}, id string) bool {
len(desc.Ingesters[id].Tokens) == 1
}

func TestLifecycler_TokenGenerator(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
var spreadMinimizingConfig SpreadMinimizingConfig
flagext.DefaultValues(&ringConfig, &spreadMinimizingConfig)
ringConfig.KVStore.Mock = ringStore

cfg := testLifecyclerConfig(ringConfig, "instance-1")

// If SpreadMinimizingConfig is empty, RandomTokenGenerator is used
lifecycler, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
tokenGenerator1, ok := lifecycler.tokenGenerator.(*RandomTokenGenerator)
require.True(t, ok)
require.NotNil(t, tokenGenerator1)

// If SpreadMinimizingConfig is not empty, SpreadMinimizingTokenGenerator is used
spreadMinimizingConfig.SpreadMinimizingZones = []string{zone(1), zone(2), zone(3)}
cfg.SpreadMinimizingConfig = spreadMinimizingConfig
lifecycler, err = NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
tokenGenerator2, ok := lifecycler.tokenGenerator.(*SpreadMinimizingTokenGenerator)
require.True(t, ok)
require.NotNil(t, tokenGenerator2)

// If SpreadMinimizingZones is not empty, but configured bad, an error is returned
spreadMinimizingConfig.SpreadMinimizingZones = []string{zone(2), zone(3)}
cfg.SpreadMinimizingConfig = spreadMinimizingConfig
_, err = NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.Error(t, err)
}

func TestLifecycler_HealthyInstancesCount(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
Expand Down
45 changes: 30 additions & 15 deletions ring/spread_minimizing_token_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ring

import (
"container/heap"
"flag"
"fmt"
"math"
"regexp"
Expand All @@ -10,6 +11,9 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"

"github.com/grafana/dskit/flagext"

"golang.org/x/exp/slices"
)

Expand All @@ -24,8 +28,8 @@ var (
errorBadInstanceIDFormat = func(instanceID string) error {
return fmt.Errorf("unable to extract instance id from \"%s\"", instanceID)
}
errorZoneCountTooBig = func(zonesCount int) error {
return fmt.Errorf("number of zones %d is too big: it should be higher than %d", zonesCount, maxZonesCount)
errorZoneCountOutOfBound = func(zonesCount int) error {
return fmt.Errorf("number of zones %d is not correct: it must be greater than 0 and less or equal than %d", zonesCount, maxZonesCount)
}
errorZoneNotValid = func(zone string) error {
return fmt.Errorf("zone %s is not valid", zone)
Expand All @@ -42,32 +46,44 @@ var (
)

type SpreadMinimizingConfig struct {
InstanceID string
Zone string
SpreadMinimizingZones flagext.StringSliceCSV `yaml:"spread_minimizing_zones" category:"experimental"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *SpreadMinimizingConfig) RegisterFlags(f *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("", f)
}

// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet with a specified prefix
func (cfg *SpreadMinimizingConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.Var(&cfg.SpreadMinimizingZones, prefix+"spread-minimizing-zones", "Comma-separated list of zones whose tokens are generated by SpreadMinimizingTokenGenerator. By default it is empty, which means that tokens in all zones are generated by RandomTokenGenerator.")
}

func (cfg *SpreadMinimizingConfig) Empty() bool {
return len(cfg.SpreadMinimizingZones) == 0
}

type SpreadMinimizingTokenGenerator struct {
cfg SpreadMinimizingConfig
instanceID int
zoneID int
zones []string
logger log.Logger
}

func NewSpreadMinimizingTokenGenerator(cfg SpreadMinimizingConfig, zones []string, logger log.Logger) (*SpreadMinimizingTokenGenerator, error) {
if len(zones) > maxZonesCount {
return nil, errorZoneCountTooBig(len(zones))
func NewSpreadMinimizingTokenGenerator(cfg SpreadMinimizingConfig, instance, zone string, logger log.Logger) (*SpreadMinimizingTokenGenerator, error) {
if len(cfg.SpreadMinimizingZones) <= 0 || len(cfg.SpreadMinimizingZones) > maxZonesCount {
return nil, errorZoneCountOutOfBound(len(cfg.SpreadMinimizingZones))
}
sortedZones := make([]string, len(zones))
copy(sortedZones, zones)
sortedZones := make([]string, len(cfg.SpreadMinimizingZones))
copy(sortedZones, cfg.SpreadMinimizingZones)
if !slices.IsSorted(sortedZones) {
sort.Strings(sortedZones)
}
instanceID, err := parseInstanceID(cfg.InstanceID)
instanceID, err := parseInstanceID(instance)
if err != nil {
return nil, err
}
zoneID, err := findZoneID(cfg.Zone, sortedZones)
zoneID, err := findZoneID(zone, sortedZones)
if err != nil {
return nil, err
}
Expand All @@ -76,7 +92,6 @@ func NewSpreadMinimizingTokenGenerator(cfg SpreadMinimizingConfig, zones []strin
cfg: cfg,
instanceID: instanceID,
zoneID: zoneID,
zones: sortedZones,
logger: logger,
}
return tokenGenerator, nil
Expand Down Expand Up @@ -250,7 +265,7 @@ func (t *SpreadMinimizingTokenGenerator) generateTokensByInstanceID() map[int]To
if highestOwnershipInstance == nil || highestOwnershipInstance.ownership <= float64(optimalTokenOwnership) {
level.Warn(t.logger).Log("msg", "it was impossible to add a token because the instance with the highest ownership cannot satisfy the request", "added tokens", addedTokens+1, "highest ownership", highestOwnershipInstance.ownership, "requested ownership", optimalTokenOwnership)
// if this happens, it means that we cannot accommodate other tokens, so we panic
err := fmt.Errorf("it was impossible to add %dth token for instance with id %d in zone %s because the instance with the highest ownership cannot satisfy the requested ownership %d", addedTokens+1, i, t.cfg.Zone, optimalTokenOwnership)
err := fmt.Errorf("it was impossible to add %dth token for instance with id %d in zone %s because the instance with the highest ownership cannot satisfy the requested ownership %d", addedTokens+1, i, t.cfg.SpreadMinimizingZones[t.zoneID], optimalTokenOwnership)
panic(err)
}
tokensQueue := tokensQueues[highestOwnershipInstance.item.instanceID]
Expand All @@ -266,7 +281,7 @@ func (t *SpreadMinimizingTokenGenerator) generateTokensByInstanceID() map[int]To
if err != nil {
level.Error(t.logger).Log("msg", "it was impossible to calculate a new token because an error occurred", "err", err)
// if this happens, it means that we cannot accommodate additional tokens, so we panic
err := fmt.Errorf("it was impossible to calculate the %dth token for instance with id %d in zone %s", addedTokens+1, i, t.cfg.Zone)
err := fmt.Errorf("it was impossible to calculate the %dth token for instance with id %d in zone %s", addedTokens+1, i, t.cfg.SpreadMinimizingZones[t.zoneID])
panic(err)
}
tokens = append(tokens, newToken)
Expand Down
Loading

0 comments on commit 4d259ed

Please sign in to comment.