Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE: fix issues with vector/vector binary comparsion operations #10235

Merged
merged 7 commits into from
Jan 10, 2025
Merged
3 changes: 3 additions & 0 deletions pkg/streamingpromql/benchmarks/benchmarks.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ func TestCases(metricSizes []int) []BenchCase {
{
Expr: "nh_X / a_X",
},
{
Expr: "a_X == b_X",
},
{
Expr: "2 * a_X",
},
Expand Down
39 changes: 39 additions & 0 deletions pkg/streamingpromql/operators/binops/binary_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ package binops
import (
"fmt"
"slices"
"time"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"
Expand Down Expand Up @@ -82,6 +84,43 @@ func groupLabelsFunc(vectorMatching parser.VectorMatching, op parser.ItemType, r
}
}

func formatConflictError(
firstConflictingSeriesIndex int,
secondConflictingSeriesIndex int,
description string,
ts int64,
sourceSeriesMetadata []types.SeriesMetadata,
side string,
vectorMatching parser.VectorMatching,
op parser.ItemType,
returnBool bool,
) error {
firstConflictingSeriesLabels := sourceSeriesMetadata[firstConflictingSeriesIndex].Labels
groupLabels := groupLabelsFunc(vectorMatching, op, returnBool)(firstConflictingSeriesLabels)

if secondConflictingSeriesIndex == -1 {
return fmt.Errorf(
"found %s for the match group %s on the %s side of the operation at timestamp %s",
description,
groupLabels,
side,
timestamp.Time(ts).Format(time.RFC3339Nano),
)
}

secondConflictingSeriesLabels := sourceSeriesMetadata[secondConflictingSeriesIndex].Labels

return fmt.Errorf(
"found %s for the match group %s on the %s side of the operation at timestamp %s: %s and %s",
description,
groupLabels,
side,
timestamp.Time(ts).Format(time.RFC3339Nano),
firstConflictingSeriesLabels,
secondConflictingSeriesLabels,
)
}

// filterSeries returns data filtered based on the mask provided.
//
// mask is expected to contain one value for each time step in the query time range.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ import (
"fmt"
"slices"
"sort"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"
"github.com/prometheus/prometheus/util/annotations"
Expand Down Expand Up @@ -618,13 +616,13 @@ func (g *GroupedVectorVectorBinaryOperation) updateOneSidePresence(side *oneSide

for _, p := range seriesData.Floats {
if otherSeriesIdx := matchGroup.updatePresence(g.timeRange.PointIndex(p.T), seriesIdx); otherSeriesIdx != -1 {
return g.formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness())
return formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness(), g.VectorMatching, g.Op, g.ReturnBool)
}
}

for _, p := range seriesData.Histograms {
if otherSeriesIdx := matchGroup.updatePresence(g.timeRange.PointIndex(p.T), seriesIdx); otherSeriesIdx != -1 {
return g.formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness())
return formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness(), g.VectorMatching, g.Op, g.ReturnBool)
}
}
}
Expand All @@ -646,7 +644,8 @@ func (g *GroupedVectorVectorBinaryOperation) mergeOneSide(data []types.InstantVe
}

if conflict != nil {
return types.InstantVectorSeriesData{}, g.formatConflictError(conflict.FirstConflictingSeriesIndex, conflict.SecondConflictingSeriesIndex, conflict.Description, conflict.Timestamp, g.oneSideMetadata, g.oneSideHandedness())
err := formatConflictError(conflict.FirstConflictingSeriesIndex, conflict.SecondConflictingSeriesIndex, conflict.Description, conflict.Timestamp, g.oneSideMetadata, g.oneSideHandedness(), g.VectorMatching, g.Op, g.ReturnBool)
return types.InstantVectorSeriesData{}, err
}

return merged, nil
Expand Down Expand Up @@ -689,40 +688,6 @@ func (g *GroupedVectorVectorBinaryOperation) mergeManySide(data []types.InstantV
return merged, nil
}

func (g *GroupedVectorVectorBinaryOperation) formatConflictError(
firstConflictingSeriesIndex int,
secondConflictingSeriesIndex int,
description string,
ts int64,
sourceSeriesMetadata []types.SeriesMetadata,
side string,
) error {
firstConflictingSeriesLabels := sourceSeriesMetadata[firstConflictingSeriesIndex].Labels
groupLabels := groupLabelsFunc(g.VectorMatching, g.Op, g.ReturnBool)(firstConflictingSeriesLabels)

if secondConflictingSeriesIndex == -1 {
return fmt.Errorf(
"found %s for the match group %s on the %s side of the operation at timestamp %s",
description,
groupLabels,
side,
timestamp.Time(ts).Format(time.RFC3339Nano),
)
}

secondConflictingSeriesLabels := sourceSeriesMetadata[secondConflictingSeriesIndex].Labels

return fmt.Errorf(
"found %s for the match group %s on the %s side of the operation at timestamp %s: %s and %s",
description,
groupLabels,
side,
timestamp.Time(ts).Format(time.RFC3339Nano),
firstConflictingSeriesLabels,
secondConflictingSeriesLabels,
)
}

func (g *GroupedVectorVectorBinaryOperation) oneSideHandedness() string {
switch g.VectorMatching.Card {
case parser.CardOneToMany:
Expand Down
Loading
Loading