Skip to content

Commit

Permalink
Move Store timeout to Proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
povilasv committed Mar 15, 2019
1 parent b0ee13b commit 4f385bd
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 40 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func runQuery(
)
proxy = store.NewProxyStore(logger, func(context.Context) ([]store.Client, error) {
return stores.Get(), nil
}, component.Query, selectorLset)
}, component.Query, selectorLset, storeReadTimeout)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel, storeReadTimeout)
engine = promql.NewEngine(
promql.EngineOpts{
Expand Down
4 changes: 2 additions & 2 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ Flags:
store.read-timeout < query.timeout partial
response will be returned. If
store.read-timeout >= query.timeout one of
stores is timed out clien will get no data and
timeout error.
stores is timed out the client will get no data
and timeout error.
```
25 changes: 3 additions & 22 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type queryable struct {

// Querier returns a new storage querier against the underlying proxy store API.
func (q *queryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, int64(q.maxSourceResolution/time.Millisecond), q.storeReadTimeout, q.partialResponse, q.warningReporter), nil
return newQuerier(ctx, q.logger, mint, maxt, q.replicaLabel, q.proxy, q.deduplicate, int64(q.maxSourceResolution/time.Millisecond), q.partialResponse, q.warningReporter), nil
}

type querier struct {
Expand All @@ -71,11 +71,6 @@ type querier struct {
maxSourceResolution int64
partialResponse bool
warningReporter WarningReporter

// storeReadTimeout is an additional timeout for reading data from stores.
// Its separated from ctx because querier.ctx is forwarded from prometheus query executor
// and already contains timeout for entire query execution.
storeReadTimeout time.Duration
}

// newQuerier creates implementation of storage.Querier that fetches data from the proxy
Expand All @@ -88,7 +83,6 @@ func newQuerier(
proxy storepb.StoreServer,
deduplicate bool,
maxSourceResolution int64,
storeReadTimeout time.Duration,
partialResponse bool,
warningReporter WarningReporter,
) *querier {
Expand All @@ -109,7 +103,6 @@ func newQuerier(
proxy: proxy,
deduplicate: deduplicate,
maxSourceResolution: maxSourceResolution,
storeReadTimeout: storeReadTimeout,
partialResponse: partialResponse,
warningReporter: warningReporter,
}
Expand Down Expand Up @@ -189,16 +182,7 @@ func (q *querier) Select(params *storage.SelectParams, ms ...*labels.Matcher) (s

queryAggrs, resAggr := aggrsFromFunc(params.Func)

// We're limiting store read time here.
// This limit affects behaviour what result Thanos Query will return to user if one of requested stores timed out.
// If query.timeout > store.read-timeout
// client will get partial response from stores who responded faster than store.read-timeout
// If query.timeout <= store.read-timeout
// client will get an error with timeout for whole request even if some of stores responded in time, but one timed out
storeReadCtx, storeReadCancelFunc := context.WithTimeout(ctx, q.storeReadTimeout)
defer storeReadCancelFunc()

resp := &seriesServer{ctx: storeReadCtx}
resp := &seriesServer{ctx: ctx}
if err := q.proxy.Series(&storepb.SeriesRequest{
MinTime: q.mint,
MaxTime: q.maxt,
Expand Down Expand Up @@ -270,10 +254,7 @@ func (q *querier) LabelValues(name string) ([]string, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
defer span.Finish()

storeReadCtx, storeReadCancelFunc := context.WithTimeout(ctx, q.storeReadTimeout)
defer storeReadCancelFunc()

resp, err := q.proxy.LabelValues(storeReadCtx, &storepb.LabelValuesRequest{Label: name, PartialResponseDisabled: !q.partialResponse})
resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{Label: name, PartialResponseDisabled: !q.partialResponse})
if err != nil {
return nil, errors.Wrap(err, "proxy LabelValues()")
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestQuerier_Series(t *testing.T) {

// Querier clamps the range to [1,300], which should drop some samples of the result above.
// The store API allows endpoints to send more data then initially requested.
q := newQuerier(context.Background(), nil, 1, 300, "", testProxy, false, 0, time.Minute, true, nil)
q := newQuerier(context.Background(), nil, 1, 300, "", testProxy, false, 0, true, nil)
defer func() { testutil.Ok(t, q.Close()) }()

res, _, err := q.Select(&storage.SelectParams{})
Expand Down Expand Up @@ -290,19 +290,19 @@ func TestQuerier_StoreReadTimeout(t *testing.T) {
LabelsCallback: func() []storepb.Label { return nil },
}

storeReadTimeout := 1 * time.Millisecond
queryTimeout := 1 * time.Second

proxy := store.NewProxyStore(
nil,
func(context.Context) ([]store.Client, error) { return []store.Client{storeClient}, nil },
component.Query,
nil,
nil, storeReadTimeout,
)

queryTimeout := 1 * time.Second
storeReadTimeout := 1 * time.Millisecond

queryCtx, _ := context.WithTimeout(context.Background(), queryTimeout)

q := newQuerier(queryCtx, nil, 1, 300, "", proxy, false, 0, storeReadTimeout, true, nil)
q := newQuerier(queryCtx, nil, 1, 300, "", proxy, false, 0, true, nil)

storeQueryDone := make(chan struct{})

Expand Down
26 changes: 19 additions & 7 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"strings"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand Down Expand Up @@ -40,6 +41,11 @@ type ProxyStore struct {
stores func(context.Context) ([]Client, error)
component component.StoreAPI
selectorLabels labels.Labels

// storeReadTimeout is an additional timeout for reading data from stores.
// Its separated from ctx because querier.ctx is forwarded from prometheus query executor
// and already contains timeout for entire query execution.
storeReadTimeout time.Duration
}

// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client.
Expand All @@ -49,15 +55,17 @@ func NewProxyStore(
stores func(context.Context) ([]Client, error),
component component.StoreAPI,
selectorLabels labels.Labels,
storeReadTimeout time.Duration,
) *ProxyStore {
if logger == nil {
logger = log.NewNopLogger()
}
s := &ProxyStore{
logger: logger,
stores: stores,
component: component,
selectorLabels: selectorLabels,
logger: logger,
stores: stores,
component: component,
selectorLabels: selectorLabels,
storeReadTimeout: storeReadTimeout,
}
return s
}
Expand Down Expand Up @@ -108,16 +116,18 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
if !match {
return nil
}
ctx, cancel := context.WithTimeout(srv.Context(), s.storeReadTimeout)
defer cancel()

stores, err := s.stores(srv.Context())
stores, err := s.stores(ctx)
if err != nil {
err = errors.Wrap(err, "failed to get store APIs")
level.Error(s.logger).Log("err", err)
return status.Errorf(codes.Unknown, err.Error())
}

var (
g, gctx = errgroup.WithContext(srv.Context())
g, gctx = errgroup.WithContext(ctx)

// Allow to buffer max 10 series response.
// Each might be quite large (multi chunk long series given by sidecar).
Expand Down Expand Up @@ -203,7 +213,6 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
return err
}
return nil

}

type warnSender interface {
Expand Down Expand Up @@ -330,6 +339,9 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques
func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequest) (
*storepb.LabelValuesResponse, error,
) {
ctx, cancel := context.WithTimeout(ctx, s.storeReadTimeout)
defer cancel()

var (
warnings []string
all [][]string
Expand Down
8 changes: 6 additions & 2 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestProxyStore_Series_StoresFetchFail(t *testing.T) {
q := NewProxyStore(nil,
func(_ context.Context) ([]Client, error) { return nil, errors.New("Fail") },
component.Query,
nil,
nil, 100*time.Second,
)

s := newStoreSeriesServer(context.Background())
Expand All @@ -68,7 +68,7 @@ func TestProxyStore_Info(t *testing.T) {
q := NewProxyStore(nil,
func(context.Context) ([]Client, error) { return nil, nil },
component.Query,
nil,
nil, 100*time.Second,
)

resp, err := q.Info(ctx, &storepb.InfoRequest{})
Expand Down Expand Up @@ -422,6 +422,7 @@ func TestProxyStore_Series(t *testing.T) {
func(_ context.Context) ([]Client, error) { return tc.storeAPIs, nil }, // what if err?
component.Query,
tc.selectorLabels,
100*time.Second,
)

s := newStoreSeriesServer(context.Background())
Expand Down Expand Up @@ -463,6 +464,7 @@ func TestProxyStore_Series_RequestParamsProxied(t *testing.T) {
func(context.Context) ([]Client, error) { return cls, nil },
component.Query,
nil,
100*time.Second,
)

ctx := context.Background()
Expand Down Expand Up @@ -521,6 +523,7 @@ func TestProxyStore_Series_RegressionFillResponseChannel(t *testing.T) {
func(context.Context) ([]Client, error) { return cls, nil },
component.Query,
tlabels.FromStrings("fed", "a"),
100*time.Second,
)

ctx := context.Background()
Expand Down Expand Up @@ -558,6 +561,7 @@ func TestProxyStore_LabelValues(t *testing.T) {
func(context.Context) ([]Client, error) { return cls, nil },
component.Query,
nil,
100*time.Second,
)

ctx := context.Background()
Expand Down

0 comments on commit 4f385bd

Please sign in to comment.