diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 11ccf90446335..64386a40800c2 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -3,8 +3,6 @@ package queryrange import ( "flag" "net/http" - "net/url" - "strconv" "strings" "github.com/cortexproject/cortex/pkg/chunk/cache" @@ -17,6 +15,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" + "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logql" ) @@ -55,60 +54,87 @@ func NewTripperware(cfg Config, log log.Logger, limits Limits, registerer promet return func(next http.RoundTripper) http.RoundTripper { metricRT := metricsTripperware(next) logFilterRT := logFilterTripperware(next) - return frontend.RoundTripFunc(func(req *http.Request) (*http.Response, error) { - if !strings.HasSuffix(req.URL.Path, "/query_range") && !strings.HasSuffix(req.URL.Path, "/prom/query") { - return next.RoundTrip(req) - } - params := req.URL.Query() - query := params.Get("query") - expr, err := logql.ParseExpr(query) - if err != nil { - // weavework server uses httpgrpc errors for status code. - return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) - } - - if _, ok := expr.(logql.SampleExpr); ok { - return metricRT.RoundTrip(req) - } - if logSelector, ok := expr.(logql.LogSelectorExpr); ok { - if err := validateLimits(req, params, limits); err != nil { - return nil, err - } - - // backport the old regexp params into the query params - regexp := params.Get("regexp") - if regexp != "" { - logSelector = logql.NewFilterExpr(logSelector, labels.MatchRegexp, regexp) - params.Set("query", logSelector.String()) - req.URL.RawQuery = params.Encode() - } - filter, err := logSelector.Filter() - if err != nil { - return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) - } - if filter != nil { - return logFilterRT.RoundTrip(req) - } - } - return next.RoundTrip(req) - }) + return newRoundTripper(next, logFilterRT, metricRT, limits) }, cache, nil } -// validates log entries limits -func validateLimits(req *http.Request, params url.Values, limits Limits) error { - userID, err := user.ExtractOrgID(req.Context()) +type roundTripper struct { + next, log, metric http.RoundTripper + + limits Limits +} + +// newRoundTripper creates a new queryrange roundtripper +func newRoundTripper(next, log, metric http.RoundTripper, limits Limits) roundTripper { + return roundTripper{ + log: log, + limits: limits, + metric: metric, + next: next, + } +} + +func (r roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + if !strings.HasSuffix(req.URL.Path, "/query_range") && !strings.HasSuffix(req.URL.Path, "/prom/query") { + return r.next.RoundTrip(req) + } + err := req.ParseForm() if err != nil { - return httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } + rangeQuery, err := loghttp.ParseRangeQuery(req) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + expr, err := logql.ParseExpr(rangeQuery.Query) + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + switch e := expr.(type) { + case logql.SampleExpr: + return r.metric.RoundTrip(req) + case logql.LogSelectorExpr: + filter, err := transformRegexQuery(req, e).Filter() + if err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + if err := validateLimits(req, rangeQuery.Limit, r.limits); err != nil { + return nil, err + } + if filter == nil { + return r.next.RoundTrip(req) + } + return r.log.RoundTrip(req) - reqLimit, err := strconv.Atoi(params.Get("limit")) + default: + return r.next.RoundTrip(req) + } +} + +// transformRegexQuery backport the old regexp params into the v1 query format +func transformRegexQuery(req *http.Request, expr logql.LogSelectorExpr) logql.LogSelectorExpr { + regexp := req.Form.Get("regexp") + if regexp != "" { + expr = logql.NewFilterExpr(expr, labels.MatchRegexp, regexp) + params := req.URL.Query() + params.Set("query", expr.String()) + req.URL.RawQuery = params.Encode() + // force the form and query to be parsed again. + req.Form = nil + req.PostForm = nil + } + return expr +} + +// validates log entries limits +func validateLimits(req *http.Request, reqLimit uint32, limits Limits) error { + userID, err := user.ExtractOrgID(req.Context()) if err != nil { return httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } maxEntriesLimit := limits.MaxEntriesLimitPerQuery(userID) - if reqLimit > maxEntriesLimit && maxEntriesLimit != 0 { + if int(reqLimit) > maxEntriesLimit && maxEntriesLimit != 0 { return httpgrpc.Errorf(http.StatusBadRequest, "max entries limit per query exceeded, limit > max_entries_limit (%d > %d)", reqLimit, maxEntriesLimit) } diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index fe6f15f8d6739..b6a328852924d 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -1,15 +1,19 @@ package queryrange import ( + "bytes" "context" + "io/ioutil" "net/http" "net/http/httptest" "net/url" + "strconv" "sync" "testing" "time" "github.com/cortexproject/cortex/pkg/chunk/cache" + "github.com/cortexproject/cortex/pkg/querier/frontend" "github.com/cortexproject/cortex/pkg/querier/queryrange" "github.com/cortexproject/cortex/pkg/util" "github.com/prometheus/prometheus/pkg/labels" @@ -287,6 +291,34 @@ func TestRegexpParamsSupport(t *testing.T) { require.NoError(t, err) } +func TestPostQueries(t *testing.T) { + req, err := http.NewRequest(http.MethodPost, "/loki/api/v1/query_range", nil) + data := url.Values{ + "query": {`{app="foo"} |~ "foo"`}, + } + body := bytes.NewBufferString(data.Encode()) + req.Body = ioutil.NopCloser(body) + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.Header.Add("Content-Length", strconv.Itoa(len(data.Encode()))) + req = req.WithContext(user.InjectOrgID(context.Background(), "1")) + require.NoError(t, err) + _, err = newRoundTripper( + frontend.RoundTripFunc(func(*http.Request) (*http.Response, error) { + t.Error("unexpected default roundtripper called") + return nil, nil + }), + frontend.RoundTripFunc(func(*http.Request) (*http.Response, error) { + return nil, nil + }), + frontend.RoundTripFunc(func(*http.Request) (*http.Response, error) { + t.Error("unexpected metric roundtripper called") + return nil, nil + }), + fakeLimits{}, + ).RoundTrip(req) + require.NoError(t, err) +} + func TestEntriesLimitsTripperware(t *testing.T) { tpw, stopper, err := NewTripperware(testConfig, util.Logger, fakeLimits{maxEntriesLimitPerQuery: 5000}, nil) if stopper != nil {