diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f66d71b3fc..bf88333f66f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#2893](https://github.com/thanos-io/thanos/pull/2893) Store: Rename metric `thanos_bucket_store_cached_postings_compression_time_seconds` to `thanos_bucket_store_cached_postings_compression_time_seconds_total`. - [#2915](https://github.com/thanos-io/thanos/pull/2915) Receive,Ruler: Enable TSDB directory locking by default. Add a new flag (`--tsdb.no-lockfile`) to override behavior. +- [#2876](https://github.com/thanos-io/thanos/pull/2876) Receive,Ruler: Updated TSDB and switched to ChunkIterators instead of sample one, which avoids +unnecessary decoding / encoding. ## [v0.14.0](https://github.com/thanos-io/thanos/releases/tag/v0.14.0) - 2020.07.10 diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 8085549d5bc..71e2d7be44d 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -150,8 +150,6 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { level.Warn(logger).Log("msg", "different values for --web.route-prefix and --web.external-prefix detected, web UI may not work without a reverse-proxy.") } - promql.SetDefaultEvaluationInterval(time.Duration(*defaultEvaluationInterval)) - flagsMap := map[string]string{} // Exclude kingpin default flags to expose only Thanos ones. @@ -204,10 +202,15 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application) { time.Duration(*instantDefaultMaxSourceResolution), *strictStores, component.Query, + time.Duration(*defaultEvaluationInterval), ) } } +func durationToInt64Millis(d time.Duration) int64 { + return int64(d / time.Millisecond) +} + // runQuery starts a server that exposes PromQL Query API. It is responsible for querying configured // store nodes, merging and duplicating the data to satisfy user query. func runQuery( @@ -249,6 +252,7 @@ func runQuery( instantDefaultMaxSourceResolution time.Duration, strictStores []string, comp component.Component, + defaultEvaluationInterval time.Duration, ) error { // TODO(bplotka in PR #513 review): Move arguments into struct. duplicatedStores := promauto.With(reg).NewCounter(prometheus.CounterOpts{ @@ -319,6 +323,9 @@ func runQuery( // TODO(bwplotka): Expose this as a flag: https://github.com/thanos-io/thanos/issues/703. MaxSamples: math.MaxInt32, Timeout: queryTimeout, + NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 { + return durationToInt64Millis(defaultEvaluationInterval) + }, }, ) ) diff --git a/go.mod b/go.mod index 398b2adb878..4dbebbc91f3 100644 --- a/go.mod +++ b/go.mod @@ -73,7 +73,8 @@ require ( // See https://github.com/thanos-io/thanos/issues/1415 replace ( // Make sure Cortex is not forcing us to some other Prometheus version. - github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200714083622-823b218e1b2e + // TODO: This points to https://github.com/prometheus/prometheus/pull/7069. Remove and point to master once merged. + github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200724113653-2605f60545cb k8s.io/klog => k8s.io/klog v0.3.1 k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 ) diff --git a/go.sum b/go.sum index 668f6932bbc..436538f9cc7 100644 --- a/go.sum +++ b/go.sum @@ -876,8 +876,8 @@ github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= -github.com/prometheus/prometheus v1.8.2-0.20200714083622-823b218e1b2e h1:NCV6Sz7qguIRsvG6Aii4wDwWeiUPadaWT/4l4eP7Ax0= -github.com/prometheus/prometheus v1.8.2-0.20200714083622-823b218e1b2e/go.mod h1:F3OdzfA9PNvJ0PxQwHL58k9zOhOLhtcIAOtVqwyYxwk= +github.com/prometheus/prometheus v1.8.2-0.20200724113653-2605f60545cb h1:P2x7BW33WySguYW4ZHG5HaKRI8yOiksDfH56ucTHbwk= +github.com/prometheus/prometheus v1.8.2-0.20200724113653-2605f60545cb/go.mod h1:+/y4DzJ62qmhy0o/H4PtXegRXw+80E8RVRHhLbv+bkM= github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= @@ -1020,6 +1020,8 @@ go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/automaxprocs v1.2.0 h1:+RUihKM+nmYUoB9w0D0Ov5TJ2PpFO2FgenTxMJiZBZA= go.uber.org/automaxprocs v1.2.0/go.mod h1:YfO3fm683kQpzETxlTGZhGIVmXAhaw3gxeBADbpZtnU= +go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo= +go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= @@ -1251,6 +1253,7 @@ golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191111182352-50fa39b762bc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191113191852-77e3bb0ad9e7/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -1334,7 +1337,6 @@ google.golang.org/genproto v0.0.0-20200224152610-e50cd9704f63/go.mod h1:55QSHmfG google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20200624020401-64a14ca9d1ad/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200710124503-20a17af7bd0e h1:k+p/u26/lVeNEpdxSeUrm7rTvoFckBKaf7gTzgmHyDA= google.golang.org/genproto v0.0.0-20200710124503-20a17af7bd0e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= @@ -1357,7 +1359,6 @@ google.golang.org/grpc v1.29.1 h1:EC2SB8S04d2r73uptxphDSUG+kTKVgjRPF+N3xpxRB4= google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk= google.golang.org/grpc v1.30.0 h1:M5a8xTlYTxwMn5ZFkwhRabsygDY5G8TYLyQDBxJNAxE= google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= -google.golang.org/grpc/examples v0.0.0-20200709232328-d8193ee9cc3e/go.mod h1:5j1uub0jRGhRiSghIlrThmBUgcgLXOVJQ/l1getT4uo= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 70454a39f50..0e3c05f0edc 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -13,7 +13,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb/chunkenc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -24,7 +23,7 @@ import ( ) type TSDBReader interface { - storage.Queryable + storage.ChunkQueryable StartTime() (int64, error) } @@ -32,10 +31,11 @@ type TSDBReader interface { // It attaches the provided external labels to all results. It only responds with raw data // and does not support downsampling. type TSDBStore struct { - logger log.Logger - db TSDBReader - component component.StoreAPI - externalLabels labels.Labels + logger log.Logger + db TSDBReader + component component.StoreAPI + externalLabels labels.Labels + maxBytesPerFrame int } // ReadWriteTSDBStore is a TSDBStore that can also be written to. @@ -50,10 +50,11 @@ func NewTSDBStore(logger log.Logger, _ prometheus.Registerer, db TSDBReader, com logger = log.NewNopLogger() } return &TSDBStore{ - logger: logger, - db: db, - component: component, - externalLabels: externalLabels, + logger: logger, + db: db, + component: component, + externalLabels: externalLabels, + maxBytesPerFrame: 1024 * 1024, // 1MB as recommended by gRPC. } } @@ -109,7 +110,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer return status.Error(codes.InvalidArgument, err.Error()) } - q, err := s.db.Querier(context.Background(), r.MinTime, r.MaxTime) + q, err := s.db.ChunkQuerier(context.Background(), r.MinTime, r.MaxTime) if err != nil { return status.Error(codes.Internal, err.Error()) } @@ -119,72 +120,67 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer set = q.Select(false, nil, matchers...) respSeries storepb.Series ) + + // Stream at most one series per frame; series may be split over multiple frames according to maxBytesInFrame. for set.Next() { series := set.At() - respSeries.Labels = s.translateAndExtendLabels(series.Labels(), s.externalLabels) - - if !r.SkipChunks { - // TODO(fabxc): An improvement over this trivial approach would be to directly - // use the chunks provided by TSDB in the response. - c, err := s.encodeChunks(series.Iterator(), MaxSamplesPerChunk) - if err != nil { - return status.Errorf(codes.Internal, "encode chunk: %s", err) + respSeries.Chunks = respSeries.Chunks[:0] + if r.SkipChunks { + if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil { + return status.Error(codes.Aborted, err.Error()) } - - respSeries.Chunks = append(respSeries.Chunks[:0], c...) + continue } - if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil { - return status.Error(codes.Aborted, err.Error()) + frameBytesLeft := s.maxBytesPerFrame + for _, lbl := range respSeries.Labels { + frameBytesLeft -= lbl.Size() } - } - if err := set.Err(); err != nil { - return status.Error(codes.Internal, err.Error()) - } - return nil -} - -func (s *TSDBStore) encodeChunks(it chunkenc.Iterator, maxSamplesPerChunk int) (chks []storepb.AggrChunk, err error) { - var ( - chkMint int64 - chk *chunkenc.XORChunk - app chunkenc.Appender - isNext = it.Next() - ) - for isNext { - if chk == nil { - chk = chunkenc.NewXORChunk() - app, err = chk.Appender() - if err != nil { - return nil, err + chIter := series.Iterator() + isNext := chIter.Next() + for isNext { + chk := chIter.At() + if chk.Chunk == nil { + return status.Errorf(codes.Internal, "TSDBStore: found not populated chunk returned by SeriesSet at ref: %v", chk.Ref) } - chkMint, _ = it.At() - } - app.Append(it.At()) - chkMaxt, _ := it.At() + respSeries.Chunks = append(respSeries.Chunks, storepb.AggrChunk{ + MinTime: chk.MinTime, + MaxTime: chk.MaxTime, + Raw: &storepb.Chunk{ + Type: storepb.Chunk_Encoding(chk.Chunk.Encoding() - 1), // proto chunk encoding is one off to TSDB one. + Data: chk.Chunk.Bytes(), + }, + }) + frameBytesLeft -= respSeries.Chunks[len(respSeries.Chunks)-1].Size() + + // We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size. + isNext = chIter.Next() + if frameBytesLeft > 0 && isNext { + continue + } - isNext = it.Next() - if isNext && chk.NumSamples() < maxSamplesPerChunk { - continue + if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil { + return status.Error(codes.Aborted, err.Error()) + } + respSeries.Chunks = respSeries.Chunks[:0] + } + if err := chIter.Err(); err != nil { + return status.Error(codes.Internal, errors.Wrap(err, "chunk iter").Error()) } - // Cut the chunk. - chks = append(chks, storepb.AggrChunk{ - MinTime: chkMint, - MaxTime: chkMaxt, - Raw: &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chk.Bytes()}, - }) - chk = nil } - if it.Err() != nil { - return nil, errors.Wrap(it.Err(), "read TSDB series") + if err := set.Err(); err != nil { + return status.Error(codes.Internal, err.Error()) + } + for _, w := range set.Warnings() { + if err := srv.Send(storepb.NewWarnSeriesResponse(w)); err != nil { + return status.Error(codes.Aborted, err.Error()) + } } - - return chks, nil - + return nil } // translateAndExtendLabels transforms a metrics into a protobuf label set. It additionally @@ -217,7 +213,7 @@ func (s *TSDBStore) translateAndExtendLabels(m, extend labels.Labels) []storepb. func (s *TSDBStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { - q, err := s.db.Querier(ctx, math.MinInt64, math.MaxInt64) + q, err := s.db.ChunkQuerier(ctx, math.MinInt64, math.MaxInt64) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -234,7 +230,7 @@ func (s *TSDBStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequest) ( *storepb.LabelValuesResponse, error, ) { - q, err := s.db.Querier(ctx, math.MinInt64, math.MaxInt64) + q, err := s.db.ChunkQuerier(ctx, math.MinInt64, math.MaxInt64) if err != nil { return nil, status.Error(codes.Internal, err.Error()) }