Skip to content

Commit

Permalink
Introduce SpreadMinimizingZones config in LifecyclerConfig and BasicL…
Browse files Browse the repository at this point in the history
…ifecyclerConfig

Signed-off-by: Yuri Nikolic <[email protected]>
  • Loading branch information
duricanikolic committed Jun 19, 2023
1 parent ded3750 commit 877e53d
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 18 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@
* [ENHANCEMENT] Ring: add `ReplicationSet.ZoneCount()` method. #298
* [ENHANCEMENT] Ring: add request minimization to `DoUntilQuorum` method. #306
* [ENHANCEMENT] grpcclient: add `<prefix>.initial-stream-window-size` and `<prefix>.initial-connection-window-size` configuration flags to alter HTTP flow control options for a gRPC client. #312
* [ENHANCEMENT] Added `TokenGenerator` interface with `RandomTokenGenerator` (generating random tokens), and `SpreadMinimizingTokenGenerator` (generating tokens with almost even distribution) implementation. By default `RandomTokenGenerator` is used. #321
* [ENHANCEMENT] Lifecycler: Added `SpreadMinimizingZones` configuration for configuration of zones in which tokens are generated by using `SpreadMinimizingTokenGenerator`. The former is empty by default, meaning that `RandomTokenGenerator` is used. #323
* [ENHANCEMENT] BasicLifecycler: Added `SpreadMinimizingZones` configuration for configuration of zones in which tokens are generated by using `SpreadMinimizingTokenGenerator`. The former is empty by default, meaning that `RandomTokenGenerator` is used. #323
* [BUGFIX] spanlogger: Support multiple tenant IDs. #59
* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #85
* [BUGFIX] Ring: `ring_member_ownership_percent` and `ring_tokens_owned` metrics are not updated on scale down. #109
Expand Down
9 changes: 8 additions & 1 deletion ring/basic_lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type BasicLifecyclerConfig struct {
// If true lifecycler doesn't unregister instance from the ring when it's stopping. Default value is false,
// which means unregistering.
KeepInstanceInTheRingOnShutdown bool

// If set, it specifies in which zones tokens will be generated by SpreadMinimizingTokenGenerator.
// Default value is an empty slice, which means that RandomTokenGenerator is used for all zones.
SpreadMinimizingZones []string
}

