Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

[DCOS-46585] Fix supervised driver retry logic for outdated tasks #46

Merged
merged 12 commits into from
Jan 18, 2019

Conversation

samvantran
Copy link

What changes were proposed in this pull request?

Resolves DCOS-46585 && COPS-4325

Customers experienced --supervised Spark jobs were retried multiple times in scenarios where an agent would crash, come back, and re-register even when those jobs had already relaunched on a different agent. That is:

- supervised driver is running on agent1
- agent1 crashes
- driver is relaunched on another agent as `<task-id>-retry-1`
- agent1 comes back online and re-registers with scheduler
- spark relaunches the same job as `<task-id>-retry-2`
- now there are two jobs running simultaneously

This is because when an agent would come back and re-register it would send a status update TASK_FAILED for its old driver-task. Previous logic would indiscriminately remove the submissionId from Zookeeper's launchedDrivers node and add it to retryList node. Then, when a new offer came in, it would relaunch another -retry- task even though one was previously running.

For example logs, scroll to bottom

How was this patch tested?

  • Added a unit test to simulate behavior described above
  • Tested manually on a DC/OS cluster by
    - launching a --supervised spark job
    - dcos node ssh <to the agent with the running spark-driver>
    - systemctl stop dcos-mesos-slave
    - docker kill <driver-container-id>
    - [ wait until spark job is relaunched ]
    - systemctl start dcos-mesos-slave
    - [ observe spark driver is not relaunched as `-retry-2` ]
    

Log snippets included below. Notice the -retry-1 task is running when status update for the old task comes in afterward:

19/01/15 19:21:38 TRACE MesosClusterScheduler: Received offers from Mesos: 
... [offers] ...
19/01/15 19:21:39 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2532 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001"
...
19/01/15 19:21:42 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_STARTING message=''
19/01/15 19:21:43 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_RUNNING message=''
...
19/01/15 19:29:12 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_LOST message='health check timed out' reason=REASON_SLAVE_REMOVED
...
19/01/15 19:31:12 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2681 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001-retry-1"
...
19/01/15 19:31:15 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-1 state=TASK_STARTING message=''
19/01/15 19:31:16 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-1 state=TASK_RUNNING message=''
...
19/01/15 19:33:45 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_FAILED message='Unreachable agent re-reregistered'
...
19/01/15 19:33:45 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_FAILED message='Abnormal executor termination: unknown container' reason=REASON_EXECUTOR_TERMINATED
19/01/15 19:33:45 ERROR MesosClusterScheduler: Unable to find driver with driver-20190115192138-0001 in status update
...
19/01/15 19:33:47 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2729 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001-retry-2"
...
19/01/15 19:33:50 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-2 state=TASK_STARTING message=''
19/01/15 19:33:51 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-2 state=TASK_RUNNING message=''

Copy link

@akirillov akirillov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, @samvantran. Provided fix relies on prior knowledge of task id format (its string representation and parsing) and on the ordering of task ids based on this representation. This heuristic doesn't look strong enough and the fix will stop working in case UUID semantics change in the future. I'd suggest relying on some state tracking data structure which will keep track of (re)launched drivers ordered by time/status. Or even better keep the lost driver ids in a set to perform taskIsOutdated check against it.

@@ -787,6 +796,14 @@ private[spark] class MesosClusterScheduler(
}
}

