Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE: add support for unless #9533

Merged
merged 6 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* `cortex_alertmanager_alerts`
* `cortex_alertmanager_silences`
* [CHANGE] Cache: Deprecate experimental support for Redis as a cache backend. #9453
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9533
* [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028
* [FEATURE] gRPC: Support S2 compression. #9322
* `-alertmanager.alertmanager-client.grpc-compression=s2`
Expand Down
6 changes: 3 additions & 3 deletions pkg/streamingpromql/benchmarks/benchmarks.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ func TestCases(metricSizes []int) []BenchCase {
//{
// Expr: "a_X or b_X{l=~'.*[0-4]$'}",
//},
//{
// Expr: "a_X unless b_X{l=~'.*[0-4]$'}",
//},
{
Expr: "a_X unless b_X{l=~'.*[0-4]$'}",
},
{
Expr: "a_X and b_X{l='notfound'}",
},
Expand Down
3 changes: 1 addition & 2 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ func TestUnsupportedPromQLFeatures(t *testing.T) {
// different cases and make sure we produce a reasonable error message when these cases are encountered.
unsupportedExpressions := map[string]string{
"metric{} or other_metric{}": "binary expression with 'or'",
"metric{} unless other_metric{}": "binary expression with 'unless'",
"metric{} + on() group_left() other_metric{}": "binary expression with many-to-one matching",
"metric{} + on() group_right() other_metric{}": "binary expression with one-to-many matching",
"topk(5, metric{})": "'topk' aggregation with parameter",
Expand Down Expand Up @@ -1903,7 +1902,7 @@ func TestCompareVariousMixedMetricsBinaryOperations(t *testing.T) {
expressions := []string{}

for _, labels := range labelCombinations {
for _, op := range []string{"+", "-", "*", "/", "and"} {
for _, op := range []string{"+", "-", "*", "/", "and", "unless"} {
binaryExpr := fmt.Sprintf(`series{label="%s"}`, labels[0])
for _, label := range labels[1:] {
binaryExpr += fmt.Sprintf(` %s series{label="%s"}`, op, label)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ import (
"github.com/grafana/mimir/pkg/streamingpromql/types"
)

// AndBinaryOperation represents a logical 'and' between two vectors.
type AndBinaryOperation struct {
// AndUnlessBinaryOperation represents a logical 'and' or 'unless' between two vectors.
type AndUnlessBinaryOperation struct {
Left types.InstantVectorOperator
Right types.InstantVectorOperator
VectorMatching parser.VectorMatching
MemoryConsumptionTracker *limiting.MemoryConsumptionTracker
IsUnless bool // If true, this operator represents an 'unless', if false, this operator represents an 'and'

timeRange types.QueryTimeRange
expressionPosition posrange.PositionRange
Expand All @@ -26,27 +27,29 @@ type AndBinaryOperation struct {
nextRightSeriesIndex int
}

var _ types.InstantVectorOperator = &AndBinaryOperation{}
var _ types.InstantVectorOperator = &AndUnlessBinaryOperation{}

func NewAndBinaryOperation(
// NewAndUnlessBinaryOperation creates an operator that evaluates a logical 'and' or
// 'unless' between two instant vector operators.
//
// If isUnless is true, the operator behaves as 'unless' (left-hand series are kept
// where the right-hand side has no matching sample); if false, it behaves as 'and'
// (left-hand series are kept only where the right-hand side has a matching sample).
// vectorMatching carries any on(...)/ignoring(...) matching labels from the query.
func NewAndUnlessBinaryOperation(
	left types.InstantVectorOperator,
	right types.InstantVectorOperator,
	vectorMatching parser.VectorMatching,
	memoryConsumptionTracker *limiting.MemoryConsumptionTracker,
	isUnless bool,
	timeRange types.QueryTimeRange,
	expressionPosition posrange.PositionRange,
) *AndUnlessBinaryOperation {
	return &AndUnlessBinaryOperation{
		Left:                     left,
		Right:                    right,
		VectorMatching:           vectorMatching,
		MemoryConsumptionTracker: memoryConsumptionTracker,
		IsUnless:                 isUnless,
		timeRange:                timeRange,
		expressionPosition:       expressionPosition,
	}
}

func (a *AndBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
func (a *AndUnlessBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.SeriesMetadata, error) {
leftMetadata, err := a.Left.SeriesMetadata(ctx)
if err != nil {
return nil, err
Expand All @@ -65,7 +68,7 @@ func (a *AndBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.Series

defer types.PutSeriesMetadataSlice(rightMetadata)

if len(rightMetadata) == 0 {
if len(rightMetadata) == 0 && !a.IsUnless {
// We can't produce any series, we are done.
return nil, nil
}
Expand Down Expand Up @@ -104,6 +107,14 @@ func (a *AndBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.Series
a.rightSeriesGroups = append(a.rightSeriesGroups, group)
}

if a.IsUnless {
return a.computeUnlessSeriesMetadata(leftMetadata), nil
}

return a.computeAndSeriesMetadata(leftMetadata), nil
}

func (a *AndUnlessBinaryOperation) computeAndSeriesMetadata(leftMetadata []types.SeriesMetadata) []types.SeriesMetadata {
// Iterate through the left-hand series again, and build the list of output series based on those that matched at least one series on the right.
// It's safe to reuse the left metadata slice as we'll return series in the same order, and only ever return fewer series than the left operator produces.
nextOutputSeriesIndex := 0
Expand All @@ -119,10 +130,22 @@ func (a *AndBinaryOperation) SeriesMetadata(ctx context.Context) ([]types.Series
}
}

return leftMetadata[:nextOutputSeriesIndex], nil
return leftMetadata[:nextOutputSeriesIndex]
}

// computeUnlessSeriesMetadata determines the output series for an 'unless' operation.
// Every left-hand series appears in the output, so leftMetadata is returned unchanged.
// Groups that matched no right-hand series are cleared to nil here: a nil group tells
// NextSeries to pass that left-hand series through as-is, since nothing on the right
// side can filter it.
func (a *AndUnlessBinaryOperation) computeUnlessSeriesMetadata(leftMetadata []types.SeriesMetadata) []types.SeriesMetadata {
	for i := range a.leftSeriesGroups {
		if a.leftSeriesGroups[i].lastRightSeriesIndex == -1 {
			// No right-hand series falls into this group: drop the group reference.
			a.leftSeriesGroups[i] = nil
		}
	}

	return leftMetadata
}

func (a *AndBinaryOperation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
func (a *AndUnlessBinaryOperation) NextSeries(ctx context.Context) (types.InstantVectorSeriesData, error) {
for {
if len(a.leftSeriesGroups) == 0 {
// No more series to return.
Expand All @@ -134,12 +157,17 @@ func (a *AndBinaryOperation) NextSeries(ctx context.Context) (types.InstantVecto

if thisSeriesGroup == nil {
// This series from the left side has no matching series on the right side.
// Read it, discard it, and move on to the next series.
d, err := a.Left.NextSeries(ctx)
if err != nil {
return types.InstantVectorSeriesData{}, err
}

if a.IsUnless {
// If this is an 'unless' operation, we should return the series as-is, as this series can't be filtered by anything on the right.
return d, nil
}

// If this is an 'and' operation, we should discard it and move on to the next series, as this series can't contribute to the result.
types.PutInstantVectorSeriesData(d, a.MemoryConsumptionTracker)
continue
}
Expand All @@ -156,7 +184,7 @@ func (a *AndBinaryOperation) NextSeries(ctx context.Context) (types.InstantVecto
return types.InstantVectorSeriesData{}, err
}

filteredData, err := thisSeriesGroup.FilterLeftSeries(originalData, a.MemoryConsumptionTracker, a.timeRange)
filteredData, err := thisSeriesGroup.FilterLeftSeries(originalData, a.MemoryConsumptionTracker, a.timeRange, a.IsUnless)
if err != nil {
return types.InstantVectorSeriesData{}, err
}
Expand All @@ -173,7 +201,7 @@ func (a *AndBinaryOperation) NextSeries(ctx context.Context) (types.InstantVecto
}

// readRightSideUntilGroupComplete reads series from the right-hand side until all series for desiredGroup have been read.
func (a *AndBinaryOperation) readRightSideUntilGroupComplete(ctx context.Context, desiredGroup *andGroup) error {
func (a *AndUnlessBinaryOperation) readRightSideUntilGroupComplete(ctx context.Context, desiredGroup *andGroup) error {
for a.nextRightSeriesIndex <= desiredGroup.lastRightSeriesIndex {
groupForRightSeries := a.rightSeriesGroups[0]
a.rightSeriesGroups = a.rightSeriesGroups[1:]
Expand All @@ -196,11 +224,11 @@ func (a *AndBinaryOperation) readRightSideUntilGroupComplete(ctx context.Context
return nil
}

func (a *AndBinaryOperation) ExpressionPosition() posrange.PositionRange {
// ExpressionPosition returns the position of this operator's expression within the
// original query string, for use in error messages and annotations.
func (a *AndUnlessBinaryOperation) ExpressionPosition() posrange.PositionRange {
	return a.expressionPosition
}

func (a *AndBinaryOperation) Close() {
// Close releases the resources held by both the left and right child operators.
func (a *AndUnlessBinaryOperation) Close() {
	a.Left.Close()
	a.Right.Close()
}
Expand Down Expand Up @@ -237,12 +265,12 @@ func (g *andGroup) AccumulateRightSeriesPresence(data types.InstantVectorSeriesD

// FilterLeftSeries returns leftData filtered based on samples seen for the right-hand side.
// The return value reuses the slices from leftData, and returns any unused slices to the pool.
func (g *andGroup) FilterLeftSeries(leftData types.InstantVectorSeriesData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, timeRange types.QueryTimeRange) (types.InstantVectorSeriesData, error) {
func (g *andGroup) FilterLeftSeries(leftData types.InstantVectorSeriesData, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, timeRange types.QueryTimeRange, isUnless bool) (types.InstantVectorSeriesData, error) {
filteredData := types.InstantVectorSeriesData{}
nextOutputFloatIndex := 0

for _, p := range leftData.Floats {
if !g.rightSamplePresence[timeRange.PointIndex(p.T)] {
if g.rightSamplePresence[timeRange.PointIndex(p.T)] == isUnless {
continue
}

Expand All @@ -261,7 +289,7 @@ func (g *andGroup) FilterLeftSeries(leftData types.InstantVectorSeriesData, memo
nextOutputHistogramIndex := 0

for idx, p := range leftData.Histograms {
if !g.rightSamplePresence[timeRange.PointIndex(p.T)] {
if g.rightSamplePresence[timeRange.PointIndex(p.T)] == isUnless {
continue
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ func (q *Query) convertToInstantVectorOperator(expr parser.Expr) (types.InstantV
}

switch e.Op {
case parser.LAND:
return operators.NewAndBinaryOperation(lhs, rhs, *e.VectorMatching, q.memoryConsumptionTracker, q.timeRange, e.PositionRange()), nil
case parser.LAND, parser.LUNLESS:
return operators.NewAndUnlessBinaryOperation(lhs, rhs, *e.VectorMatching, q.memoryConsumptionTracker, e.Op == parser.LUNLESS, q.timeRange, e.PositionRange()), nil
default:
return operators.NewVectorVectorBinaryOperation(lhs, rhs, *e.VectorMatching, e.Op, e.ReturnBool, q.memoryConsumptionTracker, q.annotations, e.PositionRange())
}
Expand Down
97 changes: 76 additions & 21 deletions pkg/streamingpromql/testdata/ours/binary_operators.test
Original file line number Diff line number Diff line change
Expand Up @@ -766,14 +766,29 @@ clear
# Single matching series on each side.
load 6m
left_side{env="test", pod="a"} 1 2 3 4 {{count:5 sum:5}}
left_side{env="test", pod="b"} 6 7 8 _ 10
left_side{env="test", pod="b"} 6 7 8 _ {{count:10 sum:10}}
left_side{env="prod", pod="a"} 11 12 13 14 15
left_side{env="prod", pod="b"} 16 17 18 _ _
right_side{env="test", pod="a"} 0 0 0 {{count:0 sum:0}} {{count:0 sum:0}}
right_side{env="test", pod="b"} _ 0 _ 0 _
right_side{env="prod", pod="b"} _ _ _ 0 0
right_side{env="foo", pod="a"} 0 0 0 0 0

eval range from 0 to 24m step 6m left_side and does_not_match
# Should return no results.

eval range from 0 to 24m step 6m does_not_match and right_side
# Should return no results.

eval range from 0 to 24m step 6m left_side unless does_not_match
left_side{env="test", pod="a"} 1 2 3 4 {{count:5 sum:5}}
left_side{env="test", pod="b"} 6 7 8 _ {{count:10 sum:10}}
left_side{env="prod", pod="a"} 11 12 13 14 15
left_side{env="prod", pod="b"} 16 17 18 _ _

eval range from 0 to 24m step 6m does_not_match unless right_side
# Should return no results.

# {env="test", pod="a"}: Matching series, all samples align
# {env="test", pod="b"}: Matching series, only some samples align
# {env="prod", pod="a"}: No matching series on RHS
Expand All @@ -783,6 +798,11 @@ eval range from 0 to 24m step 6m left_side and right_side
left_side{env="test", pod="a"} 1 2 3 4 {{count:5 sum:5}}
left_side{env="test", pod="b"} _ 7 _ _ _

eval range from 0 to 24m step 6m left_side unless right_side
left_side{env="test", pod="b"} 6 _ 8 _ {{count:10 sum:10}}
left_side{env="prod", pod="a"} 11 12 13 14 15
left_side{env="prod", pod="b"} 16 17 18 _ _

clear

# Multiple matching series on each side.
Expand All @@ -800,32 +820,67 @@ load 6m
eval range from 0 to 24m step 6m left_side and right_side
# Should return no results.

eval range from 0 to 24m step 6m left_side unless right_side
left_side{env="test", cluster="blah", pod="a"} 1 2 3 4
left_side{env="test", cluster="blah", pod="b"} _ 6 7 8
left_side{env="prod", cluster="blah", pod="a"} 9 10 11 12
left_side{env="prod", cluster="blah", pod="b"} 13 14 15 16
left_side{env="test", cluster="food", pod="a"} 17 18 19 20

eval range from 0 to 24m step 6m left_side and on(cluster, env) right_side
left_side{env="test", cluster="blah", pod="a"} 1 2 3 _
left_side{env="test", cluster="blah", pod="b"} _ 6 7 _
left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12
left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16
left_side{env="test", cluster="food", pod="a"} _ _ _ 20
left_side{env="test", cluster="blah", pod="a"} 1 2 3 _
left_side{env="test", cluster="blah", pod="b"} _ 6 7 _
left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12
left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16
left_side{env="test", cluster="food", pod="a"} _ _ _ 20

eval range from 0 to 24m step 6m left_side unless on(cluster, env) right_side
left_side{env="test", cluster="blah", pod="a"} _ _ _ 4
left_side{env="test", cluster="blah", pod="b"} _ _ _ 8
left_side{env="prod", cluster="blah", pod="a"} 9 _ 11 _
left_side{env="prod", cluster="blah", pod="b"} 13 _ 15 _
left_side{env="test", cluster="food", pod="a"} 17 18 19 _

# Same thing again, with labels in different order.
eval range from 0 to 24m step 6m left_side and on(env, cluster) right_side
left_side{env="test", cluster="blah", pod="a"} 1 2 3 _
left_side{env="test", cluster="blah", pod="b"} _ 6 7 _
left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12
left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16
left_side{env="test", cluster="food", pod="a"} _ _ _ 20
left_side{env="test", cluster="blah", pod="a"} 1 2 3 _
left_side{env="test", cluster="blah", pod="b"} _ 6 7 _
left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12
left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16
left_side{env="test", cluster="food", pod="a"} _ _ _ 20

eval range from 0 to 24m step 6m left_side unless on(env, cluster) right_side
left_side{env="test", cluster="blah", pod="a"} _ _ _ 4
left_side{env="test", cluster="blah", pod="b"} _ _ _ 8
left_side{env="prod", cluster="blah", pod="a"} 9 _ 11 _
left_side{env="prod", cluster="blah", pod="b"} 13 _ 15 _
left_side{env="test", cluster="food", pod="a"} 17 18 19 _

eval range from 0 to 24m step 6m left_side and ignoring(idx, pod) right_side
left_side{env="test", cluster="blah", pod="a"} 1 2 3 _
left_side{env="test", cluster="blah", pod="b"} _ 6 7 _
left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12
left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16
left_side{env="test", cluster="food", pod="a"} _ _ _ 20
left_side{env="test", cluster="blah", pod="a"} 1 2 3 _
left_side{env="test", cluster="blah", pod="b"} _ 6 7 _
left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12
left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16
left_side{env="test", cluster="food", pod="a"} _ _ _ 20

eval range from 0 to 24m step 6m left_side unless ignoring(idx, pod) right_side
left_side{env="test", cluster="blah", pod="a"} _ _ _ 4
left_side{env="test", cluster="blah", pod="b"} _ _ _ 8
left_side{env="prod", cluster="blah", pod="a"} 9 _ 11 _
left_side{env="prod", cluster="blah", pod="b"} 13 _ 15 _
left_side{env="test", cluster="food", pod="a"} 17 18 19 _

# Same thing again, with labels in different order.
eval range from 0 to 24m step 6m left_side and ignoring(pod, idx) right_side
left_side{env="test", cluster="blah", pod="a"} 1 2 3 _
left_side{env="test", cluster="blah", pod="b"} _ 6 7 _
left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12
left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16
left_side{env="test", cluster="food", pod="a"} _ _ _ 20
left_side{env="test", cluster="blah", pod="a"} 1 2 3 _
left_side{env="test", cluster="blah", pod="b"} _ 6 7 _
left_side{env="prod", cluster="blah", pod="a"} _ 10 _ 12
left_side{env="prod", cluster="blah", pod="b"} _ 14 _ 16
left_side{env="test", cluster="food", pod="a"} _ _ _ 20

eval range from 0 to 24m step 6m left_side unless ignoring(pod, idx) right_side
left_side{env="test", cluster="blah", pod="a"} _ _ _ 4
left_side{env="test", cluster="blah", pod="b"} _ _ _ 8
left_side{env="prod", cluster="blah", pod="a"} 9 _ 11 _
left_side{env="prod", cluster="blah", pod="b"} 13 _ 15 _
left_side{env="test", cluster="food", pod="a"} 17 18 19 _
27 changes: 11 additions & 16 deletions pkg/streamingpromql/testdata/upstream/operators.test
Original file line number Diff line number Diff line change
Expand Up @@ -210,32 +210,27 @@ eval instant at 50m (http_requests{group="canary"} + 1) and ignoring(group, job)
# vector_matching_a{l="x"} 10
# vector_matching_a{l="y"} 20

# Unsupported by streaming engine.
# eval instant at 50m http_requests{group="canary"} unless http_requests{instance="0"}
# http_requests{group="canary", instance="1", job="api-server"} 400
# http_requests{group="canary", instance="1", job="app-server"} 800
eval instant at 50m http_requests{group="canary"} unless http_requests{instance="0"}
http_requests{group="canary", instance="1", job="api-server"} 400
http_requests{group="canary", instance="1", job="app-server"} 800

# Unsupported by streaming engine.
# eval instant at 50m http_requests{group="canary"} unless on(job) http_requests{instance="0"}
eval instant at 50m http_requests{group="canary"} unless on(job) http_requests{instance="0"}

# Unsupported by streaming engine.
# eval instant at 50m http_requests{group="canary"} unless on(job, instance) http_requests{instance="0"}
# http_requests{group="canary", instance="1", job="api-server"} 400
# http_requests{group="canary", instance="1", job="app-server"} 800
eval instant at 50m http_requests{group="canary"} unless on(job, instance) http_requests{instance="0"}
http_requests{group="canary", instance="1", job="api-server"} 400
http_requests{group="canary", instance="1", job="app-server"} 800

eval instant at 50m http_requests{group="canary"} / on(instance,job) http_requests{group="production"}
{instance="0", job="api-server"} 3
{instance="0", job="app-server"} 1.4
{instance="1", job="api-server"} 2
{instance="1", job="app-server"} 1.3333333333333333

# Unsupported by streaming engine.
# eval instant at 50m http_requests{group="canary"} unless ignoring(group, instance) http_requests{instance="0"}
eval instant at 50m http_requests{group="canary"} unless ignoring(group, instance) http_requests{instance="0"}

# Unsupported by streaming engine.
# eval instant at 50m http_requests{group="canary"} unless ignoring(group) http_requests{instance="0"}
# http_requests{group="canary", instance="1", job="api-server"} 400
# http_requests{group="canary", instance="1", job="app-server"} 800
eval instant at 50m http_requests{group="canary"} unless ignoring(group) http_requests{instance="0"}
http_requests{group="canary", instance="1", job="api-server"} 400
http_requests{group="canary", instance="1", job="app-server"} 800

eval instant at 50m http_requests{group="canary"} / ignoring(group) http_requests{group="production"}
{instance="0", job="api-server"} 3
Expand Down
Loading
Loading