Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-23129][CORE] Make deserializeStream of DiskMapIterator init lazily #20292

Closed
wants to merge 5 commits into from
Closed
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ class ExternalAppendOnlyMap[K, V, C](

// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
private var deserializeStream = nextBatchStream()
private var deserializeStream: DeserializationStream = null
private var nextItem: (K, C) = null
private var objectsRead = 0

Expand Down Expand Up @@ -528,14 +528,22 @@ class ExternalAppendOnlyMap[K, V, C](
override def hasNext: Boolean = {
if (nextItem == null) {
if (deserializeStream == null) {
return false
// In case of deserializeStream has not been initialized
deserializeStream = nextBatchStream()
if (deserializeStream == null) {
return false
}
}
nextItem = readNextItem()
}
nextItem != null
}

override def next(): (K, C) = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

if (!hasNext) {
  throw new NoSuchElementException
}
val item = nextItem
nextItem = null
item

if (deserializeStream == null) {
// In case of deserializeStream has not been initialized when call next() directly
deserializeStream = nextBatchStream()
}
val item = if (nextItem == null) readNextItem() else nextItem
if (item == null) {
throw new NoSuchElementException
Expand All @@ -546,9 +554,8 @@ class ExternalAppendOnlyMap[K, V, C](

private def cleanup() {
batchIndex = batchOffsets.length // Prevent reading any other batch
val ds = deserializeStream
if (ds != null) {
ds.close()
if (deserializeStream != null) {
deserializeStream.close()
deserializeStream = null
}
if (fileStream != null) {
Expand Down