From 58a509b0e72a132d949021ca69e4539486e33c2c Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Fri, 15 Nov 2024 19:40:21 +0530 Subject: [PATCH] feat(thanos): add support for aliyun oss and baidu bos (#14891) --- pkg/storage/bucket/bos/bucket_client.go | 17 + pkg/storage/bucket/bos/config.go | 26 ++ pkg/storage/bucket/client.go | 22 +- pkg/storage/bucket/oss/bucket_client.go | 18 + pkg/storage/bucket/oss/config.go | 28 ++ .../thanos-io/objstore/clientutil/parse.go | 65 +++ .../thanos-io/objstore/providers/bos/bos.go | 440 ++++++++++++++++++ .../thanos-io/objstore/providers/oss/oss.go | 426 +++++++++++++++++ vendor/modules.txt | 3 + 9 files changed, 1042 insertions(+), 3 deletions(-) create mode 100644 pkg/storage/bucket/bos/bucket_client.go create mode 100644 pkg/storage/bucket/bos/config.go create mode 100644 pkg/storage/bucket/oss/bucket_client.go create mode 100644 pkg/storage/bucket/oss/config.go create mode 100644 vendor/github.com/thanos-io/objstore/clientutil/parse.go create mode 100644 vendor/github.com/thanos-io/objstore/providers/bos/bos.go create mode 100644 vendor/github.com/thanos-io/objstore/providers/oss/oss.go diff --git a/pkg/storage/bucket/bos/bucket_client.go b/pkg/storage/bucket/bos/bucket_client.go new file mode 100644 index 0000000000000..438ccb34a89f1 --- /dev/null +++ b/pkg/storage/bucket/bos/bucket_client.go @@ -0,0 +1,17 @@ +package bos + +import ( + "github.com/go-kit/log" + "github.com/thanos-io/objstore" + "github.com/thanos-io/objstore/providers/bos" +) + +func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) { + bosCfg := bos.Config{ + Endpoint: cfg.Endpoint, + Bucket: cfg.Bucket, + SecretKey: cfg.SecretKey.String(), + AccessKey: cfg.AccessKey, + } + return bos.NewBucketWithConfig(logger, bosCfg, name) +} diff --git a/pkg/storage/bucket/bos/config.go b/pkg/storage/bucket/bos/config.go new file mode 100644 index 0000000000000..73f1b3091a387 --- /dev/null +++ b/pkg/storage/bucket/bos/config.go @@ -0,0 +1,26 @@ +package bos + +import ( + "flag" + + "github.com/grafana/dskit/flagext" +) + +// Config holds the configuration for Baidu Cloud BOS client +type Config struct { + Bucket string `yaml:"bucket"` + Endpoint string `yaml:"endpoint"` + AccessKey string `yaml:"access_key"` + SecretKey flagext.Secret `yaml:"secret_key"` +} + +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix("", f) +} + +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&cfg.Bucket, prefix+"bos.bucket", "", "Name of BOS bucket.") + f.StringVar(&cfg.Endpoint, prefix+"bos.endpoint", "", "BOS endpoint to connect to.") + f.StringVar(&cfg.AccessKey, prefix+"bos.access-key", "", "Baidu Cloud Engine (BCE) Access Key ID.") + f.Var(&cfg.SecretKey, prefix+"bos.secret-key", "Baidu Cloud Engine (BCE) Secret Access Key.") +} diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 88813b59eac30..7d6ddb7bf1a28 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -15,8 +15,10 @@ import ( objstoretracing "github.com/thanos-io/objstore/tracing/opentracing" "github.com/grafana/loki/v3/pkg/storage/bucket/azure" + "github.com/grafana/loki/v3/pkg/storage/bucket/bos" "github.com/grafana/loki/v3/pkg/storage/bucket/filesystem" "github.com/grafana/loki/v3/pkg/storage/bucket/gcs" + "github.com/grafana/loki/v3/pkg/storage/bucket/oss" "github.com/grafana/loki/v3/pkg/storage/bucket/s3" "github.com/grafana/loki/v3/pkg/storage/bucket/swift" ) @@ -37,12 +39,18 @@ const ( // Filesystem is the value for the filesystem storage backend. Filesystem = "filesystem" + // Alibaba is the value for the Alibaba Cloud OSS storage backend + Alibaba = "alibabacloud" + + // BOS is the value for the Baidu Cloud BOS storage backend + BOS = "bos" + // validPrefixCharactersRegex allows only alphanumeric characters to prevent subtle bugs and simplify validation validPrefixCharactersRegex = `^[\da-zA-Z]+$` ) var ( - SupportedBackends = []string{S3, GCS, Azure, Swift, Filesystem} + SupportedBackends = []string{S3, GCS, Azure, Swift, Filesystem, Alibaba, BOS} ErrUnsupportedStorageBackend = errors.New("unsupported storage backend") ErrInvalidCharactersInStoragePrefix = errors.New("storage prefix contains invalid characters, it may only contain digits and English alphabet letters") @@ -73,6 +81,8 @@ type StorageBackendConfig struct { Azure azure.Config `yaml:"azure"` Swift swift.Config `yaml:"swift"` Filesystem filesystem.Config `yaml:"filesystem"` + Alibaba oss.Config `yaml:"alibaba"` + BOS bos.Config `yaml:"bos"` // Used to inject additional backends into the config. Allows for this config to // be embedded in multiple contexts and support non-object storage based backends. @@ -95,6 +105,8 @@ func (cfg *StorageBackendConfig) RegisterFlagsWithPrefixAndDefaultDirectory(pref cfg.Azure.RegisterFlagsWithPrefix(prefix, f) cfg.Swift.RegisterFlagsWithPrefix(prefix, f) cfg.Filesystem.RegisterFlagsWithPrefixAndDefaultDirectory(prefix, dir, f) + cfg.Alibaba.RegisterFlagsWithPrefix(prefix, f) + cfg.BOS.RegisterFlagsWithPrefix(prefix, f) } func (cfg *StorageBackendConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { @@ -154,7 +166,7 @@ func (cfg *Config) disableRetries(backend string) error { cfg.Azure.MaxRetries = 1 case Swift: cfg.Swift.MaxRetries = 1 - case Filesystem: + case Filesystem, Alibaba, BOS: // do nothing default: return fmt.Errorf("cannot disable retries for backend: %s", backend) @@ -173,7 +185,7 @@ func (cfg *Config) configureTransport(backend string, rt http.RoundTripper) erro cfg.Azure.Transport = rt case Swift: cfg.Swift.Transport = rt - case Filesystem: + case Filesystem, Alibaba, BOS: // do nothing default: return fmt.Errorf("cannot configure transport for backend: %s", backend) @@ -201,6 +213,10 @@ func NewClient(ctx context.Context, backend string, cfg Config, name string, log client, err = swift.NewBucketClient(cfg.Swift, name, logger, instrumentTransport()) case Filesystem: client, err = filesystem.NewBucketClient(cfg.Filesystem) + case Alibaba: + client, err = oss.NewBucketClient(cfg.Alibaba, name, logger) + case BOS: + client, err = bos.NewBucketClient(cfg.BOS, name, logger) default: return nil, ErrUnsupportedStorageBackend } diff --git a/pkg/storage/bucket/oss/bucket_client.go b/pkg/storage/bucket/oss/bucket_client.go new file mode 100644 index 0000000000000..84c6cb4d4c979 --- /dev/null +++ b/pkg/storage/bucket/oss/bucket_client.go @@ -0,0 +1,18 @@ +package oss + +import ( + "github.com/go-kit/log" + "github.com/thanos-io/objstore" + "github.com/thanos-io/objstore/providers/oss" +) + +// NewBucketClient creates a new Alibaba Cloud OSS bucket client +func NewBucketClient(cfg Config, component string, logger log.Logger) (objstore.Bucket, error) { + ossCfg := oss.Config{ + Endpoint: cfg.Endpoint, + Bucket: cfg.Bucket, + AccessKeyID: cfg.AccessKeyID, + AccessKeySecret: cfg.AccessKeySecret.String(), + } + return oss.NewBucketWithConfig(logger, ossCfg, component, nil) +} diff --git a/pkg/storage/bucket/oss/config.go b/pkg/storage/bucket/oss/config.go new file mode 100644 index 0000000000000..2b9b3fa38f255 --- /dev/null +++ b/pkg/storage/bucket/oss/config.go @@ -0,0 +1,28 @@ +package oss + +import ( + "flag" + + "github.com/grafana/dskit/flagext" +) + +// Config holds the configuration for Alibaba Cloud OSS client +type Config struct { + Endpoint string `yaml:"endpoint"` + Bucket string `yaml:"bucket"` + AccessKeyID string `yaml:"access_key_id"` + AccessKeySecret flagext.Secret `yaml:"access_key_secret"` +} + +// RegisterFlags registers the flags for Alibaba Cloud OSS storage config +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix("", f) +} + +// RegisterFlagsWithPrefix registers the flags for Alibaba Cloud OSS storage config with prefix +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&cfg.Bucket, prefix+"oss.bucketname", "", "Name of OSS bucket.") + f.StringVar(&cfg.Endpoint, prefix+"oss.endpoint", "", "Endpoint to connect to.") + f.StringVar(&cfg.AccessKeyID, prefix+"oss.access-key-id", "", "alibabacloud Access Key ID") + f.Var(&cfg.AccessKeySecret, prefix+"oss.access-key-secret", "alibabacloud Secret Access Key") +} diff --git a/vendor/github.com/thanos-io/objstore/clientutil/parse.go b/vendor/github.com/thanos-io/objstore/clientutil/parse.go new file mode 100644 index 0000000000000..759c42d29c87a --- /dev/null +++ b/vendor/github.com/thanos-io/objstore/clientutil/parse.go @@ -0,0 +1,65 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package clientutil + +import ( + "net/http" + "strconv" + "time" + + "github.com/pkg/errors" +) + +// ParseContentLength returns the content length (in bytes) parsed from the Content-Length +// HTTP header in input. +func ParseContentLength(m http.Header) (int64, error) { + const name = "Content-Length" + + v, ok := m[name] + if !ok { + return 0, errors.Errorf("%s header not found", name) + } + + if len(v) == 0 { + return 0, errors.Errorf("%s header has no values", name) + } + + ret, err := strconv.ParseInt(v[0], 10, 64) + if err != nil { + return 0, errors.Wrapf(err, "convert %s", name) + } + + return ret, nil +} + +// ParseLastModified returns the timestamp parsed from the Last-Modified +// HTTP header in input. +// Passing an second parameter, named f, to specify the time format. +// If f is empty then RFC3339 will be used as default format. +func ParseLastModified(m http.Header, f string) (time.Time, error) { + const ( + name = "Last-Modified" + defaultFormat = time.RFC3339 + ) + + v, ok := m[name] + if !ok { + return time.Time{}, errors.Errorf("%s header not found", name) + } + + if len(v) == 0 { + return time.Time{}, errors.Errorf("%s header has no values", name) + } + + if f == "" { + f = defaultFormat + } + + mod, err := time.Parse(f, v[0]) + if err != nil { + return time.Time{}, errors.Wrapf(err, "parse %s", name) + } + + return mod, nil +} diff --git a/vendor/github.com/thanos-io/objstore/providers/bos/bos.go b/vendor/github.com/thanos-io/objstore/providers/bos/bos.go new file mode 100644 index 0000000000000..20c8dd3e52be5 --- /dev/null +++ b/vendor/github.com/thanos-io/objstore/providers/bos/bos.go @@ -0,0 +1,440 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package bos + +import ( + "context" + "fmt" + "io" + "math" + "math/rand" + "net/http" + "os" + "strings" + "testing" + "time" + + "github.com/baidubce/bce-sdk-go/bce" + "github.com/baidubce/bce-sdk-go/services/bos" + "github.com/baidubce/bce-sdk-go/services/bos/api" + "github.com/go-kit/log" + "github.com/pkg/errors" + "gopkg.in/yaml.v2" + + "github.com/thanos-io/objstore" +) + +// partSize 128MB. +const partSize = 1024 * 1024 * 128 + +// Bucket implements the store.Bucket interface against bos-compatible(Baidu Object Storage) APIs. +type Bucket struct { + logger log.Logger + client *bos.Client + name string +} + +// Config encapsulates the necessary config values to instantiate an bos client. +type Config struct { + Bucket string `yaml:"bucket"` + Endpoint string `yaml:"endpoint"` + AccessKey string `yaml:"access_key"` + SecretKey string `yaml:"secret_key"` +} + +func (conf *Config) validate() error { + if conf.Bucket == "" || + conf.Endpoint == "" || + conf.AccessKey == "" || + conf.SecretKey == "" { + return errors.New("insufficient BOS configuration information") + } + + return nil +} + +// parseConfig unmarshal a buffer into a Config with default HTTPConfig values. +func parseConfig(conf []byte) (Config, error) { + config := Config{} + if err := yaml.Unmarshal(conf, &config); err != nil { + return Config{}, err + } + + return config, nil +} + +// NewBucket new bos bucket. +func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) { + // TODO(https://github.com/thanos-io/objstore/pull/150): Add support for roundtripper wrapper. + if logger == nil { + logger = log.NewNopLogger() + } + + config, err := parseConfig(conf) + if err != nil { + return nil, errors.Wrap(err, "parsing BOS configuration") + } + + return NewBucketWithConfig(logger, config, component) +} + +// NewBucketWithConfig returns a new Bucket using the provided bos config struct. +func NewBucketWithConfig(logger log.Logger, config Config, component string) (*Bucket, error) { + if err := config.validate(); err != nil { + return nil, errors.Wrap(err, "validating BOS configuration") + } + + client, err := bos.NewClient(config.AccessKey, config.SecretKey, config.Endpoint) + if err != nil { + return nil, errors.Wrap(err, "creating BOS client") + } + + client.Config.UserAgent = fmt.Sprintf("thanos-%s", component) + + bkt := &Bucket{ + logger: logger, + client: client, + name: config.Bucket, + } + return bkt, nil +} + +// Name returns the bucket name for the provider. +func (b *Bucket) Name() string { + return b.name +} + +// Delete removes the object with the given name. +func (b *Bucket) Delete(_ context.Context, name string) error { + return b.client.DeleteObject(b.name, name) +} + +// Upload the contents of the reader as an object into the bucket. +func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error { + size, err := objstore.TryToGetSize(r) + if err != nil { + return errors.Wrapf(err, "getting size of %s", name) + } + + partNums, lastSlice := int(math.Floor(float64(size)/partSize)), size%partSize + if partNums == 0 { + body, err := bce.NewBodyFromSizedReader(r, lastSlice) + if err != nil { + return errors.Wrapf(err, "failed to create SizedReader for %s", name) + } + + if _, err := b.client.PutObject(b.name, name, body, nil); err != nil { + return errors.Wrapf(err, "failed to upload %s", name) + } + + return nil + } + + result, err := b.client.BasicInitiateMultipartUpload(b.name, name) + if err != nil { + return errors.Wrapf(err, "failed to initiate MultipartUpload for %s", name) + } + + uploadEveryPart := func(partSize int64, part int, uploadId string) (string, error) { + body, err := bce.NewBodyFromSizedReader(r, partSize) + if err != nil { + return "", err + } + + etag, err := b.client.UploadPart(b.name, name, uploadId, part, body, nil) + if err != nil { + if err := b.client.AbortMultipartUpload(b.name, name, uploadId); err != nil { + return etag, err + } + return etag, err + } + return etag, nil + } + + var parts []api.UploadInfoType + + for part := 1; part <= partNums; part++ { + etag, err := uploadEveryPart(partSize, part, result.UploadId) + if err != nil { + return errors.Wrapf(err, "failed to upload part %d for %s", part, name) + } + parts = append(parts, api.UploadInfoType{PartNumber: part, ETag: etag}) + } + + if lastSlice != 0 { + etag, err := uploadEveryPart(lastSlice, partNums+1, result.UploadId) + if err != nil { + return errors.Wrapf(err, "failed to upload the last part for %s", name) + } + parts = append(parts, api.UploadInfoType{PartNumber: partNums + 1, ETag: etag}) + } + + if _, err := b.client.CompleteMultipartUploadFromStruct(b.name, name, result.UploadId, &api.CompleteMultipartUploadArgs{Parts: parts}); err != nil { + return errors.Wrapf(err, "failed to set %s upload completed", name) + } + return nil +} + +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + + if dir != "" { + dir = strings.TrimSuffix(dir, objstore.DirDelim) + objstore.DirDelim + } + + delimiter := objstore.DirDelim + + params := objstore.ApplyIterOptions(options...) + if params.Recursive { + delimiter = "" + } + + var marker string + for { + if err := ctx.Err(); err != nil { + return err + } + + objects, err := b.client.ListObjects(b.name, &api.ListObjectsArgs{ + Delimiter: delimiter, + Marker: marker, + MaxKeys: 1000, + Prefix: dir, + }) + if err != nil { + return err + } + + marker = objects.NextMarker + for _, object := range objects.Contents { + attrs := objstore.IterObjectAttributes{ + Name: object.Key, + } + + if params.LastModified && object.LastModified != "" { + lastModified, err := time.Parse(time.RFC1123, object.LastModified) + if err != nil { + return fmt.Errorf("iter: get last modified: %w", err) + } + attrs.SetLastModified(lastModified) + } + + if err := f(attrs); err != nil { + return err + } + } + + for _, object := range objects.CommonPrefixes { + if err := f(objstore.IterObjectAttributes{Name: object.Prefix}); err != nil { + return err + } + } + if !objects.IsTruncated { + break + } + } + return nil +} + +// Iter calls f for each entry in the given directory. The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + +// Get returns a reader for the given object name. +func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + return b.getRange(ctx, b.name, name, 0, -1) +} + +// GetRange returns a new range reader for the given object name and range. +func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + return b.getRange(ctx, b.name, name, off, length) +} + +// Exists checks if the given object exists in the bucket. +func (b *Bucket) Exists(_ context.Context, name string) (bool, error) { + _, err := b.client.GetObjectMeta(b.name, name) + if err != nil { + if b.IsObjNotFoundErr(err) { + return false, nil + } + return false, errors.Wrapf(err, "getting object metadata of %s", name) + } + return true, nil +} + +func (b *Bucket) Close() error { + return nil +} + +// ObjectSize returns the size of the specified object. +func (b *Bucket) ObjectSize(_ context.Context, name string) (uint64, error) { + objMeta, err := b.client.GetObjectMeta(b.name, name) + if err != nil { + return 0, err + } + return uint64(objMeta.ContentLength), nil +} + +// Attributes returns information about the specified object. +func (b *Bucket) Attributes(_ context.Context, name string) (objstore.ObjectAttributes, error) { + objMeta, err := b.client.GetObjectMeta(b.name, name) + if err != nil { + return objstore.ObjectAttributes{}, errors.Wrapf(err, "gettting objectmeta of %s", name) + } + + lastModified, err := time.Parse(time.RFC1123, objMeta.LastModified) + if err != nil { + return objstore.ObjectAttributes{}, err + } + + return objstore.ObjectAttributes{ + Size: objMeta.ContentLength, + LastModified: lastModified, + }, nil +} + +// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. +func (b *Bucket) IsObjNotFoundErr(err error) bool { + switch bosErr := errors.Cause(err).(type) { + case *bce.BceServiceError: + if bosErr.StatusCode == http.StatusNotFound || bosErr.Code == "NoSuchKey" { + return true + } + } + return false +} + +// IsAccessDeniedErr returns true if access to object is denied. +func (b *Bucket) IsAccessDeniedErr(_ error) bool { + return false +} + +func (b *Bucket) getRange(_ context.Context, bucketName, objectKey string, off, length int64) (io.ReadCloser, error) { + if len(objectKey) == 0 { + return nil, errors.Errorf("given object name should not empty") + } + + ranges := []int64{off} + if length != -1 { + ranges = append(ranges, off+length-1) + } + + obj, err := b.client.GetObject(bucketName, objectKey, map[string]string{}, ranges...) + if err != nil { + return nil, err + } + + return objstore.ObjectSizerReadCloser{ + ReadCloser: obj.Body, + Size: func() (int64, error) { + return obj.ContentLength, nil + }, + }, err +} + +func configFromEnv() Config { + c := Config{ + Bucket: os.Getenv("BOS_BUCKET"), + Endpoint: os.Getenv("BOS_ENDPOINT"), + AccessKey: os.Getenv("BOS_ACCESS_KEY"), + SecretKey: os.Getenv("BOS_SECRET_KEY"), + } + return c +} + +// NewTestBucket creates test bkt client that before returning creates temporary bucket. +// In a close function it empties and deletes the bucket. +func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) { + c := configFromEnv() + if err := validateForTest(c); err != nil { + return nil, nil, err + } + + if c.Bucket != "" { + if os.Getenv("THANOS_ALLOW_EXISTING_BUCKET_USE") == "" { + return nil, nil, errors.New("BOS_BUCKET is defined. Normally this tests will create temporary bucket " + + "and delete it after test. Unset BOS_BUCKET env variable to use default logic. If you really want to run " + + "tests against provided (NOT USED!) bucket, set THANOS_ALLOW_EXISTING_BUCKET_USE=true. WARNING: That bucket " + + "needs to be manually cleared. This means that it is only useful to run one test in a time. This is due " + + "to safety (accidentally pointing prod bucket for test) as well as BOS not being fully strong consistent.") + } + + bc, err := yaml.Marshal(c) + if err != nil { + return nil, nil, err + } + + b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test") + if err != nil { + return nil, nil, err + } + + if err := b.Iter(context.Background(), "", func(f string) error { + return errors.Errorf("bucket %s is not empty", c.Bucket) + }); err != nil { + return nil, nil, errors.Wrapf(err, "checking bucket %s", c.Bucket) + } + + t.Log("WARNING. Reusing", c.Bucket, "BOS bucket for BOS tests. Manual cleanup afterwards is required") + return b, func() {}, nil + } + + src := rand.NewSource(time.Now().UnixNano()) + tmpBucketName := strings.Replace(fmt.Sprintf("test_%x", src.Int63()), "_", "-", -1) + + if len(tmpBucketName) >= 31 { + tmpBucketName = tmpBucketName[:31] + } + + c.Bucket = tmpBucketName + bc, err := yaml.Marshal(c) + if err != nil { + return nil, nil, err + } + + b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test") + if err != nil { + return nil, nil, err + } + + if _, err := b.client.PutBucket(b.name); err != nil { + return nil, nil, err + } + + t.Log("created temporary BOS bucket for BOS tests with name", tmpBucketName) + return b, func() { + objstore.EmptyBucket(t, context.Background(), b) + if err := b.client.DeleteBucket(b.name); err != nil { + t.Logf("deleting bucket %s failed: %s", tmpBucketName, err) + } + }, nil +} + +func validateForTest(conf Config) error { + if conf.Endpoint == "" || + conf.AccessKey == "" || + conf.SecretKey == "" { + return errors.New("insufficient BOS configuration information") + } + return nil +} diff --git a/vendor/github.com/thanos-io/objstore/providers/oss/oss.go b/vendor/github.com/thanos-io/objstore/providers/oss/oss.go new file mode 100644 index 0000000000000..aee8c62312da6 --- /dev/null +++ b/vendor/github.com/thanos-io/objstore/providers/oss/oss.go @@ -0,0 +1,426 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package oss + +import ( + "context" + "fmt" + "io" + "math" + "math/rand" + "net/http" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/aliyun/aliyun-oss-go-sdk/oss" + alioss "github.com/aliyun/aliyun-oss-go-sdk/oss" + "github.com/go-kit/log" + "github.com/pkg/errors" + "gopkg.in/yaml.v2" + + "github.com/thanos-io/objstore" + "github.com/thanos-io/objstore/clientutil" + "github.com/thanos-io/objstore/exthttp" +) + +// PartSize is a part size for multi part upload. +const PartSize = 1024 * 1024 * 128 + +// Config stores the configuration for oss bucket. +type Config struct { + Endpoint string `yaml:"endpoint"` + Bucket string `yaml:"bucket"` + AccessKeyID string `yaml:"access_key_id"` + AccessKeySecret string `yaml:"access_key_secret"` +} + +// Bucket implements the store.Bucket interface. +type Bucket struct { + name string + logger log.Logger + client *alioss.Client + config Config + bucket *alioss.Bucket +} + +func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) { + c := Config{ + Endpoint: os.Getenv("ALIYUNOSS_ENDPOINT"), + Bucket: os.Getenv("ALIYUNOSS_BUCKET"), + AccessKeyID: os.Getenv("ALIYUNOSS_ACCESS_KEY_ID"), + AccessKeySecret: os.Getenv("ALIYUNOSS_ACCESS_KEY_SECRET"), + } + + if c.Endpoint == "" || c.AccessKeyID == "" || c.AccessKeySecret == "" { + return nil, nil, errors.New("aliyun oss endpoint or access_key_id or access_key_secret " + + "is not present in config file") + } + if c.Bucket != "" && os.Getenv("THANOS_ALLOW_EXISTING_BUCKET_USE") == "true" { + t.Log("ALIYUNOSS_BUCKET is defined. Normally this tests will create temporary bucket " + + "and delete it after test. Unset ALIYUNOSS_BUCKET env variable to use default logic. If you really want to run " + + "tests against provided (NOT USED!) bucket, set THANOS_ALLOW_EXISTING_BUCKET_USE=true.") + return NewTestBucketFromConfig(t, c, true) + } + return NewTestBucketFromConfig(t, c, false) +} + +// Upload the contents of the reader as an object into the bucket. +func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error { + // TODO(https://github.com/thanos-io/thanos/issues/678): Remove guessing length when minio provider will support multipart upload without this. + size, err := objstore.TryToGetSize(r) + if err != nil { + return errors.Wrapf(err, "failed to get size apriori to upload %s", name) + } + + chunksnum, lastslice := int(math.Floor(float64(size)/PartSize)), size%PartSize + + ncloser := io.NopCloser(r) + switch chunksnum { + case 0: + if err := b.bucket.PutObject(name, ncloser); err != nil { + return errors.Wrap(err, "failed to upload oss object") + } + default: + { + init, err := b.bucket.InitiateMultipartUpload(name) + if err != nil { + return errors.Wrap(err, "failed to initiate multi-part upload") + } + chunk := 0 + uploadEveryPart := func(everypartsize int64, cnk int) (alioss.UploadPart, error) { + prt, err := b.bucket.UploadPart(init, ncloser, everypartsize, cnk) + if err != nil { + if err := b.bucket.AbortMultipartUpload(init); err != nil { + return prt, errors.Wrap(err, "failed to abort multi-part upload") + } + + return prt, errors.Wrap(err, "failed to upload multi-part chunk") + } + return prt, nil + } + var parts []alioss.UploadPart + for ; chunk < chunksnum; chunk++ { + part, err := uploadEveryPart(PartSize, chunk+1) + if err != nil { + return errors.Wrap(err, "failed to upload every part") + } + parts = append(parts, part) + } + if lastslice != 0 { + part, err := uploadEveryPart(lastslice, chunksnum+1) + if err != nil { + return errors.Wrap(err, "failed to upload the last chunk") + } + parts = append(parts, part) + } + if _, err := b.bucket.CompleteMultipartUpload(init, parts); err != nil { + return errors.Wrap(err, "failed to set multi-part upload completive") + } + } + } + return nil +} + +// Delete removes the object with the given name. +func (b *Bucket) Delete(ctx context.Context, name string) error { + if err := b.bucket.DeleteObject(name); err != nil { + return errors.Wrap(err, "delete oss object") + } + return nil +} + +// Attributes returns information about the specified object. +func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) { + m, err := b.bucket.GetObjectMeta(name) + if err != nil { + return objstore.ObjectAttributes{}, err + } + + size, err := clientutil.ParseContentLength(m) + if err != nil { + return objstore.ObjectAttributes{}, err + } + + // aliyun oss return Last-Modified header in RFC1123 format. + // see api doc for details: https://www.alibabacloud.com/help/doc-detail/31985.htm + mod, err := clientutil.ParseLastModified(m, time.RFC1123) + if err != nil { + return objstore.ObjectAttributes{}, err + } + + return objstore.ObjectAttributes{ + Size: size, + LastModified: mod, + }, nil +} + +// NewBucket returns a new Bucket using the provided oss config values. +func NewBucket(logger log.Logger, conf []byte, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) { + var config Config + if err := yaml.Unmarshal(conf, &config); err != nil { + return nil, errors.Wrap(err, "parse aliyun oss config file failed") + } + return NewBucketWithConfig(logger, config, component, wrapRoundtripper) +} + +// NewBucketWithConfig returns a new Bucket using the provided oss config struct. +func NewBucketWithConfig(logger log.Logger, config Config, component string, wrapRoundtripper func(http.RoundTripper) http.RoundTripper) (*Bucket, error) { + if err := validate(config); err != nil { + return nil, err + } + var clientOptions []alioss.ClientOption + if wrapRoundtripper != nil { + rt, err := exthttp.DefaultTransport(exthttp.DefaultHTTPConfig) + if err != nil { + return nil, err + } + clientOptions = append(clientOptions, func(client *alioss.Client) { + client.HTTPClient = &http.Client{ + Transport: wrapRoundtripper(rt), + } + }) + } + client, err := alioss.New(config.Endpoint, config.AccessKeyID, config.AccessKeySecret, clientOptions...) + if err != nil { + return nil, errors.Wrap(err, "create aliyun oss client failed") + } + bk, err := client.Bucket(config.Bucket) + if err != nil { + return nil, errors.Wrapf(err, "use aliyun oss bucket %s failed", config.Bucket) + } + + bkt := &Bucket{ + logger: logger, + client: client, + name: config.Bucket, + config: config, + bucket: bk, + } + return bkt, nil +} + +// validate checks to see the config options are set. +func validate(config Config) error { + if config.Endpoint == "" || config.Bucket == "" { + return errors.New("aliyun oss endpoint or bucket is not present in config file") + } + if config.AccessKeyID == "" || config.AccessKeySecret == "" { + return errors.New("aliyun oss access_key_id or access_key_secret is not present in config file") + } + + return nil +} + +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive} +} + +// Iter calls f for each entry in the given directory. The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { + if dir != "" { + dir = strings.TrimSuffix(dir, objstore.DirDelim) + objstore.DirDelim + } + + delimiter := alioss.Delimiter(objstore.DirDelim) + if objstore.ApplyIterOptions(options...).Recursive { + delimiter = nil + } + + marker := alioss.Marker("") + for { + if err := ctx.Err(); err != nil { + return errors.Wrap(err, "context closed while iterating bucket") + } + objects, err := b.bucket.ListObjects(alioss.Prefix(dir), delimiter, marker) + if err != nil { + return errors.Wrap(err, "listing aliyun oss bucket failed") + } + marker = alioss.Marker(objects.NextMarker) + + for _, object := range objects.Objects { + if err := f(object.Key); err != nil { + return errors.Wrapf(err, "callback func invoke for object %s failed ", object.Key) + } + } + + for _, object := range objects.CommonPrefixes { + if err := f(object); err != nil { + return errors.Wrapf(err, "callback func invoke for directory %s failed", object) + } + } + if !objects.IsTruncated { + break + } + } + + return nil +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + + return b.Iter(ctx, dir, func(name string) error { + return f(objstore.IterObjectAttributes{Name: name}) + }, options...) +} + +func (b *Bucket) Name() string { + return b.name +} + +func NewTestBucketFromConfig(t testing.TB, c Config, reuseBucket bool) (objstore.Bucket, func(), error) { + if c.Bucket == "" { + src := rand.NewSource(time.Now().UnixNano()) + + bktToCreate := strings.ReplaceAll(fmt.Sprintf("test_%s_%x", strings.ToLower(t.Name()), src.Int63()), "_", "-") + if len(bktToCreate) >= 63 { + bktToCreate = bktToCreate[:63] + } + testclient, err := alioss.New(c.Endpoint, c.AccessKeyID, c.AccessKeySecret) + if err != nil { + return nil, nil, errors.Wrap(err, "create aliyun oss client failed") + } + + if err := testclient.CreateBucket(bktToCreate); err != nil { + return nil, nil, errors.Wrapf(err, "create aliyun oss bucket %s failed", bktToCreate) + } + c.Bucket = bktToCreate + } + + bc, err := yaml.Marshal(c) + if err != nil { + return nil, nil, err + } + + b, err := NewBucket(log.NewNopLogger(), bc, "thanos-aliyun-oss-test", nil) + if err != nil { + return nil, nil, err + } + + if reuseBucket { + if err := b.Iter(context.Background(), "", func(_ string) error { + return errors.Errorf("bucket %s is not empty", c.Bucket) + }); err != nil { + return nil, nil, errors.Wrapf(err, "oss check bucket %s", c.Bucket) + } + + t.Log("WARNING. Reusing", c.Bucket, "Aliyun OSS bucket for OSS tests. Manual cleanup afterwards is required") + return b, func() {}, nil + } + + return b, func() { + objstore.EmptyBucket(t, context.Background(), b) + if err := b.client.DeleteBucket(c.Bucket); err != nil { + t.Logf("deleting bucket %s failed: %s", c.Bucket, err) + } + }, nil +} + +func (b *Bucket) Close() error { return nil } + +func (b *Bucket) setRange(start, end int64, name string) (alioss.Option, error) { + var opt alioss.Option + if 0 <= start && start <= end { + header, err := b.bucket.GetObjectMeta(name) + if err != nil { + return nil, err + } + + size, err := strconv.ParseInt(header["Content-Length"][0], 10, 64) + if err != nil { + return nil, err + } + + if end > size { + end = size - 1 + } + + opt = alioss.Range(start, end) + } else { + return nil, errors.Errorf("Invalid range specified: start=%d end=%d", start, end) + } + return opt, nil +} + +func (b *Bucket) getRange(_ context.Context, name string, off, length int64) (io.ReadCloser, error) { + if name == "" { + return nil, errors.New("given object name should not empty") + } + + var opts []alioss.Option + if length != -1 { + opt, err := b.setRange(off, off+length-1, name) + if err != nil { + return nil, err + } + opts = append(opts, opt) + } + + resp, err := b.bucket.DoGetObject(&oss.GetObjectRequest{ObjectKey: name}, opts) + if err != nil { + return nil, err + } + + size, err := clientutil.ParseContentLength(resp.Response.Headers) + if err == nil { + return objstore.ObjectSizerReadCloser{ + ReadCloser: resp.Response, + Size: func() (int64, error) { + return size, nil + }, + }, nil + } + + return resp.Response, nil +} + +// Get returns a reader for the given object name. +func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + return b.getRange(ctx, name, 0, -1) +} + +func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { + return b.getRange(ctx, name, off, length) +} + +// Exists checks if the given object exists in the bucket. +func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) { + exists, err := b.bucket.IsObjectExist(name) + if err != nil { + if b.IsObjNotFoundErr(err) { + return false, nil + } + return false, errors.Wrap(err, "cloud not check if object exists") + } + + return exists, nil +} + +// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. +func (b *Bucket) IsObjNotFoundErr(err error) bool { + switch aliErr := errors.Cause(err).(type) { + case alioss.ServiceError: + if aliErr.StatusCode == http.StatusNotFound { + return true + } + } + return false +} + +// IsAccessDeniedErr returns true if access to object is denied. +func (b *Bucket) IsAccessDeniedErr(err error) bool { + switch aliErr := errors.Cause(err).(type) { + case alioss.ServiceError: + if aliErr.StatusCode == http.StatusForbidden { + return true + } + } + return false +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 9291d3f44f955..b688db14dac25 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1588,10 +1588,13 @@ github.com/stretchr/testify/suite # github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 ## explicit; go 1.22 github.com/thanos-io/objstore +github.com/thanos-io/objstore/clientutil github.com/thanos-io/objstore/exthttp github.com/thanos-io/objstore/providers/azure +github.com/thanos-io/objstore/providers/bos github.com/thanos-io/objstore/providers/filesystem github.com/thanos-io/objstore/providers/gcs +github.com/thanos-io/objstore/providers/oss github.com/thanos-io/objstore/providers/s3 github.com/thanos-io/objstore/providers/swift github.com/thanos-io/objstore/tracing/opentracing