Skip to content

Commit

Permalink
Add aggregation specific errors (#1731)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored Jun 14, 2019
1 parent 187e4c1 commit 75c2664
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 70 deletions.
13 changes: 8 additions & 5 deletions src/aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/metric/unaggregated"
"github.com/m3db/m3/src/x/clock"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/instrument"

"github.com/uber-go/tally"
Expand Down Expand Up @@ -706,10 +707,12 @@ func newAggregatorAddTimedMetrics(
}

func (m *aggregatorAddTimedMetrics) ReportError(err error) {
switch err {
case errTooFarInTheFuture:
switch {
case err == errTooFarInTheFuture ||
xerrors.InnerError(err) == errTooFarInTheFuture:
m.tooFarInTheFuture.Inc(1)
case errTooFarInThePast:
case err == errTooFarInThePast ||
xerrors.InnerError(err) == errTooFarInThePast:
m.tooFarInThePast.Inc(1)
default:
m.aggregatorAddMetricMetrics.ReportError(err)
Expand All @@ -736,8 +739,8 @@ func newAggregatorAddForwardedMetrics(
maxAllowedForwardingDelayFn MaxAllowedForwardingDelayFn,
) aggregatorAddForwardedMetrics {
return aggregatorAddForwardedMetrics{
aggregatorAddMetricMetrics: newAggregatorAddMetricMetrics(scope, samplingRate),
scope: scope,
aggregatorAddMetricMetrics: newAggregatorAddMetricMetrics(scope, samplingRate),
scope: scope,
maxAllowedForwardingDelayFn: maxAllowedForwardingDelayFn,
forwardingLatency: make(map[latencyBucketKey]tally.Histogram),
}
Expand Down
62 changes: 55 additions & 7 deletions src/aggregator/aggregator/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package aggregator
import (
"container/list"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -57,6 +58,7 @@ var (
errTooFarInTheFuture = xerrors.NewInvalidParamsError(errors.New("too far in the future"))
errTooFarInThePast = xerrors.NewInvalidParamsError(errors.New("too far in the past"))
errArrivedTooLate = xerrors.NewInvalidParamsError(errors.New("arrived too late"))
errTimestampFormat = time.RFC822Z
)

type rateLimitEntryMetrics struct {
Expand Down Expand Up @@ -640,8 +642,8 @@ func (e *Entry) addTimed(

// Reject datapoints that arrive too late or too early.
if err := e.checkTimestampForTimedMetric(
metric,
currTime.UnixNano(),
metric.TimeNanos,
metadata.StoragePolicy.Resolution().Window,
); err != nil {
e.RUnlock()
Expand Down Expand Up @@ -691,19 +693,47 @@ func (e *Entry) addTimed(
}

func (e *Entry) checkTimestampForTimedMetric(
currNanos, metricTimeNanos int64,
metric aggregated.Metric,
currNanos int64,
resolution time.Duration,
) error {
metricTimeNanos := metric.TimeNanos
timedBufferFuture := e.opts.BufferForFutureTimedMetric()
if metricTimeNanos-currNanos > timedBufferFuture.Nanoseconds() {
e.metrics.timed.tooFarInTheFuture.Inc(1)
return errTooFarInTheFuture
if !e.opts.VerboseErrors() {
// Don't return verbose errors if not enabled.
return errTooFarInTheFuture
}
timestamp := time.Unix(0, metricTimeNanos)
futureLimit := time.Unix(0, currNanos+timedBufferFuture.Nanoseconds())
err := fmt.Errorf("datapoint for aggregation too far in future: "+
"id=%s, off_by=%s, timestamp=%s, future_limit=%s, "+
"timestamp_unix_nanos=%d, future_limit_unix_nanos=%d",
metric.ID, timestamp.Sub(futureLimit).String(),
timestamp.Format(errTimestampFormat),
futureLimit.Format(errTimestampFormat),
timestamp.UnixNano(), futureLimit.UnixNano())
return xerrors.NewRenamedError(errTooFarInTheFuture, err)
}
bufferPastFn := e.opts.BufferForPastTimedMetricFn()
timedBufferPast := bufferPastFn(resolution)
if currNanos-metricTimeNanos > timedBufferPast.Nanoseconds() {
e.metrics.timed.tooFarInThePast.Inc(1)
return errTooFarInThePast
if !e.opts.VerboseErrors() {
// Don't return verbose errors if not enabled.
return errTooFarInThePast
}
timestamp := time.Unix(0, metricTimeNanos)
pastLimit := time.Unix(0, currNanos-timedBufferPast.Nanoseconds())
err := fmt.Errorf("datapoint for aggregation too far in past: "+
"id=%s, off_by=%s, timestamp=%s, past_limit=%s, "+
"timestamp_unix_nanos=%d, past_limit_unix_nanos=%d",
metric.ID, pastLimit.Sub(timestamp).String(),
timestamp.Format(errTimestampFormat),
pastLimit.Format(errTimestampFormat),
timestamp.UnixNano(), pastLimit.UnixNano())
return xerrors.NewRenamedError(errTooFarInThePast, err)
}
return nil
}
Expand Down Expand Up @@ -769,8 +799,8 @@ func (e *Entry) addForwarded(

// Reject datapoints that arrive too late.
if err := e.checkLatenessForForwardedMetric(
metric,
currTime.UnixNano(),
metric.TimeNanos,
metadata.StoragePolicy.Resolution().Window,
metadata.NumForwardedTimes,
); err != nil {
Expand Down Expand Up @@ -823,17 +853,35 @@ func (e *Entry) addForwarded(
}

func (e *Entry) checkLatenessForForwardedMetric(
currNanos, metricTimeNanos int64,
metric aggregated.ForwardedMetric,
currNanos int64,
resolution time.Duration,
numForwardedTimes int,
) error {
metricTimeNanos := metric.TimeNanos
maxAllowedForwardingDelayFn := e.opts.MaxAllowedForwardingDelayFn()
maxLatenessAllowed := maxAllowedForwardingDelayFn(resolution, numForwardedTimes)
if currNanos-metricTimeNanos <= maxLatenessAllowed.Nanoseconds() {
return nil
}

e.metrics.forwarded.arrivedTooLate.Inc(1)
return errArrivedTooLate

if !e.opts.VerboseErrors() {
// Don't return verbose errors if not enabled.
return errArrivedTooLate
}

timestamp := time.Unix(0, metricTimeNanos)
pastLimit := time.Unix(0, currNanos-maxLatenessAllowed.Nanoseconds())
err := fmt.Errorf("datapoint for aggregation forwarded too late: "+
"id=%s, off_by=%s, timestamp=%s, past_limit=%s, "+
"timestamp_unix_nanos=%d, past_limit_unix_nanos=%d",
metric.ID, maxLatenessAllowed.String(),
timestamp.Format(errTimestampFormat),
pastLimit.Format(errTimestampFormat),
timestamp.UnixNano(), pastLimit.UnixNano())
return xerrors.NewRenamedError(errArrivedTooLate, err)
}

func (e *Entry) updateForwardMetadataWithLock(
Expand Down
Loading

0 comments on commit 75c2664

Please sign in to comment.