Skip to content

Commit

Permalink
feat(logqlmetric): implement absent_over_time
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Apr 26, 2024
1 parent f4f52af commit b781f38
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 8 deletions.
9 changes: 9 additions & 0 deletions internal/logql/logqlengine/logqlmetric/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func buildBatchAggregator(expr *logql.RangeAggregationExpr) (BatchAggregator, er
case logql.RangeOpLast:
return &LastOverTime{}, nil
case logql.RangeOpAbsent:
return &absentOverTime{}, nil
default:
return nil, errors.Errorf("unexpected range operation %q", expr.Op)
}
Expand Down Expand Up @@ -132,3 +133,11 @@ func (LastOverTime) Aggregate(points []FPoint) (last float64) {
}
return points[len(points)-1].Value
}

// absentOverTime implements `absent_over_time` aggregation.
type absentOverTime struct{}

// Aggregate implements BatchAggregator.
func (absentOverTime) Aggregate([]FPoint) float64 {
return 1.
}
42 changes: 34 additions & 8 deletions internal/logql/logqlengine/logqlmetric/range_agg.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ var nopGrouper = func(al logqlabels.AggregatedLabels, _ ...logql.Label) logqlabe
type rangeAggIterator struct {
iter iterators.Iterator[SampledEntry]

agg BatchAggregator
agg BatchAggregator
absentLabels logqlabels.AggregatedLabels
// step state
stepper stepper

Expand Down Expand Up @@ -49,6 +50,21 @@ func RangeAggregation(
if err != nil {
return nil, errors.Wrap(err, "build aggregator")
}
var absentLabels logqlabels.AggregatedLabels
if expr.Op == logql.RangeOpAbsent {
matchers := expr.Range.Sel.Matchers
if len(matchers) == 0 {
absentLabels = logqlabels.EmptyAggregatedLabels()
} else {
labels := map[string]string{}
for _, m := range matchers {
if _, ok := labels[string(m.Label)]; !ok && m.Op == logql.OpEq {
labels[string(m.Label)] = m.Value
}
}
absentLabels = logqlabels.AggregatedLabelsFromMap(labels)
}
}

var (
grouper = nopGrouper
Expand All @@ -66,8 +82,9 @@ func RangeAggregation(
return &rangeAggIterator{
iter: iter,

agg: agg,
stepper: newStepper(start, end, step),
agg: agg,
absentLabels: absentLabels,
stepper: newStepper(start, end, step),

grouper: grouper,
groupLabels: groupLabels,
Expand All @@ -91,11 +108,20 @@ func (i *rangeAggIterator) Next(r *Step) bool {
// Aggregate the window.
r.Timestamp = otelstorage.NewTimestampFromTime(current)
r.Samples = r.Samples[:0]
for _, s := range i.window {
r.Samples = append(r.Samples, Sample{
Data: i.agg.Aggregate(s.Data),
Set: s.Set,
})
if set := i.absentLabels; set != nil {
if len(i.window) == 0 {
r.Samples = append(r.Samples, Sample{
Data: i.agg.Aggregate(nil),
Set: set,
})
}
} else {
for _, s := range i.window {
r.Samples = append(r.Samples, Sample{
Data: i.agg.Aggregate(s.Data),
Set: s.Set,
})
}
}

return true
Expand Down

0 comments on commit b781f38

Please sign in to comment.