diff --git a/CHANGELOG.md b/CHANGELOG.md index 1922781b407..94ddd1ca8ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5337](https://github.com/thanos-io/thanos/pull/5337) Thanos Object Store: Add the `prefix` option to buckets - [#5352](https://github.com/thanos-io/thanos/pull/5352) Cache: Add cache metrics to groupcache. - [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support. +- [#5420](https://github.com/thanos-io/thanos/pull/5420) Receive: Automatically remove stale tenants. ### Changed diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 6ed11a28359..2ad8318fcba 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -290,6 +290,22 @@ func runReceive( ) } + level.Debug(logger).Log("msg", "setting up periodic tenant pruning") + { + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { + return runutil.Repeat(2*time.Hour, ctx.Done(), func() error { + err := dbs.Prune(ctx) + if err != nil { + level.Error(logger).Log("err", err) + } + return nil + }) + }, func(err error) { + cancel() + }) + } + level.Info(logger).Log("msg", "starting receiver") return nil } diff --git a/docs/components/receive.md b/docs/components/receive.md index 542546b6e12..e7533b49e89 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -12,6 +12,14 @@ For more information please check out [initial design proposal](../proposals-don > NOTE: As the block producer it's important to set correct "external labels" that will identify data block across Thanos clusters. See [external labels](../storage.md#external-labels) docs for details. +## Tenant lifecycle management + +Tenants in Receivers are created dynamically and do not need to be provisioned upfront. When a new value is detected in the tenant HTTP header, Receivers will provision and start managing an independent TSDB for that tenant. TSDB blocks that are sent to S3 will contain a unique `tenant_id` label which can be used to compact blocks independently for each tenant. + +A Receiver will automatically decommission a tenant once new samples have not been seen for longer than the `--tsdb.retention` period configured for the Receiver. The tenant decommission process includes flushing all in-memory samples for that tenant to disk, sending all unsent blocks to S3, and removing the tenant TSDB from the filesystem. If a tenant receives new samples after being decommissioned, a new TSDB will be created for the tenant. + +Note that because of the built-in decommissioning process, the semantic of the `--tsdb.retention` flag in the Receiver is different than the one in Prometheus. For Receivers, `--tsdb.retention=t` indicates that the data for a tenant will be kept for `t` amount of time, whereas in Prometheus, `--tsdb.retention=t` denotes that the last `t` duration of data will be maintained in TSDB. In other words, Prometheus will keep the last `t` duration of data even when it stops getting new samples. + ## Example ```bash diff --git a/pkg/errutil/multierror.go b/pkg/errutil/multierror.go index aa53706dde6..40bce8a36ab 100644 --- a/pkg/errutil/multierror.go +++ b/pkg/errutil/multierror.go @@ -6,6 +6,7 @@ package errutil import ( "bytes" "fmt" + "sync" ) // The MultiError type implements the error interface, and contains the @@ -32,6 +33,31 @@ func (es MultiError) Err() error { return NonNilMultiError(es) } +// SyncMultiError is a thread-safe implementation of MultiError. +type SyncMultiError struct { + mtx sync.Mutex + es MultiError +} + +// Add adds the error to the error list if it is not nil. +func (es *SyncMultiError) Add(err error) { + if err == nil { + return + } + es.mtx.Lock() + defer es.mtx.Unlock() + + es.Add(err) +} + +// Err returns the error list as an error or nil if it is empty. +func (es *SyncMultiError) Err() error { + es.mtx.Lock() + defer es.mtx.Unlock() + + return es.es.Err() +} + type NonNilMultiError MultiError // Returns a concatenated string of the contained errors. diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 9e158776c0a..8dac6e7460d 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -10,6 +10,7 @@ import ( "path" "path/filepath" "sync" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -198,6 +199,84 @@ func (t *MultiTSDB) Close() error { return merr.Err() } +// Prune flushes and closes the TSDB for tenants that haven't received +// any new samples for longer than the TSDB retention period. +func (t *MultiTSDB) Prune(ctx context.Context) error { + t.mtx.Lock() + defer t.mtx.Unlock() + + var ( + wg sync.WaitGroup + merr errutil.SyncMultiError + ) + + for tenantID, tenantInstance := range t.tenants { + wg.Add(1) + go func(tenantID string, tenantInstance *tenant) { + defer wg.Done() + tlog := log.With(t.logger, "tenant", tenantID) + pruned, err := t.pruneTSDB(ctx, tlog, tenantInstance) + if err != nil { + merr.Add(err) + return + } + + if pruned { + level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID) + delete(t.tenants, tenantID) + } + }(tenantID, tenantInstance) + } + wg.Wait() + + return merr.Err() +} + +// pruneTSDB removes a TSDB if its past the retention period. +// It compacts the TSDB head, sends all remaining blocks to S3 and removes the TSDB from disk. +func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant) (bool, error) { + tenantTSDB := tenantInstance.readyStorage().get() + if tenantTSDB == nil { + return false, nil + } + tdb := tenantTSDB.db + head := tdb.Head() + if head.MaxTime() < 0 { + return false, nil + } + + sinceLastAppend := time.Now().Sub(time.UnixMilli(head.MaxTime())) + if sinceLastAppend.Milliseconds() <= t.tsdbOpts.RetentionDuration { + return false, nil + } + + level.Info(logger).Log("msg", "Pruning tenant") + if err := tdb.CompactHead(tsdb.NewRangeHead(head, head.MinTime(), head.MaxTime())); err != nil { + return false, err + } + + if tenantInstance.shipper() != nil { + uploaded, err := tenantInstance.shipper().Sync(ctx) + if err != nil { + return false, err + } + + if uploaded > 0 { + level.Info(logger).Log("msg", "Uploaded head block") + } + } + + if err := tdb.Close(); err != nil { + return false, err + } + + if err := os.RemoveAll(tenantTSDB.db.Dir()); err != nil { + return false, err + } + + return true, nil +} + func (t *MultiTSDB) Sync(ctx context.Context) (int, error) { if t.bucket == nil { return 0, errors.New("bucket is not specified, Sync should not be invoked") diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index bc7ccea7876..efa55a8b4fc 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -11,6 +11,8 @@ import ( "testing" "time" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/go-kit/log" "github.com/gogo/protobuf/types" "github.com/prometheus/client_golang/prometheus" @@ -412,6 +414,94 @@ func checkExemplarsResponse(t *testing.T, name string, expected, data []exemplar } } +func TestMultiTSDBPrune(t *testing.T) { + tests := []struct { + name string + bucket objstore.Bucket + expectedTenants int + expectedUploads int + }{ + { + name: "prune tsdbs without object storage", + bucket: nil, + expectedTenants: 1, + expectedUploads: 0, + }, + { + name: "prune tsdbs with object storage", + bucket: objstore.NewInMemBucket(), + expectedTenants: 1, + expectedUploads: 2, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + dir, err := ioutil.TempDir("", "multitsdb-prune") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(), + &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + }, + labels.FromStrings("replica", "test"), + "tenant_id", + test.bucket, + false, + metadata.NoneFunc, + ) + defer func() { testutil.Ok(t, m.Close()) }() + + for i := 0; i < 100; i++ { + testutil.Ok(t, appendSample(m, "foo", time.UnixMilli(int64(10+i)))) + testutil.Ok(t, appendSample(m, "bar", time.UnixMilli(int64(10+i)))) + testutil.Ok(t, appendSample(m, "baz", time.Now().Add(time.Duration(i)*time.Second))) + } + testutil.Equals(t, 3, len(m.TSDBStores())) + + testutil.Ok(t, m.Prune(context.Background())) + testutil.Equals(t, test.expectedTenants, len(m.TSDBStores())) + + var shippedBlocks int + if test.bucket != nil { + testutil.Ok(t, test.bucket.Iter(context.Background(), "", func(s string) error { + shippedBlocks++ + return nil + })) + } + testutil.Equals(t, test.expectedUploads, shippedBlocks) + }) + } +} + +func appendSample(m *MultiTSDB, tenant string, timestamp time.Time) error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + app, err := m.TenantAppendable(tenant) + if err != nil { + return err + } + + var a storage.Appender + if err := runutil.Retry(1*time.Second, ctx.Done(), func() error { + a, err = app.Appender(ctx) + return err + }); err != nil { + return err + } + + _, err = a.Append(0, labels.FromStrings("foo", "bar"), timestamp.UnixMilli(), 10) + if err != nil { + return err + } + + return a.Commit() +} + func BenchmarkMultiTSDB(b *testing.B) { dir, err := ioutil.TempDir("", "multitsdb") testutil.Ok(b, err)