Ruler: Move inner eval function (prometheus#15688)
Refactoring in preparation for prometheus#15681

By moving the `eval` function outside of the for loop, we can modify the rule execution order more easily (without huge git changes). A minimal sketch of the pattern follows the file summary below.

Signed-off-by: Julien Duchesne <[email protected]>
julienduchesne authored Dec 17, 2024
1 parent ff39806 commit 6151953
Showing 1 changed file with 120 additions and 123 deletions: rules/group.go
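To make the intent concrete, here is a minimal, self-contained Go sketch of the pattern, not Prometheus code: `work`, `process`, and the dispatch policy below are invented for illustration. Once the closure is declared above the loop, the loop only chooses how to call it, so reordering or batching the calls later does not move (or re-indent) the closure body.

package main

import (
	"fmt"
	"sync"
)

func main() {
	work := []string{"rule-a", "rule-b", "rule-c"}

	// Before: process := func(...) {...} was declared inside the loop, so any
	// change to the iteration or dispatch strategy also dragged its body along.
	// After (the pattern in this commit): declare it once, above the loop.
	process := func(i int, item string) {
		fmt.Printf("evaluating %d: %s\n", i, item)
	}

	var wg sync.WaitGroup
	for i, item := range work {
		// The dispatch policy (sequential, concurrent, reordered, batched)
		// can now change here without touching process itself.
		wg.Add(1)
		go func(i int, item string) {
			defer wg.Done()
			process(i, item)
		}(i, item)
	}
	wg.Wait()
}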
@@ -23,16 +23,16 @@ import (

In the import block, the three go.opentelemetry.io imports are moved into the group with go.uber.org/atomic; after the change the block reads:

	"sync"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.uber.org/atomic"

	"github.com/prometheus/prometheus/promql/parser"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/common/promslog"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/timestamp"
@@ -511,150 +511,148 @@ func (g *Group) CopyState(from *Group) {

This hunk (together with @@ -667,7 +665,6 @@) rewrites most of Eval: the `eval` closure, previously declared inside `for i, rule := range g.rules`, is hoisted above the loop (hence the near-total reindentation), and the loop now only decides whether to run it inline or hand it to the concurrency controller. The function after this commit, with the lines that are collapsed in the diff view elided:

// Rules can be evaluated concurrently if the `concurrent-rule-eval` feature flag is enabled.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
	var (
		samplesTotal    atomic.Float64
		ruleQueryOffset = g.QueryOffset()
	)

	eval := func(i int, rule Rule, cleanup func()) {
		if cleanup != nil {
			defer cleanup()
		}

		logger := g.logger.With("name", rule.Name(), "index", i)
		ctx, sp := otel.Tracer("").Start(ctx, "rule")
		sp.SetAttributes(attribute.String("name", rule.Name()))
		defer func(t time.Time) {
			sp.End()

			since := time.Since(t)
			g.metrics.EvalDuration.Observe(since.Seconds())
			rule.SetEvaluationDuration(since)
			rule.SetEvaluationTimestamp(t)
		}(time.Now())

		if sp.SpanContext().IsSampled() && sp.SpanContext().HasTraceID() {
			logger = logger.With("trace_id", sp.SpanContext().TraceID())
		}

		g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

		vector, err := rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
		if err != nil {
			rule.SetHealth(HealthBad)
			rule.SetLastError(err)
			sp.SetStatus(codes.Error, err.Error())
			g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

			// Canceled queries are intentional termination of queries. This normally
			// happens on shutdown and thus we skip logging of any errors here.
			var eqc promql.ErrQueryCanceled
			if !errors.As(err, &eqc) {
				logger.Warn("Evaluating rule failed", "rule", rule, "err", err)
			}
			return
		}
		rule.SetHealth(HealthGood)
		rule.SetLastError(nil)
		samplesTotal.Add(float64(len(vector)))

		if ar, ok := rule.(*AlertingRule); ok {
			ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
		}
		var (
			numOutOfOrder = 0
			numTooOld     = 0
			numDuplicates = 0
		)

		app := g.opts.Appendable.Appender(ctx)
		seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
		defer func() {
			if err := app.Commit(); err != nil {
				rule.SetHealth(HealthBad)
				rule.SetLastError(err)
				sp.SetStatus(codes.Error, err.Error())
				g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

				logger.Warn("Rule sample appending failed", "err", err)
				return
			}
			g.seriesInPreviousEval[i] = seriesReturned
		}()

		for _, s := range vector {
			if s.H != nil {
				_, err = app.AppendHistogram(0, s.Metric, s.T, nil, s.H)
			} else {
				app.SetOptions(g.appOpts)
				_, err = app.Append(0, s.Metric, s.T, s.F)
			}

			if err != nil {
				rule.SetHealth(HealthBad)
				rule.SetLastError(err)
				sp.SetStatus(codes.Error, err.Error())
				unwrappedErr := errors.Unwrap(err)
				if unwrappedErr == nil {
					unwrappedErr = err
				}
				switch {
				case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample):
					numOutOfOrder++
					logger.Debug("Rule evaluation result discarded", "err", err, "sample", s)
				case errors.Is(unwrappedErr, storage.ErrTooOldSample):
					numTooOld++
					logger.Debug("Rule evaluation result discarded", "err", err, "sample", s)
				case errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp):
					numDuplicates++
					logger.Debug("Rule evaluation result discarded", "err", err, "sample", s)
				default:
					logger.Warn("Rule evaluation result discarded", "err", err, "sample", s)
				}
			} else {
				buf := [1024]byte{}
				seriesReturned[string(s.Metric.Bytes(buf[:]))] = s.Metric
			}
		}
		if numOutOfOrder > 0 {
			logger.Warn("Error on ingesting out-of-order result from rule evaluation", "num_dropped", numOutOfOrder)
		}
		if numTooOld > 0 {
			logger.Warn("Error on ingesting too old result from rule evaluation", "num_dropped", numTooOld)
		}
		if numDuplicates > 0 {
			logger.Warn("Error on ingesting results from rule evaluation with different value but same timestamp", "num_dropped", numDuplicates)
		}

		for metric, lset := range g.seriesInPreviousEval[i] {
			if _, ok := seriesReturned[metric]; !ok {
				// Series no longer exposed, mark it stale.
				_, err = app.Append(0, lset, timestamp.FromTime(ts.Add(-ruleQueryOffset)), math.Float64frombits(value.StaleNaN))
				unwrappedErr := errors.Unwrap(err)
				if unwrappedErr == nil {
					unwrappedErr = err
				}
				switch {
				case unwrappedErr == nil:
				case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample),
					errors.Is(unwrappedErr, storage.ErrTooOldSample),
					errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp):
					// Do not count these in logging, as this is expected if series
					// is exposed from a different rule.
				default:
					logger.Warn("Adding stale sample failed", "sample", lset.String(), "err", err)
				}
			}
		}
	}

	var wg sync.WaitGroup
	for i, rule := range g.rules {
		select {
		case <-g.done:
			return
		default:
		}

		if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, rule) {
			wg.Add(1)
			// … concurrent dispatch path, collapsed in the diff view …
		} else {
			eval(i, rule, nil)
		}
	}

	wg.Wait()

	g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load())
	// … remainder of the function unchanged, collapsed in the diff view …
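For a sense of why the hoisted closure makes reordering easier (the stated motivation, prometheus#15681), a hedged sketch, not code from that PR: with `eval` declared once above the loop, switching to a precomputed execution order only touches the loop header. The `order` slice and the dependency comment below are hypothetical.

package main

import "fmt"

func main() {
	rules := []string{"record:a", "alert:b", "record:c"}

	// Single evaluation closure, declared once, as in this commit.
	eval := func(i int, rule string) {
		fmt.Printf("eval %d %s\n", i, rule)
	}

	// Hypothetical: some dependency analysis yields an execution order.
	// Only this loop changes; the closure body stays put in the diff.
	order := []int{2, 0, 1}
	for _, i := range order {
		eval(i, rules[i])
	}
}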
