diff --git a/api-put-object-multipart.go b/api-put-object-multipart.go index 6e0015acc..aefeb5f26 100644 --- a/api-put-object-multipart.go +++ b/api-put-object-multipart.go @@ -24,10 +24,10 @@ import ( "io/ioutil" "net/http" "net/url" + "runtime/debug" "sort" "strconv" "strings" - "sync" "github.com/minio/minio-go/pkg/s3utils" ) @@ -51,16 +51,6 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read return n, err } -// Pool to manage re-usable memory for upload objects -// with streams with unknown size. -var bufPool = sync.Pool{ - New: func() interface{} { - _, partSize, _, _ := optimalPartInfo(-1) - b := make([]byte, partSize) - return &b - }, -} - func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) { // Input validation. if err = s3utils.CheckValidBucketName(bucketName); err != nil { @@ -78,7 +68,7 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader var complMultipartUpload completeMultipartUpload // Calculate the optimal parts info for a given size. - totalPartsCount, _, _, err := optimalPartInfo(-1) + totalPartsCount, partSize, _, err := optimalPartInfo(-1) if err != nil { return 0, err } @@ -101,38 +91,39 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader // Initialize parts uploaded map. partsInfo := make(map[int]ObjectPart) + // Create a buffer. + buf := make([]byte, partSize) + defer debug.FreeOSMemory() + for partNumber <= totalPartsCount { // Choose hash algorithms to be calculated by hashCopyN, // avoid sha256 with non-v4 signature request or // HTTPS connection. hashAlgos, hashSums := c.hashMaterials() - bufp := bufPool.Get().(*[]byte) - length, rErr := io.ReadFull(reader, *bufp) + length, rErr := io.ReadFull(reader, buf) if rErr == io.EOF { break } if rErr != nil && rErr != io.ErrUnexpectedEOF { - bufPool.Put(bufp) return 0, rErr } // Calculates hash sums while copying partSize bytes into cw. for k, v := range hashAlgos { - v.Write((*bufp)[:length]) + v.Write(buf[:length]) hashSums[k] = v.Sum(nil) } // Update progress reader appropriately to the latest offset // as we read from the source. - rd := newHook(bytes.NewReader((*bufp)[:length]), progress) + rd := newHook(bytes.NewReader(buf[:length]), progress) // Proceed to upload the part. var objPart ObjectPart objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber, hashSums["md5"], hashSums["sha256"], int64(length), metadata) if err != nil { - bufPool.Put(bufp) return totalUploadedSize, err } @@ -145,9 +136,6 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader // Increment part number. partNumber++ - // Put back data into bufpool. - bufPool.Put(bufp) - // For unknown size, Read EOF we break away. // We do not have to upload till totalPartsCount. if rErr == io.EOF { diff --git a/api-put-object.go b/api-put-object.go index f4107132e..94db82593 100644 --- a/api-put-object.go +++ b/api-put-object.go @@ -23,6 +23,7 @@ import ( "os" "reflect" "runtime" + "runtime/debug" "sort" "strings" @@ -233,7 +234,7 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string, var complMultipartUpload completeMultipartUpload // Calculate the optimal parts info for a given size. - totalPartsCount, _, _, err := optimalPartInfo(-1) + totalPartsCount, partSize, _, err := optimalPartInfo(-1) if err != nil { return 0, err } @@ -256,27 +257,28 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string, // Initialize parts uploaded map. partsInfo := make(map[int]ObjectPart) + // Create a buffer. + buf := make([]byte, partSize) + defer debug.FreeOSMemory() + for partNumber <= totalPartsCount { - bufp := bufPool.Get().(*[]byte) - length, rErr := io.ReadFull(reader, *bufp) + length, rErr := io.ReadFull(reader, buf) if rErr == io.EOF { break } if rErr != nil && rErr != io.ErrUnexpectedEOF { - bufPool.Put(bufp) return 0, rErr } // Update progress reader appropriately to the latest offset // as we read from the source. - rd := newHook(bytes.NewReader((*bufp)[:length]), progress) + rd := newHook(bytes.NewReader(buf[:length]), progress) // Proceed to upload the part. var objPart ObjectPart objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber, nil, nil, int64(length), metadata) if err != nil { - bufPool.Put(bufp) return totalUploadedSize, err } @@ -289,9 +291,6 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string, // Increment part number. partNumber++ - // Put back data into bufpool. - bufPool.Put(bufp) - // For unknown size, Read EOF we break away. // We do not have to upload till totalPartsCount. if rErr == io.EOF { diff --git a/functional_tests.go b/functional_tests.go index 3d16da4fe..99affd3d3 100644 --- a/functional_tests.go +++ b/functional_tests.go @@ -3671,6 +3671,77 @@ func testPutObjectNoLengthV2() { } } +// Test put objects of unknown size. +func testPutObjectsUnknownV2() { + logger().Info() + + // Seed random based on current time. + rand.Seed(time.Now().Unix()) + + // Instantiate new minio client object. + c, err := minio.NewV2( + os.Getenv(serverEndpoint), + os.Getenv(accessKey), + os.Getenv(secretKey), + mustParseBool(os.Getenv(enableHTTPS)), + ) + if err != nil { + log.Fatal("Error:", err) + } + + // Enable tracing, write to stderr. + // c.TraceOn(os.Stderr) + + // Set user agent. + c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0") + + // Generate a new random bucket name. + bucketName := randString(60, rand.NewSource(time.Now().UnixNano()), + "minio-go-test") + + // Make a new bucket. + err = c.MakeBucket(bucketName, "us-east-1") + if err != nil { + log.Fatal("Error:", err, bucketName) + } + + // Issues are revealed by trying to upload multiple files of unknown size + // sequentially (on 4GB machines) + for i := 1; i <= 4; i++ { + // Simulate that we could be receiving byte slices of data that we want + // to upload as a file + rpipe, wpipe := io.Pipe() + defer rpipe.Close() + go func() { + b := []byte("test") + wpipe.Write(b) + wpipe.Close() + }() + + // Upload the object. + objectName := fmt.Sprintf("%sunique%d", bucketName, i) + n, err := c.PutObjectStreaming(bucketName, objectName, rpipe) + if err != nil { + log.Fatalf("Error: %v %s %s", err, bucketName, objectName) + } + if n != int64(4) { + log.Error(fmt.Errorf("Expected upload object size %d but got %d", 4, n)) + } + + // Remove the object. + err = c.RemoveObject(bucketName, objectName) + if err != nil { + log.Fatal("Error:", err) + } + } + + // Remove the bucket. + err = c.RemoveBucket(bucketName) + if err != nil { + log.Fatal("Error:", err) + } +} + // Test put object with 0 byte object. func testPutObject0ByteV2() { logger().Info() @@ -4086,6 +4157,7 @@ func main() { testUserMetadataCopyingV2() testPutObject0ByteV2() testPutObjectNoLengthV2() + testPutObjectsUnknownV2() testMakeBucketError() testMakeBucketRegions() testPutObjectWithMetadata()