From fc4cd7ad3b1718a7a20e8730b20052cbe3a93cad Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Mon, 30 May 2022 17:46:17 +0300
Subject: [PATCH] proxy: make strategy tunable
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Giedrius Statkevičius
---
Review notes (text between the "---" line above and the first "diff --git"
line is not part of the commit):

* This patch arrived with its line structure collapsed. Line breaks,
  indentation and gofmt alignment were restored from the hunk headers.
  Blank context lines and indentation depth of context lines are inferred
  from the hunk line counts; verify them against the pre-image blobs
  (58c504c18d for proxy.go, b2595a8439 for proxy_heap.go) before applying.
* eagerRespSet.Close() now calls closeSeries(). It used to be empty, so
  the CancelFunc obtained from context.WithCancel in newAsyncRespSet was
  never called for eager sets, while lazyRespSet.Close() does call it.
* The eager goroutine now releases the WaitGroup through its own parameter
  (l.wg) instead of the captured ret.
* The comment on lazyRespSet.Next no longer talks about a non-lazy mode;
  that behaviour lives in eagerRespSet.
* The last proxy_heap.go hunk header and the diffstat were recounted for
  the three added lines (new side 207 -> 210 lines, 313 -> 316 changes,
  256 -> 259 insertions). The post-image blob ids on the "index" lines are
  therefore stale.
* Not changed, suggested follow-ups: eagerRespSet.Err() reads l.err
  without waiting on wg, unlike At() and Next(); the lazyRespSet doc
  comment says "at the same it permits" (missing "time") but sits on a
  context line; newAsyncRespSet panics on an unknown strategy after the
  stream has already been opened.

 pkg/store/proxy.go      |  14 +-
 pkg/store/proxy_heap.go | 316 +++++++++++++++++++++++++++++++---------
 2 files changed, 259 insertions(+), 71 deletions(-)

diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go
index 58c504c18d..1b3e9da368 100644
--- a/pkg/store/proxy.go
+++ b/pkg/store/proxy.go
@@ -283,15 +283,19 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
 	}
 
 	var (
-		storeResponses = make([]*lazyRespSet, len(stores))
+		storeResponses = make([]respSet, len(stores))
 	)
 
 	for i, st := range stores {
 		storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))
 
 		st := st
-		storeResponses[i] = newLazyRespSet(srv.Context(), st, r, s.responseTimeout)
-		defer storeResponses[i].Close()
+		respSet, err := newAsyncRespSet(srv.Context(), st, r, s.responseTimeout, EagerRetrieval)
+		if err != nil {
+			return status.Error(codes.Unknown, errors.Wrapf(err, "starting %s stream", st.String()).Error())
+		}
+		storeResponses[i] = respSet
+		defer respSet.Close()
 	}
 
 	if len(stores) == 0 {
@@ -328,12 +332,12 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
 				return errors.Wrap(err, "send series response")
 			}
 		} else {
-			storeID := labelpb.PromLabelSetsToString(respSet.st.LabelSets())
+			storeID := respSet.Labelset()
 			if storeID == "" {
 				storeID = "Store Gateway"
 			}
 
-			return errors.Wrapf(respSet.Err(), "fetch series for %s %s", storeID, respSet.st)
+			return errors.Wrapf(respSet.Err(), "fetch series for %s %s", storeID, respSet.StoreID())
 		}
 	}
 
diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go
index b2595a8439..35f43287f9 100644
--- a/pkg/store/proxy_heap.go
+++ b/pkg/store/proxy_heap.go
@@ -7,6 +7,7 @@ import (
 	"container/heap"
 	"context"
 	"crypto/md5"
+	"fmt"
 	"io"
 	"sort"
 	"sync"
@@ -16,7 +17,6 @@ import (
 	"github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/model/labels"
-	"github.com/thanos-io/thanos/pkg/errutil"
 	"github.com/thanos-io/thanos/pkg/store/labelpb"
 	"github.com/thanos-io/thanos/pkg/store/storepb"
 	"github.com/thanos-io/thanos/pkg/tracing"
@@ -214,10 +214,10 @@ func (h *ProxyResponseHeap) Min() *ProxyResponseHeapNode {
 }
 
 type ProxyResponseHeapNode struct {
-	rs *lazyRespSet
+	rs respSet
 }
 
-func NewProxyResponseHeap(seriesSets ...*lazyRespSet) *ProxyResponseHeap {
+func NewProxyResponseHeap(seriesSets ...respSet) *ProxyResponseHeap {
 	ret := make(ProxyResponseHeap, 0, len(seriesSets))
 
 	for _, ss := range seriesSets {
@@ -255,38 +255,47 @@ func (h *ProxyResponseHeap) Error() error {
 	return h.Min().rs.Err()
 }
 
+func (l *lazyRespSet) StoreID() string {
+	return l.st.String()
+}
+
+func (l *lazyRespSet) Labelset() string {
+	return labelpb.PromLabelSetsToString(l.st.LabelSets())
+}
+
 // lazyRespSet is a lazy storepb.SeriesSet that buffers
 // everything as fast as possible while at the same it permits
 // reading response-by-response. It blocks if there is no data
 // in Next().
 type lazyRespSet struct {
-	st Client
-	cl storepb.Store_SeriesClient
-
-	closeSeries  context.CancelFunc
+	// Generic parameters.
 	span         opentracing.Span
-	ctx          context.Context
+	cl           storepb.Store_SeriesClient
+	closeSeries  context.CancelFunc
+	st           Client
 	frameTimeout time.Duration
+	ctx          context.Context
 
-	dataOrEndEvent       *sync.Cond
+	// Internal bookkeeping.
+	dataOrFinishEvent    *sync.Cond
 	bufferedResponses    []*storepb.SeriesResponse
 	bufferedResponsesMtx *sync.Mutex
-	readThrough          int
 	lastResp             *storepb.SeriesResponse
-	errs                 errutil.MultiError
-	errsMtx              sync.Mutex
+	err                  error
+	errMtx               sync.Mutex
 
 	noMoreData bool
 }
 
 func (l *lazyRespSet) Err() error {
-	l.errsMtx.Lock()
-	defer l.errsMtx.Unlock()
-	return l.errs.Err()
+	l.errMtx.Lock()
+	defer l.errMtx.Unlock()
+	return l.err
 }
 
 // Next either blocks until more data is available or reads
-// the next response.
+// the next response. Buffered responses are handed out in the order in
+// which they were received.
 func (l *lazyRespSet) Next() bool {
 	l.bufferedResponsesMtx.Lock()
 	defer l.bufferedResponsesMtx.Unlock()
@@ -298,7 +307,7 @@ func (l *lazyRespSet) Next() bool {
 	}
 
 	for len(l.bufferedResponses) == 0 {
-		l.dataOrEndEvent.Wait()
+		l.dataOrFinishEvent.Wait()
 		if l.noMoreData && len(l.bufferedResponses) == 0 {
 			break
 		}
@@ -307,7 +316,6 @@ func (l *lazyRespSet) Next() bool {
 	if len(l.bufferedResponses) > 0 {
 		l.lastResp = l.bufferedResponses[0]
 		l.bufferedResponses = l.bufferedResponses[1:]
-		l.readThrough++
 		return true
 	}
 
@@ -319,30 +327,14 @@ func (l *lazyRespSet) At() *storepb.SeriesResponse {
 	return l.lastResp
 }
 
-func newLazyRespSet(ctx context.Context, st Client, req *storepb.SeriesRequest, frameTimeout time.Duration) *lazyRespSet {
-	storeID := labelpb.PromLabelSetsToString(st.LabelSets())
-	if storeID == "" {
-		storeID = "Store Gateway"
-	}
-
-	seriesCtx, closeSeries := context.WithCancel(ctx)
-	seriesCtx = grpc_opentracing.ClientAddContextTags(seriesCtx, opentracing.Tags{
-		"target": st.Addr(),
-	})
-
-	span, seriesCtx := tracing.StartSpan(seriesCtx, "proxy.series", tracing.Tags{
-		"store.id":   storeID,
-		"store.addr": st.Addr(),
-	})
-
-	errs := errutil.MultiError{}
-	cl, err := st.Series(seriesCtx, req)
-	if err != nil {
-		errs.Add(err)
-		span.SetTag("err", err.Error())
-		span.Finish()
-	}
-
+func newLazyRespSet(
+	ctx context.Context,
+	span opentracing.Span,
+	frameTimeout time.Duration,
+	st Client,
+	closeSeries context.CancelFunc,
+	cl storepb.Store_SeriesClient,
+) respSet {
 	bufferedResponses := []*storepb.SeriesResponse{}
 	bufferedResponsesMtx := &sync.Mutex{}
 	dataAvailable := sync.NewCond(bufferedResponsesMtx)
@@ -353,19 +345,12 @@ func newLazyRespSet(ctx context.Context, st Client, req *storepb.SeriesRequest,
 		st:                   st,
 		closeSeries:          closeSeries,
 		span:                 span,
-		ctx:                  seriesCtx,
-		errs:                 errs,
-		dataOrEndEvent:       dataAvailable,
+		ctx:                  ctx,
+		dataOrFinishEvent:    dataAvailable,
 		bufferedResponsesMtx: bufferedResponsesMtx,
 		bufferedResponses:    bufferedResponses,
 	}
 
-	if errs.Err() != nil {
-		respSet.noMoreData = true
-		return respSet
-	}
-
-	// Start a goroutine and immediately buffer everything.
 	go func(st Client, l *lazyRespSet) {
 		handleRecvResponse := func() bool {
 			frameTimeoutCtx, cancel := frameCtx(l.ctx, frameTimeout)
@@ -374,28 +359,28 @@ func newLazyRespSet(ctx context.Context, st Client, req *storepb.SeriesRequest,
 			select {
 			case <-l.ctx.Done():
 				err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st.String())
-				l.errsMtx.Lock()
-				l.errs.Add(err)
-				l.errsMtx.Unlock()
+				l.errMtx.Lock()
+				l.err = err
+				l.errMtx.Unlock()
 				l.span.SetTag("err", err.Error())
 				l.span.Finish()
 
 				l.bufferedResponsesMtx.Lock()
 				l.noMoreData = true
-				l.dataOrEndEvent.Signal()
+				l.dataOrFinishEvent.Signal()
 				l.bufferedResponsesMtx.Unlock()
 				return false
 			case <-frameTimeoutCtx.Done():
 				err := errors.Wrapf(frameTimeoutCtx.Err(), "failed to receive any data in %v from %s", frameTimeout, st.String())
-				l.errsMtx.Lock()
-				l.errs.Add(err)
-				l.errsMtx.Unlock()
+				l.errMtx.Lock()
+				l.err = err
+				l.errMtx.Unlock()
 				l.span.SetTag("err", err.Error())
 				l.span.Finish()
 
 				l.bufferedResponsesMtx.Lock()
 				l.noMoreData = true
-				l.dataOrEndEvent.Signal()
+				l.dataOrFinishEvent.Signal()
 				l.bufferedResponsesMtx.Unlock()
 				return false
 			default:
@@ -405,30 +390,30 @@ func newLazyRespSet(ctx context.Context, st Client, req *storepb.SeriesRequest,
 
 					l.bufferedResponsesMtx.Lock()
 					l.noMoreData = true
-					l.dataOrEndEvent.Signal()
+					l.dataOrFinishEvent.Signal()
 					l.bufferedResponsesMtx.Unlock()
 					return false
 				}
 
 				if err != nil {
 					err := errors.Wrapf(err, "receive series from %s", st.String())
 
-					l.errsMtx.Lock()
-					l.errs.Add(err)
-					l.errsMtx.Unlock()
+					l.errMtx.Lock()
+					l.err = err
+					l.errMtx.Unlock()
 					l.span.SetTag("err", err.Error())
 					l.span.Finish()
 
 					l.bufferedResponsesMtx.Lock()
 					l.noMoreData = true
-					l.dataOrEndEvent.Signal()
+					l.dataOrFinishEvent.Signal()
 					l.bufferedResponsesMtx.Unlock()
 					return false
 				}
 
 				l.bufferedResponsesMtx.Lock()
 				l.bufferedResponses = append(l.bufferedResponses, resp)
-				l.dataOrEndEvent.Signal()
+				l.dataOrFinishEvent.Signal()
 				l.bufferedResponsesMtx.Unlock()
 				return true
 			}
@@ -443,11 +428,210 @@ func newLazyRespSet(ctx context.Context, st Client, req *storepb.SeriesRequest,
 
 	return respSet
 }
 
+// RetrievalStrategy stores what kind of retrieval strategy
+// shall be used for the async response set.
+type RetrievalStrategy string
+
+const (
+	LazyRetrieval RetrievalStrategy = "lazy"
+
+	// TODO(GiedriusS): remove eager retrieval once
+	// https://github.com/prometheus/prometheus/blob/ce6a643ee88fba7c02fbd0459c4d0ac498f512dd/promql/engine.go#L877-L902
+	// is removed.
+	EagerRetrieval RetrievalStrategy = "eager"
+)
+
+func newAsyncRespSet(ctx context.Context, st Client, req *storepb.SeriesRequest, frameTimeout time.Duration, retrievalStrategy RetrievalStrategy) (respSet, error) {
+	storeID := labelpb.PromLabelSetsToString(st.LabelSets())
+	if storeID == "" {
+		storeID = "Store Gateway"
+	}
+
+	seriesCtx, closeSeries := context.WithCancel(ctx)
+	seriesCtx = grpc_opentracing.ClientAddContextTags(seriesCtx, opentracing.Tags{
+		"target": st.Addr(),
+	})
+
+	span, seriesCtx := tracing.StartSpan(seriesCtx, "proxy.series", tracing.Tags{
+		"store.id":   storeID,
+		"store.addr": st.Addr(),
+	})
+
+	cl, err := st.Series(seriesCtx, req)
+	if err != nil {
+		span.SetTag("err", err.Error())
+		span.Finish()
+		closeSeries()
+		return nil, err
+	}
+
+	switch retrievalStrategy {
+	case LazyRetrieval:
+		return newLazyRespSet(
+			seriesCtx,
+			span,
+			frameTimeout,
+			st,
+			closeSeries,
+			cl,
+		), nil
+	case EagerRetrieval:
+		return newEagerRespSet(
+			seriesCtx,
+			span,
+			frameTimeout,
+			st,
+			closeSeries,
+			cl,
+		), nil
+	default:
+		panic(fmt.Sprintf("unsupported retrieval strategy %s", retrievalStrategy))
+	}
+}
+
 func (l *lazyRespSet) Close() {
 	l.bufferedResponsesMtx.Lock()
 	defer l.bufferedResponsesMtx.Unlock()
 	l.closeSeries()
 	l.noMoreData = true
-	l.dataOrEndEvent.Signal()
+	l.dataOrFinishEvent.Signal()
+}
+
+// eagerRespSet is a SeriesSet that blocks until all data is retrieved from
+// the StoreAPI.
+type eagerRespSet struct {
+	// Generic parameters.
+	span opentracing.Span
+	cl   storepb.Store_SeriesClient
+	ctx  context.Context
+
+	closeSeries  context.CancelFunc
+	st           Client
+	frameTimeout time.Duration
+
+	// Internal bookkeeping.
+	err               error
+	bufferedResponses []*storepb.SeriesResponse
+	wg                *sync.WaitGroup
+	i                 int
+}
+
+func newEagerRespSet(
+	ctx context.Context,
+	span opentracing.Span,
+	frameTimeout time.Duration,
+	st Client,
+	closeSeries context.CancelFunc,
+	cl storepb.Store_SeriesClient,
+) respSet {
+	ret := &eagerRespSet{
+		span:              span,
+		st:                st,
+		closeSeries:       closeSeries,
+		cl:                cl,
+		frameTimeout:      frameTimeout,
+		ctx:               ctx,
+		bufferedResponses: []*storepb.SeriesResponse{},
+		wg:                &sync.WaitGroup{},
+	}
+
+	ret.wg.Add(1)
+
+	// Start a goroutine and immediately buffer everything.
+	go func(st Client, l *eagerRespSet) {
+		defer l.wg.Done()
+
+		handleRecvResponse := func() bool {
+			frameTimeoutCtx, cancel := frameCtx(l.ctx, frameTimeout)
+			defer cancel()
+
+			select {
+			case <-l.ctx.Done():
+				err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st.String())
+				l.err = err
+				l.span.SetTag("err", err.Error())
+				l.span.Finish()
+				return false
+			case <-frameTimeoutCtx.Done():
+				err := errors.Wrapf(frameTimeoutCtx.Err(), "failed to receive any data in %v from %s", frameTimeout, st.String())
+				l.err = err
+				l.span.SetTag("err", err.Error())
+				l.span.Finish()
+				return false
+			default:
+				resp, err := cl.Recv()
+				if err == io.EOF {
+					l.span.Finish()
+					return false
+				}
+
+				if err != nil {
+					err := errors.Wrapf(err, "receive series from %s", st.String())
+					l.err = err
+					l.span.SetTag("err", err.Error())
+					l.span.Finish()
+					return false
+				}
+
+				l.bufferedResponses = append(l.bufferedResponses, resp)
+				return true
+			}
+		}
+		for {
+			if !handleRecvResponse() {
+				return
+			}
+		}
+	}(st, ret)
+
+	return ret
+}
+
+// Close cancels the stream context created in newAsyncRespSet; the
+// CancelFunc is not called anywhere else for eager sets.
+func (l *eagerRespSet) Close() {
+	l.closeSeries()
+}
+
+func (l *eagerRespSet) At() *storepb.SeriesResponse {
+	l.wg.Wait()
+
+	if len(l.bufferedResponses) == 0 {
+		return nil
+	}
+	if l.i-1 < 0 {
+		return l.bufferedResponses[0]
+	}
+	return l.bufferedResponses[l.i-1]
+}
+
+func (l *eagerRespSet) Next() bool {
+	l.wg.Wait()
+
+	savedIndex := l.i
+	l.i++
+
+	return savedIndex < len(l.bufferedResponses)
+}
+
+func (l *eagerRespSet) Err() error {
+	return l.err
+}
+
+func (l *eagerRespSet) StoreID() string {
+	return l.st.String()
+}
+
+func (l *eagerRespSet) Labelset() string {
+	return labelpb.PromLabelSetsToString(l.st.LabelSets())
+}
+
+type respSet interface {
+	Close()
+	At() *storepb.SeriesResponse
+	Next() bool
+	Err() error
+	StoreID() string
+	Labelset() string
 }