From 4bfa3807cfdf906321f9b9fdcbb5f1f278910ccc Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Fri, 8 Nov 2024 10:21:23 +0100 Subject: [PATCH] revert: "perf(bloom): Compute chunkrefs for series right before sending task to builder" (#14839) --- pkg/bloombuild/common/tsdb.go | 26 +++-- pkg/bloombuild/common/tsdb_test.go | 6 +- pkg/bloombuild/planner/planner.go | 80 +++++-------- pkg/bloombuild/planner/planner_test.go | 11 +- pkg/bloombuild/planner/plannertest/utils.go | 58 +--------- .../planner/strategies/chunksize.go | 82 +++++++++----- .../planner/strategies/chunksize_test.go | 29 +++-- pkg/bloombuild/planner/strategies/factory.go | 3 +- .../planner/strategies/splitkeyspace.go | 13 ++- .../planner/strategies/splitkeyspace_test.go | 60 ++++++++-- pkg/bloombuild/planner/strategies/task.go | 106 ------------------ pkg/bloombuild/planner/task.go | 17 +-- pkg/bloombuild/protos/compat.go | 2 - 13 files changed, 188 insertions(+), 305 deletions(-) delete mode 100644 pkg/bloombuild/planner/strategies/task.go diff --git a/pkg/bloombuild/common/tsdb.go b/pkg/bloombuild/common/tsdb.go index 868828e72a337..a58b7cd6130f9 100644 --- a/pkg/bloombuild/common/tsdb.go +++ b/pkg/bloombuild/common/tsdb.go @@ -29,10 +29,8 @@ const ( gzipExtension = ".gz" ) -type ForSeries = sharding.ForSeries - type ClosableForSeries interface { - ForSeries + sharding.ForSeries Close() error } @@ -126,21 +124,33 @@ func (b *BloomTSDBStore) LoadTSDB( return idx, nil } -func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[model.Fingerprint], error) { +func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, bounds v1.FingerprintBounds) (iter.Iterator[*v1.Series], error) { // TODO(salvacorts): Create a pool - series := make([]model.Fingerprint, 0, 100) + series := make([]*v1.Series, 0, 100) if err := f.ForSeries( ctx, user, bounds, 0, math.MaxInt64, - func(_ labels.Labels, fp model.Fingerprint, _ []index.ChunkMeta) (stop bool) { + func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) { select { case <-ctx.Done(): return true default: - series = append(series, fp) + res := &v1.Series{ + Fingerprint: fp, + Chunks: make(v1.ChunkRefs, 0, len(chks)), + } + for _, chk := range chks { + res.Chunks = append(res.Chunks, v1.ChunkRef{ + From: model.Time(chk.MinTime), + Through: model.Time(chk.MaxTime), + Checksum: chk.Checksum, + }) + } + + series = append(series, res) return false } }, @@ -151,7 +161,7 @@ func NewTSDBSeriesIter(ctx context.Context, user string, f sharding.ForSeries, b select { case <-ctx.Done(): - return iter.NewEmptyIter[model.Fingerprint](), ctx.Err() + return iter.NewEmptyIter[*v1.Series](), ctx.Err() default: return iter.NewCancelableIter(ctx, iter.NewSliceIter(series)), nil } diff --git a/pkg/bloombuild/common/tsdb_test.go b/pkg/bloombuild/common/tsdb_test.go index 13e83c14719f9..b2df7982f4382 100644 --- a/pkg/bloombuild/common/tsdb_test.go +++ b/pkg/bloombuild/common/tsdb_test.go @@ -66,10 +66,10 @@ func TestTSDBSeriesIter(t *testing.T) { itr, err := NewTSDBSeriesIter(context.Background(), "", forSeriesTestImpl(input), v1.NewBounds(0, math.MaxUint64)) require.NoError(t, err) - v1.CompareIterators( + v1.EqualIterators( t, - func(t *testing.T, a model.Fingerprint, b *v1.Series) { - require.Equal(t, a, b.Fingerprint) + func(a, b *v1.Series) { + require.Equal(t, a, b) }, itr, srcItr, diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index 82c83d594b1e4..7c13dedb50452 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -227,10 +227,9 @@ func (p *Planner) runOne(ctx context.Context) error { } var ( - wg sync.WaitGroup - start = time.Now() - status = statusFailure - openTSDBs strategies.TSDBSet + wg sync.WaitGroup + start = time.Now() + status = statusFailure ) defer func() { p.metrics.buildCompleted.WithLabelValues(status).Inc() @@ -239,15 +238,6 @@ func (p *Planner) runOne(ctx context.Context) error { if status == statusSuccess { p.metrics.buildLastSuccess.SetToCurrentTime() } - - // Close all open TSDBs. - // These are used to get the chunkrefs for the series in the gaps. - // We populate the chunkrefs when we send the task to the builder. - for idx, reader := range openTSDBs { - if err := reader.Close(); err != nil { - level.Error(p.logger).Log("msg", "failed to close tsdb", "tsdb", idx.Name(), "err", err) - } - } }() p.metrics.buildStarted.Inc() @@ -285,19 +275,7 @@ func (p *Planner) runOne(ctx context.Context) error { table: table, } - tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant) - if err != nil { - level.Error(logger).Log("msg", "failed to resolve tsdbs", "err", err) - continue - } - - openTSDBs, err = openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs, openTSDBs) - if err != nil { - level.Error(logger).Log("msg", "failed to open all tsdbs", "err", err) - continue - } - - tasks, existingMetas, err := p.computeTasks(ctx, table, tenant, openTSDBs) + tasks, existingMetas, err := p.computeTasks(ctx, table, tenant) if err != nil { level.Error(logger).Log("msg", "failed to compute tasks", "err", err) continue @@ -308,7 +286,7 @@ func (p *Planner) runOne(ctx context.Context) error { now := time.Now() for _, task := range tasks { - queueTask := NewQueueTask(ctx, now, task, openTSDBs[task.TSDB], resultsCh) + queueTask := NewQueueTask(ctx, now, task, resultsCh) if err := p.enqueueTask(queueTask); err != nil { level.Error(logger).Log("msg", "error enqueuing task", "err", err) continue @@ -396,8 +374,7 @@ func (p *Planner) computeTasks( ctx context.Context, table config.DayTable, tenant string, - tsdbs strategies.TSDBSet, -) ([]*strategies.Task, []bloomshipper.Meta, error) { +) ([]*protos.Task, []bloomshipper.Meta, error) { strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger) if err != nil { return nil, nil, fmt.Errorf("error creating strategy: %w", err) @@ -425,11 +402,29 @@ func (p *Planner) computeTasks( return nil, nil, fmt.Errorf("failed to delete outdated metas during planning: %w", err) } + // Resolve TSDBs + tsdbs, err := p.tsdbStore.ResolveTSDBs(ctx, table, tenant) + if err != nil { + return nil, nil, fmt.Errorf("failed to resolve tsdbs: %w", err) + } + if len(tsdbs) == 0 { return nil, metas, nil } - tasks, err := strategy.Plan(ctx, table, tenant, tsdbs, metas) + openTSDBs, err := openAllTSDBs(ctx, table, tenant, p.tsdbStore, tsdbs) + if err != nil { + return nil, nil, fmt.Errorf("failed to open all tsdbs: %w", err) + } + defer func() { + for idx, reader := range openTSDBs { + if err := reader.Close(); err != nil { + level.Error(logger).Log("msg", "failed to close index", "err", err, "tsdb", idx.Name()) + } + } + }() + + tasks, err := strategy.Plan(ctx, table, tenant, openTSDBs, metas) if err != nil { return nil, nil, fmt.Errorf("failed to plan tasks: %w", err) } @@ -511,26 +506,18 @@ func openAllTSDBs( tenant string, store common.TSDBStore, tsdbs []tsdb.SingleTenantTSDBIdentifier, - alreadyOpen strategies.TSDBSet, -) (strategies.TSDBSet, error) { - if len(alreadyOpen) == 0 { - alreadyOpen = make(strategies.TSDBSet, len(tsdbs)) - } - +) (map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, error) { + openTSDBs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries, len(tsdbs)) for _, idx := range tsdbs { - if _, ok := alreadyOpen[idx]; ok { - continue - } - - reader, err := store.LoadTSDB(ctx, table, tenant, idx) + tsdb, err := store.LoadTSDB(ctx, table, tenant, idx) if err != nil { return nil, fmt.Errorf("failed to load tsdb: %w", err) } - alreadyOpen[idx] = reader + openTSDBs[idx] = tsdb } - return alreadyOpen, nil + return openTSDBs, nil } // deleteOutdatedMetasAndBlocks filters out the outdated metas from the `metas` argument and deletes them from the store. @@ -860,13 +847,8 @@ func (p *Planner) forwardTaskToBuilder( builderID string, task *QueueTask, ) (*protos.TaskResult, error) { - protoTask, err := task.ToProtoTask(builder.Context()) - if err != nil { - return nil, fmt.Errorf("error converting task to proto task: %w", err) - } - msg := &protos.PlannerToBuilder{ - Task: protoTask, + Task: task.ToProtoTask(), } if err := builder.Send(msg); err != nil { diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index 9b0082f15e08c..6b1b1e0beba16 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -713,21 +713,12 @@ func (f *fakeBuilder) Recv() (*protos.BuilderToPlanner, error) { } func createTasks(n int, resultsCh chan *protos.TaskResult) []*QueueTask { - forSeries := plannertest.NewFakeForSeries(plannertest.GenV1Series(v1.NewBounds(0, 100))) - tasks := make([]*QueueTask, 0, n) // Enqueue tasks for i := 0; i < n; i++ { task := NewQueueTask( context.Background(), time.Now(), - strategies.NewTask( - config.NewDayTable(plannertest.TestDay, "fake"), - "fakeTenant", - v1.NewBounds(0, 10), - plannertest.TsdbID(1), - nil, - ), - forSeries, + protos.NewTask(config.NewDayTable(plannertest.TestDay, "fake"), "fakeTenant", v1.NewBounds(0, 10), plannertest.TsdbID(1), nil), resultsCh, ) tasks = append(tasks, task) diff --git a/pkg/bloombuild/planner/plannertest/utils.go b/pkg/bloombuild/planner/plannertest/utils.go index 8938966ff83bb..706e0abdf00a7 100644 --- a/pkg/bloombuild/planner/plannertest/utils.go +++ b/pkg/bloombuild/planner/plannertest/utils.go @@ -6,7 +6,6 @@ import ( "time" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/v3/pkg/compression" v2 "github.com/grafana/loki/v3/pkg/iter/v2" @@ -14,7 +13,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" - "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) var TestDay = ParseDayTime("2023-09-01") @@ -89,23 +87,11 @@ func GenBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) { }, nil } -func GenSeries(bounds v1.FingerprintBounds) []model.Fingerprint { +func GenSeries(bounds v1.FingerprintBounds) []*v1.Series { return GenSeriesWithStep(bounds, 1) } -func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []model.Fingerprint { - series := make([]model.Fingerprint, 0, int(bounds.Max-bounds.Min+1)/step) - for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) { - series = append(series, i) - } - return series -} - -func GenV1Series(bounds v1.FingerprintBounds) []*v1.Series { - return GenV1SeriesWithStep(bounds, 1) -} - -func GenV1SeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series { +func GenSeriesWithStep(bounds v1.FingerprintBounds, step int) []*v1.Series { series := make([]*v1.Series, 0, int(bounds.Max-bounds.Min+1)/step) for i := bounds.Min; i <= bounds.Max; i += model.Fingerprint(step) { series = append(series, &v1.Series{ @@ -153,43 +139,3 @@ func ParseDayTime(s string) config.DayTime { Time: model.TimeFromUnix(t.Unix()), } } - -type FakeForSeries struct { - series []*v1.Series -} - -func NewFakeForSeries(series []*v1.Series) *FakeForSeries { - return &FakeForSeries{ - series: series, - } -} - -func (f FakeForSeries) ForSeries(_ context.Context, _ string, ff index.FingerprintFilter, _ model.Time, _ model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) (stop bool), _ ...*labels.Matcher) error { - overlapping := make([]*v1.Series, 0, len(f.series)) - for _, s := range f.series { - if ff.Match(s.Fingerprint) { - overlapping = append(overlapping, s) - } - } - - for _, s := range overlapping { - chunks := make([]index.ChunkMeta, 0, len(s.Chunks)) - for _, c := range s.Chunks { - chunks = append(chunks, index.ChunkMeta{ - MinTime: int64(c.From), - MaxTime: int64(c.Through), - Checksum: c.Checksum, - KB: 100, - }) - } - - if fn(labels.EmptyLabels(), s.Fingerprint, chunks) { - break - } - } - return nil -} - -func (f FakeForSeries) Close() error { - return nil -} diff --git a/pkg/bloombuild/planner/strategies/chunksize.go b/pkg/bloombuild/planner/strategies/chunksize.go index 3d59f40fb56ab..21f473908dd99 100644 --- a/pkg/bloombuild/planner/strategies/chunksize.go +++ b/pkg/bloombuild/planner/strategies/chunksize.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/v3/pkg/bloombuild/protos" iter "github.com/grafana/loki/v3/pkg/iter/v2" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/config" @@ -49,7 +50,7 @@ func (s *ChunkSizeStrategy) Plan( tenant string, tsdbs TSDBSet, metas []bloomshipper.Meta, -) ([]*Task, error) { +) ([]*protos.Task, error) { targetTaskSize := s.limits.BloomTaskTargetSeriesChunksSizeBytes(tenant) logger := log.With(s.logger, "table", table.Addr(), "tenant", tenant) @@ -72,29 +73,29 @@ func (s *ChunkSizeStrategy) Plan( return nil, fmt.Errorf("failed to get sized series iter: %w", err) } - tasks := make([]*Task, 0, iterSize) + tasks := make([]*protos.Task, 0, iterSize) for sizedIter.Next() { - batch := sizedIter.At() - if batch.Len() == 0 { + series := sizedIter.At() + if series.Len() == 0 { // This should never happen, but just in case. - level.Warn(logger).Log("msg", "got empty series batch", "tsdb", batch.TSDB().Name()) + level.Warn(logger).Log("msg", "got empty series batch", "tsdb", series.TSDB().Name()) continue } - bounds := batch.Bounds() + bounds := series.Bounds() blocks, err := getBlocksMatchingBounds(metas, bounds) if err != nil { return nil, fmt.Errorf("failed to get blocks matching bounds: %w", err) } - planGap := Gap{ + planGap := protos.Gap{ Bounds: bounds, - Series: batch.series, + Series: series.V1Series(), Blocks: blocks, } - tasks = append(tasks, NewTask(table, tenant, bounds, batch.TSDB(), []Gap{planGap})) + tasks = append(tasks, protos.NewTask(table, tenant, bounds, series.TSDB(), []protos.Gap{planGap})) } if err := sizedIter.Err(); err != nil { return nil, fmt.Errorf("failed to iterate over sized series: %w", err) @@ -154,16 +155,20 @@ func getBlocksMatchingBounds(metas []bloomshipper.Meta, bounds v1.FingerprintBou return deduped, nil } -type seriesBatch struct { +type seriesWithChunks struct { tsdb tsdb.SingleTenantTSDBIdentifier - series []model.Fingerprint + fp model.Fingerprint + chunks []index.ChunkMeta +} + +type seriesBatch struct { + series []seriesWithChunks size uint64 } -func newSeriesBatch(tsdb tsdb.SingleTenantTSDBIdentifier) seriesBatch { +func newSeriesBatch() seriesBatch { return seriesBatch{ - tsdb: tsdb, - series: make([]model.Fingerprint, 0, 100), + series: make([]seriesWithChunks, 0, 100), } } @@ -174,11 +179,32 @@ func (b *seriesBatch) Bounds() v1.FingerprintBounds { // We assume that the series are sorted by fingerprint. // This is guaranteed since series are iterated in order by the TSDB. - return v1.NewBounds(b.series[0], b.series[len(b.series)-1]) + return v1.NewBounds(b.series[0].fp, b.series[len(b.series)-1].fp) +} + +func (b *seriesBatch) V1Series() []*v1.Series { + series := make([]*v1.Series, 0, len(b.series)) + for _, s := range b.series { + res := &v1.Series{ + Fingerprint: s.fp, + Chunks: make(v1.ChunkRefs, 0, len(s.chunks)), + } + for _, chk := range s.chunks { + res.Chunks = append(res.Chunks, v1.ChunkRef{ + From: model.Time(chk.MinTime), + Through: model.Time(chk.MaxTime), + Checksum: chk.Checksum, + }) + } + + series = append(series, res) + } + + return series } -func (b *seriesBatch) Append(series model.Fingerprint, size uint64) { - b.series = append(b.series, series) +func (b *seriesBatch) Append(s seriesWithChunks, size uint64) { + b.series = append(b.series, s) b.size += size } @@ -191,7 +217,10 @@ func (b *seriesBatch) Size() uint64 { } func (b *seriesBatch) TSDB() tsdb.SingleTenantTSDBIdentifier { - return b.tsdb + if len(b.series) == 0 { + return tsdb.SingleTenantTSDBIdentifier{} + } + return b.series[0].tsdb } func (s *ChunkSizeStrategy) sizedSeriesIter( @@ -201,12 +230,9 @@ func (s *ChunkSizeStrategy) sizedSeriesIter( targetTaskSizeBytes uint64, ) (iter.Iterator[seriesBatch], int, error) { batches := make([]seriesBatch, 0, 100) - var currentBatch seriesBatch + currentBatch := newSeriesBatch() for _, idx := range tsdbsWithGaps { - // We cut a new batch for each TSDB. - currentBatch = newSeriesBatch(idx.tsdbIdentifier) - for _, gap := range idx.gaps { if err := idx.tsdb.ForSeries( ctx, @@ -227,10 +253,14 @@ func (s *ChunkSizeStrategy) sizedSeriesIter( // AND Adding this series to the batch would exceed the target task size. if currentBatch.Len() > 0 && currentBatch.Size()+seriesSize > targetTaskSizeBytes { batches = append(batches, currentBatch) - currentBatch = newSeriesBatch(idx.tsdbIdentifier) + currentBatch = newSeriesBatch() } - currentBatch.Append(fp, seriesSize) + currentBatch.Append(seriesWithChunks{ + tsdb: idx.tsdbIdentifier, + fp: fp, + chunks: chks, + }, seriesSize) return false } }, @@ -239,10 +269,10 @@ func (s *ChunkSizeStrategy) sizedSeriesIter( return nil, 0, err } - // Add the last batch for this gap if it's not empty. + // Add the last batch for this TSDB if it's not empty. if currentBatch.Len() > 0 { batches = append(batches, currentBatch) - currentBatch = newSeriesBatch(idx.tsdbIdentifier) + currentBatch = newSeriesBatch() } } } diff --git a/pkg/bloombuild/planner/strategies/chunksize_test.go b/pkg/bloombuild/planner/strategies/chunksize_test.go index 3c0f88dc39006..951d033e5c100 100644 --- a/pkg/bloombuild/planner/strategies/chunksize_test.go +++ b/pkg/bloombuild/planner/strategies/chunksize_test.go @@ -8,13 +8,14 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest" + "github.com/grafana/loki/v3/pkg/bloombuild/protos" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" ) -func taskForGap(tsdb tsdb.SingleTenantTSDBIdentifier, bounds v1.FingerprintBounds, blocks []bloomshipper.BlockRef) *Task { - return NewTask(plannertest.TestTable, "fake", bounds, tsdb, []Gap{ +func taskForGap(tsdb tsdb.SingleTenantTSDBIdentifier, bounds v1.FingerprintBounds, blocks []bloomshipper.BlockRef) *protos.Task { + return protos.NewTask(plannertest.TestTable, "fake", bounds, tsdb, []protos.Gap{ { Bounds: bounds, Series: plannertest.GenSeriesWithStep(bounds, 10), @@ -24,14 +25,12 @@ func taskForGap(tsdb tsdb.SingleTenantTSDBIdentifier, bounds v1.FingerprintBound } func Test_ChunkSizeStrategy_Plan(t *testing.T) { - forSeries := plannertest.NewFakeForSeries(plannertest.GenV1SeriesWithStep(v1.NewBounds(0, 100), 10)) - for _, tc := range []struct { name string limits ChunkSizeStrategyLimits originalMetas []bloomshipper.Meta tsdbs TSDBSet - expectedTasks []*Task + expectedTasks []*protos.Task }{ { name: "no previous blocks and metas", @@ -39,11 +38,11 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) { // Each series will have 1 chunk of 100KB each tsdbs: TSDBSet{ - plannertest.TsdbID(0): forSeries, // 10 series + plannertest.TsdbID(0): newFakeForSeries(plannertest.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 10 series }, // We expect 5 tasks, each with 2 series each - expectedTasks: []*Task{ + expectedTasks: []*protos.Task{ taskForGap(plannertest.TsdbID(0), v1.NewBounds(0, 10), nil), taskForGap(plannertest.TsdbID(0), v1.NewBounds(20, 30), nil), taskForGap(plannertest.TsdbID(0), v1.NewBounds(40, 50), nil), @@ -85,11 +84,11 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) { }, tsdbs: TSDBSet{ - plannertest.TsdbID(0): forSeries, // 10 series + plannertest.TsdbID(0): newFakeForSeries(plannertest.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 10 series }, // We expect no tasks - expectedTasks: []*Task{}, + expectedTasks: []*protos.Task{}, }, { name: "Original metas do not cover the entire range", @@ -122,11 +121,11 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) { }, tsdbs: TSDBSet{ - plannertest.TsdbID(0): forSeries, // 10 series + plannertest.TsdbID(0): newFakeForSeries(plannertest.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 10 series }, // We expect 1 tasks for the missing series - expectedTasks: []*Task{ + expectedTasks: []*protos.Task{ taskForGap(plannertest.TsdbID(0), v1.NewBounds(20, 30), nil), }, }, @@ -151,11 +150,11 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) { }, tsdbs: TSDBSet{ - plannertest.TsdbID(1): forSeries, // 10 series + plannertest.TsdbID(1): newFakeForSeries(plannertest.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 10 series }, // We expect 5 tasks, each with 2 series each - expectedTasks: []*Task{ + expectedTasks: []*protos.Task{ taskForGap(plannertest.TsdbID(1), v1.NewBounds(0, 10), []bloomshipper.BlockRef{ plannertest.GenBlockRef(0, 0), plannertest.GenBlockRef(10, 10), @@ -206,11 +205,11 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) { }, tsdbs: TSDBSet{ - plannertest.TsdbID(1): forSeries, // 10 series + plannertest.TsdbID(1): newFakeForSeries(plannertest.GenSeriesWithStep(v1.NewBounds(0, 100), 10)), // 10 series }, // We expect 5 tasks, each with 2 series each - expectedTasks: []*Task{ + expectedTasks: []*protos.Task{ taskForGap(plannertest.TsdbID(1), v1.NewBounds(0, 10), []bloomshipper.BlockRef{ plannertest.GenBlockRef(0, 0), plannertest.GenBlockRef(10, 10), diff --git a/pkg/bloombuild/planner/strategies/factory.go b/pkg/bloombuild/planner/strategies/factory.go index a3ca12f57c3bd..f58f91e51708d 100644 --- a/pkg/bloombuild/planner/strategies/factory.go +++ b/pkg/bloombuild/planner/strategies/factory.go @@ -7,6 +7,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/loki/v3/pkg/bloombuild/common" + "github.com/grafana/loki/v3/pkg/bloombuild/protos" "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" @@ -28,7 +29,7 @@ type TSDBSet = map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries type PlanningStrategy interface { Name() string // Plan returns a set of tasks for a given tenant-table tuple and TSDBs. - Plan(ctx context.Context, table config.DayTable, tenant string, tsdbs TSDBSet, metas []bloomshipper.Meta) ([]*Task, error) + Plan(ctx context.Context, table config.DayTable, tenant string, tsdbs TSDBSet, metas []bloomshipper.Meta) ([]*protos.Task, error) } func NewStrategy( diff --git a/pkg/bloombuild/planner/strategies/splitkeyspace.go b/pkg/bloombuild/planner/strategies/splitkeyspace.go index ccb2c0141ca22..2e799d1ed4903 100644 --- a/pkg/bloombuild/planner/strategies/splitkeyspace.go +++ b/pkg/bloombuild/planner/strategies/splitkeyspace.go @@ -8,6 +8,7 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/loki/v3/pkg/bloombuild/common" + "github.com/grafana/loki/v3/pkg/bloombuild/protos" iter "github.com/grafana/loki/v3/pkg/iter/v2" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/config" @@ -44,14 +45,14 @@ func (s *SplitKeyspaceStrategy) Plan( tenant string, tsdbs TSDBSet, metas []bloomshipper.Meta, -) ([]*Task, error) { +) ([]*protos.Task, error) { splitFactor := s.limits.BloomSplitSeriesKeyspaceBy(tenant) ownershipRanges := SplitFingerprintKeyspaceByFactor(splitFactor) logger := log.With(s.logger, "table", table.Addr(), "tenant", tenant) level.Debug(s.logger).Log("msg", "loading work for tenant", "splitFactor", splitFactor) - var tasks []*Task + var tasks []*protos.Task for _, ownershipRange := range ownershipRanges { logger := log.With(logger, "ownership", ownershipRange.String()) @@ -66,7 +67,7 @@ func (s *SplitKeyspaceStrategy) Plan( } for _, gap := range gaps { - tasks = append(tasks, NewTask(table, tenant, ownershipRange, gap.tsdb, gap.gaps)) + tasks = append(tasks, protos.NewTask(table, tenant, ownershipRange, gap.tsdb, gap.gaps)) } } @@ -84,7 +85,7 @@ func (s *SplitKeyspaceStrategy) Plan( // This is a performance optimization to avoid expensive re-reindexing type blockPlan struct { tsdb tsdb.SingleTenantTSDBIdentifier - gaps []Gap + gaps []protos.Gap } func (s *SplitKeyspaceStrategy) findOutdatedGaps( @@ -174,11 +175,11 @@ func blockPlansForGaps( for _, idx := range tsdbs { plan := blockPlan{ tsdb: idx.tsdbIdentifier, - gaps: make([]Gap, 0, len(idx.gaps)), + gaps: make([]protos.Gap, 0, len(idx.gaps)), } for _, gap := range idx.gaps { - planGap := Gap{ + planGap := protos.Gap{ Bounds: gap, } diff --git a/pkg/bloombuild/planner/strategies/splitkeyspace_test.go b/pkg/bloombuild/planner/strategies/splitkeyspace_test.go index e934205199c2b..18480d74c98fc 100644 --- a/pkg/bloombuild/planner/strategies/splitkeyspace_test.go +++ b/pkg/bloombuild/planner/strategies/splitkeyspace_test.go @@ -4,13 +4,17 @@ import ( "context" "testing" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/bloombuild/common" "github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest" + "github.com/grafana/loki/v3/pkg/bloombuild/protos" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) func Test_gapsBetweenTSDBsAndMetas(t *testing.T) { @@ -135,7 +139,7 @@ func Test_blockPlansForGaps(t *testing.T) { exp: []blockPlan{ { tsdb: plannertest.TsdbID(0), - gaps: []Gap{ + gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 10), Series: plannertest.GenSeries(v1.NewBounds(0, 10)), @@ -154,7 +158,7 @@ func Test_blockPlansForGaps(t *testing.T) { exp: []blockPlan{ { tsdb: plannertest.TsdbID(0), - gaps: []Gap{ + gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 10), Series: plannertest.GenSeries(v1.NewBounds(0, 10)), @@ -178,7 +182,7 @@ func Test_blockPlansForGaps(t *testing.T) { exp: []blockPlan{ { tsdb: plannertest.TsdbID(0), - gaps: []Gap{ + gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 8), Series: plannertest.GenSeries(v1.NewBounds(0, 8)), @@ -198,7 +202,7 @@ func Test_blockPlansForGaps(t *testing.T) { exp: []blockPlan{ { tsdb: plannertest.TsdbID(0), - gaps: []Gap{ + gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 8), Series: plannertest.GenSeries(v1.NewBounds(0, 8)), @@ -225,7 +229,7 @@ func Test_blockPlansForGaps(t *testing.T) { exp: []blockPlan{ { tsdb: plannertest.TsdbID(0), - gaps: []Gap{ + gaps: []protos.Gap{ // tsdb (id=0) can source chunks from the blocks built from tsdb (id=1) { Bounds: v1.NewBounds(3, 5), @@ -242,7 +246,7 @@ func Test_blockPlansForGaps(t *testing.T) { // tsdb (id=1) can source chunks from the blocks built from tsdb (id=0) { tsdb: plannertest.TsdbID(1), - gaps: []Gap{ + gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 2), Series: plannertest.GenSeries(v1.NewBounds(0, 2)), @@ -277,7 +281,7 @@ func Test_blockPlansForGaps(t *testing.T) { exp: []blockPlan{ { tsdb: plannertest.TsdbID(0), - gaps: []Gap{ + gaps: []protos.Gap{ { Bounds: v1.NewBounds(0, 10), Series: plannertest.GenSeries(v1.NewBounds(0, 10)), @@ -296,7 +300,7 @@ func Test_blockPlansForGaps(t *testing.T) { // We add series spanning the whole FP ownership range tsdbs := make(map[tsdb.SingleTenantTSDBIdentifier]common.ClosableForSeries) for _, id := range tc.tsdbs { - tsdbs[id] = plannertest.NewFakeForSeries(plannertest.GenV1Series(tc.ownershipRange)) + tsdbs[id] = newFakeForSeries(plannertest.GenSeries(tc.ownershipRange)) } // we reuse the gapsBetweenTSDBsAndMetas function to generate the gaps as this function is tested @@ -318,3 +322,43 @@ func Test_blockPlansForGaps(t *testing.T) { }) } } + +type fakeForSeries struct { + series []*v1.Series +} + +func newFakeForSeries(series []*v1.Series) *fakeForSeries { + return &fakeForSeries{ + series: series, + } +} + +func (f fakeForSeries) ForSeries(_ context.Context, _ string, ff index.FingerprintFilter, _ model.Time, _ model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta) (stop bool), _ ...*labels.Matcher) error { + overlapping := make([]*v1.Series, 0, len(f.series)) + for _, s := range f.series { + if ff.Match(s.Fingerprint) { + overlapping = append(overlapping, s) + } + } + + for _, s := range overlapping { + chunks := make([]index.ChunkMeta, 0, len(s.Chunks)) + for _, c := range s.Chunks { + chunks = append(chunks, index.ChunkMeta{ + MinTime: int64(c.From), + MaxTime: int64(c.Through), + Checksum: c.Checksum, + KB: 100, + }) + } + + if fn(labels.EmptyLabels(), s.Fingerprint, chunks) { + break + } + } + return nil +} + +func (f fakeForSeries) Close() error { + return nil +} diff --git a/pkg/bloombuild/planner/strategies/task.go b/pkg/bloombuild/planner/strategies/task.go deleted file mode 100644 index 660c85f449016..0000000000000 --- a/pkg/bloombuild/planner/strategies/task.go +++ /dev/null @@ -1,106 +0,0 @@ -package strategies - -import ( - "context" - "fmt" - "math" - "slices" - - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" - - "github.com/grafana/loki/v3/pkg/bloombuild/common" - "github.com/grafana/loki/v3/pkg/bloombuild/protos" - v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" - "github.com/grafana/loki/v3/pkg/storage/config" - "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" - "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" - "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" -) - -type Gap struct { - Bounds v1.FingerprintBounds - Series []model.Fingerprint - Blocks []bloomshipper.BlockRef -} - -// Task represents a task that is enqueued in the planner. -type Task struct { - *protos.Task - // Override the protos.Task.Gaps field with gaps that use model.Fingerprint instead of v1.Series. - Gaps []Gap -} - -func NewTask( - table config.DayTable, - tenant string, - bounds v1.FingerprintBounds, - tsdb tsdb.SingleTenantTSDBIdentifier, - gaps []Gap, -) *Task { - return &Task{ - Task: protos.NewTask(table, tenant, bounds, tsdb, nil), - Gaps: gaps, - } -} - -// ToProtoTask converts a Task to a ProtoTask. -// It will use the opened TSDB to get the chunks for the series in the gaps. -func (t *Task) ToProtoTask(ctx context.Context, forSeries common.ForSeries) (*protos.ProtoTask, error) { - // Populate the gaps with the series and chunks. - protoGaps := make([]protos.Gap, 0, len(t.Gaps)) - for _, gap := range t.Gaps { - if !slices.IsSorted(gap.Series) { - slices.Sort(gap.Series) - } - - series := make([]*v1.Series, 0, len(gap.Series)) - if err := forSeries.ForSeries( - ctx, - t.Tenant, - gap.Bounds, - 0, math.MaxInt64, - func(_ labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) (stop bool) { - select { - case <-ctx.Done(): - return true - default: - // Skip this series if it's not in the gap. - // Series are sorted, so we can break early. - if _, found := slices.BinarySearch(gap.Series, fp); !found { - return false - } - - chunks := make(v1.ChunkRefs, 0, len(chks)) - for _, chk := range chks { - chunks = append(chunks, v1.ChunkRef{ - From: model.Time(chk.MinTime), - Through: model.Time(chk.MaxTime), - Checksum: chk.Checksum, - }) - } - - series = append(series, &v1.Series{ - Fingerprint: fp, - Chunks: chunks, - }) - return false - } - }, - labels.MustNewMatcher(labels.MatchEqual, "", ""), - ); err != nil { - return nil, fmt.Errorf("failed to load series from TSDB for gap (%s): %w", gap.Bounds.String(), err) - } - - protoGaps = append(protoGaps, protos.Gap{ - Bounds: gap.Bounds, - Series: series, - Blocks: gap.Blocks, - }) - } - - // Copy inner task and set gaps - task := *t.Task - task.Gaps = protoGaps - return task.ToProtoTask(), nil -} diff --git a/pkg/bloombuild/planner/task.go b/pkg/bloombuild/planner/task.go index a20ed806788f5..3080ec47a171c 100644 --- a/pkg/bloombuild/planner/task.go +++ b/pkg/bloombuild/planner/task.go @@ -6,16 +6,11 @@ import ( "go.uber.org/atomic" - "github.com/grafana/loki/v3/pkg/bloombuild/common" - "github.com/grafana/loki/v3/pkg/bloombuild/planner/strategies" "github.com/grafana/loki/v3/pkg/bloombuild/protos" ) type QueueTask struct { - *strategies.Task - - // We use forSeries in ToProtoTask to get the chunks for the series in the gaps. - forSeries common.ForSeries + *protos.Task resultsChannel chan *protos.TaskResult @@ -28,8 +23,7 @@ type QueueTask struct { func NewQueueTask( ctx context.Context, queueTime time.Time, - task *strategies.Task, - forSeries common.ForSeries, + task *protos.Task, resultsChannel chan *protos.TaskResult, ) *QueueTask { return &QueueTask{ @@ -37,12 +31,5 @@ func NewQueueTask( resultsChannel: resultsChannel, ctx: ctx, queueTime: queueTime, - forSeries: forSeries, } } - -// ToProtoTask converts a Task to a ProtoTask. -// It will use the opened TSDB to get the chunks for the series in the gaps. -func (t *QueueTask) ToProtoTask(ctx context.Context) (*protos.ProtoTask, error) { - return t.Task.ToProtoTask(ctx, t.forSeries) -} diff --git a/pkg/bloombuild/protos/compat.go b/pkg/bloombuild/protos/compat.go index a2fc221728760..7c910d405ad9b 100644 --- a/pkg/bloombuild/protos/compat.go +++ b/pkg/bloombuild/protos/compat.go @@ -20,7 +20,6 @@ type Gap struct { Blocks []bloomshipper.BlockRef } -// Task is a convenience struct equivalent to the protobuf ProtoTask message but with Loki types. type Task struct { ID string @@ -119,7 +118,6 @@ func (t *Task) ToProtoTask() *ProtoTask { blockRefs = append(blockRefs, block.String()) } - // TODO(salvacorts): Cast []*v1.Series to []*ProtoSeries right away series := make([]*ProtoSeries, 0, len(gap.Series)) for _, s := range gap.Series { chunks := make([]*logproto.ShortRef, 0, len(s.Chunks))