Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve read consistency observability #7193

Merged
merged 4 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,13 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
"results_cache_hit_bytes", details.ResultsCacheHitBytes,
"results_cache_miss_bytes", details.ResultsCacheMissBytes,
)
if consistency, ok := querierapi.ReadConsistencyFromContext(r.Context()); ok {
logMessage = append(logMessage, "read_consistency", consistency)
}
}

// Log the read consistency only when explicitly defined.
if consistency, ok := querierapi.ReadConsistencyFromContext(r.Context()); ok {
logMessage = append(logMessage, "read_consistency", consistency)
}

if len(f.cfg.LogQueryRequestHeaders) != 0 {
logMessage = append(logMessage, formatRequestHeaders(&r.Header, f.cfg.LogQueryRequestHeaders)...)
}
Expand Down
62 changes: 45 additions & 17 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.uber.org/atomic"

"github.com/grafana/mimir/pkg/frontend/querymiddleware"
"github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/util/activitytracker"
)

Expand Down Expand Up @@ -61,12 +62,13 @@ func TestWriteError(t *testing.T) {

func TestHandler_ServeHTTP(t *testing.T) {
for _, tt := range []struct {
name string
cfg HandlerConfig
request func() *http.Request
expectedParams url.Values
expectedMetrics int
expectedActivity string
name string
cfg HandlerConfig
request func() *http.Request
expectedParams url.Values
expectedMetrics int
expectedActivity string
expectedReadConsistency string
}{
{
name: "handler with stats enabled, POST request with params",
Expand All @@ -85,8 +87,9 @@ func TestHandler_ServeHTTP(t *testing.T) {
"query": []string{"some_metric"},
"time": []string{"42"},
},
expectedMetrics: 5,
expectedActivity: "12345 POST /api/v1/query query=some_metric&time=42",
expectedMetrics: 5,
expectedActivity: "12345 POST /api/v1/query query=some_metric&time=42",
expectedReadConsistency: "",
},
{
name: "handler with stats enabled, GET request with params",
Expand All @@ -98,18 +101,35 @@ func TestHandler_ServeHTTP(t *testing.T) {
"query": []string{"some_metric"},
"time": []string{"42"},
},
expectedMetrics: 5,
expectedActivity: "12345 GET /api/v1/query query=some_metric&time=42",
expectedMetrics: 5,
expectedActivity: "12345 GET /api/v1/query query=some_metric&time=42",
expectedReadConsistency: "",
},
{
name: "handler with stats enabled, GET request with params and read consistency specified",
cfg: HandlerConfig{QueryStatsEnabled: true},
request: func() *http.Request {
r := httptest.NewRequest("GET", "/api/v1/query?query=some_metric&time=42", nil)
return r.WithContext(api.ContextWithReadConsistency(context.Background(), api.ReadConsistencyStrong))
},
expectedParams: url.Values{
"query": []string{"some_metric"},
"time": []string{"42"},
},
expectedMetrics: 5,
expectedActivity: "12345 GET /api/v1/query query=some_metric&time=42",
expectedReadConsistency: api.ReadConsistencyStrong,
},
{
name: "handler with stats enabled, GET request without params",
cfg: HandlerConfig{QueryStatsEnabled: true},
request: func() *http.Request {
return httptest.NewRequest("GET", "/api/v1/query", nil)
},
expectedParams: url.Values{},
expectedMetrics: 5,
expectedActivity: "12345 GET /api/v1/query (no params)",
expectedParams: url.Values{},
expectedMetrics: 5,
expectedActivity: "12345 GET /api/v1/query (no params)",
expectedReadConsistency: "",
},
{
name: "handler with stats disabled, GET request with params",
Expand All @@ -121,8 +141,9 @@ func TestHandler_ServeHTTP(t *testing.T) {
"query": []string{"some_metric"},
"time": []string{"42"},
},
expectedMetrics: 0,
expectedActivity: "12345 GET /api/v1/query query=some_metric&time=42",
expectedMetrics: 0,
expectedActivity: "12345 GET /api/v1/query query=some_metric&time=42",
expectedReadConsistency: "",
},
} {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -152,7 +173,8 @@ func TestHandler_ServeHTTP(t *testing.T) {
logger := &testLogger{}
handler := NewHandler(tt.cfg, roundTripper, logger, reg, at)

req := tt.request().WithContext(user.InjectOrgID(context.Background(), "12345"))
req := tt.request()
req = req.WithContext(user.InjectOrgID(req.Context(), "12345"))
resp := httptest.NewRecorder()

handler.ServeHTTP(resp, req)
Expand All @@ -179,7 +201,6 @@ func TestHandler_ServeHTTP(t *testing.T) {
require.Len(t, logger.logMessages, 1)

msg := logger.logMessages[0]
require.Len(t, msg, 21+len(tt.expectedParams))
require.Equal(t, level.InfoValue(), msg["level"])
require.Equal(t, "query stats", msg["msg"])
require.Equal(t, "query-frontend", msg["component"])
Expand All @@ -199,6 +220,13 @@ func TestHandler_ServeHTTP(t *testing.T) {
require.EqualValues(t, 0, msg["split_queries"])
require.EqualValues(t, 0, msg["estimated_series_count"])
require.EqualValues(t, 0, msg["queue_time_seconds"])

if tt.expectedReadConsistency != "" {
require.Equal(t, tt.expectedReadConsistency, msg["read_consistency"])
} else {
_, ok := msg["read_consistency"]
require.False(t, ok)
}
} else {
require.Empty(t, logger.logMessages)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,12 @@ func DefaultTenantManagerFactory(
}

// Wrap the query function with our custom logic.
wrappedQueryFunc := WrapQueryFuncWithReadConsistency(queryFunc)
wrappedQueryFunc := WrapQueryFuncWithReadConsistency(queryFunc, logger)
wrappedQueryFunc = MetricsQueryFunc(wrappedQueryFunc, totalQueries, failedQueries)
wrappedQueryFunc = RecordAndReportRuleQueryMetrics(wrappedQueryFunc, queryTime, zeroFetchedSeriesCount, logger)

// Wrap the queryable with our custom logic.
wrappedQueryable := WrapQueryableWithReadConsistency(queryable)
wrappedQueryable := WrapQueryableWithReadConsistency(queryable, logger)

return rules.NewManager(&rules.ManagerOptions{
Appendable: NewPusherAppendable(p, userID, totalWrites, failedWrites),
Expand Down
64 changes: 40 additions & 24 deletions pkg/ruler/rule_query_consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,38 @@ package ruler

import (
"context"
"fmt"
"time"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"

"github.com/grafana/mimir/pkg/querier/api"
"github.com/grafana/mimir/pkg/util/spanlogger"
)

const alertForStateMetricName = "ALERTS_FOR_STATE"

// WrapQueryFuncWithReadConsistency wraps rules.QueryFunc with a function that injects strong read consistency
// requirement in the context if the query is originated from a rule which depends on other rules in the same
// rule group.
func WrapQueryFuncWithReadConsistency(fn rules.QueryFunc) rules.QueryFunc {
func WrapQueryFuncWithReadConsistency(fn rules.QueryFunc, logger log.Logger) rules.QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {

// Get details about the rule.
detail := rules.FromOriginContext(ctx)

// If the rule has dependencies then we should enforce strong read consistency,
// otherwise we'll fallback to the per-tenant default.
// otherwise we leave it empty to have Mimir falling back to the per-tenant default.
if !detail.NoDependencyRules {
spanLog := spanlogger.FromContext(ctx, logger)
spanLog.SetTag("read_consistency", api.ReadConsistencyStrong)
spanLog.DebugLog("msg", "forced strong read consistency because the rule depends on other rules in the same rule group")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would adding a tag / attribute to the span help so that we could query by it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is e3525fb what you had in mind?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, thanks!


ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
}

Expand All @@ -41,12 +51,16 @@ func WrapQueryFuncWithReadConsistency(fn rules.QueryFunc) rules.QueryFunc {
//
// When querying the ALERTS_FOR_STATE, ruler requires strong consistency in order to ensure we restore the state
// from the last evaluation. Without such guarantee, the ruler may query a previous state.
func WrapQueryableWithReadConsistency(q storage.Queryable) storage.Queryable {
return &readConsistencyQueryable{next: q}
func WrapQueryableWithReadConsistency(q storage.Queryable, logger log.Logger) storage.Queryable {
return &readConsistencyQueryable{
next: q,
logger: logger,
}
}

type readConsistencyQueryable struct {
next storage.Queryable
next storage.Queryable
logger log.Logger
}

// Querier implements storage.Queryable.
Expand All @@ -56,42 +70,31 @@ func (q *readConsistencyQueryable) Querier(mint, maxt int64) (storage.Querier, e
return querier, err
}

return &readConsistencyQuerier{next: querier}, nil
return &readConsistencyQuerier{next: querier, logger: q.logger}, nil
}

type readConsistencyQuerier struct {
next storage.Querier
next storage.Querier
logger log.Logger
}

// Select implements storage.Querier.
func (q *readConsistencyQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
// Enforce strong read consistency if it's querying the ALERTS_FOR_STATE metric, otherwise
// fallback to the default.
if isQueryingAlertsForStateMetric("", matchers...) {
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
}
ctx = forceStrongReadConsistencyIfQueryingAlertsForStateMetric(ctx, matchers, q.logger)

return q.next.Select(ctx, sortSeries, hints, matchers...)
}

// LabelValues implements storage.Querier.
func (q *readConsistencyQuerier) LabelValues(ctx context.Context, name string, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
// Enforce strong read consistency if it's querying the ALERTS_FOR_STATE metric, otherwise
// fallback to the default.
if isQueryingAlertsForStateMetric(name, matchers...) {
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
}
ctx = forceStrongReadConsistencyIfQueryingAlertsForStateMetric(ctx, matchers, q.logger)

return q.next.LabelValues(ctx, name, matchers...)
}

// LabelNames implements storage.Querier.
func (q *readConsistencyQuerier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
// Enforce strong read consistency if it's querying the ALERTS_FOR_STATE metric, otherwise
// fallback to the default.
if isQueryingAlertsForStateMetric("", matchers...) {
ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
}
ctx = forceStrongReadConsistencyIfQueryingAlertsForStateMetric(ctx, matchers, q.logger)

return q.next.LabelNames(ctx, matchers...)
}
Expand All @@ -104,8 +107,6 @@ func (q *readConsistencyQuerier) Close() error {
// isQueryingAlertsForStateMetric checks whether the input metricName or matchers match the
// ALERTS_FOR_STATE metric.
func isQueryingAlertsForStateMetric(metricName string, matchers ...*labels.Matcher) bool {
const alertForStateMetricName = "ALERTS_FOR_STATE"

if metricName == alertForStateMetricName {
return true
}
Expand All @@ -118,3 +119,18 @@ func isQueryingAlertsForStateMetric(metricName string, matchers ...*labels.Match

return false
}

// forceStrongReadConsistencyIfQueryingAlertsForStateMetric enforces strong read consistency if from matchers we
// detect that the query is querying the ALERTS_FOR_STATE metric, otherwise we leave it empty to have Mimir falling
// back to the per-tenant default.
func forceStrongReadConsistencyIfQueryingAlertsForStateMetric(ctx context.Context, matchers []*labels.Matcher, logger log.Logger) context.Context {
if isQueryingAlertsForStateMetric("", matchers...) {
spanLog := spanlogger.FromContext(ctx, logger)
spanLog.SetTag("read_consistency", api.ReadConsistencyStrong)
spanLog.DebugLog("msg", fmt.Sprintf("forced strong read consistency because %s metric has been queried", alertForStateMetricName))

ctx = api.ContextWithReadConsistency(ctx, api.ReadConsistencyStrong)
}

return ctx
}
5 changes: 3 additions & 2 deletions pkg/ruler/rule_query_consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
Expand All @@ -26,7 +27,7 @@ func TestWrapQueryFuncWithReadConsistency(t *testing.T) {
return promql.Vector{}, nil
}

_, _ = WrapQueryFuncWithReadConsistency(orig)(ctx, "", time.Now())
_, _ = WrapQueryFuncWithReadConsistency(orig, log.NewNopLogger())(ctx, "", time.Now())
return
}

Expand Down Expand Up @@ -75,7 +76,7 @@ func TestWrapQueryableWithReadConsistency(t *testing.T) {
return storage.EmptySeriesSet()
}

wrappedQueryable := WrapQueryableWithReadConsistency(&storage.MockQueryable{MockQuerier: querier})
wrappedQueryable := WrapQueryableWithReadConsistency(&storage.MockQueryable{MockQuerier: querier}, log.NewNopLogger())
wrapperQuerier, err := wrappedQueryable.Querier(math.MinInt64, math.MaxInt64)
require.NoError(t, err)

Expand Down
Loading