Skip to content

Commit

Permalink
add e2e tests
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
Ben Ye committed Mar 14, 2022
1 parent 5cd2495 commit 2481793
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 12 deletions.
6 changes: 5 additions & 1 deletion docs/components/rule.md
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ Flags:
https://thanos.io/tip/components/rule.md/#configuration.
If defined, it takes precedence over the
'--query' and '--query.sd-files' flags.
--query.default-step=1s Default range query step to use. This is only
--query.default-step=10s Default range query step to use. This is only
used in stateless Ruler and alert state
restoration.
--query.http-method=POST HTTP method to use when sending queries.
Expand Down Expand Up @@ -438,6 +438,10 @@ Flags:
https://thanos.io/tip/thanos/logging.md/#configuration
--resend-delay=1m Minimum amount of time to wait before resending
an alert to Alertmanager.
--restore-ignore-labels= ...
Labels to be ignored when restoring alerts from
the remote storage. This is only used in
stateless mode.
--rule-file=rules/ ... Rule files that should be used by rule manager.
Can be in glob format (repeated).
--shipper.upload-compacted
Expand Down
23 changes: 23 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,29 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin
return m.Data.Groups, nil
}

// AlertsInGRPC returns the rules from Prometheus alerts API. It uses gRPC errors.
// NOTE: This method is tested in pkg/store/prometheus_test.go against Prometheus.
func (c *Client) AlertsInGRPC(ctx context.Context, base *url.URL) ([]*rulespb.AlertInstance, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/alerts")

var m struct {
Data struct {
Alerts []*rulespb.AlertInstance `json:"alerts"`
} `json:"data"`
}

if err := c.get2xxResultWithGRPCErrors(ctx, "/prom_alerts HTTP[client]", &u, &m); err != nil {
return nil, err
}

// Prometheus does not support PartialResponseStrategy, and probably would never do. Make it Abort by default.
for _, g := range m.Data.Alerts {
g.PartialResponseStrategy = storepb.PartialResponseStrategy_ABORT
}
return m.Data.Alerts, nil
}

// MetricMetadataInGRPC returns the metadata from Prometheus metric metadata API. It uses gRPC errors.
func (c *Client) MetricMetadataInGRPC(ctx context.Context, base *url.URL, metric string, limit int) (map[string][]metadatapb.Meta, error) {
u := *base
Expand Down
34 changes: 24 additions & 10 deletions test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,13 @@ func NewPrometheusWithSidecarCustomImage(e e2e.Environment, name, promConfig, we
}

type QuerierBuilder struct {
environment e2e.Environment
sharedDir string
name string
routePrefix string
externalPrefix string
image string
environment e2e.Environment
sharedDir string
name string
routePrefix string
externalPrefix string
image string
additionalReplicaLabels []string

storeAddresses []string
fileSDStoreAddresses []string
Expand Down Expand Up @@ -258,6 +259,11 @@ func (q *QuerierBuilder) WithTracingConfig(tracingConfig string) *QuerierBuilder
return q
}

func (q *QuerierBuilder) WithAdditionalReplicaLabels(replicaLabels []string) *QuerierBuilder {
q.additionalReplicaLabels = replicaLabels
return q
}

func (q *QuerierBuilder) BuildUninitiated() e2e.InstrumentedRunnableBuilder {
return newUninitiatedService(
q.environment,
Expand Down Expand Up @@ -381,6 +387,10 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) {
args = append(args, "--tracing.config="+q.tracingConfig)
}

for _, label := range q.additionalReplicaLabels {
args = append(args, "--query.replica-label="+label)
}

return args, nil
}

Expand Down Expand Up @@ -561,14 +571,14 @@ func NewIngestingReceiver(e e2e.Environment, name string) (e2e.InstrumentedRunna
}

func NewTSDBRuler(e e2e.Environment, name, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []httpconfig.Config) (e2e.InstrumentedRunnable, error) {
return newRuler(e, name, ruleSubDir, amCfg, queryCfg, nil)
return newRuler(e, name, ruleSubDir, amCfg, queryCfg, nil, nil)
}

func NewStatelessRuler(e e2e.Environment, name, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []httpconfig.Config, remoteWriteCfg []*config.RemoteWriteConfig) (e2e.InstrumentedRunnable, error) {
return newRuler(e, name, ruleSubDir, amCfg, queryCfg, remoteWriteCfg)
func NewStatelessRuler(e e2e.Environment, name, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []httpconfig.Config, remoteWriteCfg []*config.RemoteWriteConfig, restoreIgnoredLabels []string) (e2e.InstrumentedRunnable, error) {
return newRuler(e, name, ruleSubDir, amCfg, queryCfg, remoteWriteCfg, restoreIgnoredLabels)
}

func newRuler(e e2e.Environment, name, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []httpconfig.Config, remoteWriteCfg []*config.RemoteWriteConfig) (e2e.InstrumentedRunnable, error) {
func newRuler(e e2e.Environment, name, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []httpconfig.Config, remoteWriteCfg []*config.RemoteWriteConfig, restoreIgnoredLabels []string) (e2e.InstrumentedRunnable, error) {
dir := filepath.Join(e.SharedDir(), "data", "rule", name)
container := filepath.Join(ContainerSharedDir, "data", "rule", name)

Expand Down Expand Up @@ -614,6 +624,10 @@ func newRuler(e e2e.Environment, name, ruleSubDir string, amCfg []alert.Alertman
ruleArgs["--remote-write.config"] = string(rwCfgBytes)
}

for _, label := range restoreIgnoredLabels {
ruleArgs["--restore-ignored-labels"] = label
}

ruler := NewService(e,
fmt.Sprintf("rule-%v", name),
DefaultImage(),
Expand Down
106 changes: 105 additions & 1 deletion test/e2e/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"os"
"path/filepath"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -523,7 +524,7 @@ func TestRule_CanRemoteWriteData(t *testing.T) {
}, []*config.RemoteWriteConfig{
{URL: &common_cfg.URL{URL: rwURL}, Name: "thanos-receiver"},
{URL: &common_cfg.URL{URL: rwURL2}, Name: "thanos-receiver2"},
})
}, []string{"tenant_id"})
testutil.Ok(t, err)
testutil.Ok(t, e2e.StartAndWaitReady(r))

Expand Down Expand Up @@ -553,6 +554,109 @@ func TestRule_CanRemoteWriteData(t *testing.T) {
})
}

