Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for object upload options #164

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (b *InMemBucket) Attributes(_ context.Context, name string) (ObjectAttribut
}

// Upload writes the contents of the reader into the in-memory bucket under the given name.
func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader) error {
func (b *InMemBucket) Upload(_ context.Context, name string, r io.Reader, _ ...ObjectUploadOption) error {
b.mtx.Lock()
defer b.mtx.Unlock()
body, err := io.ReadAll(r)
Expand Down
26 changes: 23 additions & 3 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Bucket interface {

// Upload the contents of the reader as an object into the bucket.
// Upload should be idempotent.
Upload(ctx context.Context, name string, r io.Reader) error
Upload(ctx context.Context, name string, r io.Reader, opts ...ObjectUploadOption) error

// Delete removes the object with the given name.
// If object does not exist in the moment of deletion, Delete should throw error.
Expand Down Expand Up @@ -196,6 +196,26 @@ func ApplyIterOptions(options ...IterOption) IterParams {
return out
}

// UploadObjectParams holds the resolved set of per-upload options for a
// single Upload call.
type UploadObjectParams struct {
	// ContentType is the MIME type to store with the uploaded object.
	// An empty string leaves the provider's default behavior in place.
	ContentType string
}

// ObjectUploadOption configures the provided UploadObjectParams.
type ObjectUploadOption func(f *UploadObjectParams)

// WithContentType returns an option that sets the content type recorded
// for the uploaded object.
func WithContentType(contentType string) ObjectUploadOption {
	return func(params *UploadObjectParams) {
		params.ContentType = contentType
	}
}

// ApplyObjectUploadOptions folds the given options, in order, into a
// fresh UploadObjectParams and returns the result.
func ApplyObjectUploadOptions(opts ...ObjectUploadOption) UploadObjectParams {
	var params UploadObjectParams
	for _, apply := range opts {
		apply(&params)
	}
	return params
}

// DownloadOption configures the provided params.
type DownloadOption func(params *downloadParams)

Expand Down Expand Up @@ -747,7 +767,7 @@ func (b *metricBucket) Exists(ctx context.Context, name string) (bool, error) {
return ok, nil
}

func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) error {
func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader, opts ...ObjectUploadOption) error {
const op = OpUpload
b.metrics.ops.WithLabelValues(op).Inc()

Expand All @@ -765,7 +785,7 @@ func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) err
b.metrics.opsTransferredBytes,
)
defer trc.Close()
err := b.bkt.Upload(ctx, name, trc)
err := b.bkt.Upload(ctx, name, trc, opts...)
if err != nil {
if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
b.metrics.opsFailures.WithLabelValues(op).Inc()
Expand Down
8 changes: 4 additions & 4 deletions objstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestMetricBucket_UploadShouldPreserveReaderFeatures(t *testing.T) {

m := &mockBucket{
Bucket: WrapWithMetrics(NewInMemBucket(), nil, ""),
upload: func(ctx context.Context, name string, r io.Reader) error {
upload: func(ctx context.Context, name string, r io.Reader, opts ...ObjectUploadOption) error {
uploadReader = r
return nil
},
Expand Down Expand Up @@ -570,14 +570,14 @@ func (r *mockReader) Close() error {
type mockBucket struct {
Bucket

upload func(ctx context.Context, name string, r io.Reader) error
upload func(ctx context.Context, name string, r io.Reader, opts ...ObjectUploadOption) error
get func(ctx context.Context, name string) (io.ReadCloser, error)
getRange func(ctx context.Context, name string, off, length int64) (io.ReadCloser, error)
}

func (b *mockBucket) Upload(ctx context.Context, name string, r io.Reader) error {
func (b *mockBucket) Upload(ctx context.Context, name string, r io.Reader, opts ...ObjectUploadOption) error {
if b.upload != nil {
return b.upload(ctx, name, r)
return b.upload(ctx, name, r, opts...)
}
return errors.New("Upload has not been mocked")
}
Expand Down
4 changes: 2 additions & 2 deletions prefixed_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func (p *PrefixedBucket) Attributes(ctx context.Context, name string) (ObjectAtt

// Upload the contents of the reader as an object into the bucket.
// Upload should be idempotent.
func (p *PrefixedBucket) Upload(ctx context.Context, name string, r io.Reader) error {
return p.bkt.Upload(ctx, conditionalPrefix(p.prefix, name), r)
func (p *PrefixedBucket) Upload(ctx context.Context, name string, r io.Reader, opts ...ObjectUploadOption) error {
return p.bkt.Upload(ctx, conditionalPrefix(p.prefix, name), r, opts...)
}

// Delete removes the object with the given name.
Expand Down
7 changes: 6 additions & 1 deletion providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,17 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
}

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader, uploadOpts ...objstore.ObjectUploadOption) error {
level.Debug(b.logger).Log("msg", "uploading blob", "blob", name)
blobClient := b.containerClient.NewBlockBlobClient(name)

uploadOptions := objstore.ApplyObjectUploadOptions(uploadOpts...)
opts := &blockblob.UploadStreamOptions{
BlockSize: 3 * 1024 * 1024,
Concurrency: 4,
HTTPHeaders: &blob.HTTPHeaders{
BlobContentType: &uploadOptions.ContentType,
},
}
if _, err := blobClient.UploadStream(ctx, r, opts); err != nil {
return errors.Wrapf(err, "cannot upload Azure blob, address: %s", name)
Expand Down
7 changes: 4 additions & 3 deletions providers/bos/bos.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,27 +113,28 @@ func (b *Bucket) Delete(_ context.Context, name string) error {
}

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
func (b *Bucket) Upload(_ context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error {
size, err := objstore.TryToGetSize(r)
if err != nil {
return errors.Wrapf(err, "getting size of %s", name)
}

uploadOpts := objstore.ApplyObjectUploadOptions(opts...)
partNums, lastSlice := int(math.Floor(float64(size)/partSize)), size%partSize
if partNums == 0 {
body, err := bce.NewBodyFromSizedReader(r, lastSlice)
if err != nil {
return errors.Wrapf(err, "failed to create SizedReader for %s", name)
}

if _, err := b.client.PutObject(b.name, name, body, nil); err != nil {
if _, err := b.client.PutObject(b.name, name, body, &api.PutObjectArgs{ContentType: uploadOpts.ContentType}); err != nil {
return errors.Wrapf(err, "failed to upload %s", name)
}

return nil
}

result, err := b.client.BasicInitiateMultipartUpload(b.name, name)
result, err := b.client.InitiateMultipartUpload(b.name, name, uploadOpts.ContentType, nil)
if err != nil {
return errors.Wrapf(err, "failed to initiate MultipartUpload for %s", name)
}
Expand Down
18 changes: 15 additions & 3 deletions providers/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,22 +213,34 @@ func (r fixedLengthReader) Size() int64 {
}

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error {
size, err := objstore.TryToGetSize(r)
if err != nil {
return errors.Wrapf(err, "getting size of %s", name)
}
uploadOpts := objstore.ApplyObjectUploadOptions(opts...)

// partSize 128MB.
const partSize = 1024 * 1024 * 128
partNums, lastSlice := int(math.Floor(float64(size)/partSize)), size%partSize
if partNums == 0 {
if _, err := b.client.Object.Put(ctx, name, r, nil); err != nil {
cosOpts := &cos.ObjectPutOptions{
ObjectPutHeaderOptions: &cos.ObjectPutHeaderOptions{
ContentType: uploadOpts.ContentType,
},
}
if _, err := b.client.Object.Put(ctx, name, r, cosOpts); err != nil {
return errors.Wrapf(err, "Put object: %s", name)
}
return nil
}
// 1. init.
result, _, err := b.client.Object.InitiateMultipartUpload(ctx, name, nil)
cosOpts := &cos.InitiateMultipartUploadOptions{
ObjectPutHeaderOptions: &cos.ObjectPutHeaderOptions{
ContentType: uploadOpts.ContentType,
},
}
result, _, err := b.client.Object.InitiateMultipartUpload(ctx, name, cosOpts)
if err != nil {
return errors.Wrapf(err, "InitiateMultipartUpload %s", name)
}
Expand Down
2 changes: 1 addition & 1 deletion providers/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
}

// Upload writes the contents of the reader to a file on the local filesystem named after the object.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (err error) {
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader, _ ...objstore.ObjectUploadOption) (err error) {
if ctx.Err() != nil {
return ctx.Err()
}
Expand Down
4 changes: 3 additions & 1 deletion providers/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,13 +332,15 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
}

// Upload writes the file specified in src to remote GCS location specified as target.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error {
w := b.bkt.Object(name).NewWriter(ctx)

uploadOpts := objstore.ApplyObjectUploadOptions(opts...)
// if `chunkSize` is 0, we don't set any custom value for writer's ChunkSize.
// It uses whatever the default value https://pkg.go.dev/google.golang.org/cloud/storage#Writer
if b.chunkSize > 0 {
w.ChunkSize = b.chunkSize
w.ContentType = uploadOpts.ContentType
}

if _, err := io.Copy(w, r); err != nil {
Expand Down
14 changes: 9 additions & 5 deletions providers/obs/obs.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (b *Bucket) Delete(ctx context.Context, name string) error {
}

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error {
size, err := objstore.TryToGetSize(r)

if err != nil {
Expand All @@ -147,14 +147,16 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
if size <= 0 {
return errors.New("object size must be provided")
}

uploadOpts := objstore.ApplyObjectUploadOptions(opts...)
if size <= MinMultipartUploadSize {
err = b.putObjectSingle(name, r)
err = b.putObjectSingle(name, r, uploadOpts)
if err != nil {
return err
}
} else {
var initOutput *obs.InitiateMultipartUploadOutput
initOutput, err = b.initiateMultipartUpload(name)
initOutput, err = b.initiateMultipartUpload(name, uploadOpts)
if err != nil {
return err
}
Expand Down Expand Up @@ -190,22 +192,24 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
return nil
}

func (b *Bucket) putObjectSingle(key string, body io.Reader) error {
func (b *Bucket) putObjectSingle(key string, body io.Reader, opts objstore.UploadObjectParams) error {
input := &obs.PutObjectInput{}
input.Bucket = b.name
input.Key = key
input.Body = body
input.ContentType = opts.ContentType
_, err := b.client.PutObject(input)
if err != nil {
return errors.Wrap(err, "failed to upload object")
}
return nil
}

func (b *Bucket) initiateMultipartUpload(key string) (output *obs.InitiateMultipartUploadOutput, err error) {
func (b *Bucket) initiateMultipartUpload(key string, opts objstore.UploadObjectParams) (output *obs.InitiateMultipartUploadOutput, err error) {
initInput := &obs.InitiateMultipartUploadInput{}
initInput.Bucket = b.name
initInput.Key = key
initInput.ContentType = opts.ContentType
initOutput, err := b.client.InitiateMultipartUpload(initInput)
if err != nil {
return nil, errors.Wrap(err, "failed to init multipart upload job")
Expand Down
7 changes: 6 additions & 1 deletion providers/oci/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (b *Bucket) GetRange(ctx context.Context, name string, offset, length int64

// Upload the contents of the reader as an object into the bucket.
// Upload should be idempotent.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (err error) {
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) (err error) {
req := transfer.UploadStreamRequest{
UploadRequest: transfer.UploadRequest{
NamespaceName: common.String(b.namespace),
Expand All @@ -212,6 +212,11 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) (err erro
req.UploadRequest.PartSize = &b.partSize
}

uploadOptions := objstore.ApplyObjectUploadOptions(opts...)
if uploadOptions.ContentType != "" {
req.UploadRequest.ContentType = &uploadOptions.ContentType
}

uploadManager := transfer.NewUploadManager()
_, err = uploadManager.UploadStream(ctx, req)

Expand Down
8 changes: 5 additions & 3 deletions providers/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,26 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
func (b *Bucket) Provider() objstore.ObjProvider { return objstore.ALIYUNOSS }

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error {
func (b *Bucket) Upload(_ context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error {
// TODO(https://github.com/thanos-io/thanos/issues/678): Remove guessing length when minio provider will support multipart upload without this.
size, err := objstore.TryToGetSize(r)
if err != nil {
return errors.Wrapf(err, "failed to get size apriori to upload %s", name)
}

uploadOpts := objstore.ApplyObjectUploadOptions(opts...)

chunksnum, lastslice := int(math.Floor(float64(size)/PartSize)), size%PartSize

ncloser := io.NopCloser(r)
switch chunksnum {
case 0:
if err := b.bucket.PutObject(name, ncloser); err != nil {
if err := b.bucket.PutObject(name, ncloser, oss.ContentType(uploadOpts.ContentType)); err != nil {
return errors.Wrap(err, "failed to upload oss object")
}
default:
{
init, err := b.bucket.InitiateMultipartUpload(name)
init, err := b.bucket.InitiateMultipartUpload(name, oss.ContentType(uploadOpts.ContentType))
if err != nil {
return errors.Wrap(err, "failed to initiate multi-part upload")
}
Expand Down
7 changes: 5 additions & 2 deletions providers/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
}

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) error {
sse, err := b.getServerSideEncryption(ctx)
if err != nil {
return err
Expand All @@ -556,6 +556,8 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
userMetadata[k] = v
}

uploadOpts := objstore.ApplyObjectUploadOptions(opts...)

if _, err := b.client.PutObject(
ctx,
b.name,
Expand All @@ -572,7 +574,8 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
// 4 is what minio-go have as the default. To be certain we do micro benchmark before any changes we
// ensure we pin this number to four.
// TODO(bwplotka): Consider adjusting this number to GOMAXPROCS or to expose this in config if it becomes bottleneck.
NumThreads: 4,
NumThreads: 4,
ContentType: uploadOpts.ContentType,
},
); err != nil {
return errors.Wrap(err, "upload s3 object")
Expand Down
8 changes: 6 additions & 2 deletions providers/swift/swift.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,13 +338,16 @@ func (c *Container) IsAccessDeniedErr(err error) bool {
}

// Upload writes the contents of the reader as an object into the container.
func (c *Container) Upload(_ context.Context, name string, r io.Reader) (err error) {
func (c *Container) Upload(_ context.Context, name string, r io.Reader, opts ...objstore.ObjectUploadOption) (err error) {
size, err := objstore.TryToGetSize(r)
if err != nil {
level.Warn(c.logger).Log("msg", "could not guess file size, using large object to avoid issues if the file is larger than limit", "name", name, "err", err)
// Anything higher or equal to chunk size so the SLO is used.
size = c.chunkSize
}

uploadOpts := objstore.ApplyObjectUploadOptions(opts...)

var file io.WriteCloser
if size >= c.chunkSize {
opts := swift.LargeObjectOpts{
Expand All @@ -353,6 +356,7 @@ func (c *Container) Upload(_ context.Context, name string, r io.Reader) (err err
ChunkSize: c.chunkSize,
SegmentContainer: c.segmentsContainer,
CheckHash: true,
ContentType: uploadOpts.ContentType,
}
if c.useDynamicLargeObjects {
if file, err = c.connection.DynamicLargeObjectCreateFile(&opts); err != nil {
Expand All @@ -364,7 +368,7 @@ func (c *Container) Upload(_ context.Context, name string, r io.Reader) (err err
}
}
} else {
if file, err = c.connection.ObjectCreate(c.name, name, true, "", "", swift.Headers{}); err != nil {
if file, err = c.connection.ObjectCreate(c.name, name, true, "", uploadOpts.ContentType, swift.Headers{}); err != nil {
return errors.Wrap(err, "create file")
}
}
Expand Down
4 changes: 2 additions & 2 deletions testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,9 +316,9 @@ func (d *delayingBucket) Exists(ctx context.Context, name string) (bool, error)
return d.bkt.Exists(ctx, name)
}

func (d *delayingBucket) Upload(ctx context.Context, name string, r io.Reader) error {
func (d *delayingBucket) Upload(ctx context.Context, name string, r io.Reader, opts ...ObjectUploadOption) error {
time.Sleep(d.delay)
return d.bkt.Upload(ctx, name, r)
return d.bkt.Upload(ctx, name, r, opts...)
}

func (d *delayingBucket) Delete(ctx context.Context, name string) error {
Expand Down
Loading
Loading