diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go
index 82f04f67d19..5311d584845 100644
--- a/cmd/thanos/query.go
+++ b/cmd/thanos/query.go
@@ -197,7 +197,7 @@ func registerQuery(app *extkingpin.App) {
 
 	activeQueryDir := cmd.Flag("query.active-query-path", "Directory to log currently active queries in the queries.active file.").Default("").String()
 
-	featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable.The current list of features is "+queryPushdown+".").Default("").Strings()
+	featureList := cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is empty.").Default("").Strings()
 
 	enableExemplarPartialResponse := cmd.Flag("exemplar.partial-response", "Enable partial response for exemplar endpoint. --no-exemplar.partial-response for disabling.").
 		Hidden().Default("true").Bool()
@@ -230,17 +230,16 @@ func registerQuery(app *extkingpin.App) {
 			return errors.Wrap(err, "parse federation labels")
 		}
 
-		var enableQueryPushdown bool
 		for _, feature := range *featureList {
-			if feature == queryPushdown {
-				enableQueryPushdown = true
-			}
 			if feature == promqlAtModifier {
 				level.Warn(logger).Log("msg", "This option for --enable-feature is now permanently enabled and therefore a no-op.", "option", promqlAtModifier)
 			}
 			if feature == promqlNegativeOffset {
 				level.Warn(logger).Log("msg", "This option for --enable-feature is now permanently enabled and therefore a no-op.", "option", promqlNegativeOffset)
 			}
+			if feature == queryPushdown {
+				level.Warn(logger).Log("msg", "This option for --enable-feature is now permanently deprecated and therefore ignored.", "option", queryPushdown)
+			}
 		}
 
 		httpLogOpts, err := logging.ParseHTTPOptions(reqLogConfig)
@@ -333,7 +332,6 @@ func registerQuery(app *extkingpin.App) {
 		*strictEndpoints,
 		*strictEndpointGroups,
 		*webDisableCORS,
-		enableQueryPushdown,
 		*alertQueryURL,
 		*grpcProxyStrategy,
 		component.Query,
@@ -412,7 +410,6 @@ func runQuery(
 	strictEndpoints []string,
 	strictEndpointGroups []string,
 	disableCORS bool,
-	enableQueryPushdown bool,
 	alertQueryURL string,
 	grpcProxyStrategy string,
 	comp component.Component,
@@ -699,7 +696,6 @@ func runQuery(
 		enableTargetPartialResponse,
 		enableMetricMetadataPartialResponse,
 		enableExemplarPartialResponse,
-		enableQueryPushdown,
 		queryReplicaLabels,
 		flagsMap,
 		defaultRangeQueryStep,
diff --git a/docs/components/query.md b/docs/components/query.md
index 2c4f4734642..70d9a830912 100644
--- a/docs/components/query.md
+++ b/docs/components/query.md
@@ -272,9 +272,8 @@ Flags:
       --alert.query-url=ALERT.QUERY-URL
                                  The external Thanos Query URL that would be set
                                  in all alerts 'Source' field.
-      --enable-feature= ...      Comma separated experimental feature names
-                                 to enable.The current list of features is
-                                 query-pushdown.
+      --enable-feature= ...      Comma separated experimental feature names to
+                                 enable. The current list of features is empty.
       --endpoint= ...            Addresses of statically configured Thanos API
                                  servers (repeatable).
                                 The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect
diff --git a/pkg/api/query/grpc.go b/pkg/api/query/grpc.go
index 769b0001c5b..8b72b98da30 100644
--- a/pkg/api/query/grpc.go
+++ b/pkg/api/query/grpc.go
@@ -97,7 +97,6 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer
 		storeMatchers,
 		maxResolution,
 		request.EnablePartialResponse,
-		request.EnableQueryPushdown,
 		false,
 		request.ShardInfo,
 		query.NoopSeriesStatsReporter,
@@ -195,7 +194,6 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que
 		storeMatchers,
 		maxResolution,
 		request.EnablePartialResponse,
-		request.EnableQueryPushdown,
 		false,
 		request.ShardInfo,
 		query.NoopSeriesStatsReporter,
diff --git a/pkg/api/query/querypb/query.pb.go b/pkg/api/query/querypb/query.pb.go
index bd5b5d47246..79ea1d95c2f 100644
--- a/pkg/api/query/querypb/query.pb.go
+++ b/pkg/api/query/querypb/query.pb.go
@@ -59,19 +59,20 @@ func (EngineType) EnumDescriptor() ([]byte, []int) {
 }
 
 type QueryRequest struct {
-	Query                 string             `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"`
-	TimeSeconds           int64              `protobuf:"varint,2,opt,name=time_seconds,json=timeSeconds,proto3" json:"time_seconds,omitempty"`
-	TimeoutSeconds        int64              `protobuf:"varint,3,opt,name=timeout_seconds,json=timeoutSeconds,proto3" json:"timeout_seconds,omitempty"`
-	MaxResolutionSeconds  int64              `protobuf:"varint,4,opt,name=max_resolution_seconds,json=maxResolutionSeconds,proto3" json:"max_resolution_seconds,omitempty"`
-	ReplicaLabels         []string           `protobuf:"bytes,5,rep,name=replica_labels,json=replicaLabels,proto3" json:"replica_labels,omitempty"`
-	StoreMatchers         []StoreMatchers    `protobuf:"bytes,6,rep,name=storeMatchers,proto3" json:"storeMatchers"`
-	EnableDedup           bool               `protobuf:"varint,7,opt,name=enableDedup,proto3" json:"enableDedup,omitempty"`
-	EnablePartialResponse bool               `protobuf:"varint,8,opt,name=enablePartialResponse,proto3" json:"enablePartialResponse,omitempty"`
-	EnableQueryPushdown   bool               `protobuf:"varint,9,opt,name=enableQueryPushdown,proto3" json:"enableQueryPushdown,omitempty"`
-	SkipChunks            bool               `protobuf:"varint,10,opt,name=skipChunks,proto3" json:"skipChunks,omitempty"`
-	ShardInfo             *storepb.ShardInfo `protobuf:"bytes,11,opt,name=shard_info,json=shardInfo,proto3" json:"shard_info,omitempty"`
-	LookbackDeltaSeconds  int64              `protobuf:"varint,12,opt,name=lookback_delta_seconds,json=lookbackDeltaSeconds,proto3" json:"lookback_delta_seconds,omitempty"`
-	Engine                EngineType         `protobuf:"varint,13,opt,name=engine,proto3,enum=thanos.EngineType" json:"engine,omitempty"`
+	Query                 string             `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"`
+	TimeSeconds           int64              `protobuf:"varint,2,opt,name=time_seconds,json=timeSeconds,proto3" json:"time_seconds,omitempty"`
+	TimeoutSeconds        int64              `protobuf:"varint,3,opt,name=timeout_seconds,json=timeoutSeconds,proto3" json:"timeout_seconds,omitempty"`
+	MaxResolutionSeconds  int64              `protobuf:"varint,4,opt,name=max_resolution_seconds,json=maxResolutionSeconds,proto3" json:"max_resolution_seconds,omitempty"`
+	ReplicaLabels         []string           `protobuf:"bytes,5,rep,name=replica_labels,json=replicaLabels,proto3" json:"replica_labels,omitempty"`
+	StoreMatchers         []StoreMatchers    `protobuf:"bytes,6,rep,name=storeMatchers,proto3" json:"storeMatchers"`
+	EnableDedup           bool               `protobuf:"varint,7,opt,name=enableDedup,proto3" json:"enableDedup,omitempty"`
+	EnablePartialResponse bool               `protobuf:"varint,8,opt,name=enablePartialResponse,proto3" json:"enablePartialResponse,omitempty"`
+	// Deprecated
+	EnableQueryPushdown   bool               `protobuf:"varint,9,opt,name=enableQueryPushdown,proto3" json:"enableQueryPushdown,omitempty"`
+	SkipChunks            bool               `protobuf:"varint,10,opt,name=skipChunks,proto3" json:"skipChunks,omitempty"`
+	ShardInfo             *storepb.ShardInfo `protobuf:"bytes,11,opt,name=shard_info,json=shardInfo,proto3" json:"shard_info,omitempty"`
+	LookbackDeltaSeconds  int64              `protobuf:"varint,12,opt,name=lookback_delta_seconds,json=lookbackDeltaSeconds,proto3" json:"lookback_delta_seconds,omitempty"`
+	Engine                EngineType         `protobuf:"varint,13,opt,name=engine,proto3,enum=thanos.EngineType" json:"engine,omitempty"`
 }
 
 func (m *QueryRequest) Reset()         { *m = QueryRequest{} }
@@ -230,21 +231,22 @@ func (*QueryResponse) XXX_OneofWrappers() []interface{} {
 }
 
 type QueryRangeRequest struct {
-	Query                 string             `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"`
-	StartTimeSeconds      int64              `protobuf:"varint,2,opt,name=start_time_seconds,json=startTimeSeconds,proto3" json:"start_time_seconds,omitempty"`
-	EndTimeSeconds        int64              `protobuf:"varint,3,opt,name=end_time_seconds,json=endTimeSeconds,proto3" json:"end_time_seconds,omitempty"`
-	IntervalSeconds       int64              `protobuf:"varint,4,opt,name=interval_seconds,json=intervalSeconds,proto3" json:"interval_seconds,omitempty"`
-	TimeoutSeconds        int64              `protobuf:"varint,5,opt,name=timeout_seconds,json=timeoutSeconds,proto3" json:"timeout_seconds,omitempty"`
-	MaxResolutionSeconds  int64              `protobuf:"varint,6,opt,name=max_resolution_seconds,json=maxResolutionSeconds,proto3" json:"max_resolution_seconds,omitempty"`
-	ReplicaLabels         []string           `protobuf:"bytes,7,rep,name=replica_labels,json=replicaLabels,proto3" json:"replica_labels,omitempty"`
-	StoreMatchers         []StoreMatchers    `protobuf:"bytes,8,rep,name=storeMatchers,proto3" json:"storeMatchers"`
-	EnableDedup           bool               `protobuf:"varint,9,opt,name=enableDedup,proto3" json:"enableDedup,omitempty"`
-	EnablePartialResponse bool               `protobuf:"varint,10,opt,name=enablePartialResponse,proto3" json:"enablePartialResponse,omitempty"`
-	EnableQueryPushdown   bool               `protobuf:"varint,11,opt,name=enableQueryPushdown,proto3" json:"enableQueryPushdown,omitempty"`
-	SkipChunks            bool               `protobuf:"varint,12,opt,name=skipChunks,proto3" json:"skipChunks,omitempty"`
-	ShardInfo             *storepb.ShardInfo `protobuf:"bytes,13,opt,name=shard_info,json=shardInfo,proto3" json:"shard_info,omitempty"`
-	LookbackDeltaSeconds  int64              `protobuf:"varint,14,opt,name=lookback_delta_seconds,json=lookbackDeltaSeconds,proto3" json:"lookback_delta_seconds,omitempty"`
-	Engine                EngineType         `protobuf:"varint,15,opt,name=engine,proto3,enum=thanos.EngineType" json:"engine,omitempty"`
+	Query                 string             `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"`
+	StartTimeSeconds      int64              `protobuf:"varint,2,opt,name=start_time_seconds,json=startTimeSeconds,proto3" json:"start_time_seconds,omitempty"`
+	EndTimeSeconds        int64              `protobuf:"varint,3,opt,name=end_time_seconds,json=endTimeSeconds,proto3" json:"end_time_seconds,omitempty"`
+	IntervalSeconds       int64              `protobuf:"varint,4,opt,name=interval_seconds,json=intervalSeconds,proto3" json:"interval_seconds,omitempty"`
+	TimeoutSeconds        int64              `protobuf:"varint,5,opt,name=timeout_seconds,json=timeoutSeconds,proto3" json:"timeout_seconds,omitempty"`
+	MaxResolutionSeconds  int64              `protobuf:"varint,6,opt,name=max_resolution_seconds,json=maxResolutionSeconds,proto3" json:"max_resolution_seconds,omitempty"`
+	ReplicaLabels         []string           `protobuf:"bytes,7,rep,name=replica_labels,json=replicaLabels,proto3" json:"replica_labels,omitempty"`
+	StoreMatchers         []StoreMatchers    `protobuf:"bytes,8,rep,name=storeMatchers,proto3" json:"storeMatchers"`
+	EnableDedup           bool               `protobuf:"varint,9,opt,name=enableDedup,proto3" json:"enableDedup,omitempty"`
+	EnablePartialResponse bool               `protobuf:"varint,10,opt,name=enablePartialResponse,proto3" json:"enablePartialResponse,omitempty"`
+	// Deprecated
+	EnableQueryPushdown   bool               `protobuf:"varint,11,opt,name=enableQueryPushdown,proto3" json:"enableQueryPushdown,omitempty"`
+	SkipChunks            bool               `protobuf:"varint,12,opt,name=skipChunks,proto3" json:"skipChunks,omitempty"`
+	ShardInfo             *storepb.ShardInfo `protobuf:"bytes,13,opt,name=shard_info,json=shardInfo,proto3" json:"shard_info,omitempty"`
+	LookbackDeltaSeconds  int64              `protobuf:"varint,14,opt,name=lookback_delta_seconds,json=lookbackDeltaSeconds,proto3" json:"lookback_delta_seconds,omitempty"`
+	Engine                EngineType         `protobuf:"varint,15,opt,name=engine,proto3,enum=thanos.EngineType" json:"engine,omitempty"`
 }
 
 func (m *QueryRangeRequest) Reset()         { *m = QueryRangeRequest{} }
diff --git a/pkg/api/query/querypb/query.proto b/pkg/api/query/querypb/query.proto
index 4d346454ef0..c173014b807 100644
--- a/pkg/api/query/querypb/query.proto
+++ b/pkg/api/query/querypb/query.proto
@@ -41,6 +41,7 @@ message QueryRequest {
 
   bool enableDedup = 7;
   bool enablePartialResponse = 8;
+  // Deprecated
   bool enableQueryPushdown = 9;
 
   bool skipChunks = 10;
@@ -80,6 +81,7 @@ message QueryRangeRequest {
 
   bool enableDedup = 9;
   bool enablePartialResponse = 10;
+  // Deprecated
  bool enableQueryPushdown = 11;
 
   bool skipChunks = 12;
diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go
index 9f654b549b8..59c83967828 100644
--- a/pkg/api/query/v1.go
+++ b/pkg/api/query/v1.go
@@ -158,7 +158,6 @@ type QueryAPI struct {
 	enableTargetPartialResponse         bool
 	enableMetricMetadataPartialResponse bool
 	enableExemplarPartialResponse       bool
-	enableQueryPushdown                 bool
 	disableCORS                         bool
 
 	replicaLabels []string
@@ -195,7 +194,6 @@ func NewQueryAPI(
 	enableTargetPartialResponse bool,
 	enableMetricMetadataPartialResponse bool,
 	enableExemplarPartialResponse bool,
-	enableQueryPushdown bool,
 	replicaLabels []string,
 	flagsMap map[string]string,
 	defaultRangeQueryStep time.Duration,
@@ -230,7 +228,6 @@ func NewQueryAPI(
 		enableTargetPartialResponse:         enableTargetPartialResponse,
 		enableMetricMetadataPartialResponse: enableMetricMetadataPartialResponse,
 		enableExemplarPartialResponse:       enableExemplarPartialResponse,
-		enableQueryPushdown:                 enableQueryPushdown,
 		replicaLabels:                       replicaLabels,
 		endpointStatus:                      endpointStatus,
 		defaultRangeQueryStep:               defaultRangeQueryStep,
@@ -560,7 +557,6 @@ func (qapi *QueryAPI) queryExplain(r *http.Request) (interface{}, []error, *api.
 		storeDebugMatchers,
 		maxSourceResolution,
 		enablePartialResponse,
-		qapi.enableQueryPushdown,
 		false,
 		shardInfo,
 		query.NewAggregateStatsReporter(&seriesStats),
@@ -665,7 +661,6 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
 		storeDebugMatchers,
 		maxSourceResolution,
 		enablePartialResponse,
-		qapi.enableQueryPushdown,
 		false,
 		shardInfo,
 		query.NewAggregateStatsReporter(&seriesStats),
@@ -832,7 +827,6 @@ func (qapi *QueryAPI) queryRangeExplain(r *http.Request) (interface{}, []error,
 		storeDebugMatchers,
 		maxSourceResolution,
 		enablePartialResponse,
-		qapi.enableQueryPushdown,
 		false,
 		shardInfo,
 		query.NewAggregateStatsReporter(&seriesStats),
@@ -967,7 +961,6 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
 		storeDebugMatchers,
 		maxSourceResolution,
 		enablePartialResponse,
-		qapi.enableQueryPushdown,
 		false,
 		shardInfo,
 		query.NewAggregateStatsReporter(&seriesStats),
@@ -1071,7 +1064,6 @@ func (qapi *QueryAPI) labelValues(r *http.Request) (interface{}, []error, *api.A
 		storeDebugMatchers,
 		0,
 		enablePartialResponse,
-		qapi.enableQueryPushdown,
 		true,
 		nil,
 		query.NoopSeriesStatsReporter,
@@ -1174,7 +1166,6 @@ func (qapi *QueryAPI) series(r *http.Request) (interface{}, []error, *api.ApiErr
 		storeDebugMatchers,
 		math.MaxInt64,
 		enablePartialResponse,
-		qapi.enableQueryPushdown,
 		true,
 		nil,
 		query.NoopSeriesStatsReporter,
@@ -1241,7 +1232,6 @@ func (qapi *QueryAPI) labelNames(r *http.Request) (interface{}, []error, *api.Ap
 		storeDebugMatchers,
 		0,
 		enablePartialResponse,
-		qapi.enableQueryPushdown,
 		true,
 		nil,
 		query.NoopSeriesStatsReporter,
diff --git a/pkg/dedup/iter.go b/pkg/dedup/iter.go
index 8f60c40363b..0a3a312f6fb 100644
--- a/pkg/dedup/iter.go
+++ b/pkg/dedup/iter.go
@@ -20,17 +20,12 @@ type dedupSeriesSet struct {
 	isCounter bool
 
 	replicas []storage.Series
-	// Pushed down series. Currently, they are being handled in a specific way.
-	// In the future, we might want to relax this and handle these depending
-	// on what function has been passed.
-	pushedDown []storage.Series
 
 	lset labels.Labels
 	peek storage.Series
 	ok   bool
 
-	f               string
-	pushdownEnabled bool
+	f string
 }
 
 // isCounter deduces whether a counter metric has been passed. There must be
@@ -107,9 +102,9 @@ func (o *overlapSplitSet) Err() error {
 
 // NewSeriesSet returns seriesSet that deduplicates the same series.
 // The series in series set are expected be sorted by all labels.
-func NewSeriesSet(set storage.SeriesSet, f string, pushdownEnabled bool) storage.SeriesSet {
+func NewSeriesSet(set storage.SeriesSet, f string) storage.SeriesSet {
 	// TODO: remove dependency on knowing whether it is a counter.
-	s := &dedupSeriesSet{pushdownEnabled: pushdownEnabled, set: set, isCounter: isCounter(f), f: f}
+	s := &dedupSeriesSet{set: set, isCounter: isCounter(f), f: f}
 	s.ok = s.set.Next()
 	if s.ok {
 		s.peek = s.set.At()
@@ -117,34 +112,16 @@ func NewSeriesSet(set storage.SeriesSet, f string, pushdownEnabled bool) storage
 	return s
 }
 
-// trimPushdownMarker trims the pushdown marker from the given labels.
-// Returns true if there was a pushdown marker.
-func trimPushdownMarker(lbls labels.Labels) (labels.Labels, bool) {
-	return labels.NewBuilder(lbls).Del(PushdownMarker.Name).Labels(), lbls.Has(PushdownMarker.Name)
-}
-
 func (s *dedupSeriesSet) Next() bool {
 	if !s.ok {
 		return false
 	}
-	// Reset both because they might have some leftovers.
-	if s.pushdownEnabled {
-		s.pushedDown = s.pushedDown[:0]
-	}
 	s.replicas = s.replicas[:0]
 	// Set the label set we are currently gathering to the peek element.
 	s.lset = s.peek.Labels()
+	s.replicas = append(s.replicas[:0], s.peek)
 
-	pushedDown := false
-	if s.pushdownEnabled {
-		s.lset, pushedDown = trimPushdownMarker(s.lset)
-	}
-	if pushedDown {
-		s.pushedDown = append(s.pushedDown[:0], s.peek)
-	} else {
-		s.replicas = append(s.replicas[:0], s.peek)
-	}
 	return s.next()
 }
 
@@ -153,49 +130,31 @@ func (s *dedupSeriesSet) next() bool {
 	s.ok = s.set.Next()
 	if !s.ok {
 		// There's no next series, the current replicas are the last element.
-		return len(s.replicas) > 0 || len(s.pushedDown) > 0
+		return len(s.replicas) > 0
 	}
 	s.peek = s.set.At()
 	nextLset := s.peek.Labels()
 
-	var pushedDown bool
-	if s.pushdownEnabled {
-		nextLset, pushedDown = trimPushdownMarker(nextLset)
-	}
-
 	// If the label set modulo the replica label is equal to the current label set
 	// look for more replicas, otherwise a series is complete.
 	if !labels.Equal(s.lset, nextLset) {
 		return true
 	}
 
-	if pushedDown {
-		s.pushedDown = append(s.pushedDown, s.peek)
-	} else {
-		s.replicas = append(s.replicas, s.peek)
-	}
+	s.replicas = append(s.replicas, s.peek)
 
 	return s.next()
 }
 
 func (s *dedupSeriesSet) At() storage.Series {
-	if len(s.replicas) == 1 && len(s.pushedDown) == 0 {
+	if len(s.replicas) == 1 {
 		return seriesWithLabels{Series: s.replicas[0], lset: s.lset}
 	}
-	if len(s.replicas) == 0 && len(s.pushedDown) == 1 {
-		return seriesWithLabels{Series: s.pushedDown[0], lset: s.lset}
-	}
 	// Clients may store the series, so we must make a copy of the slice before advancing.
 	repl := make([]storage.Series, len(s.replicas))
 	copy(repl, s.replicas)
-	var pushedDown []storage.Series
-	if s.pushdownEnabled {
-		pushedDown = make([]storage.Series, len(s.pushedDown))
-		copy(pushedDown, s.pushedDown)
-	}
-
-	return newDedupSeries(s.lset, repl, pushedDown, s.f)
+	return newDedupSeries(s.lset, repl, s.f)
 }
 
 func (s *dedupSeriesSet) Err() error {
@@ -214,111 +173,22 @@ type seriesWithLabels struct {
 
 func (s seriesWithLabels) Labels() labels.Labels { return s.lset }
 
 type dedupSeries struct {
-	lset       labels.Labels
-	replicas   []storage.Series
-	pushedDown []storage.Series
+	lset     labels.Labels
+	replicas []storage.Series
 
 	isCounter bool
 	f         string
 }
 
-func newDedupSeries(lset labels.Labels, replicas []storage.Series, pushedDown []storage.Series, f string) *dedupSeries {
-	return &dedupSeries{lset: lset, isCounter: isCounter(f), replicas: replicas, pushedDown: pushedDown, f: f}
+func newDedupSeries(lset labels.Labels, replicas []storage.Series, f string) *dedupSeries {
+	return &dedupSeries{lset: lset, isCounter: isCounter(f), replicas: replicas, f: f}
 }
 
 func (s *dedupSeries) Labels() labels.Labels { return s.lset }
 
-// pushdownIterator creates an iterator that handles
-// all pushed down series.
-func (s *dedupSeries) pushdownIterator(_ chunkenc.Iterator) chunkenc.Iterator {
-	var pushedDownIterator adjustableSeriesIterator
-	if s.isCounter {
-		pushedDownIterator = &counterErrAdjustSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)}
-	} else {
-		pushedDownIterator = noopAdjustableSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)}
-	}
-
-	for _, o := range s.pushedDown[1:] {
-		var replicaIterator adjustableSeriesIterator
-
-		if s.isCounter {
-			replicaIterator = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(nil)}
-		} else {
-			replicaIterator = noopAdjustableSeriesIterator{Iterator: o.Iterator(nil)}
-		}
-
-		pushedDownIterator = noopAdjustableSeriesIterator{newPushdownSeriesIterator(pushedDownIterator, replicaIterator, s.f)}
-	}
-
-	return pushedDownIterator
-}
-
-// allSeriesIterator creates an iterator over all series - pushed down
-// and regular replicas.
-func (s *dedupSeries) allSeriesIterator(_ chunkenc.Iterator) chunkenc.Iterator {
-	var replicasIterator, pushedDownIterator adjustableSeriesIterator
-	if len(s.replicas) != 0 {
-		if s.isCounter {
-			replicasIterator = &counterErrAdjustSeriesIterator{Iterator: s.replicas[0].Iterator(nil)}
-		} else {
-			replicasIterator = noopAdjustableSeriesIterator{Iterator: s.replicas[0].Iterator(nil)}
-		}
-
-		for _, o := range s.replicas[1:] {
-			var replicaIter adjustableSeriesIterator
-			if s.isCounter {
-				replicaIter = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(nil)}
-			} else {
-				replicaIter = noopAdjustableSeriesIterator{Iterator: o.Iterator(nil)}
-			}
-			replicasIterator = newDedupSeriesIterator(replicasIterator, replicaIter)
-		}
-	}
-
-	if len(s.pushedDown) != 0 {
-		if s.isCounter {
-			pushedDownIterator = &counterErrAdjustSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)}
-		} else {
-			pushedDownIterator = noopAdjustableSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)}
-		}
-
-		for _, o := range s.pushedDown[1:] {
-			var replicaIter adjustableSeriesIterator
-			if s.isCounter {
-				replicaIter = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(nil)}
-			} else {
-				replicaIter = noopAdjustableSeriesIterator{Iterator: o.Iterator(nil)}
-			}
-			pushedDownIterator = newDedupSeriesIterator(pushedDownIterator, replicaIter)
-		}
-	}
-
-	if replicasIterator == nil {
-		return pushedDownIterator
-	}
-	if pushedDownIterator == nil {
-		return replicasIterator
-	}
-	return newDedupSeriesIterator(pushedDownIterator, replicasIterator)
-}
-
 func (s *dedupSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
-	// This function needs a regular iterator over all series. Behavior is identical
-	// whether it was pushed down or not.
-	if s.f == "group" {
-		return s.allSeriesIterator(nil)
-	}
-	// If there are no replicas then jump straight to constructing an iterator
-	// for pushed down series.
-	if len(s.replicas) == 0 {
-		return s.pushdownIterator(nil)
-	}
-
-	// Finally, if we have both then construct a tree out of them.
-	// Pushed down series have their own special iterator.
-	// We deduplicate everything in the end.
 	var it adjustableSeriesIterator
 	if s.isCounter {
 		it = &counterErrAdjustSeriesIterator{Iterator: s.replicas[0].Iterator(nil)}
@@ -336,31 +206,7 @@ func (s *dedupSeries) Iterator(_ chunkenc.Iterator) chunkenc.Iterator {
 		it = newDedupSeriesIterator(it, replicaIter)
 	}
 
-	if len(s.pushedDown) == 0 {
-		return it
-	}
-
-	// Join all of the pushed down iterators into one.
-	var pushedDownIterator adjustableSeriesIterator
-	if s.isCounter {
-		pushedDownIterator = &counterErrAdjustSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)}
-	} else {
-		pushedDownIterator = noopAdjustableSeriesIterator{Iterator: s.pushedDown[0].Iterator(nil)}
-	}
-
-	for _, o := range s.pushedDown[1:] {
-		var replicaIterator adjustableSeriesIterator
-
-		if s.isCounter {
-			replicaIterator = &counterErrAdjustSeriesIterator{Iterator: o.Iterator(nil)}
-		} else {
-			replicaIterator = noopAdjustableSeriesIterator{Iterator: o.Iterator(nil)}
-		}
-
-		pushedDownIterator = noopAdjustableSeriesIterator{newPushdownSeriesIterator(pushedDownIterator, replicaIterator, s.f)}
-	}
-
-	return newDedupSeriesIterator(it, pushedDownIterator)
+	return it
 }
 
 // adjustableSeriesIterator iterates over the data of a time series and allows to adjust current value based on
diff --git a/pkg/dedup/iter_test.go b/pkg/dedup/iter_test.go
index 8b9b4161258..817ea935567 100644
--- a/pkg/dedup/iter_test.go
+++ b/pkg/dedup/iter_test.go
@@ -534,7 +534,7 @@ func TestDedupSeriesSet(t *testing.T) {
 			if tcase.isCounter {
 				f = "rate"
 			}
-			dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f, false)
+			dedupSet := NewSeriesSet(&mockedSeriesSet{series: tcase.input}, f)
 			var ats []storage.Series
 			for dedupSet.Next() {
 				ats = append(ats, dedupSet.At())
@@ -660,38 +660,3 @@ func expandSeries(t testing.TB, it chunkenc.Iterator) (res []sample) {
 	testutil.Ok(t, it.Err())
 	return res
 }
-
-func TestPushdownSeriesIterator(t *testing.T) {
-	cases := []struct {
-		a, b, exp []sample
-		function  string
-		tcase     string
-	}{
-		{
-			tcase:    "simple case",
-			a:        []sample{{10000, 10}, {20000, 11}, {30000, 12}, {40000, 13}},
-			b:        []sample{{10000, 20}, {20000, 21}, {30000, 22}, {40000, 23}},
-			exp:      []sample{{10000, 20}, {20000, 21}, {30000, 22}, {40000, 23}},
-			function: "max",
-		},
-		{
-			tcase:    "gaps but catches up",
-			a:        []sample{{10000, 10}, {20000, 11}, {30000, 12}, {40000, 13}},
-			b:        []sample{{10000, 20}, {40000, 23}},
-			exp:      []sample{{10000, 20}, {20000, 11}, {30000, 12}, {40000, 23}},
-			function: "max",
-		},
-	}
-	for _, c := range cases {
-		t.Run(c.tcase, func(t *testing.T) {
-			it := newPushdownSeriesIterator(
-				noopAdjustableSeriesIterator{newMockedSeriesIterator(c.a)},
-				noopAdjustableSeriesIterator{newMockedSeriesIterator(c.b)},
-				c.function,
-			)
-			res := expandSeries(t, noopAdjustableSeriesIterator{it})
-			testutil.Equals(t, c.exp, res)
-		})
-
-	}
-}
diff --git a/pkg/dedup/pushdown_iter.go b/pkg/dedup/pushdown_iter.go
deleted file mode 100644
index 76f8958e79f..00000000000
--- a/pkg/dedup/pushdown_iter.go
+++ /dev/null
@@ -1,146 +0,0 @@
-// Copyright (c) The Thanos Authors.
-// Licensed under the Apache License 2.0.
-
-package dedup
-
-import (
-	"fmt"
-	"math"
-
-	"github.com/prometheus/prometheus/model/histogram"
-	"github.com/prometheus/prometheus/model/labels"
-	"github.com/prometheus/prometheus/tsdb/chunkenc"
-)
-
-// PushdownMarker is a label that gets attached on pushed down series so that
-// the receiver would be able to handle them in potentially special way.
-var PushdownMarker = labels.Label{Name: "__thanos_pushed_down", Value: "true"}
-
-type pushdownSeriesIterator struct {
-	a, b         chunkenc.Iterator
-	aval, bval   chunkenc.ValueType
-	aused, bused bool
-
-	function func(float64, float64) float64
-}
-
-// newPushdownSeriesIterator constructs a new iterator that steps through both
-// series and performs the following algorithm:
-// * If both timestamps match up then the function is applied on them;
-// * If one of the series has a gap then the other one is used until the timestamps match up.
-// It is guaranteed that stepping through both of them that the timestamps will match eventually
-// because the samples have been processed by a PromQL engine.
-func newPushdownSeriesIterator(a, b chunkenc.Iterator, function string) *pushdownSeriesIterator {
-	var fn func(float64, float64) float64
-	switch function {
-	case "max", "max_over_time":
-		fn = math.Max
-	case "min", "min_over_time":
-		fn = math.Min
-	default:
-		panic(fmt.Errorf("unsupported function %s passed", function))
-	}
-	return &pushdownSeriesIterator{
-		a: a, b: b, function: fn, aused: true, bused: true,
-	}
-}
-
-func (it *pushdownSeriesIterator) Next() chunkenc.ValueType {
-	// Push A if we've used A before. Push B if we've used B before.
-	// Push both if we've used both before.
-	switch {
-	case !it.aused && !it.bused:
-		return chunkenc.ValNone
-	case it.aused && !it.bused:
-		it.aval = it.a.Next()
-	case !it.aused && it.bused:
-		it.bval = it.b.Next()
-	case it.aused && it.bused:
-		it.aval = it.a.Next()
-		it.bval = it.b.Next()
-	}
-	it.aused = false
-	it.bused = false
-
-	if it.aval != chunkenc.ValNone {
-		return it.aval
-	}
-
-	if it.bval != chunkenc.ValNone {
-		return it.bval
-	}
-
-	return chunkenc.ValNone
-}
-
-func (it *pushdownSeriesIterator) At() (int64, float64) {
-
-	var timestamp int64
-	var val float64
-
-	if it.aval != chunkenc.ValNone && it.bval != chunkenc.ValNone {
-		ta, va := it.a.At()
-		tb, vb := it.b.At()
-		if ta == tb {
-			val = it.function(va, vb)
-			timestamp = ta
-			it.aused = true
-			it.bused = true
-		} else {
-			if ta < tb {
-				timestamp = ta
-				val = va
-				it.aused = true
-			} else {
-				timestamp = tb
-				val = vb
-				it.bused = true
-			}
-		}
-	} else if it.aval != chunkenc.ValNone {
-		ta, va := it.a.At()
-		val = va
-		timestamp = ta
-		it.aused = true
-	} else {
-		tb, vb := it.b.At()
-		val = vb
-		timestamp = tb
-		it.bused = true
-	}
-
-	return timestamp, val
-}
-
-// TODO(rabenhorst): Needs to be implemented for native histogram support.
-func (it *pushdownSeriesIterator) AtHistogram() (int64, *histogram.Histogram) {
-	panic("not implemented")
-}
-
-func (it *pushdownSeriesIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
-	panic("not implemented")
-}
-
-func (it *pushdownSeriesIterator) AtT() int64 {
-	t := it.a.AtT()
-	return t
-}
-
-func (it *pushdownSeriesIterator) Seek(t int64) chunkenc.ValueType {
-	for {
-		ts := it.AtT()
-		if ts >= t {
-			return chunkenc.ValFloat
-		}
-		if it.Next() == chunkenc.ValNone {
-			return chunkenc.ValNone
-		}
-	}
-}
-
-func (it *pushdownSeriesIterator) Err() error {
-	if it.a.Err() != nil {
-		return it.a.Err()
-	}
-	return it.b.Err()
-}
diff --git a/pkg/query/querier.go b/pkg/query/querier.go
index cdfdfcadd9f..0cfcc2ad211 100644
--- a/pkg/query/querier.go
+++ b/pkg/query/querier.go
@@ -14,7 +14,6 @@ import (
 
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/util/annotations"
@@ -53,7 +52,6 @@ type QueryableCreator func(
 	storeDebugMatchers [][]*labels.Matcher,
 	maxResolutionMillis int64,
 	partialResponse,
-	enableQueryPushdown,
 	skipChunks bool,
 	shardInfo *storepb.ShardInfo,
 	seriesStatsReporter seriesStatsReporter,
@@ -76,7 +74,6 @@ func NewQueryableCreator(
 		storeDebugMatchers [][]*labels.Matcher,
 		maxResolutionMillis int64,
 		partialResponse,
-		enableQueryPushdown,
 		skipChunks bool,
 		shardInfo *storepb.ShardInfo,
 		seriesStatsReporter seriesStatsReporter,
@@ -95,7 +92,6 @@ func NewQueryableCreator(
 			},
 			maxConcurrentSelects: maxConcurrentSelects,
 			selectTimeout:        selectTimeout,
-			enableQueryPushdown:  enableQueryPushdown,
 			shardInfo:            shardInfo,
 			seriesStatsReporter:  seriesStatsReporter,
 		}
@@ -114,14 +110,13 @@ type queryable struct {
 	gateProviderFn       func() gate.Gate
 	maxConcurrentSelects int
 	selectTimeout        time.Duration
-	enableQueryPushdown  bool
 	shardInfo            *storepb.ShardInfo
 	seriesStatsReporter  seriesStatsReporter
 }
 
 // Querier returns a new storage querier against the underlying proxy store API.
 func (q *queryable) Querier(mint, maxt int64) (storage.Querier, error) {
-	return newQuerier(q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.enableQueryPushdown, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo, q.seriesStatsReporter), nil
+	return newQuerier(q.logger, mint, maxt, q.replicaLabels, q.storeDebugMatchers, q.proxy, q.deduplicate, q.maxResolutionMillis, q.partialResponse, q.skipChunks, q.gateProviderFn(), q.selectTimeout, q.shardInfo, q.seriesStatsReporter), nil
 }
 
 type querier struct {
@@ -133,7 +128,6 @@ type querier struct {
 	deduplicate             bool
 	maxResolutionMillis     int64
 	partialResponseStrategy storepb.PartialResponseStrategy
-	enableQueryPushdown     bool
 	skipChunks              bool
 	selectGate              gate.Gate
 	selectTimeout           time.Duration
@@ -153,7 +147,6 @@ func newQuerier(
 	deduplicate bool,
 	maxResolutionMillis int64,
 	partialResponse,
-	enableQueryPushdown,
 	skipChunks bool,
 	selectGate gate.Gate,
 	selectTimeout time.Duration,
@@ -186,7 +179,6 @@ func newQuerier(
 		maxResolutionMillis:     maxResolutionMillis,
 		partialResponseStrategy: partialResponseStrategy,
 		skipChunks:              skipChunks,
-		enableQueryPushdown:     enableQueryPushdown,
 		shardInfo:               shardInfo,
 		seriesStatsReporter:     seriesStatsReporter,
 	}
@@ -359,9 +351,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
 		ShardInfo:               q.shardInfo,
 		PartialResponseStrategy: q.partialResponseStrategy,
 		SkipChunks:              q.skipChunks,
-	}
-	if q.enableQueryPushdown {
-		req.QueryHints = storeHintsFromPromHints(hints)
+		QueryHints:              storeHintsFromPromHints(hints),
 	}
 	if q.isDedupEnabled() {
 		// Soft ask to sort without replica labels and push them at the end of labelset.
@@ -373,22 +363,6 @@
 	}
 
 	warns := annotations.New().Merge(resp.warnings)
-	if q.enableQueryPushdown && (hints.Func == "max_over_time" || hints.Func == "min_over_time") {
-		// On query pushdown, delete the metric's name from the result because that's what the
-		// PromQL does either way, and we want our iterator to work with data
-		// that was either pushed down or not.
-		for i := range resp.seriesSet {
-			lbls := resp.seriesSet[i].Labels
-			for j, lbl := range lbls {
-				if lbl.Name != model.MetricNameLabel {
-					continue
-				}
-				resp.seriesSet[i].Labels = append(resp.seriesSet[i].Labels[:j], resp.seriesSet[i].Labels[j+1:]...)
-				break
-			}
-		}
-	}
-
 	if !q.isDedupEnabled() {
 		return &promSeriesSet{
 			mint: q.mint,
@@ -400,7 +374,7 @@
 	}
 
 	// TODO(bwplotka): Move to deduplication on chunk level inside promSeriesSet, similar to what we have in dedup.NewDedupChunkMerger().
-	// This however require big refactor, caring about correct AggrChunk to iterator conversion, pushdown logic and counter reset apply.
+	// This however requires a big refactor, caring about correct AggrChunk to iterator conversion and counter reset apply.
 	// For now we apply simple logic that splits potential overlapping chunks into separate replica series, so we can split the work.
 	set := &promSeriesSet{
 		mint: q.mint,
@@ -410,7 +384,7 @@
 		warns: warns,
 	}
 
-	return dedup.NewSeriesSet(set, hints.Func, q.enableQueryPushdown), resp.seriesSetStats, nil
+	return dedup.NewSeriesSet(set, hints.Func), resp.seriesSetStats, nil
 }
 
 // LabelValues returns all potential values for a label name.
diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go
index faf56e1113b..c038cecfd5b 100644
--- a/pkg/query/querier_test.go
+++ b/pkg/query/querier_test.go
@@ -55,7 +55,6 @@ func TestQueryableCreator_MaxResolution(t *testing.T) {
 		oneHourMillis,
 		false,
 		false,
-		false,
 		nil,
 		NoopSeriesStatsReporter,
 	)
@@ -97,7 +96,6 @@ func TestQuerier_DownsampledData(t *testing.T) {
 		9999999,
 		false,
 		false,
-		false,
 		nil,
 		NoopSeriesStatsReporter,
 	)
@@ -395,7 +393,7 @@ func TestQuerier_Select_AfterPromQL(t *testing.T) {
 			g := gate.New(2)
 			mq := &mockedQueryable{
 				Creator: func(mint, maxt int64) storage.Querier {
-					return newQuerier(nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter)
+					return newQuerier(nil, mint, maxt, tcase.replicaLabels, nil, tcase.storeAPI, sc.dedup, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter)
 				},
 			}
 			t.Cleanup(func() {
@@ -784,7 +782,6 @@ func TestQuerier_Select(t *testing.T) {
 				0,
 				true,
 				false,
-				false,
 				g,
 				timeout,
 				nil,
@@ -1078,7 +1075,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {
 
 		timeout := 100 * time.Second
 		g := gate.New(2)
-		q := newQuerier(logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, newProxyStore(s), false, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter)
+		q := newQuerier(logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, newProxyStore(s), false, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter)
 		t.Cleanup(func() {
 			testutil.Ok(t, q.Close())
 		})
@@ -1148,7 +1145,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {
 
 		timeout := 5 * time.Second
 		g := gate.New(2)
-		q := newQuerier(logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, newProxyStore(s), true, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter)
+		q := newQuerier(logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, newProxyStore(s), true, 0, true, false, g, timeout, nil, NoopSeriesStatsReporter)
 		t.Cleanup(func() {
 			testutil.Ok(t, q.Close())
 		})
diff --git a/pkg/query/query_bench_test.go b/pkg/query/query_bench_test.go
index 48a9d7673da..44e4373a26b 100644
--- a/pkg/query/query_bench_test.go
+++ b/pkg/query/query_bench_test.go
@@ -94,7 +94,6 @@ func benchQuerySelect(t testutil.TB, totalSamples, totalSeries int, dedup bool)
 		0,
 		false,
 		false,
-		false,
 		gate.NewNoop(),
 		10*time.Second,
 		nil,
diff --git a/pkg/query/query_test.go b/pkg/query/query_test.go
index 2f36a3ca2b3..14e05080cdc 100644
--- a/pkg/query/query_test.go
+++ b/pkg/query/query_test.go
@@ -62,7 +62,6 @@ func TestQuerier_Proxy(t *testing.T) {
 		0,
 		false,
 		false,
-		false,
 		nil,
 		NoopSeriesStatsReporter,
 	)
diff --git a/pkg/query/test_test.go b/pkg/query/test_test.go
index 612d44f0f8c..b8457870cac 100644
--- a/pkg/query/test_test.go
+++ b/pkg/query/test_test.go
@@ -89,7 +89,7 @@ func (s *testStore) close(t testing.TB) {
 }
 
 // NewTest returns an initialized empty Test.
-// It's compatible with promql.Test, allowing additionally multi StoreAPIs for query pushdown testing.
+// It's compatible with promql.Test, additionally allowing multiple StoreAPIs.
 // TODO(bwplotka): Move to unittest and add add support for multi-store upstream. See: https://github.com/prometheus/prometheus/pull/8300
 func newTest(t testing.TB, input string) (*test, error) {
 	cmds, err := parse(input)
diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go
index 197364ca040..701ed1cb6fa 100644
--- a/pkg/store/prometheus.go
+++ b/pkg/store/prometheus.go
@@ -16,9 +16,6 @@ import (
 	"strings"
 	"sync"
 
-	"github.com/prometheus/common/model"
-	"github.com/prometheus/prometheus/model/timestamp"
-
 	"github.com/blang/semver/v4"
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
@@ -35,7 +32,6 @@ import (
 	"github.com/thanos-io/thanos/pkg/clientconfig"
 
 	"github.com/thanos-io/thanos/pkg/component"
-	"github.com/thanos-io/thanos/pkg/dedup"
 	"github.com/thanos-io/thanos/pkg/info/infopb"
 	"github.com/thanos-io/thanos/pkg/promclient"
 	"github.com/thanos-io/thanos/pkg/runutil"
@@ -162,8 +158,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto
 	// Don't ask for more than available time. This includes potential `minTime` flag limit.
 	availableMinTime, _ := p.timestamps()
 	if r.MinTime < availableMinTime {
-		// If pushdown is enabled then align min time with the step to avoid missing data
-		// when it gets retrieved by the upper layer's PromQL engine.
+		// Align min time with the step to avoid missing data when it gets retrieved by the upper layer's PromQL engine.
 		// This also is necessary when Sidecar uploads a block and then availableMinTime
 		// becomes a fixed timestamp.
 		if r.QueryHints != nil && r.QueryHints.StepMillis != 0 {
@@ -210,10 +205,6 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto
 	shardMatcher := r.ShardInfo.Matcher(&p.buffers)
 	defer shardMatcher.Close()
 
-	if r.QueryHints != nil && r.QueryHints.IsSafeToExecute() && !shardMatcher.IsSharded() {
-		return p.queryPrometheus(s, r, extLsetToRemove)
-	}
-
 	q := &prompb.Query{StartTimestampMs: r.MinTime, EndTimestampMs: r.MaxTime}
 	for _, m := range matchers {
 		pm := &prompb.LabelMatcher{Name: m.Name, Value: m.Value}
@@ -255,76 +246,6 @@
 	return p.handleStreamedPrometheusResponse(s, shardMatcher, httpResp, queryPrometheusSpan, extLset, enableChunkHashCalculation, extLsetToRemove)
 }
 
-func (p *PrometheusStore) queryPrometheus(
-	s flushableServer,
-	r *storepb.SeriesRequest,
-	extLsetToRemove map[string]struct{},
-) error {
-	var matrix model.Matrix
-
-	opts := promclient.QueryOptions{}
-	step := r.QueryHints.StepMillis / 1000
-	if step != 0 {
-		result, _, _, err := p.client.QueryRange(s.Context(), p.base, r.ToPromQL(), r.MinTime, r.MaxTime, step, opts)
-		if err != nil {
-			return err
-		}
-		matrix = result
-	} else {
-		vector, _, _, err := p.client.QueryInstant(s.Context(), p.base, r.ToPromQL(), timestamp.Time(r.MaxTime), opts)
-		if err != nil {
-			return err
-		}
-
-		matrix = make(model.Matrix, 0, len(vector))
-		for _, sample := range vector {
-			matrix = append(matrix, &model.SampleStream{
-				Metric: sample.Metric,
-				Values: []model.SamplePair{
-					{
-						Timestamp: sample.Timestamp,
-						Value:     sample.Value,
-					},
-				},
-			})
-		}
-	}
-
-	externalLbls := rmLabels(p.externalLabelsFn().Copy(), extLsetToRemove)
-	b := labels.NewScratchBuilder(16)
-	for _, vector := range matrix {
-		b.Reset()
-
-		// Attach labels from samples.
-		for k, v := range vector.Metric {
-			b.Add(string(k), string(v))
-		}
-		b.Add(dedup.PushdownMarker.Name, dedup.PushdownMarker.Value)
-		b.Sort()
-
-		finalLbls := labelpb.ExtendSortedLabels(b.Labels(), externalLbls)
-
-		series := &prompb.TimeSeries{
-			Labels:  labelpb.ZLabelsFromPromLabels(finalLbls),
-			Samples: prompb.SamplesFromSamplePairs(vector.Values),
-		}
-
-		chks, err := p.chunkSamples(series, MaxSamplesPerChunk, enableChunkHashCalculation)
-		if err != nil {
-			return err
-		}
-
-		if err := s.Send(storepb.NewSeriesResponse(&storepb.Series{
-			Labels: series.Labels,
-			Chunks: chks,
-		})); err != nil {
-			return err
-		}
-	}
-
-	return s.Flush()
-}
-
 func (p *PrometheusStore) handleSampledPrometheusResponse(
 	s flushableServer,
 	httpResp *http.Response,
diff --git a/pkg/store/prometheus_test.go b/pkg/store/prometheus_test.go
index 5b7ff187366..68b5aab14ed 100644
--- a/pkg/store/prometheus_test.go
+++ b/pkg/store/prometheus_test.go
@@ -156,38 +156,6 @@ func testPrometheusStoreSeriesE2e(t *testing.T, prefix string) {
 		testutil.Equals(t, []string(nil), srv.Warnings)
 		testutil.Equals(t, "rpc error: code = InvalidArgument desc = no matchers specified (excluding external labels)", err.Error())
 	}
-	// Querying with pushdown.
-	{
-		srv := newStoreSeriesServer(ctx)
-		testutil.Ok(t, proxy.Series(&storepb.SeriesRequest{
-			MinTime: baseT + 101,
-			MaxTime: baseT + 300,
-			Matchers: []storepb.LabelMatcher{
-				{Type: storepb.LabelMatcher_EQ, Name: "a", Value: "b"},
-			},
-			QueryHints: &storepb.QueryHints{Func: &storepb.Func{Name: "min_over_time"}, Range: &storepb.Range{Millis: 300}},
-		}, srv))
-
-		testutil.Equals(t, 1, len(srv.SeriesSet))
-
-		testutil.Equals(t, []labelpb.ZLabel{
-			{Name: "__thanos_pushed_down", Value: "true"},
-			{Name: "a", Value: "b"},
-			{Name: "region", Value: "eu-west"},
-		}, srv.SeriesSet[0].Labels)
-		testutil.Equals(t, []string(nil), srv.Warnings)
-		testutil.Equals(t, 1, len(srv.SeriesSet[0].Chunks))
-
-		c := srv.SeriesSet[0].Chunks[0]
-		testutil.Equals(t, storepb.Chunk_XOR, c.Raw.Type)
-
-		chk, err := chunkenc.FromData(chunkenc.EncXOR, c.Raw.Data)
-		testutil.Ok(t, err)
-
-		samples := expandChunk(chk.Iterator(nil))
-		testutil.Equals(t, []sample{{baseT + 300, 1}}, samples)
-
-	}
 }
 
 type sample struct {
diff --git a/pkg/store/proxy_heap_test.go b/pkg/store/proxy_heap_test.go
index 50fe2d46beb..e6cf2f199a3 100644
--- a/pkg/store/proxy_heap_test.go
+++ b/pkg/store/proxy_heap_test.go
@@ -10,7 +10,6 @@ import (
 
 	"github.com/efficientgo/core/testutil"
 	"github.com/prometheus/prometheus/model/labels"
-	"github.com/thanos-io/thanos/pkg/dedup"
 	"github.com/thanos-io/thanos/pkg/errors"
 	"github.com/thanos-io/thanos/pkg/store/storepb"
 )
@@ -248,22 +247,6 @@ func TestSortWithoutLabels(t *testing.T) {
 			},
 			dedupLabels: map[string]struct{}{"b": {}, "b1": {}},
 		},
-		// Pushdown label at the end.
-		{
-			input: []*storepb.SeriesResponse{
-				storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "replica-1", "c", "3")),
-				storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "replica-1", "c", "3", "d", "4")),
-				storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "replica-1", "c", "4", dedup.PushdownMarker.Name, dedup.PushdownMarker.Value)),
-				storeSeriesResponse(t, labelsFromStrings("a", "1", "b", "replica-2", "c", "3")),
-			},
-			exp: []*storepb.SeriesResponse{
-				storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
-				storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3")),
-				storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "3", "d", "4")),
-				storeSeriesResponse(t, labelsFromStrings("a", "1", "c", "4", dedup.PushdownMarker.Name, dedup.PushdownMarker.Value)),
-			},
-			dedupLabels: map[string]struct{}{"b": {}},
-		},
 		// Non series responses mixed.
 		{
 			input: []*storepb.SeriesResponse{
diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go
index 09a6f9ff980..faed79bc7b1 100644
--- a/pkg/store/storepb/custom.go
+++ b/pkg/store/storepb/custom.go
@@ -532,23 +532,3 @@ func (c *SeriesStatsCounter) Count(series *Series) {
 func (m *SeriesRequest) ToPromQL() string {
 	return m.QueryHints.toPromQL(m.Matchers)
 }
-
-// IsSafeToExecute returns true if the function or aggregation from the query hint
-// can be safely executed by the underlying Prometheus instance without affecting the
-// result of the query.
-func (m *QueryHints) IsSafeToExecute() bool {
-	distributiveOperations := []string{
-		"max",
-		"max_over_time",
-		"min",
-		"min_over_time",
-		"group",
-	}
-	for _, op := range distributiveOperations {
-		if m.Func.Name == op {
-			return true
-		}
-	}
-
-	return false
-}
diff --git a/test/e2e/native_histograms_test.go b/test/e2e/native_histograms_test.go
index 29c15326610..09a61261688 100644
--- a/test/e2e/native_histograms_test.go
+++ b/test/e2e/native_histograms_test.go
@@ -54,7 +54,6 @@ func TestQueryNativeHistograms(t *testing.T) {
 	testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2))
 
 	querier := e2ethanos.NewQuerierBuilder(e, "querier", sidecar1.InternalEndpoint("grpc"), sidecar2.InternalEndpoint("grpc")).
-		WithEnabledFeatures([]string{"query-pushdown"}).
 		Init()
 	testutil.Ok(t, e2e.StartAndWaitReady(querier))
 
@@ -95,7 +94,7 @@ func TestQueryNativeHistograms(t *testing.T) {
 		})
 	})
 
-	t.Run("query histogram using group function for testing pushdown", func(t *testing.T) {
+	t.Run("query histogram using group function", func(t *testing.T) {
 		queryAndAssert(t, ctx, querier.Endpoint("http"), func() string { return fmt.Sprintf("group(%v)", testHistogramMetricName) }, ts, promclient.QueryOptions{Deduplicate: true}, model.Vector{
 			&model.Sample{
 				Value: 1,
diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go
index 6584c7b8426..211c6c8eec2 100644
--- a/test/e2e/query_test.go
+++ b/test/e2e/query_test.go
@@ -889,124 +889,6 @@ func TestQueryStoreMetrics(t *testing.T) {
 	}
 }
 
-// Regression test for https://github.com/thanos-io/thanos/issues/5033.
-// Tests whether queries work with mixed sources, and with functions
-// that we are pushing down: min, max, min_over_time, max_over_time,
-// group.
-func TestSidecarStorePushdown(t *testing.T) {
-	t.Parallel()
-
-	// Build up.
-	e, err := e2e.NewDockerEnvironment("sidecar-pushdown")
-	testutil.Ok(t, err)
-	t.Cleanup(e2ethanos.CleanScenario(t, e))
-
-	prom1, sidecar1 := e2ethanos.NewPrometheusWithSidecar(e, "p1", e2ethanos.DefaultPromConfig("p1", 0, "", ""), "", e2ethanos.DefaultPrometheusImage(), "", "remote-write-receiver")
-	testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1))
-
-	const bucket = "store-gateway-test-sidecar-pushdown"
-	m := e2edb.NewMinio(e, "thanos-minio", bucket, e2edb.WithMinioTLS())
-	testutil.Ok(t, e2e.StartAndWaitReady(m))
-
-	dir := filepath.Join(e.SharedDir(), "tmp")
-	testutil.Ok(t, os.MkdirAll(filepath.Join(e.SharedDir(), dir), os.ModePerm))
-
-	series := []labels.Labels{labels.FromStrings("__name__", "my_fake_metric", "instance", "foo")}
-	extLset := labels.FromStrings("prometheus", "p1", "replica", "0")
-
-	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
-	t.Cleanup(cancel)
-
-	now := time.Now()
-	id1, err := e2eutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), 30*time.Minute, extLset, 0, metadata.NoneFunc)
-	testutil.Ok(t, err)
-
-	l := log.NewLogfmtLogger(os.Stdout)
-	bkt, err := s3.NewBucketWithConfig(l, e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir()), "test")
-	testutil.Ok(t, err)
-	testutil.Ok(t, objstore.UploadDir(ctx, l, bkt, path.Join(dir, id1.String()), id1.String()))
-
-	s1 := e2ethanos.NewStoreGW(
-		e,
-		"1",
-		client.BucketConfig{
-			Type:   client.S3,
-			Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.InternalDir()),
-		},
-		"",
-		"",
-		nil,
-	)
-	testutil.Ok(t, e2e.StartAndWaitReady(s1))
-
-	q := e2ethanos.NewQuerierBuilder(e, "1", s1.InternalEndpoint("grpc"), sidecar1.InternalEndpoint("grpc")).WithEnabledFeatures([]string{"query-pushdown"}).Init()
-	testutil.Ok(t, e2e.StartAndWaitReady(q))
-	testutil.Ok(t, s1.WaitSumMetrics(e2emon.Equals(1), "thanos_blocks_meta_synced"))
-
-	testutil.Ok(t, synthesizeFakeMetricSamples(ctx, prom1, []fakeMetricSample{
-		{
-			label:             "foo",
-			value:             123,
-			timestampUnixNano: now.UnixNano(),
-		},
-	}))
-
-	queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
-		return "max_over_time(my_fake_metric[2h])"
-	}, time.Now, promclient.QueryOptions{
-		Deduplicate: true,
-	}, []model.Metric{
-		{
-			"instance":   "foo",
-			"prometheus": "p1",
-		},
-	})
-
-	queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
-		return "max(my_fake_metric) by (__name__, instance)"
-	}, time.Now, promclient.QueryOptions{
-		Deduplicate: true,
-	}, []model.Metric{
-		{
-			"instance": "foo",
-			"__name__": "my_fake_metric",
-		},
-	})
-
-	queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
-		return "min_over_time(my_fake_metric[2h])"
-	}, time.Now, promclient.QueryOptions{
-		Deduplicate: true,
-	}, []model.Metric{
-		{
-			"instance":   "foo",
-			"prometheus": "p1",
-		},
-	})
-
-	queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
-		return "min(my_fake_metric) by (instance, __name__)"
-	}, time.Now, promclient.QueryOptions{
-		Deduplicate: true,
-	}, []model.Metric{
-		{
-			"instance": "foo",
-			"__name__": "my_fake_metric",
-		},
-	})
-
-	queryAndAssertSeries(t, ctx, q.Endpoint("http"), func() string {
-		return "group(my_fake_metric) by (__name__, instance)"
-	}, time.Now, promclient.QueryOptions{
-		Deduplicate: true,
-	}, []model.Metric{
-		{
-			"instance": "foo",
-			"__name__": "my_fake_metric",
-		},
-	})
-}
-
 type seriesWithLabels struct {
 	intLabels labels.Labels
 	extLabels labels.Labels
 }
@@ -1435,7 +1317,7 @@ func TestSidecarQueryEvaluation(t *testing.T) {
 
 	for _, tc := range ts {
 		t.Run(tc.query, func(t *testing.T) {
-			e, err := e2e.NewDockerEnvironment("query-pushdown")
+			e, err := e2e.NewDockerEnvironment("query-evaluation")
 			testutil.Ok(t, err)
 			t.Cleanup(e2ethanos.CleanScenario(t, e))
 
@@ -1451,7 +1333,6 @@ func TestSidecarQueryEvaluation(t *testing.T) {
 			}
 			q := e2ethanos.
 				NewQuerierBuilder(e, "1", endpoints...).
-				WithEnabledFeatures([]string{"query-pushdown"}).
 				Init()
 			testutil.Ok(t, e2e.StartAndWaitReady(q))
 
@@ -1928,7 +1809,7 @@ func TestSidecarQueryEvaluationWithDedup(t *testing.T) {
 
 	for _, tc := range ts {
 		t.Run(tc.query, func(t *testing.T) {
-			e, err := e2e.NewDockerEnvironment("pushdown-dedup")
+			e, err := e2e.NewDockerEnvironment("query-dedup")
 			testutil.Ok(t, err)
 			t.Cleanup(e2ethanos.CleanScenario(t, e))
 
@@ -1944,7 +1825,6 @@ func TestSidecarQueryEvaluationWithDedup(t *testing.T) {
 			}
 			q := e2ethanos.
 				NewQuerierBuilder(e, "1", endpoints...).
-				WithEnabledFeatures([]string{"query-pushdown"}).
 				Init()
 			testutil.Ok(t, err)
 			testutil.Ok(t, e2e.StartAndWaitReady(q))
@@ -1963,93 +1843,6 @@ func TestSidecarQueryEvaluationWithDedup(t *testing.T) {
 	}
 }
 
-// TestSidecarStoreAlignmentPushdown tests how pushdown works with
-// --min-time and --max-time.
-func TestSidecarAlignmentPushdown(t *testing.T) {
-	t.Parallel()
-
-	e, err := e2e.NewDockerEnvironment("pushdown-min-max")
-	testutil.Ok(t, err)
-	t.Cleanup(e2ethanos.CleanScenario(t, e))
-
-	now := time.Now()
-
-	prom1, sidecar1 := e2ethanos.NewPrometheusWithSidecar(e, "p1", e2ethanos.DefaultPromConfig("p1", 0, "", ""), "", e2ethanos.DefaultPrometheusImage(), now.Add(time.Duration(-1)*time.Hour).Format(time.RFC3339), now.Format(time.RFC3339), "remote-write-receiver")
-	testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1))
-
-	endpoints := []string{
-		sidecar1.InternalEndpoint("grpc"),
-	}
-	q1 := e2ethanos.
-		NewQuerierBuilder(e, "1", endpoints...).
-		Init()
-	testutil.Ok(t, err)
-	testutil.Ok(t, e2e.StartAndWaitReady(q1))
-	q2 := e2ethanos.
-		NewQuerierBuilder(e, "2", endpoints...).
-		WithEnabledFeatures([]string{"query-pushdown"}).
-		Init()
-	testutil.Ok(t, err)
-	testutil.Ok(t, e2e.StartAndWaitReady(q2))
-
-	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
-	t.Cleanup(cancel)
-
-	samples := make([]fakeMetricSample, 0)
-	for i := now.Add(time.Duration(-3) * time.Hour); i.Before(now); i = i.Add(30 * time.Second) {
-		samples = append(samples, fakeMetricSample{
-			label:             "test",
-			value:             1,
-			timestampUnixNano: i.UnixNano(),
-		})
-	}
-
-	testutil.Ok(t, synthesizeFakeMetricSamples(ctx, prom1, samples))
-
-	// This query should have identical requests.
-	testQuery := func() string { return `max_over_time({instance="test"}[5m])` }
-
-	logger := log.NewLogfmtLogger(os.Stdout)
-	logger = log.With(logger, "ts", log.DefaultTimestampUTC)
-
-	var expectedRes model.Matrix
-	testutil.Ok(t, runutil.RetryWithLog(logger, time.Second, ctx.Done(), func() error {
-		res, warnings, _, err := promclient.NewDefaultClient().QueryRange(ctx, urlParse(t, "http://"+q1.Endpoint("http")), testQuery(),
-			timestamp.FromTime(now.Add(time.Duration(-7*24)*time.Hour)),
-			timestamp.FromTime(now),
-			2419, // Taken from UI.
-			promclient.QueryOptions{
-				Deduplicate: true,
-			})
-		if err != nil {
-			return err
-		}
-
-		if len(warnings) > 0 {
-			return errors.Errorf("unexpected warnings %s", warnings)
-		}
-
-		if len(res) == 0 {
-			return errors.Errorf("got empty result")
-		}
-
-		expectedRes = res
-		return nil
-	}))
-
-	rangeQuery(t, ctx, q2.Endpoint("http"), testQuery, timestamp.FromTime(now.Add(time.Duration(-7*24)*time.Hour)),
-		timestamp.FromTime(now),
-		2419, // Taken from UI.
-		promclient.QueryOptions{
-			Deduplicate: true,
-		}, func(res model.Matrix) error {
-			if !reflect.DeepEqual(res, expectedRes) {
-				return fmt.Errorf("unexpected results (got %v but expected %v)", res, expectedRes)
-			}
-			return nil
-		})
-}
-
 func TestGrpcInstantQuery(t *testing.T) {
 	t.Parallel()