Skip to content

Commit

Permalink
Improve read consistency observability (#7193)
Browse files Browse the repository at this point in the history
* Log read_consistency in 'query stats' even when query details are not available

Signed-off-by: Marco Pracucci <[email protected]>

* Log in a tracing span when the ruler requests strong read consistency

Signed-off-by: Marco Pracucci <[email protected]>

* Add tag to span

Signed-off-by: Marco Pracucci <[email protected]>

* Clarify comment

Signed-off-by: Marco Pracucci <[email protected]>

---------

Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci authored Jan 24, 2024
1 parent 4d69925 commit 8e9fa31
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 48 deletions.
9 changes: 6 additions & 3 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,13 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
"results_cache_hit_bytes", details.ResultsCacheHitBytes,
"results_cache_miss_bytes", details.ResultsCacheMissBytes,
)
if consistency, ok := querierapi.ReadConsistencyFromContext(r.Context()); ok {
logMessage = append(logMessage, "read_consistency", consistency)
}
}

// Log the read consistency only when explicitly defined.
if consistency, ok := querierapi.ReadConsistencyFromContext(r.Context()); ok {
logMessage = append(logMessage, "read_consistency", consistency)
}

if len(f.cfg.LogQueryRequestHeaders) != 0 {
logMessage = append(logMessage, formatRequestHeaders(&r.Header, f.cfg.LogQueryRequestHeaders)...)
}
Expand Down
62 changes: 45 additions & 17 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.uber.org/atomic"

"github.com/grafana/mimir/pkg/frontend/querymiddleware"
"github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/util/activitytracker"
)

