Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQE: create slices with capacity for number of steps remaining, not all possible steps for functions over range vectors #10007

Merged
merged 2 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
* [CHANGE] Ingester: remove experimental flags `-ingest-storage.kafka.ongoing-records-per-fetch` and `-ingest-storage.kafka.startup-records-per-fetch`. They are removed in favour of `-ingest-storage.kafka.max-buffered-bytes`. #9906
* [CHANGE] Ingester: Replace `cortex_discarded_samples_total` label from `sample-out-of-bounds` to `sample-timestamp-too-old`. #9885
* [CHANGE] Ruler: the `/prometheus/config/v1/rules` does not return an error anymore if a rule group is missing in the object storage after been successfully returned by listing the storage, because it could have been deleted in the meanwhile. #9936
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717 #9719 #9724 #9874 #9998
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717 #9719 #9724 #9874 #9998 #10007
* [FEATURE] Distributor: Add support for `lz4` OTLP compression. #9763
* [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028
* [FEATURE] Query-frontend: add middleware to control access to specific PromQL experimental functions on a per-tenant basis. #9798
Expand Down
4 changes: 2 additions & 2 deletions pkg/streamingpromql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func FunctionOverRangeVectorOperatorFactory(
name string,
f functions.FunctionOverRangeVectorDefinition,
) InstantVectorFunctionOperatorFactory {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, _ types.QueryTimeRange) (types.InstantVectorOperator, error) {
return func(args []types.Operator, memoryConsumptionTracker *limiting.MemoryConsumptionTracker, annotations *annotations.Annotations, expressionPosition posrange.PositionRange, timeRange types.QueryTimeRange) (types.InstantVectorOperator, error) {
if len(args) != 1 {
// Should be caught by the PromQL parser, but we check here for safety.
return nil, fmt.Errorf("expected exactly 1 argument for %s, got %v", name, len(args))
Expand All @@ -115,7 +115,7 @@ func FunctionOverRangeVectorOperatorFactory(
return nil, fmt.Errorf("expected a range vector argument for %s, got %T", name, args[0])
}

var o types.InstantVectorOperator = functions.NewFunctionOverRangeVector(inner, memoryConsumptionTracker, f, annotations, expressionPosition)
var o types.InstantVectorOperator = functions.NewFunctionOverRangeVector(inner, memoryConsumptionTracker, f, annotations, expressionPosition, timeRange)

if f.SeriesMetadataFunction.NeedsSeriesDeduplication {
o = operators.NewDeduplicateAndMerge(o, memoryConsumptionTracker)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type FunctionOverRangeVector struct {
metricNames *operators.MetricNames
currentSeriesIndex int

numSteps int
timeRange types.QueryTimeRange
rangeSeconds float64

expressionPosition posrange.PositionRange
Expand All @@ -45,13 +45,15 @@ func NewFunctionOverRangeVector(
f FunctionOverRangeVectorDefinition,
annotations *annotations.Annotations,
expressionPosition posrange.PositionRange,
timeRange types.QueryTimeRange,
) *FunctionOverRangeVector {
o := &FunctionOverRangeVector{
Inner: inner,
MemoryConsumptionTracker: memoryConsumptionTracker,
Func: f,
Annotations: annotations,
expressionPosition: expressionPosition,
timeRange: timeRange,
}

if f.SeriesValidationFuncFactory != nil {
Expand Down Expand Up @@ -81,7 +83,6 @@ func (m *FunctionOverRangeVector) SeriesMetadata(ctx context.Context) ([]types.S
m.metricNames.CaptureMetricNames(metadata)
}

m.numSteps = m.Inner.StepCount()
m.rangeSeconds = m.Inner.Range().Seconds()

if m.Func.SeriesMetadataFunction.Func != nil {
Expand Down Expand Up @@ -122,10 +123,11 @@ func (m *FunctionOverRangeVector) NextSeries(ctx context.Context) (types.Instant
}
if hasFloat {
if data.Floats == nil {
// Only get fPoint slice once we are sure we have float points.
// This potentially over-allocates as some points in the steps may be histograms,
// but this is expected to be rare.
data.Floats, err = types.FPointSlicePool.Get(m.numSteps, m.MemoryConsumptionTracker)
// Only get FPoint slice once we are sure we have float points.
// This potentially over-allocates as some points may be histograms, but this is expected to be rare.

remainingStepCount := m.timeRange.StepCount - int(m.timeRange.PointIndex(step.StepT)) // Only get a slice for the number of points remaining in the query range.
data.Floats, err = types.FPointSlicePool.Get(remainingStepCount, m.MemoryConsumptionTracker)
if err != nil {
return types.InstantVectorSeriesData{}, err
}
Expand All @@ -134,10 +136,11 @@ func (m *FunctionOverRangeVector) NextSeries(ctx context.Context) (types.Instant
}
if h != nil {
if data.Histograms == nil {
// Only get hPoint slice once we are sure we have histogram points.
// This potentially over-allocates as some points in the steps may be floats,
// but this is expected to be rare.
data.Histograms, err = types.HPointSlicePool.Get(m.numSteps, m.MemoryConsumptionTracker)
// Only get HPoint slice once we are sure we have histogram points.
// This potentially over-allocates as some points may be floats, but this is expected to be rare.

remainingStepCount := m.timeRange.StepCount - int(m.timeRange.PointIndex(step.StepT)) // Only get a slice for the number of points remaining in the query range.
data.Histograms, err = types.HPointSlicePool.Get(remainingStepCount, m.MemoryConsumptionTracker)
if err != nil {
return types.InstantVectorSeriesData{}, err
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/streamingpromql/testdata/ours/functions.test
Original file line number Diff line number Diff line change
Expand Up @@ -537,3 +537,14 @@ eval range from 0 to 20m step 1m deriv(metric[3m])
{case="mixed float, NaN and Inf"} _ NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN -0.4683333333333333 -0.016666666666666666 -0.01 -0.008333333333333333 -0.016666666666666666

clear

load 1m
some_metric{env="prod", cluster="eu"} _ _ _ 0+1x4
some_metric{env="prod", cluster="us"} _ _ _ 0+2x4
some_metric{env="prod", cluster="au"} _ _ _ {{count:5}}+{{count:5}}x4

# Function over range vector with many steps at beginning of range with no samples.
eval range from 0 to 7m step 1m last_over_time(some_metric[3m])
some_metric{env="prod", cluster="eu"} _ _ _ 0 1 2 3 4
some_metric{env="prod", cluster="us"} _ _ _ 0 2 4 6 8
some_metric{env="prod", cluster="au"} _ _ _ {{count:5}} {{count:10}} {{count:15}} {{count:20}} {{count:25}}
Loading