Skip to content

Commit

Permalink
Implementing review findings 2
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Nikolic <[email protected]>
  • Loading branch information
duricanikolic committed Jun 20, 2023
1 parent fea8923 commit a4fd5e4
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 65 deletions.
2 changes: 1 addition & 1 deletion ring/basic_lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type BasicLifecyclerConfig struct {

// If set, specifies the TokenGenerator implementation that will be used for generating tokens.
// Default value is nil, which means that RandomTokenGenerator is used.
RingTokenGenerator TokenGenerator `yaml:"-"`
RingTokenGenerator TokenGenerator
}

// BasicLifecycler is a basic ring lifecycler which allows to hook custom
Expand Down
2 changes: 1 addition & 1 deletion ring/basic_lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
func TestBasicLifecycler_GetTokenGenerator(t *testing.T) {
cfg := prepareBasicLifecyclerConfig()

spreadMinimizingTokenGenerator, err := NewSpreadMinimizingTokenGenerator(SpreadMinimizingConfig{[]string{zone(1), zone(2), zone(3)}}, cfg.ID, cfg.Zone, log.NewNopLogger())
spreadMinimizingTokenGenerator, err := NewSpreadMinimizingTokenGenerator(cfg.ID, cfg.Zone, []string{zone(1), zone(2), zone(3)}, log.NewNopLogger())
require.NoError(t, err)

tests := []TokenGenerator{nil, NewRandomTokenGenerator(), spreadMinimizingTokenGenerator}
Expand Down
2 changes: 1 addition & 1 deletion ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestLifecycler_TokenGenerator(t *testing.T) {

cfg := testLifecyclerConfig(ringConfig, "instance-1")

spreadMinimizingTokenGenerator, err := NewSpreadMinimizingTokenGenerator(SpreadMinimizingConfig{[]string{zone(1), zone(2), zone(3)}}, cfg.ID, cfg.Zone, log.NewNopLogger())
spreadMinimizingTokenGenerator, err := NewSpreadMinimizingTokenGenerator(cfg.ID, cfg.Zone, []string{zone(1), zone(2), zone(3)}, log.NewNopLogger())
require.NoError(t, err)

tests := []TokenGenerator{nil, NewRandomTokenGenerator(), spreadMinimizingTokenGenerator}
Expand Down
42 changes: 15 additions & 27 deletions ring/spread_minimizing_token_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"

"github.com/grafana/dskit/flagext"

"golang.org/x/exp/slices"
)

Expand Down Expand Up @@ -44,29 +42,19 @@ var (
}
)

type SpreadMinimizingConfig struct {
SpreadMinimizingZones flagext.StringSliceCSV `yaml:"spread_minimizing_zones" category:"experimental"`
}

// Empty returns true if this SpreadMinimizingConfig is empty, i.e., if it has no configuration.
// Otherwise, it returns false.
func (cfg *SpreadMinimizingConfig) Empty() bool {
return len(cfg.SpreadMinimizingZones) == 0
}

type SpreadMinimizingTokenGenerator struct {
cfg SpreadMinimizingConfig
instanceID int
zoneID int
logger log.Logger
instanceID int
zoneID int
spreadMinimizingZones []string
logger log.Logger
}

