Skip to content

Commit

Permalink
query: add initial time-based query pushdown
Browse files Browse the repository at this point in the history
Add time-based pushdown mechanism. It works by directly querying a
Thanos Store node if it is the only one partially or fully matching the
time range of a query. I believe that this will be the case most of the
times because typically Sidecar/Ruler/Receive cover a few days of time
range and horizontally balanced Thanos Store instances cover the
historical data from remote object storage. With query-frontend in front
splitting the queries into smaller parts means that with higher time ranges, most of
the queries can be simply pushed down. For example, with Sidecars
covering 2 days of data, with a 7d query, and a split interval of 1d
this means that 5/7 * 100 = ~71% of processing can be pushed down to the
leaf nodes.

Small ad-hoc tests which touch a lot of timeseries show a 2x reduction
in query duration.

Hide this under an "experimental feature" flag
because this still doesn't have everything implemented intentionally:

* `lazySeriesSet` for concurrent `Select()`s in Thanos Store
* Some missing Prometheus engine flags such as the look-back delta
* Support the selection of StoreAPI nodes in the UI (use only them, not
all endpoints)
* No chunks/series limiters

Also, this API could be potentially implemented in other components.

If the approach looks good then we can merge this and implement these in follow-up PRs.

The main parts of this change is in:

* New QueryAPI service
* New `pkg/pushdown/`
* API request handling code

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Sep 30, 2021
1 parent 360b39e commit 5cb14f9
Show file tree
Hide file tree
Showing 26 changed files with 1,709 additions and 71 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#4680](https://github.com/thanos-io/thanos/pull/4680) Query: add `exemplar.partial-response` flag to control partial response.
- [#4679](https://github.com/thanos-io/thanos/pull/4679) Added `enable-feature` flag to enable negative offsets and @ modifier, similar to Prometheus.
- [#4696](https://github.com/thanos-io/thanos/pull/4696) Query: add cache name to tracing spans.
- [#4712](https://github.com/thanos-io/thanos/pull/4712) Query/Store: added experimental feature `store-pushdown` and corresponding flags to Thanos Store. You can enable it with `--enable-feature` on Thanos Query. Currently it makes Thanos Query push down a query to a leaf node if it is the only one matching the provided time range via the API. It should cover most cases where Sidecar/Ruler/Receive is responsible for a few days of data, and the rest of the data is covered by load-balanced Thanos Stores. Ad-hoc tests show a decrease of up to 50% in duration of queries which touch lots of time series because it is not necessary anymore to transfer all of them over the wire.

### Fixed

Expand Down
18 changes: 16 additions & 2 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/metadata"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/pushdown"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/rules"
"github.com/thanos-io/thanos/pkg/runutil"
Expand All @@ -55,6 +56,7 @@ import (
const (
promqlNegativeOffset = "promql-negative-offset"
promqlAtModifier = "promql-at-modifier"
storePushdown = "store-pushdown"
)

// registerQuery registers a query command.
Expand Down Expand Up @@ -151,7 +153,7 @@ func registerQuery(app *extkingpin.App) {
enableMetricMetadataPartialResponse := cmd.Flag("metric-metadata.partial-response", "Enable partial response for metric metadata endpoint. --no-metric-metadata.partial-response for disabling.").
Hidden().Default("true").Bool()

featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable.The current list of features is "+promqlNegativeOffset+" and "+promqlAtModifier+".").Default("").Strings()
featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable.The current list of features is "+promqlNegativeOffset+", "+storePushdown+", and "+promqlAtModifier+".").Default("").Strings()

enableExemplarPartialResponse := cmd.Flag("exemplar.partial-response", "Enable partial response for exemplar endpoint. --no-exemplar.partial-response for disabling.").
Hidden().Default("true").Bool()
Expand All @@ -170,14 +172,17 @@ func registerQuery(app *extkingpin.App) {
return errors.Wrap(err, "parse federation labels")
}

var enableNegativeOffset, enableAtModifier bool
var enableNegativeOffset, enableAtModifier, enableStorePushdown bool
for _, feature := range *featureList {
if feature == promqlNegativeOffset {
enableNegativeOffset = true
}
if feature == promqlAtModifier {
enableAtModifier = true
}
if feature == storePushdown {
enableStorePushdown = true
}
}

if dup := firstDuplicate(*stores); dup != "" {
Expand Down Expand Up @@ -285,6 +290,7 @@ func registerQuery(app *extkingpin.App) {
*webDisableCORS,
enableAtModifier,
enableNegativeOffset,
enableStorePushdown,
component.Query,
)
})
Expand Down Expand Up @@ -350,6 +356,7 @@ func runQuery(
disableCORS bool,
enableAtModifier bool,
enableNegativeOffset bool,
enableStorePushdown bool,
comp component.Component,
) error {
// TODO(bplotka in PR #513 review): Move arguments into struct.
Expand Down Expand Up @@ -567,6 +574,12 @@ func runQuery(
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewQueryUI(logger, endpoints, webExternalPrefix, webPrefixHeaderName).Register(router, ins)

var pushdownAdapter *pushdown.TimeBasedPushdown

if enableStorePushdown {
pushdownAdapter = pushdown.NewTimeBasedPushdown(endpoints.GetStoreClients, extprom.WrapRegistererWithPrefix("thanos_query_pushdown_", reg))
}

api := v1.NewQueryAPI(
logger,
endpoints,
Expand Down Expand Up @@ -594,6 +607,7 @@ func runQuery(
maxConcurrentQueries,
),
reg,
pushdownAdapter,
)

api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware)
Expand Down
13 changes: 12 additions & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore/client"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/pushdown"
"github.com/thanos-io/thanos/pkg/runutil"
grpcserver "github.com/thanos-io/thanos/pkg/server/grpc"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
Expand Down Expand Up @@ -71,6 +72,9 @@ type storeConfig struct {
reqLogConfig *extflag.PathOrContent
lazyIndexReaderEnabled bool
lazyIndexReaderIdleTimeout time.Duration

maxSamples int
queryTimeout time.Duration
}

func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -162,6 +166,12 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("web.disable-cors", "Whether to disable CORS headers to be set by Thanos. By default Thanos sets CORS headers to be allowed by all.").
Default("false").BoolVar(&sc.webConfig.disableCORS)

cmd.Flag("pushdown.query-timeout", "Timeout of a query sent via the QueryAPI.").
Default("120s").DurationVar(&sc.queryTimeout)

cmd.Flag("pushdown.max-samples", "Maximum samples that could be loaded into memory when executing a query via the QueryAPI.").
Default("5000000").IntVar(&sc.maxSamples)

sc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd)
}

Expand Down Expand Up @@ -382,7 +392,7 @@ func runStore(
cancel()
})
}
// Start query (proxy) gRPC StoreAPI.
// Start the gRPC server with APIs.
{
tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpcConfig.tlsSrvCert, conf.grpcConfig.tlsSrvKey, conf.grpcConfig.tlsSrvClientCA)
if err != nil {
Expand All @@ -391,6 +401,7 @@ func runStore(

s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, conf.component, grpcProbe,
grpcserver.WithServer(store.RegisterStoreServer(bs)),
grpcserver.WithServer(pushdown.RegisterQueryServer(bs, conf.maxSamples, conf.queryTimeout, logger, extprom.WrapRegistererWithPrefix("thanos_bucket_queryapi_query_", reg))),
grpcserver.WithListen(conf.grpcConfig.bindAddress),
grpcserver.WithGracePeriod(time.Duration(conf.grpcConfig.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
Expand Down
3 changes: 2 additions & 1 deletion docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ store nodes.
Flags:
--enable-feature= ... Comma separated experimental feature names to
enable.The current list of features is
promql-negative-offset and promql-at-modifier.
promql-negative-offset, store-pushdown, and
promql-at-modifier.
--grpc-address="0.0.0.0:10901"
Listen ip:port address for gRPC endpoints
(StoreAPI). Make sure this address is routable
Expand Down
5 changes: 5 additions & 0 deletions docs/components/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ Flags:
Path to YAML file that contains object store
configuration. See format details:
https://thanos.io/tip/thanos/storage.md/#configuration
--pushdown.max-samples=5000000
Maximum samples that could be loaded into
memory when executing a query via the QueryAPI.
--pushdown.query-timeout=120s
Timeout of a query sent via the QueryAPI.
--request.logging-config=<content>
Alternative to 'request.logging-config-file'
flag (mutually exclusive). Content of YAML file
Expand Down
48 changes: 48 additions & 0 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

cortexutil "github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -51,6 +52,8 @@ import (
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/metadata"
"github.com/thanos-io/thanos/pkg/metadata/metadatapb"
"github.com/thanos-io/thanos/pkg/pushdown"
"github.com/thanos-io/thanos/pkg/pushdown/querypb"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/rules"
"github.com/thanos-io/thanos/pkg/rules/rulespb"
Expand Down Expand Up @@ -101,6 +104,8 @@ type QueryAPI struct {
defaultMetadataTimeRange time.Duration

queryRangeHist prometheus.Histogram

pushdownAdapter *pushdown.TimeBasedPushdown
}

// NewQueryAPI returns an initialized QueryAPI type.
Expand All @@ -127,6 +132,7 @@ func NewQueryAPI(
disableCORS bool,
gate gate.Gate,
reg *prometheus.Registry,
pushdownAdapter *pushdown.TimeBasedPushdown,
) *QueryAPI {
return &QueryAPI{
baseAPI: api.NewBaseAPI(logger, disableCORS, flagsMap),
Expand Down Expand Up @@ -157,6 +163,8 @@ func NewQueryAPI(
Help: "A histogram of the query range window in seconds",
Buckets: prometheus.ExponentialBuckets(15*60, 2, 12),
}),

pushdownAdapter: pushdownAdapter,
}
}

Expand Down Expand Up @@ -379,6 +387,16 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
}, res.Warnings, nil
}

// asisJSONMarshaler returns the given string
// as-is when marshaling it into JSON.
type asisJSONMarshaler struct {
data string
}

func (m *asisJSONMarshaler) MarshalJSON() ([]byte, error) {
return []byte(m.data), nil
}

func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.ApiError) {
start, err := parseTime(r.FormValue("start"))
if err != nil {
Expand Down Expand Up @@ -443,6 +461,36 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr
}

startNS := start.UnixNano()
endNS := end.UnixNano()
if qapi.pushdownAdapter != nil {
if node, match := qapi.pushdownAdapter.Match(startNS, endNS); match {
tracing.DoInSpan(ctx, "query_gate_ismyturn", func(ctx context.Context) {
err = qapi.gate.Start(ctx)
})
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}
defer qapi.gate.Done()

level.Debug(qapi.logger).Log("msg", "pushing down", "query", r.FormValue("query"))

resp, err := node.Query(ctx, &querypb.QueryRequest{
Query: r.FormValue("query"),
StartNs: startNS,
EndNs: endNS,
Interval: int64(step),
ReplicaLabels: replicaLabels,
MaxSourceResolution: maxSourceResolution,
})
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}
}

return &asisJSONMarshaler{resp.Response}, nil, nil
}
}

enablePartialResponse, apiErr := qapi.parsePartialResponseParam(r, qapi.enableQueryPartialResponse)
if apiErr != nil {
return nil, nil, apiErr
Expand Down
Loading

0 comments on commit 5cb14f9

Please sign in to comment.