Fix #730 by using a guaranteed reuse buffer pool.
Always return buffers to the pool.
sb10 committed Aug 8, 2017
1 parent 1a09415 commit 5991548
Showing 2 changed files with 20 additions and 18 deletions.
31 changes: 16 additions & 15 deletions api-put-object-multipart.go
@@ -27,11 +27,21 @@ import (
 	"sort"
 	"strconv"
 	"strings"
-	"sync"
 
 	"github.com/minio/minio-go/pkg/s3utils"
+	"github.com/oxtoacart/bpool"
 )
 
+// Pool to manage re-usable memory for upload objects
+// with streams with unknown size.
+var bufPool *bpool.BytePool
+
+func init() {
+	_, partSize, _, _ := optimalPartInfo(-1)
+	maxBuffers := 5 // *** ideally this would be 75% of (available_system_memory / partSize), but this is reasonable for 4GB machines?
+	bufPool = bpool.NewBytePool(maxBuffers, int(partSize))
+}
+
 func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64,
 	metadata map[string][]string, progress io.Reader) (n int64, err error) {
 	n, err = c.putObjectMultipartNoStream(bucketName, objectName, reader, metadata, progress)
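What makes the new pool's reuse "guaranteed" is the data structure behind it: a bpool.BytePool keeps up to maxBuffers idle slices of a fixed width in a buffered channel, so buffers returned with Put stay available for the next Get instead of being discarded at the garbage collector's discretion. A minimal sketch of that channel-backed pattern, as an illustration rather than the actual bpool source:

package main

import "fmt"

// bytePool is an illustrative stand-in for bpool.BytePool: a buffered
// channel holds up to maxBuffers idle slices, so buffers that are Put
// back are reliably reused rather than left to the GC.
type bytePool struct {
	c chan []byte // idle buffers; channel capacity bounds the pool size
	w int         // fixed width of every buffer handed out
}

func newBytePool(maxBuffers, width int) *bytePool {
	return &bytePool{c: make(chan []byte, maxBuffers), w: width}
}

// Get returns an idle buffer if one is available, else allocates anew.
func (p *bytePool) Get() []byte {
	select {
	case b := <-p.c:
		return b
	default:
		return make([]byte, p.w)
	}
}

// Put returns a buffer to the pool; when the pool is already full the
// buffer is dropped and left for the GC.
func (p *bytePool) Put(b []byte) {
	select {
	case p.c <- b:
	default:
	}
}

func main() {
	p := newBytePool(5, 1024)
	b := p.Get()
	p.Put(b)
	fmt.Println(len(p.Get())) // 1024; the same backing array comes back
}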
@@ -51,16 +61,6 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read
 	return n, err
 }
 
-// Pool to manage re-usable memory for upload objects
-// with streams with unknown size.
-var bufPool = sync.Pool{
-	New: func() interface{} {
-		_, partSize, _, _ := optimalPartInfo(-1)
-		b := make([]byte, partSize)
-		return &b
-	},
-}
-
 func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) {
 	// Input validation.
 	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
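For contrast, the sync.Pool removed above makes no such promise: the runtime may drop pooled objects during garbage collection, so each Get can turn into a fresh allocation of a full part-sized buffer (hundreds of megabytes for unknown-length streams, going by the 4GB-machine comment above). A small demonstration of the non-guarantee, with an 8-byte buffer standing in for a real part buffer:

package main

import (
	"fmt"
	"runtime"
	"sync"
)

func main() {
	allocs := 0
	pool := sync.Pool{New: func() interface{} {
		allocs++
		b := make([]byte, 8) // stand-in for a partSize-sized buffer
		return &b
	}}

	pool.Put(pool.Get().(*[]byte)) // use and return one buffer

	// The runtime is free to discard pooled objects during GC, so a
	// later Get may have to allocate again; reuse is best-effort only.
	runtime.GC()
	runtime.GC()
	_ = pool.Get().(*[]byte)
	fmt.Println("allocations:", allocs) // 2: the pooled buffer was dropped
}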
@@ -107,9 +107,10 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
 		// HTTPS connection.
 		hashAlgos, hashSums := c.hashMaterials()
 
-		bufp := bufPool.Get().(*[]byte)
-		length, rErr := io.ReadFull(reader, *bufp)
+		bufp := bufPool.Get()
+		length, rErr := io.ReadFull(reader, bufp)
 		if rErr == io.EOF {
+			bufPool.Put(bufp)
 			break
 		}
 		if rErr != nil && rErr != io.ErrUnexpectedEOF {
@@ -119,13 +120,13 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
 
 		// Calculates hash sums while copying partSize bytes into cw.
 		for k, v := range hashAlgos {
-			v.Write((*bufp)[:length])
+			v.Write(bufp[:length])
 			hashSums[k] = v.Sum(nil)
 		}
 
 		// Update progress reader appropriately to the latest offset
 		// as we read from the source.
-		rd := newHook(bytes.NewReader((*bufp)[:length]), progress)
+		rd := newHook(bytes.NewReader(bufp[:length]), progress)
 
 		// Proceed to upload the part.
 		var objPart ObjectPart
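The hunks above cover the EOF path; the commit message's "always return buffers to the pool" implies the other exit paths (upload success and errors) are handled in parts of the loop not shown in this diff. A hedged sketch of an alternative shape that makes the invariant structural, running each iteration in a closure with a deferred Put. This is not what the commit does; processParts and upload are made-up placeholders:

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/oxtoacart/bpool"
)

var bufPool = bpool.NewBytePool(2, 4)

// processParts reads fixed-width parts and hands each to upload. Every
// iteration runs in a closure with a deferred Put, so the buffer goes
// back to the pool on all exit paths. This is safe because upload must
// finish with the buffer before it returns.
func processParts(reader io.Reader, upload func([]byte) error) error {
	for {
		done, err := func() (bool, error) {
			bufp := bufPool.Get()
			defer bufPool.Put(bufp)
			length, rErr := io.ReadFull(reader, bufp)
			if rErr == io.EOF {
				return true, nil // nothing left to read
			}
			if rErr != nil && rErr != io.ErrUnexpectedEOF {
				return true, rErr
			}
			// A short read (ErrUnexpectedEOF) is the final part.
			return rErr == io.ErrUnexpectedEOF, upload(bufp[:length])
		}()
		if done || err != nil {
			return err
		}
	}
}

func main() {
	src := bytes.NewReader([]byte("abcdefghij"))
	err := processParts(src, func(p []byte) error {
		fmt.Printf("uploading part %q\n", p) // placeholder upload step
		return nil
	})
	fmt.Println("err:", err)
}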
7 changes: 4 additions & 3 deletions api-put-object.go
@@ -257,9 +257,10 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
 	partsInfo := make(map[int]ObjectPart)
 
 	for partNumber <= totalPartsCount {
-		bufp := bufPool.Get().(*[]byte)
-		length, rErr := io.ReadFull(reader, *bufp)
+		bufp := bufPool.Get()
+		length, rErr := io.ReadFull(reader, bufp)
 		if rErr == io.EOF {
+			bufPool.Put(bufp)
 			break
 		}
 		if rErr != nil && rErr != io.ErrUnexpectedEOF {
@@ -269,7 +270,7 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
 
 		// Update progress reader appropriately to the latest offset
 		// as we read from the source.
-		rd := newHook(bytes.NewReader((*bufp)[:length]), progress)
+		rd := newHook(bytes.NewReader(bufp[:length]), progress)
 
 		// Proceed to upload the part.
 		var objPart ObjectPart
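For completeness, a self-contained sketch of the Get/ReadFull/Put pattern both files now share, written against the real bpool API; the data source, sizes, and the upload step are placeholders rather than minio-go code:

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/oxtoacart/bpool"
)

func main() {
	// Mirrors the init() above: a few fixed-width buffers, sized here
	// at 4 bytes instead of a real multipart part size.
	pool := bpool.NewBytePool(5, 4)

	reader := bytes.NewReader([]byte("abcdefgh")) // stand-in data source
	for part := 1; ; part++ {
		bufp := pool.Get()
		length, rErr := io.ReadFull(reader, bufp)
		if rErr == io.EOF {
			pool.Put(bufp) // the fix: return the buffer on EOF too
			break
		}
		if rErr != nil && rErr != io.ErrUnexpectedEOF {
			pool.Put(bufp)
			panic(rErr)
		}
		fmt.Printf("part %d: %q\n", part, bufp[:length]) // placeholder upload
		pool.Put(bufp) // done with this part; make it available for reuse
	}
}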
