From 5d8bc6129eba9df85bb8c399ad09f59911d98ab6 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 17 Feb 2022 20:52:43 +0100 Subject: [PATCH] Correctly sets hash value for headblock iterator (#5423) --- pkg/chunkenc/memchunk.go | 6 ++++-- pkg/chunkenc/unordered.go | 8 ++++---- pkg/chunkenc/unordered_test.go | 32 +++++++++++++++++++++++++++++--- pkg/ingester/recovery_test.go | 4 +++- 4 files changed, 40 insertions(+), 10 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index eccda20ab3c9c..7a9fdebeb099e 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -1015,6 +1015,7 @@ func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction, if stream, ok = streams[lhash]; !ok { stream = &logproto.Stream{ Labels: parsedLbs.String(), + Hash: lhash, } streams[lhash] = stream } @@ -1062,8 +1063,9 @@ func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extra lhash := parsedLabels.Hash() if s, found = series[lhash]; !found { s = &logproto.Series{ - Labels: parsedLabels.String(), - Samples: SamplesPool.Get(len(hb.entries)).([]logproto.Sample)[:0], + Labels: parsedLabels.String(), + Samples: SamplesPool.Get(len(hb.entries)).([]logproto.Sample)[:0], + StreamHash: lhash, } series[lhash] = s } diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index f7bf4ae6a0883..346f85dda56b4 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -215,7 +215,6 @@ func (hb *unorderedHeadBlock) Iterator( maxt int64, pipeline log.StreamPipeline, ) iter.EntryIterator { - // We are doing a copy everytime, this is because b.entries could change completely, // the alternate would be that we allocate a new b.entries everytime we cut a block, // but the tradeoff is that queries to near-realtime data would be much lower than @@ -238,6 +237,7 @@ func (hb *unorderedHeadBlock) Iterator( if stream, ok = streams[lhash]; !ok { stream = &logproto.Stream{ Labels: parsedLbs.String(), + Hash: lhash, } streams[lhash] = stream } @@ -267,7 +267,6 @@ func (hb *unorderedHeadBlock) SampleIterator( maxt int64, extractor log.StreamSampleExtractor, ) iter.SampleIterator { - series := map[uint64]*logproto.Series{} _ = hb.forEntries( @@ -285,8 +284,9 @@ func (hb *unorderedHeadBlock) SampleIterator( lhash := parsedLabels.Hash() if s, found = series[lhash]; !found { s = &logproto.Series{ - Labels: parsedLabels.String(), - Samples: SamplesPool.Get(hb.lines).([]logproto.Sample)[:0], + Labels: parsedLabels.String(), + Samples: SamplesPool.Get(hb.lines).([]logproto.Sample)[:0], + StreamHash: parsedLabels.Hash(), } series[lhash] = s } diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index 447df07e917f1..34b866ea13313 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/iter" @@ -268,7 +269,7 @@ func BenchmarkHeadBlockWrites(b *testing.B) { // unordered, unordered // current default block size of 256kb with 75b avg log lines =~ 5.2k lines/block - var nWrites = (256 << 10) / 50 + nWrites := (256 << 10) / 50 headBlockFn := func() func(int64, string) { hb := &headBlock{} @@ -449,7 +450,6 @@ func BenchmarkUnorderedRead(b *testing.B) { }) } }) - } func TestUnorderedIteratorCountsAllEntries(t *testing.T) { @@ -563,7 +563,6 @@ func TestReorder(t *testing.T) { require.Equal(t, exp, b) }) - } } @@ -610,3 +609,30 @@ func TestReorderAcrossBlocks(t *testing.T) { } iterEq(t, exp, itr) } + +func Test_HeadIteratorHash(t *testing.T) { + lbs := labels.Labels{labels.Label{Name: "foo", Value: "bar"}} + ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false) + if err != nil { + panic(err) + } + + for name, b := range map[string]HeadBlock{ + "unordered": newUnorderedHeadBlock(), + "ordered": &headBlock{}, + } { + t.Run(name, func(t *testing.T) { + require.NoError(t, b.Append(1, "foo")) + eit := b.Iterator(context.Background(), logproto.BACKWARD, 0, 2, log.NewNoopPipeline().ForStream(lbs)) + + for eit.Next() { + require.Equal(t, lbs.Hash(), eit.StreamHash()) + } + + sit := b.SampleIterator(context.TODO(), 0, 2, ex.ForStream(lbs)) + for sit.Next() { + require.Equal(t, lbs.Hash(), sit.StreamHash()) + } + }) + } +} diff --git a/pkg/ingester/recovery_test.go b/pkg/ingester/recovery_test.go index cb2ce94d135fd..1f643b108b0bf 100644 --- a/pkg/ingester/recovery_test.go +++ b/pkg/ingester/recovery_test.go @@ -262,15 +262,17 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) { }, &result) require.NoError(t, err) require.Len(t, result.resps, 1) + lbls := labels.Labels{{Name: "bar", Value: "baz1"}, {Name: "foo", Value: "bar"}} expected := []logproto.Stream{ { - Labels: `{bar="baz1", foo="bar"}`, + Labels: lbls.String(), Entries: []logproto.Entry{ { Timestamp: time.Unix(1, 0), Line: "line 1", }, }, + Hash: lbls.Hash(), }, } require.Equal(t, expected, result.resps[0].Streams)