MQE: add support for unless (#9533)
* Enable upstream test cases

* Add test cases

* Update existing tests

* Enable benchmark

* Initial implementation

* Add changelog entry
charleskorn authored Oct 7, 2024
1 parent da1051f commit e69a4c5
Showing 8 changed files with 141 additions and 64 deletions.
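For context, `left unless right` in PromQL keeps a left-hand sample only when no right-hand series with matching labels has a sample at the same timestamp, while `left and right` keeps it only when one does. The sketch below shows that rule in isolation; the `sample` type, `filterLeft` helper, and boolean presence map are illustrative stand-ins, not Mimir's types.

```go
package main

import "fmt"

// sample is a simplified (timestamp, value) pair; the real engine uses richer types.
type sample struct {
	ts    int64
	value float64
}

// filterLeft keeps or drops left-hand samples based on whether a matching
// right-hand sample exists at the same timestamp. One comparison covers both
// set operators: 'and' drops a sample when the right side is absent,
// 'unless' drops it when the right side is present.
func filterLeft(left []sample, rightPresent map[int64]bool, isUnless bool) []sample {
	out := make([]sample, 0, len(left))
	for _, s := range left {
		if rightPresent[s.ts] == isUnless {
			continue
		}
		out = append(out, s)
	}
	return out
}

func main() {
	left := []sample{{0, 1}, {60, 2}, {120, 3}}
	rightPresent := map[int64]bool{60: true}

	fmt.Println(filterLeft(left, rightPresent, false)) // and:    [{60 2}]
	fmt.Println(filterLeft(left, rightPresent, true))  // unless: [{0 1} {120 3}]
}
```

The same flipped comparison (`rightSamplePresence[...] == isUnless`) is what the diff below introduces in `FilterLeftSeries`.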
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -14,7 +14,7 @@
* `cortex_alertmanager_alerts`
* `cortex_alertmanager_silences`
* [CHANGE] Cache: Deprecate experimental support for Redis as a cache backend. #9453
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9533
* [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028
* [FEATURE] gRPC: Support S2 compression. #9322
* `-alertmanager.alertmanager-client.grpc-compression=s2`
6 changes: 3 additions & 3 deletions pkg/streamingpromql/benchmarks/benchmarks.go
@@ -173,9 +173,9 @@ func TestCases(metricSizes []int) []BenchCase {
//{
// Expr: "a_X or b_X{l=~'.*[0-4]$'}",
//},
//{
// Expr: "a_X unless b_X{l=~'.*[0-4]$'}",
//},
{
Expr: "a_X unless b_X{l=~'.*[0-4]$'}",
},
{
Expr: "a_X and b_X{l='notfound'}",
},
3 changes: 1 addition & 2 deletions pkg/streamingpromql/engine_test.go
@@ -43,7 +43,6 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
// different cases and make sure we produce a reasonable error message when these cases are encountered.
unsupportedExpressions := map[string]string{
"metric{} or other_metric{}": "binary expression with 'or'",
"metric{} unless other_metric{}": "binary expression with 'unless'",
"metric{} + on() group_left() other_metric{}": "binary expression with many-to-one matching",
"metric{} + on() group_right() other_metric{}": "binary expression with one-to-many matching",
"topk(5, metric{})": "'topk' aggregation with parameter",
@@ -1903,7 +1902,7 @@ func TestCompareVariousMixedMetricsBinaryOperations(t *testing.T) {
expressions := []string{}

for _, labels := range labelCombinations {
for _, op := range []string{"+", "-", "*", "/", "and"} {
for _, op := range []string{"+", "-", "*", "/", "and", "unless"} {
binaryExpr := fmt.Sprintf(`series{label="%s"}`, labels[0])
for _, label := range labels[1:] {
binaryExpr += fmt.Sprintf(` %s series{label="%s"}`, op, label)
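The `engine_test.go` changes above do two things: `unless` is removed from the map of expressions expected to fail as unsupported, and it is added to the operators exercised by `TestCompareVariousMixedMetricsBinaryOperations`, which (judging by its name) checks that the streaming engine and Prometheus' engine agree on the results. A rough, self-contained reconstruction of that test's expression generator, with made-up label combinations:

```go
package main

import "fmt"

func main() {
	// Hypothetical label combinations; the real test derives these from its test data.
	labelCombinations := [][]string{{"a", "b"}, {"a", "b", "c"}}

	expressions := []string{}
	for _, labels := range labelCombinations {
		for _, op := range []string{"+", "-", "*", "/", "and", "unless"} {
			binaryExpr := fmt.Sprintf(`series{label="%s"}`, labels[0])
			for _, label := range labels[1:] {
				binaryExpr += fmt.Sprintf(` %s series{label="%s"}`, op, label)
			}
			expressions = append(expressions, binaryExpr)
		}
	}

	for _, expr := range expressions {
		fmt.Println(expr) // e.g. series{label="a"} unless series{label="b"} unless series{label="c"}
	}
}
```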
@@ -12,12 +12,13 @@ import (
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

// AndBinaryOperation represents a logical 'and' between two vectors.
type AndBinaryOperation struct {
// AndUnlessBinaryOperation represents a logical 'and' or 'unless' between two vectors.
type AndUnlessBinaryOperation struct {
Left types.InstantVectorOperator
Right types.InstantVectorOperator
VectorMatching parser.VectorMatching
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker
IsUnless bool // If true, this operator represents an 'unless', if false, this operator represents an 'and'

timeRange types.QueryTimeRange
expressionPosition posrange.PositionRange
@@ -26,27 +27,29 @@ type AndBinaryOperation struct {
nextRightSeriesIndex int
}

var _ types.InstantVectorOperator = &AndBinaryOperation{}
var _ types.InstantVectorOperator = &AndUnlessBinaryOperation{}

func NewAndBinaryOperation(
func NewAndUnlessBinaryOperation(
left types.InstantVectorOperator,
right types.InstantVectorOperator,
vectorMatching parser.VectorMatching,
memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
isUnless bool,
timeRange types.QueryTimeRange,
expressionPosition posrange.PositionRange,
) *AndBinaryOperation {
return &AndBinaryOperation{
) *AndUnlessBinaryOperation {
return &AndUnlessBinaryOperation{
Left: left,
Right: right,
VectorMatching: vectorMatching,
MemoryConsumptionTracker: memoryConsumptionTracker,
IsUnless: isUnless,
timeRange: timeRange,
expressionPosition: expressionPosition,
}
}

func (a *AndBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
func (a *AndUnlessBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
leftMetadata, err := a.Left.SeriesMetadata(ctx)
if err != nil {
return nil, err
@@ -65,7 +68,7 @@ func (a *AndBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.Series

defer types.PutSeriesMetadataSlice(rightMetadata)

if len(rightMetadata) == 0 {
if len(rightMetadata) == 0 && !a.IsUnless {
// We can't produce any series, we are done.
return nil, nil
}
@@ -104,6 +107,14 @@ func (a *AndBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.Series
a.rightSeriesGroups = append(a.rightSeriesGroups, group)
}

if a.IsUnless {
return a.computeUnlessSeriesMetadata(leftMetadata), nil
}

return a.computeAndSeriesMetadata(leftMetadata), nil
}

func (a *AndUnlessBinaryOperation) computeAndSeriesMetadata(leftMetadata []types.SeriesMetadata) []types.SeriesMetadata {
// Iterate through the left-hand series again, and build the list of output series based on those that matched at least one series on the right.
// It's safe to reuse the left metadata slice as we'll return series in the same order, and only ever return fewer series than the left operator produces.
nextOutputSeriesIndex := 0
@@ -119,10 +130,22 @@ func (a *AndBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.Series
}
}

return leftMetadata[:nextOutputSeriesIndex], nil
return leftMetadata[:nextOutputSeriesIndex]
}

func (a *AndUnlessBinaryOperation) computeUnlessSeriesMetadata(leftMetadata []types.SeriesMetadata) []types.SeriesMetadata {
// Iterate through the left-hand series again, and remove references to any groups that don't match any series from the right side:
// we can just return the left-hand series as-is if it does not match anything from the right side.
for seriesIdx, group := range a.leftSeriesGroups {
if group.lastRightSeriesIndex == -1 {
a.leftSeriesGroups[seriesIdx] = nil
}
}

return leftMetadata
}

func (a *AndBinaryOperation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
func (a *AndUnlessBinaryOperation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
for {
if len(a.leftSeriesGroups) == 0 {
// No more series to return.
@@ -134,12 +157,17 @@ func (a *AndBinaryOperation) NextSeries(ctx context.Context) (types.InstantVecto

if thisSeriesGroup == nil {
// This series from the left side has no matching series on the right side.
// Read it, discard it, and move on to the next series.
d, err := a.Left.NextSeries(ctx)
if err != nil {
return types.InstantVectorSeriesData{}, err
}

if a.IsUnless {
// If this is an 'unless' operation, we should return the series as-is, as this series can't be filtered by anything on the right.
return d, nil
}

// If this is an 'and' operation, we should discard it and move on to the next series, as this series can't contribute to the result.
types.PutInstantVectorSeriesData(d, a.MemoryConsumptionTracker)
continue
}
@@ -156,7 +184,7 @@ func (a *AndBinaryOperation) NextSeries(ctx context.Context) (types.InstantVecto
return types.InstantVectorSeriesData{}, err
}

filteredData, err := thisSeriesGroup.FilterLeftSeries(originalData, a.MemoryConsumptionTracker, a.timeRange)
filteredData, err := thisSeriesGroup.FilterLeftSeries(originalData, a.MemoryConsumptionTracker, a.timeRange, a.IsUnless)
if err != nil {
return types.InstantVectorSeriesData{}, err
}
@@ -173,7 +201,7 @@ func (a *AndBinaryOperation) NextSeries(ctx context.Context) (types.InstantVecto
}

// readRightSideUntilGroupComplete reads series from the right-hand side until all series for desiredGroup have been read.
func (a *AndBinaryOperation) readRightSideUntilGroupComplete(ctx context.Context, desiredGroup *andGroup) error {
func (a *AndUnlessBinaryOperation) readRightSideUntilGroupComplete(ctx context.Context, desiredGroup *andGroup) error {
for a.nextRightSeriesIndex <= desiredGroup.lastRightSeriesIndex {
groupForRightSeries := a.rightSeriesGroups[0]
a.rightSeriesGroups = a.rightSeriesGroups[1:]
@@ -196,11 +224,11 @@ func (a *AndBinaryOperation) readRightSideUntilGroupComplete(ctx context.Context
return nil
}

func (a *AndBinaryOperation) ExpressionPosition() posrange.PositionRange {
func (a *AndUnlessBinaryOperation) ExpressionPosition() posrange.PositionRange {
return a.expressionPosition
}

func (a *AndBinaryOperation) Close() {
func (a *AndUnlessBinaryOperation) Close() {
a.Left.Close()
a.Right.Close()
}
@@ -237,12 +265,12 @@ func (g *andGroup) AccumulateRightSeriesPresence(data types.InstantVectorSeriesD

// FilterLeftSeries returns leftData filtered based on samples seen for the right-hand side.
// The return value reuses the slices from leftData, and returns any unused slices to the pool.
func (g *andGroup) FilterLeftSeries(leftData types.InstantVectorSeriesData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, timeRange types.QueryTimeRange) (types.InstantVectorSeriesData, error) {
func (g *andGroup) FilterLeftSeries(leftData types.InstantVectorSeriesData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, timeRange types.QueryTimeRange, isUnless bool) (types.InstantVectorSeriesData, error) {
filteredData := types.InstantVectorSeriesData{}
nextOutputFloatIndex := 0

for _, p := range leftData.Floats {
if !g.rightSamplePresence[timeRange.PointIndex(p.T)] {
if g.rightSamplePresence[timeRange.PointIndex(p.T)] == isUnless {
continue
}

@@ -261,7 +289,7 @@ func (g *andGroup) FilterLeftSeries(leftData types.InstantVectorSeriesData, memo
nextOutputHistogramIndex := 0

for idx, p := range leftData.Histograms {
if !g.rightSamplePresence[timeRange.PointIndex(p.T)] {
if g.rightSamplePresence[timeRange.PointIndex(p.T)] == isUnless {
continue
}

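The operator changes above generalise `AndBinaryOperation` into `AndUnlessBinaryOperation`, driven by a single `IsUnless` flag. Two behavioural differences are visible in the diff: in `SeriesMetadata`, an empty right-hand side short-circuits an `and` but not an `unless` (hence the extra `!a.IsUnless` in the early return, and `computeUnlessSeriesMetadata` returning every left-hand series); and in `NextSeries`, a left-hand series with no right-hand match is discarded for `and` but returned untouched for `unless`. The sketch below condenses that per-series decision into one hypothetical function — the `group` type and names are illustrative, not the real operator:

```go
package main

import "fmt"

// sample is a simplified (timestamp, value) pair.
type sample struct {
	ts    int64
	value float64
}

// group loosely mirrors the per-series bookkeeping in the operator above: whether
// the left-hand series matched any right-hand series, and which timestamps the
// matched right-hand series had samples at. Names and shapes are illustrative only.
type group struct {
	matchedRight bool
	rightPresent map[int64]bool
}

// nextLeftSeries sketches the per-series decision in NextSeries: a left-hand
// series with no right-hand match is dropped by 'and' but passed through
// unchanged by 'unless'; a matched series is filtered sample by sample using
// the same flipped presence check as FilterLeftSeries in the diff.
func nextLeftSeries(g group, left []sample, isUnless bool) []sample {
	if !g.matchedRight {
		if isUnless {
			return left // nothing on the right can remove these samples
		}
		return nil // 'and': this series cannot contribute to the result
	}

	out := make([]sample, 0, len(left))
	for _, s := range left {
		if g.rightPresent[s.ts] == isUnless {
			continue
		}
		out = append(out, s)
	}
	return out
}

func main() {
	left := []sample{{0, 1}, {60, 2}}
	unmatched := group{matchedRight: false}
	matched := group{matchedRight: true, rightPresent: map[int64]bool{60: true}}

	fmt.Println(nextLeftSeries(unmatched, left, false)) // and:    []
	fmt.Println(nextLeftSeries(unmatched, left, true))  // unless: [{0 1} {60 2}]
	fmt.Println(nextLeftSeries(matched, left, true))    // unless: [{0 1}]
}
```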
4 changes: 2 additions & 2 deletions pkg/streamingpromql/query.go
@@ -237,8 +237,8 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV
}

switch e.Op {
case parser.LAND:
return operators.NewAndBinaryOperation(lhs, rhs, *e.VectorMatching, q.memoryConsumptionTracker, q.timeRange, e.PositionRange()), nil
case parser.LAND, parser.LUNLESS:
return operators.NewAndUnlessBinaryOperation(lhs, rhs, *e.VectorMatching, q.memoryConsumptionTracker, e.Op == parser.LUNLESS, q.timeRange, e.PositionRange()), nil
default:
return operators.NewVectorVectorBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, e.ReturnBool, q.memoryConsumptionTracker, q.annotations, e.PositionRange())
}
97 changes: 76 additions & 21 deletions pkg/streamingpromql/testdata/ours/binary_operators.test
@@ -766,14 +766,29 @@ clear
# Single matching series on each side.
load 6m
left_side{env="test", pod="a"} 1 2 3 4 {{count:5 sum:5}}
left_side{env="test", pod="b"} 6 7 8 _ 10
left_side{env="test", pod="b"} 6 7 8 _ {{count:10 sum:10}}
left_side{env="prod", pod="a"} 11 12 13 14 15
left_side{env="prod", pod="b"} 16 17 18 _ _
right_side{env="test", pod="a"} 0 0 0 {{count:0 sum:0}} {{count:0 sum:0}}
right_side{env="test", pod="b"} _ 0 _ 0 _
right_side{env="prod", pod="b"} _ _ _ 0 0
right_side{env="foo", pod="a"} 0 0 0 0 0

eval range from 0 to 24m step 6m left_side and does_not_match
# Should return no results.

eval range from 0 to 24m step 6m does_not_match and right_side
# Should return no results.

eval range from 0 to 24m step 6m left_side unless does_not_match
left_side{env="test", pod="a"} 1 2 3 4 {{count:5 sum:5}}
left_side{env="test", pod="b"} 6 7 8 _ {{count:10 sum:10}}
left_side{env="prod", pod="a"} 11 12 13 14 15
left_side{env="prod", pod="b"} 16 17 18 _ _

eval range from 0 to 24m step 6m does_not_match unless right_side
# Should return no results.

# {env="test", pod="a"}: Matching series, all samples align
# {env="test", pod="b"}: Matching series, only some samples align
# {env="prod", pod="a"}: No matching series on RHS
@@ -783,6 +798,11 @@ eval range from 0 to 24m step 6m left_side and right_side
left_side{env="test", pod="a"} 1 2 3 4 {{count:5 sum:5}}
left_side{env="test", pod="b"} _ 7 _ _ _

eval range from 0 to 24m step 6m left_side unless right_side
left_side{env="test", pod="b"} 6 _ 8 _ {{count:10 sum:10}}
left_side{env="prod", pod="a"} 11 12 13 14 15
left_side{env="prod", pod="b"} 16 17 18 _ _

clear

# Multiple matching series on each side.
@@ -800,32 +820,67 @@ load 6m
eval range from 0 to 24m step 6m left_side and right_side
# Should return no results.

eval range from 0 to 24m step 6m left_side unless right_side
left_side{env="test", cluster="blah", pod="a"} 1 2 3 4
left_side{env="test", cluster="blah", pod="b"} _ 6 7 8
left_side{env="prod", cluster="blah", pod="a"} 9 10 11 12
left_side{env="prod", cluster="blah", pod="b"} 13 14 15 16
left_side{env="test", cluster="food", pod="a"} 17 18 19 20

eval range from 0 to 24m step 6m left_side and on(cluster, env) right_side
left_side{env="test", cluster="blah", pod="a"} 1 2 3 _
left_side{env="test", cluster="blah", pod="b"} _ 6 7 _
left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12
left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16
left_side{env="test", cluster="food", pod="a"} _ _ _ 20

eval range from 0 to 24m step 6m left_side unless on(cluster, env) right_side
left_side{env="test", cluster="blah", pod="a"} _ _ _ 4
left_side{env="test", cluster="blah", pod="b"} _ _ _ 8
left_side{env="prod", cluster="blah", pod="a"} 9 _ 11 _
left_side{env="prod", cluster="blah", pod="b"} 13 _ 15 _
left_side{env="test", cluster="food", pod="a"} 17 18 19 _

# Same thing again, with labels in different order.
eval range from 0 to 24m step 6m left_side and on(env, cluster) right_side
left_side{env="test", cluster="blah", pod="a"} 1 2 3 _
left_side{env="test", cluster="blah", pod="b"} _ 6 7 _
left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12
left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16
left_side{env="test", cluster="food", pod="a"} _ _ _ 20

eval range from 0 to 24m step 6m left_side unless on(env, cluster) right_side
left_side{env="test", cluster="blah", pod="a"} _ _ _ 4
left_side{env="test", cluster="blah", pod="b"} _ _ _ 8
left_side{env="prod", cluster="blah", pod="a"} 9 _ 11 _
left_side{env="prod", cluster="blah", pod="b"} 13 _ 15 _
left_side{env="test", cluster="food", pod="a"} 17 18 19 _

eval range from 0 to 24m step 6m left_side and ignoring(idx, pod) right_side
left_side{env="test", cluster="blah", pod="a"} 1 2 3 _
left_side{env="test", cluster="blah", pod="b"} _ 6 7 _
left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12
left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16
left_side{env="test", cluster="food", pod="a"} _ _ _ 20

eval range from 0 to 24m step 6m left_side unless ignoring(idx, pod) right_side
left_side{env="test", cluster="blah", pod="a"} _ _ _ 4
left_side{env="test", cluster="blah", pod="b"} _ _ _ 8
left_side{env="prod", cluster="blah", pod="a"} 9 _ 11 _
left_side{env="prod", cluster="blah", pod="b"} 13 _ 15 _
left_side{env="test", cluster="food", pod="a"} 17 18 19 _

# Same thing again, with labels in different order.
eval range from 0 to 24m step 6m left_side and ignoring(pod, idx) right_side
left_side{env="test", cluster="blah", pod="a"} 1 2 3 _
left_side{env="test", cluster="blah", pod="b"} _ 6 7 _
left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12
left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16
left_side{env="test", cluster="food", pod="a"} _ _ _ 20

eval range from 0 to 24m step 6m left_side unless ignoring(pod, idx) right_side
left_side{env="test", cluster="blah", pod="a"} _ _ _ 4
left_side{env="test", cluster="blah", pod="b"} _ _ _ 8
left_side{env="prod", cluster="blah", pod="a"} 9 _ 11 _
left_side{env="prod", cluster="blah", pod="b"} 13 _ 15 _
left_side{env="test", cluster="food", pod="a"} 17 18 19 _
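The `on(...)` and `ignoring(...)` cases above exercise how the set operators pair series: with `on(cluster, env)`, a left-hand sample is removed by `unless` whenever any right-hand series with the same `cluster`/`env` values has a sample at that timestamp, regardless of `pod`; with default matching, every label except the metric name must agree. A minimal sketch of how such a grouping key could be built (an illustrative stand-in, not Mimir's label-matching code):

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// matchKey builds the grouping key used to pair left- and right-hand series for
// 'and'/'unless': with on(...), only the listed labels participate; with
// ignoring(...) (or default matching), every label except the listed ones and
// the metric name does. An illustrative stand-in, not Mimir's implementation.
func matchKey(labels map[string]string, names []string, on bool) string {
	include := map[string]bool{}
	if on {
		for _, n := range names {
			include[n] = true
		}
	} else {
		for n := range labels {
			include[n] = true
		}
		delete(include, "__name__")
		for _, n := range names {
			delete(include, n)
		}
	}

	parts := make([]string, 0, len(include))
	for n := range include {
		if v, ok := labels[n]; ok {
			parts = append(parts, n+"="+v)
		}
	}
	sort.Strings(parts)
	return strings.Join(parts, ",")
}

func main() {
	left := map[string]string{"__name__": "left_side", "env": "test", "cluster": "blah", "pod": "a"}
	right := map[string]string{"__name__": "right_side", "env": "test", "cluster": "blah", "pod": "b"}

	// Default matching (all labels except the metric name): pod differs, so no match.
	fmt.Println(matchKey(left, nil, false) == matchKey(right, nil, false)) // false

	// on(cluster, env): pod is not part of the key, so the two series are compared.
	fmt.Println(matchKey(left, []string{"cluster", "env"}, true) == matchKey(right, []string{"cluster", "env"}, true)) // true
}
```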
27 changes: 11 additions & 16 deletions pkg/streamingpromql/testdata/upstream/operators.test
@@ -210,32 +210,27 @@ eval instant at 50m (http_requests{group="canary"} + 1) and ignoring(group, job)
# vector_matching_a{l="x"} 10
# vector_matching_a{l="y"} 20

# Unsupported by streaming engine.
# eval instant at 50m http_requests{group="canary"} unless http_requests{instance="0"}
# http_requests{group="canary", instance="1", job="api-server"} 400
# http_requests{group="canary", instance="1", job="app-server"} 800
eval instant at 50m http_requests{group="canary"} unless http_requests{instance="0"}
http_requests{group="canary", instance="1", job="api-server"} 400
http_requests{group="canary", instance="1", job="app-server"} 800

# Unsupported by streaming engine.
# eval instant at 50m http_requests{group="canary"} unless on(job) http_requests{instance="0"}
eval instant at 50m http_requests{group="canary"} unless on(job) http_requests{instance="0"}

# Unsupported by streaming engine.
# eval instant at 50m http_requests{group="canary"} unless on(job, instance) http_requests{instance="0"}
# http_requests{group="canary", instance="1", job="api-server"} 400
# http_requests{group="canary", instance="1", job="app-server"} 800
eval instant at 50m http_requests{group="canary"} unless on(job, instance) http_requests{instance="0"}
http_requests{group="canary", instance="1", job="api-server"} 400
http_requests{group="canary", instance="1", job="app-server"} 800

eval instant at 50m http_requests{group="canary"} / on(instance,job) http_requests{group="production"}
{instance="0", job="api-server"} 3
{instance="0", job="app-server"} 1.4
{instance="1", job="api-server"} 2
{instance="1", job="app-server"} 1.3333333333333333

# Unsupported by streaming engine.
# eval instant at 50m http_requests{group="canary"} unless ignoring(group, instance) http_requests{instance="0"}
eval instant at 50m http_requests{group="canary"} unless ignoring(group, instance) http_requests{instance="0"}

# Unsupported by streaming engine.
# eval instant at 50m http_requests{group="canary"} unless ignoring(group) http_requests{instance="0"}
# http_requests{group="canary", instance="1", job="api-server"} 400
# http_requests{group="canary", instance="1", job="app-server"} 800
eval instant at 50m http_requests{group="canary"} unless ignoring(group) http_requests{instance="0"}
http_requests{group="canary", instance="1", job="api-server"} 400
http_requests{group="canary", instance="1", job="app-server"} 800

eval instant at 50m http_requests{group="canary"} / ignoring(group) http_requests{group="production"}
{instance="0", job="api-server"} 3