[SPARK-23129][CORE] Make deserializeStream of DiskMapIterator init lazily #20292

Closed
wants to merge 5 commits into from

Conversation

caneGuy
Contributor

@caneGuy caneGuy commented Jan 17, 2018

What changes were proposed in this pull request?

Currently, the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator is initialized when the DiskMapIterator instance is created. This causes memory overhead when ExternalAppendOnlyMap spills many times.

We can avoid this by initializing deserializeStream the first time it is used.
This patch makes deserializeStream initialize lazily.
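
To illustrate the idea in the description, here is a minimal sketch of lazy stream initialization. It is not Spark's DiskMapIterator: `LazyByteIterator`, `openSpill`, and the `opened` counter are hypothetical names, and a `ByteArrayInputStream` stands in for a spill file. The point is only that constructing many iterators opens no streams until one of them is actually read.

```scala
import java.io.{ByteArrayInputStream, InputStream}

// Hypothetical stand-in for a spill-file reader; not Spark's DiskMapIterator.
class LazyByteIterator(open: () => InputStream) extends Iterator[Int] {
  // Stays null until the first read, so construction allocates no stream.
  private var stream: InputStream = null
  // -2 means nothing is buffered yet, -1 means end of stream.
  private var nextByte: Int = -2

  override def hasNext: Boolean = {
    if (nextByte == -2) {
      if (stream == null) stream = open() // opened on first use only
      nextByte = stream.read()
      if (nextByte == -1) stream.close()
    }
    nextByte != -1
  }

  override def next(): Int = {
    if (!hasNext) throw new NoSuchElementException
    val b = nextByte
    nextByte = -2
    b
  }
}

// Counts how many underlying streams have been opened (illustrative only).
var opened = 0
def openSpill(): InputStream = {
  opened += 1
  new ByteArrayInputStream(Array[Byte](1, 2, 3))
}

val iterators = Seq.fill(100)(new LazyByteIterator(() => openSpill()))
val openedAfterConstruction = opened    // no stream opened yet
val firstValues = iterators.head.toList // reading opens exactly one stream
val openedAfterFirstRead = opened
```

With an eager field initializer, all 100 streams would be opened at construction time, which is the overhead this patch is trying to avoid.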

How was this patch tested?

Existing tests

Member

@srowen srowen left a comment

Without even commenting on the logic, a lot about this change is wrong. I'd review it carefully first.

@@ -463,21 +463,21 @@ class ExternalAppendOnlyMap[K, V, C](

// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
- private var deserializeStream = nextBatchStream()
+ private var deserializeStream = null.asInstanceOf[Option[DeserializationStream]]
Member

Much better: ... deserializeStream: Option[DeserializationStream] = None. It's not just the cast, but the fact that you're assigning null to an Option
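
A small sketch of why this matters, using a plain `Option[String]` rather than the actual `DeserializationStream` field (the value names are illustrative): a null `Option` compiles, but calling any `Option` method on it throws, so it provides none of the safety `Option` is meant to give.

```scala
import scala.util.Try

// Compiles, but the reference itself is null rather than None.
val bad: Option[String] = null.asInstanceOf[Option[String]]
// The idiomatic "no value yet" state.
val good: Option[String] = None

// Any method call on the null reference fails with a NullPointerException.
val badCheck = Try(bad.isDefined)
// None answers the same question safely.
val goodCheck = good.isDefined
```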

// Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
// we're still in a valid batch.
if (batchIndex < batchOffsets.length - 1) {
if (batchIndex < batchOffsets.length - 1 && deserializeStream.isDefined) {
if (deserializeStream != null) {
Member

This check doesn't make sense then. You probably want a deserializeStream.foreach { ... } construct anyway
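
As a rough illustration of the `foreach` construct suggested here, assuming a hypothetical `TrackedStream` in place of `DeserializationStream`: `Option.foreach` runs its body only when a value is present, so no separate null or `isDefined` check is needed.

```scala
import java.io.ByteArrayInputStream

// Hypothetical stream that records whether close() was called.
class TrackedStream extends ByteArrayInputStream(Array[Byte](1)) {
  var closed = false
  override def close(): Unit = {
    closed = true
    super.close()
  }
}

var current: Option[TrackedStream] = None

// No stream yet: the body is skipped, nothing is thrown.
current.foreach(_.close())

val s = new TrackedStream
current = Some(s)

// A stream is present: it is closed, then the field is reset.
current.foreach(_.close())
current = None
```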

if (ds != null) {
ds.close()
deserializeStream = null
if (ds != null && ds.isEmpty) {
Member

This check is wrong

@@ -509,8 +509,8 @@ class ExternalAppendOnlyMap[K, V, C](
*/
private def readNextItem(): (K, C) = {
try {
- val k = deserializeStream.readKey().asInstanceOf[K]
- val c = deserializeStream.readValue().asInstanceOf[C]
+ val k = deserializeStream.get.readKey().asInstanceOf[K]
Contributor

If a developer calls next() without checking hasNext, this will trigger cleanup, which is not expected.

Contributor Author

@caneGuy caneGuy Jan 18, 2018

@jiangxb1987 I think this does not change the original semantics, since cleanup is only called when an EOFException is thrown. Maybe I missed something?

Contributor Author

This may throw a NoSuchElementException here. I will fix this. Thanks @jiangxb1987
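
For reference, a minimal sketch of the failure mode mentioned here, with a plain `InputStream` standing in for the deserialization stream (the names are illustrative): calling `get` on an empty `Option` throws `NoSuchElementException`, while `map` with `getOrElse` handles the empty case explicitly.

```scala
import java.io.InputStream
import scala.util.Try

// No stream has been opened yet.
val stream: Option[InputStream] = None

// Option.get on None throws java.util.NoSuchElementException.
val unsafeRead = Try(stream.get.read())

// Handling the empty case explicitly; -1 is an arbitrary "nothing read" marker.
val safeRead = stream.map(_.read()).getOrElse(-1)
```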

@caneGuy
Contributor Author

caneGuy commented Jan 22, 2018

Ping @jiangxb1987, could you help review this? Thanks very much!

@@ -463,21 +463,21 @@ class ExternalAppendOnlyMap[K, V, C](

// An intermediate stream that reads from exactly one batch
// This guards against pre-fetching and other arbitrary behavior of higher level streams
- private var deserializeStream = nextBatchStream()
+ private var deserializeStream: Option[DeserializationStream] = None
Contributor

Perhaps private var deserializeStream: Option[DeserializationStream] = nextBatchStream() ? Then we can avoid many other code changes.

Contributor Author

Thanks @jiangxb1987, but then deserializeStream would still be initialized whenever a spill creates a DiskMapIterator.

Contributor

Can't we just use null as the initial value? In performance-critical places we don't need to follow Scala style and use None.

Contributor

This can also make this patch much smaller.

Contributor Author

OK, I will fix it later. @jiangxb1987 @cloud-fan Thanks for your precious time.

@@ -509,8 +509,8 @@ class ExternalAppendOnlyMap[K, V, C](
*/
private def readNextItem(): (K, C) = {
try {
- val k = deserializeStream.readKey().asInstanceOf[K]
- val c = deserializeStream.readValue().asInstanceOf[C]
+ val k = deserializeStream.get.readKey().asInstanceOf[K]
Contributor

We should still check that deserializeStream is not empty here, to be safe.

@@ -547,9 +555,9 @@ class ExternalAppendOnlyMap[K, V, C](
private def cleanup() {
batchIndex = batchOffsets.length // Prevent reading any other batch
val ds = deserializeStream
Contributor

The new val is not actually needed.

@jerryshao
Contributor

ok to test.

deserializeStream = nextBatchStream()
if (deserializeStream == null) {
return false
}
}
nextItem = readNextItem()
}
nextItem != null
}

override def next(): (K, C) = {
Contributor

nit:

if (!hasNext) {
  throw new NoSuchElementException
}
val item = nextItem
nextItem = null
item

@cloud-fan
Contributor

LGTM

@SparkQA

SparkQA commented Jan 24, 2018

Test build #86582 has finished for PR 20292 at commit 3087208.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 25, 2018

Test build #86618 has finished for PR 20292 at commit a443531.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao
Contributor

LGTM.

@jiangxb1987
Contributor

LGTM

asfgit pushed a commit that referenced this pull request Jan 25, 2018
…zily

## What changes were proposed in this pull request?

Currently, the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator is initialized when the DiskMapIterator instance is created. This causes memory overhead when ExternalAppendOnlyMap spills many times.

We can avoid this by initializing deserializeStream the first time it is used.
This patch makes deserializeStream initialize lazily.

## How was this patch tested?

Existing tests

Author: zhoukang <[email protected]>

Closes #20292 from caneGuy/zhoukang/lay-diskmapiterator.

(cherry picked from commit 45b4bbf)
Signed-off-by: Wenchen Fan <[email protected]>
@cloud-fan
Contributor

thanks, merging to master/2.3!

@asfgit asfgit closed this in 45b4bbf Jan 25, 2018
@caneGuy
Contributor Author

caneGuy commented Jan 25, 2018

Thanks for your time! @cloud-fan @jerryshao @jiangxb1987

@caneGuy caneGuy deleted the zhoukang/lay-diskmapiterator branch January 26, 2018 02:37