private def taskIsOutdated(taskId: String, state: MesosClusterSubmissionState): Boolean = {
if (getRetryCountFromTaskId(taskId) < getRetryCountFromTaskId(state.frameworkId)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be simplified to just getRetryCountFromTaskId(taskId) < getRetryCountFromTaskId(state.frameworkId) without if statement

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah right. thanks!

This frees us from parsing retry counts from strings and future proofs
us in the event taskId UUID ever changes
Copy link
Author

@samvantran samvantran left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review @akirillov! Good call on using a set for tracking retried drivers. I've made changes based on your comments. Please take a 2nd look 🙏

@@ -787,6 +796,14 @@ private[spark] class MesosClusterScheduler(
}
}

private def taskIsOutdated(taskId: String, state: MesosClusterSubmissionState): Boolean = {
if (getRetryCountFromTaskId(taskId) < getRetryCountFromTaskId(state.frameworkId)) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah right. thanks!

Copy link

@akirillov akirillov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, @samvantran. One thing is missing now - the cleanup logic, ids are only added to retriedDrivers set and will still live there until the Dispatcher is terminated thus potentially causing unnecessary memory pressure.

What if to use a reversed approach and keep track of running drivers and ignore any other ids coming with TASK_LOST/TASK_FAILED. I.e. when a driver needs to be relaunched - a prior id is removed from the list, and the new id is added. When the old id is coming in with failed task message - just ignore it if it's not in running.

if (retriedDrivers.contains(taskId)) {
true
} else {
retriedDrivers.add(taskId)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

having state mutation logic in a boolean function which checks some condition looks a bit confusing

@samvantran
Copy link
Author

samvantran commented Jan 17, 2019

Yeah my original concerns with this approach was that a long lived scheduler could 1) accumulate a lot of IDs and 2) degrade in lookup performance but realistically it wouldn't be that much. Imagine 1 million failed/lost jobs... that would catch cause alarm for any user and they'd like have bigger problems on their hands.

But to your point... I actually had this much longer response written up before I made the HashSet change so I might as well share it:

  • re. tracking drivers, there is already the launchedDrivers map that does just that. It tracks status and last retryState (with number of retries, lastFailedStatus etc). When I was parsing retry counts, I was basing it on the fact that the map would have latest task state (which is updated here)
  • there is even pendingRetryDrivers array that temporarily stores which drivers need to be relaunched and is purged when they are.
  • When you say we remove "prior" IDs and ignore "old" IDs with status TASK_FAILED, how do we know they're old? If we follow my test example we get these events
    task001 -> TASK_RUNNING
    task001 -> TASK_LOST              // retriedDrivers = [task001]
    task001-retry1 -> TASK_RUNNING    // is this where we remove task001 ? 
    task001 -> TASK_FAILED 
    
  • from the example above, how do we know task001-retry-1 is associated with task001 without parsing the taskId string?

I kind of am thinking we should go back to parsing retry count from taskId string because:

  1. like you said, there would be added memory pressure over time with a set
  2. no extra data structures needed
  3. parsing retry count is O(1) operation
  4. parsing taskId against the {RETRY_SEP} constant has been done for at least 2.5 years (see these methods). It still exists in Spark 3.0-SNAPSHOT so I don't suspect UUID semantics will change any time soon.

WDYT?

If ID is not found in either than it is orphaned and we can ignore it
Copy link

@akirillov akirillov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, @samvantran. LGTM!

One minor comment: def taskIsOutdated probably deserves a scaladoc

@samvantran
Copy link
Author

Good call. I'll add docs. Thanks for the offline discussion. I think this is the right way to go!

@samvantran samvantran merged commit 60e0868 into custom-branch-2.3.x Jan 18, 2019
@samvantran samvantran changed the title [DCOS-46585] Fix supervised driver retries on outdated tasks [DCOS-46585] Fix supervised driver retry logic for outdated tasks Jan 18, 2019
samvantran added a commit that referenced this pull request Jan 18, 2019
This commit fixes a bug where `--supervised` drivers would relaunch after receiving an outdated status update from a restarted/crashed agent even if they had already been relaunched and running elsewhere. In those scenarios, previous logic would cause two identical jobs to be running and ZK state would only have a record of the latest one effectively orphaning the 1st job.
samvantran added a commit that referenced this pull request Jan 22, 2019
… (#47)

This commit fixes a bug where `--supervised` drivers would relaunch after receiving an outdated status update from a restarted/crashed agent even if they had already been relaunched and running elsewhere. In those scenarios, previous logic would cause two identical jobs to be running and ZK state would only have a record of the latest one effectively orphaning the 1st job.
akirillov pushed a commit that referenced this pull request Feb 8, 2019
This commit fixes a bug where `--supervised` drivers would relaunch after receiving an outdated status update from a restarted/crashed agent even if they had already been relaunched and running elsewhere. In those scenarios, previous logic would cause two identical jobs to be running and ZK state would only have a record of the latest one effectively orphaning the 1st job.
@vishnu2kmohan vishnu2kmohan deleted the 46585-duplicate-retries branch February 19, 2019 19:17
@samvantran samvantran restored the 46585-duplicate-retries branch April 2, 2019 14:44
@samvantran samvantran deleted the 46585-duplicate-retries branch April 2, 2019 14:45
alembiewski pushed a commit that referenced this pull request Jun 12, 2019
This commit fixes a bug where `--supervised` drivers would relaunch after receiving an outdated status update from a restarted/crashed agent even if they had already been relaunched and running elsewhere. In those scenarios, previous logic would cause two identical jobs to be running and ZK state would only have a record of the latest one effectively orphaning the 1st job.
alembiewski added a commit that referenced this pull request Aug 19, 2019
* Support for DSCOS_SERVICE_ACCOUNT_CREDENTIAL environment variable in MesosClusterScheduler

* File Based Secrets support

* [SPARK-723][SPARK-740] Add Metrics to Dispatcher and Driver

- Counters: The total number of times that submissions have entered states
- Timers: The duration from submit or launch until a submission entered a given state
- Histogram: The retry counts at time of retry

* Fixes to handling finished drivers

- Rename 'failed' case to 'exception'
- When a driver is 'finished', record its final MesosTaskState
- Fix naming consistency after seeing how they look in practice

* Register "finished" counters up-front

Otherwise their values are never published.

* [SPARK-692] Added spark.mesos.executor.gpus to specify the number of Executor CPUs

* [SPARK-23941][MESOS] Mesos task failed on specific spark app name (#33)

* [SPARK-23941][MESOS] Mesos task failed on specific spark app name

Port from SPARK#21014

** edit: not a direct port from upstream Spark. Changes were needed because we saw PySpark jobs fail to launch when 1) run with docker and 2) including --py-files

==============

* Shell escape only appName, mainClass, default and driverConf

Specifically, we do not want to shell-escape the --py-files. What we've
seen IRL is that for spark jobs that use docker images coupled w/ python
files, the $MESOS_SANDBOX path is escaped and results in
FileNotFoundErrors during py4j.SparkSession.getOrCreate

* [DCOS-39150][SPARK] Support unique Executor IDs in cluster managers (#36)

Using incremental integers as Executor IDs leads to a situation when Spark Executors launched by different Drivers have same IDs. This leads to a situation when Mesos Task IDs for multiple Spark Executors are the same too. This PR prepends UUID unique for a CoarseGrainedSchedulerBackend instance to numeric ID thus allowing to distinguish Executors belonging to different drivers.

This PR reverts commit ebe3c7f "[SPARK-12864][YARN] initialize executorIdCounter after ApplicationMaster killed for max n…)"

* Upgrade of Hadoop, ZooKeeper, and Jackson libraries to fix CVEs. Updates for JSON-related tests. (#43)

List of upgrades for 3rd-party libraries having CVEs:

- Hadoop: 2.7.3 -> 2.7.7. Fixes: CVE-2016-6811, CVE-2017-3166, CVE-2017-3162, CVE-2018-8009
- Jackson 2.6.5 -> 2.9.6. Fixes: CVE-2017-15095, CVE-2017-17485, CVE-2017-7525, CVE-2018-7489, CVE-2016-3720
- ZooKeeper 3.4.6 -> 3.4.13 (https://zookeeper.apache.org/doc/r3.4.13/releasenotes.html)

# Conflicts:
#	dev/deps/spark-deps-hadoop-2.6
#	dev/deps/spark-deps-hadoop-2.7
#	dev/deps/spark-deps-hadoop-3.1
#	pom.xml

* CNI Support for Docker containerizer, binding to SPARK_LOCAL_IP instead of 0.0.0.0 to properly advertise executors during shuffle (#44)

* Spark Dispatcher support for launching applications in the same virtual network by default (#45)

* [DCOS-46585] Fix supervised driver retry logic for outdated tasks (#46)

This commit fixes a bug where `--supervised` drivers would relaunch after receiving an outdated status update from a restarted/crashed agent even if they had already been relaunched and running elsewhere. In those scenarios, previous logic would cause two identical jobs to be running and ZK state would only have a record of the latest one effectively orphaning the 1st job.

* Revert "[SPARK-25088][CORE][MESOS][DOCS] Update Rest Server docs & defaults."

This reverts commit 1024875.

The change introduced in the reverted commit is breaking:
- breaks semantics of `spark.master.rest.enabled` which belongs to Spark Standalone Master only but not to SparkSubmit
- reverts the default behavior for Spark Standalone from REST to legacy RPC
- contains misleading messages in `require` assertion blocks
- prevents users from running jobs without specifying `spark.master.rest.enabled`

* [DCOS-49020] Specify user in CommandInfo for Spark Driver launched on Mesos (#49)

* [DCOS-40974] Mesos checkpointing support for Spark Drivers (#51)

* [DCOS-51158] Improved Task ID assignment for Executor tasks (#52)

* [DCOS-51454] Remove irrelevant Mesos REPL test (#54)

* [DCOS-51453] Added Hadoop 2.9 profile (#53)

* [DCOS-34235] spark.mesos.executor.memoryOverhead equivalent for the Driver when running on Mesos (#55)

* Refactoring of metrics naming to add mesos semantics and avoid clashing with existing Spark metrics (#58)

* [DCOS-34549] Mesos label NPE fix (#60)
rpalaznik pushed a commit that referenced this pull request Feb 24, 2020
* Support for DSCOS_SERVICE_ACCOUNT_CREDENTIAL environment variable in MesosClusterScheduler

* File Based Secrets support

* [SPARK-723][SPARK-740] Add Metrics to Dispatcher and Driver

- Counters: The total number of times that submissions have entered states
- Timers: The duration from submit or launch until a submission entered a given state
- Histogram: The retry counts at time of retry

* Fixes to handling finished drivers

- Rename 'failed' case to 'exception'
- When a driver is 'finished', record its final MesosTaskState
- Fix naming consistency after seeing how they look in practice

* Register "finished" counters up-front

Otherwise their values are never published.

* [SPARK-692] Added spark.mesos.executor.gpus to specify the number of Executor CPUs

* [SPARK-23941][MESOS] Mesos task failed on specific spark app name (#33)

* [SPARK-23941][MESOS] Mesos task failed on specific spark app name

Port from SPARK#21014

** edit: not a direct port from upstream Spark. Changes were needed because we saw PySpark jobs fail to launch when 1) run with docker and 2) including --py-files

==============

* Shell escape only appName, mainClass, default and driverConf

Specifically, we do not want to shell-escape the --py-files. What we've
seen IRL is that for spark jobs that use docker images coupled w/ python
files, the $MESOS_SANDBOX path is escaped and results in
FileNotFoundErrors during py4j.SparkSession.getOrCreate

* [DCOS-39150][SPARK] Support unique Executor IDs in cluster managers (#36)

Using incremental integers as Executor IDs leads to a situation when Spark Executors launched by different Drivers have same IDs. This leads to a situation when Mesos Task IDs for multiple Spark Executors are the same too. This PR prepends UUID unique for a CoarseGrainedSchedulerBackend instance to numeric ID thus allowing to distinguish Executors belonging to different drivers.

This PR reverts commit ebe3c7f "[SPARK-12864][YARN] initialize executorIdCounter after ApplicationMaster killed for max n…)"

* Upgrade of Hadoop, ZooKeeper, and Jackson libraries to fix CVEs. Updates for JSON-related tests. (#43)

List of upgrades for 3rd-party libraries having CVEs:

- Hadoop: 2.7.3 -> 2.7.7. Fixes: CVE-2016-6811, CVE-2017-3166, CVE-2017-3162, CVE-2018-8009
- Jackson 2.6.5 -> 2.9.6. Fixes: CVE-2017-15095, CVE-2017-17485, CVE-2017-7525, CVE-2018-7489, CVE-2016-3720
- ZooKeeper 3.4.6 -> 3.4.13 (https://zookeeper.apache.org/doc/r3.4.13/releasenotes.html)

* CNI Support for Docker containerizer, binding to SPARK_LOCAL_IP instead of 0.0.0.0 to properly advertise executors during shuffle (#44)

* Spark Dispatcher support for launching applications in the same virtual network by default (#45)

* [DCOS-46585] Fix supervised driver retry logic for outdated tasks (#46)

This commit fixes a bug where `--supervised` drivers would relaunch after receiving an outdated status update from a restarted/crashed agent even if they had already been relaunched and running elsewhere. In those scenarios, previous logic would cause two identical jobs to be running and ZK state would only have a record of the latest one effectively orphaning the 1st job.

* Revert "[SPARK-25088][CORE][MESOS][DOCS] Update Rest Server docs & defaults."

This reverts commit 1024875.

The change introduced in the reverted commit is breaking:
- breaks semantics of `spark.master.rest.enabled` which belongs to Spark Standalone Master only but not to SparkSubmit
- reverts the default behavior for Spark Standalone from REST to legacy RPC
- contains misleading messages in `require` assertion blocks
- prevents users from running jobs without specifying `spark.master.rest.enabled`

* [DCOS-49020] Specify user in CommandInfo for Spark Driver launched on Mesos (#49)

* [DCOS-40974] Mesos checkpointing support for Spark Drivers (#51)

* [DCOS-51158] Improved Task ID assignment for Executor tasks (#52)

* [DCOS-51454] Remove irrelevant Mesos REPL test (#54)

* [DCOS-51453] Added Hadoop 2.9 profile (#53)

* [DCOS-34235] spark.mesos.executor.memoryOverhead equivalent for the Driver when running on Mesos (#55)

* Refactoring of metrics naming to add mesos semantics and avoid clashing with existing Spark metrics (#58)

* [DCOS-34549] Mesos label NPE fix (#60)
farhan5900 pushed a commit that referenced this pull request Aug 7, 2020
* Support for DSCOS_SERVICE_ACCOUNT_CREDENTIAL environment variable in MesosClusterScheduler

* File Based Secrets support

* [SPARK-723][SPARK-740] Add Metrics to Dispatcher and Driver

- Counters: The total number of times that submissions have entered states
- Timers: The duration from submit or launch until a submission entered a given state
- Histogram: The retry counts at time of retry

* Fixes to handling finished drivers

- Rename 'failed' case to 'exception'
- When a driver is 'finished', record its final MesosTaskState
- Fix naming consistency after seeing how they look in practice

* Register "finished" counters up-front

Otherwise their values are never published.

* [SPARK-692] Added spark.mesos.executor.gpus to specify the number of Executor CPUs

* [SPARK-23941][MESOS] Mesos task failed on specific spark app name (#33)

* [SPARK-23941][MESOS] Mesos task failed on specific spark app name

Port from SPARK#21014

** edit: not a direct port from upstream Spark. Changes were needed because we saw PySpark jobs fail to launch when 1) run with docker and 2) including --py-files

==============

* Shell escape only appName, mainClass, default and driverConf

Specifically, we do not want to shell-escape the --py-files. What we've
seen IRL is that for spark jobs that use docker images coupled w/ python
files, the $MESOS_SANDBOX path is escaped and results in
FileNotFoundErrors during py4j.SparkSession.getOrCreate

* [DCOS-39150][SPARK] Support unique Executor IDs in cluster managers (#36)

Using incremental integers as Executor IDs leads to a situation when Spark Executors launched by different Drivers have same IDs. This leads to a situation when Mesos Task IDs for multiple Spark Executors are the same too. This PR prepends UUID unique for a CoarseGrainedSchedulerBackend instance to numeric ID thus allowing to distinguish Executors belonging to different drivers.

This PR reverts commit ebe3c7f "[SPARK-12864][YARN] initialize executorIdCounter after ApplicationMaster killed for max n…)"

* Upgrade of Hadoop, ZooKeeper, and Jackson libraries to fix CVEs. Updates for JSON-related tests. (#43)

List of upgrades for 3rd-party libraries having CVEs:

- Hadoop: 2.7.3 -> 2.7.7. Fixes: CVE-2016-6811, CVE-2017-3166, CVE-2017-3162, CVE-2018-8009
- Jackson 2.6.5 -> 2.9.6. Fixes: CVE-2017-15095, CVE-2017-17485, CVE-2017-7525, CVE-2018-7489, CVE-2016-3720
- ZooKeeper 3.4.6 -> 3.4.13 (https://zookeeper.apache.org/doc/r3.4.13/releasenotes.html)

* CNI Support for Docker containerizer, binding to SPARK_LOCAL_IP instead of 0.0.0.0 to properly advertise executors during shuffle (#44)

* Spark Dispatcher support for launching applications in the same virtual network by default (#45)

* [DCOS-46585] Fix supervised driver retry logic for outdated tasks (#46)

This commit fixes a bug where `--supervised` drivers would relaunch after receiving an outdated status update from a restarted/crashed agent even if they had already been relaunched and running elsewhere. In those scenarios, previous logic would cause two identical jobs to be running and ZK state would only have a record of the latest one effectively orphaning the 1st job.

* Revert "[SPARK-25088][CORE][MESOS][DOCS] Update Rest Server docs & defaults."

This reverts commit 1024875.

The change introduced in the reverted commit is breaking:
- breaks semantics of `spark.master.rest.enabled` which belongs to Spark Standalone Master only but not to SparkSubmit
- reverts the default behavior for Spark Standalone from REST to legacy RPC
- contains misleading messages in `require` assertion blocks
- prevents users from running jobs without specifying `spark.master.rest.enabled`

* [DCOS-49020] Specify user in CommandInfo for Spark Driver launched on Mesos (#49)

* [DCOS-40974] Mesos checkpointing support for Spark Drivers (#51)

* [DCOS-51158] Improved Task ID assignment for Executor tasks (#52)

* [DCOS-51454] Remove irrelevant Mesos REPL test (#54)

* [DCOS-51453] Added Hadoop 2.9 profile (#53)

* [DCOS-34235] spark.mesos.executor.memoryOverhead equivalent for the Driver when running on Mesos (#55)

* Refactoring of metrics naming to add mesos semantics and avoid clashing with existing Spark metrics (#58)

* [DCOS-34549] Mesos label NPE fix (#60)
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants