From a5fe8ae2c0144cecdd9730b6e8d18f2cb7ae7b57 Mon Sep 17 00:00:00 2001 From: Artem Date: Mon, 14 Oct 2019 15:34:40 -0400 Subject: [PATCH 1/3] [query] Graphite fetches truncated to resolution --- .../api/v1/handler/graphite/render_test.go | 36 +++++++++---- src/query/graphite/storage/m3_wrapper.go | 41 +++++++++++++-- src/query/graphite/storage/m3_wrapper_test.go | 50 ++++++++++++++----- 3 files changed, 100 insertions(+), 27 deletions(-) diff --git a/src/query/api/v1/handler/graphite/render_test.go b/src/query/api/v1/handler/graphite/render_test.go index 797b3adce8..d526426669 100644 --- a/src/query/api/v1/handler/graphite/render_test.go +++ b/src/query/api/v1/handler/graphite/render_test.go @@ -74,8 +74,9 @@ func TestParseQueryNoResults(t *testing.T) { func TestParseQueryResults(t *testing.T) { mockStorage := mock.NewMockStorage() - start := time.Now().Add(-30 * time.Minute) resolution := 10 * time.Second + truncateStart := time.Now().Add(-30 * time.Minute).Truncate(resolution) + start := truncateStart.Add(time.Second) vals := ts.NewFixedStepValues(resolution, 3, 3, start) tags := models.NewTags(0, nil) tags = tags.AddTag(models.Tag{Name: graphite.TagName(0), Value: []byte("foo")}) @@ -104,10 +105,11 @@ func TestParseQueryResults(t *testing.T) { buf, err := ioutil.ReadAll(res.Body) require.NoError(t, err) + firstTimestamp := truncateStart.Unix() + 10 expected := fmt.Sprintf( `[{"target":"series_name","datapoints":[[3.000000,%d],`+ - `[3.000000,%d],[3.000000,%d]],"step_size_ms":%d}]`, - start.Unix(), start.Unix()+10, start.Unix()+20, resolution/time.Millisecond) + `[3.000000,%d]],"step_size_ms":%d}]`, + firstTimestamp, firstTimestamp+10, resolution/time.Millisecond) require.Equal(t, expected, string(buf)) } @@ -119,7 +121,7 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) { endStr := "03/07/15" start, err := graphite.ParseTime(startStr, time.Now(), 0) require.NoError(t, err) - end, err := graphite.ParseTime(endStr, time.Now(), 0) + _, err = graphite.ParseTime(endStr, time.Now(), 0) require.NoError(t, err) resolution := 10 * time.Second @@ -138,7 +140,10 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) { models.QueryContextOptions{}, nil, instrument.NewOptions()) req := newGraphiteReadHTTPRequest(t) - req.URL.RawQuery = "target=foo.bar&from=" + startStr + "&until=" + endStr + "&maxDataPoints=1" + req.URL.RawQuery = fmt.Sprintf( + "target=foo.bar&from=%s&until=%s&maxDataPoints=1", + startStr, endStr, + ) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -148,9 +153,11 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) { buf, err := ioutil.ReadAll(res.Body) require.NoError(t, err) + // Expected resolution should be in milliseconds and subsume all datapoints. + stepSize := resolution / time.Millisecond * 4 expected := fmt.Sprintf( `[{"target":"a","datapoints":[[4.000000,%d]],"step_size_ms":%d}]`, - start.Unix(), end.Sub(start)/time.Millisecond) + start.Unix(), stepSize) require.Equal(t, expected, string(buf)) } @@ -158,8 +165,11 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) { func TestParseQueryResultsMultiTarget(t *testing.T) { mockStorage := mock.NewMockStorage() minsAgo := 12 - start := time.Now().Add(-1 * time.Duration(minsAgo) * time.Minute) resolution := 10 * time.Second + start := time.Now(). + Add(-1 * time.Duration(minsAgo) * time.Minute). + Truncate(resolution) + vals := ts.NewFixedStepValues(resolution, 3, 3, start) seriesList := ts.SeriesList{ ts.NewSeries([]byte("a"), vals, models.NewTags(0, nil)), @@ -175,8 +185,10 @@ func TestParseQueryResultsMultiTarget(t *testing.T) { models.QueryContextOptions{}, nil, instrument.NewOptions()) req := newGraphiteReadHTTPRequest(t) - req.URL.RawQuery = fmt.Sprintf("target=foo.bar&target=baz.qux&from=%d&until=%d", - start.Unix(), start.Unix()+30) + req.URL.RawQuery = fmt.Sprintf( + "target=foo.bar&target=baz.qux&from=%d&until=%d", + start.Unix(), start.Unix()+30, + ) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -228,8 +240,10 @@ func TestParseQueryResultsMultiTargetWithLimits(t *testing.T) { models.QueryContextOptions{}, nil, instrument.NewOptions()) req := newGraphiteReadHTTPRequest(t) - req.URL.RawQuery = fmt.Sprintf("target=foo.bar&target=bar.baz&from=%d&until=%d", - start.Unix(), start.Unix()+30) + req.URL.RawQuery = fmt.Sprintf( + "target=foo.bar&target=bar.baz&from=%d&until=%d", + start.Unix(), start.Unix()+30, + ) recorder := httptest.NewRecorder() h.ServeHTTP(recorder, req) diff --git a/src/query/graphite/storage/m3_wrapper.go b/src/query/graphite/storage/m3_wrapper.go index ba22db17e1..38d4ec40e1 100644 --- a/src/query/graphite/storage/m3_wrapper.go +++ b/src/query/graphite/storage/m3_wrapper.go @@ -24,6 +24,7 @@ import ( "context" "errors" "fmt" + "math" "time" "github.com/m3db/m3/src/query/block" @@ -120,6 +121,24 @@ func translateQuery(query string, opts FetchOptions) (*storage.FetchQuery, error }, nil } +func truncateBoundsToResolution( + start time.Time, + end time.Time, + resolution time.Duration, +) (time.Time, time.Time) { + truncatedStart := start.Truncate(resolution) + // NB: if truncated start matches start, it's already valid. + if truncatedStart.Before(start) { + start = truncatedStart.Add(resolution) + } + + length := float64(end.Sub(truncatedStart)) + steps := math.Floor(length / float64(resolution)) + truncatedLength := time.Duration(steps) * resolution + end = start.Add(truncatedLength) + return start, end +} + func translateTimeseries( ctx xctx.Context, result *storage.FetchResult, @@ -128,6 +147,7 @@ func translateTimeseries( m3list := result.SeriesList series := make([]*ts.Series, len(m3list)) resolutions := result.Metadata.Resolutions + fmt.Println("Resolutions", resolutions) if len(series) != len(resolutions) { return nil, fmt.Errorf("number of timeseries %d does not match number of "+ "resolutions %d", len(series), len(resolutions)) @@ -139,20 +159,33 @@ func translateTimeseries( return nil, errSeriesNoResolution } + start, end := truncateBoundsToResolution(start, end, resolution) length := int(end.Sub(start) / resolution) millisPerStep := int(resolution / time.Millisecond) values := ts.NewValues(ctx, millisPerStep, length) + dataLength := 0 for _, datapoint := range m3series.Values().Datapoints() { - index := int(datapoint.Timestamp.Sub(start) / resolution) - if index < 0 || index >= length { - // Outside of range requested + ts := datapoint.Timestamp + if ts.Before(start) { + // Outside of range requested. continue } + + if ts.After(end) { + // No more valid datapoints. + break + } + + dataLength++ + index := int(datapoint.Timestamp.Sub(start) / resolution) values.SetValueAt(index, datapoint.Value) } + // NB: depending on bounds alignment vs resolution, some datapoints may not + // fit in the truncated period, so the output should be sanitized. + truncatedValues := values.Slice(0, dataLength) name := string(m3series.Name()) - series[i] = ts.NewSeries(ctx, name, start, values) + series[i] = ts.NewSeries(ctx, name, start, truncatedValues) } return series, nil diff --git a/src/query/graphite/storage/m3_wrapper_test.go b/src/query/graphite/storage/m3_wrapper_test.go index 4faa5979de..c4d40cd270 100644 --- a/src/query/graphite/storage/m3_wrapper_test.go +++ b/src/query/graphite/storage/m3_wrapper_test.go @@ -99,9 +99,16 @@ func TestTranslateQueryTrailingDot(t *testing.T) { func TestTranslateTimeseries(t *testing.T) { ctx := xctx.New() resolution := 10 * time.Second - steps := 1 - start := time.Now() - end := start.Add(time.Duration(steps) * resolution) + steps := 3 + start := time.Now().Truncate(resolution).Add(time.Second) + end := start.Add(time.Duration(steps) * resolution).Add(time.Second * -2) + + // NB: truncated steps should have 1 less data point than input series since + // the first data point is not valid. + truncatedSteps := steps - 1 + truncated := start.Truncate(resolution).Add(resolution) + truncatedEnd := truncated.Add(resolution * time.Duration(truncatedSteps)) + expected := 5 seriesList := make(m3ts.SeriesList, expected) for i := 0; i < expected; i++ { @@ -128,7 +135,13 @@ func TestTranslateTimeseries(t *testing.T) { require.Equal(t, expected, len(translated)) for i, tt := range translated { - ex := []float64{float64(i)} + ex := make([]float64, truncatedSteps) + for j := range ex { + ex[j] = float64(i) + } + + assert.Equal(t, truncated, tt.StartTime()) + assert.Equal(t, truncatedEnd, tt.EndTime()) assert.Equal(t, ex, tt.SafeValues()) assert.Equal(t, fmt.Sprint("a", i), tt.Name()) } @@ -137,9 +150,15 @@ func TestTranslateTimeseries(t *testing.T) { func TestTranslateTimeseriesWithTags(t *testing.T) { ctx := xctx.New() resolution := 10 * time.Second - steps := 1 - start := time.Now() - end := start.Add(time.Duration(steps) * resolution) + steps := 3 + start := time.Now().Truncate(resolution).Add(time.Second) + end := start.Add(time.Duration(steps) * resolution).Add(time.Second * -2) + + // NB: truncated steps should have 1 less data point than input series since + // the first data point is not valid. + truncatedSteps := steps - 1 + truncated := start.Truncate(resolution).Add(resolution) + truncatedEnd := truncated.Add(resolution * time.Duration(truncatedSteps)) expected := 5 seriesList := make(m3ts.SeriesList, expected) for i := 0; i < expected; i++ { @@ -150,7 +169,7 @@ func TestTranslateTimeseriesWithTags(t *testing.T) { } resos := make([]int64, 0, expected) - for _ = range seriesList { + for range seriesList { resos = append(resos, int64(resolution)) } @@ -166,7 +185,13 @@ func TestTranslateTimeseriesWithTags(t *testing.T) { require.Equal(t, expected, len(translated)) for i, tt := range translated { - ex := []float64{float64(i)} + ex := make([]float64, truncatedSteps) + for j := range ex { + ex[j] = float64(i) + } + + assert.Equal(t, truncated, tt.StartTime()) + assert.Equal(t, truncatedEnd, tt.EndTime()) assert.Equal(t, ex, tt.SafeValues()) assert.Equal(t, fmt.Sprint("a", i), tt.Name()) } @@ -174,8 +199,8 @@ func TestTranslateTimeseriesWithTags(t *testing.T) { func TestFetchByQuery(t *testing.T) { store := mock.NewMockStorage() - start := time.Now().Add(time.Hour * -1) resolution := 10 * time.Second + start := time.Now().Add(time.Hour * -1).Truncate(resolution).Add(time.Second) steps := 3 vals := m3ts.NewFixedStepValues(resolution, steps, 3, start) seriesList := m3ts.SeriesList{ @@ -203,7 +228,7 @@ func TestFetchByQuery(t *testing.T) { wrapper := NewM3WrappedStorage(store, enforcer, instrument.NewOptions()) ctx := xctx.New() ctx.SetRequestContext(context.TODO()) - end := time.Now() + end := start.Add(time.Duration(steps) * resolution) opts := FetchOptions{ StartTime: start, EndTime: end, @@ -218,7 +243,8 @@ func TestFetchByQuery(t *testing.T) { require.Equal(t, 1, len(result.SeriesList)) series := result.SeriesList[0] assert.Equal(t, "a", series.Name()) - assert.Equal(t, []float64{3, 3, 3}, series.SafeValues()) + // NB: last point is expected to be truncated. + assert.Equal(t, []float64{3, 3}, series.SafeValues()) // NB: ensure the fetch was called with the base enforcer's child correctly assert.Equal(t, childEnforcer, store.LastFetchOptions().Enforcer) From 56969f87f1d614bb050ea347c3b39005852bb0d2 Mon Sep 17 00:00:00 2001 From: Artem Date: Mon, 14 Oct 2019 19:29:05 -0400 Subject: [PATCH 2/3] Reverting incorrect optimization that caused a bug --- scripts/docker-integration-tests/carbon/test.sh | 6 +++--- src/query/api/v1/handler/graphite/render_test.go | 12 ++++++------ src/query/graphite/storage/m3_wrapper.go | 8 +------- 3 files changed, 10 insertions(+), 16 deletions(-) diff --git a/scripts/docker-integration-tests/carbon/test.sh b/scripts/docker-integration-tests/carbon/test.sh index e7c3827161..2bc019422c 100755 --- a/scripts/docker-integration-tests/carbon/test.sh +++ b/scripts/docker-integration-tests/carbon/test.sh @@ -53,7 +53,7 @@ t=$(date +%s) echo "foo.min.aggregate.baz 41 $t" | nc 0.0.0.0 7204 echo "foo.min.aggregate.baz 42 $t" | nc 0.0.0.0 7204 echo "Attempting to read min aggregated carbon metric" -ATTEMPTS=10 TIMEOUT=1 retry_with_backoff read_carbon foo.min.aggregate.baz 41 +ATTEMPTS=20 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff read_carbon foo.min.aggregate.baz 41 echo "Writing out a carbon metric that should not be aggregated" t=$(date +%s) @@ -64,7 +64,7 @@ t=$(date +%s) echo "foo.min.already-aggregated.baz 42 $t" | nc 0.0.0.0 7204 echo "foo.min.already-aggregated.baz 43 $t" | nc 0.0.0.0 7204 echo "Attempting to read unaggregated carbon metric" -ATTEMPTS=10 TIMEOUT=1 retry_with_backoff read_carbon foo.min.already-aggregated.baz 43 +ATTEMPTS=20 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff read_carbon foo.min.already-aggregated.baz 43 echo "Writing out a carbon metric that should should use the default mean aggregation" t=$(date +%s) @@ -72,7 +72,7 @@ t=$(date +%s) echo "foo.min.catch-all.baz 10 $t" | nc 0.0.0.0 7204 echo "foo.min.catch-all.baz 20 $t" | nc 0.0.0.0 7204 echo "Attempting to read mean aggregated carbon metric" -ATTEMPTS=10 TIMEOUT=1 retry_with_backoff read_carbon foo.min.catch-all.baz 15 +ATTEMPTS=20 MAX_TIMEOUT=4 TIMEOUT=1 retry_with_backoff read_carbon foo.min.catch-all.baz 15 # Test writing and reading IDs with colons in them. t=$(date +%s) diff --git a/src/query/api/v1/handler/graphite/render_test.go b/src/query/api/v1/handler/graphite/render_test.go index d526426669..5fc67bbe7a 100644 --- a/src/query/api/v1/handler/graphite/render_test.go +++ b/src/query/api/v1/handler/graphite/render_test.go @@ -105,11 +105,11 @@ func TestParseQueryResults(t *testing.T) { buf, err := ioutil.ReadAll(res.Body) require.NoError(t, err) - firstTimestamp := truncateStart.Unix() + 10 + exTimestamp := truncateStart.Unix() + 10 expected := fmt.Sprintf( `[{"target":"series_name","datapoints":[[3.000000,%d],`+ - `[3.000000,%d]],"step_size_ms":%d}]`, - firstTimestamp, firstTimestamp+10, resolution/time.Millisecond) + `[3.000000,%d],[null,%d]],"step_size_ms":%d}]`, + exTimestamp, exTimestamp+10, exTimestamp+20, resolution/time.Millisecond) require.Equal(t, expected, string(buf)) } @@ -121,7 +121,7 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) { endStr := "03/07/15" start, err := graphite.ParseTime(startStr, time.Now(), 0) require.NoError(t, err) - _, err = graphite.ParseTime(endStr, time.Now(), 0) + end, err := graphite.ParseTime(endStr, time.Now(), 0) require.NoError(t, err) resolution := 10 * time.Second @@ -154,10 +154,10 @@ func TestParseQueryResultsMaxDatapoints(t *testing.T) { require.NoError(t, err) // Expected resolution should be in milliseconds and subsume all datapoints. - stepSize := resolution / time.Millisecond * 4 + exStep := end.Sub(start) / time.Millisecond expected := fmt.Sprintf( `[{"target":"a","datapoints":[[4.000000,%d]],"step_size_ms":%d}]`, - start.Unix(), stepSize) + start.Unix(), exStep) require.Equal(t, expected, string(buf)) } diff --git a/src/query/graphite/storage/m3_wrapper.go b/src/query/graphite/storage/m3_wrapper.go index 38d4ec40e1..cf76d67ba1 100644 --- a/src/query/graphite/storage/m3_wrapper.go +++ b/src/query/graphite/storage/m3_wrapper.go @@ -147,7 +147,6 @@ func translateTimeseries( m3list := result.SeriesList series := make([]*ts.Series, len(m3list)) resolutions := result.Metadata.Resolutions - fmt.Println("Resolutions", resolutions) if len(series) != len(resolutions) { return nil, fmt.Errorf("number of timeseries %d does not match number of "+ "resolutions %d", len(series), len(resolutions)) @@ -163,7 +162,6 @@ func translateTimeseries( length := int(end.Sub(start) / resolution) millisPerStep := int(resolution / time.Millisecond) values := ts.NewValues(ctx, millisPerStep, length) - dataLength := 0 for _, datapoint := range m3series.Values().Datapoints() { ts := datapoint.Timestamp if ts.Before(start) { @@ -176,16 +174,12 @@ func translateTimeseries( break } - dataLength++ index := int(datapoint.Timestamp.Sub(start) / resolution) values.SetValueAt(index, datapoint.Value) } - // NB: depending on bounds alignment vs resolution, some datapoints may not - // fit in the truncated period, so the output should be sanitized. - truncatedValues := values.Slice(0, dataLength) name := string(m3series.Name()) - series[i] = ts.NewSeries(ctx, name, start, truncatedValues) + series[i] = ts.NewSeries(ctx, name, start, values) } return series, nil From f53b511171dbaec1947f342e70f51fa2ce09efb9 Mon Sep 17 00:00:00 2001 From: Artem Date: Tue, 15 Oct 2019 00:15:36 -0400 Subject: [PATCH 3/3] PR response --- src/query/graphite/storage/m3_wrapper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/graphite/storage/m3_wrapper.go b/src/query/graphite/storage/m3_wrapper.go index cf76d67ba1..e0df61c2b7 100644 --- a/src/query/graphite/storage/m3_wrapper.go +++ b/src/query/graphite/storage/m3_wrapper.go @@ -169,7 +169,7 @@ func translateTimeseries( continue } - if ts.After(end) { + if !ts.Before(end) { // No more valid datapoints. break }