Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

query-frontend: support forward-header to downstream querier #5220

Merged
merged 1 commit into from
Mar 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Fixed

### Added

- [#5220](https://github.com/thanos-io/thanos/pull/5220) Query Frontend: Add `--query-frontend.forward-header` flag, forward headers to downstream querier.

### Changed

- [#5205](https://github.com/thanos-io/thanos/pull/5205) Rule: Add ruler labels as external labels in stateless ruler mode.
Expand Down
2 changes: 2 additions & 0 deletions cmd/thanos/query_frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ func registerQueryFrontend(app *extkingpin.App) {
"If multiple headers match the request, the first matching arg specified will take precedence. "+
"If no headers match 'anonymous' will be used.").PlaceHolder("<http-header-name>").StringsVar(&cfg.orgIdHeaders)

cmd.Flag("query-frontend.forward-header", "List of headers forwarded by the query-frontend to downstream queriers, default is empty").PlaceHolder("<http-header-name>").StringsVar(&cfg.ForwardHeaders)

cmd.Flag("log.request.decision", "Deprecation Warning - This flag would be soon deprecated, and replaced with `request.logging-config`. Request Logging for logging the start and end of requests. By default this flag is disabled. LogFinishCall : Logs the finish call of the requests. LogStartAndFinishCall : Logs the start and finish call of the requests. NoLogCall : Disable request logging.").Default("").EnumVar(&cfg.RequestLoggingDecision, "NoLogCall", "LogFinishCall", "LogStartAndFinishCall", "")
reqLogConfig := extkingpin.RegisterRequestLoggingFlags(cmd)

Expand Down
16 changes: 16 additions & 0 deletions docs/components/query-frontend.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,19 @@ Keys which denote a duration are strings that can end with `s` or `m` to indicat

You can find the default values [here](https://github.com/thanos-io/thanos/blob/55cb8ca38b3539381dc6a781e637df15c694e50a/pkg/exthttp/transport.go#L12-L27).

## Forward Headers to Downstream Queriers

`--query-frontend.forward-header` flag provides list of request headers forwarded by query frontend to downstream queriers.

If downstream queriers need basic authentication to access, we can run query-frontend:

```bash
thanos query-frontend \
--http-address "0.0.0.0:9090" \
--query-frontend.forward-header "Authorization"
--query-frontend.downstream-url="<thanos-querier>:<querier-http-port>"
```

## Flags

```$ mdox-exec="thanos query-frontend --help"
Expand Down Expand Up @@ -233,6 +246,9 @@ Flags:
--query-frontend.downstream-url="http://localhost:9090"
URL of downstream Prometheus Query compatible
API.
--query-frontend.forward-header=<http-header-name> ...
List of headers forwarded by the query-frontend
to downstream queriers, default is empty
--query-frontend.log-queries-longer-than=0
Log queries that are slower than the specified
duration. Set to 0 to disable. Set to < 0 to
Expand Down
1 change: 1 addition & 0 deletions pkg/queryfrontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ type Config struct {
CacheCompression string
RequestLoggingDecision string
DownstreamURL string
ForwardHeaders []string
}

// QueryRangeConfig holds the config for query range tripperware.
Expand Down
41 changes: 36 additions & 5 deletions pkg/queryfrontend/labels_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c labelsCodec) MergeResponse(responses ...queryrange.Response) (queryrange
}
}

func (c labelsCodec) DecodeRequest(_ context.Context, r *http.Request, _ []string) (queryrange.Request, error) {
func (c labelsCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (queryrange.Request, error) {
if err := r.ParseForm(); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
Expand All @@ -118,9 +118,9 @@ func (c labelsCodec) DecodeRequest(_ context.Context, r *http.Request, _ []strin
)
switch op := getOperation(r); op {
case labelNamesOp, labelValuesOp:
req, err = c.parseLabelsRequest(r, op)
req, err = c.parseLabelsRequest(r, op, forwardHeaders)
case seriesOp:
req, err = c.parseSeriesRequest(r)
req, err = c.parseSeriesRequest(r, forwardHeaders)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -167,6 +167,12 @@ func (c labelsCodec) EncodeRequest(ctx context.Context, r queryrange.Request) (*
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
}

for _, hv := range thanosReq.Headers {
for _, v := range hv.Values {
req.Header.Add(hv.Name, v)
}
}

case *ThanosSeriesRequest:
var params = url.Values{
"start": []string{encodeTime(thanosReq.Start)},
Expand All @@ -187,6 +193,11 @@ func (c labelsCodec) EncodeRequest(ctx context.Context, r queryrange.Request) (*
return nil, httpgrpc.Errorf(http.StatusBadRequest, "error creating request: %s", err.Error())
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
for _, hv := range thanosReq.Headers {
for _, v := range hv.Values {
req.Header.Add(hv.Name, v)
}
}

default:
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid request format")
Expand Down Expand Up @@ -271,7 +282,7 @@ func (c labelsCodec) EncodeResponse(ctx context.Context, res queryrange.Response
return &resp, nil
}

func (c labelsCodec) parseLabelsRequest(r *http.Request, op string) (queryrange.Request, error) {
func (c labelsCodec) parseLabelsRequest(r *http.Request, op string, forwardHeaders []string) (queryrange.Request, error) {
var (
result ThanosLabelsRequest
err error
Expand Down Expand Up @@ -312,10 +323,20 @@ func (c labelsCodec) parseLabelsRequest(r *http.Request, op string) (queryrange.
}
}

// Include the specified headers from http request in prometheusRequest.
for _, header := range forwardHeaders {
for h, hv := range r.Header {
if strings.EqualFold(h, header) {
result.Headers = append(result.Headers, &RequestHeader{Name: h, Values: hv})
break
}
}
}

return &result, nil
}

func (c labelsCodec) parseSeriesRequest(r *http.Request) (queryrange.Request, error) {
func (c labelsCodec) parseSeriesRequest(r *http.Request, forwardHeaders []string) (queryrange.Request, error) {
var (
result ThanosSeriesRequest
err error
Expand Down Expand Up @@ -358,6 +379,16 @@ func (c labelsCodec) parseSeriesRequest(r *http.Request) (queryrange.Request, er
}
}

// Include the specified headers from http request in prometheusRequest.
for _, header := range forwardHeaders {
for h, hv := range r.Header {
if strings.EqualFold(h, header) {
result.Headers = append(result.Headers, &RequestHeader{Name: h, Values: hv})
break
}
}
}

return &result, nil
}

Expand Down
16 changes: 14 additions & 2 deletions pkg/queryfrontend/queryrange_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewThanosQueryRangeCodec(partialResponse bool) *queryRangeCodec {
}
}

func (c queryRangeCodec) DecodeRequest(_ context.Context, r *http.Request, _ []string) (queryrange.Request, error) {
func (c queryRangeCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (queryrange.Request, error) {
var (
result ThanosQueryRangeRequest
err error
Expand Down Expand Up @@ -126,6 +126,14 @@ func (c queryRangeCodec) DecodeRequest(_ context.Context, r *http.Request, _ []s
}
}

for _, header := range forwardHeaders {
for h, hv := range r.Header {
if strings.EqualFold(h, header) {
result.Headers = append(result.Headers, &RequestHeader{Name: h, Values: hv})
break
}
}
}
return &result, nil
}

Expand Down Expand Up @@ -161,7 +169,11 @@ func (c queryRangeCodec) EncodeRequest(ctx context.Context, r queryrange.Request
return nil, httpgrpc.Errorf(http.StatusBadRequest, "error creating request: %s", err.Error())
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

for _, hv := range thanosReq.Headers {
for _, v := range hv.Values {
req.Header.Add(hv.Name, v)
}
}
return req.WithContext(ctx), nil
}

Expand Down
8 changes: 8 additions & 0 deletions pkg/queryfrontend/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ type ThanosRequest interface {
GetStoreMatchers() [][]*labels.Matcher
}

type RequestHeader struct {
Name string
Values []string
}

type ThanosQueryRangeRequest struct {
Path string
Start int64
Expand All @@ -33,6 +38,7 @@ type ThanosQueryRangeRequest struct {
ReplicaLabels []string
StoreMatchers [][]*labels.Matcher
CachingOptions queryrange.CachingOptions
Headers []*RequestHeader
}

// GetStart returns the start timestamp of the request in milliseconds.
Expand Down Expand Up @@ -107,6 +113,7 @@ type ThanosLabelsRequest struct {
StoreMatchers [][]*labels.Matcher
PartialResponse bool
CachingOptions queryrange.CachingOptions
Headers []*RequestHeader
}

// GetStart returns the start timestamp of the request in milliseconds.
Expand Down Expand Up @@ -178,6 +185,7 @@ type ThanosSeriesRequest struct {
Matchers [][]*labels.Matcher
StoreMatchers [][]*labels.Matcher
CachingOptions queryrange.CachingOptions
Headers []*RequestHeader
}

// GetStart returns the start timestamp of the request in milliseconds.
Expand Down
10 changes: 6 additions & 4 deletions pkg/queryfrontend/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@ func NewTripperware(config Config, reg prometheus.Registerer, logger log.Logger)
labelsCodec := NewThanosLabelsCodec(config.LabelsConfig.PartialResponseStrategy, config.DefaultTimeRange)

queryRangeTripperware, err := newQueryRangeTripperware(config.QueryRangeConfig, queryRangeLimits, queryRangeCodec,
prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "query_range"}, reg), logger)
prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "query_range"}, reg), logger, config.ForwardHeaders)
if err != nil {
return nil, err
}

labelsTripperware, err := newLabelsTripperware(config.LabelsConfig, labelsLimits, labelsCodec,
prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "labels"}, reg), logger)
prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "labels"}, reg), logger, config.ForwardHeaders)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -138,6 +138,7 @@ func newQueryRangeTripperware(
codec *queryRangeCodec,
reg prometheus.Registerer,
logger log.Logger,
forwardHeaders []string,
) (queryrange.Tripperware, error) {
queryRangeMiddleware := []queryrange.Middleware{queryrange.NewLimitsMiddleware(limits)}
m := queryrange.NewInstrumentMiddlewareMetrics(reg)
Expand Down Expand Up @@ -203,7 +204,7 @@ func newQueryRangeTripperware(
}

return func(next http.RoundTripper) http.RoundTripper {
rt := queryrange.NewRoundTripper(next, codec, nil, queryRangeMiddleware...)
rt := queryrange.NewRoundTripper(next, codec, forwardHeaders, queryRangeMiddleware...)
return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
return rt.RoundTrip(r)
})
Expand All @@ -218,6 +219,7 @@ func newLabelsTripperware(
codec *labelsCodec,
reg prometheus.Registerer,
logger log.Logger,
forwardHeaders []string,
) (queryrange.Tripperware, error) {
labelsMiddleware := []queryrange.Middleware{}
m := queryrange.NewInstrumentMiddlewareMetrics(reg)
Expand Down Expand Up @@ -265,7 +267,7 @@ func newLabelsTripperware(
)
}
return func(next http.RoundTripper) http.RoundTripper {
rt := queryrange.NewRoundTripper(next, codec, nil, labelsMiddleware...)
rt := queryrange.NewRoundTripper(next, codec, forwardHeaders, labelsMiddleware...)
return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
return rt.RoundTrip(r)
})
Expand Down