
[SPARK-1777] Prevent OOMs from single partitions #1165

Closed
andrewor14 wants to merge 74 commits

Conversation

andrewor14
Contributor

Problem. When caching, we currently unroll the entire RDD partition before making sure we have enough free memory. This is a common cause of OOMs, especially when (1) the BlockManager has little free space left in memory, and (2) the partition is large.

Solution. We maintain a global memory pool of M bytes shared across all threads, similar to the way we currently manage memory for shuffle aggregation. Then, while unrolling each partition, we periodically check whether there is enough space to continue. If not, we drop enough RDD blocks to ensure we have at least M bytes to work with, then try again. If we still don't have enough space to unroll the partition, we give up and drop the block to disk directly if applicable.

New configurations.

  • spark.storage.unrollFraction - the value of M as a fraction of the storage memory. (default: 0.2)
  • spark.storage.safetyFraction - a margin of safety in case size estimation is slightly off. This is the equivalent of the existing spark.shuffle.safetyFraction. (default: 0.9)

For more detail, see the design document. Tests pending for performance and memory usage patterns.
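As an illustration of the approach (not the actual MemoryStore code), here is a minimal, self-contained Scala sketch of an unrolling loop with periodic memory checks. All names (UnrollSketch, reserveUnrollMemory, estimateSize) and the constants are hypothetical placeholders; releasing the reservation afterwards is elided here and is discussed further down the thread.

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

// Minimal sketch of "unroll with periodic memory checks" (hypothetical, not Spark's API).
object UnrollSketch {
  // Stand-in for the shared pool of M bytes of unroll memory.
  private var freeUnrollMemory: Long = 64L * 1024 * 1024

  // Hypothetical stand-in for "drop old blocks until this request can be granted".
  private def reserveUnrollMemory(bytes: Long): Boolean = synchronized {
    if (freeUnrollMemory >= bytes) { freeUnrollMemory -= bytes; true } else false
  }

  // Crude size estimate; the real code uses sampling-based size estimation.
  private def estimateSize(buffer: ArrayBuffer[_]): Long = buffer.length * 64L

  def unrollSafely[T: ClassTag](values: Iterator[T]): Either[Array[T], Iterator[T]] = {
    val buffer = new ArrayBuffer[T]
    var reserved = 1024L * 1024                        // initial request
    var keepUnrolling = reserveUnrollMemory(reserved)
    var count = 0L
    while (values.hasNext && keepUnrolling) {
      buffer += values.next()
      count += 1
      if (count % 16 == 0) {                           // check periodically, not per element
        val size = estimateSize(buffer)
        if (size > reserved) {
          val extra = size * 2 - reserved
          keepUnrolling = reserveUnrollMemory(extra)
          if (keepUnrolling) reserved += extra
        }
      }
    }
    if (keepUnrolling) Left(buffer.toArray)            // fully unrolled: safe to cache in memory
    else Right(buffer.iterator ++ values)              // ran out of space: caller may drop to disk
  }
}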

This buffer is backed by an underlying PrimitiveVector. It is to be
used for unrolling partitions, so we can efficiently check the size of
the in-memory buffer periodically.

Note that the underlying buffer cannot be an existing implementation of
a mutable Scala or Java collection, because we need to know exactly when
the underlying array is resized. Otherwise, size estimation may not be
accurate.
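To make that point concrete, here is a simplified sketch of a size-tracking buffer that owns its backing array. It is not the actual SizeTrackingAppendOnlyBuffer, and the byte arithmetic is a crude stand-in for real size estimation.

// Sketch: by owning the resize, the buffer can refresh its size estimate exactly when the
// backing array grows, which an off-the-shelf ArrayBuffer would not expose to us.
class SizeTrackingVectorSketch[T <: AnyRef] {
  private var array = new Array[AnyRef](64)
  private var numElements = 0
  private var estimatedBytes = 16L + array.length * 8L

  def +=(value: T): Unit = {
    if (numElements == array.length) {
      resize(array.length * 2)
    }
    array(numElements) = value
    numElements += 1
  }

  private def resize(newLength: Int): Unit = {
    array = java.util.Arrays.copyOf(array, newLength)
    // Re-estimate here, the only place the footprint can jump (crude stand-in for SizeEstimator).
    estimatedBytes = 16L + newLength * 8L
  }

  def estimateSize(): Long = estimatedBytes

  def iterator: Iterator[T] = array.iterator.take(numElements).map(_.asInstanceOf[T])
}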
... rather than ArrayBuffer. We only ever iterate through it anyway,
so there is really no reason for it to be a mutable buffer of any sort.
This change is introduced so that we can eventually directly pass our
SizeTrackingAppendOnlyBuffer's underlying array to BlockManager, instead
of having to awkwardly make it an ArrayBuffer first.
In addition, avoid using EasyMock for one of our tests, which expects
BlockManager#put to be called with an Array[Any] parameter. Even with
all the EasyMock matchers, it is impossible to match an Array[Any]
because of a combination of the following: (1) arrays are not covariant,
(2) EasyMock provides `aryEq` matchers for all the Java primitive types,
which conflict with Any, and (3) EasyMock's very general matchers, such
as `anyObject` or `isA`, also do not match for some reason.
The idea is to always use at least a fixed amount of memory (M bytes)
to unroll RDD partitions. This space is not reserved, but instead
allocated dynamically by dropping existing blocks when necessary.

We maintain this buffer as a global quantity shared across all cores.
The way we synchronize the usage of this buffer is very similar to the
way we share memory across all threads for shuffle aggregation. In
particular, each thread periodically and cautiously requests more
memory, and if there is not enough global memory to grant the request,
the thread concedes and spills. However, this relies on the accuracy of
size estimation, which is not guaranteed. Therefore, as in the shuffle
case, we need an equivalent spark.storage.safetyFraction in case size
estimation is slightly off.

We expose M to the user as spark.storage.bufferFraction, with a default
value of 0.2.
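For illustration, here is a minimal sketch of the shared accounting described above. The class and method names are hypothetical placeholders, not the actual MemoryStore code; the point is only the request/concede pattern under a single lock.

// Sketch of per-thread unroll-memory accounting shared across all threads (hypothetical names).
// maxUnrollMemory plays the role of M = storage memory * bufferFraction * safetyFraction.
class UnrollMemoryPoolSketch(maxUnrollMemory: Long) {
  private val unrollMemoryByThread = scala.collection.mutable.HashMap.empty[Long, Long]

  // A thread periodically asks for more bytes; grant the request only if the pool has room.
  def requestMemory(bytes: Long): Boolean = synchronized {
    val threadId = Thread.currentThread().getId
    val used = unrollMemoryByThread.values.sum
    if (used + bytes <= maxUnrollMemory) {
      unrollMemoryByThread(threadId) = unrollMemoryByThread.getOrElse(threadId, 0L) + bytes
      true
    } else {
      false  // the caller concedes, e.g. stops unrolling and falls back to disk
    }
  }

  // Hand this thread's reservation back to the pool (e.g. when the task finishes).
  def releaseMemoryForThisThread(): Unit = synchronized {
    unrollMemoryByThread.remove(Thread.currentThread().getId)
  }
}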
@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15979/

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15981/

@andrewor14 andrewor14 changed the title [SPARK-1777] Prevent OOMs from single partitions [WIP][SPARK-1777] Prevent OOMs from single partitions Jun 21, 2014
@andrewor14 andrewor14 changed the title [WIP][SPARK-1777] Prevent OOMs from single partitions [SPARK-1777][WIP] Prevent OOMs from single partitions Jun 21, 2014
@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15987/

@AmplabJenkins

Merged build finished.

@AmplabJenkins

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15988/

@AmplabJenkins

Merged build triggered.

@andrewor14
Contributor Author

I have addressed your latest comments and rebased to master. Anything else?

@SparkQA

SparkQA commented Jul 27, 2014

QA tests have started for PR 1165. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17245/consoleFull

@SparkQA

SparkQA commented Jul 27, 2014

QA results for PR 1165:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
case class Sample(size: Long, numUpdates: Long)

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17245/consoleFull

@mateiz
Contributor

mateiz commented Jul 27, 2014

Thanks Andrew! The changes look good to me -- I've merged this in.

@asfgit asfgit closed this in ecf30ee Jul 27, 2014
@andrewor14
Contributor Author

Great. Thanks for reviewing @pwendell @mateiz @mridulm.

@andrewor14 andrewor14 deleted the them-rdd-memories branch July 28, 2014 17:30
asfgit pushed a commit that referenced this pull request Jul 31, 2014
This adds a new ShuffleManager based on sorting, as described in https://issues.apache.org/jira/browse/SPARK-2045. The bulk of the code is in an ExternalSorter class that is similar to ExternalAppendOnlyMap, but sorts key-value pairs by partition ID and can be used to create a single sorted file with a map task's output. (Longer-term I think this can take on the remaining functionality in ExternalAppendOnlyMap and replace it so we don't have code duplication.)

The main TODOs still left are:
- [x] enabling ExternalSorter to merge across spilled files
  - [x] with an Ordering
  - [x] without an Ordering, using the keys' hash codes
- [x] adding more tests (e.g. a version of our shuffle suite that runs on this)
- [x] rebasing on top of the size-tracking refactoring in #1165 when that is merged
- [x] disabling spilling if spark.shuffle.spill is set to false

Despite this, though, it seems to work pretty well (running successfully in cases where the hash shuffle would OOM, such as 1000 reduce tasks on executors with only 1G memory), and it seems to be comparable in speed to, or faster than, hash-based shuffle (it creates far fewer files for the OS to keep track of). So I'm posting it to get some early feedback.

After these TODOs are done, I'd also like to enable ExternalSorter to sort data within each partition by a key as well, which will allow us to use it to implement external spilling in reduce tasks in `sortByKey`.
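As a rough illustration of the sorting idea in this commit message, here is a small self-contained Scala sketch that buffers records with their partition id and sorts by that id. It is only a stand-in, not the real ExternalSorter, and the hash-partitioning helper is hypothetical.

// Sketch: sort (partitionId, key, value) records by partition id so a map task's output can be
// written as a single file with contiguous partitions. Not the real ExternalSorter.
object PartitionSortSketch {
  private def partitionOf(key: Any, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod  // keep the result non-negative
  }

  def sortedByPartition[K, V](records: Iterator[(K, V)], numPartitions: Int): Seq[(Int, K, V)] = {
    val buffered = records.map { case (k, v) => (partitionOf(k, numPartitions), k, v) }.toVector
    // Sorting by partition id alone is enough for a partition-ordered output file; a key
    // Ordering, when available, could be used as a secondary sort within each partition.
    buffered.sortBy(_._1)
  }

  def main(args: Array[String]): Unit = {
    val data = Iterator("a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4)
    sortedByPartition(data, numPartitions = 2).foreach(println)
  }
}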

Author: Matei Zaharia <[email protected]>

Closes #1499 from mateiz/sort-based-shuffle and squashes the following commits:

bd841f9 [Matei Zaharia] Various review comments
d1c137f [Matei Zaharia] Various review comments
a611159 [Matei Zaharia] Compile fixes due to rebase
62c56c8 [Matei Zaharia] Fix ShuffledRDD sometimes not returning Tuple2s.
f617432 [Matei Zaharia] Fix a failing test (seems to be due to change in SizeTracker logic)
9464d5f [Matei Zaharia] Simplify code and fix conflicts after latest rebase
0174149 [Matei Zaharia] Add cleanup behavior and cleanup tests for sort-based shuffle
eb4ee0d [Matei Zaharia] Remove customizable element type in ShuffledRDD
fa2e8db [Matei Zaharia] Allow nextBatchStream to be called after we're done looking at all streams
a34b352 [Matei Zaharia] Fix tracking of indices within a partition in SpillReader, and add test
03e1006 [Matei Zaharia] Add a SortShuffleSuite that runs ShuffleSuite with sort-based shuffle
3c7ff1f [Matei Zaharia] Obey the spark.shuffle.spill setting in ExternalSorter
ad65fbd [Matei Zaharia] Rebase on top of Aaron's Sorter change, and use Sorter in our buffer
44d2a93 [Matei Zaharia] Use estimateSize instead of atGrowThreshold to test collection sizes
5686f71 [Matei Zaharia] Optimize merging phase for in-memory only data:
5461cbb [Matei Zaharia] Review comments and more tests (e.g. tests with 1 element per partition)
e9ad356 [Matei Zaharia] Update ContextCleanerSuite to make sure shuffle cleanup tests use hash shuffle (since they were written for it)
c72362a [Matei Zaharia] Added bug fix and test for when iterators are empty
de1fb40 [Matei Zaharia] Make trait SizeTrackingCollection private[spark]
4988d16 [Matei Zaharia] tweak
c1b7572 [Matei Zaharia] Small optimization
ba7db7f [Matei Zaharia] Handle null keys in hash-based comparator, and add tests for collisions
ef4e397 [Matei Zaharia] Support for partial aggregation even without an Ordering
4b7a5ce [Matei Zaharia] More tests, and ability to sort data if a total ordering is given
e1f84be [Matei Zaharia] Fix disk block manager test
5a40a1c [Matei Zaharia] More tests
614f1b4 [Matei Zaharia] Add spill metrics to map tasks
cc52caf [Matei Zaharia] Add more error handling and tests for error cases
bbf359d [Matei Zaharia] More work
3a56341 [Matei Zaharia] More partial work towards sort-based shuffle
7a0895d [Matei Zaharia] Some more partial work towards sort-based shuffle
b615476 [Matei Zaharia] Scaffolding for sort-based shuffle
      // we release the memory claimed by this thread later on when the task finishes.
      if (keepUnrolling) {
        val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
        releaseUnrollMemoryForThisThread(amountToRelease)
Contributor

@andrewor14
Hi Andrew, I have two questions here:
1. If we return an array, I think we should not release the memory for this thread here; instead we should release it immediately after the block has been put into memory. The memory is reserved by this thread for caching the block later, so if we release it here, the released memory might be regarded as free memory by other threads. For example: suppose two threads execute this function (unrollSafely) serially (for some reason not in parallel). The second thread sees more actualFreeMemory because the first thread has already released the memory it reserved for unrolling. When the two threads then call putArray in parallel to cache their blocks, they may both find that there is no space left. So won't this cause a wrong estimation?

2. If we return an iterator, why do we release the memory later, when the task finishes? Returning an iterator means we ran out of memory and will either drop the block to disk (MEMORY_AND_DISK) or skip caching in memory altogether (MEMORY_ONLY). The memory recorded in unrollMemoryMap for this thread will no longer be used, so why not release it here for other threads to use?

Maybe I have misunderstood the code; could you explain?

Contributor Author

@liyezhang556520 Thanks for your detailed comments.

  1. Your analysis is correct. If we return an array, we still hold onto the space occupied for unrolling; we are simply also using that space to store the return values. In retrospect it makes little sense to release the memory here.

  2. If we return an iterator, we cannot actually release the occupied memory here. Note that the iterator that we return depends on the buffer we used for unrolling, so we can't just release it. Unfortunately I don't think there is an alternative here because the original iterator passed in as an argument is already partially exhausted, so the only way to retrieve the traversed items is through the buffer we temporarily stored them in.

Could you submit a PR for (1)? This hasn't come to bite us yet, but it would be good to have it fixed.
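For anyone following this thread, here is a hypothetical caller-side sketch of the ordering being requested in (1): keep the reservation until the block has actually been stored, then release it. The function and parameter names are placeholders, not the actual CacheManager or MemoryStore code.

object ReleaseAfterPutSketch {
  // Keep the unroll reservation across the put so other threads cannot treat those bytes as
  // free, and only hand it back once the block is actually stored (or the attempt has failed).
  def cacheUnrolledArray(
      blockId: String,
      unrolled: Array[Any],
      reservedUnrollBytes: Long,
      putArray: (String, Array[Any]) => Boolean,
      releaseUnrollMemory: Long => Unit): Boolean = {
    try {
      putArray(blockId, unrolled)
    } finally {
      releaseUnrollMemory(reservedUnrollBytes)
    }
  }
}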

Contributor

@andrewor14
Oh I see, we cannot release the memory when returning an iterator (the original iterator cannot be rolled back or cloned).

I'm working on a patch for SPARK-3000, in which I will resolve (1); I'll open the PR soon. It would be great if you could help review it, thanks.

xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
**Problem.** When caching, we currently unroll the entire RDD partition before making sure we have enough free memory. This is a common cause for OOMs especially when (1) the BlockManager has little free space left in memory, and (2) the partition is large.

**Solution.** We maintain a global memory pool of `M` bytes shared across all threads, similar to the way we currently manage memory for shuffle aggregation. Then, while we unroll each partition, periodically check if there is enough space to continue. If not, drop enough RDD blocks to ensure we have at least `M` bytes to work with, then try again. If we still don't have enough space to unroll the partition, give up and drop the block to disk directly if applicable.

**New configurations.**
- `spark.storage.bufferFraction` - the value of `M` as a fraction of the storage memory. (default: 0.2)
- `spark.storage.safetyFraction` - a margin of safety in case size estimation is slightly off. This is the equivalent of the existing `spark.shuffle.safetyFraction`. (default 0.9)

For more detail, see the [design document](https://issues.apache.org/jira/secure/attachment/12651793/spark-1777-design-doc.pdf). Tests pending for performance and memory usage patterns.

Author: Andrew Or <[email protected]>

Closes apache#1165 from andrewor14/them-rdd-memories and squashes the following commits:

e77f451 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
c7c8832 [Andrew Or] Simplify logic + update a few comments
269d07b [Andrew Or] Very minor changes to tests
6645a8a [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
b7e165c [Andrew Or] Add new tests for unrolling blocks
f12916d [Andrew Or] Slightly clean up tests
71672a7 [Andrew Or] Update unrollSafely tests
369ad07 [Andrew Or] Correct ensureFreeSpace and requestMemory behavior
f4d035c [Andrew Or] Allow one thread to unroll multiple blocks
a66fbd2 [Andrew Or] Rename a few things + update comments
68730b3 [Andrew Or] Fix weird scalatest behavior
e40c60d [Andrew Or] Fix MIMA excludes
ff77aa1 [Andrew Or] Fix tests
1a43c06 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
b9a6eee [Andrew Or] Simplify locking behavior on unrollMemoryMap
ed6cda4 [Andrew Or] Formatting fix (super minor)
f9ff82e [Andrew Or] putValues -> putIterator + putArray
beb368f [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
8448c9b [Andrew Or] Fix tests
a49ba4d [Andrew Or] Do not expose unroll memory check period
69bc0a5 [Andrew Or] Always synchronize on putLock before unrollMemoryMap
3f5a083 [Andrew Or] Simplify signature of ensureFreeSpace
dce55c8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
8288228 [Andrew Or] Synchronize put and unroll properly
4f18a3d [Andrew Or] bufferFraction -> unrollFraction
28edfa3 [Andrew Or] Update a few comments / log messages
728323b [Andrew Or] Do not synchronize every 1000 elements
5ab2329 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
129c441 [Andrew Or] Fix bug: Use toArray rather than array
9a65245 [Andrew Or] Update a few comments + minor control flow changes
57f8d85 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
abeae4f [Andrew Or] Add comment clarifying the MEMORY_AND_DISK case
3dd96aa [Andrew Or] AppendOnlyBuffer -> Vector (+ a few small changes)
f920531 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
0871835 [Andrew Or] Add an effective storage level interface to BlockManager
64e7d4c [Andrew Or] Add/modify a few comments (minor)
8af2f35 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
4f4834e [Andrew Or] Use original storage level for blocks dropped to disk
ecc8c2d [Andrew Or] Fix binary incompatibility
24185ea [Andrew Or] Avoid dropping a block back to disk if reading from disk
2b7ee66 [Andrew Or] Fix bug in SizeTracking*
9b9a273 [Andrew Or] Fix tests
20eb3e5 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
649bdb3 [Andrew Or] Document spark.storage.bufferFraction
a10b0e7 [Andrew Or] Add initial memory request threshold + rename a few things
e9c3cb0 [Andrew Or] cacheMemoryMap -> unrollMemoryMap
198e374 [Andrew Or] Unfold -> unroll
0d50155 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
d9d02a8 [Andrew Or] Remove unused param in unfoldSafely
ec728d8 [Andrew Or] Add tests for safe unfolding of blocks
22b2209 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
078eb83 [Andrew Or] Add check for hasNext in PrimitiveVector.iterator
0871535 [Andrew Or] Fix tests in BlockManagerSuite
d68f31e [Andrew Or] Safely unfold blocks for all memory puts
5961f50 [Andrew Or] Fix tests
195abd7 [Andrew Or] Refactor: move unfold logic to MemoryStore
1e82d00 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
3ce413e [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
d5dd3b4 [Andrew Or] Free buffer memory in finally
ea02eec [Andrew Or] Fix tests
b8e1d9c [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
a8704c1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
e1b8b25 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
87aa75c [Andrew Or] Fix mima excludes again (typo)
11eb921 [Andrew Or] Clarify comment (minor)
50cae44 [Andrew Or] Remove now duplicate mima exclude
7de5ef9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
df47265 [Andrew Or] Fix binary incompatibility
6d05a81 [Andrew Or] Merge branch 'master' of github.com:apache/spark into them-rdd-memories
f94f5af [Andrew Or] Update a few comments (minor)
776aec9 [Andrew Or] Prevent OOM if a single RDD partition is too large
bbd3eea [Andrew Or] Fix CacheManagerSuite to use Array
97ea499 [Andrew Or] Change BlockManager interface to use Arrays
c12f093 [Andrew Or] Add SizeTrackingAppendOnlyBuffer and tests
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
This adds a new ShuffleManager based on sorting, as described in https://issues.apache.org/jira/browse/SPARK-2045. The bulk of the code is in an ExternalSorter class that is similar to ExternalAppendOnlyMap, but sorts key-value pairs by partition ID and can be used to create a single sorted file with a map task's output. (Longer-term I think this can take on the remaining functionality in ExternalAppendOnlyMap and replace it so we don't have code duplication.)

The main TODOs still left are:
- [x] enabling ExternalSorter to merge across spilled files
  - [x] with an Ordering
  - [x] without an Ordering, using the keys' hash codes
- [x] adding more tests (e.g. a version of our shuffle suite that runs on this)
- [x] rebasing on top of the size-tracking refactoring in apache#1165 when that is merged
- [x] disabling spilling if spark.shuffle.spill is set to false

Despite this, though, it seems to work pretty well (running successfully in cases where the hash shuffle would OOM, such as 1000 reduce tasks on executors with only 1G memory), and it seems to be comparable in speed to, or faster than, hash-based shuffle (it creates far fewer files for the OS to keep track of). So I'm posting it to get some early feedback.

After these TODOs are done, I'd also like to enable ExternalSorter to sort data within each partition by a key as well, which will allow us to use it to implement external spilling in reduce tasks in `sortByKey`.

Author: Matei Zaharia <[email protected]>

Closes apache#1499 from mateiz/sort-based-shuffle and squashes the following commits:

bd841f9 [Matei Zaharia] Various review comments
d1c137f [Matei Zaharia] Various review comments
a611159 [Matei Zaharia] Compile fixes due to rebase
62c56c8 [Matei Zaharia] Fix ShuffledRDD sometimes not returning Tuple2s.
f617432 [Matei Zaharia] Fix a failing test (seems to be due to change in SizeTracker logic)
9464d5f [Matei Zaharia] Simplify code and fix conflicts after latest rebase
0174149 [Matei Zaharia] Add cleanup behavior and cleanup tests for sort-based shuffle
eb4ee0d [Matei Zaharia] Remove customizable element type in ShuffledRDD
fa2e8db [Matei Zaharia] Allow nextBatchStream to be called after we're done looking at all streams
a34b352 [Matei Zaharia] Fix tracking of indices within a partition in SpillReader, and add test
03e1006 [Matei Zaharia] Add a SortShuffleSuite that runs ShuffleSuite with sort-based shuffle
3c7ff1f [Matei Zaharia] Obey the spark.shuffle.spill setting in ExternalSorter
ad65fbd [Matei Zaharia] Rebase on top of Aaron's Sorter change, and use Sorter in our buffer
44d2a93 [Matei Zaharia] Use estimateSize instead of atGrowThreshold to test collection sizes
5686f71 [Matei Zaharia] Optimize merging phase for in-memory only data:
5461cbb [Matei Zaharia] Review comments and more tests (e.g. tests with 1 element per partition)
e9ad356 [Matei Zaharia] Update ContextCleanerSuite to make sure shuffle cleanup tests use hash shuffle (since they were written for it)
c72362a [Matei Zaharia] Added bug fix and test for when iterators are empty
de1fb40 [Matei Zaharia] Make trait SizeTrackingCollection private[spark]
4988d16 [Matei Zaharia] tweak
c1b7572 [Matei Zaharia] Small optimization
ba7db7f [Matei Zaharia] Handle null keys in hash-based comparator, and add tests for collisions
ef4e397 [Matei Zaharia] Support for partial aggregation even without an Ordering
4b7a5ce [Matei Zaharia] More tests, and ability to sort data if a total ordering is given
e1f84be [Matei Zaharia] Fix disk block manager test
5a40a1c [Matei Zaharia] More tests
614f1b4 [Matei Zaharia] Add spill metrics to map tasks
cc52caf [Matei Zaharia] Add more error handling and tests for error cases
bbf359d [Matei Zaharia] More work
3a56341 [Matei Zaharia] More partial work towards sort-based shuffle
7a0895d [Matei Zaharia] Some more partial work towards sort-based shuffle
b615476 [Matei Zaharia] Scaffolding for sort-based shuffle
    } finally {
      // If we return an array, the values returned do not depend on the underlying vector and
      // we can immediately free up space for other threads. Otherwise, if we return an iterator,
      // we release the memory claimed by this thread later on when the task finishes.
Contributor

I understand we need to release the memory later, when the task finishes, but where does that happen? I checked CacheManager#putInBlockManager but didn't find it. Sorry if I missed something.

Contributor Author

This is done in Executor.scala (see TaskRunner)

@cloud-fan
Contributor

Hi @andrewor14, I have been thinking about your fix, and in particular about when unrollMemory is released. If we unroll a partition successfully, we currently release the unrollMemory immediately (which may be dangerous, since we are still holding that memory) and return the vector. If we only unroll part of a partition, we return the vector's iterator chained with the rest of the partition's iterator, and release the unrollMemory only after the whole task has finished. I think that is a little late: we could release the unrollMemory as soon as the vector has been fully iterated. Maybe we can create a special iterator for the vector that automatically releases the unrollMemory when it reaches the end. An example:

class SpecialIterator(private var data: Seq[Any], amountToRelease: Long) extends Iterator[Any] {
  private var cur = 0

  def hasNext: Boolean = data != null && cur < data.size

  def next(): Any = {
    if (hasNext) {
      val re = data(cur)
      cur += 1
      release()
      re
    } else {
      throw new NoSuchElementException
    }
  }

  // Once the last element has been handed out, drop the data reference and
  // give the unroll memory back to the shared pool.
  private def release(): Unit = {
    if (data != null && cur == data.size) {
      data = null
      ... // do the release job
    }
  }
}

We could also create this iterator for the vector when unrolling succeeds in CacheManager#putInBlockManager, and avoid releasing the unrollMemory immediately in unrollSafely.

   * Return a trimmed version of the underlying array.
   */
  def toArray: Array[T] = {
    super.iterator.toArray
Contributor

Why not use super.trim.array? I think the array copy is more efficient when there are a lot of elements.
