Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mimir query engine: add support for scalar constants and pi() #8995

Merged
merged 8 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* [CHANGE] Ingester client: experimental support for client-side circuit breakers, their configuration options (`-ingester.client.circuit-breaker.*`) and metrics (`cortex_ingester_client_circuit_breaker_results_total`, `cortex_ingester_client_circuit_breaker_transitions_total`) were removed. #8802
* [CHANGE] Ingester: circuit breakers do not open in case of per-instance limit errors anymore. Opening can be triggered only in case of push and pull requests exceeding the configured duration. #8854
* [CHANGE] Query-frontend: Return `413 Request Entity Too Large` if a response shard for an `/active_series` request is too large. #8861
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8660 #8671 #8677 #8747 #8850 #8872 #8838 #8911 #8909 #8923 #8924 #8925 #8932 #8933 #8934 #8962 #8986 #8993 #8995
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 #8778 #8808 #8809 #8841 #8842 #8845 #8853 #8886
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -2032,6 +2032,17 @@
"fieldFlag": "querier.mimir-query-engine.enable-offset-modifier",
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "enable_scalars",
"required": false,
"desc": "Enable support for scalars in Mimir's query engine. Only applies if the Mimir query engine is in use.",
"fieldValue": null,
"fieldDefaultValue": true,
"fieldFlag": "querier.mimir-query-engine.enable-scalars",
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1905,6 +1905,8 @@ Usage of ./cmd/mimir/mimir:
[experimental] Enable support for offset modifier in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true)
-querier.mimir-query-engine.enable-over-time-functions
[experimental] Enable support for ..._over_time functions in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true)
-querier.mimir-query-engine.enable-scalars
[experimental] Enable support for scalars in Mimir's query engine. Only applies if the Mimir query engine is in use. (default true)
-querier.minimize-ingester-requests
If true, when querying ingesters, only the minimum required ingesters required to reach quorum will be queried initially, with other ingesters queried only if needed due to failures from the initial set of ingesters. Enabling this option reduces resource consumption for the happy path at the cost of increased latency for the unhappy path. (default true)
-querier.minimize-ingester-requests-hedging-delay duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1530,6 +1530,11 @@ mimir_query_engine:
# Only applies if the Mimir query engine is in use.
# CLI flag: -querier.mimir-query-engine.enable-offset-modifier
[enable_offset_modifier: <boolean> | default = true]

