diff --git a/CHANGELOG.md b/CHANGELOG.md index 832465e3e0d..3672936d526 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Removed +- [#7014](https://github.com/thanos-io/thanos/pull/7014) *: *breaking :warning:* Removed experimental query pushdown feature to simplify query path. This feature has had high complexity for too little benefits. + ## [v0.33.0](https://github.com/thanos-io/thanos/tree/release-0.33) - 18.12.2023 ### Fixed diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 4d831ab6d13..0e02982b9e7 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -197,7 +197,7 @@ func registerQuery(app *extkingpin.App) { activeQueryDir := cmd.Flag("query.active-query-path", "Directory to log currently active queries in the queries.active file.").Default("").String() - featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable.The current list of features is "+queryPushdown+".").Default("").Strings() + featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable.The current list of features is empty.").Hidden().Default("").Strings() enableExemplarPartialResponse := cmd.Flag("exemplar.partial-response", "Enable partial response for exemplar endpoint. --no-exemplar.partial-response for disabling."). 
Hidden().Default("true").Bool() @@ -232,17 +232,16 @@ func registerQuery(app *extkingpin.App) { return errors.Wrap(err, "parse federation labels") } - var enableQueryPushdown bool for _, feature := range *featureList { - if feature == queryPushdown { - enableQueryPushdown = true - } if feature == promqlAtModifier { level.Warn(logger).Log("msg", "This option for --enable-feature is now permanently enabled and therefore a no-op.", "option", promqlAtModifier) } if feature == promqlNegativeOffset { level.Warn(logger).Log("msg", "This option for --enable-feature is now permanently enabled and therefore a no-op.", "option", promqlNegativeOffset) } + if feature == queryPushdown { + level.Warn(logger).Log("msg", "This option for --enable-feature is now permanently deprecated and therefore ignored.", "option", queryPushdown) + } } httpLogOpts, err := logging.ParseHTTPOptions(reqLogConfig) @@ -335,7 +334,6 @@ func registerQuery(app *extkingpin.App) { *strictEndpoints, *strictEndpointGroups, *webDisableCORS, - enableQueryPushdown, *alertQueryURL, *grpcProxyStrategy, component.Query, @@ -417,7 +415,6 @@ func runQuery( strictEndpoints []string, strictEndpointGroups []string, disableCORS bool, - enableQueryPushdown bool, alertQueryURL string, grpcProxyStrategy string, comp component.Component, @@ -708,7 +705,6 @@ func runQuery( enableTargetPartialResponse, enableMetricMetadataPartialResponse, enableExemplarPartialResponse, - enableQueryPushdown, queryReplicaLabels, flagsMap, defaultRangeQueryStep, diff --git a/docs/components/query.md b/docs/components/query.md index 4584363ba3e..bfb5f1bb8a1 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -286,9 +286,6 @@ Flags: --alert.query-url=ALERT.QUERY-URL The external Thanos Query URL that would be set in all alerts 'Source' field. - --enable-feature= ... Comma separated experimental feature names - to enable.The current list of features is - query-pushdown. --endpoint= ... 
Addresses of statically configured Thanos API servers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect diff --git a/pkg/api/query/grpc.go b/pkg/api/query/grpc.go index 769b0001c5b..8b72b98da30 100644 --- a/pkg/api/query/grpc.go +++ b/pkg/api/query/grpc.go @@ -97,7 +97,6 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer storeMatchers, maxResolution, request.EnablePartialResponse, - request.EnableQueryPushdown, false, request.ShardInfo, query.NoopSeriesStatsReporter, @@ -195,7 +194,6 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que storeMatchers, maxResolution, request.EnablePartialResponse, - request.EnableQueryPushdown, false, request.ShardInfo, query.NoopSeriesStatsReporter, diff --git a/pkg/api/query/querypb/query.pb.go b/pkg/api/query/querypb/query.pb.go index bd5b5d47246..b055b923bd2 100644 --- a/pkg/api/query/querypb/query.pb.go +++ b/pkg/api/query/querypb/query.pb.go @@ -67,7 +67,6 @@ type QueryRequest struct { StoreMatchers []StoreMatchers `protobuf:"bytes,6,rep,name=storeMatchers,proto3" json:"storeMatchers"` EnableDedup bool `protobuf:"varint,7,opt,name=enableDedup,proto3" json:"enableDedup,omitempty"` EnablePartialResponse bool `protobuf:"varint,8,opt,name=enablePartialResponse,proto3" json:"enablePartialResponse,omitempty"` - EnableQueryPushdown bool `protobuf:"varint,9,opt,name=enableQueryPushdown,proto3" json:"enableQueryPushdown,omitempty"` SkipChunks bool `protobuf:"varint,10,opt,name=skipChunks,proto3" json:"skipChunks,omitempty"` ShardInfo *storepb.ShardInfo `protobuf:"bytes,11,opt,name=shard_info,json=shardInfo,proto3" json:"shard_info,omitempty"` LookbackDeltaSeconds int64 `protobuf:"varint,12,opt,name=lookback_delta_seconds,json=lookbackDeltaSeconds,proto3" json:"lookback_delta_seconds,omitempty"` @@ -240,7 +239,6 @@ type QueryRangeRequest struct { StoreMatchers []StoreMatchers `protobuf:"bytes,8,rep,name=storeMatchers,proto3" json:"storeMatchers"` 
EnableDedup bool `protobuf:"varint,9,opt,name=enableDedup,proto3" json:"enableDedup,omitempty"` EnablePartialResponse bool `protobuf:"varint,10,opt,name=enablePartialResponse,proto3" json:"enablePartialResponse,omitempty"` - EnableQueryPushdown bool `protobuf:"varint,11,opt,name=enableQueryPushdown,proto3" json:"enableQueryPushdown,omitempty"` SkipChunks bool `protobuf:"varint,12,opt,name=skipChunks,proto3" json:"skipChunks,omitempty"` ShardInfo *storepb.ShardInfo `protobuf:"bytes,13,opt,name=shard_info,json=shardInfo,proto3" json:"shard_info,omitempty"` LookbackDeltaSeconds int64 `protobuf:"varint,14,opt,name=lookback_delta_seconds,json=lookbackDeltaSeconds,proto3" json:"lookback_delta_seconds,omitempty"` @@ -377,54 +375,53 @@ func init() { func init() { proto.RegisterFile("api/query/querypb/query.proto", fileDescriptor_4b2aba43925d729f) } var fileDescriptor_4b2aba43925d729f = []byte{ - // 752 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x55, 0x4b, 0x6f, 0xeb, 0x44, - 0x14, 0xb6, 0x6f, 0x9a, 0xd7, 0x71, 0x92, 0xe6, 0x0e, 0x29, 0xf8, 0x06, 0x30, 0x26, 0x52, 0x85, - 0xa9, 0x50, 0x52, 0x85, 0xc2, 0x0e, 0x09, 0x4a, 0x2b, 0x15, 0xa9, 0x48, 0xad, 0xdb, 0x15, 0x9b, - 0x68, 0x12, 0x4f, 0x13, 0x2b, 0xce, 0x8c, 0xeb, 0x19, 0xb7, 0x8d, 0xd8, 0xb3, 0xe6, 0xbf, 0xf0, - 0x27, 0xba, 0x42, 0x5d, 0xb2, 0x42, 0xd0, 0xfe, 0x11, 0xe4, 0xf1, 0xa3, 0x76, 0x15, 0xf5, 0x41, - 0xa5, 0xbb, 0x71, 0x66, 0xbe, 0xef, 0x3b, 0xf3, 0x38, 0x73, 0xbe, 0x1c, 0xf8, 0x14, 0xfb, 0xee, - 0xe0, 0x3c, 0x24, 0xc1, 0x32, 0xfe, 0xfa, 0xe3, 0xf8, 0xb7, 0xef, 0x07, 0x4c, 0x30, 0x54, 0x11, - 0x33, 0x4c, 0x19, 0xef, 0x76, 0xa6, 0x6c, 0xca, 0x24, 0x34, 0x88, 0x46, 0x31, 0xdb, 0x7d, 0xc7, - 0x05, 0x0b, 0xc8, 0x40, 0x7e, 0xfd, 0xf1, 0x40, 0x2c, 0x7d, 0xc2, 0x13, 0xea, 0xa3, 0x22, 0x15, - 0xf8, 0x93, 0x84, 0x30, 0x8b, 0x84, 0x1f, 0xb0, 0x45, 0x31, 0xb4, 0xf7, 0xe7, 0x1a, 0x34, 0x8e, - 0xa3, 0x33, 0xd8, 0xe4, 0x3c, 0x24, 0x5c, 0xa0, 0x0e, 0x94, 0xe5, 
0x99, 0x74, 0xd5, 0x54, 0xad, - 0xba, 0x1d, 0x4f, 0xd0, 0xe7, 0xd0, 0x10, 0xee, 0x82, 0x8c, 0x38, 0x99, 0x30, 0xea, 0x70, 0xfd, - 0x8d, 0xa9, 0x5a, 0x25, 0x5b, 0x8b, 0xb0, 0x93, 0x18, 0x42, 0x5f, 0xc0, 0x7a, 0x34, 0x65, 0xa1, - 0xc8, 0x54, 0x25, 0xa9, 0x6a, 0x25, 0x70, 0x2a, 0xdc, 0x81, 0x0f, 0x17, 0xf8, 0x6a, 0x14, 0x10, - 0xce, 0xbc, 0x50, 0xb8, 0x8c, 0x66, 0xfa, 0x35, 0xa9, 0xef, 0x2c, 0xf0, 0x95, 0x9d, 0x91, 0x69, - 0xd4, 0x26, 0xb4, 0x02, 0xe2, 0x7b, 0xee, 0x04, 0x8f, 0x3c, 0x3c, 0x26, 0x1e, 0xd7, 0xcb, 0x66, - 0xc9, 0xaa, 0xdb, 0xcd, 0x04, 0x3d, 0x94, 0x20, 0xfa, 0x01, 0x9a, 0xf2, 0xb6, 0x3f, 0x63, 0x31, - 0x99, 0x91, 0x80, 0xeb, 0x15, 0xb3, 0x64, 0x69, 0xc3, 0x8d, 0x7e, 0x9c, 0xdb, 0xfe, 0x49, 0x9e, - 0xdc, 0x5d, 0xbb, 0xfe, 0xfb, 0x33, 0xc5, 0x2e, 0x46, 0x20, 0x13, 0x34, 0x42, 0xf1, 0xd8, 0x23, - 0x7b, 0xc4, 0x09, 0x7d, 0xbd, 0x6a, 0xaa, 0x56, 0xcd, 0xce, 0x43, 0x68, 0x07, 0x36, 0xe2, 0xe9, - 0x11, 0x0e, 0x84, 0x8b, 0x3d, 0x9b, 0x70, 0x9f, 0x51, 0x4e, 0xf4, 0x9a, 0xd4, 0xae, 0x26, 0xd1, - 0x36, 0x7c, 0x10, 0x13, 0x32, 0xdf, 0x47, 0x21, 0x9f, 0x39, 0xec, 0x92, 0xea, 0x75, 0x19, 0xb3, - 0x8a, 0x42, 0x06, 0x00, 0x9f, 0xbb, 0xfe, 0x8f, 0xb3, 0x90, 0xce, 0xb9, 0x0e, 0x52, 0x98, 0x43, - 0xd0, 0x36, 0x00, 0x9f, 0xe1, 0xc0, 0x19, 0xb9, 0xf4, 0x8c, 0xe9, 0x9a, 0xa9, 0x5a, 0xda, 0xf0, - 0x6d, 0x76, 0xd3, 0x88, 0xf9, 0x89, 0x9e, 0x31, 0xbb, 0xce, 0xd3, 0x61, 0x94, 0x7b, 0x8f, 0xb1, - 0xf9, 0x18, 0x4f, 0xe6, 0x23, 0x87, 0x78, 0x02, 0x67, 0xb9, 0x6f, 0xc4, 0xb9, 0x4f, 0xd9, 0xbd, - 0x88, 0x4c, 0x73, 0xbf, 0x05, 0x15, 0x42, 0xa7, 0x2e, 0x25, 0x7a, 0xd3, 0x54, 0xad, 0xd6, 0x10, - 0xa5, 0x7b, 0xec, 0x4b, 0xf4, 0x74, 0xe9, 0x13, 0x3b, 0x51, 0xf4, 0x8e, 0xa1, 0x59, 0xc8, 0x31, - 0xfa, 0x1e, 0x9a, 0xf2, 0xc1, 0xb2, 0x17, 0x51, 0xe5, 0x8b, 0x74, 0xd2, 0x35, 0x0e, 0x73, 0x64, - 0xfa, 0x20, 0x85, 0x80, 0xde, 0x05, 0x34, 0x93, 0x12, 0x4d, 0x32, 0xf9, 0x09, 0xd4, 0x2e, 0x71, - 0x40, 0x5d, 0x3a, 0xe5, 0x71, 0x99, 0x1e, 0x28, 0x76, 0x86, 0xa0, 0xef, 0x00, 0xa2, 0x8a, 0xe3, - 0x24, 
0x70, 0x49, 0x5c, 0xa9, 0xda, 0xf0, 0xe3, 0xa8, 0xdc, 0x17, 0x44, 0xcc, 0x48, 0xc8, 0x47, - 0x13, 0xe6, 0x2f, 0xfb, 0xa7, 0xb2, 0x74, 0x23, 0xc9, 0x81, 0x62, 0xe7, 0x02, 0x76, 0x6b, 0x50, - 0x09, 0x08, 0x0f, 0x3d, 0xd1, 0xfb, 0xa3, 0x0c, 0x6f, 0xe3, 0x8d, 0x31, 0x9d, 0x92, 0xc7, 0x0d, - 0xf2, 0x15, 0x20, 0x2e, 0x70, 0x20, 0x46, 0x2b, 0x6c, 0xd2, 0x96, 0xcc, 0x69, 0xce, 0x2b, 0x16, - 0xb4, 0x09, 0x75, 0x8a, 0xda, 0xc4, 0x2c, 0x84, 0x3a, 0x79, 0xe5, 0x97, 0xd0, 0x76, 0xa9, 0x20, - 0xc1, 0x05, 0xf6, 0x1e, 0xd8, 0x64, 0x3d, 0xc5, 0x1f, 0x31, 0x60, 0xf9, 0x85, 0x06, 0xac, 0xbc, - 0xc8, 0x80, 0xd5, 0x67, 0x19, 0xb0, 0xf6, 0x5a, 0x03, 0xd6, 0x5f, 0x60, 0x40, 0xf8, 0x1f, 0x06, - 0xd4, 0x9e, 0x6b, 0xc0, 0xc6, 0x13, 0x06, 0x6c, 0xbe, 0xca, 0x80, 0xad, 0x67, 0x19, 0x70, 0xfd, - 0x49, 0x03, 0xfe, 0x0a, 0x28, 0x5f, 0xb4, 0xef, 0xd5, 0x32, 0x5b, 0xdf, 0x00, 0xdc, 0x1f, 0x09, - 0x69, 0x50, 0x75, 0xc8, 0x19, 0x0e, 0x3d, 0xd1, 0x56, 0x50, 0x0b, 0xe0, 0x7e, 0xc1, 0xb6, 0x8a, - 0x00, 0x92, 0x7e, 0xd7, 0x7e, 0x33, 0xfc, 0x4d, 0x85, 0xb2, 0x3c, 0x34, 0xfa, 0x36, 0x1d, 0x64, - 0xff, 0x0f, 0xf9, 0xee, 0xd4, 0xdd, 0x78, 0x80, 0xc6, 0xb7, 0xdb, 0x56, 0xd1, 0x3e, 0xc0, 0xfd, - 0xad, 0xd1, 0xbb, 0xa2, 0x2c, 0x67, 0xdf, 0x6e, 0x77, 0x15, 0x95, 0x2e, 0xb3, 0xbb, 0x79, 0xfd, - 0xaf, 0xa1, 0x5c, 0xdf, 0x1a, 0xea, 0xcd, 0xad, 0xa1, 0xfe, 0x73, 0x6b, 0xa8, 0xbf, 0xdf, 0x19, - 0xca, 0xcd, 0x9d, 0xa1, 0xfc, 0x75, 0x67, 0x28, 0xbf, 0x54, 0x93, 0xae, 0x3d, 0xae, 0xc8, 0xe6, - 0xf9, 0xf5, 0x7f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x71, 0xcb, 0xbf, 0x05, 0xd1, 0x07, 0x00, 0x00, + // 734 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x55, 0xcf, 0x6f, 0xfb, 0x34, + 0x1c, 0x4d, 0xd6, 0xdf, 0x9f, 0xb4, 0x5d, 0x66, 0x75, 0x90, 0x15, 0x08, 0xa1, 0xd2, 0x44, 0x98, + 0x50, 0x3b, 0x95, 0xc1, 0x0d, 0x09, 0xc6, 0x26, 0x0d, 0x34, 0x24, 0x96, 0xed, 0xc4, 0xa5, 0x72, + 0x5b, 0xaf, 0x8d, 0x9a, 0xda, 0x59, 0xec, 0x8c, 0x55, 0xdc, 0xe1, 0xca, 0xdf, 0xc0, 0x5f, 0xb3, 
+ 0xe3, 0x8e, 0x9c, 0x10, 0x6c, 0xff, 0x08, 0x8a, 0xf3, 0x63, 0xc9, 0x34, 0x8d, 0x4e, 0x93, 0xbe, + 0x97, 0x34, 0x7e, 0xef, 0xd9, 0xb1, 0x5f, 0xdf, 0x93, 0xe1, 0x23, 0xec, 0xbb, 0x83, 0xab, 0x90, + 0x04, 0xab, 0xf8, 0xe9, 0x8f, 0xe3, 0xdf, 0xbe, 0x1f, 0x30, 0xc1, 0x50, 0x55, 0xcc, 0x31, 0x65, + 0xbc, 0xdb, 0x99, 0xb1, 0x19, 0x93, 0xd0, 0x20, 0x7a, 0x8b, 0xd9, 0xee, 0x0e, 0x17, 0x2c, 0x20, + 0x03, 0xf9, 0xf4, 0xc7, 0x03, 0xb1, 0xf2, 0x09, 0x4f, 0xa8, 0xf7, 0x8b, 0x54, 0xe0, 0x4f, 0x12, + 0xc2, 0x2a, 0x12, 0x7e, 0xc0, 0x96, 0xc5, 0xa9, 0xbd, 0x3f, 0xcb, 0xd0, 0x3c, 0x8b, 0xf6, 0xe0, + 0x90, 0xab, 0x90, 0x70, 0x81, 0x3a, 0x50, 0x91, 0x7b, 0x32, 0x54, 0x4b, 0xb5, 0x1b, 0x4e, 0x3c, + 0x40, 0x9f, 0x40, 0x53, 0xb8, 0x4b, 0x32, 0xe2, 0x64, 0xc2, 0xe8, 0x94, 0x1b, 0x1b, 0x96, 0x6a, + 0x97, 0x1c, 0x2d, 0xc2, 0xce, 0x63, 0x08, 0x7d, 0x0a, 0x9b, 0xd1, 0x90, 0x85, 0x22, 0x53, 0x95, + 0xa4, 0xaa, 0x9d, 0xc0, 0xa9, 0xf0, 0x00, 0xde, 0x5b, 0xe2, 0x9b, 0x51, 0x40, 0x38, 0xf3, 0x42, + 0xe1, 0x32, 0x9a, 0xe9, 0xcb, 0x52, 0xdf, 0x59, 0xe2, 0x1b, 0x27, 0x23, 0xd3, 0x59, 0xbb, 0xd0, + 0x0e, 0x88, 0xef, 0xb9, 0x13, 0x3c, 0xf2, 0xf0, 0x98, 0x78, 0xdc, 0xa8, 0x58, 0x25, 0xbb, 0xe1, + 0xb4, 0x12, 0xf4, 0x54, 0x82, 0xe8, 0x5b, 0x68, 0xc9, 0xd3, 0xfe, 0x88, 0xc5, 0x64, 0x4e, 0x02, + 0x6e, 0x54, 0xad, 0x92, 0xad, 0x0d, 0xb7, 0xfb, 0xb1, 0xb7, 0xfd, 0xf3, 0x3c, 0x79, 0x58, 0xbe, + 0xfd, 0xfb, 0x63, 0xc5, 0x29, 0xce, 0x40, 0x16, 0x68, 0x84, 0xe2, 0xb1, 0x47, 0x8e, 0xc8, 0x34, + 0xf4, 0x8d, 0x9a, 0xa5, 0xda, 0x75, 0x27, 0x0f, 0xa1, 0x03, 0xd8, 0x8e, 0x87, 0x3f, 0xe1, 0x40, + 0xb8, 0xd8, 0x73, 0x08, 0xf7, 0x19, 0xe5, 0xc4, 0xa8, 0x4b, 0xed, 0xf3, 0x24, 0x32, 0x01, 0xf8, + 0xc2, 0xf5, 0xbf, 0x9b, 0x87, 0x74, 0xc1, 0x0d, 0x90, 0xd2, 0x1c, 0x82, 0xf6, 0x01, 0xf8, 0x1c, + 0x07, 0xd3, 0x91, 0x4b, 0x2f, 0x99, 0xa1, 0x59, 0xaa, 0xad, 0x0d, 0xb7, 0xb2, 0x7d, 0x47, 0xcc, + 0xf7, 0xf4, 0x92, 0x39, 0x0d, 0x9e, 0xbe, 0x46, 0x4e, 0x7a, 0x8c, 0x2d, 0xc6, 0x78, 0xb2, 0x18, + 0x4d, 0x89, 0x27, 0x70, 0xe6, 0x64, 
0x33, 0x76, 0x32, 0x65, 0x8f, 0x22, 0x32, 0x75, 0x72, 0x0f, + 0xaa, 0x84, 0xce, 0x5c, 0x4a, 0x8c, 0x96, 0xa5, 0xda, 0xed, 0x21, 0x4a, 0xbf, 0x71, 0x2c, 0xd1, + 0x8b, 0x95, 0x4f, 0x9c, 0x44, 0xf1, 0x43, 0xb9, 0xde, 0xd0, 0xa1, 0x77, 0x06, 0xad, 0x82, 0x6f, + 0xe8, 0x1b, 0x68, 0xc9, 0x3f, 0x21, 0x73, 0x59, 0x95, 0x2e, 0x77, 0xd2, 0x95, 0x4e, 0x73, 0x64, + 0x6a, 0x72, 0x61, 0x42, 0xef, 0x1a, 0x5a, 0x49, 0xec, 0x12, 0x77, 0x3e, 0x84, 0xfa, 0x2f, 0x38, + 0xa0, 0x2e, 0x9d, 0xf1, 0x38, 0x7a, 0x27, 0x8a, 0x93, 0x21, 0xe8, 0x6b, 0x80, 0x28, 0x45, 0x9c, + 0x04, 0x2e, 0x89, 0xd3, 0xa7, 0x0d, 0x3f, 0x88, 0x22, 0xbc, 0x24, 0x62, 0x4e, 0x42, 0x3e, 0x9a, + 0x30, 0x7f, 0xd5, 0xbf, 0x90, 0x71, 0x8c, 0x24, 0x27, 0x8a, 0x93, 0x9b, 0x70, 0x58, 0x87, 0x6a, + 0x40, 0x78, 0xe8, 0x89, 0xde, 0xef, 0x15, 0xd8, 0x8a, 0x3f, 0x8c, 0xe9, 0x8c, 0xbc, 0x1c, 0xfa, + 0xcf, 0x01, 0x71, 0x81, 0x03, 0x31, 0x7a, 0x26, 0xfa, 0xba, 0x64, 0x2e, 0x72, 0xf9, 0xb7, 0x41, + 0x27, 0x74, 0x5a, 0xd4, 0x26, 0x05, 0x20, 0x74, 0x9a, 0x57, 0x7e, 0x06, 0xba, 0x4b, 0x05, 0x09, + 0xae, 0xb1, 0xf7, 0x24, 0xfa, 0x9b, 0x29, 0xfe, 0x42, 0xa9, 0x2a, 0xaf, 0x2c, 0x55, 0xf5, 0x55, + 0xa5, 0xaa, 0xad, 0x55, 0xaa, 0xfa, 0x5b, 0x4b, 0xd5, 0x78, 0x45, 0xa9, 0x60, 0xfd, 0x52, 0x35, + 0xff, 0xa7, 0x54, 0xad, 0x37, 0x95, 0xaa, 0xbd, 0x56, 0xa9, 0x36, 0xd7, 0x28, 0x95, 0xa6, 0x37, + 0x7b, 0xbf, 0x02, 0xca, 0x07, 0xf1, 0x9d, 0xd6, 0x60, 0xef, 0x4b, 0x80, 0xc7, 0x8d, 0x21, 0x0d, + 0x6a, 0x53, 0x72, 0x89, 0x43, 0x4f, 0xe8, 0x0a, 0x6a, 0x03, 0x3c, 0x2e, 0xa8, 0xab, 0x08, 0x20, + 0xb9, 0x97, 0xf4, 0x8d, 0xe1, 0x6f, 0x2a, 0x54, 0xe4, 0xa6, 0xd1, 0x57, 0xe9, 0x4b, 0xd6, 0xf9, + 0xfc, 0x2d, 0xd2, 0xdd, 0x7e, 0x82, 0xc6, 0xa7, 0xdb, 0x57, 0xd1, 0x31, 0xc0, 0xe3, 0xa9, 0xd1, + 0x4e, 0x51, 0x96, 0xab, 0x64, 0xb7, 0xfb, 0x1c, 0x95, 0x2e, 0x73, 0xb8, 0x7b, 0xfb, 0xaf, 0xa9, + 0xdc, 0xde, 0x9b, 0xea, 0xdd, 0xbd, 0xa9, 0xfe, 0x73, 0x6f, 0xaa, 0x7f, 0x3c, 0x98, 0xca, 0xdd, + 0x83, 0xa9, 0xfc, 0xf5, 0x60, 0x2a, 0x3f, 0xd7, 0x92, 0xdb, 0x75, 0x5c, 
0x95, 0x97, 0xdc, 0x17, + 0xff, 0x05, 0x00, 0x00, 0xff, 0xff, 0xd2, 0xa3, 0xe4, 0xc6, 0x79, 0x07, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -649,16 +646,6 @@ func (m *QueryRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x50 } - if m.EnableQueryPushdown { - i-- - if m.EnableQueryPushdown { - dAtA[i] = 1 - } else { - dAtA[i] = 0 - } - i-- - dAtA[i] = 0x48 - } if m.EnablePartialResponse { i-- if m.EnablePartialResponse { @@ -883,16 +870,6 @@ func (m *QueryRangeRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { i-- dAtA[i] = 0x60 } - if m.EnableQueryPushdown { - i-- - if m.EnableQueryPushdown { - dAtA[i] = 1 - } else { - dAtA[i] = 0 - } - i-- - dAtA[i] = 0x58 - } if m.EnablePartialResponse { i-- if m.EnablePartialResponse { @@ -1086,9 +1063,6 @@ func (m *QueryRequest) Size() (n int) { if m.EnablePartialResponse { n += 2 } - if m.EnableQueryPushdown { - n += 2 - } if m.SkipChunks { n += 2 } @@ -1197,9 +1171,6 @@ func (m *QueryRangeRequest) Size() (n int) { if m.EnablePartialResponse { n += 2 } - if m.EnableQueryPushdown { - n += 2 - } if m.SkipChunks { n += 2 } @@ -1481,26 +1452,6 @@ func (m *QueryRequest) Unmarshal(dAtA []byte) error { } } m.EnablePartialResponse = bool(v != 0) - case 9: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field EnableQueryPushdown", wireType) - } - var v int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowQuery - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - v |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - m.EnableQueryPushdown = bool(v != 0) case 10: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field SkipChunks", wireType) @@ -2079,26 +2030,6 @@ func (m *QueryRangeRequest) Unmarshal(dAtA []byte) error { } } m.EnablePartialResponse = bool(v != 0) - case 11: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for 
field EnableQueryPushdown", wireType) - } - var v int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowQuery - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - v |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - m.EnableQueryPushdown = bool(v != 0) case 12: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field SkipChunks", wireType) diff --git a/pkg/api/query/querypb/query.proto b/pkg/api/query/querypb/query.proto index 4d346454ef0..ffffb6c53a9 100644 --- a/pkg/api/query/querypb/query.proto +++ b/pkg/api/query/querypb/query.proto @@ -41,13 +41,14 @@ message QueryRequest { bool enableDedup = 7; bool enablePartialResponse = 8; - bool enableQueryPushdown = 9; bool skipChunks = 10; ShardInfo shard_info = 11; int64 lookback_delta_seconds = 12; EngineType engine = 13; + + reserved 9; } message StoreMatchers { @@ -80,12 +81,13 @@ message QueryRangeRequest { bool enableDedup = 9; bool enablePartialResponse = 10; - bool enableQueryPushdown = 11; bool skipChunks = 12; ShardInfo shard_info = 13; int64 lookback_delta_seconds = 14; EngineType engine = 15; + + reserved 11; } message QueryRangeResponse { diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index d6767d9a94e..b94f5b4d744 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -157,7 +157,6 @@ type QueryAPI struct { enableTargetPartialResponse bool enableMetricMetadataPartialResponse bool enableExemplarPartialResponse bool - enableQueryPushdown bool disableCORS bool replicaLabels []string @@ -196,7 +195,6 @@ func NewQueryAPI( enableTargetPartialResponse bool, enableMetricMetadataPartialResponse bool, enableExemplarPartialResponse bool, - enableQueryPushdown bool, replicaLabels []string, flagsMap map[string]string, defaultRangeQueryStep time.Duration, @@ -233,7 +231,6 @@ func NewQueryAPI( enableTargetPartialResponse: enableTargetPartialResponse, enableMetricMetadataPartialResponse: 
enableMetricMetadataPartialResponse, enableExemplarPartialResponse: enableExemplarPartialResponse, - enableQueryPushdown: enableQueryPushdown, replicaLabels: replicaLabels, endpointStatus: endpointStatus, defaultRangeQueryStep: defaultRangeQueryStep, @@ -565,7 +562,6 @@ func (qapi *QueryAPI) queryExplain(r *http.Request) (interface{}, []error, *api. storeDebugMatchers, maxSourceResolution, enablePartialResponse, - qapi.enableQueryPushdown, false, shardInfo, query.NewAggregateStatsReporter(&seriesStats), @@ -668,7 +664,6 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro storeDebugMatchers, maxSourceResolution, enablePartialResponse, - qapi.enableQueryPushdown, false, shardInfo, query.NewAggregateStatsReporter(&seriesStats), @@ -835,7 +830,6 @@ func (qapi *QueryAPI) queryRangeExplain(r *http.Request) (interface{}, []error, storeDebugMatchers, maxSourceResolution, enablePartialResponse, - qapi.enableQueryPushdown, false, shardInfo, query.NewAggregateStatsReporter(&seriesStats), @@ -968,7 +962,6 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap storeDebugMatchers, maxSourceResolution, enablePartialResponse, - qapi.enableQueryPushdown, false, shardInfo, query.NewAggregateStatsReporter(&seriesStats), @@ -1062,7 +1055,6 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A storeDebugMatchers, 0, enablePartialResponse, - qapi.enableQueryPushdown, true, nil, query.NoopSeriesStatsReporter, @@ -1155,7 +1147,6 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr storeDebugMatchers, math.MaxInt64, enablePartialResponse, - qapi.enableQueryPushdown, true, nil, query.NoopSeriesStatsReporter, @@ -1212,7 +1203,6 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap storeDebugMatchers, 0, enablePartialResponse, - qapi.enableQueryPushdown, true, nil, query.NoopSeriesStatsReporter, diff --git a/pkg/dedup/iter.go 
b/pkg/dedup/iter.go index 8f60c40363b..0a3a312f6fb 100644 --- a/pkg/dedup/iter.go +++ b/pkg/dedup/iter.go @@ -20,17 +20,12 @@ type dedupSeriesSet struct { isCounter bool replicas []storage.Series - // Pushed down series. Currently, they are being handled in a specific way. - // In the future, we might want to relax this and handle these depending - // on what function has been passed. - pushedDown []storage.Series lset labels.Labels peek storage.Series ok bool - f string - pushdownEnabled bool + f string } // isCounter deduces whether a counter metric has been passed. There must be @@ -107,9 +102,9 @@ func (o *overlapSplitSet) Err() error { // NewSeriesSet returns seriesSet that deduplicates the same series. // The series in series set are expected be sorted by all labels. -func NewSeriesSet(set storage.SeriesSet, f string, pushdownEnabled bool) storage.SeriesSet { +func NewSeriesSet(set storage.SeriesSet, f string) storage.SeriesSet { // TODO: remove dependency on knowing whether it is a counter. - s := &dedupSeriesSet{pushdownEnabled: pushdownEnabled, set: set, isCounter: isCounter(f), f: f} + s := &dedupSeriesSet{set: set, isCounter: isCounter(f), f: f} s.ok = s.set.Next() if s.ok { s.peek = s.set.At() @@ -117,34 +112,16 @@ func NewSeriesSet(set storage.SeriesSet, f string, pushdownEnabled bool) storage return s } -// trimPushdownMarker trims the pushdown marker from the given labels. -// Returns true if there was a pushdown marker. -func trimPushdownMarker(lbls labels.Labels) (labels.Labels, bool) { - return labels.NewBuilder(lbls).Del(PushdownMarker.Name).Labels(), lbls.Has(PushdownMarker.Name) -} - func (s *dedupSeriesSet) Next() bool { if !s.ok { return false } - // Reset both because they might have some leftovers. - if s.pushdownEnabled { - s.pushedDown = s.pushedDown[:0] - } s.replicas = s.replicas[:0] // Set the label set we are currently gathering to the peek element. 
s.lset = s.peek.Labels() + s.replicas = append(s.replicas[:0], s.peek) - pushedDown := false - if s.pushdownEnabled { - s.lset, pushedDown = trimPushdownMarker(s.lset) - } - if pushedDown { - s.pushedDown = append(s.pushedDown[:0], s.peek) - } else { - s.replicas = append(s.replicas[:0], s.peek) - } return s.next() } @@ -153,49 +130,31 @@ func (s *dedupSeriesSet) next() bool { s.ok = s.set.Next() if !s.ok { // There's no next series, the current replicas are the last element. - return len(s.replicas) > 0 || len(s.pushedDown) > 0 + return len(s.replicas) > 0 } s.peek = s.set.At() nextLset := s.peek.Labels() - var pushedDown bool - if s.pushdownEnabled { - nextLset, pushedDown = trimPushdownMarker(nextLset) - } - // If the label set modulo the replica label is equal to the current label set // look for more replicas, otherwise a series is complete. if !labels.Equal(s.lset, nextLset) { return true } - if pushedDown { - s.pushedDown = append(s.pushedDown, s.peek) - } else { - s.replicas = append(s.replicas, s.peek) - } + s.replicas = append(s.replicas, s.peek) return s.next() } func (s *dedupSeriesSet) At() storage.Series { - if len(s.replicas) == 1 && len(s.pushedDown) == 0 { + if len(s.replicas) == 1 { return seriesWithLabels{Series: s.replicas[0], lset: s.lset} } - if len(s.replicas) == 0 && len(s.pushedDown) == 1 { - return seriesWithLabels{Series: s.pushedDown[0], lset: s.lset} - } // Clients may store the series, so we must make a copy of the slice before advancing. 
repl := make([]storage.Series, len(s.replicas)) copy(repl, s.replicas) - var pushedDown []storage.Series - if s.pushdownEnabled { - pushedDown = make([]storage.Series, len(s.pushedDown)) - copy(pushedDown, s.pushedDown) - } - - return newDedupSeries(s.lset, repl, pushedDown, s.f) + return newDedupSeries(s.lset, repl, s.f) } func (s *dedupSeriesSet) Err() error { @@ -214,111 +173,22 @@ type seriesWithLabels struct { func (s seriesWithLabels) Labels() labels.Labels { return s.lset } type dedupSeries struct { - lset labels.Labels - replicas []storage.Series - pushedDown []storage.Series + lset labels.Labels + replicas []storage.Series isCounter bool f string } -func newDedupSeries(lset labels.Labels, replicas []storage.Series, pushedDown []storage.Series, f string) *dedupSeries { - return &dedupSeries{lset: lset, isCounter: isCounter(f), replicas: replicas, pushedDown: pushedDown, f: f} +func newDedupSeries(lset labels.Labels, replicas []storage.Series, f string) *dedupSeries { + return &dedupSeries{lset: lset, isCounter: isCounter(f), replicas: replicas, f: f} } func (s *dedupSeries) Labels() labels.Labels { return s.lset } -// pushdownIterator creates an iterator that handles -// all pushed down series. 
-func (s *dedupSeries) pushdownIterator(_ chunkenc.Iterator) chunkenc.Iterator { - var pushedDownIterator adjustableSeriesIterator - if s.isCounter { - pushedDownIterator = &counterErrAdjustSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)} - } else { - pushedDownIterator = noopAdjustableSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)} - } - - for _, o := range s.pushedDown[1:] { - var replicaIterator adjustableSeriesIterator - - if s.isCounter { - replicaIterator = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(nil)} - } else { - replicaIterator = noopAdjustableSeriesIterator{Iterator: o.Iterator(nil)} - } - - pushedDownIterator = noopAdjustableSeriesIterator{newPushdownSeriesIterator(pushedDownIterator, replicaIterator, s.f)} - } - - return pushedDownIterator -} - -// allSeriesIterator creates an iterator over all series - pushed down -// and regular replicas. -func (s *dedupSeries) allSeriesIterator(_ chunkenc.Iterator) chunkenc.Iterator { - var replicasIterator, pushedDownIterator adjustableSeriesIterator - if len(s.replicas) != 0 { - if s.isCounter { - replicasIterator = &counterErrAdjustSeriesIterator{Iterator: s.replicas[0].Iterator(nil)} - } else { - replicasIterator = noopAdjustableSeriesIterator{Iterator: s.replicas[0].Iterator(nil)} - } - - for _, o := range s.replicas[1:] { - var replicaIter adjustableSeriesIterator - if s.isCounter { - replicaIter = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(nil)} - } else { - replicaIter = noopAdjustableSeriesIterator{Iterator: o.Iterator(nil)} - } - replicasIterator = newDedupSeriesIterator(replicasIterator, replicaIter) - } - } - - if len(s.pushedDown) != 0 { - if s.isCounter { - pushedDownIterator = &counterErrAdjustSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)} - } else { - pushedDownIterator = noopAdjustableSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)} - } - - for _, o := range s.pushedDown[1:] { - var replicaIter adjustableSeriesIterator - if s.isCounter { - 
replicaIter = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(nil)} - } else { - replicaIter = noopAdjustableSeriesIterator{Iterator: o.Iterator(nil)} - } - pushedDownIterator = newDedupSeriesIterator(pushedDownIterator, replicaIter) - } - } - - if replicasIterator == nil { - return pushedDownIterator - } - if pushedDownIterator == nil { - return replicasIterator - } - return newDedupSeriesIterator(pushedDownIterator, replicasIterator) -} - func (s *dedupSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator { - // This function needs a regular iterator over all series. Behavior is identical - // whether it was pushed down or not. - if s.f == "group" { - return s.allSeriesIterator(nil) - } - // If there are no replicas then jump straight to constructing an iterator - // for pushed down series. - if len(s.replicas) == 0 { - return s.pushdownIterator(nil) - } - - // Finally, if we have both then construct a tree out of them. - // Pushed down series have their own special iterator. - // We deduplicate everything in the end. var it adjustableSeriesIterator if s.isCounter { it = &counterErrAdjustSeriesIterator{Iterator: s.replicas[0].Iterator(nil)} @@ -336,31 +206,7 @@ func (s *dedupSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator { it = newDedupSeriesIterator(it, replicaIter) } - if len(s.pushedDown) == 0 { - return it - } - - // Join all of the pushed down iterators into one. 
- var pushedDownIterator adjustableSeriesIterator - if s.isCounter { - pushedDownIterator = &counterErrAdjustSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)} - } else { - pushedDownIterator = noopAdjustableSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)} - } - - for _, o := range s.pushedDown[1:] { - var replicaIterator adjustableSeriesIterator - - if s.isCounter { - replicaIterator = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(nil)} - } else { - replicaIterator = noopAdjustableSeriesIterator{Iterator: o.Iterator(nil)} - } - - pushedDownIterator = noopAdjustableSeriesIterator{newPushdownSeriesIterator(pushedDownIterator, replicaIterator, s.f)} - } - - return newDedupSeriesIterator(it, pushedDownIterator) + return it } // adjustableSeriesIterator iterates over the data of a time series and allows to adjust current value based on diff --git a/pkg/dedup/iter_test.go b/pkg/dedup/iter_test.go index 8b9b4161258..817ea935567 100644 --- a/pkg/dedup/iter_test.go +++ b/pkg/dedup/iter_test.go @@ -534,7 +534,7 @@ func TestDedupSeriesSet(t *testing.T) { if tcase.isCounter { f = "rate" } - dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f, false) + dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f) var ats []storage.Series for dedupSet.Next() { ats = append(ats, dedupSet.At()) @@ -660,38 +660,3 @@ func expandSeries(t testing.TB, it chunkenc.Iterator) (res []sample) { testutil.Ok(t, it.Err()) return res } - -func TestPushdownSeriesIterator(t *testing.T) { - cases := []struct { - a, b, exp []sample - function string - tcase string - }{ - { - tcase: "simple case", - a: []sample{{10000, 10}, {20000, 11}, {30000, 12}, {40000, 13}}, - b: []sample{{10000, 20}, {20000, 21}, {30000, 22}, {40000, 23}}, - exp: []sample{{10000, 20}, {20000, 21}, {30000, 22}, {40000, 23}}, - function: "max", - }, - { - tcase: "gaps but catches up", - a: []sample{{10000, 10}, {20000, 11}, {30000, 12}, {40000, 13}}, - b: []sample{{10000, 20}, {40000, 23}}, 
- exp: []sample{{10000, 20}, {20000, 11}, {30000, 12}, {40000, 23}}, - function: "max", - }, - } - for _, c := range cases { - t.Run(c.tcase, func(t *testing.T) { - it := newPushdownSeriesIterator( - noopAdjustableSeriesIterator{newMockedSeriesIterator(c.a)}, - noopAdjustableSeriesIterator{newMockedSeriesIterator(c.b)}, - c.function, - ) - res := expandSeries(t, noopAdjustableSeriesIterator{it}) - testutil.Equals(t, c.exp, res) - }) - - } -} diff --git a/pkg/dedup/pushdown_iter.go b/pkg/dedup/pushdown_iter.go deleted file mode 100644 index 76f8958e79f..00000000000 --- a/pkg/dedup/pushdown_iter.go +++ /dev/null @@ -1,146 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package dedup - -import ( - "fmt" - "math" - - "github.com/prometheus/prometheus/model/histogram" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/tsdb/chunkenc" -) - -// PushdownMarker is a label that gets attached on pushed down series so that -// the receiver would be able to handle them in potentially special way. -var PushdownMarker = labels.Label{Name: "__thanos_pushed_down", Value: "true"} - -type pushdownSeriesIterator struct { - a, b chunkenc.Iterator - aval, bval chunkenc.ValueType - aused, bused bool - - function func(float64, float64) float64 -} - -// newPushdownSeriesIterator constructs a new iterator that steps through both -// series and performs the following algorithm: -// * If both timestamps match up then the function is applied on them; -// * If one of the series has a gap then the other one is used until the timestamps match up. -// It is guaranteed that stepping through both of them that the timestamps will match eventually -// because the samples have been processed by a PromQL engine. 
-func newPushdownSeriesIterator(a, b chunkenc.Iterator, function string) *pushdownSeriesIterator { - var fn func(float64, float64) float64 - switch function { - case "max", "max_over_time": - fn = math.Max - case "min", "min_over_time": - fn = math.Min - default: - panic(fmt.Errorf("unsupported function %s passed", function)) - } - return &pushdownSeriesIterator{ - a: a, b: b, function: fn, aused: true, bused: true, - } -} - -func (it *pushdownSeriesIterator) Next() chunkenc.ValueType { - // Push A if we've used A before. Push B if we've used B before. - // Push both if we've used both before. - switch { - case !it.aused && !it.bused: - return chunkenc.ValNone - case it.aused && !it.bused: - it.aval = it.a.Next() - case !it.aused && it.bused: - it.bval = it.b.Next() - case it.aused && it.bused: - it.aval = it.a.Next() - it.bval = it.b.Next() - } - it.aused = false - it.bused = false - - if it.aval != chunkenc.ValNone { - return it.aval - } - - if it.bval != chunkenc.ValNone { - return it.bval - } - - return chunkenc.ValNone -} - -func (it *pushdownSeriesIterator) At() (int64, float64) { - - var timestamp int64 - var val float64 - - if it.aval != chunkenc.ValNone && it.bval != chunkenc.ValNone { - ta, va := it.a.At() - tb, vb := it.b.At() - if ta == tb { - val = it.function(va, vb) - timestamp = ta - it.aused = true - it.bused = true - } else { - if ta < tb { - timestamp = ta - val = va - it.aused = true - } else { - timestamp = tb - val = vb - it.bused = true - } - } - } else if it.aval != chunkenc.ValNone { - ta, va := it.a.At() - val = va - timestamp = ta - it.aused = true - } else { - tb, vb := it.b.At() - val = vb - timestamp = tb - it.bused = true - } - - return timestamp, val -} - -// TODO(rabenhorst): Needs to be implemented for native histogram support. 
-func (it *pushdownSeriesIterator) AtHistogram() (int64, *histogram.Histogram) { - panic("not implemented") -} - -func (it *pushdownSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) { - panic("not implemented") -} - -func (it *pushdownSeriesIterator) AtT() int64 { - t := it.a.AtT() - return t -} - -func (it *pushdownSeriesIterator) Seek(t int64) chunkenc.ValueType { - for { - ts := it.AtT() - if ts >= t { - return chunkenc.ValFloat - } - if it.Next() == chunkenc.ValNone { - return chunkenc.ValNone - } - } -} - -func (it *pushdownSeriesIterator) Err() error { - if it.a.Err() != nil { - return it.a.Err() - } - return it.b.Err() -} diff --git a/pkg/query/querier.go b/pkg/query/querier.go index cdfdfcadd9f..0cfcc2ad211 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -14,7 +14,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/annotations" @@ -53,7 +52,6 @@ type QueryableCreator func( storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, - enableQueryPushdown, skipChunks bool, shardInfo *storepb.ShardInfo, seriesStatsReporter seriesStatsReporter, @@ -76,7 +74,6 @@ func NewQueryableCreator( storeDebugMatchers [][]*labels.Matcher, maxResolutionMillis int64, partialResponse, - enableQueryPushdown, skipChunks bool, shardInfo *storepb.ShardInfo, seriesStatsReporter seriesStatsReporter, @@ -95,7 +92,6 @@ func NewQueryableCreator( }, maxConcurrentSelects: maxConcurrentSelects, selectTimeout: selectTimeout, - enableQueryPushdown: enableQueryPushdown, shardInfo: shardInfo, seriesStatsReporter: seriesStatsReporter, } @@ -114,14 +110,13 @@ type queryable struct { gateProviderFn func() gate.Gate maxConcurrentSelects int selectTimeout time.Duration - enableQueryPushdown bool shardInfo 
*storepb.ShardInfo seriesStatsReporter seriesStatsReporter } // Querier returns a new storage querier against the underlying proxy store API. func (q *queryable) Querier(mint, maxt int64) (storage.Querier, error) { - return newQuerier(q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.enableQueryPushdown, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo, q.seriesStatsReporter), nil + return newQuerier(q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo, q.seriesStatsReporter), nil } type querier struct { @@ -133,7 +128,6 @@ type querier struct { deduplicate bool maxResolutionMillis int64 partialResponseStrategy storepb.PartialResponseStrategy - enableQueryPushdown bool skipChunks bool selectGate gate.Gate selectTimeout time.Duration @@ -153,7 +147,6 @@ func newQuerier( deduplicate bool, maxResolutionMillis int64, partialResponse, - enableQueryPushdown, skipChunks bool, selectGate gate.Gate, selectTimeout time.Duration, @@ -186,7 +179,6 @@ func newQuerier( maxResolutionMillis: maxResolutionMillis, partialResponseStrategy: partialResponseStrategy, skipChunks: skipChunks, - enableQueryPushdown: enableQueryPushdown, shardInfo: shardInfo, seriesStatsReporter: seriesStatsReporter, } @@ -359,9 +351,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . ShardInfo: q.shardInfo, PartialResponseStrategy: q.partialResponseStrategy, SkipChunks: q.skipChunks, - } - if q.enableQueryPushdown { - req.QueryHints = storeHintsFromPromHints(hints) + QueryHints: storeHintsFromPromHints(hints), } if q.isDedupEnabled() { // Soft ask to sort without replica labels and push them at the end of labelset. @@ -373,22 +363,6 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . 
} warns := annotations.New().Merge(resp.warnings) - if q.enableQueryPushdown && (hints.Func == "max_over_time" || hints.Func == "min_over_time") { - // On query pushdown, delete the metric's name from the result because that's what the - // PromQL does either way, and we want our iterator to work with data - // that was either pushed down or not. - for i := range resp.seriesSet { - lbls := resp.seriesSet[i].Labels - for j, lbl := range lbls { - if lbl.Name != model.MetricNameLabel { - continue - } - resp.seriesSet[i].Labels = append(resp.seriesSet[i].Labels[:j], resp.seriesSet[i].Labels[j+1:]...) - break - } - } - } - if !q.isDedupEnabled() { return &promSeriesSet{ mint: q.mint, @@ -400,7 +374,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . } // TODO(bwplotka): Move to deduplication on chunk level inside promSeriesSet, similar to what we have in dedup.NewDedupChunkMerger(). - // This however require big refactor, caring about correct AggrChunk to iterator conversion, pushdown logic and counter reset apply. + // This however requires a big refactor, caring about correct AggrChunk to iterator conversion and counter reset apply. // For now we apply simple logic that splits potential overlapping chunks into separate replica series, so we can split the work. set := &promSeriesSet{ mint: q.mint, @@ -410,7 +384,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . warns: warns, } - return dedup.NewSeriesSet(set, hints.Func, q.enableQueryPushdown), resp.seriesSetStats, nil + return dedup.NewSeriesSet(set, hints.Func), resp.seriesSetStats, nil } // LabelValues returns all potential values for a label name. 
diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index faf56e1113b..c038cecfd5b 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -55,7 +55,6 @@ func TestQueryableCreator_MaxResolution(t *testing.T) { oneHourMillis, false, false, - false, nil, NoopSeriesStatsReporter, ) @@ -97,7 +96,6 @@ func TestQuerier_DownsampledData(t *testing.T) { 9999999, false, false, - false, nil, NoopSeriesStatsReporter, ) @@ -395,7 +393,7 @@ func TestQuerier_Select_AfterPromQL(t *testing.T) { g := gate.New(2) mq := &mockedQueryable{ Creator: func(mint, maxt int64) storage.Querier { - return newQuerier(nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter) + return newQuerier(nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter) }, } t.Cleanup(func() { @@ -784,7 +782,6 @@ func TestQuerier_Select(t *testing.T) { 0, true, false, - false, g, timeout, nil, @@ -1078,7 +1075,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { timeout := 100 * time.Second g := gate.New(2) - q := newQuerier(logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, newProxyStore(s), false, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter) + q := newQuerier(logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, newProxyStore(s), false, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) @@ -1148,7 +1145,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) { timeout := 5 * time.Second g := gate.New(2) - q := newQuerier(logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, newProxyStore(s), true, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter) + q := newQuerier(logger, 
realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, newProxyStore(s), true, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter) t.Cleanup(func() { testutil.Ok(t, q.Close()) }) diff --git a/pkg/query/query_bench_test.go b/pkg/query/query_bench_test.go index 48a9d7673da..44e4373a26b 100644 --- a/pkg/query/query_bench_test.go +++ b/pkg/query/query_bench_test.go @@ -94,7 +94,6 @@ func benchQuerySelect(t testutil.TB, totalSamples, totalSeries int, dedup bool) 0, false, false, - false, gate.NewNoop(), 10*time.Second, nil, diff --git a/pkg/query/query_test.go b/pkg/query/query_test.go index 2f36a3ca2b3..14e05080cdc 100644 --- a/pkg/query/query_test.go +++ b/pkg/query/query_test.go @@ -62,7 +62,6 @@ func TestQuerier_Proxy(t *testing.T) { 0, false, false, - false, nil, NoopSeriesStatsReporter, ) diff --git a/pkg/query/test_test.go b/pkg/query/test_test.go index 612d44f0f8c..b8457870cac 100644 --- a/pkg/query/test_test.go +++ b/pkg/query/test_test.go @@ -89,7 +89,7 @@ func (s *testStore) close(t testing.TB) { } // NewTest returns an initialized empty Test. -// It's compatible with promql.Test, allowing additionally multi StoreAPIs for query pushdown testing. +// It's compatible with promql.Test, allowing additionally multi StoreAPIs. // TODO(bwplotka): Move to unittest and add add support for multi-store upstream. 
See: https://github.com/prometheus/prometheus/pull/8300 func newTest(t testing.TB, input string) (*test, error) { cmds, err := parse(input) diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index eea0334dd03..c03d489a95a 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -16,9 +16,6 @@ import ( "strings" "sync" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/timestamp" - "github.com/blang/semver/v4" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -35,7 +32,6 @@ import ( "github.com/thanos-io/thanos/pkg/clientconfig" "github.com/thanos-io/thanos/pkg/component" - "github.com/thanos-io/thanos/pkg/dedup" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/runutil" @@ -167,8 +163,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto // Don't ask for more than available time. This includes potential `minTime` flag limit. availableMinTime, _ := p.timestamps() if r.MinTime < availableMinTime { - // If pushdown is enabled then align min time with the step to avoid missing data - // when it gets retrieved by the upper layer's PromQL engine. + // Align min time with the step to avoid missing data when it gets retrieved by the upper layer's PromQL engine. // This also is necessary when Sidecar uploads a block and then availableMinTime // becomes a fixed timestamp. 
if r.QueryHints != nil && r.QueryHints.StepMillis != 0 { @@ -215,10 +210,6 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto shardMatcher := r.ShardInfo.Matcher(&p.buffers) defer shardMatcher.Close() - if r.QueryHints != nil && r.QueryHints.IsSafeToExecute() && !shardMatcher.IsSharded() { - return p.queryPrometheus(s, r, extLsetToRemove) - } - q := &prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime} for _, m := range matchers { pm := &prompb.LabelMatcher{Name: m.Name, Value: m.Value} @@ -260,76 +251,6 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto return p.handleStreamedPrometheusResponse(s, shardMatcher, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation, extLsetToRemove) } -func (p *PrometheusStore) queryPrometheus( - s flushableServer, - r *storepb.SeriesRequest, - extLsetToRemove map[string]struct{}, -) error { - var matrix model.Matrix - - opts := promclient.QueryOptions{} - step := r.QueryHints.StepMillis / 1000 - if step != 0 { - result, _, _, err := p.client.QueryRange(s.Context(), p.base, r.ToPromQL(), r.MinTime, r.MaxTime, step, opts) - if err != nil { - return err - } - matrix = result - } else { - vector, _, _, err := p.client.QueryInstant(s.Context(), p.base, r.ToPromQL(), timestamp.Time(r.MaxTime), opts) - if err != nil { - return err - } - - matrix = make(model.Matrix, 0, len(vector)) - for _, sample := range vector { - matrix = append(matrix, &model.SampleStream{ - Metric: sample.Metric, - Values: []model.SamplePair{ - { - Timestamp: sample.Timestamp, - Value: sample.Value, - }, - }, - }) - } - } - - externalLbls := rmLabels(p.externalLabelsFn().Copy(), extLsetToRemove) - b := labels.NewScratchBuilder(16) - for _, vector := range matrix { - b.Reset() - - // Attach labels from samples. 
- for k, v := range vector.Metric { - b.Add(string(k), string(v)) - } - b.Add(dedup.PushdownMarker.Name, dedup.PushdownMarker.Value) - b.Sort() - - finalLbls := labelpb.ExtendSortedLabels(b.Labels(), externalLbls) - - series := &prompb.TimeSeries{ - Labels: labelpb.ZLabelsFromPromLabels(finalLbls), - Samples: prompb.SamplesFromSamplePairs(vector.Values), - } - - chks, err := p.chunkSamples(series, MaxSamplesPerChunk, enableChunkHashCalculation) - if err != nil { - return err - } - - if err := s.Send(storepb.NewSeriesResponse(&storepb.Series{ - Labels: series.Labels, - Chunks: chks, - })); err != nil { - return err - } - } - - return s.Flush() -} - func (p *PrometheusStore) handleSampledPrometheusResponse( s flushableServer, httpResp *http.Response, diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go index 079d1f2f4d4..ac7060a04a4 100644 --- a/pkg/store/prometheus_test.go +++ b/pkg/store/prometheus_test.go @@ -154,38 +154,6 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) { testutil.Equals(t, []string(nil), srv.Warnings) testutil.Equals(t, "rpc error: code = InvalidArgument desc = no matchers specified (excluding external labels)", err.Error()) } - // Querying with pushdown. 
- { - srv := newStoreSeriesServer(ctx) - testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{ - MinTime: baseT + 101, - MaxTime: baseT + 300, - Matchers: []storepb.LabelMatcher{ - {Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"}, - }, - QueryHints: &storepb.QueryHints{Func: &storepb.Func{Name: "min_over_time"}, Range: &storepb.Range{Millis: 300}}, - }, srv)) - - testutil.Equals(t, 1, len(srv.SeriesSet)) - - testutil.Equals(t, []labelpb.ZLabel{ - {Name: "__thanos_pushed_down", Value: "true"}, - {Name: "a", Value: "b"}, - {Name: "region", Value: "eu-west"}, - }, srv.SeriesSet[0].Labels) - testutil.Equals(t, []string(nil), srv.Warnings) - testutil.Equals(t, 1, len(srv.SeriesSet[0].Chunks)) - - c := srv.SeriesSet[0].Chunks[0] - testutil.Equals(t, storepb.Chunk_XOR, c.Raw.Type) - - chk, err := chunkenc.FromData(chunkenc.EncXOR, c.Raw.Data) - testutil.Ok(t, err) - - samples := expandChunk(chk.Iterator(nil)) - testutil.Equals(t, []sample{{baseT + 300, 1}}, samples) - - } } type sample struct { diff --git a/pkg/store/proxy_heap_test.go b/pkg/store/proxy_heap_test.go index 50fe2d46beb..e6cf2f199a3 100644 --- a/pkg/store/proxy_heap_test.go +++ b/pkg/store/proxy_heap_test.go @@ -10,7 +10,6 @@ import ( "github.com/efficientgo/core/testutil" "github.com/prometheus/prometheus/model/labels" - "github.com/thanos-io/thanos/pkg/dedup" "github.com/thanos-io/thanos/pkg/errors" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -248,22 +247,6 @@ func TestSortWithoutLabels(t *testing.T) { }, dedupLabels: map[string]struct{}{"b": {}, "b1": {}}, }, - // Pushdown label at the end. 
- { - input: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "replica-1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "replica-1", "c", "3", "d", "4")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "replica-1", "c", "4", dedup.PushdownMarker.Name, dedup.PushdownMarker.Value)), - storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "replica-2", "c", "3")), - }, - exp: []*storepb.SeriesResponse{ - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3", "d", "4")), - storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "4", dedup.PushdownMarker.Name, dedup.PushdownMarker.Value)), - }, - dedupLabels: map[string]struct{}{"b": {}}, - }, // Non series responses mixed. { input: []*storepb.SeriesResponse{ diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index 09a6f9ff980..faed79bc7b1 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -532,23 +532,3 @@ func (c *SeriesStatsCounter) Count(series *Series) { func (m *SeriesRequest) ToPromQL() string { return m.QueryHints.toPromQL(m.Matchers) } - -// IsSafeToExecute returns true if the function or aggregation from the query hint -// can be safely executed by the underlying Prometheus instance without affecting the -// result of the query. 
-func (m *QueryHints) IsSafeToExecute() bool { - distributiveOperations := []string{ - "max", - "max_over_time", - "min", - "min_over_time", - "group", - } - for _, op := range distributiveOperations { - if m.Func.Name == op { - return true - } - } - - return false -} diff --git a/test/e2e/native_histograms_test.go b/test/e2e/native_histograms_test.go index 29c15326610..09a61261688 100644 --- a/test/e2e/native_histograms_test.go +++ b/test/e2e/native_histograms_test.go @@ -54,7 +54,6 @@ func TestQueryNativeHistograms(t *testing.T) { testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2)) querier := e2ethanos.NewQuerierBuilder(e, "querier", sidecar1.InternalEndpoint("grpc"), sidecar2.InternalEndpoint("grpc")). - WithEnabledFeatures([]string{"query-pushdown"}). Init() testutil.Ok(t, e2e.StartAndWaitReady(querier)) @@ -95,7 +94,7 @@ func TestQueryNativeHistograms(t *testing.T) { }) }) - t.Run("query histogram using group function for testing pushdown", func(t *testing.T) { + t.Run("query histogram using group function", func(t *testing.T) { queryAndAssert(t, ctx, querier.Endpoint("http"), func() string { return fmt.Sprintf("group(%v)", testHistogramMetricName) }, ts, promclient.QueryOptions{Deduplicate: true}, model.Vector{ &model.Sample{ Value: 1, diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 1844bb7a235..3fa09041720 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -936,124 +936,6 @@ func TestQueryStoreMetrics(t *testing.T) { } -// Regression test for https://github.com/thanos-io/thanos/issues/5033. -// Tests whether queries work with mixed sources, and with functions -// that we are pushing down: min, max, min_over_time, max_over_time, -// group. -func TestSidecarStorePushdown(t *testing.T) { - t.Parallel() - - // Build up. 
- e, err := e2e.NewDockerEnvironment("sidecar-pushdown") - testutil.Ok(t, err) - t.Cleanup(e2ethanos.CleanScenario(t, e)) - - prom1, sidecar1 := e2ethanos.NewPrometheusWithSidecar(e, "p1", e2ethanos.DefaultPromConfig("p1", 0, "", ""), "", e2ethanos.DefaultPrometheusImage(), "", "remote-write-receiver") - testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1)) - - const bucket = "store-gateway-test-sidecar-pushdown" - m := e2edb.NewMinio(e, "thanos-minio", bucket, e2edb.WithMinioTLS()) - testutil.Ok(t, e2e.StartAndWaitReady(m)) - - dir := filepath.Join(e.SharedDir(), "tmp") - testutil.Ok(t, os.MkdirAll(filepath.Join(e.SharedDir(), dir), os.ModePerm)) - - series := []labels.Labels{labels.FromStrings("__name__", "my_fake_metric", "instance", "foo")} - extLset := labels.FromStrings("prometheus", "p1", "replica", "0") - - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) - t.Cleanup(cancel) - - now := time.Now() - id1, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0, metadata.NoneFunc) - testutil.Ok(t, err) - - l := log.NewLogfmtLogger(os.Stdout) - bkt, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir()), "test") - testutil.Ok(t, err) - testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id1.String()), id1.String())) - - s1 := e2ethanos.NewStoreGW( - e, - "1", - client.BucketConfig{ - Type: client.S3, - Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()), - }, - "", - "", - nil, - ) - testutil.Ok(t, e2e.StartAndWaitReady(s1)) - - q := e2ethanos.NewQuerierBuilder(e, "1", s1.InternalEndpoint("grpc"), sidecar1.InternalEndpoint("grpc")).WithEnabledFeatures([]string{"query-pushdown"}).Init() - testutil.Ok(t, e2e.StartAndWaitReady(q)) - testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_blocks_meta_synced")) - - testutil.Ok(t, synthesizeFakeMetricSamples(ctx, 
prom1, []fakeMetricSample{ - { - label: "foo", - value: 123, - timestampUnixNano: now.UnixNano(), - }, - })) - - queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { - return "max_over_time(my_fake_metric[2h])" - }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, []model.Metric{ - { - "instance": "foo", - "prometheus": "p1", - }, - }) - - queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { - return "max(my_fake_metric) by (__name__, instance)" - }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, []model.Metric{ - { - "instance": "foo", - "__name__": "my_fake_metric", - }, - }) - - queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { - return "min_over_time(my_fake_metric[2h])" - }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, []model.Metric{ - { - "instance": "foo", - "prometheus": "p1", - }, - }) - - queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { - return "min(my_fake_metric) by (instance, __name__)" - }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, []model.Metric{ - { - "instance": "foo", - "__name__": "my_fake_metric", - }, - }) - - queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string { - return "group(my_fake_metric) by (__name__, instance)" - }, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, []model.Metric{ - { - "instance": "foo", - "__name__": "my_fake_metric", - }, - }) -} - type seriesWithLabels struct { intLabels labels.Labels extLabels labels.Labels @@ -1482,7 +1364,7 @@ func TestSidecarQueryEvaluation(t *testing.T) { for _, tc := range ts { t.Run(tc.query, func(t *testing.T) { - e, err := e2e.NewDockerEnvironment("query-pushdown") + e, err := e2e.NewDockerEnvironment("query-evaluation") testutil.Ok(t, err) t.Cleanup(e2ethanos.CleanScenario(t, e)) @@ -1498,7 +1380,6 @@ func TestSidecarQueryEvaluation(t *testing.T) { } q := e2ethanos. NewQuerierBuilder(e, "1", endpoints...). 
- WithEnabledFeatures([]string{"query-pushdown"}). Init() testutil.Ok(t, e2e.StartAndWaitReady(q)) @@ -1914,222 +1795,6 @@ func storeWriteRequest(ctx context.Context, rawRemoteWriteURL string, req *promp return client.Store(ctx, compressed, 0) } -func TestSidecarQueryEvaluationWithDedup(t *testing.T) { - t.Parallel() - - timeNow := time.Now().UnixNano() - - ts := []struct { - prom1Samples []fakeMetricSample - prom2Samples []fakeMetricSample - query string - result model.Vector - }{ - { - query: "max (my_fake_metric)", - prom1Samples: []fakeMetricSample{{"i1", 1, timeNow}, {"i2", 5, timeNow}, {"i3", 9, timeNow}}, - prom2Samples: []fakeMetricSample{{"i1", 3, timeNow}, {"i2", 4, timeNow}, {"i3", 10, timeNow}}, - result: []*model.Sample{ - { - Metric: map[model.LabelName]model.LabelValue{}, - Value: 10, - }, - }, - }, - { - query: "max by (instance) (my_fake_metric)", - prom1Samples: []fakeMetricSample{{"i1", 1, timeNow}, {"i2", 5, timeNow}, {"i3", 9, timeNow}}, - prom2Samples: []fakeMetricSample{{"i1", 3, timeNow}, {"i2", 4, timeNow}, {"i3", 10, timeNow}}, - result: []*model.Sample{ - { - Metric: map[model.LabelName]model.LabelValue{"instance": "i1"}, - Value: 3, - }, - { - Metric: map[model.LabelName]model.LabelValue{"instance": "i2"}, - Value: 5, - }, - { - Metric: map[model.LabelName]model.LabelValue{"instance": "i3"}, - Value: 10, - }, - }, - }, - { - query: "group by (instance) (my_fake_metric)", - prom1Samples: []fakeMetricSample{{"i1", 1, timeNow}, {"i2", 5, timeNow}, {"i3", 9, timeNow}}, - prom2Samples: []fakeMetricSample{{"i1", 3, timeNow}, {"i2", 4, timeNow}}, - result: []*model.Sample{ - { - Metric: map[model.LabelName]model.LabelValue{"instance": "i1"}, - Value: 1, - }, - { - Metric: map[model.LabelName]model.LabelValue{"instance": "i2"}, - Value: 1, - }, - { - Metric: map[model.LabelName]model.LabelValue{"instance": "i3"}, - Value: 1, - }, - }, - }, - { - query: "max_over_time(my_fake_metric[10m])", - prom1Samples: []fakeMetricSample{{"i1", 1, timeNow}, 
{"i2", 5, timeNow}}, - prom2Samples: []fakeMetricSample{{"i1", 3, timeNow}}, - result: []*model.Sample{ - { - Metric: map[model.LabelName]model.LabelValue{"instance": "i1", "prometheus": "p1"}, - Value: 3, - }, - { - Metric: map[model.LabelName]model.LabelValue{"instance": "i2", "prometheus": "p1"}, - Value: 5, - }, - }, - }, - { - query: "min_over_time(my_fake_metric[10m])", - prom1Samples: []fakeMetricSample{{"i1", 1, timeNow}, {"i2", 5, timeNow}}, - prom2Samples: []fakeMetricSample{{"i1", 3, timeNow}}, - result: []*model.Sample{ - { - Metric: map[model.LabelName]model.LabelValue{"instance": "i1", "prometheus": "p1"}, - Value: 1, - }, - { - Metric: map[model.LabelName]model.LabelValue{"instance": "i2", "prometheus": "p1"}, - Value: 5, - }, - }, - }, - } - - for _, tc := range ts { - t.Run(tc.query, func(t *testing.T) { - e, err := e2e.NewDockerEnvironment("pushdown-dedup") - testutil.Ok(t, err) - t.Cleanup(e2ethanos.CleanScenario(t, e)) - - prom1, sidecar1 := e2ethanos.NewPrometheusWithSidecar(e, "p1", e2ethanos.DefaultPromConfig("p1", 0, "", ""), "", e2ethanos.DefaultPrometheusImage(), "", "remote-write-receiver") - testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1)) - - prom2, sidecar2 := e2ethanos.NewPrometheusWithSidecar(e, "p2", e2ethanos.DefaultPromConfig("p1", 1, "", ""), "", e2ethanos.DefaultPrometheusImage(), "", "remote-write-receiver") - testutil.Ok(t, e2e.StartAndWaitReady(prom2, sidecar2)) - - endpoints := []string{ - sidecar1.InternalEndpoint("grpc"), - sidecar2.InternalEndpoint("grpc"), - } - q := e2ethanos. - NewQuerierBuilder(e, "1", endpoints...). - WithEnabledFeatures([]string{"query-pushdown"}). 
- Init() - testutil.Ok(t, err) - testutil.Ok(t, e2e.StartAndWaitReady(q)) - - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - t.Cleanup(cancel) - - testutil.Ok(t, synthesizeFakeMetricSamples(ctx, prom1, tc.prom1Samples)) - testutil.Ok(t, synthesizeFakeMetricSamples(ctx, prom2, tc.prom2Samples)) - - testQuery := func() string { return tc.query } - queryAndAssert(t, ctx, q.Endpoint("http"), testQuery, time.Now, promclient.QueryOptions{ - Deduplicate: true, - }, tc.result) - }) - } -} - -// TestSidecarStoreAlignmentPushdown tests how pushdown works with -// --min-time and --max-time. -func TestSidecarAlignmentPushdown(t *testing.T) { - t.Parallel() - - e, err := e2e.NewDockerEnvironment("pushdown-min-max") - testutil.Ok(t, err) - t.Cleanup(e2ethanos.CleanScenario(t, e)) - - now := time.Now() - - prom1, sidecar1 := e2ethanos.NewPrometheusWithSidecar(e, "p1", e2ethanos.DefaultPromConfig("p1", 0, "", ""), "", e2ethanos.DefaultPrometheusImage(), now.Add(time.Duration(-1)*time.Hour).Format(time.RFC3339), now.Format(time.RFC3339), "remote-write-receiver") - testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1)) - - endpoints := []string{ - sidecar1.InternalEndpoint("grpc"), - } - q1 := e2ethanos. - NewQuerierBuilder(e, "1", endpoints...). - Init() - testutil.Ok(t, err) - testutil.Ok(t, e2e.StartAndWaitReady(q1)) - q2 := e2ethanos. - NewQuerierBuilder(e, "2", endpoints...). - WithEnabledFeatures([]string{"query-pushdown"}). 
- Init() - testutil.Ok(t, err) - testutil.Ok(t, e2e.StartAndWaitReady(q2)) - - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - t.Cleanup(cancel) - - samples := make([]fakeMetricSample, 0) - for i := now.Add(time.Duration(-3) * time.Hour); i.Before(now); i = i.Add(30 * time.Second) { - samples = append(samples, fakeMetricSample{ - label: "test", - value: 1, - timestampUnixNano: i.UnixNano(), - }) - } - - testutil.Ok(t, synthesizeFakeMetricSamples(ctx, prom1, samples)) - - // This query should have identical requests. - testQuery := func() string { return `max_over_time({instance="test"}[5m])` } - - logger := log.NewLogfmtLogger(os.Stdout) - logger = log.With(logger, "ts", log.DefaultTimestampUTC) - - var expectedRes model.Matrix - testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error { - res, warnings, _, err := promclient.NewDefaultClient().QueryRange(ctx, urlParse(t, "http://"+q1.Endpoint("http")), testQuery(), - timestamp.FromTime(now.Add(time.Duration(-7*24)*time.Hour)), - timestamp.FromTime(now), - 2419, // Taken from UI. - promclient.QueryOptions{ - Deduplicate: true, - }) - if err != nil { - return err - } - - if len(warnings) > 0 { - return errors.Errorf("unexpected warnings %s", warnings) - } - - if len(res) == 0 { - return errors.Errorf("got empty result") - } - - expectedRes = res - return nil - })) - - rangeQuery(t, ctx, q2.Endpoint("http"), testQuery, timestamp.FromTime(now.Add(time.Duration(-7*24)*time.Hour)), - timestamp.FromTime(now), - 2419, // Taken from UI. - promclient.QueryOptions{ - Deduplicate: true, - }, func(res model.Matrix) error { - if !reflect.DeepEqual(res, expectedRes) { - return fmt.Errorf("unexpected results (got %v but expected %v)", res, expectedRes) - } - return nil - }) -} - func TestGrpcInstantQuery(t *testing.T) { t.Parallel()