Skip to content

Commit

Permalink
objectstore/cos: support multi-part upload (thanos-io#5137)
Browse files Browse the repository at this point in the history
Signed-off-by: Jimmie Han <[email protected]>
  • Loading branch information
hanjm committed Feb 10, 2022
1 parent c038534 commit b654c95
Showing 1 changed file with 79 additions and 2 deletions.
81 changes: 79 additions & 2 deletions pkg/objstore/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"io"
"math"
"math/rand"
"net/http"
"net/url"
Expand Down Expand Up @@ -194,10 +195,86 @@ func (b *Bucket) Attributes(ctx context.Context, name string) (objstore.ObjectAt
}, nil
}

var (
_ cos.FixedLengthReader = (*fixedLengthReader)(nil)
)

type fixedLengthReader struct {
io.Reader
size int64
}

func newFixedLengthReader(r io.Reader, size int64) io.Reader {
return fixedLengthReader{
Reader: io.LimitReader(r, size),
size: size,
}
}

// Size implement cos.FixedLengthReader interface.
func (r fixedLengthReader) Size() int64 {
return r.size
}

// Upload the contents of the reader as an object into the bucket.
func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
if _, err := b.client.Object.Put(ctx, name, r, nil); err != nil {
return errors.Wrap(err, "upload cos object")
size, err := objstore.TryToGetSize(r)
if err != nil {
return errors.Wrapf(err, "getting size of %s", name)
}
// partSize 128MB.
const partSize = 1024 * 1024 * 128
partNums, lastSlice := int(math.Floor(float64(size)/partSize)), size%partSize
if partNums == 0 {
if _, err := b.client.Object.Put(ctx, name, r, nil); err != nil {
return errors.Wrapf(err, "Put object: %s", name)
}
return nil
}
// 1. init.
result, _, err := b.client.Object.InitiateMultipartUpload(ctx, name, nil)
if err != nil {
return errors.Wrapf(err, "InitiateMultipartUpload %s", name)
}
uploadEveryPart := func(partSize int64, part int, uploadID string) (string, error) {
r := newFixedLengthReader(r, partSize)
resp, err := b.client.Object.UploadPart(ctx, name, uploadID, part, r, &cos.ObjectUploadPartOptions{
ContentLength: partSize,
})
if err != nil {
if _, err := b.client.Object.AbortMultipartUpload(ctx, name, result.UploadID); err != nil {
return "", err
}
return "", err
}
etag := resp.Header.Get("ETag")
return etag, nil
}
optcom := &cos.CompleteMultipartUploadOptions{}
// 2. upload parts.
for part := 1; part <= partNums; part++ {
etag, err := uploadEveryPart(partSize, part, result.UploadID)
if err != nil {
return errors.Wrapf(err, "uploadPart %d, %s", part, name)
}
optcom.Parts = append(optcom.Parts, cos.Object{
PartNumber: part, ETag: etag},
)
}
// 3. upload last part.
if lastSlice != 0 {
part := partNums + 1
etag, err := uploadEveryPart(lastSlice, part, result.UploadID)
if err != nil {
return errors.Wrapf(err, "uploadPart %d, %s", part, name)
}
optcom.Parts = append(optcom.Parts, cos.Object{
PartNumber: part, ETag: etag},
)
}
// 4. complete.
if _, _, err := b.client.Object.CompleteMultipartUpload(ctx, name, result.UploadID, optcom); err != nil {
return errors.Wrapf(err, "CompleteMultipartUpload %s", name)
}
return nil
}
Expand Down

0 comments on commit b654c95

Please sign in to comment.