[SPARK-13921] Store serialized blocks as multiple chunks in MemoryStore #11748
Conversation
Test build #53251 has finished for PR 11748 at commit
class ChunkedByteBufferSuite extends SparkFunSuite {
  test("must have at least one chunk") {
any reason we want to enforce this?
I think it marginally simplified some initialization code.
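For context, here is a hedged sketch of the invariant the test above is checking; the class name is changed and the real `ChunkedByteBuffer` in Spark is more involved than this:

```scala
import java.nio.ByteBuffer

// Sketch of the constructor invariant under discussion (not the exact code).
class ChunkedByteBufferSketch(val chunks: Array[ByteBuffer]) {
  require(chunks != null, "chunks must not be null")
  require(chunks.nonEmpty, "chunks must be non-empty") // the invariant tested above
  // With at least one chunk guaranteed, initialization code can refer to
  // chunks.head without a special empty case.
  val size: Long = chunks.map(_.limit.toLong).sum
}
```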
Test build #53290 has finished for PR 11748 at commit
It was a static method before, but its location was confusing.
@rxin, I believe that I've addressed all of your review comments (plus the outstanding TODOs that I left).
Test build #53355 has finished for PR 11748 at commit
Looks good from my side. Would be good for somebody else to take a look at this too. E.g. @andrewor14?
Test build #53352 has finished for PR 11748 at commit
Test build #53351 has finished for PR 11748 at commit
Test build #53363 has finished for PR 11748 at commit
Jenkins, retest this please.
Test build #53442 has finished for PR 11748 at commit
Test build #53445 has finished for PR 11748 at commit
Jenkins, retest this please.
currentChunk.position(currentChunk.position + amountToSkip)
if (currentChunk.remaining() == 0) {
  if (chunks.hasNext) {
    currentChunk = chunks.next()
Does this need to call dispose()?
Currently we dispose of all chunks at the same time (in the close() call).
Any reason this behaves differently here than in read()?
Oh, whoops. Stupid mistake.
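In other words, skip() should step past exhausted chunks exactly the way read() does, with disposal staying centralized in close(). A hypothetical sketch of that shared logic (names invented for illustration; not the actual patch):

```scala
import java.nio.ByteBuffer

// Hypothetical input stream over chunks, showing the advance logic that
// keeps read() and skip() symmetrical. No per-chunk dispose() here, since
// close() disposes all chunks at once, as noted above.
class ChunkedInputStreamSketch(chunkList: Iterator[ByteBuffer]) {
  private var currentChunk: ByteBuffer =
    if (chunkList.hasNext) chunkList.next() else null

  // Shared by read() and skip(): step past exhausted chunks the same way.
  private def advance(): Unit = {
    while (currentChunk != null && !currentChunk.hasRemaining) {
      currentChunk = if (chunkList.hasNext) chunkList.next() else null
    }
  }

  def skip(n: Long): Long = {
    if (currentChunk == null) return 0L
    val amountToSkip = math.min(n, currentChunk.remaining()).toInt
    currentChunk.position(currentChunk.position() + amountToSkip)
    advance() // same traversal as read(); disposal deferred to close()
    amountToSkip
  }
}
```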
Test build #53452 has finished for PR 11748 at commit
Test build #53489 has finished for PR 11748 at commit
Merging to master.
This patch modifies the BlockManager, MemoryStore, and several other storage components so that serialized cached blocks are stored as multiple small chunks rather than as a single contiguous ByteBuffer.

This change will help to improve the efficiency of memory allocation and the accuracy of memory accounting when serializing blocks. Our current serialization code uses a ByteBufferOutputStream, which doubles and re-allocates its backing byte array; this increases the peak memory requirements during serialization (since we need to hold extra memory while expanding the array). In addition, we currently don't account for the extra wasted space at the end of the ByteBuffer's backing array, so a 129-megabyte serialized block may actually consume 256 megabytes of memory. After switching to storing blocks in multiple chunks, we'll be able to efficiently trim the backing buffers so that no space is wasted.

This change is also a prerequisite to being able to cache blocks which are larger than 2GB (although full support for that depends on several other changes which have not been implemented yet).

Author: Josh Rosen <[email protected]> Closes apache#11748 from JoshRosen/chunked-block-serialization.
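To illustrate the allocation pattern the description refers to, here is a minimal, hypothetical sketch of writing into fixed-size chunks (the actual output stream added by this patch differs in detail):

```scala
import java.io.OutputStream
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Sketch: append bytes into fixed-size chunks so peak memory grows one
// chunk at a time instead of doubling a single backing array; only the
// final chunk can contain slack, which can then be trimmed.
class ChunkedOutputStreamSketch(chunkSize: Int) extends OutputStream {
  private val chunks = ArrayBuffer[ByteBuffer]()

  private def current(): ByteBuffer = {
    if (chunks.isEmpty || !chunks.last.hasRemaining) {
      chunks += ByteBuffer.allocate(chunkSize)
    }
    chunks.last
  }

  override def write(b: Int): Unit = current().put(b.toByte)

  // Flip each chunk for reading; wasted space is bounded by one chunk.
  def toChunks(): Array[ByteBuffer] = chunks.map { c => c.flip(); c }.toArray
}
```

With, say, 4 MB chunks (the chunk size here is an illustrative assumption), the 129 MB block from the example above would occupy about 132 MB (33 chunks) instead of 256 MB, since only the last chunk holds unused capacity.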
Hi @JoshRosen, I am trying Spark 2.0, and I believe I am hitting a bug that was introduced in this commit. In summary, the problem is that when Kryo serialization is enabled and you have an RDD with fewer elements than the default parallelism being serialized with Kryo, Spark will attempt to create an empty ChunkedByteBuffer and this code will throw "chunks must be non-empty". If you believe there is a better forum for me to discuss this, let me know. Happy to contribute pull requests if appropriate. The problem is easy to reproduce. First, open a Spark shell.
Then just try to serialize an RDD with a single element (two or more elements work fine, and non-Kryo serialization works fine):
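Something along these lines (a hedged reconstruction of the kind of snippet described, not necessarily the reporter's exact code):

```scala
// In a Spark 2.0 shell started with Kryo enabled, e.g.:
//   ./bin/spark-shell --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
import org.apache.spark.storage.StorageLevel

// One element but default parallelism > 1, so some partitions are empty
// and serialize to zero bytes, producing an empty ChunkedByteBuffer.
val rdd = sc.parallelize(Seq(1)).persist(StorageLevel.MEMORY_ONLY_SER)
rdd.count() // forces the partitions to be cached
```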
And you get back the "chunks must be non-empty" requirement failure described above.
I am on a Mac, and I used the stock preview binary.
Amazing. Thanks a lot, @JoshRosen.
…ByteBuffer

## What changes were proposed in this pull request?

It's possible to also change the callers to not pass in empty chunks, but it seems cleaner to just allow `ChunkedByteBuffer` to handle empty arrays. cc JoshRosen

## How was this patch tested?

Unit tests; also checked that the original reproduction case in #11748 (comment) is resolved.

Author: Eric Liang <[email protected]> Closes #14099 from ericl/spark-16432.
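In essence, the fix relaxes the constructor invariant from the earlier sketch so that an empty chunk array is legal; roughly (again a sketch, not the verbatim patch):

```scala
import java.nio.ByteBuffer

// Sketch of the relaxed invariant: empty chunk arrays are now accepted,
// so serializing an empty partition no longer trips a requirement failure.
class ChunkedByteBufferSketch(val chunks: Array[ByteBuffer]) {
  require(chunks != null, "chunks must not be null")
  // The "chunks must be non-empty" requirement is gone (SPARK-16432).
  val size: Long = chunks.map(_.limit.toLong).sum // 0L for an empty array
}
```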
(cherry picked from commit d8b06f1) Signed-off-by: Reynold Xin <[email protected]>