From a1aac43e2ca62e8d56e49f5f2ef5b62d675f3bde Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 18 Dec 2024 17:17:45 +0100 Subject: [PATCH 1/5] fix(blooms): Match series to newest block only based on meta's TSDB timestamp Signed-off-by: Christian Haudum --- pkg/bloomgateway/resolver.go | 63 +++++++++++++++++++------------ pkg/bloomgateway/resolver_test.go | 16 ++++++-- 2 files changed, 51 insertions(+), 28 deletions(-) diff --git a/pkg/bloomgateway/resolver.go b/pkg/bloomgateway/resolver.go index 0f6fe27626958..efe82ddbcc02c 100644 --- a/pkg/bloomgateway/resolver.go +++ b/pkg/bloomgateway/resolver.go @@ -2,6 +2,7 @@ package bloomgateway import ( "context" + "slices" "sort" "time" @@ -61,36 +62,48 @@ func (r *defaultBlockResolver) Resolve(ctx context.Context, tenant string, inter } func blocksMatchingSeries(metas []bloomshipper.Meta, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) []blockWithSeries { - result := make([]blockWithSeries, 0, len(metas)) - - for _, meta := range metas { - for _, block := range meta.Blocks { + slices.SortFunc(series, func(a, b *logproto.GroupedChunkRefs) int { return int(a.Fingerprint - b.Fingerprint) }) - // skip blocks that are not within time interval - if !interval.Overlaps(block.Interval()) { - continue + result := make([]blockWithSeries, 0, len(metas)) + cache := make(map[bloomshipper.BlockRef]int) + + // find the newest block for each series + for _, s := range series { + var b *bloomshipper.BlockRef + var ts time.Time + + for i := range metas { + for j := range metas[i].Blocks { + block := metas[i].Blocks[j] + version := metas[i].Sources[j].TS + // skip blocks that are not within time interval + if !interval.Overlaps(block.Interval()) { + continue + } + // skip blocks that do not contain the series + if block.Cmp(s.Fingerprint) != v1.Overlap { + continue + } + // only use the block if it is newer than the previous + if version.After(ts) { + b = &block + ts = version + } } + } - min := 
sort.Search(len(series), func(i int) bool { - return block.Cmp(series[i].Fingerprint) > v1.Before - }) - - max := sort.Search(len(series), func(i int) bool { - return block.Cmp(series[i].Fingerprint) == v1.After - }) - - // All fingerprints fall outside of the consumer's range - if min == len(series) || max == 0 || min == max { - continue - } + if b == nil { + continue + } - // At least one fingerprint is within bounds of the blocks - // so append to results - dst := make([]*logproto.GroupedChunkRefs, max-min) - _ = copy(dst, series[min:max]) + idx, ok := cache[*b] + if ok { + result[idx].series = append(result[idx].series, s) + } else { + cache[*b] = len(result) result = append(result, blockWithSeries{ - block: block, - series: dst, + block: *b, + series: []*logproto.GroupedChunkRefs{s}, }) } } diff --git a/pkg/bloomgateway/resolver_test.go b/pkg/bloomgateway/resolver_test.go index e6369cbeff9ea..8cb8ee401a332 100644 --- a/pkg/bloomgateway/resolver_test.go +++ b/pkg/bloomgateway/resolver_test.go @@ -9,6 +9,7 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" ) func makeBlockRef(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshipper.BlockRef { @@ -28,6 +29,9 @@ func makeMeta(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshi Blocks: []bloomshipper.BlockRef{ makeBlockRef(minFp, maxFp, from, through), }, + Sources: []tsdb.SingleTenantTSDBIdentifier{ + {TS: through.Time()}, + }, } } @@ -100,14 +104,20 @@ func TestBlockResolver_BlocksMatchingSeries(t *testing.T) { t.Run("multiple overlapping blocks within time range covering full keyspace", func(t *testing.T) { metas := []bloomshipper.Meta{ - makeMeta(0x00, 0xdf, 1000, 1999), - makeMeta(0xc0, 0xff, 1000, 1999), + makeMeta(0x00, 0xdf, 1000, 1499), + makeMeta(0xc0, 0xff, 1500, 1999), } res := 
blocksMatchingSeries(metas, interval, series) + for i := range res { + t.Logf("%s", res[i].block) + for j := range res[i].series { + t.Logf(" %016x", res[i].series[j].Fingerprint) + } + } expected := []blockWithSeries{ { block: metas[0].Blocks[0], - series: series[0:4], + series: series[0:2], // series 0x00c0 and 0x00d0 are covered in the newer block }, { block: metas[1].Blocks[0], From 3850e636754438f1482d6948e75db9d4208aaf93 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 18 Dec 2024 17:30:02 +0100 Subject: [PATCH 2/5] fixup! fix(blooms): Match series to newest block only Signed-off-by: Christian Haudum --- pkg/bloomgateway/resolver_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/bloomgateway/resolver_test.go b/pkg/bloomgateway/resolver_test.go index 8cb8ee401a332..217f07324da3b 100644 --- a/pkg/bloomgateway/resolver_test.go +++ b/pkg/bloomgateway/resolver_test.go @@ -104,8 +104,9 @@ func TestBlockResolver_BlocksMatchingSeries(t *testing.T) { t.Run("multiple overlapping blocks within time range covering full keyspace", func(t *testing.T) { metas := []bloomshipper.Meta{ - makeMeta(0x00, 0xdf, 1000, 1499), - makeMeta(0xc0, 0xff, 1500, 1999), + // 2 series overlap + makeMeta(0x00, 0xdf, 1000, 1499), // "old" meta covers first 4 series + makeMeta(0xc0, 0xff, 1500, 1999), // "new" meta covers last 4 series } res := blocksMatchingSeries(metas, interval, series) for i := range res { From 8227c533d99e70ea426349a3491cdd17a306faba Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 19 Dec 2024 09:20:39 +0100 Subject: [PATCH 3/5] fixup! fixup! 
fix(blooms): Match series to newest block only Signed-off-by: Christian Haudum --- pkg/bloomgateway/resolver.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/bloomgateway/resolver.go b/pkg/bloomgateway/resolver.go index efe82ddbcc02c..08cf9769fc4fb 100644 --- a/pkg/bloomgateway/resolver.go +++ b/pkg/bloomgateway/resolver.go @@ -70,12 +70,12 @@ func blocksMatchingSeries(metas []bloomshipper.Meta, interval bloomshipper.Inter // find the newest block for each series for _, s := range series { var b *bloomshipper.BlockRef - var ts time.Time + var newestTs time.Time for i := range metas { for j := range metas[i].Blocks { block := metas[i].Blocks[j] - version := metas[i].Sources[j].TS + sourceTs := metas[i].Sources[j].TS // skip blocks that are not within time interval if !interval.Overlaps(block.Interval()) { continue @@ -85,9 +85,9 @@ func blocksMatchingSeries(metas []bloomshipper.Meta, interval bloomshipper.Inter continue } // only use the block if it is newer than the previous - if version.After(ts) { + if sourceTs.After(newestTs) { b = &block - ts = version + newestTs = sourceTs } } } From 92692e21987ae19491bf47cd0f5f582adf0509f9 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 19 Dec 2024 10:11:18 +0100 Subject: [PATCH 4/5] fix: Populate "Sources" field in metas correctly The slice of sources must be the same length as the slice of blocks. A source at index i is the TSDB identifier from which the block at index i was built. 
Signed-off-by: Christian Haudum --- pkg/bloombuild/builder/builder.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index b5bb682e8efcd..4e45467eb52f7 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -31,7 +31,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage/config" "github.com/grafana/loki/v3/pkg/storage/stores" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" - "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb" utillog "github.com/grafana/loki/v3/pkg/util/log" "github.com/grafana/loki/v3/pkg/util/ring" ) @@ -415,7 +414,6 @@ func (b *Builder) processTask( Bounds: gap.Bounds, }, }, - Sources: []tsdb.SingleTenantTSDBIdentifier{task.TSDB}, } // Fetch blocks that aren't up to date but are in the desired fingerprint range @@ -492,6 +490,7 @@ func (b *Builder) processTask( level.Debug(logger).Log("msg", "uploaded block", "progress_pct", fmt.Sprintf("%.2f", pct)) meta.Blocks = append(meta.Blocks, built.BlockRef) + meta.Sources = append(meta.Sources, task.TSDB) } if err := newBlocks.Err(); err != nil { From f566b92c5be4b6b5aecb802c52e04319b66f212f Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 19 Dec 2024 10:19:25 +0100 Subject: [PATCH 5/5] fixup! 
fix: Populate "Sources" field in metas correctly Signed-off-by: Christian Haudum --- pkg/bloomgateway/resolver.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/bloomgateway/resolver.go b/pkg/bloomgateway/resolver.go index 08cf9769fc4fb..71f410ad8f3d4 100644 --- a/pkg/bloomgateway/resolver.go +++ b/pkg/bloomgateway/resolver.go @@ -75,7 +75,14 @@ func blocksMatchingSeries(metas []bloomshipper.Meta, interval bloomshipper.Inter for i := range metas { for j := range metas[i].Blocks { block := metas[i].Blocks[j] - sourceTs := metas[i].Sources[j].TS + // To keep backwards compatibility, we can only look at the source at index 0 + // because in the past the slice had always length 1, see + // https://github.com/grafana/loki/blob/b4060154d198e17bef8ba0fbb1c99bb5c93a412d/pkg/bloombuild/builder/builder.go#L418 + sourceTs := metas[i].Sources[0].TS + // Newer metas have len(Sources) == len(Blocks) + if len(metas[i].Sources) > j { + sourceTs = metas[i].Sources[j].TS + } // skip blocks that are not within time interval if !interval.Overlaps(block.Interval()) { continue