diff --git a/pkg/query/iter_bench_test.go b/pkg/query/iter_bench_test.go
new file mode 100644
index 0000000000..f55a677e61
--- /dev/null
+++ b/pkg/query/iter_bench_test.go
@@ -0,0 +1,226 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package query
+
+import (
+	"fmt"
+	"math"
+	"math/rand"
+	"testing"
+
+	"github.com/go-kit/kit/log"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/tsdb/chunks"
+	"github.com/thanos-io/thanos/pkg/store/storepb"
+	"github.com/thanos-io/thanos/pkg/testutil"
+	"github.com/thanos-io/thanos/pkg/testutil/benchutil"
+)
+
+func TestStoreSeriesSet(t *testing.T) {
+	tb := testutil.NewTB(t)
+	tb.Run(benchutil.OneSampleSeriesSubTestName(200e3), func(tb testutil.TB) {
+		benchStoreSeriesSet(tb, 200e3, benchutil.SeriesDimension)
+	})
+	tb.Run(benchutil.OneSeriesManySamplesSubTestName(200e3), func(tb testutil.TB) {
+		benchStoreSeriesSet(tb, 200e3, benchutil.SamplesDimension)
+	})
+}
+
+func BenchmarkStoreSeriesSet(b *testing.B) {
+	tb := testutil.NewTB(b)
+	tb.Run(benchutil.OneSampleSeriesSubTestName(10e6), func(tb testutil.TB) {
+		benchStoreSeriesSet(tb, 10e6, benchutil.SeriesDimension)
+	})
+	tb.Run(benchutil.OneSeriesManySamplesSubTestName(100e6), func(tb testutil.TB) {
+		// 100e6 samples = ~17361 days with 15s scrape.
+		benchStoreSeriesSet(tb, 100e6, benchutil.SamplesDimension)
+	})
+}
+
+func benchStoreSeriesSet(t testutil.TB, number int, dimension benchutil.Dimension) {
+	const numOfClients = 4
+	var (
+		numberPerClient = number / numOfClients
+		random          = rand.New(rand.NewSource(120))
+	)
+
+	// Build numOfClients clients.
+	clients := make([]Client, numOfClients)
+
+	for j := range clients {
+		var resps []*storepb.SeriesResponse
+
+		switch dimension {
+		case benchutil.SeriesDimension:
+			fmt.Println("Building client with numSeries:", numberPerClient)
+
+			h, created := benchutil.CreateSeriesWithOneSample(t, j, numberPerClient)
+			testutil.Ok(t, h.Close())
+
+			for i := 0; i < len(created); i++ {
+				resps = append(resps, storepb.NewSeriesResponse(&created[i]))
+			}
+
+			clients[j] = &testClient{
+				StoreClient: &mockedStoreAPI{
+					RespSeries: resps,
+				},
+				minTime: math.MinInt64,
+				maxTime: math.MaxInt64,
+			}
+		case benchutil.SamplesDimension:
+			fmt.Println("Building client with one series with numSamples:", numberPerClient)
+
+			lblsSize := 0
+			for _, l := range benchutil.SingleSeries.Labels {
+				lblsSize += l.Size()
+			}
+			func() {
+				h := benchutil.CreateOneSeriesWithManySamples(t, j, numberPerClient, random)
+				defer func() { testutil.Ok(t, h.Close()) }()
+
+				chks, err := h.Chunks()
+				testutil.Ok(t, err)
+
+				ir, err := h.Index()
+				testutil.Ok(t, err)
+				defer ir.Close()
+
+				var (
+					lset       labels.Labels
+					chunkMetas []chunks.Meta
+					sBytes     = lblsSize
+				)
+
+				all := allPostings(t, ir)
+				for all.Next() {
+					testutil.Ok(t, ir.Series(all.At(), &lset, &chunkMetas))
+
+					i := 0
+					r := storepb.NewSeriesResponse(&storepb.Series{
+						Labels: storepb.PromLabelsToLabelsUnsafe(lset),
+					})
+					for {
+						c := chunkMetas[i]
+						i++
+
+						chBytes, err := chks.Chunk(c.Ref)
+						testutil.Ok(t, err)
+
+						sBytes += len(chBytes.Bytes())
+
+						r.GetSeries().Chunks = append(r.GetSeries().Chunks, storepb.AggrChunk{
+							MinTime: c.MinTime,
+							MaxTime: c.MaxTime,
+							Raw:     &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chBytes.Bytes()},
+						})
+
+						// Compose many frames, as remote read (and thus the sidecar StoreAPI) would: cut a new frame at 1048576 bytes (~1MiB).
+						if i >= len(chunkMetas) || sBytes >= 1048576 {
+							resps = append(resps, r)
+							r = storepb.NewSeriesResponse(&storepb.Series{
+								Labels: storepb.PromLabelsToLabelsUnsafe(lset),
+							})
+							sBytes = lblsSize
+						}
+						if i >= len(chunkMetas) {
+							break
+						}
+					}
+				}
+				testutil.Ok(t, all.Err())
+
+				clients[j] = &testClient{
+					StoreClient: &mockedStoreAPI{
+						RespSeries: resps,
+					},
+					minTime: math.MinInt64,
+					maxTime: math.MaxInt64,
+				}
+			}()
+
+		default:
+			t.Fatal("unknown dimension", dimension)
+		}
+	}
+
+	logger := log.NewNopLogger()
+	store := &ProxyStore{
+		logger:          logger,
+		stores:          func() []Client { return clients },
+		metrics:         newProxyStoreMetrics(nil),
+		responseTimeout: 0,
+	}
+
+	var resps []*storepb.SeriesResponse
+	var expected []storepb.Series
+	lastLabels := storepb.Series{}
+	for _, c := range clients {
+		m := c.(*testClient).StoreClient.(*mockedStoreAPI)
+
+		for _, r := range m.RespSeries {
+			resps = append(resps, r)
+
+			// Proxy will merge all series with the same labels without limit (https://github.com/thanos-io/thanos/issues/2332).
+			// Let's do this here as well.
+			x := storepb.Series{Labels: r.GetSeries().Labels}
+			if x.String() == lastLabels.String() {
+				expected[len(expected)-1].Chunks = append(expected[len(expected)-1].Chunks, r.GetSeries().Chunks...)
+				continue
+			}
+			lastLabels = x
+			expected = append(expected, *r.GetSeries())
+		}
+	}
+
+	chunkLen := len(resps[len(resps)-1].GetSeries().Chunks)
+	maxTime := resps[len(resps)-1].GetSeries().Chunks[chunkLen-1].MaxTime
+	benchmarkSeries(t, store,
+		&benchSeriesCase{
+			name: fmt.Sprintf("%d clients with %d each, total %d", numOfClients, numberPerClient, number),
+			req: &storepb.SeriesRequest{
+				MinTime: 0,
+				MaxTime: maxTime,
+				Matchers: []storepb.LabelMatcher{
+					{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
+				},
+			},
+			expected: expected,
+		},
+	)
+
+	// Change to just one client.
+	store.stores = func() []Client {
+		return []Client{&testClient{
+			StoreClient: &mockedStoreAPI{
+				// All responses.
+				RespSeries: resps,
+			},
+			labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext1", Value: "1"}}}},
+			minTime:   math.MinInt64,
+			maxTime:   math.MaxInt64,
+		}}
+	}
+
+	// Here we expect exactly the same response as the input.
+	expected = expected[:0]
+	for _, r := range resps {
+		expected = append(expected, *r.GetSeries())
+	}
+	benchmarkSeries(t, store,
+		&benchSeriesCase{
+			name: fmt.Sprintf("single client with %d", number),
+			req: &storepb.SeriesRequest{
+				MinTime: 0,
+				MaxTime: maxTime,
+				Matchers: []storepb.LabelMatcher{
+					{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
+				},
+			},
+			expected: expected,
+		},
+	)
+}
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 3c799e26f3..21b51895a4 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -38,10 +38,8 @@ import (
 	"github.com/prometheus/prometheus/pkg/relabel"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb"
-	"github.com/prometheus/prometheus/tsdb/chunkenc"
-	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/encoding"
-	"github.com/prometheus/prometheus/tsdb/index"
+	"github.com/thanos-io/thanos/pkg/store/storepb/storetestutil"
 	"gopkg.in/yaml.v2"
 
 	"github.com/thanos-io/thanos/pkg/block"
@@ -1023,11 +1021,6 @@ func BenchmarkBucketIndexReader_ExpandedPostings(b *testing.B) {
 	benchmarkExpandedPostings(tb, bkt, id, r, 50e5)
 }
 
-// Make entries ~50B in size, to emulate real-world high cardinality.
-const (
-	postingsBenchSuffix = "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd"
-)
-
 func uploadTestBlock(t testing.TB, tmpDir string, bkt objstore.Bucket, series int) ulid.ULID {
 	h, err := tsdb.NewHead(nil, nil, nil, 1000, tmpDir, nil, tsdb.DefaultStripeSize, nil)
 	testutil.Ok(t, err)
@@ -1062,12 +1055,12 @@ func appendTestData(t testing.TB, app storage.Appender, series int) {
 	series = series / 5
 	for n := 0; n < 10; n++ {
 		for i := 0; i < series/10; i++ {
-			addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "foo"))
+			addSeries(labels.FromStrings("i", strconv.Itoa(i)+storetestutil.LabelLongSuffix, "n", strconv.Itoa(n)+storetestutil.LabelLongSuffix, "j", "foo"))
 			// Have some series that won't be matched, to properly test inverted matches.
-			addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", strconv.Itoa(n)+postingsBenchSuffix, "j", "bar"))
-			addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "0_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar"))
-			addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "1_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "bar"))
-			addSeries(labels.FromStrings("i", strconv.Itoa(i)+postingsBenchSuffix, "n", "2_"+strconv.Itoa(n)+postingsBenchSuffix, "j", "foo"))
+			addSeries(labels.FromStrings("i", strconv.Itoa(i)+storetestutil.LabelLongSuffix, "n", strconv.Itoa(n)+storetestutil.LabelLongSuffix, "j", "bar"))
+			addSeries(labels.FromStrings("i", strconv.Itoa(i)+storetestutil.LabelLongSuffix, "n", "0_"+strconv.Itoa(n)+storetestutil.LabelLongSuffix, "j", "bar"))
+			addSeries(labels.FromStrings("i", strconv.Itoa(i)+storetestutil.LabelLongSuffix, "n", "1_"+strconv.Itoa(n)+storetestutil.LabelLongSuffix, "j", "bar"))
+			addSeries(labels.FromStrings("i", strconv.Itoa(i)+storetestutil.LabelLongSuffix, "n", "2_"+strconv.Itoa(n)+storetestutil.LabelLongSuffix, "j", "foo"))
 		}
 	}
 	testutil.Ok(t, app.Commit())
@@ -1095,7 +1088,7 @@ func benchmarkExpandedPostings(
 	r indexheader.Reader,
 	series int,
 ) {
-	n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix)
+	n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+storetestutil.LabelLongSuffix)
 
 	jFoo := labels.MustNewMatcher(labels.MatchEqual, "j", "foo")
 	jNotFoo := labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo")
@@ -1105,9 +1098,9 @@ func benchmarkExpandedPostings(
 	i1Plus := labels.MustNewMatcher(labels.MatchRegexp, "i", "^1.+$")
 	iEmptyRe := labels.MustNewMatcher(labels.MatchRegexp, "i", "^$")
 	iNotEmpty := labels.MustNewMatcher(labels.MatchNotEqual, "i", "")
-	iNot2 := labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+postingsBenchSuffix)
+	iNot2 := labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+storetestutil.LabelLongSuffix)
 	iNot2Star := labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$")
-	iRegexSet := labels.MustNewMatcher(labels.MatchRegexp, "i", "0"+postingsBenchSuffix+"|1"+postingsBenchSuffix+"|2"+postingsBenchSuffix)
+	iRegexSet := labels.MustNewMatcher(labels.MatchRegexp, "i", "0"+storetestutil.LabelLongSuffix+"|1"+storetestutil.LabelLongSuffix+"|2"+storetestutil.LabelLongSuffix)
 	series = series / 5
 
 	cases := []struct {
@@ -1158,106 +1151,25 @@ func benchmarkExpandedPostings(
 	}
 }
 
-func newSeries(t testing.TB, lset labels.Labels, smplChunks [][]sample) storepb.Series {
-	var s storepb.Series
-
-	for _, l := range lset {
-		s.Labels = append(s.Labels, storepb.Label{Name: l.Name, Value: l.Value})
-	}
-
-	for _, smpls := range smplChunks {
-		c := chunkenc.NewXORChunk()
-		a, err := c.Appender()
-		testutil.Ok(t, err)
-
-		for _, smpl := range smpls {
-			a.Append(smpl.t, smpl.v)
-		}
-
-		ch := storepb.AggrChunk{
-			MinTime: smpls[0].t,
-			MaxTime: smpls[len(smpls)-1].t,
-			Raw:     &storepb.Chunk{Type: storepb.Chunk_XOR, Data: c.Bytes()},
-		}
-
-		s.Chunks = append(s.Chunks, ch)
-	}
-	return s
-}
-
-func TestSeries(t *testing.T) {
+func TestBucketSeries(t *testing.T) {
 	tb := testutil.NewTB(t)
-	tb.Run("200e3SeriesWithOneSample", func(tb testutil.TB) {
-		benchSeries(tb, 200e3, seriesDimension, 200e3)
-	})
-	tb.Run("OneSeriesWith200e3Samples", func(tb testutil.TB) {
-		benchSeries(tb, 200e3, samplesDimension, 200e3)
+	storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
+		benchBucketSeries(t, samplesPerSeries, series, 1)
 	})
 }
 
-func BenchmarkSeries(b *testing.B) {
+func BenchmarkBucketSeries(b *testing.B) {
 	tb := testutil.NewTB(b)
-	tb.Run("10e6SeriesWithOneSample", func(tb testutil.TB) {
-		benchSeries(tb, 10e6, seriesDimension, 1, 10, 10e1, 10e2, 10e3, 10e4, 10e5) // This is too big for my machine: 10e6.
-	})
-	tb.Run("OneSeriesWith100e6Samples", func(tb testutil.TB) {
-		// 100e6 samples = ~17361 days with 15s scrape.
-		benchSeries(tb, 100e6, samplesDimension, 1, 10, 10e1, 10e2, 10e3, 10e4, 10e5, 10e6) // This is too big for my machine: 100e6.
+	// 10e6 samples = ~1736 days with 15s scrape.
+	storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) {
+		benchBucketSeries(t, samplesPerSeries, series, 1/100e6, 1/10e4, 1)
 	})
 }
 
-func createBlockWithOneSample(t testutil.TB, dir string, blockIndex int, totalSeries int) (ulid.ULID, []storepb.Series) {
-	fmt.Println("Building block with numSeries:", totalSeries)
+func benchBucketSeries(t testutil.TB, totalSamples, totalSeries int, requestedRatios ...float64) {
+	const numOfBlocks = 4
 
-	var series []storepb.Series
-	h, err := tsdb.NewHead(nil, nil, nil, 1, dir, nil, tsdb.DefaultStripeSize, nil)
-	testutil.Ok(t, err)
-	defer func() { testutil.Ok(t, h.Close()) }()
-
-	app := h.Appender()
-
-	for i := 0; i < totalSeries; i++ {
-		ts := int64(blockIndex*totalSeries + i)
-		lbls := labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", ts, postingsBenchSuffix))
-		series = append(series, newSeries(t, append(labels.Labels{{Name: "ext1", Value: "1"}}, lbls...), [][]sample{{sample{t: ts, v: 0}}}))
-
-		_, err := app.Add(lbls, ts, 0)
-		testutil.Ok(t, err)
-	}
-	testutil.Ok(t, app.Commit())
-
-	return createBlockFromHead(t, dir, h), series
-}
-
-func createBlockWithOneSeries(t testutil.TB, dir string, lbls labels.Labels, blockIndex int, totalSamples int, random *rand.Rand) ulid.ULID {
-	fmt.Println("Building block with one series with numSamples:", totalSamples)
-
-	h, err := tsdb.NewHead(nil, nil, nil, int64(totalSamples), dir, nil, tsdb.DefaultStripeSize, nil)
-	testutil.Ok(t, err)
-	defer func() { testutil.Ok(t, h.Close()) }()
-
-	app := h.Appender()
-
-	ref, err := app.Add(lbls, int64(blockIndex*totalSamples), random.Float64())
-	testutil.Ok(t, err)
-	for i := 1; i < totalSamples; i++ {
-		ts := int64(blockIndex*totalSamples + i)
-		testutil.Ok(t, app.AddFast(ref, ts, random.Float64()))
-	}
-	testutil.Ok(t, app.Commit())
-
-	return createBlockFromHead(t, dir, h)
-}
-
-type Dimension string
-
-const (
-	seriesDimension  = Dimension("series")
-	samplesDimension = Dimension("samples")
-)
-
-func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) {
-	tmpDir, err := ioutil.TempDir("", "testorbench-series")
+	tmpDir, err := ioutil.TempDir("", "testorbench-bucketseries")
 	testutil.Ok(t, err)
 	defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()
@@ -1272,17 +1184,6 @@ func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) {
 		random = rand.New(rand.NewSource(120))
 	)
 
-	numberPerBlock := number / 4
-	lbls := labels.FromStrings("foo", "bar", "i", postingsBenchSuffix)
-	switch dimension {
-	case seriesDimension:
-		series = make([]storepb.Series, 0, 4*numberPerBlock)
-	case samplesDimension:
-		series = []storepb.Series{newSeries(t, append(labels.Labels{{Name: "ext1", Value: "1"}}, lbls...), nil)}
-	default:
-		t.Fatal("unknown dimension", dimension)
-	}
-
 	thanosMeta := metadata.Thanos{
 		Labels:     labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
 		Downsample: metadata.ThanosDownsample{Resolution: 0},
@@ -1296,106 +1197,25 @@ func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) {
 	if !t.IsBenchmark() {
 		chunkPool = &mockedPool{parent: chunkPool}
 	}
 	blockDir := filepath.Join(tmpDir, "tmp")
 
-	var preBuildBlockIDs []ulid.ULID
-	// Local dev optimization to fetch those big blocks, instead of recreating.
-	// We cannot really commit this to Git (2GB).
-	// TODO(bwplotka): Provide them in objstore instead?.
-	if t.IsBenchmark() {
-		switch dimension {
-		case seriesDimension:
-			p := filepath.Join(".", "test-data", "10e6seriesOneSample")
-			if _, err := os.Stat(p); err == nil {
-				blockDir = p
-			}
-		case samplesDimension:
-			p := filepath.Join(".", "test-data", "1series100e6Samples")
-			if _, err := os.Stat(p); err == nil {
-				blockDir = p
-			}
-		}
-
-		info, err := ioutil.ReadDir(blockDir)
-		if err == nil {
-			for _, d := range info {
-				if !d.IsDir() {
-					continue
-				}
-
-				id, err := ulid.Parse(d.Name())
-				if err != nil {
-					continue
-				}
-
-				preBuildBlockIDs = append(preBuildBlockIDs, id)
-			}
-		}
+	samplesPerSeriesPerBlock := totalSamples / numOfBlocks
+	if samplesPerSeriesPerBlock == 0 {
+		samplesPerSeriesPerBlock = 1
+	}
+	seriesPerBlock := totalSeries / numOfBlocks
+	if seriesPerBlock == 0 {
+		seriesPerBlock = 1
 	}
 
-	for bi := 0; bi < 4; bi++ {
-		var bSeries []storepb.Series
-
-		var id ulid.ULID
-		switch dimension {
-		case seriesDimension:
-			if len(preBuildBlockIDs) > 0 {
-				id = preBuildBlockIDs[bi]
-				fmt.Println("Using pre-build block:", id)
-				break
-			}
-			// Create 4 blocks. Each will have numSeriesPerBlock number of series that have 1 sample only.
-			// Timestamp will be counted for each new series, so each series will have unique timestamp.
-			// This allows to pick time range that will correspond to number of series picked 1:1.
-			id, bSeries = createBlockWithOneSample(t, blockDir, bi, numberPerBlock)
-			series = append(series, bSeries...)
-		case samplesDimension:
-			if len(preBuildBlockIDs) > 0 {
-				id = preBuildBlockIDs[bi]
-				fmt.Println("Using pre-build block:", id)
-			} else {
-				// Create 4 blocks. Each will have numSeriesPerBlock number of series that have 1 sample only.
-				// Timestamp will be counted for each new series, so each series will have unique timestamp.
-				// This allows to pick time range that will correspond to number of series picked 1:1.
-				id = createBlockWithOneSeries(t, blockDir, lbls, bi, numberPerBlock, random)
-			}
-
-			if !t.IsBenchmark() {
-				// Reread chunks for ref.
-				indexr, err := index.NewFileReader(filepath.Join(blockDir, id.String(), "index"))
-				testutil.Ok(t, err)
-				b, err := chunks.NewDirReader(filepath.Join(blockDir, id.String(), "chunks"), nil)
-				testutil.Ok(t, err)
-
-				k, v := index.AllPostingsKey()
-				all, err := indexr.Postings(k, v)
-				testutil.Ok(t, err)
-
-				p, err := index.ExpandPostings(all)
-				testutil.Ok(t, err)
-
-				// One series expected.
-				testutil.Equals(t, 1, len(p))
-				l := labels.Labels{}
-				chs := []chunks.Meta{}
-				testutil.Ok(t, indexr.Series(p[0], &l, &chs))
-
-				for _, c := range chs {
-					raw, err := b.Chunk(c.Ref)
-					testutil.Ok(t, err)
-
-					series[0].Chunks = append(series[0].Chunks, storepb.AggrChunk{
-						MaxTime: c.MaxTime,
-						MinTime: c.MinTime,
-						Raw: &storepb.Chunk{
-							Data: raw.Bytes(),
-							Type: storepb.Chunk_XOR,
-						},
-					})
-				}
-			}
-		}
+	// Create 4 blocks. Each will have seriesPerBlock number of series that have samplesPerSeriesPerBlock samples.
+	// Timestamps are counted up for each new series and each new sample, so each series has a unique timestamp.
+	// This allows picking a time range that corresponds 1:1 to the number of series picked.
+	for bi := 0; bi < numOfBlocks; bi++ {
+		head, bSeries := storetestutil.CreateHeadWithSeries(t, bi, samplesPerSeriesPerBlock, seriesPerBlock, random)
+		id := createBlockFromHead(t, blockDir, head)
+		testutil.Ok(t, head.Close())
+		series = append(series, bSeries...)
 
 		meta, err := metadata.InjectThanos(log.NewNopLogger(), filepath.Join(blockDir, id.String()), thanosMeta, nil)
 		testutil.Ok(t, err)
@@ -1431,36 +1251,31 @@ func benchSeries(t testutil.TB, number int, dimension Dimension, cases ...int) {
 		testutil.Ok(t, err)
 	}
 
-	var bCases []*benchSeriesCase
-	for _, c := range cases {
-		var expected []storepb.Series
-
-		switch dimension {
-		case seriesDimension:
-			expected = series[:c]
-		case samplesDimension:
-			expected = series
+	var bCases []*storetestutil.SeriesCase
+	for _, p := range requestedRatios {
+		seriesCut := int(p * float64(totalSeries))
+		if seriesCut == 0 {
+			seriesCut = 1
 		}
-
-		bCases = append(bCases, &benchSeriesCase{
-			name: fmt.Sprintf("%dof%d", c, 4*numberPerBlock),
-			req: &storepb.SeriesRequest{
+		allCut := int(p * float64(totalSeries*totalSamples))
+		bCases = append(bCases, &storetestutil.SeriesCase{
+			Name: fmt.Sprintf("%dof%d", allCut, totalSeries*totalSamples),
+			Req: &storepb.SeriesRequest{
 				MinTime: 0,
-				MaxTime: int64(c) - 1,
+				MaxTime: int64(allCut) - 1,
 				Matchers: []storepb.LabelMatcher{
 					{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
 				},
 			},
-			expectedSeries: expected,
+			// This does not cut chunks properly; chunks are asserted only in non-benchmark runs, where only the 100% case is used.
+			ExpectedSeries: series[:seriesCut],
 		})
 	}
+	storetestutil.TestServerSeries(t, store, bCases...)
 
-	fmt.Println("Starting")
-
-	benchmarkSeries(t, store, bCases)
 	if !t.IsBenchmark() {
 		// Make sure the pool is correctly used. This is expected for 200k numbers.
-		testutil.Equals(t, 4, int(chunkPool.(*mockedPool).gets))
+		testutil.Equals(t, numOfBlocks, int(chunkPool.(*mockedPool).gets))
 		// TODO(bwplotka): This is super negative for a large number of samples (1mln). Investigate.
 		testutil.Equals(t, 0, int(chunkPool.(*mockedPool).balance))
 
 		chunkPool.(*mockedPool).gets = 0
@@ -1513,49 +1328,8 @@ type noopLimiter struct{}
 
 func (noopLimiter) Check(uint64) error { return nil }
 
-type benchSeriesCase struct {
-	name           string
-	req            *storepb.SeriesRequest
-	expectedSeries []storepb.Series
-	expectedHints  []hintspb.SeriesResponseHints
-}
-
-func benchmarkSeries(t testutil.TB, store *BucketStore, cases []*benchSeriesCase) {
-	for _, c := range cases {
-		t.Run(c.name, func(t testutil.TB) {
-			t.ResetTimer()
-			for i := 0; i < t.N(); i++ {
-				srv := newStoreSeriesServer(context.Background())
-				testutil.Ok(t, store.Series(c.req, srv))
-				testutil.Equals(t, 0, len(srv.Warnings))
-				testutil.Equals(t, len(c.expectedSeries), len(srv.SeriesSet))
-
-				if !t.IsBenchmark() {
-					if len(c.expectedSeries) == 1 {
-						// Chunks are not sorted within response. TODO: Investigate: Is this fine?
-						sort.Slice(srv.SeriesSet[0].Chunks, func(i, j int) bool {
-							return srv.SeriesSet[0].Chunks[i].MinTime < srv.SeriesSet[0].Chunks[j].MinTime
-						})
-					}
-					// This might give unreadable output for millions of series if error.
-					testutil.Equals(t, c.expectedSeries, srv.SeriesSet)
-
-					var actualHints []hintspb.SeriesResponseHints
-					for _, anyHints := range srv.HintsSet {
-						hints := hintspb.SeriesResponseHints{}
-						testutil.Ok(t, types.UnmarshalAny(anyHints, &hints))
-						actualHints = append(actualHints, hints)
-					}
-					testutil.Equals(t, c.expectedHints, actualHints)
-				}
-
-			}
-		})
-	}
-}
-
 // Regression test against: https://github.com/thanos-io/thanos/issues/2147.
-func TestSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
+func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
 	tmpDir, err := ioutil.TempDir("", "segfault-series")
 	testutil.Ok(t, err)
 	defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }()
@@ -1565,7 +1339,6 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
 	defer func() { testutil.Ok(t, bkt.Close()) }()
 
 	logger := log.NewNopLogger()
-
 	thanosMeta := metadata.Thanos{
 		Labels:     labels.Labels{{Name: "ext1", Value: "1"}}.Map(),
 		Downsample: metadata.ThanosDownsample{Resolution: 0},
@@ -1600,7 +1373,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
 
 	for i := 0; i < numSeries; i++ {
 		ts := int64(i)
-		lbls := labels.FromStrings("foo", "bar", "b", "1", "i", fmt.Sprintf("%07d%s", ts, postingsBenchSuffix))
+		lbls := labels.FromStrings("foo", "bar", "b", "1", "i", fmt.Sprintf("%07d%s", ts, storetestutil.LabelLongSuffix))
 
 		_, err := app.Add(lbls, ts, 0)
 		testutil.Ok(t, err)
@@ -1638,7 +1411,7 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
 
 	for i := 0; i < numSeries; i++ {
 		ts := int64(i)
-		lbls := labels.FromStrings("foo", "bar", "b", "2", "i", fmt.Sprintf("%07d%s", ts, postingsBenchSuffix))
+		lbls := labels.FromStrings("foo", "bar", "b", "2", "i", fmt.Sprintf("%07d%s", ts, storetestutil.LabelLongSuffix))
 
 		_, err := app.Add(lbls, ts, 0)
 		testutil.Ok(t, err)
@@ -1745,11 +1518,16 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
 	var (
 		logger   = log.NewNopLogger()
 		instrBkt = objstore.WithNoopInstr(bkt)
+		random   = rand.New(rand.NewSource(120))
 	)
 
 	// Create TSDB blocks.
-	block1, seriesSet1 := createBlockWithOneSample(tb, bktDir, 0, 2)
-	block2, seriesSet2 := createBlockWithOneSample(tb, bktDir, 1, 2)
+	head, seriesSet1 := storetestutil.CreateHeadWithSeries(t, 0, 1, 2, random)
+	block1 := createBlockFromHead(t, bktDir, head)
+	testutil.Ok(t, head.Close())
+	head2, seriesSet2 := storetestutil.CreateHeadWithSeries(t, 1, 1, 2, random)
+	block2 := createBlockFromHead(t, bktDir, head2)
+	testutil.Ok(t, head2.Close())
 
 	// Inject the Thanos meta to each block in the storage.
 	thanosMeta := metadata.Thanos{
@@ -1791,18 +1569,18 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
 	testutil.Ok(tb, err)
 	testutil.Ok(tb, store.SyncBlocks(context.Background()))
 
-	testCases := []*benchSeriesCase{
+	testCases := []*storetestutil.SeriesCase{
 		{
-			name: "querying a range containing 1 block should return 1 block in the response hints",
-			req: &storepb.SeriesRequest{
+			Name: "querying a range containing 1 block should return 1 block in the response hints",
+			Req: &storepb.SeriesRequest{
 				MinTime: 0,
 				MaxTime: 1,
 				Matchers: []storepb.LabelMatcher{
 					{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
 				},
 			},
-			expectedSeries: seriesSet1,
-			expectedHints: []hintspb.SeriesResponseHints{
+			ExpectedSeries: seriesSet1,
+			ExpectedHints: []hintspb.SeriesResponseHints{
 				{
 					QueriedBlocks: []hintspb.Block{
 						{Id: block1.String()},
 					},
 				},
 			},
 		}, {
-			name: "querying a range containing multiple blocks should return multiple blocks in the response hints",
-			req: &storepb.SeriesRequest{
+			Name: "querying a range containing multiple blocks should return multiple blocks in the response hints",
+			Req: &storepb.SeriesRequest{
 				MinTime: 0,
 				MaxTime: 3,
 				Matchers: []storepb.LabelMatcher{
 					{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
 				},
 			},
-			expectedSeries: append(append([]storepb.Series{}, seriesSet1...), seriesSet2...),
-			expectedHints: []hintspb.SeriesResponseHints{
+			ExpectedSeries: append(append([]storepb.Series{}, seriesSet1...), seriesSet2...),
+			ExpectedHints: []hintspb.SeriesResponseHints{
 				{
 					QueriedBlocks: []hintspb.Block{
 						{Id: block1.String()},
 						{Id: block2.String()},
 					},
 				},
 			},
 		}, {
-			name: "querying a range containing multiple blocks but filtering a specific block should query only the requested block",
-			req: &storepb.SeriesRequest{
+			Name: "querying a range containing multiple blocks but filtering a specific block should query only the requested block",
+			Req: &storepb.SeriesRequest{
 				MinTime: 0,
 				MaxTime: 3,
 				Matchers: []storepb.LabelMatcher{
@@ -1841,8 +1619,8 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
 				},
 			}),
 		},
-			expectedSeries: seriesSet1,
-			expectedHints: []hintspb.SeriesResponseHints{
+			ExpectedSeries: seriesSet1,
+			ExpectedHints: []hintspb.SeriesResponseHints{
 				{
 					QueriedBlocks: []hintspb.Block{
 						{Id: block1.String()},
 					},
 				},
 			},
 		},
 	}
 
-	benchmarkSeries(tb, store, testCases)
+	storetestutil.TestServerSeries(tb, store, testCases...)
 }
 
 func TestSeries_ErrorUnmarshallingRequestHints(t *testing.T) {
diff --git a/pkg/store/postings_codec_test.go b/pkg/store/postings_codec_test.go
index 7f22493557..2513980d4c 100644
--- a/pkg/store/postings_codec_test.go
+++ b/pkg/store/postings_codec_test.go
@@ -14,7 +14,7 @@ import (
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/index"
-
+	"github.com/thanos-io/thanos/pkg/store/storepb/storetestutil"
 	"github.com/thanos-io/thanos/pkg/testutil"
 )
 
@@ -39,7 +39,7 @@ func TestDiffVarintCodec(t *testing.T) {
 
 	postingsMap := map[string]index.Postings{
 		"all":       allPostings(t, idx),
-		`n="1"`:     matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix)),
+		`n="1"`:     matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "n", "1"+storetestutil.LabelLongSuffix)),
 		`j="foo"`:   matchPostings(t, idx, labels.MustNewMatcher(labels.MatchEqual, "j", "foo")),
 		`j!="foo"`:  matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "j", "foo")),
 		`i=~".*"`:   matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", ".*")),
@@ -47,7 +47,7 @@ func TestDiffVarintCodec(t *testing.T) {
 		`i=~"1.+"`:  matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "1.+")),
 		`i=~"^$"'`:  matchPostings(t, idx, labels.MustNewMatcher(labels.MatchRegexp, "i", "^$")),
 		`i!~""`:     matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "i", "")),
-		`n!="2"`:    matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+postingsBenchSuffix)),
+		`n!="2"`:    matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotEqual, "n", "2"+storetestutil.LabelLongSuffix)),
 		`i!~"2.*"`:  matchPostings(t, idx, labels.MustNewMatcher(labels.MatchNotRegexp, "i", "^2.*$")),
 	}
 
diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go
index b5c310920e..a14db1d572 100644
--- a/pkg/store/proxy.go
+++ b/pkg/store/proxy.go
@@ -294,6 +294,10 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
 		return nil
 	}
 
+	// TODO(bwplotka): Currently we stream into big frames. Consider ensuring a 1MB maximum.
+	// This does not matter much when used with the Query API, but it matters a lot for federated queries.
+	// https://github.com/thanos-io/thanos/issues/2332
+	// Series with the same labels are not necessarily merged into a single series here.
 	mergedSet := storepb.MergeSeriesSets(seriesSet...)
 	for mergedSet.Next() {
 		var series storepb.Series
diff --git a/pkg/store/proxy_bench_test.go b/pkg/store/proxy_bench_test.go
new file mode 100644
index 0000000000..f28a6f09b9
--- /dev/null
+++ b/pkg/store/proxy_bench_test.go
@@ -0,0 +1,142 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package store
+
+import (
+	"fmt"
+	"math"
+	"math/rand"
+	"testing"
+
+	"github.com/go-kit/kit/log"
+	"github.com/thanos-io/thanos/pkg/store/storepb"
+	"github.com/thanos-io/thanos/pkg/store/storepb/storetestutil"
+	"github.com/thanos-io/thanos/pkg/testutil"
+)
+
+func TestProxySeries(t *testing.T) {
+	tb := testutil.NewTB(t)
+	storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
+		benchProxySeries(t, samplesPerSeries, series)
+	})
+}
+
+func BenchmarkProxySeries(b *testing.B) {
+	tb := testutil.NewTB(b)
+	storetestutil.RunSeriesInterestingCases(tb, 10e6, 10e5, func(t testutil.TB, samplesPerSeries, series int) {
+		benchProxySeries(t, samplesPerSeries, series)
+	})
+}
+
+func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
+	const numOfClients = 4
+
+	samplesPerSeriesPerClient := totalSamples / numOfClients
+	if samplesPerSeriesPerClient == 0 {
+		samplesPerSeriesPerClient = 1
+	}
+	seriesPerClient := totalSeries / numOfClients
+	if seriesPerClient == 0 {
+		seriesPerClient = 1
+	}
+
+	random := rand.New(rand.NewSource(120))
+	clients := make([]Client, numOfClients)
+	for j := range clients {
+		var resps []*storepb.SeriesResponse
+
+		head, created := storetestutil.CreateHeadWithSeries(t, j, samplesPerSeriesPerClient, seriesPerClient, random)
+		testutil.Ok(t, head.Close())
+
+		for i := 0; i < len(created); i++ {
+			resps = append(resps, storepb.NewSeriesResponse(&created[i]))
+		}
+
+		clients[j] = &testClient{
+			StoreClient: &mockedStoreAPI{
+				RespSeries: resps,
+			},
+			minTime: math.MinInt64,
+			maxTime: math.MaxInt64,
+		}
+	}
+
+	logger := log.NewNopLogger()
+	store := &ProxyStore{
+		logger:          logger,
+		stores:          func() []Client { return clients },
+		metrics:         newProxyStoreMetrics(nil),
+		responseTimeout: 0,
+	}
+
+	var allResps []*storepb.SeriesResponse
+	var expected []storepb.Series
+	lastLabels := storepb.Series{}
+	for _, c := range clients {
+		m := c.(*testClient).StoreClient.(*mockedStoreAPI)
+
+		for _, r := range m.RespSeries {
+			allResps = append(allResps, r)
+
+			// Proxy will merge all series with the same labels without limit (https://github.com/thanos-io/thanos/issues/2332).
+			// Let's do this here as well.
+			x := storepb.Series{Labels: r.GetSeries().Labels}
+			if x.String() == lastLabels.String() {
+				expected[len(expected)-1].Chunks = append(expected[len(expected)-1].Chunks, r.GetSeries().Chunks...)
+				continue
+			}
+			lastLabels = x
+			expected = append(expected, *r.GetSeries())
+		}
+	}
+
+	chunkLen := len(allResps[len(allResps)-1].GetSeries().Chunks)
+	maxTime := allResps[len(allResps)-1].GetSeries().Chunks[chunkLen-1].MaxTime
+	storetestutil.TestServerSeries(t, store,
+		&storetestutil.SeriesCase{
+			Name: fmt.Sprintf("%d clients with %d samples, %d series each", numOfClients, samplesPerSeriesPerClient, seriesPerClient),
+			Req: &storepb.SeriesRequest{
+				MinTime: 0,
+				MaxTime: maxTime,
+				Matchers: []storepb.LabelMatcher{
+					{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
+				},
+			},
+			ExpectedSeries: expected,
+		},
+	)
+
+	// Change to just one client.
+	store.stores = func() []Client {
+		return []Client{&testClient{
+			StoreClient: &mockedStoreAPI{
+				// All responses.
+				RespSeries: allResps,
+			},
+			labelSets: []storepb.LabelSet{{Labels: []storepb.Label{{Name: "ext1", Value: "1"}}}},
+			minTime:   math.MinInt64,
+			maxTime:   math.MaxInt64,
+		}}
+	}
+
+	// Here we expect exactly the same response as the input.
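+	// With a single client there is nothing to merge across clients, so every frame should come back unchanged.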
+	expected = expected[:0]
+	for _, r := range allResps {
+		expected = append(expected, *r.GetSeries())
+	}
+	storetestutil.TestServerSeries(t, store,
+		&storetestutil.SeriesCase{
+			Name: fmt.Sprintf("single client with %d samples, %d series", totalSamples, totalSeries),
+			Req: &storepb.SeriesRequest{
+				MinTime: 0,
+				MaxTime: maxTime,
+				Matchers: []storepb.LabelMatcher{
+					{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"},
+				},
+			},
+			ExpectedSeries: expected,
+		},
+	)
+}
diff --git a/pkg/store/storepb/storetestutil/series.go b/pkg/store/storepb/storetestutil/series.go
new file mode 100644
index 0000000000..d22321ccc5
--- /dev/null
+++ b/pkg/store/storepb/storetestutil/series.go
@@ -0,0 +1,235 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package storetestutil
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"runtime"
+	"sort"
+	"testing"
+
+	"github.com/gogo/protobuf/types"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/prometheus/prometheus/tsdb/chunks"
+	"github.com/prometheus/prometheus/tsdb/index"
+	"github.com/thanos-io/thanos/pkg/store/hintspb"
+	"github.com/thanos-io/thanos/pkg/store/storepb"
+	"github.com/thanos-io/thanos/pkg/testutil"
+)
+
+const (
+	// LabelLongSuffix is a label value of ~50B in size, to emulate real-world high cardinality.
+	LabelLongSuffix = "aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd"
+)
+
+func allPostings(t testing.TB, ix tsdb.IndexReader) index.Postings {
+	k, v := index.AllPostingsKey()
+	p, err := ix.Postings(k, v)
+	testutil.Ok(t, err)
+	return p
+}
+
+// CreateHeadWithSeries returns a head filled with the given number of series and samples; the same
+// series are also returned as a separate list for assertion purposes.
+// The returned series list has "ext1"="1" prepended to each series' labels. Each series looks as follows:
+// {foo=bar,i=000001aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd}, where the number is the series' first timestamp, counted from 0.
+// Returned series are framed in the same way as remote read would frame them.
+func CreateHeadWithSeries(t testing.TB, j int, samplesPerSeries int, series int, random *rand.Rand) (*tsdb.Head, []storepb.Series) {
+	if samplesPerSeries < 1 || series < 1 {
+		t.Fatal("samples and series have to be 1 or more")
+	}
+
+	fmt.Printf("Creating %d %d-sample series:\n", series, samplesPerSeries)
+
+	h, err := tsdb.NewHead(nil, nil, nil, int64(samplesPerSeries), "", nil, tsdb.DefaultStripeSize, nil)
+	testutil.Ok(t, err)
+
+	app := h.Appender()
+	for i := 0; i < series; i++ {
+		ts := int64(j*series*samplesPerSeries + i)
+		ref, err := app.Add(labels.FromStrings("foo", "bar", "i", fmt.Sprintf("%07d%s", ts, LabelLongSuffix)), ts, random.Float64())
+		testutil.Ok(t, err)
+
+		for is := 1; is < samplesPerSeries; is++ {
+			testutil.Ok(t, app.AddFast(ref, ts+int64(is), random.Float64()))
+		}
+	}
+	testutil.Ok(t, app.Commit())
+
+	// Use TSDB and get all series for assertion.
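+	// (Reading the head back through its index and chunk readers mirrors what a StoreAPI implementation would serve.)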
+	chks, err := h.Chunks()
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, chks.Close()) }()
+
+	ir, err := h.Index()
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, ir.Close()) }()
+
+	resps := make([]storepb.Series, 0, series)
+	var (
+		lset       labels.Labels
+		chunkMetas []chunks.Meta
+		sBytes     int
+	)
+
+	all := allPostings(t, ir)
+	for all.Next() {
+		testutil.Ok(t, ir.Series(all.At(), &lset, &chunkMetas))
+
+		i := 0
+		// Prepend the "ext1"="1" external label, as documented above; "ext1" sorts before "foo", so the label set stays sorted.
+		sLset := append([]storepb.Label{{Name: "ext1", Value: "1"}}, storepb.PromLabelsToLabels(lset)...)
+		resps = append(resps, storepb.Series{
+			Labels: sLset,
+		})
+
+		lBytes := 0
+		for _, l := range sLset {
+			lBytes += l.Size()
+		}
+		sBytes = lBytes
+
+		for {
+			c := chunkMetas[i]
+			i++
+
+			chBytes, err := chks.Chunk(c.Ref)
+			testutil.Ok(t, err)
+
+			sBytes += len(chBytes.Bytes())
+
+			resps[len(resps)-1].Chunks = append(resps[len(resps)-1].Chunks, storepb.AggrChunk{
+				MinTime: c.MinTime,
+				MaxTime: c.MaxTime,
+				Raw:     &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chBytes.Bytes()},
+			})
+
+			if i >= len(chunkMetas) {
+				break
+			}
+
+			// Compose many frames, as remote read (and thus the sidecar StoreAPI) would: cut a new frame at 1048576 bytes (~1MiB).
+			if sBytes >= 1048576 {
+				resps = append(resps, storepb.Series{
+					Labels: sLset,
+				})
+				sBytes = lBytes
+			}
+		}
+	}
+	testutil.Ok(t, all.Err())
+	return h, resps
+}
+
+// SeriesCase represents a single test/benchmark case for testing storepb series.
+type SeriesCase struct {
+	Name string
+	Req  *storepb.SeriesRequest
+
+	// Exact expectations are checked only for tests. For benchmarks, only the lengths are asserted.
+	ExpectedSeries   []storepb.Series
+	ExpectedWarnings []string
+	ExpectedHints    []hintspb.SeriesResponseHints
+}
+
+// TestServerSeries runs tests against the given cases.
+func TestServerSeries(t testutil.TB, store storepb.StoreServer, cases ...*SeriesCase) {
+	for _, c := range cases {
+		t.Run(c.Name, func(t testutil.TB) {
+			t.ResetTimer()
+			for i := 0; i < t.N(); i++ {
+				srv := NewSeriesServer(context.Background())
+				testutil.Ok(t, store.Series(c.Req, srv))
+				testutil.Equals(t, len(c.ExpectedWarnings), len(srv.Warnings))
+				testutil.Equals(t, len(c.ExpectedSeries), len(srv.SeriesSet))
+				testutil.Equals(t, len(c.ExpectedHints), len(srv.HintsSet))
+
+				if !t.IsBenchmark() {
+					if len(c.ExpectedSeries) == 1 {
+						// For the bucket StoreAPI, chunks are not sorted within a response. TODO: Investigate: is this fine?
+						sort.Slice(srv.SeriesSet[0].Chunks, func(i, j int) bool {
+							return srv.SeriesSet[0].Chunks[i].MinTime < srv.SeriesSet[0].Chunks[j].MinTime
+						})
+					}
+					// This might give unreadable output for millions of series on failure.
+					testutil.Equals(t, c.ExpectedSeries, srv.SeriesSet)
+
+					var actualHints []hintspb.SeriesResponseHints
+					for _, anyHints := range srv.HintsSet {
+						hints := hintspb.SeriesResponseHints{}
+						testutil.Ok(t, types.UnmarshalAny(anyHints, &hints))
+						actualHints = append(actualHints, hints)
+					}
+					testutil.Equals(t, c.ExpectedHints, actualHints)
+				}
+			}
+		})
+	}
+}
+
+// SeriesServer is a test gRPC StoreAPI series server.
+type SeriesServer struct {
+	// This field just exists to pseudo-implement the unused methods of the interface.
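+	// Calling any method that is not explicitly overridden below panics on this nil embedded interface.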
+	storepb.Store_SeriesServer
+
+	ctx context.Context
+
+	SeriesSet []storepb.Series
+	Warnings  []string
+	HintsSet  []*types.Any
+
+	Size int64
+}
+
+func NewSeriesServer(ctx context.Context) *SeriesServer {
+	return &SeriesServer{ctx: ctx}
+}
+
+func (s *SeriesServer) Send(r *storepb.SeriesResponse) error {
+	s.Size += int64(r.Size())
+
+	if r.GetWarning() != "" {
+		s.Warnings = append(s.Warnings, r.GetWarning())
+		return nil
+	}
+
+	if r.GetSeries() != nil {
+		s.SeriesSet = append(s.SeriesSet, *r.GetSeries())
+		return nil
+	}
+
+	if r.GetHints() != nil {
+		s.HintsSet = append(s.HintsSet, r.GetHints())
+		return nil
+	}
+	// Unsupported field, skip.
+	return nil
+}
+
+func (s *SeriesServer) Context() context.Context {
+	return s.ctx
+}
+
+// RunSeriesInterestingCases runs the given test/benchmark body against a few interesting
+// distributions of series and samples-per-series, within the given maximums.
+func RunSeriesInterestingCases(t testutil.TB, maxSamples, maxSeries int, f func(t testutil.TB, samplesPerSeries, series int)) {
+	for _, tc := range []struct {
+		samplesPerSeries int
+		series           int
+	}{
+		{
+			samplesPerSeries: 1,
+			series:           maxSeries,
+		},
+		{
+			samplesPerSeries: maxSamples / 2,
+			series:           maxSeries / 2,
+		},
+		{
+			samplesPerSeries: maxSamples,
+			series:           1,
+		},
+	} {
+		t.Run(fmt.Sprintf("%dSeriesWith%dSamples", tc.series, tc.samplesPerSeries), func(t testutil.TB) {
+			// Collect garbage between cases to reduce cross-case allocation noise.
+			runtime.GC()
+			f(t, tc.samplesPerSeries, tc.series)
+		})
+	}
+}
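
For reference, a minimal sketch of how another StoreAPI test could plug into the new helpers. This is not part of the change above; newTestStore is a hypothetical constructor for whatever storepb.StoreServer implementation is under test, and the imports (math, math/rand, testing, storepb, storetestutil, testutil) are assumed:

func TestMyStoreSeries(t *testing.T) {
	tb := testutil.NewTB(t)
	storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
		random := rand.New(rand.NewSource(120))
		head, expected := storetestutil.CreateHeadWithSeries(t, 0, samplesPerSeries, series, random)
		defer func() { testutil.Ok(t, head.Close()) }()

		store := newTestStore(t, head) // Hypothetical; serves Series() straight from the head.

		storetestutil.TestServerSeries(t, store,
			&storetestutil.SeriesCase{
				Name: "all series",
				Req: &storepb.SeriesRequest{
					MinTime:  math.MinInt64,
					MaxTime:  math.MaxInt64,
					Matchers: []storepb.LabelMatcher{{Type: storepb.LabelMatcher_EQ, Name: "foo", Value: "bar"}},
				},
				ExpectedSeries: expected,
			},
		)
	})
}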