Skip to content

Commit

Permalink
Implement polling tenants concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
zalegrala committed Jul 12, 2024
1 parent b446190 commit ee6de7c
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 49 deletions.
1 change: 1 addition & 0 deletions integration/poller/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func TestPollerOwnership(t *testing.T) {

blocklistPoller := blocklist.NewPoller(&blocklist.PollerConfig{
PollConcurrency: 3,
TenantPollConcurrency: 2,
TenantIndexBuilders: 1,
EmptyTenantDeletionAge: 10 * time.Minute,
}, OwnsEverythingSharder, r, cc, w, logger)
Expand Down
85 changes: 55 additions & 30 deletions tempodb/blocklist/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
spanlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"

"github.com/grafana/tempo/pkg/boundedwaitgroup"
"github.com/grafana/tempo/tempodb/backend"
Expand Down Expand Up @@ -76,6 +77,7 @@ var (
// Config is used to configure the poller
type PollerConfig struct {
PollConcurrency uint
TenantPollConcurrency uint
PollFallback bool
TenantIndexBuilders int
StaleTenantIndex time.Duration
Expand Down Expand Up @@ -148,44 +150,67 @@ func (p *Poller) Do(previous *List) (PerTenant, PerTenantCompacted, error) {
return nil, nil, err
}

blocklist := PerTenant{}
compactedBlocklist := PerTenantCompacted{}
var (
wg = boundedwaitgroup.New(p.cfg.TenantPollConcurrency)
mtx = sync.Mutex{}

blocklist = PerTenant{}
compactedBlocklist = PerTenantCompacted{}

consecutiveErrors := 0
finalErr atomic.Error
consecutiveErrors int
)

for _, tenantID := range tenants {
newBlockList, newCompactedBlockList, err := p.pollTenantAndCreateIndex(ctx, tenantID, previous)
if err != nil {
level.Error(p.logger).Log("msg", "failed to poll or create index for tenant", "tenant", tenantID, "err", err)
consecutiveErrors++
if consecutiveErrors > p.cfg.TolerateConsecutiveErrors {
level.Error(p.logger).Log("msg", "exiting polling loop early because too many errors", "errCount", consecutiveErrors)
return nil, nil, err
if consecutiveErrors > p.cfg.TolerateConsecutiveErrors {
level.Error(p.logger).Log("msg", "exiting polling loop early because too many errors", "errCount", consecutiveErrors)
return nil, nil, finalErr.Load()
}

wg.Add(1)
go func(tenantID string) {
defer wg.Done()

newBlockList, newCompactedBlockList, err := p.pollTenantAndCreateIndex(ctx, tenantID, previous)
mtx.Lock()
defer mtx.Unlock()
if err != nil {
level.Error(p.logger).Log("msg", "failed to poll or create index for tenant", "tenant", tenantID, "err", err)
consecutiveErrors++
blocklist[tenantID] = previous.Metas(tenantID)
compactedBlocklist[tenantID] = previous.CompactedMetas(tenantID)
if consecutiveErrors > p.cfg.TolerateConsecutiveErrors {
finalErr.Store(err)
return
}
return
}

blocklist[tenantID] = previous.Metas(tenantID)
compactedBlocklist[tenantID] = previous.CompactedMetas(tenantID)
continue
}
consecutiveErrors = 0
if len(newBlockList) > 0 || len(newCompactedBlockList) > 0 {
blocklist[tenantID] = newBlockList
compactedBlocklist[tenantID] = newCompactedBlockList

consecutiveErrors = 0
if len(newBlockList) > 0 || len(newCompactedBlockList) > 0 {
blocklist[tenantID] = newBlockList
compactedBlocklist[tenantID] = newCompactedBlockList
metricBlocklistLength.WithLabelValues(tenantID).Set(float64(len(newBlockList)))

metricBlocklistLength.WithLabelValues(tenantID).Set(float64(len(newBlockList)))
backendMetaMetrics := sumTotalBackendMetaMetrics(newBlockList, newCompactedBlockList)
metricBackendObjects.WithLabelValues(tenantID, blockStatusLiveLabel).Set(float64(backendMetaMetrics.blockMetaTotalObjects))
metricBackendObjects.WithLabelValues(tenantID, blockStatusCompactedLabel).Set(float64(backendMetaMetrics.compactedBlockMetaTotalObjects))
metricBackendBytes.WithLabelValues(tenantID, blockStatusLiveLabel).Set(float64(backendMetaMetrics.blockMetaTotalBytes))
metricBackendBytes.WithLabelValues(tenantID, blockStatusCompactedLabel).Set(float64(backendMetaMetrics.compactedBlockMetaTotalBytes))
return
}
metricBlocklistLength.DeleteLabelValues(tenantID)
metricBackendObjects.DeleteLabelValues(tenantID)
metricBackendObjects.DeleteLabelValues(tenantID)
metricBackendBytes.DeleteLabelValues(tenantID)
}(tenantID)
}

backendMetaMetrics := sumTotalBackendMetaMetrics(newBlockList, newCompactedBlockList)
metricBackendObjects.WithLabelValues(tenantID, blockStatusLiveLabel).Set(float64(backendMetaMetrics.blockMetaTotalObjects))
metricBackendObjects.WithLabelValues(tenantID, blockStatusCompactedLabel).Set(float64(backendMetaMetrics.compactedBlockMetaTotalObjects))
metricBackendBytes.WithLabelValues(tenantID, blockStatusLiveLabel).Set(float64(backendMetaMetrics.blockMetaTotalBytes))
metricBackendBytes.WithLabelValues(tenantID, blockStatusCompactedLabel).Set(float64(backendMetaMetrics.compactedBlockMetaTotalBytes))
continue
}
metricBlocklistLength.DeleteLabelValues(tenantID)
metricBackendObjects.DeleteLabelValues(tenantID)
metricBackendObjects.DeleteLabelValues(tenantID)
metricBackendBytes.DeleteLabelValues(tenantID)
wg.Wait()

if err := finalErr.Load(); err != nil {
return nil, nil, err
}

return blocklist, compactedBlocklist, nil
Expand Down
33 changes: 20 additions & 13 deletions tempodb/blocklist/poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
)

var (
testPollConcurrency = uint(10)
testPollFallback = true
testBuilders = 1
testEmptyTenantIndexAge = 1 * time.Minute
testPollConcurrency = uint(10)
testTenantPollConcurrency = uint(2)
testPollFallback = true
testBuilders = 1
testEmptyTenantIndexAge = 1 * time.Minute
)

type mockJobSharder struct {
Expand Down Expand Up @@ -158,9 +159,10 @@ func TestTenantIndexBuilder(t *testing.T) {
b := newBlocklist(PerTenant{}, PerTenantCompacted{})

poller := NewPoller(&PollerConfig{
PollConcurrency: testPollConcurrency,
PollFallback: testPollFallback,
TenantIndexBuilders: testBuilders,
PollConcurrency: testPollConcurrency,
TenantPollConcurrency: testTenantPollConcurrency,
PollFallback: testPollFallback,
TenantIndexBuilders: testBuilders,
}, &mockJobSharder{
owns: true,
}, r, c, w, log.NewNopLogger())
Expand Down Expand Up @@ -262,6 +264,7 @@ func TestTenantIndexFallback(t *testing.T) {

poller := NewPoller(&PollerConfig{
PollConcurrency: testPollConcurrency,
TenantPollConcurrency: testTenantPollConcurrency,
PollFallback: tc.pollFallback,
TenantIndexBuilders: testBuilders,
StaleTenantIndex: tc.staleTenantIndex,
Expand Down Expand Up @@ -352,9 +355,10 @@ func TestPollBlock(t *testing.T) {
w := &backend.MockWriter{}

poller := NewPoller(&PollerConfig{
PollConcurrency: testPollConcurrency,
PollFallback: testPollFallback,
TenantIndexBuilders: testBuilders,
PollConcurrency: testPollConcurrency,
TenantPollConcurrency: testTenantPollConcurrency,
PollFallback: testPollFallback,
TenantIndexBuilders: testBuilders,
}, &mockJobSharder{}, r, c, w, log.NewNopLogger())
actualMeta, actualCompactedMeta, err := poller.pollBlock(context.Background(), tc.pollTenantID, tc.pollBlockID, false)

Expand Down Expand Up @@ -572,6 +576,7 @@ func TestPollTolerateConsecutiveErrors(t *testing.T) {

poller := NewPoller(&PollerConfig{
PollConcurrency: testPollConcurrency,
TenantPollConcurrency: testTenantPollConcurrency,
PollFallback: testPollFallback,
TenantIndexBuilders: testBuilders,
TolerateConsecutiveErrors: tc.tolerate,
Expand Down Expand Up @@ -813,6 +818,7 @@ func TestPollComparePreviousResults(t *testing.T) {
PollConcurrency: testPollConcurrency,
PollFallback: testPollFallback,
TenantIndexBuilders: testBuilders,
TenantPollConcurrency: testTenantPollConcurrency,
TolerateConsecutiveErrors: tc.tollerateErrors,
}, s, r, c, w, log.NewNopLogger())

Expand Down Expand Up @@ -898,9 +904,10 @@ func BenchmarkPoller10k(b *testing.B) {

// This mock reader returns error or nil based on the tenant ID
poller := NewPoller(&PollerConfig{
PollConcurrency: testPollConcurrency,
PollFallback: testPollFallback,
TenantIndexBuilders: testBuilders,
PollConcurrency: testPollConcurrency,
TenantPollConcurrency: testTenantPollConcurrency,
PollFallback: testPollFallback,
TenantIndexBuilders: testBuilders,
}, s, r, c, w, log.NewNopLogger())

runName := fmt.Sprintf("%d-%d", tc.tenantCount, tc.blocksPerTenant)
Expand Down
14 changes: 8 additions & 6 deletions tempodb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ import (
)

const (
DefaultBlocklistPoll = 5 * time.Minute
DefaultMaxTimePerTenant = 5 * time.Minute
DefaultBlocklistPollConcurrency = uint(50)
DefaultRetentionConcurrency = uint(10)
DefaultTenantIndexBuilders = 2
DefaultTolerateConsecutiveErrors = 1
DefaultBlocklistPoll = 5 * time.Minute
DefaultMaxTimePerTenant = 5 * time.Minute
DefaultBlocklistPollConcurrency = uint(50)
DefaultBlocklistPollTenantConcurrency = uint(1)
DefaultRetentionConcurrency = uint(10)
DefaultTenantIndexBuilders = 2
DefaultTolerateConsecutiveErrors = 1

DefaultEmptyTenantDeletionAge = 12 * time.Hour

Expand All @@ -47,6 +48,7 @@ type Config struct {

BlocklistPoll time.Duration `yaml:"blocklist_poll"`
BlocklistPollConcurrency uint `yaml:"blocklist_poll_concurrency"`
BlocklistPollTenantConcurrency uint `yaml:"blocklist_poll_tenant_concurrency"`
BlocklistPollFallback bool `yaml:"blocklist_poll_fallback"`
BlocklistPollTenantIndexBuilders int `yaml:"blocklist_poll_tenant_index_builders"`
BlocklistPollStaleTenantIndex time.Duration `yaml:"blocklist_poll_stale_tenant_index"`
Expand Down
4 changes: 4 additions & 0 deletions tempodb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,10 @@ func (rw *readerWriter) EnablePolling(ctx context.Context, sharder blocklist.Job
rw.cfg.BlocklistPollConcurrency = DefaultBlocklistPollConcurrency
}

if rw.cfg.BlocklistPollTenantConcurrency == 0 {
rw.cfg.BlocklistPollTenantConcurrency = DefaultBlocklistPollTenantConcurrency
}

if rw.cfg.BlocklistPollTenantIndexBuilders <= 0 {
rw.cfg.BlocklistPollTenantIndexBuilders = DefaultTenantIndexBuilders
}
Expand Down

0 comments on commit ee6de7c

Please sign in to comment.