Skip to content

Commit

Permalink
copyblocks: support copying between tenants (#10110)
Browse files Browse the repository at this point in the history
* copyblocks: support copying between tenants

* Update tools/copyblocks/README.md

* Update tools/copyblocks/README.md

Co-authored-by: Taylor C <[email protected]>

* Address feedback

---------

Co-authored-by: Taylor C <[email protected]>
  • Loading branch information
andyasp and tacole02 authored Dec 4, 2024
1 parent 94aa454 commit be2f23b
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 22 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@
### Tools

* [FEATURE] `splitblocks`: add new tool to split blocks larger than a specified duration into multiple blocks. #9517, #9779
* [ENHANCEMENT] `copyblocks`: Added `--skip-no-compact-block-duration-check`, which defaults to `false`, to simplify targeting blocks that are not awaiting compaction. #9439
* [ENHANCEMENT] `copyblocks`: add `--skip-no-compact-block-duration-check`, which defaults to `false`, to simplify targeting blocks that are not awaiting compaction. #9439
* [ENHANCEMENT] `copyblocks`: add `--user-mapping` to support copying blocks between users. #10110
* [ENHANCEMENT] `kafkatool`: add SASL plain authentication support. The following new CLI flags have been added: #9584
* `--kafka-sasl-username`
* `--kafka-sasl-password`
Expand Down
1 change: 1 addition & 0 deletions tools/copyblocks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The currently supported services are Amazon Simple Storage Service (S3 and S3-co
- Include or exclude users from having blocks copied (`--enabled-users` and `--disabled-users`)
- Configurable minimum block duration (`--min-block-duration`) and (`--skip-no-compact-block-duration-check`) to target blocks that are not awaiting compaction
- Configurable time range (`--min-time` and `--max-time`) to only copy blocks inclusively within a provided range
- Copy blocks between users with `--user-mapping`. For instance, `--user-mapping="user1:user2,user3:user4"` maps source blocks from `user1` to `user2` and source blocks from `user3` to `user4`. If you don't provide a mapping for a user, it is assumed to be identical to the source user.
- Log what would be copied without actually copying anything with `--dry-run`

## Running
Expand Down
90 changes: 69 additions & 21 deletions tools/copyblocks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"os"
"os/signal"
"path/filepath"
"slices"
"strings"
"syscall"
"time"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/tenant"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -44,6 +46,7 @@ type config struct {
copyPeriod time.Duration
enabledUsers flagext.StringSliceCSV
disabledUsers flagext.StringSliceCSV
userMapping flagext.StringSliceCSV
dryRun bool
skipNoCompactBlockDurationCheck bool
httpListen string
Expand All @@ -59,8 +62,9 @@ func (c *config) registerFlags(f *flag.FlagSet) {
f.DurationVar(&c.copyPeriod, "copy-period", 0, "How often to repeat the copy. If set to 0, copy is done once, and the program stops. Otherwise, the program keeps running and copying blocks until it is terminated.")
f.Var(&c.enabledUsers, "enabled-users", "If not empty, only blocks for these users are copied.")
f.Var(&c.disabledUsers, "disabled-users", "If not empty, blocks for these users are not copied.")
f.Var(&c.userMapping, "user-mapping", "A comma-separated list of (source user):(destination user). If a user is not mapped then its destination user is assumed to be identical.")
f.BoolVar(&c.dryRun, "dry-run", false, "Don't perform any copy; only log what would happen.")
f.BoolVar(&c.skipNoCompactBlockDurationCheck, "skip-no-compact-block-duration-check", false, "If set, blocks marked as no-compact are not checked against min-block-duration")
f.BoolVar(&c.skipNoCompactBlockDurationCheck, "skip-no-compact-block-duration-check", false, "If set, blocks marked as no-compact are not checked against min-block-duration.")
f.StringVar(&c.httpListen, "http-listen-address", ":8080", "HTTP listen address.")
}

Expand All @@ -80,6 +84,27 @@ func (c *config) validate() error {
return nil
}

func (c *config) parseUserMapping() (map[string]string, error) {
m := make(map[string]string, len(c.userMapping))
for _, mapping := range c.userMapping {
splitMapping := strings.Split(mapping, ":")
if len(splitMapping) != 2 || slices.Contains(splitMapping, "") {
return nil, fmt.Errorf("invalid user mapping: %s", mapping)
}
for _, id := range splitMapping {
if err := tenant.ValidTenantID(id); err != nil {
return nil, err
}
}
source := splitMapping[0]
if _, ok := m[source]; ok {
return nil, fmt.Errorf("multiple user mappings for source user: %s", source)
}
m[source] = splitMapping[1]
}
return m, nil
}

type metrics struct {
copyCyclesSucceeded prometheus.Counter
copyCyclesFailed prometheus.Counter
Expand Down Expand Up @@ -126,6 +151,12 @@ func main() {
os.Exit(1)
}

userMapping, err := cfg.parseUserMapping()
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}

logger := log.NewLogfmtLogger(os.Stdout)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)

Expand All @@ -144,7 +175,7 @@ func main() {
}
}()

