Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #730 by using a guaranteed reuse buffer pool. #781

Merged
merged 1 commit into from
Aug 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 9 additions & 21 deletions api-put-object-multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
"io/ioutil"
"net/http"
"net/url"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"

"github.com/minio/minio-go/pkg/s3utils"
)
Expand All @@ -51,16 +51,6 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read
return n, err
}

// Pool to manage re-usable memory for upload objects
// with streams with unknown size.
var bufPool = sync.Pool{
New: func() interface{} {
_, partSize, _, _ := optimalPartInfo(-1)
b := make([]byte, partSize)
return &b
},
}

func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) {
// Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
Expand All @@ -78,7 +68,7 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
var complMultipartUpload completeMultipartUpload

// Calculate the optimal parts info for a given size.
totalPartsCount, _, _, err := optimalPartInfo(-1)
totalPartsCount, partSize, _, err := optimalPartInfo(-1)
if err != nil {
return 0, err
}
Expand All @@ -101,38 +91,39 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)

// Create a buffer.
buf := make([]byte, partSize)
defer debug.FreeOSMemory()

for partNumber <= totalPartsCount {
// Choose hash algorithms to be calculated by hashCopyN,
// avoid sha256 with non-v4 signature request or
// HTTPS connection.
hashAlgos, hashSums := c.hashMaterials()

bufp := bufPool.Get().(*[]byte)
length, rErr := io.ReadFull(reader, *bufp)
length, rErr := io.ReadFull(reader, buf)
if rErr == io.EOF {
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
bufPool.Put(bufp)
return 0, rErr
}

// Calculates hash sums while copying partSize bytes into cw.
for k, v := range hashAlgos {
v.Write((*bufp)[:length])
v.Write(buf[:length])
hashSums[k] = v.Sum(nil)
}

// Update progress reader appropriately to the latest offset
// as we read from the source.
rd := newHook(bytes.NewReader((*bufp)[:length]), progress)
rd := newHook(bytes.NewReader(buf[:length]), progress)

// Proceed to upload the part.
var objPart ObjectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
hashSums["md5"], hashSums["sha256"], int64(length), metadata)
if err != nil {
bufPool.Put(bufp)
return totalUploadedSize, err
}

Expand All @@ -145,9 +136,6 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
// Increment part number.
partNumber++

// Put back data into bufpool.
bufPool.Put(bufp)

// For unknown size, Read EOF we break away.
// We do not have to upload till totalPartsCount.
if rErr == io.EOF {
Expand Down
17 changes: 8 additions & 9 deletions api-put-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"reflect"
"runtime"
"runtime/debug"
"sort"
"strings"

Expand Down Expand Up @@ -233,7 +234,7 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
var complMultipartUpload completeMultipartUpload

// Calculate the optimal parts info for a given size.
totalPartsCount, _, _, err := optimalPartInfo(-1)
totalPartsCount, partSize, _, err := optimalPartInfo(-1)
if err != nil {
return 0, err
}
Expand All @@ -256,27 +257,28 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)

// Create a buffer.
buf := make([]byte, partSize)
defer debug.FreeOSMemory()

for partNumber <= totalPartsCount {
bufp := bufPool.Get().(*[]byte)
length, rErr := io.ReadFull(reader, *bufp)
length, rErr := io.ReadFull(reader, buf)
if rErr == io.EOF {
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
bufPool.Put(bufp)
return 0, rErr
}

// Update progress reader appropriately to the latest offset
// as we read from the source.
rd := newHook(bytes.NewReader((*bufp)[:length]), progress)
rd := newHook(bytes.NewReader(buf[:length]), progress)

// Proceed to upload the part.
var objPart ObjectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
nil, nil, int64(length), metadata)
if err != nil {
bufPool.Put(bufp)
return totalUploadedSize, err
}

Expand All @@ -289,9 +291,6 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
// Increment part number.
partNumber++

// Put back data into bufpool.
bufPool.Put(bufp)

// For unknown size, Read EOF we break away.
// We do not have to upload till totalPartsCount.
if rErr == io.EOF {
Expand Down
72 changes: 72 additions & 0 deletions functional_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -3671,6 +3671,77 @@ func testPutObjectNoLengthV2() {
}
}

// Test put objects of unknown size.
func testPutObjectsUnknownV2() {
logger().Info()

// Seed random based on current time.
rand.Seed(time.Now().Unix())

// Instantiate new minio client object.
c, err := minio.NewV2(
os.Getenv(serverEndpoint),
os.Getenv(accessKey),
os.Getenv(secretKey),
mustParseBool(os.Getenv(enableHTTPS)),
)
if err != nil {
log.Fatal("Error:", err)
}

// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)

// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")

// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()),
"minio-go-test")

// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
if err != nil {
log.Fatal("Error:", err, bucketName)
}

// Issues are revealed by trying to upload multiple files of unknown size
// sequentially (on 4GB machines)
for i := 1; i <= 4; i++ {
// Simulate that we could be receiving byte slices of data that we want
// to upload as a file
rpipe, wpipe := io.Pipe()
defer rpipe.Close()
go func() {
b := []byte("test")
wpipe.Write(b)
wpipe.Close()
}()

// Upload the object.
objectName := fmt.Sprintf("%sunique%d", bucketName, i)
n, err := c.PutObjectStreaming(bucketName, objectName, rpipe)
if err != nil {
log.Fatalf("Error: %v %s %s", err, bucketName, objectName)
}
if n != int64(4) {
log.Error(fmt.Errorf("Expected upload object size %d but got %d", 4, n))
}

// Remove the object.
err = c.RemoveObject(bucketName, objectName)
if err != nil {
log.Fatal("Error:", err)
}
}

// Remove the bucket.
err = c.RemoveBucket(bucketName)
if err != nil {
log.Fatal("Error:", err)
}
}

// Test put object with 0 byte object.
func testPutObject0ByteV2() {
logger().Info()
Expand Down Expand Up @@ -4086,6 +4157,7 @@ func main() {
testUserMetadataCopyingV2()
testPutObject0ByteV2()
testPutObjectNoLengthV2()
testPutObjectsUnknownV2()
testMakeBucketError()
testMakeBucketRegions()
testPutObjectWithMetadata()
Expand Down