diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 368770e6422..2822cdb2738 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -318,7 +318,7 @@ func runQuery( ) proxy = store.NewProxyStore(logger, func(context.Context) ([]store.Client, error) { return stores.Get(), nil - }, component.Query, selectorLset) + }, component.Query, selectorLset, storeReadTimeout) queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel, storeReadTimeout) engine = promql.NewEngine( promql.EngineOpts{ diff --git a/pkg/query/querier.go b/pkg/query/querier.go index a6ddd2e9b90..d911bed62f5 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -57,7 +57,7 @@ type queryable struct { // Querier returns a new storage querier against the underlying proxy store API. func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, int64(q.maxSourceResolution/time.Millisecond), q.storeReadTimeout, q.partialResponse, q.warningReporter), nil + return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, int64(q.maxSourceResolution/time.Millisecond), q.partialResponse, q.warningReporter), nil } type querier struct { @@ -71,11 +71,6 @@ type querier struct { maxSourceResolution int64 partialResponse bool warningReporter WarningReporter - - // storeReadTimeout is an additional timeout for reading data from stores. - // Its separated from ctx because querier.ctx is forwarded from prometheus query executor - // and already contains timeout for entire query execution. - storeReadTimeout time.Duration } // newQuerier creates implementation of storage.Querier that fetches data from the proxy @@ -88,7 +83,6 @@ func newQuerier( proxy storepb.StoreServer, deduplicate bool, maxSourceResolution int64, - storeReadTimeout time.Duration, partialResponse bool, warningReporter WarningReporter, ) *querier { @@ -109,7 +103,6 @@ func newQuerier( proxy: proxy, deduplicate: deduplicate, maxSourceResolution: maxSourceResolution, - storeReadTimeout: storeReadTimeout, partialResponse: partialResponse, warningReporter: warningReporter, } @@ -189,16 +182,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s queryAggrs, resAggr := aggrsFromFunc(params.Func) - // We're limiting store read time here. - // This limit affects behaviour what result Thanos Query will return to user if one of requested stores timed out. - // If query.timeout > store.read-timeout - // client will get partial response from stores who responded faster than store.read-timeout - // If query.timeout <= store.read-timeout - // client will get an error with timeout for whole request even if some of stores responded in time, but one timed out - storeReadCtx, storeReadCancelFunc := context.WithTimeout(ctx, q.storeReadTimeout) - defer storeReadCancelFunc() - - resp := &seriesServer{ctx: storeReadCtx} + resp := &seriesServer{ctx: ctx} if err := q.proxy.Series(&storepb.SeriesRequest{ MinTime: q.mint, MaxTime: q.maxt, @@ -270,10 +254,10 @@ func (q *querier) LabelValues(name string) ([]string, error) { span, ctx := tracing.StartSpan(q.ctx, "querier_label_values") defer span.Finish() - storeReadCtx, storeReadCancelFunc := context.WithTimeout(ctx, q.storeReadTimeout) - defer storeReadCancelFunc() + //storeReadCtx, storeReadCancelFunc := context.WithTimeout(ctx, q.storeReadTimeout) + //defer storeReadCancelFunc() - resp, err := q.proxy.LabelValues(storeReadCtx, &storepb.LabelValuesRequest{Label: name, PartialResponseDisabled: !q.partialResponse}) + resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{Label: name, PartialResponseDisabled: !q.partialResponse}) if err != nil { return nil, errors.Wrap(err, "proxy LabelValues()") } diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index e58cc163872..b8da440f1c5 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -36,7 +36,7 @@ func TestQuerier_Series(t *testing.T) { // Querier clamps the range to [1,300], which should drop some samples of the result above. // The store API allows endpoints to send more data then initially requested. - q := newQuerier(context.Background(), nil, 1, 300, "", testProxy, false, 0, time.Minute, true, nil) + q := newQuerier(context.Background(), nil, 1, 300, "", testProxy, false, 0, true, nil) defer func() { testutil.Ok(t, q.Close()) }() res, _, err := q.Select(&storage.SelectParams{}) @@ -290,19 +290,19 @@ func TestQuerier_StoreReadTimeout(t *testing.T) { LabelsCallback: func() []storepb.Label { return nil }, } + storeReadTimeout := 1 * time.Millisecond + queryTimeout := 1 * time.Second + proxy := store.NewProxyStore( nil, func(context.Context) ([]store.Client, error) { return []store.Client{storeClient}, nil }, component.Query, - nil, + nil, storeReadTimeout, ) - queryTimeout := 1 * time.Second - storeReadTimeout := 1 * time.Millisecond - queryCtx, _ := context.WithTimeout(context.Background(), queryTimeout) - q := newQuerier(queryCtx, nil, 1, 300, "", proxy, false, 0, storeReadTimeout, true, nil) + q := newQuerier(queryCtx, nil, 1, 300, "", proxy, false, 0, true, nil) storeQueryDone := make(chan struct{}) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 4b697f14b8f..93e7ab8ce97 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -7,6 +7,7 @@ import ( "math" "strings" "sync" + "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -40,6 +41,11 @@ type ProxyStore struct { stores func(context.Context) ([]Client, error) component component.StoreAPI selectorLabels labels.Labels + + // storeReadTimeout is an additional timeout for reading data from stores. + // Its separated from ctx because querier.ctx is forwarded from prometheus query executor + // and already contains timeout for entire query execution. + storeReadTimeout time.Duration } // NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client. @@ -49,15 +55,17 @@ func NewProxyStore( stores func(context.Context) ([]Client, error), component component.StoreAPI, selectorLabels labels.Labels, + storeReadTimeout time.Duration, ) *ProxyStore { if logger == nil { logger = log.NewNopLogger() } s := &ProxyStore{ - logger: logger, - stores: stores, - component: component, - selectorLabels: selectorLabels, + logger: logger, + stores: stores, + component: component, + selectorLabels: selectorLabels, + storeReadTimeout: storeReadTimeout, } return s } @@ -108,8 +116,10 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe if !match { return nil } + ctx, cancel := context.WithTimeout(srv.Context(), s.storeReadTimeout) + defer cancel() - stores, err := s.stores(srv.Context()) + stores, err := s.stores(ctx) if err != nil { err = errors.Wrap(err, "failed to get store APIs") level.Error(s.logger).Log("err", err) @@ -203,7 +213,6 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe return err } return nil - } type warnSender interface { @@ -330,6 +339,9 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequest) ( *storepb.LabelValuesResponse, error, ) { + ctx, cancel := context.WithTimeout(ctx, s.storeReadTimeout) + defer cancel() + var ( warnings []string all [][]string @@ -345,7 +357,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ store := st g.Go(func() error { resp, err := store.LabelValues(gctx, &storepb.LabelValuesRequest{ - Label: r.Label, + Label: r.Label, PartialResponseDisabled: r.PartialResponseDisabled, }) if err != nil { diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index d159b887407..127a5de90da 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -48,7 +48,7 @@ func TestProxyStore_Series_StoresFetchFail(t *testing.T) { q := NewProxyStore(nil, func(_ context.Context) ([]Client, error) { return nil, errors.New("Fail") }, component.Query, - nil, + nil, 100*time.Second, ) s := newStoreSeriesServer(context.Background()) @@ -68,7 +68,7 @@ func TestProxyStore_Info(t *testing.T) { q := NewProxyStore(nil, func(context.Context) ([]Client, error) { return nil, nil }, component.Query, - nil, + nil, 100*time.Second, ) resp, err := q.Info(ctx, &storepb.InfoRequest{}) @@ -422,6 +422,7 @@ func TestProxyStore_Series(t *testing.T) { func(_ context.Context) ([]Client, error) { return tc.storeAPIs, nil }, // what if err? component.Query, tc.selectorLabels, + 100*time.Second, ) s := newStoreSeriesServer(context.Background()) @@ -463,6 +464,7 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) { func(context.Context) ([]Client, error) { return cls, nil }, component.Query, nil, + 100*time.Second, ) ctx := context.Background() @@ -521,6 +523,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) { func(context.Context) ([]Client, error) { return cls, nil }, component.Query, tlabels.FromStrings("fed", "a"), + 100*time.Second, ) ctx := context.Background() @@ -558,11 +561,12 @@ func TestProxyStore_LabelValues(t *testing.T) { func(context.Context) ([]Client, error) { return cls, nil }, component.Query, nil, + 100*time.Second, ) ctx := context.Background() req := &storepb.LabelValuesRequest{ - Label: "a", + Label: "a", PartialResponseDisabled: true, } resp, err := q.LabelValues(ctx, req)