success := runCopy(ctx, cfg, logger, m)
success := runCopy(ctx, cfg, userMapping, logger, m)
if cfg.copyPeriod <= 0 {
if success {
os.Exit(0)
Expand All @@ -158,14 +189,14 @@ func main() {
for ctx.Err() == nil {
select {
case <-t.C:
_ = runCopy(ctx, cfg, logger, m)
_ = runCopy(ctx, cfg, userMapping, logger, m)
case <-ctx.Done():
}
}
}

func runCopy(ctx context.Context, cfg config, logger log.Logger, m *metrics) bool {
err := copyBlocks(ctx, cfg, logger, m)
func runCopy(ctx context.Context, cfg config, userMapping map[string]string, logger log.Logger, m *metrics) bool {
err := copyBlocks(ctx, cfg, userMapping, logger, m)
if err != nil {
m.copyCyclesFailed.Inc()
level.Error(logger).Log("msg", "failed to copy blocks", "err", err, "dryRun", cfg.dryRun)
Expand All @@ -177,7 +208,7 @@ func runCopy(ctx context.Context, cfg config, logger log.Logger, m *metrics) boo
return true
}

func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics) error {
func copyBlocks(ctx context.Context, cfg config, userMapping map[string]string, logger log.Logger, m *metrics) error {
sourceBucket, destBucket, copyFunc, err := cfg.copyConfig.ToBuckets(ctx)
if err != nil {
return err
Expand All @@ -197,23 +228,28 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics)
disabledUsers[u] = struct{}{}
}

return concurrency.ForEachUser(ctx, tenants, cfg.tenantConcurrency, func(ctx context.Context, tenantID string) error {
if !isAllowedUser(enabledUsers, disabledUsers, tenantID) {
return concurrency.ForEachUser(ctx, tenants, cfg.tenantConcurrency, func(ctx context.Context, sourceTenantID string) error {
if !isAllowedUser(enabledUsers, disabledUsers, sourceTenantID) {
return nil
}

logger := log.With(logger, "tenantID", tenantID)
destinationTenantID, ok := userMapping[sourceTenantID]
if !ok {
destinationTenantID = sourceTenantID
}

blocks, err := listBlocksForTenant(ctx, sourceBucket, tenantID)
logger := log.With(logger, "sourceTenantID", sourceTenantID, "destinationTenantID", destinationTenantID)

blocks, err := listBlocksForTenant(ctx, sourceBucket, sourceTenantID)
if err != nil {
level.Error(logger).Log("msg", "failed to list blocks for tenant", "err", err)
return errors.Wrapf(err, "failed to list blocks for tenant %v", tenantID)
return errors.Wrapf(err, "failed to list blocks for tenant %v", sourceTenantID)
}

markers, err := listBlockMarkersForTenant(ctx, sourceBucket, tenantID, destBucket.Name())
markers, err := listBlockMarkersForTenant(ctx, sourceBucket, sourceTenantID, destBucket.Name())
if err != nil {
level.Error(logger).Log("msg", "failed to list blocks markers for tenant", "err", err)
return errors.Wrapf(err, "failed to list block markers for tenant %v", tenantID)
return errors.Wrapf(err, "failed to list block markers for tenant %v", sourceTenantID)
}

var blockIDs []string
Expand Down Expand Up @@ -243,7 +279,7 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics)
return nil
}

blockMeta, err := loadMetaJSONFile(ctx, sourceBucket, tenantID, blockID)
blockMeta, err := loadMetaJSONFile(ctx, sourceBucket, sourceTenantID, blockID)
if err != nil {
level.Error(logger).Log("msg", "skipping block, failed to read meta.json file", "err", err)
return err
Expand Down Expand Up @@ -287,7 +323,7 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics)

level.Info(logger).Log("msg", "copying block")

err = copySingleBlock(ctx, tenantID, blockID, markers[blockID], sourceBucket, copyFunc)
err = copySingleBlock(ctx, sourceTenantID, destinationTenantID, blockID, markers[blockID], sourceBucket, copyFunc)
if err != nil {
m.blocksCopyFailed.Inc()
level.Error(logger).Log("msg", "failed to copy block", "err", err)
Expand All @@ -297,7 +333,9 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics)
m.blocksCopied.Inc()
level.Info(logger).Log("msg", "block copied successfully")

err = uploadCopiedMarkerFile(ctx, sourceBucket, tenantID, blockID, destBucket.Name())
// Note that only the blockID and destination bucket are considered in the copy marker.
// If multiple tenants in the same destination bucket are copied to from the same source tenant the markers will currently clash.
err = uploadCopiedMarkerFile(ctx, sourceBucket, sourceTenantID, blockID, destBucket.Name())
if err != nil {
level.Error(logger).Log("msg", "failed to upload copied-marker file for block", "block", blockID.String(), "err", err)
return err
Expand All @@ -324,13 +362,13 @@ func isAllowedUser(enabled map[string]struct{}, disabled map[string]struct{}, te
}

// This method copies files within single TSDB block to a destination bucket.
func copySingleBlock(ctx context.Context, tenantID string, blockID ulid.ULID, markers blockMarkers, srcBkt objtools.Bucket, copyFunc objtools.CopyFunc) error {
func copySingleBlock(ctx context.Context, sourceTenantID, destinationTenantID string, blockID ulid.ULID, markers blockMarkers, srcBkt objtools.Bucket, copyFunc objtools.CopyFunc) error {
result, err := srcBkt.List(ctx, objtools.ListOptions{
Prefix: tenantID + objtools.Delim + blockID.String(),
Prefix: sourceTenantID + objtools.Delim + blockID.String(),
Recursive: true,
})
if err != nil {
return errors.Wrapf(err, "copySingleBlock: failed to list block files for %v/%v", tenantID, blockID.String())
return errors.Wrapf(err, "copySingleBlock: failed to list block files for %v/%v", sourceTenantID, blockID.String())
}
paths := result.ToNames()

Expand All @@ -346,11 +384,21 @@ func copySingleBlock(ctx context.Context, tenantID string, blockID ulid.ULID, ma

// Copy global markers too (skipping deletion mark because deleted blocks are not copied by this tool).
if markers.noCompact {
paths = append(paths, tenantID+objtools.Delim+block.NoCompactMarkFilepath(blockID))
paths = append(paths, sourceTenantID+objtools.Delim+block.NoCompactMarkFilepath(blockID))
}

isCrossTenant := sourceTenantID != destinationTenantID

for _, fullPath := range paths {
err := copyFunc(ctx, fullPath, objtools.CopyOptions{})
options := objtools.CopyOptions{}
if isCrossTenant {
after, found := strings.CutPrefix(fullPath, sourceTenantID)
if !found {
return fmt.Errorf("unexpected object path that does not begin with sourceTenantID: path=%s, sourceTenantID=%s", fullPath, sourceTenantID)
}
options.DestinationObjectName = destinationTenantID + after
}
err := copyFunc(ctx, fullPath, options)
if err != nil {
return errors.Wrapf(err, "copySingleBlock: failed to copy %v", fullPath)
}
Expand Down

0 comments on commit be2f23b

Please sign in to comment.