Expand Down Expand Up @@ -61,12 +62,13 @@ func TestWriteError(t *testing.T) {

func TestHandler_ServeHTTP(t *testing.T) {
for _, tt := range []struct {
name string
cfg HandlerConfig
request func() *http.Request
expectedParams url.Values
expectedMetrics int
expectedActivity string
name string
cfg HandlerConfig
request func() *http.Request
expectedParams url.Values
expectedMetrics int
expectedActivity string
expectedReadConsistency string
}{
{
name: "handler with stats enabled, POST request with params",
Expand All @@ -85,8 +87,9 @@ func TestHandler_ServeHTTP(t *testing.T) {
"query": []string{"some_metric"},
"time": []string{"42"},
},
expectedMetrics: 5,
expectedActivity: "12345 POST /api/v1/query query=some_metric&time=42",
expectedMetrics: 5,
expectedActivity: "12345 POST /api/v1/query query=some_metric&time=42",
expectedReadConsistency: "",
},
{
name: "handler with stats enabled, GET request with params",
Expand All @@ -98,18 +101,35 @@ func TestHandler_ServeHTTP(t *testing.T) {
"query": []string{"some_metric"},
"time": []string{"42"},
},
expectedMetrics: 5,
expectedActivity: "12345 GET /api/v1/query query=some_metric&time=42",
expectedMetrics: 5,
expectedActivity: "12345 GET /api/v1/query query=some_metric&time=42",
expectedReadConsistency: "",
},
{
name: "handler with stats enabled, GET request with params and read consistency specified",
cfg: HandlerConfig{QueryStatsEnabled: true},
request: func() *http.Request {
r := httptest.NewRequest("GET", "/api/v1/query?query=some_metric&time=42", nil)
return r.WithContext(api.ContextWithReadConsistency(context.Background(), api.ReadConsistencyStrong))
},
expectedParams: url.Values{
"query": []string{"some_metric"},
"time": []string{"42"},
},
expectedMetrics: 5,
expectedActivity: "12345 GET /api/v1/query query=some_metric&time=42",
expectedReadConsistency: api.ReadConsistencyStrong,
},
{
name: "handler with stats enabled, GET request without params",
cfg: HandlerConfig{QueryStatsEnabled: true},
request: func() *http.Request {
return httptest.NewRequest("GET", "/api/v1/query", nil)
},
expectedParams: url.Values{},
expectedMetrics: 5,
expectedActivity: "12345 GET /api/v1/query (no params)",
expectedParams: url.Values{},
expectedMetrics: 5,
expectedActivity: "12345 GET /api/v1/query (no params)",
expectedReadConsistency: "",
},
{
name: "handler with stats disabled, GET request with params",
Expand All @@ -121,8 +141,9 @@ func TestHandler_ServeHTTP(t *testing.T) {
"query": []string{"some_metric"},
"time": []string{"42"},
},
expectedMetrics: 0,
expectedActivity: "12345 GET /api/v1/query query=some_metric&time=42",
expectedMetrics: 0,
expectedActivity: "12345 GET /api/v1/query query=some_metric&time=42",
expectedReadConsistency: "",
},
} {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -152,7 +173,8 @@ func TestHandler_ServeHTTP(t *testing.T) {
logger := &testLogger{}
handler := NewHandler(tt.cfg, roundTripper, logger, reg, at)

req := tt.request().WithContext(user.InjectOrgID(context.Background(), "12345"))
req := tt.request()
req = req.WithContext(user.InjectOrgID(req.Context(), "12345"))
resp := httptest.NewRecorder()

handler.ServeHTTP(resp, req)
Expand All @@ -179,7 +201,6 @@ func TestHandler_ServeHTTP(t *testing.T) {
require.Len(t, logger.logMessages, 1)

msg := logger.logMessages[0]
require.Len(t, msg, 21+len(tt.expectedParams))
require.Equal(t, level.InfoValue(), msg["level"])
require.Equal(t, "query stats", msg["msg"])
require.Equal(t, "query-frontend", msg["component"])
Expand All @@ -199,6 +220,13 @@ func TestHandler_ServeHTTP(t *testing.T) {
require.EqualValues(t, 0, msg["split_queries"])
require.EqualValues(t, 0, msg["estimated_series_count"])
require.EqualValues(t, 0, msg["queue_time_seconds"])

if tt.expectedReadConsistency != "" {
require.Equal(t, tt.expectedReadConsistency, msg["read_consistency"])
} else {
_, ok := msg["read_consistency"]
require.False(t, ok)
}
} else {
require.Empty(t, logger.logMessages)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,12 @@ func DefaultTenantManagerFactory(
}

// Wrap the query function with our custom logic.
wrappedQueryFunc := WrapQueryFuncWithReadConsistency(queryFunc)
wrappedQueryFunc := WrapQueryFuncWithReadConsistency(queryFunc, logger)
wrappedQueryFunc = MetricsQueryFunc(wrappedQueryFunc, totalQueries, failedQueries)
wrappedQueryFunc = RecordAndReportRuleQueryMetrics(wrappedQueryFunc, queryTime, zeroFetchedSeriesCount, logger)

// Wrap the queryable with our custom logic.
wrappedQueryable := WrapQueryableWithReadConsistency(queryable)
wrappedQueryable := WrapQueryableWithReadConsistency(queryable, logger)

return rules.NewManager(&rules.ManagerOptions{
Appendable: NewPusherAppendable(p, userID, totalWrites, failedWrites),
Expand Down
64 changes: 40 additions & 24 deletions pkg/ruler/rule_query_consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,38 @@ package ruler

import (
"context"
"fmt"
"time"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"

"github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/util/spanlogger"
)

const alertForStateMetricName = "ALERTS_FOR_STATE"

// WrapQueryFuncWithReadConsistency wraps rules.QueryFunc with a function that injects strong read consistency
// requirement in the context if the query is originated from a rule which depends on other rules in the same
// rule group.
func WrapQueryFuncWithReadConsistency(fn rules.QueryFunc) rules.QueryFunc {
func WrapQueryFuncWithReadConsistency(fn rules.QueryFunc, logger log.Logger) rules.QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {

// Get details about the rule.
detail := rules.FromOriginContext(ctx)

// If the rule has dependencies then we should enforce strong read consistency,
// otherwise we'll fallback to the per-tenant default.
// otherwise we leave it empty to have Mimir falling back to the per-tenant default.
if !detail.NoDependencyRules {
spanLog := spanlogger.FromContext(ctx, logger)
spanLog.SetTag("read_consistency", api.ReadConsistencyStrong)
spanLog.DebugLog("msg", "forced strong read consistency because the rule depends on other rules in the same rule group")

ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
}

Expand All @@ -41,12 +51,16 @@ func WrapQueryFuncWithReadConsistency(fn rules.QueryFunc) rules.QueryFunc {
//
// When querying the ALERTS_FOR_STATE, ruler requires strong consistency in order to ensure we restore the state
// from the last evaluation. Without such guarantee, the ruler may query a previous state.
func WrapQueryableWithReadConsistency(q storage.Queryable) storage.Queryable {
return &readConsistencyQueryable{next: q}
func WrapQueryableWithReadConsistency(q storage.Queryable, logger log.Logger) storage.Queryable {
return &readConsistencyQueryable{
next: q,
logger: logger,
}
}

type readConsistencyQueryable struct {
next storage.Queryable
next storage.Queryable
logger log.Logger
}

// Querier implements storage.Queryable.
Expand All @@ -56,42 +70,31 @@ func (q *readConsistencyQueryable) Querier(mint, maxt int64) (storage.Querier, e
return querier, err
}

return &readConsistencyQuerier{next: querier}, nil
return &readConsistencyQuerier{next: querier, logger: q.logger}, nil
}

type readConsistencyQuerier struct {
next storage.Querier
next storage.Querier
logger log.Logger
}

// Select implements storage.Querier.
func (q *readConsistencyQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
// Enforce strong read consistency if it's querying the ALERTS_FOR_STATE metric, otherwise
// fallback to the default.
if isQueryingAlertsForStateMetric("", matchers...) {
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
}
ctx = forceStrongReadConsistencyIfQueryingAlertsForStateMetric(ctx, matchers, q.logger)

return q.next.Select(ctx, sortSeries, hints, matchers...)
}

// LabelValues implements storage.Querier.
func (q *readConsistencyQuerier) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
// Enforce strong read consistency if it's querying the ALERTS_FOR_STATE metric, otherwise
// fallback to the default.
if isQueryingAlertsForStateMetric(name, matchers...) {
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
}
ctx = forceStrongReadConsistencyIfQueryingAlertsForStateMetric(ctx, matchers, q.logger)

return q.next.LabelValues(ctx, name, matchers...)
}

// LabelNames implements storage.Querier.
func (q *readConsistencyQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
// Enforce strong read consistency if it's querying the ALERTS_FOR_STATE metric, otherwise
// fallback to the default.
if isQueryingAlertsForStateMetric("", matchers...) {
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
}
ctx = forceStrongReadConsistencyIfQueryingAlertsForStateMetric(ctx, matchers, q.logger)

return q.next.LabelNames(ctx, matchers...)
}
Expand All @@ -104,8 +107,6 @@ func (q *readConsistencyQuerier) Close() error {
// isQueryingAlertsForStateMetric checks whether the input metricName or matchers match the
// ALERTS_FOR_STATE metric.
func isQueryingAlertsForStateMetric(metricName string, matchers ...*labels.Matcher) bool {
const alertForStateMetricName = "ALERTS_FOR_STATE"

if metricName == alertForStateMetricName {
return true
}
Expand All @@ -118,3 +119,18 @@ func isQueryingAlertsForStateMetric(metricName string, matchers ...*labels.Match

return false
}

// forceStrongReadConsistencyIfQueryingAlertsForStateMetric enforces strong read consistency if from matchers we
// detect that the query is querying the ALERTS_FOR_STATE metric, otherwise we leave it empty to have Mimir falling
// back to the per-tenant default.
func forceStrongReadConsistencyIfQueryingAlertsForStateMetric(ctx context.Context, matchers []*labels.Matcher, logger log.Logger) context.Context {
if isQueryingAlertsForStateMetric("", matchers...) {
spanLog := spanlogger.FromContext(ctx, logger)
spanLog.SetTag("read_consistency", api.ReadConsistencyStrong)
spanLog.DebugLog("msg", fmt.Sprintf("forced strong read consistency because %s metric has been queried", alertForStateMetricName))

ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
}

return ctx
}
5 changes: 3 additions & 2 deletions pkg/ruler/rule_query_consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
Expand All @@ -26,7 +27,7 @@ func TestWrapQueryFuncWithReadConsistency(t *testing.T) {
return promql.Vector{}, nil
}

_, _ = WrapQueryFuncWithReadConsistency(orig)(ctx, "", time.Now())
_, _ = WrapQueryFuncWithReadConsistency(orig, log.NewNopLogger())(ctx, "", time.Now())
return
}

Expand Down Expand Up @@ -75,7 +76,7 @@ func TestWrapQueryableWithReadConsistency(t *testing.T) {
return storage.EmptySeriesSet()
}

wrappedQueryable := WrapQueryableWithReadConsistency(&storage.MockQueryable{MockQuerier: querier})
wrappedQueryable := WrapQueryableWithReadConsistency(&storage.MockQueryable{MockQuerier: querier}, log.NewNopLogger())
wrapperQuerier, err := wrappedQueryable.Querier(math.MinInt64, math.MaxInt64)
require.NoError(t, err)

Expand Down

0 comments on commit 8e9fa31

Please sign in to comment.