[SPARK-23129][CORE] Make deserializeStream of DiskMapIterator init lazily #20292
Without even commenting on the logic, a lot about this change is wrong. I'd review it carefully first.
@@ -463,21 +463,21 @@ class ExternalAppendOnlyMap[K, V, C](

// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
private var deserializeStream = nextBatchStream()
private var deserializeStream = null.asInstanceOf[Option[DeserializationStream]]
Much better: ... deserializeStream: Option[DeserializationStream] = None. It's not just the cast, but the fact that you're assigning null to an Option.
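The objection above can be illustrated with a small sketch. FakeStream is a hypothetical stand-in for Spark's DeserializationStream, and the object name is invented for illustration; the point is only that a null cast to Option is neither Some nor None, so every Option method on it fails.

```scala
object OptionNullPitfall {
  // Hypothetical stand-in for Spark's DeserializationStream (an assumption).
  final class FakeStream {
    var closed = false
    def close(): Unit = { closed = true }
  }

  // Compiles, but the reference itself is null: it is neither Some nor None.
  val viaCast: Option[FakeStream] = null.asInstanceOf[Option[FakeStream]]

  // The idiomatic way to say "no stream yet".
  val viaNone: Option[FakeStream] = None

  def main(args: Array[String]): Unit = {
    println(viaNone.isDefined) // false
    println(viaCast == null)   // true
    // Calling viaCast.isDefined here would throw a NullPointerException.
  }
}
```

With None as the initial value, isDefined, foreach, and isEmpty all behave as expected and no separate null check is needed.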
// Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
// we're still in a valid batch.
if (batchIndex < batchOffsets.length - 1) {
if (batchIndex < batchOffsets.length - 1 && deserializeStream.isDefined) {
if (deserializeStream != null) {
This check doesn't make sense then. You probably want a deserializeStream.foreach { ... } construct anyway.
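A minimal sketch of the foreach construct suggested above, assuming the field is an Option. FakeStream and Holder are hypothetical names for illustration and are not part of ExternalAppendOnlyMap.

```scala
object OptionCleanup {
  // Hypothetical stand-in for a closable stream (an assumption).
  final class FakeStream {
    var closed = false
    def close(): Unit = { closed = true }
  }

  // Hypothetical owner of an optional stream.
  final class Holder {
    var stream: Option[FakeStream] = None

    def open(): FakeStream = {
      val s = new FakeStream
      stream = Some(s)
      s
    }

    // foreach runs its body only when a stream is present,
    // so neither a null check nor an isDefined check is needed.
    def cleanup(): Unit = {
      stream.foreach(_.close())
      stream = None
    }
  }
}
```

Calling cleanup() on a Holder that never opened a stream is a no-op, which is the behavior the extra null and isDefined checks in the diff were trying to guarantee.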
if (ds != null) {
ds.close()
deserializeStream = null
if (ds != null && ds.isEmpty) {
This check is wrong
@@ -509,8 +509,8 @@ class ExternalAppendOnlyMap[K, V, C](
*/
private def readNextItem(): (K, C) = {
try {
val k = deserializeStream.readKey().asInstanceOf[K]
val c = deserializeStream.readValue().asInstanceOf[C]
val k = deserializeStream.get.readKey().asInstanceOf[K]
If a developer calls next() without checking hasNext, this will trigger cleanup, which is not expected.
@jiangxb1987 I think this does not change the original semantics, since cleanup is only called when an EOFException is thrown. Maybe I missed something?
This may throw NoSuchElementException here. I will fix this. Thanks @jiangxb1987
Ping @jiangxb1987, could you help review this? Thanks very much!
@@ -463,21 +463,21 @@ class ExternalAppendOnlyMap[K, V, C](

// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
private var deserializeStream = nextBatchStream()
private var deserializeStream: Option[DeserializationStream] = None
Perhaps private var deserializeStream: Option[DeserializationStream] = nextBatchStream()? Then we can avoid many other code changes.
Thanks @jiangxb1987. But then deserializeStream would still be initialized when spilling and creating the DiskMapIterator.
Can't we just use null as the initial value? In performance-critical places we don't need to follow Scala style and use None.
This can also make this patch much smaller.
OK, I will fix this later. @jiangxb1987 @cloud-fan Thanks for your precious time.
@@ -509,8 +509,8 @@ class ExternalAppendOnlyMap[K, V, C](
*/
private def readNextItem(): (K, C) = {
try {
val k = deserializeStream.readKey().asInstanceOf[K]
val c = deserializeStream.readValue().asInstanceOf[C]
val k = deserializeStream.get.readKey().asInstanceOf[K]
We should still check that deserializeStream is not empty here, to be safe.
@@ -547,9 +555,9 @@ class ExternalAppendOnlyMap[K, V, C](
private def cleanup() {
batchIndex = batchOffsets.length // Prevent reading any other batch
val ds = deserializeStream
The new val is not actually needed.
ok to test.
deserializeStream = nextBatchStream()
if (deserializeStream == null) {
return false
}
}
nextItem = readNextItem()
}
nextItem != null
}

override def next(): (K, C) = {
nit:
if (!hasNext) {
  throw new NoSuchElementException
}
val item = nextItem
nextItem = null
item
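To show how the nit above fits together with lazy initialization, here is a self-contained sketch. It is not the code from this PR: SpillIterator, openStream, and the opened counter are hypothetical names, and an in-memory Seq stands in for a spilled batch on disk. The stream starts as null, as suggested earlier in the review, and is opened on the first hasNext call.

```scala
object LazyBatchIterator {
  // Counts how many times the hypothetical stream was opened.
  var opened = 0

  // Hypothetical stand-in for opening one spilled batch from disk.
  private def openStream(data: Seq[String]): Iterator[String] = {
    opened += 1
    data.iterator
  }

  final class SpillIterator(data: Seq[String]) extends Iterator[String] {
    // null means "not opened yet"; nothing is opened at construction time.
    private var stream: Iterator[String] = null
    private var nextItem: String = null

    override def hasNext: Boolean = {
      if (nextItem == null) {
        if (stream == null) {
          stream = openStream(data) // opened on first use only
        }
        if (stream.hasNext) {
          nextItem = stream.next()
        }
      }
      nextItem != null
    }

    // Follows the Iterator contract: fail loudly when exhausted
    // instead of returning null or triggering cleanup.
    override def next(): String = {
      if (!hasNext) {
        throw new NoSuchElementException
      }
      val item = nextItem
      nextItem = null
      item
    }
  }
}
```

Constructing many SpillIterator instances leaves opened at 0 until one of them is actually consumed, which is the effect the PR aims for with DiskMapIterator.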
LGTM
Test build #86582 has finished for PR 20292 at commit
Test build #86618 has finished for PR 20292 at commit
LGTM.
LGTM
…zily

## What changes were proposed in this pull request?

Currently, the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator is initialized when the DiskMapIterator instance is created. This causes memory overhead when ExternalAppendOnlyMap spills many times.
We can avoid this by initializing deserializeStream when it is first used.
This patch makes deserializeStream initialize lazily.

## How was this patch tested?

Existing tests

Author: zhoukang <[email protected]>

Closes #20292 from caneGuy/zhoukang/lay-diskmapiterator.

(cherry picked from commit 45b4bbf)
Signed-off-by: Wenchen Fan <[email protected]>
thanks, merging to master/2.3!
Thanks for your time! @cloud-fan @jerryshao @jiangxb1987
What changes were proposed in this pull request?
Currently, the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator is initialized when the DiskMapIterator instance is created. This causes memory overhead when ExternalAppendOnlyMap spills many times.
We can avoid this by initializing deserializeStream when it is first used.
This patch makes deserializeStream initialize lazily.
How was this patch tested?
Existing tests