diff --git a/dev/scalastyle b/dev/scalastyle
index 19955b9aaaad3..7b572f6a8945a 100755
--- a/dev/scalastyle
+++ b/dev/scalastyle
@@ -18,6 +18,10 @@
 #
 
 echo -e "q\n" | sbt/sbt clean scalastyle > scalastyle.txt
+# Check style with YARN alpha built too
+SPARK_YARN=true sbt/sbt yarn/scalastyle >> scalastyle.txt
+# Check style with YARN built too
+SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt yarn/scalastyle >> scalastyle.txt
 ERRORS=$(cat scalastyle.txt | grep -e "\<error\>")
 if test ! -z "$ERRORS"; then
     echo -e "Scalastyle checks failed at following occurrences:\n$ERRORS"
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 465e5f146fe71..444bbfb4dd934 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -261,8 +261,9 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
         testTables.get(name).map(_.commands).getOrElse(sys.error(s"Unknown test table $name"))
       createCmds.foreach(_())
 
-      if (cacheTables)
+      if (cacheTables) {
         cacheTable(name)
+      }
     }
   }
 
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 7b0e020263835..21f14576efe8a 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -37,7 +37,8 @@ import org.apache.spark.scheduler.SplitInfo
 class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
   extends Logging {
 
-  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+  def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+    this(args, new Configuration(), sparkConf)
 
   def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
 
@@ -63,7 +64,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     override def preStart() {
       logInfo("Listen to driver: " + driverUrl)
       driver = context.actorSelection(driverUrl)
-      // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
+      // Send a hello message thus the connection is actually established, thus we can
+      // monitor Lifecycle Events.
       driver ! "Hello"
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     }
@@ -104,8 +106,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // Allocate all containers
     allocateExecutors()
 
-    // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
-    // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
+    // Launch a progress reporter thread, else app will get killed after expiration
+    // (def: 10mins) timeout ensure that progress is sent before
+    // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
     val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
 
     // we want to be reasonably responsive without causing too many requests to RM.
@@ -163,8 +166,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
       .asInstanceOf[RegisterApplicationMasterRequest]
     appMasterRequest.setApplicationAttemptId(appAttemptId)
-    // Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
-    // Users can then monitor stderr/stdout on that node if required.
+    // Setting this to master host,port - so that the ApplicationReport at client has
+    // some sensible info. Users can then monitor stderr/stdout on that node if required.
     appMasterRequest.setHost(Utils.localHostName())
     appMasterRequest.setRpcPort(0)
     // What do we provide here ? Might make sense to expose something sensible later ?
@@ -213,7 +216,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // TODO: This is a bit ugly. Can we make it nicer?
     // TODO: Handle container failure
     while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
-      yarnAllocator.allocateContainers(math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
+      yarnAllocator.allocateContainers(
+        math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
       Thread.sleep(100)
     }
 
@@ -230,7 +234,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     while (!driverClosed) {
       val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
       if (missingExecutorCount > 0) {
-        logInfo("Allocating " + missingExecutorCount + " containers to make up for (potentially ?) lost containers")
+        logInfo("Allocating " + missingExecutorCount +
+          " containers to make up for (potentially ?) lost containers")
         yarnAllocator.allocateContainers(missingExecutorCount)
       }
       else sendProgress()
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 2056667af50cb..d6d46a5f6ce42 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -225,8 +225,8 @@ private[yarn] class YarnAllocationHandler(
       val executorHostname = container.getNodeId.getHost
       val containerId = container.getId
 
-      assert(
-        container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+      assert( container.getResource.getMemory >=
+        (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
 
       if (numExecutorsRunningNow > maxExecutors) {
         logInfo("""Ignoring container %s at host %s, since we already have the required number of
@@ -393,9 +393,10 @@ private[yarn] class YarnAllocationHandler(
     // default.
     if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
-      logDebug("numExecutors: " + numExecutors + ", host preferences: " + preferredHostToCount.isEmpty)
-      resourceRequests = List(
-        createResourceRequest(AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
+      logDebug("numExecutors: " + numExecutors + ", host preferences: " +
+        preferredHostToCount.isEmpty)
+      resourceRequests = List(createResourceRequest(
+        AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
     }
     else {
       // request for all hosts in preferred nodes and for numExecutors -
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 61af0f9ac5ca0..581cfe43b65c2 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -137,7 +137,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
     val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
 
-    System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
+    System.setProperty(
+      "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
   }
 
   /** Get the Yarn approved local directories. */
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index b697f103914fd..67ed591c78bf9 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -65,7 +65,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     override def preStart() {
       logInfo("Listen to driver: " + driverUrl)
       driver = context.actorSelection(driverUrl)
-      // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
+      // Send a hello message thus the connection is actually established,
+      // thus we can monitor Lifecycle Events.
       driver ! "Hello"
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     }
@@ -95,8 +96,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     // Allocate all containers
     allocateExecutors()
 
-    // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
-    // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
+    // Launch a progress reporter thread, else app will get killed after expiration
+    // (def: 10mins) timeout ensure that progress is sent before
+    // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
     val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
 
     // we want to be reasonably responsive without causing too many requests to RM.
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index e31c4060e8452..4fafae1aff26f 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -276,7 +276,8 @@ private[yarn] class YarnAllocationHandler(
           allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
         }
       }
-      logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(driverUrl, executorHostname))
+      logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
+        driverUrl, executorHostname))
       val executorRunnable = new ExecutorRunnable(
         container,
         conf,
@@ -314,8 +315,8 @@ private[yarn] class YarnAllocationHandler(
         // `pendingReleaseContainers`.
         pendingReleaseContainers.remove(containerId)
       } else {
-        // Decrement the number of executors running. The next iteration of the ApplicationMaster's
-        // reporting thread will take care of allocating.
+        // Decrement the number of executors running. The next iteration of
+        // the ApplicationMaster's reporting thread will take care of allocating.
         numExecutorsRunning.decrementAndGet()
         logInfo("Completed container %s (state: %s, exit status: %s)".format(
           containerId,
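For readers skimming the patch, the style conventions it applies can be summarized in one standalone sketch. Everything below is illustrative: the object and method names are hypothetical and appear nowhere in the patch; only the wrapping and bracing rules mirror the hunks above, which exist to satisfy Scalastyle's line-length check (100 characters in Spark's scalastyle-config.xml).

// Illustrative sketch only; all identifiers here are hypothetical.
object WrappingStyle {

  // Long auxiliary constructors break after the `=`, as in
  // ExecutorLauncher's `def this(args, sparkConf)`.
  class Launcher(name: String, config: Map[String, String]) {
    def this(name: String) =
      this(name, Map.empty)
  }

  // Long argument lists break after the opening parenthesis, as in
  // `yarnAllocator.allocateContainers(...)` and `System.setProperty(...)`.
  def shortfall(requested: Int, running: Int): Int =
    math.max(requested - running, 0)

  // Long string concatenations break after a trailing `+`, as in the
  // `logInfo(...)` calls.
  def report(missing: Int): String =
    "Allocating " + missing +
      " containers to make up for (potentially ?) lost containers"

  // Single-statement `if` bodies gain braces, as in TestHive.scala.
  def maybeCache(cacheTables: Boolean, cache: () => Unit): Unit = {
    if (cacheTables) {
      cache()
    }
  }
}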