From 8e9fa3107245f13a849f400d0dc956797081168a Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Wed, 24 Jan 2024 11:53:29 +0100
Subject: [PATCH] Improve read consistency observability (#7193)

* Log read_consistency in 'query stats' even when query details are not available

Signed-off-by: Marco Pracucci

* Log in a tracing span when the ruler requests strong read consistency

Signed-off-by: Marco Pracucci

* Add tag to span

Signed-off-by: Marco Pracucci

* Clarify comment

Signed-off-by: Marco Pracucci

---------

Signed-off-by: Marco Pracucci
---
 pkg/frontend/transport/handler.go        |  9 ++--
 pkg/frontend/transport/handler_test.go   | 62 ++++++++++++++++-------
 pkg/ruler/compat.go                      |  4 +-
 pkg/ruler/rule_query_consistency.go      | 64 +++++++++++++++---------
 pkg/ruler/rule_query_consistency_test.go |  5 +-
 5 files changed, 96 insertions(+), 48 deletions(-)

diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go
index be52e38c031..65b84a03105 100644
--- a/pkg/frontend/transport/handler.go
+++ b/pkg/frontend/transport/handler.go
@@ -301,10 +301,13 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
 			"results_cache_hit_bytes", details.ResultsCacheHitBytes,
 			"results_cache_miss_bytes", details.ResultsCacheMissBytes,
 		)
-		if consistency, ok := querierapi.ReadConsistencyFromContext(r.Context()); ok {
-			logMessage = append(logMessage, "read_consistency", consistency)
-		}
 	}
+
+	// Log the read consistency only when explicitly defined.
+	if consistency, ok := querierapi.ReadConsistencyFromContext(r.Context()); ok {
+		logMessage = append(logMessage, "read_consistency", consistency)
+	}
+
 	if len(f.cfg.LogQueryRequestHeaders) != 0 {
 		logMessage = append(logMessage, formatRequestHeaders(&r.Header, f.cfg.LogQueryRequestHeaders)...)
 	}
diff --git a/pkg/frontend/transport/handler_test.go b/pkg/frontend/transport/handler_test.go
index c30258b784e..3f02a29a621 100644
--- a/pkg/frontend/transport/handler_test.go
+++ b/pkg/frontend/transport/handler_test.go
@@ -32,6 +32,7 @@ import (
 	"go.uber.org/atomic"
 
 	"github.com/grafana/mimir/pkg/frontend/querymiddleware"
+	"github.com/grafana/mimir/pkg/querier/api"
 	"github.com/grafana/mimir/pkg/util/activitytracker"
 )
 
@@ -61,12 +62,13 @@ func TestWriteError(t *testing.T) {
 
 func TestHandler_ServeHTTP(t *testing.T) {
 	for _, tt := range []struct {
-		name             string
-		cfg              HandlerConfig
-		request          func() *http.Request
-		expectedParams   url.Values
-		expectedMetrics  int
-		expectedActivity string
+		name                    string
+		cfg                     HandlerConfig
+		request                 func() *http.Request
+		expectedParams          url.Values
+		expectedMetrics         int
+		expectedActivity        string
+		expectedReadConsistency string
 	}{
 		{
 			name: "handler with stats enabled, POST request with params",
@@ -85,8 +87,9 @@ func TestHandler_ServeHTTP(t *testing.T) {
 				"query": []string{"some_metric"},
 				"time":  []string{"42"},
 			},
-			expectedMetrics:  5,
-			expectedActivity: "12345 POST /api/v1/query query=some_metric&time=42",
+			expectedMetrics:         5,
+			expectedActivity:        "12345 POST /api/v1/query query=some_metric&time=42",
+			expectedReadConsistency: "",
 		},
 		{
 			name: "handler with stats enabled, GET request with params",
@@ -98,8 +101,24 @@ func TestHandler_ServeHTTP(t *testing.T) {
 				"query": []string{"some_metric"},
 				"time":  []string{"42"},
 			},
-			expectedMetrics:  5,
-			expectedActivity: "12345 GET /api/v1/query query=some_metric&time=42",
+			expectedMetrics:         5,
+			expectedActivity:        "12345 GET /api/v1/query query=some_metric&time=42",
+			expectedReadConsistency: "",
+		},
+		{
+			name: "handler with stats enabled, GET request with params and read consistency specified",
+			cfg:  HandlerConfig{QueryStatsEnabled: true},
+			request: func() *http.Request {
+				r := httptest.NewRequest("GET", "/api/v1/query?query=some_metric&time=42", nil)
+				return r.WithContext(api.ContextWithReadConsistency(context.Background(), api.ReadConsistencyStrong))
+			},
+			expectedParams: url.Values{
+				"query": []string{"some_metric"},
+				"time":  []string{"42"},
+			},
+			expectedMetrics:         5,
+			expectedActivity:        "12345 GET /api/v1/query query=some_metric&time=42",
+			expectedReadConsistency: api.ReadConsistencyStrong,
 		},
 		{
 			name: "handler with stats enabled, GET request without params",
@@ -107,9 +126,10 @@ func TestHandler_ServeHTTP(t *testing.T) {
 			request: func() *http.Request {
 				return httptest.NewRequest("GET", "/api/v1/query", nil)
 			},
-			expectedParams:   url.Values{},
-			expectedMetrics:  5,
-			expectedActivity: "12345 GET /api/v1/query (no params)",
+			expectedParams:          url.Values{},
+			expectedMetrics:         5,
+			expectedActivity:        "12345 GET /api/v1/query (no params)",
+			expectedReadConsistency: "",
 		},
 		{
 			name: "handler with stats disabled, GET request with params",
@@ -121,8 +141,9 @@ func TestHandler_ServeHTTP(t *testing.T) {
 				"query": []string{"some_metric"},
 				"time":  []string{"42"},
 			},
-			expectedMetrics:  0,
-			expectedActivity: "12345 GET /api/v1/query query=some_metric&time=42",
+			expectedMetrics:         0,
+			expectedActivity:        "12345 GET /api/v1/query query=some_metric&time=42",
+			expectedReadConsistency: "",
 		},
 	} {
 		t.Run(tt.name, func(t *testing.T) {
@@ -152,7 +173,8 @@ func TestHandler_ServeHTTP(t *testing.T) {
 			logger := &testLogger{}
 			handler := NewHandler(tt.cfg, roundTripper, logger, reg, at)
 
-			req := tt.request().WithContext(user.InjectOrgID(context.Background(), "12345"))
+			req := tt.request()
+			req = req.WithContext(user.InjectOrgID(req.Context(), "12345"))
 			resp := httptest.NewRecorder()
 
 			handler.ServeHTTP(resp, req)
@@ -179,7 +201,6 @@ func TestHandler_ServeHTTP(t *testing.T) {
 				require.Len(t, logger.logMessages, 1)
 
 				msg := logger.logMessages[0]
-				require.Len(t, msg, 21+len(tt.expectedParams))
 				require.Equal(t, level.InfoValue(), msg["level"])
 				require.Equal(t, "query stats", msg["msg"])
 				require.Equal(t, "query-frontend", msg["component"])
@@ -199,6 +220,13 @@ func TestHandler_ServeHTTP(t *testing.T) {
 				require.EqualValues(t, 0, msg["split_queries"])
 				require.EqualValues(t, 0, msg["estimated_series_count"])
 				require.EqualValues(t, 0, msg["queue_time_seconds"])
+
+				if tt.expectedReadConsistency != "" {
+					require.Equal(t, tt.expectedReadConsistency, msg["read_consistency"])
+				} else {
+					_, ok := msg["read_consistency"]
+					require.False(t, ok)
+				}
 			} else {
 				require.Empty(t, logger.logMessages)
 			}
diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go
index 58a170bd6ab..4972270fcf8 100644
--- a/pkg/ruler/compat.go
+++ b/pkg/ruler/compat.go
@@ -306,12 +306,12 @@ func DefaultTenantManagerFactory(
 		}
 
 		// Wrap the query function with our custom logic.
-		wrappedQueryFunc := WrapQueryFuncWithReadConsistency(queryFunc)
+		wrappedQueryFunc := WrapQueryFuncWithReadConsistency(queryFunc, logger)
 		wrappedQueryFunc = MetricsQueryFunc(wrappedQueryFunc, totalQueries, failedQueries)
 		wrappedQueryFunc = RecordAndReportRuleQueryMetrics(wrappedQueryFunc, queryTime, zeroFetchedSeriesCount, logger)
 
 		// Wrap the queryable with our custom logic.
-		wrappedQueryable := WrapQueryableWithReadConsistency(queryable)
+		wrappedQueryable := WrapQueryableWithReadConsistency(queryable, logger)
 
 		return rules.NewManager(&rules.ManagerOptions{
 			Appendable: NewPusherAppendable(p, userID, totalWrites, failedWrites),
diff --git a/pkg/ruler/rule_query_consistency.go b/pkg/ruler/rule_query_consistency.go
index 7535fae795c..28b71fad39c 100644
--- a/pkg/ruler/rule_query_consistency.go
+++ b/pkg/ruler/rule_query_consistency.go
@@ -4,8 +4,10 @@ package ruler
 
 import (
 	"context"
+	"fmt"
 	"time"
 
+	"github.com/go-kit/log"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/rules"
@@ -13,19 +15,27 @@ import (
 	"github.com/prometheus/prometheus/util/annotations"
 
 	"github.com/grafana/mimir/pkg/querier/api"
+	"github.com/grafana/mimir/pkg/util/spanlogger"
 )
 
+const alertForStateMetricName = "ALERTS_FOR_STATE"
+
 // WrapQueryFuncWithReadConsistency wraps rules.QueryFunc with a function that injects strong read consistency
 // requirement in the context if the query is originated from a rule which depends on other rules in the same
 // rule group.
-func WrapQueryFuncWithReadConsistency(fn rules.QueryFunc) rules.QueryFunc {
+func WrapQueryFuncWithReadConsistency(fn rules.QueryFunc, logger log.Logger) rules.QueryFunc {
 	return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
+		// Get details about the rule.
 		detail := rules.FromOriginContext(ctx)
 
 		// If the rule has dependencies then we should enforce strong read consistency,
-		// otherwise we'll fallback to the per-tenant default.
+		// otherwise we leave it empty to have Mimir falling back to the per-tenant default.
 		if !detail.NoDependencyRules {
+			spanLog := spanlogger.FromContext(ctx, logger)
+			spanLog.SetTag("read_consistency", api.ReadConsistencyStrong)
+			spanLog.DebugLog("msg", "forced strong read consistency because the rule depends on other rules in the same rule group")
+
 			ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
 		}
 
 		return fn(ctx, qs, t)
@@ -41,12 +51,16 @@ func WrapQueryFuncWithReadConsistency(fn rules.QueryFunc) rules.QueryFunc {
 //
 // When querying the ALERTS_FOR_STATE, ruler requires strong consistency in order to ensure we restore the state
 // from the last evaluation. Without such guarantee, the ruler may query a previous state.
-func WrapQueryableWithReadConsistency(q storage.Queryable) storage.Queryable {
-	return &readConsistencyQueryable{next: q}
+func WrapQueryableWithReadConsistency(q storage.Queryable, logger log.Logger) storage.Queryable {
+	return &readConsistencyQueryable{
+		next:   q,
+		logger: logger,
+	}
 }
 
 type readConsistencyQueryable struct {
-	next storage.Queryable
+	next   storage.Queryable
+	logger log.Logger
 }
 
 // Querier implements storage.Queryable.
@@ -56,42 +70,31 @@ func (q *readConsistencyQueryable) Querier(mint, maxt int64) (storage.Querier, e
 		return querier, err
 	}
 
-	return &readConsistencyQuerier{next: querier}, nil
+	return &readConsistencyQuerier{next: querier, logger: q.logger}, nil
 }
 
 type readConsistencyQuerier struct {
-	next storage.Querier
+	next   storage.Querier
+	logger log.Logger
 }
 
 // Select implements storage.Querier.
 func (q *readConsistencyQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
-	// Enforce strong read consistency if it's querying the ALERTS_FOR_STATE metric, otherwise
-	// fallback to the default.
-	if isQueryingAlertsForStateMetric("", matchers...) {
-		ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
-	}
+	ctx = forceStrongReadConsistencyIfQueryingAlertsForStateMetric(ctx, matchers, q.logger)
 
 	return q.next.Select(ctx, sortSeries, hints, matchers...)
 }
 
 // LabelValues implements storage.Querier.
 func (q *readConsistencyQuerier) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
-	// Enforce strong read consistency if it's querying the ALERTS_FOR_STATE metric, otherwise
-	// fallback to the default.
-	if isQueryingAlertsForStateMetric(name, matchers...) {
-		ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
-	}
+	ctx = forceStrongReadConsistencyIfQueryingAlertsForStateMetric(ctx, matchers, q.logger)
 
 	return q.next.LabelValues(ctx, name, matchers...)
 }
 
 // LabelNames implements storage.Querier.
 func (q *readConsistencyQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
-	// Enforce strong read consistency if it's querying the ALERTS_FOR_STATE metric, otherwise
-	// fallback to the default.
-	if isQueryingAlertsForStateMetric("", matchers...) {
-		ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
-	}
+	ctx = forceStrongReadConsistencyIfQueryingAlertsForStateMetric(ctx, matchers, q.logger)
 
 	return q.next.LabelNames(ctx, matchers...)
 }
@@ -104,8 +107,6 @@ func (q *readConsistencyQuerier) Close() error {
 // isQueryingAlertsForStateMetric checks whether the input metricName or matchers match the
 // ALERTS_FOR_STATE metric.
 func isQueryingAlertsForStateMetric(metricName string, matchers ...*labels.Matcher) bool {
-	const alertForStateMetricName = "ALERTS_FOR_STATE"
-
 	if metricName == alertForStateMetricName {
 		return true
 	}
@@ -118,3 +119,18 @@ func isQueryingAlertsForStateMetric(metricName string, matchers ...*labels.Match
 
 	return false
 }
+
+// forceStrongReadConsistencyIfQueryingAlertsForStateMetric enforces strong read consistency if from matchers we
+// detect that the query is querying the ALERTS_FOR_STATE metric, otherwise we leave it empty to have Mimir falling
+// back to the per-tenant default.
+func forceStrongReadConsistencyIfQueryingAlertsForStateMetric(ctx context.Context, matchers []*labels.Matcher, logger log.Logger) context.Context {
+	if isQueryingAlertsForStateMetric("", matchers...) {
+		spanLog := spanlogger.FromContext(ctx, logger)
+		spanLog.SetTag("read_consistency", api.ReadConsistencyStrong)
+		spanLog.DebugLog("msg", fmt.Sprintf("forced strong read consistency because %s metric has been queried", alertForStateMetricName))
+
+		ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
+	}
+
+	return ctx
+}
diff --git a/pkg/ruler/rule_query_consistency_test.go b/pkg/ruler/rule_query_consistency_test.go
index 75605c297a5..81f42a1be7e 100644
--- a/pkg/ruler/rule_query_consistency_test.go
+++ b/pkg/ruler/rule_query_consistency_test.go
@@ -8,6 +8,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/go-kit/log"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/promql/parser"
@@ -26,7 +27,7 @@ func TestWrapQueryFuncWithReadConsistency(t *testing.T) {
 				return promql.Vector{}, nil
 			}
 
-			_, _ = WrapQueryFuncWithReadConsistency(orig)(ctx, "", time.Now())
+			_, _ = WrapQueryFuncWithReadConsistency(orig, log.NewNopLogger())(ctx, "", time.Now())
 			return
 		}
 
@@ -75,7 +76,7 @@ func TestWrapQueryableWithReadConsistency(t *testing.T) {
 				return storage.EmptySeriesSet()
 			}
 
-			wrappedQueryable := WrapQueryableWithReadConsistency(&storage.MockQueryable{MockQuerier: querier})
+			wrappedQueryable := WrapQueryableWithReadConsistency(&storage.MockQueryable{MockQuerier: querier}, log.NewNopLogger())
 			wrapperQuerier, err := wrappedQueryable.Querier(math.MinInt64, math.MaxInt64)
 			require.NoError(t, err)
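
Note (not part of the patch): below is a minimal sketch of how the read-consistency context helpers exercised by this change fit together, using only the functions visible in the diff (api.ContextWithReadConsistency, api.ReadConsistencyFromContext, api.ReadConsistencyStrong). The buildQueryContext helper and the main function are hypothetical illustration, not Mimir code.

package main

import (
	"context"
	"fmt"

	querierapi "github.com/grafana/mimir/pkg/querier/api"
)

// buildQueryContext is a hypothetical helper: it injects strong read consistency into
// the request context, mirroring what the ruler wrappers above do for rules that depend
// on other rules and for ALERTS_FOR_STATE queries.
func buildQueryContext(ctx context.Context) context.Context {
	return querierapi.ContextWithReadConsistency(ctx, querierapi.ReadConsistencyStrong)
}

func main() {
	ctx := buildQueryContext(context.Background())

	// Downstream components (e.g. the query-frontend handler changed above) read the value
	// back and, only when it is set, log it as the "read_consistency" field.
	if consistency, ok := querierapi.ReadConsistencyFromContext(ctx); ok {
		fmt.Println("read_consistency:", consistency)
	}
}

When no consistency level has been injected, ReadConsistencyFromContext reports ok == false and the query-frontend omits the read_consistency field from the 'query stats' log line, which is what the new expectedReadConsistency test cases above assert.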