From 75c2664d50aa3d624c2c14cfe51f07e115f292fb Mon Sep 17 00:00:00 2001
From: Rob Skillington <rob.skillington@gmail.com>
Date: Fri, 14 Jun 2019 10:38:41 +1000
Subject: [PATCH] Add aggregation specific errors (#1731)

---
 src/aggregator/aggregator/aggregator.go       |  13 ++-
 src/aggregator/aggregator/entry.go            |  62 ++++++++--
 src/aggregator/aggregator/entry_test.go       | 106 ++++++++++++------
 src/aggregator/aggregator/options.go          |  41 +++++--
 .../m3coordinator/downsample/options.go       |  51 +++++++--
 .../m3coordinator/downsample/options_test.go  |   8 +-
 6 files changed, 211 insertions(+), 70 deletions(-)

diff --git a/src/aggregator/aggregator/aggregator.go b/src/aggregator/aggregator/aggregator.go
index 85ba06cabb..03c9ac2fc1 100644
--- a/src/aggregator/aggregator/aggregator.go
+++ b/src/aggregator/aggregator/aggregator.go
@@ -40,6 +40,7 @@ import (
 	"github.com/m3db/m3/src/metrics/metric/id"
 	"github.com/m3db/m3/src/metrics/metric/unaggregated"
 	"github.com/m3db/m3/src/x/clock"
+	xerrors "github.com/m3db/m3/src/x/errors"
 	"github.com/m3db/m3/src/x/instrument"
 
 	"github.com/uber-go/tally"
@@ -706,10 +707,12 @@ func newAggregatorAddTimedMetrics(
 }
 
 func (m *aggregatorAddTimedMetrics) ReportError(err error) {
-	switch err {
-	case errTooFarInTheFuture:
+	switch {
+	case err == errTooFarInTheFuture ||
+		xerrors.InnerError(err) == errTooFarInTheFuture:
 		m.tooFarInTheFuture.Inc(1)
-	case errTooFarInThePast:
+	case err == errTooFarInThePast ||
+		xerrors.InnerError(err) == errTooFarInThePast:
 		m.tooFarInThePast.Inc(1)
 	default:
 		m.aggregatorAddMetricMetrics.ReportError(err)
@@ -736,8 +739,8 @@ func newAggregatorAddForwardedMetrics(
 	maxAllowedForwardingDelayFn MaxAllowedForwardingDelayFn,
 ) aggregatorAddForwardedMetrics {
 	return aggregatorAddForwardedMetrics{
-		aggregatorAddMetricMetrics: newAggregatorAddMetricMetrics(scope, samplingRate),
-		scope: scope,
+		aggregatorAddMetricMetrics:  newAggregatorAddMetricMetrics(scope, samplingRate),
+		scope:                       scope,
 		maxAllowedForwardingDelayFn: maxAllowedForwardingDelayFn,
 		forwardingLatency:           make(map[latencyBucketKey]tally.Histogram),
 	}
diff --git a/src/aggregator/aggregator/entry.go b/src/aggregator/aggregator/entry.go
index fa5a11d439..4729dddba3 100644
--- a/src/aggregator/aggregator/entry.go
+++ b/src/aggregator/aggregator/entry.go
@@ -23,6 +23,7 @@ package aggregator
 import (
 	"container/list"
 	"errors"
+	"fmt"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -57,6 +58,7 @@ var (
 	errTooFarInTheFuture           = xerrors.NewInvalidParamsError(errors.New("too far in the future"))
 	errTooFarInThePast             = xerrors.NewInvalidParamsError(errors.New("too far in the past"))
 	errArrivedTooLate              = xerrors.NewInvalidParamsError(errors.New("arrived too late"))
+	errTimestampFormat             = time.RFC822Z
 )
 
 type rateLimitEntryMetrics struct {
@@ -640,8 +642,8 @@ func (e *Entry) addTimed(
 
 	// Reject datapoints that arrive too late or too early.
 	if err := e.checkTimestampForTimedMetric(
+		metric,
 		currTime.UnixNano(),
-		metric.TimeNanos,
 		metadata.StoragePolicy.Resolution().Window,
 	); err != nil {
 		e.RUnlock()
@@ -691,19 +693,47 @@ func (e *Entry) addTimed(
 }
 
 func (e *Entry) checkTimestampForTimedMetric(
-	currNanos, metricTimeNanos int64,
+	metric aggregated.Metric,
+	currNanos int64,
 	resolution time.Duration,
 ) error {
+	metricTimeNanos := metric.TimeNanos
 	timedBufferFuture := e.opts.BufferForFutureTimedMetric()
 	if metricTimeNanos-currNanos > timedBufferFuture.Nanoseconds() {
 		e.metrics.timed.tooFarInTheFuture.Inc(1)
-		return errTooFarInTheFuture
+		if !e.opts.VerboseErrors() {
+			// Don't return verbose errors if not enabled.
+			return errTooFarInTheFuture
+		}
+		timestamp := time.Unix(0, metricTimeNanos)
+		futureLimit := time.Unix(0, currNanos+timedBufferFuture.Nanoseconds())
+		err := fmt.Errorf("datapoint for aggregation too far in future: "+
+			"id=%s, off_by=%s, timestamp=%s, future_limit=%s, "+
+			"timestamp_unix_nanos=%d, future_limit_unix_nanos=%d",
+			metric.ID, timestamp.Sub(futureLimit).String(),
+			timestamp.Format(errTimestampFormat),
+			futureLimit.Format(errTimestampFormat),
+			timestamp.UnixNano(), futureLimit.UnixNano())
+		return xerrors.NewRenamedError(errTooFarInTheFuture, err)
 	}
 	bufferPastFn := e.opts.BufferForPastTimedMetricFn()
 	timedBufferPast := bufferPastFn(resolution)
 	if currNanos-metricTimeNanos > timedBufferPast.Nanoseconds() {
 		e.metrics.timed.tooFarInThePast.Inc(1)
-		return errTooFarInThePast
+		if !e.opts.VerboseErrors() {
+			// Don't return verbose errors if not enabled.
+			return errTooFarInThePast
+		}
+		timestamp := time.Unix(0, metricTimeNanos)
+		pastLimit := time.Unix(0, currNanos-timedBufferPast.Nanoseconds())
+		err := fmt.Errorf("datapoint for aggregation too far in past: "+
+			"id=%s, off_by=%s, timestamp=%s, past_limit=%s, "+
+			"timestamp_unix_nanos=%d, past_limit_unix_nanos=%d",
+			metric.ID, pastLimit.Sub(timestamp).String(),
+			timestamp.Format(errTimestampFormat),
+			pastLimit.Format(errTimestampFormat),
+			timestamp.UnixNano(), pastLimit.UnixNano())
+		return xerrors.NewRenamedError(errTooFarInThePast, err)
 	}
 	return nil
 }
@@ -769,8 +799,8 @@ func (e *Entry) addForwarded(
 
 	// Reject datapoints that arrive too late.
 	if err := e.checkLatenessForForwardedMetric(
+		metric,
 		currTime.UnixNano(),
-		metric.TimeNanos,
 		metadata.StoragePolicy.Resolution().Window,
 		metadata.NumForwardedTimes,
 	); err != nil {
@@ -823,17 +853,35 @@ func (e *Entry) addForwarded(
 }
 
 func (e *Entry) checkLatenessForForwardedMetric(
-	currNanos, metricTimeNanos int64,
+	metric aggregated.ForwardedMetric,
+	currNanos int64,
 	resolution time.Duration,
 	numForwardedTimes int,
 ) error {
+	metricTimeNanos := metric.TimeNanos
 	maxAllowedForwardingDelayFn := e.opts.MaxAllowedForwardingDelayFn()
 	maxLatenessAllowed := maxAllowedForwardingDelayFn(resolution, numForwardedTimes)
 	if currNanos-metricTimeNanos <= maxLatenessAllowed.Nanoseconds() {
 		return nil
 	}
+
 	e.metrics.forwarded.arrivedTooLate.Inc(1)
-	return errArrivedTooLate
+
+	if !e.opts.VerboseErrors() {
+		// Verbose errors disabled: return the bare sentinel, skip formatting.
+		return errArrivedTooLate
+	}
+
+	timestamp := time.Unix(0, metricTimeNanos)
+	pastLimit := time.Unix(0, currNanos-maxLatenessAllowed.Nanoseconds())
+	err := fmt.Errorf("datapoint for aggregation forwarded too late: "+
+		"id=%s, off_by=%s, timestamp=%s, past_limit=%s, "+
+		"timestamp_unix_nanos=%d, past_limit_unix_nanos=%d",
+		metric.ID, pastLimit.Sub(timestamp).String(),
+		timestamp.Format(errTimestampFormat),
+		pastLimit.Format(errTimestampFormat),
+		timestamp.UnixNano(), pastLimit.UnixNano())
+	return xerrors.NewRenamedError(errArrivedTooLate, err)
 }
 
 func (e *Entry) updateForwardMetadataWithLock(
diff --git a/src/aggregator/aggregator/entry_test.go b/src/aggregator/aggregator/entry_test.go
index 44ebf05fb3..0d25154f8a 100644
--- a/src/aggregator/aggregator/entry_test.go
+++ b/src/aggregator/aggregator/entry_test.go
@@ -39,6 +39,7 @@ import (
 	"github.com/m3db/m3/src/metrics/policy"
 	"github.com/m3db/m3/src/metrics/transformation"
 	"github.com/m3db/m3/src/x/clock"
+	xerrors "github.com/m3db/m3/src/x/errors"
 	"github.com/m3db/m3/src/x/pool"
 	xtime "github.com/m3db/m3/src/x/time"
 
@@ -245,7 +246,7 @@ func TestEntryResetSetData(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	e, lists, now := testEntry(ctrl)
+	e, lists, now := testEntry(ctrl, testEntryOptions{})
 
 	require.False(t, e.closed)
 	require.Nil(t, e.rateLimiter)
@@ -265,7 +266,7 @@ func TestEntryBatchTimerRateLimiting(t *testing.T) {
 		ID:            testBatchTimerID,
 		BatchTimerVal: make([]float64, 1000),
 	}
-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})
 
 	// Reset runtime options to disable rate limiting.
 	noRateLimitRuntimeOpts := runtime.NewOptions().SetWriteValuesPerMetricLimitPerSecond(0)
@@ -296,7 +297,7 @@ func TestEntryCounterRateLimiting(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})
 
 	// Reset runtime options to disable rate limiting.
 	noRateLimitRuntimeOpts := runtime.NewOptions().SetWriteValuesPerMetricLimitPerSecond(0)
@@ -344,7 +345,7 @@ func TestEntryAddBatchTimerWithPoolAlloc(t *testing.T) {
 		BatchTimerVal: input,
 		TimerValPool:  timerValPool,
 	}
-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	require.NoError(t, e.AddUntimed(bt, testDefaultStagedMetadatas))
 
 	// Assert the timer values have been returned to pool.
@@ -356,7 +357,7 @@ func TestEntryAddBatchTimerWithTimerBatchSizeLimit(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})
 	*now = time.Unix(105, 0)
 	e.opts = e.opts.
 		SetMaxTimerBatchSizePerWrite(2).
@@ -384,7 +385,7 @@ func TestEntryAddBatchTimerWithTimerBatchSizeLimitError(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	e.opts = e.opts.SetMaxTimerBatchSizePerWrite(2)
 	e.closed = true
 
@@ -400,7 +401,7 @@ func TestEntryAddUntimedEmptyMetadatasError(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	inputMetadatas := metadata.StagedMetadatas{}
 	require.Equal(t, errEmptyMetadatas, e.AddUntimed(testCounter, inputMetadatas))
 }
@@ -417,7 +418,7 @@ func TestEntryAddUntimedFutureMetadata(t *testing.T) {
 			Metadata:     metadata.Metadata{Pipelines: testPipelines},
 		},
 	}
-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})
 	*now = time.Unix(0, nowNanos)
 	require.Equal(t, errNoApplicableMetadata, e.AddUntimed(testCounter, inputMetadatas))
 }
@@ -433,7 +434,7 @@ func TestEntryAddUntimedNoPipelinesInMetadata(t *testing.T) {
 			Tombstoned:   false,
 		},
 	}
-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})
 	*now = time.Unix(0, nowNanos)
 	require.Equal(t, errNoPipelinesInMetadata, e.AddUntimed(testCounter, inputMetadatas))
 }
@@ -442,7 +443,7 @@ func TestEntryAddUntimedInvalidMetricError(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	require.Error(t, e.AddUntimed(testInvalidMetric, metadata.DefaultStagedMetadatas))
 }
 
@@ -450,7 +451,7 @@ func TestEntryAddUntimedClosedEntryError(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	e.closed = true
 	require.Equal(t, errEntryClosed, e.AddUntimed(testCounter, metadata.DefaultStagedMetadatas))
 }
@@ -459,7 +460,7 @@ func TestEntryAddUntimedClosedListError(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	e, lists, _ := testEntry(ctrl)
+	e, lists, _ := testEntry(ctrl, testEntryOptions{})
 	e.closed = false
 	lists.closed = true
 	require.Error(t, e.AddUntimed(testCounter, metadata.DefaultStagedMetadatas))
@@ -978,7 +979,7 @@ func TestEntryAddUntimedWithInvalidAggregationType(t *testing.T) {
 	}
 
 	for _, input := range inputs {
-		e, _, _ := testEntry(ctrl)
+		e, _, _ := testEntry(ctrl, testEntryOptions{})
 		metadatas := metadata.StagedMetadatas{
 			{
 				Metadata: metadata.Metadata{
@@ -1010,7 +1011,7 @@ func TestEntryAddUntimedWithInvalidPipeline(t *testing.T) {
 			},
 		}),
 	}
-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	metadatas := metadata.StagedMetadatas{
 		{Metadata: metadata.Metadata{Pipelines: []metadata.PipelineMetadata{invalidPipeline}}},
 	}
@@ -1152,7 +1153,7 @@ func TestShouldUpdateStagedMetadataWithLock(t *testing.T) {
 	}
 
 	for _, input := range inputs {
-		e, _, _ := testEntry(ctrl)
+		e, _, _ := testEntry(ctrl, testEntryOptions{})
 		e.cutoverNanos = input.cutoverNanos
 		populateTestUntimedAggregations(t, e, input.aggregationKeys, metric.CounterType)
 		e.Lock()
@@ -1187,7 +1188,7 @@ func TestEntryStoragePolicies(t *testing.T) {
 		},
 	}
 
-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	for _, input := range inputs {
 		require.Equal(t, input.expected, e.storagePolicies(input.policies))
 	}
@@ -1197,7 +1198,7 @@ func TestEntryTimedRateLimiting(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})
 
 	// Reset runtime options to disable rate limiting.
 	noRateLimitRuntimeOpts := runtime.NewOptions().SetWriteValuesPerMetricLimitPerSecond(0)
@@ -1231,7 +1232,7 @@ func TestEntryAddTimedEntryClosed(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	e.closed = true
 	require.Equal(t, errEntryClosed, e.AddTimed(testTimedMetric, testTimedMetadata))
 }
@@ -1245,7 +1246,9 @@ func TestEntryAddTimedMetricTooLate(t *testing.T) {
 	) time.Duration {
 		return resolution + time.Second
 	}
-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{
+		options: testOptions(ctrl).SetVerboseErrors(true),
+	})
 	e.opts = e.opts.SetBufferForPastTimedMetricFn(timedAggregationBufferPastFn)
 
 	inputs := []struct {
@@ -1265,7 +1268,13 @@ func TestEntryAddTimedMetricTooLate(t *testing.T) {
 	for _, input := range inputs {
 		metric := testTimedMetric
 		metric.TimeNanos = input.timeNanos
-		require.Equal(t, errTooFarInThePast, e.AddTimed(metric, metadata.TimedMetadata{StoragePolicy: input.storagePolicy}))
+		err := e.AddTimed(metric, metadata.TimedMetadata{StoragePolicy: input.storagePolicy})
+		require.True(t, xerrors.IsInvalidParams(err))
+		require.Equal(t, errTooFarInThePast, xerrors.InnerError(err))
+		require.True(t, strings.Contains(err.Error(), "datapoint for aggregation too far in past"))
+		require.True(t, strings.Contains(err.Error(), "id="+string(metric.ID)))
+		require.True(t, strings.Contains(err.Error(), "timestamp="))
+		require.True(t, strings.Contains(err.Error(), "past_limit="))
 	}
 }
 
@@ -1273,7 +1282,9 @@ func TestEntryAddTimedMetricTooEarly(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{
+		options: testOptions(ctrl).SetVerboseErrors(true),
+	})
 	e.opts = e.opts.SetBufferForFutureTimedMetric(time.Second)
 
 	inputs := []struct {
@@ -1293,7 +1304,13 @@ func TestEntryAddTimedMetricTooEarly(t *testing.T) {
 	for _, input := range inputs {
 		metric := testTimedMetric
 		metric.TimeNanos = input.timeNanos
-		require.Equal(t, errTooFarInTheFuture, e.AddTimed(metric, metadata.TimedMetadata{StoragePolicy: input.storagePolicy}))
+		err := e.AddTimed(metric, metadata.TimedMetadata{StoragePolicy: input.storagePolicy})
+		require.True(t, xerrors.IsInvalidParams(err))
+		require.Equal(t, errTooFarInTheFuture, xerrors.InnerError(err))
+		require.True(t, strings.Contains(err.Error(), "datapoint for aggregation too far in future"))
+		require.True(t, strings.Contains(err.Error(), "id="+string(metric.ID)))
+		require.True(t, strings.Contains(err.Error(), "timestamp="))
+		require.True(t, strings.Contains(err.Error(), "future_limit="))
 	}
 }
 
@@ -1301,7 +1318,7 @@ func TestEntryAddTimed(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	e, lists, _ := testEntry(ctrl)
+	e, lists, _ := testEntry(ctrl, testEntryOptions{})
 
 	// Add an initial timed metric.
 	require.NoError(t, e.AddTimed(testTimedMetric, testTimedMetadata))
@@ -1413,7 +1430,7 @@ func TestEntryForwardedRateLimiting(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})
 
 	// Reset runtime options to disable rate limiting.
 	noRateLimitRuntimeOpts := runtime.NewOptions().SetWriteValuesPerMetricLimitPerSecond(0)
@@ -1447,7 +1464,7 @@ func TestEntryAddForwardedEntryClosed(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	e.closed = true
 	require.Equal(t, errEntryClosed, e.AddForwarded(testForwardedMetric, testForwardMetadata))
 }
@@ -1462,7 +1479,9 @@ func TestEntryAddForwardedMetricTooLate(t *testing.T) {
 	) time.Duration {
 		return resolution + time.Second*time.Duration(numForwardedTimes)
 	}
-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{
+		options: testOptions(ctrl).SetVerboseErrors(true),
+	})
 	e.opts = e.opts.SetMaxAllowedForwardingDelayFn(maxAllowedForwardingDelayFn)
 
 	inputs := []struct {
@@ -1492,7 +1511,13 @@ func TestEntryAddForwardedMetricTooLate(t *testing.T) {
 		metadata := testForwardMetadata
 		metadata.StoragePolicy = input.storagePolicy
 		metadata.NumForwardedTimes = input.numForwardedTimes
-		require.Equal(t, errArrivedTooLate, e.AddForwarded(metric, metadata))
+		err := e.AddForwarded(metric, metadata)
+		require.True(t, xerrors.IsInvalidParams(err))
+		require.Equal(t, errArrivedTooLate, xerrors.InnerError(err))
+		require.True(t, strings.Contains(err.Error(), "datapoint for aggregation forwarded too late"))
+		require.True(t, strings.Contains(err.Error(), "id="+string(metric.ID)))
+		require.True(t, strings.Contains(err.Error(), "timestamp="))
+		require.True(t, strings.Contains(err.Error(), "past_limit="))
 	}
 }
 
@@ -1500,7 +1525,7 @@ func TestEntryAddForwarded(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	e, lists, _ := testEntry(ctrl)
+	e, lists, _ := testEntry(ctrl, testEntryOptions{})
 
 	// Add an initial forwarded metric.
 	require.NoError(t, e.AddForwarded(testForwardedMetric, testForwardMetadata1))
@@ -1629,7 +1654,7 @@ func TestEntryMaybeExpireNoExpiry(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})
 
 	// If we are still within entry TTL, should not expire.
 	require.False(t, e.ShouldExpire(now.Add(e.opts.EntryTTL()).Add(-time.Second)))
@@ -1648,7 +1673,7 @@ func TestEntryMaybeExpireWithExpiry(t *testing.T) {
 	ctrl := gomock.NewController(t)
 	defer ctrl.Finish()
 
-	e, _, now := testEntry(ctrl)
+	e, _, now := testEntry(ctrl, testEntryOptions{})
 	populateTestUntimedAggregations(t, e, testAggregationKeys, metric.CounterType)
 
 	var elems []*CounterElem
@@ -1677,7 +1702,7 @@ func TestEntryMaybeCopyIDWithLock(t *testing.T) {
 	defer ctrl.Finish()
 
 	id := id.RawID("foo")
-	e, _, _ := testEntry(ctrl)
+	e, _, _ := testEntry(ctrl, testEntryOptions{})
 	res := e.maybeCopyIDWithLock(id)
 	require.Equal(t, id, res)
 
@@ -1765,12 +1790,25 @@ func TestAggregationValues(t *testing.T) {
 	}
 }
 
-func testEntry(ctrl *gomock.Controller) (*Entry, *metricLists, *time.Time) {
+type testEntryOptions struct {
+	options Options
+}
+
+func testEntry(
+	ctrl *gomock.Controller,
+	testOpts testEntryOptions,
+) (*Entry, *metricLists, *time.Time) {
 	now := time.Now()
 	clockOpts := clock.NewOptions().SetNowFn(func() time.Time {
 		return now
 	})
-	opts := testOptions(ctrl).
+
+	opts := testOpts.options
+	if opts == nil {
+		opts = testOptions(ctrl)
+	}
+
+	opts = opts.
 		SetClockOptions(clockOpts).
 		SetDefaultStoragePolicies(testDefaultStoragePolicies)
 
@@ -1905,7 +1943,7 @@ func testEntryAddUntimed(
 	}
 
 	for _, input := range inputs {
-		e, _, now := testEntry(ctrl)
+		e, _, now := testEntry(ctrl, testEntryOptions{})
 		if withPrePopulation {
 			populateTestUntimedAggregations(t, e, prePopulateData, input.mu.Type)
 		}
diff --git a/src/aggregator/aggregator/options.go b/src/aggregator/aggregator/options.go
index d608bcfdbe..2bef24cb12 100644
--- a/src/aggregator/aggregator/options.go
+++ b/src/aggregator/aggregator/options.go
@@ -53,6 +53,7 @@ var (
 		policy.NewStoragePolicy(10*time.Second, xtime.Second, 2*24*time.Hour),
 		policy.NewStoragePolicy(time.Minute, xtime.Minute, 40*24*time.Hour),
 	}
+	defaultVerboseErrors = false
 
 	defaultTimedMetricBuffer = time.Minute
 
@@ -300,6 +301,12 @@ type Options interface {
 
 	// FullGaugePrefix returns the full prefix for gauges.
 	FullGaugePrefix() []byte
+
+	// SetVerboseErrors sets whether to return verbose errors or not.
+	SetVerboseErrors(value bool) Options
+
+	// VerboseErrors returns whether to return verbose errors or not.
+	VerboseErrors() bool
 }
 
 type options struct {
@@ -338,6 +345,7 @@ type options struct {
 	counterElemPool                  CounterElemPool
 	timerElemPool                    TimerElemPool
 	gaugeElemPool                    GaugeElemPool
+	verboseErrors                    bool
 
 	// Derived options.
 	fullCounterPrefix []byte
@@ -353,17 +361,17 @@ func NewOptions() Options {
 		SetTimerTypeStringTransformFn(aggregation.SuffixTransform).
 		SetGaugeTypeStringTransformFn(aggregation.EmptyTransform)
 	o := &options{
-		aggTypesOptions:    aggTypesOptions,
-		metricPrefix:       defaultMetricPrefix,
-		counterPrefix:      defaultCounterPrefix,
-		timerPrefix:        defaultTimerPrefix,
-		gaugePrefix:        defaultGaugePrefix,
-		timeLock:           &sync.RWMutex{},
-		clockOpts:          clock.NewOptions(),
-		instrumentOpts:     instrument.NewOptions(),
-		streamOpts:         cm.NewOptions(),
-		runtimeOptsManager: runtime.NewOptionsManager(runtime.NewOptions()),
-		shardFn:            sharding.Murmur32Hash.MustShardFn(),
+		aggTypesOptions:                  aggTypesOptions,
+		metricPrefix:                     defaultMetricPrefix,
+		counterPrefix:                    defaultCounterPrefix,
+		timerPrefix:                      defaultTimerPrefix,
+		gaugePrefix:                      defaultGaugePrefix,
+		timeLock:                         &sync.RWMutex{},
+		clockOpts:                        clock.NewOptions(),
+		instrumentOpts:                   instrument.NewOptions(),
+		streamOpts:                       cm.NewOptions(),
+		runtimeOptsManager:               runtime.NewOptionsManager(runtime.NewOptions()),
+		shardFn:                          sharding.Murmur32Hash.MustShardFn(),
 		bufferDurationBeforeShardCutover: defaultBufferDurationBeforeShardCutover,
 		bufferDurationAfterShardCutoff:   defaultBufferDurationAfterShardCutoff,
 		entryTTL:                         defaultEntryTTL,
@@ -377,6 +385,7 @@ func NewOptions() Options {
 		bufferForFutureTimedMetric:       defaultTimedMetricBuffer,
 		maxNumCachedSourceSets:           defaultMaxNumCachedSourceSets,
 		discardNaNAggregatedValues:       defaultDiscardNaNAggregatedValues,
+		verboseErrors:                    defaultVerboseErrors,
 	}
 
 	// Initialize pools.
@@ -732,6 +741,16 @@ func (o *options) GaugeElemPool() GaugeElemPool {
 	return o.gaugeElemPool
 }
 
+func (o *options) SetVerboseErrors(value bool) Options {
+	opts := *o
+	opts.verboseErrors = value
+	return &opts
+}
+
+func (o *options) VerboseErrors() bool {
+	return o.verboseErrors
+}
+
 func (o *options) FullCounterPrefix() []byte {
 	return o.fullCounterPrefix
 }
diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go
index 01c966b084..934c0290ea 100644
--- a/src/cmd/services/m3coordinator/downsample/options.go
+++ b/src/cmd/services/m3coordinator/downsample/options.go
@@ -62,6 +62,7 @@ const (
 	defaultStorageFlushConcurrency = 20000
 	defaultOpenTimeout             = 10 * time.Second
 	defaultBufferFutureTimedMetric = time.Minute
+	defaultVerboseErrors           = true
 )
 
 var (
@@ -185,6 +186,9 @@ type Configuration struct {
 
 	// Pool of gauge elements.
 	GaugeElemPool pool.ObjectPoolConfiguration `yaml:"gaugeElemPool"`
+
+	// BufferPastLimits overrides the default buffer past limits when non-empty.
+	BufferPastLimits []BufferPastLimitConfiguration `yaml:"bufferPastLimits"`
 }
 
 // RemoteAggregatorConfiguration specifies a remote aggregator
@@ -207,6 +211,13 @@ func (c RemoteAggregatorConfiguration) newClient(
 	return c.Client.NewClient(kvClient, clockOpts, instrumentOpts)
 }
 
+// BufferPastLimitConfiguration specifies a custom buffer past limit
+// for aggregation tiles.
+type BufferPastLimitConfiguration struct {
+	Resolution time.Duration `yaml:"resolution"`
+	BufferPast time.Duration `yaml:"bufferPast"`
+}
+
 // NewDownsampler returns a new downsampler.
 func (cfg Configuration) NewDownsampler(
 	opts DownsamplerOptions,
@@ -316,7 +327,23 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
 		placementManager, flushTimesManager, electionManager, instrumentOpts,
 		storageFlushConcurrency, pools)
 
-	// Finally construct all options
+	bufferPastLimits := defaultBufferPastLimits
+	if numLimitsCfg := len(cfg.BufferPastLimits); numLimitsCfg > 0 {
+		// Allow overrides from config.
+		bufferPastLimits = make([]bufferPastLimit, 0, numLimitsCfg)
+		for _, limit := range cfg.BufferPastLimits {
+			bufferPastLimits = append(bufferPastLimits, bufferPastLimit{
+				upperBound: limit.Resolution,
+				bufferPast: limit.BufferPast,
+			})
+		}
+	}
+
+	bufferForPastTimedMetricFn := func(tile time.Duration) time.Duration {
+		return bufferForPastTimedMetric(bufferPastLimits, tile)
+	}
+
+	// Finally construct all options.
 	aggregatorOpts := aggregator.NewOptions().
 		SetClockOptions(clockOpts).
 		SetInstrumentOptions(instrumentOpts).
@@ -330,8 +357,9 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
 		SetElectionManager(electionManager).
 		SetFlushManager(flushManager).
 		SetFlushHandler(flushHandler).
-		SetBufferForPastTimedMetricFn(bufferForPastTimedMetric).
-		SetBufferForFutureTimedMetric(defaultBufferFutureTimedMetric)
+		SetBufferForPastTimedMetricFn(bufferForPastTimedMetricFn).
+		SetBufferForFutureTimedMetric(defaultBufferFutureTimedMetric).
+		SetVerboseErrors(defaultVerboseErrors)
 
 	if cfg.AggregationTypes != nil {
 		aggTypeOpts, err := cfg.AggregationTypes.NewOptions(instrumentOpts)
@@ -602,20 +630,23 @@ func (o DownsamplerOptions) newAggregatorFlushManagerAndHandler(
 	return flushManager, handler
 }
 
+type bufferPastLimit struct {
+	upperBound time.Duration
+	bufferPast time.Duration
+}
+
 var (
-	bufferPastLimits = []struct {
-		upperBound time.Duration
-		bufferPast time.Duration
-	}{
+	defaultBufferPastLimits = []bufferPastLimit{
 		{upperBound: 0, bufferPast: 15 * time.Second},
 		{upperBound: 30 * time.Second, bufferPast: 30 * time.Second},
 		{upperBound: time.Minute, bufferPast: time.Minute},
+		{upperBound: 2 * time.Minute, bufferPast: 2 * time.Minute},
 	}
 )
 
-func bufferForPastTimedMetric(tile time.Duration) time.Duration {
-	bufferPast := bufferPastLimits[0].bufferPast
-	for _, limit := range bufferPastLimits {
+func bufferForPastTimedMetric(limits []bufferPastLimit, tile time.Duration) time.Duration {
+	bufferPast := limits[0].bufferPast
+	for _, limit := range limits {
 		if tile < limit.upperBound {
 			return bufferPast
 		}
diff --git a/src/cmd/services/m3coordinator/downsample/options_test.go b/src/cmd/services/m3coordinator/downsample/options_test.go
index 8fe043a04e..bf8838fd0e 100644
--- a/src/cmd/services/m3coordinator/downsample/options_test.go
+++ b/src/cmd/services/m3coordinator/downsample/options_test.go
@@ -29,6 +29,7 @@ import (
 )
 
 func TestBufferForPastTimedMetric(t *testing.T) {
+	limits := defaultBufferPastLimits
 	tests := []struct {
 		value    time.Duration
 		expected time.Duration
@@ -40,12 +41,13 @@ func TestBufferForPastTimedMetric(t *testing.T) {
 		{value: 30 * time.Second, expected: 30 * time.Second},
 		{value: 59 * time.Second, expected: 30 * time.Second},
 		{value: 60 * time.Second, expected: time.Minute},
-		{value: 59 * time.Minute, expected: time.Minute},
-		{value: 61 * time.Minute, expected: time.Minute},
+		{value: 2 * time.Minute, expected: 2 * time.Minute},
+		{value: 59 * time.Minute, expected: 2 * time.Minute},
+		{value: 61 * time.Minute, expected: 2 * time.Minute},
 	}
 	for _, test := range tests {
 		t.Run(fmt.Sprintf("test_value_%v", test.value), func(t *testing.T) {
-			result := bufferForPastTimedMetric(test.value)
+			result := bufferForPastTimedMetric(limits, test.value)
 			require.Equal(t, test.expected, result)
 		})
 	}