diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 6a6cb9cd67..5e867c91e5 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -26,10 +26,12 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/common/route" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/rules" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/util/strutil" "github.com/thanos-io/thanos/pkg/errutil" @@ -50,6 +52,7 @@ import ( "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/query" thanosrules "github.com/thanos-io/thanos/pkg/rules" + "github.com/thanos-io/thanos/pkg/rules/remotewrite" "github.com/thanos-io/thanos/pkg/runutil" grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" httpserver "github.com/thanos-io/thanos/pkg/server/http" @@ -75,6 +78,8 @@ type ruleConfig struct { alertQueryURL *url.URL alertRelabelConfigYAML []byte + rwConfig *extflag.PathOrContent + resendDelay time.Duration evalInterval time.Duration ruleFiles []string @@ -117,6 +122,8 @@ func registerRule(app *extkingpin.App) { cmd.Flag("eval-interval", "The default evaluation interval to use."). Default("30s").DurationVar(&conf.evalInterval) + conf.rwConfig = extflag.RegisterPathOrContent(cmd, "remote-write.config", "YAML config for the remote-write server where samples should be sent to (see https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). This automatically enables stateless mode for ruler and no series will be stored in the ruler's TSDB. 
If an empty config (or file) is provided, the flag is ignored and ruler is run with its own TSDB.", extflag.WithEnvSubstitution()) + reqLogDecision := cmd.Flag("log.request.decision", "Deprecation Warning - This flag would be soon deprecated, and replaced with `request.logging-config`. Request Logging for logging the start and end of requests. By default this flag is disabled. LogFinishCall: Logs the finish call of the requests. LogStartAndFinishCall: Logs the start and finish call of the requests. NoLogCall: Disable request logging.").Default("").Enum("NoLogCall", "LogFinishCall", "LogStartAndFinishCall", "") conf.objStoreConfig = extkingpin.RegisterCommonObjStoreFlags(cmd, "", false) @@ -319,25 +326,52 @@ func runRule( // Discover and resolve query addresses. addDiscoveryGroups(g, queryClient, conf.query.dnsSDInterval) } + var ( + appendable storage.Appendable + queryable storage.Queryable + db *tsdb.DB + ) - db, err := tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil) + rwCfgYAML, err := conf.rwConfig.Content() if err != nil { - return errors.Wrap(err, "open TSDB") + return err } - level.Debug(logger).Log("msg", "removing storage lock file if any") - if err := removeLockfileIfAny(logger, conf.dataDir); err != nil { - return errors.Wrap(err, "remove storage lock files") - } + if len(rwCfgYAML) > 0 { + var rwCfg config.RemoteWriteConfig + rwCfg, err = remotewrite.LoadRemoteWriteConfig(rwCfgYAML) + if err != nil { + return err + } + walDir := filepath.Join(conf.dataDir, rwCfg.Name) + remoteStore, err := remotewrite.NewFanoutStorage(logger, reg, walDir, &rwCfg) + if err != nil { + return errors.Wrap(err, "set up remote-write store for ruler") + } + appendable = remoteStore + queryable = remoteStore + } else { + db, err = tsdb.Open(conf.dataDir, log.With(logger, "component", "tsdb"), reg, tsdbOpts, nil) + if err != nil { + return errors.Wrap(err, "open TSDB") + } - { - done := make(chan struct{}) - g.Add(func() error { - <-done - 
return db.Close() - }, func(error) { - close(done) - }) + level.Debug(logger).Log("msg", "removing storage lock file if any") + if err := removeLockfileIfAny(logger, conf.dataDir); err != nil { + return errors.Wrap(err, "remove storage lock files") + } + + { + done := make(chan struct{}) + g.Add(func() error { + <-done + return db.Close() + }, func(error) { + close(done) + }) + } + appendable = db + queryable = db } // Build the Alertmanager clients. @@ -435,9 +469,9 @@ func runRule( rules.ManagerOptions{ NotifyFunc: notifyFunc, Logger: logger, - Appendable: db, + Appendable: appendable, ExternalURL: nil, - Queryable: db, + Queryable: queryable, ResendDelay: conf.resendDelay, }, queryFuncCreator(logger, queryClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings, conf.query.httpMethod), @@ -522,7 +556,7 @@ func runRule( ) // Start gRPC server. - { + if db != nil { tsdbStore := store.NewTSDBStore(logger, db, component.Rule, conf.lset) tlsCfg, err := tls.NewServerConfig(log.With(logger, "protocol", "gRPC"), conf.grpc.tlsSrvCert, conf.grpc.tlsSrvKey, conf.grpc.tlsSrvClientCA) @@ -547,6 +581,7 @@ func runRule( s.Shutdown(err) }) } + // Start UI & metrics HTTP server. { router := route.New() diff --git a/docs/components/rule.md b/docs/components/rule.md index 8e9da7773e..b13470a22f 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -344,6 +344,26 @@ Flags: (repeatable). --query.sd-interval=5m Refresh interval to re-read file SD files. (used as a fallback) + --remote-write.config= + Alternative to 'remote-write.config-file' flag + (mutually exclusive). Content of YAML config + for the remote-write server where samples + should be sent to (see + https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). + This automatically enables stateless mode for + ruler and no series will be stored in the + ruler's TSDB. If an empty config (or file) is + provided, the flag is ignored and ruler is run + with its own TSDB. 
+ --remote-write.config-file= + Path to YAML config for the remote-write server + where samples should be sent to (see + https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write). + This automatically enables stateless mode for + ruler and no series will be stored in the + ruler's TSDB. If an empty config (or file) is + provided, the flag is ignored and ruler is run + with its own TSDB. --request.logging-config= Alternative to 'request.logging-config-file' flag (mutually exclusive). Content of YAML file diff --git a/go.mod b/go.mod index 9dc5c9f698..7878f29bcc 100644 --- a/go.mod +++ b/go.mod @@ -61,6 +61,7 @@ require ( github.com/prometheus/common v0.29.0 github.com/prometheus/exporter-toolkit v0.6.0 github.com/prometheus/prometheus v1.8.2-0.20210720123808-b1ed4a0a663d + github.com/stretchr/testify v1.7.0 github.com/tencentyun/cos-go-sdk-v5 v0.7.31 github.com/uber/jaeger-client-go v2.29.1+incompatible github.com/uber/jaeger-lib v2.4.1+incompatible diff --git a/pkg/rules/remotewrite/remotewrite.go b/pkg/rules/remotewrite/remotewrite.go new file mode 100644 index 0000000000..9f1f5f9f46 --- /dev/null +++ b/pkg/rules/remotewrite/remotewrite.go @@ -0,0 +1,43 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package remotewrite + +import ( + "time" + + "github.com/pkg/errors" + + "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/remote" + "gopkg.in/yaml.v2" +) + +// LoadRemoteWriteConfig prepares a RemoteWriteConfig instance from a given YAML config. 
+func LoadRemoteWriteConfig(configYAML []byte) (config.RemoteWriteConfig, error) {
+	var cfg config.RemoteWriteConfig
+	if err := yaml.Unmarshal(configYAML, &cfg); err != nil {
+		// Hand back the zero value rather than a partly decoded config, and
+		// name the failing step so the caller's error is actionable.
+		return config.RemoteWriteConfig{}, errors.Wrap(err, "parsing remote-write config")
+	}
+	return cfg, nil
+}
+
+// NewFanoutStorage creates a storage that fans-out to both the WAL and a configured remote storage.
+// The remote storage tails the WAL and sends the metrics it reads using Prometheus' remote_write.
+//
+// The WAL storage is created under walDir and registers its metrics on reg.
+// If rwConfig cannot be applied, everything created so far is closed again
+// before the error is returned, so a failed call leaves nothing behind.
+func NewFanoutStorage(logger log.Logger, reg prometheus.Registerer, walDir string, rwConfig *config.RemoteWriteConfig) (storage.Storage, error) {
+	walStore, err := NewStorage(logger, reg, walDir)
+	if err != nil {
+		return nil, errors.Wrap(err, "creating WAL storage")
+	}
+	remoteStore := remote.NewStorage(logger, reg, walStore.StartTime, walStore.Directory(), 1*time.Minute, nil)
+	if err := remoteStore.ApplyConfig(&config.Config{
+		GlobalConfig:       config.DefaultGlobalConfig,
+		RemoteWriteConfigs: []*config.RemoteWriteConfig{rwConfig},
+	}); err != nil {
+		// Neither store is handed to the caller on this path, so nobody else
+		// can close them. Close errors are secondary to the configuration
+		// error and are intentionally dropped.
+		_ = remoteStore.Close()
+		_ = walStore.Close()
+		return nil, errors.Wrap(err, "applying config to remote storage")
+	}
+	return storage.NewFanout(logger, walStore, remoteStore), nil
+}
diff --git a/pkg/rules/remotewrite/series.go b/pkg/rules/remotewrite/series.go
new file mode 100644
index 0000000000..02063d8bfc
--- /dev/null
+++ b/pkg/rules/remotewrite/series.go
@@ -0,0 +1,259 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+// This is copied from https://github.com/grafana/agent/blob/a23bd5cf27c2ac99695b7449d38fb12444941a1c/pkg/prom/wal/series.go
+// TODO(idoqo): Migrate to prometheus package when https://github.com/prometheus/prometheus/pull/8785 is ready.
+package remotewrite
+
+import (
+	"sync"
+
+	"github.com/prometheus/prometheus/pkg/labels"
+)
+
+// memSeries is the in-memory bookkeeping kept for one series: its reference
+// ID, its label set and the timestamp of the last sample appended to it.
+// The embedded mutex guards the mutable fields.
+type memSeries struct {
+	sync.Mutex
+
+	ref    uint64
+	lset   labels.Labels
+	lastTs int64
+
+	// TODO(rfratto): this solution below isn't perfect, and there's still
+	// the possibility for a series to be deleted before it's
+	// completely gone from the WAL. Rather, we should have gc return
+	// a "should delete" map and be given a "deleted" map.
+	// If a series that is going to be marked for deletion is in the
+	// "deleted" map, then it should be deleted instead.
+	//
+	// The "deleted" map will be populated by the Truncate function.
+	// It will be cleared with every call to gc.
+
+	// willDelete marks a series as to be deleted on the next garbage
+	// collection. If it receives a write, willDelete is disabled.
+	willDelete bool
+
+	// Whether this series has samples waiting to be committed to the WAL
+	pendingCommit bool
+}
+
+// updateTs records ts as the latest sample timestamp, cancels any pending
+// deletion and flags the series as having uncommitted samples. It takes no
+// lock itself.
+func (s *memSeries) updateTs(ts int64) {
+	s.lastTs = ts
+	s.willDelete = false
+	s.pendingCommit = true
+}
+
+// seriesHashmap is a simple hashmap for memSeries by their label set. It is
+// built on top of a regular hashmap and holds a slice of series to resolve
+// hash collisions. Its methods require the hash to be submitted with it to
+// avoid re-computations throughout the code.
+//
+// This code is copied from the Prometheus TSDB.
+type seriesHashmap map[uint64][]*memSeries
+
+// get returns the series stored under hash whose labels equal lset, or nil.
+func (m seriesHashmap) get(hash uint64, lset labels.Labels) *memSeries {
+	for _, s := range m[hash] {
+		if labels.Equal(s.lset, lset) {
+			return s
+		}
+	}
+	return nil
+}
+
+// set stores s under hash, replacing an entry with equal labels if one exists.
+func (m seriesHashmap) set(hash uint64, s *memSeries) {
+	l := m[hash]
+	for i, prev := range l {
+		if labels.Equal(prev.lset, s.lset) {
+			l[i] = s
+			return
+		}
+	}
+	m[hash] = append(l, s)
+}
+
+// del removes the series with the given ref from the bucket for hash and
+// drops the bucket once it is empty.
+func (m seriesHashmap) del(hash uint64, ref uint64) {
+	var rem []*memSeries
+	for _, s := range m[hash] {
+		if s.ref != ref {
+			rem = append(rem, s)
+		}
+	}
+	if len(rem) == 0 {
+		delete(m, hash)
+	} else {
+		m[hash] = rem
+	}
+}
+
+const (
+	// defaultStripeSize is the default number of entries to allocate in the
+	// stripeSeries hash map.
+	defaultStripeSize = 1 << 14
+)
+
+// stripeSeries locks modulo ranges of IDs and hashes to reduce lock contention.
+// The locks are padded to not be on the same cache line. 
Filling the padded space
+// with the maps was profiled to be slower – likely due to the additional pointer
+// dereferences.
+//
+// This code is copied from the Prometheus TSDB.
+type stripeSeries struct {
+	size   int
+	series []map[uint64]*memSeries
+	hashes []seriesHashmap
+	locks  []stripeLock
+}
+
+// stripeLock is the per-stripe lock used by stripeSeries.
+type stripeLock struct {
+	sync.RWMutex
+	// Padding to avoid multiple locks being on the same cache line.
+	_ [40]byte
+}
+
+// newStripeSeries allocates a stripeSeries with defaultStripeSize stripes,
+// each starting with an empty ref map and an empty hash map.
+func newStripeSeries() *stripeSeries {
+	stripeSize := defaultStripeSize
+	s := &stripeSeries{
+		size:   stripeSize,
+		series: make([]map[uint64]*memSeries, stripeSize),
+		hashes: make([]seriesHashmap, stripeSize),
+		locks:  make([]stripeLock, stripeSize),
+	}
+
+	for i := range s.series {
+		s.series[i] = map[uint64]*memSeries{}
+	}
+	for i := range s.hashes {
+		s.hashes[i] = seriesHashmap{}
+	}
+	return s
+}
+
+// gc removes series that have not received a write since mint and have no
+// samples pending commit. Removal takes two passes: the first call that finds
+// a series idle only sets willDelete, and a later call that still finds it
+// idle removes it from both the ref map and the hash map. It returns the set
+// of refs removed by this call.
+func (s *stripeSeries) gc(mint int64) map[uint64]struct{} {
+	var (
+		deleted = map[uint64]struct{}{}
+	)
+
+	// Run through all series and find series that haven't been written to
+	// since mint. Mark those series as deleted and store their ID.
+	for i := 0; i < s.size; i++ {
+		s.locks[i].Lock()
+
+		for _, series := range s.series[i] {
+			series.Lock()
+			seriesHash := series.lset.Hash()
+
+			// If the series has received a write after mint, there's still
+			// data and it's not completely gone yet.
+			if series.lastTs >= mint || series.pendingCommit {
+				series.willDelete = false
+				series.Unlock()
+				continue
+			}
+
+			// The series hasn't received any data and *might* be gone, but
+			// we want to give it an opportunity to come back before marking
+			// it as deleted, so we wait one more GC cycle.
+			if !series.willDelete {
+				series.willDelete = true
+				series.Unlock()
+				continue
+			}
+
+			// The series is gone entirely. We'll need to delete the label
+			// hash (if one exists) so we'll obtain a lock for that too.
+			j := int(seriesHash) & (s.size - 1)
+			if i != j {
+				// The hash entry lives in a different stripe than the ref
+				// entry, so a second stripe lock is taken while the first
+				// is still held.
+				s.locks[j].Lock()
+			}
+
+			deleted[series.ref] = struct{}{}
+			delete(s.series[i], series.ref)
+			s.hashes[j].del(seriesHash, series.ref)
+
+			if i != j {
+				s.locks[j].Unlock()
+			}
+
+			series.Unlock()
+		}
+
+		s.locks[i].Unlock()
+	}
+
+	return deleted
+}
+
+// getByID returns the series registered under ref id, or nil if there is none.
+func (s *stripeSeries) getByID(id uint64) *memSeries {
+	i := id & uint64(s.size-1)
+
+	s.locks[i].RLock()
+	series := s.series[i][id]
+	s.locks[i].RUnlock()
+
+	return series
+}
+
+// getByHash returns the series stored under hash whose labels equal lset, or
+// nil if there is none.
+func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
+	i := hash & uint64(s.size-1)
+
+	s.locks[i].RLock()
+	series := s.hashes[i].get(hash, lset)
+	s.locks[i].RUnlock()
+
+	return series
+}
+
+// set registers series under both its label hash and its ref. The two maps
+// are updated under separate stripe locks, one after the other, so the
+// combined update is not atomic.
+func (s *stripeSeries) set(hash uint64, series *memSeries) {
+	i := hash & uint64(s.size-1)
+	s.locks[i].Lock()
+	s.hashes[i].set(hash, series)
+	s.locks[i].Unlock()
+
+	i = series.ref & uint64(s.size-1)
+	s.locks[i].Lock()
+	s.series[i][series.ref] = series
+	s.locks[i].Unlock()
+}
+
+// iterator returns an iterator over every series currently held.
+func (s *stripeSeries) iterator() *stripeSeriesIterator {
+	return &stripeSeriesIterator{s}
+}
+
+// stripeSeriesIterator allows to iterate over series through a channel.
+// The channel should always be completely consumed to not leak. 
+type stripeSeriesIterator struct {
+	s *stripeSeries
+}
+
+// Channel starts a goroutine that walks every stripe and sends each series on
+// the returned unbuffered channel, closing the channel after the last stripe.
+//
+// While a series is being sent, the goroutine holds the read lock of the
+// stripe it is walking, the series' own mutex and, when the label hash maps
+// to another stripe, that stripe's read lock as well. All of them stay held
+// until the receiver takes the value, so the goroutine blocks forever if the
+// channel is not drained.
+// NOTE(review): a receiver that locks the series again only proceeds after
+// this goroutine has moved past the send; confirm no caller needs a stripe
+// write lock in between.
+func (it *stripeSeriesIterator) Channel() <-chan *memSeries {
+	ret := make(chan *memSeries)
+
+	go func() {
+		for i := 0; i < it.s.size; i++ {
+			it.s.locks[i].RLock()
+
+			for _, series := range it.s.series[i] {
+				series.Lock()
+
+				// Stripe index of the label hash; it can differ from the
+				// stripe index i derived from the ref.
+				j := int(series.lset.Hash()) & (it.s.size - 1)
+				if i != j {
+					it.s.locks[j].RLock()
+				}
+
+				ret <- series
+
+				if i != j {
+					it.s.locks[j].RUnlock()
+				}
+				series.Unlock()
+			}
+
+			it.s.locks[i].RUnlock()
+		}
+
+		close(ret)
+	}()
+
+	return ret
+}
diff --git a/pkg/rules/remotewrite/util.go b/pkg/rules/remotewrite/util.go
new file mode 100644
index 0000000000..bb3b2ec7c6
--- /dev/null
+++ b/pkg/rules/remotewrite/util.go
@@ -0,0 +1,128 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+// This is copied from https://github.com/grafana/agent/blob/a23bd5cf27c2ac99695b7449d38fb12444941a1c/pkg/prom/wal/util.go
+// TODO(idoqo): Migrate to prometheus package when https://github.com/prometheus/prometheus/pull/8785 is ready. 
+package remotewrite
+
+import (
+	"path/filepath"
+	"sync"
+
+	"github.com/prometheus/prometheus/tsdb/record"
+	"github.com/prometheus/prometheus/tsdb/wal"
+)
+
+// walReplayer feeds the series and sample records of a WAL directory to a
+// wal.WriteTo.
+type walReplayer struct {
+	w wal.WriteTo
+}
+
+// Replay reads the WAL stored in dir and forwards its records to r.w. The
+// last checkpoint, if there is one, is replayed first, followed by every
+// segment after it. The WAL handle opened for the replay is closed before
+// returning; a close error is reported only when nothing else failed.
+func (r walReplayer) Replay(dir string) (err error) {
+	w, err := wal.Open(nil, dir)
+	if err != nil {
+		return err
+	}
+	// The handle is only needed to resolve the WAL directory. Without this
+	// close every Replay call would leave an open WAL behind.
+	defer func() {
+		if closeErr := w.Close(); closeErr != nil && err == nil {
+			err = closeErr
+		}
+	}()
+
+	dir, startFrom, err := wal.LastCheckpoint(w.Dir())
+	if err != nil && err != record.ErrNotFound {
+		return err
+	}
+
+	if err == nil {
+		sr, err := wal.NewSegmentsReader(dir)
+		if err != nil {
+			return err
+		}
+
+		err = r.replayWAL(wal.NewReader(sr))
+		if closeErr := sr.Close(); closeErr != nil && err == nil {
+			err = closeErr
+		}
+		if err != nil {
+			return err
+		}
+
+		// Segments up to and including the checkpointed one are already
+		// covered by the checkpoint.
+		startFrom++
+	}
+
+	_, last, err := wal.Segments(w.Dir())
+	if err != nil {
+		return err
+	}
+
+	for i := startFrom; i <= last; i++ {
+		s, err := wal.OpenReadSegment(wal.SegmentName(w.Dir(), i))
+		if err != nil {
+			return err
+		}
+
+		sr := wal.NewSegmentBufReader(s)
+		err = r.replayWAL(wal.NewReader(sr))
+		if closeErr := sr.Close(); closeErr != nil && err == nil {
+			err = closeErr
+		}
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// replayWAL decodes every record from reader and hands series and sample
+// records to r.w; other record types are skipped. It returns the first decode
+// error, or the reader's own error if reading stopped early.
+func (r walReplayer) replayWAL(reader *wal.Reader) error {
+	var dec record.Decoder
+
+	for reader.Next() {
+		rec := reader.Record()
+		switch dec.Type(rec) {
+		case record.Series:
+			series, err := dec.Series(rec, nil)
+			if err != nil {
+				return err
+			}
+			r.w.StoreSeries(series, 0)
+		case record.Samples:
+			samples, err := dec.Samples(rec, nil)
+			if err != nil {
+				return err
+			}
+			r.w.Append(samples)
+		}
+	}
+
+	// Next also returns false on a read failure; surface it instead of
+	// reporting a silently shortened replay as success.
+	return reader.Err()
+}
+
+// walDataCollector is a wal.WriteTo that keeps everything it is given in
+// memory. The mutex guards both slices.
+type walDataCollector struct {
+	mut     sync.Mutex
+	samples []record.RefSample
+	series  []record.RefSeries
+}
+
+// Append stores samples and always reports success.
+func (c *walDataCollector) Append(samples []record.RefSample) bool {
+	c.mut.Lock()
+	defer c.mut.Unlock()
+
+	c.samples = append(c.samples, samples...)
+	return true
+}
+
+func (c *walDataCollector) AppendExemplars([]record.RefExemplar) bool {
+	// dummy implementation to make walDataCollector conform to the WriteTo interface
+	return true
+}
+
+// StoreSeries stores series; the segment index is ignored.
+func (c *walDataCollector) StoreSeries(series []record.RefSeries, _ int) {
+	c.mut.Lock()
+	defer c.mut.Unlock()
+
+	c.series = append(c.series, series...)
+}
+
+// SeriesReset is a no-op.
+func (c *walDataCollector) SeriesReset(_ int) {}
+
+// SubDirectory returns the subdirectory within a Storage directory used for
+// the Prometheus WAL.
+func SubDirectory(base string) string {
+	return filepath.Join(base, "wal")
+}
diff --git a/pkg/rules/remotewrite/wal.go b/pkg/rules/remotewrite/wal.go
new file mode 100644
index 0000000000..bf2c0f033a
--- /dev/null
+++ b/pkg/rules/remotewrite/wal.go
@@ -0,0 +1,697 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+// This is copied from https://github.com/grafana/agent/blob/a23bd5cf27c2ac99695b7449d38fb12444941a1c/pkg/prom/wal/wal.go
+// TODO(idoqo): Migrate to prometheus package when https://github.com/prometheus/prometheus/pull/8785 is ready.
+
+package remotewrite
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"sync"
+	"time"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/prometheus/prometheus/pkg/exemplar"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/timestamp"
+	"github.com/prometheus/prometheus/pkg/value"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb"
+	"github.com/prometheus/prometheus/tsdb/record"
+	"github.com/prometheus/prometheus/tsdb/wal"
+)
+
+// ErrWALClosed is an error returned when a WAL operation can't run because the
+// storage has already been closed. 
+var ErrWALClosed = fmt.Errorf("WAL storage closed")
+
+// storageMetrics groups the collectors exposed by Storage together with the
+// registerer they were registered on, so they can be removed again on Close.
+type storageMetrics struct {
+	r prometheus.Registerer
+
+	numActiveSeries      prometheus.Gauge
+	numDeletedSeries     prometheus.Gauge
+	totalCreatedSeries   prometheus.Counter
+	totalRemovedSeries   prometheus.Counter
+	totalAppendedSamples prometheus.Counter
+}
+
+// newStorageMetrics creates the collectors and registers them on r.
+func newStorageMetrics(r prometheus.Registerer) *storageMetrics {
+	m := storageMetrics{r: r}
+	m.numActiveSeries = promauto.With(r).NewGauge(prometheus.GaugeOpts{
+		Name: "thanos_wal_storage_active_series",
+		Help: "Current number of active series being tracked by the WAL storage",
+	})
+
+	m.numDeletedSeries = promauto.With(r).NewGauge(prometheus.GaugeOpts{
+		Name: "thanos_wal_storage_deleted_series",
+		Help: "Current number of series marked for deletion from memory",
+	})
+
+	m.totalCreatedSeries = promauto.With(r).NewCounter(prometheus.CounterOpts{
+		Name: "thanos_wal_storage_created_series_total",
+		Help: "Total number of created series appended to the WAL",
+	})
+
+	m.totalRemovedSeries = promauto.With(r).NewCounter(prometheus.CounterOpts{
+		Name: "thanos_wal_storage_removed_series_total",
+		Help: "Total number of created series removed from the WAL",
+	})
+
+	m.totalAppendedSamples = promauto.With(r).NewCounter(prometheus.CounterOpts{
+		Name: "thanos_wal_samples_appended_total",
+		Help: "Total number of samples appended to the WAL",
+	})
+
+	return &m
+}
+
+// Unregister removes every collector created by newStorageMetrics from the
+// registerer. It is a no-op when no registerer was supplied.
+func (m *storageMetrics) Unregister() {
+	if m.r == nil {
+		return
+	}
+	cs := []prometheus.Collector{
+		m.numActiveSeries,
+		m.numDeletedSeries,
+		m.totalCreatedSeries,
+		m.totalRemovedSeries,
+		// Must be listed too: a collector left behind makes the next
+		// newStorageMetrics call on the same registerer panic on duplicate
+		// registration.
+		m.totalAppendedSamples,
+	}
+	for _, c := range cs {
+		m.r.Unregister(c)
+	}
+}
+
+// Storage implements storage.Storage, and just writes to the WAL.
+type Storage struct {
+	// Embed Queryable/ChunkQueryable for compatibility, but don't actually implement it.
+	storage.Queryable
+	storage.ChunkQueryable
+
+	// Operations against the WAL must be protected by a mutex so it doesn't get
+	// closed in the middle of an operation. Other operations are concurrency-safe, so we
+	// use a RWMutex to allow multiple usages of the WAL at once. If the WAL is closed, all
+	// operations that change the WAL must fail.
+	walMtx    sync.RWMutex
+	walClosed bool
+
+	path   string
+	wal    *wal.WAL
+	logger log.Logger
+
+	appenderPool sync.Pool
+	bufPool      sync.Pool
+
+	mtx     sync.RWMutex
+	nextRef uint64
+	series  *stripeSeries
+
+	deletedMtx sync.Mutex
+	deleted    map[uint64]int // Deleted series, and what WAL segment they must be kept until.
+
+	metrics *storageMetrics
+}
+
+// NewStorage makes a new Storage.
+//
+// The WAL is opened (or created) in the "wal" subdirectory of path and then
+// replayed to rebuild the in-memory series. A replay error triggers one
+// repair attempt; only a failed repair is returned as an error.
+func NewStorage(logger log.Logger, registerer prometheus.Registerer, path string) (*Storage, error) {
+	w, err := wal.NewSize(logger, registerer, SubDirectory(path), wal.DefaultSegmentSize, true)
+	if err != nil {
+		return nil, err
+	}
+
+	storage := &Storage{
+		path:    path,
+		wal:     w,
+		logger:  logger,
+		deleted: map[uint64]int{},
+		series:  newStripeSeries(),
+		metrics: newStorageMetrics(registerer),
+
+		// The first ref ID must be non-zero, as the scraping code treats 0 as a
+		// non-existent ID and won't cache it.
+		nextRef: 1,
+	}
+
+	storage.bufPool.New = func() interface{} {
+		b := make([]byte, 0, 1024)
+		return b
+	}
+
+	storage.appenderPool.New = func() interface{} {
+		return &appender{
+			w:       storage,
+			series:  make([]record.RefSeries, 0, 100),
+			samples: make([]record.RefSample, 0, 100),
+		}
+	}
+
+	if err := storage.replayWAL(); err != nil {
+		level.Warn(storage.logger).Log("msg", "encountered WAL read error, attempting repair", "err", err)
+		if err := w.Repair(err); err != nil {
+			return nil, errors.Wrap(err, "repair corrupted WAL")
+		}
+	}
+
+	return storage, nil
+}
+
+// replayWAL rebuilds the in-memory series from disk: first from the last
+// checkpoint, if one exists, then from every segment after it.
+func (w *Storage) replayWAL() error {
+	w.walMtx.RLock()
+	defer w.walMtx.RUnlock()
+
+	if w.walClosed {
+		return ErrWALClosed
+	}
+
+	level.Info(w.logger).Log("msg", "replaying WAL, this may take a while", "dir", w.wal.Dir())
+	dir, startFrom, err := wal.LastCheckpoint(w.wal.Dir())
+	if err != nil && err != record.ErrNotFound {
+		return errors.Wrap(err, "find last checkpoint")
+	}
+
+	if err == nil {
+		sr, err := wal.NewSegmentsReader(dir)
+		if err != nil {
+			return errors.Wrap(err, "open checkpoint")
+		}
+		defer func() {
+			if err := sr.Close(); err != nil {
+				level.Warn(w.logger).Log("msg", "error while closing the wal segments reader", "err", err)
+			}
+		}()
+
+		// A corrupted checkpoint is a hard error for now and requires user
+		// intervention. There's likely little data that can be recovered anyway.
+		if err := w.loadWAL(wal.NewReader(sr)); err != nil {
+			return errors.Wrap(err, "backfill checkpoint")
+		}
+		startFrom++
+		level.Info(w.logger).Log("msg", "WAL checkpoint loaded")
+	}
+
+	// Find the last segment.
+	_, last, err := wal.Segments(w.wal.Dir())
+	if err != nil {
+		return errors.Wrap(err, "finding WAL segments")
+	}
+
+	// Backfill segments from the most recent checkpoint onwards.
+	for i := startFrom; i <= last; i++ {
+		s, err := wal.OpenReadSegment(wal.SegmentName(w.wal.Dir(), i))
+		if err != nil {
+			return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
+		}
+
+		sr := wal.NewSegmentBufReader(s)
+		err = w.loadWAL(wal.NewReader(sr))
+		if err := sr.Close(); err != nil {
+			level.Warn(w.logger).Log("msg", "error while closing the wal segments reader", "err", err)
+		}
+		if err != nil {
+			return err
+		}
+		level.Info(w.logger).Log("msg", "WAL segment loaded", "segment", i, "maxSegment", last)
+	}
+
+	return nil
+}
+
+// loadWAL reads every record from r and applies it to the in-memory state:
+// series records create missing series and advance nextRef, sample records
+// move a series' lastTs forward. Decoding runs in a separate goroutine that
+// stops at the first undecodable or unknown record; that error, or the
+// reader's own error, is returned once all decoded records were applied.
+func (w *Storage) loadWAL(r *wal.Reader) (err error) {
+	var (
+		dec record.Decoder
+	)
+
+	var (
+		decoded    = make(chan interface{}, 10)
+		errCh      = make(chan error, 1)
+		seriesPool = sync.Pool{
+			New: func() interface{} {
+				return []record.RefSeries{}
+			},
+		}
+		samplesPool = sync.Pool{
+			New: func() interface{} {
+				return []record.RefSample{}
+			},
+		}
+	)
+
+	go func() {
+		defer close(decoded)
+		for r.Next() {
+			rec := r.Record()
+			switch dec.Type(rec) {
+			case record.Series:
+				series := seriesPool.Get().([]record.RefSeries)[:0]
+				series, err = dec.Series(rec, series)
+				if err != nil {
+					errCh <- &wal.CorruptionErr{
+						Err:     errors.Wrap(err, "decode series"),
+						Segment: r.Segment(),
+						Offset:  r.Offset(),
+					}
+					return
+				}
+				decoded <- series
+			case record.Samples:
+				samples := samplesPool.Get().([]record.RefSample)[:0]
+				samples, err = dec.Samples(rec, samples)
+				if err != nil {
+					errCh <- &wal.CorruptionErr{
+						Err:     errors.Wrap(err, "decode samples"),
+						Segment: r.Segment(),
+						Offset:  r.Offset(),
+					}
+					// Stop at the first corrupt record, like the other
+					// cases. errCh holds a single error and is only read
+					// after decoded is closed, so carrying on would block
+					// this goroutine, and with it the replay, on the next
+					// failure.
+					return
+				}
+				decoded <- samples
+			case record.Tombstones:
+				// We don't care about tombstones
+				continue
+			default:
+				errCh <- &wal.CorruptionErr{
+					Err:     errors.Errorf("invalid record type %v", dec.Type(rec)),
+					Segment: r.Segment(),
+					Offset:  r.Offset(),
+				}
+				return
+			}
+		}
+	}()
+
+	for d := range decoded {
+		switch v := d.(type) {
+		case []record.RefSeries:
+			for _, s := range v {
+				// If this is a new series, create it in memory without a timestamp.
+				// If we read in a sample for it, we'll use the timestamp of the latest
+				// sample. Otherwise, the series is stale and will be deleted once
+				// the truncation is performed.
+				if w.series.getByID(s.Ref) == nil {
+					series := &memSeries{ref: s.Ref, lset: s.Labels, lastTs: 0}
+					w.series.set(s.Labels.Hash(), series)
+
+					w.metrics.numActiveSeries.Inc()
+					w.metrics.totalCreatedSeries.Inc()
+
+					w.mtx.Lock()
+					if w.nextRef <= s.Ref {
+						w.nextRef = s.Ref + 1
+					}
+					w.mtx.Unlock()
+				}
+			}
+
+			//nolint:staticcheck
+			seriesPool.Put(v)
+		case []record.RefSample:
+			for _, s := range v {
+				// Update the lastTs for the series based
+				series := w.series.getByID(s.Ref)
+				if series == nil {
+					level.Warn(w.logger).Log("msg", "found sample referencing non-existing series, skipping")
+					continue
+				}
+
+				series.Lock()
+				if s.T > series.lastTs {
+					series.lastTs = s.T
+				}
+				series.Unlock()
+			}
+
+			//nolint:staticcheck
+			samplesPool.Put(v)
+		default:
+			panic(fmt.Errorf("unexpected decoded type: %T", d))
+		}
+	}
+
+	// The decoder goroutine has finished once decoded is closed, so an error
+	// it reported is already waiting in errCh.
+	select {
+	case err := <-errCh:
+		return err
+	default:
+	}
+
+	if r.Err() != nil {
+		return errors.Wrap(r.Err(), "read records")
+	}
+
+	return nil
+}
+
+// Directory returns the path where the WAL storage is held.
+func (w *Storage) Directory() string {
+	return w.path
+}
+
+// Appender returns a new appender against the storage.
+func (w *Storage) Appender(_ context.Context) storage.Appender {
+	return w.appenderPool.Get().(storage.Appender)
+}
+
+// Querier returns a querier that never yields any data; all arguments are
+// ignored.
+func (w *Storage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
+	return &remoteWriteQueryable{}, nil
+}
+
+// StartTime always returns 0, nil. It is implemented for compatibility with
+// Prometheus, but is unused in the agent.
+func (*Storage) StartTime() (int64, error) {
+	return 0, nil
+}
+
+// Truncate removes all data from the WAL prior to the timestamp specified by
+// mint. 
+func (w *Storage) Truncate(mint int64) error {
+	w.walMtx.RLock()
+	defer w.walMtx.RUnlock()
+
+	if w.walClosed {
+		return ErrWALClosed
+	}
+
+	start := time.Now()
+
+	// Garbage collect series that haven't received an update since mint.
+	w.gc(mint)
+	level.Info(w.logger).Log("msg", "series GC completed", "duration", time.Since(start))
+
+	first, last, err := wal.Segments(w.wal.Dir())
+	if err != nil {
+		return errors.Wrap(err, "get segment range")
+	}
+
+	// Start a new segment, so low ingestion volume instance don't have more WAL
+	// than needed.
+	err = w.wal.NextSegment()
+	if err != nil {
+		return errors.Wrap(err, "next segment")
+	}
+
+	last-- // Never consider last segment for checkpoint.
+	if last < 0 {
+		return nil // no segments yet.
+	}
+
+	// The lower two thirds of segments should contain mostly obsolete samples.
+	// If we have less than two segments, it's not worth checkpointing yet.
+	last = first + (last-first)*2/3
+	if last <= first {
+		return nil
+	}
+
+	// keep decides which series records survive into the checkpoint: series
+	// that are still in memory, plus series that were garbage collected but
+	// are still tracked in w.deleted.
+	keep := func(id uint64) bool {
+		if w.series.getByID(id) != nil {
+			return true
+		}
+
+		w.deletedMtx.Lock()
+		_, ok := w.deleted[id]
+		w.deletedMtx.Unlock()
+		return ok
+	}
+	if _, err = wal.Checkpoint(w.logger, w.wal, first, last, keep, mint); err != nil {
+		return errors.Wrap(err, "create checkpoint")
+	}
+	if err := w.wal.Truncate(last + 1); err != nil {
+		// If truncating fails, we'll just try again at the next checkpoint.
+		// Leftover segments will just be ignored in the future if there's a checkpoint
+		// that supersedes them.
+		level.Error(w.logger).Log("msg", "truncating segments failed", "err", err)
+	}
+
+	// The checkpoint is written and the segments before it are truncated, so
+	// we no longer need to track deleted series that are before it.
+	w.deletedMtx.Lock()
+	for ref, segment := range w.deleted {
+		if segment < first {
+			delete(w.deleted, ref)
+			w.metrics.totalRemovedSeries.Inc()
+		}
+	}
+	w.metrics.numDeletedSeries.Set(float64(len(w.deleted)))
+	w.deletedMtx.Unlock()
+
+	if err := wal.DeleteCheckpoints(w.wal.Dir(), last); err != nil {
+		// Leftover old checkpoints do not cause problems down the line beyond
+		// occupying disk space.
+		// They will just be ignored since a higher checkpoint exists.
+		level.Error(w.logger).Log("msg", "delete old checkpoints", "err", err)
+	}
+
+	level.Info(w.logger).Log("msg", "WAL checkpoint complete",
+		"first", first, "last", last, "duration", time.Since(start))
+	return nil
+}
+
+// gc drops in-memory series that have not been written to since mint and
+// records each dropped ref in w.deleted together with the current last WAL
+// segment.
+func (w *Storage) gc(mint int64) {
+	deleted := w.series.gc(mint)
+	w.metrics.numActiveSeries.Sub(float64(len(deleted)))
+
+	// NOTE(review): the error from wal.Segments is discarded here, so a
+	// failed directory listing goes unnoticed and last keeps whatever value
+	// was returned alongside the error.
+	_, last, _ := wal.Segments(w.wal.Dir())
+	w.deletedMtx.Lock()
+	defer w.deletedMtx.Unlock()
+
+	// We want to keep series records for any newly deleted series
+	// until we've passed the last recorded segment. The WAL will
+	// still contain samples records with all of the ref IDs until
+	// the segment's samples have been deleted from the checkpoint.
+	//
+	// If the series weren't kept on startup when the WAL was replayed,
+	// the samples wouldn't be able to be used since there wouldn't
+	// be any labels for that ref ID.
+	for ref := range deleted {
+		w.deleted[ref] = last
+	}
+
+	w.metrics.numDeletedSeries.Set(float64(len(w.deleted)))
+}
+
+// WriteStalenessMarkers appends a staleness sample for all active series. 
+//
+// The markers are committed in a single batch. If appending any of them
+// fails, nothing is committed, the batch is rolled back and the last append
+// error is returned. After a successful commit the call polls remoteTsFunc
+// every 5 seconds, for at most one minute, until it reports a timestamp at or
+// past the last marker; giving up on that wait is logged, not returned.
+func (w *Storage) WriteStalenessMarkers(remoteTsFunc func() int64) error {
+	var lastErr error
+	var lastTs int64
+
+	app := w.Appender(context.Background())
+	it := w.series.iterator()
+	// The iterator channel has to be drained completely, so an append error
+	// is remembered instead of ending the loop early.
+	for series := range it.Channel() {
+		var (
+			ref  = series.ref
+			lset = series.lset
+		)
+
+		ts := timestamp.FromTime(time.Now())
+		_, err := app.Append(ref, lset, ts, math.Float64frombits(value.StaleNaN))
+		if err != nil {
+			lastErr = err
+		}
+
+		// Remove millisecond precision; the remote write timestamp we get
+		// only has second precision.
+		lastTs = (ts / 1000) * 1000
+	}
+
+	if lastErr != nil {
+		// An appender must end in Commit or Rollback. Without the rollback
+		// the buffered markers would stay in an appender that never returns
+		// to the pool.
+		if err := app.Rollback(); err != nil {
+			level.Warn(w.logger).Log("msg", "rolling back staleness markers failed", "err", err)
+		}
+		return lastErr
+	}
+
+	if err := app.Commit(); err != nil {
+		return fmt.Errorf("failed to commit staleness markers: %w", err)
+	}
+
+	// Wait for remote write to write the lastTs, but give up after 1m
+	level.Info(w.logger).Log("msg", "waiting for remote write to write staleness markers...")
+
+	stopCh := time.After(1 * time.Minute)
+	start := time.Now()
+
+Outer:
+	for {
+		select {
+		case <-stopCh:
+			level.Error(w.logger).Log("msg", "timed out waiting for staleness markers to be written")
+			break Outer
+		default:
+			writtenTs := remoteTsFunc()
+			if writtenTs >= lastTs {
+				duration := time.Since(start)
+				level.Info(w.logger).Log("msg", "remote write wrote staleness markers", "duration", duration)
+				break Outer
+			}
+
+			level.Info(w.logger).Log("msg", "remote write hasn't written staleness markers yet", "remoteTs", writtenTs, "lastTs", lastTs)
+
+			// Wait a bit before reading again
+			time.Sleep(5 * time.Second)
+		}
+	}
+
+	return nil
+}
+
+// Close closes the storage and all its underlying resources. 
+func (w *Storage) Close() error { + w.walMtx.Lock() + defer w.walMtx.Unlock() + + if w.walClosed { + return fmt.Errorf("already closed") + } + w.walClosed = true + + if w.metrics != nil { + w.metrics.Unregister() + } + return w.wal.Close() +} + +type appender struct { + w *Storage + series []record.RefSeries + samples []record.RefSample +} + +func (a *appender) Append(ref uint64, l labels.Labels, t int64, v float64) (uint64, error) { + if ref == 0 { + return a.Add(l, t, v) + } + return ref, a.AddFast(ref, t, v) +} + +func (a *appender) Add(l labels.Labels, t int64, v float64) (uint64, error) { + hash := l.Hash() + series := a.w.series.getByHash(hash, l) + if series != nil { + return series.ref, a.AddFast(series.ref, t, v) + } + + // Ensure no empty or duplicate labels have gotten through. This mirrors the + // equivalent validation code in the TSDB's headAppender. + l = l.WithoutEmpty() + if len(l) == 0 { + return 0, errors.Wrap(tsdb.ErrInvalidSample, "empty labelset") + } + + if lbl, dup := l.HasDuplicateLabelNames(); dup { + return 0, errors.Wrap(tsdb.ErrInvalidSample, fmt.Sprintf(`label name "%s" is not unique`, lbl)) + } + + a.w.mtx.Lock() + ref := a.w.nextRef + a.w.nextRef++ + a.w.mtx.Unlock() + + series = &memSeries{ref: ref, lset: l} + series.updateTs(t) + + a.series = append(a.series, record.RefSeries{ + Ref: ref, + Labels: l, + }) + + a.w.series.set(hash, series) + + a.w.metrics.numActiveSeries.Inc() + a.w.metrics.totalCreatedSeries.Inc() + a.w.metrics.totalAppendedSamples.Inc() + + return series.ref, a.AddFast(series.ref, t, v) +} + +func (a *appender) AddFast(ref uint64, t int64, v float64) error { + series := a.w.series.getByID(ref) + if series == nil { + return storage.ErrNotFound + } + series.Lock() + defer series.Unlock() + + // Update last recorded timestamp. Used by Storage.gc to determine if a + // series is dead. 
+ series.updateTs(t) + + a.samples = append(a.samples, record.RefSample{ + Ref: ref, + T: t, + V: v, + }) + + a.w.metrics.totalAppendedSamples.Inc() + return nil +} + +func (a *appender) AppendExemplar(ref uint64, l labels.Labels, e exemplar.Exemplar) (uint64, error) { + // remote_write doesn't support exemplars yet, so do nothing here. + return 0, nil +} + +// Commit submits the collected samples and purges the batch. +func (a *appender) Commit() error { + a.w.walMtx.RLock() + defer a.w.walMtx.RUnlock() + + if a.w.walClosed { + return ErrWALClosed + } + + var encoder record.Encoder + buf := a.w.bufPool.Get().([]byte) + + if len(a.series) > 0 { + buf = encoder.Series(a.series, buf) + if err := a.w.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + + if len(a.samples) > 0 { + buf = encoder.Samples(a.samples, buf) + if err := a.w.wal.Log(buf); err != nil { + return err + } + buf = buf[:0] + } + + //nolint:staticcheck + a.w.bufPool.Put(buf) + + for _, sample := range a.samples { + series := a.w.series.getByID(sample.Ref) + if series != nil { + series.Lock() + series.pendingCommit = false + series.Unlock() + } + } + + return a.Rollback() +} + +func (a *appender) Rollback() error { + a.series = a.series[:0] + a.samples = a.samples[:0] + a.w.appenderPool.Put(a) + return nil +} + +type remoteWriteQueryable struct{} + +func (r *remoteWriteQueryable) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { + return nil, nil, nil +} + +func (r *remoteWriteQueryable) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { + return nil, nil, nil +} + +func (r *remoteWriteQueryable) Close() error { + return nil +} + +func (r *remoteWriteQueryable) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + return storage.EmptySeriesSet() +} diff --git a/pkg/rules/remotewrite/wal_test.go b/pkg/rules/remotewrite/wal_test.go new file mode 100644 index 
0000000000..1843284081 --- /dev/null +++ b/pkg/rules/remotewrite/wal_test.go @@ -0,0 +1,379 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +// This is copied from https://github.com/grafana/agent/blob/a23bd5cf27c2ac99695b7449d38fb12444941a1c/pkg/prom/wal/wal_test.go +// TODO(idoqo): Migrate to prometheus package when https://github.com/prometheus/prometheus/pull/8785 is ready. +package remotewrite + +import ( + "context" + "io/ioutil" + "math" + "os" + "sort" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/value" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/stretchr/testify/require" +) + +func TestStorage_InvalidSeries(t *testing.T) { + walDir, err := ioutil.TempDir(os.TempDir(), "wal") + require.NoError(t, err) + defer os.RemoveAll(walDir) + + s, err := NewStorage(log.NewNopLogger(), nil, walDir) + require.NoError(t, err) + defer func() { + require.NoError(t, s.Close()) + }() + + app := s.Appender(context.Background()) + + _, err = app.Append(0, labels.Labels{}, 0, 0) + require.Error(t, err, "should reject empty labels") + + _, err = app.Append(0, labels.Labels{{Name: "a", Value: "1"}, {Name: "a", Value: "2"}}, 0, 0) + require.Error(t, err, "should reject duplicate labels") + + // Sanity check: valid series + _, err = app.Append(0, labels.Labels{{Name: "a", Value: "1"}}, 0, 0) + require.NoError(t, err, "should not reject valid series") +} + +func TestStorage(t *testing.T) { + walDir, err := ioutil.TempDir(os.TempDir(), "wal") + require.NoError(t, err) + defer os.RemoveAll(walDir) + + s, err := NewStorage(log.NewNopLogger(), nil, walDir) + require.NoError(t, err) + defer func() { + require.NoError(t, s.Close()) + }() + + app := s.Appender(context.Background()) + + // Write some samples + payload := seriesList{ + {name: "foo", samples: []sample{{1, 10.0}, {10, 100.0}}}, + 
{name: "bar", samples: []sample{{2, 20.0}, {20, 200.0}}}, + {name: "baz", samples: []sample{{3, 30.0}, {30, 300.0}}}, + } + for _, metric := range payload { + metric.Write(t, app) + } + + require.NoError(t, app.Commit()) + + collector := walDataCollector{} + replayer := walReplayer{w: &collector} + require.NoError(t, replayer.Replay(s.wal.Dir())) + + names := []string{} + for _, series := range collector.series { + names = append(names, series.Labels.Get("__name__")) + } + require.Equal(t, payload.SeriesNames(), names) + + expectedSamples := payload.ExpectedSamples() + actual := collector.samples + sort.Sort(byRefSample(actual)) + require.Equal(t, expectedSamples, actual) +} + +func TestStorage_ExistingWAL(t *testing.T) { + walDir, err := ioutil.TempDir(os.TempDir(), "wal") + require.NoError(t, err) + defer os.RemoveAll(walDir) + + s, err := NewStorage(log.NewNopLogger(), nil, walDir) + require.NoError(t, err) + + app := s.Appender(context.Background()) + payload := seriesList{ + {name: "foo", samples: []sample{{1, 10.0}, {10, 100.0}}}, + {name: "bar", samples: []sample{{2, 20.0}, {20, 200.0}}}, + {name: "baz", samples: []sample{{3, 30.0}, {30, 300.0}}}, + {name: "blerg", samples: []sample{{4, 40.0}, {40, 400.0}}}, + } + + // Write half of the samples. + for _, metric := range payload[0 : len(payload)/2] { + metric.Write(t, app) + } + + require.NoError(t, app.Commit()) + require.NoError(t, s.Close()) + + // We need to wait a little bit for the previous store to finish + // flushing. + time.Sleep(time.Millisecond * 150) + + // Create a new storage, write the other half of samples. + s, err = NewStorage(log.NewNopLogger(), nil, walDir) + require.NoError(t, err) + defer func() { + require.NoError(t, s.Close()) + }() + + // Verify that the storage picked up existing series when it + // replayed the WAL. 
+ for series := range s.series.iterator().Channel() { + require.Greater(t, series.lastTs, int64(0), "series timestamp not updated") + } + + app = s.Appender(context.Background()) + + for _, metric := range payload[len(payload)/2:] { + metric.Write(t, app) + } + + require.NoError(t, app.Commit()) + + collector := walDataCollector{} + replayer := walReplayer{w: &collector} + require.NoError(t, replayer.Replay(s.wal.Dir())) + + names := []string{} + for _, series := range collector.series { + names = append(names, series.Labels.Get("__name__")) + } + require.Equal(t, payload.SeriesNames(), names) + + expectedSamples := payload.ExpectedSamples() + actual := collector.samples + sort.Sort(byRefSample(actual)) + require.Equal(t, expectedSamples, actual) +} + +func TestStorage_Truncate(t *testing.T) { + // Same as before but now do the following: + // after writing all the data, forcefully create 4 more segments, + // then do a truncate of a timestamp for _some_ of the data. + // then read data back in. Expect to only get the latter half of data. + walDir, err := ioutil.TempDir(os.TempDir(), "wal") + require.NoError(t, err) + defer os.RemoveAll(walDir) + + s, err := NewStorage(log.NewNopLogger(), nil, walDir) + require.NoError(t, err) + defer func() { + require.NoError(t, s.Close()) + }() + + app := s.Appender(context.Background()) + + payload := seriesList{ + {name: "foo", samples: []sample{{1, 10.0}, {10, 100.0}}}, + {name: "bar", samples: []sample{{2, 20.0}, {20, 200.0}}}, + {name: "baz", samples: []sample{{3, 30.0}, {30, 300.0}}}, + {name: "blerg", samples: []sample{{4, 40.0}, {40, 400.0}}}, + } + + for _, metric := range payload { + metric.Write(t, app) + } + + require.NoError(t, app.Commit()) + + // Forefully create a bunch of new segments so when we truncate + // there's enough segments to be considered for truncation. 
+ for i := 0; i < 5; i++ { + require.NoError(t, s.wal.NextSegment()) + } + + // Truncate half of the samples, keeping only the second sample + // per series. + keepTs := payload[len(payload)-1].samples[0].ts + 1 + err = s.Truncate(keepTs) + require.NoError(t, err) + + payload = payload.Filter(func(s sample) bool { + return s.ts >= keepTs + }) + expectedSamples := payload.ExpectedSamples() + + // Read back the WAL, collect series and samples. + collector := walDataCollector{} + replayer := walReplayer{w: &collector} + require.NoError(t, replayer.Replay(s.wal.Dir())) + + names := []string{} + for _, series := range collector.series { + names = append(names, series.Labels.Get("__name__")) + } + require.Equal(t, payload.SeriesNames(), names) + + actual := collector.samples + sort.Sort(byRefSample(actual)) + require.Equal(t, expectedSamples, actual) +} + +func TestStorage_WriteStalenessMarkers(t *testing.T) { + walDir, err := ioutil.TempDir(os.TempDir(), "wal") + require.NoError(t, err) + defer os.RemoveAll(walDir) + + s, err := NewStorage(log.NewNopLogger(), nil, walDir) + require.NoError(t, err) + defer func() { + require.NoError(t, s.Close()) + }() + + app := s.Appender(context.Background()) + + // Write some samples + payload := seriesList{ + {name: "foo", samples: []sample{{1, 10.0}, {10, 100.0}}}, + {name: "bar", samples: []sample{{2, 20.0}, {20, 200.0}}}, + {name: "baz", samples: []sample{{3, 30.0}, {30, 300.0}}}, + } + for _, metric := range payload { + metric.Write(t, app) + } + + require.NoError(t, app.Commit()) + + // Write staleness markers for every series + require.NoError(t, s.WriteStalenessMarkers(func() int64 { + // Pass math.MaxInt64 so it seems like everything was written already + return math.MaxInt64 + })) + + // Read back the WAL, collect series and samples. 
+ collector := walDataCollector{} + replayer := walReplayer{w: &collector} + require.NoError(t, replayer.Replay(s.wal.Dir())) + + actual := collector.samples + sort.Sort(byRefSample(actual)) + + staleMap := map[uint64]bool{} + for _, sample := range actual { + if _, ok := staleMap[sample.Ref]; !ok { + staleMap[sample.Ref] = false + } + if value.IsStaleNaN(sample.V) { + staleMap[sample.Ref] = true + } + } + + for ref, v := range staleMap { + require.True(t, v, "ref %d doesn't have stale marker", ref) + } +} + +func TestStoraeg_TruncateAfterClose(t *testing.T) { + walDir, err := ioutil.TempDir(os.TempDir(), "wal") + require.NoError(t, err) + defer os.RemoveAll(walDir) + + s, err := NewStorage(log.NewNopLogger(), nil, walDir) + require.NoError(t, err) + + require.NoError(t, s.Close()) + require.Error(t, ErrWALClosed, s.Truncate(0)) +} + +type sample struct { + ts int64 + val float64 +} + +type series struct { + name string + samples []sample + + ref *uint64 +} + +func (s *series) Write(t *testing.T, app storage.Appender) { + t.Helper() + + lbls := labels.FromMap(map[string]string{"__name__": s.name}) + + offset := 0 + if s.ref == nil { + // Write first sample to get ref ID + ref, err := app.Append(0, lbls, s.samples[0].ts, s.samples[0].val) + require.NoError(t, err) + + s.ref = &ref + offset = 1 + } + + // Write other data points with AddFast + for _, sample := range s.samples[offset:] { + _, err := app.Append(*s.ref, lbls, sample.ts, sample.val) + require.NoError(t, err) + } +} + +type seriesList []*series + +// Filter creates a new seriesList with series filtered by a sample +// keep predicate function. 
+func (s seriesList) Filter(fn func(s sample) bool) seriesList { + var ret seriesList + + for _, entry := range s { + var samples []sample + + for _, sample := range entry.samples { + if fn(sample) { + samples = append(samples, sample) + } + } + + if len(samples) > 0 { + ret = append(ret, &series{ + name: entry.name, + ref: entry.ref, + samples: samples, + }) + } + } + + return ret +} + +func (s seriesList) SeriesNames() []string { + names := make([]string, 0, len(s)) + for _, series := range s { + names = append(names, series.name) + } + return names +} + +// ExpectedSamples returns the list of expected samples, sorted by ref ID and timestamp. +func (s seriesList) ExpectedSamples() []record.RefSample { + expect := []record.RefSample{} + for _, series := range s { + for _, sample := range series.samples { + expect = append(expect, record.RefSample{ + Ref: *series.ref, + T: sample.ts, + V: sample.val, + }) + } + } + sort.Sort(byRefSample(expect)) + return expect +} + +type byRefSample []record.RefSample + +func (b byRefSample) Len() int { return len(b) } +func (b byRefSample) Swap(i, j int) { b[i], b[j] = b[j], b[i] } +func (b byRefSample) Less(i, j int) bool { + if b[i].Ref == b[j].Ref { + return b[i].T < b[j].T + } + return b[i].Ref < b[j].Ref +} diff --git a/scripts/quickstart.sh b/scripts/quickstart.sh index e99ab8fdd9..779be02a71 100755 --- a/scripts/quickstart.sh +++ b/scripts/quickstart.sh @@ -245,6 +245,18 @@ QUERIER_JAEGER_CONFIG=$( EOF ) +REMOTE_WRITE_FLAGS="" +if [ -n "${STATELESS_RULER_ENABLED}" ]; then + cat >/data/rule-remote-write.yaml <<-EOF + name: "thanos-receivers" + remote_write: + url: "http://127.0.0.1:10908/api/v1/receive" + name: "receive-0" +EOF + + REMOTE_WRITE_FLAGS="--remote-write.config-file data/rule-remote-write.yaml" +fi + # Start Thanos Ruler. 
${THANOS_EXECUTABLE} rule \ --data-dir data/ \ @@ -256,6 +268,7 @@ ${THANOS_EXECUTABLE} rule \ --http-address="0.0.0.0:19999" \ --grpc-address="0.0.0.0:19998" \ --label 'rule="true"' \ + "${REMOTE_WRITE_FLAGS}" \ ${OBJSTORECFG} & STORES="${STORES} --store 127.0.0.1:19998" diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index c15c150cbb..d582042aeb 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/dskit/backoff" "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/pkg/relabel" "gopkg.in/yaml.v2" @@ -460,7 +461,15 @@ func NewRoutingAndIngestingReceiverWithConfigWatcher(sharedDir, networkName, nam return receiver, nil } -func NewRuler(sharedDir, name, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config) (*Service, error) { +func NewTSDBRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config) (*Service, error) { + return newRuler(sharedDir, name, ruleSubDir, amCfg, queryCfg, nil) +} + +func NewStatelessRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config, remoteWriteCfg *config.RemoteWriteConfig) (*Service, error) { + return newRuler(sharedDir, name, ruleSubDir, amCfg, queryCfg, remoteWriteCfg) +} + +func newRuler(sharedDir string, name string, ruleSubDir string, amCfg []alert.AlertmanagerConfig, queryCfg []query.Config, remoteWriteCfg *config.RemoteWriteConfig) (*Service, error) { dir := filepath.Join(sharedDir, "data", "rule", name) container := filepath.Join(e2e.ContainerSharedDir, "data", "rule", name) if err := os.MkdirAll(dir, 0750); err != nil { @@ -479,25 +488,34 @@ func NewRuler(sharedDir, name, ruleSubDir string, amCfg []alert.AlertmanagerConf return nil, errors.Wrapf(err, 
"generate query file: %v", queryCfg) } + ruleArgs := map[string]string{ + "--debug.name": fmt.Sprintf("rule-%v", name), + "--grpc-address": ":9091", + "--grpc-grace-period": "0s", + "--http-address": ":8080", + "--label": fmt.Sprintf(`replica="%s"`, name), + "--data-dir": container, + "--rule-file": filepath.Join(e2e.ContainerSharedDir, ruleSubDir, "*.yaml"), + "--eval-interval": "3s", + "--alertmanagers.config": string(amCfgBytes), + "--alertmanagers.sd-dns-interval": "1s", + "--log.level": infoLogLevel, + "--query.config": string(queryCfgBytes), + "--query.sd-dns-interval": "1s", + "--resend-delay": "5s", + } + if remoteWriteCfg != nil { + rwCfgBytes, err := yaml.Marshal(remoteWriteCfg) + if err != nil { + return nil, errors.Wrapf(err, "generate remote write config: %v", remoteWriteCfg) + } + ruleArgs["--remote-write.config"] = string(rwCfgBytes) + } + ruler := NewService( fmt.Sprintf("rule-%v", name), DefaultImage(), - e2e.NewCommand("rule", e2e.BuildArgs(map[string]string{ - "--debug.name": fmt.Sprintf("rule-%v", name), - "--grpc-address": ":9091", - "--grpc-grace-period": "0s", - "--http-address": ":8080", - "--label": fmt.Sprintf(`replica="%s"`, name), - "--data-dir": container, - "--rule-file": filepath.Join(e2e.ContainerSharedDir, ruleSubDir, "*.yaml"), - "--eval-interval": "3s", - "--alertmanagers.config": string(amCfgBytes), - "--alertmanagers.sd-dns-interval": "1s", - "--log.level": infoLogLevel, - "--query.config": string(queryCfgBytes), - "--query.sd-dns-interval": "1s", - "--resend-delay": "5s", - })...), + e2e.NewCommand("rule", e2e.BuildArgs(ruleArgs)...), e2e.NewHTTPReadinessProbe(8080, "/-/ready", 200, 200), 8080, 9091, diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index fb7289ca90..0a8755b9cd 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -16,7 +16,9 @@ import ( "time" "github.com/cortexproject/cortex/integration/e2e" + common_cfg "github.com/prometheus/common/config" "github.com/prometheus/common/model" + 
"github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/targetgroup" "gopkg.in/yaml.v2" @@ -97,6 +99,15 @@ groups: annotations: summary: "I always complain and I have been loaded via sighup signal." ` + testRuleRecordAbsentMetric = ` +groups: +- name: example_record_rules + interval: 1s + rules: + - record: test_absent_metric + expr: absent(nonexistent{job='thanos-receive'}) +` + amTimeout = model.Duration(10 * time.Second) ) type rulesResp struct { @@ -220,7 +231,7 @@ func TestRule(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(am1, am2)) - r, err := e2ethanos.NewRuler(s.SharedDir(), "1", rulesSubDir, []alert.AlertmanagerConfig{ + r, err := e2ethanos.NewTSDBRuler(s.SharedDir(), "1", rulesSubDir, []alert.AlertmanagerConfig{ { EndpointsConfig: http_util.EndpointsConfig{ FileSDConfigs: []http_util.FileSDConfig{ @@ -235,7 +246,7 @@ func TestRule(t *testing.T) { }, Scheme: "http", }, - Timeout: model.Duration(10 * time.Second), + Timeout: amTimeout, APIVersion: alert.APIv1, }, }, []query.Config{ @@ -435,6 +446,87 @@ func TestRule(t *testing.T) { } } +// TestRule_CanRemoteWriteData checks that Thanos Ruler can be run in stateless mode +// where it remote_writes rule evaluations to a Prometheus remote-write endpoint (typically +// a Thanos Receiver). 
+func TestRule_CanRemoteWriteData(t *testing.T) { + t.Parallel() + + s, err := e2e.NewScenario("e2e_test_rule_remote_write") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, s)) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + t.Cleanup(cancel) + + rulesSubDir := "rules" + rulesPath := filepath.Join(s.SharedDir(), rulesSubDir) + testutil.Ok(t, os.MkdirAll(rulesPath, os.ModePerm)) + + for i, rule := range []string{testRuleRecordAbsentMetric, testAlertRuleWarnOnPartialResponse} { + createRuleFile(t, filepath.Join(rulesPath, fmt.Sprintf("rules-%d.yaml", i)), rule) + } + + am, err := e2ethanos.NewAlertmanager(s.SharedDir(), "1") + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(am)) + + receiver, err := e2ethanos.NewIngestingReceiver(s.SharedDir(), s.NetworkName()) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(receiver)) + rwURL := mustURLParse(t, e2ethanos.RemoteWriteEndpoint(receiver.NetworkEndpoint(8081))) + + querier, err := e2ethanos.NewQuerierBuilder(s.SharedDir(), "1", receiver.GRPCNetworkEndpoint()).Build() + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(querier)) + r, err := e2ethanos.NewStatelessRuler(s.SharedDir(), "1", rulesSubDir, []alert.AlertmanagerConfig{ + { + EndpointsConfig: http_util.EndpointsConfig{ + StaticAddresses: []string{ + am.NetworkHTTPEndpoint(), + }, + Scheme: "http", + }, + Timeout: amTimeout, + APIVersion: alert.APIv1, + }, + }, []query.Config{ + { + EndpointsConfig: http_util.EndpointsConfig{ + StaticAddresses: []string{ + querier.NetworkHTTPEndpoint(), + }, + Scheme: "http", + }, + }, + }, &config.RemoteWriteConfig{ + URL: &common_cfg.URL{URL: rwURL}, + Name: "thanos-receiver", + }) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(r)) + + t.Run("can fetch remote-written samples from receiver", func(t *testing.T) { + testRecordedSamples := "test_absent_metric" + queryAndAssertSeries(t, ctx, querier.HTTPEndpoint(), testRecordedSamples, 
promclient.QueryOptions{ + Deduplicate: false, + }, []model.Metric{ + { + "__name__": "test_absent_metric", + "job": "thanos-receive", + "receive": "e2e_test_rule_remote_write", + "tenant_id": "default-tenant", + }, + }) + }) +} + +// TestRule_CanPersistWALData checks that in stateless mode, Thanos Ruler can persist rule evaluations +// which couldn't be sent to the remote write endpoint (e.g because receiver isn't available). +func TestRule_CanPersistWALData(t *testing.T) { + //TODO: Implement test with unavailable remote-write endpoint(receiver) +} + // Test Ruler behavior on different storepb.PartialResponseStrategy when having partial response from single `failingStoreAPI`. func TestRulePartialResponse(t *testing.T) { t.Skip("TODO: Allow HTTP ports from binaries running on host to be accessible.") diff --git a/test/e2e/rules_api_test.go b/test/e2e/rules_api_test.go index 86f97cdf18..2a31b2d3a9 100644 --- a/test/e2e/rules_api_test.go +++ b/test/e2e/rules_api_test.go @@ -66,9 +66,9 @@ func TestRulesAPI_Fanout(t *testing.T) { testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2)) // 2x Rulers. - r1, err := e2ethanos.NewRuler(s.SharedDir(), "rule1", thanosRulesSubDir, nil, nil) + r1, err := e2ethanos.NewTSDBRuler(s.SharedDir(), "rule1", thanosRulesSubDir, nil, nil) testutil.Ok(t, err) - r2, err := e2ethanos.NewRuler(s.SharedDir(), "rule2", thanosRulesSubDir, nil, nil) + r2, err := e2ethanos.NewTSDBRuler(s.SharedDir(), "rule2", thanosRulesSubDir, nil, nil) testutil.Ok(t, err) stores := []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), r1.NetworkEndpointFor(s.NetworkName(), 9091), r2.NetworkEndpointFor(s.NetworkName(), 9091)} @@ -88,9 +88,9 @@ func TestRulesAPI_Fanout(t *testing.T) { } // Recreate rulers with the corresponding query config. 
- r1, err = e2ethanos.NewRuler(s.SharedDir(), "rule1", thanosRulesSubDir, nil, queryCfg) + r1, err = e2ethanos.NewTSDBRuler(s.SharedDir(), "rule1", thanosRulesSubDir, nil, queryCfg) testutil.Ok(t, err) - r2, err = e2ethanos.NewRuler(s.SharedDir(), "rule2", thanosRulesSubDir, nil, queryCfg) + r2, err = e2ethanos.NewTSDBRuler(s.SharedDir(), "rule2", thanosRulesSubDir, nil, queryCfg) testutil.Ok(t, err) testutil.Ok(t, s.StartAndWaitReady(r1, r2))