Skip to content

Commit

Permalink
Tenant deletion: API, ingester. (grafana#3549)
Browse files Browse the repository at this point in the history
- Introduced API for tenant deletion.
- Ingester: don't ship blocks and close TSDB faster if tenant deletion mark exists.

Signed-off-by: Peter Štibraný <[email protected]>
  • Loading branch information
pstibrany authored Dec 3, 2020
1 parent 7fe6d3e commit f5fb694
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 2 deletions.
121 changes: 121 additions & 0 deletions purger/blocks_purger_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package purger

import (
"context"
"net/http"
"strings"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
)

type BlocksPurgerAPI struct {
bucketClient objstore.Bucket
logger log.Logger
}

func NewBlocksPurgerAPI(storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (*BlocksPurgerAPI, error) {
bucketClient, err := createBucketClient(storageCfg, logger, reg)
if err != nil {
return nil, err
}

return newBlocksPurgerAPI(bucketClient, logger), nil
}

func newBlocksPurgerAPI(bkt objstore.Bucket, logger log.Logger) *BlocksPurgerAPI {
return &BlocksPurgerAPI{bucketClient: bkt, logger: logger}
}

func (api *BlocksPurgerAPI) DeleteTenant(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

err = cortex_tsdb.WriteTenantDeletionMark(r.Context(), api.bucketClient, userID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

level.Info(api.logger).Log("msg", "tenant deletion marker created", "user", userID)

w.WriteHeader(http.StatusOK)
}

type DeleteTenantStatusResponse struct {
TenantID string `json:"tenant_id"`
BlocksDeleted bool `json:"blocks_deleted"`
RuleGroupsDeleted bool `json:"rule_groups_deleted"`
AlertManagerConfigDeleted bool `json:"alert_manager_config_deleted"`
}

func (api *BlocksPurgerAPI) DeleteTenantStatus(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

result := DeleteTenantStatusResponse{}
result.TenantID = userID
result.BlocksDeleted, err = api.checkBlocksForUser(ctx, userID)

if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

util.WriteJSONResponse(w, result)
}

func (api *BlocksPurgerAPI) checkBlocksForUser(ctx context.Context, userID string) (bool, error) {
var errBlockFound = errors.New("block found")

userBucket := bucket.NewUserBucketClient(userID, api.bucketClient)
err := userBucket.Iter(ctx, "", func(s string) error {
s = strings.TrimSuffix(s, "/")

_, err := ulid.Parse(s)
if err != nil {
// not block, keep looking
return nil
}

// Used as shortcut to stop iteration.
return errBlockFound
})

if errors.Is(err, errBlockFound) {
return false, nil
}

if err != nil {
return false, err
}

// No blocks found, all good.
return true, nil
}

func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) {
bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, "purger", logger, reg)
if err != nil {
return nil, errors.Wrap(err, "create bucket client")
}

return bucketClient, nil
}
90 changes: 90 additions & 0 deletions purger/blocks_purger_api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package purger

import (
"bytes"
"context"
"net/http"
"net/http/httptest"
"path"
"testing"

"github.com/go-kit/kit/log"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func TestDeleteTenant(t *testing.T) {
bkt := objstore.NewInMemBucket()
api := newBlocksPurgerAPI(bkt, log.NewNopLogger())

{
resp := httptest.NewRecorder()
api.DeleteTenant(resp, &http.Request{})
require.Equal(t, http.StatusBadRequest, resp.Code)
}

{
ctx := context.Background()
ctx = user.InjectOrgID(ctx, "fake")

req := &http.Request{}
resp := httptest.NewRecorder()
api.DeleteTenant(resp, req.WithContext(ctx))

require.Equal(t, http.StatusOK, resp.Code)
objs := bkt.Objects()
require.NotNil(t, objs[path.Join("fake", tsdb.TenantDeletionMarkPath)])
}
}

func TestDeleteTenantStatus(t *testing.T) {
const username = "user"

for name, tc := range map[string]struct {
objects map[string][]byte
expectedBlocksDeleted bool
}{
"empty": {
objects: nil,
expectedBlocksDeleted: true,
},

"no user objects": {
objects: map[string][]byte{
"different-user/01EQK4QKFHVSZYVJ908Y7HH9E0/meta.json": []byte("data"),
},
expectedBlocksDeleted: true,
},

"non-block files": {
objects: map[string][]byte{
"user/deletion-mark.json": []byte("data"),
},
expectedBlocksDeleted: true,
},

"block files": {
objects: map[string][]byte{
"user/01EQK4QKFHVSZYVJ908Y7HH9E0/meta.json": []byte("data"),
},
expectedBlocksDeleted: false,
},
} {
t.Run(name, func(t *testing.T) {
bkt := objstore.NewInMemBucket()
// "upload" objects
for objName, data := range tc.objects {
require.NoError(t, bkt.Upload(context.Background(), objName, bytes.NewReader(data)))
}

api := newBlocksPurgerAPI(bkt, log.NewNopLogger())

res, err := api.checkBlocksForUser(context.Background(), username)
require.NoError(t, err)
require.Equal(t, tc.expectedBlocksDeleted, res)
})
}
}
4 changes: 2 additions & 2 deletions purger/purger.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type deleteRequestWithLogger struct {
logger log.Logger // logger is initialized with userID and requestID to add context to every log generated using this
}

// Config holds config for Purger
// Config holds config for chunks Purger
type Config struct {
Enable bool `yaml:"enable"`
NumWorkers int `yaml:"num_workers"`
Expand All @@ -108,7 +108,7 @@ type workerJob struct {
logger log.Logger
}

// Purger does the purging of data which is requested to be deleted
// Purger does the purging of data which is requested to be deleted. Purger only works for chunks.
type Purger struct {
services.Service

Expand Down

0 comments on commit f5fb694

Please sign in to comment.