Skip to content

Commit

Permalink
review commit
Browse files Browse the repository at this point in the history
  • Loading branch information
witgo committed Jun 5, 2014
1 parent 5c376e0 commit aff827c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,26 +252,14 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
try {
logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all requested executors have been launched
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
yarnAllocator.addResourceRequests(args.numExecutors)
// Exits the loop if the user thread exits.
val startTime = System.currentTimeMillis()
var usedTime = 0L
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors
&& userThread.isAlive) && (usedTime < 1000L * 60 * 10)) {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of executor failures reached")
}
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
yarnAllocator.allocateResources()
val numExecutorsFailed = yarnAllocator.getNumExecutorsFailed
if (numExecutorsFailed > 0) {
yarnAllocator.addResourceRequests(numExecutorsFailed)
}
checkNumExecutorsFailed()
allocateMissingExecutor()
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
usedTime = System.currentTimeMillis() - startTime
}
} finally {
// In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
Expand All @@ -297,23 +285,31 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
}
}

/**
 * Requests additional containers when fewer executors are running or pending than
 * `args.numExecutors` asks for.
 *
 * The shortfall is the configured executor count minus the allocator's running count minus
 * its pending-allocation count. A positive shortfall is logged and passed to
 * `yarnAllocator.addResourceRequests`; otherwise nothing happens.
 */
private def allocateMissingExecutor(): Unit = {
  val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
    yarnAllocator.getNumPendingAllocate
  if (missingExecutorCount > 0) {
    logInfo("Allocating %d containers to make up for (potentially) lost containers".
      format(missingExecutorCount))
    yarnAllocator.addResourceRequests(missingExecutorCount)
  }
}

/**
 * Calls `finishApplicationMaster` with `FinalApplicationStatus.FAILED` once the allocator
 * reports at least `maxNumExecutorFailures` failed executors; otherwise does nothing.
 */
private def checkNumExecutorsFailed(): Unit = {
  if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
    finishApplicationMaster(FinalApplicationStatus.FAILED,
      "max number of executor failures reached")
  }
}

private def launchReporterThread(_sleepTime: Long): Thread = {
val sleepTime = if (_sleepTime <= 0) 0 else _sleepTime

val t = new Thread {
override def run() {
while (userThread.isAlive) {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED,
"max number of executor failures reached")
}
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
yarnAllocator.getNumPendingAllocate
if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
format(missingExecutorCount))
yarnAllocator.addResourceRequests(missingExecutorCount)
}
checkNumExecutorsFailed()
allocateMissingExecutor()
sendProgress()
Thread.sleep(sleepTime)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,25 +200,24 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp

logInfo("Allocating " + args.numExecutors + " executors.")
// Wait until all requested executors have been launched
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure

yarnAllocator.addResourceRequests(args.numExecutors)
val startTime = System.currentTimeMillis()
var usedTime = 0L
while (((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed))
&& (usedTime < 1000L * 60 * 10)) {
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
yarnAllocator.allocateResources()
val numExecutorsFailed = yarnAllocator.getNumExecutorsFailed
if (numExecutorsFailed > 0) {
yarnAllocator.addResourceRequests(numExecutorsFailed)
}
allocateMissingExecutor()
Thread.sleep(100)
usedTime = System.currentTimeMillis() - startTime
}

logInfo("All executors have launched.")
}

/**
 * Requests additional containers when fewer executors are running or pending than
 * `args.numExecutors` asks for.
 *
 * The shortfall is the configured executor count minus the allocator's running count minus
 * its pending-allocation count. A positive shortfall is logged and passed to
 * `yarnAllocator.addResourceRequests`; otherwise nothing happens.
 */
private def allocateMissingExecutor(): Unit = {
  val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
    yarnAllocator.getNumPendingAllocate
  if (missingExecutorCount > 0) {
    logInfo("Allocating %d containers to make up for (potentially) lost containers".
      format(missingExecutorCount))
    yarnAllocator.addResourceRequests(missingExecutorCount)
  }
}

// TODO: We might want to extend this to allocate more containers in case they die !
Expand All @@ -228,13 +227,7 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
val t = new Thread {
override def run() {
while (!driverClosed) {
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning -
yarnAllocator.getNumPendingAllocate
if (missingExecutorCount > 0) {
logInfo("Allocating %d containers to make up for (potentially) lost containers".
format(missingExecutorCount))
yarnAllocator.addResourceRequests(missingExecutorCount)
}
allocateMissingExecutor()
sendProgress()
Thread.sleep(sleepTime)
}
Expand Down

0 comments on commit aff827c

Please sign in to comment.