Skip to content

Commit

Permalink
Implement tenant deletion for empty tenants (#3611)
Browse files Browse the repository at this point in the history
* Extend backend.Reader with Find() function

* Extend backend.Writer with Delete() function

* Add EmptyTenantDeletion age config to tempodb

* Add delete handling to poller

* Add integration tests for poller change

* Update backend implementations

* Fix lint

* Setup test defaults

* Integrate prefix test permutations

* Add doc for the new configuration option

* Update changelog

* Fix local implementation to ignore directories

* Tidy up todo after test coverage

* Update docs/sources/tempo/configuration/_index.md

Co-authored-by: Kim Nylander <[email protected]>

* Address some PR feedback

* Rename backend.FindOpts -> backend.FindMatch
* Adjust default delete time from 20m -> 12h

* Add log message for object deletion

* Add additional safety check to avoid deletion

* Add test default value

* Godoc update after rename

* Drop logging from the backend used for testing

* Require empty tenant deletion to be enabled in the config

* Add docs for _enabled config

---------

Co-authored-by: Kim Nylander <[email protected]>
  • Loading branch information
zalegrala and knylander-grafana authored May 3, 2024
1 parent 35aa72e commit 3f831f9
Show file tree
Hide file tree
Showing 16 changed files with 545 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* [CHANGE] Clean Metrics Generator's Prometheus wal before creating instance [#3548](https://github.com/grafana/tempo/pull/3548) (@ie-pham)
* [CHANGE] Update docker examples for permissions, deprecations, and clean-up [#3603](https://github.com/grafana/tempo/pull/3603) (@zalegrala)
* [FEATURE] Add messaging-system latency histogram to service-graph [#3453](https://github.com/grafana/tempo/pull/3453) (@adirmatzkin)
* [CHANGE] Delete any remaining objects for empty tenants after a configurable duration; requires enabling in config [#3611](https://github.com/grafana/tempo/pull/3611) (@zalegrala)
* [ENHANCEMENT] Add string interning to TraceQL queries [#3411](https://github.com/grafana/tempo/pull/3411) (@mapno)
* [ENHANCEMENT] Add new (unsafe) query hints for metrics queries [#3396](https://github.com/grafana/tempo/pull/3396) (@mdisibio)
* [ENHANCEMENT] Add nestedSetLeft/Right/Parent instrinsics to TraceQL. [#3497](https://github.com/grafana/tempo/pull/3497) (@joe-elliott)
Expand Down
13 changes: 13 additions & 0 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,19 @@ storage:
# Default 1
[blocklist_poll_tolerate_consecutive_errors: <int>]

# How long the poller waits before deleting any remaining backend
# objects found in the tenant path of an empty tenant. This functionality
# must be enabled with empty_tenant_deletion_enabled below.
# Default: 12h
[empty_tenant_deletion_age: <duration>]

# Polling will delete the index for a tenant if no blocks are found to
# exist. If this setting is enabled, the poller will also delete any
# remaining backend objects found in the tenant path. This is used to
# clean up partial blocks which may not have been cleaned up by
# retention.
[empty_tenant_deletion_enabled: <bool> | default = false]

# Cache type to use. Should be one of "redis", "memcached"
# Example: "cache: memcached"
# Deprecated. See [cache](#cache) section below.
Expand Down
199 changes: 197 additions & 2 deletions integration/poller/poller_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package poller

import (
"bytes"
"context"
"crypto/rand"
"os"
"sort"
"testing"
Expand All @@ -23,6 +25,7 @@ import (
"github.com/grafana/tempo/tempodb/backend/gcs"
"github.com/grafana/tempo/tempodb/backend/s3"
"github.com/grafana/tempo/tempodb/blocklist"
"github.com/grafana/tempo/tempodb/encoding/vparquet3"
)

const (
Expand Down Expand Up @@ -146,8 +149,9 @@ func TestPollerOwnership(t *testing.T) {
w := backend.NewWriter(ww)

blocklistPoller := blocklist.NewPoller(&blocklist.PollerConfig{
PollConcurrency: 3,
TenantIndexBuilders: 1,
PollConcurrency: 3,
TenantIndexBuilders: 1,
EmptyTenantDeletionAge: 10 * time.Minute,
}, OwnsEverythingSharder, r, cc, w, logger)

// Use the block boundaries in the GCS and S3 implementation
Expand Down Expand Up @@ -216,6 +220,165 @@ func TestPollerOwnership(t *testing.T) {
}
}

// TestTenantDeletion verifies that the poller deletes the remaining
// objects of an empty tenant only when EmptyTenantDeletionEnabled is set,
// across each backend (s3, azure, gcs) and storage-prefix permutation.
func TestTenantDeletion(t *testing.T) {
	testCompactorOwnershipBackends := []struct {
		name       string
		configFile string
	}{
		{
			name:       "s3",
			configFile: configS3,
		},
		{
			name:       "azure",
			configFile: configAzurite,
		},
		{
			name:       "gcs",
			configFile: configGCS,
		},
	}

	storageBackendTestPermutations := []struct {
		name   string
		prefix string
	}{
		{
			name:   "empty-string-prefix",
			prefix: "",
		},
		{
			name: "no-prefix",
		},
		{
			name:   "prefix",
			prefix: "a/b/c/",
		},
		{
			name:   "prefix-no-trailing-slash",
			prefix: "a/b/c",
		},
	}

	logger := log.NewLogfmtLogger(os.Stdout)
	var hhh *e2e.HTTPService
	t.Parallel()
	for _, tc := range testCompactorOwnershipBackends {
		for _, pc := range storageBackendTestPermutations {
			t.Run(tc.name+"-"+pc.name, func(t *testing.T) {
				s, err := e2e.NewScenario("tempo-poller-integration")
				require.NoError(t, err)
				defer s.Close()

				// Set up the backend described by the config file.
				cfg := app.Config{}
				buff, err := os.ReadFile(tc.configFile)
				require.NoError(t, err)
				err = yaml.UnmarshalStrict(buff, &cfg)
				require.NoError(t, err)
				hhh, err = e2eBackend.New(s, cfg)
				require.NoError(t, err)

				err = hhh.WaitReady()
				require.NoError(t, err)

				err = hhh.Ready()
				require.NoError(t, err)

				// Give some time for startup
				time.Sleep(1 * time.Second)

				t.Logf("backend: %s", hhh.Endpoint(hhh.HTTPPort()))

				require.NoError(t, util.CopyFileToSharedDir(s, tc.configFile, "config.yaml"))

				var rr backend.RawReader
				var ww backend.RawWriter
				var cc backend.Compactor

				concurrency := 3
				ctx := context.Background()

				// Point the storage config at the just-started backend
				// container and construct the matching raw reader/writer.
				e := hhh.Endpoint(hhh.HTTPPort())
				switch tc.name {
				case "s3":
					cfg.StorageConfig.Trace.S3.Endpoint = e
					cfg.StorageConfig.Trace.S3.ListBlocksConcurrency = concurrency
					cfg.StorageConfig.Trace.S3.Prefix = pc.prefix
					cfg.Overrides.UserConfigurableOverridesConfig.Client.S3.Endpoint = e
					rr, ww, cc, err = s3.New(cfg.StorageConfig.Trace.S3)
				case "gcs":
					cfg.Overrides.UserConfigurableOverridesConfig.Client.GCS.Endpoint = e
					cfg.StorageConfig.Trace.GCS.Endpoint = e
					cfg.StorageConfig.Trace.GCS.ListBlocksConcurrency = concurrency
					cfg.StorageConfig.Trace.GCS.Prefix = pc.prefix
					rr, ww, cc, err = gcs.New(cfg.StorageConfig.Trace.GCS)
				case "azure":
					cfg.Overrides.UserConfigurableOverridesConfig.Client.Azure.Endpoint = e
					cfg.StorageConfig.Trace.Azure.Endpoint = e
					cfg.StorageConfig.Trace.Azure.Prefix = pc.prefix
					rr, ww, cc, err = azure.New(cfg.StorageConfig.Trace.Azure)
				}
				require.NoError(t, err)

				r := backend.NewReader(rr)
				w := backend.NewWriter(ww)

				// Tenant deletion is not enabled by default; this poller
				// sets only the deletion age, so nothing may be deleted.
				blocklistPoller := blocklist.NewPoller(&blocklist.PollerConfig{
					PollConcurrency:        3,
					TenantIndexBuilders:    1,
					EmptyTenantDeletionAge: 100 * time.Millisecond,
				}, OwnsEverythingSharder, r, cc, w, logger)

				l := blocklist.New()
				mm, cm, err := blocklistPoller.Do(l)
				require.NoError(t, err)
				t.Logf("mm: %v", mm)
				t.Logf("cm: %v", cm)

				tenants, err := r.Tenants(ctx)
				require.NoError(t, err)
				require.Equal(t, 0, len(tenants))

				writeBadBlockFiles(t, ww, rr, tenant)

				// Now we should have a tenant
				tenants, err = r.Tenants(ctx)
				require.NoError(t, err)
				require.Equal(t, 1, len(tenants))

				// Let the objects age past EmptyTenantDeletionAge.
				time.Sleep(500 * time.Millisecond)

				_, _, err = blocklistPoller.Do(l)
				require.NoError(t, err)

				// Deletion is disabled, so the tenant must survive the poll.
				tenants, err = r.Tenants(ctx)
				t.Logf("tenants: %v", tenants)
				require.NoError(t, err)
				require.Equal(t, 1, len(tenants))

				// Create a new poller with tenant deletion enabled
				blocklistPoller = blocklist.NewPoller(&blocklist.PollerConfig{
					PollConcurrency:            3,
					TenantIndexBuilders:        1,
					EmptyTenantDeletionAge:     100 * time.Millisecond,
					EmptyTenantDeletionEnabled: true,
				}, OwnsEverythingSharder, r, cc, w, logger)

				// Again
				_, _, err = blocklistPoller.Do(l)
				require.NoError(t, err)

				// The empty tenant's leftover objects are now deleted.
				tenants, err = r.Tenants(ctx)
				t.Logf("tenants: %v", tenants)
				require.NoError(t, err)
				require.Equal(t, 0, len(tenants))
			})
		}
	}
}

func found(id uuid.UUID, blockMetas []*backend.BlockMeta) bool {
for _, b := range blockMetas {
if b.BlockID == id {
Expand Down Expand Up @@ -260,3 +423,35 @@ func incrementUUIDBytes(uuidBytes []byte) {
uuidBytes[i] = 0 // Wrap around if the byte is 255
}
}

// writeBadBlockFiles writes a single random-content object into the
// tenant's path to simulate a partial block left behind in the backend,
// then logs what the backend now contains.
func writeBadBlockFiles(t *testing.T, ww backend.RawWriter, rr backend.RawReader, tenant string) {
	t.Logf("writing bad block files")

	ctx := context.Background()

	// Random payload so the written object is non-empty.
	token := make([]byte, 32)
	_, err := rand.Read(token)
	require.NoError(t, err)

	err = ww.Write(
		ctx,
		vparquet3.DataFileName,
		backend.KeyPath([]string{tenant, uuid.New().String()}),
		bytes.NewReader(token),
		int64(len(token)), nil)

	require.NoError(t, err)

	items, err := rr.List(ctx, backend.KeyPath([]string{tenant}))
	require.NoError(t, err)
	t.Logf("items: %v", items)

	// Collect every object key now visible in the backend. Named
	// foundKeys so it does not shadow the package-level found() helper.
	var foundKeys []string
	collect := func(opts backend.FindMatch) {
		foundKeys = append(foundKeys, opts.Key)
	}

	err = rr.Find(ctx, backend.KeyPath{}, collect)
	require.NoError(t, err)
	t.Logf("items: %v", foundKeys)
}
38 changes: 38 additions & 0 deletions tempodb/backend/azure/v1/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,44 @@ func (rw *V1) ListBlocks(ctx context.Context, tenant string) ([]uuid.UUID, []uui
return blockIDs, compactedBlockIDs, nil
}

// Find implements backend.Reader. It lists every blob under keypath
// (with the configured prefix applied) and invokes f once per object
// with its name and last-modified time.
func (rw *V1) Find(ctx context.Context, keypath backend.KeyPath, f backend.FindFunc) error {
	keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix)

	marker := blob.Marker{}
	prefix := path.Join(keypath...)

	// A non-empty prefix must end with the delimiter so only objects
	// inside the directory match, not siblings sharing the prefix.
	if len(prefix) > 0 {
		prefix = prefix + dir
	}

	for {
		res, err := rw.containerURL.ListBlobsFlatSegment(ctx, marker, blob.ListBlobsSegmentOptions{
			Prefix:  prefix,
			Details: blob.BlobListingDetails{},
		})
		if err != nil {
			return fmt.Errorf("iterating objects: %w", err)
		}
		marker = res.NextMarker

		// Loop variable is "b", not "blob", to avoid shadowing the
		// imported blob package used above.
		for _, b := range res.Segment.BlobItems {
			f(backend.FindMatch{
				Key:      b.Name,
				Modified: b.Properties.LastModified,
			})
		}

		// Continue iterating until the marker reports completion.
		if !marker.NotDone() {
			break
		}
	}

	return nil
}

// Read implements backend.Reader
func (rw *V1) Read(ctx context.Context, name string, keypath backend.KeyPath, _ *backend.CacheInfo) (io.ReadCloser, int64, error) {
keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix)
Expand Down
38 changes: 38 additions & 0 deletions tempodb/backend/azure/v2/v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,44 @@ func (rw *V2) ListBlocks(ctx context.Context, tenant string) ([]uuid.UUID, []uui
return blockIDs, compactedBlockIDs, nil
}

// Find implements backend.Reader. It lists every blob under keypath
// (with the configured prefix applied) and invokes f once per object
// with its prefix-relative key and last-modified time.
func (rw *V2) Find(ctx context.Context, keypath backend.KeyPath, f backend.FindFunc) error {
	keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix)

	prefix := path.Join(keypath...)

	// A non-empty prefix must end with the delimiter so only objects
	// inside the directory match, not siblings sharing the prefix.
	if len(prefix) > 0 {
		prefix = prefix + dir
	}

	pager := rw.containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
		Prefix: &prefix,
	})

	for pager.More() {
		page, err := pager.NextPage(ctx)
		if err != nil {
			return fmt.Errorf("iterating objects: %w", err)
		}

		for _, b := range page.Segment.BlobItems {
			// Skip malformed entries entirely rather than dereferencing
			// a nil Properties or LastModified below.
			if b == nil || b.Name == nil || b.Properties == nil || b.Properties.LastModified == nil {
				continue
			}

			// NOTE(review): unlike the v1 implementation, the key here is
			// reported relative to the prefix — confirm callers expect this.
			key := strings.TrimPrefix(strings.TrimSuffix(*b.Name, dir), prefix)
			f(backend.FindMatch{
				Key:      key,
				Modified: *b.Properties.LastModified,
			})
		}
	}

	return nil
}

// Read implements backend.Reader
func (rw *V2) Read(ctx context.Context, name string, keypath backend.KeyPath, _ *backend.CacheInfo) (io.ReadCloser, int64, error) {
keypath = backend.KeyPathWithPrefix(keypath, rw.cfg.Prefix)
Expand Down
5 changes: 5 additions & 0 deletions tempodb/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"

"github.com/google/uuid"

"github.com/grafana/tempo/pkg/cache"
)

Expand Down Expand Up @@ -47,6 +48,8 @@ type Writer interface {
CloseAppend(ctx context.Context, tracker AppendTracker) error
// WriteTenantIndex writes the two meta slices as a tenant index
WriteTenantIndex(ctx context.Context, tenantID string, meta []*BlockMeta, compactedMeta []*CompactedBlockMeta) error
// Delete deletes an object.
Delete(ctx context.Context, name string, keypath KeyPath) error
}

// Reader is a collection of methods to read data from tempodb backends
Expand All @@ -65,6 +68,8 @@ type Reader interface {
BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID string) (*BlockMeta, error)
// TenantIndex returns lists of all metas given a tenant
TenantIndex(ctx context.Context, tenantID string) (*TenantIndex, error)
// Find executes f for each object in the backend that matches the keypath.
Find(ctx context.Context, keypath KeyPath, f FindFunc) error
// Shutdown shuts...down?
Shutdown()
}
Expand Down
6 changes: 6 additions & 0 deletions tempodb/backend/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"

"github.com/grafana/tempo/pkg/cache"

tempo_io "github.com/grafana/tempo/pkg/io"
Expand Down Expand Up @@ -72,6 +73,11 @@ func (r *readerWriter) ListBlocks(ctx context.Context, tenant string) (blockIDs
return r.nextReader.ListBlocks(ctx, tenant)
}

// Find implements backend.Reader by delegating directly to the wrapped
// reader. Results are not cached, so every call hits the backend.
func (r *readerWriter) Find(ctx context.Context, keypath backend.KeyPath, f backend.FindFunc) (err error) {
	return r.nextReader.Find(ctx, keypath, f)
}

// Read implements backend.RawReader
func (r *readerWriter) Read(ctx context.Context, name string, keypath backend.KeyPath, cacheInfo *backend.CacheInfo) (io.ReadCloser, int64, error) {
var k string
Expand Down
Loading

0 comments on commit 3f831f9

Please sign in to comment.