Implement polling tenants concurrently (#3647)
* Implement polling tenants concurrently

* Pass config through tempodb

* Avoid race on consecutiveErrors

* Add comments for atomic use

* Increase tenant count, concurrency and randomness in poller integration test

* Drop test logs

* Tidy up

* Include TenantPollConcurrency in deletion integration test

* Match a regex on the error rather than a specific one to account for the concurrency

* Add log for base test

* Use math/rand/v2

* Fix for consistent error handling of consecutive errors

* Reduce test work and output for truncation and timeouts

* Update changelog

* Adjust error handling

Here we change the error handling to account for the additional
complexity introduced by polling tenants concurrently.

This changes the behavior of the
blocklist_poll_tolerate_consecutive_errors configuration: it now applies
to a single tenant, instructing the poller to retry that tenant until the
threshold is met.

A new configuration parameter, blocklist_poll_tolerate_tenant_failures,
has been added to set the number of failing tenants that will be
tolerated.

This keeps the old behavior scoped to a single tenant while also
accounting for the more global picture: by default, a single failing
tenant will no longer stop the entire polling process. (A short sketch of
how the two thresholds interact appears below, before the file diffs.)

Tests have been updated to account for this additional logic.

* Fix tenant failure increment

* Fix race in test

* Lint

* Avoid race for integer read/write

* Clarify tracking of failing tenants

* Play golf

* Add doc for new config option
zalegrala authored Aug 5, 2024
1 parent 5a6f140 commit d3a9caf
Showing 8 changed files with 352 additions and 141 deletions.
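
The following is a rough sketch of how the two thresholds described in the commit message could interact. It is illustrative only: the names `pollAllTenants` and `pollTenant` are assumptions for this example rather than the actual Tempo poller code, and the real implementation polls tenants concurrently (bounded by `blocklist_poll_tenant_concurrency`) rather than sequentially.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// pollAllTenants sketches the interaction of the two thresholds: each tenant is
// retried until more than tolerateConsecutiveErrors consecutive errors occur,
// and the overall poll only fails once more than tolerateTenantFailures tenants
// have been marked as failed.
func pollAllTenants(tenants []string, pollTenant func(string) error, tolerateConsecutiveErrors, tolerateTenantFailures int) error {
	var tenantFailures atomic.Int64 // atomic to mirror the concurrent implementation

	for _, tenant := range tenants {
		// The real change polls tenants concurrently; a sequential loop keeps
		// this sketch short.
		consecutiveErrors := 0
		for {
			if err := pollTenant(tenant); err == nil {
				break
			}
			consecutiveErrors++
			if consecutiveErrors > tolerateConsecutiveErrors {
				// The tenant is marked failed; its previously polled results are kept.
				tenantFailures.Add(1)
				break
			}
		}
	}

	if tenantFailures.Load() > int64(tolerateTenantFailures) {
		return fmt.Errorf("%d tenants failed to poll, exceeding the tolerated %d", tenantFailures.Load(), tolerateTenantFailures)
	}
	return nil
}

func main() {
	attempts := map[string]int{}
	// Hypothetical poll function: tenant-2 fails twice before succeeding, tenant-3 always fails.
	poll := func(tenant string) error {
		attempts[tenant]++
		if tenant == "tenant-3" || (tenant == "tenant-2" && attempts[tenant] <= 2) {
			return fmt.Errorf("transient backend error for %s", tenant)
		}
		return nil
	}

	err := pollAllTenants([]string{"tenant-1", "tenant-2", "tenant-3"}, poll, 2, 0)
	fmt.Println("poll result:", err)
}
```

Setting blocklist_poll_tolerate_tenant_failures to 0 restores the stricter behavior in which any single failed tenant fails the whole poll.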
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -46,7 +46,7 @@
* [ENHANCEMENT] Reduce log level verbosity for e2e tests[#3900](https://github.com/grafana/tempo/pull/3900) (@javiermolinar)
* [ENHANCEMENT] Added new Traces api V2[#3912](https://github.com/grafana/tempo/pull/3912) (@javiermolinar)
* [ENHANCEMENT] Update to the latest dskit [#3915](https://github.com/grafana/tempo/pull/3915) (@andreasgerstmayr)

* [ENHANCEMENT] Implement polling tenants concurrently [#3647](https://github.com/grafana/tempo/pull/3647) (@zalegrala)
* [BUGFIX] Fix panic in certain metrics queries using `rate()` with `by` [#3847](https://github.com/grafana/tempo/pull/3847) (@stoewer)
* [BUGFIX] Fix double appending the primary iterator on second pass with event iterator [#3903](https://github.com/grafana/tempo/pull/3903) (@ie-pham)
* [BUGFIX] Fix metrics queries when grouping by attributes that may not exist [#3734](https://github.com/grafana/tempo/pull/3734) (@mdisibio)
17 changes: 14 additions & 3 deletions docs/sources/tempo/configuration/_index.md
@@ -1071,6 +1071,9 @@ storage:
# the index. Default 2.
[blocklist_poll_tenant_index_builders: <int>]

# Number of tenants to poll concurrently. Default is 1.
[blocklist_poll_tenant_concurrency: <int>]

# The oldest allowable tenant index. If an index is pulled that is older than this duration,
# the polling will consider this an error. Note that `blocklist_poll_fallback` applies here.
# If fallback is true and a tenant index exceeds this duration, it will fall back to listing
@@ -1084,12 +1087,20 @@ storage:
# Default 0 (disabled)
[blocklist_poll_jitter_ms: <int>]

# Polling will tolerate this many consecutive errors before failing and exiting early for the
# current repoll. Can be set to 0 which means a single error is sufficient to fail and exit early
# (matches the original polling behavior).
# Polling will tolerate this many consecutive errors during the poll of
# a single tenant before marking the tenant as failed.
# This can be set to 0 which means a single error is sufficient to mark the tenant failed
# and exit early. Any previous results for the failing tenant will be kept.
# See also `blocklist_poll_tolerate_tenant_failures` below.
# Default 1
[blocklist_poll_tolerate_consecutive_errors: <int>]

# Polling will tolerate this number of tenants which have failed to poll.
# This can be set to 0, which means a single tenant failure is sufficient to fail
# and exit early.
# Default 1
[blocklist_poll_tolerate_tenant_failures: <int>]

# Used to tune how quickly the poller will delete any remaining backend
# objects found in the tenant path. This functionality requires enabling
# below.
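
Taken together, a minimal storage configuration sketch combining the poller options documented above might look like this (the values are illustrative, not recommended defaults):

```yaml
storage:
  trace:
    blocklist_poll: 5m
    blocklist_poll_concurrency: 50
    # Poll up to four tenants at a time.
    blocklist_poll_tenant_concurrency: 4
    # Tolerate two consecutive errors while polling a single tenant before marking it failed.
    blocklist_poll_tolerate_consecutive_errors: 2
    # Tolerate two failed tenants before failing the poll as a whole.
    blocklist_poll_tolerate_tenant_failures: 2
```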
140 changes: 89 additions & 51 deletions integration/poller/poller_test.go
@@ -4,8 +4,10 @@ import (
"bytes"
"context"
"crypto/rand"
mathrand "math/rand/v2"
"os"
"sort"
"strconv"
"testing"
"time"

@@ -121,18 +123,18 @@ func TestPollerOwnership(t *testing.T) {
var ww backend.RawWriter
var cc backend.Compactor

concurrency := 3
listBlockConcurrency := 10

e := hhh.Endpoint(hhh.HTTPPort())
switch tc.name {
case "s3":
cfg.StorageConfig.Trace.S3.ListBlocksConcurrency = concurrency
cfg.StorageConfig.Trace.S3.ListBlocksConcurrency = listBlockConcurrency
cfg.StorageConfig.Trace.S3.Endpoint = e
cfg.StorageConfig.Trace.S3.Prefix = pc.prefix
cfg.Overrides.UserConfigurableOverridesConfig.Client.S3.Endpoint = e
rr, ww, cc, err = s3.New(cfg.StorageConfig.Trace.S3)
case "gcs":
cfg.StorageConfig.Trace.GCS.ListBlocksConcurrency = concurrency
cfg.StorageConfig.Trace.GCS.ListBlocksConcurrency = listBlockConcurrency
cfg.StorageConfig.Trace.GCS.Endpoint = e
cfg.StorageConfig.Trace.GCS.Prefix = pc.prefix
cfg.Overrides.UserConfigurableOverridesConfig.Client.GCS.Endpoint = e
@@ -150,70 +152,56 @@ func TestPollerOwnership(t *testing.T) {

blocklistPoller := blocklist.NewPoller(&blocklist.PollerConfig{
PollConcurrency: 3,
TenantPollConcurrency: 3,
TenantIndexBuilders: 1,
EmptyTenantDeletionAge: 10 * time.Minute,
}, OwnsEverythingSharder, r, cc, w, logger)

// Use the block boundaries in the GCS and S3 implementation
bb := blockboundary.CreateBlockBoundaries(concurrency)
// Pick a boundary to use for this test
base := bb[1]
expected := []uuid.UUID{}
bb := blockboundary.CreateBlockBoundaries(listBlockConcurrency)

expected = append(expected, uuid.MustParse("00000000-0000-0000-0000-000000000000"))
expected = append(expected, uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))
tenantCount := 250
tenantExpected := map[string][]uuid.UUID{}

// Grab the one before the boundary
decrementUUIDBytes(base)
expected = append(expected, uuid.UUID(base))
// Push some data to a few tenants
for i := 0; i < tenantCount; i++ {
testTenant := tenant + strconv.Itoa(i)
tenantExpected[testTenant] = pushBlocksToTenant(t, testTenant, bb, w)

incrementUUIDBytes(base)
expected = append(expected, uuid.UUID(base))
mmResults, cmResults, listBlocksErr := rr.ListBlocks(context.Background(), testTenant)
require.NoError(t, listBlocksErr)
sort.Slice(mmResults, func(i, j int) bool { return mmResults[i].String() < mmResults[j].String() })

incrementUUIDBytes(base)
expected = append(expected, uuid.UUID(base))

incrementUUIDBytes(base)
expected = append(expected, uuid.UUID(base))

writeTenantBlocks(t, w, tenant, expected)

sort.Slice(expected, func(i, j int) bool { return expected[i].String() < expected[j].String() })
t.Logf("expected: %v", expected)

mmResults, cmResults, err := rr.ListBlocks(context.Background(), tenant)
require.NoError(t, err)
sort.Slice(mmResults, func(i, j int) bool { return mmResults[i].String() < mmResults[j].String() })
t.Logf("mmResults: %s", mmResults)
t.Logf("cmResults: %s", cmResults)

assert.Equal(t, expected, mmResults)
assert.Equal(t, len(expected), len(mmResults))
assert.Equal(t, 0, len(cmResults))
require.Equal(t, tenantExpected[testTenant], mmResults)
require.Equal(t, len(tenantExpected[testTenant]), len(mmResults))
require.Equal(t, 0, len(cmResults))
}

l := blocklist.New()
mm, cm, err := blocklistPoller.Do(l)
require.NoError(t, err)
t.Logf("mm: %v", mm)
t.Logf("cm: %v", cm)
// t.Logf("mm: %v", mm)
// t.Logf("cm: %v", cm)

l.ApplyPollResults(mm, cm)

metas := l.Metas(tenant)
for testTenant, expected := range tenantExpected {
metas := l.Metas(testTenant)

actual := []uuid.UUID{}
for _, m := range metas {
actual = append(actual, m.BlockID)
}
actual := []uuid.UUID{}
for _, m := range metas {
actual = append(actual, m.BlockID)
}

sort.Slice(actual, func(i, j int) bool { return actual[i].String() < actual[j].String() })
sort.Slice(actual, func(i, j int) bool { return actual[i].String() < actual[j].String() })

assert.Equal(t, expected, actual)
assert.Equal(t, len(expected), len(metas))
t.Logf("actual: %v", actual)
assert.Equal(t, expected, actual)
assert.Equal(t, len(expected), len(metas))
// t.Logf("actual: %v", actual)

for _, e := range expected {
assert.True(t, found(e, metas))
for _, e := range expected {
assert.True(t, found(e, metas))
}
}
})
}
@@ -296,21 +284,21 @@ func TestTenantDeletion(t *testing.T) {
var ww backend.RawWriter
var cc backend.Compactor

concurrency := 3
listBlockConcurrency := 3
ctx := context.Background()

e := hhh.Endpoint(hhh.HTTPPort())
switch tc.name {
case "s3":
cfg.StorageConfig.Trace.S3.Endpoint = e
cfg.StorageConfig.Trace.S3.ListBlocksConcurrency = concurrency
cfg.StorageConfig.Trace.S3.ListBlocksConcurrency = listBlockConcurrency
cfg.StorageConfig.Trace.S3.Prefix = pc.prefix
cfg.Overrides.UserConfigurableOverridesConfig.Client.S3.Endpoint = e
rr, ww, cc, err = s3.New(cfg.StorageConfig.Trace.S3)
case "gcs":
cfg.Overrides.UserConfigurableOverridesConfig.Client.GCS.Endpoint = e
cfg.StorageConfig.Trace.GCS.Endpoint = e
cfg.StorageConfig.Trace.GCS.ListBlocksConcurrency = concurrency
cfg.StorageConfig.Trace.GCS.ListBlocksConcurrency = listBlockConcurrency
cfg.StorageConfig.Trace.GCS.Prefix = pc.prefix
rr, ww, cc, err = gcs.New(cfg.StorageConfig.Trace.GCS)
case "azure":
@@ -329,6 +317,7 @@ func TestTenantDeletion(t *testing.T) {
PollConcurrency: 3,
TenantIndexBuilders: 1,
EmptyTenantDeletionAge: 100 * time.Millisecond,
TenantPollConcurrency: 3,
}, OwnsEverythingSharder, r, cc, w, logger)

l := blocklist.New()
@@ -364,14 +353,15 @@ func TestTenantDeletion(t *testing.T) {
TenantIndexBuilders: 1,
EmptyTenantDeletionAge: 100 * time.Millisecond,
EmptyTenantDeletionEnabled: true,
TenantPollConcurrency: 3,
}, OwnsEverythingSharder, r, cc, w, logger)

// Again
_, _, err = blocklistPoller.Do(l)
require.NoError(t, err)

tennants, err = r.Tenants(ctx)
t.Logf("tennants: %v", tennants)
// t.Logf("tennants: %v", tennants)
require.NoError(t, err)
require.Equal(t, 0, len(tennants))
})
@@ -455,3 +445,51 @@ func writeBadBlockFiles(t *testing.T, ww backend.RawWriter, rr backend.RawReader
require.NoError(t, err)
t.Logf("items: %v", found)
}

func pushBlocksToTenant(t *testing.T, tenant string, bb [][]byte, w backend.Writer) []uuid.UUID {
// Randomly pick a block boundary
r := mathrand.IntN(len(bb))

base := bb[r]
t.Logf("base: %v", base)
expected := []uuid.UUID{}

// Include the min and max in each tenant for testing
expected = append(expected, uuid.MustParse("00000000-0000-0000-0000-000000000000"))
expected = append(expected, uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))

// If we are above zero, then we have room to decrement
if r > 0 {
decrementUUIDBytes(base)
expected = append(expected, uuid.UUID(base))
}

// If we are n-1 then we have room to increment
if r < len(bb)-1 {
// Grab the one after the boundary
incrementUUIDBytes(base)
expected = append(expected, uuid.UUID(base))
}

// If we are n-2 then we have room to increment again
if r < len(bb)-2 {
// Grab the one after the boundary
incrementUUIDBytes(base)
expected = append(expected, uuid.UUID(base))
}

// If we are n-3 then we have room to increment again
if r < len(bb)-3 {
// Grab the one after the boundary
incrementUUIDBytes(base)
expected = append(expected, uuid.UUID(base))
}

// Write the blocks using the expected block IDs
writeTenantBlocks(t, w, tenant, expected)

sort.Slice(expected, func(i, j int) bool { return expected[i].String() < expected[j].String() })
// t.Logf("expected: %v", expected)

return expected
}
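
The incrementUUIDBytes and decrementUUIDBytes helpers used above sit outside this hunk. A plausible sketch of their behavior, assumed here rather than taken from the repository, treats the 16 UUID bytes as a big-endian integer with carry and borrow:

```go
// incrementUUIDBytes adds one to the UUID treated as a big-endian integer,
// carrying into higher-order bytes on overflow.
func incrementUUIDBytes(b []byte) {
	for i := len(b) - 1; i >= 0; i-- {
		b[i]++
		if b[i] != 0 { // no carry needed, done
			return
		}
	}
}

// decrementUUIDBytes subtracts one, borrowing from higher-order bytes on underflow.
func decrementUUIDBytes(b []byte) {
	for i := len(b) - 1; i >= 0; i-- {
		b[i]--
		if b[i] != 0xff { // no borrow needed, done
			return
		}
	}
}
```

This is also why pushBlocksToTenant guards the increments and decrements with bounds checks on r: they keep the adjusted IDs from wrapping around at the ends of the boundary list, whose all-zero and all-f UUIDs are already added for every tenant.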
1 change: 1 addition & 0 deletions modules/storage/config.go
@@ -29,6 +29,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.Trace.BlocklistPollConcurrency = tempodb.DefaultBlocklistPollConcurrency
cfg.Trace.BlocklistPollTenantIndexBuilders = tempodb.DefaultTenantIndexBuilders
cfg.Trace.BlocklistPollTolerateConsecutiveErrors = tempodb.DefaultTolerateConsecutiveErrors
cfg.Trace.BlocklistPollTolerateTenantFailures = tempodb.DefaultTolerateTenantFailures

f.StringVar(&cfg.Trace.Backend, util.PrefixConfig(prefix, "trace.backend"), "", "Trace backend (s3, azure, gcs, local)")
f.DurationVar(&cfg.Trace.BlocklistPoll, util.PrefixConfig(prefix, "trace.blocklist_poll"), tempodb.DefaultBlocklistPoll, "Period at which to run the maintenance cycle.")
