diff --git a/CHANGELOG.md b/CHANGELOG.md
index d1859f47e70..9a011d1177c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -84,6 +84,7 @@
 ### Tools
 
+* [CHANGE] copyblocks: add support for S3 and the ability to copy between different object storage services. Due to this, the `-source-service` and `-destination-service` flags are now required and the `-service` flag has been removed. #5486
 * [BUGFIX] Stop tools from panicking when `-help` flag is passed. #5412
 * [BUGFIX] Remove github.com/golang/glog command line flags from tools. #5413
diff --git a/tools/copyblocks/README.md b/tools/copyblocks/README.md
index 11bed54f0a0..97a2a45efd4 100644
--- a/tools/copyblocks/README.md
+++ b/tools/copyblocks/README.md
@@ -1,7 +1,10 @@
 # Copyblocks
 
-This program can copy Mimir blocks server-side between two buckets on the same object storage service provider.
-The currently supported services are Google Cloud Storage (GCS) and Azure Blob Storage (ABS).
+This program can copy Mimir blocks between two buckets.
+
+By default, the copy will be attempted server-side if both the source and destination specify the same object storage service. If the buckets are on different object storage services, or if `--client-side-copy` is passed, then the copy will be performed client-side.
+
+The currently supported services are Amazon Simple Storage Service (S3 and S3-compatible), Azure Blob Storage (ABS), and Google Cloud Storage (GCS).
 
 ## Features
 
@@ -16,24 +19,67 @@ The currently supported services are Google Cloud Storage (GCS) and Azure Blob S
 
 ```bash
 ./copyblocks \
-  --service gcs \
+  --source-service gcs \
+  --destination-service gcs \
   --copy-period 24h \
+  --min-block-duration 23h \
   --source-bucket <source-bucket> \
-  --destination-bucket <destination-bucket> \
-  --min-block-duration 23h
+  --destination-bucket <destination-bucket>
 ```
 
 ### Example for Azure Blob Storage
 
 ```bash
 ./copyblocks \
-  --service abs \
+  --source-service abs \
+  --destination-service abs \
   --copy-period 24h \
+  --min-block-duration 23h \
   --source-bucket https://<account-name>.blob.core.windows.net/<container-name> \
   --azure-source-account-name <account-name> \
   --azure-source-account-key <account-key> \
   --destination-bucket https://<account-name>.blob.core.windows.net/<container-name> \
   --azure-destination-account-name <account-name> \
-  --azure-destination-account-key <account-key> \
-  --min-block-duration 23h
+  --azure-destination-account-key <account-key>
+```
+
+### Example for Amazon Simple Storage Service
+
+The destination is called to initiate the server-side copy, which may require setting up additional permissions for the copy to have access to the source bucket.
+Consider passing `--client-side-copy` to avoid having to deal with that.
+
+```bash
+./copyblocks \
+  --source-service s3 \
+  --destination-service s3 \
+  --copy-period 24h \
+  --min-block-duration 23h \
+  --source-bucket <source-bucket> \
+  --s3-source-access-key <access-key> \
+  --s3-source-secret-key <secret-key> \
+  --s3-source-endpoint <endpoint> \
+  --destination-bucket <destination-bucket> \
+  --s3-destination-access-key <access-key> \
+  --s3-destination-secret-key <secret-key> \
+  --s3-destination-endpoint <endpoint>
+```
+
+### Example for copying between different providers
+
+Combine the relevant source and destination configuration options using the above examples as a guide.
+For instance, to copy from S3 to ABS:
+
+```bash
+./copyblocks \
+  --source-service s3 \
+  --destination-service abs \
+  --copy-period 24h \
+  --min-block-duration 23h \
+  --source-bucket <source-bucket> \
+  --s3-source-access-key <access-key> \
+  --s3-source-secret-key <secret-key> \
+  --s3-source-endpoint <endpoint> \
+  --destination-bucket https://<account-name>.blob.core.windows.net/<container-name> \
+  --azure-destination-account-name <account-name> \
+  --azure-destination-account-key <account-key>
+```
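
The mode selection described in the README reduces to one predicate, implemented inline in `copyBlocks` (main.go, below). As a reading aid, here is a hypothetical standalone helper capturing the same rule — `selectCopyFunc` does not exist in the patch, and it assumes the `config`, `bucket`, and `CopyFunc` definitions from main.go:

```go
// selectCopyFunc is a hypothetical helper mirroring the selection logic in
// copyBlocks: client-side copy is used when explicitly requested, or
// whenever the source and destination are different services.
func selectCopyFunc(cfg config, src, dst bucket) CopyFunc {
	if cfg.clientSideCopy || cfg.sourceService != cfg.destinationService {
		return func(ctx context.Context, objectName string) error {
			return src.ClientSideCopy(ctx, objectName, dst)
		}
	}
	return func(ctx context.Context, objectName string) error {
		return src.ServerSideCopy(ctx, objectName, dst)
	}
}
```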
diff --git a/tools/copyblocks/abs.go b/tools/copyblocks/abs.go
index 21313ea58c6..7fa7876cf11 100644
--- a/tools/copyblocks/abs.go
+++ b/tools/copyblocks/abs.go
@@ -5,6 +5,7 @@ package main
 import (
 	"context"
 	"flag"
+	"fmt"
 	"io"
 	"strings"
 	"time"
@@ -19,23 +20,51 @@ import (
 )
 
 type azureConfig struct {
-	sourceAccountName      string
-	sourceAccountKey       string
-	destinationAccountName string
-	destinationAccountKey  string
-	copyStatusBackoff      backoff.Config
+	source            azureClientConfig
+	destination       azureClientConfig
+	copyStatusBackoff backoff.Config
 }
 
 func (c *azureConfig) RegisterFlags(f *flag.FlagSet) {
-	f.StringVar(&c.sourceAccountName, "azure-source-account-name", "", "Account name for the azure source bucket.")
-	f.StringVar(&c.sourceAccountKey, "azure-source-account-key", "", "Account key for the azure source bucket.")
-	f.StringVar(&c.destinationAccountName, "azure-destination-account-name", "", "Account name for the azure destination bucket.")
-	f.StringVar(&c.destinationAccountKey, "azure-destination-account-key", "", "Account key for the azure destination bucket.")
+	c.source.RegisterFlags("azure-source-", f)
+	c.destination.RegisterFlags("azure-destination-", f)
 	f.DurationVar(&c.copyStatusBackoff.MinBackoff, "azure-copy-status-backoff-min-duration", 15*time.Second, "The minimum amount of time to back off per copy operation.")
 	f.DurationVar(&c.copyStatusBackoff.MaxBackoff, "azure-copy-status-backoff-max-duration", 20*time.Second, "The maximum amount of time to back off per copy operation.")
 	f.IntVar(&c.copyStatusBackoff.MaxRetries, "azure-copy-status-backoff-max-retries", 40, "The maximum number of retries while checking the copy status.")
 }
 
+func (c *azureConfig) validate(source, destination string) error {
+	if source == serviceABS {
+		if err := c.source.validate("azure-source-"); err != nil {
+			return err
+		}
+	}
+	if destination == serviceABS {
+		return c.destination.validate("azure-destination-")
+	}
+	return nil
+}
+
+type azureClientConfig struct {
+	accountName string
+	accountKey  string
+}
+
+func (c *azureClientConfig) RegisterFlags(prefix string, f *flag.FlagSet) {
+	f.StringVar(&c.accountName, prefix+"account-name", "", "Account name for the Azure bucket.")
+	f.StringVar(&c.accountKey, prefix+"account-key", "", "Account key for the Azure bucket.")
+}
+
+func (c *azureClientConfig) validate(prefix string) error {
+	if c.accountName == "" {
+		return fmt.Errorf("the Azure bucket's account name (%s) is required", prefix+"account-name")
+	}
+	if c.accountKey == "" {
+		return fmt.Errorf("the Azure bucket's account key (%s) is required", prefix+"account-key")
+	}
+	return nil
+}
+
 type azureBucket struct {
 	azblob.Client
 	containerClient         container.Client
@@ -43,22 +72,22 @@ type azureBucket struct {
 	containerName           string
 	copyStatusBackoffConfig backoff.Config
 }
 
-func newAzureBucketClient(containerURL string, accountName string, sharedKey string, copyStatusBackoffConfig backoff.Config) (bucket, error) {
+func newAzureBucketClient(cfg azureClientConfig, containerURL string, backoffCfg backoff.Config) (bucket, error) {
 	urlParts, err := blob.ParseURL(containerURL)
 	if err != nil {
 		return nil, err
 	}
	containerName := urlParts.ContainerName
 	if containerName == "" {
-		return nil, errors.New("container name missing from azure bucket URL")
+		return nil, errors.New("container name missing from Azure bucket URL")
 	}
 	serviceURL, found := strings.CutSuffix(containerURL, containerName)
 	if !found {
-		return nil, errors.New("malformed or unexpected azure bucket URL")
+		return nil, errors.New("malformed or unexpected Azure bucket URL")
 	}
-	keyCred, err := azblob.NewSharedKeyCredential(accountName, sharedKey)
+	keyCred, err := azblob.NewSharedKeyCredential(cfg.accountName, cfg.accountKey)
 	if err != nil {
-		return nil, errors.Wrapf(err, "failed to get azure shared key credential")
+		return nil, errors.Wrapf(err, "failed to get Azure shared key credential")
 	}
 	client, err := azblob.NewClientWithSharedKeyCredential(serviceURL, keyCred, nil)
 	if err != nil {
@@ -72,7 +101,7 @@ func newAzureBucketClient(containerURL string, accountName string, sharedKey str
 		Client:                  *client,
 		containerClient:         *containerClient,
 		containerName:           containerName,
-		copyStatusBackoffConfig: copyStatusBackoffConfig,
+		copyStatusBackoffConfig: backoffCfg,
 	}, nil
 }
 
@@ -85,8 +114,13 @@ func (bkt *azureBucket) Get(ctx context.Context, objectName string) (io.ReadClos
 	return response.Body, nil
 }
 
-func (bkt *azureBucket) Copy(ctx context.Context, objectName string, dstBucket bucket) error {
+func (bkt *azureBucket) ServerSideCopy(ctx context.Context, objectName string, dstBucket bucket) error {
 	sourceClient := bkt.containerClient.NewBlobClient(objectName)
+	d, ok := dstBucket.(*azureBucket)
+	if !ok {
+		return errors.New("destination bucket wasn't an Azure bucket")
+	}
+	dstClient := d.containerClient.NewBlobClient(objectName)
 
 	start := time.Now()
 	expiry := start.Add(10 * time.Minute)
@@ -95,12 +129,6 @@
 	if err != nil {
 		return err
 	}
-	d, ok := dstBucket.(*azureBucket)
-	if !ok {
-		return errors.New("destination bucket wasn't a blob storage bucket")
-	}
-
-	dstClient := d.containerClient.NewBlobClient(objectName)
 
 	var copyStatus *blob.CopyStatusType
 	var copyStatusDescription *string
@@ -165,6 +193,23 @@ func checkCopyStatus(ctx context.Context, client *blob.Client) (*blob.CopyStatus
 	return response.CopyStatus, response.CopyStatusDescription, nil
 }
 
+func (bkt *azureBucket) ClientSideCopy(ctx context.Context, objectName string, dstBucket bucket) error {
+	sourceClient := bkt.containerClient.NewBlobClient(objectName)
+	response, err := sourceClient.DownloadStream(ctx, nil)
+	if err != nil {
+		return errors.Wrap(err, "failed while getting source object from Azure")
+	}
+	if response.ContentLength == nil {
+		return errors.New("source object from Azure did not contain a content length")
+	}
+	body := response.DownloadResponse.Body
+	if err := dstBucket.Upload(ctx, objectName, body, *response.ContentLength); err != nil {
+		_ = body.Close()
+		return errors.Wrap(err, "failed uploading source object from Azure to destination")
+	}
+	return errors.Wrap(body.Close(), "failed closing Azure source object reader")
+}
+
 func (bkt *azureBucket) ListPrefix(ctx context.Context, prefix string, recursive bool) ([]string, error) {
 	if prefix != "" && !strings.HasSuffix(prefix, delim) {
 		prefix = prefix + delim
@@ -209,8 +254,8 @@ func (bkt *azureBucket) ListPrefix(ctx context.Context, prefix string, recursive
 	return list, nil
 }
 
-func (bkt *azureBucket) UploadMarkerFile(ctx context.Context, objectName string) error {
-	_, err := bkt.UploadBuffer(ctx, bkt.containerName, objectName, []byte{}, nil)
+func (bkt *azureBucket) Upload(ctx context.Context, objectName string, reader io.Reader, _ int64) error {
+	_, err := bkt.UploadStream(ctx, bkt.containerName, objectName, reader, nil)
 	return err
 }
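
The Azure server-side copy above is asynchronous: the transfer is started from a SAS URL (in the code elided between hunks) and then polled via `checkCopyStatus` under the configured backoff until it settles. A condensed sketch of that polling pattern, assuming dskit's `backoff` package and the `checkCopyStatus` helper shown above (`waitForCopy` itself is hypothetical and not part of the patch):

```go
// waitForCopy is a hypothetical condensation of the polling performed by
// ServerSideCopy: re-check the copy status, backing off between attempts,
// until the copy is no longer pending.
func waitForCopy(ctx context.Context, client *blob.Client, cfg backoff.Config) error {
	b := backoff.New(ctx, cfg)
	for b.Ongoing() {
		copyStatus, copyStatusDescription, err := checkCopyStatus(ctx, client)
		if err != nil {
			return err
		}
		if copyStatus == nil {
			return errors.New("copy status missing from response")
		}
		if *copyStatus == blob.CopyStatusTypeSuccess {
			return nil
		}
		if *copyStatus != blob.CopyStatusTypePending {
			desc := ""
			if copyStatusDescription != nil {
				desc = *copyStatusDescription
			}
			return errors.Errorf("copy ended with status %q: %s", *copyStatus, desc)
		}
		b.Wait() // still pending; back off before the next status check
	}
	return b.Err()
}
```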
diff --git a/tools/copyblocks/gcs.go b/tools/copyblocks/gcs.go
index 4616b8ae9d9..d2be9f17f22 100644
--- a/tools/copyblocks/gcs.go
+++ b/tools/copyblocks/gcs.go
@@ -33,10 +33,10 @@ func (bkt *gcsBucket) Get(ctx context.Context, objectName string) (io.ReadCloser
 	return r, nil
 }
 
-func (bkt *gcsBucket) Copy(ctx context.Context, objectName string, dstBucket bucket) error {
+func (bkt *gcsBucket) ServerSideCopy(ctx context.Context, objectName string, dstBucket bucket) error {
 	d, ok := dstBucket.(*gcsBucket)
 	if !ok {
-		return errors.New("destination bucket wasn't a gcs bucket")
+		return errors.New("destination bucket wasn't a GCS bucket")
 	}
 	srcObj := bkt.Object(objectName)
 	dstObject := d.BucketHandle.Object(objectName)
@@ -45,6 +45,19 @@ func (bkt *gcsBucket) Copy(ctx context.Context, objectName string, dstBucket buc
 	return err
 }
 
+func (bkt *gcsBucket) ClientSideCopy(ctx context.Context, objectName string, dstBucket bucket) error {
+	srcObj := bkt.Object(objectName)
+	reader, err := srcObj.NewReader(ctx)
+	if err != nil {
+		return errors.Wrap(err, "failed to get GCS source object reader")
+	}
+	if err := dstBucket.Upload(ctx, objectName, reader, reader.Attrs.Size); err != nil {
+		_ = reader.Close()
+		return errors.Wrap(err, "failed to upload GCS source object to destination")
+	}
+	return errors.Wrap(reader.Close(), "failed closing GCS source object reader")
+}
+
 func (bkt *gcsBucket) ListPrefix(ctx context.Context, prefix string, recursive bool) ([]string, error) {
 	if len(prefix) > 0 && prefix[len(prefix)-1:] != delim {
 		prefix = prefix + delim
@@ -90,9 +103,19 @@ func (bkt *gcsBucket) ListPrefix(ctx context.Context, prefix string, recursive b
 	return result, nil
 }
 
-func (bkt *gcsBucket) UploadMarkerFile(ctx context.Context, objectName string) error {
+func (bkt *gcsBucket) Upload(ctx context.Context, objectName string, reader io.Reader, contentLength int64) error {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	obj := bkt.Object(objectName)
 	w := obj.NewWriter(ctx)
+	n, err := io.Copy(w, reader)
+	if err != nil {
+		return errors.Wrap(err, "failed during copy stage of GCS upload")
+	}
+	if n != contentLength {
+		return errors.Errorf("unexpected content length from copy: expected=%d, actual=%d", contentLength, n)
+	}
 	return w.Close()
 }
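
Both `ClientSideCopy` implementations so far follow the same shape: open a reader whose content length is known up front, then stream it into `dstBucket.Upload`, closing the reader on both paths. A provider-agnostic sketch of that shape (hypothetical; the patch keeps per-provider implementations because each SDK surfaces the content length differently):

```go
// clientSideCopy is a hypothetical generalization of the pattern used by the
// abs.go and gcs.go implementations above. The open callback stands in for
// the provider-specific way of obtaining a reader plus its content length.
func clientSideCopy(
	ctx context.Context,
	objectName string,
	open func(ctx context.Context, objectName string) (io.ReadCloser, int64, error),
	dst bucket,
) error {
	reader, size, err := open(ctx, objectName)
	if err != nil {
		return errors.Wrap(err, "failed to open source object")
	}
	if err := dst.Upload(ctx, objectName, reader, size); err != nil {
		_ = reader.Close() // best effort; the upload error takes precedence
		return errors.Wrap(err, "failed to upload source object to destination")
	}
	return errors.Wrap(reader.Close(), "failed closing source object reader")
}
```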
diff --git a/tools/copyblocks/main.go b/tools/copyblocks/main.go
index 02078e1610e..4c6b0757ef0 100644
--- a/tools/copyblocks/main.go
+++ b/tools/copyblocks/main.go
@@ -6,6 +6,7 @@
 package main
 
 import (
+	"bytes"
 	"context"
 	"encoding/json"
 	"flag"
@@ -36,36 +37,45 @@
 const (
 	serviceGCS = "gcs" // Google Cloud Storage
 	serviceABS = "abs" // Azure Blob Storage
+	serviceS3  = "s3"  // Amazon Simple Storage Service
 
 	delim = "/" // Used by Mimir to delimit tenants and blocks, and objects within blocks.
 )
 
 type bucket interface {
 	Get(ctx context.Context, objectName string) (io.ReadCloser, error)
-	Copy(ctx context.Context, objectName string, dstBucket bucket) error
+	ServerSideCopy(ctx context.Context, objectName string, dstBucket bucket) error
+	ClientSideCopy(ctx context.Context, objectName string, dstBucket bucket) error
 	ListPrefix(ctx context.Context, prefix string, recursive bool) ([]string, error)
-	UploadMarkerFile(ctx context.Context, objectName string) error
+	Upload(ctx context.Context, objectName string, reader io.Reader, contentLength int64) error
 	Name() string
 }
 
+type CopyFunc func(context.Context, string) error
+
 type config struct {
-	service           string
-	sourceBucket      string
-	destBucket        string
-	minBlockDuration  time.Duration
-	minTime           flagext.Time
-	maxTime           flagext.Time
-	tenantConcurrency int
-	blocksConcurrency int
-	copyPeriod        time.Duration
-	enabledUsers      flagext.StringSliceCSV
-	disabledUsers     flagext.StringSliceCSV
-	dryRun            bool
-	httpListen        string
-	azureConfig       azureConfig
+	sourceService      string
+	destinationService string
+	sourceBucket       string
+	destBucket         string
+	minBlockDuration   time.Duration
+	minTime            flagext.Time
+	maxTime            flagext.Time
+	tenantConcurrency  int
+	blocksConcurrency  int
+	copyPeriod         time.Duration
+	enabledUsers       flagext.StringSliceCSV
+	disabledUsers      flagext.StringSliceCSV
+	dryRun             bool
+	clientSideCopy     bool
+	httpListen         string
+	azureConfig        azureConfig
+	s3Config           s3Config
 }
 
 func (c *config) RegisterFlags(f *flag.FlagSet) {
-	f.StringVar(&c.service, "service", "", fmt.Sprintf("Service that both the buckets are on. Acceptable values: %s or %s.", serviceGCS, serviceABS))
+	acceptedServices := fmt.Sprintf(" Accepted values: %s, %s or %s.", serviceABS, serviceGCS, serviceS3)
+	f.StringVar(&c.sourceService, "source-service", "", "Service that the source bucket is on."+acceptedServices)
+	f.StringVar(&c.destinationService, "destination-service", "", "Service that the destination bucket is on."+acceptedServices)
 	f.StringVar(&c.sourceBucket, "source-bucket", "", "Source bucket with blocks.")
 	f.StringVar(&c.destBucket, "destination-bucket", "", "Destination bucket with blocks.")
 	f.DurationVar(&c.minBlockDuration, "min-block-duration", 0, "If non-zero, ignore blocks that cover block range smaller than this.")
@@ -76,9 +86,11 @@ func (c *config) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&c.copyPeriod, "copy-period", 0, "How often to repeat the copy. If set to 0, copy is done once, and the program stops. Otherwise, the program keeps running and copying blocks until it is terminated.")
 	f.Var(&c.enabledUsers, "enabled-users", "If not empty, only blocks for these users are copied.")
 	f.Var(&c.disabledUsers, "disabled-users", "If not empty, blocks for these users are not copied.")
+	f.BoolVar(&c.dryRun, "dry-run", false, "Don't perform any copy; only log what would happen.")
+	f.BoolVar(&c.clientSideCopy, "client-side-copy", false, "Use client-side copying. This option is only respected if copying between two buckets of the same service. Client-side copying is always used when copying between different services.")
 	f.StringVar(&c.httpListen, "http-listen-address", ":8080", "HTTP listen address.")
-	f.BoolVar(&c.dryRun, "dry-run", false, "Don't perform copy; only log what would happen.")
 	c.azureConfig.RegisterFlags(f)
+	c.s3Config.RegisterFlags(f)
 }
 
 type metrics struct {
@@ -157,54 +169,60 @@ func main() {
 }
 
 func initializeBuckets(ctx context.Context, cfg config) (sourceBucket bucket, destBucket bucket, err error) {
-	if cfg.sourceBucket == "" || cfg.destBucket == "" {
-		return nil, nil, errors.New("--source-bucket or --destination-bucket is missing")
+	if cfg.sourceService == "" {
+		return nil, nil, errors.New("--source-service is missing")
+	}
+	if cfg.destinationService == "" {
+		return nil, nil, errors.New("--destination-service is missing")
+	}
+	if cfg.sourceBucket == "" {
+		return nil, nil, errors.New("--source-bucket is missing")
+	}
+	if cfg.destBucket == "" {
+		return nil, nil, errors.New("--destination-bucket is missing")
 	}
 	if cfg.sourceBucket == cfg.destBucket {
 		return nil, nil, errors.New("--source-bucket and --destination-bucket can not be the same")
 	}
+	if err := cfg.azureConfig.validate(cfg.sourceService, cfg.destinationService); err != nil {
+		return nil, nil, err
+	}
+	if err := cfg.s3Config.validate(cfg.sourceService, cfg.destinationService); err != nil {
+		return nil, nil, err
+	}
 
-	switch strings.ToLower(cfg.service) {
-	case serviceGCS:
-		client, err := storage.NewClient(ctx)
-		if err != nil {
-			return nil, nil, errors.Wrapf(err, "failed to create client")
-		}
-		sourceBucket = newGCSBucket(client, cfg.sourceBucket)
-		destBucket = newGCSBucket(client, cfg.destBucket)
+	sourceBucket, err = initializeBucket(ctx, cfg, cfg.sourceService, cfg.sourceBucket, true)
+	if err != nil {
+		return nil, nil, err
+	}
+	destBucket, err = initializeBucket(ctx, cfg, cfg.destinationService, cfg.destBucket, false)
+	if err != nil {
+		return nil, nil, err
+	}
+	return sourceBucket, destBucket, nil
+}
+
+func initializeBucket(ctx context.Context, cfg config, service, bucketName string, isSource bool) (bucket, error) {
+	switch strings.ToLower(service) {
 	case serviceABS:
-		azureConfig := cfg.azureConfig
-		if azureConfig.sourceAccountKey == "" || azureConfig.sourceAccountName == "" {
-			return nil, nil, errors.New("the azure source bucket's account name (--azure-source-account-name) and account key (--azure-source-account-key) are required")
-		}
-		if azureConfig.destinationAccountKey == "" || azureConfig.destinationAccountName == "" {
-			return nil, nil, errors.New("the azure destination bucket's account name (--azure-destination-account-name) and account key (--azure-destination-account-key) are required")
+		if isSource {
+			return newAzureBucketClient(cfg.azureConfig.source, bucketName, cfg.azureConfig.copyStatusBackoff)
 		}
-
-		var err error
-		sourceBucket, err = newAzureBucketClient(
-			cfg.sourceBucket,
-			azureConfig.sourceAccountName,
-			azureConfig.sourceAccountKey,
-			azureConfig.copyStatusBackoff,
-		)
+		return newAzureBucketClient(cfg.azureConfig.destination, bucketName, cfg.azureConfig.copyStatusBackoff)
+	case serviceGCS:
+		client, err := storage.NewClient(ctx)
 		if err != nil {
-			return nil, nil, errors.Wrapf(err, "failed to create source azure bucket client")
+			return nil, errors.Wrapf(err, "failed to create GCS storage client")
 		}
-		destBucket, err = newAzureBucketClient(
-			cfg.destBucket,
-			azureConfig.destinationAccountName,
-			azureConfig.destinationAccountKey,
-			azureConfig.copyStatusBackoff,
-		)
-		if err != nil {
-			return nil, nil, errors.Wrapf(err, "failed to create destination azure bucket client")
+		return newGCSBucket(client, bucketName), nil
+	case serviceS3:
+		if isSource {
+			return newS3Client(cfg.s3Config.source, bucketName)
 		}
+		return newS3Client(cfg.s3Config.destination, bucketName)
 	default:
-		return nil, nil, errors.Errorf("invalid service: %v", cfg.service)
+		return nil, errors.Errorf("invalid service: %v", service)
 	}
-
-	return sourceBucket, destBucket, nil
 }
errors.Wrapf(err, "failed to create destination azure bucket client") + return newGCSBucket(client, bucket), nil + case serviceS3: + if isSource { + return newS3Client(cfg.s3Config.source, bucket) } + return newS3Client(cfg.s3Config.destination, bucket) default: - return nil, nil, errors.Errorf("invalid service: %v", cfg.service) + return nil, errors.Errorf("invalid service: %v", service) } - - return sourceBucket, destBucket, nil } func runCopy(ctx context.Context, cfg config, logger log.Logger, m *metrics) bool { @@ -226,6 +244,17 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics) return err } + var copyFunc CopyFunc + if cfg.clientSideCopy || cfg.sourceService != cfg.destinationService { + copyFunc = func(ctx context.Context, objectName string) error { + return sourceBucket.ClientSideCopy(ctx, objectName, destBucket) + } + } else { + copyFunc = func(ctx context.Context, objectName string) error { + return sourceBucket.ServerSideCopy(ctx, objectName, destBucket) + } + } + tenants, err := listTenants(ctx, sourceBucket) if err != nil { return errors.Wrapf(err, "failed to list tenants") @@ -329,7 +358,7 @@ func copyBlocks(ctx context.Context, cfg config, logger log.Logger, m *metrics) level.Info(logger).Log("msg", "copying block") - err = copySingleBlock(ctx, tenantID, blockID, markers[blockID], sourceBucket, destBucket) + err = copySingleBlock(ctx, tenantID, blockID, markers[blockID], sourceBucket, copyFunc) if err != nil { m.blocksCopyFailed.Inc() level.Error(logger).Log("msg", "failed to copy block", "err", err) @@ -366,7 +395,7 @@ func isAllowedUser(enabled map[string]struct{}, disabled map[string]struct{}, te } // This method copies files within single TSDB block to a destination bucket. -func copySingleBlock(ctx context.Context, tenantID string, blockID ulid.ULID, markers blockMarkers, srcBkt, destBkt bucket) error { +func copySingleBlock(ctx context.Context, tenantID string, blockID ulid.ULID, markers blockMarkers, srcBkt bucket, copyFunc CopyFunc) error { paths, err := srcBkt.ListPrefix(ctx, tenantID+delim+blockID.String(), true) if err != nil { return errors.Wrapf(err, "copySingleBlock: failed to list block files for %v/%v", tenantID, blockID.String()) @@ -394,7 +423,7 @@ func copySingleBlock(ctx context.Context, tenantID string, blockID ulid.ULID, ma } for _, fullPath := range paths { - err := srcBkt.Copy(ctx, fullPath, destBkt) + err := copyFunc(ctx, fullPath) if err != nil { return errors.Wrapf(err, "copySingleBlock: failed to copy %v", fullPath) } @@ -404,7 +433,8 @@ func copySingleBlock(ctx context.Context, tenantID string, blockID ulid.ULID, ma } func uploadCopiedMarkerFile(ctx context.Context, bkt bucket, tenantID string, blockID ulid.ULID, targetBucketName string) error { - err := bkt.UploadMarkerFile(ctx, tenantID+delim+CopiedToBucketMarkFilename(blockID, targetBucketName)) + objectName := tenantID + delim + CopiedToBucketMarkFilename(blockID, targetBucketName) + err := bkt.Upload(ctx, objectName, bytes.NewReader([]byte{}), 0) return errors.Wrap(err, "uploadCopiedMarkerFile") } diff --git a/tools/copyblocks/s3.go b/tools/copyblocks/s3.go new file mode 100644 index 00000000000..edc3ebf2c31 --- /dev/null +++ b/tools/copyblocks/s3.go @@ -0,0 +1,158 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package main + +import ( + "context" + "flag" + "io" + "strings" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "github.com/pkg/errors" +) + +type s3Config struct { + source s3ClientConfig + destination 
diff --git a/tools/copyblocks/s3.go b/tools/copyblocks/s3.go
new file mode 100644
index 00000000000..edc3ebf2c31
--- /dev/null
+++ b/tools/copyblocks/s3.go
@@ -0,0 +1,158 @@
+// SPDX-License-Identifier: AGPL-3.0-only
+
+package main
+
+import (
+	"context"
+	"flag"
+	"io"
+	"strings"
+
+	"github.com/minio/minio-go/v7"
+	"github.com/minio/minio-go/v7/pkg/credentials"
+	"github.com/pkg/errors"
+)
+
+type s3Config struct {
+	source      s3ClientConfig
+	destination s3ClientConfig
+}
+
+func (c *s3Config) RegisterFlags(f *flag.FlagSet) {
+	c.source.RegisterFlags("s3-source-", f)
+	c.destination.RegisterFlags("s3-destination-", f)
+}
+
+func (c *s3Config) validate(source, destination string) error {
+	if source == serviceS3 {
+		if err := c.source.validate("s3-source-"); err != nil {
+			return err
+		}
+	}
+	if destination == serviceS3 {
+		return c.destination.validate("s3-destination-")
+	}
+	return nil
+}
+
+type s3ClientConfig struct {
+	endpoint  string
+	accessKey string
+	secretKey string
+	secure    bool
+}
+
+func (c *s3ClientConfig) RegisterFlags(prefix string, f *flag.FlagSet) {
+	f.StringVar(&c.endpoint, prefix+"endpoint", "", "The endpoint to contact when accessing the bucket.")
+	f.StringVar(&c.accessKey, prefix+"access-key", "", "The access key used in AWS Signature Version 4 authentication.")
+	f.StringVar(&c.secretKey, prefix+"secret-key", "", "The secret key used in AWS Signature Version 4 authentication.")
+	f.BoolVar(&c.secure, prefix+"secure", true, "If true (default), use HTTPS when connecting to the bucket. If false, insecure HTTP is used.")
+}
+
+func (c *s3ClientConfig) validate(prefix string) error {
+	if c.endpoint == "" {
+		return errors.New(prefix + "endpoint is missing")
+	}
+	if c.accessKey == "" {
+		return errors.New(prefix + "access-key is missing")
+	}
+	if c.secretKey == "" {
+		return errors.New(prefix + "secret-key is missing")
+	}
+	return nil
+}
+
+type s3Bucket struct {
+	*minio.Client
+	bucketName string
+}
+
+func newS3Client(cfg s3ClientConfig, bucketName string) (bucket, error) {
+	client, err := minio.New(cfg.endpoint, &minio.Options{
+		Creds:  credentials.NewStaticV4(cfg.accessKey, cfg.secretKey, ""),
+		Secure: cfg.secure,
+	})
+	if err != nil {
+		return nil, err
+	}
+	return &s3Bucket{
+		Client:     client,
+		bucketName: bucketName,
+	}, nil
+}
+
+func (bkt *s3Bucket) Get(ctx context.Context, objectName string) (io.ReadCloser, error) {
+	obj, err := bkt.GetObject(ctx, bkt.bucketName, objectName, minio.GetObjectOptions{})
+	if err != nil {
+		return nil, err
+	}
+	return obj, nil
+}
+
+func (bkt *s3Bucket) ServerSideCopy(ctx context.Context, objectName string, dstBucket bucket) error {
+	d, ok := dstBucket.(*s3Bucket)
+	if !ok {
+		return errors.New("destination bucket wasn't an S3 bucket")
+	}
+	_, err := d.CopyObject(ctx,
+		minio.CopyDestOptions{
+			Bucket: d.bucketName,
+			Object: objectName,
+		},
+		minio.CopySrcOptions{
+			Bucket: bkt.bucketName,
+			Object: objectName,
+		},
+	)
+	return err
+}
+
+func (bkt *s3Bucket) ClientSideCopy(ctx context.Context, objectName string, dstBucket bucket) error {
+	obj, err := bkt.GetObject(ctx, bkt.bucketName, objectName, minio.GetObjectOptions{})
+	if err != nil {
+		return errors.Wrap(err, "failed to get source object from S3")
+	}
+	objInfo, err := obj.Stat()
+	if err != nil {
+		return errors.Wrap(err, "failed to get source object information from S3")
+	}
+	if err := dstBucket.Upload(ctx, objectName, obj, objInfo.Size); err != nil {
+		_ = obj.Close()
+		return errors.Wrap(err, "failed to upload source object from S3 to destination")
+	}
+	return errors.Wrap(obj.Close(), "failed to close source object reader from S3")
+}
+
+func (bkt *s3Bucket) ListPrefix(ctx context.Context, prefix string, recursive bool) ([]string, error) {
+	if prefix != "" && !strings.HasSuffix(prefix, delim) {
+		prefix = prefix + delim
+	}
+	options := minio.ListObjectsOptions{
+		Prefix:    prefix,
+		Recursive: recursive,
+	}
+	result := make([]string, 0, 10)
+	objects := bkt.ListObjects(ctx, bkt.bucketName, options)
+	for obj := range objects {
+		if obj.Err != nil {
+			return nil, obj.Err
+		}
+		key := obj.Key
+		if strings.HasPrefix(key, prefix) {
+			key = strings.TrimPrefix(key, prefix)
+		} else {
+			return nil, errors.Errorf("listPrefix: path has invalid prefix: %v, expected prefix: %v", key, prefix)
+		}
+		result = append(result, key)
+	}
+	return result, ctx.Err()
+}
+
+func (bkt *s3Bucket) Upload(ctx context.Context, objectName string, reader io.Reader, contentLength int64) error {
+	_, err := bkt.PutObject(ctx, bkt.bucketName, objectName, reader, contentLength, minio.PutObjectOptions{})
+	return err
+}
+
+func (bkt *s3Bucket) Name() string {
+	return bkt.bucketName
+}
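
A quick end-to-end check of the new S3 path can be driven from a Go test in the same package. The sketch below is hypothetical and not part of the patch: it assumes a local MinIO server on localhost:9000 with a bucket named test-bucket and the default minioadmin credentials, plus `context` and `testing` imports.

```go
// Hypothetical smoke test for the new S3 client against a local MinIO
// server; all endpoint, credential, and bucket values are placeholders.
func TestS3SmokeListPrefix(t *testing.T) {
	cfg := s3ClientConfig{
		endpoint:  "localhost:9000", // placeholder MinIO endpoint
		accessKey: "minioadmin",     // placeholder credentials
		secretKey: "minioadmin",
		secure:    false, // local MinIO typically serves plain HTTP
	}
	bkt, err := newS3Client(cfg, "test-bucket")
	if err != nil {
		t.Fatalf("failed to build S3 client: %v", err)
	}
	names, err := bkt.ListPrefix(context.Background(), "", true)
	if err != nil {
		t.Fatalf("failed to list bucket: %v", err)
	}
	t.Logf("objects in bucket: %v", names)
}
```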