diff --git a/CHANGELOG.md b/CHANGELOG.md index 91024aa5833..1c0cb7840e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ * `cortex_alertmanager_alerts` * `cortex_alertmanager_silences` * [CHANGE] Cache: Deprecate experimental support for Redis as a cache backend. #9453 -* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 +* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9533 * [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028 * [FEATURE] gRPC: Support S2 compression. #9322 * `-alertmanager.alertmanager-client.grpc-compression=s2` diff --git a/pkg/streamingpromql/benchmarks/benchmarks.go b/pkg/streamingpromql/benchmarks/benchmarks.go index 270e004bece..1ad4a03780c 100644 --- a/pkg/streamingpromql/benchmarks/benchmarks.go +++ b/pkg/streamingpromql/benchmarks/benchmarks.go @@ -173,9 +173,9 @@ func TestCases(metricSizes []int) []BenchCase { //{ // Expr: "a_X or b_X{l=~'.*[0-4]$'}", //}, - //{ - // Expr: "a_X unless b_X{l=~'.*[0-4]$'}", - //}, + { + Expr: "a_X unless b_X{l=~'.*[0-4]$'}", + }, { Expr: "a_X and b_X{l='notfound'}", }, diff --git a/pkg/streamingpromql/engine_test.go b/pkg/streamingpromql/engine_test.go index 7ff5acf99a1..1e5c752fc29 100644 --- a/pkg/streamingpromql/engine_test.go +++ b/pkg/streamingpromql/engine_test.go @@ -43,7 +43,6 @@ func TestUnsupportedPromQLFeatures(t *testing.T) { // different cases and make sure we produce a reasonable error message when these cases are encountered. unsupportedExpressions := map[string]string{ "metric{} or other_metric{}": "binary expression with 'or'", - "metric{} unless other_metric{}": "binary expression with 'unless'", "metric{} + on() group_left() other_metric{}": "binary expression with many-to-one matching", "metric{} + on() group_right() other_metric{}": "binary expression with one-to-many matching", "topk(5, metric{})": "'topk' aggregation with parameter", @@ -1903,7 +1902,7 @@ func TestCompareVariousMixedMetricsBinaryOperations(t *testing.T) { expressions := []string{} for _, labels := range labelCombinations { - for _, op := range []string{"+", "-", "*", "/", "and"} { + for _, op := range []string{"+", "-", "*", "/", "and", "unless"} { binaryExpr := fmt.Sprintf(`series{label="%s"}`, labels[0]) for _, label := range labels[1:] { binaryExpr += fmt.Sprintf(` %s series{label="%s"}`, op, label) diff --git a/pkg/streamingpromql/operators/and_binary_operation.go b/pkg/streamingpromql/operators/and_unless_binary_operation.go similarity index 77% rename from pkg/streamingpromql/operators/and_binary_operation.go rename to pkg/streamingpromql/operators/and_unless_binary_operation.go index 5d16a396a90..404d3bdc126 100644 --- a/pkg/streamingpromql/operators/and_binary_operation.go +++ b/pkg/streamingpromql/operators/and_unless_binary_operation.go @@ -12,12 +12,13 @@ import ( "github.com/grafana/mimir/pkg/streamingpromql/types" ) -// AndBinaryOperation represents a logical 'and' between two vectors. -type AndBinaryOperation struct { +// AndUnlessBinaryOperation represents a logical 'and' or 'unless' between two vectors. +type AndUnlessBinaryOperation struct { Left types.InstantVectorOperator Right types.InstantVectorOperator VectorMatching parser.VectorMatching MemoryConsumptionTracker *limiting.MemoryConsumptionTracker + IsUnless bool // If true, this operator represents an 'unless', if false, this operator represents an 'and' timeRange types.QueryTimeRange expressionPosition posrange.PositionRange @@ -26,27 +27,29 @@ type AndBinaryOperation struct { nextRightSeriesIndex int } -var _ types.InstantVectorOperator = &AndBinaryOperation{} +var _ types.InstantVectorOperator = &AndUnlessBinaryOperation{} -func NewAndBinaryOperation( +func NewAndUnlessBinaryOperation( left types.InstantVectorOperator, right types.InstantVectorOperator, vectorMatching parser.VectorMatching, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, + isUnless bool, timeRange types.QueryTimeRange, expressionPosition posrange.PositionRange, -) *AndBinaryOperation { - return &AndBinaryOperation{ +) *AndUnlessBinaryOperation { + return &AndUnlessBinaryOperation{ Left: left, Right: right, VectorMatching: vectorMatching, MemoryConsumptionTracker: memoryConsumptionTracker, + IsUnless: isUnless, timeRange: timeRange, expressionPosition: expressionPosition, } } -func (a *AndBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { +func (a *AndUnlessBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) { leftMetadata, err := a.Left.SeriesMetadata(ctx) if err != nil { return nil, err @@ -65,7 +68,7 @@ func (a *AndBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.Series defer types.PutSeriesMetadataSlice(rightMetadata) - if len(rightMetadata) == 0 { + if len(rightMetadata) == 0 && !a.IsUnless { // We can't produce any series, we are done. return nil, nil } @@ -104,6 +107,14 @@ func (a *AndBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.Series a.rightSeriesGroups = append(a.rightSeriesGroups, group) } + if a.IsUnless { + return a.computeUnlessSeriesMetadata(leftMetadata), nil + } + + return a.computeAndSeriesMetadata(leftMetadata), nil +} + +func (a *AndUnlessBinaryOperation) computeAndSeriesMetadata(leftMetadata []types.SeriesMetadata) []types.SeriesMetadata { // Iterate through the left-hand series again, and build the list of output series based on those that matched at least one series on the right. // It's safe to reuse the left metadata slice as we'll return series in the same order, and only ever return fewer series than the left operator produces. nextOutputSeriesIndex := 0 @@ -119,10 +130,22 @@ func (a *AndBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.Series } } - return leftMetadata[:nextOutputSeriesIndex], nil + return leftMetadata[:nextOutputSeriesIndex] +} + +func (a *AndUnlessBinaryOperation) computeUnlessSeriesMetadata(leftMetadata []types.SeriesMetadata) []types.SeriesMetadata { + // Iterate through the left-hand series again, and remove references to any groups that don't match any series from the right side: + // we can just return the left-hand series as-is if it does not match anything from the right side. + for seriesIdx, group := range a.leftSeriesGroups { + if group.lastRightSeriesIndex == -1 { + a.leftSeriesGroups[seriesIdx] = nil + } + } + + return leftMetadata } -func (a *AndBinaryOperation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) { +func (a *AndUnlessBinaryOperation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) { for { if len(a.leftSeriesGroups) == 0 { // No more series to return. @@ -134,12 +157,17 @@ func (a *AndBinaryOperation) NextSeries(ctx context.Context) (types.InstantVecto if thisSeriesGroup == nil { // This series from the left side has no matching series on the right side. - // Read it, discard it, and move on to the next series. d, err := a.Left.NextSeries(ctx) if err != nil { return types.InstantVectorSeriesData{}, err } + if a.IsUnless { + // If this is an 'unless' operation, we should return the series as-is, as this series can't be filtered by anything on the right. + return d, nil + } + + // If this is an 'and' operation, we should discard it and move on to the next series, as this series can't contribute to the result. types.PutInstantVectorSeriesData(d, a.MemoryConsumptionTracker) continue } @@ -156,7 +184,7 @@ func (a *AndBinaryOperation) NextSeries(ctx context.Context) (types.InstantVecto return types.InstantVectorSeriesData{}, err } - filteredData, err := thisSeriesGroup.FilterLeftSeries(originalData, a.MemoryConsumptionTracker, a.timeRange) + filteredData, err := thisSeriesGroup.FilterLeftSeries(originalData, a.MemoryConsumptionTracker, a.timeRange, a.IsUnless) if err != nil { return types.InstantVectorSeriesData{}, err } @@ -173,7 +201,7 @@ func (a *AndBinaryOperation) NextSeries(ctx context.Context) (types.InstantVecto } // readRightSideUntilGroupComplete reads series from the right-hand side until all series for desiredGroup have been read. -func (a *AndBinaryOperation) readRightSideUntilGroupComplete(ctx context.Context, desiredGroup *andGroup) error { +func (a *AndUnlessBinaryOperation) readRightSideUntilGroupComplete(ctx context.Context, desiredGroup *andGroup) error { for a.nextRightSeriesIndex <= desiredGroup.lastRightSeriesIndex { groupForRightSeries := a.rightSeriesGroups[0] a.rightSeriesGroups = a.rightSeriesGroups[1:] @@ -196,11 +224,11 @@ func (a *AndBinaryOperation) readRightSideUntilGroupComplete(ctx context.Context return nil } -func (a *AndBinaryOperation) ExpressionPosition() posrange.PositionRange { +func (a *AndUnlessBinaryOperation) ExpressionPosition() posrange.PositionRange { return a.expressionPosition } -func (a *AndBinaryOperation) Close() { +func (a *AndUnlessBinaryOperation) Close() { a.Left.Close() a.Right.Close() } @@ -237,12 +265,12 @@ func (g *andGroup) AccumulateRightSeriesPresence(data types.InstantVectorSeriesD // FilterLeftSeries returns leftData filtered based on samples seen for the right-hand side. // The return value reuses the slices from leftData, and returns any unused slices to the pool. -func (g *andGroup) FilterLeftSeries(leftData types.InstantVectorSeriesData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, timeRange types.QueryTimeRange) (types.InstantVectorSeriesData, error) { +func (g *andGroup) FilterLeftSeries(leftData types.InstantVectorSeriesData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, timeRange types.QueryTimeRange, isUnless bool) (types.InstantVectorSeriesData, error) { filteredData := types.InstantVectorSeriesData{} nextOutputFloatIndex := 0 for _, p := range leftData.Floats { - if !g.rightSamplePresence[timeRange.PointIndex(p.T)] { + if g.rightSamplePresence[timeRange.PointIndex(p.T)] == isUnless { continue } @@ -261,7 +289,7 @@ func (g *andGroup) FilterLeftSeries(leftData types.InstantVectorSeriesData, memo nextOutputHistogramIndex := 0 for idx, p := range leftData.Histograms { - if !g.rightSamplePresence[timeRange.PointIndex(p.T)] { + if g.rightSamplePresence[timeRange.PointIndex(p.T)] == isUnless { continue } diff --git a/pkg/streamingpromql/query.go b/pkg/streamingpromql/query.go index c4d14273bb6..98693db23dc 100644 --- a/pkg/streamingpromql/query.go +++ b/pkg/streamingpromql/query.go @@ -237,8 +237,8 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV } switch e.Op { - case parser.LAND: - return operators.NewAndBinaryOperation(lhs, rhs, *e.VectorMatching, q.memoryConsumptionTracker, q.timeRange, e.PositionRange()), nil + case parser.LAND, parser.LUNLESS: + return operators.NewAndUnlessBinaryOperation(lhs, rhs, *e.VectorMatching, q.memoryConsumptionTracker, e.Op == parser.LUNLESS, q.timeRange, e.PositionRange()), nil default: return operators.NewVectorVectorBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, e.ReturnBool, q.memoryConsumptionTracker, q.annotations, e.PositionRange()) } diff --git a/pkg/streamingpromql/testdata/ours/binary_operators.test b/pkg/streamingpromql/testdata/ours/binary_operators.test index 1427313c631..044d2a0e093 100644 --- a/pkg/streamingpromql/testdata/ours/binary_operators.test +++ b/pkg/streamingpromql/testdata/ours/binary_operators.test @@ -766,7 +766,7 @@ clear # Single matching series on each side. load 6m left_side{env="test", pod="a"} 1 2 3 4 {{count:5 sum:5}} - left_side{env="test", pod="b"} 6 7 8 _ 10 + left_side{env="test", pod="b"} 6 7 8 _ {{count:10 sum:10}} left_side{env="prod", pod="a"} 11 12 13 14 15 left_side{env="prod", pod="b"} 16 17 18 _ _ right_side{env="test", pod="a"} 0 0 0 {{count:0 sum:0}} {{count:0 sum:0}} @@ -774,6 +774,21 @@ load 6m right_side{env="prod", pod="b"} _ _ _ 0 0 right_side{env="foo", pod="a"} 0 0 0 0 0 +eval range from 0 to 24m step 6m left_side and does_not_match + # Should return no results. + +eval range from 0 to 24m step 6m does_not_match and right_side + # Should return no results. + +eval range from 0 to 24m step 6m left_side unless does_not_match + left_side{env="test", pod="a"} 1 2 3 4 {{count:5 sum:5}} + left_side{env="test", pod="b"} 6 7 8 _ {{count:10 sum:10}} + left_side{env="prod", pod="a"} 11 12 13 14 15 + left_side{env="prod", pod="b"} 16 17 18 _ _ + +eval range from 0 to 24m step 6m does_not_match unless right_side + # Should return no results. + # {env="test", pod="a"}: Matching series, all samples align # {env="test", pod="b"}: Matching series, only some samples align # {env="prod", pod="a"}: No matching series on RHS @@ -783,6 +798,11 @@ eval range from 0 to 24m step 6m left_side and right_side left_side{env="test", pod="a"} 1 2 3 4 {{count:5 sum:5}} left_side{env="test", pod="b"} _ 7 _ _ _ +eval range from 0 to 24m step 6m left_side unless right_side + left_side{env="test", pod="b"} 6 _ 8 _ {{count:10 sum:10}} + left_side{env="prod", pod="a"} 11 12 13 14 15 + left_side{env="prod", pod="b"} 16 17 18 _ _ + clear # Multiple matching series on each side. @@ -800,32 +820,67 @@ load 6m eval range from 0 to 24m step 6m left_side and right_side # Should return no results. +eval range from 0 to 24m step 6m left_side unless right_side + left_side{env="test", cluster="blah", pod="a"} 1 2 3 4 + left_side{env="test", cluster="blah", pod="b"} _ 6 7 8 + left_side{env="prod", cluster="blah", pod="a"} 9 10 11 12 + left_side{env="prod", cluster="blah", pod="b"} 13 14 15 16 + left_side{env="test", cluster="food", pod="a"} 17 18 19 20 + eval range from 0 to 24m step 6m left_side and on(cluster, env) right_side - left_side{env="test", cluster="blah", pod="a"} 1 2 3 _ - left_side{env="test", cluster="blah", pod="b"} _ 6 7 _ - left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12 - left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16 - left_side{env="test", cluster="food", pod="a"} _ _ _ 20 + left_side{env="test", cluster="blah", pod="a"} 1 2 3 _ + left_side{env="test", cluster="blah", pod="b"} _ 6 7 _ + left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12 + left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16 + left_side{env="test", cluster="food", pod="a"} _ _ _ 20 + +eval range from 0 to 24m step 6m left_side unless on(cluster, env) right_side + left_side{env="test", cluster="blah", pod="a"} _ _ _ 4 + left_side{env="test", cluster="blah", pod="b"} _ _ _ 8 + left_side{env="prod", cluster="blah", pod="a"} 9 _ 11 _ + left_side{env="prod", cluster="blah", pod="b"} 13 _ 15 _ + left_side{env="test", cluster="food", pod="a"} 17 18 19 _ # Same thing again, with labels in different order. eval range from 0 to 24m step 6m left_side and on(env, cluster) right_side - left_side{env="test", cluster="blah", pod="a"} 1 2 3 _ - left_side{env="test", cluster="blah", pod="b"} _ 6 7 _ - left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12 - left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16 - left_side{env="test", cluster="food", pod="a"} _ _ _ 20 + left_side{env="test", cluster="blah", pod="a"} 1 2 3 _ + left_side{env="test", cluster="blah", pod="b"} _ 6 7 _ + left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12 + left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16 + left_side{env="test", cluster="food", pod="a"} _ _ _ 20 + +eval range from 0 to 24m step 6m left_side unless on(env, cluster) right_side + left_side{env="test", cluster="blah", pod="a"} _ _ _ 4 + left_side{env="test", cluster="blah", pod="b"} _ _ _ 8 + left_side{env="prod", cluster="blah", pod="a"} 9 _ 11 _ + left_side{env="prod", cluster="blah", pod="b"} 13 _ 15 _ + left_side{env="test", cluster="food", pod="a"} 17 18 19 _ eval range from 0 to 24m step 6m left_side and ignoring(idx, pod) right_side - left_side{env="test", cluster="blah", pod="a"} 1 2 3 _ - left_side{env="test", cluster="blah", pod="b"} _ 6 7 _ - left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12 - left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16 - left_side{env="test", cluster="food", pod="a"} _ _ _ 20 + left_side{env="test", cluster="blah", pod="a"} 1 2 3 _ + left_side{env="test", cluster="blah", pod="b"} _ 6 7 _ + left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12 + left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16 + left_side{env="test", cluster="food", pod="a"} _ _ _ 20 + +eval range from 0 to 24m step 6m left_side unless ignoring(idx, pod) right_side + left_side{env="test", cluster="blah", pod="a"} _ _ _ 4 + left_side{env="test", cluster="blah", pod="b"} _ _ _ 8 + left_side{env="prod", cluster="blah", pod="a"} 9 _ 11 _ + left_side{env="prod", cluster="blah", pod="b"} 13 _ 15 _ + left_side{env="test", cluster="food", pod="a"} 17 18 19 _ # Same thing again, with labels in different order. eval range from 0 to 24m step 6m left_side and ignoring(pod, idx) right_side - left_side{env="test", cluster="blah", pod="a"} 1 2 3 _ - left_side{env="test", cluster="blah", pod="b"} _ 6 7 _ - left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12 - left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16 - left_side{env="test", cluster="food", pod="a"} _ _ _ 20 + left_side{env="test", cluster="blah", pod="a"} 1 2 3 _ + left_side{env="test", cluster="blah", pod="b"} _ 6 7 _ + left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12 + left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16 + left_side{env="test", cluster="food", pod="a"} _ _ _ 20 + +eval range from 0 to 24m step 6m left_side unless ignoring(pod, idx) right_side + left_side{env="test", cluster="blah", pod="a"} _ _ _ 4 + left_side{env="test", cluster="blah", pod="b"} _ _ _ 8 + left_side{env="prod", cluster="blah", pod="a"} 9 _ 11 _ + left_side{env="prod", cluster="blah", pod="b"} 13 _ 15 _ + left_side{env="test", cluster="food", pod="a"} 17 18 19 _ diff --git a/pkg/streamingpromql/testdata/upstream/operators.test b/pkg/streamingpromql/testdata/upstream/operators.test index 035804cf103..b9e61d17d9d 100644 --- a/pkg/streamingpromql/testdata/upstream/operators.test +++ b/pkg/streamingpromql/testdata/upstream/operators.test @@ -210,18 +210,15 @@ eval instant at 50m (http_requests{group="canary"} + 1) and ignoring(group, job) # vector_matching_a{l="x"} 10 # vector_matching_a{l="y"} 20 -# Unsupported by streaming engine. -# eval instant at 50m http_requests{group="canary"} unless http_requests{instance="0"} -# http_requests{group="canary", instance="1", job="api-server"} 400 -# http_requests{group="canary", instance="1", job="app-server"} 800 +eval instant at 50m http_requests{group="canary"} unless http_requests{instance="0"} + http_requests{group="canary", instance="1", job="api-server"} 400 + http_requests{group="canary", instance="1", job="app-server"} 800 -# Unsupported by streaming engine. -# eval instant at 50m http_requests{group="canary"} unless on(job) http_requests{instance="0"} +eval instant at 50m http_requests{group="canary"} unless on(job) http_requests{instance="0"} -# Unsupported by streaming engine. -# eval instant at 50m http_requests{group="canary"} unless on(job, instance) http_requests{instance="0"} -# http_requests{group="canary", instance="1", job="api-server"} 400 -# http_requests{group="canary", instance="1", job="app-server"} 800 +eval instant at 50m http_requests{group="canary"} unless on(job, instance) http_requests{instance="0"} + http_requests{group="canary", instance="1", job="api-server"} 400 + http_requests{group="canary", instance="1", job="app-server"} 800 eval instant at 50m http_requests{group="canary"} / on(instance,job) http_requests{group="production"} {instance="0", job="api-server"} 3 @@ -229,13 +226,11 @@ eval instant at 50m http_requests{group="canary"} / on(instance,job) http_reques {instance="1", job="api-server"} 2 {instance="1", job="app-server"} 1.3333333333333333 -# Unsupported by streaming engine. -# eval instant at 50m http_requests{group="canary"} unless ignoring(group, instance) http_requests{instance="0"} +eval instant at 50m http_requests{group="canary"} unless ignoring(group, instance) http_requests{instance="0"} -# Unsupported by streaming engine. -# eval instant at 50m http_requests{group="canary"} unless ignoring(group) http_requests{instance="0"} -# http_requests{group="canary", instance="1", job="api-server"} 400 -# http_requests{group="canary", instance="1", job="app-server"} 800 +eval instant at 50m http_requests{group="canary"} unless ignoring(group) http_requests{instance="0"} + http_requests{group="canary", instance="1", job="api-server"} 400 + http_requests{group="canary", instance="1", job="app-server"} 800 eval instant at 50m http_requests{group="canary"} / ignoring(group) http_requests{group="production"} {instance="0", job="api-server"} 3 diff --git a/pkg/streamingpromql/testutils/utils.go b/pkg/streamingpromql/testutils/utils.go index 775fe9cdce9..922f094ba96 100644 --- a/pkg/streamingpromql/testutils/utils.go +++ b/pkg/streamingpromql/testutils/utils.go @@ -51,7 +51,7 @@ func RequireEqualResults(t testing.TB, expr string, expected, actual *promql.Res actualMatrix, err := actual.Matrix() require.NoError(t, err) - require.Len(t, actualMatrix, len(expectedMatrix)) + require.Lenf(t, actualMatrix, len(expectedMatrix), "expected result %v", expectedMatrix) for i, expectedSeries := range expectedMatrix { actualSeries := actualMatrix[i]