func NewSpreadMinimizingTokenGenerator(cfg SpreadMinimizingConfig, instance, zone string, logger log.Logger) (*SpreadMinimizingTokenGenerator, error) {
if len(cfg.SpreadMinimizingZones) <= 0 || len(cfg.SpreadMinimizingZones) > maxZonesCount {
return nil, errorZoneCountOutOfBound(len(cfg.SpreadMinimizingZones))
func NewSpreadMinimizingTokenGenerator(instance, zone string, spreadMinimizingZones []string, logger log.Logger) (*SpreadMinimizingTokenGenerator, error) {
if len(spreadMinimizingZones) <= 0 || len(spreadMinimizingZones) > maxZonesCount {
return nil, errorZoneCountOutOfBound(len(spreadMinimizingZones))
}
sortedZones := make([]string, len(cfg.SpreadMinimizingZones))
copy(sortedZones, cfg.SpreadMinimizingZones)
sortedZones := make([]string, len(spreadMinimizingZones))
copy(sortedZones, spreadMinimizingZones)
if !slices.IsSorted(sortedZones) {
sort.Strings(sortedZones)
}
Expand All @@ -80,10 +68,10 @@ func NewSpreadMinimizingTokenGenerator(cfg SpreadMinimizingConfig, instance, zon
}

tokenGenerator := &SpreadMinimizingTokenGenerator{
cfg: cfg,
instanceID: instanceID,
zoneID: zoneID,
logger: logger,
instanceID: instanceID,
zoneID: zoneID,
spreadMinimizingZones: sortedZones,
logger: logger,
}
return tokenGenerator, nil
}
Expand Down Expand Up @@ -256,7 +244,7 @@ func (t *SpreadMinimizingTokenGenerator) generateTokensByInstanceID() map[int]To
if highestOwnershipInstance == nil || highestOwnershipInstance.ownership <= float64(optimalTokenOwnership) {
level.Warn(t.logger).Log("msg", "it was impossible to add a token because the instance with the highest ownership cannot satisfy the request", "added tokens", addedTokens+1, "highest ownership", highestOwnershipInstance.ownership, "requested ownership", optimalTokenOwnership)
// if this happens, it means that we cannot accommodate other tokens, so we panic
err := fmt.Errorf("it was impossible to add %dth token for instance with id %d in zone %s because the instance with the highest ownership cannot satisfy the requested ownership %d", addedTokens+1, i, t.cfg.SpreadMinimizingZones[t.zoneID], optimalTokenOwnership)
err := fmt.Errorf("it was impossible to add %dth token for instance with id %d in zone %s because the instance with the highest ownership cannot satisfy the requested ownership %d", addedTokens+1, i, t.spreadMinimizingZones[t.zoneID], optimalTokenOwnership)
panic(err)
}
tokensQueue := tokensQueues[highestOwnershipInstance.item.instanceID]
Expand All @@ -272,7 +260,7 @@ func (t *SpreadMinimizingTokenGenerator) generateTokensByInstanceID() map[int]To
if err != nil {
level.Error(t.logger).Log("msg", "it was impossible to calculate a new token because an error occurred", "err", err)
// if this happens, it means that we cannot accommodate additional tokens, so we panic
err := fmt.Errorf("it was impossible to calculate the %dth token for instance with id %d in zone %s", addedTokens+1, i, t.cfg.SpreadMinimizingZones[t.zoneID])
err := fmt.Errorf("it was impossible to calculate the %dth token for instance with id %d in zone %s", addedTokens+1, i, t.spreadMinimizingZones[t.zoneID])
panic(err)
}
tokens = append(tokens, newToken)
Expand Down
49 changes: 14 additions & 35 deletions ring/spread_minimizing_token_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ var (
}
)

func TestSpreadMinimizingConfig_Empty(t *testing.T) {
var spreadMinimizingconfig SpreadMinimizingConfig
require.True(t, spreadMinimizingconfig.Empty())

spreadMinimizingconfig.SpreadMinimizingZones = zones
require.False(t, spreadMinimizingconfig.Empty())
}

func TestSpreadMinimizingTokenGenerator_ParseInstanceID(t *testing.T) {
tests := map[string]struct {
instanceID string
Expand Down Expand Up @@ -140,9 +132,8 @@ func TestSpreadMinimizingTokenGenerator_NewSpreadMinimizingTokenGenerator(t *tes
}

for _, testData := range tests {
cfg := SpreadMinimizingConfig{testData.spreadMinimizingZones}
instance := fmt.Sprintf("instance-%s-1", testData.zone)
tokenGenerator, err := NewSpreadMinimizingTokenGenerator(cfg, instance, testData.zone, log.NewNopLogger())
tokenGenerator, err := NewSpreadMinimizingTokenGenerator(instance, testData.zone, testData.spreadMinimizingZones, log.NewNopLogger())
if testData.expectedError != nil {
require.Error(t, err)
require.Equal(t, testData.expectedError, err)
Expand All @@ -154,10 +145,9 @@ func TestSpreadMinimizingTokenGenerator_NewSpreadMinimizingTokenGenerator(t *tes
}

func TestSpreadMinimizingTokenGenerator_GenerateFirstInstanceTokens(t *testing.T) {
cfg := &SpreadMinimizingConfig{zones}
for z, zone := range zones {
instance := fmt.Sprintf("instance-%s-%d", zone, 10)
tokenGenerator := createSpreadMinimizingTokenGenerator(t, cfg, instance, zone)
tokenGenerator := createSpreadMinimizingTokenGenerator(t, instance, zone, zones)
tokens := tokenGenerator.generateFirstInstanceTokens()
for i, token := range tokens {
require.Equal(t, uint32(1<<23*i+z), token)
Expand All @@ -167,10 +157,9 @@ func TestSpreadMinimizingTokenGenerator_GenerateFirstInstanceTokens(t *testing.T
}

func TestSpreadMinimizingTokenGenerator_GenerateFirstInstanceTokensIdempotent(t *testing.T) {
cfg := &SpreadMinimizingConfig{zones}
for _, zone := range zones {
instance := fmt.Sprintf("instance-%s-%d", zone, 10)
tokenGenerator := createSpreadMinimizingTokenGenerator(t, cfg, instance, zone)
tokenGenerator := createSpreadMinimizingTokenGenerator(t, instance, zone, zones)
tokens1 := tokenGenerator.generateFirstInstanceTokens()
require.Len(t, tokens1, tokensPerInstance)
tokens2 := tokenGenerator.generateFirstInstanceTokens()
Expand Down Expand Up @@ -198,7 +187,7 @@ func TestSpreadMinimizingTokenGenerator_OptimalTokenOwnership(t *testing.T) {
expectedOptimalTokenOwnership: 0,
},
}
tokenGenerator := createSpreadMinimizingTokenGenerator(t, nil, testInstance, testZone)
tokenGenerator := createSpreadMinimizingTokenGenerator(t, testInstance, testZone, zones)
for _, testData := range tests {
optimalTokenOwnership := tokenGenerator.optimalTokenOwnership(testData.optimalInstanceOwnership, testData.currInstanceOwnership, testData.currTokensCount)
require.Equal(t, testData.expectedOptimalTokenOwnership, optimalTokenOwnership)
Expand Down Expand Up @@ -278,7 +267,7 @@ func TestSpreadMinimizingTokenGenerator_CalculateNewToken(t *testing.T) {
expectedError: fmt.Errorf("calculation of a new token between 80 and 240 with optimal token ownership 400 was impossible: distance between lower and upper bound 160 is not big enough"),
},
}
tokenGenerator := createSpreadMinimizingTokenGenerator(t, nil, testInstance, testZone)
tokenGenerator := createSpreadMinimizingTokenGenerator(t, testInstance, testZone, zones)
for _, testData := range tests {
newToken, err := tokenGenerator.calculateNewToken(testData.ringToken, testData.optimalTokenOwnership)
if testData.expectedError == nil {
Expand All @@ -293,11 +282,10 @@ func TestSpreadMinimizingTokenGenerator_CalculateNewToken(t *testing.T) {

func TestSpreadMinimizingTokenGenerator_GenerateAllTokensIdempotent(t *testing.T) {
maxInstanceID := 128
cfg := &SpreadMinimizingConfig{zones}
for instanceID := 0; instanceID < maxInstanceID; instanceID++ {
for _, zone := range zones {
instance := fmt.Sprintf("instance-%s-%d", zone, instanceID)
tokenGenerator := createSpreadMinimizingTokenGenerator(t, cfg, instance, zone)
tokenGenerator := createSpreadMinimizingTokenGenerator(t, instance, zone, zones)
tokens1 := tokenGenerator.generateAllTokens()
require.Len(t, tokens1, tokensPerInstance)
tokens2 := tokenGenerator.generateAllTokens()
Expand Down Expand Up @@ -341,11 +329,10 @@ func TestSpreadMinimizingTokenGenerator_VerifyInstanceOwnershipSpreadByZone(t *t
func TestSpreadMinimizingTokenGenerator_CheckTokenUniqueness(t *testing.T) {
tokensPerInstance := 512
instanceID := 10000
cfg := &SpreadMinimizingConfig{zones}
allTokens := make(map[uint32]bool, tokensPerInstance*(instanceID+1)*len(zones))
for _, zone := range zones {
instance := fmt.Sprintf("instance-%s-%d", zone, instanceID)
tokenGenerator := createSpreadMinimizingTokenGenerator(t, cfg, instance, zone)
tokenGenerator := createSpreadMinimizingTokenGenerator(t, instance, zone, zones)
tokens := tokenGenerator.generateTokensByInstanceID()
for i := 0; i <= instanceID; i++ {
tks := tokens[i]
Expand All @@ -361,8 +348,7 @@ func TestSpreadMinimizingTokenGenerator_CheckTokenUniqueness(t *testing.T) {
}

func TestSpreadMinimizingTokenGenerator_GenerateAtMost512Tokens(t *testing.T) {
cfg := &SpreadMinimizingConfig{zones}
tokenGenerator := createSpreadMinimizingTokenGenerator(t, cfg, testInstance, testZone)
tokenGenerator := createSpreadMinimizingTokenGenerator(t, testInstance, testZone, zones)
// we try to generate 2*optimalTokensPerInstance tokens, and we ensure
// that only optimalTokensPerInstance tokens are generated
tokens := tokenGenerator.GenerateTokens(2*optimalTokensPerInstance, nil)
Expand All @@ -374,8 +360,7 @@ func TestSpreadMinimizingTokenGenerator_GenerateTokens(t *testing.T) {
instanceID := 1000
zone := zones[0]
instance := fmt.Sprintf("instance-%s-%d", zone, instanceID)
cfg := &SpreadMinimizingConfig{zones}
tokenGenerator := createSpreadMinimizingTokenGenerator(t, cfg, instance, zone)
tokenGenerator := createSpreadMinimizingTokenGenerator(t, instance, zone, zones)
// this is the set of all sorted tokens assigned to instance
allTokens := tokenGenerator.generateAllTokens()
require.Len(t, allTokens, tokensPerInstance)
Expand Down Expand Up @@ -410,8 +395,7 @@ func TestSpreadMinimizingTokenGenerator_GenerateTokens(t *testing.T) {
}

func BenchmarkSpreadMinimizingTokenGenerator_GenerateTokens(b *testing.B) {
cfg := &SpreadMinimizingConfig{zones}
tokenGenerator := createSpreadMinimizingTokenGenerator(b, cfg, testInstance, testZone)
tokenGenerator := createSpreadMinimizingTokenGenerator(b, testInstance, testZone, zones)
b.ResetTimer()
for i := 0; i < b.N; i++ {
tokenGenerator.GenerateTokens(512, nil)
Expand All @@ -420,8 +404,7 @@ func BenchmarkSpreadMinimizingTokenGenerator_GenerateTokens(b *testing.B) {

func TestSpreadMinimizingTokenGenerator_GetMissingTokens(t *testing.T) {
tokensPerInstance := 512
cfg := &SpreadMinimizingConfig{zones}
tokenGenerator := createSpreadMinimizingTokenGenerator(t, cfg, testInstance, testZone)
tokenGenerator := createSpreadMinimizingTokenGenerator(t, testInstance, testZone, zones)

// we get all the tokens for the underlying instance, but we don't mark all of them as taken
// in order to simulate that some tokens were taken by another instance when the method was
Expand Down Expand Up @@ -450,10 +433,9 @@ func TestSpreadMinimizingTokenGenerator_GetMissingTokens(t *testing.T) {
func createTokensForAllInstancesAndZones(t *testing.T, maxInstanceID, tokensPerInstance int) (map[uint32]*instanceInfo, map[string][]uint32) {
instanceByToken := make(map[uint32]*instanceInfo, (maxInstanceID+1)*tokensPerInstance*len(zones))
tokenSetsByZone := make(map[string][][]uint32, len(zones))
cfg := &SpreadMinimizingConfig{zones}
for _, zone := range zones {
instance := fmt.Sprintf("instance-%s-%d", zone, maxInstanceID)
tokenGenerator := createSpreadMinimizingTokenGenerator(t, cfg, instance, zone)
tokenGenerator := createSpreadMinimizingTokenGenerator(t, instance, zone, zones)
tokensByInstance := tokenGenerator.generateTokensByInstanceID()
for id, tokens := range tokensByInstance {
if !slices.IsSorted(tokens) {
Expand Down Expand Up @@ -483,11 +465,8 @@ func createTokensForAllInstancesAndZones(t *testing.T, maxInstanceID, tokensPerI
return instanceByToken, tokensByZone
}

func createSpreadMinimizingTokenGenerator(t testing.TB, cfg *SpreadMinimizingConfig, instance, zone string) *SpreadMinimizingTokenGenerator {
if cfg == nil {
cfg = &SpreadMinimizingConfig{zones}
}
tokenGenerator, err := NewSpreadMinimizingTokenGenerator(*cfg, instance, zone, log.NewLogfmtLogger(os.Stdout))
func createSpreadMinimizingTokenGenerator(t testing.TB, instance, zone string, zones []string) *SpreadMinimizingTokenGenerator {
tokenGenerator, err := NewSpreadMinimizingTokenGenerator(instance, zone, zones, log.NewLogfmtLogger(os.Stdout))
require.NoError(t, err)
require.NotNil(t, tokenGenerator)
return tokenGenerator
Expand Down

0 comments on commit a4fd5e4

Please sign in to comment.