From 3f831f9e31ed1f65712c27623a21f7ed250b2d24 Mon Sep 17 00:00:00 2001 From: Zach Leslie Date: Fri, 3 May 2024 20:48:42 +0000 Subject: [PATCH] Implement tenant deletion for empty tenants (#3611) * Extend backend.Reader with Find() function * Extend backend.Writer with Delete() function * Add EmptyTenantDeletion age config to tempodb * Add delete handling to poller * Add integration tests for poller change * Update backend implementations * Fix lint * Setup test defaults * Integrate prefix test permutations * Add doc for the new configuration option * Update changelog * Fix local implementation to ignore directories * Tidy up todo after test coverage * Update docs/sources/tempo/configuration/_index.md Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com> * Address some PR feedback * Rename backend.FindOpts -> backend.FindMatch * Adjust default delete time from 20m -> 12h * Add log message for object deletion * Add additional safety check to avoid deletion * Add test default value * Godoc update after rename * Drop logging from the backend used for testing * Require empty tenant deletion to be enabled in the config * Add docs for _enabled config --------- Co-authored-by: Kim Nylander <104772500+knylander-grafana@users.noreply.github.com> --- CHANGELOG.md | 1 + docs/sources/tempo/configuration/_index.md | 13 ++ integration/poller/poller_test.go | 199 ++++++++++++++++++++- tempodb/backend/azure/v1/v1.go | 38 ++++ tempodb/backend/azure/v2/v2.go | 38 ++++ tempodb/backend/backend.go | 5 + tempodb/backend/cache/cache.go | 6 + tempodb/backend/gcs/gcs.go | 33 ++++ tempodb/backend/local/local.go | 32 ++++ tempodb/backend/mocks.go | 14 +- tempodb/backend/raw.go | 23 ++- tempodb/backend/s3/s3.go | 43 ++++- tempodb/blocklist/poller.go | 81 ++++++++- tempodb/blocklist/poller_test.go | 17 +- tempodb/config.go | 5 + tempodb/tempodb.go | 25 ++- 16 files changed, 545 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 
480184df371..2fd123ebd5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ * [CHANGE] Clean Metrics Generator's Prometheus wal before creating instance [#3548](https://github.com/grafana/tempo/pull/3548) (@ie-pham) * [CHANGE] Update docker examples for permissions, deprecations, and clean-up [#3603](https://github.com/grafana/tempo/pull/3603) (@zalegrala) * [FEATURE] Add messaging-system latency histogram to service-graph [#3453](https://github.com/grafana/tempo/pull/3453) (@adirmatzkin) +* [CHANGE] Delete any remaining objects for empty tenants after a configurable duration, requires config enable [#3611](https://github.com/grafana/tempo/pull/3611) (@zalegrala) * [ENHANCEMENT] Add string interning to TraceQL queries [#3411](https://github.com/grafana/tempo/pull/3411) (@mapno) * [ENHANCEMENT] Add new (unsafe) query hints for metrics queries [#3396](https://github.com/grafana/tempo/pull/3396) (@mdisibio) * [ENHANCEMENT] Add nestedSetLeft/Right/Parent instrinsics to TraceQL. [#3497](https://github.com/grafana/tempo/pull/3497) (@joe-elliott) diff --git a/docs/sources/tempo/configuration/_index.md b/docs/sources/tempo/configuration/_index.md index f6916045785..5aa6d45c5e6 100644 --- a/docs/sources/tempo/configuration/_index.md +++ b/docs/sources/tempo/configuration/_index.md @@ -982,6 +982,19 @@ storage: # Default 1 [blocklist_poll_tolerate_consecutive_errors: ] + # Used to tune how quickly the poller will delete any remaining backend + # objects found in the tenant path. This functionality requires enabling + # below. + # Default: 12h + [empty_tenant_deletion_age: ] + + # Polling will delete the index for a tenant if no blocks are found to + # exist. If this setting is enabled, the poller will also delete any + # remaining backend objects found in the tenant path. This is used to + # clean up partial blocks which may have not been cleaned up by the + # retention. 
+ [empty_tenant_deletion_enabled: | default = false] + # Cache type to use. Should be one of "redis", "memcached" # Example: "cache: memcached" # Deprecated. See [cache](#cache) section below. diff --git a/integration/poller/poller_test.go b/integration/poller/poller_test.go index 5d013455dae..35c6ebe15d7 100644 --- a/integration/poller/poller_test.go +++ b/integration/poller/poller_test.go @@ -1,7 +1,9 @@ package poller import ( + "bytes" "context" + "crypto/rand" "os" "sort" "testing" @@ -23,6 +25,7 @@ import ( "github.com/grafana/tempo/tempodb/backend/gcs" "github.com/grafana/tempo/tempodb/backend/s3" "github.com/grafana/tempo/tempodb/blocklist" + "github.com/grafana/tempo/tempodb/encoding/vparquet3" ) const ( @@ -146,8 +149,9 @@ func TestPollerOwnership(t *testing.T) { w := backend.NewWriter(ww) blocklistPoller := blocklist.NewPoller(&blocklist.PollerConfig{ - PollConcurrency: 3, - TenantIndexBuilders: 1, + PollConcurrency: 3, + TenantIndexBuilders: 1, + EmptyTenantDeletionAge: 10 * time.Minute, }, OwnsEverythingSharder, r, cc, w, logger) // Use the block boundaries in the GCS and S3 implementation @@ -216,6 +220,165 @@ func TestPollerOwnership(t *testing.T) { } } +func TestTenantDeletion(t *testing.T) { + testCompactorOwnershipBackends := []struct { + name string + configFile string + }{ + { + name: "s3", + configFile: configS3, + }, + { + name: "azure", + configFile: configAzurite, + }, + { + name: "gcs", + configFile: configGCS, + }, + } + + storageBackendTestPermutations := []struct { + name string + prefix string + }{ + { + name: "empty-string-prefix", + prefix: "", + }, + { + name: "no-prefix", + }, + { + name: "prefix", + prefix: "a/b/c/", + }, + { + name: "prefix-no-trailing-slash", + prefix: "a/b/c", + }, + } + + logger := log.NewLogfmtLogger(os.Stdout) + var hhh *e2e.HTTPService + t.Parallel() + for _, tc := range testCompactorOwnershipBackends { + for _, pc := range storageBackendTestPermutations { + t.Run(tc.name+"-"+pc.name, func(t 
*testing.T) { + s, err := e2e.NewScenario("tempo-poller-integration") + require.NoError(t, err) + defer s.Close() + + // set up the backend + cfg := app.Config{} + buff, err := os.ReadFile(tc.configFile) + require.NoError(t, err) + err = yaml.UnmarshalStrict(buff, &cfg) + require.NoError(t, err) + hhh, err = e2eBackend.New(s, cfg) + require.NoError(t, err) + + err = hhh.WaitReady() + require.NoError(t, err) + + err = hhh.Ready() + require.NoError(t, err) + + // Give some time for startup + time.Sleep(1 * time.Second) + + t.Logf("backend: %s", hhh.Endpoint(hhh.HTTPPort())) + + require.NoError(t, util.CopyFileToSharedDir(s, tc.configFile, "config.yaml")) + + var rr backend.RawReader + var ww backend.RawWriter + var cc backend.Compactor + + concurrency := 3 + ctx := context.Background() + + e := hhh.Endpoint(hhh.HTTPPort()) + switch tc.name { + case "s3": + cfg.StorageConfig.Trace.S3.Endpoint = e + cfg.StorageConfig.Trace.S3.ListBlocksConcurrency = concurrency + cfg.StorageConfig.Trace.S3.Prefix = pc.prefix + cfg.Overrides.UserConfigurableOverridesConfig.Client.S3.Endpoint = e + rr, ww, cc, err = s3.New(cfg.StorageConfig.Trace.S3) + case "gcs": + cfg.Overrides.UserConfigurableOverridesConfig.Client.GCS.Endpoint = e + cfg.StorageConfig.Trace.GCS.Endpoint = e + cfg.StorageConfig.Trace.GCS.ListBlocksConcurrency = concurrency + cfg.StorageConfig.Trace.GCS.Prefix = pc.prefix + rr, ww, cc, err = gcs.New(cfg.StorageConfig.Trace.GCS) + case "azure": + cfg.Overrides.UserConfigurableOverridesConfig.Client.Azure.Endpoint = e + cfg.StorageConfig.Trace.Azure.Endpoint = e + cfg.StorageConfig.Trace.Azure.Prefix = pc.prefix + rr, ww, cc, err = azure.New(cfg.StorageConfig.Trace.Azure) + } + require.NoError(t, err) + + r := backend.NewReader(rr) + w := backend.NewWriter(ww) + + // Tenant deletion is not enabled by default + blocklistPoller := blocklist.NewPoller(&blocklist.PollerConfig{ + PollConcurrency: 3, + TenantIndexBuilders: 1, + EmptyTenantDeletionAge: 100 * time.Millisecond, + 
}, OwnsEverythingSharder, r, cc, w, logger) + + l := blocklist.New() + mm, cm, err := blocklistPoller.Do(l) + require.NoError(t, err) + t.Logf("mm: %v", mm) + t.Logf("cm: %v", cm) + + tennants, err := r.Tenants(ctx) + require.NoError(t, err) + require.Equal(t, 0, len(tennants)) + + writeBadBlockFiles(t, ww, rr, tenant) + + // Now we should have a tenant + tennants, err = r.Tenants(ctx) + require.NoError(t, err) + require.Equal(t, 1, len(tennants)) + + time.Sleep(500 * time.Millisecond) + + _, _, err = blocklistPoller.Do(l) + require.NoError(t, err) + + tennants, err = r.Tenants(ctx) + t.Logf("tennants: %v", tennants) + require.NoError(t, err) + require.Equal(t, 1, len(tennants)) + + // Create a new poller with tenantion deletion enabled + blocklistPoller = blocklist.NewPoller(&blocklist.PollerConfig{ + PollConcurrency: 3, + TenantIndexBuilders: 1, + EmptyTenantDeletionAge: 100 * time.Millisecond, + EmptyTenantDeletionEnabled: true, + }, OwnsEverythingSharder, r, cc, w, logger) + + // Again + _, _, err = blocklistPoller.Do(l) + require.NoError(t, err) + + tennants, err = r.Tenants(ctx) + t.Logf("tennants: %v", tennants) + require.NoError(t, err) + require.Equal(t, 0, len(tennants)) + }) + } + } +} + func found(id uuid.UUID, blockMetas []*backend.BlockMeta) bool { for _, b := range blockMetas { if b.BlockID == id { @@ -260,3 +423,35 @@ func incrementUUIDBytes(uuidBytes []byte) { uuidBytes[i] = 0 // Wrap around if the byte is 255 } } + +func writeBadBlockFiles(t *testing.T, ww backend.RawWriter, rr backend.RawReader, tenant string) { + t.Logf("writing bad block files") + + ctx := context.Background() + + token := make([]byte, 32) + _, err := rand.Read(token) + require.NoError(t, err) + + err = ww.Write( + ctx, + vparquet3.DataFileName, + backend.KeyPath([]string{tenant, uuid.New().String()}), + bytes.NewReader(token), + int64(len(token)), nil) + + require.NoError(t, err) + + items, err := rr.List(context.Background(), backend.KeyPath([]string{tenant})) + 
require.NoError(t, err) + t.Logf("items: %v", items) + + var found []string + f := func(opts backend.FindMatch) { + found = append(found, opts.Key) + } + + err = rr.Find(ctx, backend.KeyPath{}, f) + require.NoError(t, err) + t.Logf("items: %v", found) +} diff --git a/tempodb/backend/azure/v1/v1.go b/tempodb/backend/azure/v1/v1.go index d06d0cf82f1..65fc70c4bb9 100644 --- a/tempodb/backend/azure/v1/v1.go +++ b/tempodb/backend/azure/v1/v1.go @@ -226,6 +226,44 @@ func (rw *V1) ListBlocks(ctx context.Context, tenant string) ([]uuid.UUID, []uui return blockIDs, compactedBlockIDs, nil } +// Find implements backend.Reader +func (rw *V1) Find(ctx context.Context, keypath backend.KeyPath, f backend.FindFunc) (err error) { + keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix) + + marker := blob.Marker{} + prefix := path.Join(keypath...) + + if len(prefix) > 0 { + prefix = prefix + dir + } + + for { + res, err := rw.containerURL.ListBlobsFlatSegment(ctx, marker, blob.ListBlobsSegmentOptions{ + Prefix: prefix, + Details: blob.BlobListingDetails{}, + }) + if err != nil { + return fmt.Errorf("iterating objects: %w", err) + } + marker = res.NextMarker + + for _, blob := range res.Segment.BlobItems { + opts := backend.FindMatch{ + Key: blob.Name, + Modified: blob.Properties.LastModified, + } + f(opts) + } + + // Continue iterating if we are not done. 
+ if !marker.NotDone() { + break + } + } + + return +} + // Read implements backend.Reader func (rw *V1) Read(ctx context.Context, name string, keypath backend.KeyPath, _ *backend.CacheInfo) (io.ReadCloser, int64, error) { keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix) diff --git a/tempodb/backend/azure/v2/v2.go b/tempodb/backend/azure/v2/v2.go index 56f7126d60d..4470ce84564 100644 --- a/tempodb/backend/azure/v2/v2.go +++ b/tempodb/backend/azure/v2/v2.go @@ -227,6 +227,44 @@ func (rw *V2) ListBlocks(ctx context.Context, tenant string) ([]uuid.UUID, []uui return blockIDs, compactedBlockIDs, nil } +// Find implements backend.Reader +func (rw *V2) Find(ctx context.Context, keypath backend.KeyPath, f backend.FindFunc) (err error) { + keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix) + + prefix := path.Join(keypath...) + + if len(prefix) > 0 { + prefix = prefix + dir + } + + pager := rw.containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{ + Prefix: &prefix, + }) + + var o string + for pager.More() { + page, err := pager.NextPage(ctx) + if err != nil { + return fmt.Errorf("iterating objects: %w", err) + } + + for _, b := range page.Segment.BlobItems { + if b == nil || b.Name == nil { + continue + } + o = strings.TrimPrefix(strings.TrimSuffix(*b.Name, dir), prefix) + opts := backend.FindMatch{ + Key: o, + Modified: *b.Properties.LastModified, + } + f(opts) + } + + } + + return +} + // Read implements backend.Reader func (rw *V2) Read(ctx context.Context, name string, keypath backend.KeyPath, _ *backend.CacheInfo) (io.ReadCloser, int64, error) { keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix) diff --git a/tempodb/backend/backend.go b/tempodb/backend/backend.go index 7b5c71d4c23..c3a49444b6f 100644 --- a/tempodb/backend/backend.go +++ b/tempodb/backend/backend.go @@ -6,6 +6,7 @@ import ( "io" "github.com/google/uuid" + "github.com/grafana/tempo/pkg/cache" ) @@ -47,6 +48,8 @@ type Writer interface { CloseAppend(ctx 
context.Context, tracker AppendTracker) error // WriteTenantIndex writes the two meta slices as a tenant index WriteTenantIndex(ctx context.Context, tenantID string, meta []*BlockMeta, compactedMeta []*CompactedBlockMeta) error + // Delete deletes an object. + Delete(ctx context.Context, name string, keypath KeyPath) error } // Reader is a collection of methods to read data from tempodb backends @@ -65,6 +68,8 @@ type Reader interface { BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID string) (*BlockMeta, error) // TenantIndex returns lists of all metas given a tenant TenantIndex(ctx context.Context, tenantID string) (*TenantIndex, error) + // Find executes f for each object in the backend that matches the keypath. + Find(ctx context.Context, keypath KeyPath, f FindFunc) error // Shutdown shuts...down? Shutdown() } diff --git a/tempodb/backend/cache/cache.go b/tempodb/backend/cache/cache.go index 0b6888327b6..d0313697eef 100644 --- a/tempodb/backend/cache/cache.go +++ b/tempodb/backend/cache/cache.go @@ -11,6 +11,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/google/uuid" + "github.com/grafana/tempo/pkg/cache" tempo_io "github.com/grafana/tempo/pkg/io" @@ -72,6 +73,11 @@ func (r *readerWriter) ListBlocks(ctx context.Context, tenant string) (blockIDs return r.nextReader.ListBlocks(ctx, tenant) } +// Find implements backend.Reader +func (r *readerWriter) Find(ctx context.Context, keypath backend.KeyPath, f backend.FindFunc) (err error) { + return r.nextReader.Find(ctx, keypath, f) +} + // Read implements backend.RawReader func (r *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, cacheInfo *backend.CacheInfo) (io.ReadCloser, int64, error) { var k string diff --git a/tempodb/backend/gcs/gcs.go b/tempodb/backend/gcs/gcs.go index 1d4b346e68b..a8e110ff4f2 100644 --- a/tempodb/backend/gcs/gcs.go +++ b/tempodb/backend/gcs/gcs.go @@ -308,6 +308,39 @@ func (rw *readerWriter) ListBlocks(ctx 
context.Context, tenant string) ([]uuid.U return blockIDs, compactedBlockIDs, nil } +// Find implements backend.Reader +func (rw *readerWriter) Find(ctx context.Context, keypath backend.KeyPath, f backend.FindFunc) (err error) { + keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix) + prefix := path.Join(keypath...) + if len(prefix) > 0 { + prefix = prefix + "/" + } + + iter := rw.bucket.Objects(ctx, &storage.Query{ + Delimiter: "", + Prefix: prefix, + Versions: false, + }) + + for { + attrs, iterErr := iter.Next() + if errors.Is(iterErr, iterator.Done) { + break + } + if iterErr != nil { + return fmt.Errorf("iterating objects: %w", iterErr) + } + + opts := backend.FindMatch{ + Key: attrs.Name, + Modified: attrs.Updated, + } + f(opts) + } + + return +} + // Read implements backend.Reader func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, _ *backend.CacheInfo) (io.ReadCloser, int64, error) { keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix) diff --git a/tempodb/backend/local/local.go b/tempodb/backend/local/local.go index 08f120cd490..42f45b616f9 100644 --- a/tempodb/backend/local/local.go +++ b/tempodb/backend/local/local.go @@ -190,6 +190,38 @@ func (rw *Backend) ListBlocks(_ context.Context, tenant string) (metas []uuid.UU return } +// Find implements backend.Reader +func (rw *Backend) Find(_ context.Context, keypath backend.KeyPath, f backend.FindFunc) (err error) { + path := rw.rootPath(keypath) + fff := os.DirFS(path) + err = fs.WalkDir(fff, ".", func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + + if d.IsDir() { + return nil + } + + info, err := d.Info() + if err != nil { + return err + } + + tenantFilePath := filepath.Join(filepath.Join(keypath...), path) + opts := backend.FindMatch{ + Key: tenantFilePath, + Modified: info.ModTime(), + } + + f(opts) + + return nil + }) + + return +} + // Read implements backend.Reader func (rw *Backend) Read(ctx context.Context, name 
string, keypath backend.KeyPath, _ *backend.CacheInfo) (io.ReadCloser, int64, error) { if err := ctx.Err(); err != nil { diff --git a/tempodb/backend/mocks.go b/tempodb/backend/mocks.go index d4773100fbb..15f5dcbc836 100644 --- a/tempodb/backend/mocks.go +++ b/tempodb/backend/mocks.go @@ -28,10 +28,10 @@ type MockRawReader struct { R []byte // read Range []byte // ReadRange ReadFn func(ctx context.Context, name string, keypath KeyPath, cacheInfo *CacheInfo) (io.ReadCloser, int64, error) + DeleteResult []string BlockIDs []uuid.UUID CompactedBlockIDs []uuid.UUID - FindResult []string } func (m *MockRawReader) List(ctx context.Context, keypath KeyPath) ([]string, error) { @@ -50,6 +50,10 @@ func (m *MockRawReader) ListBlocks(ctx context.Context, tenant string) ([]uuid.U return m.BlockIDs, m.CompactedBlockIDs, nil } +func (m *MockRawReader) Find(_ context.Context, _ KeyPath, _ FindFunc) error { + return nil +} + func (m *MockRawReader) Read(ctx context.Context, name string, keypath KeyPath, cacheInfo *CacheInfo) (io.ReadCloser, int64, error) { if m.ReadFn != nil { return m.ReadFn(ctx, name, keypath, cacheInfo) @@ -155,6 +159,10 @@ type MockReader struct { CompactedBlockIDs []uuid.UUID // blocks } +func (m *MockReader) Find(_ context.Context, _ KeyPath, _ FindFunc) error { + return nil +} + func (m *MockReader) Tenants(context.Context) ([]string, error) { return m.T, nil } @@ -242,6 +250,10 @@ func (m *MockWriter) CloseAppend(context.Context, AppendTracker) error { return nil } +func (m *MockWriter) Delete(context.Context, string, KeyPath) error { + return nil +} + func (m *MockWriter) WriteTenantIndex(_ context.Context, tenantID string, meta []*BlockMeta, compactedMeta []*CompactedBlockMeta) error { m.Lock() defer m.Unlock() diff --git a/tempodb/backend/raw.go b/tempodb/backend/raw.go index d4d332c27e2..4fc463860d6 100644 --- a/tempodb/backend/raw.go +++ b/tempodb/backend/raw.go @@ -7,6 +7,7 @@ import ( "errors" "io" "path" + "time" "github.com/google/uuid" @@ -25,7 
+26,15 @@ const ( // from the backend type KeyPath []string -type Feature int +// FindFunc is executed for each object in the backend. The provided FindMatch +// is used to determine how to handle the object. Any collection of these +// objects is the caller's responsibility. +type FindFunc func(FindMatch) + +type FindMatch struct { + Modified time.Time + Key string +} // RawWriter is a collection of methods to write data to tempodb backends type RawWriter interface { @@ -45,6 +54,8 @@ type RawReader interface { List(ctx context.Context, keypath KeyPath) ([]string, error) // ListBlocks returns all blockIDs and compactedBlockIDs for a tenant. ListBlocks(ctx context.Context, tenant string) (blockIDs []uuid.UUID, compactedBlockIDs []uuid.UUID, err error) + // Find executes the FindFunc for each object in the backend starting at the specified keypath. Collection of these objects is the caller's responsibility. + Find(ctx context.Context, keypath KeyPath, f FindFunc) error // Read is for streaming entire objects from the backend. There will be an attempt to retrieve this from cache if shouldCache is true. Read(ctx context.Context, name string, keyPath KeyPath, cacheInfo *CacheInfo) (io.ReadCloser, int64, error) // ReadRange is for reading parts of large objects from the backend. 
@@ -131,6 +142,11 @@ func (w *writer) WriteTenantIndex(ctx context.Context, tenantID string, meta []* return nil } +// Delete implements backend.Writer +func (w *writer) Delete(ctx context.Context, name string, keypath KeyPath) error { + return w.w.Delete(ctx, name, keypath, nil) +} + type reader struct { r RawReader } @@ -227,6 +243,11 @@ func (r *reader) TenantIndex(ctx context.Context, tenantID string) (*TenantIndex return i, nil } +// Find implements backend.Reader +func (r *reader) Find(ctx context.Context, keypath KeyPath, f FindFunc) error { + return r.r.Find(ctx, keypath, f) +} + // Shutdown implements backend.Reader func (r *reader) Shutdown() { r.r.Shutdown() diff --git a/tempodb/backend/s3/s3.go b/tempodb/backend/s3/s3.go index dd037112e68..cdc5369e8d2 100644 --- a/tempodb/backend/s3/s3.go +++ b/tempodb/backend/s3/s3.go @@ -20,7 +20,7 @@ import ( "github.com/cristalhq/hedgedhttp" gkLog "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/minio/minio-go/v7" + minio "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/opentracing/opentracing-go" @@ -391,6 +391,47 @@ func (rw *readerWriter) ListBlocks( return blockIDs, compactedBlockIDs, nil } +// Find implements backend.Reader +func (rw *readerWriter) Find(ctx context.Context, keypath backend.KeyPath, f backend.FindFunc) (err error) { + keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix) + prefix := path.Join(keypath...) 
+ + if len(prefix) > 0 { + prefix = prefix + "/" + } + + nextToken := "" + isTruncated := true + var res minio.ListBucketV2Result + + for isTruncated { + select { + case <-ctx.Done(): + return + default: + res, err = rw.core.ListObjectsV2(rw.cfg.Bucket, prefix, "", nextToken, "", 0) + if err != nil { + return fmt.Errorf("error finding objects in s3 bucket, bucket: %s: %w", rw.cfg.Bucket, err) + } + + isTruncated = res.IsTruncated + nextToken = res.NextContinuationToken + + if len(res.Contents) > 0 { + for _, c := range res.Contents { + opts := backend.FindMatch{ + Key: c.Key, + Modified: c.LastModified, + } + f(opts) + } + } + } + } + + return +} + // Read implements backend.Reader func (rw *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, _ *backend.CacheInfo) (io.ReadCloser, int64, error) { span, derivedCtx := opentracing.StartSpanFromContext(ctx, "s3.Read") diff --git a/tempodb/blocklist/poller.go b/tempodb/blocklist/poller.go index 6afa995440b..90a1f9d9a59 100644 --- a/tempodb/blocklist/poller.go +++ b/tempodb/blocklist/poller.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math/rand" + "path" "sort" "strconv" "sync" @@ -74,12 +75,14 @@ var ( // Config is used to configure the poller type PollerConfig struct { - PollConcurrency uint - PollFallback bool - TenantIndexBuilders int - StaleTenantIndex time.Duration - PollJitterMs int - TolerateConsecutiveErrors int + PollConcurrency uint + PollFallback bool + TenantIndexBuilders int + StaleTenantIndex time.Duration + PollJitterMs int + TolerateConsecutiveErrors int + EmptyTenantDeletionAge time.Duration + EmptyTenantDeletionEnabled bool } // JobSharder is used to determine if a particular job is owned by this process @@ -231,7 +234,7 @@ func (p *Poller) pollTenantAndCreateIndex( metricTenantIndexBuilder.WithLabelValues(tenantID).Set(1) blocklist, compactedBlocklist, err := p.pollTenantBlocks(derivedCtx, tenantID, previous) if err != nil { - return nil, nil, err + return nil, nil, 
fmt.Errorf("failed to poll tenant blocks: %w", err) } // everything is happy, write this tenant index @@ -241,6 +244,14 @@ func (p *Poller) pollTenantAndCreateIndex( metricTenantIndexErrors.WithLabelValues(tenantID).Inc() level.Error(p.logger).Log("msg", "failed to write tenant index", "tenant", tenantID, "err", err) } + + if len(blocklist) == 0 && len(compactedBlocklist) == 0 { + err := p.deleteTenant(ctx, tenantID) + if err != nil { + return nil, nil, fmt.Errorf("failed to delete tenant: %w", err) + } + } + metricTenantIndexAgeSeconds.WithLabelValues(tenantID).Set(0) return blocklist, compactedBlocklist, nil @@ -456,6 +467,62 @@ func (p *Poller) tenantIndexPollError(idx *backend.TenantIndex, err error) error return nil } +// deleteTenant will delete all of a tenant's objects if there is not a tenant index present. +func (p *Poller) deleteTenant(ctx context.Context, tenantID string) error { + // If we have not enabled empty tenant deletion, do nothing. + if !p.cfg.EmptyTenantDeletionEnabled { + return nil + } + + level.Info(p.logger).Log("msg", "deleting tenant", "tenant", tenantID) + + if p.cfg.EmptyTenantDeletionAge == 0 { + return fmt.Errorf("empty tenant deletion age must be greater than 0") + } + + var ( + foundObjects []string + recentObjects int + ) + err := p.reader.Find(ctx, backend.KeyPath{tenantID}, func(opts backend.FindMatch) { + level.Info(p.logger).Log("msg", "checking object for deletion", "object", opts.Key, "modified", opts.Modified) + + if time.Since(opts.Modified) > p.cfg.EmptyTenantDeletionAge { + foundObjects = append(foundObjects, opts.Key) + } else { + recentObjects++ + } + }) + if err != nil { + return err + } + + // do nothing if there are recent objects for this tenant. + if recentObjects > 0 { + return nil + } + + // do nothing if the tenant index has appeared. 
+ _, err = p.reader.TenantIndex(ctx, tenantID) + // If we have any error other than that which indicates that the tenant index + // call was made successfully, and that it does not exist, do nothing. Only + // proceed if we know that the index does not exist. + if !errors.Is(err, backend.ErrDoesNotExist) { + return nil + } + + for _, object := range foundObjects { + dir, name := path.Split(object) + level.Info(p.logger).Log("msg", "deleting", "tenant", tenantID, "object", object) + err = p.writer.Delete(ctx, name, backend.KeyPath{dir}) + if err != nil { + return err + } + } + + return nil +} + type backendMetaMetrics struct { blockMetaTotalObjects int compactedBlockMetaTotalObjects int diff --git a/tempodb/blocklist/poller_test.go b/tempodb/blocklist/poller_test.go index 1cb63234b1b..ecff72d057d 100644 --- a/tempodb/blocklist/poller_test.go +++ b/tempodb/blocklist/poller_test.go @@ -21,9 +21,10 @@ import ( ) var ( - testPollConcurrency = uint(10) - testPollFallback = true - testBuilders = 1 + testPollConcurrency = uint(10) + testPollFallback = true + testBuilders = 1 + testEmptyTenantIndexAge = 1 * time.Minute ) type mockJobSharder struct { @@ -260,10 +261,11 @@ func TestTenantIndexFallback(t *testing.T) { } poller := NewPoller(&PollerConfig{ - PollConcurrency: testPollConcurrency, - PollFallback: tc.pollFallback, - TenantIndexBuilders: testBuilders, - StaleTenantIndex: tc.staleTenantIndex, + PollConcurrency: testPollConcurrency, + PollFallback: tc.pollFallback, + TenantIndexBuilders: testBuilders, + StaleTenantIndex: tc.staleTenantIndex, + EmptyTenantDeletionAge: testEmptyTenantIndexAge, }, &mockJobSharder{ owns: tc.isTenantIndexBuilder, }, r, c, w, log.NewNopLogger()) @@ -573,6 +575,7 @@ func TestPollTolerateConsecutiveErrors(t *testing.T) { PollFallback: testPollFallback, TenantIndexBuilders: testBuilders, TolerateConsecutiveErrors: tc.tolerate, + EmptyTenantDeletionAge: testEmptyTenantIndexAge, }, s, r, c, w, log.NewNopLogger()) _, _, err := poller.Do(b) diff 
--git a/tempodb/config.go b/tempodb/config.go index 76d0cbbc35d..ce1578ec57d 100644 --- a/tempodb/config.go +++ b/tempodb/config.go @@ -29,6 +29,8 @@ const ( DefaultTenantIndexBuilders = 2 DefaultTolerateConsecutiveErrors = 1 + DefaultEmptyTenantDeletionAge = 12 * time.Hour + DefaultPrefetchTraceCount = 1000 DefaultSearchChunkSizeBytes = 1_000_000 DefaultReadBufferCount = 32 @@ -51,6 +53,9 @@ type Config struct { BlocklistPollJitterMs int `yaml:"blocklist_poll_jitter_ms"` BlocklistPollTolerateConsecutiveErrors int `yaml:"blocklist_poll_tolerate_consecutive_errors"` + EmptyTenantDeletionEnabled bool `yaml:"empty_tenant_deletion_enabled"` + EmptyTenantDeletionAge time.Duration `yaml:"empty_tenant_deletion_age"` + // backends Backend string `yaml:"backend"` Local *local.Config `yaml:"local"` diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index 28ffc448ca9..4dbfbb39460 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -13,6 +13,10 @@ import ( gkLog "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/google/uuid" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/grafana/tempo/modules/cache/memcached" "github.com/grafana/tempo/modules/cache/redis" "github.com/grafana/tempo/pkg/cache" @@ -30,9 +34,6 @@ import ( "github.com/grafana/tempo/tempodb/encoding/common" "github.com/grafana/tempo/tempodb/pool" "github.com/grafana/tempo/tempodb/wal" - "github.com/opentracing/opentracing-go" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" ) const ( @@ -501,15 +502,21 @@ func (rw *readerWriter) EnablePolling(ctx context.Context, sharder blocklist.Job rw.cfg.BlocklistPollTenantIndexBuilders = DefaultTenantIndexBuilders } + if rw.cfg.EmptyTenantDeletionAge <= 0 { + rw.cfg.EmptyTenantDeletionAge = DefaultEmptyTenantDeletionAge + } + 
level.Info(rw.logger).Log("msg", "polling enabled", "interval", rw.cfg.BlocklistPoll, "blocklist_concurrency", rw.cfg.BlocklistPollConcurrency) blocklistPoller := blocklist.NewPoller(&blocklist.PollerConfig{ - PollConcurrency: rw.cfg.BlocklistPollConcurrency, - PollFallback: rw.cfg.BlocklistPollFallback, - TenantIndexBuilders: rw.cfg.BlocklistPollTenantIndexBuilders, - StaleTenantIndex: rw.cfg.BlocklistPollStaleTenantIndex, - PollJitterMs: rw.cfg.BlocklistPollJitterMs, - TolerateConsecutiveErrors: rw.cfg.BlocklistPollTolerateConsecutiveErrors, + PollConcurrency: rw.cfg.BlocklistPollConcurrency, + PollFallback: rw.cfg.BlocklistPollFallback, + TenantIndexBuilders: rw.cfg.BlocklistPollTenantIndexBuilders, + StaleTenantIndex: rw.cfg.BlocklistPollStaleTenantIndex, + PollJitterMs: rw.cfg.BlocklistPollJitterMs, + TolerateConsecutiveErrors: rw.cfg.BlocklistPollTolerateConsecutiveErrors, + EmptyTenantDeletionAge: rw.cfg.EmptyTenantDeletionAge, + EmptyTenantDeletionEnabled: rw.cfg.EmptyTenantDeletionEnabled, }, sharder, rw.r, rw.c, rw.w, rw.logger) rw.blocklistPoller = blocklistPoller