// BasicLifecycler is a basic ring lifecycler which allows to hook custom
Expand Down Expand Up @@ -93,7 +97,10 @@ type BasicLifecycler struct {

// NewBasicLifecycler makes a new BasicLifecycler.
func NewBasicLifecycler(cfg BasicLifecyclerConfig, ringName, ringKey string, store kv.Client, delegate BasicLifecyclerDelegate, logger log.Logger, reg prometheus.Registerer) (*BasicLifecycler, error) {
tokenGenerator := NewRandomTokenGenerator()
tokenGenerator, err := newTokenGenerator(cfg.ID, cfg.Zone, cfg.SpreadMinimizingZones, logger)
if err != nil {
return nil, err
}
l := &BasicLifecycler{
cfg: cfg,
ringName: ringName,
Expand Down
44 changes: 42 additions & 2 deletions ring/basic_lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/services"
Expand All @@ -19,9 +20,48 @@ import (
const (
testRingKey = "test"
testRingName = "test"
testInstanceID = "test-id"
testInstanceID = "test-id-1"
)

func TestBasicLifecycler_GetTokenGenerator(t *testing.T) {
isRandomTokenGenerator := func(tg TokenGenerator) bool {
_, ok := tg.(*RandomTokenGenerator)
return ok
}
isSpreadMinimizingTokenGenerator := func(tg TokenGenerator) bool {
_, ok := tg.(*SpreadMinimizingTokenGenerator)
return ok
}
tests := map[string]struct {
spreadMinimizingZones flagext.StringSliceCSV
validateResult func(TokenGenerator) bool
}{
"empty SpreadMinimizingZones generates RandomTokenGenerator": {
validateResult: isRandomTokenGenerator,
},
"if SpreadMinimizingZones contains test-zone, it generates SpreadMinimizingTokenGenerator": {
spreadMinimizingZones: []string{zone(1), zone(2)},
validateResult: isSpreadMinimizingTokenGenerator,
},
"if SpreadMinimizingZones doesn't contain zone1, it generates RandomTokenGenerator": {
spreadMinimizingZones: []string{zone(2)},
validateResult: isRandomTokenGenerator,
},
"if SpreadMinimizingZones contains more than maxZonesCount elements, it generates RandomTokenGenerator": {
spreadMinimizingZones: []string{zone(1), zone(2), zone(3), zone(4), zone(5), zone(6), zone(7), zone(8), zone(9)},
validateResult: isRandomTokenGenerator,
},
}

cfg := prepareBasicLifecyclerConfig()
for _, testData := range tests {
cfg.SpreadMinimizingZones = testData.spreadMinimizingZones
lifecycler, _, _, err := prepareBasicLifecycler(t, cfg)
require.NoError(t, err)
require.True(t, testData.validateResult(lifecycler.tokenGenerator))
}
}

func TestBasicLifecycler_RegisterOnStart(t *testing.T) {
tests := map[string]struct {
initialInstanceID string
Expand Down Expand Up @@ -489,7 +529,7 @@ func prepareBasicLifecyclerConfig() BasicLifecyclerConfig {
return BasicLifecyclerConfig{
ID: testInstanceID,
Addr: "127.0.0.1:12345",
Zone: "test-zone",
Zone: zone(1),
HeartbeatPeriod: time.Minute,
TokensObservePeriod: 0,
NumTokens: 5,
Expand Down
17 changes: 11 additions & 6 deletions ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ type LifecyclerConfig struct {

// FinalSleep's default value can be overridden by
// setting it before calling RegisterFlags or RegisterFlagsWithPrefix.
FinalSleep time.Duration `yaml:"final_sleep" category:"advanced"`
TokensFilePath string `yaml:"tokens_file_path"`
Zone string `yaml:"availability_zone"`
UnregisterOnShutdown bool `yaml:"unregister_on_shutdown" category:"advanced"`
ReadinessCheckRingHealth bool `yaml:"readiness_check_ring_health" category:"advanced"`
FinalSleep time.Duration `yaml:"final_sleep" category:"advanced"`
TokensFilePath string `yaml:"tokens_file_path"`
Zone string `yaml:"availability_zone"`
UnregisterOnShutdown bool `yaml:"unregister_on_shutdown" category:"advanced"`
ReadinessCheckRingHealth bool `yaml:"readiness_check_ring_health" category:"advanced"`
SpreadMinimizingZones flagext.StringSliceCSV `yaml:"spread_minimizing_zones" category:"advanced"`

// For testing, you can override the address and ID of this ingester
Addr string `yaml:"address" category:"advanced"`
Expand Down Expand Up @@ -96,6 +97,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
f.BoolVar(&cfg.UnregisterOnShutdown, prefix+"unregister-on-shutdown", true, "Unregister from the ring upon clean shutdown. It can be useful to disable for rolling restarts with consistent naming in conjunction with -distributor.extend-writes=false.")
f.BoolVar(&cfg.ReadinessCheckRingHealth, prefix+"readiness-check-ring-health", true, "When enabled the readiness probe succeeds only after all instances are ACTIVE and healthy in the ring, otherwise only the instance itself is checked. This option should be disabled if in your cluster multiple instances can be rolled out simultaneously, otherwise rolling updates may be slowed down.")
f.BoolVar(&cfg.EnableInet6, prefix+"enable-inet6", false, "Enable IPv6 support. Required to make use of IP addresses from IPv6 interfaces.")
f.Var(&cfg.SpreadMinimizingZones, prefix+"spread-minimizing-zones", "Comma-separated list of zones whose tokens are generated by SpreadMinimizingTokenGenerator. By default it is empty, which means that tokens in all zones are generated by RandomTokenGenerator.")
}

// Lifecycler is responsible for managing the lifecycle of entries in the ring.
Expand Down Expand Up @@ -170,7 +172,10 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
flushTransferer = NewNoopFlushTransferer()
}

tokenGenerator := NewRandomTokenGenerator()
tokenGenerator, err := newTokenGenerator(cfg.ID, cfg.Zone, cfg.SpreadMinimizingZones, logger)
if err != nil {
return nil, err
}
l := &Lifecycler{
cfg: cfg,
flushTransferer: flushTransferer,
Expand Down
53 changes: 52 additions & 1 deletion ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ const (
ringKey = "ring"
)

var (
zone = func(id int) string {
return fmt.Sprintf("zone%d", id)
}
)

func testLifecyclerConfig(ringConfig Config, id string) LifecyclerConfig {
var lifecyclerConfig LifecyclerConfig
flagext.DefaultValues(&lifecyclerConfig)
Expand All @@ -33,7 +39,7 @@ func testLifecyclerConfig(ringConfig Config, id string) LifecyclerConfig {
lifecyclerConfig.RingConfig = ringConfig
lifecyclerConfig.NumTokens = 1
lifecyclerConfig.ID = id
lifecyclerConfig.Zone = "zone1"
lifecyclerConfig.Zone = zone(1)
lifecyclerConfig.FinalSleep = 0
lifecyclerConfig.HeartbeatPeriod = 100 * time.Millisecond

Expand All @@ -48,6 +54,51 @@ func checkNormalised(d interface{}, id string) bool {
len(desc.Ingesters[id].Tokens) == 1
}

func TestLifecycler_TokenGenerator(t *testing.T) {
isRandomTokenGenerator := func(tg TokenGenerator) bool {
_, ok := tg.(*RandomTokenGenerator)
return ok
}
isSpreadMinimizingTokenGenerator := func(tg TokenGenerator) bool {
_, ok := tg.(*SpreadMinimizingTokenGenerator)
return ok
}
tests := map[string]struct {
spreadMinimizingZones flagext.StringSliceCSV
validateResult func(TokenGenerator) bool
}{
"empty SpreadMinimizingZones generates RandomTokenGenerator": {
validateResult: isRandomTokenGenerator,
},
"if SpreadMinimizingZones contains zone1, it generates SpreadMinimizingTokenGenerator": {
spreadMinimizingZones: []string{zone(1), zone(2)},
validateResult: isSpreadMinimizingTokenGenerator,
},
"if SpreadMinimizingZones doesn't contain zone1, it generates RandomTokenGenerator": {
spreadMinimizingZones: []string{zone(2)},
validateResult: isRandomTokenGenerator,
},
"if SpreadMinimizingZones contains more than maxZonesCount elements, it generates RandomTokenGenerator": {
spreadMinimizingZones: []string{zone(1), zone(2), zone(3), zone(4), zone(5), zone(6), zone(7), zone(8), zone(9)},
validateResult: isRandomTokenGenerator,
},
}

ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore
cfg := testLifecyclerConfig(ringConfig, "instance-1")
for _, testData := range tests {
cfg.SpreadMinimizingZones = testData.spreadMinimizingZones
lifecycler, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", ringKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.True(t, testData.validateResult(lifecycler.tokenGenerator))
}
}

func TestLifecycler_HealthyInstancesCount(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
Expand Down
14 changes: 6 additions & 8 deletions ring/spread_minimizing_token_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var (
return fmt.Errorf("unable to extract instance id from \"%s\"", instanceID)
}
errorZoneCountTooBig = func(zonesCount int) error {
return fmt.Errorf("number of zones %d is too big: it should be higher than %d", zonesCount, maxZonesCount)
return fmt.Errorf("number of zones %d is too big: it must not be higher than %d", zonesCount, maxZonesCount)
}
errorZoneNotValid = func(zone string) error {
return fmt.Errorf("zone %s is not valid", zone)
Expand All @@ -50,16 +50,15 @@ type SpreadMinimizingTokenGenerator struct {
cfg SpreadMinimizingConfig
instanceID int
zoneID int
zones []string
logger log.Logger
}

func NewSpreadMinimizingTokenGenerator(cfg SpreadMinimizingConfig, zones []string, logger log.Logger) (*SpreadMinimizingTokenGenerator, error) {
if len(zones) > maxZonesCount {
return nil, errorZoneCountTooBig(len(zones))
func NewSpreadMinimizingTokenGenerator(cfg SpreadMinimizingConfig, spreadMinimizingZones []string, logger log.Logger) (*SpreadMinimizingTokenGenerator, error) {
if len(spreadMinimizingZones) > maxZonesCount {
return nil, errorZoneCountTooBig(len(spreadMinimizingZones))
}
sortedZones := make([]string, len(zones))
copy(sortedZones, zones)
sortedZones := make([]string, len(spreadMinimizingZones))
copy(sortedZones, spreadMinimizingZones)
if !slices.IsSorted(sortedZones) {
sort.Strings(sortedZones)
}
Expand All @@ -76,7 +75,6 @@ func NewSpreadMinimizingTokenGenerator(cfg SpreadMinimizingConfig, zones []strin
cfg: cfg,
instanceID: instanceID,
zoneID: zoneID,
zones: sortedZones,
logger: logger,
}
return tokenGenerator, nil
Expand Down
22 changes: 22 additions & 0 deletions ring/token_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"math/rand"
"sort"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"golang.org/x/exp/slices"
)

type TokenGenerator interface {
Expand Down Expand Up @@ -52,3 +56,21 @@ func (t *RandomTokenGenerator) GenerateTokens(requestedTokensCount int, allTaken

return tokens
}

// newTokenGenerator creates a new TokenGenerator for the given instanceID and zone. If spreadMinimizingZones contains
// at most maxZonesCount zones, and one of them is the given zone, the new TokenGenerator will be an instance of
// SpreadMinimizingTokenGenerator. In all other cases, an instance of RandomTokenGenerator is returned.
func newTokenGenerator(instanceID, zone string, spreadMinimizingZones []string, logger log.Logger) (TokenGenerator, error) {
if slices.Index(spreadMinimizingZones, zone) >= 0 {
if len(spreadMinimizingZones) > maxZonesCount {
level.Warn(logger).Log("msg", "Number of configured zones via the SpreadMinimizing flag is too big. Tokens will be generated randomly.", "SpreadMinimizingZones", spreadMinimizingZones, "maximal supported number of zones", maxZonesCount)
return NewRandomTokenGenerator(), nil
}
spreadMinimizingConfig := SpreadMinimizingConfig{
InstanceID: instanceID,
Zone: zone,
}
return NewSpreadMinimizingTokenGenerator(spreadMinimizingConfig, spreadMinimizingZones, logger)
}
return NewRandomTokenGenerator(), nil
}

0 comments on commit 877e53d

Please sign in to comment.