[SPARK-1777] Prevent OOMs from single partitions #1165
Conversation
This buffer is backed by an underlying PrimitiveVector. It is to be used for unrolling partitions, so that we can efficiently check the size of the in-memory buffer periodically. Note that the underlying buffer cannot be an existing implementation of a mutable Scala or Java collection, because we need to know when the underlying array is resized; otherwise, size estimation may not be accurate.
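For illustration, here is a minimal Scala sketch of the idea. This is not the actual SizeTrackingAppendOnlyBuffer; the class name and the fixed bytes-per-element estimate are stand-ins for the real size estimation logic.

```scala
import scala.reflect.ClassTag

// Append-only buffer over a raw array. Because we own the resize logic, we know
// exactly when the capacity changes and can refresh the size estimate right there;
// a standard ArrayBuffer or java.util.ArrayList hides its resizes from us.
class SizeTrackingBufferSketch[T: ClassTag](bytesPerElement: Long) {
  private var array = new Array[T](64)
  private var numElements = 0
  private var estimatedBytes: Long = array.length.toLong * bytesPerElement

  def +=(value: T): Unit = {
    if (numElements == array.length) {
      val newArray = new Array[T](array.length * 2)
      System.arraycopy(array, 0, newArray, 0, numElements)
      array = newArray
      // Refresh the estimate only when the backing array actually grows.
      estimatedBytes = array.length.toLong * bytesPerElement
    }
    array(numElements) = value
    numElements += 1
  }

  def estimateSize(): Long = estimatedBytes   // cheap enough to call periodically
  def iterator: Iterator[T] = (0 until numElements).iterator.map(i => array(i))
}
```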
... rather than ArrayBuffer. We only ever iterate through it anyway, so there is really no reason for it to be a mutable buffer of any sort. This change is introduced so that we can eventually directly pass our SizeTrackingAppendOnlyBuffer's underlying array to BlockManager, instead of having to awkwardly make it an ArrayBuffer first.
In addition, avoid using EasyMock for one of our tests, which expects BlockManager#put to be called with an Array[Any] parameter. Even with all the EasyMock matchers, it is impossible to match an Array[Any] because of a combination of the following: (1) the fact that Arrays are not covariant, (2) EasyMock provides `aryEq` matchers for all the Java primitive types, which conflict with Any, and (3) EasyMock's super general matchers like `anyObject` or `isA` also do not match for some reason.
The idea is to always use at least a fixed amount of memory (M bytes) for unrolling RDD partitions. This space is not reserved, but instead allocated dynamically by dropping existing blocks when necessary. We maintain this buffer as a global quantity shared across all cores. The way we synchronize the usage of this buffer is very similar to the way we share memory across all threads for shuffle aggregation. In particular, each thread periodically and cautiously requests more memory, and if there is not enough global memory to grant the request, the thread concedes and spills. However, this relies on the accuracy of size estimation, which is not guaranteed. Therefore, as in the shuffle case, we need an equivalent spark.storage.safetyFraction in case size estimation is slightly off. We expose M to the user as spark.storage.bufferFraction, with a default value of 0.2.
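A rough, self-contained sketch of this request-and-concede loop is shown below. It is illustrative only: the pool object, the initial reservation, the check period, and the per-element size estimate are assumptions, not the actual MemoryStore implementation.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

// Global pool of M bytes shared by every thread that is unrolling a partition.
object UnrollMemoryPoolSketch {
  private var bytesFree = 512L * 1024 * 1024          // stand-in for M

  def tryAcquire(bytes: Long): Boolean = synchronized {
    if (bytes <= bytesFree) { bytesFree -= bytes; true } else false
  }
  def release(bytes: Long): Unit = synchronized { bytesFree += bytes }
}

object UnrollLoopSketch {
  // Returns Left(array) if the partition fit in memory, Right(iterator) if we conceded.
  def unroll[T: ClassTag](values: Iterator[T]): Either[Array[T], Iterator[T]] = {
    val buffer = ArrayBuffer.empty[T]
    var reserved = 1L * 1024 * 1024                   // initial, optimistic reservation
    var keepUnrolling = UnrollMemoryPoolSketch.tryAcquire(reserved)
    var count = 0L
    while (values.hasNext && keepUnrolling) {
      buffer += values.next()
      count += 1
      if (count % 16 == 0) {                          // periodic size check
        val estimated = count * 64L                   // stand-in for real size estimation
        if (estimated > reserved) {
          val extra = estimated * 2 - reserved        // ask for some headroom
          if (UnrollMemoryPoolSketch.tryAcquire(extra)) reserved += extra
          else keepUnrolling = false                  // concede: the pool is exhausted
        }
      }
    }
    if (keepUnrolling) Left(buffer.toArray)           // fully unrolled in memory
    else Right(buffer.iterator ++ values)             // partial: caller drops to disk or skips caching
  }
}
```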
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15979/
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15981/
Merged build triggered.
Merged build started.
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15987/
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15988/
Merged build triggered.
I have addressed your latest comments and rebased to master. Anything else?
QA tests have started for PR 1165. This patch merges cleanly.
QA results for PR 1165:
Thanks Andrew! The changes look good to me -- I've merged this in. |
This adds a new ShuffleManager based on sorting, as described in https://issues.apache.org/jira/browse/SPARK-2045. The bulk of the code is in an ExternalSorter class that is similar to ExternalAppendOnlyMap, but sorts key-value pairs by partition ID and can be used to create a single sorted file with a map task's output. (Longer-term I think this can take on the remaining functionality in ExternalAppendOnlyMap and replace it so we don't have code duplication.)

The main TODOs still left are:
- [x] enabling ExternalSorter to merge across spilled files
  - [x] with an Ordering
  - [x] without an Ordering, using the keys' hash codes
- [x] adding more tests (e.g. a version of our shuffle suite that runs on this)
- [x] rebasing on top of the size-tracking refactoring in #1165 when that is merged
- [x] disabling spilling if spark.shuffle.spill is set to false

Despite this though, this seems to work pretty well (running successfully in cases where the hash shuffle would OOM, such as 1000 reduce tasks on executors with only 1G memory), and it seems to be comparable in speed or faster than hash-based shuffle (it will create much fewer files for the OS to keep track of). So I'm posting it to get some early feedback. After these TODOs are done, I'd also like to enable ExternalSorter to sort data within each partition by a key as well, which will allow us to use it to implement external spilling in reduce tasks in `sortByKey`.

Author: Matei Zaharia <[email protected]>

Closes #1499 from mateiz/sort-based-shuffle and squashes the following commits:

bd841f9 [Matei Zaharia] Various review comments
d1c137f [Matei Zaharia] Various review comments
a611159 [Matei Zaharia] Compile fixes due to rebase
62c56c8 [Matei Zaharia] Fix ShuffledRDD sometimes not returning Tuple2s.
f617432 [Matei Zaharia] Fix a failing test (seems to be due to change in SizeTracker logic)
9464d5f [Matei Zaharia] Simplify code and fix conflicts after latest rebase
0174149 [Matei Zaharia] Add cleanup behavior and cleanup tests for sort-based shuffle
eb4ee0d [Matei Zaharia] Remove customizable element type in ShuffledRDD
fa2e8db [Matei Zaharia] Allow nextBatchStream to be called after we're done looking at all streams
a34b352 [Matei Zaharia] Fix tracking of indices within a partition in SpillReader, and add test
03e1006 [Matei Zaharia] Add a SortShuffleSuite that runs ShuffleSuite with sort-based shuffle
3c7ff1f [Matei Zaharia] Obey the spark.shuffle.spill setting in ExternalSorter
ad65fbd [Matei Zaharia] Rebase on top of Aaron's Sorter change, and use Sorter in our buffer
44d2a93 [Matei Zaharia] Use estimateSize instead of atGrowThreshold to test collection sizes
5686f71 [Matei Zaharia] Optimize merging phase for in-memory only data:
5461cbb [Matei Zaharia] Review comments and more tests (e.g. tests with 1 element per partition)
e9ad356 [Matei Zaharia] Update ContextCleanerSuite to make sure shuffle cleanup tests use hash shuffle (since they were written for it)
c72362a [Matei Zaharia] Added bug fix and test for when iterators are empty
de1fb40 [Matei Zaharia] Make trait SizeTrackingCollection private[spark]
4988d16 [Matei Zaharia] tweak
c1b7572 [Matei Zaharia] Small optimization
ba7db7f [Matei Zaharia] Handle null keys in hash-based comparator, and add tests for collisions
ef4e397 [Matei Zaharia] Support for partial aggregation even without an Ordering
4b7a5ce [Matei Zaharia] More tests, and ability to sort data if a total ordering is given
e1f84be [Matei Zaharia] Fix disk block manager test
5a40a1c [Matei Zaharia] More tests
614f1b4 [Matei Zaharia] Add spill metrics to map tasks
cc52caf [Matei Zaharia] Add more error handling and tests for error cases
bbf359d [Matei Zaharia] More work
3a56341 [Matei Zaharia] More partial work towards sort-based shuffle
7a0895d [Matei Zaharia] Some more partial work towards sort-based shuffle
b615476 [Matei Zaharia] Scaffolding for sort-based shuffle
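As a toy illustration of the core idea in that commit (not the actual ExternalSorter, and with no spilling or file output): tag each record with its reduce partition, then sort by partition ID so each partition's records form one contiguous run. The object and method names below are made up for the sketch.

```scala
object SortShuffleSketch {
  // Sorting by partition ID groups each reducer's data contiguously; within a
  // partition no key ordering is required for a plain shuffle. In the real
  // ExternalSorter these runs would be written to a single file plus an index.
  def sortByPartition[K, V](records: Iterator[(K, V)], numPartitions: Int): Array[(Int, (K, V))] = {
    records.map { case kv @ (k, _) =>
      val partition = math.abs(k.hashCode % numPartitions)   // hash-partitioner stand-in
      (partition, kv)
    }.toArray.sortBy(_._1)
  }

  def main(args: Array[String]): Unit = {
    val records = Iterator("a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4)
    sortByPartition(records, numPartitions = 2).foreach(println)
  }
}
```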
// we release the memory claimed by this thread later on when the task finishes.
if (keepUnrolling) {
  val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
  releaseUnrollMemoryForThisThread(amountToRelease)
@andrewor14
Hi Andrew, I have two questions here:
1. If we return an array, I think we should not release the memory for this thread here; instead, we should release it immediately after the block has been put into memory. The memory is reserved by this thread for caching the block in memory later, so if we release it here, the released memory might be regarded as free memory by other threads. For example, suppose two threads execute this function (`unrollSafely`) serially (for some reason not in parallel): the second thread can see a larger `actualFreeMemory` because the first thread has already released the memory it reserved for unrolling. When the two threads then call `putArray` in parallel to cache their blocks in memory, they may both find at the same time that there is no space left. So won't this cause a wrong estimation?
2. If we return an iterator, why do we release the memory only later, when the task finishes? Returning an iterator means we ran out of memory and will drop the block to disk when using `MEMORY_AND_DISK`, or simply skip caching in memory when using `MEMORY_ONLY`. Either way, the memory recorded in `unrollMemoryMap` for this thread will no longer be used, so why not release it here for other threads to use?

Maybe I have some misunderstanding of the code; can you explain it?
@liyezhang556520 Thanks for your detailed comments.
1. Your analysis is correct. If we return an array, we still hold onto the space occupied for unrolling; it's just that we are using this space to additionally store the return values. In retrospect it makes little sense to release the memory here.
2. If we return an iterator, we cannot actually release the occupied memory here. Note that the iterator we return depends on the buffer we used for unrolling, so we can't just release it. Unfortunately I don't think there is an alternative, because the original iterator passed in as an argument is already partially exhausted, so the only way to retrieve the traversed items is through the buffer we temporarily stored them in.

Could you submit a PR for (1)? This hasn't come to bite us yet, but it would be good to have it fixed.
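For reference, a minimal self-contained sketch of the fix being suggested for (1): keep the unroll memory reserved until the unrolled array has actually been handed to the store, and only then return it to the pool. The two stub methods are hypothetical placeholders, not Spark's real BlockManager/MemoryStore API.

```scala
object ReleaseAfterPutSketch {
  def putArray(values: Array[Any]): Unit = ()                  // stub: store the block
  def releaseUnrollMemoryForThisThread(bytes: Long): Unit = () // stub: return memory to the pool

  def cacheUnrolledBlock(values: Array[Any], unrollBytes: Long): Unit = {
    try {
      // While the put is in flight, other threads must not see this space as free.
      putArray(values)
    } finally {
      // Release only once the block is accounted for by the store.
      releaseUnrollMemoryForThisThread(unrollBytes)
    }
  }
}
```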
@andrewor14
Oh I see, we cannot release the memory when returning an iterator (the original iterator cannot be rolled back or cloned).
I'm working on a patch for SPARK-3000, in which I will also resolve (1). I'll open the PR soon; can you help review the code? Thanks.
**Problem.** When caching, we currently unroll the entire RDD partition before making sure we have enough free memory. This is a common cause for OOMs especially when (1) the BlockManager has little free space left in memory, and (2) the partition is large.

**Solution.** We maintain a global memory pool of `M` bytes shared across all threads, similar to the way we currently manage memory for shuffle aggregation. Then, while we unroll each partition, periodically check if there is enough space to continue. If not, drop enough RDD blocks to ensure we have at least `M` bytes to work with, then try again. If we still don't have enough space to unroll the partition, give up and drop the block to disk directly if applicable.

**New configurations.**
- `spark.storage.bufferFraction` - the value of `M` as a fraction of the storage memory. (default: 0.2)
- `spark.storage.safetyFraction` - a margin of safety in case size estimation is slightly off. This is the equivalent of the existing `spark.shuffle.safetyFraction`. (default: 0.9)

For more detail, see the [design document](https://issues.apache.org/jira/secure/attachment/12651793/spark-1777-design-doc.pdf). Tests pending for performance and memory usage patterns.

Author: Andrew Or <[email protected]>

Closes apache#1165 from andrewor14/them-rdd-memories and squashes the following commits:

e77f451 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
c7c8832 [Andrew Or] Simplify logic + update a few comments
269d07b [Andrew Or] Very minor changes to tests
6645a8a [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
b7e165c [Andrew Or] Add new tests for unrolling blocks
f12916d [Andrew Or] Slightly clean up tests
71672a7 [Andrew Or] Update unrollSafely tests
369ad07 [Andrew Or] Correct ensureFreeSpace and requestMemory behavior
f4d035c [Andrew Or] Allow one thread to unroll multiple blocks
a66fbd2 [Andrew Or] Rename a few things + update comments
68730b3 [Andrew Or] Fix weird scalatest behavior
e40c60d [Andrew Or] Fix MIMA excludes
ff77aa1 [Andrew Or] Fix tests
1a43c06 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
b9a6eee [Andrew Or] Simplify locking behavior on unrollMemoryMap
ed6cda4 [Andrew Or] Formatting fix (super minor)
f9ff82e [Andrew Or] putValues -> putIterator + putArray
beb368f [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
8448c9b [Andrew Or] Fix tests
a49ba4d [Andrew Or] Do not expose unroll memory check period
69bc0a5 [Andrew Or] Always synchronize on putLock before unrollMemoryMap
3f5a083 [Andrew Or] Simplify signature of ensureFreeSpace
dce55c8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
8288228 [Andrew Or] Synchronize put and unroll properly
4f18a3d [Andrew Or] bufferFraction -> unrollFraction
28edfa3 [Andrew Or] Update a few comments / log messages
728323b [Andrew Or] Do not synchronize every 1000 elements
5ab2329 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
129c441 [Andrew Or] Fix bug: Use toArray rather than array
9a65245 [Andrew Or] Update a few comments + minor control flow changes
57f8d85 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
abeae4f [Andrew Or] Add comment clarifying the MEMORY_AND_DISK case
3dd96aa [Andrew Or] AppendOnlyBuffer -> Vector (+ a few small changes)
f920531 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
0871835 [Andrew Or] Add an effective storage level interface to BlockManager
64e7d4c [Andrew Or] Add/modify a few comments (minor)
8af2f35 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
4f4834e [Andrew Or] Use original storage level for blocks dropped to disk
ecc8c2d [Andrew Or] Fix binary incompatibility
24185ea [Andrew Or] Avoid dropping a block back to disk if reading from disk
2b7ee66 [Andrew Or] Fix bug in SizeTracking*
9b9a273 [Andrew Or] Fix tests
20eb3e5 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
649bdb3 [Andrew Or] Document spark.storage.bufferFraction
a10b0e7 [Andrew Or] Add initial memory request threshold + rename a few things
e9c3cb0 [Andrew Or] cacheMemoryMap -> unrollMemoryMap
198e374 [Andrew Or] Unfold -> unroll
0d50155 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
d9d02a8 [Andrew Or] Remove unused param in unfoldSafely
ec728d8 [Andrew Or] Add tests for safe unfolding of blocks
22b2209 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
078eb83 [Andrew Or] Add check for hasNext in PrimitiveVector.iterator
0871535 [Andrew Or] Fix tests in BlockManagerSuite
d68f31e [Andrew Or] Safely unfold blocks for all memory puts
5961f50 [Andrew Or] Fix tests
195abd7 [Andrew Or] Refactor: move unfold logic to MemoryStore
1e82d00 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
3ce413e [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
d5dd3b4 [Andrew Or] Free buffer memory in finally
ea02eec [Andrew Or] Fix tests
b8e1d9c [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
a8704c1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
e1b8b25 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
87aa75c [Andrew Or] Fix mima excludes again (typo)
11eb921 [Andrew Or] Clarify comment (minor)
50cae44 [Andrew Or] Remove now duplicate mima exclude
7de5ef9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
df47265 [Andrew Or] Fix binary incompatibility
6d05a81 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
f94f5af [Andrew Or] Update a few comments (minor)
776aec9 [Andrew Or] Prevent OOM if a single RDD partition is too large
bbd3eea [Andrew Or] Fix CacheManagerSuite to use Array
97ea499 [Andrew Or] Change BlockManager interface to use Arrays
c12f093 [Andrew Or] Add SizeTrackingAppendOnlyBuffer and tests
} finally {
  // If we return an array, the values returned do not depend on the underlying vector and
  // we can immediately free up space for other threads. Otherwise, if we return an iterator,
  // we release the memory claimed by this thread later on when the task finishes.
I understand we need to release the memory later when the task finishes, but where? I checked `CacheManager#putInBlockManager` but didn't find it. Sorry if I missed something.
This is done in `Executor.scala` (see `TaskRunner`).
Hi @andrewor14, I have been thinking about your fix, in particular the timing of releasing the unroll memory. If we unroll a partition successfully, we currently release the unroll memory immediately and return the vector (this may be dangerous, since we are still holding that memory). If we only unroll part of the partition, we return the vector's iterator chained with the iterator over the rest of the partition, and we only release the unroll memory after the whole task finishes. I think this is a little late: we could release the unroll memory as soon as the vector has been iterated over. Maybe we can create a special iterator for the vector that automatically releases the unroll memory when it reaches the end. An example:
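A minimal self-contained sketch of what such an iterator might look like (the class name and the release callback are illustrative, not Spark's actual API):

```scala
// Hands out elements from `underlying` and calls `release` exactly once, as soon
// as the wrapped iterator is exhausted, so the unroll memory can be returned to
// the pool without waiting for the task to finish.
class AutoReleasingIterator[T](underlying: Iterator[T], release: () => Unit) extends Iterator[T] {
  private var released = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more) maybeRelease()
    more
  }

  override def next(): T = {
    val elem = underlying.next()
    if (!underlying.hasNext) maybeRelease()
    elem
  }

  private def maybeRelease(): Unit = {
    if (!released) {
      released = true
      release()   // e.g. () => releaseUnrollMemoryForThisThread(amount)
    }
  }
}
```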
We can also create this iterator for the vector if we unroll successfully in
 * Return a trimmed version of the underlying array.
 */
def toArray: Array[T] = {
  super.iterator.toArray
Why not use `super.trim.array`? I think an array copy is more efficient when there are a lot of elements.
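A quick REPL-style comparison of the two approaches, using a plain array as a stand-in for the vector's backing storage (the variable names are illustrative):

```scala
// Both produce an Array[Int] of exactly `numElements` entries from a larger backing array.
val backing = new Array[Int](64)                 // backing storage with spare capacity
val numElements = 10

// Iterator-based copy (what the diff does): walks element by element.
val viaIterator: Array[Int] = backing.iterator.take(numElements).toArray

// Trim-style copy (what the reviewer suggests): one bulk array copy.
val viaTrim: Array[Int] = java.util.Arrays.copyOf(backing, numElements)
```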
**Problem.** When caching, we currently unroll the entire RDD partition before making sure we have enough free memory. This is a common cause for OOMs especially when (1) the BlockManager has little free space left in memory, and (2) the partition is large.

**Solution.** We maintain a global memory pool of `M` bytes shared across all threads, similar to the way we currently manage memory for shuffle aggregation. Then, while we unroll each partition, periodically check if there is enough space to continue. If not, drop enough RDD blocks to ensure we have at least `M` bytes to work with, then try again. If we still don't have enough space to unroll the partition, give up and drop the block to disk directly if applicable.

**New configurations.**
- `spark.storage.unrollFraction` - the value of `M` as a fraction of the storage memory. (default: 0.2)
- `spark.storage.safetyFraction` - a margin of safety in case size estimation is slightly off. This is the equivalent of the existing `spark.shuffle.safetyFraction`. (default: 0.9)

For more detail, see the [design document](https://issues.apache.org/jira/secure/attachment/12651793/spark-1777-design-doc.pdf). Tests pending for performance and memory usage patterns.
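For reference, a minimal sketch of a standalone application that sets these two options explicitly (the values shown are just the defaults described above; the app name and master are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object UnrollConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("unroll-memory-example")
      .setMaster("local[2]")
      .set("spark.storage.unrollFraction", "0.2")   // M as a fraction of storage memory
      .set("spark.storage.safetyFraction", "0.9")   // slack for size-estimation error
    val sc = new SparkContext(conf)
    // Cache RDDs as usual; unrolling now draws from the shared M-byte pool.
    sc.parallelize(1 to 1000).cache().count()
    sc.stop()
  }
}
```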