
[SPARK-23040][CORE]: Returns interruptible iterator for shuffle reader #20449

Closed
wants to merge 10 commits

Conversation

advancedxy
Contributor

What changes were proposed in this pull request?

Before this commit, a non-interruptible iterator was returned from the shuffle reader if an aggregator or ordering was specified; this change wraps the shuffle reader's result in an InterruptibleIterator so the task can respond to cancellation.
This commit also ensures that the sorter is closed even when the task is cancelled (killed) in the middle of sorting.
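A condensed, paraphrased sketch of the approach in BlockStoreShuffleReader.read, grounded in the diff excerpts quoted later in this thread (the surrounding method context is assumed, so this fragment is illustrative rather than the exact patch):

```scala
// Sketch: the sort path registers cleanup, and the final result is wrapped so it can be interrupted.
val resultIter = dep.keyOrdering match {
  case Some(keyOrd: Ordering[K]) =>
    val sorter =
      new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
    sorter.insertAll(aggregatedIter)
    // Stop the sorter when the task completes (finished or cancelled), so spill files and
    // memory are released even if the task is killed in the middle of sorting.
    context.addTaskCompletionListener(_ => sorter.stop())
    CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
  case None =>
    aggregatedIter
}
// Wrap again: the aggregator/sorter may already have drained the earlier interruptible
// iterator, so the iterator handed to the caller must itself check for task kill.
new InterruptibleIterator[Product2[K, C]](context, resultIter)
```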

How was this patch tested?

Add a unit test in JobCancellationSuite

Before this commit, a non-interruptible iterator is returned if
aggregator or ordering is specified.
@advancedxy
Contributor Author

ping @cloud-fan

context.addTaskCompletionListener(tc => {
// Note: we only stop sorter if cancelled as sorter.stop wouldn't be called in
// CompletionIterator. Another way would be making sorter.stop idempotent.
if (tc.isInterrupted()) { sorter.stop() }
Contributor

seems we can remove this `if` if we don't return a CompletionIterator.

BTW I think we need to check all the places that use CompletionIterator, to see if they consider job canceling.

Contributor Author

One advantage of CompletionIterator is that the completionFunction will be called as soon as the wrapped iterator is consumed. So for sorter, it will release memory earlier rather than at task completion.
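To illustrate the point, a minimal, hypothetical sketch of CompletionIterator-style semantics (not Spark's actual implementation): the completion function fires as soon as the wrapped iterator is exhausted, which is why the sorter can release memory before the task itself completes.

```scala
// Hypothetical sketch: fire a cleanup callback right after the last element is consumed.
class SimpleCompletionIterator[A](sub: Iterator[A], completion: () => Unit) extends Iterator[A] {
  private var completed = false

  override def next(): A = sub.next()

  override def hasNext: Boolean = {
    val more = sub.hasNext
    if (!more && !completed) {
      completed = true
      completion() // e.g. sorter.stop(): free memory/spill files right after the last element
    }
    more
  }
}
```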

As for job cancelling, it's not just CompletionIterator that we should consider. The combiner and sorter pattern (or similar) is something we should look for:

combiner.insertAll(iterator) // or sorter.insertAll(iterator)
// then returns new iterator
combiner.iterator // or sorter.iterator

Contributor

I may be missing something obvious, but seems ExternalSorter.stop() is already idempotent?

Contributor Author

I may be missing something obvious, but seems ExternalSorter.stop() is already idempotent?

Ah, yes. After another look, it's indeed idempotent.
Will update the code.
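For reference, an idempotent stop() generally follows the pattern in this hypothetical sketch (illustrative only, not ExternalSorter's actual code); repeated calls are harmless because each step only touches state that is still present:

```scala
import java.io.File

// Hypothetical sketch of an idempotent stop(): safe to call from both a task-completion
// listener and a CompletionIterator callback, because a second call finds nothing to clean up.
class SpillingSorterSketch {
  private var spillFiles: Seq[File] = Seq.empty
  private var inMemoryBuffer: Array[AnyRef] = null

  def stop(): Unit = {
    spillFiles.foreach(_.delete()) // deleting an already-deleted file is a no-op
    spillFiles = Seq.empty
    if (inMemoryBuffer != null) {  // release the in-memory structures only once
      inMemoryBuffer = null
    }
  }
}
```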

@cloud-fan
Contributor

cc @jiangxb1987

@advancedxy
Contributor Author

ping @cloud-fan and @jiangxb1987.

@jerryshao
Contributor

@advancedxy did you see any actual issue or exception related to this?

@advancedxy
Contributor Author

Hi @jerryshao, I didn't see an exception. But the issue is:
when the stage is aborted and all the remaining tasks are killed, those tasks are not actually cancelled but instead continue running, which wastes executor resources.
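For context, a killed task only stops early if the iterator it is consuming checks the task's interrupted flag. A simplified paraphrase of the idea behind Spark's InterruptibleIterator (not the exact source; only the public TaskContext.isInterrupted() API is relied on here):

```scala
import org.apache.spark.{TaskContext, TaskKilledException}

// Simplified paraphrase: every hasNext re-checks whether the task has been marked killed,
// so a cancelled task stops consuming shuffle data instead of running to completion.
class InterruptibleIteratorSketch[T](context: TaskContext, delegate: Iterator[T])
    extends Iterator[T] {
  override def hasNext: Boolean = {
    if (context.isInterrupted()) {
      throw new TaskKilledException // abort the task as soon as the kill flag is observed
    }
    delegate.hasNext
  }
  override def next(): T = delegate.next()
}
```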

@jerryshao
Contributor

I understand your intention. I was wondering whether you actually hit this issue in production environments, or whether you have a minimal reproduction?

@advancedxy
Contributor Author

advancedxy commented Feb 8, 2018

I was wondering whether you actually hit this issue in production environments,

@jerryshao I met this issue in our production environment when I was debugging a Spark job. I noticed that the aborted stage's tasks continued running until they finished.

I cannot give minimal reproduction code since the failure is related to our mixed (online and offline services) hosts. But you can have a look at the test case I added; it essentially captures the transformation I used, except for the async part.

Currently, I wrap the user-defined iterator in an InterruptibleIterator. However, I believe this is better handled on the Spark side.

@jerryshao
Contributor

I see. Thanks.

.mapPartitions { iter =>
taskStartedSemaphore.release()
// Small delay to ensure that foreach is cancelled if task is killed
Thread.sleep(1000)
Contributor
@jerryshao Feb 8, 2018

I think using sleep will make the UT flaky; you'd better change it to something deterministic.
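For example, a deterministic version of this coordination could look like the sketch below, roughly along the lines of what the test later adopts (a fragment that assumes the suite's SparkContext and the shared taskStartedSemaphore/taskCancelledSemaphore; names and values are illustrative):

```scala
// Sketch: replace the fixed Thread.sleep with explicit driver/task handshaking.
val rdd = sc.parallelize(1 to 1000, 2).map(i => (i, i))
  .repartitionAndSortWithinPartitions(new HashPartitioner(2))
  .mapPartitions { iter =>
    taskStartedSemaphore.release() // tell the driver a task has started
    iter
  }
val f = rdd.foreachAsync { x =>
  // Block until the driver has actually issued the cancel, so the kill deterministically
  // arrives while the task is still consuming the shuffle output.
  taskCancelledSemaphore.acquire()
}

taskStartedSemaphore.acquire()       // wait until at least one task has started
f.cancel()                           // cancel the job while tasks are still running
taskCancelledSemaphore.release(1000) // unblock the tasks; they then hit the interrupt check
```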

Contributor

+1

@advancedxy
Contributor Author

@jerryshao @cloud-fan I have updated my code. Do you have any other concerns?

@@ -104,9 +104,16 @@ private[spark] class BlockStoreShuffleReader[K, C](
context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
// Use completion callback to stop sorter if task was cancelled.
Contributor

if task is completed (either finished or canceled)

Contributor Author
@advancedxy left a comment

@cloud-fan Sorry for the delay.

Your comment is addressed.

@@ -104,9 +104,16 @@ private[spark] class BlockStoreShuffleReader[K, C](
context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
// Use completion callback to stop sorter if task was completed(either finished/cancelled).
Contributor Author

To fit the 100-character line limit, `or` is replaced by `/`.

Contributor

then we can just write `if task was finished/cancelled`.

import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers

Contributor

this will break the style check

taskStartedSemaphore.release()
iter
}.foreachAsync { x =>
if ( x._1 >= 10) { // this block of code is partially executed.
Contributor

no space after if(

@@ -320,6 +319,55 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
f2.get()
}

test("Interruptible iterator of shuffle reader") {
Contributor

can we briefly explain what happened in this test?

@advancedxy
Contributor Author

@cloud-fan I have updated the comments and fixed the style issues (the code was previously auto-formatted by IntelliJ).

@cloud-fan
Contributor

ok to test

val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
.repartitionAndSortWithinPartitions(new HashPartitioner(2))
.mapPartitions { iter =>
taskStartedSemaphore.release()
Contributor

This will be called twice as the root RDD has 2 partitions, so f.cancel might be called before both of these 2 partitions finished.

Contributor Author

f.cancel() should be called before these partitions (tasks) finish, and we want to make sure these tasks can be cancelled.

val taskCompletedSem = new Semaphore(0)
Future {
taskStartedSemaphore.acquire()
f.cancel()
Contributor

what's the expectation for when this f.cancel() should be called?

Contributor Author

Line 372: sem.acquire() is blocked by this Future block, but it looks like we don't need the Future or sem here. I will update the code.

@SparkQA commented Feb 27, 2018

Test build #87703 has finished for PR 20449 at commit ba2f355.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 27, 2018

Test build #87701 has finished for PR 20449 at commit 88e86e0.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

taskStartedSemaphore.acquire()
f.cancel()

val e = intercept[SparkException] { f.get() }.getCause
Contributor

nit: intercept[SparkException](f.get()).getCause

Contributor Author

will do

})

taskStartedSemaphore.acquire()
f.cancel()
Contributor
@cloud-fan Feb 27, 2018

We should add some comment to explain when we reach here. From what I am seeing:

  1. taskStartedSemaphore.release() must be called, so at least one task is started.
  2. the first task has processed no more than 10 records, the second task hasn't processed any data, because the reduce stage is not finished and taskCancelledSemaphore.acquire() will be blocked.

Contributor Author

will do

}
})

taskStartedSemaphore.acquire()
Contributor

why not taskStartedSemaphore.acquire(numSlice)?

Contributor Author

As soon as one task starts, we can cancel the job.

@SparkQA commented Feb 27, 2018

Test build #87723 has finished for PR 20449 at commit d6ed9a1.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 27, 2018

Test build #87725 has finished for PR 20449 at commit 8c15c56.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

// execution and a counter is used to make sure that the corresponding tasks are indeed
// cancelled.
import JobCancellationSuite._
val numSlice = 1
Contributor

Can we hardcode it? Using a variable makes people feel like they can change its value and the test will still pass; however, that's not true, as `assert(executionOfInterruptibleCounter.get() <= 10)` needs to be updated too.

Contributor Author

Will update it later.

But it looks like Jenkins has been having trouble these days? Is it back to normal?

Contributor

I'm not sure, let's just try it :)

@cloud-fan
Contributor

LGTM

@cloud-fan
Contributor

retest this please

@SparkQA commented Feb 28, 2018

Test build #87756 has finished for PR 20449 at commit 8c15c56.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@advancedxy
Contributor Author

I'm not sure, let's just try it :)

All right, I finally tracked down why it's hanging on Jenkins.
The global semaphores used by the "Interruptible iterator of shuffle reader" test were being interfered with by tasks from other tests.

Please check the latest change, @cloud-fan

@SparkQA commented Mar 1, 2018

Test build #87813 has finished for PR 20449 at commit 756e0b7.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

// Reset semaphores if used by multiple tests.
// Note: if other semaphores are shared by multiple tests, please reset them in this block
JobCancellationSuite.taskStartedSemaphore.drainPermits()
JobCancellationSuite.taskCancelledSemaphore.drainPermits()
Contributor

nit: for simplicity, I'd like to reset all semaphores here, instead of thinking about which ones are shared.

Contributor

or we can make all semaphores local, so that we don't need to care about it.

Contributor Author

for simplicity, I'd like to reset all semaphores here, instead of thinking about which ones are shared.

Another way to avoid this problem is: don't reuse semaphores. But that's too verbose.

As for your suggestion: if new semaphores are added by others, how would they know that they're supposed to reset them here? Maybe some comments are needed in the semaphore declarations.

or we can make all semaphores local, so that we don't need to care about it.

No, a global semaphore is required when it is shared between the driver and an executor (another thread in local mode).
See the related PR #4180 for details.
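For reference, the shared state looks roughly like this minimal sketch (the field names follow the test excerpts above; the actual suite object has more members):

```scala
import java.util.concurrent.Semaphore

// Sketch: keeping the semaphores in the suite's companion object means the closures that run
// on executor threads (local mode) and the driver-side test code share the same instances.
object JobCancellationSuite {
  val taskStartedSemaphore = new Semaphore(0)
  val taskCancelledSemaphore = new Semaphore(0)
}
```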

Contributor

Maybe some comments are needed in the semaphore declarations.

+1. It's also good for reviewers; otherwise, figuring out whether a semaphore is shared or not is unnecessary work for reviewers.

@SparkQA commented Mar 1, 2018

Test build #87822 has finished for PR 20449 at commit a3d8ad5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 1, 2018

Test build #87845 has finished for PR 20449 at commit 28119e9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@advancedxy
Contributor Author

ping @cloud-fan

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in f2cab56 Mar 5, 2018
@advancedxy
Contributor Author

@cloud-fan is it possible that we also merge this into branch-2.3, so this fix can be released in Spark 2.3.1?

        CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
      case None =>
        aggregatedIter
    }
    // Use another interruptible iterator here to support task cancellation as aggregator or(and)
    // sorter may have consumed previous interruptible iterator.
    new InterruptibleIterator[Product2[K, C]](context, resultIter)
Contributor

there is a chance that resultIter is already an InterruptibleIterator, and we should not double-wrap it. Can you send a follow-up PR to fix this? Then we can backport them to 2.3 together.

Contributor Author

Will do
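The follow-up PR (apache#20920, referenced in the commits below) addressed this roughly as in the following sketch: wrap only when the result is not already interruptible (paraphrased, not the exact diff):

```scala
// Sketch of the follow-up: avoid wrapping an InterruptibleIterator in another one.
resultIter match {
  case _: InterruptibleIterator[Product2[K, C]] => resultIter
  case _ =>
    // Use another interruptible iterator here to support task cancellation, since the
    // aggregator or sorter may already have consumed the previous interruptible iterator.
    new InterruptibleIterator[Product2[K, C]](context, resultIter)
}
```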

ghost pushed a commit to dbtsai/spark that referenced this pull request Mar 31, 2018
## What changes were proposed in this pull request?

Address apache#20449 (comment), If `resultIter` is already a `InterruptibleIterator`, don't double wrap it.

## How was this patch tested?
Existing tests.

Author: Xingbo Jiang <[email protected]>

Closes apache#20920 from jiangxb1987/SPARK-23040.
jiangxb1987 pushed a commit to jiangxb1987/spark that referenced this pull request Mar 31, 2018
## What changes were proposed in this pull request?

Before this commit, a non-interruptible iterator is returned if aggregator or ordering is specified.
This commit also ensures that sorter is closed even when task is cancelled(killed) in the middle of sorting.

## How was this patch tested?

Add a unit test in JobCancellationSuite

Author: Xianjin YE <[email protected]>

Closes apache#20449 from advancedxy/SPARK-23040.
jiangxb1987 added a commit to jiangxb1987/spark that referenced this pull request Mar 31, 2018
## What changes were proposed in this pull request?

Address apache#20449 (comment), If `resultIter` is already a `InterruptibleIterator`, don't double wrap it.

## How was this patch tested?
Existing tests.

Author: Xingbo Jiang <[email protected]>

Closes apache#20920 from jiangxb1987/SPARK-23040.
asfgit pushed a commit that referenced this pull request Apr 1, 2018
…fle reader

Backport #20449 and #20920 to branch-2.3

---

## What changes were proposed in this pull request?
Before this commit, a non-interruptible iterator is returned if aggregator or ordering is specified.
This commit also ensures that sorter is closed even when task is cancelled(killed) in the middle of sorting.

## How was this patch tested?
Add a unit test in JobCancellationSuite

Author: Xianjin YE <[email protected]>
Author: Xingbo Jiang <[email protected]>

Closes #20954 from jiangxb1987/SPARK-23040-2.3.
mshtelma pushed a commit to mshtelma/spark that referenced this pull request Apr 5, 2018
## What changes were proposed in this pull request?

Address apache#20449 (comment), If `resultIter` is already a `InterruptibleIterator`, don't double wrap it.

## How was this patch tested?
Existing tests.

Author: Xingbo Jiang <[email protected]>

Closes apache#20920 from jiangxb1987/SPARK-23040.