Skip to content

Commit

Permalink
[SPARK-1978] In some cases, spark-yarn does not automatically restart…
Browse files Browse the repository at this point in the history
… the failed container

Author: witgo <[email protected]>

Closes apache#921 from witgo/allocateExecutors and squashes the following commits:

bc3aa66 [witgo] review commit
8800eba [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
32ac7af [witgo] review commit
056b8c7 [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
04c6f7e [witgo] Merge branch 'master' into allocateExecutors
aff827c [witgo] review commit
5c376e0 [witgo] Merge branch 'master' of https://github.com/apache/spark into allocateExecutors
1faf4f4 [witgo] Merge branch 'master' into allocateExecutors
3c464bd [witgo] add time limit to allocateExecutors
e00b656 [witgo] In some cases, yarn does not automatically restart the container
  • Loading branch information
witgo authored and tgravescs committed Jun 10, 2014
1 parent a9a461c commit 884ca71
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
try {
logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
yarnAllocator.addResourceRequests(args.numExecutors)
yarnAllocator.allocateResources()
// Exits the loop if the user thread exits.
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of executor failures reached")
}
checkNumExecutorsFailed()
allocateMissingExecutor()
yarnAllocator.allocateResources()
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
Expand Down Expand Up @@ -289,23 +286,31 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
}

private def allocateMissingExecutor() {
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
yarnAllocator.getNumPendingAllocate
if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
format(missingExecutorCount))
yarnAllocator.addResourceRequests(missingExecutorCount)
}
}

private def checkNumExecutorsFailed() {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of executor failures reached")
}
}

private def launchReporterThread(_sleepTime: Long): Thread = {
val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime

val t = new Thread {
override def run() {
while (userThread.isAlive) {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of executor failures reached")
}
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
yarnAllocator.getNumPendingAllocate
if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
format(missingExecutorCount))
yarnAllocator.addResourceRequests(missingExecutorCount)
}
checkNumExecutorsFailed()
allocateMissingExecutor()
sendProgress()
Thread.sleep(sleepTime)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,25 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp

logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all containers have finished
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure

yarnAllocator.addResourceRequests(args.numExecutors)
yarnAllocator.allocateResources()
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
allocateMissingExecutor()
yarnAllocator.allocateResources()
Thread.sleep(100)
}

logInfo("All executors have launched.")
}

private def allocateMissingExecutor() {
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
yarnAllocator.getNumPendingAllocate
if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
format(missingExecutorCount))
yarnAllocator.addResourceRequests(missingExecutorCount)
}
}

// TODO: We might want to extend this to allocate more containers in case they die !
Expand All @@ -220,13 +228,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
val t = new Thread {
override def run() {
while (!driverClosed) {
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
yarnAllocator.getNumPendingAllocate
if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
format(missingExecutorCount))
yarnAllocator.addResourceRequests(missingExecutorCount)
}
allocateMissingExecutor()
sendProgress()
Thread.sleep(sleepTime)
}
Expand Down

0 comments on commit 884ca71

Please sign in to comment.