diff --git a/docs/components/rule.md b/docs/components/rule.md
index e64593a54c2..d0204654ecf 100644
--- a/docs/components/rule.md
+++ b/docs/components/rule.md
@@ -391,7 +391,7 @@ Flags:
                                 https://thanos.io/tip/components/rule.md/#configuration.
                                 If defined, it takes precedence over the
                                 '--query' and '--query.sd-files' flags.
-      --query.default-step=1s   Default range query step to use.
+      --query.default-step=1s   Default range query step to use. This is only
+                                 used in stateless Ruler and alert state restoration.
       --query.http-method=POST  HTTP method to use when sending queries.
@@ -438,6 +438,10 @@ Flags:
                                 https://thanos.io/tip/thanos/logging.md/#configuration
      --resend-delay=1m          Minimum amount of time to wait before resending
                                 an alert to Alertmanager.
+      --restore-ignored-label=RESTORE-IGNORED-LABEL ...
+                                 Label names to be ignored when restoring alerts
+                                 from the remote storage. This is only used in
+                                 stateless mode.
      --rule-file=rules/ ...     Rule files that should be used by rule manager.
                                 Can be in glob format (repeated).
      --shipper.upload-compacted
diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go
index f6183849114..005858096b7 100644
--- a/pkg/promclient/promclient.go
+++ b/pkg/promclient/promclient.go
@@ -778,6 +778,29 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin
 	return m.Data.Groups, nil
 }
 
+// AlertsInGRPC returns the alerts from the Prometheus alerts API. It uses gRPC errors.
+// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
+func (c *Client) AlertsInGRPC(ctx context.Context, base *url.URL) ([]*rulespb.AlertInstance, error) {
+	u := *base
+	u.Path = path.Join(u.Path, "/api/v1/alerts")
+
+	var m struct {
+		Data struct {
+			Alerts []*rulespb.AlertInstance `json:"alerts"`
+		} `json:"data"`
+	}
+
+	if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_alerts HTTP[client]", &u, &m); err != nil {
+		return nil, err
+	}
+
+	// Prometheus does not support PartialResponseStrategy and probably never will. Make it Abort by default.
+	for _, a := range m.Data.Alerts {
+		a.PartialResponseStrategy = storepb.PartialResponseStrategy_ABORT
+	}
+	return m.Data.Alerts, nil
+}
+
 // MetricMetadataInGRPC returns the metadata from Prometheus metric metadata API. It uses gRPC errors.
 func (c *Client) MetricMetadataInGRPC(ctx context.Context, base *url.URL, metric string, limit int) (map[string][]metadatapb.Meta, error) {
 	u := *base
diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go
index 75c5bb16cd2..4b54b8e1799 100644
--- a/test/e2e/e2ethanos/services.go
+++ b/test/e2e/e2ethanos/services.go
@@ -174,12 +174,13 @@ func NewPrometheusWithSidecarCustomImage(e e2e.Environment, name, promConfig, we
 }
 
 type QuerierBuilder struct {
-	environment    e2e.Environment
-	sharedDir      string
-	name           string
-	routePrefix    string
-	externalPrefix string
-	image          string
+	environment             e2e.Environment
+	sharedDir               string
+	name                    string
+	routePrefix             string
+	externalPrefix          string
+	image                   string
+	additionalReplicaLabels []string
 
 	storeAddresses       []string
 	fileSDStoreAddresses []string
@@ -258,6 +259,11 @@ func (q *QuerierBuilder) WithTracingConfig(tracingConfig string) *QuerierBuilder
 	return q
 }
 
+func (q *QuerierBuilder) WithAdditionalReplicaLabels(replicaLabels []string) *QuerierBuilder {
+	q.additionalReplicaLabels = replicaLabels
+	return q
+}
+
 func (q *QuerierBuilder) BuildUninitiated() e2e.InstrumentedRunnableBuilder {
 	return newUninitiatedService(
 		q.environment,
@@ -381,6 +387,10 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) {
 		args = append(args, "--tracing.config="+q.tracingConfig)
 	}
 
+	for _, label := range q.additionalReplicaLabels {
+		args = append(args, "--query.replica-label="+label)
+	}
+
 	return args, nil
 }
 
@@ -561,14 +571,14 @@ func NewIngestingReceiver(e e2e.Environment, name string) (e2e.InstrumentedRunna
 }
 
 func NewTSDBRuler(e e2e.Environment, name, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []httpconfig.Config) (e2e.InstrumentedRunnable, error) {
-	return newRuler(e, name, ruleSubDir, amCfg, queryCfg, nil)
+	return newRuler(e, name, ruleSubDir, amCfg, queryCfg, nil, nil)
 }
 
-func NewStatelessRuler(e e2e.Environment, name, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []httpconfig.Config, remoteWriteCfg []*config.RemoteWriteConfig) (e2e.InstrumentedRunnable, error) {
-	return newRuler(e, name, ruleSubDir, amCfg, queryCfg, remoteWriteCfg)
+func NewStatelessRuler(e e2e.Environment, name, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []httpconfig.Config, remoteWriteCfg []*config.RemoteWriteConfig, restoreIgnoredLabels []string) (e2e.InstrumentedRunnable, error) {
	return newRuler(e, name, ruleSubDir, amCfg, queryCfg, remoteWriteCfg, restoreIgnoredLabels)
 }
 
-func newRuler(e e2e.Environment, name, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []httpconfig.Config, remoteWriteCfg []*config.RemoteWriteConfig) (e2e.InstrumentedRunnable, error) {
+func newRuler(e e2e.Environment, name, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []httpconfig.Config, remoteWriteCfg []*config.RemoteWriteConfig, restoreIgnoredLabels []string) (e2e.InstrumentedRunnable, error) {
 	dir := filepath.Join(e.SharedDir(), "data", "rule", name)
 	container := filepath.Join(ContainerSharedDir, "data", "rule", name)
@@ -614,6 +624,10 @@ func newRuler(e e2e.Environment, name, ruleSubDir string, amCfg []alert.Alertman
 		ruleArgs["--remote-write.config"] = string(rwCfgBytes)
 	}
 
+	// NOTE: ruleArgs is a map, so this repeatable flag can only be set once here;
+	// the e2e tests currently pass at most a single label to ignore.
+	for _, label := range restoreIgnoredLabels {
+		ruleArgs["--restore-ignored-label"] = label
+	}
+
 	ruler := NewService(e,
 		fmt.Sprintf("rule-%v", name),
 		DefaultImage(),
diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go
index f4e1f6f2c01..bf83e6ab787 100644
--- a/test/e2e/rule_test.go
+++ b/test/e2e/rule_test.go
@@ -12,6 +12,7 @@
 import (
 	"net/http"
 	"os"
 	"path/filepath"
+	"strconv"
 	"testing"
 	"time"
 
@@ -523,7 +524,7 @@ func TestRule_CanRemoteWriteData(t *testing.T) {
 		}, []*config.RemoteWriteConfig{
 			{URL: &common_cfg.URL{URL: rwURL}, Name: "thanos-receiver"},
 			{URL: &common_cfg.URL{URL: rwURL2}, Name: "thanos-receiver2"},
-		})
+		}, []string{"tenant_id"})
 	testutil.Ok(t, err)
 	testutil.Ok(t, e2e.StartAndWaitReady(r))
@@ -553,6 +554,109 @@ func TestRule_CanRemoteWriteData(t *testing.T) {
 	})
 }
 
+func TestStatelessRulerAlertStateRestore(t *testing.T) {
+	t.Parallel()
+
+	e, err := e2e.NewDockerEnvironment("e2e_test_stateless_rule_alert_state_restore")
+	testutil.Ok(t, err)
+	t.Cleanup(e2ethanos.CleanScenario(t, e))
+
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
+	t.Cleanup(cancel)
+
+	rulesSubDir := "rules"
+	rulesPath := filepath.Join(e.SharedDir(), rulesSubDir)
+	testutil.Ok(t, os.MkdirAll(rulesPath, os.ModePerm))
+	for i, rule := range []string{testAlertRuleWarnOnPartialResponse} {
+		createRuleFile(t, filepath.Join(rulesPath, fmt.Sprintf("rules-%d.yaml", i)), rule)
+	}
+
+	am, err := e2ethanos.NewAlertmanager(e, "1")
+	testutil.Ok(t, err)
+	testutil.Ok(t, e2e.StartAndWaitReady(am))
+
+	receiver, err := e2ethanos.NewIngestingReceiver(e, "1")
+	testutil.Ok(t, err)
+	testutil.Ok(t, e2e.StartAndWaitReady(receiver))
+	rwURL := mustURLParse(t, e2ethanos.RemoteWriteEndpoint(receiver.InternalEndpoint("remote-write")))
+
+	q, err := e2ethanos.NewQuerierBuilder(e, "1", receiver.InternalEndpoint("grpc")).
+		WithAdditionalReplicaLabels([]string{"rule", "receive"}).Build()
+	testutil.Ok(t, err)
+	testutil.Ok(t, e2e.StartAndWaitReady(q))
+	rulers := []e2e.InstrumentedRunnable{}
+	for i := 1; i <= 2; i++ {
+		r, err := e2ethanos.NewStatelessRuler(e, strconv.Itoa(i), rulesSubDir, []alert.AlertmanagerConfig{
+			{
+				EndpointsConfig: httpconfig.EndpointsConfig{
+					StaticAddresses: []string{
+						am.InternalEndpoint("http"),
+					},
+					Scheme: "http",
+				},
+				Timeout:    amTimeout,
+				APIVersion: alert.APIv1,
+			},
+		}, []httpconfig.Config{
+			{
+				EndpointsConfig: httpconfig.EndpointsConfig{
+					StaticAddresses: []string{
+						q.InternalEndpoint("http"),
+					},
+					Scheme: "http",
+				},
+			},
+		}, []*config.RemoteWriteConfig{
+			{
+				URL:  &common_cfg.URL{URL: rwURL},
+				Name: "thanos-receiver",
+				QueueConfig: config.QueueConfig{
+					BatchSendDeadline: model.Duration(time.Second * 1),
+				},
+			},
+		}, nil)
+		testutil.Ok(t, err)
+		rulers = append(rulers, r)
+	}
+
+	// Start ruler 1 first.
+	testutil.Ok(t, e2e.StartAndWaitReady(rulers[0]))
+
+	// Wait until remote write samples are written to receivers successfully.
+	testutil.Ok(t, rulers[0].WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"prometheus_remote_storage_samples_total"}, e2e.WaitMissingMetrics()))
+
+	// Wait until the alert is firing on the first ruler.
+	var alerts []*rulespb.AlertInstance
+	client := promclient.NewDefaultClient()
+	for {
+		alerts, err = client.AlertsInGRPC(ctx, mustURLParse(t, "http://"+rulers[0].Endpoint("http")))
+		testutil.Ok(t, err)
+		if len(alerts) > 0 && alerts[0].State == rulespb.AlertState_FIRING {
+			break
+		}
+		time.Sleep(time.Second * 1)
+	}
+	alertActiveAt := alerts[0].ActiveAt
+
+	// Give ruler 1 a few seconds to remote-write its alert state to the receiver.
+	time.Sleep(time.Second * 5)
+	// Now start ruler 2; it should be able to restore the firing alert state.
+	testutil.Ok(t, e2e.StartAndWaitReady(rulers[1]))
+
+	// Wait until the alert is firing on the second ruler.
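+	// NOTE: Alert state restoration (as in Prometheus) relies on the ALERTS_FOR_STATE
+	// series that ruler 1 remote-wrote to the receiver: ruler 2 queries it back through
+	// the querier, so it is expected to report the alert as FIRING with the ActiveAt
+	// timestamp recorded by ruler 1 rather than starting a new one.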
+	for {
+		alerts, err = client.AlertsInGRPC(ctx, mustURLParse(t, "http://"+rulers[1].Endpoint("http")))
+		testutil.Ok(t, err)
+		if len(alerts) > 0 && alerts[0].State == rulespb.AlertState_FIRING {
+			break
+		}
+		time.Sleep(time.Second * 1)
+	}
+	// The second ruler's alert has the same ActiveAt time as the first ruler's,
+	// which means the alert state was restored successfully.
+	testutil.Equals(t, alertActiveAt, alerts[0].ActiveAt)
+}
+
 // TestRule_CanPersistWALData checks that in stateless mode, Thanos Ruler can persist rule evaluations
 // which couldn't be sent to the remote write endpoint (e.g because receiver isn't available).
 func TestRule_CanPersistWALData(t *testing.T) {