Skip to content

Commit

Permalink
Fix issue where annotations would not be returned for sharded queries
Browse files Browse the repository at this point in the history
  • Loading branch information
charleskorn committed Aug 30, 2024
1 parent d341851 commit 8529a0c
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 28 deletions.
95 changes: 90 additions & 5 deletions pkg/frontend/querymiddleware/querysharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package querymiddleware
import (
"context"
"fmt"
"slices"
"sync"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -148,7 +150,9 @@ func (s *querySharding) Do(ctx context.Context, r MetricsQueryRequest) (Response
if err != nil {
return nil, apierror.New(apierror.TypeBadData, err.Error())
}
shardedQueryable := newShardedQueryable(r, s.next)

annotationAccumulator := newAnnotationAccumulator()
shardedQueryable := newShardedQueryable(r, annotationAccumulator, s.next)

qry, err := newQuery(ctx, r, s.engine, lazyquery.NewLazyQueryable(shardedQueryable))
if err != nil {
Expand All @@ -160,17 +164,25 @@ func (s *querySharding) Do(ctx context.Context, r MetricsQueryRequest) (Response
if err != nil {
return nil, mapEngineError(err)
}
// Note that the positions based on the original query may be wrong as the rewritten
// query which is actually used is different, but the user does not see the rewritten
// query, so we pass in an empty string as the query so the positions will be hidden.
warn, info := res.Warnings.AsStrings("", 0, 0)

// Add any annotations returned by the sharded queries, and remove any duplicates.
accumulatedWarnings, accumulatedInfos := annotationAccumulator.getAll()
warn = append(warn, accumulatedWarnings...)
info = append(info, accumulatedInfos...)
warn = removeDuplicates(warn)
info = removeDuplicates(info)

return &PrometheusResponse{
Status: statusSuccess,
Data: &PrometheusData{
ResultType: string(res.Value.Type()),
Result: extracted,
},
Headers: shardedQueryable.getResponseHeaders(),
// Note that the positions based on the original query may be wrong as the rewritten
// query which is actually used is different, but the user does not see the rewritten
// query, so we pass in an empty string as the query so the positions will be hidden.
Headers: shardedQueryable.getResponseHeaders(),
Warnings: warn,
Infos: info,
}, nil
Expand Down Expand Up @@ -477,3 +489,76 @@ func longestRegexpMatcherBytes(expr parser.Expr) int {

return longest
}

// annotationAccumulator collects annotations returned by sharded queries.
//
// Each sync.Map is used as a set: the annotation text is the key and the stored
// value is always an empty struct, so adding the same annotation more than once
// keeps a single entry.
type annotationAccumulator struct {
// warnings holds the distinct warning annotations collected so far.
warnings *sync.Map
// infos holds the distinct info annotations collected so far.
infos *sync.Map
}

// newAnnotationAccumulator returns an annotationAccumulator whose warning and
// info sets are both allocated and empty.
func newAnnotationAccumulator() *annotationAccumulator {
	acc := new(annotationAccumulator)
	acc.warnings = new(sync.Map)
	acc.infos = new(sync.Map)

	return acc
}

// addWarning records the warning annotation w. Recording a warning that is
// already present leaves the accumulator unchanged.
//
// addWarning is safe to call from multiple goroutines.
func (a *annotationAccumulator) addWarning(w string) {
	// LoadOrStore inserts w only when it is absent, so each distinct warning is
	// kept exactly once. Neither of its results is needed here.
	_, _ = a.warnings.LoadOrStore(w, struct{}{})
}

// addWarnings records every warning annotation in warnings, one at a time, via
// addWarning. A nil or empty slice is a no-op.
//
// addWarnings is safe to call from multiple goroutines.
func (a *annotationAccumulator) addWarnings(warnings []string) {
	for idx := range warnings {
		a.addWarning(warnings[idx])
	}
}

// addInfo records the info annotation i. Recording an info annotation that is
// already present leaves the accumulator unchanged.
//
// addInfo is safe to call from multiple goroutines.
func (a *annotationAccumulator) addInfo(i string) {
	// LoadOrStore inserts i only when it is absent, so each distinct info
	// annotation is kept exactly once. Neither of its results is needed here.
	_, _ = a.infos.LoadOrStore(i, struct{}{})
}

// addInfos records every info annotation in infos, one at a time, via addInfo.
// A nil or empty slice is a no-op.
//
// addInfos is safe to call from multiple goroutines.
func (a *annotationAccumulator) addInfos(infos []string) {
	for idx := range infos {
		a.addInfo(infos[idx])
	}
}

// getAll returns every warning and info annotation collected by this
// accumulator. Each annotation appears once; the order within each slice is
// not specified.
//
// getAll may return inconsistent or unexpected results if it is called
// concurrently with addInfo or addWarning.
func (a *annotationAccumulator) getAll() (warnings, infos []string) {
	warnings = getAllKeys(a.warnings)
	infos = getAllKeys(a.infos)

	return warnings, infos
}

func getAllKeys(m *sync.Map) []string {
var keys []string

m.Range(func(k, v interface{}) bool {
keys = append(keys, k.(string))
return true
})

return keys
}

// removeDuplicates removes duplicate entries from s.
//
// s may be modified and should not be used after removeDuplicates returns.
func removeDuplicates(s []string) []string {
slices.Sort(s)
return slices.Compact(s)
}
3 changes: 3 additions & 0 deletions pkg/frontend/querymiddleware/querysharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func approximatelyEquals(t *testing.T, a, b *PrometheusResponse) {
compareExpectedAndActual(t, expected.TimestampMs, actual.TimestampMs, expected.Histogram.Sum, actual.Histogram.Sum, j, a.Labels, "histogram", 1e-12)
}
}

require.ElementsMatch(t, a.Infos, b.Infos, "expected same info annotations")
require.ElementsMatch(t, a.Warnings, b.Warnings, "expected same info annotations")
}

func compareExpectedAndActual(t *testing.T, expectedTs, actualTs int64, expectedVal, actualVal float64, j int, labels []mimirpb.LabelAdapter, sampleType string, tolerance float64) {
Expand Down
49 changes: 28 additions & 21 deletions pkg/frontend/querymiddleware/sharded_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,27 @@ var (

// shardedQueryable is an implementor of the Queryable interface.
type shardedQueryable struct {
req MetricsQueryRequest
handler MetricsQueryHandler
responseHeaders *responseHeadersTracker
req MetricsQueryRequest
annotationAccumulator *annotationAccumulator
handler MetricsQueryHandler
responseHeaders *responseHeadersTracker
}

// newShardedQueryable makes a new shardedQueryable. We expect a new queryable is created for each
// query, otherwise the response headers tracker doesn't work as expected, because it merges the
// headers for all queries run through the queryable and never reset them.
func newShardedQueryable(req MetricsQueryRequest, next MetricsQueryHandler) *shardedQueryable {
func newShardedQueryable(req MetricsQueryRequest, annotationAccumulator *annotationAccumulator, next MetricsQueryHandler) *shardedQueryable {
return &shardedQueryable{
req: req,
handler: next,
responseHeaders: newResponseHeadersTracker(),
req: req,
annotationAccumulator: annotationAccumulator,
handler: next,
responseHeaders: newResponseHeadersTracker(),
}
}

// Querier implements storage.Queryable.
func (q *shardedQueryable) Querier(_, _ int64) (storage.Querier, error) {
return &shardedQuerier{req: q.req, handler: q.handler, responseHeaders: q.responseHeaders}, nil
return &shardedQuerier{req: q.req, annotationAccumulator: q.annotationAccumulator, handler: q.handler, responseHeaders: q.responseHeaders}, nil
}

// getResponseHeaders returns the merged response headers received by the downstream
Expand All @@ -65,8 +67,9 @@ func (q *shardedQueryable) getResponseHeaders() []*PrometheusHeader {
// from the astmapper.EmbeddedQueriesMetricName metric label value and concurrently run embedded queries
// through the downstream handler.
type shardedQuerier struct {
req MetricsQueryRequest
handler MetricsQueryHandler
req MetricsQueryRequest
annotationAccumulator *annotationAccumulator
handler MetricsQueryHandler

// Keep track of response headers received when running embedded queries.
responseHeaders *responseHeadersTracker
Expand Down Expand Up @@ -119,13 +122,20 @@ func (q *shardedQuerier) handleEmbeddedQueries(ctx context.Context, queries []st
return err
}

resStreams, err := responseToSamples(resp)
promRes, ok := resp.(*PrometheusResponse)
if !ok {
return errors.Errorf("error invalid response type: %T, expected: %T", resp, &PrometheusResponse{})
}
resStreams, err := responseToSamples(promRes)
if err != nil {
return err
}
streams[idx] = resStreams // No mutex is needed since each job writes its own index. This is like writing separate variables.

q.responseHeaders.mergeHeaders(resp.(*PrometheusResponse).Headers)
q.annotationAccumulator.addInfos(promRes.Infos)
q.annotationAccumulator.addWarnings(promRes.Warnings)

return nil
})

Expand Down Expand Up @@ -289,25 +299,22 @@ func newSeriesSetFromEmbeddedQueriesResults(results [][]SampleStream, hints *sto
}

// responseToSamples is needed to map back from api response to the underlying series data
func responseToSamples(resp Response) ([]SampleStream, error) {
promRes, ok := resp.(*PrometheusResponse)
if !ok {
return nil, errors.Errorf("error invalid response type: %T, expected: %T", resp, &PrometheusResponse{})
func responseToSamples(resp *PrometheusResponse) ([]SampleStream, error) {
if resp.Error != "" {
return nil, errors.New(resp.Error)
}
if promRes.Error != "" {
return nil, errors.New(promRes.Error)
}
switch promRes.Data.ResultType {

switch resp.Data.ResultType {
case string(parser.ValueTypeString),
string(parser.ValueTypeScalar),
string(parser.ValueTypeVector),
string(parser.ValueTypeMatrix):
return promRes.Data.Result, nil
return resp.Data.Result, nil
}

return nil, errors.Errorf(
"Invalid promql.Value type: [%s]. Only %s, %s, %s and %s supported",
promRes.Data.ResultType,
resp.Data.ResultType,
parser.ValueTypeString,
parser.ValueTypeScalar,
parser.ValueTypeVector,
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/querymiddleware/sharded_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func TestShardedQuerier_Select_ShouldConcurrentlyRunEmbeddedQueries(t *testing.T
}

func TestShardedQueryable_GetResponseHeaders(t *testing.T) {
queryable := newShardedQueryable(&PrometheusRangeQueryRequest{}, nil)
queryable := newShardedQueryable(&PrometheusRangeQueryRequest{}, nil, nil)
assert.Empty(t, queryable.getResponseHeaders())

// Merge some response headers from the 1st querier.
Expand Down
12 changes: 11 additions & 1 deletion pkg/frontend/querymiddleware/split_by_instant_interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ func (s *splitInstantQueryByIntervalMiddleware) Do(ctx context.Context, req Metr
if err != nil {
return nil, err
}
shardedQueryable := newShardedQueryable(req, s.next)

annotationAccumulator := newAnnotationAccumulator()
shardedQueryable := newShardedQueryable(req, annotationAccumulator, s.next)

qry, err := newQuery(ctx, req, s.engine, lazyquery.NewLazyQueryable(shardedQueryable))
if err != nil {
Expand All @@ -194,6 +196,14 @@ func (s *splitInstantQueryByIntervalMiddleware) Do(ctx context.Context, req Metr
return nil, mapEngineError(err)
}
warn, info := res.Warnings.AsStrings("", 0, 0)

// Add any annotations returned by the sharded queries, and remove any duplicates.
accumulatedWarnings, accumulatedInfos := annotationAccumulator.getAll()
warn = append(warn, accumulatedWarnings...)
info = append(info, accumulatedInfos...)
warn = removeDuplicates(warn)
info = removeDuplicates(info)

return &PrometheusResponse{
Status: statusSuccess,
Data: &PrometheusData{
Expand Down

0 comments on commit 8529a0c

Please sign in to comment.