From 1588597104f5154dda502742c68a33a81ddd0a13 Mon Sep 17 00:00:00 2001
From: WenboZhao
Date: Fri, 9 Jun 2017 07:39:06 -0400
Subject: [PATCH] Reformat code using scalafmt (#19)

(cherry picked from commit f52b8fc5227dc48d9e002bd142686e171f97b28a)
(cherry picked from commit 462faf5227219016314a1977bdb6b30b01797ea7)
(cherry picked from commit c3c332d0181b8f61f6b6e983bc30f0b7e980d09c)
---
 .../cook/CoarseCookSchedulerBackend.scala     | 275 +++++++++---------
 1 file changed, 145 insertions(+), 130 deletions(-)

diff --git a/cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala b/cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala
index ac343ea15fd52..694cbb9a18582 100644
--- a/cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala
+++ b/cook/src/main/scala/org/apache/spark/scheduler/cluster/cook/CoarseCookSchedulerBackend.scala
@@ -17,34 +17,35 @@ package org.apache.spark.scheduler.cluster.cook
-import java.io.{BufferedWriter, File, FileWriter, PrintWriter}
 import java.lang.Thread.UncaughtExceptionHandler
-import java.net.{InetAddress, ServerSocket, URI}
-import java.nio.file.{Files, Paths}
+import java.nio.file.Paths
 import java.util.UUID
 import java.util.concurrent.{Executors, TimeUnit}
 
-import scala.concurrent.{Await, Future}
-import scala.concurrent.duration._
+import scala.collection.mutable
 import scala.collection.JavaConverters._
+import scala.concurrent.Future
+import scala.concurrent.duration._
 import scala.util.{Failure, Success, Try}
+
 import org.apache.mesos._
 import org.apache.mesos.Protos._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{ExecutorExited, SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosSchedulerUtils}
-import org.json.JSONObject
+import org.apache.spark.util.ThreadUtils
+
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import com.twosigma.cook.jobclient.{Job, JobClient, JobListener => CJobListener}
-import org.apache.spark.internal.Logging
-import org.apache.spark.rpc.RpcAddress
-
-import scala.collection.mutable
+import org.json.JSONObject
 
 object CoarseCookSchedulerBackend {
-  // A collection of regexes for extracting information from an URI
+  // A collection of regular expressions for extracting information from an URI
   private val HTTP_URI_REGEX = """http://(.*)""".r
   private val RSYNC_URI_REGEX = """rsync://(.*)""".r
   private val SPARK_RSYNC_URI_REGEX = """spark-rsync://(.*)""".r
@@ -57,15 +58,17 @@ object CoarseCookSchedulerBackend {
       s"rsync $file ./"
     case SPARK_RSYNC_URI_REGEX(file) =>
       "RSYNC_CONNECT_PROG=\"knc spark-rsync@%H $SPARK_DRIVER_PULL_PORT\"" +
-      s" rsync $$SPARK_DRIVER_PULL_HOST::spark/$file ./"
+        s" rsync $$SPARK_DRIVER_PULL_HOST::spark/$file ./"
     case HDFS_URI_REGEX(file) => s"$$HADOOP_COMMAND fs -copyToLocal hdfs://$file ."
     case _ => sys.error(s"$uri not supported yet")
   }
 
-  def apply(scheduler: TaskSchedulerImpl, sc: SparkContext, cookHost: String,
-    cookPort: Int): CoarseGrainedSchedulerBackend = {
+  def apply(scheduler: TaskSchedulerImpl,
+            sc: SparkContext,
+            cookHost: String,
+            cookPort: Int): CoarseGrainedSchedulerBackend = {
     new CoarseCookSchedulerBackend(scheduler, sc, cookHost, cookPort)
   }
 }
@@ -89,11 +92,13 @@
  * -> to track this relationship.
  */
 class CoarseCookSchedulerBackend(
-  scheduler: TaskSchedulerImpl,
-  sc: SparkContext,
-  cookHost: String,
-  cookPort: Int
-) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) with Logging with MesosSchedulerUtils {
+    scheduler: TaskSchedulerImpl,
+    sc: SparkContext,
+    cookHost: String,
+    cookPort: Int
+) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
+    with Logging
+    with MesosSchedulerUtils {
 
   private[this] val schedulerConf = CookSchedulerConfiguration.conf(conf)
   private[this] var executorsRequested = 0
@@ -133,48 +138,22 @@ class CoarseCookSchedulerBackend(
         jobIds -= job.getUUID
         if (totalFailures >= schedulerConf.getMaximumExecutorFailures) {
           // TODO should we abort the outstanding tasks now?
-          logError(s"We have exceeded our maximum failures " +
-            s"($schedulerConf.getMaximumExecutorFailures)" +
-            "and will not relaunch any more tasks")
+          logError(
+            s"We have exceeded our maximum failures " +
+              s"($schedulerConf.getMaximumExecutorFailures)" +
+              "and will not relaunch any more tasks")
         }
       }
     }
   }
 
