copyblocks: add support for S3-compatible buckets and object storage migration #5486

Merged: 9 commits, Jul 20, 2023

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -84,6 +84,7 @@

### Tools

* [CHANGE] copyblocks: add support for S3 and the ability to copy between different object storage services. Due to this, the `-source-service` and `-destination-service` flags are now required and the `-service` flag has been removed. #5486
* [BUGFIX] Stop tools from panicking when `-help` flag is passed. #5412
* [BUGFIX] Remove github.com/golang/glog command line flags from tools. #5413

62 changes: 54 additions & 8 deletions tools/copyblocks/README.md
@@ -1,7 +1,10 @@
# Copyblocks

This program can copy Mimir blocks server-side between two buckets on the same object storage service provider.
The currently supported services are Google Cloud Storage (GCS) and Azure Blob Storage (ABS).
This program can copy Mimir blocks between two buckets.

By default, the copy is attempted server-side when both the source and the destination specify the same object storage service. If the buckets are on different object storage services, or if `--client-side-copy` is passed, the copy is performed client-side instead, as sketched below.

The currently supported services are Amazon Simple Storage Service (S3 and S3-compatible), Azure Blob Storage (ABS), and Google Cloud Storage (GCS).
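
The tool chooses between the two modes roughly as follows. This is a minimal sketch rather than the tool's exact code: the `bucket` interface is abridged from this change, and the `copyObject` helper is hypothetical.

```go
package main

import "context"

// bucket abridges the storage abstraction used by copyblocks: every backend
// implements both copy modes.
type bucket interface {
	ServerSideCopy(ctx context.Context, objectName string, dst bucket) error
	ClientSideCopy(ctx context.Context, objectName string, dst bucket) error
}

// copyObject picks the copy mode: server-side requires the source and
// destination to be on the same service with --client-side-copy unset;
// otherwise the object is streamed through this process.
func copyObject(ctx context.Context, sourceService, destinationService string, clientSideCopy bool, src, dst bucket, objectName string) error {
	if sourceService == destinationService && !clientSideCopy {
		return src.ServerSideCopy(ctx, objectName, dst)
	}
	return src.ClientSideCopy(ctx, objectName, dst)
}
```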

## Features

@@ -16,24 +19,67 @@ The currently supported services are Google Cloud Storage (GCS) and Azure Blob S

```bash
./copyblocks \
--service gcs \
--source-service gcs \
--destination-service gcs \
--copy-period 24h \
--min-block-duration 23h \
--source-bucket <source bucket name> \
--destination-bucket <destination bucket name> \
--min-block-duration 23h
--destination-bucket <destination bucket name>
```

### Example for Azure Blob Storage

```bash
./copyblocks \
--service abs \
--source-service abs \
--destination-service abs \
--copy-period 24h \
--min-block-duration 23h \
--source-bucket https://<source account name>.blob.core.windows.net/<source bucket name> \
--azure-source-account-name <source account name> \
--azure-source-account-key <source account key> \
--destination-bucket https://<destination account name>.blob.core.windows.net/<destination bucket name> \
--azure-destination-account-name <destination account name> \
--azure-destination-account-key <destination account key> \
--min-block-duration 23h
--azure-destination-account-key <destination account key>
```

### Example for Amazon Simple Storage Service

The destination bucket is called to initiate the server-side copy, which may require additional permissions so that the copy operation can access the source bucket.
Consider passing `--client-side-copy` to avoid that requirement.

```bash
./copyblocks \
--source-service s3 \
--destination-service s3 \
--copy-period 24h \
--min-block-duration 23h \
--source-bucket <source bucket name> \
--s3-source-access-key <source access key> \
--s3-source-secret-key <source secret key> \
--s3-source-endpoint <source endpoint> \
--destination-bucket <destination bucket name> \
--s3-destination-access-key <destination access key> \
--s3-destination-secret-key <destination secret key> \
--s3-destination-endpoint <destination endpoint>
```

### Example for copying between different providers

Combine the relevant source and destination configuration options, using the examples above as a guide.
For instance, to copy from S3 to ABS:

```bash
./copyblocks \
--source-service s3 \
--destination-service abs \
--copy-period 24h \
--min-block-duration 23h \
--source-bucket <source bucket name> \
--s3-source-access-key <source access key> \
--s3-source-secret-key <source secret key> \
--s3-source-endpoint <source endpoint> \
--destination-bucket https://<destination account name>.blob.core.windows.net/<destination bucket name> \
--azure-destination-account-name <destination account name> \
--azure-destination-account-key <destination account key>
```
93 changes: 69 additions & 24 deletions tools/copyblocks/abs.go
@@ -5,6 +5,7 @@ package main
import (
"context"
"flag"
"fmt"
"io"
"strings"
"time"
@@ -19,46 +20,74 @@
)

type azureConfig struct {
sourceAccountName string
sourceAccountKey string
destinationAccountName string
destinationAccountKey string
copyStatusBackoff backoff.Config
source azureClientConfig
destination azureClientConfig
copyStatusBackoff backoff.Config
}

func (c *azureConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&c.sourceAccountName, "azure-source-account-name", "", "Account name for the azure source bucket.")
f.StringVar(&c.sourceAccountKey, "azure-source-account-key", "", "Account key for the azure source bucket.")
f.StringVar(&c.destinationAccountName, "azure-destination-account-name", "", "Account name for the azure destination bucket.")
f.StringVar(&c.destinationAccountKey, "azure-destination-account-key", "", "Account key for the azure destination bucket.")
c.source.RegisterFlags("azure-source-", f)
c.destination.RegisterFlags("azure-destination-", f)
f.DurationVar(&c.copyStatusBackoff.MinBackoff, "azure-copy-status-backoff-min-duration", 15*time.Second, "The minimum amount of time to back off per copy operation.")
f.DurationVar(&c.copyStatusBackoff.MaxBackoff, "azure-copy-status-backoff-max-duration", 20*time.Second, "The maximum amount of time to back off per copy operation.")
f.IntVar(&c.copyStatusBackoff.MaxRetries, "azure-copy-status-backoff-max-retries", 40, "The maximum number of retries while checking the copy status.")
}

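// validate checks that credentials were supplied, but only for the sides of the copy that actually use ABS.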
func (c *azureConfig) validate(source, destination string) error {
if source == serviceABS {
if err := c.source.validate("azure-source-"); err != nil {
return err
}
}
if destination == serviceABS {
return c.destination.validate("azure-destination-")
}
return nil
}

type azureClientConfig struct {
accountName string
accountKey string
}

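// RegisterFlags registers the per-bucket credential flags under the given prefix (for example "azure-source-"), so a single config type can describe both the source and the destination.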
func (c *azureClientConfig) RegisterFlags(prefix string, f *flag.FlagSet) {
f.StringVar(&c.accountName, prefix+"account-name", "", "Account name for the Azure bucket.")
f.StringVar(&c.accountKey, prefix+"account-key", "", "Account key for the Azure bucket.")
}

func (c *azureClientConfig) validate(prefix string) error {
if c.accountName == "" {
return fmt.Errorf("the Azure bucket's account name (%s) is required", prefix+"account-name")
}
if c.accountKey == "" {
return fmt.Errorf("the Azure bucket's account key (%s) is required", prefix+"account-key")
}
return nil
}

type azureBucket struct {
azblob.Client
containerClient container.Client
containerName string
copyStatusBackoffConfig backoff.Config
}

func newAzureBucketClient(containerURL string, accountName string, sharedKey string, copyStatusBackoffConfig backoff.Config) (bucket, error) {
func newAzureBucketClient(cfg azureClientConfig, containerURL string, backoffCfg backoff.Config) (bucket, error) {
urlParts, err := blob.ParseURL(containerURL)
if err != nil {
return nil, err
}
containerName := urlParts.ContainerName
if containerName == "" {
return nil, errors.New("container name missing from azure bucket URL")
return nil, errors.New("container name missing from Azure bucket URL")
}
serviceURL, found := strings.CutSuffix(containerURL, containerName)
if !found {
return nil, errors.New("malformed or unexpected azure bucket URL")
return nil, errors.New("malformed or unexpected Azure bucket URL")
}
keyCred, err := azblob.NewSharedKeyCredential(accountName, sharedKey)
keyCred, err := azblob.NewSharedKeyCredential(cfg.accountName, cfg.accountKey)
if err != nil {
return nil, errors.Wrapf(err, "failed to get azure shared key credential")
return nil, errors.Wrapf(err, "failed to get Azure shared key credential")
}
client, err := azblob.NewClientWithSharedKeyCredential(serviceURL, keyCred, nil)
if err != nil {
@@ -72,7 +101,7 @@ func newAzureBucketClient(containerURL string, accountName string, sharedKey str
Client: *client,
containerClient: *containerClient,
containerName: containerName,
copyStatusBackoffConfig: copyStatusBackoffConfig,
copyStatusBackoffConfig: backoffCfg,
}, nil
}

@@ -85,8 +114,13 @@ func (bkt *azureBucket) Get(ctx context.Context, objectName string) (io.ReadClos
return response.Body, nil
}

func (bkt *azureBucket) Copy(ctx context.Context, objectName string, dstBucket bucket) error {
func (bkt *azureBucket) ServerSideCopy(ctx context.Context, objectName string, dstBucket bucket) error {
sourceClient := bkt.containerClient.NewBlobClient(objectName)
d, ok := dstBucket.(*azureBucket)
if !ok {
return errors.New("destination bucket wasn't an Azure bucket")
}
dstClient := d.containerClient.NewBlobClient(objectName)

start := time.Now()
expiry := start.Add(10 * time.Minute)
@@ -95,12 +129,6 @@
if err != nil {
return err
}
d, ok := dstBucket.(*azureBucket)
if !ok {
return errors.New("destination bucket wasn't a blob storage bucket")
}

dstClient := d.containerClient.NewBlobClient(objectName)

var copyStatus *blob.CopyStatusType
var copyStatusDescription *string
@@ -165,6 +193,23 @@ func checkCopyStatus(ctx context.Context, client *blob.Client) (*blob.CopyStatus
return response.CopyStatus, response.CopyStatusDescription, nil
}

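// ClientSideCopy downloads the object from Azure and re-uploads it to the destination bucket, streaming the data through this process.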
func (bkt *azureBucket) ClientSideCopy(ctx context.Context, objectName string, dstBucket bucket) error {
sourceClient := bkt.containerClient.NewBlobClient(objectName)
response, err := sourceClient.DownloadStream(ctx, nil)
if err != nil {
return errors.Wrap(err, "failed while getting source object from Azure")
}
if response.ContentLength == nil {
return errors.New("source object from Azure did not contain a content length")
}
body := response.DownloadResponse.Body
if err := dstBucket.Upload(ctx, objectName, body, *response.ContentLength); err != nil {
_ = body.Close()
return errors.New("failed uploading source object from Azure to destination")
}
return errors.Wrap(body.Close(), "failed closing Azure source object reader")
}

func (bkt *azureBucket) ListPrefix(ctx context.Context, prefix string, recursive bool) ([]string, error) {
if prefix != "" && !strings.HasSuffix(prefix, delim) {
prefix = prefix + delim
@@ -209,8 +254,8 @@ func (bkt *azureBucket) ListPrefix(ctx context.Context, prefix string, recursive
return list, nil
}

func (bkt *azureBucket) UploadMarkerFile(ctx context.Context, objectName string) error {
_, err := bkt.UploadBuffer(ctx, bkt.containerName, objectName, []byte{}, nil)
func (bkt *azureBucket) Upload(ctx context.Context, objectName string, reader io.Reader, _ int64) error {
_, err := bkt.UploadStream(ctx, bkt.containerName, objectName, reader, nil)
return err
}

29 changes: 26 additions & 3 deletions tools/copyblocks/gcs.go
@@ -33,10 +33,10 @@ func (bkt *gcsBucket) Get(ctx context.Context, objectName string) (io.ReadCloser
return r, nil
}

func (bkt *gcsBucket) Copy(ctx context.Context, objectName string, dstBucket bucket) error {
func (bkt *gcsBucket) ServerSideCopy(ctx context.Context, objectName string, dstBucket bucket) error {
d, ok := dstBucket.(*gcsBucket)
if !ok {
return errors.New("destination bucket wasn't a gcs bucket")
return errors.New("destination bucket wasn't a GCS bucket")
}
srcObj := bkt.Object(objectName)
dstObject := d.BucketHandle.Object(objectName)
@@ -45,6 +45,19 @@ func (bkt *gcsBucket) Copy(ctx context.Context, objectName string, dstBucket buc
return err
}

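// ClientSideCopy streams the object out of GCS and uploads it to the destination bucket.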
func (bkt *gcsBucket) ClientSideCopy(ctx context.Context, objectName string, dstBucket bucket) error {
srcObj := bkt.Object(objectName)
reader, err := srcObj.NewReader(ctx)
if err != nil {
return errors.Wrap(err, "failed to get GCS source object reader")
}
if err := dstBucket.Upload(ctx, objectName, reader, reader.Attrs.Size); err != nil {
_ = reader.Close()
return errors.Wrap(err, "failed to upload GCS source object to destination")
}
return errors.Wrap(reader.Close(), "failed closing GCS source object reader")
}

func (bkt *gcsBucket) ListPrefix(ctx context.Context, prefix string, recursive bool) ([]string, error) {
if len(prefix) > 0 && prefix[len(prefix)-1:] != delim {
prefix = prefix + delim
@@ -90,9 +103,19 @@ func (bkt *gcsBucket) ListPrefix(ctx context.Context, prefix string, recursive b
return result, nil
}

func (bkt *gcsBucket) UploadMarkerFile(ctx context.Context, objectName string) error {
func (bkt *gcsBucket) Upload(ctx context.Context, objectName string, reader io.Reader, contentLength int64) error {
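// Cancelling the context on an early return aborts the pending write so that a failed or short upload is not committed.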
ctx, cancel := context.WithCancel(ctx)
defer cancel()

obj := bkt.Object(objectName)
w := obj.NewWriter(ctx)
n, err := io.Copy(w, reader)
if err != nil {
return errors.Wrap(err, "failed during copy stage of GCS upload")
}
if n != contentLength {
return errors.Errorf("unexpected content length from copy: expected=%d, actual=%d", contentLength, n)
}
return w.Close()
}
