From 8529a0c4d247b126ca7e7a0e70608130fc6c5a22 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Fri, 30 Aug 2024 13:35:34 +1000 Subject: [PATCH] Fix issue where annotations would not be returned for sharded queries --- pkg/frontend/querymiddleware/querysharding.go | 95 ++++++++++++++++++- .../querymiddleware/querysharding_test.go | 3 + .../querymiddleware/sharded_queryable.go | 49 ++++++---- .../querymiddleware/sharded_queryable_test.go | 2 +- .../split_by_instant_interval.go | 12 ++- 5 files changed, 133 insertions(+), 28 deletions(-) diff --git a/pkg/frontend/querymiddleware/querysharding.go b/pkg/frontend/querymiddleware/querysharding.go index bea9986f30c..e62b065d4b6 100644 --- a/pkg/frontend/querymiddleware/querysharding.go +++ b/pkg/frontend/querymiddleware/querysharding.go @@ -8,6 +8,8 @@ package querymiddleware import ( "context" "fmt" + "slices" + "sync" "time" "github.com/go-kit/log" @@ -148,7 +150,9 @@ func (s *querySharding) Do(ctx context.Context, r MetricsQueryRequest) (Response if err != nil { return nil, apierror.New(apierror.TypeBadData, err.Error()) } - shardedQueryable := newShardedQueryable(r, s.next) + + annotationAccumulator := newAnnotationAccumulator() + shardedQueryable := newShardedQueryable(r, annotationAccumulator, s.next) qry, err := newQuery(ctx, r, s.engine, lazyquery.NewLazyQueryable(shardedQueryable)) if err != nil { @@ -160,17 +164,25 @@ func (s *querySharding) Do(ctx context.Context, r MetricsQueryRequest) (Response if err != nil { return nil, mapEngineError(err) } + // Note that the positions based on the original query may be wrong as the rewritten + // query which is actually used is different, but the user does not see the rewritten + // query, so we pass in an empty string as the query so the positions will be hidden. warn, info := res.Warnings.AsStrings("", 0, 0) + + // Add any annotations returned by the sharded queries, and remove any duplicates. 
+ accumulatedWarnings, accumulatedInfos := annotationAccumulator.getAll() + warn = append(warn, accumulatedWarnings...) + info = append(info, accumulatedInfos...) + warn = removeDuplicates(warn) + info = removeDuplicates(info) + return &PrometheusResponse{ Status: statusSuccess, Data: &PrometheusData{ ResultType: string(res.Value.Type()), Result: extracted, }, - Headers: shardedQueryable.getResponseHeaders(), - // Note that the positions based on the original query may be wrong as the rewritten - // query which is actually used is different, but the user does not see the rewritten - // query, so we pass in an empty string as the query so the positions will be hidden. + Headers: shardedQueryable.getResponseHeaders(), Warnings: warn, Infos: info, }, nil @@ -477,3 +489,76 @@ func longestRegexpMatcherBytes(expr parser.Expr) int { return longest } + +// annotationAccumulator collects annotations returned by sharded queries. +type annotationAccumulator struct { + warnings *sync.Map + infos *sync.Map +} + +func newAnnotationAccumulator() *annotationAccumulator { + return &annotationAccumulator{ + warnings: &sync.Map{}, + infos: &sync.Map{}, + } +} + +// addWarning collects the warning annotation w. +// +// addWarning is safe to call from multiple goroutines. +func (a *annotationAccumulator) addWarning(w string) { + // We use LoadOrStore here to add the annotation if it doesn't already exist or otherwise do nothing. + a.warnings.LoadOrStore(w, struct{}{}) +} + +// addWarnings collects all of the warning annotations in warnings. +// +// addWarnings is safe to call from multiple goroutines. +func (a *annotationAccumulator) addWarnings(warnings []string) { + for _, w := range warnings { + a.addWarning(w) + } +} + +// addInfo collects the info annotation i. +// +// addInfo is safe to call from multiple goroutines. +func (a *annotationAccumulator) addInfo(i string) { + // We use LoadOrStore here to add the annotation if it doesn't already exist or otherwise do nothing. 
+ a.infos.LoadOrStore(i, struct{}{}) +} + +// addInfos collects all of the info annotations in infos. +// +// addInfos is safe to call from multiple goroutines. +func (a *annotationAccumulator) addInfos(infos []string) { + for _, i := range infos { + a.addInfo(i) + } +} + +// getAll returns all annotations collected by this accumulator. +// +// getAll may return inconsistent or unexpected results if it is called concurrently with addInfo or addWarning. +func (a *annotationAccumulator) getAll() (warnings, infos []string) { + return getAllKeys(a.warnings), getAllKeys(a.infos) +} + +func getAllKeys(m *sync.Map) []string { + var keys []string + + m.Range(func(k, v interface{}) bool { + keys = append(keys, k.(string)) + return true + }) + + return keys +} + +// removeDuplicates removes duplicate entries from s. +// +// s may be modified and should not be used after removeDuplicates returns. +func removeDuplicates(s []string) []string { + slices.Sort(s) + return slices.Compact(s) +} diff --git a/pkg/frontend/querymiddleware/querysharding_test.go b/pkg/frontend/querymiddleware/querysharding_test.go index ef7cd301e1c..af5b18b0660 100644 --- a/pkg/frontend/querymiddleware/querysharding_test.go +++ b/pkg/frontend/querymiddleware/querysharding_test.go @@ -101,6 +101,9 @@ func approximatelyEquals(t *testing.T, a, b *PrometheusResponse) { compareExpectedAndActual(t, expected.TimestampMs, actual.TimestampMs, expected.Histogram.Sum, actual.Histogram.Sum, j, a.Labels, "histogram", 1e-12) } } + + require.ElementsMatch(t, a.Infos, b.Infos, "expected same info annotations") + require.ElementsMatch(t, a.Warnings, b.Warnings, "expected same warning annotations") } func compareExpectedAndActual(t *testing.T, expectedTs, actualTs int64, expectedVal, actualVal float64, j int, labels []mimirpb.LabelAdapter, sampleType string, tolerance float64) { diff --git a/pkg/frontend/querymiddleware/sharded_queryable.go b/pkg/frontend/querymiddleware/sharded_queryable.go index 4585d019ea7..7f65a2e3e91 
100644 --- a/pkg/frontend/querymiddleware/sharded_queryable.go +++ b/pkg/frontend/querymiddleware/sharded_queryable.go @@ -34,25 +34,27 @@ var ( // shardedQueryable is an implementor of the Queryable interface. type shardedQueryable struct { - req MetricsQueryRequest - handler MetricsQueryHandler - responseHeaders *responseHeadersTracker + req MetricsQueryRequest + annotationAccumulator *annotationAccumulator + handler MetricsQueryHandler + responseHeaders *responseHeadersTracker } // newShardedQueryable makes a new shardedQueryable. We expect a new queryable is created for each // query, otherwise the response headers tracker doesn't work as expected, because it merges the // headers for all queries run through the queryable and never reset them. -func newShardedQueryable(req MetricsQueryRequest, next MetricsQueryHandler) *shardedQueryable { +func newShardedQueryable(req MetricsQueryRequest, annotationAccumulator *annotationAccumulator, next MetricsQueryHandler) *shardedQueryable { return &shardedQueryable{ - req: req, - handler: next, - responseHeaders: newResponseHeadersTracker(), + req: req, + annotationAccumulator: annotationAccumulator, + handler: next, + responseHeaders: newResponseHeadersTracker(), } } // Querier implements storage.Queryable. func (q *shardedQueryable) Querier(_, _ int64) (storage.Querier, error) { - return &shardedQuerier{req: q.req, handler: q.handler, responseHeaders: q.responseHeaders}, nil + return &shardedQuerier{req: q.req, annotationAccumulator: q.annotationAccumulator, handler: q.handler, responseHeaders: q.responseHeaders}, nil } // getResponseHeaders returns the merged response headers received by the downstream @@ -65,8 +67,9 @@ func (q *shardedQueryable) getResponseHeaders() []*PrometheusHeader { // from the astmapper.EmbeddedQueriesMetricName metric label value and concurrently run embedded queries // through the downstream handler. 
type shardedQuerier struct { - req MetricsQueryRequest - handler MetricsQueryHandler + req MetricsQueryRequest + annotationAccumulator *annotationAccumulator + handler MetricsQueryHandler // Keep track of response headers received when running embedded queries. responseHeaders *responseHeadersTracker @@ -119,13 +122,20 @@ func (q *shardedQuerier) handleEmbeddedQueries(ctx context.Context, queries []st return err } - resStreams, err := responseToSamples(resp) + promRes, ok := resp.(*PrometheusResponse) + if !ok { + return errors.Errorf("error invalid response type: %T, expected: %T", resp, &PrometheusResponse{}) + } + resStreams, err := responseToSamples(promRes) if err != nil { return err } streams[idx] = resStreams // No mutex is needed since each job writes its own index. This is like writing separate variables. q.responseHeaders.mergeHeaders(resp.(*PrometheusResponse).Headers) + q.annotationAccumulator.addInfos(promRes.Infos) + q.annotationAccumulator.addWarnings(promRes.Warnings) + return nil }) @@ -289,25 +299,22 @@ func newSeriesSetFromEmbeddedQueriesResults(results [][]SampleStream, hints *sto } // responseToSamples is needed to map back from api response to the underlying series data -func responseToSamples(resp Response) ([]SampleStream, error) { - promRes, ok := resp.(*PrometheusResponse) - if !ok { - return nil, errors.Errorf("error invalid response type: %T, expected: %T", resp, &PrometheusResponse{}) +func responseToSamples(resp *PrometheusResponse) ([]SampleStream, error) { + if resp.Error != "" { + return nil, errors.New(resp.Error) } - if promRes.Error != "" { - return nil, errors.New(promRes.Error) - } - switch promRes.Data.ResultType { + + switch resp.Data.ResultType { case string(parser.ValueTypeString), string(parser.ValueTypeScalar), string(parser.ValueTypeVector), string(parser.ValueTypeMatrix): - return promRes.Data.Result, nil + return resp.Data.Result, nil } return nil, errors.Errorf( "Invalid promql.Value type: [%s]. 
Only %s, %s, %s and %s supported", - promRes.Data.ResultType, + resp.Data.ResultType, parser.ValueTypeString, parser.ValueTypeScalar, parser.ValueTypeVector, diff --git a/pkg/frontend/querymiddleware/sharded_queryable_test.go b/pkg/frontend/querymiddleware/sharded_queryable_test.go index da5f223c576..273b3db6cc0 100644 --- a/pkg/frontend/querymiddleware/sharded_queryable_test.go +++ b/pkg/frontend/querymiddleware/sharded_queryable_test.go @@ -257,7 +257,7 @@ func TestShardedQuerier_Select_ShouldConcurrentlyRunEmbeddedQueries(t *testing.T } func TestShardedQueryable_GetResponseHeaders(t *testing.T) { - queryable := newShardedQueryable(&PrometheusRangeQueryRequest{}, nil) + queryable := newShardedQueryable(&PrometheusRangeQueryRequest{}, nil, nil) assert.Empty(t, queryable.getResponseHeaders()) // Merge some response headers from the 1st querier. diff --git a/pkg/frontend/querymiddleware/split_by_instant_interval.go b/pkg/frontend/querymiddleware/split_by_instant_interval.go index 28d71eb4ff7..87d25ebc2ef 100644 --- a/pkg/frontend/querymiddleware/split_by_instant_interval.go +++ b/pkg/frontend/querymiddleware/split_by_instant_interval.go @@ -179,7 +179,9 @@ func (s *splitInstantQueryByIntervalMiddleware) Do(ctx context.Context, req Metr if err != nil { return nil, err } - shardedQueryable := newShardedQueryable(req, s.next) + + annotationAccumulator := newAnnotationAccumulator() + shardedQueryable := newShardedQueryable(req, annotationAccumulator, s.next) qry, err := newQuery(ctx, req, s.engine, lazyquery.NewLazyQueryable(shardedQueryable)) if err != nil { @@ -194,6 +196,14 @@ func (s *splitInstantQueryByIntervalMiddleware) Do(ctx context.Context, req Metr return nil, mapEngineError(err) } warn, info := res.Warnings.AsStrings("", 0, 0) + + // Add any annotations returned by the sharded queries, and remove any duplicates. + accumulatedWarnings, accumulatedInfos := annotationAccumulator.getAll() + warn = append(warn, accumulatedWarnings...) 
+ info = append(info, accumulatedInfos...) + warn = removeDuplicates(warn) + info = removeDuplicates(info) + return &PrometheusResponse{ Status: statusSuccess, Data: &PrometheusData{