Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend #43021

Closed
wants to merge 1 commit into from

Conversation

xiongbo-sjtu
Copy link

@xiongbo-sjtu xiongbo-sjtu commented Sep 21, 2023

What changes were proposed in this pull request?

Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck

Why are the changes needed?

For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in the SPARK-45227). Once an executor process runs into a state, it'd stop launching tasks from the driver or reporting task status back.

Does this PR introduce any user-facing change?

No

How was this patch tested?

$ build/mvn package -DskipTests -pl core
$ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test

Was this patch authored or co-authored using generative AI tooling?

No


Please feel free to skip reading unless you're interested in details


Symptom

Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very last step of writing a data frame to S3 by calling df.write. Looking at Spark UI, we saw that an executor process hung over 1 hour. After we manually killed the executor process, the app succeeded. Note that the same EMR cluster with two worker nodes was able to run the same app without any issue before and after the incident.

Below is what's observed from relevant container logs and thread dump.

  • A regular task that's sent to the executor, which also reported back to the driver upon the task completion.
    $zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz
    23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
    23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)

    $zgrep ' 923' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923

    $zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923)
    23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 4495 bytes result sent to driver
  • Another task that's sent to the executor but didn't get launched since the single-threaded dispatcher was stuck (presumably in an "infinite loop" as explained later).
    $zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz
    23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()

    $zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924

    $zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz
    >> note that the above command has no matching result, indicating that task 153.0 in stage 23.0 (TID 924) was never launched
  • Thread dump shows that the dispatcher-Executor thread has the following stack trace.
    "dispatcher-Executor" #40 daemon prio=5 os_prio=0 tid=0x0000ffff98e37800 nid=0x1aff runnable [0x0000ffff73bba000]
    java.lang.Thread.State: RUNNABLE
    at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142)
    at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131)
    at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
    at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
    at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
    at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
    at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
    at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169)
    at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
    at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
    at scala.collection.mutable.HashMap.put(HashMap.scala:126)
    at scala.collection.mutable.HashMap.update(HashMap.scala:131)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown Source)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Relevant code paths

Within an executor process, there's a dispatcher thread dedicated to CoarseGrainedExecutorBackend(a single RPC endpoint) that launches tasks scheduled by the driver. Each task is run on a TaskRunner thread backed by a thread pool created for the executor. The TaskRunner thread and the dispatcher thread are different. However, they read and write a common object (i.e., taskResources) that's a mutable hashmap without thread-safety, in Executor and CoarseGrainedExecutorBackend, respectively.

