From 89e17a06573366fb25b0375362cc8f4684754888 Mon Sep 17 00:00:00 2001
From: Rob Skillington
Date: Thu, 13 Jun 2019 21:33:31 +1000
Subject: [PATCH 1/3] Add aggregation specific errors

---
 src/aggregator/aggregator/aggregator.go       | 13 +++--
 src/aggregator/aggregator/entry.go            | 47 ++++++++++++++++---
 src/aggregator/aggregator/entry_test.go       | 13 +++--
 .../m3coordinator/downsample/options.go       |  1 +
 .../m3coordinator/downsample/options_test.go  |  5 +-
 5 files changed, 62 insertions(+), 17 deletions(-)

diff --git a/src/aggregator/aggregator/aggregator.go b/src/aggregator/aggregator/aggregator.go
index 85ba06cabb..03c9ac2fc1 100644
--- a/src/aggregator/aggregator/aggregator.go
+++ b/src/aggregator/aggregator/aggregator.go
@@ -40,6 +40,7 @@ import (
 	"github.com/m3db/m3/src/metrics/metric/id"
 	"github.com/m3db/m3/src/metrics/metric/unaggregated"
 	"github.com/m3db/m3/src/x/clock"
+	xerrors "github.com/m3db/m3/src/x/errors"
 	"github.com/m3db/m3/src/x/instrument"

 	"github.com/uber-go/tally"
@@ -706,10 +707,12 @@ func newAggregatorAddTimedMetrics(
 }

 func (m *aggregatorAddTimedMetrics) ReportError(err error) {
-	switch err {
-	case errTooFarInTheFuture:
+	switch {
+	case err == errTooFarInTheFuture ||
+		xerrors.InnerError(err) == errTooFarInTheFuture:
 		m.tooFarInTheFuture.Inc(1)
-	case errTooFarInThePast:
+	case err == errTooFarInThePast ||
+		xerrors.InnerError(err) == errTooFarInThePast:
 		m.tooFarInThePast.Inc(1)
 	default:
 		m.aggregatorAddMetricMetrics.ReportError(err)
@@ -736,8 +739,8 @@ func newAggregatorAddForwardedMetrics(
 	maxAllowedForwardingDelayFn MaxAllowedForwardingDelayFn,
 ) aggregatorAddForwardedMetrics {
 	return aggregatorAddForwardedMetrics{
-		aggregatorAddMetricMetrics: newAggregatorAddMetricMetrics(scope, samplingRate),
-		scope: scope,
+		aggregatorAddMetricMetrics:  newAggregatorAddMetricMetrics(scope, samplingRate),
+		scope:                       scope,
 		maxAllowedForwardingDelayFn: maxAllowedForwardingDelayFn,
 		forwardingLatency:           make(map[latencyBucketKey]tally.Histogram),
 	}
diff --git a/src/aggregator/aggregator/entry.go b/src/aggregator/aggregator/entry.go
index fa5a11d439..c377d7ee4a 100644
--- a/src/aggregator/aggregator/entry.go
+++ b/src/aggregator/aggregator/entry.go
@@ -23,6 +23,7 @@ package aggregator
 import (
 	"container/list"
 	"errors"
+	"fmt"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -57,6 +58,7 @@ var (
 	errTooFarInTheFuture = xerrors.NewInvalidParamsError(errors.New("too far in the future"))
 	errTooFarInThePast   = xerrors.NewInvalidParamsError(errors.New("too far in the past"))
 	errArrivedTooLate    = xerrors.NewInvalidParamsError(errors.New("arrived too late"))
+	errTimestampFormat   = time.RFC822Z
 )

 type rateLimitEntryMetrics struct {
@@ -640,8 +642,8 @@ func (e *Entry) addTimed(

 	// Reject datapoints that arrive too late or too early.
 	if err := e.checkTimestampForTimedMetric(
+		metric,
 		currTime.UnixNano(),
-		metric.TimeNanos,
 		metadata.StoragePolicy.Resolution().Window,
 	); err != nil {
 		e.RUnlock()
@@ -691,19 +693,39 @@ func (e *Entry) addTimed(
 }

 func (e *Entry) checkTimestampForTimedMetric(
-	currNanos, metricTimeNanos int64,
+	metric aggregated.Metric,
+	currNanos int64,
 	resolution time.Duration,
 ) error {
+	metricTimeNanos := metric.TimeNanos
 	timedBufferFuture := e.opts.BufferForFutureTimedMetric()
 	if metricTimeNanos-currNanos > timedBufferFuture.Nanoseconds() {
 		e.metrics.timed.tooFarInTheFuture.Inc(1)
-		return errTooFarInTheFuture
+		timestamp := time.Unix(0, metricTimeNanos)
+		futureLimit := time.Unix(0, currNanos+timedBufferFuture.Nanoseconds())
+		err := fmt.Errorf("datapoint for aggregation too far in future: "+
+			"id=%s, off_by=%s, timestamp=%s, future_limit=%s, "+
+			"timestamp_unix_nanos=%d, future_limit_unix_nanos=%d",
+			metric.ID, timestamp.Sub(futureLimit).String(),
+			timestamp.Format(errTimestampFormat),
+			futureLimit.Format(errTimestampFormat),
+			timestamp.UnixNano(), futureLimit.UnixNano())
+		return xerrors.NewRenamedError(errTooFarInTheFuture, err)
 	}
 	bufferPastFn := e.opts.BufferForPastTimedMetricFn()
 	timedBufferPast := bufferPastFn(resolution)
 	if currNanos-metricTimeNanos > timedBufferPast.Nanoseconds() {
 		e.metrics.timed.tooFarInThePast.Inc(1)
-		return errTooFarInThePast
+		timestamp := time.Unix(0, metricTimeNanos)
+		pastLimit := time.Unix(0, currNanos-timedBufferPast.Nanoseconds())
+		err := fmt.Errorf("datapoint for aggregation too far in past: "+
+			"id=%s, off_by=%s, timestamp=%s, past_limit=%s, "+
+			"timestamp_unix_nanos=%d, past_limit_unix_nanos=%d",
+			metric.ID, pastLimit.Sub(timestamp).String(),
+			timestamp.Format(errTimestampFormat),
+			pastLimit.Format(errTimestampFormat),
+			timestamp.UnixNano(), pastLimit.UnixNano())
+		return xerrors.NewRenamedError(errTooFarInThePast, err)
 	}
 	return nil
 }
@@ -769,8 +791,8 @@ func (e *Entry) addForwarded(

 	// Reject datapoints that arrive too late.
 	if err := e.checkLatenessForForwardedMetric(
+		metric,
 		currTime.UnixNano(),
-		metric.TimeNanos,
 		metadata.StoragePolicy.Resolution().Window,
 		metadata.NumForwardedTimes,
 	); err != nil {
@@ -823,17 +845,28 @@
 }

 func (e *Entry) checkLatenessForForwardedMetric(
-	currNanos, metricTimeNanos int64,
+	metric aggregated.ForwardedMetric,
+	currNanos int64,
 	resolution time.Duration,
 	numForwardedTimes int,
 ) error {
+	metricTimeNanos := metric.TimeNanos
 	maxAllowedForwardingDelayFn := e.opts.MaxAllowedForwardingDelayFn()
 	maxLatenessAllowed := maxAllowedForwardingDelayFn(resolution, numForwardedTimes)
 	if currNanos-metricTimeNanos <= maxLatenessAllowed.Nanoseconds() {
 		return nil
 	}
 	e.metrics.forwarded.arrivedTooLate.Inc(1)
-	return errArrivedTooLate
+	timestamp := time.Unix(0, metricTimeNanos)
+	pastLimit := time.Unix(0, currNanos-maxLatenessAllowed.Nanoseconds())
+	err := fmt.Errorf("datapoint for aggregation forwarded too late: "+
+		"id=%s, off_by=%s, timestamp=%s, future_limit=%s, "+
+		"timestamp_unix_nanos=%d, future_limit_unix_nanos=%d",
+		metric.ID, maxLatenessAllowed.String(),
+		timestamp.Format(errTimestampFormat),
+		pastLimit.Format(errTimestampFormat),
+		timestamp.UnixNano(), pastLimit.UnixNano())
+	return xerrors.NewRenamedError(errArrivedTooLate, err)
 }

 func (e *Entry) updateForwardMetadataWithLock(
diff --git a/src/aggregator/aggregator/entry_test.go b/src/aggregator/aggregator/entry_test.go
index 44ebf05fb3..5e9623346a 100644
--- a/src/aggregator/aggregator/entry_test.go
+++ b/src/aggregator/aggregator/entry_test.go
@@ -39,6 +39,7 @@ import (
 	"github.com/m3db/m3/src/metrics/policy"
 	"github.com/m3db/m3/src/metrics/transformation"
 	"github.com/m3db/m3/src/x/clock"
+	xerrors "github.com/m3db/m3/src/x/errors"
 	"github.com/m3db/m3/src/x/pool"
 	xtime "github.com/m3db/m3/src/x/time"

@@ -1265,7 +1266,9 @@ func TestEntryAddTimedMetricTooLate(t *testing.T) {
 	for _, input := range inputs {
 		metric := testTimedMetric
 		metric.TimeNanos = input.timeNanos
-		require.Equal(t, errTooFarInThePast, e.AddTimed(metric, metadata.TimedMetadata{StoragePolicy: input.storagePolicy}))
+		err := e.AddTimed(metric, metadata.TimedMetadata{StoragePolicy: input.storagePolicy})
+		require.Equal(t, errTooFarInThePast, xerrors.InnerError(err))
+		require.True(t, xerrors.IsInvalidParams(err))
 	}
 }

@@ -1293,7 +1296,9 @@ func TestEntryAddTimedMetricTooEarly(t *testing.T) {
 	for _, input := range inputs {
 		metric := testTimedMetric
 		metric.TimeNanos = input.timeNanos
-		require.Equal(t, errTooFarInTheFuture, e.AddTimed(metric, metadata.TimedMetadata{StoragePolicy: input.storagePolicy}))
+		err := e.AddTimed(metric, metadata.TimedMetadata{StoragePolicy: input.storagePolicy})
+		require.Equal(t, errTooFarInTheFuture, xerrors.InnerError(err))
+		require.True(t, xerrors.IsInvalidParams(err))
 	}
 }

@@ -1492,7 +1497,9 @@ func TestEntryAddForwardedMetricTooLate(t *testing.T) {
 		metadata := testForwardMetadata
 		metadata.StoragePolicy = input.storagePolicy
 		metadata.NumForwardedTimes = input.numForwardedTimes
-		require.Equal(t, errArrivedTooLate, e.AddForwarded(metric, metadata))
+		err := e.AddForwarded(metric, metadata)
+		require.Equal(t, errArrivedTooLate, xerrors.InnerError(err))
+		require.True(t, xerrors.IsInvalidParams(err))
 	}
 }

diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go
index 01c966b084..e6116bf95f 100644
--- a/src/cmd/services/m3coordinator/downsample/options.go
+++ b/src/cmd/services/m3coordinator/downsample/options.go
@@ -610,6 +610,7 @@ var (
 		{upperBound: 0, bufferPast: 15 * time.Second},
 		{upperBound: 30 * time.Second, bufferPast: 30 * time.Second},
 		{upperBound: time.Minute, bufferPast: time.Minute},
+		{upperBound: 2 * time.Minute, bufferPast: 2 * time.Minute},
 	}
 )

diff --git a/src/cmd/services/m3coordinator/downsample/options_test.go b/src/cmd/services/m3coordinator/downsample/options_test.go
index 8fe043a04e..99aa147079 100644
--- a/src/cmd/services/m3coordinator/downsample/options_test.go
+++ b/src/cmd/services/m3coordinator/downsample/options_test.go
@@ -40,8 +40,9 @@ func TestBufferForPastTimedMetric(t *testing.T) {
 		{value: 30 * time.Second, expected: 30 * time.Second},
 		{value: 59 * time.Second, expected: 30 * time.Second},
 		{value: 60 * time.Second, expected: time.Minute},
-		{value: 59 * time.Minute, expected: time.Minute},
-		{value: 61 * time.Minute, expected: time.Minute},
+		{value: 2 * time.Minute, expected: 2 * time.Minute},
+		{value: 59 * time.Minute, expected: 2 * time.Minute},
+		{value: 61 * time.Minute, expected: 2 * time.Minute},
 	}
 	for _, test := range tests {
 		t.Run(fmt.Sprintf("test_value_%v", test.value), func(t *testing.T) {

From 93861267240de8d6857a6b6f9a1a641b6024c09a Mon Sep 17 00:00:00 2001
From: Rob Skillington
Date: Fri, 14 Jun 2019 08:26:25 +1000
Subject: [PATCH 2/3] Set verbose errors behind config

---
 src/aggregator/aggregator/entry.go            | 15 +++
 src/aggregator/aggregator/entry_test.go       | 99 ++++++++++++-------
 src/aggregator/aggregator/options.go          | 41 +++++---
 .../m3coordinator/downsample/options.go       | 50 ++++++++--
 .../m3coordinator/downsample/options_test.go  |  3 +-
 5 files changed, 152 insertions(+), 56 deletions(-)

diff --git a/src/aggregator/aggregator/entry.go b/src/aggregator/aggregator/entry.go
index c377d7ee4a..e268b7dc9c 100644
--- a/src/aggregator/aggregator/entry.go
+++ b/src/aggregator/aggregator/entry.go
@@ -701,6 +701,10 @@ func (e *Entry) checkTimestampForTimedMetric(
 	timedBufferFuture := e.opts.BufferForFutureTimedMetric()
 	if metricTimeNanos-currNanos > timedBufferFuture.Nanoseconds() {
 		e.metrics.timed.tooFarInTheFuture.Inc(1)
+		if !e.opts.VerboseErrors() {
+			// Don't return verbose errors if not enabled.
+			return errTooFarInTheFuture
+		}
 		timestamp := time.Unix(0, metricTimeNanos)
 		futureLimit := time.Unix(0, currNanos+timedBufferFuture.Nanoseconds())
 		err := fmt.Errorf("datapoint for aggregation too far in future: "+
@@ -716,6 +720,10 @@ func (e *Entry) checkTimestampForTimedMetric(
 	timedBufferPast := bufferPastFn(resolution)
 	if currNanos-metricTimeNanos > timedBufferPast.Nanoseconds() {
 		e.metrics.timed.tooFarInThePast.Inc(1)
+		if !e.opts.VerboseErrors() {
+			// Don't return verbose errors if not enabled.
+			return errTooFarInThePast
+		}
 		timestamp := time.Unix(0, metricTimeNanos)
 		pastLimit := time.Unix(0, currNanos-timedBufferPast.Nanoseconds())
 		err := fmt.Errorf("datapoint for aggregation too far in past: "+
@@ -856,7 +864,14 @@ func (e *Entry) checkLatenessForForwardedMetric(
 	if currNanos-metricTimeNanos <= maxLatenessAllowed.Nanoseconds() {
 		return nil
 	}
+
 	e.metrics.forwarded.arrivedTooLate.Inc(1)
+
+	if !e.opts.VerboseErrors() {
+		// Don't return verbose errors if not enabled.
+		return errArrivedTooLate
+	}
+
 	timestamp := time.Unix(0, metricTimeNanos)
 	pastLimit := time.Unix(0, currNanos-maxLatenessAllowed.Nanoseconds())
 	err := fmt.Errorf("datapoint for aggregation forwarded too late: "+
diff --git a/src/aggregator/aggregator/entry_test.go b/src/aggregator/aggregator/entry_test.go
index 5e9623346a..0d25154f8a 100644
--- a/src/aggregator/aggregator/entry_test.go
+++ b/src/aggregator/aggregator/entry_test.go
@@ -246,7 +246,7 @@ func TestEntryResetSetData(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

-	e, lists, now := testEntry(ctrl)
+	e, lists, now := testEntry(ctrl, testEntryOptions{})
 	require.False(t, e.closed)
 	require.Nil(t, e.rateLimiter)

@@ -266,7 +266,7 @@ func TestEntryBatchTimerRateLimiting(t *testing.T) {
 		ID:            testBatchTimerID,
 		BatchTimerVal: make([]float64, 1000),
 	}
-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})

 	// Reset runtime options to disable rate limiting.
 	noRateLimitRuntimeOpts := runtime.NewOptions().SetWriteValuesPerMetricLimitPerSecond(0)
@@ -297,7 +297,7 @@ func TestEntryCounterRateLimiting(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})

 	// Reset runtime options to disable rate limiting.
 	noRateLimitRuntimeOpts := runtime.NewOptions().SetWriteValuesPerMetricLimitPerSecond(0)
@@ -345,7 +345,7 @@ func TestEntryAddBatchTimerWithPoolAlloc(t *testing.T) {
 		BatchTimerVal: input,
 		TimerValPool:  timerValPool,
 	}
-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	require.NoError(t, e.AddUntimed(bt, testDefaultStagedMetadatas))

 	// Assert the timer values have been returned to pool.
@@ -357,7 +357,7 @@ func TestEntryAddBatchTimerWithTimerBatchSizeLimit(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})
 	*now = time.Unix(105, 0)
 	e.opts = e.opts.
 		SetMaxTimerBatchSizePerWrite(2).
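Note on the test churn in this file: every call site now threads a testEntryOptions value through testEntry (the type itself is introduced at the bottom of this file's diff). For reference, a minimal sketch of the caller-side check the new assertions encode — the isRejection helper is illustrative only, not part of this series:

	// Sketch: xerrors.NewRenamedError keeps the sentinel retrievable via
	// xerrors.InnerError while err.Error() carries the verbose message.
	func isRejection(err error) bool {
		switch xerrors.InnerError(err) {
		case errTooFarInTheFuture, errTooFarInThePast, errArrivedTooLate:
			return true
		}
		return xerrors.IsInvalidParams(err)
	}
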
@@ -385,7 +385,7 @@ func TestEntryAddBatchTimerWithTimerBatchSizeLimitError(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	e.opts = e.opts.SetMaxTimerBatchSizePerWrite(2)
 	e.closed = true

@@ -401,7 +401,7 @@ func TestEntryAddUntimedEmptyMetadatasError(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	inputMetadatas := metadata.StagedMetadatas{}
 	require.Equal(t, errEmptyMetadatas, e.AddUntimed(testCounter, inputMetadatas))
 }
@@ -418,7 +418,7 @@ func TestEntryAddUntimedFutureMetadata(t *testing.T) {
 			Metadata: metadata.Metadata{Pipelines: testPipelines},
 		},
 	}
-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})
 	*now = time.Unix(0, nowNanos)
 	require.Equal(t, errNoApplicableMetadata, e.AddUntimed(testCounter, inputMetadatas))
 }
@@ -434,7 +434,7 @@ func TestEntryAddUntimedNoPipelinesInMetadata(t *testing.T) {
 			Tombstoned: false,
 		},
 	}
-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})
 	*now = time.Unix(0, nowNanos)
 	require.Equal(t, errNoPipelinesInMetadata, e.AddUntimed(testCounter, inputMetadatas))
 }
@@ -443,7 +443,7 @@ func TestEntryAddUntimedInvalidMetricError(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	require.Error(t, e.AddUntimed(testInvalidMetric, metadata.DefaultStagedMetadatas))
 }

@@ -451,7 +451,7 @@ func TestEntryAddUntimedClosedEntryError(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	e.closed = true
 	require.Equal(t, errEntryClosed, e.AddUntimed(testCounter, metadata.DefaultStagedMetadatas))
 }
@@ -460,7 +460,7 @@ func TestEntryAddUntimedClosedListError(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

-	e, lists, _ := testEntry(ctrl)
+	e, lists, _ := testEntry(ctrl, testEntryOptions{})
 	e.closed = false
 	lists.closed = true
 	require.Error(t, e.AddUntimed(testCounter, metadata.DefaultStagedMetadatas))
@@ -979,7 +979,7 @@ func TestEntryAddUntimedWithInvalidAggregationType(t *testing.T) {
 	}

 	for _, input := range inputs {
-		e, _, _ := testEntry(ctrl)
+		e, _, _ := testEntry(ctrl, testEntryOptions{})
 		metadatas := metadata.StagedMetadatas{
 			{
 				Metadata: metadata.Metadata{
@@ -1011,7 +1011,7 @@ func TestEntryAddUntimedWithInvalidPipeline(t *testing.T) {
 			},
 		}),
 	}
-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	metadatas := metadata.StagedMetadatas{
 		{Metadata: metadata.Metadata{Pipelines: []metadata.PipelineMetadata{invalidPipeline}}},
 	}
@@ -1153,7 +1153,7 @@ func TestShouldUpdateStagedMetadataWithLock(t *testing.T) {
 	}

 	for _, input := range inputs {
-		e, _, _ := testEntry(ctrl)
+		e, _, _ := testEntry(ctrl, testEntryOptions{})
 		e.cutoverNanos = input.cutoverNanos
 		populateTestUntimedAggregations(t, e, input.aggregationKeys, metric.CounterType)
 		e.Lock()
@@ -1188,7 +1188,7 @@ func TestEntryStoragePolicies(t *testing.T) {
 		},
 	}

-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	for _, input := range inputs {
 		require.Equal(t, input.expected, e.storagePolicies(input.policies))
 	}
@@ -1198,7 +1198,7 @@ func TestEntryTimedRateLimiting(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})

 	// Reset runtime options to disable rate limiting.
 	noRateLimitRuntimeOpts := runtime.NewOptions().SetWriteValuesPerMetricLimitPerSecond(0)
@@ -1232,7 +1232,7 @@ func TestEntryAddTimedEntryClosed(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	e.closed = true
 	require.Equal(t, errEntryClosed, e.AddTimed(testTimedMetric, testTimedMetadata))
 }
@@ -1246,7 +1246,9 @@ func TestEntryAddTimedMetricTooLate(t *testing.T) {
 	) time.Duration {
 		return resolution + time.Second
 	}
-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{
+		options: testOptions(ctrl).SetVerboseErrors(true),
+	})
 	e.opts = e.opts.SetBufferForPastTimedMetricFn(timedAggregationBufferPastFn)

 	inputs := []struct {
@@ -1267,8 +1269,12 @@
 		metric := testTimedMetric
 		metric.TimeNanos = input.timeNanos
 		err := e.AddTimed(metric, metadata.TimedMetadata{StoragePolicy: input.storagePolicy})
-		require.Equal(t, errTooFarInThePast, xerrors.InnerError(err))
 		require.True(t, xerrors.IsInvalidParams(err))
+		require.Equal(t, errTooFarInThePast, xerrors.InnerError(err))
+		require.True(t, strings.Contains(err.Error(), "datapoint for aggregation too far in past"))
+		require.True(t, strings.Contains(err.Error(), "id="+string(metric.ID)))
+		require.True(t, strings.Contains(err.Error(), "timestamp="))
+		require.True(t, strings.Contains(err.Error(), "past_limit="))
 	}
 }

@@ -1276,7 +1282,9 @@ func TestEntryAddTimedMetricTooEarly(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{
+		options: testOptions(ctrl).SetVerboseErrors(true),
+	})
 	e.opts = e.opts.SetBufferForFutureTimedMetric(time.Second)

 	inputs := []struct {
@@ -1297,8 +1305,12 @@
 		metric := testTimedMetric
 		metric.TimeNanos = input.timeNanos
 		err := e.AddTimed(metric, metadata.TimedMetadata{StoragePolicy: input.storagePolicy})
-		require.Equal(t, errTooFarInTheFuture, xerrors.InnerError(err))
 		require.True(t, xerrors.IsInvalidParams(err))
+		require.Equal(t, errTooFarInTheFuture, xerrors.InnerError(err))
+		require.True(t, strings.Contains(err.Error(), "datapoint for aggregation too far in future"))
+		require.True(t, strings.Contains(err.Error(), "id="+string(metric.ID)))
+		require.True(t, strings.Contains(err.Error(), "timestamp="))
+		require.True(t, strings.Contains(err.Error(), "future_limit="))
 	}
 }

@@ -1306,7 +1318,7 @@ func TestEntryAddTimed(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

-	e, lists, _ := testEntry(ctrl)
+	e, lists, _ := testEntry(ctrl, testEntryOptions{})

 	// Add an initial timed metric.
 	require.NoError(t, e.AddTimed(testTimedMetric, testTimedMetadata))
@@ -1418,7 +1430,7 @@ func TestEntryForwardedRateLimiting(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})

 	// Reset runtime options to disable rate limiting.
 	noRateLimitRuntimeOpts := runtime.NewOptions().SetWriteValuesPerMetricLimitPerSecond(0)
@@ -1452,7 +1464,7 @@ func TestEntryAddForwardedEntryClosed(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	e.closed = true
 	require.Equal(t, errEntryClosed, e.AddForwarded(testForwardedMetric, testForwardMetadata))
 }
@@ -1467,7 +1479,9 @@ func TestEntryAddForwardedMetricTooLate(t *testing.T) {
 	) time.Duration {
 		return resolution + time.Second*time.Duration(numForwardedTimes)
 	}
-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{
+		options: testOptions(ctrl).SetVerboseErrors(true),
+	})
 	e.opts = e.opts.SetMaxAllowedForwardingDelayFn(maxAllowedForwardingDelayFn)

 	inputs := []struct {
@@ -1498,8 +1512,12 @@
 		metadata.StoragePolicy = input.storagePolicy
 		metadata.NumForwardedTimes = input.numForwardedTimes
 		err := e.AddForwarded(metric, metadata)
-		require.Equal(t, errArrivedTooLate, xerrors.InnerError(err))
 		require.True(t, xerrors.IsInvalidParams(err))
+		require.Equal(t, errArrivedTooLate, xerrors.InnerError(err))
+		require.True(t, strings.Contains(err.Error(), "datapoint for aggregation forwarded too late"))
+		require.True(t, strings.Contains(err.Error(), "id="+string(metric.ID)))
+		require.True(t, strings.Contains(err.Error(), "timestamp="))
+		require.True(t, strings.Contains(err.Error(), "past_limit="))
 	}
 }

@@ -1507,7 +1525,7 @@ func TestEntryAddForwarded(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

-	e, lists, _ := testEntry(ctrl)
+	e, lists, _ := testEntry(ctrl, testEntryOptions{})

 	// Add an initial forwarded metric.
 	require.NoError(t, e.AddForwarded(testForwardedMetric, testForwardMetadata1))
@@ -1636,7 +1654,7 @@ func TestEntryMaybeExpireNoExpiry(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})

 	// If we are still within entry TTL, should not expire.
 	require.False(t, e.ShouldExpire(now.Add(e.opts.EntryTTL()).Add(-time.Second)))
@@ -1655,7 +1673,7 @@ func TestEntryMaybeExpireWithExpiry(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()

-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})
 	populateTestUntimedAggregations(t, e, testAggregationKeys, metric.CounterType)

 	var elems []*CounterElem
@@ -1684,7 +1702,7 @@ func TestEntryMaybeCopyIDWithLock(t *testing.T) {
 	defer ctrl.Finish()

 	id := id.RawID("foo")
-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	res := e.maybeCopyIDWithLock(id)
 	require.Equal(t, id, res)

@@ -1772,12 +1790,25 @@
 	}
 }

-func testEntry(ctrl *gomock.Controller) (*Entry, *metricLists, *time.Time) {
+type testEntryOptions struct {
+	options Options
+}
+
+func testEntry(
+	ctrl *gomock.Controller,
+	testOpts testEntryOptions,
+) (*Entry, *metricLists, *time.Time) {
 	now := time.Now()
 	clockOpts := clock.NewOptions().SetNowFn(func() time.Time {
 		return now
 	})
-	opts := testOptions(ctrl).
+
+	opts := testOpts.options
+	if opts == nil {
+		opts = testOptions(ctrl)
+	}
+
+	opts = opts.
 		SetClockOptions(clockOpts).
 		SetDefaultStoragePolicies(testDefaultStoragePolicies)
@@ -1912,7 +1943,7 @@ func testEntryAddUntimed(
 	}

 	for _, input := range inputs {
-		e, _, now := testEntry(ctrl)
+		e, _, now := testEntry(ctrl, testEntryOptions{})
 		if withPrePopulation {
 			populateTestUntimedAggregations(t, e, prePopulateData, input.mu.Type)
 		}
diff --git a/src/aggregator/aggregator/options.go b/src/aggregator/aggregator/options.go
index d608bcfdbe..2bef24cb12 100644
--- a/src/aggregator/aggregator/options.go
+++ b/src/aggregator/aggregator/options.go
@@ -53,6 +53,7 @@ var (
 		policy.NewStoragePolicy(10*time.Second, xtime.Second, 2*24*time.Hour),
 		policy.NewStoragePolicy(time.Minute, xtime.Minute, 40*24*time.Hour),
 	}
+	defaultVerboseErrors = false

 	defaultTimedMetricBuffer = time.Minute

@@ -300,6 +301,12 @@ type Options interface {

 	// FullGaugePrefix returns the full prefix for gauges.
 	FullGaugePrefix() []byte
+
+	// SetVerboseErrors sets whether to return verbose errors or not.
+	SetVerboseErrors(value bool) Options
+
+	// VerboseErrors returns whether to return verbose errors or not.
+	VerboseErrors() bool
 }

 type options struct {
@@ -338,6 +345,7 @@ type options struct {
 	counterElemPool CounterElemPool
 	timerElemPool   TimerElemPool
 	gaugeElemPool   GaugeElemPool
+	verboseErrors   bool

 	// Derived options.
 	fullCounterPrefix []byte
@@ -353,17 +361,17 @@ func NewOptions() Options {
 		SetTimerTypeStringTransformFn(aggregation.SuffixTransform).
 		SetGaugeTypeStringTransformFn(aggregation.EmptyTransform)
 	o := &options{
-		aggTypesOptions:    aggTypesOptions,
-		metricPrefix:       defaultMetricPrefix,
-		counterPrefix:      defaultCounterPrefix,
-		timerPrefix:        defaultTimerPrefix,
-		gaugePrefix:        defaultGaugePrefix,
-		timeLock:           &sync.RWMutex{},
-		clockOpts:          clock.NewOptions(),
-		instrumentOpts:     instrument.NewOptions(),
-		streamOpts:         cm.NewOptions(),
-		runtimeOptsManager: runtime.NewOptionsManager(runtime.NewOptions()),
-		shardFn:            sharding.Murmur32Hash.MustShardFn(),
+		aggTypesOptions:                  aggTypesOptions,
+		metricPrefix:                     defaultMetricPrefix,
+		counterPrefix:                    defaultCounterPrefix,
+		timerPrefix:                      defaultTimerPrefix,
+		gaugePrefix:                      defaultGaugePrefix,
+		timeLock:                         &sync.RWMutex{},
+		clockOpts:                        clock.NewOptions(),
+		instrumentOpts:                   instrument.NewOptions(),
+		streamOpts:                       cm.NewOptions(),
+		runtimeOptsManager:               runtime.NewOptionsManager(runtime.NewOptions()),
+		shardFn:                          sharding.Murmur32Hash.MustShardFn(),
 		bufferDurationBeforeShardCutover: defaultBufferDurationBeforeShardCutover,
 		bufferDurationAfterShardCutoff:   defaultBufferDurationAfterShardCutoff,
 		entryTTL:                         defaultEntryTTL,
@@ -377,6 +385,7 @@ func NewOptions() Options {
 		bufferForFutureTimedMetric:       defaultTimedMetricBuffer,
 		maxNumCachedSourceSets:           defaultMaxNumCachedSourceSets,
 		discardNaNAggregatedValues:       defaultDiscardNaNAggregatedValues,
+		verboseErrors:                    defaultVerboseErrors,
 	}

 	// Initialize pools.
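The accessors in the next hunk follow the copy-on-set convention used throughout this options type: SetVerboseErrors returns a modified copy rather than mutating the receiver. An illustrative use, assuming only the accessors added in this patch:

	base := NewOptions()                   // verboseErrors defaults to false
	verbose := base.SetVerboseErrors(true) // copy with the flag enabled
	// base.VerboseErrors() == false, verbose.VerboseErrors() == true
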
@@ -732,6 +741,16 @@ func (o *options) GaugeElemPool() GaugeElemPool {
 	return o.gaugeElemPool
 }

+func (o *options) SetVerboseErrors(value bool) Options {
+	opts := *o
+	opts.verboseErrors = value
+	return &opts
+}
+
+func (o *options) VerboseErrors() bool {
+	return o.verboseErrors
+}
+
 func (o *options) FullCounterPrefix() []byte {
 	return o.fullCounterPrefix
 }
diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go
index e6116bf95f..934c0290ea 100644
--- a/src/cmd/services/m3coordinator/downsample/options.go
+++ b/src/cmd/services/m3coordinator/downsample/options.go
@@ -62,6 +62,7 @@ const (
 	defaultStorageFlushConcurrency = 20000
 	defaultOpenTimeout             = 10 * time.Second
 	defaultBufferFutureTimedMetric = time.Minute
+	defaultVerboseErrors           = true
 )

 var (
@@ -185,6 +186,9 @@ type Configuration struct {

 	// Pool of gauge elements.
 	GaugeElemPool pool.ObjectPoolConfiguration `yaml:"gaugeElemPool"`
+
+	// BufferPastLimits specifies the buffer past limits.
+	BufferPastLimits []BufferPastLimitConfiguration `yaml:"bufferPastLimits"`
 }

 // RemoteAggregatorConfiguration specifies a remote aggregator
@@ -207,6 +211,13 @@ func (c RemoteAggregatorConfiguration) newClient(
 	return c.Client.NewClient(kvClient, clockOpts, instrumentOpts)
 }

+// BufferPastLimitConfiguration specifies a custom buffer past limit
+// for aggregation tiles.
+type BufferPastLimitConfiguration struct {
+	Resolution time.Duration `yaml:"resolution"`
+	BufferPast time.Duration `yaml:"bufferPast"`
+}
+
 // NewDownsampler returns a new downsampler.
 func (cfg Configuration) NewDownsampler(
 	opts DownsamplerOptions,
@@ -316,7 +327,23 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
 		placementManager, flushTimesManager,
 		electionManager, instrumentOpts, storageFlushConcurrency, pools)

-	// Finally construct all options
+	bufferPastLimits := defaultBufferPastLimits
+	if numLimitsCfg := len(cfg.BufferPastLimits); numLimitsCfg > 0 {
+		// Allow overrides from config.
+		bufferPastLimits = make([]bufferPastLimit, 0, numLimitsCfg)
+		for _, limit := range cfg.BufferPastLimits {
+			bufferPastLimits = append(bufferPastLimits, bufferPastLimit{
+				upperBound: limit.Resolution,
+				bufferPast: limit.BufferPast,
+			})
+		}
+	}
+
+	bufferForPastTimedMetricFn := func(tile time.Duration) time.Duration {
+		return bufferForPastTimedMetric(bufferPastLimits, tile)
+	}
+
+	// Finally construct all options.
 	aggregatorOpts := aggregator.NewOptions().
 		SetClockOptions(clockOpts).
 		SetInstrumentOptions(instrumentOpts).
@@ -330,8 +357,9 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
 		SetElectionManager(electionManager).
 		SetFlushManager(flushManager).
 		SetFlushHandler(flushHandler).
-		SetBufferForPastTimedMetricFn(bufferForPastTimedMetric).
-		SetBufferForFutureTimedMetric(defaultBufferFutureTimedMetric)
+		SetBufferForPastTimedMetricFn(bufferForPastTimedMetricFn).
+		SetBufferForFutureTimedMetric(defaultBufferFutureTimedMetric).
+		SetVerboseErrors(defaultVerboseErrors)

 	if cfg.AggregationTypes != nil {
 		aggTypeOpts, err := cfg.AggregationTypes.NewOptions(instrumentOpts)
@@ -602,11 +630,13 @@ func (o DownsamplerOptions) newAggregatorFlushManagerAndHandler(
 	return flushManager, handler
 }

+type bufferPastLimit struct {
+	upperBound time.Duration
+	bufferPast time.Duration
+}
+
 var (
-	bufferPastLimits = []struct {
-		upperBound time.Duration
-		bufferPast time.Duration
-	}{
+	defaultBufferPastLimits = []bufferPastLimit{
 		{upperBound: 0, bufferPast: 15 * time.Second},
 		{upperBound: 30 * time.Second, bufferPast: 30 * time.Second},
 		{upperBound: time.Minute, bufferPast: time.Minute},
@@ -614,9 +644,9 @@ var (
 	}
 )

-func bufferForPastTimedMetric(tile time.Duration) time.Duration {
-	bufferPast := bufferPastLimits[0].bufferPast
-	for _, limit := range bufferPastLimits {
+func bufferForPastTimedMetric(limits []bufferPastLimit, tile time.Duration) time.Duration {
+	bufferPast := limits[0].bufferPast
+	for _, limit := range limits {
 		if tile < limit.upperBound {
 			return bufferPast
 		}
diff --git a/src/cmd/services/m3coordinator/downsample/options_test.go b/src/cmd/services/m3coordinator/downsample/options_test.go
index 99aa147079..bf8838fd0e 100644
--- a/src/cmd/services/m3coordinator/downsample/options_test.go
+++ b/src/cmd/services/m3coordinator/downsample/options_test.go
@@ -29,6 +29,7 @@ import (
 )

 func TestBufferForPastTimedMetric(t *testing.T) {
+	limits := defaultBufferPastLimits
 	tests := []struct {
 		value    time.Duration
 		expected time.Duration
@@ -46,7 +47,7 @@ func TestBufferForPastTimedMetric(t *testing.T) {
 	}
 	for _, test := range tests {
 		t.Run(fmt.Sprintf("test_value_%v", test.value), func(t *testing.T) {
-			result := bufferForPastTimedMetric(test.value)
+			result := bufferForPastTimedMetric(limits, test.value)
 			require.Equal(t, test.expected, result)
 		})
 	}

From 6be284d5c246de643ef16ebb4bfe4b0694abf4de Mon Sep 17 00:00:00 2001
From: Rob Skillington
Date: Fri, 14 Jun 2019 09:43:04 +1000
Subject: [PATCH 3/3] Specify past_limit instead of future_limit

---
 src/aggregator/aggregator/entry.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/aggregator/aggregator/entry.go b/src/aggregator/aggregator/entry.go
index e268b7dc9c..4729dddba3 100644
--- a/src/aggregator/aggregator/entry.go
+++ b/src/aggregator/aggregator/entry.go
@@ -875,8 +875,8 @@ func (e *Entry) checkLatenessForForwardedMetric(
 	timestamp := time.Unix(0, metricTimeNanos)
 	pastLimit := time.Unix(0, currNanos-maxLatenessAllowed.Nanoseconds())
 	err := fmt.Errorf("datapoint for aggregation forwarded too late: "+
-		"id=%s, off_by=%s, timestamp=%s, future_limit=%s, "+
-		"timestamp_unix_nanos=%d, future_limit_unix_nanos=%d",
+		"id=%s, off_by=%s, timestamp=%s, past_limit=%s, "+
+		"timestamp_unix_nanos=%d, past_limit_unix_nanos=%d",
 		metric.ID, maxLatenessAllowed.String(),
 		timestamp.Format(errTimestampFormat),
 		pastLimit.Format(errTimestampFormat),
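
Taken together, the series gates the verbose rejection messages behind SetVerboseErrors (enabled by default in the coordinator's downsampler) and makes the buffer-past limits overridable from config. A sketch of driving both from the coordinator downsample configuration — the durations are illustrative; only the field names and YAML keys come from this series:

	cfg := downsample.Configuration{
		// YAML equivalent:
		//   bufferPastLimits:
		//     - {resolution: 0s, bufferPast: 30s}
		//     - {resolution: 2m, bufferPast: 5m}
		BufferPastLimits: []downsample.BufferPastLimitConfiguration{
			{Resolution: 0, BufferPast: 30 * time.Second},
			{Resolution: 2 * time.Minute, BufferPast: 5 * time.Minute},
		},
	}

Each entry's Resolution becomes the upperBound that bufferForPastTimedMetric compares tiles against, so entries should be listed in ascending resolution order, mirroring the default table.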