diff --git a/CHANGELOG.md b/CHANGELOG.md index d09d3aa1722..712aee4ba6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,12 +13,14 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Fixed -* [#2665](https://github.com/thanos-io/thanos/pull/2665) Swift: fix issue with missing Content-Type HTTP headers. +- [#2665](https://github.com/thanos-io/thanos/pull/2665) Swift: fix issue with missing Content-Type HTTP headers. - [#2800](https://github.com/thanos-io/thanos/pull/2800) Query: Fix handling of `--web.external-prefix` and `--web.route-prefix` - [#2834](https://github.com/thanos-io/thanos/pull/2834) Query: Fix rendered JSON state value for rules and alerts should be in lowercase ### Changed +- [#2305](https://github.com/thanos-io/thanos/pull/2305) Receive,Sidecar,Ruler: Propagate correct (stricter) MinTime for no-block TSDBs. + ## [v0.14.0](https://github.com/thanos-io/thanos/releases) - IN PROGRESS ### Fixed diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 3c799e26f34..82c3980f217 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -38,12 +38,7 @@ import ( "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" - "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/encoding" - "github.com/prometheus/prometheus/tsdb/index" - "gopkg.in/yaml.v2" - "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/indexheader" "github.com/thanos-io/thanos/pkg/block/metadata" @@ -54,8 +49,10 @@ import ( storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/storepb" + storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" + "gopkg.in/yaml.v2" ) var emptyRelabelConfig = make([]*relabel.Config, 0) @@ -1023,11 +1020,6 @@ func BenchmarkBucketIndexReader_ExpandedPostings(b *testing.B) { benchmarkExpandedPostings(tb, bkt, id, r, 50e5) } -// Make entries ~50B in size, to emulate real-world high cardinality. -const ( - postingsBenchSuffix = "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd" -) - func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series int) ulid.ULID { h, err := tsdb.NewHead(nil, nil, nil, 1000, tmpDir, nil, tsdb.DefaultStripeSize, nil) testutil.Ok(t, err) @@ -1062,12 +1054,12 @@ func appendTestData(t testing.TB, app storage.Appender, series int) { series = series / 5 for n := 0; n < 10; n++ { for i := 0; i < series/10; i++ { - addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "foo")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+storetestutil.LabelLongSuffix, "n", strconv.Itoa(n)+storetestutil.LabelLongSuffix, "j", "foo")) // Have some series that won't be matched, to properly test inverted matches. - addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) - addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "0_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) - addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "1_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar")) - addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "2_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "foo")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+storetestutil.LabelLongSuffix, "n", strconv.Itoa(n)+storetestutil.LabelLongSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+storetestutil.LabelLongSuffix, "n", "0_"+strconv.Itoa(n)+storetestutil.LabelLongSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+storetestutil.LabelLongSuffix, "n", "1_"+strconv.Itoa(n)+storetestutil.LabelLongSuffix, "j", "bar")) + addSeries(labels.FromStrings("i", strconv.Itoa(i)+storetestutil.LabelLongSuffix, "n", "2_"+strconv.Itoa(n)+storetestutil.LabelLongSuffix, "j", "foo")) } } testutil.Ok(t, app.Commit()) @@ -1095,7 +1087,7 @@ func benchmarkExpandedPostings( r indexheader.Reader, series int, ) { - n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix) + n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+storetestutil.LabelLongSuffix) jFoo := labels.MustNewMatcher(labels.MatchEqual, "j", "foo") jNotFoo := labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo") @@ -1105,9 +1097,9 @@ func benchmarkExpandedPostings( i1Plus := labels.MustNewMatcher(labels.MatchRegexp, "i", "^1.+$") iEmptyRe := labels.MustNewMatcher(labels.MatchRegexp, "i", "^$") iNotEmpty := labels.MustNewMatcher(labels.MatchNotEqual, "i", "") - iNot2 := labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+postingsBenchSuffix) + iNot2 := labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+storetestutil.LabelLongSuffix) iNot2Star := labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$") - iRegexSet := labels.MustNewMatcher(labels.MatchRegexp, "i", "0"+postingsBenchSuffix+"|1"+postingsBenchSuffix+"|2"+postingsBenchSuffix) + iRegexSet := labels.MustNewMatcher(labels.MatchRegexp, "i", "0"+storetestutil.LabelLongSuffix+"|1"+storetestutil.LabelLongSuffix+"|2"+storetestutil.LabelLongSuffix) series = series / 5 cases := []struct { @@ -1158,106 +1150,25 @@ func benchmarkExpandedPostings( } } -func newSeries(t testing.TB, lset labels.Labels, smplChunks [][]sample) storepb.Series { - var s storepb.Series - - for _, l := range lset { - s.Labels = append(s.Labels, storepb.Label{Name: l.Name, Value: l.Value}) - } - - for _, smpls := range smplChunks { - c := chunkenc.NewXORChunk() - a, err := c.Appender() - testutil.Ok(t, err) - - for _, smpl := range smpls { - a.Append(smpl.t, smpl.v) - } - - ch := storepb.AggrChunk{ - MinTime: smpls[0].t, - MaxTime: smpls[len(smpls)-1].t, - Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()}, - } - - s.Chunks = append(s.Chunks, ch) - } - return s -} - -func TestSeries(t *testing.T) { +func TestBucketSeries(t *testing.T) { tb := testutil.NewTB(t) - tb.Run("200e3SeriesWithOneSample", func(tb testutil.TB) { - benchSeries(tb, 200e3, seriesDimension, 200e3) - }) - tb.Run("OneSeriesWith200e3Samples", func(tb testutil.TB) { - benchSeries(tb, 200e3, samplesDimension, 200e3) + storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { + benchBucketSeries(t, samplesPerSeries, series, 1) }) } -func BenchmarkSeries(b *testing.B) { +func BenchmarkBucketSeries(b *testing.B) { tb := testutil.NewTB(b) - tb.Run("10e6SeriesWithOneSample", func(tb testutil.TB) { - benchSeries(tb, 10e6, seriesDimension, 1, 10, 10e1, 10e2, 10e3, 10e4, 10e5) // This is too big for my machine: 10e6. - }) - tb.Run("OneSeriesWith100e6Samples", func(tb testutil.TB) { - // 100e6 samples = ~17361 days with 15s scrape. - benchSeries(tb, 100e6, samplesDimension, 1, 10, 10e1, 10e2, 10e3, 10e4, 10e5, 10e6) // This is too big for my machine: 100e6. + // 10e6 samples = ~1736 days with 15s scrape + storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) { + benchBucketSeries(t, samplesPerSeries, series, 1/100e6, 1/10e4, 1) }) } -func createBlockWithOneSample(t testutil.TB, dir string, blockIndex int, totalSeries int) (ulid.ULID, []storepb.Series) { - fmt.Println("Building block with numSeries:", totalSeries) - - var series []storepb.Series - h, err := tsdb.NewHead(nil, nil, nil, 1, dir, nil, tsdb.DefaultStripeSize, nil) - testutil.Ok(t, err) - defer func() { testutil.Ok(t, h.Close()) }() - - app := h.Appender() - - for i := 0; i < totalSeries; i++ { - ts := int64(blockIndex*totalSeries + i) - lbls := labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", ts, postingsBenchSuffix)) - series = append(series, newSeries(t, append(labels.Labels{{Name: "ext1", Value: "1"}}, lbls...), [][]sample{{sample{t: ts, v: 0}}})) - - _, err := app.Add(lbls, ts, 0) - testutil.Ok(t, err) - } - testutil.Ok(t, app.Commit()) - - return createBlockFromHead(t, dir, h), series -} - -func createBlockWithOneSeries(t testutil.TB, dir string, lbls labels.Labels, blockIndex int, totalSamples int, random *rand.Rand) ulid.ULID { - fmt.Println("Building block with one series with numSamples:", totalSamples) +func benchBucketSeries(t testutil.TB, samplesPerSeries, totalSeries int, requestedRatios ...float64) { + const numOfBlocks = 4 - h, err := tsdb.NewHead(nil, nil, nil, int64(totalSamples), dir, nil, tsdb.DefaultStripeSize, nil) - testutil.Ok(t, err) - defer func() { testutil.Ok(t, h.Close()) }() - - app := h.Appender() - - ref, err := app.Add(lbls, int64(blockIndex*totalSamples), random.Float64()) - testutil.Ok(t, err) - for i := 1; i < totalSamples; i++ { - ts := int64(blockIndex*totalSamples + i) - testutil.Ok(t, app.AddFast(ref, ts, random.Float64())) - } - testutil.Ok(t, app.Commit()) - - return createBlockFromHead(t, dir, h) -} - -type Dimension string - -const ( - seriesDimension = Dimension("series") - samplesDimension = Dimension("samples") -) - -func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) { - tmpDir, err := ioutil.TempDir("", "testorbench-series") + tmpDir, err := ioutil.TempDir("", "testorbench-bucketseries") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() @@ -1272,19 +1183,9 @@ func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) { random = rand.New(rand.NewSource(120)) ) - numberPerBlock := number / 4 - lbls := labels.FromStrings("foo", "bar", "i", postingsBenchSuffix) - switch dimension { - case seriesDimension: - series = make([]storepb.Series, 0, 4*numberPerBlock) - case samplesDimension: - series = []storepb.Series{newSeries(t, append(labels.Labels{{Name: "ext1", Value: "1"}}, lbls...), nil)} - default: - t.Fatal("unknown dimension", dimension) - } - + extLset := labels.Labels{{Name: "ext1", Value: "1"}} thanosMeta := metadata.Thanos{ - Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), + Labels: extLset.Map(), Downsample: metadata.ThanosDownsample{Resolution: 0}, Source: metadata.TestSource, } @@ -1296,106 +1197,33 @@ func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) { if !t.IsBenchmark() { chunkPool = &mockedPool{parent: chunkPool} } - blockDir := filepath.Join(tmpDir, "tmp") - var preBuildBlockIDs []ulid.ULID - // Local dev optimization to fetch those big blocks, instead of recreating. - // We cannot really commit this to Git (2GB). - // TODO(bwplotka): Provide them in objstore instead?. - if t.IsBenchmark() { - switch dimension { - case seriesDimension: - p := filepath.Join(".", "test-data", "10e6seriesOneSample") - if _, err := os.Stat(p); err == nil { - blockDir = p - } - case samplesDimension: - p := filepath.Join(".", "test-data", "1series100e6Samples") - if _, err := os.Stat(p); err == nil { - blockDir = p - } - } - - info, err := ioutil.ReadDir(blockDir) - if err == nil { - for _, d := range info { - if !d.IsDir() { - continue - } - - id, err := ulid.Parse(d.Name()) - if err != nil { - continue - } - - preBuildBlockIDs = append(preBuildBlockIDs, id) - } - } + samplesPerSeriesPerBlock := samplesPerSeries / numOfBlocks + if samplesPerSeriesPerBlock == 0 { + samplesPerSeriesPerBlock = 1 } - for bi := 0; bi < 4; bi++ { - var bSeries []storepb.Series - - var id ulid.ULID - switch dimension { - case seriesDimension: - if len(preBuildBlockIDs) > 0 { - id = preBuildBlockIDs[bi] - fmt.Println("Using pre-build block:", id) - break - } - // Create 4 blocks. Each will have numSeriesPerBlock number of series that have 1 sample only. - // Timestamp will be counted for each new series, so each series will have unique timestamp. - // This allows to pick time range that will correspond to number of series picked 1:1. - id, bSeries = createBlockWithOneSample(t, blockDir, bi, numberPerBlock) - series = append(series, bSeries...) - case samplesDimension: - if len(preBuildBlockIDs) > 0 { - id = preBuildBlockIDs[bi] - fmt.Println("Using pre-build block:", id) - } else { - // Create 4 blocks. Each will have numSeriesPerBlock number of series that have 1 sample only. - // Timestamp will be counted for each new series, so each series will have unique timestamp. - // This allows to pick time range that will correspond to number of series picked 1:1. - id = createBlockWithOneSeries(t, blockDir, lbls, bi, numberPerBlock, random) - } - - if !t.IsBenchmark() { - // Reread chunks for ref. - indexr, err := index.NewFileReader(filepath.Join(blockDir, id.String(), "index")) - testutil.Ok(t, err) - b, err := chunks.NewDirReader(filepath.Join(blockDir, id.String(), "chunks"), nil) - testutil.Ok(t, err) - - k, v := index.AllPostingsKey() - all, err := indexr.Postings(k, v) - testutil.Ok(t, err) - - p, err := index.ExpandPostings(all) - testutil.Ok(t, err) + seriesPerBlock := totalSeries / numOfBlocks + if seriesPerBlock == 0 { + seriesPerBlock = 1 + } - // One series expected. - testutil.Equals(t, 1, len(p)) - l := labels.Labels{} - chs := []chunks.Meta{} - testutil.Ok(t, indexr.Series(p[0], &l, &chs)) - - for _, c := range chs { - raw, err := b.Chunk(c.Ref) - testutil.Ok(t, err) - - series[0].Chunks = append(series[0].Chunks, storepb.AggrChunk{ - MaxTime: c.MaxTime, - MinTime: c.MinTime, - Raw: &storepb.Chunk{ - Data: raw.Bytes(), - Type: storepb.Chunk_XOR, - }, - }) - } - } - } + // Create 4 blocks. Each will have seriesPerBlock number of series that have samplesPerSeriesPerBlock samples. + // Timestamp will be counted for each new series and new sample, so each each series will have unique timestamp. + // This allows to pick time range that will correspond to number of series picked 1:1. + for bi := 0; bi < numOfBlocks; bi++ { + head, bSeries := storetestutil.CreateHeadWithSeries(t, bi, storetestutil.HeadGenOptions{ + Dir: tmpDir, + SamplesPerSeries: samplesPerSeriesPerBlock, + Series: seriesPerBlock, + PrependLabels: extLset, + Random: random, + SkipChunks: t.IsBenchmark(), + }) + id := createBlockFromHead(t, blockDir, head) + testutil.Ok(t, head.Close()) + series = append(series, bSeries...) meta, err := metadata.InjectThanos(log.NewNopLogger(), filepath.Join(blockDir, id.String()), thanosMeta, nil) testutil.Ok(t, err) @@ -1431,37 +1259,32 @@ func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) { testutil.Ok(t, err) } - var bCases []*benchSeriesCase - for _, c := range cases { - var expected []storepb.Series - - switch dimension { - case seriesDimension: - expected = series[:c] - case samplesDimension: - expected = series + var bCases []*storetestutil.SeriesCase + for _, p := range requestedRatios { + seriesCut := int(p * float64(numOfBlocks*seriesPerBlock)) + if seriesCut == 0 { + seriesCut = 1 } - - bCases = append(bCases, &benchSeriesCase{ - name: fmt.Sprintf("%dof%d", c, 4*numberPerBlock), - req: &storepb.SeriesRequest{ + allCut := int(p * float64(totalSeries*samplesPerSeries)) + bCases = append(bCases, &storetestutil.SeriesCase{ + Name: fmt.Sprintf("%dof%d", allCut, totalSeries*samplesPerSeries), + Req: &storepb.SeriesRequest{ MinTime: 0, - MaxTime: int64(c) - 1, + MaxTime: int64(allCut) - 1, Matchers: []storepb.LabelMatcher{ {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, }, }, - expectedSeries: expected, + // This does not cut chunks properly, but those are assured against for non benchmarks only, where we use 100% case only. + ExpectedSeries: series[:seriesCut], }) } + storetestutil.TestServerSeries(t, store, bCases...) - fmt.Println("Starting") - - benchmarkSeries(t, store, bCases) if !t.IsBenchmark() { // Make sure the pool is correctly used. This is expected for 200k numbers. - testutil.Equals(t, 4, int(chunkPool.(*mockedPool).gets)) - // TODO(bwplotka): This is super negative for large number of samples (1mln). Investigate. + testutil.Equals(t, numOfBlocks, int(chunkPool.(*mockedPool).gets)) + // TODO(bwplotka): This is wrong negative for large number of samples (1mln). Investigate. testutil.Equals(t, 0, int(chunkPool.(*mockedPool).balance)) chunkPool.(*mockedPool).gets = 0 @@ -1513,49 +1336,8 @@ type noopLimiter struct{} func (noopLimiter) Check(uint64) error { return nil } -type benchSeriesCase struct { - name string - req *storepb.SeriesRequest - expectedSeries []storepb.Series - expectedHints []hintspb.SeriesResponseHints -} - -func benchmarkSeries(t testutil.TB, store *BucketStore, cases []*benchSeriesCase) { - for _, c := range cases { - t.Run(c.name, func(t testutil.TB) { - t.ResetTimer() - for i := 0; i < t.N(); i++ { - srv := newStoreSeriesServer(context.Background()) - testutil.Ok(t, store.Series(c.req, srv)) - testutil.Equals(t, 0, len(srv.Warnings)) - testutil.Equals(t, len(c.expectedSeries), len(srv.SeriesSet)) - - if !t.IsBenchmark() { - if len(c.expectedSeries) == 1 { - // Chunks are not sorted within response. TODO: Investigate: Is this fine? - sort.Slice(srv.SeriesSet[0].Chunks, func(i, j int) bool { - return srv.SeriesSet[0].Chunks[i].MinTime < srv.SeriesSet[0].Chunks[j].MinTime - }) - } - // This might give unreadable output for millions of series if error. - testutil.Equals(t, c.expectedSeries, srv.SeriesSet) - - var actualHints []hintspb.SeriesResponseHints - for _, anyHints := range srv.HintsSet { - hints := hintspb.SeriesResponseHints{} - testutil.Ok(t, types.UnmarshalAny(anyHints, &hints)) - actualHints = append(actualHints, hints) - } - testutil.Equals(t, c.expectedHints, actualHints) - } - - } - }) - } -} - // Regression test against: https://github.com/thanos-io/thanos/issues/2147. -func TestSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { +func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { tmpDir, err := ioutil.TempDir("", "segfault-series") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() @@ -1565,7 +1347,6 @@ func TestSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { defer func() { testutil.Ok(t, bkt.Close()) }() logger := log.NewNopLogger() - thanosMeta := metadata.Thanos{ Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), Downsample: metadata.ThanosDownsample{Resolution: 0}, @@ -1600,7 +1381,7 @@ func TestSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { for i := 0; i < numSeries; i++ { ts := int64(i) - lbls := labels.FromStrings("foo", "bar", "b", "1", "i", fmt.Sprintf("%07d%s", ts, postingsBenchSuffix)) + lbls := labels.FromStrings("foo", "bar", "b", "1", "i", fmt.Sprintf("%07d%s", ts, storetestutil.LabelLongSuffix)) _, err := app.Add(lbls, ts, 0) testutil.Ok(t, err) @@ -1638,7 +1419,7 @@ func TestSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { for i := 0; i < numSeries; i++ { ts := int64(i) - lbls := labels.FromStrings("foo", "bar", "b", "2", "i", fmt.Sprintf("%07d%s", ts, postingsBenchSuffix)) + lbls := labels.FromStrings("foo", "bar", "b", "2", "i", fmt.Sprintf("%07d%s", ts, storetestutil.LabelLongSuffix)) _, err := app.Add(lbls, ts, 0) testutil.Ok(t, err) @@ -1745,19 +1526,32 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { var ( logger = log.NewNopLogger() instrBkt = objstore.WithNoopInstr(bkt) + random = rand.New(rand.NewSource(120)) ) - // Create TSDB blocks. - block1, seriesSet1 := createBlockWithOneSample(tb, bktDir, 0, 2) - block2, seriesSet2 := createBlockWithOneSample(tb, bktDir, 1, 2) - + extLset := labels.Labels{{Name: "ext1", Value: "1"}} // Inject the Thanos meta to each block in the storage. thanosMeta := metadata.Thanos{ - Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), + Labels: extLset.Map(), Downsample: metadata.ThanosDownsample{Resolution: 0}, Source: metadata.TestSource, } + // Create TSDB blocks. + opts := storetestutil.HeadGenOptions{ + Dir: tmpDir, + SamplesPerSeries: 1, + Series: 2, + PrependLabels: extLset, + Random: random, + } + head, seriesSet1 := storetestutil.CreateHeadWithSeries(t, 0, opts) + block1 := createBlockFromHead(t, bktDir, head) + testutil.Ok(t, head.Close()) + head2, seriesSet2 := storetestutil.CreateHeadWithSeries(t, 1, opts) + block2 := createBlockFromHead(t, bktDir, head2) + testutil.Ok(t, head2.Close()) + for _, blockID := range []ulid.ULID{block1, block2} { _, err := metadata.InjectThanos(logger, filepath.Join(bktDir, blockID.String()), thanosMeta, nil) testutil.Ok(t, err) @@ -1791,18 +1585,18 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { testutil.Ok(tb, err) testutil.Ok(tb, store.SyncBlocks(context.Background())) - testCases := []*benchSeriesCase{ + testCases := []*storetestutil.SeriesCase{ { - name: "querying a range containing 1 block should return 1 block in the response hints", - req: &storepb.SeriesRequest{ + Name: "querying a range containing 1 block should return 1 block in the response hints", + Req: &storepb.SeriesRequest{ MinTime: 0, MaxTime: 1, Matchers: []storepb.LabelMatcher{ {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, }, }, - expectedSeries: seriesSet1, - expectedHints: []hintspb.SeriesResponseHints{ + ExpectedSeries: seriesSet1, + ExpectedHints: []hintspb.SeriesResponseHints{ { QueriedBlocks: []hintspb.Block{ {Id: block1.String()}, @@ -1810,16 +1604,16 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { }, }, }, { - name: "querying a range containing multiple blocks should return multiple blocks in the response hints", - req: &storepb.SeriesRequest{ + Name: "querying a range containing multiple blocks should return multiple blocks in the response hints", + Req: &storepb.SeriesRequest{ MinTime: 0, MaxTime: 3, Matchers: []storepb.LabelMatcher{ {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, }, }, - expectedSeries: append(append([]storepb.Series{}, seriesSet1...), seriesSet2...), - expectedHints: []hintspb.SeriesResponseHints{ + ExpectedSeries: append(append([]storepb.Series{}, seriesSet1...), seriesSet2...), + ExpectedHints: []hintspb.SeriesResponseHints{ { QueriedBlocks: []hintspb.Block{ {Id: block1.String()}, @@ -1828,8 +1622,8 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { }, }, }, { - name: "querying a range containing multiple blocks but filtering a specific block should query only the requested block", - req: &storepb.SeriesRequest{ + Name: "querying a range containing multiple blocks but filtering a specific block should query only the requested block", + Req: &storepb.SeriesRequest{ MinTime: 0, MaxTime: 3, Matchers: []storepb.LabelMatcher{ @@ -1841,8 +1635,8 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { }, }), }, - expectedSeries: seriesSet1, - expectedHints: []hintspb.SeriesResponseHints{ + ExpectedSeries: seriesSet1, + ExpectedHints: []hintspb.SeriesResponseHints{ { QueriedBlocks: []hintspb.Block{ {Id: block1.String()}, @@ -1852,7 +1646,7 @@ func TestSeries_RequestAndResponseHints(t *testing.T) { }, } - benchmarkSeries(tb, store, testCases) + storetestutil.TestServerSeries(tb, store, testCases...) } func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) { diff --git a/pkg/store/multitsdb.go b/pkg/store/multitsdb.go index b55535b0366..c8dd3c05a89 100644 --- a/pkg/store/multitsdb.go +++ b/pkg/store/multitsdb.go @@ -121,7 +121,7 @@ func (s *tenantSeriesSetServer) Series(store *TSDBStore, r *storepb.SeriesReques }) if err != nil { - if r.PartialResponseDisabled { + if r.PartialResponseDisabled || r.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT { s.err = errors.Wrapf(err, "get series for tenant %s", s.tenant) } else { // Consistently prefix tenant specific warnings as done in various other places. diff --git a/pkg/store/multitsdb_test.go b/pkg/store/multitsdb_test.go new file mode 100644 index 00000000000..a22a3af50de --- /dev/null +++ b/pkg/store/multitsdb_test.go @@ -0,0 +1,156 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package store + +import ( + "fmt" + "io/ioutil" + "math" + "math/rand" + "os" + "path/filepath" + "testing" + + "github.com/go-kit/kit/log" + "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/store/storepb" + storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestMultiTSDBSeries(t *testing.T) { + tb := testutil.NewTB(t) + storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { + if ok := t.Run("headOnly", func(t testutil.TB) { + benchMultiTSDBSeries(t, samplesPerSeries, series, false) + }); !ok { + return + } + t.Run("blocksOnly", func(t testutil.TB) { + benchMultiTSDBSeries(t, samplesPerSeries, series, true) + }) + }) +} + +func BenchmarkMultiTSDBSeries(b *testing.B) { + tb := testutil.NewTB(b) + storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) { + if ok := t.Run("headOnly", func(t testutil.TB) { + benchMultiTSDBSeries(t, samplesPerSeries, series, false) + }); !ok { + return + } + t.Run("blocksOnly", func(t testutil.TB) { + benchMultiTSDBSeries(t, samplesPerSeries, series, true) + }) + }) +} + +type mockedStartTimeDB struct { + *tsdb.DBReadOnly + startTime int64 +} + +func (db *mockedStartTimeDB) StartTime() (int64, error) { return db.startTime, nil } + +func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToBlocks bool) { + tmpDir, err := ioutil.TempDir("", "testorbench-multitsdbseries") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + const numOfTSDBs = 4 + + samplesPerSeriesPerTSDB := totalSamples / numOfTSDBs + if samplesPerSeriesPerTSDB == 0 { + samplesPerSeriesPerTSDB = 1 + } + seriesPerTSDB := totalSeries / numOfTSDBs + if seriesPerTSDB == 0 { + seriesPerTSDB = 1 + } + + var ( + dbs = make([]*mockedStartTimeDB, numOfTSDBs) + resps = make([][]*storepb.SeriesResponse, 4) + random = rand.New(rand.NewSource(120)) + logger = log.NewNopLogger() + ) + + defer func() { + for _, db := range dbs { + if db != nil { + testutil.Ok(t, db.Close()) + } + } + }() + for j := range dbs { + head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{ + Dir: tmpDir, + SamplesPerSeries: samplesPerSeriesPerTSDB, + Series: seriesPerTSDB, + WithWAL: true, + Random: random, + SkipChunks: t.IsBenchmark(), + }) + testutil.Ok(t, head.Close()) + + tsdbDir := filepath.Join(tmpDir, fmt.Sprintf("%d", j)) + + for i := 0; i < len(created); i++ { + resps[j] = append(resps[j], storepb.NewSeriesResponse(&created[i])) + } + + if flushToBlocks { + db, err := tsdb.OpenDBReadOnly(tsdbDir, logger) + testutil.Ok(t, err) + + testutil.Ok(t, db.FlushWAL(tmpDir)) + testutil.Ok(t, db.Close()) + } + + db, err := tsdb.OpenDBReadOnly(tsdbDir, logger) + testutil.Ok(t, err) + + dbs[j] = &mockedStartTimeDB{DBReadOnly: db, startTime: int64(j * samplesPerSeriesPerTSDB * seriesPerTSDB)} + } + + tsdbs := map[string]*TSDBStore{} + for i, db := range dbs { + tsdbs[fmt.Sprintf("%v", i)] = &TSDBStore{db: db, logger: logger, maxSamplesPerChunk: 120} // On production we have math.MaxInt64 + } + + store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]*TSDBStore { return tsdbs }) + + var expected []storepb.Series + lastLabels := storepb.Series{} + for _, resp := range resps { + for _, r := range resp { + // MultiTSDB same as Proxy will merge all series with same labels without limit (https://github.com/thanos-io/thanos/issues/2332). + // Let's do this here as well. + x := storepb.Series{Labels: r.GetSeries().Labels} + if x.String() == lastLabels.String() { + expected[len(expected)-1].Chunks = append(expected[len(expected)-1].Chunks, r.GetSeries().Chunks...) + continue + } + lastLabels = x + expected = append(expected, *r.GetSeries()) + } + } + + storetestutil.TestServerSeries(t, store, + &storetestutil.SeriesCase{ + Name: fmt.Sprintf("%d TSDBs with %d samples, %d series each", numOfTSDBs, samplesPerSeriesPerTSDB, seriesPerTSDB), + Req: &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: math.MaxInt64, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + ExpectedSeries: expected, + }, + ) +} diff --git a/pkg/store/postings_codec_test.go b/pkg/store/postings_codec_test.go index 7f224935570..602eb5a8ca7 100644 --- a/pkg/store/postings_codec_test.go +++ b/pkg/store/postings_codec_test.go @@ -14,7 +14,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/index" - + storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" "github.com/thanos-io/thanos/pkg/testutil" ) @@ -39,7 +39,7 @@ func TestDiffVarintCodec(t *testing.T) { postingsMap := map[string]index.Postings{ "all": allPostings(t, idx), - `n="1"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix)), + `n="1"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "n", "1"+storetestutil.LabelLongSuffix)), `j="foo"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "j", "foo")), `j!="foo"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo")), `i=~".*"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".*")), @@ -47,7 +47,7 @@ func TestDiffVarintCodec(t *testing.T) { `i=~"1.+"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "1.+")), `i=~"^$"'`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "^$")), `i!~""`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "i", "")), - `n!="2"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+postingsBenchSuffix)), + `n!="2"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+storetestutil.LabelLongSuffix)), `i!~"2.*"`: matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$")), } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index b5c310920ed..a14db1d5721 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -294,6 +294,10 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe return nil } + // TODO(bwplotka): Currently we stream into big frames. Consider ensuring 1MB maximum. + // This however does not matter much when used with QueryAPI. Matters for federated Queries a lot. + // https://github.com/thanos-io/thanos/issues/2332 + // Series are not necessarily merged across themselves. mergedSet := storepb.MergeSeriesSets(seriesSet...) for mergedSet.Next() { var series storepb.Series diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 83841a093d8..f3d85c27f43 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -7,12 +7,16 @@ import ( "context" "fmt" "io" + "io/ioutil" + "math" + "math/rand" "os" "sort" "testing" "time" "github.com/fortytw2/leaktest" + "github.com/go-kit/kit/log" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" "github.com/pkg/errors" @@ -20,6 +24,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/store/storepb" + storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil" "github.com/thanos-io/thanos/pkg/testutil" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -1511,3 +1516,140 @@ func TestMergeLabels(t *testing.T) { testutil.Equals(t, expected, resLabels) } + +func TestProxySeries(t *testing.T) { + tb := testutil.NewTB(t) + storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) { + benchProxySeries(t, samplesPerSeries, series) + }) +} + +func BenchmarkProxySeries(b *testing.B) { + tb := testutil.NewTB(b) + storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) { + benchProxySeries(t, samplesPerSeries, series) + }) +} + +func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { + tmpDir, err := ioutil.TempDir("", "testorbench-proxyseries") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() + + const numOfClients = 4 + + samplesPerSeriesPerClient := totalSamples / numOfClients + if samplesPerSeriesPerClient == 0 { + samplesPerSeriesPerClient = 1 + } + seriesPerClient := totalSeries / numOfClients + if seriesPerClient == 0 { + seriesPerClient = 1 + } + + random := rand.New(rand.NewSource(120)) + clients := make([]Client, numOfClients) + for j := range clients { + var resps []*storepb.SeriesResponse + + head, created := storetestutil.CreateHeadWithSeries(t, j, storetestutil.HeadGenOptions{ + Dir: tmpDir, + SamplesPerSeries: samplesPerSeriesPerClient, + Series: seriesPerClient, + MaxFrameBytes: storetestutil.RemoteReadFrameLimit, + Random: random, + SkipChunks: t.IsBenchmark(), + }) + testutil.Ok(t, head.Close()) + + for i := 0; i < len(created); i++ { + resps = append(resps, storepb.NewSeriesResponse(&created[i])) + } + + clients[j] = &testClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: resps, + }, + minTime: math.MinInt64, + maxTime: math.MaxInt64, + } + } + + logger := log.NewNopLogger() + store := &ProxyStore{ + logger: logger, + stores: func() []Client { return clients }, + metrics: newProxyStoreMetrics(nil), + responseTimeout: 0, + } + + var allResps []*storepb.SeriesResponse + var expected []storepb.Series + lastLabels := storepb.Series{} + for _, c := range clients { + m := c.(*testClient).StoreClient.(*mockedStoreAPI) + + for _, r := range m.RespSeries { + allResps = append(allResps, r) + + // Proxy will merge all series with same labels without limit (https://github.com/thanos-io/thanos/issues/2332). + // Let's do this here as well. + x := storepb.Series{Labels: r.GetSeries().Labels} + if x.String() == lastLabels.String() { + expected[len(expected)-1].Chunks = append(expected[len(expected)-1].Chunks, r.GetSeries().Chunks...) + continue + } + lastLabels = x + expected = append(expected, *r.GetSeries()) + } + + } + + chunkLen := len(allResps[len(allResps)-1].GetSeries().Chunks) + maxTime := allResps[len(allResps)-1].GetSeries().Chunks[chunkLen-1].MaxTime + storetestutil.TestServerSeries(t, store, + &storetestutil.SeriesCase{ + Name: fmt.Sprintf("%d client with %d samples, %d series each", numOfClients, samplesPerSeriesPerClient, seriesPerClient), + Req: &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: maxTime, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, + }, + ExpectedSeries: expected, + }, + ) + + // Change client to just one. + store.stores = func() []Client { + return []Client{&testClient{ + StoreClient: &mockedStoreAPI{ + // All responses. + RespSeries: allResps, + }, + labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext1", Value: "1"}}}}, + minTime: math.MinInt64, + maxTime: math.MaxInt64, + }} + } + + // In this we expect exactly the same response as input. + expected = expected[:0] + for _, r := range allResps { + expected = append(expected, *r.GetSeries()) + } + storetestutil.TestServerSeries(t, store, + &storetestutil.SeriesCase{ + Name: fmt.Sprintf("single client with %d samples, %d series", totalSamples, totalSeries), + Req: &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: maxTime, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}, + }, + }, + ExpectedSeries: expected, + }, + ) +} diff --git a/pkg/store/storepb/testutil/series.go b/pkg/store/storepb/testutil/series.go new file mode 100644 index 00000000000..3f28ec2c3c1 --- /dev/null +++ b/pkg/store/storepb/testutil/series.go @@ -0,0 +1,271 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storetestutil + +import ( + "context" + "fmt" + "math" + "math/rand" + "path/filepath" + "runtime" + "sort" + "testing" + + "github.com/gogo/protobuf/types" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/index" + "github.com/prometheus/prometheus/tsdb/wal" + "github.com/thanos-io/thanos/pkg/store/hintspb" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/testutil" +) + +const ( + // LabelLongSuffix is a label with ~50B in size, to emulate real-world high cardinality. + LabelLongSuffix = "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd" +) + +func allPostings(t testing.TB, ix tsdb.IndexReader) index.Postings { + k, v := index.AllPostingsKey() + p, err := ix.Postings(k, v) + testutil.Ok(t, err) + return p +} + +const RemoteReadFrameLimit = 1048576 + +type HeadGenOptions struct { + Dir string + SamplesPerSeries, Series int + + MaxFrameBytes int // No limit by default. + WithWAL bool + PrependLabels labels.Labels + SkipChunks bool + + Random *rand.Rand +} + +// CreateHeadWithSeries returns head filled with given samples and same series returned in separate list for assertion purposes. +// Returned series list has "ext1"="1" prepended. Each series looks as follows: +// {foo=bar,i=000001aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd} where number indicate sample number from 0. +// Returned series are frame in same way as remote read would frame them. +func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, []storepb.Series) { + if opts.SamplesPerSeries < 1 || opts.Series < 1 { + t.Fatal("samples and series has to be 1 or more") + } + + tsdbDir := filepath.Join(opts.Dir, fmt.Sprintf("%d", j)) + fmt.Printf("Creating %d %d-sample series in %s\n", opts.Series, opts.SamplesPerSeries, tsdbDir) + + var w *wal.WAL + var err error + if opts.WithWAL { + w, err = wal.New(nil, nil, filepath.Join(tsdbDir, "wal"), true) + testutil.Ok(t, err) + } + + h, err := tsdb.NewHead(nil, nil, w, 10000000, tsdbDir, nil, tsdb.DefaultStripeSize, nil) + testutil.Ok(t, err) + + app := h.Appender() + for i := 0; i < opts.Series; i++ { + ts := int64(j*opts.Series*opts.SamplesPerSeries + i*opts.SamplesPerSeries) + ref, err := app.Add(labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", ts, LabelLongSuffix)), ts, opts.Random.Float64()) + testutil.Ok(t, err) + + for is := 1; is < opts.SamplesPerSeries; is++ { + testutil.Ok(t, app.AddFast(ref, ts+int64(is), opts.Random.Float64())) + } + } + testutil.Ok(t, app.Commit()) + + // Use TSDB and get all series for assertion. + chks, err := h.Chunks() + testutil.Ok(t, err) + defer func() { testutil.Ok(t, chks.Close()) }() + + ir, err := h.Index() + testutil.Ok(t, err) + defer func() { testutil.Ok(t, ir.Close()) }() + + var ( + lset labels.Labels + chunkMetas []chunks.Meta + expected = make([]storepb.Series, 0, opts.Series) + sBytes int + ) + + all := allPostings(t, ir) + for all.Next() { + testutil.Ok(t, ir.Series(all.At(), &lset, &chunkMetas)) + i := 0 + sLset := storepb.PromLabelsToLabels(lset) + expected = append(expected, storepb.Series{Labels: append(storepb.PromLabelsToLabels(opts.PrependLabels), sLset...)}) + + if opts.SkipChunks { + continue + } + + lBytes := 0 + for _, l := range sLset { + lBytes += l.Size() + } + sBytes = lBytes + + for { + c := chunkMetas[i] + i++ + + chEnc, err := chks.Chunk(c.Ref) + testutil.Ok(t, err) + + // Open Chunk. + if c.MaxTime == math.MaxInt64 { + c.MaxTime = c.MinTime + int64(chEnc.NumSamples()) - 1 + } + + sBytes += len(chEnc.Bytes()) + + expected[len(expected)-1].Chunks = append(expected[len(expected)-1].Chunks, storepb.AggrChunk{ + MinTime: c.MinTime, + MaxTime: c.MaxTime, + Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chEnc.Bytes()}, + }) + if i >= len(chunkMetas) { + break + } + + // Compose many frames as remote read (so sidecar StoreAPI) would do if requested by maxFrameBytes. + if opts.MaxFrameBytes > 0 && sBytes >= opts.MaxFrameBytes { + expected = append(expected, storepb.Series{Labels: sLset}) + sBytes = lBytes + } + } + } + testutil.Ok(t, all.Err()) + return h, expected +} + +// SeriesServer is test gRPC storeAPI series server. +type SeriesServer struct { + // This field just exist to pseudo-implement the unused methods of the interface. + storepb.Store_SeriesServer + + ctx context.Context + + SeriesSet []storepb.Series + Warnings []string + HintsSet []*types.Any + + Size int64 +} + +func NewSeriesServer(ctx context.Context) *SeriesServer { + return &SeriesServer{ctx: ctx} +} + +func (s *SeriesServer) Send(r *storepb.SeriesResponse) error { + s.Size += int64(r.Size()) + + if r.GetWarning() != "" { + s.Warnings = append(s.Warnings, r.GetWarning()) + return nil + } + + if r.GetSeries() != nil { + s.SeriesSet = append(s.SeriesSet, *r.GetSeries()) + return nil + } + + if r.GetHints() != nil { + s.HintsSet = append(s.HintsSet, r.GetHints()) + return nil + } + // Unsupported field, skip. + return nil +} + +func (s *SeriesServer) Context() context.Context { + return s.ctx +} + +func RunSeriesInterestingCases(t testutil.TB, maxSamples, maxSeries int, f func(t testutil.TB, samplesPerSeries, series int)) { + for _, tc := range []struct { + samplesPerSeries int + series int + }{ + { + samplesPerSeries: 1, + series: maxSeries, + }, + { + samplesPerSeries: maxSamples / (maxSeries / 10), + series: maxSeries / 10, + }, + { + samplesPerSeries: maxSamples, + series: 1, + }, + } { + if ok := t.Run(fmt.Sprintf("%dSeriesWith%dSamples", tc.series, tc.samplesPerSeries), func(t testutil.TB) { + f(t, tc.samplesPerSeries, tc.series) + }); !ok { + return + } + runtime.GC() + } +} + +// SeriesCase represents single test/benchmark case for testing storepb series. +type SeriesCase struct { + Name string + Req *storepb.SeriesRequest + + // Exact expectations are checked only for tests. For benchmarks only length is assured. + ExpectedSeries []storepb.Series + ExpectedWarnings []string + ExpectedHints []hintspb.SeriesResponseHints +} + +// TestServerSeries runs tests against given cases. +func TestServerSeries(t testutil.TB, store storepb.StoreServer, cases ...*SeriesCase) { + for _, c := range cases { + t.Run(c.Name, func(t testutil.TB) { + t.ResetTimer() + for i := 0; i < t.N(); i++ { + srv := NewSeriesServer(context.Background()) + testutil.Ok(t, store.Series(c.Req, srv)) + testutil.Equals(t, len(c.ExpectedWarnings), len(srv.Warnings), "%v", srv.Warnings) + testutil.Equals(t, len(c.ExpectedSeries), len(srv.SeriesSet)) + testutil.Equals(t, len(c.ExpectedHints), len(srv.HintsSet)) + + if !t.IsBenchmark() { + if len(c.ExpectedSeries) == 1 { + // For bucketStoreAPI chunks are not sorted within response. TODO: Investigate: Is this fine? + sort.Slice(srv.SeriesSet[0].Chunks, func(i, j int) bool { + return srv.SeriesSet[0].Chunks[i].MinTime < srv.SeriesSet[0].Chunks[j].MinTime + }) + } + + testutil.Equals(t, c.ExpectedSeries[0].Chunks[0], srv.SeriesSet[0].Chunks[0]) + + // This might give unreadable output for millions of series on fail.. + testutil.Equals(t, c.ExpectedSeries, srv.SeriesSet) + + var actualHints []hintspb.SeriesResponseHints + for _, anyHints := range srv.HintsSet { + hints := hintspb.SeriesResponseHints{} + testutil.Ok(t, types.UnmarshalAny(anyHints, &hints)) + actualHints = append(actualHints, hints) + } + testutil.Equals(t, c.ExpectedHints, actualHints) + } + } + }) + } +} diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index b06cb3ac3ef..2b6055952e4 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -12,7 +12,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -23,14 +23,20 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" ) +type TSDBReader interface { + storage.Queryable + StartTime() (int64, error) +} + // TSDBStore implements the store API against a local TSDB instance. // It attaches the provided external labels to all results. It only responds with raw data // and does not support downsampling. type TSDBStore struct { - logger log.Logger - db *tsdb.DB - component component.StoreAPI - externalLabels labels.Labels + logger log.Logger + db TSDBReader + component component.StoreAPI + externalLabels labels.Labels + maxSamplesPerChunk int } // ReadWriteTSDBStore is a TSDBStore that can also be written to. @@ -40,7 +46,7 @@ type ReadWriteTSDBStore struct { } // NewTSDBStore creates a new TSDBStore. -func NewTSDBStore(logger log.Logger, _ prometheus.Registerer, db *tsdb.DB, component component.StoreAPI, externalLabels labels.Labels) *TSDBStore { +func NewTSDBStore(logger log.Logger, _ prometheus.Registerer, db TSDBReader, component component.StoreAPI, externalLabels labels.Labels) *TSDBStore { if logger == nil { logger = log.NewNopLogger() } @@ -49,20 +55,27 @@ func NewTSDBStore(logger log.Logger, _ prometheus.Registerer, db *tsdb.DB, compo db: db, component: component, externalLabels: externalLabels, + // NOTE: XOR encoding supports a max size of 2^16 - 1 samples, so we need + // to chunk all samples into groups of no more than 2^16 - 1 + // See: https://github.com/thanos-io/thanos/pull/1038. + // TODO(bwplotka): Consider 120 samples? + maxSamplesPerChunk: math.MaxInt64, } } // Info returns store information about the Prometheus instance. func (s *TSDBStore) Info(_ context.Context, _ *storepb.InfoRequest) (*storepb.InfoResponse, error) { + minTime, err := s.db.StartTime() + if err != nil { + return nil, errors.Wrap(err, "TSDB min Time") + } + res := &storepb.InfoResponse{ Labels: make([]storepb.Label, 0, len(s.externalLabels)), StoreType: s.component.ToProto(), - MinTime: 0, + MinTime: minTime, MaxTime: math.MaxInt64, } - if blocks := s.db.Blocks(); len(blocks) > 0 { - res.MinTime = blocks[0].Meta().MinTime - } for _, l := range s.externalLabels { res.Labels = append(res.Labels, storepb.Label{ Name: l.Name, @@ -120,12 +133,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer if !r.SkipChunks { // TODO(fabxc): An improvement over this trivial approach would be to directly // use the chunks provided by TSDB in the response. - // But since the sidecar has a similar approach, optimizing here has only - // limited benefit for now. - // NOTE: XOR encoding supports a max size of 2^16 - 1 samples, so we need - // to chunk all samples into groups of no more than 2^16 - 1 - // See: https://github.com/thanos-io/thanos/pull/1038. - c, err := s.encodeChunks(series.Iterator(), math.MaxUint16) + c, err := s.encodeChunks(series.Iterator(), s.maxSamplesPerChunk) if err != nil { return status.Errorf(codes.Internal, "encode chunk: %s", err) }