What's going on?

Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when two threads access the same hashmap object. For illustration purpose, let's consider the following scenario where two threads (Thread 1 and Thread 2) access a hash table without thread-safety

  • Thread 1 sees A.next = B, but then yields execution to Thread 2

  • Thread 2 triggers a resize operation resulting in B.next = A (Note that hashmap doesn't care about ordering), and then yields execution to Thread 1.

  • After taking over CPU, Thread 1 would run into an "infinite loop" when traversing the list in the last bucket, given A.next = B and B.next = A in its view.

@github-actions github-actions bot added the CORE label Sep 21, 2023
Copy link
Contributor

@mridulm mridulm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for fixing this @xiongbo-sjtu, nice catch !

+CC @jiangxb1987 as well

@jiangxb1987
Copy link
Contributor

Thanks for the fix, is it possible to add an unit test to cover the failure?

@xiongbo-sjtu xiongbo-sjtu force-pushed the master branch 9 times, most recently from f24da16 to 4e9ae1e Compare September 22, 2023 02:33
@xiongbo-sjtu xiongbo-sjtu changed the title [SPARK-45227][CORE] Fix an issue with CoarseGrainedExecutorBackend wh… [SPARK-45227][CORE] Fix an issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck Sep 22, 2023
@xiongbo-sjtu xiongbo-sjtu changed the title [SPARK-45227][CORE] Fix an issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck [SPARK-45227][CORE] Fix a subtle thread-safety with CoarseGrainedExecutorBackend Sep 22, 2023
@xiongbo-sjtu
Copy link
Author

xiongbo-sjtu commented Sep 22, 2023

@jiangxb1987 Good suggestion! I've updated the unit test accordingly.

Please merge this fix to master. If possible, please help patch v3.3.1 and above.

Thanks!
Bo (PE w/ Amazon)

@xiongbo-sjtu xiongbo-sjtu changed the title [SPARK-45227][CORE] Fix a subtle thread-safety with CoarseGrainedExecutorBackend [SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend Sep 22, 2023
@xiongbo-sjtu
Copy link
Author

@jiangxb1987 @mridulm

Eventually got all tests passed in Github Actions. Any concern on merging this pull request?

As a side note, I've discovered another minor issue, but will address that in another pull request.

Thanks,
Bo

@mridulm
Copy link
Contributor

mridulm commented Sep 26, 2023

The updated test is not checking for behavior (it is checking for the type of the variable, so imo not very useful), not sure how we can reliably trigger this behavior to test.
Thoughts @jiangxb1987 ?

Copy link
Contributor

@JoshRosen JoshRosen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The updated test is not checking for behavior (it is checking for the type of the variable, so imo not very useful), not sure how we can reliably trigger this behavior to test.

This strikes me as a situation where the ROI of a reproducing test is somewhat low: any reproducer would necessarily involve multiple threads performing different operations (reads and writes) and in order to reliably reproduce the old bug we'd have to either get lucky on the interleaving or would need to run with really high concurrency in order to get enough attempts so that we have a high likelihood of reproduction.

I think it's unlikely that we'll regress on this specific code (e.g. accidentally changing it from a concurrent map to a non-concurrent one). An end-to-end reproduction might help to defend against other thread-safety bugs here, but I don't view this code as inherently more vulnerable than other multi-threaded places in Spark so I don't think it's worth it to add dedicated tests there.

Given this, I would personally be okay with foregoing any added tests for this specific fix. WDYT?

@xiongbo-sjtu
Copy link
Author

+1 to the comment made by @JoshRosen

Our production fleet has tens of thousands Spark jobs running daily. After migrating those jobs from Spark 2 to Spark 3, we've observed the issue (i.e., thread dumps show that the dispatcher thread is stuck with the reported stack trace) only a handful of times.

The unit test was added so that the thread-safety will not be unintentionally refactored away later. It's not meant to simulate the race condition. The bottom line is that no functional regression is expected after this pull request replaces the vanilla mutable.hashmap with CHM.

Copy link
Contributor

@jiangxb1987 jiangxb1987 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

// Ensure thread-safety (https://issues.apache.org/jira/browse/SPARK-45227)
assert(backend.taskResources.isInstanceOf[
ConcurrentMap[Long, Map[String, ResourceInformation]]
])
Copy link
Contributor

@mridulm mridulm Sep 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Do you want to drop this assert @xiongbo-sjtu ?

Copy link
Author

@xiongbo-sjtu xiongbo-sjtu Sep 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above assert logic was added so that the thread-safe data structure (i.e., CHM) will not be unintentionally refactored away later. That's why I think that it's better for us to keep the logic here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a comment on the field about thread safety, and remove this assertion.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the spirit of "disagree and commit", I've removed the assert statement and updated the pull request.

Please merge this fix to master. If possible, please help patch v3.3.1 and above. We have been waiting for the fix to get merged, before pulling the trigger to sync up our internal repo with the open-source repo.

@attilapiros
Copy link
Contributor

I think it would be good to add the content of SPARK-45227 here to the PR's description.

@xiongbo-sjtu
Copy link
Author

Gentlemen, please 👍 if you're okay with getting this pull request merged.

…ere an executor process randomly gets stuck
@mridulm
Copy link
Contributor

mridulm commented Sep 28, 2023

@HeartSaVioR, @HyukjinKwon I am seeing multiple PR's failing with this. It looks like some timeout issue, did anything change recently which might be triggering it ?

This PR should not be impacted by those modules, but I would have prefer clean builds before merging.

@xiongbo-sjtu
Copy link
Author

@HeartSaVioR, @HyukjinKwon I am seeing multiple PR's failing with this. It looks like some timeout issue, did anything change recently which might be triggering it ?

This PR should not be impacted by those modules, but I would have prefer clean builds before merging.

All failed tests have passed after I rerun them. Solving the fragile unit tests is an orthogonal concern.

Any more concern or blocker for merging this pull request? @mridulm

Copy link
Contributor

@mridulm mridulm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me, thanks for fixing this @xiongbo-sjtu !

@mridulm mridulm closed this in 8e6b160 Sep 29, 2023
mridulm pushed a commit that referenced this pull request Sep 29, 2023
…edExecutorBackend

### What changes were proposed in this pull request?
Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck

### Why are the changes needed?
For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in the SPARK-45227). Once an executor process runs into a state, it'd stop launching tasks from the driver or reporting task status back.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
```
$ build/mvn package -DskipTests -pl core
$ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test
```

### Was this patch authored or co-authored using generative AI tooling?
No

******************************************************************************
**_Please feel free to skip reading unless you're interested in details_**
******************************************************************************

### Symptom

Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very last step of writing a data frame to S3 by calling `df.write`. Looking at Spark UI, we saw that an executor process hung over 1 hour. After we manually killed the executor process, the app succeeded. Note that the same EMR cluster with two worker nodes was able to run the same app without any issue before and after the incident.

Below is what's observed from relevant container logs and thread dump.

- A regular task that's sent to the executor, which also reported back to the driver upon the task completion.

```
    $zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz
    23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
    23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)

    $zgrep ' 923' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923

    $zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923)
    23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 4495 bytes result sent to driver
```

- Another task that's sent to the executor but didn't get launched since the single-threaded dispatcher was stuck (presumably in an "infinite loop" as explained later).

```
    $zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz
    23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()

    $zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924

    $zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz
    >> note that the above command has no matching result, indicating that task 153.0 in stage 23.0 (TID 924) was never launched
```

- Thread dump shows that the dispatcher-Executor thread has the following stack trace.

```
    "dispatcher-Executor" #40 daemon prio=5 os_prio=0 tid=0x0000ffff98e37800 nid=0x1aff runnable [0x0000ffff73bba000]
    java.lang.Thread.State: RUNNABLE
    at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142)
    at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131)
    at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
    at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
    at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
    at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
    at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
    at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169)
    at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
    at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
    at scala.collection.mutable.HashMap.put(HashMap.scala:126)
    at scala.collection.mutable.HashMap.update(HashMap.scala:131)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown Source)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
```

### Relevant code paths

Within an executor process, there's a [dispatcher thread](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170) dedicated to CoarseGrainedExecutorBackend(a single RPC endpoint) that launches tasks scheduled by the driver. Each task is run on a TaskRunner thread backed by a thread pool created for the executor. The TaskRunner thread and the dispatcher thread are different. However, they read and write a common object (i.e., taskResources) that's a mutable hashmap without thread-safety, in [Executor](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561) and [CoarseGrainedExecutorBackend](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189), respectively.

### What's going on?

Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when two threads access the same hashmap object.  For illustration purpose, let's consider the following scenario where two threads (Thread 1 and Thread 2) access a hash table without thread-safety

- Thread 1 sees A.next = B, but then yields execution to Thread 2
<img src="https://issues.apache.org/jira/secure/attachment/13063040/13063040_hashtable1.png" width="400">

- Thread 2 triggers a resize operation resulting in B.next = A (Note that hashmap doesn't care about ordering), and then yields execution to Thread 1.
<img src="https://issues.apache.org/jira/secure/attachment/13063041/13063041_hashtable2.png" width="400">

- After taking over CPU, Thread 1 would run into an "infinite loop" when traversing the list in the last bucket, given A.next = B and B.next = A in its view.

Closes #43021 from xiongbo-sjtu/master.

Authored-by: Bo Xiong <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 8e6b160)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
mridulm pushed a commit that referenced this pull request Sep 29, 2023
…edExecutorBackend

### What changes were proposed in this pull request?
Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck

### Why are the changes needed?
For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in the SPARK-45227). Once an executor process runs into a state, it'd stop launching tasks from the driver or reporting task status back.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
```
$ build/mvn package -DskipTests -pl core
$ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test
```

