Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

copyblocks: support copying between tenants #10110

Merged
merged 4 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@
### Tools

* [FEATURE] `splitblocks`: add new tool to split blocks larger than a specified duration into multiple blocks. #9517, #9779
* [ENHANCEMENT] `copyblocks`: Added `--skip-no-compact-block-duration-check`, which defaults to `false`, to simplify targeting blocks that are not awaiting compaction. #9439
* [ENHANCEMENT] `copyblocks`: add `--skip-no-compact-block-duration-check`, which defaults to `false`, to simplify targeting blocks that are not awaiting compaction. #9439
* [ENHANCEMENT] `copyblocks`: add `--cross-tenant-mapping` to support copying data between tenants. #10110
* [ENHANCEMENT] `kafkatool`: add SASL plain authentication support. The following new CLI flags have been added: #9584
* `--kafka-sasl-username`
* `--kafka-sasl-password`
Expand Down
1 change: 1 addition & 0 deletions tools/copyblocks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The currently supported services are Amazon Simple Storage Service (S3 and S3-co
- Include or exclude users from having blocks copied (`--enabled-users` and `--disabled-users`)
- Configurable minimum block duration (`--min-block-duration`) and (`--skip-no-compact-block-duration-check`) to target blocks that are not awaiting compaction
- Configurable time range (`--min-time` and `--max-time`) to only copy blocks inclusively within a provided range
- Copy blocks between tenants with `--cross-tenant-mapping`. For instance, `--cross-tenant-mapping="tenant1:tenant2,tenant3:tenant4"` maps source blocks from `tenant1` to `tenant2` and source blocks from `tenant3` to tenant `tenant4`. If you don't provide a mapping for a tenant, it is assumed to be identical to the source tenant.
- Log what would be copied without actually copying anything with `--dry-run`

## Running
Expand Down
82 changes: 62 additions & 20 deletions tools/copyblocks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"os"
"os/signal"
"path/filepath"
"slices"
"strings"
"syscall"
"time"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/tenant"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -44,6 +46,7 @@ type config struct {
copyPeriod time.Duration
enabledUsers flagext.StringSliceCSV
disabledUsers flagext.StringSliceCSV
crossTenantMapping flagext.StringSliceCSV
dryRun bool
skipNoCompactBlockDurationCheck bool
httpListen string
Expand All @@ -59,6 +62,7 @@ func (c *config) registerFlags(f *flag.FlagSet) {
f.DurationVar(&c.copyPeriod, "copy-period", 0, "How often to repeat the copy. If set to 0, copy is done once, and the program stops. Otherwise, the program keeps running and copying blocks until it is terminated.")
f.Var(&c.enabledUsers, "enabled-users", "If not empty, only blocks for these users are copied.")
f.Var(&c.disabledUsers, "disabled-users", "If not empty, blocks for these users are not copied.")
f.Var(&c.crossTenantMapping, "cross-tenant-mapping", "A comma-separated list of (source tenant):(destination tenant). If a source tenant is not mapped then its destination tenant is assumed to be identical.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cross-tenant-mapping

(nit) Maybe it's better to stick to "users" on the inputs level? The command has --enabled-users and --disabled-users flags. So it feels confusing if one doesn't know that those are the same. Can we name the flag --user-mapping?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch, thanks. I changed it to --user-mapping and updated the documentation to reflect that.

f.BoolVar(&c.dryRun, "dry-run", false, "Don't perform any copy; only log what would happen.")
f.BoolVar(&c.skipNoCompactBlockDurationCheck, "skip-no-compact-block-duration-check", false, "If set, blocks marked as no-compact are not checked against min-block-duration")
f.StringVar(&c.httpListen, "http-listen-address", ":8080", "HTTP listen address.")
Expand All @@ -80,6 +84,23 @@ func (c *config) validate() error {
return nil
}

func (c *config) parseCrossTenantMapping() (map[string]string, error) {
m := make(map[string]string, len(c.crossTenantMapping))
for _, mapping := range c.crossTenantMapping {
splitMapping := strings.Split(mapping, ":")
if len(splitMapping) != 2 || slices.Contains(splitMapping, "") {
return nil, fmt.Errorf("invalid tenant mapping: %s", mapping)
}
for _, id := range splitMapping {
if err := tenant.ValidTenantID(id); err != nil {
return nil, err
}
}
m[splitMapping[0]] = splitMapping[1]
}
return m, nil
}

type metrics struct {
copyCyclesSucceeded prometheus.Counter
copyCyclesFailed prometheus.Counter
Expand Down Expand Up @@ -126,6 +147,12 @@ func main() {
os.Exit(1)
}

crossTenantMapping, err := cfg.parseCrossTenantMapping()
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}

logger := log.NewLogfmtLogger(os.Stdout)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)

Expand All @@ -144,7 +171,7 @@ func main() {
}
}()

success := runCopy(ctx, cfg, logger, m)
success := runCopy(ctx, cfg, crossTenantMapping, logger, m)
if cfg.copyPeriod <= 0 {
if success {
os.Exit(0)
Expand All @@ -158,14 +185,14 @@ func main() {
for ctx.Err() == nil {
select {
case <-t.C:
_ = runCopy(ctx, cfg, logger, m)
_ = runCopy(ctx, cfg, crossTenantMapping, logger, m)
case <-ctx.Done():
}
}
}

func runCopy(ctx context.Context, cfg config, logger log.Logger, m *metrics) bool {
err := copyBlocks(ctx, cfg, logger, m)
func runCopy(ctx context.Context, cfg config, crossTenantMapping map[string]string, logger log.Logger, m *metrics) bool {
err := copyBlocks(ctx, cfg, crossTenantMapping, logger, m)
if err != nil {
m.copyCyclesFailed.Inc()
level.Error(logger).Log("msg", "failed to copy blocks", "err", err, "dryRun", cfg.dryRun)
Expand All @@ -177,7 +204,7 @@ func runCopy(ctx context.Context, cfg config, logger log.Logger, m *metrics) boo
return true
}

func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics) error {
func copyBlocks(ctx context.Context, cfg config, crossTenantMapping map[string]string, logger log.Logger, m *metrics) error {
sourceBucket, destBucket, copyFunc, err := cfg.copyConfig.ToBuckets(ctx)
if err != nil {
return err
Expand All @@ -197,23 +224,28 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics)
disabledUsers[u] = struct{}{}
}

return concurrency.ForEachUser(ctx, tenants, cfg.tenantConcurrency, func(ctx context.Context, tenantID string) error {
if !isAllowedUser(enabledUsers, disabledUsers, tenantID) {
return concurrency.ForEachUser(ctx, tenants, cfg.tenantConcurrency, func(ctx context.Context, sourceTenantID string) error {
if !isAllowedUser(enabledUsers, disabledUsers, sourceTenantID) {
return nil
}

logger := log.With(logger, "tenantID", tenantID)
destinationTenantID, ok := crossTenantMapping[sourceTenantID]
if !ok {
destinationTenantID = sourceTenantID
}

logger := log.With(logger, "sourceTenantID", sourceTenantID, "destinationTenantID", destinationTenantID)

blocks, err := listBlocksForTenant(ctx, sourceBucket, tenantID)
blocks, err := listBlocksForTenant(ctx, sourceBucket, sourceTenantID)
if err != nil {
level.Error(logger).Log("msg", "failed to list blocks for tenant", "err", err)
return errors.Wrapf(err, "failed to list blocks for tenant %v", tenantID)
return errors.Wrapf(err, "failed to list blocks for tenant %v", sourceTenantID)
}

markers, err := listBlockMarkersForTenant(ctx, sourceBucket, tenantID, destBucket.Name())
markers, err := listBlockMarkersForTenant(ctx, sourceBucket, sourceTenantID, destBucket.Name())
if err != nil {
level.Error(logger).Log("msg", "failed to list blocks markers for tenant", "err", err)
return errors.Wrapf(err, "failed to list block markers for tenant %v", tenantID)
return errors.Wrapf(err, "failed to list block markers for tenant %v", sourceTenantID)
}

var blockIDs []string
Expand Down Expand Up @@ -243,7 +275,7 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics)
return nil
}

blockMeta, err := loadMetaJSONFile(ctx, sourceBucket, tenantID, blockID)
blockMeta, err := loadMetaJSONFile(ctx, sourceBucket, sourceTenantID, blockID)
if err != nil {
level.Error(logger).Log("msg", "skipping block, failed to read meta.json file", "err", err)
return err
Expand Down Expand Up @@ -287,7 +319,7 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics)

level.Info(logger).Log("msg", "copying block")

err = copySingleBlock(ctx, tenantID, blockID, markers[blockID], sourceBucket, copyFunc)
err = copySingleBlock(ctx, sourceTenantID, destinationTenantID, blockID, markers[blockID], sourceBucket, copyFunc)
if err != nil {
m.blocksCopyFailed.Inc()
level.Error(logger).Log("msg", "failed to copy block", "err", err)
Expand All @@ -297,7 +329,7 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics)
m.blocksCopied.Inc()
level.Info(logger).Log("msg", "block copied successfully")

err = uploadCopiedMarkerFile(ctx, sourceBucket, tenantID, blockID, destBucket.Name())
err = uploadCopiedMarkerFile(ctx, sourceBucket, sourceTenantID, blockID, destBucket.Name())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) Maybe we can add a note about the behaviour around markers, just it was clear from the code

Suggested change
err = uploadCopiedMarkerFile(ctx, sourceBucket, sourceTenantID, blockID, destBucket.Name())
// Note that markers copying ignores the tenants mapping. This may not work if copying blocks to multiple destination tenants.
err = uploadCopiedMarkerFile(ctx, sourceBucket, sourceTenantID, blockID, destBucket.Name())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reworded it a bit and added the comment:

// Note that only the blockID and destination bucket are considered in the copy marker.
// If multiple tenants in the same destination bucket are copied to from the same source tenant the markers will currently clash.

if err != nil {
level.Error(logger).Log("msg", "failed to upload copied-marker file for block", "block", blockID.String(), "err", err)
return err
Expand All @@ -324,13 +356,13 @@ func isAllowedUser(enabled map[string]struct{}, disabled map[string]struct{}, te
}

// This method copies files within single TSDB block to a destination bucket.
func copySingleBlock(ctx context.Context, tenantID string, blockID ulid.ULID, markers blockMarkers, srcBkt objtools.Bucket, copyFunc objtools.CopyFunc) error {
func copySingleBlock(ctx context.Context, sourceTenantID, destinationTenantID string, blockID ulid.ULID, markers blockMarkers, srcBkt objtools.Bucket, copyFunc objtools.CopyFunc) error {
result, err := srcBkt.List(ctx, objtools.ListOptions{
Prefix: tenantID + objtools.Delim + blockID.String(),
Prefix: sourceTenantID + objtools.Delim + blockID.String(),
Recursive: true,
})
if err != nil {
return errors.Wrapf(err, "copySingleBlock: failed to list block files for %v/%v", tenantID, blockID.String())
return errors.Wrapf(err, "copySingleBlock: failed to list block files for %v/%v", sourceTenantID, blockID.String())
}
paths := result.ToNames()

Expand All @@ -346,11 +378,21 @@ func copySingleBlock(ctx context.Context, tenantID string, blockID ulid.ULID, ma

// Copy global markers too (skipping deletion mark because deleted blocks are not copied by this tool).
if markers.noCompact {
paths = append(paths, tenantID+objtools.Delim+block.NoCompactMarkFilepath(blockID))
paths = append(paths, sourceTenantID+objtools.Delim+block.NoCompactMarkFilepath(blockID))
}

isCrossTenant := sourceTenantID != destinationTenantID

for _, fullPath := range paths {
err := copyFunc(ctx, fullPath, objtools.CopyOptions{})
options := objtools.CopyOptions{}
if isCrossTenant {
after, found := strings.CutPrefix(fullPath, sourceTenantID)
if !found {
return fmt.Errorf("unexpected object path that does not begin with sourceTenantID: path=%s, sourceTenantID=%s", fullPath, sourceTenantID)
}
options.DestinationObjectName = destinationTenantID + after
}
err := copyFunc(ctx, fullPath, options)
if err != nil {
return errors.Wrapf(err, "copySingleBlock: failed to copy %v", fullPath)
}
Expand Down
Loading