diff --git a/go.mod b/go.mod
index 7c929e58ad..4a6cb407a4 100644
--- a/go.mod
+++ b/go.mod
@@ -72,7 +72,8 @@ require (
 // See https://github.com/thanos-io/thanos/issues/1415
 replace (
 	// Make sure Cortex is not forcing us to some other Prometheus version.
-	github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200707115909-30505a202a4c
+	// TODO: This points to https://github.com/prometheus/prometheus/pull/7069 fix once merged.
+	github.com/prometheus/prometheus => github.com/prometheus/prometheus v1.8.2-0.20200710110332-22d52d35242f
 	k8s.io/klog => k8s.io/klog v0.3.1
 	k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30
 )
diff --git a/go.sum b/go.sum
index 480ceaeb2f..7fae3e63c2 100644
--- a/go.sum
+++ b/go.sum
@@ -858,8 +858,8 @@ github.com/prometheus/procfs v0.0.6/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+Gx
 github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
 github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
 github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
-github.com/prometheus/prometheus v1.8.2-0.20200707115909-30505a202a4c h1:Iz2q3wgo4xiURb7Ku0MCrM7osAVHX03lF1vHNht1fb8=
-github.com/prometheus/prometheus v1.8.2-0.20200707115909-30505a202a4c/go.mod h1:/kMSPIRsxr/apyHxlzYMdFnaPXUXXqILU5uzIoNhOvc=
+github.com/prometheus/prometheus v1.8.2-0.20200710110332-22d52d35242f h1:svi/ASwrWXXAqW/KwJHB8YOnyD4Gd1q//l860mqVqt0=
+github.com/prometheus/prometheus v1.8.2-0.20200710110332-22d52d35242f/go.mod h1:/kMSPIRsxr/apyHxlzYMdFnaPXUXXqILU5uzIoNhOvc=
 github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc=
diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go
index ff229fb323..0e3c05f0ed 100644
--- a/pkg/store/tsdb.go
+++ b/pkg/store/tsdb.go
@@ -13,7 +13,6 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/storage"
-	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
@@ -24,7 +23,7 @@ import (
 )
 
 type TSDBReader interface {
-	storage.Queryable
+	storage.ChunkQueryable
 	StartTime() (int64, error)
 }
 
@@ -32,10 +31,11 @@ type TSDBReader interface {
 // It attaches the provided external labels to all results. It only responds with raw data
 // and does not support downsampling.
 type TSDBStore struct {
-	logger         log.Logger
-	db             TSDBReader
-	component      component.StoreAPI
-	externalLabels labels.Labels
+	logger           log.Logger
+	db               TSDBReader
+	component        component.StoreAPI
+	externalLabels   labels.Labels
+	maxBytesPerFrame int
 }
 
 // ReadWriteTSDBStore is a TSDBStore that can also be written to.
@@ -50,10 +50,11 @@ func NewTSDBStore(logger log.Logger, _ prometheus.Registerer, db TSDBReader, com
 		logger = log.NewNopLogger()
 	}
 	return &TSDBStore{
-		logger:         logger,
-		db:             db,
-		component:      component,
-		externalLabels: externalLabels,
+		logger:           logger,
+		db:               db,
+		component:        component,
+		externalLabels:   externalLabels,
+		maxBytesPerFrame: 1024 * 1024, // 1MB as recommended by gRPC.
 	}
 }
 
@@ -109,7 +110,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
 		return status.Error(codes.InvalidArgument, err.Error())
 	}
 
-	q, err := s.db.Querier(context.Background(), r.MinTime, r.MaxTime)
+	q, err := s.db.ChunkQuerier(context.Background(), r.MinTime, r.MaxTime)
 	if err != nil {
 		return status.Error(codes.Internal, err.Error())
 	}
@@ -119,72 +120,67 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
 		set        = q.Select(false, nil, matchers...)
 		respSeries storepb.Series
 	)
+
+	// Stream at most one series per frame; series may be split over multiple frames according to maxBytesInFrame.
 	for set.Next() {
 		series := set.At()
-
 		respSeries.Labels = s.translateAndExtendLabels(series.Labels(), s.externalLabels)
-
-		if !r.SkipChunks {
-			// TODO(fabxc): An improvement over this trivial approach would be to directly
-			// use the chunks provided by TSDB in the response.
-			c, err := s.encodeChunks(series.Iterator(), maxSamplesPerChunk)
-			if err != nil {
-				return status.Errorf(codes.Internal, "encode chunk: %s", err)
+		respSeries.Chunks = respSeries.Chunks[:0]
+		if r.SkipChunks {
+			if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil {
+				return status.Error(codes.Aborted, err.Error())
 			}
-
-			respSeries.Chunks = append(respSeries.Chunks[:0], c...)
+			continue
 		}
 
-		if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil {
-			return status.Error(codes.Aborted, err.Error())
+		frameBytesLeft := s.maxBytesPerFrame
+		for _, lbl := range respSeries.Labels {
+			frameBytesLeft -= lbl.Size()
 		}
-	}
-	if err := set.Err(); err != nil {
-		return status.Error(codes.Internal, err.Error())
-	}
-	return nil
-}
-
-func (s *TSDBStore) encodeChunks(it chunkenc.Iterator, maxSamplesPerChunk int) (chks []storepb.AggrChunk, err error) {
-	var (
-		chkMint int64
-		chk     *chunkenc.XORChunk
-		app     chunkenc.Appender
-		isNext  = it.Next()
-	)
-	for isNext {
-		if chk == nil {
-			chk = chunkenc.NewXORChunk()
-			app, err = chk.Appender()
-			if err != nil {
-				return nil, err
+		chIter := series.Iterator()
+		isNext := chIter.Next()
+		for isNext {
+			chk := chIter.At()
+			if chk.Chunk == nil {
+				return status.Errorf(codes.Internal, "TSDBStore: found not populated chunk returned by SeriesSet at ref: %v", chk.Ref)
 			}
-			chkMint, _ = it.At()
-		}
-
-		app.Append(it.At())
-		chkMaxt, _ := it.At()
+			respSeries.Chunks = append(respSeries.Chunks, storepb.AggrChunk{
+				MinTime: chk.MinTime,
+				MaxTime: chk.MaxTime,
+				Raw: &storepb.Chunk{
+					Type: storepb.Chunk_Encoding(chk.Chunk.Encoding() - 1), // proto chunk encoding is one off to TSDB one.
+					Data: chk.Chunk.Bytes(),
+				},
+			})
+			frameBytesLeft -= respSeries.Chunks[len(respSeries.Chunks)-1].Size()
+
+			// We are fine with minor inaccuracy of max bytes per frame. The inaccuracy will be max of full chunk size.
+			isNext = chIter.Next()
+			if frameBytesLeft > 0 && isNext {
+				continue
+			}
 
-		isNext = it.Next()
-		if isNext && chk.NumSamples() < maxSamplesPerChunk {
-			continue
+			if err := srv.Send(storepb.NewSeriesResponse(&respSeries)); err != nil {
+				return status.Error(codes.Aborted, err.Error())
+			}
+			respSeries.Chunks = respSeries.Chunks[:0]
+		}
+		if err := chIter.Err(); err != nil {
+			return status.Error(codes.Internal, errors.Wrap(err, "chunk iter").Error())
 		}
-		// Cut the chunk.
-		chks = append(chks, storepb.AggrChunk{
-			MinTime: chkMint,
-			MaxTime: chkMaxt,
-			Raw:     &storepb.Chunk{Type: storepb.Chunk_XOR, Data: chk.Bytes()},
-		})
-		chk = nil
 	}
-	if it.Err() != nil {
-		return nil, errors.Wrap(it.Err(), "read TSDB series")
+	if err := set.Err(); err != nil {
+		return status.Error(codes.Internal, err.Error())
+	}
+	for _, w := range set.Warnings() {
+		if err := srv.Send(storepb.NewWarnSeriesResponse(w)); err != nil {
+			return status.Error(codes.Aborted, err.Error())
+		}
 	}
-
-	return chks, nil
-
+	return nil
 }
 
 // translateAndExtendLabels transforms a metrics into a protobuf label set. It additionally
@@ -217,7 +213,7 @@ func (s *TSDBStore) translateAndExtendLabels(m, extend labels.Labels) []storepb.
 func (s *TSDBStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest) (
 	*storepb.LabelNamesResponse, error,
 ) {
-	q, err := s.db.Querier(ctx, math.MinInt64, math.MaxInt64)
+	q, err := s.db.ChunkQuerier(ctx, math.MinInt64, math.MaxInt64)
 	if err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
 	}
@@ -234,7 +230,7 @@ func (s *TSDBStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesRequest
 func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequest) (
 	*storepb.LabelValuesResponse, error,
 ) {
-	q, err := s.db.Querier(ctx, math.MinInt64, math.MaxInt64)
+	q, err := s.db.ChunkQuerier(ctx, math.MinInt64, math.MaxInt64)
 	if err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
 	}
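Note (illustration only, not part of the patch): the Series() change above batches chunks into gRPC frames using a simple byte budget. The standalone Go sketch below mirrors that rule with hypothetical stand-in types (chunk and splitFrames are not Thanos identifiers): the budget starts at maxBytesPerFrame minus the label bytes, each appended chunk is charged against it, and the frame is flushed once the budget is spent or the iterator is exhausted, so a frame may overshoot by at most one full chunk, as the patch comment notes.

package main

import "fmt"

// chunk is a hypothetical stand-in for storepb.AggrChunk; size plays the role of Size().
type chunk struct{ size int }

// splitFrames mirrors the budget rule in the Series() loop above: charge every
// appended chunk against frameBytesLeft and flush the accumulated frame once the
// budget is spent or no chunks remain. As in the patch, the budget is not reset
// between frames of the same series, and an overshooting chunk still ships in the
// current frame (inaccuracy of at most one full chunk).
func splitFrames(labelBytes, maxBytesPerFrame int, chunks []chunk) [][]chunk {
	frameBytesLeft := maxBytesPerFrame - labelBytes
	var frames [][]chunk
	var cur []chunk
	for i, c := range chunks {
		cur = append(cur, c)
		frameBytesLeft -= c.size

		if frameBytesLeft > 0 && i+1 < len(chunks) {
			continue
		}
		frames = append(frames, cur)
		cur = nil
	}
	return frames
}

func main() {
	// With a ~1KB budget and 400-byte chunks, the first frame takes three chunks
	// (the third overshoots the budget) and the remaining chunks go one per frame.
	for i, f := range splitFrames(100, 1024, []chunk{{400}, {400}, {400}, {400}, {400}}) {
		fmt.Printf("frame %d: %d chunk(s)\n", i, len(f))
	}
}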