-
Notifications
You must be signed in to change notification settings - Fork 7
[DCOS-46585] Fix supervised driver retry logic for outdated tasks #46
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, @samvantran. Provided fix relies on prior knowledge of task id format (its string representation and parsing) and on the ordering of task ids based on this representation. This heuristic doesn't look strong enough and the fix will stop working in case UUID semantics change in the future. I'd suggest relying on some state tracking data structure which will keep track of (re)launched drivers ordered by time/status. Or even better keep the lost driver ids in a set to perform taskIsOutdated
check against it.
@@ -787,6 +796,14 @@ private[spark] class MesosClusterScheduler( | |||
} | |||
} | |||
|
|||
private def taskIsOutdated(taskId: String, state: MesosClusterSubmissionState): Boolean = { | |||
if (getRetryCountFromTaskId(taskId) < getRetryCountFromTaskId(state.frameworkId)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can be simplified to just getRetryCountFromTaskId(taskId) < getRetryCountFromTaskId(state.frameworkId)
without if statement
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah right. thanks!
This frees us from parsing retry counts from strings and future proofs us in the event taskId UUID ever changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review @akirillov! Good call on using a set for tracking retried drivers. I've made changes based on your comments. Please take a 2nd look 🙏
@@ -787,6 +796,14 @@ private[spark] class MesosClusterScheduler( | |||
} | |||
} | |||
|
|||
private def taskIsOutdated(taskId: String, state: MesosClusterSubmissionState): Boolean = { | |||
if (getRetryCountFromTaskId(taskId) < getRetryCountFromTaskId(state.frameworkId)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah right. thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, @samvantran. One thing is missing now - the cleanup logic, ids are only added to retriedDrivers
set and will still live there until the Dispatcher is terminated thus potentially causing unnecessary memory pressure.
What if to use a reversed approach and keep track of running drivers and ignore any other ids coming with TASK_LOST
/TASK_FAILED
. I.e. when a driver needs to be relaunched - a prior id is removed from the list, and the new id is added. When the old id is coming in with failed task message - just ignore it if it's not in running.
if (retriedDrivers.contains(taskId)) { | ||
true | ||
} else { | ||
retriedDrivers.add(taskId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
having state mutation logic in a boolean function which checks some condition looks a bit confusing
Yeah my original concerns with this approach was that a long lived scheduler could 1) accumulate a lot of IDs and 2) degrade in lookup performance but realistically it wouldn't be that much. Imagine 1 million failed/lost jobs... that would catch cause alarm for any user and they'd like have bigger problems on their hands. But to your point... I actually had this much longer response written up before I made the HashSet change so I might as well share it:
I kind of am thinking we should go back to parsing retry count from taskId string because:
WDYT? |
If ID is not found in either than it is orphaned and we can ignore it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, @samvantran. LGTM!
One minor comment: def taskIsOutdated
probably deserves a scaladoc
Good call. I'll add docs. Thanks for the offline discussion. I think this is the right way to go! |
This commit fixes a bug where `--supervised` drivers would relaunch after receiving an outdated status update from a restarted/crashed agent even if they had already been relaunched and running elsewhere. In those scenarios, previous logic would cause two identical jobs to be running and ZK state would only have a record of the latest one effectively orphaning the 1st job.
… (#47) This commit fixes a bug where `--supervised` drivers would relaunch after receiving an outdated status update from a restarted/crashed agent even if they had already been relaunched and running elsewhere. In those scenarios, previous logic would cause two identical jobs to be running and ZK state would only have a record of the latest one effectively orphaning the 1st job.
This commit fixes a bug where `--supervised` drivers would relaunch after receiving an outdated status update from a restarted/crashed agent even if they had already been relaunched and running elsewhere. In those scenarios, previous logic would cause two identical jobs to be running and ZK state would only have a record of the latest one effectively orphaning the 1st job.
This commit fixes a bug where `--supervised` drivers would relaunch after receiving an outdated status update from a restarted/crashed agent even if they had already been relaunched and running elsewhere. In those scenarios, previous logic would cause two identical jobs to be running and ZK state would only have a record of the latest one effectively orphaning the 1st job.
* Support for DSCOS_SERVICE_ACCOUNT_CREDENTIAL environment variable in MesosClusterScheduler * File Based Secrets support * [SPARK-723][SPARK-740] Add Metrics to Dispatcher and Driver - Counters: The total number of times that submissions have entered states - Timers: The duration from submit or launch until a submission entered a given state - Histogram: The retry counts at time of retry * Fixes to handling finished drivers - Rename 'failed' case to 'exception' - When a driver is 'finished', record its final MesosTaskState - Fix naming consistency after seeing how they look in practice * Register "finished" counters up-front Otherwise their values are never published. * [SPARK-692] Added spark.mesos.executor.gpus to specify the number of Executor CPUs * [SPARK-23941][MESOS] Mesos task failed on specific spark app name (#33) * [SPARK-23941][MESOS] Mesos task failed on specific spark app name Port from SPARK#21014 ** edit: not a direct port from upstream Spark. Changes were needed because we saw PySpark jobs fail to launch when 1) run with docker and 2) including --py-files ============== * Shell escape only appName, mainClass, default and driverConf Specifically, we do not want to shell-escape the --py-files. What we've seen IRL is that for spark jobs that use docker images coupled w/ python files, the $MESOS_SANDBOX path is escaped and results in FileNotFoundErrors during py4j.SparkSession.getOrCreate * [DCOS-39150][SPARK] Support unique Executor IDs in cluster managers (#36) Using incremental integers as Executor IDs leads to a situation when Spark Executors launched by different Drivers have same IDs. This leads to a situation when Mesos Task IDs for multiple Spark Executors are the same too. This PR prepends UUID unique for a CoarseGrainedSchedulerBackend instance to numeric ID thus allowing to distinguish Executors belonging to different drivers. This PR reverts commit ebe3c7f "[SPARK-12864][YARN] initialize executorIdCounter after ApplicationMaster killed for max n…)" * Upgrade of Hadoop, ZooKeeper, and Jackson libraries to fix CVEs. Updates for JSON-related tests. (#43) List of upgrades for 3rd-party libraries having CVEs: - Hadoop: 2.7.3 -> 2.7.7. Fixes: CVE-2016-6811, CVE-2017-3166, CVE-2017-3162, CVE-2018-8009 - Jackson 2.6.5 -> 2.9.6. Fixes: CVE-2017-15095, CVE-2017-17485, CVE-2017-7525, CVE-2018-7489, CVE-2016-3720 - ZooKeeper 3.4.6 -> 3.4.13 (https://zookeeper.apache.org/doc/r3.4.13/releasenotes.html) # Conflicts: # dev/deps/spark-deps-hadoop-2.6 # dev/deps/spark-deps-hadoop-2.7 # dev/deps/spark-deps-hadoop-3.1 # pom.xml * CNI Support for Docker containerizer, binding to SPARK_LOCAL_IP instead of 0.0.0.0 to properly advertise executors during shuffle (#44) * Spark Dispatcher support for launching applications in the same virtual network by default (#45) * [DCOS-46585] Fix supervised driver retry logic for outdated tasks (#46) This commit fixes a bug where `--supervised` drivers would relaunch after receiving an outdated status update from a restarted/crashed agent even if they had already been relaunched and running elsewhere. In those scenarios, previous logic would cause two identical jobs to be running and ZK state would only have a record of the latest one effectively orphaning the 1st job. * Revert "[SPARK-25088][CORE][MESOS][DOCS] Update Rest Server docs & defaults." This reverts commit 1024875. The change introduced in the reverted commit is breaking: - breaks semantics of `spark.master.rest.enabled` which belongs to Spark Standalone Master only but not to SparkSubmit - reverts the default behavior for Spark Standalone from REST to legacy RPC - contains misleading messages in `require` assertion blocks - prevents users from running jobs without specifying `spark.master.rest.enabled` * [DCOS-49020] Specify user in CommandInfo for Spark Driver launched on Mesos (#49) * [DCOS-40974] Mesos checkpointing support for Spark Drivers (#51) * [DCOS-51158] Improved Task ID assignment for Executor tasks (#52) * [DCOS-51454] Remove irrelevant Mesos REPL test (#54) * [DCOS-51453] Added Hadoop 2.9 profile (#53) * [DCOS-34235] spark.mesos.executor.memoryOverhead equivalent for the Driver when running on Mesos (#55) * Refactoring of metrics naming to add mesos semantics and avoid clashing with existing Spark metrics (#58) * [DCOS-34549] Mesos label NPE fix (#60)
* Support for DSCOS_SERVICE_ACCOUNT_CREDENTIAL environment variable in MesosClusterScheduler * File Based Secrets support * [SPARK-723][SPARK-740] Add Metrics to Dispatcher and Driver - Counters: The total number of times that submissions have entered states - Timers: The duration from submit or launch until a submission entered a given state - Histogram: The retry counts at time of retry * Fixes to handling finished drivers - Rename 'failed' case to 'exception' - When a driver is 'finished', record its final MesosTaskState - Fix naming consistency after seeing how they look in practice * Register "finished" counters up-front Otherwise their values are never published. * [SPARK-692] Added spark.mesos.executor.gpus to specify the number of Executor CPUs * [SPARK-23941][MESOS] Mesos task failed on specific spark app name (#33) * [SPARK-23941][MESOS] Mesos task failed on specific spark app name Port from SPARK#21014 ** edit: not a direct port from upstream Spark. Changes were needed because we saw PySpark jobs fail to launch when 1) run with docker and 2) including --py-files ============== * Shell escape only appName, mainClass, default and driverConf Specifically, we do not want to shell-escape the --py-files. What we've seen IRL is that for spark jobs that use docker images coupled w/ python files, the $MESOS_SANDBOX path is escaped and results in FileNotFoundErrors during py4j.SparkSession.getOrCreate * [DCOS-39150][SPARK] Support unique Executor IDs in cluster managers (#36) Using incremental integers as Executor IDs leads to a situation when Spark Executors launched by different Drivers have same IDs. This leads to a situation when Mesos Task IDs for multiple Spark Executors are the same too. This PR prepends UUID unique for a CoarseGrainedSchedulerBackend instance to numeric ID thus allowing to distinguish Executors belonging to different drivers. This PR reverts commit ebe3c7f "[SPARK-12864][YARN] initialize executorIdCounter after ApplicationMaster killed for max n…)" * Upgrade of Hadoop, ZooKeeper, and Jackson libraries to fix CVEs. Updates for JSON-related tests. (#43) List of upgrades for 3rd-party libraries having CVEs: - Hadoop: 2.7.3 -> 2.7.7. Fixes: CVE-2016-6811, CVE-2017-3166, CVE-2017-3162, CVE-2018-8009 - Jackson 2.6.5 -> 2.9.6. Fixes: CVE-2017-15095, CVE-2017-17485, CVE-2017-7525, CVE-2018-7489, CVE-2016-3720 - ZooKeeper 3.4.6 -> 3.4.13 (https://zookeeper.apache.org/doc/r3.4.13/releasenotes.html) * CNI Support for Docker containerizer, binding to SPARK_LOCAL_IP instead of 0.0.0.0 to properly advertise executors during shuffle (#44) * Spark Dispatcher support for launching applications in the same virtual network by default (#45) * [DCOS-46585] Fix supervised driver retry logic for outdated tasks (#46) This commit fixes a bug where `--supervised` drivers would relaunch after receiving an outdated status update from a restarted/crashed agent even if they had already been relaunched and running elsewhere. In those scenarios, previous logic would cause two identical jobs to be running and ZK state would only have a record of the latest one effectively orphaning the 1st job. * Revert "[SPARK-25088][CORE][MESOS][DOCS] Update Rest Server docs & defaults." This reverts commit 1024875. The change introduced in the reverted commit is breaking: - breaks semantics of `spark.master.rest.enabled` which belongs to Spark Standalone Master only but not to SparkSubmit - reverts the default behavior for Spark Standalone from REST to legacy RPC - contains misleading messages in `require` assertion blocks - prevents users from running jobs without specifying `spark.master.rest.enabled` * [DCOS-49020] Specify user in CommandInfo for Spark Driver launched on Mesos (#49) * [DCOS-40974] Mesos checkpointing support for Spark Drivers (#51) * [DCOS-51158] Improved Task ID assignment for Executor tasks (#52) * [DCOS-51454] Remove irrelevant Mesos REPL test (#54) * [DCOS-51453] Added Hadoop 2.9 profile (#53) * [DCOS-34235] spark.mesos.executor.memoryOverhead equivalent for the Driver when running on Mesos (#55) * Refactoring of metrics naming to add mesos semantics and avoid clashing with existing Spark metrics (#58) * [DCOS-34549] Mesos label NPE fix (#60)
* Support for DSCOS_SERVICE_ACCOUNT_CREDENTIAL environment variable in MesosClusterScheduler * File Based Secrets support * [SPARK-723][SPARK-740] Add Metrics to Dispatcher and Driver - Counters: The total number of times that submissions have entered states - Timers: The duration from submit or launch until a submission entered a given state - Histogram: The retry counts at time of retry * Fixes to handling finished drivers - Rename 'failed' case to 'exception' - When a driver is 'finished', record its final MesosTaskState - Fix naming consistency after seeing how they look in practice * Register "finished" counters up-front Otherwise their values are never published. * [SPARK-692] Added spark.mesos.executor.gpus to specify the number of Executor CPUs * [SPARK-23941][MESOS] Mesos task failed on specific spark app name (#33) * [SPARK-23941][MESOS] Mesos task failed on specific spark app name Port from SPARK#21014 ** edit: not a direct port from upstream Spark. Changes were needed because we saw PySpark jobs fail to launch when 1) run with docker and 2) including --py-files ============== * Shell escape only appName, mainClass, default and driverConf Specifically, we do not want to shell-escape the --py-files. What we've seen IRL is that for spark jobs that use docker images coupled w/ python files, the $MESOS_SANDBOX path is escaped and results in FileNotFoundErrors during py4j.SparkSession.getOrCreate * [DCOS-39150][SPARK] Support unique Executor IDs in cluster managers (#36) Using incremental integers as Executor IDs leads to a situation when Spark Executors launched by different Drivers have same IDs. This leads to a situation when Mesos Task IDs for multiple Spark Executors are the same too. This PR prepends UUID unique for a CoarseGrainedSchedulerBackend instance to numeric ID thus allowing to distinguish Executors belonging to different drivers. This PR reverts commit ebe3c7f "[SPARK-12864][YARN] initialize executorIdCounter after ApplicationMaster killed for max n…)" * Upgrade of Hadoop, ZooKeeper, and Jackson libraries to fix CVEs. Updates for JSON-related tests. (#43) List of upgrades for 3rd-party libraries having CVEs: - Hadoop: 2.7.3 -> 2.7.7. Fixes: CVE-2016-6811, CVE-2017-3166, CVE-2017-3162, CVE-2018-8009 - Jackson 2.6.5 -> 2.9.6. Fixes: CVE-2017-15095, CVE-2017-17485, CVE-2017-7525, CVE-2018-7489, CVE-2016-3720 - ZooKeeper 3.4.6 -> 3.4.13 (https://zookeeper.apache.org/doc/r3.4.13/releasenotes.html) * CNI Support for Docker containerizer, binding to SPARK_LOCAL_IP instead of 0.0.0.0 to properly advertise executors during shuffle (#44) * Spark Dispatcher support for launching applications in the same virtual network by default (#45) * [DCOS-46585] Fix supervised driver retry logic for outdated tasks (#46) This commit fixes a bug where `--supervised` drivers would relaunch after receiving an outdated status update from a restarted/crashed agent even if they had already been relaunched and running elsewhere. In those scenarios, previous logic would cause two identical jobs to be running and ZK state would only have a record of the latest one effectively orphaning the 1st job. * Revert "[SPARK-25088][CORE][MESOS][DOCS] Update Rest Server docs & defaults." This reverts commit 1024875. The change introduced in the reverted commit is breaking: - breaks semantics of `spark.master.rest.enabled` which belongs to Spark Standalone Master only but not to SparkSubmit - reverts the default behavior for Spark Standalone from REST to legacy RPC - contains misleading messages in `require` assertion blocks - prevents users from running jobs without specifying `spark.master.rest.enabled` * [DCOS-49020] Specify user in CommandInfo for Spark Driver launched on Mesos (#49) * [DCOS-40974] Mesos checkpointing support for Spark Drivers (#51) * [DCOS-51158] Improved Task ID assignment for Executor tasks (#52) * [DCOS-51454] Remove irrelevant Mesos REPL test (#54) * [DCOS-51453] Added Hadoop 2.9 profile (#53) * [DCOS-34235] spark.mesos.executor.memoryOverhead equivalent for the Driver when running on Mesos (#55) * Refactoring of metrics naming to add mesos semantics and avoid clashing with existing Spark metrics (#58) * [DCOS-34549] Mesos label NPE fix (#60)
What changes were proposed in this pull request?
Resolves DCOS-46585 && COPS-4325
Customers experienced
--supervised
Spark jobs were retried multiple times in scenarios where an agent would crash, come back, and re-register even when those jobs had already relaunched on a different agent. That is:This is because when an agent would come back and re-register it would send a status update
TASK_FAILED
for its old driver-task. Previous logic would indiscriminately remove thesubmissionId
from Zookeeper'slaunchedDrivers
node and add it toretryList
node. Then, when a new offer came in, it would relaunch another-retry-
task even though one was previously running.For example logs, scroll to bottom
How was this patch tested?
Log snippets included below. Notice the
-retry-1
task is running when status update for the old task comes in afterward: