Skip to content

Commit

Permalink
Add parallel buffered multipart upload (#1745)
Browse files Browse the repository at this point in the history
Add multipart upload mode that will buffer a number of parts in memory 
and fill/upload them in parallel.

Use `ConcurrentStreamParts` to enable and `NumThreads` to control the 
number of buffers. `PartSize` will control the size of each buffer.
  • Loading branch information
klauspost authored Jan 6, 2023
1 parent 4eab739 commit 2f6e915
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 2 deletions.
208 changes: 207 additions & 1 deletion api-put-object-streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"net/url"
"sort"
"strings"
"sync"

"github.com/google/uuid"
"github.com/minio/minio-go/v7/pkg/s3utils"
Expand All @@ -44,7 +45,9 @@ import (
func (c *Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
reader io.Reader, size int64, opts PutObjectOptions,
) (info UploadInfo, err error) {
if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
if opts.ConcurrentStreamParts && opts.NumThreads > 1 {
info, err = c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts)
} else if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
// Verify if the reader implements ReadAt and it is not a *minio.Object then we will use parallel uploader.
info, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts)
} else {
Expand Down Expand Up @@ -446,6 +449,209 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
return uploadInfo, nil
}

// putObjectMultipartStreamParallel uploads opts.NumThreads parts in parallel.
// This is expected to take opts.PartSize * opts.NumThreads * (GOGC / 100) bytes of buffer.
func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketName, objectName string,
reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) {
// Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
return UploadInfo{}, err
}

if err = s3utils.CheckValidObjectName(objectName); err != nil {
return UploadInfo{}, err
}

if !opts.SendContentMd5 {
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
}

// Cancel all when an error occurs.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
if err != nil {
return UploadInfo{}, err
}

// Initiates a new multipart request
uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
if err != nil {
return UploadInfo{}, err
}
delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")

// Aborts the multipart upload if the function returns
// any error, since we do not resume we should purge
// the parts which have been uploaded to relinquish
// storage space.
defer func() {
if err != nil {
c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
}
}()

// Create checksums
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
md5Hash := c.md5Hasher()
defer md5Hash.Close()

// Total data read and written to server. should be equal to 'size' at the end of the call.
var totalUploadedSize int64

// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)

// Create a buffer.
nBuffers := int64(opts.NumThreads)
bufs := make(chan []byte, nBuffers)
all := make([]byte, nBuffers*partSize)
for i := int64(0); i < nBuffers; i++ {
bufs <- all[i*partSize : i*partSize+partSize]
}

var wg sync.WaitGroup
var mu sync.Mutex
errCh := make(chan error, opts.NumThreads)

reader = newHook(reader, opts.Progress)

// Part number always starts with '1'.
var partNumber int
for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
// Proceed to upload the part.
var buf []byte
select {
case buf = <-bufs:
case err = <-errCh:
cancel()
wg.Wait()
return UploadInfo{}, err
}

if int64(len(buf)) != partSize {
return UploadInfo{}, fmt.Errorf("read buffer < %d than expected partSize: %d", len(buf), partSize)
}

length, rerr := readFull(reader, buf)
if rerr == io.EOF && partNumber > 1 {
// Done
break
}

if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF {
cancel()
wg.Wait()
return UploadInfo{}, rerr
}

// Calculate md5sum.
customHeader := make(http.Header)
if !opts.SendContentMd5 {
// Add CRC32C instead.
crc.Reset()
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

wg.Add(1)
go func(partNumber int) {
// Avoid declaring variables in the for loop
var md5Base64 string

if opts.SendContentMd5 {
md5Hash.Reset()
md5Hash.Write(buf[:length])
md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil))
}

defer wg.Done()
p := uploadPartParams{
bucketName: bucketName,
objectName: objectName,
uploadID: uploadID,
reader: bytes.NewReader(buf[:length]),
partNumber: partNumber,
md5Base64: md5Base64,
size: int64(length),
sse: opts.ServerSideEncryption,
streamSha256: !opts.DisableContentSha256,
customHeader: customHeader,
}
objPart, uerr := c.uploadPart(ctx, p)
if uerr != nil {
errCh <- uerr
}

// Save successfully uploaded part metadata.
mu.Lock()
partsInfo[partNumber] = objPart
mu.Unlock()

// Send buffer back so it can be reused.
bufs <- buf
}(partNumber)

// Save successfully uploaded size.
totalUploadedSize += int64(length)
}
wg.Wait()

// Collect any error
select {
case err = <-errCh:
return UploadInfo{}, err
default:
}

// Complete multipart upload.
var complMultipartUpload completeMultipartUpload

// Loop over total uploaded parts to save them in
// Parts array before completing the multipart request.
for i := 1; i < partNumber; i++ {
part, ok := partsInfo[i]
if !ok {
return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
}
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
ETag: part.ETag,
PartNumber: part.PartNumber,
ChecksumCRC32: part.ChecksumCRC32,
ChecksumCRC32C: part.ChecksumCRC32C,
ChecksumSHA1: part.ChecksumSHA1,
ChecksumSHA256: part.ChecksumSHA256,
})
}

// Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts))

opts = PutObjectOptions{}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
return UploadInfo{}, err
}

uploadInfo.Size = totalUploadedSize
return uploadInfo, nil
}

// putObject special function used Google Cloud Storage. This special function
// is used for Google Cloud Storage since Google's multipart API is not S3 compatible.
func (c *Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
Expand Down
10 changes: 9 additions & 1 deletion api-put-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ type PutObjectOptions struct {
SendContentMd5 bool
DisableContentSha256 bool
DisableMultipart bool
Internal AdvancedPutOptions

// ConcurrentStreamParts will create NumThreads buffers of PartSize bytes,
// fill them serially and upload them in parallel.
// This can be used for faster uploads on non-seekable or slow-to-seek input.
ConcurrentStreamParts bool
Internal AdvancedPutOptions
}

// getNumThreads - gets the number of threads to be used in the multipart
Expand Down Expand Up @@ -272,6 +277,9 @@ func (c *Client) putObjectCommon(ctx context.Context, bucketName, objectName str
if opts.DisableMultipart {
return UploadInfo{}, errors.New("no length provided and multipart disabled")
}
if opts.ConcurrentStreamParts && opts.NumThreads > 1 {
return c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts)
}
return c.putObjectMultipartStreamNoLength(ctx, bucketName, objectName, reader, opts)
}

Expand Down

0 comments on commit 2f6e915

Please sign in to comment.