### Was this patch authored or co-authored using generative AI tooling?
No

******************************************************************************
**_Please feel free to skip reading unless you're interested in details_**
******************************************************************************

### Symptom

Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very last step of writing a data frame to S3 by calling `df.write`. Looking at Spark UI, we saw that an executor process hung over 1 hour. After we manually killed the executor process, the app succeeded. Note that the same EMR cluster with two worker nodes was able to run the same app without any issue before and after the incident.

Below is what's observed from relevant container logs and thread dump.

- A regular task that's sent to the executor, which also reported back to the driver upon the task completion.

```
    $zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz
    23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
    23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)

    $zgrep ' 923' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923

    $zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923)
    23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 4495 bytes result sent to driver
```

- Another task that's sent to the executor but didn't get launched since the single-threaded dispatcher was stuck (presumably in an "infinite loop" as explained later).

```
    $zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz
    23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()

    $zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924

    $zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz
    >> note that the above command has no matching result, indicating that task 153.0 in stage 23.0 (TID 924) was never launched
```

- Thread dump shows that the dispatcher-Executor thread has the following stack trace.

```
    "dispatcher-Executor" #40 daemon prio=5 os_prio=0 tid=0x0000ffff98e37800 nid=0x1aff runnable [0x0000ffff73bba000]
    java.lang.Thread.State: RUNNABLE
    at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142)
    at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131)
    at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
    at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
    at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
    at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
    at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
    at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169)
    at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
    at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
    at scala.collection.mutable.HashMap.put(HashMap.scala:126)
    at scala.collection.mutable.HashMap.update(HashMap.scala:131)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown Source)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
```