# (experimental) Enable support for scalars in Mimir's query engine. Only
# applies if the Mimir query engine is in use.
# CLI flag: -querier.mimir-query-engine.enable-scalars
[enable_scalars: <boolean> | default = true]
```

### frontend
Expand Down
3 changes: 3 additions & 0 deletions pkg/streamingpromql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type FeatureToggles struct {
EnableBinaryOperations bool `yaml:"enable_binary_operations" category:"experimental"`
EnableOverTimeFunctions bool `yaml:"enable_over_time_functions" category:"experimental"`
EnableOffsetModifier bool `yaml:"enable_offset_modifier" category:"experimental"`
EnableScalars bool `yaml:"enable_scalars" category:"experimental"`
}

var overTimeFunctionNames = []string{
Expand All @@ -35,10 +36,12 @@ var EnableAllFeatures = FeatureToggles{
true,
true,
true,
true,
}

func (t *FeatureToggles) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&t.EnableBinaryOperations, "querier.mimir-query-engine.enable-binary-operations", true, "Enable support for binary operations in Mimir's query engine. Only applies if the Mimir query engine is in use.")
f.BoolVar(&t.EnableOverTimeFunctions, "querier.mimir-query-engine.enable-over-time-functions", true, "Enable support for ..._over_time functions in Mimir's query engine. Only applies if the Mimir query engine is in use.")
f.BoolVar(&t.EnableOffsetModifier, "querier.mimir-query-engine.enable-offset-modifier", true, "Enable support for offset modifier in Mimir's query engine. Only applies if the Mimir query engine is in use.")
f.BoolVar(&t.EnableScalars, "querier.mimir-query-engine.enable-scalars", true, "Enable support for scalars in Mimir's query engine. Only applies if the Mimir query engine is in use.")
}
21 changes: 14 additions & 7 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,17 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
// The goal of this is not to list every conceivable expression that is unsupported, but to cover all the
// different cases and make sure we produce a reasonable error message when these cases are encountered.
unsupportedExpressions := map[string]string{
"1 + 2": "scalar value as top-level expression",
"1 + metric{}": "binary expression with scalars",
"metric{} + 1": "binary expression with scalars",
"1 + 2": "binary expression between two scalars",
"1 + metric{}": "binary expression between scalar and instant vector",
"metric{} + 1": "binary expression between scalar and instant vector",
"metric{} < other_metric{}": "binary expression with '<'",
"metric{} or other_metric{}": "binary expression with many-to-many matching",
"metric{} + on() group_left() other_metric{}": "binary expression with many-to-one matching",
"metric{} + on() group_right() other_metric{}": "binary expression with one-to-many matching",
"1": "scalar value as top-level expression",
"avg(metric{})": "'avg' aggregation",
"rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr",
"rate(metric{}[5m:1m])": "PromQL expression type *parser.SubqueryExpr for range vectors",
"quantile_over_time(0.4, metric{}[5m])": "'quantile_over_time' function",
"-sum(metric{})": "PromQL expression type *parser.UnaryExpr",
"-sum(metric{})": "PromQL expression type *parser.UnaryExpr for instant vectors",
}

for expression, expectedError := range unsupportedExpressions {
Expand All @@ -65,7 +64,7 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
// These expressions are also unsupported, but are only valid as instant queries.
unsupportedInstantQueryExpressions := map[string]string{
"'a'": "string value as top-level expression",
"metric{}[5m:1m]": "PromQL expression type *parser.SubqueryExpr",
"metric{}[5m:1m]": "PromQL expression type *parser.SubqueryExpr for range vectors",
}

for expression, expectedError := range unsupportedInstantQueryExpressions {
Expand Down Expand Up @@ -103,6 +102,14 @@ func TestUnsupportedPromQLFeaturesWithFeatureToggles(t *testing.T) {
requireRangeQueryIsUnsupported(t, featureToggles, "rate(metric[2m] offset 1m)", "range vector selector with 'offset'")
requireInstantQueryIsUnsupported(t, featureToggles, "rate(metric[2m] offset 1m)", "range vector selector with 'offset'")
})

t.Run("scalars", func(t *testing.T) {
featureToggles := EnableAllFeatures
featureToggles.EnableScalars = false

requireRangeQueryIsUnsupported(t, featureToggles, "2", "scalar values")
requireInstantQueryIsUnsupported(t, featureToggles, "2", "scalar values")
})
}

func requireRangeQueryIsUnsupported(t *testing.T, featureToggles FeatureToggles, expression string, expectedError string) {
Expand Down
40 changes: 39 additions & 1 deletion pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package streamingpromql

import (
"fmt"
"math"

"github.com/prometheus/prometheus/promql/parser/posrange"
"github.com/prometheus/prometheus/util/annotations"
Expand All @@ -14,7 +15,20 @@ import (
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

type InstantVectorFunctionOperatorFactory func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange) (types.InstantVectorOperator, error)
type InstantVectorFunctionOperatorFactory func(
args []types.Operator,
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
annotations *annotations.Annotations,
expressionPosition posrange.PositionRange,
) (types.InstantVectorOperator, error)

type ScalarFunctionOperatorFactory func(
args []types.Operator,
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
annotations *annotations.Annotations,
expressionPosition posrange.PositionRange,
start, end, interval int64,
) (types.ScalarOperator, error)

// SingleInputVectorFunctionOperatorFactory creates an InstantVectorFunctionOperatorFactory for functions
// that have exactly 1 argument (v instant-vector).
Expand Down Expand Up @@ -135,3 +149,27 @@ func RegisterInstantVectorFunctionOperatorFactory(functionName string, factory I
instantVectorFunctionOperatorFactories[functionName] = factory
return nil
}

// These functions return a scalar.
var scalarFunctionOperatorFactories = map[string]ScalarFunctionOperatorFactory{
// Please keep this list sorted alphabetically.
"pi": piOperatorFactory,
}

func RegisterScalarFunctionOperatorFactory(functionName string, factory ScalarFunctionOperatorFactory) error {
if _, exists := scalarFunctionOperatorFactories[functionName]; exists {
return fmt.Errorf("function '%s' has already been registered", functionName)
}

scalarFunctionOperatorFactories[functionName] = factory
return nil
}

func piOperatorFactory(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, start, end, interval int64) (types.ScalarOperator, error) {
if len(args) != 0 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 0 arguments for pi, got %v", len(args))
}

return operators.NewScalarConstant(math.Pi, start, end, interval, memoryConsumptionTracker, expressionPosition), nil
}
20 changes: 20 additions & 0 deletions pkg/streamingpromql/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,23 @@ func TestRegisterInstantVectorFunctionOperatorFactory(t *testing.T) {
// Cleanup changes to instantVectorFunctionOperatorFactories
delete(instantVectorFunctionOperatorFactories, "new_function")
}

func TestRegisterScalarFunctionOperatorFactory(t *testing.T) {
// Register an already existing function
err := RegisterScalarFunctionOperatorFactory("pi", piOperatorFactory)
require.Error(t, err)
require.Equal(t, "function 'pi' has already been registered", err.Error())

// Register a new function
err = RegisterScalarFunctionOperatorFactory("new_function", piOperatorFactory)
require.NoError(t, err)
require.Contains(t, scalarFunctionOperatorFactories, "new_function")

// Register existing function we registered previously
err = RegisterScalarFunctionOperatorFactory("new_function", piOperatorFactory)
require.Error(t, err)
require.Equal(t, "function 'new_function' has already been registered", err.Error())

// Cleanup changes to instantVectorFunctionOperatorFactories
delete(scalarFunctionOperatorFactories, "new_function")
}
23 changes: 9 additions & 14 deletions pkg/streamingpromql/operators/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,14 @@ import (
"context"
"errors"
"fmt"
"slices"
"sort"
"time"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser/posrange"
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/zeropool"
"slices"
"sort"

"github.com/grafana/mimir/pkg/streamingpromql/floats"
"github.com/grafana/mimir/pkg/streamingpromql/functions"
Expand Down Expand Up @@ -53,17 +50,15 @@ type Aggregation struct {

func NewAggregation(
inner types.InstantVectorOperator,
start time.Time,
end time.Time,
interval time.Duration,
startT int64,
endT int64,
intervalMs int64,
grouping []string,
without bool,
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
annotations *annotations.Annotations,
expressionPosition posrange.PositionRange,
) *Aggregation {
s, e, i := timestamp.FromTime(start), timestamp.FromTime(end), interval.Milliseconds()

if without {
labelsToDrop := make([]string, 0, len(grouping)+1)
labelsToDrop = append(labelsToDrop, labels.MetricName)
Expand All @@ -75,10 +70,10 @@ func NewAggregation(

a := &Aggregation{
Inner: inner,
Start: s,
End: e,
Interval: i,
Steps: stepCount(s, e, i),
Start: startT,
End: endT,
Interval: intervalMs,
Steps: stepCount(startT, endT, intervalMs),
Grouping: grouping,
Without: without,
MemoryConsumptionTracker: memoryConsumptionTracker,
Expand Down
68 changes: 68 additions & 0 deletions pkg/streamingpromql/operators/scalar_constant.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// SPDX-License-Identifier: AGPL-3.0-only

package operators

import (
"context"

"github.com/prometheus/prometheus/promql/parser/posrange"

"github.com/grafana/mimir/pkg/streamingpromql/limiting"
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

type ScalarConstant struct {
Value float64
Start int64 // Milliseconds since Unix epoch
End int64 // Milliseconds since Unix epoch
Interval int64 // In milliseconds
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker

expressionPosition posrange.PositionRange
}

var _ types.ScalarOperator = &ScalarConstant{}

func NewScalarConstant(
value float64,
start int64,
end int64,
interval int64,
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
expressionPosition posrange.PositionRange,
) *ScalarConstant {
return &ScalarConstant{
Value: value,
Start: start,
End: end,
Interval: interval,
MemoryConsumptionTracker: memoryConsumptionTracker,
expressionPosition: expressionPosition,
}
}

func (s *ScalarConstant) GetValues(_ context.Context) (types.ScalarData, error) {
numSteps := stepCount(s.Start, s.End, s.Interval)
samples, err := types.FPointSlicePool.Get(numSteps, s.MemoryConsumptionTracker)

if err != nil {
return types.ScalarData{}, err
}

samples = samples[:numSteps]

for step := 0; step < numSteps; step++ {
samples[step].T = s.Start + int64(step)*s.Interval
samples[step].F = s.Value
}

return types.ScalarData{Samples: samples}, nil
jhesketh marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *ScalarConstant) ExpressionPosition() posrange.PositionRange {
return s.expressionPosition
}

func (s *ScalarConstant) Close() {
// Nothing to do.
}
Loading