Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <[email protected]>
  • Loading branch information
Ben Ye committed Nov 8, 2021
1 parent fb0dfba commit 46acaee
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
20 changes: 10 additions & 10 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ func runRule(
var (
appendable storage.Appendable
queryable storage.Queryable
db *tsdb.DB
tsdbDB *tsdb.DB
agentDB *agent.DB
)

rwCfgYAML, err := conf.rwConfig.Content()
Expand All @@ -361,15 +362,15 @@ func runRule(
return errors.Wrap(err, "applying config to remote storage")
}

db, err := agent.Open(logger, reg, remoteStore, walDir, agentOpts)
agentDB, err = agent.Open(logger, reg, remoteStore, walDir, agentOpts)
if err != nil {
return errors.Wrap(err, "start remote write agent db")
}
fanoutStore := storage.NewFanout(logger, db, remoteStore)
fanoutStore := storage.NewFanout(logger, agentDB, remoteStore)
appendable = fanoutStore
queryable = fanoutStore
} else {
db, err = tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil)
tsdbDB, err = tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil)
if err != nil {
return errors.Wrap(err, "open TSDB")
}
Expand All @@ -383,13 +384,13 @@ func runRule(
done := make(chan struct{})
g.Add(func() error {
<-done
return db.Close()
return tsdbDB.Close()
}, func(error) {
close(done)
})
}
appendable = db
queryable = db
appendable = tsdbDB
queryable = tsdbDB
}

// Build the Alertmanager clients.
Expand Down Expand Up @@ -585,11 +586,10 @@ func runRule(
grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)),
grpcserver.WithTLSConfig(tlsCfg),
}
if db != nil {
tsdbStore := store.NewTSDBStore(logger, db, component.Rule, conf.lset)
if tsdbDB != nil {
tsdbStore := store.NewTSDBStore(logger, tsdbDB, component.Rule, conf.lset)
options = append(options, grpcserver.WithServer(store.RegisterStoreServer(tsdbStore)))
}
// TODO: Add rules API implementation when ready.
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, options...)

g.Add(func() error {
Expand Down
7 changes: 4 additions & 3 deletions test/e2e/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,11 +507,12 @@ func TestRule_CanRemoteWriteData(t *testing.T) {
testutil.Ok(t, err)
testutil.Ok(t, e2e.StartAndWaitReady(r))

// Wait until remote write samples are written to receivers successfully.
testutil.Ok(t, r.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"prometheus_remote_storage_samples_total"}, e2e.WaitMissingMetrics()))

t.Run("can fetch remote-written samples from receiver", func(t *testing.T) {
testRecordedSamples := "test_absent_metric"
queryAndAssertSeries(t, ctx, q.Endpoint("http"), testRecordedSamples, func() time.Time {
return time.Now()
}, promclient.QueryOptions{
queryAndAssertSeries(t, ctx, q.Endpoint("http"), testRecordedSamples, time.Now, promclient.QueryOptions{
Deduplicate: false,
}, []model.Metric{
{
Expand Down

0 comments on commit 46acaee

Please sign in to comment.