From 6de7f9f3e412432f2ac93bb99a889334760c4308 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 7 Jul 2017 15:41:09 -0700 Subject: [PATCH] api: Optimize multipart upload memory usage for unknown sized stream Fixes #730 --- api-put-object-encrypted.go | 2 +- api-put-object-multipart.go | 39 +++++++++++++++-------- api-put-object.go | 29 +++++++++-------- capped-writer.go | 49 +++++++++++++++++++++++++++++ functional_tests.go | 63 +++++++++++++++++++++++++++++++++++++ 5 files changed, 155 insertions(+), 27 deletions(-) create mode 100644 capped-writer.go diff --git a/api-put-object-encrypted.go b/api-put-object-encrypted.go index 141b3e91cb..84553d670c 100644 --- a/api-put-object-encrypted.go +++ b/api-put-object-encrypted.go @@ -42,5 +42,5 @@ func (c Client) PutEncryptedObject(bucketName, objectName string, reader io.Read metadata[amzHeaderKey] = []string{encryptMaterials.GetKey()} metadata[amzHeaderMatDesc] = []string{encryptMaterials.GetDesc()} - return c.putObjectMultipart(bucketName, objectName, encryptMaterials, -1, metadata, progress) + return c.putObjectMultipartStreamNoLength(bucketName, objectName, encryptMaterials, -1, metadata, progress) } diff --git a/api-put-object-multipart.go b/api-put-object-multipart.go index 1938378f88..8b65545deb 100644 --- a/api-put-object-multipart.go +++ b/api-put-object-multipart.go @@ -27,6 +27,7 @@ import ( "sort" "strconv" "strings" + "sync" "github.com/minio/minio-go/pkg/s3utils" ) @@ -50,6 +51,16 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read return n, err } +// Pool to manage re-usable memory for upload objects +// with streams with unknown size. +var bufPool = sync.Pool{ + New: func() interface{} { + _, partSize, _, _ := optimalPartInfo(-1) + b := make([]byte, partSize) + return &b + }, +} + func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, size int64, metadata map[string][]string, progress io.Reader) (n int64, err error) { // Input validation. @@ -88,9 +99,6 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader // Part number always starts with '1'. partNumber := 1 - // Initialize a temporary buffer. - tmpBuffer := new(bytes.Buffer) - // Initialize parts uploaded map. partsInfo := make(map[int]ObjectPart) @@ -100,39 +108,44 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader // HTTPS connection. hashAlgos, hashSums := c.hashMaterials() - // Calculates hash sums while copying partSize bytes into tmpBuffer. - prtSize, rErr := hashCopyN(hashAlgos, hashSums, tmpBuffer, reader, partSize) + bufp := bufPool.Get().(*[]byte) + cw := &cappedWriter{ + buffer: *bufp, + cap: int64(cap(*bufp)), + } + + // Calculates hash sums while copying partSize bytes into cw. + prtSize, rErr := hashCopyN(hashAlgos, hashSums, cw, reader, partSize) if rErr != nil && rErr != io.EOF { + bufPool.Put(bufp) return 0, rErr } - var reader io.Reader // Update progress reader appropriately to the latest offset // as we read from the source. - reader = newHook(tmpBuffer, progress) + rd := newHook(bytes.NewReader(cw.GetBytes(prtSize)), progress) // Proceed to upload the part. var objPart ObjectPart - objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber, + objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber, hashSums["md5"], hashSums["sha256"], prtSize, metadata) if err != nil { - // Reset the temporary buffer upon any error. - tmpBuffer.Reset() + bufPool.Put(bufp) return totalUploadedSize, err } // Save successfully uploaded part metadata. partsInfo[partNumber] = objPart - // Reset the temporary buffer. - tmpBuffer.Reset() - // Save successfully uploaded size. totalUploadedSize += prtSize // Increment part number. partNumber++ + // Put back data into bufpool. + bufPool.Put(bufp) + // For unknown size, Read EOF we break away. // We do not have to upload till totalPartsCount. if size < 0 && rErr == io.EOF { diff --git a/api-put-object.go b/api-put-object.go index a1547d5b55..8de0c965e8 100644 --- a/api-put-object.go +++ b/api-put-object.go @@ -180,6 +180,7 @@ func (c Client) PutObjectWithProgress(bucketName, objectName string, reader io.R if err != nil { return 0, err } + return c.putObjectCommon(bucketName, objectName, reader, size, metadata, progress) } @@ -252,46 +253,48 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string, // Part number always starts with '1'. partNumber := 1 - // Initialize a temporary buffer. - tmpBuffer := new(bytes.Buffer) - // Initialize parts uploaded map. partsInfo := make(map[int]ObjectPart) for partNumber <= totalPartsCount { - // Calculates hash sums while copying partSize bytes into tmpBuffer. - prtSize, rErr := io.CopyN(tmpBuffer, reader, partSize) + bufp := bufPool.Get().(*[]byte) + cw := &cappedWriter{ + buffer: *bufp, + cap: int64(cap(*bufp)), + } + + // Copies partSize bytes into tmpBuffer. + prtSize, rErr := io.CopyN(cw, reader, partSize) if rErr != nil && rErr != io.EOF { + bufPool.Put(bufp) return 0, rErr } - var reader io.Reader // Update progress reader appropriately to the latest offset // as we read from the source. - reader = newHook(tmpBuffer, progress) + rd := newHook(bytes.NewReader(cw.GetBytes(prtSize)), progress) // Proceed to upload the part. var objPart ObjectPart - objPart, err = c.uploadPart(bucketName, objectName, uploadID, reader, partNumber, + objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber, nil, nil, prtSize, metadata) if err != nil { - // Reset the temporary buffer upon any error. - tmpBuffer.Reset() + bufPool.Put(bufp) return totalUploadedSize, err } // Save successfully uploaded part metadata. partsInfo[partNumber] = objPart - // Reset the temporary buffer. - tmpBuffer.Reset() - // Save successfully uploaded size. totalUploadedSize += prtSize // Increment part number. partNumber++ + // Put back data into bufpool. + bufPool.Put(bufp) + // For unknown size, Read EOF we break away. // We do not have to upload till totalPartsCount. if size < 0 && rErr == io.EOF { diff --git a/capped-writer.go b/capped-writer.go new file mode 100644 index 0000000000..186a245a98 --- /dev/null +++ b/capped-writer.go @@ -0,0 +1,49 @@ +/* + * Minio Cloud Storage, (C) 2016 Minio, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package minio + +import ( + "errors" +) + +// Used for adding entry to the object cache. +// Implements io.WriteCloser +type cappedWriter struct { + offset int64 + cap int64 + buffer []byte +} + +// Write implements a limited writer, returns error. +// if the writes go beyond allocated size. +func (c *cappedWriter) Write(b []byte) (n int, err error) { + if c.offset+int64(len(b)) > c.cap { + return 0, errors.New("excess data") + } + n = copy(c.buffer[int(c.offset):int(c.offset)+len(b)], b) + c.offset = c.offset + int64(n) + return n, nil +} + +func (c *cappedWriter) Len() int { + return len(c.buffer) +} + +func (c *cappedWriter) GetBytes(offset int64) []byte { + return c.buffer[:offset] +} diff --git a/functional_tests.go b/functional_tests.go index 6b653ab5f4..3d16da4fec 100644 --- a/functional_tests.go +++ b/functional_tests.go @@ -3609,6 +3609,68 @@ func testUserMetadataCopyingV2() { testUserMetadataCopyingWrapper(c) } +// Test put object with size -1 byte object. +func testPutObjectNoLengthV2() { + logger().Info() + + // Seed random based on current time. + rand.Seed(time.Now().Unix()) + + // Instantiate new minio client object. + c, err := minio.NewV2( + os.Getenv(serverEndpoint), + os.Getenv(accessKey), + os.Getenv(secretKey), + mustParseBool(os.Getenv(enableHTTPS)), + ) + if err != nil { + log.Fatal("Error:", err) + } + + // Enable tracing, write to stderr. + // c.TraceOn(os.Stderr) + + // Set user agent. + c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0") + + // Generate a new random bucket name. + bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), + "minio-go-test") + + // Make a new bucket. + err = c.MakeBucket(bucketName, "us-east-1") + if err != nil { + log.Fatal("Error:", err, bucketName) + } + + objectName := bucketName + "unique" + + // Generate data using 4 parts so that all 3 'workers' are utilized and a part is leftover. + // Use different data for each part for multipart tests to ensure part order at the end. + var buf = getDataBuffer("datafile-65-MB", MinPartSize) + + // Upload an object. + n, err := c.PutObjectWithSize(bucketName, objectName, bytes.NewReader(buf), -1, nil, nil) + if err != nil { + log.Fatalf("Error: %v %s %s", err, bucketName, objectName) + } + if n != int64(len(buf)) { + log.Error(fmt.Errorf("Expected upload object size %d but got %d", len(buf), n)) + } + + // Remove the object. + err = c.RemoveObject(bucketName, objectName) + if err != nil { + log.Fatal("Error:", err) + } + + // Remove the bucket. + err = c.RemoveBucket(bucketName) + if err != nil { + log.Fatal("Error:", err) + } +} + // Test put object with 0 byte object. func testPutObject0ByteV2() { logger().Info() @@ -4023,6 +4085,7 @@ func main() { testEncryptedCopyObjectV2() testUserMetadataCopyingV2() testPutObject0ByteV2() + testPutObjectNoLengthV2() testMakeBucketError() testMakeBucketRegions() testPutObjectWithMetadata()