From c08f77558a483b5f61bb4916572f8c9a828f28a8 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Fri, 10 Jul 2020 12:31:16 +0100 Subject: [PATCH] receive,ruler: Upgraded TSDB and used ChunkIterator in Series TSDB Store. Signed-off-by: Bartlomiej Plotka --- CHANGELOG.md | 1 + cmd/thanos/query.go | 1 - go.mod | 4 +- go.sum | 5 +- pkg/rules/manager.go | 2 +- pkg/rules/manager_test.go | 7 +- pkg/store/storepb/testutil/series.go | 2 +- pkg/store/tsdb.go | 124 +++++++++++++-------------- pkg/store/tsdb_test.go | 1 - 9 files changed, 70 insertions(+), 77 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f1a16d64f2..caa81a2694 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2902](https://github.com/thanos-io/thanos/pull/2902) ui: React: Separate dedupe and partial response checkboxes per panel. - [#2931](https://github.com/thanos-io/thanos/pull/2931) Query: Allow passing a `storeMatcher[]` to select matching stores when debugging the querier. See [documentation](https://thanos.io/components/query.md/#store-filtering) - [#2991](https://github.com/thanos-io/thanos/pull/2991) store: `operation` label value `getrange` changed to `get_range` for `thanos_store_bucket_cache_operation_requests_total` and `thanos_store_bucket_cache_operation_hits_total` to be consistent with bucket operation metrics. +- [#2876](https://github.com/thanos-io/thanos/pull/2876) Receive,Ruler: Updated TSDB and switched to chunk iterators instead of sample iterators, which avoids unnecessary decoding / encoding. 
## [v0.14.0](https://github.com/thanos-io/thanos/releases/tag/v0.14.0) - 2020.07.10 diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index e29e39dce8..f967d8538c 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -151,7 +151,6 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { } flagsMap := getFlagsMap(cmd.Model().Flags) - return runQuery( g, logger, diff --git a/go.mod b/go.mod index 9c5694b1a7..8e2de29eed 100644 --- a/go.mod +++ b/go.mod @@ -74,8 +74,8 @@ require ( // so that we don't get errors about being incompatible with the Go proxies. // See https://github.com/thanos-io/thanos/issues/1415 replace ( - // Make sure Cortex is not forcing us to some other Prometheus version. - github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200805082714-e0cf219f0de2 + // Make sure Prometheus version is pinned as Prometheus semver does not include Go APIs. + github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200807135816-2899773b0159 google.golang.org/grpc => google.golang.org/grpc v1.29.1 k8s.io/klog => k8s.io/klog v0.3.1 k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 diff --git a/go.sum b/go.sum index 39225afc56..f03cf19ff3 100644 --- a/go.sum +++ b/go.sum @@ -857,7 +857,6 @@ github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+ github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= github.com/prometheus/common v0.8.0/go.mod h1:PC/OgXc+UN7B4ALwvn1yzVZmVwvhXp5JsbBv6wSv6i0= github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= -github.com/prometheus/common v0.10.0 h1:RyRA7RzGXQZiW+tGMr7sxa85G1z0yOpM1qq5c8lNawc= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.11.1 h1:0ZISXCMRuCZcxF77aT1BXY5m74mX2vrGYl1dSwBI0Jo= github.com/prometheus/common v0.11.1/go.mod 
h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= @@ -876,8 +875,8 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= -github.com/prometheus/prometheus v1.8.2-0.20200805082714-e0cf219f0de2 h1:eQ88y1vfbXuclr6B04jYTmhc6ydXlBUSIaXCjEs0osk= -github.com/prometheus/prometheus v1.8.2-0.20200805082714-e0cf219f0de2/go.mod h1:i1KZsZmyDTJRvnR7zE8z/u2v+tkpPjoiPpnWp6nwhr0= +github.com/prometheus/prometheus v1.8.2-0.20200807135816-2899773b0159 h1:mT66e//l/+QugUat5A42YvIPxlsS6O/yr9UtjlDhPYw= +github.com/prometheus/prometheus v1.8.2-0.20200807135816-2899773b0159/go.mod h1:zfAqy/MwhMFajB9E2n12/9gG2fvofIE9uKDtlZCDxqs= github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= diff --git a/pkg/rules/manager.go b/pkg/rules/manager.go index 5cde2f4ae7..b6c1f420a5 100644 --- a/pkg/rules/manager.go +++ b/pkg/rules/manager.go @@ -156,8 +156,8 @@ func (m *Manager) Stop() { mgr.Stop() } } - func (m *Manager) protoRuleGroups() []*rulespb.RuleGroup { + rg := m.RuleGroups() res := make([]*rulespb.RuleGroup, 0, len(rg)) for _, g := range rg { diff --git a/pkg/rules/manager_test.go b/pkg/rules/manager_test.go index a7aaaa927e..5cebfd2d90 100644 --- a/pkg/rules/manager_test.go +++ b/pkg/rules/manager_test.go @@ -44,7 +44,7 @@ func (n nopQueryable) Querier(_ context.Context, _, _ int64) (storage.Querier, e } // Regression test against https://github.com/thanos-io/thanos/issues/1779. 
-func TestRun(t *testing.T) { +func TestRun_Subqueries(t *testing.T) { dir, err := ioutil.TempDir("", "test_rule_run") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() @@ -84,17 +84,16 @@ groups: }, labels.FromStrings("replica", "1"), ) - testutil.Ok(t, thanosRuleMgr.Update(10*time.Second, []string{filepath.Join(dir, "rule.yaml")})) + testutil.Ok(t, thanosRuleMgr.Update(1*time.Second, []string{filepath.Join(dir, "rule.yaml")})) thanosRuleMgr.Run() defer thanosRuleMgr.Stop() select { - case <-time.After(2 * time.Minute): + case <-time.After(1 * time.Minute): t.Fatal("timeout while waiting on rule manager query evaluation") case <-queryDone: } - testutil.Equals(t, "rate(some_metric[1h:5m] offset 1d)", query) } diff --git a/pkg/store/storepb/testutil/series.go b/pkg/store/storepb/testutil/series.go index db96d3652e..90e11eee18 100644 --- a/pkg/store/storepb/testutil/series.go +++ b/pkg/store/storepb/testutil/series.go @@ -69,7 +69,7 @@ func CreateHeadWithSeries(t testing.TB, j int, opts HeadGenOptions) (*tsdb.Head, testutil.Ok(t, err) } - h, err := tsdb.NewHead(nil, nil, w, 10000000, tsdbDir, nil, tsdb.DefaultStripeSize, nil) + h, err := tsdb.NewHead(nil, nil, w, tsdb.DefaultBlockDuration, tsdbDir, nil, tsdb.DefaultStripeSize, nil) testutil.Ok(t, err) app := h.Appender(context.Background()) diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index ee7debc776..cbc5c7a5e1 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -13,7 +13,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb/chunkenc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -24,7 +23,7 @@ import ( ) type TSDBReader interface { - storage.Queryable + storage.ChunkQueryable StartTime() (int64, error) } @@ -32,10 +31,11 @@ type TSDBReader interface { // It attaches the provided external labels to all results. 
It only responds with raw data // and does not support downsampling. type TSDBStore struct { - logger log.Logger - db TSDBReader - component component.StoreAPI - externalLabels labels.Labels + logger log.Logger + db TSDBReader + component component.StoreAPI + externalLabels labels.Labels + maxBytesPerFrame int } // ReadWriteTSDBStore is a TSDBStore that can also be written to. @@ -50,10 +50,11 @@ func NewTSDBStore(logger log.Logger, _ prometheus.Registerer, db TSDBReader, com logger = log.NewNopLogger() } return &TSDBStore{ - logger: logger, - db: db, - component: component, - externalLabels: externalLabels, + logger: logger, + db: db, + component: component, + externalLabels: externalLabels, + maxBytesPerFrame: 1024 * 1024, // 1MB as recommended by gRPC. } } @@ -109,7 +110,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer return status.Error(codes.InvalidArgument, err.Error()) } - q, err := s.db.Querier(context.Background(), r.MinTime, r.MaxTime) + q, err := s.db.ChunkQuerier(context.Background(), r.MinTime, r.MaxTime) if err != nil { return status.Error(codes.Internal, err.Error()) } @@ -119,72 +120,67 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer set = q.Select(false, nil, matchers...) respSeries storepb.Series ) + + // Stream at most one series per frame; series may be split over multiple frames according to maxBytesPerFrame. for set.Next() { series := set.At() - respSeries.Labels = s.translateAndExtendLabels(series.Labels(), s.externalLabels) - - if !r.SkipChunks { - // TODO(fabxc): An improvement over this trivial approach would be to directly - // use the chunks provided by TSDB in the response. 
- c, err := s.encodeChunks(series.Iterator(), MaxSamplesPerChunk) - if err != nil { - return status.Errorf(codes.Internal, "encode chunk: %s", err) + respSeries.Chunks = respSeries.Chunks[:0] + if r.SkipChunks { + if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil { + return status.Error(codes.Aborted, err.Error()) } - - respSeries.Chunks = append(respSeries.Chunks[:0], c...) + continue } - if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil { - return status.Error(codes.Aborted, err.Error()) + frameBytesLeft := s.maxBytesPerFrame + for _, lbl := range respSeries.Labels { + frameBytesLeft -= lbl.Size() } - } - if err := set.Err(); err != nil { - return status.Error(codes.Internal, err.Error()) - } - return nil -} - -func (s *TSDBStore) encodeChunks(it chunkenc.Iterator, maxSamplesPerChunk int) (chks []storepb.AggrChunk, err error) { - var ( - chkMint int64 - chk *chunkenc.XORChunk - app chunkenc.Appender - isNext = it.Next() - ) - for isNext { - if chk == nil { - chk = chunkenc.NewXORChunk() - app, err = chk.Appender() - if err != nil { - return nil, err + chIter := series.Iterator() + isNext := chIter.Next() + for isNext { + chk := chIter.At() + if chk.Chunk == nil { + return status.Errorf(codes.Internal, "TSDBStore: found not populated chunk returned by SeriesSet at ref: %v", chk.Ref) } - chkMint, _ = it.At() - } - app.Append(it.At()) - chkMaxt, _ := it.At() + respSeries.Chunks = append(respSeries.Chunks, storepb.AggrChunk{ + MinTime: chk.MinTime, + MaxTime: chk.MaxTime, + Raw: &storepb.Chunk{ + Type: storepb.Chunk_Encoding(chk.Chunk.Encoding() - 1), // Proto chunk encoding is off by one compared to the TSDB one. + Data: chk.Chunk.Bytes(), + }, + }) + frameBytesLeft -= respSeries.Chunks[len(respSeries.Chunks)-1].Size() + + // We are fine with a minor inaccuracy of max bytes per frame. The inaccuracy will be at most the size of one full chunk. 
+ isNext = chIter.Next() + if frameBytesLeft > 0 && isNext { + continue + } - isNext = it.Next() - if isNext && chk.NumSamples() < maxSamplesPerChunk { - continue + if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil { + return status.Error(codes.Aborted, err.Error()) + } + respSeries.Chunks = respSeries.Chunks[:0] + } + if err := chIter.Err(); err != nil { + return status.Error(codes.Internal, errors.Wrap(err, "chunk iter").Error()) } - // Cut the chunk. - chks = append(chks, storepb.AggrChunk{ - MinTime: chkMint, - MaxTime: chkMaxt, - Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chk.Bytes()}, - }) - chk = nil } - if it.Err() != nil { - return nil, errors.Wrap(it.Err(), "read TSDB series") + if err := set.Err(); err != nil { + return status.Error(codes.Internal, err.Error()) + } + for _, w := range set.Warnings() { + if err := srv.Send(storepb.NewWarnSeriesResponse(w)); err != nil { + return status.Error(codes.Aborted, err.Error()) + } } - - return chks, nil - + return nil } // translateAndExtendLabels transforms a metrics into a protobuf label set. It additionally @@ -217,7 +213,7 @@ func (s *TSDBStore) translateAndExtendLabels(m, extend labels.Labels) []storepb. 
func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { - q, err := s.db.Querier(ctx, r.Start, r.End) + q, err := s.db.ChunkQuerier(ctx, r.Start, r.End) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -234,7 +230,7 @@ func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequest) ( *storepb.LabelValuesResponse, error, ) { - q, err := s.db.Querier(ctx, r.Start, r.End) + q, err := s.db.ChunkQuerier(ctx, r.Start, r.End) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/pkg/store/tsdb_test.go b/pkg/store/tsdb_test.go index 42532c159c..468d9cec03 100644 --- a/pkg/store/tsdb_test.go +++ b/pkg/store/tsdb_test.go @@ -303,7 +303,6 @@ func TestTSDBStore_LabelValues(t *testing.T) { } tsdbStore := NewTSDBStore(nil, nil, db, component.Rule, labels.FromStrings("region", "eu-west")) - now := time.Now() head := db.Head() for _, tc := range []struct {