From b654c954dde19e9dccf99d8d1bc4fb75bc90b680 Mon Sep 17 00:00:00 2001 From: Jimmie Han Date: Wed, 9 Feb 2022 21:42:23 +0800 Subject: [PATCH] objectstore/cos: support multi-part upload (#5137) Signed-off-by: Jimmie Han --- pkg/objstore/cos/cos.go | 81 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 79 insertions(+), 2 deletions(-) diff --git a/pkg/objstore/cos/cos.go b/pkg/objstore/cos/cos.go index 165f035476..a6478dddd9 100644 --- a/pkg/objstore/cos/cos.go +++ b/pkg/objstore/cos/cos.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "io" + "math" "math/rand" "net/http" "net/url" @@ -194,10 +195,86 @@ func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAt }, nil } +var ( + _ cos.FixedLengthReader = (*fixedLengthReader)(nil) +) + +type fixedLengthReader struct { + io.Reader + size int64 +} + +func newFixedLengthReader(r io.Reader, size int64) io.Reader { + return fixedLengthReader{ + Reader: io.LimitReader(r, size), + size: size, + } +} + +// Size implement cos.FixedLengthReader interface. +func (r fixedLengthReader) Size() int64 { + return r.size +} + // Upload the contents of the reader as an object into the bucket. func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error { - if _, err := b.client.Object.Put(ctx, name, r, nil); err != nil { - return errors.Wrap(err, "upload cos object") + size, err := objstore.TryToGetSize(r) + if err != nil { + return errors.Wrapf(err, "getting size of %s", name) + } + // partSize 128MB. + const partSize = 1024 * 1024 * 128 + partNums, lastSlice := int(math.Floor(float64(size)/partSize)), size%partSize + if partNums == 0 { + if _, err := b.client.Object.Put(ctx, name, r, nil); err != nil { + return errors.Wrapf(err, "Put object: %s", name) + } + return nil + } + // 1. init. + result, _, err := b.client.Object.InitiateMultipartUpload(ctx, name, nil) + if err != nil { + return errors.Wrapf(err, "InitiateMultipartUpload %s", name) + } + uploadEveryPart := func(partSize int64, part int, uploadID string) (string, error) { + r := newFixedLengthReader(r, partSize) + resp, err := b.client.Object.UploadPart(ctx, name, uploadID, part, r, &cos.ObjectUploadPartOptions{ + ContentLength: partSize, + }) + if err != nil { + if _, err := b.client.Object.AbortMultipartUpload(ctx, name, result.UploadID); err != nil { + return "", err + } + return "", err + } + etag := resp.Header.Get("ETag") + return etag, nil + } + optcom := &cos.CompleteMultipartUploadOptions{} + // 2. upload parts. + for part := 1; part <= partNums; part++ { + etag, err := uploadEveryPart(partSize, part, result.UploadID) + if err != nil { + return errors.Wrapf(err, "uploadPart %d, %s", part, name) + } + optcom.Parts = append(optcom.Parts, cos.Object{ + PartNumber: part, ETag: etag}, + ) + } + // 3. upload last part. + if lastSlice != 0 { + part := partNums + 1 + etag, err := uploadEveryPart(lastSlice, part, result.UploadID) + if err != nil { + return errors.Wrapf(err, "uploadPart %d, %s", part, name) + } + optcom.Parts = append(optcom.Parts, cos.Object{ + PartNumber: part, ETag: etag}, + ) + } + // 4. complete. + if _, _, err := b.client.Object.CompleteMultipartUpload(ctx, name, result.UploadID, optcom); err != nil { + return errors.Wrapf(err, "CompleteMultipartUpload %s", name) } return nil }