-  private def executorUUIDWriter: UUID => Unit =
-    conf.getOption("spark.cook.executoruuid.log").fold { _: UUID => () } { _file =>
-      def file(ct: Int) = s"${_file}.$ct"
-      def path(ct: Int) = Paths.get(file(ct))
-
-      // Here we roll existing logs.
-      @annotation.tailrec
-      def findFirstFree(ct: Int = 0): Int =
-        if (Files.exists(path(ct))) findFirstFree(ct + 1)
-        else ct
-
-      @annotation.tailrec
-      def rollin(ct: Int) {
-        if (ct > 0) {
-          Files.move(path(ct - 1), path(ct))
-          rollin(ct - 1)
-        }
-      }
-
-      rollin(findFirstFree())
-
-      { uuid: UUID =>
-        val bw = new BufferedWriter(new FileWriter(file(0), true))
-        bw.write(uuid.toString)
-        bw.newLine()
-        bw.close()
-      }
-    }
-
   private[this] val sparkMesosScheduler =
     new MesosCoarseGrainedSchedulerBackend(scheduler, sc, "", sc.env.securityManager)
 
-  override def applicationId(): String = conf.get("spark.cook.applicationId", super.applicationId())
+  override def applicationId(): String =
+    conf.get("spark.cook.applicationId", super.applicationId())
+
   override def applicationAttemptId(): Option[String] = Some(applicationId())
 
   private def createJob(numCores: Double): Job = {
@@ -182,15 +161,20 @@ class CoarseCookSchedulerBackend(
     val jobId = UUID.randomUUID()
     val taskId = sparkMesosScheduler.newMesosTaskId()
+
     executorIdToJobId += taskId -> jobId
-    logInfo(s"Creating job with id: $jobId. The corresponding executor id and task id is $taskId")
-    val fakeOffer = Offer.newBuilder()
+    logInfo(
+      s"Creating job with id: $jobId. The corresponding executor id and task id is $taskId")
+
+    val fakeOffer = Offer
+      .newBuilder()
       .setId(OfferID.newBuilder().setValue("Cook-id"))
       .setFrameworkId(FrameworkID.newBuilder().setValue("Cook"))
       .setHostname("$(hostname)")
       .setSlaveId(SlaveID.newBuilder().setValue(jobId.toString))
       .build()
-    val commandInfo = sparkMesosScheduler.createCommand(fakeOffer, numCores.toInt, taskId)
+    val commandInfo =
+      sparkMesosScheduler.createCommand(fakeOffer, numCores.toInt, taskId)
     val commandString = commandInfo.getValue
     val environmentInfo = commandInfo.getEnvironment
@@ -198,19 +182,21 @@ class CoarseCookSchedulerBackend(
     // we invoke the spark scripts, our values will not be picked up
     val environment = environmentInfo.getVariablesList.asScala
-      .map { v => s"export ${v.getName}=" + "\"" + v.getValue + "\"" } ++
-      Seq("export SPARK_LOCAL_DIRS=$MESOS_SANDBOX/spark-temp", "mkdir $SPARK_LOCAL_DIRS") ++
-      Seq(s"export SPARK_EXECUTOR_APP_ID=$applicationId") ++
+      .map { v =>
+        s"export ${v.getName}=" + "\"" + v.getValue + "\""
+      } ++
+      Seq("export SPARK_LOCAL_DIRS=$MESOS_SANDBOX/spark-temp",
+          "mkdir $SPARK_LOCAL_DIRS") ++
+      Seq(s"export SPARK_EXECUTOR_APP_ID=${applicationId()}") ++
       rsyncServer.fold(Seq[String]()) { server =>
-        val port = Await.result(server.port, Duration.Inf)
-        Seq(
-          s"export SPARK_DRIVER_PULL_HOST=${server.hostname}",
-          s"export SPARK_DRIVER_PULL_PORT=$port")
+        val port = ThreadUtils.awaitResult(server.port, Duration.Inf)
+        Seq(s"export SPARK_DRIVER_PULL_HOST=${server.hostname}",
+            s"export SPARK_DRIVER_PULL_PORT=$port")
       } ++
-      conf.getOption("spark.python.command").fold(Seq[String]()) { pythonCommand =>
-        Seq(
-          s"echo $pythonCommand \\$$@ > python_command",
-          "chmod 755 python_command")
+      conf.getOption("spark.python.command").fold(Seq[String]()) {
+        pythonCommand =>
+          Seq(s"echo $pythonCommand \\$$@ > python_command",
+              "chmod 755 python_command")
       }
 
     val uriValues = commandInfo.getUrisList.asScala.map(_.getValue)
@@ -222,12 +208,15 @@ class CoarseCookSchedulerBackend(
 
     val urisCommand = uriValues.map { uri =>
-      s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)" +
-      " || (echo \"ERROR FETCHING\" && exit 1)"
-    }
+        s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)" +
+          " || (echo \"ERROR FETCHING\" && exit 1)"
+      }
 
-    val shippedTarballs: Seq[String] = conf.getOption("spark.cook.shippedTarballs")
-      .fold(Seq[String]()){ tgz => tgz.split(",").map(_.trim).toList }
+    val shippedTarballs: Seq[String] = conf
+      .getOption("spark.cook.shippedTarballs")
+      .fold(Seq[String]()) { tgz =>
+        tgz.split(",").map(_.trim).toList
+      }
 
     val shippedTarballsCommand = shippedTarballs.map { uri =>
       s"[ ! -e $$(basename $uri) ] && ${fetchURI(uri)} && tar -xvzf $$(basename $uri)"
@@ -251,7 +240,8 @@ class CoarseCookSchedulerBackend(
         s"tar --strip-components=1 -xvzf $name -C HADOOP_CONF_DIR",
         // This must be absolute because we cd into the spark directory
         s"export HADOOP_CONF_DIR=`pwd`/HADOOP_CONF_DIR",
-        "export HADOOP_CLASSPATH=$HADOOP_CONF_DIR")
+        "export HADOOP_CLASSPATH=$HADOOP_CONF_DIR"
+      )
     } else Seq()
 
@@ -260,19 +250,19 @@ class CoarseCookSchedulerBackend(
     val cleanup = Seq(
       "if [ -z $KEEP_SPARK_LOCAL_DIRS ]",
       "then rm -rf $SPARK_LOCAL_DIRS",
       "echo deleted $SPARK_LOCAL_DIRS",
       "fi" // Deletes all of the tar files we fetched to save space on mesos
-    ) ++ uriValues.map { uri =>
-      s"[ -z $$KEEP_SPARK_LOCAL_TARS ] || rm -f $$(basename $uri)"
-    }
+      ) ++ uriValues.map { uri =>
+        s"[ -z $$KEEP_SPARK_LOCAL_TARS ] || rm -f $$(basename $uri)"
+      }
 
     val cmds = debug ++
-      environment ++
-      urisCommand ++
-      shippedTarballsCommand ++
-      remoteConfFetch ++
-      keystorePull.map(Seq(_)).getOrElse(Seq[String]()) ++
-      Seq("set", commandString) ++
-      cleanup
+        environment ++
+        urisCommand ++
+        shippedTarballsCommand ++
+        remoteConfFetch ++
+        keystorePull.map(Seq(_)).getOrElse(Seq[String]()) ++
+        Seq("set", commandString) ++
+        cleanup
 
     val builder = new Job.Builder()
       .setUUID(jobId)
@@ -285,11 +275,13 @@ class CoarseCookSchedulerBackend(
       .setRetries(1)
 
     val container = conf.get("spark.executor.cook.container", null)
-    if(container != null) {
+    if (container != null) {
       builder.setContainer(new JSONObject(container))
     }
 
-    conf.getOption("spark.executor.cook.principalsThatCanView").foreach(builder.addLabel("principalsThatCanView", _))
+    conf
+      .getOption("spark.executor.cook.principalsThatCanView")
+      .foreach(builder.addLabel("principalsThatCanView", _))
 
     builder.build()
   }
@@ -306,32 +298,40 @@ class CoarseCookSchedulerBackend(
     val ret = super.isReady()
     val cur = System.currentTimeMillis
     if (!ret && cur - lastIsReadyLog > 5000) {
-      logInfo("Backend is not yet ready. Registered executors " +
-        s"[${totalRegisteredExecutors.get}] vs minimum necessary " +
-        s"to start [$minExecutorsNecessary]")
+      logInfo(
+        "Backend is not yet ready. Registered executors " +
+          s"[${totalRegisteredExecutors.get}] vs minimum necessary " +
+          s"to start [$minExecutorsNecessary]")
       lastIsReadyLog = cur
     }
     ret
   }
 
-  override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
+  override def createDriverEndpoint(
+      properties: Seq[(String, String)]): DriverEndpoint = {
     new DriverEndpoint(rpcEnv, properties) {
       override def onDisconnected(remoteAddress: RpcAddress): Unit = {
         addressToExecutorId
          .get(remoteAddress)
-          .foreach(handleDisconnectedExecutor(_))
+          .foreach(handleDisconnectedExecutor)
       }
 
       def handleDisconnectedExecutor(executorId: String): Unit = {
-        logInfo(s"Received disconnect message from executor with id $executorId." +
-          s" Its related cook job id is ${executorIdToJobId(executorId)}")
+        logInfo(
+          s"Received disconnect message from executor with id $executorId." +
+            s" Its related cook job id is ${executorIdToJobId(executorId)}")
         // TODO: we end up querying for everything, not sure of the perf implications here
         val jobId = executorIdToJobId(executorId)
-        val jobInstances = jobClient.query(Seq(jobId).asJava).asScala.values
-          .flatMap(_.getInstances.asScala).toSeq
-        val slaveLostReason = SlaveLost("Remote RPC client disassociated likely due to " +
-          "containers exceeding thresholds or network issues. Check driver logs for WARN " +
-          "message.")
+        val jobInstances = jobClient
+          .queryJobs(Seq(jobId).asJava)
+          .asScala
+          .values
+          .flatMap(_.getInstances.asScala)
+          .toSeq
+        val slaveLostReason = SlaveLost(
+          "Remote RPC client disassociated likely due to " +
+            "containers exceeding thresholds or network issues. Check driver logs for WARN " +
+            "message.")
         if (jobInstances.isEmpty) {
          // This can happen in the case of an aborted executor when the Listener removes it first.
          // We can just mark it as lost since it wouldn't be preempted anyways.
@@ -339,9 +339,13 @@ class CoarseCookSchedulerBackend(
         }
         jobInstances.foreach { instance =>
           if (instance.getPreempted) {
-            logInfo(s"Executor $executorId was removed due to preemption. Marking as killed.")
-            removeExecutor(executorId, ExecutorExited(instance.getReasonCode.toInt,
-              exitCausedByApp = false, "Executor was preempted by the scheduler."))
+            logInfo(
+              s"Executor $executorId was removed due to preemption. Marking as killed.")
+            removeExecutor(
+              executorId,
+              ExecutorExited(instance.getReasonCode.toInt,
+                             exitCausedByApp = false,
+                             "Executor was preempted by the scheduler."))
           } else {
             removeExecutor(executorId, slaveLostReason)
           }
@@ -350,23 +354,28 @@ class CoarseCookSchedulerBackend(
     }
   }
 
-  /*
+  /**
    * Kill the given list of executors through the cluster manager.
+   *
   * @return whether the kill request is acknowledged.
   */
-  override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future.successful {
-    val jobIdsToKill = executorIds.flatMap(executorIdToJobId.get)
-    jobClient.abort(jobIdsToKill.asJava)
-    jobIdsToKill.foreach(abortedJobIds.add)
-    true
-  }
+  override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] =
+    Future.successful {
+      val jobIdsToKill = executorIds.flatMap(executorIdToJobId.get)
+      jobClient.abort(jobIdsToKill.asJava)
+      jobIdsToKill.foreach(abortedJobIds.add)
+      true
+    }
 
-  override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future.successful {
-    logInfo(s"Setting total amount of executors to request to $requestedTotal")
-    schedulerConf.setMaximumCores(requestedTotal * schedulerConf.getCoresPerCookJob)
-    requestExecutorsIfNecessary()
-    true
-  }
+  override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
+    Future.successful {
+      logInfo(
+        s"Setting total amount of executors to request to $requestedTotal")
+      schedulerConf.setMaximumCores(
+        requestedTotal * schedulerConf.getCoresPerCookJob)
+      requestExecutorsIfNecessary()
+      true
+    }
 
   /**
   * Generate a list of jobs expected to run in Cook according to the difference of the total
@@ -377,15 +386,17 @@ class CoarseCookSchedulerBackend(
   */
  private[this] def createRemainingJobs(): List[Job] = {
    @annotation.tailrec
    def loop(instancesRemaining: Double, jobs: List[Job]): List[Job] =
-      if (instancesRemaining <= 0) jobs
-      else loop(instancesRemaining - 1, createJob(schedulerConf.getCoresPerCookJob) :: jobs)
+      if (instancesRemaining <= 0) {
+        jobs
+      } else {
+        loop(instancesRemaining - 1,
+             createJob(schedulerConf.getCoresPerCookJob) :: jobs)
+      }
    loop(schedulerConf.getExecutorsToRequest(executorsRequested), Nil).reverse
  }
 
-  /**
-   * Kill the extra executors if necessary.
-   */
+  // Kill the extra executors if necessary.
  private[this] def killExecutorsIfNecessary(): Unit = {
    val executorsToKill = schedulerConf.getExecutorsToKill(executorsRequested)
    if (executorsToKill > 0) {
@@ -400,9 +411,7 @@ class CoarseCookSchedulerBackend(
    }
  }
 
-  /**
-   * Request more executors from Cook via cook jobs if necessary.
-   */
+  // Request more executors from Cook via cook jobs if necessary.
  private[this] def requestExecutorsIfNecessary(): Unit = {
    val jobs = createRemainingJobs()
    if (jobs.nonEmpty) {
@@ -416,18 +425,24 @@ class CoarseCookSchedulerBackend(
    }
  }
 
-  /**
-   * Periodically check if the requested cores meets the requirement. If not, request more.
-   */
-  val resourceManagerService = Executors.newScheduledThreadPool(1,
-    new ThreadFactoryBuilder().setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
-      override def uncaughtException(t: Thread, e: Throwable) =
-        logWarning(s"Can not handle exception ${e}")
-    }).build())
+  // Periodically check if the requested cores meets the requirement. If not, request more.
+  private[this] val resourceManagerService = Executors.newScheduledThreadPool(
+    1,
+    new ThreadFactoryBuilder()
+      .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+        override def uncaughtException(t: Thread, e: Throwable): Unit =
+          logWarning(s"Can not handle exception $e")
+      })
+      .build()
+  )
 
  private[this] val rsyncServer: Option[RsyncServer] =
-    conf.getOption("spark.executor.rsyncDir").map(rsyncDir =>
-      new RsyncServer(rsyncDir, conf.getOption("spark.driver.rsyncExportDirs")))
+    conf
+      .getOption("spark.executor.rsyncDir")
+      .map(
+        rsyncDir =>
+          new RsyncServer(rsyncDir,
+                          conf.getOption("spark.driver.rsyncExportDirs")))
 
  override def start(): Unit = {
    super.start()