Skip to content

Commit

Permalink
Fix minio#730 by using a guaranteed reuse buffer pool.
Browse files Browse the repository at this point in the history
Always return buffers to the pool.
  • Loading branch information
sb10 committed Aug 9, 2017
1 parent 1a09415 commit b86d94e
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 28 deletions.
40 changes: 20 additions & 20 deletions api-put-object-multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,21 @@ import (
"sort"
"strconv"
"strings"
"sync"

"github.com/minio/minio-go/pkg/s3utils"
"github.com/oxtoacart/bpool"
)

// Pool to manage re-usable memory for upload objects
// with streams with unknown size.
var bufPool *bpool.BytePool

func init() {
_, partSize, _, _ := optimalPartInfo(-1)
maxBuffers := 5 // *** ideally this would be 75% of (available_system_memory / partSize), but this is reasonable for 4GB machines?
bufPool = bpool.NewBytePool(maxBuffers, int(partSize))
}

func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64,
metadata map[string][]string, progress io.Reader) (n int64, err error) {
n, err = c.putObjectMultipartNoStream(bucketName, objectName, reader, metadata, progress)
Expand All @@ -51,16 +61,6 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read
return n, err
}

// Pool to manage re-usable memory for upload objects
// with streams with unknown size.
var bufPool = sync.Pool{
New: func() interface{} {
_, partSize, _, _ := optimalPartInfo(-1)
b := make([]byte, partSize)
return &b
},
}

func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) {
// Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
Expand Down Expand Up @@ -101,38 +101,41 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)

// Fetch a new buffer.
bufp := bufPool.Get()

// Put back data into bufpool.
defer bufPool.Put(bufp)

for partNumber <= totalPartsCount {
// Choose hash algorithms to be calculated by hashCopyN,
// avoid sha256 with non-v4 signature request or
// HTTPS connection.
hashAlgos, hashSums := c.hashMaterials()

bufp := bufPool.Get().(*[]byte)
length, rErr := io.ReadFull(reader, *bufp)
length, rErr := io.ReadFull(reader, bufp)
if rErr == io.EOF {
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
bufPool.Put(bufp)
return 0, rErr
}

// Calculates hash sums while copying partSize bytes into cw.
for k, v := range hashAlgos {
v.Write((*bufp)[:length])
v.Write(bufp[:length])
hashSums[k] = v.Sum(nil)
}

// Update progress reader appropriately to the latest offset
// as we read from the source.
rd := newHook(bytes.NewReader((*bufp)[:length]), progress)
rd := newHook(bytes.NewReader(bufp[:length]), progress)

// Proceed to upload the part.
var objPart ObjectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
hashSums["md5"], hashSums["sha256"], int64(length), metadata)
if err != nil {
bufPool.Put(bufp)
return totalUploadedSize, err
}

Expand All @@ -145,9 +148,6 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
// Increment part number.
partNumber++

// Put back data into bufpool.
bufPool.Put(bufp)

// For unknown size, Read EOF we break away.
// We do not have to upload till totalPartsCount.
if rErr == io.EOF {
Expand Down
16 changes: 8 additions & 8 deletions api-put-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,27 +256,30 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)

// Fetch a new buffer.
bufp := bufPool.Get()

// Put back data into bufpool.
defer bufPool.Put(bufp)

for partNumber <= totalPartsCount {
bufp := bufPool.Get().(*[]byte)
length, rErr := io.ReadFull(reader, *bufp)
length, rErr := io.ReadFull(reader, bufp)
if rErr == io.EOF {
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
bufPool.Put(bufp)
return 0, rErr
}

// Update progress reader appropriately to the latest offset
// as we read from the source.
rd := newHook(bytes.NewReader((*bufp)[:length]), progress)
rd := newHook(bytes.NewReader(bufp[:length]), progress)

// Proceed to upload the part.
var objPart ObjectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
nil, nil, int64(length), metadata)
if err != nil {
bufPool.Put(bufp)
return totalUploadedSize, err
}

Expand All @@ -289,9 +292,6 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
// Increment part number.
partNumber++

// Put back data into bufpool.
bufPool.Put(bufp)

// For unknown size, Read EOF we break away.
// We do not have to upload till totalPartsCount.
if rErr == io.EOF {
Expand Down
1 change: 1 addition & 0 deletions appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ install:
- go get -u github.com/minio/go-homedir
- go get -u github.com/remyoudompheng/go-misc/deadcode
- go get -u github.com/gordonklaus/ineffassign
- go get -u github.com/oxtoacart/bpool

# to run your custom scripts instead of automatic MSBuild
build_script:
Expand Down
72 changes: 72 additions & 0 deletions functional_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -3671,6 +3671,77 @@ func testPutObjectNoLengthV2() {
}
}

// Test put objects of unknown size.
func testPutObjectsUnknownV2() {
logger().Info()

// Seed random based on current time.
rand.Seed(time.Now().Unix())

// Instantiate new minio client object.
c, err := minio.NewV2(
os.Getenv(serverEndpoint),
os.Getenv(accessKey),
os.Getenv(secretKey),
mustParseBool(os.Getenv(enableHTTPS)),
)
if err != nil {
log.Fatal("Error:", err)
}

// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)

// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")

// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()),
"minio-go-test")

// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
if err != nil {
log.Fatal("Error:", err, bucketName)
}

// Issues are revealed by trying to upload multiple files of unknown size
// sequentially
for i := 1; i <= 12; i++ {
// Simulate that we could be receiving byte slices of data that we want
// to upload as a file
rpipe, wpipe := io.Pipe()
defer rpipe.Close()
go func() {
b := []byte("test")
wpipe.Write(b)
wpipe.Close()
}()

// Upload the object.
objectName := fmt.Sprintf("%sunique%d", bucketName, i)
n, err := c.PutObjectStreaming(bucketName, objectName, rpipe)
if err != nil {
log.Fatalf("Error: %v %s %s", err, bucketName, objectName)
}
if n != int64(4) {
log.Error(fmt.Errorf("Expected upload object size %d but got %d", 4, n))
}

// Remove the object.
err = c.RemoveObject(bucketName, objectName)
if err != nil {
log.Fatal("Error:", err)
}
}

// Remove the bucket.
err = c.RemoveBucket(bucketName)
if err != nil {
log.Fatal("Error:", err)
}
}

// Test put object with 0 byte object.
func testPutObject0ByteV2() {
logger().Info()
Expand Down Expand Up @@ -4086,6 +4157,7 @@ func main() {
testUserMetadataCopyingV2()
testPutObject0ByteV2()
testPutObjectNoLengthV2()
testPutObjectsUnknownV2()
testMakeBucketError()
testMakeBucketRegions()
testPutObjectWithMetadata()
Expand Down

0 comments on commit b86d94e

Please sign in to comment.