Skip to content

Commit

Permalink
add time limit to allocateExecutors
Browse files Browse the repository at this point in the history
  • Loading branch information
witgo committed May 31, 2014
1 parent e00b656 commit 3c464bd
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,18 +256,22 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// TODO: Handle container failure
yarnAllocator.addResourceRequests(args.numExecutors)
// Exits the loop if the user thread exits.
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
val startTime = System.currentTimeMillis()
var usedTime = 0L
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors
&& userThread.isAlive) && (usedTime < 1000L * 60 * 10)) {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of executor failures reached")
}
yarnAllocator.allocateResources()
val numExecutorsFailed = yarnAllocator.getNumExecutorsFailed
if (numExecutorsFailed > 0) {
yarnAllocator.addResourceRequests(numExecutorsFailed)
}
yarnAllocator.allocateResources()
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
usedTime = System.currentTimeMillis() - startTime
}
} finally {
// In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,17 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// TODO: Handle container failure

yarnAllocator.addResourceRequests(args.numExecutors)
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
val startTime = System.currentTimeMillis()
var usedTime = 0L
while (((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed))
&& (usedTime < 1000L * 60 * 10)) {
yarnAllocator.allocateResources()
val numExecutorsFailed = yarnAllocator.getNumExecutorsFailed
if (numExecutorsFailed > 0) {
yarnAllocator.addResourceRequests(numExecutorsFailed)
}
Thread.sleep(100)
usedTime = System.currentTimeMillis() - startTime
}

logInfo("All executors have launched.")
Expand Down

0 comments on commit 3c464bd

Please sign in to comment.