Fix #730 by using a guaranteed reuse buffer pool. #781

Merged
merged 1 commit into from
Aug 11, 2017

Conversation

sb10
Contributor

@sb10 sb10 commented Aug 8, 2017

Always return buffers to the pool.

Current code fails to call bufPool.Put(bufp) for every bufPool.Get(), which is fixed here. But even with this fix, the code in #730 still results in signal: killed.

Reimplementing the buffer pool with github.com/oxtoacart/bpool instead of sync.Pool solves the problem. Now we always reuse buffers if possible.

@harshavardhana
Member

harshavardhana commented Aug 8, 2017

Thanks for the PR @sb10. We don't need to use an external library. The problem was most likely that Get() was called inside a loop, which resulted in multiple buffers even for a single PutObject() operation. Even after a successful Put(bufp), Get() inside a loop doesn't necessarily reuse the buffer. Our usage of sync.Pool was wrong, or we were not aware of this behavior.

The right way is to reuse the buffer by calling Get() only once per PutObject() call.

diff --git a/api-put-object-multipart.go b/api-put-object-multipart.go
index 6e0015a..acc8595 100644
--- a/api-put-object-multipart.go
+++ b/api-put-object-multipart.go
@@ -101,19 +101,20 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
        // Initialize parts uploaded map.
        partsInfo := make(map[int]ObjectPart)
 
+       bufp := bufPool.Get().(*[]byte)
+       defer bufPool.Put(bufp)
+
        for partNumber <= totalPartsCount {
                // Choose hash algorithms to be calculated by hashCopyN,
                // avoid sha256 with non-v4 signature request or
                // HTTPS connection.
                hashAlgos, hashSums := c.hashMaterials()
 
-               bufp := bufPool.Get().(*[]byte)
                length, rErr := io.ReadFull(reader, *bufp)
                if rErr == io.EOF {
                        break
                }
                if rErr != nil && rErr != io.ErrUnexpectedEOF {
-                       bufPool.Put(bufp)
                        return 0, rErr
                }
 
@@ -132,7 +133,6 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
                objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
                        hashSums["md5"], hashSums["sha256"], int64(length), metadata)
                if err != nil {
-                       bufPool.Put(bufp)
                        return totalUploadedSize, err
                }
 
@@ -145,9 +145,6 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
                // Increment part number.
                partNumber++
 
-               // Put back data into bufpool.
-               bufPool.Put(bufp)
-
                // For unknown size, Read EOF we break away.
                // We do not have to upload till totalPartsCount.
                if rErr == io.EOF {
diff --git a/api-put-object.go b/api-put-object.go
index f410713..95bbdcc 100644
--- a/api-put-object.go
+++ b/api-put-object.go
@@ -256,14 +256,18 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
        // Initialize parts uploaded map.
        partsInfo := make(map[int]ObjectPart)
 
+       // Fetch a new buffer.
+       bufp := bufPool.Get().(*[]byte)
+
+       // Put back data into bufpool.
+       defer bufPool.Put(bufp)
+
        for partNumber <= totalPartsCount {
-               bufp := bufPool.Get().(*[]byte)
                length, rErr := io.ReadFull(reader, *bufp)
                if rErr == io.EOF {
                        break
                }
                if rErr != nil && rErr != io.ErrUnexpectedEOF {
-                       bufPool.Put(bufp)
                        return 0, rErr
                }
 
@@ -276,7 +280,6 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
                objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
                        nil, nil, int64(length), metadata)
                if err != nil {
-                       bufPool.Put(bufp)
                        return totalUploadedSize, err
                }
 
@@ -289,9 +292,6 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
                // Increment part number.
                partNumber++
 
-               // Put back data into bufpool.
-               bufPool.Put(bufp)
-
                // For unknown size, Read EOF we break away.
                // We do not have to upload till totalPartsCount.
                if rErr == io.EOF {

This should address your bug, @sb10. Can you rework this PR and test it on your end?

@sb10
Contributor Author

sb10 commented Aug 9, 2017

Sorry, that doesn't work. Again, from the docs for sync.Pool:

Get may choose to ignore the pool and treat it as empty.

If I alter the test added in this pull request to get 3 or more files sequentially with your above version, it still fails with signal: killed. You should be able to replicate this on any 4GB machine or VM.

If you're worried about adding another dependency, the code from github.com/oxtoacart/bpool that I'm using is only 28 lines and apache 2 licensed, so could be copied directly in to api-put-object-multipart.go if you like.
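
For readers following along, here is a minimal sketch of the sync.Pool behaviour quoted above. The function name countNewCalls and the 1 MiB buffer size are made up for illustration and are not part of minio-go. It does sequential Get/Put cycles and counts how often the pool had to allocate; the exact counts depend on the Go runtime and are not guaranteed either way, which is the point being made.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// countNewCalls runs `rounds` sequential Get/Put cycles on a fresh sync.Pool
// and reports how many times the pool had to call New to allocate a buffer.
// (Hypothetical helper for illustration only.)
func countNewCalls(rounds int, gcBetween bool) int {
	allocs := 0
	pool := sync.Pool{
		New: func() interface{} {
			allocs++ // New runs on the goroutine that called Get
			b := make([]byte, 1<<20)
			return &b
		},
	}
	for i := 0; i < rounds; i++ {
		bufp := pool.Get().(*[]byte)
		pool.Put(bufp)
		if gcBetween {
			// Pooled items may be dropped by the garbage collector.
			// Two cycles are used here because newer runtimes keep
			// items around for one extra cycle before freeing them.
			runtime.GC()
			runtime.GC()
		}
	}
	return allocs
}

func main() {
	fmt.Println("allocations without GC between rounds:", countNewCalls(5, false))
	fmt.Println("allocations with GC between rounds:   ", countNewCalls(5, true))
}
```

Without a collection between rounds the buffer is usually reused; with collections in between, the pool is usually empty again on the next Get(). Neither outcome is promised by the sync.Pool documentation.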

@sb10 sb10 force-pushed the master branch 2 times, most recently from 07089a4 to b86d94e Compare August 9, 2017 08:44
@harshavardhana
Member

Sorry, that doesn't work. Again, from the docs for sync.Pool:

Get may choose to ignore the pool and treat it as empty.
If I alter the test added in this pull request to get 3 or more files sequentially with your above version, it still fails with signal: killed. You should be able to replicate this on any 4GB machine or VM.

If you're worried about adding another dependency, the code from github.com/oxtoacart/bpool that I'm using is only 28 lines and apache 2 licensed, so could be copied directly in to api-put-object-multipart.go if you like.

I see why it fails again: PutObject(), when called through your app, is itself called in a for loop, once per upload, which ultimately results in the same scenario. This is quite unexpected from sync.Pool and makes it not very useful in most scenarios. The only reason I wanted to stick to sync.Pool is that it's a natural fit for these situations, but I do see the problem, since Go does not provide any guarantee. Instead of using an external library we can use our own implementation, based on https://raw.githubusercontent.com/minio/minio/master/pkg/bpool/bpool.go, which we used extensively with the erasure coded backend. It needs modification though. Can you try the following implementation instead?

package minio

import "sync"

// BytePool - temporary pool of byte slice.
type BytePool struct {
        sync.Mutex
        buf  []byte // the single cached buffer (one slice, not several)
        size int64  // length of buf in bytes
}

// Get - Returns a buffer of size.
func (b *BytePool) Get() (buf []byte) {
        b.Lock()
        defer b.Unlock()
        if b.buf == nil {
                b.buf = make([]byte, b.size)
        }
        return b.buf
}

// NewBytePool - Returns a new byte pool.
// size - length of the pool'ed slice.
func NewBytePool(size int64) *BytePool {
        return &BytePool{size: size}
}

@sb10
Contributor Author

sb10 commented Aug 9, 2017

That's not a pool? The only thing it does is guarantee that Get() always returns the same byte slice, which would be very bad in a concurrent situation.

The minio bpool implementation is comparatively inefficient and errors out once you try to create too many buffers.

The oxtoacart/bpool implementation is simple, works concurrently and has the desired behaviour:

  • You can always get a buffer
  • Concurrent callers never get the same buffer
  • We only store a limited number of past buffers
  • If one was previously made, is not being used right now and so got stored, you'll get one of those (guaranteed not to make a new one)

Now as a user I have a very simple and natural way of not running out of memory: don't upload too many files simultaneously.
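
The four properties listed above can be sketched with a buffered channel. This is an illustration written from the description in this comment, not a copy of the oxtoacart/bpool source; the names leakyPool, newLeakyPool, maxIdle and size are assumptions.

```go
package main

import "fmt"

// leakyPool keeps at most cap(free) idle buffers. Get never blocks and
// Put never blocks; surplus buffers are dropped for the GC to reclaim.
// (Hypothetical type for illustration only.)
type leakyPool struct {
	free chan []byte
	size int
}

func newLeakyPool(maxIdle, size int) *leakyPool {
	return &leakyPool{free: make(chan []byte, maxIdle), size: size}
}

// Get returns an idle buffer if one is stored, otherwise a new one.
// A buffer is removed from the channel when handed out, so two callers
// can never hold the same buffer at the same time.
func (p *leakyPool) Get() []byte {
	select {
	case b := <-p.free:
		return b
	default:
		return make([]byte, p.size)
	}
}

// Put stores b for reuse, or drops it if the pool is already full
// or b is too small to be handed out again.
func (p *leakyPool) Put(b []byte) {
	if cap(b) < p.size {
		return
	}
	select {
	case p.free <- b[:p.size]:
	default:
	}
}

func main() {
	p := newLeakyPool(2, 8)
	a := p.Get()
	p.Put(a)
	b := p.Get()
	fmt.Println("buffer reused:", &a[0] == &b[0])
}
```

With this shape, memory use grows with the number of simultaneous Get() calls that have not yet been matched by a Put(), while sequential use keeps reusing one stored buffer.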

@harshavardhana
Member

That's not a pool? The only thing it does is guarantee that Get() always returns the same byte slice, which would be very bad in a concurrent situation.

Correct, it is not a pool yet. I wanted to see whether it helps as a first step; if it does, I can implement a new version that also handles concurrency, without using channels.

The minio bpool implementation is comparatively inefficient and errors out once you try to create too many buffers.

It is expected to error out: creating more buffers than the amount requested on the server is unexpected and would in turn lead to OOM. Get() should never be allowed to do make([]byte) beyond its prescribed limit. Hitting that limit also indicates that the caller is doing something wrong.

The oxtoacart/bpool implementation is simple, works concurrently and has the desired behaviour:

You can always get a buffer
We only store a limited number of past buffers
If one was previously made, is not being used right now and so got stored, you'll get that one

We used it before, when we wanted to implement server-side buffer pooling. bpool.Get() and bpool.Put() have to be synchronized, i.e. there should be a one-to-one relation between them; otherwise there is no way for bpool.Get() to keep you from running out of memory.

I would perhaps even block the caller if a bpool.Get() is not matched by a bpool.Put(). This way runaway concurrency can be controlled and mistakes inside the code can be avoided. In that sense, you could for example run only 5 PutObject() calls in parallel on a 4GiB VM or machine.
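
A blocking variant along the lines suggested here could look like the following sketch. It is not code from minio or minio-go; boundedPool, newBoundedPool and TryGet are hypothetical names, and the limit would have to be chosen by the SDK or the user.

```go
package main

import "fmt"

// boundedPool hands out at most `limit` buffers at a time. The channel is
// pre-filled with `limit` slots; a nil slot means "allowed to allocate".
// (Hypothetical type for illustration only.)
type boundedPool struct {
	slots chan []byte
	size  int
}

func newBoundedPool(limit, size int) *boundedPool {
	p := &boundedPool{slots: make(chan []byte, limit), size: size}
	for i := 0; i < limit; i++ {
		p.slots <- nil
	}
	return p
}

// Get blocks until a slot is free, allocating lazily on first use.
func (p *boundedPool) Get() []byte {
	b := <-p.slots
	if b == nil {
		b = make([]byte, p.size)
	}
	return b
}

// TryGet is the non-blocking form; ok is false when all slots are in use.
func (p *boundedPool) TryGet() (b []byte, ok bool) {
	select {
	case b = <-p.slots:
		if b == nil {
			b = make([]byte, p.size)
		}
		return b, true
	default:
		return nil, false
	}
}

// Put returns a buffer obtained from Get or TryGet. Every Get must be
// matched by exactly one Put; an unmatched extra Put would block.
func (p *boundedPool) Put(b []byte) {
	p.slots <- b
}

func main() {
	p := newBoundedPool(2, 8)
	a, b := p.Get(), p.Get()
	_, ok := p.TryGet()
	fmt.Println("third buffer available:", ok)
	p.Put(a)
	p.Put(b)
}
```

The trade-off discussed in the rest of this thread is visible here: memory is capped at limit × size, but someone has to pick the limit, and callers beyond it wait instead of proceeding.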

@sb10
Contributor Author

sb10 commented Aug 9, 2017

Not quite sure how to take your last comments. Yes, you have to use it correctly. This pull request now defers the Put() after the Get(), so there won't be an issue.

Are you proposing to alter the oxtoacart/bpool implementation so that instead of "You can always get a buffer" it becomes "You can only get a new buffer if we haven't reached the limit, otherwise the Get() call blocks"?

But then you have to worry about a "reasonable" default limit and letting the user override it. I think the existing behaviour is better. The hard-coded limit prevents wasting too much memory on the pool, and the user decides exactly how many simultaneous uploads they would like to do, based on their available memory.

Why do you wish to avoid using channels? They seem ideal here.

@harshavardhana
Member

harshavardhana commented Aug 9, 2017

Not quite sure how to take your last comments. Yes, you have to use it correctly. This pull request now defers the Put() after the Get(), so there won't be an issue.

It works in some cases, but not in highly concurrent environments, which is what we have seen in the past. If the basic idea behind using a byte pool is to save ourselves when an app misbehaves, that doesn't seem possible without some form of throttling.

Let's take an example. If you set the pool size to 5 on, say, a 4GiB machine, it doesn't really mean that you are limited to 5 buffers. If your application uploads 10 objects in parallel, you have overflowed the pool by 5 additional buffers, which do not hold any pool entry themselves; they are simply dropped and Go should garbage collect them. This is what I meant by the runaway effect: it doesn't matter if you use the pool properly inside the PutObject() implementation, you can still end up in the situation we are in right now. Another reason is that the user is not in control of the overall memory usage, so an SDK consumer has to peek into our implementation to understand how many goroutines he/she can have and which would be safe to use in certain environments.

But then you have to worry about a "reasonable" default limit and letting the user override it. I think the existing behaviour is better. The hard-coded limit prevents wasting too much memory on the pool, and the user decides exactly how many simultaneous uploads they would like to do, based on their available memory.

My suggestion is to block such PutObject() operations rather than request more memory from the system; this way we will be safe and there are no crashes. Yes, we do need to provide a way to tune these values so that the user can choose based on the number of goroutines he/she is going to use.

If a user were able to decide based on how much memory is available (which is hard to obtain in a cross-platform safe way), I would think that user would also know the object size beforehand and would not get into this situation in the first place.

Why do you wish to avoid using channels? They seem ideal here.

This is perhaps my bad, channels are fine.

@harshavardhana
Member

Lets take an example here if you set the pool size for example as 4GiB as 5 it doesn't really mean that you are essentially limited to a pool of 5 . Because if you have an application lets say uploads 10 objects in parallel then you have essentially overflowed your pool by 5 additional entries which are in-fact not holding any pool entry themselves but they are generally going to be lost and Go should garbage collect them. This is what i meant by run-away affect it doesn't matter if you use it properly inside the PutObject()

This happens quite easily in an application like yours, since you do not know the object size: even if each file is, say, 1KiB, and you expect 100 goroutines to upload different files, we are going to crash anyway, so we are not safe here in the first place.

Let me think about this further. I am going to sleep now and will perhaps provide a more comprehensive solution to this situation.

@sb10
Contributor Author

sb10 commented Aug 9, 2017

Let's clarify this down.

The aim to be achieved here is that minio-go should be able to upload files in constant memory.

  • It is OK if each additional simultaneous upload requires an additional constant amount of memory.
  • It is NOT OK if each sequential upload requires additional memory.

If a user of minio-go finds that they run out of memory because they did too many simultaneous uploads, all they have to do is do fewer simultaneous uploads. The user doesn't have to tune any hidden parameter in minio-go.

The user shouldn't, however, run out of memory doing single sequential uploads, because then there's nothing they can do to avoid it.

minio-go currently suffers from this problem. This pull request solves the problem.

Again, it is OK for the pool to allow unlimited parallel buffers. That's the user's problem to sort out if they run out of memory. The user can't solve running out of memory when they only upload single files.

@sb10
Contributor Author

sb10 commented Aug 9, 2017

On running further tests on my own software, I find yet another memory issue caused by this pull request.

One of the "features" of sync.Pool is that any unreferenced sync.Pool objects are removed at each garbage collection cycle.

That meant I could force garbage collection to clean up unneeded upload buffers.

Using oxtoacart/bpool means a permanent pool of buffers, and I now run out of memory when trying to do something unrelated following an upload.

It's not so nice emptying out the pool with a buffered channel implementation, but possibly still the cleanest way of doing things. However I almost certainly need to add a new public function to minio-go to trigger the emptying of the pool...

@sb10 sb10 force-pushed the master branch 2 times, most recently from 820a809 to ba7fee5 Compare August 9, 2017 13:55
@sb10
Contributor Author

sb10 commented Aug 9, 2017

I've now changed the pull request so that instead of using oxtoacart/bpool, an implementation based on that is included directly in api-put-object-multipart.go (you may prefer it in its own file: let me know) with the modification of having an Empty() method.

I also introduce a new function to the API called ReleaseBuffers() (you may prefer a different name: let me know) so that users can empty the pool and reclaim memory at will.

By using ReleaseBuffers() my own software can now pass all its tests without running out of memory.
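
The revision of the PR described here is not shown in this thread, so the following is only a guess at its general shape: a channel-backed pool with an added Empty() method, and a package-level ReleaseBuffers() that calls it. The type drainablePool, the variable uploadBuffers, the sizes, and the returned count are all assumptions made for illustration.

```go
package main

import "fmt"

// drainablePool is a channel-backed pool that can be emptied on demand.
// (Hypothetical type for illustration only.)
type drainablePool struct {
	free chan []byte
	size int
}

func newDrainablePool(maxIdle, size int) *drainablePool {
	return &drainablePool{free: make(chan []byte, maxIdle), size: size}
}

// Get returns a stored buffer if there is one, otherwise a new one.
func (p *drainablePool) Get() []byte {
	select {
	case b := <-p.free:
		return b
	default:
		return make([]byte, p.size)
	}
}

// Put stores b for reuse, or drops it if the pool is full.
func (p *drainablePool) Put(b []byte) {
	select {
	case p.free <- b:
	default:
	}
}

// Empty discards all idle buffers so the garbage collector can reclaim
// them, and reports how many were discarded.
func (p *drainablePool) Empty() int {
	n := 0
	for {
		select {
		case <-p.free:
			n++
		default:
			return n
		}
	}
}

// uploadBuffers stands in for the package-level pool used by uploads.
var uploadBuffers = newDrainablePool(4, 1<<20)

// ReleaseBuffers lets an application drop pooled upload buffers at will.
func ReleaseBuffers() int {
	return uploadBuffers.Empty()
}

func main() {
	uploadBuffers.Put(uploadBuffers.Get())
	fmt.Println("buffers released:", ReleaseBuffers())
}
```

Buffers currently checked out by an upload are unaffected by Empty(); only idle ones sitting in the channel are released.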

@harshavardhana
Member

Let's clarify this down.

The aim to be achieved here is that minio-go should be able to upload files in constant memory.

It is OK if each additional simultaneous upload requires an additional constant amount of memory.

If this is okay, then I would suggest that we don't have a pool at all but instead allocate a buffer once per PutObject(), since in the end, without any control over concurrency, we are going to reach the same state anyway.

It is NOT OK if each sequential upload requires additional memory.

Correct

If a user of minio-go finds that they run out of memory because they did too many simultaneous uploads, all they have to do is do fewer simultaneous uploads. The user doesn't have to tune any hidden parameter in minio-go.

No one is aware of this situation. The problem is that it is coupled with cases where the objects are small in reality but the user, such as your application, has decided not to provide a content-length; a buffer is then mandated by the underlying requirements.

From an app's point of view, the user can argue: why would he/she need to limit themselves to a lower limit because of something they have no control over, when the data they are trying to upload fits well within the current memory limits?

The user shouldn't, however, run out of memory doing single sequential uploads, because then there's nothing they can do to avoid it.

minio-go currently suffers from this problem. This pull request solves the problem.

This is okay. It exposes an architectural problem which was not addressed previously; it was wrongly assumed that sync.Pool would work.

Again, it is OK for the pool to allow unlimited parallel buffers. That's the user's problem to sort out if they run out of memory. The user can't solve running out of memory when they only upload single files.

This is not acceptable, because they have no control over how a buffer is allocated and used. This is the same problem that exists today with sync.Pool. We have just made it harder to hit in the beginning, but during the lifetime of a production app it is still possible to crash, and it would fail in unexpected ways.

It's easy to tell why you gravitated towards the idea of ReleaseBuffers(): we are now trying to fix something which architecturally cannot be communicated to the user, i.e. your app. Adding new APIs to patch a problem makes it hard for anyone to understand why they need to follow step 1 to step 5 to get it all working smoothly. It is very hard to explain and document these details from an SDK point of view.

Now think about this approach: let's remove all buffer pooling and instead simply use make([]byte, partSize) inside each PutObject(). Here is the diff.

       buf := make([]byte, partSize)
       defer debug.FreeOSMemory()

This avoids pooling and provides a much simpler way to think and reason about PutObject().

diff --git a/api-put-object-multipart.go b/api-put-object-multipart.go
index 6e0015a..0394920 100644
--- a/api-put-object-multipart.go
+++ b/api-put-object-multipart.go
@@ -24,10 +24,10 @@ import (
        "io/ioutil"
        "net/http"
        "net/url"
+       "runtime/debug"
        "sort"
        "strconv"
        "strings"
-       "sync"
 
        "github.com/minio/minio-go/pkg/s3utils"
 )
@@ -51,16 +51,6 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read
        return n, err
 }
 
-// Pool to manage re-usable memory for upload objects
-// with streams with unknown size.
-var bufPool = sync.Pool{
-       New: func() interface{} {
-               _, partSize, _, _ := optimalPartInfo(-1)
-               b := make([]byte, partSize)
-               return &b
-       },
-}
-
 func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) {
        // Input validation.
        if err = s3utils.CheckValidBucketName(bucketName); err != nil {
@@ -78,7 +68,7 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
        var complMultipartUpload completeMultipartUpload
 
        // Calculate the optimal parts info for a given size.
-       totalPartsCount, _, _, err := optimalPartInfo(-1)
+       totalPartsCount, partSize, _, err := optimalPartInfo(-1)
        if err != nil {
                return 0, err
        }
@@ -101,38 +91,38 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
        // Initialize parts uploaded map.
        partsInfo := make(map[int]ObjectPart)
 
+       buf := make([]byte, partSize)
+       defer debug.FreeOSMemory()
+
        for partNumber <= totalPartsCount {
                // Choose hash algorithms to be calculated by hashCopyN,
                // avoid sha256 with non-v4 signature request or
                // HTTPS connection.
                hashAlgos, hashSums := c.hashMaterials()
 
-               bufp := bufPool.Get().(*[]byte)
-               length, rErr := io.ReadFull(reader, *bufp)
+               length, rErr := io.ReadFull(reader, buf)
                if rErr == io.EOF {
                        break
                }
                if rErr != nil && rErr != io.ErrUnexpectedEOF {
-                       bufPool.Put(bufp)
                        return 0, rErr
                }
 
                // Calculates hash sums while copying partSize bytes into cw.
                for k, v := range hashAlgos {
-                       v.Write((*bufp)[:length])
+                       v.Write(buf[:length])
                        hashSums[k] = v.Sum(nil)
                }
 
                // Update progress reader appropriately to the latest offset
                // as we read from the source.
-               rd := newHook(bytes.NewReader((*bufp)[:length]), progress)
+               rd := newHook(bytes.NewReader(buf[:length]), progress)
 
                // Proceed to upload the part.
                var objPart ObjectPart
                objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
                        hashSums["md5"], hashSums["sha256"], int64(length), metadata)
                if err != nil {
-                       bufPool.Put(bufp)
                        return totalUploadedSize, err
                }
 
@@ -145,9 +135,6 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
                // Increment part number.
                partNumber++
 
-               // Put back data into bufpool.
-               bufPool.Put(bufp)
-
                // For unknown size, Read EOF we break away.
                // We do not have to upload till totalPartsCount.
                if rErr == io.EOF {
diff --git a/api-put-object.go b/api-put-object.go
index f410713..b2e5c01 100644
--- a/api-put-object.go
+++ b/api-put-object.go
@@ -23,6 +23,7 @@ import (
        "os"
        "reflect"
        "runtime"
+       "runtime/debug"
        "sort"
        "strings"
 
@@ -233,7 +234,7 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
        var complMultipartUpload completeMultipartUpload
 
        // Calculate the optimal parts info for a given size.
-       totalPartsCount, _, _, err := optimalPartInfo(-1)
+       totalPartsCount, partSize, _, err := optimalPartInfo(-1)
        if err != nil {
                return 0, err
        }
@@ -256,27 +257,28 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
        // Initialize parts uploaded map.
        partsInfo := make(map[int]ObjectPart)
 
+       // Fetch a buffer.
+       buf := make([]byte, partSize)
+       defer debug.FreeOSMemory()
+
        for partNumber <= totalPartsCount {
-               bufp := bufPool.Get().(*[]byte)
-               length, rErr := io.ReadFull(reader, *bufp)
+               length, rErr := io.ReadFull(reader, buf)
                if rErr == io.EOF {
                        break
                }
                if rErr != nil && rErr != io.ErrUnexpectedEOF {
-                       bufPool.Put(bufp)
                        return 0, rErr
                }
 
                // Update progress reader appropriately to the latest offset
                // as we read from the source.
-               rd := newHook(bytes.NewReader((*bufp)[:length]), progress)
+               rd := newHook(bytes.NewReader(buf[:length]), progress)
 
                // Proceed to upload the part.
                var objPart ObjectPart
                objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
                        nil, nil, int64(length), metadata)
                if err != nil {
-                       bufPool.Put(bufp)
                        return totalUploadedSize, err
                }
 
@@ -289,9 +291,6 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
                // Increment part number.
                partNumber++
 
-               // Put back data into bufpool.
-               bufPool.Put(bufp)
-
                // For unknown size, Read EOF we break away.
                // We do not have to upload till totalPartsCount.
                if rErr == io.EOF {

@sb10
Contributor Author

sb10 commented Aug 9, 2017

Yes, I think what you've come up with makes sense and seems like a better way of addressing the issue.

I'll try it out tomorrow and update the PR.

Member

@harshavardhana harshavardhana left a comment

Already reviewed, just adding a comment.. on validating the new style further.

@harshavardhana
Member

Yes, I think what you've come up with makes sense and seems like a better way of addressing the issue.

I'll try it out tomorrow and update the PR.

Thanks @sb10. From my tests it seems like we are now using constant memory. Please validate and let us know.

runtime/debug.FreeOSMemory() is used to ensure that after a write,
the buffer is immediately released so that subsequent operations
do not run out of memory.
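
As a standalone illustration of the approach that was merged (one buffer per call, read with io.ReadFull until EOF, then debug.FreeOSMemory()), here is a minimal sketch. It records part lengths instead of uploading anything; partLengths and splitString are hypothetical helpers, not minio-go functions.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"runtime/debug"
	"strings"
)

// partLengths reads r in chunks of up to partSize bytes, using a single
// buffer allocated for this call, and returns the length of each chunk.
// (Hypothetical helper for illustration only.)
func partLengths(r io.Reader, partSize int) ([]int, error) {
	if partSize <= 0 {
		return nil, errors.New("partSize must be positive")
	}
	buf := make([]byte, partSize)
	// Ask the runtime to return freed memory to the OS when we are done.
	defer debug.FreeOSMemory()

	var lengths []int
	for {
		n, err := io.ReadFull(r, buf)
		if err == io.EOF {
			break // nothing left to read
		}
		if err != nil && err != io.ErrUnexpectedEOF {
			return lengths, err
		}
		// A real uploader would send buf[:n] as one part here.
		lengths = append(lengths, n)
		if err == io.ErrUnexpectedEOF {
			break // short final part
		}
	}
	return lengths, nil
}

// splitString is a small convenience wrapper for trying the function out.
func splitString(s string, partSize int) ([]int, error) {
	return partLengths(strings.NewReader(s), partSize)
}

func main() {
	lengths, err := splitString("0123456789", 4)
	fmt.Println(lengths, err) // prints "[4 4 2] <nil>"
}
```

Each call holds exactly one partSize buffer for its duration, so memory use scales with the number of simultaneous calls and not with the number of sequential ones.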
@sb10
Contributor Author

sb10 commented Aug 10, 2017

I've updated the PR with harshavardhana's code. It works well for me.

@deekoder deekoder merged commit 6871a54 into minio:master Aug 11, 2017
@harshavardhana
Member

Thanks @sb10 for your patience 👍

4 participants