Skip to content

Commit

Permalink
ring: Handle decreasing number of tokens
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Feb 10, 2022
1 parent ef4b8ac commit 856020c
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 4 deletions.
13 changes: 9 additions & 4 deletions ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,13 +605,18 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
level.Debug(i.logger).Log("msg", "adding tokens from file", "tokens", len(tokensFromFile))
isActive = len(tokensFromFile) >= i.cfg.NumTokens
tokens = tokensFromFile
} else {
level.Debug(i.logger).Log("msg", "no tokens in file, generating new ones")
takenTokens := ringDesc.GetTokens()
} else if i.cfg.NumTokens == len(instanceDesc.Tokens) {
level.Debug(i.logger).Log("msg", "no tokens in file, adopting those of existing instance")
tokens = instanceDesc.Tokens
} else if i.cfg.NumTokens > len(instanceDesc.Tokens) {
needTokens := i.cfg.NumTokens - len(instanceDesc.Tokens)
newTokens := GenerateTokens(needTokens, takenTokens)
level.Debug(i.logger).Log("msg", "no tokens in file, generating new ones in addition to those of existing instance", "newTokens", needTokens)
newTokens := GenerateTokens(needTokens, ringDesc.GetTokens())
tokens = append(instanceDesc.Tokens, newTokens...)
sort.Sort(tokens)
} else {
level.Debug(i.logger).Log("msg", "no tokens in file, adopting a subset of existing instance's tokens", "numTokens", i.cfg.NumTokens)
tokens = instanceDesc.Tokens[0:i.cfg.NumTokens]
}
}

Expand Down
58 changes: 58 additions & 0 deletions ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,64 @@ func TestLifecycler_IncreasingTokensLeavingInstanceInTheRing(t *testing.T) {
})
}

// Test Lifecycler when decreasing tokens and instance is already in the ring in leaving state.
func TestLifecycler_DecreasingTokensLeavingInstanceInTheRing(t *testing.T) {
ctx := context.Background()

ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore
r, err := New(ringConfig, "ingester", IngesterRingKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, r))
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, r))
})

tokenDir := t.TempDir()
lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
// Make sure changes are applied instantly
lifecyclerConfig.HeartbeatPeriod = 0
lifecyclerConfig.NumTokens = 64
lifecyclerConfig.TokensFilePath = filepath.Join(tokenDir, "/tokens")

// Simulate ingester with 128 tokens left the ring in LEAVING state
err = r.KVClient.CAS(ctx, IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
ringDesc := NewDesc()
addr, err := GetInstanceAddr(lifecyclerConfig.Addr, lifecyclerConfig.InfNames, nil)
if err != nil {
return nil, false, err
}

ringDesc.AddIngester("ing1", addr, lifecyclerConfig.Zone, GenerateTokens(128, nil), LEAVING, time.Now())
return ringDesc, true, nil
})
require.NoError(t, err)

// Start ingester with decreased number of tokens
l, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, l))
t.Cleanup(func() {
assert.NoError(t, services.StopAndAwaitTerminated(ctx, l))
})

// Verify ingester joined, is active, and has 64 tokens
test.Poll(t, time.Second, true, func() interface{} {
d, err := r.KVClient.Get(ctx, IngesterRingKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
require.True(t, ok)
ingDesc := desc.Ingesters["ing1"]
t.Log("Polling for new ingester to have become active with 64 tokens", "state", ingDesc.State, "tokens", len(ingDesc.Tokens))
return ingDesc.State == ACTIVE && len(ingDesc.Tokens) == 64
})
}

type MockClient struct {
ListFunc func(ctx context.Context, prefix string) ([]string, error)
GetFunc func(ctx context.Context, key string) (interface{}, error)
Expand Down

0 comments on commit 856020c

Please sign in to comment.