Skip to content

Commit

Permalink
Implement tenant expiration
Browse files Browse the repository at this point in the history
This commit adds dynamic TSDB pruning for tenants which have not
received new samples within a certain period of time.

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski committed Jun 14, 2022
1 parent 03775c2 commit 705200e
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5337](https://github.com/thanos-io/thanos/pull/5337) Thanos Object Store: Add the `prefix` option to buckets
- [#5352](https://github.com/thanos-io/thanos/pull/5352) Cache: Add cache metrics to groupcache.
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support.
- [#5420](https://github.com/thanos-io/thanos/pull/5420) Receive: Automatically remove stale tenants.

### Changed

Expand Down
16 changes: 16 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,22 @@ func runReceive(
)
}

level.Debug(logger).Log("msg", "setting up periodic tenant pruning")
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(2*time.Hour, ctx.Done(), func() error {
err := dbs.Prune(ctx)
if err != nil {
level.Error(logger).Log("err", err)
}
return nil
})
}, func(err error) {
cancel()
})
}

level.Info(logger).Log("msg", "starting receiver")
return nil
}
Expand Down
8 changes: 8 additions & 0 deletions docs/components/receive.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ For more information please check out [initial design proposal](../proposals-don

> NOTE: As the block producer it's important to set correct "external labels" that will identify data block across Thanos clusters. See [external labels](../storage.md#external-labels) docs for details.
## Tenant lifecycle management

Tenants in Receivers are created dynamically and do not need to be provisioned upfront. When a new value is detected in the tenant HTTP header, Receivers will provision and start managing an independent TSDB for that tenant. TSDB blocks that are sent to S3 will contain a unique `tenant_id` label which can be used to compact blocks independently for each tenant.

A Receiver will automatically decommission a tenant once new samples have not been seen for longer than the `--tsdb.retention` period configured for the Receiver. The tenant decommission process includes flushing all in-memory samples for that tenant to disk, sending all unsent blocks to S3, and removing the tenant TSDB from the filesystem. If a tenant receives new samples after being decommissioned, a new TSDB will be created for the tenant.

Note that because of the built-in decommissioning process, the semantic of the `--tsdb.retention` flag in the Receiver is different than the one in Prometheus. For Receivers, `--tsdb.retention=t` indicates that the data for a tenant will be kept for `t` amount of time, whereas in Prometheus, `--tsdb.retention=t` denotes that the last `t` duration of data will be maintained in TSDB. In other words, Prometheus will keep the last `t` duration of data even when it stops getting new samples.

## Example

```bash
Expand Down
26 changes: 26 additions & 0 deletions pkg/errutil/multierror.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package errutil
import (
"bytes"
"fmt"
"sync"
)

// The MultiError type implements the error interface, and contains the
Expand All @@ -32,6 +33,31 @@ func (es MultiError) Err() error {
return NonNilMultiError(es)
}

// SyncMultiError is a thread-safe implementation of MultiError.
type SyncMultiError struct {
mtx sync.Mutex
es MultiError
}

// Add adds the error to the error list if it is not nil.
func (es *SyncMultiError) Add(err error) {
if err == nil {
return
}
es.mtx.Lock()
defer es.mtx.Unlock()

es.Add(err)
}

// Err returns the error list as an error or nil if it is empty.
func (es *SyncMultiError) Err() error {
es.mtx.Lock()
defer es.mtx.Unlock()

return es.es.Err()
}

type NonNilMultiError MultiError

// Returns a concatenated string of the contained errors.
Expand Down
79 changes: 79 additions & 0 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"path"
"path/filepath"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -198,6 +199,84 @@ func (t *MultiTSDB) Close() error {
return merr.Err()
}

// Prune flushes and closes the TSDB for tenants that haven't received
// any new samples for longer than the TSDB retention period.
func (t *MultiTSDB) Prune(ctx context.Context) error {
t.mtx.Lock()
defer t.mtx.Unlock()

var (
wg sync.WaitGroup
merr errutil.SyncMultiError
)

for tenantID, tenantInstance := range t.tenants {
wg.Add(1)
go func(tenantID string, tenantInstance *tenant) {
defer wg.Done()
tlog := log.With(t.logger, "tenant", tenantID)
pruned, err := t.pruneTSDB(ctx, tlog, tenantInstance)
if err != nil {
merr.Add(err)
return
}

if pruned {
level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID)
delete(t.tenants, tenantID)
}
}(tenantID, tenantInstance)
}
wg.Wait()

return merr.Err()
}

// pruneTSDB removes a TSDB if its past the retention period.
// It compacts the TSDB head, sends all remaining blocks to S3 and removes the TSDB from disk.
func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant) (bool, error) {
tenantTSDB := tenantInstance.readyStorage().get()
if tenantTSDB == nil {
return false, nil
}
tdb := tenantTSDB.db
head := tdb.Head()
if head.MaxTime() < 0 {
return false, nil
}

sinceLastAppend := time.Now().Sub(time.UnixMilli(head.MaxTime()))
if sinceLastAppend.Milliseconds() <= t.tsdbOpts.RetentionDuration {
return false, nil
}

level.Info(logger).Log("msg", "Pruning tenant")
if err := tdb.CompactHead(tsdb.NewRangeHead(head, head.MinTime(), head.MaxTime())); err != nil {
return false, err
}

if tenantInstance.shipper() != nil {
uploaded, err := tenantInstance.shipper().Sync(ctx)
if err != nil {
return false, err
}

if uploaded > 0 {
level.Info(logger).Log("msg", "Uploaded head block")
}
}

if err := tdb.Close(); err != nil {
return false, err
}

if err := os.RemoveAll(tenantTSDB.db.Dir()); err != nil {
return false, err
}

return true, nil
}

func (t *MultiTSDB) Sync(ctx context.Context) (int, error) {
if t.bucket == nil {
return 0, errors.New("bucket is not specified, Sync should not be invoked")
Expand Down
90 changes: 90 additions & 0 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"testing"
"time"

"github.com/thanos-io/thanos/pkg/objstore"

"github.com/go-kit/log"
"github.com/gogo/protobuf/types"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -412,6 +414,94 @@ func checkExemplarsResponse(t *testing.T, name string, expected, data []exemplar
}
}

func TestMultiTSDBPrune(t *testing.T) {
tests := []struct {
name string
bucket objstore.Bucket
expectedTenants int
expectedUploads int
}{
{
name: "prune tsdbs without object storage",
bucket: nil,
expectedTenants: 1,
expectedUploads: 0,
},
{
name: "prune tsdbs with object storage",
bucket: objstore.NewInMemBucket(),
expectedTenants: 1,
expectedUploads: 2,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
dir, err := ioutil.TempDir("", "multitsdb-prune")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(),
&tsdb.Options{
MinBlockDuration: (2 * time.Hour).Milliseconds(),
MaxBlockDuration: (2 * time.Hour).Milliseconds(),
RetentionDuration: (6 * time.Hour).Milliseconds(),
},
labels.FromStrings("replica", "test"),
"tenant_id",
test.bucket,
false,
metadata.NoneFunc,
)
defer func() { testutil.Ok(t, m.Close()) }()

for i := 0; i < 100; i++ {
testutil.Ok(t, appendSample(m, "foo", time.UnixMilli(int64(10+i))))
testutil.Ok(t, appendSample(m, "bar", time.UnixMilli(int64(10+i))))
testutil.Ok(t, appendSample(m, "baz", time.Now().Add(time.Duration(i)*time.Second)))
}
testutil.Equals(t, 3, len(m.TSDBStores()))

testutil.Ok(t, m.Prune(context.Background()))
testutil.Equals(t, test.expectedTenants, len(m.TSDBStores()))

var shippedBlocks int
if test.bucket != nil {
testutil.Ok(t, test.bucket.Iter(context.Background(), "", func(s string) error {
shippedBlocks++
return nil
}))
}
testutil.Equals(t, test.expectedUploads, shippedBlocks)
})
}
}

func appendSample(m *MultiTSDB, tenant string, timestamp time.Time) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

app, err := m.TenantAppendable(tenant)
if err != nil {
return err
}

var a storage.Appender
if err := runutil.Retry(1*time.Second, ctx.Done(), func() error {
a, err = app.Appender(ctx)
return err
}); err != nil {
return err
}

_, err = a.Append(0, labels.FromStrings("foo", "bar"), timestamp.UnixMilli(), 10)
if err != nil {
return err
}

return a.Commit()
}

func BenchmarkMultiTSDB(b *testing.B) {
dir, err := ioutil.TempDir("", "multitsdb")
testutil.Ok(b, err)
Expand Down

0 comments on commit 705200e

Please sign in to comment.