
[SPARK-24755][Core] Executor loss can cause task to not be resubmitted #21729

Closed
wants to merge 7 commits

Conversation

hthuynh2

@hthuynh2 hthuynh2 commented Jul 8, 2018

Description
As described in SPARK-24755, when speculation is enabled, there is a scenario in which an executor loss can cause a task to not be resubmitted.
This patch changes the variable killedByOtherAttempt to keep track of the taskIds of tasks that are killed by another attempt. This way, we still avoid resubmitting tasks killed by another attempt, while resubmitting the successful attempt when an executor is lost.
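As a rough illustration of the fix, here is a simplified standalone sketch (not the actual TaskSetManager code; the class and method names below are hypothetical, and only the killedByOtherAttempt name comes from the PR):

```scala
import scala.collection.mutable.HashSet

// Simplified model: before the patch, killedByOtherAttempt was an
// Array[Boolean] indexed by partition, so a partition whose speculative
// attempt was killed would be skipped on resubmission even if the attempt
// that actually succeeded ran on the lost executor. Tracking taskIds
// (unique per attempt) distinguishes the killed attempt from the
// successful one.
class ResubmitTracker {
  private val killedByOtherAttempt = new HashSet[Long]

  def recordKilledByOtherAttempt(taskId: Long): Unit = {
    killedByOtherAttempt += taskId
  }

  // Called when an executor is lost: a finished attempt that ran on it
  // should be resubmitted unless that exact attempt was killed by another
  // attempt of the same task.
  def shouldResubmit(taskId: Long): Boolean = {
    !killedByOtherAttempt.contains(taskId)
  }
}
```

The successful attempt's taskId is never added to the set, so it is resubmitted on executor loss, while the killed speculative attempt (a different taskId for the same partition) is not.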

How was this patch tested?
A UT is added, based on the UT written by @xuanyuanking, with modifications to simulate the scenario described in SPARK-24755.

@hthuynh2
Author

hthuynh2 commented Jul 8, 2018

cc @mridulm @xuanyuanking

@@ -87,7 +87,7 @@ private[spark] class TaskSetManager(
// Set the coresponding index of Boolean var when the task killed by other attempt tasks,
Member

typo I made before, coresponding -> corresponding.

("exec2", "host2"), ("exec3", "host3"))
sched.initialize(new FakeSchedulerBackend() {
override def killTask(taskId: Long,
executorId: String,
Member

nit: indent

var resubmittedTasks = new mutable.HashSet[Int]
val dagScheduler = new FakeDAGScheduler(sc, sched) {
override def taskEnded(task: Task[_],
reason: TaskEndReason,
Member

ditto

@xuanyuanking xuanyuanking left a comment

Just some nits in the code changes. The added UT copies a lot of code from SPARK-22074; is there a better way to reuse it, or to combine the two tests?

@xuanyuanking
Member

Please change the title to '[SPARK-24755][Core] Executor loss can cause task to not be resubmitted'

@hthuynh2 hthuynh2 changed the title SPARK-24755 Executor loss can cause task to not be resubmitted [SPARK-24755][Core] Executor loss can cause task to not be resubmitted Jul 9, 2018
@hthuynh2
Author

hthuynh2 commented Jul 9, 2018

@xuanyuanking Thanks for the comments. I also thought about modifying the UT of SPARK-22074 instead of adding a new UT, but I was afraid it might cause confusion since they are two different issues, although they are very close. If you feel it is better to combine them, I can change it. Thanks.

@tgravescs
Contributor

ok to test

@SparkQA

SparkQA commented Jul 10, 2018

Test build #92775 has finished for PR 21729 at commit 093e39c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -87,7 +87,7 @@ private[spark] class TaskSetManager(
// Set the coresponding index of Boolean var when the task killed by other attempt tasks,
Contributor

The comment needs to be changed, since this is no longer an array of Booleans.

Author

I'll update it. Thanks.

Contributor

Please update the comment, given that the HashSet is okay now.

@tgravescs
Contributor

cc @mridulm

@@ -87,7 +87,7 @@ private[spark] class TaskSetManager(
// Set the coresponding index of Boolean var when the task killed by other attempt tasks,
// this happened while we set the `spark.speculation` to true. The task killed by others
// should not resubmit while executor lost.
private val killedByOtherAttempt: Array[Boolean] = new Array[Boolean](numTasks)
private val killedByOtherAttempt = new HashSet[Long]
Contributor

@jiangxb1987 jiangxb1987 Jul 10, 2018

super nit: I prefer an Array[Long], so you know the index corresponding to the taskId; that can provide more information while debugging.

Author

@hthuynh2 hthuynh2 Jul 10, 2018

Hi @jiangxb1987, thanks for the comment, but I'm not sure if I understand your suggestion correctly. Do you mean: private val killedByOtherAttempt = new Array[Long] ?

Author

Also, the comment "Set the corresponding index of Boolean var when the task killed ..." is not correct anymore. I'm sorry I forgot to update it.

Contributor

Yea, please also update the comment.

Author

@hthuynh2 hthuynh2 Jul 10, 2018

I think we should use ArrayBuffer[Long] instead of Array[Long], because the number of elements can grow as more tasks are killed.
Also, I think there is a downside to using an array-like data structure for this variable: lookup takes linear time, and that operation is used many times when we check whether a task needs to be resubmitted (inside the executorLost method of TSM). This will not matter much if the array is small, but I still think it is something we might want to consider.
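The trade-off being discussed can be sketched with a small hypothetical snippet (not PR code): a membership test on a HashSet is expected constant time, while contains on an array-like collection scans linearly.

```scala
import scala.collection.mutable

// Both collections can answer "was this taskId killed by another attempt?",
// but with different lookup costs.
val killedSet = mutable.HashSet(10L, 20L, 30L)      // contains: O(1) expected
val killedBuffer = mutable.ArrayBuffer(10L, 20L, 30L) // contains: O(n) scan

assert(killedSet.contains(20L))
assert(killedBuffer.contains(20L))
assert(!killedSet.contains(99L))
```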

Contributor

@mridulm 's approach also sounds good to me.

Contributor

@jiangxb1987 please clarify: is it fine as is, or do you want to use a HashMap and track the index? Can you give an example of when this is used for debugging? For instance, if you are taking a heap dump and looking at the data structures, that might make sense; otherwise it's not accessible without adding further log statements anyway, and it's just extra memory usage.

Contributor

For instance, when you have corrupted shuffle data, you may want to ensure it's not caused by killing tasks, and that requires tracking all killed taskIds corresponding to a partition. With a HashMap as @mridulm proposed, it would be easy to add extra logging to debug. But I just looked at the code again and found that expanding the logInfo in L735 can also resolve my case. So it seems fine to use a HashSet and save some memory.

Contributor

I'm not proposing to expand the logInfo in L735 in this PR; I'm just concerned about whether it's convenient enough to add extra logs to debug a potential issue. Since there is another way to achieve the same effect, I'm okay with using a HashSet here.

Contributor

Great, thanks. Seems like we can go with the code as is, then.

@tgravescs
Contributor

@hthuynh2 please update based on the comments above. You can leave the type as HashSet and fix the other typos, indentations, and comments.

@SparkQA

SparkQA commented Jul 16, 2018

Test build #93122 has finished for PR 21729 at commit b2affd2.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 16, 2018

Test build #93121 has finished for PR 21729 at commit 9f0e0ae.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 16, 2018

Test build #93127 has finished for PR 21729 at commit a67bebc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

test this please

@SparkQA

SparkQA commented Jul 17, 2018

Test build #93176 has finished for PR 21729 at commit a67bebc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm
Contributor

mridulm commented Jul 18, 2018

Looks good to me, thanks for fixing this @hthuynh2 !

@squito
Contributor

squito commented Jul 18, 2018

lgtm

@SparkQA

SparkQA commented Jul 19, 2018

Test build #93248 has finished for PR 21729 at commit f9ed226.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 19, 2018

Test build #93249 has finished for PR 21729 at commit 6316e5b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

+1 I'm going to merge, thanks @hthuynh2

asfgit pushed a commit that referenced this pull request Jul 19, 2018
**Description**
As described in [SPARK-24755](https://issues.apache.org/jira/browse/SPARK-24755), when speculation is enabled, there is a scenario in which an executor loss can cause a task to not be resubmitted.
This patch changes the variable killedByOtherAttempt to keep track of the taskIds of tasks that are killed by another attempt. This way, we still avoid resubmitting tasks killed by another attempt, while resubmitting the successful attempt when an executor is lost.

**How was this patch tested?**
A UT is added, based on the UT written by xuanyuanking, with modifications to simulate the scenario described in SPARK-24755.

Author: Hieu Huynh <“[email protected]”>

Closes #21729 from hthuynh2/SPARK_24755.

(cherry picked from commit 8d707b0)
Signed-off-by: Thomas Graves <[email protected]>
@asfgit asfgit closed this in 8d707b0 Jul 19, 2018
@gatorsmile
Member

For other reviewers, this is merged to master/2.3

otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
**Description**
As described in [SPARK-24755](https://issues.apache.org/jira/browse/SPARK-24755), when speculation is enabled, there is a scenario in which an executor loss can cause a task to not be resubmitted.
This patch changes the variable killedByOtherAttempt to keep track of the taskIds of tasks that are killed by another attempt. This way, we still avoid resubmitting tasks killed by another attempt, while resubmitting the successful attempt when an executor is lost.

**How was this patch tested?**
A UT is added, based on the UT written by xuanyuanking, with modifications to simulate the scenario described in SPARK-24755.

Author: Hieu Huynh <“[email protected]”>

Closes apache#21729 from hthuynh2/SPARK_24755.

(cherry-picked from commit 8d707b0)

Ref: LIHADOOP-40171

RB=1414249
BUG=LIHADOOP-40171
R=fli,mshen,yezhou
A=yezhou