func TestStatelessRulerAlertStateRestore(t *testing.T) {
t.Parallel()

e, err := e2e.NewDockerEnvironment("e2e_test_stateless_rule_alert_state_restore")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
t.Cleanup(cancel)

rulesSubDir := "rules"
rulesPath := filepath.Join(e.SharedDir(), rulesSubDir)
testutil.Ok(t, os.MkdirAll(rulesPath, os.ModePerm))
for i, rule := range []string{testAlertRuleWarnOnPartialResponse} {
createRuleFile(t, filepath.Join(rulesPath, fmt.Sprintf("rules-%d.yaml", i)), rule)
}

am, err := e2ethanos.NewAlertmanager(e, "1")
testutil.Ok(t, err)
testutil.Ok(t, e2e.StartAndWaitReady(am))

receiver, err := e2ethanos.NewIngestingReceiver(e, "1")
testutil.Ok(t, err)
testutil.Ok(t, e2e.StartAndWaitReady(receiver))
rwURL := mustURLParse(t, e2ethanos.RemoteWriteEndpoint(receiver.InternalEndpoint("remote-write")))

q, err := e2ethanos.NewQuerierBuilder(e, "1", receiver.InternalEndpoint("grpc")).
WithAdditionalReplicaLabels([]string{"rule", "receive"}).Build()
testutil.Ok(t, err)
testutil.Ok(t, e2e.StartAndWaitReady(q))
rulers := []e2e.InstrumentedRunnable{}
for i := 1; i <= 2; i++ {
r, err := e2ethanos.NewStatelessRuler(e, strconv.Itoa(i), rulesSubDir, []alert.AlertmanagerConfig{
{
EndpointsConfig: httpconfig.EndpointsConfig{
StaticAddresses: []string{
am.InternalEndpoint("http"),
},
Scheme: "http",
},
Timeout: amTimeout,
APIVersion: alert.APIv1,
},
}, []httpconfig.Config{
{
EndpointsConfig: httpconfig.EndpointsConfig{
StaticAddresses: []string{
q.InternalEndpoint("http"),
},
Scheme: "http",
},
},
}, []*config.RemoteWriteConfig{
{
URL: &common_cfg.URL{URL: rwURL},
Name: "thanos-receiver",
QueueConfig: config.QueueConfig{
BatchSendDeadline: model.Duration(time.Second * 1),
},
},
}, nil)
testutil.Ok(t, err)
rulers = append(rulers, r)
}

// Start the ruler 1 first.
testutil.Ok(t, e2e.StartAndWaitReady(rulers[0]))

// Wait until remote write samples are written to receivers successfully.
testutil.Ok(t, rulers[0].WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"prometheus_remote_storage_samples_total"}, e2e.WaitMissingMetrics()))

// Wait until the alert firing.
var alerts []*rulespb.AlertInstance
client := promclient.NewDefaultClient()
for {
alerts, err = client.AlertsInGRPC(ctx, mustURLParse(t, "http://"+rulers[0].Endpoint("http")))
testutil.Ok(t, err)
if alerts[0].State == rulespb.AlertState_FIRING {
break
}
time.Sleep(time.Second * 1)
}
alertActiveAt := alerts[0].ActiveAt

time.Sleep(time.Second * 5)
// Start the ruler 2 now and ruler 2 should be able
// to restore the firing alert state.
testutil.Ok(t, e2e.StartAndWaitReady(rulers[1]))

// Wait until the alert is firing on the second ruler.
for {
alerts, err = client.AlertsInGRPC(ctx, mustURLParse(t, "http://"+rulers[1].Endpoint("http")))
testutil.Ok(t, err)
if alerts[0].State == rulespb.AlertState_FIRING {
break
}
time.Sleep(time.Second * 1)
}
// The second ruler alert's active at time is the same as the previous one,
// which means the alert state is restored successfully.
testutil.Equals(t, alertActiveAt, alerts[0])
}

// TestRule_CanPersistWALData checks that in stateless mode, Thanos Ruler can persist rule evaluations
// which couldn't be sent to the remote write endpoint (e.g because receiver isn't available).
func TestRule_CanPersistWALData(t *testing.T) {
Expand Down

0 comments on commit 2481793

Please sign in to comment.