diff --git a/CHANGELOG.md b/CHANGELOG.md index 302653cf8f1..c6b4870b445 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5231](https://github.com/thanos-io/thanos/pull/5231) Tools: Bucket verify tool ignores blocks with deletion markers. - [#5244](https://github.com/thanos-io/thanos/pull/5244) Query: Promote negative offset and `@` modifier to stable features as per Prometheus [#10121](https://github.com/prometheus/prometheus/pull/10121). - [#5255](https://github.com/thanos-io/thanos/pull/5255) InfoAPI: Set store API unavailable when stores are not ready. +- [#5296](https://github.com/thanos-io/thanos/pull/5296) Query: Use k-way merging for the proxying logic. This ensures that the network is used as efficiently as possible. The proxying sub-system now also uses far fewer resources (~30-50% less CPU and ~30-40% less RAM, according to our benchmarks). We intend to add a series limiter to Thanos Query in the near future, since Query now buffers series responses as fast as it can receive them. ### Removed diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 330cab85b6c..0b3e3daf535 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -14,8 +14,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tracing" - "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -26,10 +24,14 @@ import ( "google.golang.org/grpc/status" "github.com/thanos-io/thanos/pkg/component" + "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" "github.com/thanos-io/thanos/pkg/tracing" + + grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/tracing" + "github.com/opentracing/opentracing-go" ) type ctxKey int @@ -236,8 +238,21 @@ func (s cancelableRespSender) send(r *storepb.SeriesResponse) { } } -// Series returns all series for a requested time range and label matcher. Requested series are taken from other -// stores and proxied to RPC client. NOTE: Resulted data are not trimmed exactly to min and max time range. +type recvResponse struct { + r *storepb.SeriesResponse + err error +} + +func frameCtx(responseTimeout time.Duration) (context.Context, context.CancelFunc) { + frameTimeoutCtx := context.Background() + var cancel context.CancelFunc + if responseTimeout != 0 { + frameTimeoutCtx, cancel = context.WithTimeout(frameTimeoutCtx, responseTimeout) + return frameTimeoutCtx, cancel + } + return frameTimeoutCtx, func() {} +} + func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { // TODO(bwplotka): This should be part of request logger, otherwise it does not make much sense. Also, could be // tiggered by tracing span to reduce cognitive load. @@ -255,305 +270,213 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe } storeMatchers, _ := storepb.PromMatchersToMatchers(matchers...) // Error would be returned by matchesExternalLabels, so skip check. - g, gctx := errgroup.WithContext(srv.Context()) - - // Allow to buffer max 10 series response. - // Each might be quite large (multi chunk long series given by sidecar).
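For clarity, here is a minimal, self-contained sketch of the per-frame timeout pattern that the new frameCtx helper enables: a fresh timeout context is created for every frame, and a zero responseTimeout yields a background context with a no-op cancel. The channel, payload, and durations below are invented for illustration and are not part of the patch.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// frameCtx mirrors the helper added above: a per-frame timeout context,
// or a background context with a no-op cancel when no timeout is configured.
func frameCtx(responseTimeout time.Duration) (context.Context, context.CancelFunc) {
	if responseTimeout != 0 {
		return context.WithTimeout(context.Background(), responseTimeout)
	}
	return context.Background(), func() {}
}

func main() {
	recvCh := make(chan string)
	go func() {
		time.Sleep(50 * time.Millisecond)
		recvCh <- "frame"
	}()

	// One fresh timeout per frame; cancel releases the timer promptly.
	frameTimeoutCtx, cancel := frameCtx(100 * time.Millisecond)
	defer cancel()

	select {
	case <-frameTimeoutCtx.Done():
		fmt.Println("no frame within the response timeout:", frameTimeoutCtx.Err())
	case f := <-recvCh:
		fmt.Println("received", f)
	}
}
```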
- respSender, respCh := newCancelableRespChannel(gctx, 10) - g.Go(func() error { - // This go routine is responsible for calling store's Series concurrently. Merged results - // are passed to respCh and sent concurrently to client (if buffer of 10 have room). - // When this go routine finishes or is canceled, respCh channel is closed. - - var ( - seriesSet []storepb.SeriesSet - storeDebugMsgs []string - r = &storepb.SeriesRequest{ - MinTime: r.MinTime, - MaxTime: r.MaxTime, - Matchers: storeMatchers, - Aggregates: r.Aggregates, - MaxResolutionWindow: r.MaxResolutionWindow, - SkipChunks: r.SkipChunks, - QueryHints: r.QueryHints, - PartialResponseDisabled: r.PartialResponseDisabled, - } - wg = &sync.WaitGroup{} - ) - - defer func() { - wg.Wait() - close(respCh) - }() - - for _, st := range s.stores() { - // We might be able to skip the store if its meta information indicates it cannot have series matching our query. - if ok, reason := storeMatches(gctx, st, r.MinTime, r.MaxTime, matchers...); !ok { - storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out: %v", st, reason)) - continue - } + wg := &sync.WaitGroup{} - storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st)) + storeDebugMsgs := []string{} + actualRequest := &storepb.SeriesRequest{ + MinTime: r.MinTime, + MaxTime: r.MaxTime, + Matchers: storeMatchers, + Aggregates: r.Aggregates, + MaxResolutionWindow: r.MaxResolutionWindow, + SkipChunks: r.SkipChunks, + QueryHints: r.QueryHints, + PartialResponseDisabled: r.PartialResponseDisabled, + PartialResponseStrategy: r.PartialResponseStrategy, + } - // This is used to cancel this stream when one operation takes too long. - seriesCtx, closeSeries := context.WithCancel(gctx) - seriesCtx = grpc_opentracing.ClientAddContextTags(seriesCtx, opentracing.Tags{ - "target": st.Addr(), - }) - defer closeSeries() + stores := []Client{} + for _, st := range s.stores() { + // We might be able to skip the store if its meta information indicates it cannot have series matching our query. 
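The rewritten Series no longer streams through an errgroup-managed channel; instead it fans out one goroutine per matching store, buffers each store's responses in its own slot, and collects errors behind a mutex. Below is a condensed, self-contained sketch of that shape, with a toy client standing in for the gRPC StoreAPI client; names are illustrative only. Indexing the result slice by the goroutine's position means the results need no lock, only the shared error list does.

```go
package main

import (
	"fmt"
	"sync"
)

// client is a stand-in for a StoreAPI client; only what the sketch needs.
type client struct{ name string }

func (c client) query() ([]string, error) {
	return []string{c.name + "-series"}, nil
}

func main() {
	stores := []client{{"sidecar"}, {"store-gw"}}

	var (
		wg             sync.WaitGroup
		errMtx         sync.Mutex
		errs           []error
		storeResponses = make([][]string, len(stores)) // one slot per store: no locking needed.
	)

	wg.Add(len(stores))
	for i, st := range stores {
		go func(i int, st client) {
			defer wg.Done()
			resp, err := st.query()
			if err != nil {
				errMtx.Lock()
				errs = append(errs, err)
				errMtx.Unlock()
				return
			}
			storeResponses[i] = resp
		}(i, st)
	}
	wg.Wait()

	fmt.Println(storeResponses, errs)
}
```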
+ if ok, reason := storeMatches(srv.Context(), st, r.MinTime, r.MaxTime, matchers...); !ok { + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out: %v", st, reason)) + continue + } + + stores = append(stores, st) + } + + wg.Add(len(stores)) + + var ( + errs = errutil.MultiError{} + errMtx sync.Mutex + storeResponses = make([]struct { + responses []*storepb.SeriesResponse + }, len(stores)) + ) + + for i, st := range stores { + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st)) + + go func(i int, st Client) { + defer wg.Done() storeID := labelpb.PromLabelSetsToString(st.LabelSets()) if storeID == "" { storeID = "Store Gateway" } + + seriesCtx, closeSeries := context.WithCancel(srv.Context()) + seriesCtx = grpc_opentracing.ClientAddContextTags(seriesCtx, opentracing.Tags{ + "target": st.Addr(), + }) + defer closeSeries() + span, seriesCtx := tracing.StartSpan(seriesCtx, "proxy.series", tracing.Tags{ "store.id": storeID, "store.addr": st.Addr(), }) - - sc, err := st.Series(seriesCtx, r) + cl, err := st.Series(seriesCtx, actualRequest) if err != nil { err = errors.Wrapf(err, "fetch series for %s %s", storeID, st) + errMtx.Lock() + errs.Add(err) + errMtx.Unlock() + span.SetTag("err", err.Error()) span.Finish() - if r.PartialResponseDisabled { - level.Error(reqLogger).Log("err", err, "msg", "partial response disabled; aborting request") - return err - } - respSender.send(storepb.NewWarnSeriesResponse(err)) - continue + return } - // Schedule streamSeriesSet that translates gRPC streamed response - // into seriesSet (if series) or respCh if warnings. - seriesSet = append(seriesSet, startStreamSeriesSet(seriesCtx, reqLogger, span, closeSeries, - wg, sc, respSender, st.String(), !r.PartialResponseDisabled, s.responseTimeout, s.metrics.emptyStreamResponses)) - } - - level.Debug(reqLogger).Log("msg", "Series: started fanout streams", "status", strings.Join(storeDebugMsgs, ";")) - - if len(seriesSet) == 0 { - // This is indicates that configured StoreAPIs are not the ones end user expects. - err := errors.New("No StoreAPIs matched for this query") - level.Warn(reqLogger).Log("err", err, "stores", strings.Join(storeDebugMsgs, ";")) - respSender.send(storepb.NewWarnSeriesResponse(err)) - return nil - } - - // TODO(bwplotka): Currently we stream into big frames. Consider ensuring 1MB maximum. - // This however does not matter much when used with QueryAPI. Matters for federated Queries a lot. - // https://github.com/thanos-io/thanos/issues/2332 - // Series are not necessarily merged across themselves. - mergedSet := storepb.MergeSeriesSets(seriesSet...) - for mergedSet.Next() { - lset, chk := mergedSet.At() - respSender.send(storepb.NewSeriesResponse(&storepb.Series{Labels: labelpb.ZLabelsFromPromLabels(lset), Chunks: chk})) - } - return mergedSet.Err() - }) - g.Go(func() error { - // Go routine for gathering merged responses and sending them over to client. It stops when - // respCh channel is closed OR on error from client. - for resp := range respCh { - if err := srv.Send(resp); err != nil { - return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error()) - } - } - return nil - }) - if err := g.Wait(); err != nil { - // TODO(bwplotka): Replace with request logger. - level.Error(reqLogger).Log("err", err) - return err - } - return nil -} - -type directSender interface { - send(*storepb.SeriesResponse) -} - -// streamSeriesSet iterates over incoming stream of series. -// All errors are sent out of band via warning channel. 
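Because a gRPC stream's Recv() blocks and cannot be used in a select, the per-store goroutine above pumps receive results into rCh and then selects over rCh, the series context, and a per-frame timeout. The same pattern appears in the self-contained sketch below, with a fake blocking receiver; names, payloads, and timings are illustrative only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"
)

type recvResult struct {
	frame string
	err   error
}

// blockingRecv stands in for a gRPC stream's Recv(): it cannot be selected on directly.
func blockingRecv(i int) (string, error) {
	if i == 3 {
		return "", io.EOF
	}
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("frame-%d", i), nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	rCh := make(chan recvResult)
	done := make(chan struct{})

	// Pump the blocking receive into a channel so the consumer can also
	// watch for cancellation and per-frame timeouts.
	go func() {
		for i := 0; ; i++ {
			frame, err := blockingRecv(i)
			select {
			case <-done:
				close(rCh)
				return
			case rCh <- recvResult{frame: frame, err: err}:
			}
		}
	}()

	for {
		frameTimeout := time.After(100 * time.Millisecond)
		select {
		case <-ctx.Done():
			close(done)
			return
		case <-frameTimeout:
			close(done)
			fmt.Println("gave up waiting for a frame")
			return
		case rr := <-rCh:
			if errors.Is(rr.err, io.EOF) {
				close(done)
				fmt.Println("stream finished cleanly")
				return
			}
			fmt.Println("got", rr.frame)
		}
	}
}
```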
-type streamSeriesSet struct { - ctx context.Context - logger log.Logger - - stream storepb.Store_SeriesClient - warnCh directSender + rCh := make(chan *recvResponse) + done := make(chan struct{}) + go func() { + for { + // TODO: we can buffer this in files to have infinitely + // scalable read path. + // TODO: this sorely needs some limit on the number of series + // per each request. + r, err := cl.Recv() + select { + case <-done: + close(rCh) + return + case rCh <- &recvResponse{r: r, err: err}: + } + } + }() + + // The `defer` only executed when function return, we do `defer cancel` in for loop, + // so make the loop body as a function, release timers created by context as early. + handleRecvResponse := func() (next bool) { + frameTimeoutCtx, cancel := frameCtx(s.responseTimeout) + defer cancel() + var rr *recvResponse + select { + case <-seriesCtx.Done(): + err := errors.Wrapf(seriesCtx.Err(), "failed to receive any data from %s", st.String()) - currSeries *storepb.Series - recvCh chan *storepb.Series + errMtx.Lock() + errs.Add(err) + errMtx.Unlock() - errMtx sync.Mutex - err error + span.SetTag("err", err.Error()) + span.Finish() + close(done) + return false + case <-frameTimeoutCtx.Done(): + err := errors.Wrapf(frameTimeoutCtx.Err(), "failed to receive any data in %v from %s", s.responseTimeout, st.String()) - name string - partialResponse bool + errMtx.Lock() + errs.Add(err) + errMtx.Unlock() - responseTimeout time.Duration - closeSeries context.CancelFunc -} + span.SetTag("err", err.Error()) + span.Finish() + close(done) + return false + case rr = <-rCh: + } -type recvResponse struct { - r *storepb.SeriesResponse - err error -} + if rr.err == io.EOF { + span.Finish() + close(done) + return false + } -func frameCtx(responseTimeout time.Duration) (context.Context, context.CancelFunc) { - frameTimeoutCtx := context.Background() - var cancel context.CancelFunc - if responseTimeout != 0 { - frameTimeoutCtx, cancel = context.WithTimeout(frameTimeoutCtx, responseTimeout) - return frameTimeoutCtx, cancel - } - return frameTimeoutCtx, func() {} -} + if rr.err != nil { + err := errors.Wrapf(rr.err, "receive series from %s", st.String()) + errMtx.Lock() + errs.Add(err) + errMtx.Unlock() -func startStreamSeriesSet( - ctx context.Context, - logger log.Logger, - span tracing.Span, - closeSeries context.CancelFunc, - wg *sync.WaitGroup, - stream storepb.Store_SeriesClient, - warnCh directSender, - name string, - partialResponse bool, - responseTimeout time.Duration, - emptyStreamResponses prometheus.Counter, -) *streamSeriesSet { - s := &streamSeriesSet{ - ctx: ctx, - logger: logger, - closeSeries: closeSeries, - stream: stream, - warnCh: warnCh, - recvCh: make(chan *storepb.Series, 10), - name: name, - partialResponse: partialResponse, - responseTimeout: responseTimeout, - } + span.SetTag("err", err.Error()) + span.Finish() + close(done) + return false + } - wg.Add(1) - go func() { - seriesStats := &storepb.SeriesStatsCounter{} - bytesProcessed := 0 - - defer func() { - span.SetTag("processed.series", seriesStats.Series) - span.SetTag("processed.chunks", seriesStats.Chunks) - span.SetTag("processed.samples", seriesStats.Samples) - span.SetTag("processed.bytes", bytesProcessed) - span.Finish() - close(s.recvCh) - wg.Done() - }() - - numResponses := 0 - defer func() { - if numResponses == 0 { - emptyStreamResponses.Inc() + storeResponses[i].responses = append(storeResponses[i].responses, rr.r) + return true } - }() - rCh := make(chan *recvResponse) - done := make(chan struct{}) - go func() { for 
{ - r, err := s.stream.Recv() - select { - case <-done: - close(rCh) + if !handleRecvResponse() { return - case rCh <- &recvResponse{r: r, err: err}: } } - }() - // The `defer` only executed when function return, we do `defer cancel` in for loop, - // so make the loop body as a function, release timers created by context as early. - handleRecvResponse := func() (next bool) { - frameTimeoutCtx, cancel := frameCtx(s.responseTimeout) - defer cancel() - var rr *recvResponse - select { - case <-ctx.Done(): - s.handleErr(errors.Wrapf(ctx.Err(), "failed to receive any data from %s", s.name), done) - return false - case <-frameTimeoutCtx.Done(): - s.handleErr(errors.Wrapf(frameTimeoutCtx.Err(), "failed to receive any data in %s from %s", s.responseTimeout.String(), s.name), done) - return false - case rr = <-rCh: - } + }(i, st) + } - if rr.err == io.EOF { - close(done) - return false - } + level.Debug(reqLogger).Log("msg", "Series: started fanout streams", "status", strings.Join(storeDebugMsgs, ";")) - if rr.err != nil { - s.handleErr(errors.Wrapf(rr.err, "receive series from %s", s.name), done) - return false - } - numResponses++ - bytesProcessed += rr.r.Size() + wg.Wait() - if w := rr.r.GetWarning(); w != "" { - s.warnCh.send(storepb.NewWarnSeriesResponse(errors.New(w))) - } + if (actualRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT || actualRequest.PartialResponseDisabled) && errs.Err() != nil { + level.Error(reqLogger).Log("err", errs.Err(), "msg", "partial response disabled; aborting request") - if series := rr.r.GetSeries(); series != nil { - seriesStats.Count(series) + return errs.Err() + } else if errs.Err() != nil && (actualRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_WARN || !actualRequest.PartialResponseDisabled) { + for _, rerr := range errs { + if err := srv.Send(storepb.NewWarnSeriesResponse(rerr)); err != nil { + level.Error(reqLogger).Log("err", err) - select { - case s.recvCh <- series: - case <-ctx.Done(): - s.handleErr(errors.Wrapf(ctx.Err(), "failed to receive any data from %s", s.name), done) - return false - } + return status.Error(codes.Unknown, errors.Wrap(err, "send warning response").Error()) } - return true } - for { - if !handleRecvResponse() { - return - } + } + + seriesSets := []*respSet{} + for _, resp := range storeResponses { + if len(resp.responses) == 0 { + s.metrics.emptyStreamResponses.Inc() + continue } - }() - return s -} + seriesSets = append(seriesSets, &respSet{responses: resp.responses}) + } + + if len(seriesSets) == 0 { + err := errors.New("No StoreAPIs matched for this query") + level.Warn(reqLogger).Log("err", err, "stores", strings.Join(storeDebugMsgs, ";")) + if sendErr := srv.Send(storepb.NewWarnSeriesResponse(err)); sendErr != nil { + level.Error(reqLogger).Log("err", sendErr) -func (s *streamSeriesSet) handleErr(err error, done chan struct{}) { - defer close(done) - s.closeSeries() + return status.Error(codes.Unknown, errors.Wrap(sendErr, "send series response").Error()) + } - if s.partialResponse { - level.Warn(s.logger).Log("err", err, "msg", "returning partial response") - s.warnCh.send(storepb.NewWarnSeriesResponse(err)) - return + return nil } - s.errMtx.Lock() - s.err = err - s.errMtx.Unlock() -} -// Next blocks until new message is received or stream is closed or operation is timed out. 
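Once wg.Wait() returns above, the accumulated errors either abort the request or are downgraded to warning frames, depending on the partial-response settings. Below is a simplified, self-contained model of that decision: errors.Join stands in for errutil.MultiError, a single boolean stands in for the strategy/disabled pair, and finishFanout is an invented name.

```go
package main

import (
	"errors"
	"fmt"
)

// finishFanout models the decision made after the fan-out completes: with
// partial responses disabled, any store error fails the whole request;
// otherwise each error is downgraded to a warning frame.
func finishFanout(partialResponseDisabled bool, storeErrs []error, sendWarning func(error)) error {
	if len(storeErrs) == 0 {
		return nil
	}
	if partialResponseDisabled {
		return errors.Join(storeErrs...)
	}
	for _, err := range storeErrs {
		sendWarning(err)
	}
	return nil
}

func main() {
	errs := []error{errors.New("sidecar-1: context deadline exceeded")}

	// Partial responses allowed: the query still succeeds, errors become warnings.
	_ = finishFanout(false, errs, func(err error) { fmt.Println("warning:", err) })

	// Partial responses disabled: the whole Series call fails.
	fmt.Println("aborted:", finishFanout(true, errs, nil))
}
```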
-func (s *streamSeriesSet) Next() (ok bool) { - s.currSeries, ok = <-s.recvCh - return ok -} + respHeap := NewDedupResponseHeap(NewProxyResponseHeap(seriesSets...)) -func (s *streamSeriesSet) At() (labels.Labels, []storepb.AggrChunk) { - if s.currSeries == nil { - return nil, nil + for respHeap.Next() { + resp := respHeap.At() + + if resp.GetWarning() != "" && (actualRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT || actualRequest.PartialResponseDisabled) { + return errors.New(resp.GetWarning()) + } + + if err := srv.Send(resp); err != nil { + return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error()) + } } - return s.currSeries.PromLabels(), s.currSeries.Chunks + + return nil } -func (s *streamSeriesSet) Err() error { - s.errMtx.Lock() - defer s.errMtx.Unlock() - return errors.Wrap(s.err, s.name) +type directSender interface { + send(*storepb.SeriesResponse) } // storeMatches returns boolean if the given store may hold data for the given label matchers, time ranges and debug store matches gathered from context. diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go new file mode 100644 index 00000000000..93944ee6aac --- /dev/null +++ b/pkg/store/proxy_heap.go @@ -0,0 +1,261 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package store + +import ( + "container/heap" + "sort" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +// dedupResponseHeap is a wrapper around ProxyResponseHeap +// that deduplicates identical chunks identified by the same labelset. +// It uses a hashing function to do that. +type dedupResponseHeap struct { + h *ProxyResponseHeap + + responses []*storepb.SeriesResponse + + previousResponse *storepb.SeriesResponse + previousNext bool +} + +func NewDedupResponseHeap(h *ProxyResponseHeap) *dedupResponseHeap { + return &dedupResponseHeap{ + h: h, + previousNext: h.Next(), + } +} + +func (d *dedupResponseHeap) At() *storepb.SeriesResponse { + defer func() { + d.responses = d.responses[:0] + }() + if len(d.responses) == 0 { + return nil + } else if len(d.responses) == 1 { + return d.responses[0] + } + + chunkDedupMap := map[string]*storepb.AggrChunk{} + + for _, resp := range d.responses { + for _, chk := range resp.GetSeries().Chunks { + h := chk.Hash() + + if _, ok := chunkDedupMap[h]; !ok { + chk := chk + + chunkDedupMap[h] = &chk + } + } + } + + // If no chunks were requested. + if len(chunkDedupMap) == 0 { + return storepb.NewSeriesResponse(&storepb.Series{ + Labels: d.responses[0].GetSeries().Labels, + Chunks: d.responses[0].GetSeries().Chunks, + }) + } + + finalChunks := make([]storepb.AggrChunk, 0, len(chunkDedupMap)) + + for _, chk := range chunkDedupMap { + finalChunks = append(finalChunks, *chk) + } + + sort.Slice(finalChunks, func(i, j int) bool { + return finalChunks[i].Compare(finalChunks[j]) > 0 + }) + + lbls := d.responses[0].GetSeries().Labels + + return storepb.NewSeriesResponse(&storepb.Series{ + Labels: lbls, + Chunks: finalChunks, + }) +} + +func (d *dedupResponseHeap) Next() bool { + d.responses = d.responses[:0] + + // If there is something buffered that is *not* a series. 
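dedupResponseHeap.At() merges the chunks of several responses that share a labelset by keying them on a content hash and re-sorting the survivors. The same idea is shown below in a self-contained sketch that uses a toy chunk type instead of storepb.AggrChunk.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sort"
)

// chunk is a toy stand-in for storepb.AggrChunk: just enough to show dedup-by-hash.
type chunk struct {
	minTime, maxTime int64
	data             string
}

func (c chunk) hash() string {
	h := sha256.Sum256([]byte(fmt.Sprintf("%v%v%s", c.minTime, c.maxTime, c.data)))
	return fmt.Sprintf("%x", h)
}

func main() {
	// Two stores returned the same series; one chunk overlaps exactly.
	responses := [][]chunk{
		{{0, 10, "abc"}, {10, 20, "def"}},
		{{10, 20, "def"}, {20, 30, "ghi"}},
	}

	// Identical chunks collapse onto the same hash key.
	dedup := map[string]chunk{}
	for _, resp := range responses {
		for _, c := range resp {
			dedup[c.hash()] = c
		}
	}

	final := make([]chunk, 0, len(dedup))
	for _, c := range dedup {
		final = append(final, c)
	}
	sort.Slice(final, func(i, j int) bool {
		return final[i].minTime < final[j].minTime
	})

	fmt.Println(final) // [{0 10 abc} {10 20 def} {20 30 ghi}]
}
```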
+ if d.previousResponse != nil && d.previousResponse.GetSeries() == nil { + d.responses = append(d.responses, d.previousResponse) + d.previousResponse = nil + d.previousNext = d.h.Next() + return len(d.responses) > 0 || d.previousNext + } + + var resp *storepb.SeriesResponse + var nextHeap bool + + // If buffered then use it. + if d.previousResponse != nil { + resp = d.previousResponse + d.previousResponse = nil + } else { + // If not buffered then check whether there is anything. + nextHeap = d.h.Next() + if !nextHeap { + return false + } + resp = d.h.At() + } + + // Append buffered or retrieved response. + d.responses = append(d.responses, resp) + + // Update previousNext. + defer func(next *bool) { + d.previousNext = *next + }(&nextHeap) + + if resp.GetSeries() == nil { + return len(d.responses) > 0 || d.previousNext + } + + for { + nextHeap = d.h.Next() + if !nextHeap { + break + } + resp = d.h.At() + if resp.GetSeries() == nil { + d.previousResponse = resp + break + } + + lbls := resp.GetSeries().Labels + lastLbls := d.responses[len(d.responses)-1].GetSeries().Labels + + if labels.Compare(labelpb.ZLabelsToPromLabels(lbls), labelpb.ZLabelsToPromLabels(lastLbls)) == 0 { + d.responses = append(d.responses, resp) + } else { + // This one is different. It will be taken care of via the next Next() call. + d.previousResponse = resp + break + } + } + + return len(d.responses) > 0 || d.previousNext +} + +// ProxyResponseHeap is a heap for storepb.SeriesSets. +// It performs k-way merge between all of those sets. +// TODO(GiedriusS): can be improved with a tournament tree. +// This is O(n*logk) but can be Theta(n*logk). However, +// tournament trees need n-1 auxiliary nodes so there +// might not be much of a difference. +type ProxyResponseHeap []ProxyResponseHeapNode + +func (h *ProxyResponseHeap) Less(i, j int) bool { + iResp := (*h)[i].rs.At() + jResp := (*h)[j].rs.At() + + if iResp.GetSeries() != nil && jResp.GetSeries() != nil { + iLbls := labelpb.ZLabelsToPromLabels(iResp.GetSeries().Labels) + jLbls := labelpb.ZLabelsToPromLabels(jResp.GetSeries().Labels) + return labels.Compare(iLbls, jLbls) < 0 + } else if iResp.GetSeries() == nil && jResp.GetSeries() != nil { + return true + } else if iResp.GetSeries() != nil && jResp.GetSeries() == nil { + return false + } + + // If it is not a series then the order does not matter. What matters + // is that we get different types of responses one after another. 
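ProxyResponseHeap is a classic k-way merge: each store's responses arrive already sorted by labels, so a min-heap holding one cursor per store yields a single globally sorted stream in O(n log k). A self-contained miniature of the same mechanics over plain strings follows; duplicates are intentionally emitted, exactly as in ProxyResponseHeap, because deduplication is the wrapper's job.

```go
package main

import (
	"container/heap"
	"fmt"
)

// cursor walks one already-sorted stream, mirroring a respSet.
type cursor struct {
	items []string
	i     int
}

type mergeHeap []*cursor

func (h mergeHeap) Len() int            { return len(h) }
func (h mergeHeap) Less(i, j int) bool  { return h[i].items[h[i].i] < h[j].items[h[j].i] }
func (h mergeHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *mergeHeap) Push(x interface{}) { *h = append(*h, x.(*cursor)) }
func (h *mergeHeap) Pop() interface{} {
	old := *h
	v := old[len(old)-1]
	*h = old[:len(old)-1]
	return v
}

func main() {
	// Each "store" replies with items already sorted; k-way merging emits a
	// single globally sorted stream without re-sorting everything.
	h := &mergeHeap{
		{items: []string{"a", "c", "e"}},
		{items: []string{"b", "c", "d"}},
	}
	heap.Init(h)

	for h.Len() > 0 {
		min := (*h)[0]
		fmt.Print(min.items[min.i], " ")
		min.i++
		if min.i < len(min.items) {
			heap.Fix(h, 0) // same trick as ProxyResponseHeap.At(): fix in place instead of pop+push.
		} else {
			heap.Remove(h, 0)
		}
	}
	// Output: a b c c d e
}
```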
+ return false +} + +func (h *ProxyResponseHeap) Len() int { + return len(*h) +} + +func (h *ProxyResponseHeap) Swap(i, j int) { + (*h)[i], (*h)[j] = (*h)[j], (*h)[i] +} + +func (h *ProxyResponseHeap) Push(x interface{}) { + *h = append(*h, x.(ProxyResponseHeapNode)) +} + +func (h *ProxyResponseHeap) Pop() (v interface{}) { + *h, v = (*h)[:h.Len()-1], (*h)[h.Len()-1] + return +} + +func (h *ProxyResponseHeap) Empty() bool { + return h.Len() == 0 +} + +func (h *ProxyResponseHeap) Min() *ProxyResponseHeapNode { + return &(*h)[0] +} + +type ProxyResponseHeapNode struct { + rs *respSet +} + +func NewProxyResponseHeap(seriesSets ...*respSet) *ProxyResponseHeap { + ret := make(ProxyResponseHeap, 0, len(seriesSets)) + + for _, ss := range seriesSets { + ss := ss + ret.Push(ProxyResponseHeapNode{rs: ss}) + } + + heap.Init(&ret) + + return &ret +} + +func (h *ProxyResponseHeap) Next() bool { + return !h.Empty() +} + +func (h *ProxyResponseHeap) At() *storepb.SeriesResponse { + min := h.Min().rs + + atResp := min.At() + + if min.Next() { + heap.Fix(h, 0) + } else { + heap.Remove(h, 0) + } + + return atResp +} + +func (h *ProxyResponseHeap) Err() error { + return nil +} + +type respSet struct { + responses []*storepb.SeriesResponse + i int +} + +func (ss *respSet) Next() bool { + ss.i++ + return ss.i < len(ss.responses) +} + +func (ss *respSet) Err() error { + return nil +} + +func (ss *respSet) Warnings() storage.Warnings { + return nil +} + +func (ss *respSet) At() *storepb.SeriesResponse { + return ss.responses[ss.i] +} diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 303a87e0279..a9583f29f8d 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -572,7 +572,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, PartialResponseDisabled: true, }, - expectedErr: errors.New("test: receive series from test: test"), + expectedErr: errors.New("receive series from test: test"), }, { title: "partial response disabled; 1st store is slow, 2nd store is fast;", @@ -607,7 +607,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, PartialResponseDisabled: true, }, - expectedErr: errors.New("test: failed to receive any data in 4s from test: context deadline exceeded"), + expectedErr: errors.New("failed to receive any data in 4s from test: context deadline exceeded"), }, { title: "partial response disabled; 1st store is fast, 2nd store is slow;", @@ -642,7 +642,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, PartialResponseDisabled: true, }, - expectedErr: errors.New("test: failed to receive any data in 4s from test: context deadline exceeded"), + expectedErr: errors.New("failed to receive any data in 4s from test: context deadline exceeded"), }, { title: "partial response disabled; 1st store is slow on 2nd series, 2nd store is fast;", @@ -680,7 +680,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, PartialResponseDisabled: true, }, - expectedErr: errors.New("test: failed to receive any data in 4s from test: context deadline exceeded"), + expectedErr: errors.New("failed to receive any data in 4s from test: context deadline exceeded"), }, { title: "partial response disabled; 1st store is fast 
to respond, 2nd store is slow on 2nd series;", @@ -718,7 +718,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, PartialResponseDisabled: true, }, - expectedErr: errors.New("test: failed to receive any data in 4s from test: context deadline exceeded"), + expectedErr: errors.New("failed to receive any data in 4s from test: context deadline exceeded"), }, { title: "partial response enabled; 1st store is slow to respond, 2nd store is fast;", @@ -943,7 +943,7 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, }, }, - expectedErr: errors.New("test: failed to receive any data from test: context deadline exceeded"), + expectedErr: errors.New("failed to receive any data from test: context deadline exceeded"), }, { title: "partial response enabled; all stores respond 3s", @@ -1025,6 +1025,10 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { return } } + + // Wait until the last goroutine exits which is stuck on time.Sleep(). + // Otherwise, goleak complains. + time.Sleep(5 * time.Second) } func TestProxyStore_Series_RequestParamsProxied(t *testing.T) { @@ -1770,7 +1774,12 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { } chunkLen := len(allResps[len(allResps)-1].GetSeries().Chunks) - maxTime := allResps[len(allResps)-1].GetSeries().Chunks[chunkLen-1].MaxTime + var maxTime int64 + if len(allResps[len(allResps)-1].GetSeries().Chunks) == 0 { + maxTime = math.MaxInt64 + } else { + maxTime = allResps[len(allResps)-1].GetSeries().Chunks[chunkLen-1].MaxTime + } storetestutil.TestServerSeries(t, store, &storetestutil.SeriesCase{ Name: fmt.Sprintf("%d client with %d samples, %d series each", numOfClients, samplesPerSeriesPerClient, seriesPerClient), diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index eaa96f1ede6..8181231e942 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -5,6 +5,7 @@ package storepb import ( "bytes" + "crypto/sha256" "encoding/binary" "fmt" "sort" @@ -257,6 +258,28 @@ func (s *uniqueSeriesSet) Next() bool { return true } +func (m AggrChunk) Hash() string { + h := sha256.New() + + _, _ = h.Write([]byte(fmt.Sprintf("%v%v", m.MinTime, m.MaxTime))) + + for _, ch := range []*Chunk{ + m.Raw, + m.Count, + m.Sum, + m.Min, + m.Max, + m.Counter, + } { + if ch != nil { + _, _ = h.Write(ch.Data) + _, _ = h.Write([]byte(ch.Type.String())) + } + } + + return fmt.Sprintf("%x", h.Sum(nil)) +} + // Compare returns positive 1 if chunk is smaller -1 if larger than b by min time, then max time. // It returns 0 if chunks are exactly the same. func (m AggrChunk) Compare(b AggrChunk) int { diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 49e85467e5d..836d2da60db 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -1311,7 +1311,7 @@ func TestSidecarAlignmentPushdown(t *testing.T) { func TestGrpcInstantQuery(t *testing.T) { t.Parallel() - e, err := e2e.NewDockerEnvironment("e2e_test_query_grpc_api") + e, err := e2e.NewDockerEnvironment("e2e_test_instant_query_grpc_api") testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, e)) @@ -1419,7 +1419,7 @@ func TestGrpcInstantQuery(t *testing.T) { func TestGrpcQueryRange(t *testing.T) { t.Parallel() - e, err := e2e.NewDockerEnvironment("e2e_test_query_grpc_api") + e, err := e2e.NewDockerEnvironment("e2e_test_query_range_grpc_api") testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, e))
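To make the intent of the new AggrChunk.Hash concrete, here is a small usage sketch: chunks that are byte-identical for the same time range hash equally and therefore collapse in the dedup map, while a chunk with different data does not. It relies only on the fields exercised by the diff above, and the import path follows the one used throughout this diff.

```go
package main

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/store/storepb"
)

func main() {
	// Two stores returning byte-identical raw chunks for the same time range
	// produce the same hash, so the dedup heap keeps only one copy.
	a := storepb.AggrChunk{
		MinTime: 0,
		MaxTime: 100,
		Raw:     &storepb.Chunk{Data: []byte{0x01, 0x02}},
	}
	b := a // identical content

	c := a
	c.Raw = &storepb.Chunk{Data: []byte{0x01, 0x03}} // different data

	fmt.Println(a.Hash() == b.Hash()) // true
	fmt.Println(a.Hash() == c.Hash()) // false
}
```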