[SPARK-20540][CORE] Fix unstable executor requests. #17813

@@ -331,7 +331,7 @@ private[spark] class ExecutorAllocationManager(
       val delta = addExecutors(maxNeeded)
       logDebug(s"Starting timer to add more executors (to " +
         s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
-      addTime += sustainedSchedulerBacklogTimeoutS * 1000
+      addTime = now + (sustainedSchedulerBacklogTimeoutS * 1000)
       delta
     } else {
       0
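To see why the one-line change above matters, here is a minimal, self-contained sketch (plain Scala; the object and method names are illustrative, not Spark's): bumping the timer relative to its old value lets a long-expired timer keep firing on every poll, while anchoring it to the current time gives one add per timeout, as intended.

object BacklogTimerSketch {
  val timeoutMs = 1000L

  // Count how many polls would trigger an "add executors" request.
  def addsTriggered(pollTimes: Seq[Long], anchorToNow: Boolean): Int = {
    var addTime = 0L  // next time we are allowed to add executors
    pollTimes.count { now =>
      if (now >= addTime) {
        // Old behavior: addTime += timeoutMs; fixed behavior: addTime = now + timeoutMs.
        addTime = if (anchorToNow) now + timeoutMs else addTime + timeoutMs
        true
      } else {
        false
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Polls every 100 ms, but the first poll arrives 5 s after the timer was armed.
    val polls = (5000L until 6000L by 100L).toSeq
    println(addsTriggered(polls, anchorToNow = false))  // 6 adds: the deadline never catches up
    println(addsTriggered(polls, anchorToNow = true))   // 1 add: next deadline is now + timeout
  }
}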
@@ -69,6 +69,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // `CoarseGrainedSchedulerBackend.this`.
   private val executorDataMap = new HashMap[String, ExecutorData]
 
+  // Number of executors requested by the cluster manager, [[ExecutorAllocationManager]]
+  @GuardedBy("CoarseGrainedSchedulerBackend.this")
+  private var requestedTotalExecutors = 0
+
   // Number of executors requested from the cluster manager that have not registered yet
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
   private var numPendingExecutors = 0
@@ -413,6 +417,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * */
   protected def reset(): Unit = {
     val executors = synchronized {
+      requestedTotalExecutors = 0
       numPendingExecutors = 0
       executorsPendingToRemove.clear()
       Set() ++ executorDataMap.keys
@@ -487,12 +492,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
 
     val response = synchronized {
+      requestedTotalExecutors += numAdditionalExecutors
       numPendingExecutors += numAdditionalExecutors
       logDebug(s"Number of pending executors is now $numPendingExecutors")
+      if (requestedTotalExecutors !=
+          (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
+        logDebug(
+          s"""requestExecutors($numAdditionalExecutors): Executor request doesn't match:
+             |requestedTotalExecutors  = $requestedTotalExecutors
+             |numExistingExecutors     = $numExistingExecutors
+             |numPendingExecutors      = $numPendingExecutors
+             |executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
+      }
 
       // Account for executors pending to be added or removed
-      doRequestTotalExecutors(
-        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+      doRequestTotalExecutors(requestedTotalExecutors)
     }
 
     defaultAskTimeout.awaitResult(response)
@@ -524,6 +538,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     val response = synchronized {
+      this.requestedTotalExecutors = numExecutors
       this.localityAwareTasks = localityAwareTasks
       this.hostToLocalTaskCount = hostToLocalTaskCount
 
@@ -589,8 +604,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       // take into account executors that are pending to be added or removed.
       val adjustTotalExecutors =
         if (!replace) {
-          doRequestTotalExecutors(
-            numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+          requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
+          if (requestedTotalExecutors !=
Contributor:
Won't this cause the message to be logged in the situation you describe in the PR description? Isn't that an "expected" situation? If so I'd demote this message, since users tend to get scared when messages like this one show up.

Contributor Author:
Yes, it would. I can change it to debug or remove it; it was mainly there so we could see how often this happened. With the fix to the request timing it rarely happens at all. You only see it constantly when the method is called every 100ms, because there isn't enough time for kills and requests to complete before the totals are recomputed.

Contributor (@skonto), Oct 25, 2017:
Hey @rdblue, I have seen this message while testing dynamic allocation on Mesos:

17/10/25 13:58:44 INFO MesosCoarseGrainedSchedulerBackend: Actual list of executor(s) to be killed is 1
17/10/25 13:58:44 DEBUG MesosCoarseGrainedSchedulerBackend: killExecutors(ArrayBuffer(1), false, false): Executor counts do not match:
requestedTotalExecutors  = 0
numExistingExecutors     = 2
numPendingExecutors      = 0
executorsPendingToRemove = 1

The executors are removed at some point after that message.
Test is here.
What should I expect here? I am a bit confused.

Contributor Author:
This is just informational. The problem is that the allocation manager's state isn't synced with the scheduler backend. Instead, the allocator sends messages that try to steer the scheduler backend into the same state. For example, instead of telling the scheduler backend that the desired number of executors is 10, the allocator sends a message to add 2 executors. When this gets out of sync because of failures or network delay, you end up with these messages.

When you see them, make sure you're just temporarily out of sync (and will eventually get back in sync), and not in a state where the scheduler and allocator can't reconcile the required number of executors. That's what this PR tried to fix.

The long-term solution is to update the communication so that the allocator requests its ideal state, always telling the scheduler backend how many executors it currently needs, instead of sending incremental kill or add requests (a small sketch of this bookkeeping follows the diff below).

+            (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
+            logDebug(
+              s"""killExecutors($executorIds, $replace, $force): Executor counts do not match:
+                 |requestedTotalExecutors  = $requestedTotalExecutors
+                 |numExistingExecutors     = $numExistingExecutors
+                 |numPendingExecutors      = $numPendingExecutors
+                 |executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
+          }
+          doRequestTotalExecutors(requestedTotalExecutors)
         } else {
           numPendingExecutors += knownExecutors.size
           Future.successful(true)
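To make the review comment above concrete, here is a small, self-contained sketch of the bookkeeping that requestedTotalExecutors introduces (plain Scala, outside Spark; ExecutorBookkeepingSketch, BackendState, and the function names are illustrative, not Spark's API):

object ExecutorBookkeepingSketch {
  final case class BackendState(
      existing: Int,         // executors that have registered
      pending: Int,          // requested from the cluster manager, not yet registered
      pendingToRemove: Int,  // kill requests still in flight
      requestedTotal: Int)   // last total sent to the cluster manager

  // The logDebug messages above fire whenever this does not hold.
  def consistent(s: BackendState): Boolean =
    s.requestedTotal == s.existing + s.pending - s.pendingToRemove

  // requestExecutors-style delta update: both counters move together, so it
  // preserves consistency on its own.
  def addExecutors(s: BackendState, delta: Int): BackendState =
    s.copy(pending = s.pending + delta, requestedTotal = s.requestedTotal + delta)

  // killExecutors-style update: requestedTotal is clamped at zero, so the counts
  // can diverge temporarily after a reset or a race with registration.
  def killExecutors(s: BackendState, killed: Int): BackendState =
    s.copy(
      pendingToRemove = s.pendingToRemove + killed,
      requestedTotal = math.max(s.requestedTotal - killed, 0))

  def main(args: Array[String]): Unit = {
    // Numbers from the Mesos log in the thread above: requestedTotal is 0 while two
    // executors are still registered, so killing one leaves the counts out of sync
    // until the cluster manager catches up. Informational, not fatal.
    val afterReset = BackendState(existing = 2, pending = 0, pendingToRemove = 0, requestedTotal = 0)
    println(consistent(killExecutors(afterReset, killed = 1)))  // false
  }
}

Delta-style updates only stay consistent while every add, kill, and registration lands in order; having the allocator declare its desired total, as suggested above, removes that ordering dependency.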
@@ -356,12 +356,13 @@ class StandaloneDynamicAllocationSuite
   test("kill the same executor twice (SPARK-9795)") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
+    sc.requestExecutors(2)
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
       assert(apps.size === 1)
       assert(apps.head.id === appId)
       assert(apps.head.executors.size === 2)
-      assert(apps.head.getExecutorLimit === Int.MaxValue)
+      assert(apps.head.getExecutorLimit === 2)
     }
     // sync executors between the Master and the driver, needed because
     // the driver refuses to kill executors it does not know about
@@ -380,12 +381,13 @@ class StandaloneDynamicAllocationSuite
   test("the pending replacement executors should not be lost (SPARK-10515)") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
+    sc.requestExecutors(2)
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
       assert(apps.size === 1)
       assert(apps.head.id === appId)
       assert(apps.head.executors.size === 2)
-      assert(apps.head.getExecutorLimit === Int.MaxValue)
+      assert(apps.head.getExecutorLimit === 2)
     }
     // sync executors between the Master and the driver, needed because
     // the driver refuses to kill executors it does not know about
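A hedged note on the two suite changes above: with this patch the standalone Master's per-application executor limit follows the total the backend has actually requested, so the tests request two executors explicitly and assert a limit of 2 instead of relying on the old Int.MaxValue default. The sketch below (illustrative names only, not the Master's internals) condenses the limit semantics those assertions rely on.

object ExecutorLimitSketch {
  // How many more executors the Master would grant an app: bounded by the app's
  // executor limit and by free worker capacity.
  def grantable(executorLimit: Int, running: Int, workersFree: Int): Int =
    math.max(math.min(executorLimit - running, workersFree), 0)

  def main(args: Array[String]): Unit = {
    println(grantable(Int.MaxValue, running = 0, workersFree = 2))       // 2: unbounded default
    println(grantable(executorLimit = 2, running = 2, workersFree = 3))  // 0: requested total reached
  }
}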