diff --git a/cmd/thanos/config.go b/cmd/thanos/config.go index ec26713aec0..48d459cec95 100644 --- a/cmd/thanos/config.go +++ b/cmd/thanos/config.go @@ -182,6 +182,7 @@ type queryConfig struct { dnsSDInterval time.Duration httpMethod string dnsSDResolver string + step time.Duration } func (qc *queryConfig) registerFlag(cmd extkingpin.FlagClause) *queryConfig { @@ -198,6 +199,8 @@ func (qc *queryConfig) registerFlag(cmd extkingpin.FlagClause) *queryConfig { Default("POST").EnumVar(&qc.httpMethod, "GET", "POST") cmd.Flag("query.sd-dns-resolver", "Resolver to use. Possible options: [golang, miekgdns]"). Default("golang").Hidden().StringVar(&qc.dnsSDResolver) + cmd.Flag("query.default-step", "Default range query step to use. This is only used in stateless Ruler and alert state restoration."). + Default("1s").DurationVar(&qc.step) return qc } diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 3e803be31de..ef624e6107c 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -85,12 +85,15 @@ type ruleConfig struct { rwConfig *extflag.PathOrContent - resendDelay time.Duration - evalInterval time.Duration - ruleFiles []string - objStoreConfig *extflag.PathOrContent - dataDir string - lset labels.Labels + resendDelay time.Duration + evalInterval time.Duration + outageTolerance time.Duration + forGracePeriod time.Duration + ruleFiles []string + objStoreConfig *extflag.PathOrContent + dataDir string + lset labels.Labels + ignoredLabelNames []string } func (rc *ruleConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -126,6 +129,12 @@ func registerRule(app *extkingpin.App) { Default("1m").DurationVar(&conf.resendDelay) cmd.Flag("eval-interval", "The default evaluation interval to use."). Default("1m").DurationVar(&conf.evalInterval) + cmd.Flag("for-outage-tolerance", "Max time to tolerate prometheus outage for restoring \"for\" state of alert."). + Default("1h").DurationVar(&conf.outageTolerance) + cmd.Flag("for-grace-period", "Minimum duration between alert and restored \"for\" state. This is maintained only for alerts with configured \"for\" time greater than grace period."). + Default("10m").DurationVar(&conf.forGracePeriod) + cmd.Flag("restore-ignored-label", "Label names to be ignored when restoring alerts from the remote storage. This is only used in stateless mode."). + StringsVar(&conf.ignoredLabelNames) conf.rwConfig = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write configurations, that specify servers where samples should be sent to (see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). This automatically enables stateless mode for ruler and no series will be stored in the ruler's TSDB. If an empty config (or file) is provided, the flag is ignored and ruler is run with its own TSDB.", extflag.WithEnvSubstitution()) @@ -321,7 +330,10 @@ func runRule( extprom.WrapRegistererWithPrefix("thanos_rule_query_apis_", reg), dns.ResolverType(conf.query.dnsSDResolver), ) - var queryClients []*httpconfig.Client + var ( + queryClients []*httpconfig.Client + promClients []*promclient.Client + ) queryClientMetrics := extpromhttp.NewClientMetrics(extprom.WrapRegistererWith(prometheus.Labels{"client": "query"}, reg)) for _, cfg := range queryCfg { cfg.HTTPClientConfig.ClientMetrics = queryClientMetrics @@ -335,6 +347,7 @@ func runRule( return err } queryClients = append(queryClients, queryClient) + promClients = append(promClients, promclient.NewClient(queryClient, logger, "thanos-rule")) // Discover and resolve query addresses. addDiscoveryGroups(g, queryClient, conf.query.dnsSDInterval) } @@ -377,7 +390,8 @@ func runRule( } fanoutStore := storage.NewFanout(logger, agentDB, remoteStore) appendable = fanoutStore - queryable = fanoutStore + // Use a separate queryable to restore the ALERTS firing states. + queryable = thanosrules.NewPromClientsQueryable(logger, queryClients, promClients, conf.query.httpMethod, conf.query.step, conf.ignoredLabelNames) } else { tsdbDB, err = tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil) if err != nil { @@ -495,14 +509,16 @@ func runRule( reg, conf.dataDir, rules.ManagerOptions{ - NotifyFunc: notifyFunc, - Logger: logger, - Appendable: appendable, - ExternalURL: nil, - Queryable: queryable, - ResendDelay: conf.resendDelay, + NotifyFunc: notifyFunc, + Logger: logger, + Appendable: appendable, + ExternalURL: nil, + Queryable: queryable, + ResendDelay: conf.resendDelay, + OutageTolerance: conf.outageTolerance, + ForGracePeriod: conf.forGracePeriod, }, - queryFuncCreator(logger, queryClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings, conf.query.httpMethod), + queryFuncCreator(logger, queryClients, promClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings, conf.query.httpMethod), conf.lset, // In our case the querying URL is the external URL because in Prometheus // --web.external-url points to it i.e. it points at something where the user @@ -774,24 +790,10 @@ func labelsTSDBToProm(lset labels.Labels) (res labels.Labels) { return res } -func removeDuplicateQueryEndpoints(logger log.Logger, duplicatedQueriers prometheus.Counter, urls []*url.URL) []*url.URL { - set := make(map[string]struct{}) - deduplicated := make([]*url.URL, 0, len(urls)) - for _, u := range urls { - if _, ok := set[u.String()]; ok { - level.Warn(logger).Log("msg", "duplicate query address is provided", "addr", u.String()) - duplicatedQueriers.Inc() - continue - } - deduplicated = append(deduplicated, u) - set[u.String()] = struct{}{} - } - return deduplicated -} - func queryFuncCreator( logger log.Logger, queriers []*httpconfig.Client, + promClients []*promclient.Client, duplicatedQuery prometheus.Counter, ruleEvalWarnings *prometheus.CounterVec, httpMethod string, @@ -812,15 +814,10 @@ func queryFuncCreator( panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error()) } - promClients := make([]*promclient.Client, 0, len(queriers)) - for _, q := range queriers { - promClients = append(promClients, promclient.NewClient(q, logger, "thanos-rule")) - } - return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { for _, i := range rand.Perm(len(queriers)) { promClient := promClients[i] - endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints()) + endpoints := thanosrules.RemoveDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints()) for _, i := range rand.Perm(len(endpoints)) { span, ctx := tracing.StartSpan(ctx, spanID) v, warns, err := promClient.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{ diff --git a/docs/components/rule.md b/docs/components/rule.md index 1badd787e63..5c25020a2af 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -311,6 +311,12 @@ Flags: prefix for the regular Alertmanager API path. --data-dir="data/" data directory --eval-interval=1m The default evaluation interval to use. + --for-grace-period=10m Minimum duration between alert and restored + "for" state. This is maintained only for alerts + with configured "for" time greater than grace + period. + --for-outage-tolerance=1h Max time to tolerate prometheus outage for + restoring "for" state of alert. --grpc-address="0.0.0.0:10901" Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable @@ -385,6 +391,9 @@ Flags: https://thanos.io/tip/components/rule.md/#configuration. If defined, it takes precedence over the '--query' and '--query.sd-files' flags. + --query.default-step=1s Default range query step to use. This is only + used in stateless Ruler and alert state + restoration. --query.http-method=POST HTTP method to use when sending queries. Possible options: [GET, POST] --query.sd-dns-interval=30s @@ -429,6 +438,10 @@ Flags: https://thanos.io/tip/thanos/logging.md/#configuration --resend-delay=1m Minimum amount of time to wait before resending an alert to Alertmanager. + --restore-ignored-label=RESTORE-IGNORED-LABEL ... + Label names to be ignored when restoring alerts + from the remote storage. This is only used in + stateless mode. --rule-file=rules/ ... Rule files that should be used by rule manager. Can be in glob format (repeated). Note that rules are not automatically detected, diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index 22e48b41aae..748ebf189b4 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -780,6 +780,29 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin return m.Data.Groups, nil } +// AlertsInGRPC returns the rules from Prometheus alerts API. It uses gRPC errors. +// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus. +func (c *Client) AlertsInGRPC(ctx context.Context, base *url.URL) ([]*rulespb.AlertInstance, error) { + u := *base + u.Path = path.Join(u.Path, "/api/v1/alerts") + + var m struct { + Data struct { + Alerts []*rulespb.AlertInstance `json:"alerts"` + } `json:"data"` + } + + if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_alerts HTTP[client]", &u, &m); err != nil { + return nil, err + } + + // Prometheus does not support PartialResponseStrategy, and probably would never do. Make it Abort by default. + for _, g := range m.Data.Alerts { + g.PartialResponseStrategy = storepb.PartialResponseStrategy_ABORT + } + return m.Data.Alerts, nil +} + // MetricMetadataInGRPC returns the metadata from Prometheus metric metadata API. It uses gRPC errors. func (c *Client) MetricMetadataInGRPC(ctx context.Context, base *url.URL, metric string, limit int) (map[string][]metadatapb.Meta, error) { u := *base diff --git a/pkg/rules/queryable.go b/pkg/rules/queryable.go new file mode 100644 index 00000000000..95a2a681cab --- /dev/null +++ b/pkg/rules/queryable.go @@ -0,0 +1,150 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package rules + +import ( + "context" + "math/rand" + "net/url" + "strings" + "time" + + "github.com/cortexproject/cortex/pkg/querier/series" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + + "github.com/thanos-io/thanos/pkg/httpconfig" + "github.com/thanos-io/thanos/pkg/promclient" + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +type promClientsQueryable struct { + httpMethod string + step time.Duration + + logger log.Logger + promClients []*promclient.Client + queryClients []*httpconfig.Client + ignoredLabelNames []string + + duplicatedQuery prometheus.Counter +} +type promClientsQuerier struct { + ctx context.Context + mint, maxt int64 + step int64 + httpMethod string + + logger log.Logger + promClients []*promclient.Client + queryClients []*httpconfig.Client + restoreIgnoreLabels []string + + // We use a dummy counter here because the duplicated + // addresses are already tracked by rule evaluation part. + duplicatedQuery prometheus.Counter +} + +// NewPromClientsQueryable creates a queryable that queries queriers from Prometheus clients. +func NewPromClientsQueryable(logger log.Logger, queryClients []*httpconfig.Client, promClients []*promclient.Client, + httpMethod string, step time.Duration, ignoredLabelNames []string) *promClientsQueryable { + return &promClientsQueryable{ + logger: logger, + queryClients: queryClients, + promClients: promClients, + duplicatedQuery: promauto.With(nil).NewCounter(prometheus.CounterOpts{}), + httpMethod: httpMethod, + step: step, + ignoredLabelNames: ignoredLabelNames, + } +} + +// Querier returns a new Querier for the given time range. +func (q *promClientsQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + return &promClientsQuerier{ + ctx: ctx, + mint: mint, + maxt: maxt, + step: int64(q.step / time.Second), + httpMethod: q.httpMethod, + logger: q.logger, + queryClients: q.queryClients, + promClients: q.promClients, + restoreIgnoreLabels: q.ignoredLabelNames, + }, nil +} + +// Select implements storage.Querier interface. +func (q *promClientsQuerier) Select(_ bool, _ *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + query := storepb.PromMatchersToString(matchers...) + + for _, i := range rand.Perm(len(q.queryClients)) { + promClient := q.promClients[i] + endpoints := RemoveDuplicateQueryEndpoints(q.logger, q.duplicatedQuery, q.queryClients[i].Endpoints()) + for _, i := range rand.Perm(len(endpoints)) { + m, warns, err := promClient.QueryRange(q.ctx, endpoints[i], query, q.mint, q.maxt, q.step, promclient.QueryOptions{ + Deduplicate: true, + Method: q.httpMethod, + }) + + if err != nil { + level.Error(q.logger).Log("err", err, "query", q) + continue + } + if len(warns) > 0 { + level.Warn(q.logger).Log("warnings", strings.Join(warns, ", "), "query", q) + } + matrix := make([]*model.SampleStream, 0, m.Len()) + for _, metric := range m { + for _, label := range q.restoreIgnoreLabels { + delete(metric.Metric, model.LabelName(label)) + } + + matrix = append(matrix, &model.SampleStream{ + Metric: metric.Metric, + Values: metric.Values, + }) + } + + return series.MatrixToSeriesSet(matrix) + } + } + return storage.NoopSeriesSet() +} + +// LabelValues implements storage.LabelQuerier interface. +func (q *promClientsQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { + return nil, nil, nil +} + +// LabelNames implements storage.LabelQuerier interface. +func (q *promClientsQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { + return nil, nil, nil +} + +// Close implements storage.LabelQuerier interface. +func (q *promClientsQuerier) Close() error { + return nil +} + +// RemoveDuplicateQueryEndpoints removes duplicate endpoints from the list of urls. +func RemoveDuplicateQueryEndpoints(logger log.Logger, duplicatedQueriers prometheus.Counter, urls []*url.URL) []*url.URL { + set := make(map[string]struct{}) + deduplicated := make([]*url.URL, 0, len(urls)) + for _, u := range urls { + if _, ok := set[u.String()]; ok { + level.Warn(logger).Log("msg", "duplicate query address is provided", "addr", u.String()) + duplicatedQueriers.Inc() + continue + } + deduplicated = append(deduplicated, u) + set[u.String()] = struct{}{} + } + return deduplicated +} diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index c9f0403dec2..258f8f21b4c 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -601,11 +601,13 @@ type RulerBuilder struct { f e2e.FutureRunnable - amCfg []alert.AlertmanagerConfig - replicaLabel string - image string - resendDelay string - evalInterval string + amCfg []alert.AlertmanagerConfig + replicaLabel string + image string + resendDelay string + evalInterval string + forGracePeriod string + restoreIgnoredLabels []string } // NewRulerBuilder is a Ruler future that allows extra configuration before initialization. @@ -646,6 +648,16 @@ func (r *RulerBuilder) WithEvalInterval(evalInterval string) *RulerBuilder { return r } +func (r *RulerBuilder) WithForGracePeriod(forGracePeriod string) *RulerBuilder { + r.forGracePeriod = forGracePeriod + return r +} + +func (r *RulerBuilder) WithRestoreIgnoredLabels(labels ...string) *RulerBuilder { + r.restoreIgnoredLabels = labels + return r +} + func (r *RulerBuilder) InitTSDB(internalRuleDir string, queryCfg []httpconfig.Config) *e2emon.InstrumentedRunnable { return r.initRule(internalRuleDir, queryCfg, nil) } @@ -685,6 +697,7 @@ func (r *RulerBuilder) initRule(internalRuleDir string, queryCfg []httpconfig.Co "--query.config": string(queryCfgBytes), "--query.sd-dns-interval": "1s", "--resend-delay": "5s", + "--for-grace-period": "1s", } if r.replicaLabel != "" { ruleArgs["--label"] = fmt.Sprintf(`%s="%s"`, replicaLabel, r.replicaLabel) @@ -698,6 +711,10 @@ func (r *RulerBuilder) initRule(internalRuleDir string, queryCfg []httpconfig.Co ruleArgs["--eval-interval"] = r.evalInterval } + if r.forGracePeriod != "" { + ruleArgs["--for-grace-period"] = r.forGracePeriod + } + if remoteWriteCfg != nil { rwCfgBytes, err := yaml.Marshal(struct { RemoteWriteConfigs []*config.RemoteWriteConfig `yaml:"remote_write,omitempty"` @@ -708,9 +725,15 @@ func (r *RulerBuilder) initRule(internalRuleDir string, queryCfg []httpconfig.Co ruleArgs["--remote-write.config"] = string(rwCfgBytes) } + args := e2e.BuildArgs(ruleArgs) + + for _, label := range r.restoreIgnoredLabels { + args = append(args, "--restore-ignored-label="+label) + } + return e2emon.AsInstrumented(r.f.Init(wrapWithDefaults(e2e.StartOptions{ Image: r.image, - Command: e2e.NewCommand("rule", e2e.BuildArgs(ruleArgs)...), + Command: e2e.NewCommand("rule", args...), Readiness: e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200), })), "http") } diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 8835e04d9a6..0fb52b0efec 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -124,6 +124,22 @@ groups: - record: test_absent_metric expr: absent(nonexistent{job='thanos-receive'}) ` + + testAlertRuleHoldDuration = ` +groups: +- name: example_rule_hold_duration + interval: 1s + rules: + - alert: TestAlert_RuleHoldDuration + # It must be based on actual metric, otherwise call to StoreAPI would be not involved. + expr: absent(some_metric) + for: 2s + labels: + severity: page + annotations: + summary: "I always complain and allow partial response in query." +` + amTimeout = model.Duration(10 * time.Second) ) @@ -586,6 +602,114 @@ func TestRule_CanRemoteWriteData(t *testing.T) { }) } +func TestStatelessRulerAlertStateRestore(t *testing.T) { + t.Parallel() + + e, err := e2e.NewDockerEnvironment("e2e_test_stateless_rule_alert_state_restore") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + t.Cleanup(cancel) + + am := e2ethanos.NewAlertmanager(e, "1") + testutil.Ok(t, e2e.StartAndWaitReady(am)) + + receiver := e2ethanos.NewReceiveBuilder(e, "1").WithIngestionEnabled().Init() + testutil.Ok(t, e2e.StartAndWaitReady(receiver)) + rwURL := urlParse(t, e2ethanos.RemoteWriteEndpoint(receiver.InternalEndpoint("remote-write"))) + + q := e2ethanos.NewQuerierBuilder(e, "1", receiver.InternalEndpoint("grpc")). + WithReplicaLabels("replica", "receive").Init() + testutil.Ok(t, e2e.StartAndWaitReady(q)) + rulesSubDir := "rules" + rulers := []e2e.InstrumentedRunnable{} + for i := 1; i <= 2; i++ { + rFuture := e2ethanos.NewRulerBuilder(e, fmt.Sprintf("%d", i)) + rulesPath := filepath.Join(rFuture.Dir(), rulesSubDir) + testutil.Ok(t, os.MkdirAll(rulesPath, os.ModePerm)) + for i, rule := range []string{testAlertRuleHoldDuration} { + createRuleFile(t, filepath.Join(rulesPath, fmt.Sprintf("rules-%d.yaml", i)), rule) + } + r := rFuture.WithAlertManagerConfig([]alert.AlertmanagerConfig{ + { + EndpointsConfig: httpconfig.EndpointsConfig{ + StaticAddresses: []string{ + am.InternalEndpoint("http"), + }, + Scheme: "http", + }, + Timeout: amTimeout, + APIVersion: alert.APIv1, + }, + }).WithForGracePeriod("500ms"). + WithRestoreIgnoredLabels("tenant_id"). + InitStateless(filepath.Join(rFuture.InternalDir(), rulesSubDir), []httpconfig.Config{ + { + EndpointsConfig: httpconfig.EndpointsConfig{ + StaticAddresses: []string{ + q.InternalEndpoint("http"), + }, + Scheme: "http", + }, + }, + }, []*config.RemoteWriteConfig{ + {URL: &common_cfg.URL{URL: rwURL}, Name: "thanos-receiver"}, + }) + rulers = append(rulers, r) + } + + // Start the ruler 1 first. + testutil.Ok(t, e2e.StartAndWaitReady(rulers[0])) + + // Wait until remote write samples are written to receivers successfully. + testutil.Ok(t, rulers[0].WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"prometheus_remote_storage_samples_total"}, e2e.WaitMissingMetrics())) + + // Wait until the alert firing. + var alerts []*rulespb.AlertInstance + client := promclient.NewDefaultClient() + err = runutil.Repeat(time.Second*1, ctx.Done(), func() error { + alerts, err = client.AlertsInGRPC(ctx, urlParse(t, "http://"+rulers[0].Endpoint("http"))) + testutil.Ok(t, err) + if len(alerts) > 0 { + if alerts[0].State == rulespb.AlertState_FIRING { + return nil + } + } + return fmt.Errorf("alert is not firing") + }) + testutil.Ok(t, err) + // Record the alert active time. + alertActiveAt := alerts[0].ActiveAt + testutil.Ok(t, rulers[0].Stop()) + + // Start the ruler 2 now and ruler 2 should be able + // to restore the firing alert state. + testutil.Ok(t, e2e.StartAndWaitReady(rulers[1])) + + // Wait for 4 rule evaluation iterations to make sure the alert state is restored. + testutil.Ok(t, rulers[1].WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"prometheus_rule_group_duration_seconds_count"}, e2e.WaitMissingMetrics())) + + // Wait until the alert is firing on the second ruler. + err = runutil.Repeat(time.Second*1, ctx.Done(), func() error { + alerts, err = client.AlertsInGRPC(ctx, urlParse(t, "http://"+rulers[1].Endpoint("http"))) + testutil.Ok(t, err) + if len(alerts) > 0 { + if alerts[0].State == rulespb.AlertState_FIRING { + // The second ruler alert's active at time is the same as the previous one, + // which means the alert state is restored successfully. + if alertActiveAt.Unix() == alerts[0].ActiveAt.Unix() { + return nil + } else { + return fmt.Errorf("alert active time is not restored") + } + } + } + return fmt.Errorf("alert is not firing") + }) + testutil.Ok(t, err) +} + // TestRule_CanPersistWALData checks that in stateless mode, Thanos Ruler can persist rule evaluations // which couldn't be sent to the remote write endpoint (e.g because receiver isn't available). func TestRule_CanPersistWALData(t *testing.T) {