Skip to content

Commit

Permalink
Fix minio#730 by using a guaranteed reuse buffer pool.
Browse files Browse the repository at this point in the history
Always return buffers to the pool.
  • Loading branch information
sb10 committed Aug 9, 2017
1 parent 1a09415 commit 07089a4
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 18 deletions.
31 changes: 16 additions & 15 deletions api-put-object-multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,21 @@ import (
"sort"
"strconv"
"strings"
"sync"

"github.com/minio/minio-go/pkg/s3utils"
"github.com/oxtoacart/bpool"
)

// Pool to manage re-usable memory for upload objects
// with streams with unknown size.
var bufPool *bpool.BytePool

func init() {
_, partSize, _, _ := optimalPartInfo(-1)
maxBuffers := 5 // *** ideally this would be 75% of (available_system_memory / partSize), but this is reasonable for 4GB machines?
bufPool = bpool.NewBytePool(maxBuffers, int(partSize))
}

func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Reader, size int64,
metadata map[string][]string, progress io.Reader) (n int64, err error) {
n, err = c.putObjectMultipartNoStream(bucketName, objectName, reader, metadata, progress)
Expand All @@ -51,16 +61,6 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read
return n, err
}

// Pool to manage re-usable memory for upload objects
// with streams with unknown size.
var bufPool = sync.Pool{
New: func() interface{} {
_, partSize, _, _ := optimalPartInfo(-1)
b := make([]byte, partSize)
return &b
},
}

func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) {
// Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
Expand Down Expand Up @@ -107,9 +107,10 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
// HTTPS connection.
hashAlgos, hashSums := c.hashMaterials()

bufp := bufPool.Get().(*[]byte)
length, rErr := io.ReadFull(reader, *bufp)
bufp := bufPool.Get()
length, rErr := io.ReadFull(reader, bufp)
if rErr == io.EOF {
bufPool.Put(bufp)
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
Expand All @@ -119,13 +120,13 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader

// Calculates hash sums while copying partSize bytes into cw.
for k, v := range hashAlgos {
v.Write((*bufp)[:length])
v.Write(bufp[:length])
hashSums[k] = v.Sum(nil)
}

// Update progress reader appropriately to the latest offset
// as we read from the source.
rd := newHook(bytes.NewReader((*bufp)[:length]), progress)
rd := newHook(bytes.NewReader(bufp[:length]), progress)

// Proceed to upload the part.
var objPart ObjectPart
Expand Down
7 changes: 4 additions & 3 deletions api-put-object.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,10 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
partsInfo := make(map[int]ObjectPart)

for partNumber <= totalPartsCount {
bufp := bufPool.Get().(*[]byte)
length, rErr := io.ReadFull(reader, *bufp)
bufp := bufPool.Get()
length, rErr := io.ReadFull(reader, bufp)
if rErr == io.EOF {
bufPool.Put(bufp)
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
Expand All @@ -269,7 +270,7 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,

// Update progress reader appropriately to the latest offset
// as we read from the source.
rd := newHook(bytes.NewReader((*bufp)[:length]), progress)
rd := newHook(bytes.NewReader(bufp[:length]), progress)

// Proceed to upload the part.
var objPart ObjectPart
Expand Down
1 change: 1 addition & 0 deletions appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ install:
- go get -u github.com/minio/go-homedir
- go get -u github.com/remyoudompheng/go-misc/deadcode
- go get -u github.com/gordonklaus/ineffassign
- go get -u github.com/oxtoacart/bpool

# to run your custom scripts instead of automatic MSBuild
build_script:
Expand Down
72 changes: 72 additions & 0 deletions functional_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -3671,6 +3671,77 @@ func testPutObjectNoLengthV2() {
}
}

// Test put objects of unknown size.
func testPutObjectsUnknownV2() {
logger().Info()

// Seed random based on current time.
rand.Seed(time.Now().Unix())

// Instantiate new minio client object.
c, err := minio.NewV2(
os.Getenv(serverEndpoint),
os.Getenv(accessKey),
os.Getenv(secretKey),
mustParseBool(os.Getenv(enableHTTPS)),
)
if err != nil {
log.Fatal("Error:", err)
}

// Enable tracing, write to stderr.
// c.TraceOn(os.Stderr)

// Set user agent.
c.SetAppInfo("Minio-go-FunctionalTest", "0.1.0")

// Generate a new random bucket name.
bucketName := randString(60, rand.NewSource(time.Now().UnixNano()),
"minio-go-test")

// Make a new bucket.
err = c.MakeBucket(bucketName, "us-east-1")
if err != nil {
log.Fatal("Error:", err, bucketName)
}

// Issues are revealed by trying to upload multiple files of unknown size
// sequentially
for i := 1; i <= 12; i++ {
// Simulate that we could be receiving byte slices of data that we want
// to upload as a file
rpipe, wpipe := io.Pipe()
defer rpipe.Close()
go func() {
b := []byte("test")
wpipe.Write(b)
wpipe.Close()
}()

// Upload the object.
objectName := fmt.Sprintf("%sunique%d", bucketName, i)
n, err := c.PutObjectStreaming(bucketName, objectName, rpipe)
if err != nil {
log.Fatalf("Error: %v %s %s", err, bucketName, objectName)
}
if n != int64(4) {
log.Error(fmt.Errorf("Expected upload object size %d but got %d", 4, n))
}

// Remove the object.
err = c.RemoveObject(bucketName, objectName)
if err != nil {
log.Fatal("Error:", err)
}
}

// Remove the bucket.
err = c.RemoveBucket(bucketName)
if err != nil {
log.Fatal("Error:", err)
}
}

// Test put object with 0 byte object.
func testPutObject0ByteV2() {
logger().Info()
Expand Down Expand Up @@ -4086,6 +4157,7 @@ func main() {
testUserMetadataCopyingV2()
testPutObject0ByteV2()
testPutObjectNoLengthV2()
testPutObjectsUnknownV2()
testMakeBucketError()
testMakeBucketRegions()
testPutObjectWithMetadata()
Expand Down

0 comments on commit 07089a4

Please sign in to comment.