From 2a7590f69a5ace6014ddcc932788056935f201ba Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 20 Oct 2020 08:10:33 -0400 Subject: [PATCH] Cache labels and series results (#3315) * cache labels and series results Signed-off-by: Ben Ye * add changelog Signed-off-by: Ben Ye * fix style Signed-off-by: Ben Ye * update changelog Signed-off-by: Ben Ye * rebase Signed-off-by: Ben Ye --- CHANGELOG.md | 3 +- cmd/thanos/query_frontend.go | 17 + docs/components/query-frontend.md | 8 + go.mod | 2 +- go.sum | 20 +- .../{cache_splitter.go => cache.go} | 7 +- .../{cache_splitter_test.go => cache_test.go} | 0 pkg/queryfrontend/config.go | 13 +- pkg/queryfrontend/labels_codec.go | 6 + pkg/queryfrontend/request.go | 10 +- pkg/queryfrontend/response.go | 46 +++ pkg/queryfrontend/response.pb.go | 376 ++++++++++++++++-- pkg/queryfrontend/response.proto | 19 +- pkg/queryfrontend/roundtrip.go | 32 +- pkg/queryfrontend/roundtrip_test.go | 190 ++++++++- 15 files changed, 696 insertions(+), 53 deletions(-) rename pkg/queryfrontend/{cache_splitter.go => cache.go} (83%) rename pkg/queryfrontend/{cache_splitter_test.go => cache_test.go} (100%) create mode 100644 pkg/queryfrontend/response.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ce02f9fac..ad6b46a74e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ## Unreleased - [#3259](https://github.com/thanos-io/thanos/pull/3259) Thanos BlockViewer: Added a button in the blockviewer that allows users to download the metadata of a block. - [#3261](https://github.com/thanos-io/thanos/pull/3261) Thanos Store: Use segment files specified in meta.json file, if present. If not present, Store does the LIST operation as before. -- [#3276](https://github.com/thanos-io/thanos/pull/3276) Query Frontend: Support query splitting and retry for labels and series requests. +- [#3276](https://github.com/thanos-io/thanos/pull/3276) Query Frontend: Support query splitting and retry for label names, label values and series requests. +- [#3315](https://github.com/thanos-io/thanos/pull/3315) Query Frontend: Support results caching for label names, label values and series requests. ### Fixed - [#3257](https://github.com/thanos-io/thanos/pull/3257) Ruler: Prevent Ruler from crashing when using default DNS to lookup hosts that results in "No such hosts" errors. diff --git a/cmd/thanos/query_frontend.go b/cmd/thanos/query_frontend.go index 09f09aa165..9286035b6e 100644 --- a/cmd/thanos/query_frontend.go +++ b/cmd/thanos/query_frontend.go @@ -96,6 +96,8 @@ func registerQueryFrontend(app *extkingpin.App) { cmd.Flag("labels.default-time-range", "The default metadata time range duration for retrieving labels through Labels and Series API when the range parameters are not specified."). Default("24h").DurationVar(&cfg.DefaultTimeRange) + cfg.LabelsConfig.CachePathOrContent = *extflag.RegisterPathOrContent(cmd, "labels.response-cache-config", "YAML file that contains response cache configuration.", false) + cmd.Flag("cache-compression-type", "Use compression in results cache. Supported values are: 'snappy' and '' (disable compression)."). Default("").StringVar(&cfg.CacheCompression) @@ -138,6 +140,21 @@ func runQueryFrontend( } } + labelsCacheConfContentYaml, err := cfg.LabelsConfig.CachePathOrContent.Content() + if err != nil { + return err + } + if len(labelsCacheConfContentYaml) > 0 { + cacheConfig, err := queryfrontend.NewCacheConfig(logger, queryRangeCacheConfContentYaml) + if err != nil { + return errors.Wrap(err, "initializing the labels cache config") + } + cfg.LabelsConfig.ResultsCacheConfig = &queryrange.ResultsCacheConfig{ + Compression: cfg.CacheCompression, + CacheConfig: *cacheConfig, + } + } + if err := cfg.Validate(); err != nil { return errors.Wrap(err, "error validating the config") } diff --git a/docs/components/query-frontend.md b/docs/components/query-frontend.md index d96a25951c..f8bd6d0a81 100644 --- a/docs/components/query-frontend.md +++ b/docs/components/query-frontend.md @@ -181,6 +181,14 @@ Flags: The default metadata time range duration for retrieving labels through Labels and Series API when the range parameters are not specified. + --labels.response-cache-config-file= + Path to YAML file that contains response cache + configuration. + --labels.response-cache-config= + Alternative to + 'labels.response-cache-config-file' flag (lower + priority). Content of YAML file that contains + response cache configuration. --cache-compression-type="" Use compression in results cache. Supported values are: 'snappy' and ” (disable diff --git a/go.mod b/go.mod index 78c2d54090..10b01a1e5e 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/cespare/xxhash v1.1.0 github.com/chromedp/cdproto v0.0.0-20200424080200-0de008e41fa0 github.com/chromedp/chromedp v0.5.3 - github.com/cortexproject/cortex v1.3.1-0.20200923145333-8587ea61fe17 + github.com/cortexproject/cortex v1.4.1-0.20201013144911-21bad57b346c github.com/davecgh/go-spew v1.1.1 github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb github.com/fatih/structtag v1.1.0 diff --git a/go.sum b/go.sum index 60b7ed8799..431722125b 100644 --- a/go.sum +++ b/go.sum @@ -206,8 +206,9 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbp github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cortexproject/cortex v0.6.1-0.20200228110116-92ab6cbe0995/go.mod h1:3Xa3DjJxtpXqxcMGdk850lcIRb81M0fyY1MQ6udY134= github.com/cortexproject/cortex v1.2.1-0.20200805064754-d8edc95e2c91/go.mod h1:PVPxNLrxKH+yc8asaJOxuz7TiRmMizFfnSMOnRzM6oM= -github.com/cortexproject/cortex v1.3.1-0.20200923145333-8587ea61fe17 h1:69LF7OuwaAS/h3GGJUBcI1Y9ZAFEcUpBZSpbHLK1eyc= -github.com/cortexproject/cortex v1.3.1-0.20200923145333-8587ea61fe17/go.mod h1:dJ9gpW7dzQ7z09cKtNN9PfebumgyO4dtNdFQ6eQEed0= +github.com/cortexproject/cortex v1.3.1-0.20200901115931-255ff3306960/go.mod h1:ub8BpRZrRa02BOM8NJTnI2YklxW/mGhEkJDrhsDfcfg= +github.com/cortexproject/cortex v1.4.1-0.20201013144911-21bad57b346c h1:4Qcgdgk3cFAFKufkcRQBn82ngaij97+M+C7jBifPt/A= +github.com/cortexproject/cortex v1.4.1-0.20201013144911-21bad57b346c/go.mod h1:bDhuzYyoL2kC0taY6nMdfv13kKYlUVoiQuy6QnkAIZw= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/cznic/b v0.0.0-20180115125044-35e9bbe41f07/go.mod h1:URriBxXwVq5ijiJ12C7iIZqlA69nTlI+LgI6/pwftG8= @@ -375,8 +376,8 @@ github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2K github.com/go-openapi/validate v0.19.3/go.mod h1:90Vh6jjkTn+OT1Eefm0ZixWNFjhtOH7vS9k0lo6zwJo= github.com/go-openapi/validate v0.19.8 h1:YFzsdWIDfVuLvIOF+ZmKjVg1MbPJ1QgY9PihMwei1ys= github.com/go-openapi/validate v0.19.8/go.mod h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85nY1c2z52x1Gk4= -github.com/go-redis/redis/v8 v8.0.0-beta.10.0.20200905143926-df7fe4e2ce72 h1:HJkWCywZsCtt//EFYNtHAOQglik0kzonhiilQCrQEgs= -github.com/go-redis/redis/v8 v8.0.0-beta.10.0.20200905143926-df7fe4e2ce72/go.mod h1:CJP1ZIHwhosNYwIdaHPZK9vHsM3+roNBaZ7U9Of1DXc= +github.com/go-redis/redis/v8 v8.2.3 h1:eNesND+DWt/sjQOtPFxAbQkTIXaXX00qNLxjVWkZ70k= +github.com/go-redis/redis/v8 v8.2.3/go.mod h1:ysgGY09J/QeDYbu3HikWEIPCwaeOkuNoTgKayTEaEOw= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= @@ -809,14 +810,15 @@ github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+ github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA= -github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.14.1 h1:jMU0WaQrP0a/YAEq8eJmJKjBoMs+pClEr1vDMlM/Do4= +github.com/onsi/ginkgo v1.14.1/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= -github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.10.2 h1:aY/nuoWlKJud2J6U0E3NWsjlg+0GtwXxgEqthRdzlcs= +github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -1002,6 +1004,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/thanos-io/thanos v0.8.1-0.20200109203923-552ffa4c1a0d/go.mod h1:usT/TxtJQ7DzinTt+G9kinDQmRS5sxwu0unVKZ9vdcw= github.com/thanos-io/thanos v0.13.1-0.20200731083140-69b87607decf/go.mod h1:G8caR6G7pSDreRDvFm9wFuyjEBztmr8Ag3kBYpa/fEc= github.com/thanos-io/thanos v0.13.1-0.20200807203500-9b578afb4763/go.mod h1:KyW0a93tsh7v4hXAwo2CVAIRYuZT1Kkf4e04gisQjAg= +github.com/thanos-io/thanos v0.13.1-0.20200923175059-57035bf8f843/go.mod h1:U7HVxfAHYptOk9xCuxr8WoILGL1wWdXVqZD3t6JifNA= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= @@ -1025,6 +1028,7 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw= github.com/weaveworks/common v0.0.0-20200206153930-760e36ae819a/go.mod h1:6enWAqfQBFrE8X/XdJwZr8IKgh1chStuFR0mjU/UOUw= github.com/weaveworks/common v0.0.0-20200625145055-4b1847531bc9/go.mod h1:c98fKi5B9u8OsKGiWHLRKus6ToQ1Tubeow44ECO1uxY= +github.com/weaveworks/common v0.0.0-20200820123129-280614068c5e/go.mod h1:hz10LOsAdzC3K/iXaKoFxOKTDRgxJl+BTGX1GY+TzO4= github.com/weaveworks/common v0.0.0-20200914083218-61ffdd448099 h1:MS5M2antM8wzMUqVxIfAi+yb6yjXvDINRFvLnmNXeIw= github.com/weaveworks/common v0.0.0-20200914083218-61ffdd448099/go.mod h1:hz10LOsAdzC3K/iXaKoFxOKTDRgxJl+BTGX1GY+TzO4= github.com/weaveworks/promrus v1.2.0 h1:jOLf6pe6/vss4qGHjXmGz4oDJQA+AOCqEL3FvvZGz7M= @@ -1135,8 +1139,6 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20200821190819-94841d0725da h1:vfV2BR+q1+/jmgJR30Ms3RHbryruQ3Yd83lLAAue9cs= -golang.org/x/exp v0.0.0-20200821190819-94841d0725da/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= diff --git a/pkg/queryfrontend/cache_splitter.go b/pkg/queryfrontend/cache.go similarity index 83% rename from pkg/queryfrontend/cache_splitter.go rename to pkg/queryfrontend/cache.go index 4da5074a4e..8ed069e71a 100644 --- a/pkg/queryfrontend/cache_splitter.go +++ b/pkg/queryfrontend/cache.go @@ -29,11 +29,16 @@ func newThanosCacheKeyGenerator(interval time.Duration) thanosCacheKeyGenerator // GenerateCacheKey generates a cache key based on the Request and interval. func (t thanosCacheKeyGenerator) GenerateCacheKey(_ string, r queryrange.Request) string { currentInterval := r.GetStart() / t.interval.Milliseconds() - if tr, ok := r.(*ThanosQueryRangeRequest); ok { + switch tr := r.(type) { + case *ThanosQueryRangeRequest: i := 0 for ; i < len(t.resolutions) && t.resolutions[i] > tr.MaxSourceResolution; i++ { } return fmt.Sprintf("%s:%d:%d:%d", tr.Query, tr.Step, currentInterval, i) + case *ThanosLabelsRequest: + return fmt.Sprintf("%s:%d", tr.Label, currentInterval) + case *ThanosSeriesRequest: + return fmt.Sprintf("%s:%d", tr.Matchers, currentInterval) } return fmt.Sprintf("%s:%d:%d", r.GetQuery(), r.GetStep(), currentInterval) } diff --git a/pkg/queryfrontend/cache_splitter_test.go b/pkg/queryfrontend/cache_test.go similarity index 100% rename from pkg/queryfrontend/cache_splitter_test.go rename to pkg/queryfrontend/cache_test.go diff --git a/pkg/queryfrontend/config.go b/pkg/queryfrontend/config.go index c3e4fbf376..1f5cdd3264 100644 --- a/pkg/queryfrontend/config.go +++ b/pkg/queryfrontend/config.go @@ -164,10 +164,19 @@ type LabelsConfig struct { func (cfg *Config) Validate() error { if cfg.QueryRangeConfig.ResultsCacheConfig != nil { if cfg.QueryRangeConfig.SplitQueriesByInterval <= 0 { - return errors.New("split queries interval should be greater than 0") + return errors.New("split queries interval should be greater than 0 when caching is enabled") } if err := cfg.QueryRangeConfig.ResultsCacheConfig.Validate(); err != nil { - return errors.Wrap(err, "invalid ResultsCache config") + return errors.Wrap(err, "invalid ResultsCache config for query_range tripperware") + } + } + + if cfg.LabelsConfig.ResultsCacheConfig != nil { + if cfg.LabelsConfig.SplitQueriesByInterval <= 0 { + return errors.New("split queries interval should be greater than 0 when caching is enabled") + } + if err := cfg.LabelsConfig.ResultsCacheConfig.Validate(); err != nil { + return errors.Wrap(err, "invalid ResultsCache config for labels tripperware") } } diff --git a/pkg/queryfrontend/labels_codec.go b/pkg/queryfrontend/labels_codec.go index c9101390dd..6b2f5c4481 100644 --- a/pkg/queryfrontend/labels_codec.go +++ b/pkg/queryfrontend/labels_codec.go @@ -204,12 +204,18 @@ func (c labelsCodec) DecodeResponse(ctx context.Context, r *http.Response, req q if err := json.Unmarshal(buf, &resp); err != nil { return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) } + for h, hv := range r.Header { + resp.Headers = append(resp.Headers, &ResponseHeader{Name: h, Values: hv}) + } return &resp, nil case *ThanosSeriesRequest: var resp ThanosSeriesResponse if err := json.Unmarshal(buf, &resp); err != nil { return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error decoding response: %v", err) } + for h, hv := range r.Header { + resp.Headers = append(resp.Headers, &ResponseHeader{Name: h, Values: hv}) + } return &resp, nil default: return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid request type") diff --git a/pkg/queryfrontend/request.go b/pkg/queryfrontend/request.go index 2074815ac6..564d3e631f 100644 --- a/pkg/queryfrontend/request.go +++ b/pkg/queryfrontend/request.go @@ -114,8 +114,9 @@ func (r *ThanosLabelsRequest) GetStart() int64 { return r.Start } // GetEnd returns the end timestamp of the request in milliseconds. func (r *ThanosLabelsRequest) GetEnd() int64 { return r.End } -// GetStep returns the step of the request in milliseconds. -func (r *ThanosLabelsRequest) GetStep() int64 { return 0 } +// GetStep returns the step of the request in milliseconds. Returns 1 is a trick to avoid panic in +// https://github.com/cortexproject/cortex/blob/master/pkg/querier/queryrange/results_cache.go#L447. +func (r *ThanosLabelsRequest) GetStep() int64 { return 1 } // GetQuery returns the query of the request. func (r *ThanosLabelsRequest) GetQuery() string { return "" } @@ -183,8 +184,9 @@ func (r *ThanosSeriesRequest) GetStart() int64 { return r.Start } // GetEnd returns the end timestamp of the request in milliseconds. func (r *ThanosSeriesRequest) GetEnd() int64 { return r.End } -// GetStep returns the step of the request in milliseconds. -func (r *ThanosSeriesRequest) GetStep() int64 { return 0 } +// GetStep returns the step of the request in milliseconds. Returns 1 is a trick to avoid panic in +// https://github.com/cortexproject/cortex/blob/master/pkg/querier/queryrange/results_cache.go#L447. +func (r *ThanosSeriesRequest) GetStep() int64 { return 1 } // GetQuery returns the query of the request. func (r *ThanosSeriesRequest) GetQuery() string { return "" } diff --git a/pkg/queryfrontend/response.go b/pkg/queryfrontend/response.go new file mode 100644 index 0000000000..4eb754c68a --- /dev/null +++ b/pkg/queryfrontend/response.go @@ -0,0 +1,46 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package queryfrontend + +import ( + "unsafe" + + "github.com/cortexproject/cortex/pkg/querier/queryrange" +) + +// ThanosResponseExtractor helps extracting specific info from Query Response. +type ThanosResponseExtractor struct{} + +// Extract extracts response for specific a range from a response. +// This interface is not used for labels and series responses. +func (ThanosResponseExtractor) Extract(_, _ int64, resp queryrange.Response) queryrange.Response { + return resp +} + +// ResponseWithoutHeaders returns the response without HTTP headers. +func (ThanosResponseExtractor) ResponseWithoutHeaders(resp queryrange.Response) queryrange.Response { + switch tr := resp.(type) { + case *ThanosLabelsResponse: + return &ThanosLabelsResponse{Status: queryrange.StatusSuccess, Data: tr.Data} + case *ThanosSeriesResponse: + return &ThanosSeriesResponse{Status: queryrange.StatusSuccess, Data: tr.Data} + } + return resp +} + +// headersToQueryRangeHeaders convert slice of ResponseHeader to Cortex queryrange.PrometheusResponseHeader in an +// unsafe manner. It reuses the same memory. +func headersToQueryRangeHeaders(headers []*ResponseHeader) []*queryrange.PrometheusResponseHeader { + return *(*[]*queryrange.PrometheusResponseHeader)(unsafe.Pointer(&headers)) +} + +// GetHeaders returns the HTTP headers in the response. +func (m *ThanosLabelsResponse) GetHeaders() []*queryrange.PrometheusResponseHeader { + return headersToQueryRangeHeaders(m.Headers) +} + +// GetHeaders returns the HTTP headers in the response. +func (m *ThanosSeriesResponse) GetHeaders() []*queryrange.PrometheusResponseHeader { + return headersToQueryRangeHeaders(m.Headers) +} diff --git a/pkg/queryfrontend/response.pb.go b/pkg/queryfrontend/response.pb.go index 3816c31872..861e144053 100644 --- a/pkg/queryfrontend/response.pb.go +++ b/pkg/queryfrontend/response.pb.go @@ -26,10 +26,11 @@ var _ = math.Inf const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package type ThanosLabelsResponse struct { - Status string `protobuf:"bytes,1,opt,name=Status,proto3" json:"status"` - Data []string `protobuf:"bytes,2,rep,name=Data,proto3" json:"data"` - ErrorType string `protobuf:"bytes,3,opt,name=ErrorType,proto3" json:"errorType,omitempty"` - Error string `protobuf:"bytes,4,opt,name=Error,proto3" json:"error,omitempty"` + Status string `protobuf:"bytes,1,opt,name=Status,proto3" json:"status"` + Data []string `protobuf:"bytes,2,rep,name=Data,proto3" json:"data"` + ErrorType string `protobuf:"bytes,3,opt,name=ErrorType,proto3" json:"errorType,omitempty"` + Error string `protobuf:"bytes,4,opt,name=Error,proto3" json:"error,omitempty"` + Headers []*ResponseHeader `protobuf:"bytes,5,rep,name=Headers,proto3" json:"-"` } func (m *ThanosLabelsResponse) Reset() { *m = ThanosLabelsResponse{} } @@ -71,6 +72,7 @@ type ThanosSeriesResponse struct { Data []labelpb.LabelSet `protobuf:"bytes,2,rep,name=Data,proto3" json:"data"` ErrorType string `protobuf:"bytes,3,opt,name=ErrorType,proto3" json:"errorType,omitempty"` Error string `protobuf:"bytes,4,opt,name=Error,proto3" json:"error,omitempty"` + Headers []*ResponseHeader `protobuf:"bytes,5,rep,name=Headers,proto3" json:"-"` } func (m *ThanosSeriesResponse) Reset() { *m = ThanosSeriesResponse{} } @@ -106,34 +108,77 @@ func (m *ThanosSeriesResponse) XXX_DiscardUnknown() { var xxx_messageInfo_ThanosSeriesResponse proto.InternalMessageInfo +type ResponseHeader struct { + Name string `protobuf:"bytes,1,opt,name=Name,proto3" json:"-"` + Values []string `protobuf:"bytes,2,rep,name=Values,proto3" json:"-"` +} + +func (m *ResponseHeader) Reset() { *m = ResponseHeader{} } +func (m *ResponseHeader) String() string { return proto.CompactTextString(m) } +func (*ResponseHeader) ProtoMessage() {} +func (*ResponseHeader) Descriptor() ([]byte, []int) { + return fileDescriptor_b882fa7024d92f38, []int{2} +} +func (m *ResponseHeader) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ResponseHeader) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ResponseHeader.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ResponseHeader) XXX_Merge(src proto.Message) { + xxx_messageInfo_ResponseHeader.Merge(m, src) +} +func (m *ResponseHeader) XXX_Size() int { + return m.Size() +} +func (m *ResponseHeader) XXX_DiscardUnknown() { + xxx_messageInfo_ResponseHeader.DiscardUnknown(m) +} + +var xxx_messageInfo_ResponseHeader proto.InternalMessageInfo + func init() { proto.RegisterType((*ThanosLabelsResponse)(nil), "queryfrontend.ThanosLabelsResponse") proto.RegisterType((*ThanosSeriesResponse)(nil), "queryfrontend.ThanosSeriesResponse") + proto.RegisterType((*ResponseHeader)(nil), "queryfrontend.ResponseHeader") } func init() { proto.RegisterFile("queryfrontend/response.proto", fileDescriptor_b882fa7024d92f38) } var fileDescriptor_b882fa7024d92f38 = []byte{ - // 299 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x91, 0x31, 0x4e, 0xc3, 0x30, - 0x14, 0x86, 0x63, 0x5a, 0x2a, 0x6a, 0x40, 0xa0, 0xb4, 0x12, 0xa1, 0xaa, 0x9c, 0xaa, 0x53, 0x91, - 0x50, 0x22, 0x15, 0x71, 0x81, 0x08, 0x36, 0xa6, 0xa4, 0x17, 0x70, 0xd4, 0x47, 0xa9, 0xd4, 0xc6, - 0xc6, 0x7e, 0x1d, 0x72, 0x0b, 0xce, 0xc3, 0x09, 0x32, 0x66, 0x64, 0x8a, 0x20, 0xd9, 0x72, 0x0a, - 0x54, 0xa7, 0x40, 0x19, 0x59, 0xd8, 0xec, 0xff, 0xfb, 0xdf, 0xaf, 0xdf, 0x7e, 0x74, 0xf8, 0xbc, - 0x01, 0x95, 0x3e, 0x2a, 0x91, 0x20, 0x24, 0x73, 0x5f, 0x81, 0x96, 0x22, 0xd1, 0xe0, 0x49, 0x25, - 0x50, 0xd8, 0xa7, 0xbf, 0xe8, 0xa0, 0xbf, 0x10, 0x0b, 0x61, 0x88, 0xbf, 0x3d, 0x35, 0xa6, 0xc1, - 0xa5, 0x46, 0xa1, 0xc0, 0x5f, 0xf1, 0x18, 0x56, 0x32, 0xf6, 0x31, 0x95, 0xa0, 0x1b, 0x34, 0x7e, - 0x25, 0xb4, 0x3f, 0x7b, 0xe2, 0x89, 0xd0, 0x0f, 0x5b, 0xaa, 0xc3, 0x5d, 0xbc, 0x3d, 0xa6, 0x9d, - 0x08, 0x39, 0x6e, 0xb4, 0x43, 0x46, 0x64, 0xd2, 0x0d, 0x68, 0x5d, 0xb8, 0x1d, 0x6d, 0x94, 0x70, - 0x47, 0xec, 0x21, 0x6d, 0xdf, 0x71, 0xe4, 0xce, 0xc1, 0xa8, 0x35, 0xe9, 0x06, 0x47, 0x75, 0xe1, - 0xb6, 0xe7, 0x1c, 0x79, 0x68, 0x54, 0xfb, 0x96, 0x76, 0xef, 0x95, 0x12, 0x6a, 0x96, 0x4a, 0x70, - 0x5a, 0x26, 0xe4, 0xa2, 0x2e, 0xdc, 0x1e, 0x7c, 0x89, 0xd7, 0x62, 0xbd, 0x44, 0x58, 0x4b, 0x4c, - 0xc3, 0x1f, 0xa7, 0x7d, 0x45, 0x0f, 0xcd, 0xc5, 0x69, 0x9b, 0x91, 0x5e, 0x5d, 0xb8, 0x67, 0x66, - 0x64, 0xcf, 0xde, 0x38, 0xc6, 0xf9, 0x77, 0xf9, 0x08, 0xd4, 0x12, 0xfe, 0x56, 0x7e, 0xba, 0x57, - 0xfe, 0x78, 0x7a, 0xee, 0xa1, 0x09, 0xf2, 0xcc, 0x37, 0x44, 0x80, 0xc1, 0x49, 0x56, 0xb8, 0xd6, - 0x7f, 0x3f, 0x29, 0x18, 0x66, 0x1f, 0xcc, 0xca, 0x4a, 0x46, 0xf2, 0x92, 0x91, 0xf7, 0x92, 0x91, - 0x97, 0x8a, 0x59, 0x79, 0xc5, 0xac, 0xb7, 0x8a, 0x59, 0x71, 0xc7, 0x2c, 0xed, 0xe6, 0x33, 0x00, - 0x00, 0xff, 0xff, 0xd2, 0x7d, 0x34, 0xcb, 0x14, 0x02, 0x00, 0x00, + // 362 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x92, 0x3f, 0x4f, 0xc2, 0x40, + 0x18, 0xc6, 0x5b, 0x28, 0x55, 0x0e, 0xff, 0xe5, 0x20, 0xb1, 0x10, 0x68, 0x09, 0x13, 0x26, 0xda, + 0x26, 0x18, 0x57, 0x87, 0x46, 0x13, 0x63, 0x8c, 0x43, 0x21, 0xee, 0x47, 0x78, 0x45, 0x12, 0xe8, + 0xd5, 0xbb, 0x63, 0xe8, 0xb7, 0xe0, 0x63, 0x31, 0x32, 0x3a, 0x35, 0x0a, 0x5b, 0x3f, 0x82, 0x93, + 0xe1, 0x7a, 0x8d, 0x30, 0x3a, 0xba, 0xdd, 0x3d, 0xcf, 0xef, 0x7d, 0x93, 0xe7, 0xc9, 0x8b, 0x9a, + 0xef, 0x73, 0x60, 0xf1, 0x2b, 0xa3, 0xa1, 0x80, 0x70, 0xe4, 0x31, 0xe0, 0x11, 0x0d, 0x39, 0xb8, + 0x11, 0xa3, 0x82, 0xe2, 0xe3, 0x3d, 0xb7, 0x51, 0x1b, 0xd3, 0x31, 0x95, 0x8e, 0xb7, 0x7d, 0x65, + 0x50, 0xa3, 0xce, 0x05, 0x65, 0xe0, 0x4d, 0xc9, 0x10, 0xa6, 0xd1, 0xd0, 0x13, 0x71, 0x04, 0x3c, + 0xb3, 0x3a, 0xdf, 0x3a, 0xaa, 0x0d, 0xde, 0x48, 0x48, 0xf9, 0xd3, 0xd6, 0xe5, 0x81, 0x5a, 0x8f, + 0x3b, 0xc8, 0xec, 0x0b, 0x22, 0xe6, 0xdc, 0xd2, 0xdb, 0x7a, 0xb7, 0xec, 0xa3, 0x34, 0x71, 0x4c, + 0x2e, 0x95, 0x40, 0x39, 0xb8, 0x89, 0x8c, 0x3b, 0x22, 0x88, 0x55, 0x68, 0x17, 0xbb, 0x65, 0xff, + 0x30, 0x4d, 0x1c, 0x63, 0x44, 0x04, 0x09, 0xa4, 0x8a, 0x6f, 0x50, 0xf9, 0x9e, 0x31, 0xca, 0x06, + 0x71, 0x04, 0x56, 0x51, 0x2e, 0x39, 0x4f, 0x13, 0xa7, 0x0a, 0xb9, 0x78, 0x49, 0x67, 0x13, 0x01, + 0xb3, 0x48, 0xc4, 0xc1, 0x2f, 0x89, 0x2f, 0x50, 0x49, 0x7e, 0x2c, 0x43, 0x8e, 0x54, 0xd3, 0xc4, + 0x39, 0x95, 0x23, 0x3b, 0x78, 0x46, 0xe0, 0x5b, 0x74, 0xf0, 0x00, 0x64, 0x04, 0x8c, 0x5b, 0xa5, + 0x76, 0xb1, 0x5b, 0xe9, 0xb5, 0xdc, 0xbd, 0x3a, 0xdc, 0x3c, 0x4d, 0x46, 0xf9, 0xa5, 0x34, 0x71, + 0xf4, 0xab, 0x20, 0x1f, 0xea, 0x2c, 0x0a, 0x79, 0xf8, 0x3e, 0xb0, 0x09, 0xfc, 0x2d, 0x7c, 0x6f, + 0x27, 0x7c, 0xa5, 0x77, 0xe6, 0x0a, 0xb9, 0xc8, 0x95, 0x35, 0xf6, 0x41, 0xf8, 0x47, 0xcb, 0xc4, + 0xd1, 0xfe, 0x5d, 0x25, 0x8f, 0xe8, 0x64, 0x9f, 0xc0, 0x75, 0x64, 0x3c, 0x93, 0x19, 0xa8, 0x26, + 0x14, 0x2f, 0x25, 0xdc, 0x42, 0xe6, 0x0b, 0x99, 0xce, 0x81, 0xab, 0x0b, 0x50, 0xa6, 0x12, 0xfd, + 0xe6, 0xf2, 0xcb, 0xd6, 0x96, 0x6b, 0x5b, 0x5f, 0xad, 0x6d, 0xfd, 0x73, 0x6d, 0xeb, 0x8b, 0x8d, + 0xad, 0xad, 0x36, 0xb6, 0xf6, 0xb1, 0xb1, 0xb5, 0xa1, 0x29, 0x0f, 0xf0, 0xfa, 0x27, 0x00, 0x00, + 0xff, 0xff, 0x68, 0xd1, 0x0b, 0xae, 0xe0, 0x02, 0x00, 0x00, } func (m *ThanosLabelsResponse) Marshal() (dAtA []byte, err error) { @@ -156,6 +201,20 @@ func (m *ThanosLabelsResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.Headers) > 0 { + for iNdEx := len(m.Headers) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Headers[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintResponse(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x2a + } + } if len(m.Error) > 0 { i -= len(m.Error) copy(dAtA[i:], m.Error) @@ -209,6 +268,20 @@ func (m *ThanosSeriesResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if len(m.Headers) > 0 { + for iNdEx := len(m.Headers) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Headers[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintResponse(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x2a + } + } if len(m.Error) > 0 { i -= len(m.Error) copy(dAtA[i:], m.Error) @@ -247,6 +320,45 @@ func (m *ThanosSeriesResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *ResponseHeader) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ResponseHeader) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ResponseHeader) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Values) > 0 { + for iNdEx := len(m.Values) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.Values[iNdEx]) + copy(dAtA[i:], m.Values[iNdEx]) + i = encodeVarintResponse(dAtA, i, uint64(len(m.Values[iNdEx]))) + i-- + dAtA[i] = 0x12 + } + } + if len(m.Name) > 0 { + i -= len(m.Name) + copy(dAtA[i:], m.Name) + i = encodeVarintResponse(dAtA, i, uint64(len(m.Name))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func encodeVarintResponse(dAtA []byte, offset int, v uint64) int { offset -= sovResponse(v) base := offset @@ -282,6 +394,12 @@ func (m *ThanosLabelsResponse) Size() (n int) { if l > 0 { n += 1 + l + sovResponse(uint64(l)) } + if len(m.Headers) > 0 { + for _, e := range m.Headers { + l = e.Size() + n += 1 + l + sovResponse(uint64(l)) + } + } return n } @@ -309,6 +427,31 @@ func (m *ThanosSeriesResponse) Size() (n int) { if l > 0 { n += 1 + l + sovResponse(uint64(l)) } + if len(m.Headers) > 0 { + for _, e := range m.Headers { + l = e.Size() + n += 1 + l + sovResponse(uint64(l)) + } + } + return n +} + +func (m *ResponseHeader) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Name) + if l > 0 { + n += 1 + l + sovResponse(uint64(l)) + } + if len(m.Values) > 0 { + for _, s := range m.Values { + l = len(s) + n += 1 + l + sovResponse(uint64(l)) + } + } return n } @@ -475,6 +618,40 @@ func (m *ThanosLabelsResponse) Unmarshal(dAtA []byte) error { } m.Error = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Headers", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowResponse + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthResponse + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthResponse + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Headers = append(m.Headers, &ResponseHeader{}) + if err := m.Headers[len(m.Headers)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipResponse(dAtA[iNdEx:]) @@ -658,6 +835,157 @@ func (m *ThanosSeriesResponse) Unmarshal(dAtA []byte) error { } m.Error = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Headers", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowResponse + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthResponse + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthResponse + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Headers = append(m.Headers, &ResponseHeader{}) + if err := m.Headers[len(m.Headers)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipResponse(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthResponse + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthResponse + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ResponseHeader) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowResponse + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ResponseHeader: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ResponseHeader: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowResponse + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthResponse + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthResponse + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Values", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowResponse + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthResponse + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthResponse + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Values = append(m.Values, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipResponse(dAtA[iNdEx:]) diff --git a/pkg/queryfrontend/response.proto b/pkg/queryfrontend/response.proto index b2361c5a26..6a097217a5 100644 --- a/pkg/queryfrontend/response.proto +++ b/pkg/queryfrontend/response.proto @@ -21,16 +21,23 @@ option (gogoproto.goproto_sizecache_all) = false; message ThanosLabelsResponse { - string Status = 1 [(gogoproto.jsontag) = "status"]; + string Status = 1 [(gogoproto.jsontag) = "status"]; repeated string Data = 2 [(gogoproto.jsontag) = "data"]; - string ErrorType = 3 [(gogoproto.jsontag) = "errorType,omitempty"]; - string Error = 4 [(gogoproto.jsontag) = "error,omitempty"]; + string ErrorType = 3 [(gogoproto.jsontag) = "errorType,omitempty"]; + string Error = 4 [(gogoproto.jsontag) = "error,omitempty"]; + repeated ResponseHeader Headers = 5 [(gogoproto.jsontag) = "-"]; } message ThanosSeriesResponse { - string Status = 1 [(gogoproto.jsontag) = "status"]; + string Status = 1 [(gogoproto.jsontag) = "status"]; // TODO(bwplotka): Experiment with ZLabelSet here. repeated thanos.LabelSet Data = 2 [(gogoproto.nullable) = false, (gogoproto.jsontag) = "data"]; - string ErrorType = 3 [(gogoproto.jsontag) = "errorType,omitempty"]; - string Error = 4 [(gogoproto.jsontag) = "error,omitempty"]; + string ErrorType = 3 [(gogoproto.jsontag) = "errorType,omitempty"]; + string Error = 4 [(gogoproto.jsontag) = "error,omitempty"]; + repeated ResponseHeader Headers = 5 [(gogoproto.jsontag) = "-"]; +} + +message ResponseHeader { + string Name = 1 [(gogoproto.jsontag) = "-"]; + repeated string Values = 2 [(gogoproto.jsontag) = "-"]; } diff --git a/pkg/queryfrontend/roundtrip.go b/pkg/queryfrontend/roundtrip.go index 1ca3f857bc..5190ae8db3 100644 --- a/pkg/queryfrontend/roundtrip.go +++ b/pkg/queryfrontend/roundtrip.go @@ -57,8 +57,11 @@ func NewTripperware(config Config, reg prometheus.Registerer, logger log.Logger) return nil, err } - labelsTripperware := newLabelsTripperware(config.LabelsConfig, labelsLimits, labelsCodec, + labelsTripperware, err := newLabelsTripperware(config.LabelsConfig, labelsLimits, labelsCodec, prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "labels"}, reg), logger) + if err != nil { + return nil, err + } return func(next http.RoundTripper) http.RoundTripper { return newRoundTripper(next, queryRangeTripperware(next), labelsTripperware(next), reg) @@ -206,7 +209,7 @@ func newLabelsTripperware( codec *labelsCodec, reg prometheus.Registerer, logger log.Logger, -) frontend.Tripperware { +) (frontend.Tripperware, error) { labelsMiddleware := []queryrange.Middleware{} m := queryrange.NewInstrumentMiddlewareMetrics(reg) @@ -222,6 +225,29 @@ func newLabelsTripperware( ) } + if config.ResultsCacheConfig != nil { + queryCacheMiddleware, _, err := queryrange.NewResultsCacheMiddleware( + logger, + *config.ResultsCacheConfig, + newThanosCacheKeyGenerator(config.SplitQueriesByInterval), + limits, + codec, + ThanosResponseExtractor{}, + nil, + shouldCache, + reg, + ) + if err != nil { + return nil, errors.Wrap(err, "create results cache middleware") + } + + labelsMiddleware = append( + labelsMiddleware, + queryrange.InstrumentMiddleware("results_cache", m), + queryCacheMiddleware, + ) + } + if config.MaxRetries > 0 { labelsMiddleware = append( labelsMiddleware, @@ -234,7 +260,7 @@ func newLabelsTripperware( return frontend.RoundTripFunc(func(r *http.Request) (*http.Response, error) { return rt.RoundTrip(r) }) - } + }, nil } // Don't go to response cache if StoreMatchers are set. diff --git a/pkg/queryfrontend/roundtrip_test.go b/pkg/queryfrontend/roundtrip_test.go index a6efcf391e..24f95387d1 100644 --- a/pkg/queryfrontend/roundtrip_test.go +++ b/pkg/queryfrontend/roundtrip_test.go @@ -373,8 +373,8 @@ func TestRoundTripSplitIntervalMiddleware(t *testing.T) { } } -// TestRoundTripCacheMiddleware tests the cache middleware. -func TestRoundTripCacheMiddleware(t *testing.T) { +// TestRoundTripQueryRangeCacheMiddleware tests the cache middleware. +func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) { testRequest := &ThanosQueryRangeRequest{ Path: "/api/v1/query_range", Start: 0, @@ -500,6 +500,192 @@ func TestRoundTripCacheMiddleware(t *testing.T) { } } +// TestRoundTripLabelsCacheMiddleware tests the cache middleware for labels requests. +func TestRoundTripLabelsCacheMiddleware(t *testing.T) { + testRequest := &ThanosLabelsRequest{ + Path: "/api/v1/labels", + Start: 0, + End: 2 * hour, + } + + // Same query params as testRequest, but with storeMatchers + testRequestWithStoreMatchers := &ThanosLabelsRequest{ + Path: "/api/v1/labels", + Start: 0, + End: 2 * hour, + StoreMatchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + } + + testLabelValuesRequestFoo := &ThanosLabelsRequest{ + Path: "/api/v1/label/foo/values", + Start: 0, + End: 2 * hour, + Label: "foo", + } + + testLabelValuesRequestBar := &ThanosLabelsRequest{ + Path: "/api/v1/label/bar/values", + Start: 0, + End: 2 * hour, + Label: "bar", + } + + cacheConf := &queryrange.ResultsCacheConfig{ + CacheConfig: cortexcache.Config{ + EnableFifoCache: true, + Fifocache: cortexcache.FifoCacheConfig{ + MaxSizeBytes: "1MiB", + MaxSizeItems: 1000, + Validity: time.Hour, + }, + }, + } + + now := time.Now() + tpw, err := NewTripperware( + Config{ + LabelsConfig: LabelsConfig{ + Limits: defaultLimits, + ResultsCacheConfig: cacheConf, + SplitQueriesByInterval: day, + }, + }, nil, log.NewNopLogger(), + ) + testutil.Ok(t, err) + + rt, err := newFakeRoundTripper() + testutil.Ok(t, err) + defer rt.Close() + res, handler := labelsResults(false) + rt.setHandler(handler) + + for _, tc := range []struct { + name string + req queryrange.Request + handlerAndResult func() (*int, http.Handler) + expected int + }{ + {name: "first request", req: testRequest, expected: 1}, + {name: "same request as the first one, directly use cache", req: testRequest, expected: 1}, + {name: "storeMatchers requests won't go to cache", req: testRequestWithStoreMatchers, expected: 2}, + {name: "label values request label name foo", req: testLabelValuesRequestFoo, expected: 3}, + {name: "same label values query, use cache", req: testLabelValuesRequestFoo, expected: 3}, + {name: "label values request different label", req: testLabelValuesRequestBar, expected: 4}, + { + name: "request but will be partitioned", + req: &ThanosLabelsRequest{ + Path: "/api/v1/labels", + Start: timestamp.FromTime(now.Add(-time.Hour)), + End: timestamp.FromTime(now.Add(time.Hour)), + }, + expected: 5, + }, + { + name: "same query as the previous one", + req: &ThanosLabelsRequest{ + Path: "/api/v1/labels", + Start: timestamp.FromTime(now.Add(-time.Hour)), + End: timestamp.FromTime(now.Add(time.Hour)), + }, + expected: 6, + }, + } { + + t.Run(tc.name, func(t *testing.T) { + + ctx := user.InjectOrgID(context.Background(), "1") + httpReq, err := NewThanosLabelsCodec(true, 24*time.Hour).EncodeRequest(ctx, tc.req) + testutil.Ok(t, err) + + _, err = tpw(rt).RoundTrip(httpReq) + testutil.Ok(t, err) + + testutil.Equals(t, tc.expected, *res) + }) + + } +} + +// TestRoundTripSeriesCacheMiddleware tests the cache middleware for series requests. +func TestRoundTripSeriesCacheMiddleware(t *testing.T) { + testRequest := &ThanosSeriesRequest{ + Path: "/api/v1/series", + Start: 0, + End: 2 * hour, + Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + } + + // Different matchers set with the first request. + testRequest2 := &ThanosSeriesRequest{ + Path: "/api/v1/series", + Start: 0, + End: 2 * hour, + Matchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "baz")}}, + } + + // Same query params as testRequest, but with storeMatchers + testRequestWithStoreMatchers := &ThanosLabelsRequest{ + Path: "/api/v1/series", + Start: 0, + End: 2 * hour, + StoreMatchers: [][]*labels.Matcher{{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}}, + } + + cacheConf := &queryrange.ResultsCacheConfig{ + CacheConfig: cortexcache.Config{ + EnableFifoCache: true, + Fifocache: cortexcache.FifoCacheConfig{ + MaxSizeBytes: "1MiB", + MaxSizeItems: 1000, + Validity: time.Hour, + }, + }, + } + + tpw, err := NewTripperware( + Config{ + LabelsConfig: LabelsConfig{ + Limits: defaultLimits, + ResultsCacheConfig: cacheConf, + SplitQueriesByInterval: day, + }, + }, nil, log.NewNopLogger(), + ) + testutil.Ok(t, err) + + rt, err := newFakeRoundTripper() + testutil.Ok(t, err) + defer rt.Close() + res, handler := seriesResults(false) + rt.setHandler(handler) + + for _, tc := range []struct { + name string + req queryrange.Request + handlerAndResult func() (*int, http.Handler) + expected int + }{ + {name: "first request", req: testRequest, expected: 1}, + {name: "same request as the first one, directly use cache", req: testRequest, expected: 1}, + {name: "different series request, not use cache", req: testRequest2, expected: 2}, + {name: "storeMatchers requests won't go to cache", req: testRequestWithStoreMatchers, expected: 3}, + } { + + t.Run(tc.name, func(t *testing.T) { + + ctx := user.InjectOrgID(context.Background(), "1") + httpReq, err := NewThanosLabelsCodec(true, 24*time.Hour).EncodeRequest(ctx, tc.req) + testutil.Ok(t, err) + + _, err = tpw(rt).RoundTrip(httpReq) + testutil.Ok(t, err) + + testutil.Equals(t, tc.expected, *res) + }) + + } +} + // promqlResults is a mock handler used to test split and cache middleware. // Modified from Loki https://github.com/grafana/loki/blob/master/pkg/querier/queryrange/roundtrip_test.go#L547. func promqlResults(fail bool) (*int, http.Handler) {