Skip to content

Commit

Permalink
Improve ring tests, wait for watch update instead of sleeping. (#6824) (
Browse files Browse the repository at this point in the history
#6840)

Signed-off-by: Peter Štibraný <[email protected]>
(cherry picked from commit 11ddac2)

Co-authored-by: Peter Štibraný <[email protected]>
  • Loading branch information
grafanabot and pstibrany authored Dec 6, 2023
1 parent 2a56165 commit 5792199
Showing 1 changed file with 62 additions and 22 deletions.
84 changes: 62 additions & 22 deletions pkg/ingester/owned_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@ import (
"fmt"
"math/rand"
"slices"
"sync"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/consul"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/test"
"github.com/grafana/dskit/user"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
Expand All @@ -36,7 +39,7 @@ type ownedSeriesTestContext struct {
ownedSeries *ownedSeriesService
ing *Ingester
db *userTSDB
kvStore *consul.Client
kvStore *watchingKV

buf *concurrency.SyncBuffer
tenantShards map[string]int
Expand Down Expand Up @@ -69,7 +72,7 @@ func (c *ownedSeriesTestContext) registerTestedIngesterIntoRing(t *testing.T, in
c.ingesterZone = instanceZone

// Insert our ingester into the ring. When lifecycler starts, it will find this entry, and keep the tokens.
updateRing(t, c.kvStore, func(desc *ring.Desc) {
updateRingAndWaitForWatcherToReadUpdate(t, c.kvStore, func(desc *ring.Desc) {
var tokens []uint32
for i := 0; i < ownedServiceSeriesCount/2; i++ {
tokens = append(tokens, c.seriesTokens[i]+1)
Expand All @@ -85,7 +88,7 @@ func (c *ownedSeriesTestContext) registerTestedIngesterIntoRing(t *testing.T, in
// Insert second ingester to the ring, with tokens that will make it second half of the series.
// This ingester will also be first ingester in the user's shuffle shard (skip: 0).
func (c *ownedSeriesTestContext) registerSecondIngesterOwningHalfOfTheTokens(t *testing.T) {
updateRing(t, c.kvStore, func(desc *ring.Desc) {
updateRingAndWaitForWatcherToReadUpdate(t, c.kvStore, func(desc *ring.Desc) {
var tokens []uint32
for i := ownedServiceSeriesCount / 2; i < ownedServiceSeriesCount; i++ {
tokens = append(tokens, c.seriesTokens[i]+1)
Expand All @@ -99,7 +102,7 @@ func (c *ownedSeriesTestContext) registerSecondIngesterOwningHalfOfTheTokens(t *
}

func (c *ownedSeriesTestContext) removeSecondIngester(t *testing.T) {
updateRing(t, c.kvStore, func(desc *ring.Desc) {
updateRingAndWaitForWatcherToReadUpdate(t, c.kvStore, func(desc *ring.Desc) {
desc.RemoveIngester("second-ingester")
})
}
Expand Down Expand Up @@ -254,32 +257,34 @@ func TestOwnedSeriesService(t *testing.T) {
kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

wkv := &watchingKV{Client: kvStore}

cfg := defaultIngesterTestConfig(t)
cfg.IngesterRing.KVStore.Mock = kvStore
cfg.IngesterRing.KVStore.Mock = wkv // Use "watchingKV" so that we know when update was processed.
cfg.IngesterRing.InstanceID = "first-ingester"
cfg.IngesterRing.NumTokens = ownedServiceSeriesCount/2 + 1 // We will use token for half of the series + one token for user.
cfg.IngesterRing.ZoneAwarenessEnabled = true
cfg.IngesterRing.InstanceZone = "zone"
cfg.IngesterRing.ReplicationFactor = 1 // Currently we require RF=number of zones, and we will only work with single zone.

// Start the ring watching. We need watcher to be running when we're doing ring updates, otherwise our update-and-watch function will fail.
rng, err := ring.New(cfg.IngesterRing.ToRingConfig(), "ingester", IngesterRingKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), rng))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), rng))
})

c := ownedSeriesTestContext{
seriesToWrite: seriesToWrite,
seriesTokens: seriesTokens,
kvStore: kvStore,
kvStore: wkv,
}

c.registerTestedIngesterIntoRing(t, cfg.IngesterRing.InstanceID, cfg.IngesterRing.InstanceAddr, cfg.IngesterRing.InstanceZone)

c.ing = setupIngester(t, cfg)

// Start the ring watching.
rng, err := ring.New(cfg.IngesterRing.ToRingConfig(), "ingester", IngesterRingKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), rng))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), rng))
})

c.tenantShards = map[string]int{}
c.buf = &concurrency.SyncBuffer{}

Expand All @@ -294,11 +299,13 @@ func TestOwnedSeriesRingChanged(t *testing.T) {
kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

wkv := &watchingKV{Client: kvStore}

rc := ring.Config{}
flagext.DefaultValues(&rc)

// Configure ring
rc.KVStore.Mock = kvStore
rc.KVStore.Mock = wkv
rc.HeartbeatTimeout = 1 * time.Minute
rc.ReplicationFactor = 3
rc.ZoneAwarenessEnabled = true
Expand All @@ -318,7 +325,7 @@ func TestOwnedSeriesRingChanged(t *testing.T) {

ownedSeries := newOwnedSeriesService(10*time.Minute, instanceID1, rng, log.NewLogfmtLogger(&buf), nil, nil, nil, nil)

updateRing(t, kvStore, func(desc *ring.Desc) {
updateRingAndWaitForWatcherToReadUpdate(t, wkv, func(desc *ring.Desc) {
desc.AddIngester(instanceID1, "localhost:11111", "zone", []uint32{1, 2, 3}, ring.ACTIVE, time.Now())
})

Expand All @@ -336,7 +343,7 @@ func TestOwnedSeriesRingChanged(t *testing.T) {
})

t.Run("new instance added", func(t *testing.T) {
updateRing(t, kvStore, func(desc *ring.Desc) {
updateRingAndWaitForWatcherToReadUpdate(t, wkv, func(desc *ring.Desc) {
desc.AddIngester(instanceID2, "localhost:22222", "zone", []uint32{4, 5, 6}, ring.ACTIVE, time.Now())
})

Expand All @@ -346,7 +353,7 @@ func TestOwnedSeriesRingChanged(t *testing.T) {
})

t.Run("change of state is not interesting", func(t *testing.T) {
updateRing(t, kvStore, func(desc *ring.Desc) {
updateRingAndWaitForWatcherToReadUpdate(t, wkv, func(desc *ring.Desc) {
desc.AddIngester(instanceID2, "localhost:22222", "zone", []uint32{4, 5, 6}, ring.LEAVING, time.Now())
})

Expand All @@ -357,7 +364,7 @@ func TestOwnedSeriesRingChanged(t *testing.T) {
})

t.Run("removal of instance", func(t *testing.T) {
updateRing(t, kvStore, func(desc *ring.Desc) {
updateRingAndWaitForWatcherToReadUpdate(t, wkv, func(desc *ring.Desc) {
desc.RemoveIngester(instanceID2)
})

Expand Down Expand Up @@ -386,7 +393,10 @@ func setupIngester(t *testing.T, cfg Config) *Ingester {
return ing
}

func updateRing(t *testing.T, kvStore *consul.Client, updateFn func(*ring.Desc)) {
func updateRingAndWaitForWatcherToReadUpdate(t *testing.T, kvStore *watchingKV, updateFn func(*ring.Desc)) {
// Clear existing updates, so that we can test if next update was processed.
kvStore.getAndResetUpdatedKeys()

err := kvStore.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
d, _ := in.(*ring.Desc)
if d == nil {
Expand All @@ -399,6 +409,36 @@ func updateRing(t *testing.T, kvStore *consul.Client, updateFn func(*ring.Desc))
})
require.NoError(t, err)

// Wait a bit to make sure that ring has received the update.
time.Sleep(100 * time.Millisecond)
test.Poll(t, 1*time.Second, true, func() interface{} {
v := kvStore.getAndResetUpdatedKeys()
return slices.Contains(v, IngesterRingKey)
})
}

type watchingKV struct {
kv.Client

updatedKeysMu sync.Mutex
updatedKeys []string
}

func (w *watchingKV) WatchKey(ctx context.Context, key string, f func(interface{}) bool) {
w.Client.WatchKey(ctx, key, func(i interface{}) bool {
v := f(i)

w.updatedKeysMu.Lock()
defer w.updatedKeysMu.Unlock()
w.updatedKeys = append(w.updatedKeys, key)

return v
})
}

func (w *watchingKV) getAndResetUpdatedKeys() []string {
w.updatedKeysMu.Lock()
defer w.updatedKeysMu.Unlock()

r := w.updatedKeys
w.updatedKeys = nil
return r
}

0 comments on commit 5792199

Please sign in to comment.