### Relevant code paths

Within an executor process, there's a [dispatcher thread](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170) dedicated to CoarseGrainedExecutorBackend(a single RPC endpoint) that launches tasks scheduled by the driver. Each task is run on a TaskRunner thread backed by a thread pool created for the executor. The TaskRunner thread and the dispatcher thread are different. However, they read and write a common object (i.e., taskResources) that's a mutable hashmap without thread-safety, in [Executor](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561) and [CoarseGrainedExecutorBackend](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189), respectively.

### What's going on?

Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when two threads access the same hashmap object.  For illustration purpose, let's consider the following scenario where two threads (Thread 1 and Thread 2) access a hash table without thread-safety

- Thread 1 sees A.next = B, but then yields execution to Thread 2
<img src="https://issues.apache.org/jira/secure/attachment/13063040/13063040_hashtable1.png" width="400">

- Thread 2 triggers a resize operation resulting in B.next = A (Note that hashmap doesn't care about ordering), and then yields execution to Thread 1.
<img src="https://issues.apache.org/jira/secure/attachment/13063041/13063041_hashtable2.png" width="400">

- After taking over CPU, Thread 1 would run into an "infinite loop" when traversing the list in the last bucket, given A.next = B and B.next = A in its view.

Closes #43021 from xiongbo-sjtu/master.

Authored-by: Bo Xiong <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 8e6b160)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
@mridulm
Copy link
Contributor

mridulm commented Sep 29, 2023

Merged to master, 3.5 and 3.4
Thanks for working on this @xiongbo-sjtu !
Ran into conflicts for 3.3, can you create a follow up PR to fix it for branch-3.3 @xiongbo-sjtu ? Thanks.

Thanks for the reviews @JoshRosen, @jiangxb1987, @attilapiros :-)

@xiongbo-sjtu
Copy link
Author

