-
Notifications
You must be signed in to change notification settings - Fork 657
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix #730 by using a guaranteed reuse buffer pool. #781
Conversation
Thanks for the PR @sb10 we don't need to use an external library - the problem was perhaps calling Get() inside a loop which resulted in multiple buffers even for a single PutObject() operation. Even after a success Put(bufp) Get() doesn't reuse in a loop - our usage of the sync.Pool was wrong or lack of awareness. The right way is to re-use the buffer appropriately by calling it only once per PutObject() call. diff --git a/api-put-object-multipart.go b/api-put-object-multipart.go
index 6e0015a..acc8595 100644
--- a/api-put-object-multipart.go
+++ b/api-put-object-multipart.go
@@ -101,19 +101,20 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)
+ bufp := bufPool.Get().(*[]byte)
+ defer bufPool.Put(bufp)
+
for partNumber <= totalPartsCount {
// Choose hash algorithms to be calculated by hashCopyN,
// avoid sha256 with non-v4 signature request or
// HTTPS connection.
hashAlgos, hashSums := c.hashMaterials()
- bufp := bufPool.Get().(*[]byte)
length, rErr := io.ReadFull(reader, *bufp)
if rErr == io.EOF {
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
- bufPool.Put(bufp)
return 0, rErr
}
@@ -132,7 +133,6 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
hashSums["md5"], hashSums["sha256"], int64(length), metadata)
if err != nil {
- bufPool.Put(bufp)
return totalUploadedSize, err
}
@@ -145,9 +145,6 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
// Increment part number.
partNumber++
- // Put back data into bufpool.
- bufPool.Put(bufp)
-
// For unknown size, Read EOF we break away.
// We do not have to upload till totalPartsCount.
if rErr == io.EOF {
diff --git a/api-put-object.go b/api-put-object.go
index f410713..95bbdcc 100644
--- a/api-put-object.go
+++ b/api-put-object.go
@@ -256,14 +256,18 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)
+ // Fetch a new buffer.
+ bufp := bufPool.Get().(*[]byte)
+
+ // Put back data into bufpool.
+ defer bufPool.Put(bufp)
+
for partNumber <= totalPartsCount {
- bufp := bufPool.Get().(*[]byte)
length, rErr := io.ReadFull(reader, *bufp)
if rErr == io.EOF {
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
- bufPool.Put(bufp)
return 0, rErr
}
@@ -276,7 +280,6 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
nil, nil, int64(length), metadata)
if err != nil {
- bufPool.Put(bufp)
return totalUploadedSize, err
}
@@ -289,9 +292,6 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
// Increment part number.
partNumber++
- // Put back data into bufpool.
- bufPool.Put(bufp)
-
// For unknown size, Read EOF we break away.
// We do not have to upload till totalPartsCount.
if rErr == io.EOF { This should address your bug @sb10 can you re-work this PR and test it on your end? |
Sorry, that doesn't work. Again, from the docs for sync.Pool:
If I alter the test added in this pull request to get 3 or more files sequentially with your above version, it still fails with If you're worried about adding another dependency, the code from github.com/oxtoacart/bpool that I'm using is only 28 lines and apache 2 licensed, so could be copied directly in to api-put-object-multipart.go if you like. |
07089a4
to
b86d94e
Compare
I see again why because PutObject() if called through your app by nature is again being called in a for loop for each PutObject operations which ultimately results in the same scenario. This is quite unexpected from package minio
import "sync"
// BytePool - temporary pool of byte slice.
type BytePool struct {
sync.Mutex
buf []byte // array of byte slices
size int64 // size of buf
}
// Get - Returns a buffer of size.
func (b *BytePool) Get() (buf []byte) {
b.Lock()
defer b.Unlock()
if b.buf == nil {
b.buf = make([]byte, b.size)
}
return b.buf
}
// NewBytePool - Returns a new byte pool.
// size - length of the pool'ed slice.
func NewBytePool(size int64) *BytePool {
return &BytePool{size: size}
} |
That's not a pool? The only thing it does is guarantee that Get() always returns the same byte slice, which would be very bad in a concurrent situation. The minio bpool implementation is comparatively inefficient and errors out once you try to create too many buffers. The oxtoacart/bpool implementation is simple, works concurrently and has the desired behaviour:
Now as a user I have a very simple and natural way of not running out of memory: don't upload too many files simultaneously. |
Correct, it is not a pool yet. I wanted to see if it does help as a first step, if it does then i can implement a new version which does concurrent behavior as well without using channels.
It is expected to error out since it is unexpected that you wish to create more buffer than requested amount on the server which in turn leads to OOM. The reason is that Get() should never really be allowed to do
We used it before when we wished to implement server side buffer pooling - bpool.Get() and bpool.Put() has to be synchronized i.e there should be a one to one relation otherwise there is no way bpool.Get() limits you from running out of memory. I would even perhaps block the caller if a bpool.Get() is not synchronized with another bpool.Put() - this way run-away concurrency can be controlled and also mistakes can be avoided inside the code. So in a sense that you can possibly only upload 5 PutObject in parallel on a 4GiB VM or machine. |
Not quite sure how to take your last comments. Yes, you have to use it correctly. This pull request now defers the Put() after the Get(), so there won't be an issue. Are you proposing to alter the oxtoacart/bpool implementation so that instead of "You can always get a buffer" it becomes "You can only get a new buffer if we haven't reached the limit, otherwise the Get() call blocks"? But then you have to worry about a "reasonable" default limit and letting the user override it. I think the existing behaviour is better. The hard-coded limit prevents wasting too much memory on the pool, and the user decides exactly how many simultaneous uploads they would like to do, based on their available memory. Why do you wish to avoid using channels? They seem ideal here. |
It works essentially in some cases but not in high concurrent environments which is what we have seen in the past. If the basic idea behind using sort of a byte pool is to save ourselves when an app misbehaves doesn't seem to be possible without some form of throttling. Lets take an example here if you set the pool size for example as 4GiB as 5 it doesn't really mean that you are essentially limited to a pool of 5 . Because if you have an application lets say uploads 10 objects in parallel then you have essentially overflowed your pool by 5 additional entries which are in-fact not holding any pool entry themselves but they are generally going to be lost and Go should garbage collect them. This is what i meant by run-away affect it doesn't matter if you use it properly inside the PutObject() implementation you can potentially end up in a situation we are in right now. Another reason is also because user himself is not in control of the overall memory usage so an SDK consumer has to peek into our implementation to understand how many go-routines he/she can have and which would safer to use in certain environments.
My suggestion is to block such PutObject() operations than requesting more memory from the system, this way we will be safe and there are no crashes. Yes we do need to provide a way to tune these values so that user can choose based on the amount of go-routines he/she is going to use. If a user is able to decide based on how much of amount of memory is available - which is harder to obtain in a cross platform safe way. I would think that user will know the object size before hand and not even get to this situation in the first place.
This is perhaps my bad, channels are fine. |
This is just quite easy in an application like yours since you do not know the object size even if the file size is 1KiB lets say and you are expecting 100 routines to upload different files. We are going to crash anyways so we are not safe here in the first place. Let me think about this further, going to sleep now and perhaps provide more comprehensive solution to this situation. |
Let's clarify this down. The aim to be achieved here is that minio-go should be able to upload files in constant memory.
If a user of minio-go finds that they run out of memory because they did too many simultaneous uploads, all they have to do is do fewer simultaneous uploads. The user doesn't have to tune any hidden parameter in minio-go. The user shouldn't, however, run out of memory doing single sequential uploads, because then there's nothing they can do to avoid it. minio-go currently suffers from this problem. This pull request solves the problem. Again, it is OK for the pool to allow unlimited parallel buffers. That's the user's problem to sort out if they run out of memory. The user can't solve running out of memory when they only upload single files. |
On running further tests on my own software, I find yet another memory issue caused by this pull request. One of the "features" of sync.Pool is that any unreferenced sync.Pool objects are removed at each garbage collection cycle. That meant I could force garbage collection to clean up unneeded upload buffers. Using oxtoacart/bpool means a permanent pool of buffers, and I now run out of memory when trying to do something unrelated following an upload. It's not so nice emptying out the pool with a buffered channel implementation, but possibly still the cleanest way of doing things. However I almost certainly need to add a new public function to minio-go to trigger the emptying of the pool... |
820a809
to
ba7fee5
Compare
I've now changed the pull request so that instead of using oxtoacart/bpool, an implementation based on that is included directly in api-put-object-multipart.go (you may prefer it in its own file: let me know) with the modification of having an Empty() method. I also introduce a new function to the API called ReleaseBuffers() (you may prefer a different name: let me know) so that users can empty the pool and reclaim memory at will. By using ReleaseBuffers() my own software can now pass all its tests without running out of memory. |
If this is okay then i would suggest we don't have a pool but instead allocate a buffer once per PutObject() since in the end without having any control we are essentially going to reach the same stage.
Correct
This situation no one is aware the problem here is it is coupled with situations where objects are smaller in reality but user such as your application has decided to not provide content-length - then a buffer is mandated due the underlying requirements. From an app point of view user can argue that why would he/she needs to limit themselves to a lower limit to something which they don't have control over and to the data they are trying to upload which fits well within the current memory limits.
This is okay it exposes an architectural problem which was not addressed previously, it was wrongly assumed that
This is not acceptable because they are in no control over how a buffer is allocated and used. This is the same problem what exists today using Its easy to tell why you gravitated towards the idea of ReleaseBuffers() because we are now trying to fix something which is architecturally non communicable to the user i.e your app. Adding new APIs to patch a problem is not really easy for anyone to understand why they need to follow step1 to step5 to get it all working smoothly. It is very hard to explain these details and document from an SDK point of view. Now think about this approach lets remove all buffer pooling, instead we just simply use buf := make([]byte, partSize)
defer debug.FreeOSMemory() This avoids pooling and provides a much simpler way to think and reason about PutObject(). diff --git a/api-put-object-multipart.go b/api-put-object-multipart.go
index 6e0015a..0394920 100644
--- a/api-put-object-multipart.go
+++ b/api-put-object-multipart.go
@@ -24,10 +24,10 @@ import (
"io/ioutil"
"net/http"
"net/url"
+ "runtime/debug"
"sort"
"strconv"
"strings"
- "sync"
"github.com/minio/minio-go/pkg/s3utils"
)
@@ -51,16 +51,6 @@ func (c Client) putObjectMultipart(bucketName, objectName string, reader io.Read
return n, err
}
-// Pool to manage re-usable memory for upload objects
-// with streams with unknown size.
-var bufPool = sync.Pool{
- New: func() interface{} {
- _, partSize, _, _ := optimalPartInfo(-1)
- b := make([]byte, partSize)
- return &b
- },
-}
-
func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader io.Reader, metadata map[string][]string, progress io.Reader) (n int64, err error) {
// Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
@@ -78,7 +68,7 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
var complMultipartUpload completeMultipartUpload
// Calculate the optimal parts info for a given size.
- totalPartsCount, _, _, err := optimalPartInfo(-1)
+ totalPartsCount, partSize, _, err := optimalPartInfo(-1)
if err != nil {
return 0, err
}
@@ -101,38 +91,38 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)
+ buf := make([]byte, partSize)
+ defer debug.FreeOSMemory()
+
for partNumber <= totalPartsCount {
// Choose hash algorithms to be calculated by hashCopyN,
// avoid sha256 with non-v4 signature request or
// HTTPS connection.
hashAlgos, hashSums := c.hashMaterials()
- bufp := bufPool.Get().(*[]byte)
- length, rErr := io.ReadFull(reader, *bufp)
+ length, rErr := io.ReadFull(reader, buf)
if rErr == io.EOF {
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
- bufPool.Put(bufp)
return 0, rErr
}
// Calculates hash sums while copying partSize bytes into cw.
for k, v := range hashAlgos {
- v.Write((*bufp)[:length])
+ v.Write(buf[:length])
hashSums[k] = v.Sum(nil)
}
// Update progress reader appropriately to the latest offset
// as we read from the source.
- rd := newHook(bytes.NewReader((*bufp)[:length]), progress)
+ rd := newHook(bytes.NewReader(buf[:length]), progress)
// Proceed to upload the part.
var objPart ObjectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
hashSums["md5"], hashSums["sha256"], int64(length), metadata)
if err != nil {
- bufPool.Put(bufp)
return totalUploadedSize, err
}
@@ -145,9 +135,6 @@ func (c Client) putObjectMultipartNoStream(bucketName, objectName string, reader
// Increment part number.
partNumber++
- // Put back data into bufpool.
- bufPool.Put(bufp)
-
// For unknown size, Read EOF we break away.
// We do not have to upload till totalPartsCount.
if rErr == io.EOF {
diff --git a/api-put-object.go b/api-put-object.go
index f410713..b2e5c01 100644
--- a/api-put-object.go
+++ b/api-put-object.go
@@ -23,6 +23,7 @@ import (
"os"
"reflect"
"runtime"
+ "runtime/debug"
"sort"
"strings"
@@ -233,7 +234,7 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
var complMultipartUpload completeMultipartUpload
// Calculate the optimal parts info for a given size.
- totalPartsCount, _, _, err := optimalPartInfo(-1)
+ totalPartsCount, partSize, _, err := optimalPartInfo(-1)
if err != nil {
return 0, err
}
@@ -256,27 +257,28 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)
+ // Fetch a buffer.
+ buf := make([]byte, partSize)
+ defer debug.FreeOSMemory()
+
for partNumber <= totalPartsCount {
- bufp := bufPool.Get().(*[]byte)
- length, rErr := io.ReadFull(reader, *bufp)
+ length, rErr := io.ReadFull(reader, buf)
if rErr == io.EOF {
break
}
if rErr != nil && rErr != io.ErrUnexpectedEOF {
- bufPool.Put(bufp)
return 0, rErr
}
// Update progress reader appropriately to the latest offset
// as we read from the source.
- rd := newHook(bytes.NewReader((*bufp)[:length]), progress)
+ rd := newHook(bytes.NewReader(buf[:length]), progress)
// Proceed to upload the part.
var objPart ObjectPart
objPart, err = c.uploadPart(bucketName, objectName, uploadID, rd, partNumber,
nil, nil, int64(length), metadata)
if err != nil {
- bufPool.Put(bufp)
return totalUploadedSize, err
}
@@ -289,9 +291,6 @@ func (c Client) putObjectMultipartStreamNoLength(bucketName, objectName string,
// Increment part number.
partNumber++
- // Put back data into bufpool.
- bufPool.Put(bufp)
-
// For unknown size, Read EOF we break away.
// We do not have to upload till totalPartsCount.
if rErr == io.EOF { |
Yes, I think what you've come up with makes sense and seems like a better way of addressing the issue. I'll try it out tomorrow and update the PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already reviewed, just adding a comment.. on validating the new style further.
Thanks @sb10 from my tests it seems like we are now using constant memory now. Please validate and let us know. |
runtime/debug.FreeOSMemory() is used to ensure that after a write, the buffer is immediately released so that subsequent operations do not run out of memory.
I've updated the pr with harshavardhana's code. It works well for me. |
Thanks @sb10 for your patience 👍 |
Always return buffers to the pool.
Current code fails to call
bufPool.Put(bufp)
for everybufPool.Get()
, which is fixed here. But even with this fix, the code in #730 still results insignal: killed
.Reimplementing the buffer pool with github.com/oxtoacart/bpool instead of sync.Pool solves the problem. Now we always reuse buffers if possible.