Skip to content

Commit

Permalink
api: Optimize multipart upload memory usage for unknown sized stream
Browse files Browse the repository at this point in the history
Fixes #730
  • Loading branch information
harshavardhana authored and minio-trusted committed Aug 2, 2017
1 parent d2e3c5c commit 4591f13
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 80 deletions.
23 changes: 0 additions & 23 deletions api-put-object-common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package minio

import (
"hash"
"io"
"math"
"os"
Expand Down Expand Up @@ -76,28 +75,6 @@ func optimalPartInfo(objectSize int64) (totalPartsCount int, partSize int64, las
return totalPartsCount, partSize, lastPartSize, nil
}

// hashCopyN - Calculates chosen hashes up to partSize amount of bytes.
func hashCopyN(hashAlgorithms map[string]hash.Hash, hashSums map[string][]byte, writer io.Writer, reader io.Reader, partSize int64) (size int64, err error) {
hashWriter := writer
for _, v := range hashAlgorithms {
hashWriter = io.MultiWriter(hashWriter, v)
}

// Copies to input at writer.
size, err = io.CopyN(hashWriter, reader, partSize)
if err != nil {
// If not EOF return error right here.
if err != io.EOF {
return 0, err
}
}

for k, v := range hashAlgorithms {
hashSums[k] = v.Sum(nil)
}
return size, err
}

// getUploadID - fetch upload id if already present for an object name
// or initiate a new request to fetch a new upload id.
func (c Client) newUploadID(bucketName, objectName string, metaData map[string][]string) (uploadID string, err error) {
Expand Down
2 changes: 1 addition & 1 deletion api-put-object-encrypted.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,5 @@ func (c Client) PutEncryptedObject(bucketName, objectName string, reader io.Read
metadata[amzHeaderKey] = []string{encryptMaterials.GetKey()}
metadata[amzHeaderMatDesc] = []string{encryptMaterials.GetDesc()}

return c.putObjectMultipart(bucketName, objectName, encryptMaterials, -1, metadata, progress)
return c.putObjectMultipartStreamNoLength(bucketName, objectName, encryptMaterials, metadata, progress)
}
64 changes: 36 additions & 28 deletions api-put-object-multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ import (
"sort"
"strconv"
"strings"
"sync"

"github.com/minio/minio-go/pkg/s3utils"
)

func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64,
metadata map[string][]string, progress io.Reader) (n int64, err error) {
n, err = c.putObjectMultipartNoStream(bucketName, objectName, reader, size, metadata, progress)
n, err = c.putObjectMultipartNoStream(bucketName, objectName, reader, metadata, progress)
if err != nil {
errResp := ToErrorResponse(err)
// Verify if multipart functionality is not available, if not
Expand All @@ -50,8 +51,17 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read
return n, err
}

func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, size int64,
metadata map[string][]string, progress io.Reader) (n int64, err error) {
// Pool to manage re-usable memory for upload objects
// with streams with unknown size.
var bufPool = sync.Pool{
New: func() interface{} {
_, partSize, _, _ := optimalPartInfo(-1)
b := make([]byte, partSize)
return &b
},
}

func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) {
// Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
return 0, err
Expand All @@ -68,7 +78,7 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
var complMultipartUpload completeMultipartUpload

// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, _, err := optimalPartInfo(size)
totalPartsCount, _, _, err := optimalPartInfo(-1)
if err != nil {
return 0, err
}
Expand All @@ -88,9 +98,6 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
// Part number always starts with '1'.
partNumber := 1

// Initialize a temporary buffer.
tmpBuffer := new(bytes.Buffer)

// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)

Expand All @@ -100,53 +107,54 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
// HTTPS connection.
hashAlgos, hashSums := c.hashMaterials()

// Calculates hash sums while copying partSize bytes into tmpBuffer.
prtSize, rErr := hashCopyN(hashAlgos, hashSums, tmpBuffer, reader, partSize)
if rErr != nil && rErr != io.EOF {
bufp := bufPool.Get().(*[]byte)
length, rErr := io.ReadFull(reader, *bufp)
if rErr == io.EOF {
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
bufPool.Put(bufp)
return 0, rErr
}

var reader io.Reader
// Calculates hash sums while copying partSize bytes into cw.
for k, v := range hashAlgos {
v.Write((*bufp)[:length])
hashSums[k] = v.Sum(nil)
}

// Update progress reader appropriately to the latest offset
// as we read from the source.
reader = newHook(tmpBuffer, progress)
rd := newHook(bytes.NewReader((*bufp)[:length]), progress)

// Proceed to upload the part.
var objPart ObjectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber,
hashSums["md5"], hashSums["sha256"], prtSize, metadata)
objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
hashSums["md5"], hashSums["sha256"], int64(length), metadata)
if err != nil {
// Reset the temporary buffer upon any error.
tmpBuffer.Reset()
bufPool.Put(bufp)
return totalUploadedSize, err
}

// Save successfully uploaded part metadata.
partsInfo[partNumber] = objPart

// Reset the temporary buffer.
tmpBuffer.Reset()

// Save successfully uploaded size.
totalUploadedSize += prtSize
totalUploadedSize += int64(length)

// Increment part number.
partNumber++

// Put back data into bufpool.
bufPool.Put(bufp)

// For unknown size, Read EOF we break away.
// We do not have to upload till totalPartsCount.
if size < 0 && rErr == io.EOF {
if rErr == io.EOF {
break
}
}

// Verify if we uploaded all the data.
if size > 0 {
if totalUploadedSize != size {
return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
}
}

// Loop over total uploaded parts to save them in
// Parts array before completing the multipart request.
for i := 1; i < partNumber; i++ {
Expand Down
49 changes: 21 additions & 28 deletions api-put-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (c Client) PutObjectWithProgress(bucketName, objectName string, reader io.R
if err != nil {
return 0, err
}

return c.putObjectCommon(bucketName, objectName, reader, size, metadata, progress)
}

Expand All @@ -203,7 +204,7 @@ func (c Client) putObjectCommon(bucketName, objectName string, reader io.Reader,
}

if size < 0 {
return c.putObjectMultipartStreamNoLength(bucketName, objectName, reader, size, metadata, progress)
return c.putObjectMultipartStreamNoLength(bucketName, objectName, reader, metadata, progress)
}

if size < minPartSize {
Expand All @@ -214,8 +215,8 @@ func (c Client) putObjectCommon(bucketName, objectName string, reader io.Reader,
return c.putObjectMultipartStream(bucketName, objectName, reader, size, metadata, progress)
}

func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string, reader io.Reader, size int64,
metadata map[string][]string, progress io.Reader) (n int64, err error) {
func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string, reader io.Reader, metadata map[string][]string,
progress io.Reader) (n int64, err error) {
// Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
return 0, err
Expand All @@ -232,7 +233,7 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
var complMultipartUpload completeMultipartUpload

// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, _, err := optimalPartInfo(size)
totalPartsCount, _, _, err := optimalPartInfo(-1)
if err != nil {
return 0, err
}
Expand All @@ -252,60 +253,52 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
// Part number always starts with '1'.
partNumber := 1

// Initialize a temporary buffer.
tmpBuffer := new(bytes.Buffer)

// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)

for partNumber <= totalPartsCount {
// Calculates hash sums while copying partSize bytes into tmpBuffer.
prtSize, rErr := io.CopyN(tmpBuffer, reader, partSize)
if rErr != nil && rErr != io.EOF {
bufp := bufPool.Get().(*[]byte)
length, rErr := io.ReadFull(reader, *bufp)
if rErr == io.EOF {
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
bufPool.Put(bufp)
return 0, rErr
}

var reader io.Reader
// Update progress reader appropriately to the latest offset
// as we read from the source.
reader = newHook(tmpBuffer, progress)
rd := newHook(bytes.NewReader((*bufp)[:length]), progress)

// Proceed to upload the part.
var objPart ObjectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber,
nil, nil, prtSize, metadata)
objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
nil, nil, int64(length), metadata)
if err != nil {
// Reset the temporary buffer upon any error.
tmpBuffer.Reset()
bufPool.Put(bufp)
return totalUploadedSize, err
}

// Save successfully uploaded part metadata.
partsInfo[partNumber] = objPart

// Reset the temporary buffer.
tmpBuffer.Reset()

// Save successfully uploaded size.
totalUploadedSize += prtSize
totalUploadedSize += int64(length)

// Increment part number.
partNumber++

// Put back data into bufpool.
bufPool.Put(bufp)

// For unknown size, Read EOF we break away.
// We do not have to upload till totalPartsCount.
if size < 0 && rErr == io.EOF {
if rErr == io.EOF {
break
}
}

// Verify if we uploaded all the data.
if size > 0 {
if totalUploadedSize != size {
return totalUploadedSize, ErrUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
}
}

// Loop over total uploaded parts to save them in
// Parts array before completing the multipart request.
for i := 1; i < partNumber; i++ {
Expand Down
63 changes: 63 additions & 0 deletions functional_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -3609,6 +3609,68 @@ func testUserMetadataCopyingV2() {
testUserMetadataCopyingWrapper(c)
}

// Test put object with size -1 byte object.
func testPutObjectNoLengthV2() {
logger().Info()

// Seed random based on current time.
rand.Seed(time.Now().Unix())

// Instantiate new minio client object.
c, err := minio.NewV2(
os.Getenv(serverEndpoint),
os.Getenv(accessKey),
os.Getenv(secretKey),
mustParseBool(os.Getenv(enableHTTPS)),
)
if err != nil {
log.Fatal("Error:", err)
}

// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)

// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")

// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()),
"minio-go-test")

// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
if err != nil {
log.Fatal("Error:", err, bucketName)
}

objectName := bucketName + "unique"

// Generate data using 4 parts so that all 3 'workers' are utilized and a part is leftover.
// Use different data for each part for multipart tests to ensure part order at the end.
var buf = getDataBuffer("datafile-65-MB", MinPartSize)

// Upload an object.
n, err := c.PutObjectWithSize(bucketName, objectName, bytes.NewReader(buf), -1, nil, nil)
if err != nil {
log.Fatalf("Error: %v %s %s", err, bucketName, objectName)
}
if n != int64(len(buf)) {
log.Error(fmt.Errorf("Expected upload object size %d but got %d", len(buf), n))
}

// Remove the object.
err = c.RemoveObject(bucketName, objectName)
if err != nil {
log.Fatal("Error:", err)
}

// Remove the bucket.
err = c.RemoveBucket(bucketName)
if err != nil {
log.Fatal("Error:", err)
}
}

// Test put object with 0 byte object.
func testPutObject0ByteV2() {
logger().Info()
Expand Down Expand Up @@ -4023,6 +4085,7 @@ func main() {
testEncryptedCopyObjectV2()
testUserMetadataCopyingV2()
testPutObject0ByteV2()
testPutObjectNoLengthV2()
testMakeBucketError()
testMakeBucketRegions()
testPutObjectWithMetadata()
Expand Down

0 comments on commit 4591f13

Please sign in to comment.