diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 40de4ae81d..546ee78d0b 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -332,6 +332,7 @@ func runStore( store.WithQueryGate(queriesGate), store.WithChunkPool(chunkPool), store.WithFilterConfig(conf.filterConf), + store.WithChunkHashCalculation(true), } if conf.debugLogging { diff --git a/internal/cortex/storegateway/bucket_stores.go b/internal/cortex/storegateway/bucket_stores.go index a0b81eabd1..cfda02a2db 100644 --- a/internal/cortex/storegateway/bucket_stores.go +++ b/internal/cortex/storegateway/bucket_stores.go @@ -481,6 +481,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro store.WithIndexCache(u.indexCache), store.WithQueryGate(u.queryGate), store.WithChunkPool(u.chunksPool), + store.WithChunkHashCalculation(true), } if u.logLevel.String() == "debug" { bucketStoreOpts = append(bucketStoreOpts, store.WithDebugLogging()) diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 9e32d2d615..01dc32e498 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -9,6 +9,7 @@ import ( "context" "encoding/binary" "fmt" + "hash" "io" "math" "os" @@ -19,6 +20,8 @@ import ( "sync" "time" + "github.com/cespare/xxhash" + "github.com/alecthomas/units" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -96,10 +99,13 @@ const ( labelDecode = "decode" minBlockSyncConcurrency = 1 + + enableChunkHashCalculation = true ) var ( errBlockSyncConcurrencyNotValid = errors.New("the block sync concurrency must be equal or greater than 1.") + hashPool = sync.Pool{New: func() interface{} { return xxhash.New() }} ) type bucketStoreMetrics struct { @@ -325,10 +331,12 @@ type BucketStore struct { // Enables hints in the Series() response. enableSeriesResponseHints bool + + enableChunkHashCalculation bool } -func (b *BucketStore) validate() error { - if b.blockSyncConcurrency < minBlockSyncConcurrency { +func (s *BucketStore) validate() error { + if s.blockSyncConcurrency < minBlockSyncConcurrency { return errBlockSyncConcurrencyNotValid } return nil @@ -398,6 +406,12 @@ func WithDebugLogging() BucketStoreOption { } } +func WithChunkHashCalculation(enableChunkHashCalculation bool) BucketStoreOption { + return func(s *BucketStore) { + s.enableChunkHashCalculation = enableChunkHashCalculation + } +} + // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. func NewBucketStore( @@ -436,6 +450,7 @@ func NewBucketStore( enableCompatibilityLabel: enableCompatibilityLabel, postingOffsetsInMemSampling: postingOffsetsInMemSampling, enableSeriesResponseHints: enableSeriesResponseHints, + enableChunkHashCalculation: enableChunkHashCalculation, } for _, option := range options { @@ -794,17 +809,18 @@ func (s *bucketSeriesSet) Err() error { // blockSeries returns series matching given matchers, that have some data in given time range. func blockSeries( ctx context.Context, - extLset labels.Labels, // External labels added to the returned series labels. - indexr *bucketIndexReader, // Index reader for block. - chunkr *bucketChunkReader, // Chunk reader for block. - matchers []*labels.Matcher, // Series matchers. - chunksLimiter ChunksLimiter, // Rate limiter for loading chunks. - seriesLimiter SeriesLimiter, // Rate limiter for loading series. - skipChunks bool, // If true, chunks are not loaded. - minTime, maxTime int64, // Series must have data in this time range to be returned. - loadAggregates []storepb.Aggr, // List of aggregates to load when loading chunks. + extLset labels.Labels, + indexr *bucketIndexReader, + chunkr *bucketChunkReader, + matchers []*labels.Matcher, + chunksLimiter ChunksLimiter, + seriesLimiter SeriesLimiter, + skipChunks bool, + minTime, maxTime int64, + loadAggregates []storepb.Aggr, shardMatcher *storepb.ShardMatcher, emptyPostingsCount prometheus.Counter, + calculateChunkHash bool, ) (storepb.SeriesSet, *queryStats, error) { ps, err := indexr.ExpandedPostings(ctx, matchers) if err != nil { @@ -889,20 +905,23 @@ func blockSeries( return newBucketSeriesSet(res), indexr.stats, nil } - if err := chunkr.load(ctx, res, loadAggregates); err != nil { + if err := chunkr.load(ctx, res, loadAggregates, calculateChunkHash); err != nil { return nil, nil, errors.Wrap(err, "load chunks") } return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil } -func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, save func([]byte) ([]byte, error)) error { +func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, save func([]byte) ([]byte, error), calculateChecksum bool) error { + hasher := hashPool.Get().(hash.Hash64) + defer hashPool.Put(hasher) + if in.Encoding() == chunkenc.EncXOR { b, err := save(in.Bytes()) if err != nil { return err } - out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b} + out.Raw = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)} return nil } if in.Encoding() != downsample.ChunkEncAggr { @@ -922,7 +941,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return err } - out.Count = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b} + out.Count = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)} case storepb.Aggr_SUM: x, err := ac.Get(downsample.AggrSum) if err != nil { @@ -932,7 +951,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return err } - out.Sum = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b} + out.Sum = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)} case storepb.Aggr_MIN: x, err := ac.Get(downsample.AggrMin) if err != nil { @@ -942,7 +961,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return err } - out.Min = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b} + out.Min = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)} case storepb.Aggr_MAX: x, err := ac.Get(downsample.AggrMax) if err != nil { @@ -952,7 +971,7 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return err } - out.Max = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b} + out.Max = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)} case storepb.Aggr_COUNTER: x, err := ac.Get(downsample.AggrCounter) if err != nil { @@ -962,12 +981,22 @@ func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Ag if err != nil { return err } - out.Counter = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b} + out.Counter = &storepb.Chunk{Type: storepb.Chunk_XOR, Data: b, Hash: hashChunk(hasher, b, calculateChecksum)} } } return nil } +func hashChunk(hasher hash.Hash64, b []byte, doHash bool) uint64 { + if !doHash { + return 0 + } + hasher.Reset() + // Write never returns an error on the hasher implementation + _, _ = hasher.Write(b) + return hasher.Sum64() +} + // debugFoundBlockSetOverview logs on debug level what exactly blocks we used for query in terms of // labels and resolution. This is important because we allow mixed resolution results, so it is quite crucial // to be aware what exactly resolution we see on query. @@ -1104,6 +1133,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie req.Aggregates, shardMatcher, s.metrics.emptyPostingCount, + s.enableChunkHashCalculation, ) if err != nil { return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) @@ -1326,6 +1356,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq nil, nil, s.metrics.emptyPostingCount, + false, ) if err != nil { return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) @@ -1494,6 +1525,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR nil, nil, s.metrics.emptyPostingCount, + false, ) if err != nil { return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) @@ -2566,7 +2598,7 @@ func (r *bucketChunkReader) addLoad(id chunks.ChunkRef, seriesEntry, chunk int) } // load loads all added chunks and saves resulting aggrs to res. -func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr) error { +func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, calculateChunkChecksum bool) error { g, ctx := errgroup.WithContext(ctx) for seq, pIdxs := range r.toLoad { @@ -2582,7 +2614,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [ p := p indices := pIdxs[p.ElemRng[0]:p.ElemRng[1]] g.Go(func() error { - return r.loadChunks(ctx, res, aggrs, seq, p, indices) + return r.loadChunks(ctx, res, aggrs, seq, p, indices, calculateChunkChecksum) }) } } @@ -2591,7 +2623,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [ // loadChunks will read range [start, end] from the segment file with sequence number seq. // This data range covers chunks starting at supplied offsets. -func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx) error { +func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, aggrs []storepb.Aggr, seq int, part Part, pIdxs []loadIdx, calculateChunkChecksum bool) error { fetchBegin := time.Now() // Get a reader for the required range. @@ -2670,7 +2702,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a // There is also crc32 after the chunk, but we ignore that. chunkLen = n + 1 + int(chunkDataLen) if chunkLen <= len(cb) { - err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk(cb[n:chunkLen]), aggrs, r.save) + err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk(cb[n:chunkLen]), aggrs, r.save, calculateChunkChecksum) if err != nil { return errors.Wrap(err, "populate chunk") } @@ -2701,7 +2733,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a r.stats.chunksFetchCount++ r.stats.ChunksFetchDurationSum += time.Since(fetchBegin) r.stats.ChunksFetchedSizeSum += units.Base2Bytes(len(*nb)) - err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk((*nb)[n:]), aggrs, r.save) + err = populateChunk(&(res[pIdx.seriesEntry].chks[pIdx.chunk]), rawChunk((*nb)[n:]), aggrs, r.save, calculateChunkChecksum) if err != nil { r.block.chunkPool.Put(nb) return errors.Wrap(err, "populate chunk") diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index f51f9d6236..f134195b15 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/cespare/xxhash" + "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" @@ -2132,6 +2134,117 @@ func TestLabelNamesAndValuesHints(t *testing.T) { } } +func TestSeries_ChuncksHaveHashRepresentation(t *testing.T) { + tb := testutil.NewTB(t) + + tmpDir := t.TempDir() + + headOpts := tsdb.DefaultHeadOptions() + headOpts.ChunkDirRoot = filepath.Join(tmpDir, "block") + + h, err := tsdb.NewHead(nil, nil, nil, nil, headOpts, nil) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, h.Close()) }() + + series := labels.FromStrings("__name__", "test") + app := h.Appender(context.Background()) + for ts := int64(0); ts < 10_000; ts++ { + _, err := app.Append(0, series, ts, float64(ts)) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) + + blk := createBlockFromHead(t, headOpts.ChunkDirRoot, h) + + thanosMeta := metadata.Thanos{ + Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), + Downsample: metadata.ThanosDownsample{Resolution: 0}, + Source: metadata.TestSource, + } + + _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(headOpts.ChunkDirRoot, blk.String()), thanosMeta, nil) + testutil.Ok(t, err) + + // Create a bucket and upload the block there. + bktDir := filepath.Join(tmpDir, "bucket") + bkt, err := filesystem.NewBucket(bktDir) + testutil.Ok(t, err) + defer func() { testutil.Ok(t, bkt.Close()) }() + + instrBkt := objstore.WithNoopInstr(bkt) + logger := log.NewNopLogger() + testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(headOpts.ChunkDirRoot, blk.String()), metadata.NoneFunc)) + + // Instance a real bucket store we'll use to query the series. + fetcher, err := block.NewMetaFetcher(logger, 10, instrBkt, tmpDir, nil, nil) + testutil.Ok(tb, err) + + indexCache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, storecache.InMemoryIndexCacheConfig{}) + testutil.Ok(tb, err) + + store, err := NewBucketStore( + instrBkt, + fetcher, + tmpDir, + NewChunksLimiterFactory(100000/MaxSamplesPerChunk), + NewSeriesLimiterFactory(0), + NewGapBasedPartitioner(PartitionerMaxGapSize), + 10, + false, + DefaultPostingOffsetInMemorySampling, + true, + false, + 0, + WithLogger(logger), + WithIndexCache(indexCache), + ) + testutil.Ok(tb, err) + testutil.Ok(tb, store.SyncBlocks(context.Background())) + + reqMinTime := math.MinInt64 + reqMaxTime := math.MaxInt64 + + testCases := []struct { + name string + calculateChecksum bool + }{ + { + name: "calculate checksum", + calculateChecksum: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + req := &storepb.SeriesRequest{ + MinTime: int64(reqMinTime), + MaxTime: int64(reqMaxTime), + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: "test"}, + }, + } + + srv := newStoreSeriesServer(context.Background()) + err = store.Series(req, srv) + testutil.Ok(t, err) + testutil.Assert(t, len(srv.SeriesSet) == 1) + + for _, rawChunk := range srv.SeriesSet[0].Chunks { + hash := rawChunk.Raw.Hash + decodedChunk, err := chunkenc.FromData(chunkenc.EncXOR, rawChunk.Raw.Data) + testutil.Ok(t, err) + + if tc.calculateChecksum { + expectedHash := xxhash.Sum64(decodedChunk.Bytes()) + testutil.Equals(t, expectedHash, hash) + } else { + testutil.Equals(t, uint64(0), hash) + } + } + }) + } +} + func labelNamesFromSeriesSet(series []*storepb.Series) []string { labelsMap := map[string]struct{}{} @@ -2326,7 +2439,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet indexReader := blk.indexReader() chunkReader := blk.chunkReader() - seriesSet, _, err := blockSeries(ctx, nil, indexReader, chunkReader, matchers, chunksLimiter, seriesLimiter, req.SkipChunks, req.MinTime, req.MaxTime, req.Aggregates, nil, dummyCounter) + seriesSet, _, err := blockSeries(ctx, nil, indexReader, chunkReader, matchers, chunksLimiter, seriesLimiter, req.SkipChunks, req.MinTime, req.MaxTime, req.Aggregates, nil, dummyCounter, false) testutil.Ok(b, err) // Ensure at least 1 series has been returned (as expected). diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 25aaafef9f..b4546c7d05 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -7,6 +7,7 @@ import ( "bytes" "context" "fmt" + "hash" "io" "net/http" "net/url" @@ -233,13 +234,13 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, s storepb.Store_Serie // remote read. contentType := httpResp.Header.Get("Content-Type") if strings.HasPrefix(contentType, "application/x-protobuf") { - return p.handleSampledPrometheusResponse(s, httpResp, queryPrometheusSpan, extLset) + return p.handleSampledPrometheusResponse(s, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation) } if !strings.HasPrefix(contentType, "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse") { return errors.Errorf("not supported remote read content type: %s", contentType) } - return p.handleStreamedPrometheusResponse(s, shardMatcher, httpResp, queryPrometheusSpan, extLset) + return p.handleStreamedPrometheusResponse(s, shardMatcher, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation) } func (p *PrometheusStore) queryPrometheus(s storepb.Store_SeriesServer, r *storepb.SeriesRequest) error { @@ -293,7 +294,7 @@ func (p *PrometheusStore) queryPrometheus(s storepb.Store_SeriesServer, r *store Samples: prompb.SamplesFromSamplePairs(vector.Values), } - chks, err := p.chunkSamples(series, MaxSamplesPerChunk) + chks, err := p.chunkSamples(series, MaxSamplesPerChunk, enableChunkHashCalculation) if err != nil { return err } @@ -309,7 +310,13 @@ func (p *PrometheusStore) queryPrometheus(s storepb.Store_SeriesServer, r *store return nil } -func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_SeriesServer, httpResp *http.Response, querySpan tracing.Span, extLset labels.Labels) error { +func (p *PrometheusStore) handleSampledPrometheusResponse( + s storepb.Store_SeriesServer, + httpResp *http.Response, + querySpan tracing.Span, + extLset labels.Labels, + calculateChecksums bool, +) error { level.Debug(p.logger).Log("msg", "started handling ReadRequest_SAMPLED response type.") resp, err := p.fetchSampledResponse(s.Context(), httpResp) @@ -337,7 +344,7 @@ func (p *PrometheusStore) handleSampledPrometheusResponse(s storepb.Store_Series continue } - aggregatedChunks, err := p.chunkSamples(e, MaxSamplesPerChunk) + aggregatedChunks, err := p.chunkSamples(e, MaxSamplesPerChunk, calculateChecksums) if err != nil { return err } @@ -359,6 +366,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse( httpResp *http.Response, querySpan tracing.Span, extLset labels.Labels, + calculateChecksums bool, ) error { level.Debug(p.logger).Log("msg", "started handling ReadRequest_STREAMED_XOR_CHUNKS streamed read response.") @@ -379,6 +387,8 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse( // TODO(bwplotka): Put read limit as a flag. stream := remote.NewChunkedReader(bodySizer, remote.DefaultChunkedReadLimit, *data) + hasher := hashPool.Get().(hash.Hash64) + defer hashPool.Put(hasher) for { res := &prompb.ChunkedReadResponse{} err := stream.NextProto(res) @@ -402,7 +412,9 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse( seriesStats.CountSeries(series.Labels) thanosChks := make([]storepb.AggrChunk, len(series.Chunks)) + for i, chk := range series.Chunks { + chkHash := hashChunk(hasher, chk.Data, calculateChecksums) thanosChks[i] = storepb.AggrChunk{ MaxTime: chk.MaxTimeMs, MinTime: chk.MinTimeMs, @@ -412,6 +424,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse( // has one difference. Prometheus has Chunk_UNKNOWN Chunk_Encoding = 0 vs we start from // XOR as 0. Compensate for that here: Type: storepb.Chunk_Encoding(chk.Type - 1), + Hash: chkHash, }, } seriesStats.Samples += thanosChks[i].Raw.XORNumSamples() @@ -495,8 +508,10 @@ func (p *PrometheusStore) fetchSampledResponse(ctx context.Context, resp *http.R return &data, nil } -func (p *PrometheusStore) chunkSamples(series *prompb.TimeSeries, maxSamplesPerChunk int) (chks []storepb.AggrChunk, err error) { +func (p *PrometheusStore) chunkSamples(series *prompb.TimeSeries, maxSamplesPerChunk int, calculateChecksums bool) (chks []storepb.AggrChunk, err error) { samples := series.Samples + hasher := hashPool.Get().(hash.Hash64) + defer hashPool.Put(hasher) for len(samples) > 0 { chunkSize := len(samples) @@ -509,10 +524,11 @@ func (p *PrometheusStore) chunkSamples(series *prompb.TimeSeries, maxSamplesPerC return nil, status.Error(codes.Unknown, err.Error()) } + chkHash := hashChunk(hasher, cb, calculateChecksums) chks = append(chks, storepb.AggrChunk{ MinTime: samples[0].Timestamp, MaxTime: samples[chunkSize-1].Timestamp, - Raw: &storepb.Chunk{Type: enc, Data: cb}, + Raw: &storepb.Chunk{Type: enc, Data: cb, Hash: chkHash}, }) samples = samples[chunkSize:] diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index 45939294f7..884f85bb12 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -11,6 +11,8 @@ import ( "testing" "time" + "github.com/cespare/xxhash" + "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" @@ -430,6 +432,55 @@ func TestPrometheusStore_Series_MatchExternalLabel(t *testing.T) { testutil.Equals(t, 0, len(srv.SeriesSet)) } +func TestPrometheusStore_Series_ChunkHashCalculation_Integration(t *testing.T) { + defer testutil.TolerantVerifyLeak(t) + + p, err := e2eutil.NewPrometheus() + testutil.Ok(t, err) + defer func() { testutil.Ok(t, p.Stop()) }() + + baseT := timestamp.FromTime(time.Now()) / 1000 * 1000 + + a := p.Appender() + _, err = a.Append(0, labels.FromStrings("a", "b"), baseT+100, 1) + testutil.Ok(t, err) + _, err = a.Append(0, labels.FromStrings("a", "b"), baseT+200, 2) + testutil.Ok(t, err) + _, err = a.Append(0, labels.FromStrings("a", "b"), baseT+300, 3) + testutil.Ok(t, err) + testutil.Ok(t, a.Commit()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + testutil.Ok(t, p.Start()) + + u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) + testutil.Ok(t, err) + + proxy, err := NewPrometheusStore(nil, nil, promclient.NewDefaultClient(), u, component.Sidecar, + func() labels.Labels { return labels.FromStrings("region", "eu-west") }, + func() (int64, int64) { return 0, math.MaxInt64 }, nil) + testutil.Ok(t, err) + srv := newStoreSeriesServer(ctx) + + testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{ + MinTime: baseT + 101, + MaxTime: baseT + 300, + Matchers: []storepb.LabelMatcher{ + {Name: "a", Value: "b"}, + {Type: storepb.LabelMatcher_EQ, Name: "region", Value: "eu-west"}, + }, + }, srv)) + testutil.Equals(t, 1, len(srv.SeriesSet)) + + for _, chunk := range srv.SeriesSet[0].Chunks { + got := chunk.Raw.Hash + want := xxhash.Sum64(chunk.Raw.Data) + testutil.Equals(t, want, got) + } +} + func TestPrometheusStore_Info(t *testing.T) { defer testutil.TolerantVerifyLeak(t) diff --git a/pkg/store/storepb/testutil/series.go b/pkg/store/storepb/testutil/series.go index f3d2fad338..9ad9a4619b 100644 --- a/pkg/store/storepb/testutil/series.go +++ b/pkg/store/storepb/testutil/series.go @@ -15,6 +15,8 @@ import ( "testing" "time" + "github.com/cespare/xxhash" + "github.com/gogo/protobuf/types" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb" @@ -140,7 +142,7 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, expected[len(expected)-1].Chunks = append(expected[len(expected)-1].Chunks, storepb.AggrChunk{ MinTime: c.MinTime, MaxTime: c.MaxTime, - Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chEnc.Bytes()}, + Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chEnc.Bytes(), Hash: xxhash.Sum64(chEnc.Bytes())}, }) } } diff --git a/pkg/store/storepb/types.pb.go b/pkg/store/storepb/types.pb.go index ee028f4a33..271a785bf5 100644 --- a/pkg/store/storepb/types.pb.go +++ b/pkg/store/storepb/types.pb.go @@ -116,6 +116,7 @@ func (LabelMatcher_Type) EnumDescriptor() ([]byte, []int) { type Chunk struct { Type Chunk_Encoding `protobuf:"varint,1,opt,name=type,proto3,enum=thanos.Chunk_Encoding" json:"type,omitempty"` Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` + Hash uint64 `protobuf:"varint,3,opt,name=hash,proto3" json:"hash,omitempty"` } func (m *Chunk) Reset() { *m = Chunk{} } @@ -286,40 +287,41 @@ func init() { func init() { proto.RegisterFile("store/storepb/types.proto", fileDescriptor_121fba57de02d8e0) } var fileDescriptor_121fba57de02d8e0 = []byte{ - // 522 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x93, 0xc1, 0x6f, 0xd3, 0x3e, - 0x14, 0xc7, 0xe3, 0x24, 0x4d, 0x5b, 0xff, 0xf6, 0x43, 0xc1, 0x4c, 0x90, 0xee, 0x90, 0x56, 0x41, - 0x88, 0x6a, 0xd2, 0x12, 0x69, 0x70, 0xe4, 0xd2, 0xa2, 0xde, 0x60, 0x63, 0x5e, 0x25, 0xd0, 0x84, - 0x84, 0xdc, 0xcc, 0x4a, 0xad, 0x35, 0x76, 0x94, 0x38, 0xd0, 0xfe, 0x17, 0x20, 0xee, 0xfc, 0x3d, - 0x3d, 0xee, 0x88, 0x38, 0x4c, 0xd0, 0xfe, 0x23, 0xc8, 0x4e, 0x0a, 0x54, 0xca, 0x25, 0x7a, 0x79, - 0xdf, 0xcf, 0x7b, 0xdf, 0xf8, 0xe5, 0x19, 0xf6, 0x0a, 0x29, 0x72, 0x1a, 0xe9, 0x67, 0x36, 0x8b, - 0xe4, 0x2a, 0xa3, 0x45, 0x98, 0xe5, 0x42, 0x0a, 0xe4, 0xc8, 0x39, 0xe1, 0xa2, 0x38, 0x3a, 0x4c, - 0x44, 0x22, 0x74, 0x2a, 0x52, 0x51, 0xa5, 0x1e, 0xd5, 0x85, 0x0b, 0x32, 0xa3, 0x8b, 0xfd, 0xc2, - 0xe0, 0x3d, 0x6c, 0xbd, 0x9c, 0x97, 0xfc, 0x06, 0x1d, 0x43, 0x5b, 0xe5, 0x3d, 0x30, 0x00, 0xc3, - 0x7b, 0xa7, 0x0f, 0xc3, 0xaa, 0x61, 0xa8, 0xc5, 0x70, 0xc2, 0x63, 0x71, 0xcd, 0x78, 0x82, 0x35, - 0x83, 0x10, 0xb4, 0xaf, 0x89, 0x24, 0x9e, 0x39, 0x00, 0xc3, 0x03, 0xac, 0xe3, 0xe0, 0x01, 0xec, - 0xec, 0x28, 0xd4, 0x86, 0xd6, 0xbb, 0x73, 0xec, 0x1a, 0xc1, 0x37, 0x00, 0x9d, 0x4b, 0x9a, 0x33, - 0x5a, 0xa0, 0x18, 0x3a, 0xda, 0xbf, 0xf0, 0xc0, 0xc0, 0x1a, 0xfe, 0x77, 0xfa, 0xff, 0xce, 0xe1, - 0x95, 0xca, 0x8e, 0x5f, 0xac, 0xef, 0xfa, 0xc6, 0x8f, 0xbb, 0xfe, 0xf3, 0x84, 0xc9, 0x79, 0x39, - 0x0b, 0x63, 0x91, 0x46, 0x15, 0x70, 0xc2, 0x44, 0x1d, 0x45, 0xd9, 0x4d, 0x12, 0xed, 0x1d, 0x25, - 0xbc, 0xd2, 0xd5, 0xb8, 0x6e, 0x8d, 0x22, 0xe8, 0xc4, 0xea, 0x83, 0x0b, 0xcf, 0xd4, 0x26, 0xf7, - 0x77, 0x26, 0xa3, 0x24, 0xc9, 0xf5, 0x51, 0xc6, 0xb6, 0x32, 0xc2, 0x35, 0x16, 0x7c, 0x35, 0x61, - 0xf7, 0x8f, 0x86, 0x7a, 0xb0, 0x93, 0x32, 0xfe, 0x41, 0xb2, 0xb4, 0x9a, 0x83, 0x85, 0xdb, 0x29, - 0xe3, 0x53, 0x96, 0x52, 0x2d, 0x91, 0x65, 0x25, 0x99, 0xb5, 0x44, 0x96, 0x5a, 0xea, 0x43, 0x2b, - 0x27, 0x9f, 0x3c, 0x6b, 0x00, 0xfe, 0x3d, 0x96, 0xee, 0x88, 0x95, 0x82, 0x1e, 0xc3, 0x56, 0x2c, - 0x4a, 0x2e, 0x3d, 0xbb, 0x09, 0xa9, 0x34, 0xd5, 0xa5, 0x28, 0x53, 0xaf, 0xd5, 0xd8, 0xa5, 0x28, - 0x53, 0x05, 0xa4, 0x8c, 0x7b, 0x4e, 0x23, 0x90, 0x32, 0xae, 0x01, 0xb2, 0xf4, 0xda, 0xcd, 0x00, - 0x59, 0xa2, 0xa7, 0xb0, 0xad, 0xbd, 0x68, 0xee, 0x75, 0x9a, 0xa0, 0x9d, 0x1a, 0x7c, 0x01, 0xf0, - 0x40, 0x0f, 0xf6, 0x35, 0x91, 0xf1, 0x9c, 0xe6, 0xe8, 0x64, 0x6f, 0x39, 0x7a, 0x7b, 0xbf, 0xae, - 0x66, 0xc2, 0xe9, 0x2a, 0xa3, 0x7f, 0xf7, 0x83, 0x93, 0x7a, 0x50, 0x5d, 0xac, 0x63, 0x74, 0x08, - 0x5b, 0x1f, 0xc9, 0xa2, 0xa4, 0x7a, 0x4e, 0x5d, 0x5c, 0xbd, 0x04, 0x43, 0x68, 0xab, 0x3a, 0xe4, - 0x40, 0x73, 0x72, 0xe1, 0x1a, 0x6a, 0x73, 0xce, 0x26, 0x17, 0x2e, 0x50, 0x09, 0x3c, 0x71, 0x4d, - 0x9d, 0xc0, 0x13, 0xd7, 0x3a, 0x0e, 0xe1, 0xa3, 0x37, 0x24, 0x97, 0x8c, 0x2c, 0x30, 0x2d, 0x32, - 0xc1, 0x0b, 0x7a, 0x29, 0x73, 0x22, 0x69, 0xb2, 0x42, 0x1d, 0x68, 0xbf, 0x1d, 0xe1, 0x33, 0xd7, - 0x40, 0x5d, 0xd8, 0x1a, 0x8d, 0xcf, 0xf1, 0xd4, 0x05, 0xe3, 0x27, 0xeb, 0x5f, 0xbe, 0xb1, 0xde, - 0xf8, 0xe0, 0x76, 0xe3, 0x83, 0x9f, 0x1b, 0x1f, 0x7c, 0xde, 0xfa, 0xc6, 0xed, 0xd6, 0x37, 0xbe, - 0x6f, 0x7d, 0xe3, 0xaa, 0x5d, 0x5f, 0xa2, 0x99, 0xa3, 0xaf, 0xc1, 0xb3, 0xdf, 0x01, 0x00, 0x00, - 0xff, 0xff, 0xd5, 0x63, 0x3a, 0x23, 0x5c, 0x03, 0x00, 0x00, + // 536 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0x4f, 0x6f, 0xd3, 0x4e, + 0x10, 0xf5, 0xda, 0x8e, 0x93, 0xcc, 0xaf, 0x3f, 0x64, 0x96, 0x0a, 0xdc, 0x1e, 0x9c, 0xc8, 0x08, + 0x11, 0x55, 0xaa, 0x2d, 0x15, 0x8e, 0x5c, 0x12, 0x94, 0x1b, 0xb4, 0x74, 0x1b, 0x09, 0xd4, 0x0b, + 0xda, 0xb8, 0x2b, 0xdb, 0x6a, 0xfc, 0x47, 0xf6, 0xba, 0x24, 0xdf, 0x02, 0xc4, 0x9d, 0xcf, 0x93, + 0x63, 0x8f, 0x88, 0x43, 0x04, 0xc9, 0x17, 0x41, 0x1e, 0x3b, 0x40, 0xa4, 0x5c, 0xac, 0xf1, 0x7b, + 0x6f, 0x66, 0x76, 0xde, 0xce, 0xc2, 0x51, 0x21, 0xd3, 0x5c, 0x78, 0xf8, 0xcd, 0xa6, 0x9e, 0x5c, + 0x64, 0xa2, 0x70, 0xb3, 0x3c, 0x95, 0x29, 0x35, 0x64, 0xc8, 0x93, 0xb4, 0x38, 0x3e, 0x0c, 0xd2, + 0x20, 0x45, 0xc8, 0xab, 0xa2, 0x9a, 0x3d, 0x6e, 0x12, 0x67, 0x7c, 0x2a, 0x66, 0xbb, 0x89, 0xce, + 0x1d, 0xb4, 0x5e, 0x87, 0x65, 0x72, 0x4b, 0x4f, 0x40, 0xaf, 0x70, 0x8b, 0xf4, 0xc9, 0xe0, 0xc1, + 0xd9, 0x63, 0xb7, 0x2e, 0xe8, 0x22, 0xe9, 0x8e, 0x13, 0x3f, 0xbd, 0x89, 0x92, 0x80, 0xa1, 0x86, + 0x52, 0xd0, 0x6f, 0xb8, 0xe4, 0x96, 0xda, 0x27, 0x83, 0x03, 0x86, 0x31, 0xb5, 0x40, 0x0f, 0x79, + 0x11, 0x5a, 0x5a, 0x9f, 0x0c, 0xf4, 0x91, 0xbe, 0x5c, 0xf5, 0x08, 0x43, 0xc4, 0x79, 0x04, 0x9d, + 0x6d, 0x3e, 0x6d, 0x83, 0xf6, 0xe1, 0x82, 0x99, 0x8a, 0xf3, 0x8d, 0x80, 0x71, 0x25, 0xf2, 0x48, + 0x14, 0xd4, 0x07, 0x03, 0x4f, 0x56, 0x58, 0xa4, 0xaf, 0x0d, 0xfe, 0x3b, 0xfb, 0x7f, 0xdb, 0xfb, + 0x4d, 0x85, 0x8e, 0x5e, 0x2d, 0x57, 0x3d, 0xe5, 0xc7, 0xaa, 0xf7, 0x32, 0x88, 0x64, 0x58, 0x4e, + 0x5d, 0x3f, 0x8d, 0xbd, 0x5a, 0x70, 0x1a, 0xa5, 0x4d, 0xe4, 0x65, 0xb7, 0x81, 0xb7, 0x33, 0xa4, + 0x7b, 0x8d, 0xd9, 0xac, 0x29, 0x4d, 0x3d, 0x30, 0xfc, 0x6a, 0x94, 0xc2, 0x52, 0xb1, 0xc9, 0xc3, + 0x6d, 0x93, 0x61, 0x10, 0xe4, 0x38, 0x24, 0x9e, 0x59, 0x61, 0x8d, 0xcc, 0xf9, 0xaa, 0x42, 0xf7, + 0x0f, 0x47, 0x8f, 0xa0, 0x13, 0x47, 0xc9, 0x47, 0x19, 0xc5, 0xb5, 0x43, 0x1a, 0x6b, 0xc7, 0x51, + 0x32, 0x89, 0x62, 0x81, 0x14, 0x9f, 0xd7, 0x94, 0xda, 0x50, 0x7c, 0x8e, 0x54, 0x0f, 0xb4, 0x9c, + 0x7f, 0x42, 0x4b, 0xfe, 0x19, 0x0b, 0x2b, 0xb2, 0x8a, 0xa1, 0x4f, 0xa1, 0xe5, 0xa7, 0x65, 0x22, + 0x2d, 0x7d, 0x9f, 0xa4, 0xe6, 0xaa, 0x2a, 0x45, 0x19, 0x5b, 0xad, 0xbd, 0x55, 0x8a, 0x32, 0xae, + 0x04, 0x71, 0x94, 0x58, 0xc6, 0x5e, 0x41, 0x1c, 0x25, 0x28, 0xe0, 0x73, 0xab, 0xbd, 0x5f, 0xc0, + 0xe7, 0xf4, 0x39, 0xb4, 0xb1, 0x97, 0xc8, 0xad, 0xce, 0x3e, 0xd1, 0x96, 0x75, 0xbe, 0x10, 0x38, + 0x40, 0x63, 0xdf, 0x72, 0xe9, 0x87, 0x22, 0xa7, 0xa7, 0x3b, 0x6b, 0x73, 0xb4, 0x73, 0x75, 0x8d, + 0xc6, 0x9d, 0x2c, 0x32, 0xf1, 0x77, 0x73, 0x12, 0xde, 0x18, 0xd5, 0x65, 0x18, 0xd3, 0x43, 0x68, + 0xdd, 0xf1, 0x59, 0x29, 0xd0, 0xa7, 0x2e, 0xab, 0x7f, 0x9c, 0x01, 0xe8, 0x55, 0x1e, 0x35, 0x40, + 0x1d, 0x5f, 0x9a, 0x4a, 0xb5, 0x39, 0xe7, 0xe3, 0x4b, 0x93, 0x54, 0x00, 0x1b, 0x9b, 0x2a, 0x02, + 0x6c, 0x6c, 0x6a, 0x27, 0x2e, 0x3c, 0x79, 0xc7, 0x73, 0x19, 0xf1, 0x19, 0x13, 0x45, 0x96, 0x26, + 0x85, 0xb8, 0x92, 0x39, 0x97, 0x22, 0x58, 0xd0, 0x0e, 0xe8, 0xef, 0x87, 0xec, 0xdc, 0x54, 0x68, + 0x17, 0x5a, 0xc3, 0xd1, 0x05, 0x9b, 0x98, 0x64, 0xf4, 0x6c, 0xf9, 0xcb, 0x56, 0x96, 0x6b, 0x9b, + 0xdc, 0xaf, 0x6d, 0xf2, 0x73, 0x6d, 0x93, 0xcf, 0x1b, 0x5b, 0xb9, 0xdf, 0xd8, 0xca, 0xf7, 0x8d, + 0xad, 0x5c, 0xb7, 0x9b, 0xe7, 0x35, 0x35, 0xf0, 0x81, 0xbc, 0xf8, 0x1d, 0x00, 0x00, 0xff, 0xff, + 0x13, 0xe7, 0xb3, 0x25, 0x76, 0x03, 0x00, 0x00, } func (m *Chunk) Marshal() (dAtA []byte, err error) { @@ -342,6 +344,11 @@ func (m *Chunk) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Hash != 0 { + i = encodeVarintTypes(dAtA, i, uint64(m.Hash)) + i-- + dAtA[i] = 0x18 + } if len(m.Data) > 0 { i -= len(m.Data) copy(dAtA[i:], m.Data) @@ -579,6 +586,9 @@ func (m *Chunk) Size() (n int) { if l > 0 { n += 1 + l + sovTypes(uint64(l)) } + if m.Hash != 0 { + n += 1 + sovTypes(uint64(m.Hash)) + } return n } @@ -750,6 +760,25 @@ func (m *Chunk) Unmarshal(dAtA []byte) error { m.Data = []byte{} } iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType) + } + m.Hash = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Hash |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipTypes(dAtA[iNdEx:]) diff --git a/pkg/store/storepb/types.proto b/pkg/store/storepb/types.proto index d80e1af1ba..3739e23b8f 100644 --- a/pkg/store/storepb/types.proto +++ b/pkg/store/storepb/types.proto @@ -26,6 +26,7 @@ message Chunk { } Encoding type = 1; bytes data = 2; + uint64 hash = 3 [(gogoproto.nullable) = true]; } message Series { diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 9a6096192b..d876872e2e 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -5,6 +5,7 @@ package store import ( "context" + "hash" "io" "math" "sort" @@ -160,6 +161,8 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer shardMatcher := r.ShardInfo.Matcher(&s.buffers) defer shardMatcher.Close() + hasher := hashPool.Get().(hash.Hash64) + defer hashPool.Put(hasher) // Stream at most one series per frame; series may be split over multiple frames according to maxBytesInFrame. for set.Next() { series := set.At() @@ -197,6 +200,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer Raw: &storepb.Chunk{ Type: storepb.Chunk_Encoding(chk.Chunk.Encoding() - 1), // Proto chunk encoding is one off to TSDB one. Data: chk.Chunk.Bytes(), + Hash: hashChunk(hasher, chk.Chunk.Bytes(), enableChunkHashCalculation), }, } frameBytesLeft -= c.Size() diff --git a/pkg/store/tsdb_test.go b/pkg/store/tsdb_test.go index ff3d69a5d2..9fdaed7664 100644 --- a/pkg/store/tsdb_test.go +++ b/pkg/store/tsdb_test.go @@ -12,6 +12,7 @@ import ( "sort" "testing" + "github.com/cespare/xxhash" "github.com/go-kit/log" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" @@ -61,6 +62,46 @@ func TestTSDBStore_Info(t *testing.T) { testutil.Equals(t, int64(math.MaxInt64), resp.MaxTime) } +func TestTSDBStore_Series_ChunkChecksum(t *testing.T) { + defer testutil.TolerantVerifyLeak(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := e2eutil.NewTSDB() + defer func() { testutil.Ok(t, db.Close()) }() + testutil.Ok(t, err) + + tsdbStore := NewTSDBStore(nil, db, component.Rule, labels.FromStrings("region", "eu-west")) + + appender := db.Appender(context.Background()) + + for i := 1; i <= 3; i++ { + _, err = appender.Append(0, labels.FromStrings("a", "1"), int64(i), float64(i)) + testutil.Ok(t, err) + } + err = appender.Commit() + testutil.Ok(t, err) + + srv := newStoreSeriesServer(ctx) + + req := &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 3, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "1"}, + }, + } + + err = tsdbStore.Series(req, srv) + testutil.Ok(t, err) + + for _, chk := range srv.SeriesSet[0].Chunks { + want := xxhash.Sum64(chk.Raw.Data) + testutil.Equals(t, want, chk.Raw.Hash) + } +} + func TestTSDBStore_Series(t *testing.T) { defer testutil.TolerantVerifyLeak(t) diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index fdf54c304f..d8972c2ec5 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -594,7 +594,7 @@ func TestRangeQueryShardingWithRandomData(t *testing.T) { func TestRangeQueryDynamicHorizontalSharding(t *testing.T) { t.Parallel() - e, err := e2e.NewDockerEnvironment("query-frontend") + e, err := e2e.New(e2e.WithName("qfe-dyn-sharding")) testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, e))