Thanks. Just submit another pull request to patch branch-3.3

#43176

mridulm pushed a commit that referenced this pull request Sep 30, 2023
…edExecutorBackend

Backport of #43021 to branch 3.3

### What changes were proposed in this pull request?
Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck.

### Why are the changes needed?
For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in the SPARK-45227). Once an executor process runs into a state, it'd stop launching tasks from the driver or reporting task status back.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
```
$ build/mvn package -DskipTests -pl core
$ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43176 from xiongbo-sjtu/branch-3.3.

Authored-by: Bo Xiong <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
viirya pushed a commit to viirya/spark-1 that referenced this pull request Oct 19, 2023
…edExecutorBackend

### What changes were proposed in this pull request?
Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck

### Why are the changes needed?
For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in the SPARK-45227). Once an executor process runs into a state, it'd stop launching tasks from the driver or reporting task status back.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
```
$ build/mvn package -DskipTests -pl core
$ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test
```

### Was this patch authored or co-authored using generative AI tooling?
No

******************************************************************************
**_Please feel free to skip reading unless you're interested in details_**
******************************************************************************

### Symptom

Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very last step of writing a data frame to S3 by calling `df.write`. Looking at Spark UI, we saw that an executor process hung over 1 hour. After we manually killed the executor process, the app succeeded. Note that the same EMR cluster with two worker nodes was able to run the same app without any issue before and after the incident.

Below is what's observed from relevant container logs and thread dump.

- A regular task that's sent to the executor, which also reported back to the driver upon the task completion.

```
    $zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz
    23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
    23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)

    $zgrep ' 923' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923

    $zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923)
    23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 4495 bytes result sent to driver
```

- Another task that's sent to the executor but didn't get launched since the single-threaded dispatcher was stuck (presumably in an "infinite loop" as explained later).

```
    $zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz
    23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()

    $zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924

    $zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz
    >> note that the above command has no matching result, indicating that task 153.0 in stage 23.0 (TID 924) was never launched
```

- Thread dump shows that the dispatcher-Executor thread has the following stack trace.

```
    "dispatcher-Executor" apache#40 daemon prio=5 os_prio=0 tid=0x0000ffff98e37800 nid=0x1aff runnable [0x0000ffff73bba000]
    java.lang.Thread.State: RUNNABLE
    at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142)
    at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131)
    at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
    at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
    at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
    at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
    at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
    at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169)
    at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
    at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
    at scala.collection.mutable.HashMap.put(HashMap.scala:126)
    at scala.collection.mutable.HashMap.update(HashMap.scala:131)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown Source)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
```

### Relevant code paths

Within an executor process, there's a [dispatcher thread](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170) dedicated to CoarseGrainedExecutorBackend(a single RPC endpoint) that launches tasks scheduled by the driver. Each task is run on a TaskRunner thread backed by a thread pool created for the executor. The TaskRunner thread and the dispatcher thread are different. However, they read and write a common object (i.e., taskResources) that's a mutable hashmap without thread-safety, in [Executor](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561) and [CoarseGrainedExecutorBackend](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189), respectively.

### What's going on?

Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when two threads access the same hashmap object.  For illustration purpose, let's consider the following scenario where two threads (Thread 1 and Thread 2) access a hash table without thread-safety

- Thread 1 sees A.next = B, but then yields execution to Thread 2
<img src="https://issues.apache.org/jira/secure/attachment/13063040/13063040_hashtable1.png" width="400">

- Thread 2 triggers a resize operation resulting in B.next = A (Note that hashmap doesn't care about ordering), and then yields execution to Thread 1.
<img src="https://issues.apache.org/jira/secure/attachment/13063041/13063041_hashtable2.png" width="400">

- After taking over CPU, Thread 1 would run into an "infinite loop" when traversing the list in the last bucket, given A.next = B and B.next = A in its view.

Closes apache#43021 from xiongbo-sjtu/master.

Authored-by: Bo Xiong <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 8e6b160)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
(cherry picked from commit e706ba1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants