From 0fdf2f5a18d88a600ca1ab7b4bf02fb0537e9411 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 4 Sep 2014 17:47:16 -0700 Subject: [PATCH 01/11] Manually close old PR Closes #1588 From 90b17a70c703c403d397e24cfbc20da22a32102d Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 4 Sep 2014 17:51:14 -0700 Subject: [PATCH 02/11] Manually close old PR Closes #544 From 3eb6ef316c2a5ee43d5ecfcf9f10c2d7adc6b819 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Thu, 4 Sep 2014 18:46:09 -0700 Subject: [PATCH 03/11] [SPARK-3310][SQL] Directly use currentTable without unnecessary implicit conversion We can directly use currentTable there without unnecessary implicit conversion. Author: Liang-Chi Hsieh Closes #2203 from viirya/direct_use_inmemoryrelation and squashes the following commits: 4741d02 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into direct_use_inmemoryrelation b671f67 [Liang-Chi Hsieh] Can directly use currentTable there without unnecessary implicit conversion. --- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index a75af94d29303..5acb45c155ba5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -272,7 +272,7 @@ class SQLContext(@transient val sparkContext: SparkContext) val currentTable = table(tableName).queryExecution.analyzed val asInMemoryRelation = currentTable match { case _: InMemoryRelation => - currentTable.logicalPlan + currentTable case _ => InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan) From ee575f12f2ab059d9c1b4fa8d6c1e62248c3d11b Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 4 Sep 2014 18:47:45 -0700 Subject: [PATCH 04/11] [SPARK-2219][SQL] Added support for the "add jar" command Adds logical and physical command classes for the "add jar" command. Note that this PR conflicts with and should be merged after #2215. 
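For a sense of what this enables, a hedged usage sketch (the jar path and UDF class below are invented for illustration; it assumes a HiveContext's sql method is in scope, as in the test suite):

    // After this patch, ADD JAR is parsed into a dedicated AddJar command that
    // both registers the jar with Hive (via runSqlHive) and ships it to the
    // executors via sparkContext.addJar, instead of being forwarded to Hive
    // as an opaque NativeCommand.
    sql("ADD JAR /tmp/my-udfs.jar")
    sql("CREATE TEMPORARY FUNCTION my_upper AS 'com.example.MyUpper'")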
Author: Cheng Lian Closes #2242 from liancheng/add-jar and squashes the following commits: e43a2f1 [Cheng Lian] Updates AddJar according to conventions introduced in #2215 b99107f [Cheng Lian] Added test case for ADD JAR command 095b2c7 [Cheng Lian] Also forward ADD JAR command to Hive 9be031b [Cheng Lian] Trims Jar path string 8195056 [Cheng Lian] Added support for the "add jar" command --- .../org/apache/spark/sql/hive/HiveQl.scala | 8 +++--- .../spark/sql/hive/HiveStrategies.scala | 5 ++-- .../spark/sql/hive/execution/commands.scala | 16 ++++++++++++ .../sql/hive/execution/HiveQuerySuite.scala | 25 ++++++++++++++++--- 4 files changed, 46 insertions(+), 8 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index a4dd6be5f9e35..c98287c6aa662 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -44,6 +44,8 @@ private[hive] case class SourceCommand(filePath: String) extends Command private[hive] case class AddFile(filePath: String) extends Command +private[hive] case class AddJar(path: String) extends Command + private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command private[hive] case class AnalyzeTable(tableName: String) extends Command @@ -231,7 +233,7 @@ private[hive] object HiveQl { } else if (sql.trim.toLowerCase.startsWith("uncache table")) { CacheCommand(sql.trim.drop(14).trim, false) } else if (sql.trim.toLowerCase.startsWith("add jar")) { - NativeCommand(sql) + AddJar(sql.trim.drop(8).trim) } else if (sql.trim.toLowerCase.startsWith("add file")) { AddFile(sql.trim.drop(9)) } else if (sql.trim.toLowerCase.startsWith("dfs")) { @@ -1018,9 +1020,9 @@ private[hive] object HiveQl { /* Other functions */ case Token("TOK_FUNCTION", Token(RAND(), Nil) :: Nil) => Rand - case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: Nil) => + case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: Nil) => Substring(nodeToExpr(string), nodeToExpr(pos), Literal(Integer.MAX_VALUE, IntegerType)) - case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) => + case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) => Substring(nodeToExpr(string), nodeToExpr(pos), nodeToExpr(length)) /* UDFs - Must be last otherwise will preempt built in functions */ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 24abb1b5bd1a8..72cc01cdf4c84 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -195,11 +195,12 @@ private[hive] trait HiveStrategies { case class HiveCommandStrategy(context: HiveContext) extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case logical.NativeCommand(sql) => - NativeCommand(sql, plan.output)(context) :: Nil + case logical.NativeCommand(sql) => NativeCommand(sql, plan.output)(context) :: Nil case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil + case hive.AddJar(path) => execution.AddJar(path) :: Nil + case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil case describe: logical.DescribeCommand => diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index a1a4aa7de7bf7..d61c5e274a596 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -60,3 +60,19 @@ case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with Seq.empty[Row] } } + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class AddJar(path: String) extends LeafNode with Command { + def hiveContext = sqlContext.asInstanceOf[HiveContext] + + override def output = Seq.empty + + override protected[sql] lazy val sideEffectResult: Seq[Row] = { + hiveContext.runSqlHive(s"ADD JAR $path") + hiveContext.sparkContext.addJar(path) + Seq.empty[Row] + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index c4abb3eb4861f..f4217a52c3822 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.hive.execution +import java.io.File + import scala.util.Try -import org.apache.spark.sql.{SchemaRDD, Row} +import org.apache.spark.SparkException import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ @@ -313,7 +315,7 @@ class HiveQuerySuite extends HiveComparisonTest { "SELECT srcalias.KEY, SRCALIAS.value FROM sRc SrCAlias WHERE SrCAlias.kEy < 15") test("case sensitivity: registered table") { - val testData: SchemaRDD = + val testData = TestHive.sparkContext.parallelize( TestData(1, "str1") :: TestData(2, "str2") :: Nil) @@ -467,7 +469,7 @@ class HiveQuerySuite extends HiveComparisonTest { } // Describe a registered temporary table. - val testData: SchemaRDD = + val testData = TestHive.sparkContext.parallelize( TestData(1, "str1") :: TestData(1, "str2") :: Nil) @@ -495,6 +497,23 @@ class HiveQuerySuite extends HiveComparisonTest { } } + test("ADD JAR command") { + val testJar = TestHive.getHiveFile("data/files/TestSerDe.jar").getCanonicalPath + sql("CREATE TABLE alter1(a INT, b INT)") + intercept[Exception] { + sql( + """ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe' + |WITH serdeproperties('s1'='9') + """.stripMargin) + } + sql(s"ADD JAR $testJar") + sql( + """ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe' + |WITH serdeproperties('s1'='9') + """.stripMargin) + sql("DROP TABLE alter1") + } + test("parse HQL set commands") { // Adapted from its SQL counterpart. val testKey = "spark.sql.key.usedfortestonly" From 1904bac38d97df5ae9fb193e92a83c7f8ff6d255 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Thu, 4 Sep 2014 19:16:12 -0700 Subject: [PATCH 05/11] [SPARK-3392] [SQL] Show value spark.sql.shuffle.partitions for mapred.reduce.tasks This is a tiny fix: querying "mapred.reduce.tasks" now shows the value of spark.sql.shuffle.partitions, which makes more sense for Hive users. It also fixes the "set -v" command, which should output verbose information for all of the key/value pairs.
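To make the new behavior concrete, a brief sketch (assumes a HiveContext's sql method in scope, as in the tests; the partition value is arbitrary):

    // Setting the deprecated key is converted, with a warning, to the new one.
    sql("SET mapred.reduce.tasks=10")   // actually sets spark.sql.shuffle.partitions=10
    // Querying the deprecated key now reports the effective shuffle setting
    // instead of an empty value.
    sql("SET mapred.reduce.tasks")      // returns "spark.sql.shuffle.partitions=10"
    // SET -v now emits every key=value pair held in the SQLConf.
    sql("SET -v")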
Author: Cheng Hao Closes #2261 from chenghao-intel/set_mapreduce_tasks and squashes the following commits: 653858a [Cheng Hao] show value spark.sql.shuffle.partitions for mapred.reduce.tasks --- .../apache/spark/sql/execution/commands.scala | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 286c6d264f86a..94543fc95b470 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -60,10 +60,10 @@ case class SetCommand( logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.") context.setConf(SQLConf.SHUFFLE_PARTITIONS, v) - Array(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")) + Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")) } else { context.setConf(k, v) - Array(Row(s"$k=$v")) + Seq(Row(s"$k=$v")) } // Query the value bound to key k. @@ -78,11 +78,19 @@ case class SetCommand( "hive-hwi-0.12.0.jar", "hive-0.12.0.jar").mkString(":") - Array( + context.getAllConfs.map { case (k, v) => + Row(s"$k=$v") + }.toSeq ++ Seq( Row("system:java.class.path=" + hiveJars), Row("system:sun.java.command=shark.SharkServer2")) } else { - Array(Row(s"$k=${context.getConf(k, "")}")) + if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) { + logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " + + s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.") + Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}")) + } else { + Seq(Row(s"$k=${context.getConf(k, "")}")) + } } // Query all key-value pairs that are set in the SQLConf of the context. From 1725a1a5d10a53762bd80f391eddbf306f2841ee Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 4 Sep 2014 23:34:58 -0700 Subject: [PATCH 06/11] [SPARK-3391][EC2] Support attaching up to 8 EBS volumes. Please merge this at the same time as https://github.com/mesos/spark-ec2/pull/66 Author: Reynold Xin Closes #2260 from rxin/ec2-ebs-vol and squashes the following commits: b9527d9 [Reynold Xin] Removed io1 ebs type. bf9c403 [Reynold Xin] Made EBS volume type configurable. c8e25ea [Reynold Xin] Support up to 8 EBS volumes. adf4f2e [Reynold Xin] Revert git repo change. 020c542 [Reynold Xin] [SPARK-3391] Support attaching more than 1 EBS volumes. --- ec2/spark_ec2.py | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index eed6eb8485183..1670faca4a480 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -102,9 +102,17 @@ def parse_args(): "(for debugging)") parser.add_option( "--ebs-vol-size", metavar="SIZE", type="int", default=0, - help="Attach a new EBS volume of size SIZE (in GB) to each node as " + - "/vol. The volumes will be deleted when the instances terminate. " + - "Only possible on EBS-backed AMIs.") + help="Size (in GB) of each EBS volume.") + parser.add_option( + "--ebs-vol-type", default="standard", + help="EBS volume type (e.g. 'gp2', 'standard').") + parser.add_option( + "--ebs-vol-num", type="int", default=1, + help="Number of EBS volumes to attach to each node as /vol[x]. " + + "The volumes will be deleted when the instances terminate. " + + "Only possible on EBS-backed AMIs. " + + "EBS volumes are only attached if --ebs-vol-size > 0." 
+ + "Only support up to 8 EBS volumes.") parser.add_option( "--swap", metavar="SWAP", type="int", default=1024, help="Swap space to set up per node, in MB (default: 1024)") @@ -348,13 +356,16 @@ def launch_cluster(conn, opts, cluster_name): print >> stderr, "Could not find AMI " + opts.ami sys.exit(1) - # Create block device mapping so that we can add an EBS volume if asked to + # Create block device mapping so that we can add EBS volumes if asked to. + # The first drive is attached as /dev/sds, 2nd as /dev/sdt, ... /dev/sdz block_map = BlockDeviceMapping() if opts.ebs_vol_size > 0: - device = EBSBlockDeviceType() - device.size = opts.ebs_vol_size - device.delete_on_termination = True - block_map["/dev/sdv"] = device + for i in range(opts.ebs_vol_num): + device = EBSBlockDeviceType() + device.size = opts.ebs_vol_size + device.volume_type=opts.ebs_vol_type + device.delete_on_termination = True + block_map["/dev/sd" + chr(ord('s') + i)] = device # AWS ignores the AMI-specified block device mapping for M3 (see SPARK-3342). if opts.instance_type.startswith('m3.'): @@ -828,6 +839,12 @@ def get_partition(total, num_partitions, current_partitions): def real_main(): (opts, action, cluster_name) = parse_args() + + # Input parameter validation + if opts.ebs_vol_num > 8: + print >> stderr, "ebs-vol-num cannot be greater than 8" + sys.exit(1) + try: conn = ec2.connect_to_region(opts.region) except Exception as e: From 6a37ed838b3cbf96f7a904f3d3dabf99141729f5 Mon Sep 17 00:00:00 2001 From: Nicholas Chammas Date: Thu, 4 Sep 2014 23:37:06 -0700 Subject: [PATCH 07/11] [Docs] fix minor MLlib case typo Also make the list of features consistent in style. Author: Nicholas Chammas Closes #2278 from nchammas/patch-1 and squashes the following commits: 56df319 [Nicholas Chammas] [Docs] fix minor MLlib case typo --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 0a683a460ffac..5b09ad86849e7 100644 --- a/README.md +++ b/README.md @@ -4,8 +4,8 @@ Spark is a fast and general cluster computing system for Big Data. It provides high-level APIs in Scala, Java, and Python, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured -data processing, MLLib for machine learning, GraphX for graph processing, -and Spark Streaming. +data processing, MLlib for machine learning, GraphX for graph processing, +and Spark Streaming for stream processing. From 51b53a758c85f2e20ad9bd73ed815fcfa9c7180b Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 5 Sep 2014 09:54:40 -0500 Subject: [PATCH 08/11] [SPARK-3260] yarn - pass acls along with executor launch Pass along the acl settings when we launch a container so that they can be applied to viewing the logs on a running NodeManager. 
Author: Thomas Graves Closes #2185 from tgravescs/SPARK-3260 and squashes the following commits: 6f94b5a [Thomas Graves] make unit test more robust 28b9dd3 [Thomas Graves] yarn - pass acls along with executor launch --- .../spark/deploy/yarn/ExecutorRunnable.scala | 7 +- .../deploy/yarn/YarnAllocationHandler.scala | 7 +- .../spark/deploy/yarn/YarnRMClientImpl.scala | 7 +- .../spark/deploy/yarn/ApplicationMaster.scala | 13 ++-- .../apache/spark/deploy/yarn/ClientBase.scala | 6 +- .../spark/deploy/yarn/YarnAllocator.scala | 10 ++- .../spark/deploy/yarn/YarnRMClient.scala | 5 +- .../deploy/yarn/YarnSparkHadoopUtil.scala | 11 ++- .../yarn/YarnSparkHadoopUtilSuite.scala | 76 ++++++++++++++++++- .../spark/deploy/yarn/ExecutorRunnable.scala | 7 +- .../deploy/yarn/YarnAllocationHandler.scala | 7 +- .../spark/deploy/yarn/YarnRMClientImpl.scala | 7 +- 12 files changed, 129 insertions(+), 34 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 7dae248e3e7db..10cbeb8b94325 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils} -import org.apache.spark.{SparkConf, Logging} +import org.apache.spark.{SecurityManager, SparkConf, Logging} class ExecutorRunnable( @@ -46,7 +46,8 @@ class ExecutorRunnable( slaveId: String, hostname: String, executorMemory: Int, - executorCores: Int) + executorCores: Int, + securityMgr: SecurityManager) extends Runnable with ExecutorRunnableUtil with Logging { var rpc: YarnRPC = YarnRPC.create(conf) @@ -86,6 +87,8 @@ class ExecutorRunnable( logInfo("Setting up executor with commands: " + commands) ctx.setCommands(commands) + ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)) + // Send the start request to the ContainerManager val startReq = Records.newRecord(classOf[StartContainerRequest]) .asInstanceOf[StartContainerRequest] diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 9f9e16c06452b..85d6274df2fcb 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap} -import org.apache.spark.SparkConf +import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.scheduler.SplitInfo import org.apache.hadoop.conf.Configuration @@ -41,8 +41,9 @@ private[yarn] class YarnAllocationHandler( resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId, args: ApplicationMasterArguments, - preferredNodes: collection.Map[String, collection.Set[SplitInfo]]) - extends YarnAllocator(conf, sparkConf, args, preferredNodes) { + preferredNodes: collection.Map[String, collection.Set[SplitInfo]], + securityMgr: SecurityManager) + extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) { private val lastResponseId = new AtomicInteger() private val releaseList: 
CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList() diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala index cc5392192ec51..ad27a9ab781d2 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} -import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.scheduler.SplitInfo import org.apache.spark.util.Utils @@ -45,7 +45,8 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC sparkConf: SparkConf, preferredNodeLocations: Map[String, Set[SplitInfo]], uiAddress: String, - uiHistoryAddress: String) = { + uiHistoryAddress: String, + securityMgr: SecurityManager) = { this.rpc = YarnRPC.create(conf) this.uiHistoryAddress = uiHistoryAddress @@ -53,7 +54,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC registerApplicationMaster(uiAddress) new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args, - preferredNodeLocations) + preferredNodeLocations, securityMgr) } override def getAttemptId() = { diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 98039a20de245..a879c833a014f 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -116,7 +116,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, val securityMgr = new SecurityManager(sparkConf) if (isDriver) { - runDriver() + runDriver(securityMgr) } else { runExecutorLauncher(securityMgr) } @@ -157,7 +157,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, sparkContextRef.compareAndSet(sc, null) } - private def registerAM(uiAddress: String) = { + private def registerAM(uiAddress: String, securityMgr: SecurityManager) = { val sc = sparkContextRef.get() val appId = client.getAttemptId().getApplicationId().toString() @@ -170,13 +170,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, if (sc != null) sc.getConf else sparkConf, if (sc != null) sc.preferredNodeLocationData else Map(), uiAddress, - historyAddress) + historyAddress, + securityMgr) allocator.allocateResources() reporterThread = launchReporterThread() } - private def runDriver(): Unit = { + private def runDriver(securityMgr: SecurityManager): Unit = { addAmIpFilter() val userThread = startUserClass() @@ -188,7 +189,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, if (sc == null) { finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.") } else { - registerAM(sc.ui.appUIHostPort) + registerAM(sc.ui.appUIHostPort, securityMgr) try { userThread.join() } finally { @@ -203,7 +204,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments, conf = sparkConf, securityManager = securityMgr)._1 actor = waitForSparkDriver() addAmIpFilter() - registerAM(sparkConf.get("spark.driver.appUIAddress", "")) + 
registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr) // In client mode the actor will stop the reporter thread. reporterThread.join() diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 5d8e5e6dffe7f..8075b7a7fb837 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -430,10 +430,8 @@ trait ClientBase extends Logging { // send the acl settings into YARN to control who has access via YARN interfaces val securityManager = new SecurityManager(sparkConf) - val acls = Map[ApplicationAccessType, String] ( - ApplicationAccessType.VIEW_APP -> securityManager.getViewAcls, - ApplicationAccessType.MODIFY_APP -> securityManager.getModifyAcls) - amContainer.setApplicationACLs(acls) + amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager)) + amContainer } } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index c74dd1c2b21dc..02b9a81bf6b50 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse -import org.apache.spark.{Logging, SparkConf, SparkEnv} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv} import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -55,7 +55,8 @@ private[yarn] abstract class YarnAllocator( conf: Configuration, sparkConf: SparkConf, args: ApplicationMasterArguments, - preferredNodes: collection.Map[String, collection.Set[SplitInfo]]) + preferredNodes: collection.Map[String, collection.Set[SplitInfo]], + securityMgr: SecurityManager) extends Logging { // These three are locked on allocatedHostToContainersMap. Complementary data structures @@ -280,7 +281,8 @@ private[yarn] abstract class YarnAllocator( executorId, executorHostname, executorMemory, - executorCores) + executorCores, + securityMgr) new Thread(executorRunnable).start() } } @@ -444,4 +446,4 @@ private[yarn] abstract class YarnAllocator( } -} \ No newline at end of file +} diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index 922d7d1a854a5..ed65e56b3e413 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -22,7 +22,7 @@ import scala.collection.{Map, Set} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.records._ -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.scheduler.SplitInfo /** @@ -45,7 +45,8 @@ trait YarnRMClient { sparkConf: SparkConf, preferredNodeLocations: Map[String, Set[SplitInfo]], uiAddress: String, - uiHistoryAddress: String): YarnAllocator + uiHistoryAddress: String, + securityMgr: SecurityManager): YarnAllocator /** * Shuts down the AM. 
Guaranteed to only be called once. diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index dc77f1236492d..4a33e34c3bfc7 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -32,10 +32,11 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.util.StringInterner import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.ApplicationConstants +import org.apache.hadoop.yarn.api.records.ApplicationAccessType import org.apache.hadoop.yarn.util.RackResolver import org.apache.hadoop.conf.Configuration -import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.{SecurityManager, SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils @@ -211,4 +212,12 @@ object YarnSparkHadoopUtil { } } + private[spark] def getApplicationAclsForYarn(securityMgr: SecurityManager): + Map[ApplicationAccessType, String] = { + Map[ApplicationAccessType, String] ( + ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls, + ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls + ) + } + } diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index 75db8ee6d468f..2cc5abb3a890c 100644 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -23,7 +23,10 @@ import com.google.common.io.{ByteStreams, Files} import org.apache.hadoop.yarn.conf.YarnConfiguration import org.scalatest.{FunSuite, Matchers} -import org.apache.spark.{Logging, SparkConf} +import org.apache.hadoop.yarn.api.records.ApplicationAccessType + +import org.apache.spark.{Logging, SecurityManager, SparkConf} + class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging { @@ -74,4 +77,75 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging { yarnConf.get(key) should not be default.get(key) } + + test("test getApplicationAclsForYarn acls on") { + + // spark acls on, just pick up default user + val sparkConf = new SparkConf() + sparkConf.set("spark.acls.enable", "true") + + val securityMgr = new SecurityManager(sparkConf) + val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr) + + val viewAcls = acls.get(ApplicationAccessType.VIEW_APP) + val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP) + + viewAcls match { + case Some(vacls) => { + val aclSet = vacls.split(',').map(_.trim).toSet + assert(aclSet.contains(System.getProperty("user.name", "invalid"))) + } + case None => { + fail() + } + } + modifyAcls match { + case Some(macls) => { + val aclSet = macls.split(',').map(_.trim).toSet + assert(aclSet.contains(System.getProperty("user.name", "invalid"))) + } + case None => { + fail() + } + } + } + + test("test getApplicationAclsForYarn acls on and specify users") { + + // default spark acls are on and specify acls + val sparkConf = new SparkConf() + sparkConf.set("spark.acls.enable", "true") + sparkConf.set("spark.ui.view.acls", "user1,user2") + sparkConf.set("spark.modify.acls", "user3,user4") + + val securityMgr = new SecurityManager(sparkConf) + val acls 
= YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr) + + val viewAcls = acls.get(ApplicationAccessType.VIEW_APP) + val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP) + + viewAcls match { + case Some(vacls) => { + val aclSet = vacls.split(',').map(_.trim).toSet + assert(aclSet.contains("user1")) + assert(aclSet.contains("user2")) + assert(aclSet.contains(System.getProperty("user.name", "invalid"))) + } + case None => { + fail() + } + } + modifyAcls match { + case Some(macls) => { + val aclSet = macls.split(',').map(_.trim).toSet + assert(aclSet.contains("user3")) + assert(aclSet.contains("user4")) + assert(aclSet.contains(System.getProperty("user.name", "invalid"))) + } + case None => { + fail() + } + } + + } } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index 07ba0a4b30bd7..833be12982e71 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records} -import org.apache.spark.{SparkConf, Logging} +import org.apache.spark.{SecurityManager, SparkConf, Logging} class ExecutorRunnable( @@ -46,7 +46,8 @@ class ExecutorRunnable( slaveId: String, hostname: String, executorMemory: Int, - executorCores: Int) + executorCores: Int, + securityMgr: SecurityManager) extends Runnable with ExecutorRunnableUtil with Logging { var rpc: YarnRPC = YarnRPC.create(conf) @@ -85,6 +86,8 @@ class ExecutorRunnable( logInfo("Setting up executor with commands: " + commands) ctx.setCommands(commands) + ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)) + // Send the start request to the ContainerManager nmClient.startContainer(container, ctx) } diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index ed31457b61571..c887cb52dd9cf 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -20,7 +20,7 @@ package org.apache.spark.deploy.yarn import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap} -import org.apache.spark.SparkConf +import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.scheduler.SplitInfo import org.apache.hadoop.conf.Configuration @@ -39,8 +39,9 @@ private[yarn] class YarnAllocationHandler( amClient: AMRMClient[ContainerRequest], appAttemptId: ApplicationAttemptId, args: ApplicationMasterArguments, - preferredNodes: collection.Map[String, collection.Set[SplitInfo]]) - extends YarnAllocator(conf, sparkConf, args, preferredNodes) { + preferredNodes: collection.Map[String, collection.Set[SplitInfo]], + securityMgr: SecurityManager) + extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) { override protected def releaseContainer(container: Container) = { amClient.releaseAssignedContainer(container.getId()) diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala index 
e8b8d9bc722bd..54bc6b14c44ce 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.ConverterUtils import org.apache.hadoop.yarn.webapp.util.WebAppUtils -import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.scheduler.SplitInfo import org.apache.spark.util.Utils @@ -46,7 +46,8 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC sparkConf: SparkConf, preferredNodeLocations: Map[String, Set[SplitInfo]], uiAddress: String, - uiHistoryAddress: String) = { + uiHistoryAddress: String, + securityMgr: SecurityManager) = { amClient = AMRMClient.createAMRMClient() amClient.init(conf) amClient.start() @@ -55,7 +56,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC logInfo("Registering the ApplicationMaster") amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args, - preferredNodeLocations) + preferredNodeLocations, securityMgr) } override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") = From 62c557609929982eeec170fe12f810bedfcf97f2 Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Fri, 5 Sep 2014 09:56:22 -0500 Subject: [PATCH 09/11] [SPARK-3375] spark on yarn container allocation issues If YARN doesn't grant the containers immediately, we stop asking for them and the YARN application hangs without ever getting any executors. The issue is that after sending the original request for X containers, we send a follow-up request for 0 containers, which on the YARN side clears out the original request. For a plain heartbeat ping we should just send empty asks. Author: Thomas Graves Closes #2275 from tgravescs/SPARK-3375 and squashes the following commits: 74b6820 [Thomas Graves] send empty resource requests when we aren't asking for containers --- .../spark/deploy/yarn/YarnAllocationHandler.scala | 13 +++++++------ .../spark/deploy/yarn/YarnAllocationHandler.scala | 8 +++++--- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 85d6274df2fcb..5a1b42c1e17d5 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -51,12 +51,13 @@ private[yarn] class YarnAllocationHandler( override protected def allocateContainers(count: Int): YarnAllocateResponse = { var resourceRequests: List[ResourceRequest] = null - // default.
- if (count <= 0 || preferredHostToCount.isEmpty) { - logDebug("numExecutors: " + count + ", host preferences: " + - preferredHostToCount.isEmpty) - resourceRequests = List(createResourceRequest( - AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)) + logDebug("numExecutors: " + count) + if (count <= 0) { + resourceRequests = List() + } else if (preferredHostToCount.isEmpty) { + logDebug("host preferences is empty") + resourceRequests = List(createResourceRequest( + AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)) } else { // request for all hosts in preferred nodes and for numExecutors - // candidates.size, request by default allocation policy. diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index c887cb52dd9cf..5438f151ac0ad 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -88,9 +88,11 @@ private[yarn] class YarnAllocationHandler( private def addResourceRequests(numExecutors: Int) { val containerRequests: List[ContainerRequest] = - if (numExecutors <= 0 || preferredHostToCount.isEmpty) { - logDebug("numExecutors: " + numExecutors + ", host preferences: " + - preferredHostToCount.isEmpty) + if (numExecutors <= 0) { + logDebug("numExecutors: " + numExecutors) + List() + } else if (preferredHostToCount.isEmpty) { + logDebug("host preferences is empty") createResourceRequests( AllocationType.ANY, resource = null, From 7ff8c45d714e0f2315910838b739c0c034672015 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 5 Sep 2014 11:07:00 -0700 Subject: [PATCH 10/11] [SPARK-3399][PySpark] Test for PySpark should ignore HADOOP_CONF_DIR and YARN_CONF_DIR Author: Kousuke Saruta Closes #2270 from sarutak/SPARK-3399 and squashes the following commits: 7613be6 [Kousuke Saruta] Modified pyspark script to ignore environment variables YARN_CONF_DIR and HADOOP_CONF_DIR while testing --- bin/pyspark | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bin/pyspark b/bin/pyspark index f553b314c5991..26a16dd600b7a 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -85,6 +85,8 @@ export PYSPARK_SUBMIT_ARGS # For pyspark tests if [[ -n "$SPARK_TESTING" ]]; then + unset YARN_CONF_DIR + unset HADOOP_CONF_DIR if [[ -n "$PYSPARK_DOC_TEST" ]]; then exec "$PYSPARK_PYTHON" -m doctest $1 else From ba5bcaddecd54811d45c5fc79a013b3857d4c633 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Fri, 5 Sep 2014 18:52:05 -0700 Subject: [PATCH 11/11] SPARK-3211 .take() is OOM-prone with empty partitions Instead of jumping straight from 1 partition to all partitions, do exponential growth and double the number of partitions to attempt each time instead. 
Fix proposed by Paul Nepywoda Author: Andrew Ash Closes #2117 from ash211/SPARK-3211 and squashes the following commits: 8b2299a [Andrew Ash] Quadruple instead of double for a minor speedup e5f7e4d [Andrew Ash] Update comment to better reflect what we're doing 09a27f7 [Andrew Ash] Update PySpark to be less OOM-prone as well 3a156b8 [Andrew Ash] SPARK-3211 .take() is OOM-prone with empty partitions --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 7 +++---- python/pyspark/rdd.py | 8 ++++---- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index af9e31ba7b720..1cf55e86f6c81 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1064,11 +1064,10 @@ abstract class RDD[T: ClassTag]( // greater than totalParts because we actually cap it at totalParts in runJob. var numPartsToTry = 1 if (partsScanned > 0) { - // If we didn't find any rows after the first iteration, just try all partitions next. - // Otherwise, interpolate the number of partitions we need to try, but overestimate it - // by 50%. + // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise, + // interpolate the number of partitions we need to try, but overestimate it by 50%. if (buf.size == 0) { - numPartsToTry = totalParts - 1 + numPartsToTry = partsScanned * 4 } else { numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt } diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index dff6fc26fcb18..04f13523b431d 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1089,11 +1089,11 @@ def take(self, num): # we actually cap it at totalParts in runJob. numPartsToTry = 1 if partsScanned > 0: - # If we didn't find any rows after the first iteration, just - # try all partitions next. Otherwise, interpolate the number - # of partitions we need to try, but overestimate it by 50%. + # If we didn't find any rows after the previous iteration, + # quadruple and retry. Otherwise, interpolate the number of + # partitions we need to try, but overestimate it by 50%. if len(items) == 0: - numPartsToTry = totalParts - 1 + numPartsToTry = partsScanned * 4 else: numPartsToTry = int(1.5 * num * partsScanned / len(items))
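A quick way to observe the effect (a hedged example assuming a live SparkContext sc; the sizes are arbitrary):

    // An RDD spread thinly over many partitions: most are empty, so take(5)
    // previously fell back to scanning all remaining partitions on its second
    // pass. It now scans 1, then 4, then 16, ... partitions per pass.
    val rdd = sc.parallelize(1 to 10, 10000)
    rdd.take(5)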