diff --git a/README.md b/README.md
index 0a683a460ffac..5b09ad86849e7 100644
--- a/README.md
+++ b/README.md
@@ -4,8 +4,8 @@ Spark is a fast and general cluster computing system for Big Data. It provides
high-level APIs in Scala, Java, and Python, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
rich set of higher-level tools including Spark SQL for SQL and structured
-data processing, MLLib for machine learning, GraphX for graph processing,
-and Spark Streaming.
+data processing, MLlib for machine learning, GraphX for graph processing,
+and Spark Streaming for stream processing.
diff --git a/bin/pyspark b/bin/pyspark
index f553b314c5991..26a16dd600b7a 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -85,6 +85,8 @@ export PYSPARK_SUBMIT_ARGS
# For pyspark tests
if [[ -n "$SPARK_TESTING" ]]; then
+ unset YARN_CONF_DIR
+ unset HADOOP_CONF_DIR
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_PYTHON" -m doctest $1
else
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index af9e31ba7b720..1cf55e86f6c81 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1064,11 +1064,10 @@ abstract class RDD[T: ClassTag](
// greater than totalParts because we actually cap it at totalParts in runJob.
var numPartsToTry = 1
if (partsScanned > 0) {
- // If we didn't find any rows after the first iteration, just try all partitions next.
- // Otherwise, interpolate the number of partitions we need to try, but overestimate it
- // by 50%.
+ // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
+ // interpolate the number of partitions we need to try, but overestimate it by 50%.
if (buf.size == 0) {
- numPartsToTry = totalParts - 1
+ numPartsToTry = partsScanned * 4
} else {
numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
}
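The two take() hunks in this patch (here and in python/pyspark/rdd.py below) replace "scan all remaining partitions after one empty pass" with geometric growth. A minimal, self-contained sketch of the new growth rule, using hypothetical names and values chosen only for illustration (the real code additionally caps the result at totalParts inside runJob):

```scala
// Sketch only: how the number of partitions scanned by take(num) now grows.
object TakeScanSketch {
  def nextPartsToTry(partsScanned: Int, rowsFound: Int, num: Int): Int =
    if (partsScanned == 0) 1                  // first pass: a single partition
    else if (rowsFound == 0) partsScanned * 4 // nothing found yet: quadruple and retry
    else (1.5 * num * partsScanned / rowsFound).toInt // interpolate, overestimate by 50%

  def main(args: Array[String]): Unit = {
    var scanned = 0
    val found = 0 // pretend every pass over a sparse RDD comes back empty
    for (_ <- 1 to 4) {
      val next = nextPartsToTry(scanned, found, num = 10)
      println(s"scanned=$scanned -> try $next more partitions") // 1, 4, 20, 100
      scanned += next
    }
  }
}
```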
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index eed6eb8485183..1670faca4a480 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -102,9 +102,17 @@ def parse_args():
"(for debugging)")
parser.add_option(
"--ebs-vol-size", metavar="SIZE", type="int", default=0,
- help="Attach a new EBS volume of size SIZE (in GB) to each node as " +
- "/vol. The volumes will be deleted when the instances terminate. " +
- "Only possible on EBS-backed AMIs.")
+ help="Size (in GB) of each EBS volume.")
+ parser.add_option(
+ "--ebs-vol-type", default="standard",
+ help="EBS volume type (e.g. 'gp2', 'standard').")
+ parser.add_option(
+ "--ebs-vol-num", type="int", default=1,
+ help="Number of EBS volumes to attach to each node as /vol[x]. " +
+ "The volumes will be deleted when the instances terminate. " +
+ "Only possible on EBS-backed AMIs. " +
+ "EBS volumes are only attached if --ebs-vol-size > 0." +
+ "Only support up to 8 EBS volumes.")
parser.add_option(
"--swap", metavar="SWAP", type="int", default=1024,
help="Swap space to set up per node, in MB (default: 1024)")
@@ -348,13 +356,16 @@ def launch_cluster(conn, opts, cluster_name):
print >> stderr, "Could not find AMI " + opts.ami
sys.exit(1)
- # Create block device mapping so that we can add an EBS volume if asked to
+ # Create block device mapping so that we can add EBS volumes if asked to.
+ # The first drive is attached as /dev/sds, 2nd as /dev/sdt, ... /dev/sdz
block_map = BlockDeviceMapping()
if opts.ebs_vol_size > 0:
- device = EBSBlockDeviceType()
- device.size = opts.ebs_vol_size
- device.delete_on_termination = True
- block_map["/dev/sdv"] = device
+ for i in range(opts.ebs_vol_num):
+ device = EBSBlockDeviceType()
+ device.size = opts.ebs_vol_size
+ device.volume_type = opts.ebs_vol_type
+ device.delete_on_termination = True
+ block_map["/dev/sd" + chr(ord('s') + i)] = device
# AWS ignores the AMI-specified block device mapping for M3 (see SPARK-3342).
if opts.instance_type.startswith('m3.'):
@@ -828,6 +839,12 @@ def get_partition(total, num_partitions, current_partitions):
def real_main():
(opts, action, cluster_name) = parse_args()
+
+ # Input parameter validation
+ if opts.ebs_vol_num > 8:
+ print >> stderr, "ebs-vol-num cannot be greater than 8"
+ sys.exit(1)
+
try:
conn = ec2.connect_to_region(opts.region)
except Exception as e:
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index dff6fc26fcb18..04f13523b431d 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -1089,11 +1089,11 @@ def take(self, num):
# we actually cap it at totalParts in runJob.
numPartsToTry = 1
if partsScanned > 0:
- # If we didn't find any rows after the first iteration, just
- # try all partitions next. Otherwise, interpolate the number
- # of partitions we need to try, but overestimate it by 50%.
+ # If we didn't find any rows after the previous iteration,
+ # quadruple and retry. Otherwise, interpolate the number of
+ # partitions we need to try, but overestimate it by 50%.
if len(items) == 0:
- numPartsToTry = totalParts - 1
+ numPartsToTry = partsScanned * 4
else:
numPartsToTry = int(1.5 * num * partsScanned / len(items))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index a75af94d29303..5acb45c155ba5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -272,7 +272,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
val currentTable = table(tableName).queryExecution.analyzed
val asInMemoryRelation = currentTable match {
case _: InMemoryRelation =>
- currentTable.logicalPlan
+ currentTable
case _ =>
InMemoryRelation(useCompression, columnBatchSize, executePlan(currentTable).executedPlan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 286c6d264f86a..94543fc95b470 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -60,10 +60,10 @@ case class SetCommand(
logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
- Array(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v"))
+ Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$v"))
} else {
context.setConf(k, v)
- Array(Row(s"$k=$v"))
+ Seq(Row(s"$k=$v"))
}
// Query the value bound to key k.
@@ -78,11 +78,19 @@ case class SetCommand(
"hive-hwi-0.12.0.jar",
"hive-0.12.0.jar").mkString(":")
- Array(
+ context.getAllConfs.map { case (k, v) =>
+ Row(s"$k=$v")
+ }.toSeq ++ Seq(
Row("system:java.class.path=" + hiveJars),
Row("system:sun.java.command=shark.SharkServer2"))
} else {
- Array(Row(s"$k=${context.getConf(k, "")}"))
+ if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
+ logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+ s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
+ Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}"))
+ } else {
+ Seq(Row(s"$k=${context.getConf(k, "")}"))
+ }
}
// Query all key-value pairs that are set in the SQLConf of the context.
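Taken together, the SetCommand changes above mean that both writing and reading the deprecated mapred.reduce.tasks key are redirected to spark.sql.shuffle.partitions, and a bare SET enumerates the whole SQLConf. A hedged usage sketch: hc is an assumed, already-constructed HiveContext, and the Row values shown in comments are illustrative rather than verbatim output.

```scala
import org.apache.spark.sql.hive.HiveContext

object SetCommandSketch {
  def demo(hc: HiveContext): Unit = {
    // Setting the deprecated key is rewritten to the new one (and logs a warning).
    hc.sql("SET mapred.reduce.tasks=10").collect()
    //   ~> Row("spark.sql.shuffle.partitions=10")

    // Reading the deprecated key back is now redirected as well.
    hc.sql("SET mapred.reduce.tasks").collect()
    //   ~> Row("spark.sql.shuffle.partitions=10")

    // A bare SET lists every key=value pair currently held in the SQLConf.
    hc.sql("SET").collect().foreach(println)
  }
}
```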
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index a4dd6be5f9e35..c98287c6aa662 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -44,6 +44,8 @@ private[hive] case class SourceCommand(filePath: String) extends Command
private[hive] case class AddFile(filePath: String) extends Command
+private[hive] case class AddJar(path: String) extends Command
+
private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command
private[hive] case class AnalyzeTable(tableName: String) extends Command
@@ -231,7 +233,7 @@ private[hive] object HiveQl {
} else if (sql.trim.toLowerCase.startsWith("uncache table")) {
CacheCommand(sql.trim.drop(14).trim, false)
} else if (sql.trim.toLowerCase.startsWith("add jar")) {
- NativeCommand(sql)
+ AddJar(sql.trim.drop(8).trim)
} else if (sql.trim.toLowerCase.startsWith("add file")) {
AddFile(sql.trim.drop(9))
} else if (sql.trim.toLowerCase.startsWith("dfs")) {
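For readers puzzling over the magic numbers in this block (drop(14), drop(8), drop(9)): each one strips the leading keyword plus its trailing space before the argument is trimmed. A standalone sketch of the "add jar" case, with a hypothetical path:

```scala
// "ADD JAR " is 8 characters, so drop(8) leaves just the (still padded) path.
object AddJarParseSketch extends App {
  val statement = "  ADD JAR /tmp/extra-udfs.jar  " // hypothetical input
  val path = statement.trim.drop(8).trim
  assert(path == "/tmp/extra-udfs.jar")
  println(path)
}
```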
@@ -1018,9 +1020,9 @@ private[hive] object HiveQl {
/* Other functions */
case Token("TOK_FUNCTION", Token(RAND(), Nil) :: Nil) => Rand
- case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: Nil) =>
+ case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: Nil) =>
Substring(nodeToExpr(string), nodeToExpr(pos), Literal(Integer.MAX_VALUE, IntegerType))
- case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) =>
+ case Token("TOK_FUNCTION", Token(SUBSTR(), Nil) :: string :: pos :: length :: Nil) =>
Substring(nodeToExpr(string), nodeToExpr(pos), nodeToExpr(length))
/* UDFs - Must be last otherwise will preempt built in functions */
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 24abb1b5bd1a8..72cc01cdf4c84 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -195,11 +195,12 @@ private[hive] trait HiveStrategies {
case class HiveCommandStrategy(context: HiveContext) extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case logical.NativeCommand(sql) =>
- NativeCommand(sql, plan.output)(context) :: Nil
+ case logical.NativeCommand(sql) => NativeCommand(sql, plan.output)(context) :: Nil
case hive.DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil
+ case hive.AddJar(path) => execution.AddJar(path) :: Nil
+
case hive.AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil
case describe: logical.DescribeCommand =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index a1a4aa7de7bf7..d61c5e274a596 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -60,3 +60,19 @@ case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode with
Seq.empty[Row]
}
}
+
+/**
+ * :: DeveloperApi ::
+ * Adds a jar to the current Hive session and distributes it to executors via
+ * SparkContext.addJar, so that SerDes/UDFs defined in it are usable in later queries.
+ */
+@DeveloperApi
+case class AddJar(path: String) extends LeafNode with Command {
+ def hiveContext = sqlContext.asInstanceOf[HiveContext]
+
+ override def output = Seq.empty
+
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = {
+ hiveContext.runSqlHive(s"ADD JAR $path")
+ hiveContext.sparkContext.addJar(path)
+ Seq.empty[Row]
+ }
+}
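A hedged end-to-end sketch of the new command from a user's point of view: the jar path and SerDe class name are placeholders, sc is an assumed running SparkContext, and the flow mirrors the regression test added to HiveQuerySuite further down.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

object AddJarUsageSketch {
  def demo(sc: SparkContext): Unit = {
    val hc = new HiveContext(sc)
    // Ships the jar to executors *and* registers it with the Hive session state,
    // so classes inside it can back SerDes/UDFs in subsequent statements.
    hc.sql("ADD JAR /path/to/custom-serde.jar")                  // placeholder path
    hc.sql("CREATE TABLE t(a INT, b STRING)")
    hc.sql("ALTER TABLE t SET SERDE 'com.example.CustomSerDe'")  // placeholder class
  }
}
```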
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index c4abb3eb4861f..f4217a52c3822 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -17,9 +17,11 @@
package org.apache.spark.sql.hive.execution
+import java.io.File
+
import scala.util.Try
-import org.apache.spark.sql.{SchemaRDD, Row}
+import org.apache.spark.SparkException
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
@@ -313,7 +315,7 @@ class HiveQuerySuite extends HiveComparisonTest {
"SELECT srcalias.KEY, SRCALIAS.value FROM sRc SrCAlias WHERE SrCAlias.kEy < 15")
test("case sensitivity: registered table") {
- val testData: SchemaRDD =
+ val testData =
TestHive.sparkContext.parallelize(
TestData(1, "str1") ::
TestData(2, "str2") :: Nil)
@@ -467,7 +469,7 @@ class HiveQuerySuite extends HiveComparisonTest {
}
// Describe a registered temporary table.
- val testData: SchemaRDD =
+ val testData =
TestHive.sparkContext.parallelize(
TestData(1, "str1") ::
TestData(1, "str2") :: Nil)
@@ -495,6 +497,23 @@ class HiveQuerySuite extends HiveComparisonTest {
}
}
+ test("ADD JAR command") {
+ val testJar = TestHive.getHiveFile("data/files/TestSerDe.jar").getCanonicalPath
+ sql("CREATE TABLE alter1(a INT, b INT)")
+ intercept[Exception] {
+ sql(
+ """ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe'
+ |WITH serdeproperties('s1'='9')
+ """.stripMargin)
+ }
+ sql(s"ADD JAR $testJar")
+ sql(
+ """ALTER TABLE alter1 SET SERDE 'org.apache.hadoop.hive.serde2.TestSerDe'
+ |WITH serdeproperties('s1'='9')
+ """.stripMargin)
+ sql("DROP TABLE alter1")
+ }
+
test("parse HQL set commands") {
// Adapted from its SQL counterpart.
val testKey = "spark.sql.key.usedfortestonly"
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 7dae248e3e7db..10cbeb8b94325 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
-import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.{SecurityManager, SparkConf, Logging}
class ExecutorRunnable(
@@ -46,7 +46,8 @@ class ExecutorRunnable(
slaveId: String,
hostname: String,
executorMemory: Int,
- executorCores: Int)
+ executorCores: Int,
+ securityMgr: SecurityManager)
extends Runnable with ExecutorRunnableUtil with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
@@ -86,6 +87,8 @@ class ExecutorRunnable(
logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)
+ ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
+
// Send the start request to the ContainerManager
val startReq = Records.newRecord(classOf[StartContainerRequest])
.asInstanceOf[StartContainerRequest]
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 9f9e16c06452b..5a1b42c1e17d5 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.conf.Configuration
@@ -41,8 +41,9 @@ private[yarn] class YarnAllocationHandler(
resourceManager: AMRMProtocol,
appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
- preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
- extends YarnAllocator(conf, sparkConf, args, preferredNodes) {
+ preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
+ securityMgr: SecurityManager)
+ extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) {
private val lastResponseId = new AtomicInteger()
private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList()
@@ -50,12 +51,13 @@ private[yarn] class YarnAllocationHandler(
override protected def allocateContainers(count: Int): YarnAllocateResponse = {
var resourceRequests: List[ResourceRequest] = null
- // default.
- if (count <= 0 || preferredHostToCount.isEmpty) {
- logDebug("numExecutors: " + count + ", host preferences: " +
- preferredHostToCount.isEmpty)
- resourceRequests = List(createResourceRequest(
- AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
+ logDebug("numExecutors: " + count)
+ if (count <= 0) {
+ resourceRequests = List()
+ } else if (preferredHostToCount.isEmpty) {
+ logDebug("host preferences is empty")
+ resourceRequests = List(createResourceRequest(
+ AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
} else {
// request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
index cc5392192ec51..ad27a9ab781d2 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.scheduler.SplitInfo
import org.apache.spark.util.Utils
@@ -45,7 +45,8 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
sparkConf: SparkConf,
preferredNodeLocations: Map[String, Set[SplitInfo]],
uiAddress: String,
- uiHistoryAddress: String) = {
+ uiHistoryAddress: String,
+ securityMgr: SecurityManager) = {
this.rpc = YarnRPC.create(conf)
this.uiHistoryAddress = uiHistoryAddress
@@ -53,7 +54,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
registerApplicationMaster(uiAddress)
new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
- preferredNodeLocations)
+ preferredNodeLocations, securityMgr)
}
override def getAttemptId() = {
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 98039a20de245..a879c833a014f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -116,7 +116,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
val securityMgr = new SecurityManager(sparkConf)
if (isDriver) {
- runDriver()
+ runDriver(securityMgr)
} else {
runExecutorLauncher(securityMgr)
}
@@ -157,7 +157,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
sparkContextRef.compareAndSet(sc, null)
}
- private def registerAM(uiAddress: String) = {
+ private def registerAM(uiAddress: String, securityMgr: SecurityManager) = {
val sc = sparkContextRef.get()
val appId = client.getAttemptId().getApplicationId().toString()
@@ -170,13 +170,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
if (sc != null) sc.getConf else sparkConf,
if (sc != null) sc.preferredNodeLocationData else Map(),
uiAddress,
- historyAddress)
+ historyAddress,
+ securityMgr)
allocator.allocateResources()
reporterThread = launchReporterThread()
}
- private def runDriver(): Unit = {
+ private def runDriver(securityMgr: SecurityManager): Unit = {
addAmIpFilter()
val userThread = startUserClass()
@@ -188,7 +189,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
if (sc == null) {
finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
} else {
- registerAM(sc.ui.appUIHostPort)
+ registerAM(sc.ui.appUIHostPort, securityMgr)
try {
userThread.join()
} finally {
@@ -203,7 +204,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
conf = sparkConf, securityManager = securityMgr)._1
actor = waitForSparkDriver()
addAmIpFilter()
- registerAM(sparkConf.get("spark.driver.appUIAddress", ""))
+ registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
// In client mode the actor will stop the reporter thread.
reporterThread.join()
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
index 5d8e5e6dffe7f..8075b7a7fb837 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -430,10 +430,8 @@ trait ClientBase extends Logging {
// send the acl settings into YARN to control who has access via YARN interfaces
val securityManager = new SecurityManager(sparkConf)
- val acls = Map[ApplicationAccessType, String] (
- ApplicationAccessType.VIEW_APP -> securityManager.getViewAcls,
- ApplicationAccessType.MODIFY_APP -> securityManager.getModifyAcls)
- amContainer.setApplicationACLs(acls)
+ amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
+
amContainer
}
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index c74dd1c2b21dc..02b9a81bf6b50 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
-import org.apache.spark.{Logging, SparkConf, SparkEnv}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -55,7 +55,8 @@ private[yarn] abstract class YarnAllocator(
conf: Configuration,
sparkConf: SparkConf,
args: ApplicationMasterArguments,
- preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
+ preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
+ securityMgr: SecurityManager)
extends Logging {
// These three are locked on allocatedHostToContainersMap. Complementary data structures
@@ -280,7 +281,8 @@ private[yarn] abstract class YarnAllocator(
executorId,
executorHostname,
executorMemory,
- executorCores)
+ executorCores,
+ securityMgr)
new Thread(executorRunnable).start()
}
}
@@ -444,4 +446,4 @@ private[yarn] abstract class YarnAllocator(
}
-}
\ No newline at end of file
+}
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 922d7d1a854a5..ed65e56b3e413 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -22,7 +22,7 @@ import scala.collection.{Map, Set}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.records._
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler.SplitInfo
/**
@@ -45,7 +45,8 @@ trait YarnRMClient {
sparkConf: SparkConf,
preferredNodeLocations: Map[String, Set[SplitInfo]],
uiAddress: String,
- uiHistoryAddress: String): YarnAllocator
+ uiHistoryAddress: String,
+ securityMgr: SecurityManager): YarnAllocator
/**
* Shuts down the AM. Guaranteed to only be called once.
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index dc77f1236492d..4a33e34c3bfc7 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -32,10 +32,11 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.StringInterner
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import org.apache.hadoop.yarn.util.RackResolver
import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@@ -211,4 +212,12 @@ object YarnSparkHadoopUtil {
}
}
+ private[spark] def getApplicationAclsForYarn(securityMgr: SecurityManager):
+ Map[ApplicationAccessType, String] = {
+ Map[ApplicationAccessType, String] (
+ ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls,
+ ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls
+ )
+ }
+
}
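The new helper centralises the ACL map that ClientBase and both ExecutorRunnable variants in this patch now hand to YARN. A minimal sketch, placed in the helper's own package because it is private[spark]; the user names are hypothetical:

```scala
package org.apache.spark.deploy.yarn

import org.apache.spark.{SecurityManager, SparkConf}

object YarnAclSketch extends App {
  val conf = new SparkConf(false)
    .set("spark.acls.enable", "true")
    .set("spark.ui.view.acls", "alice,bob")   // hypothetical users
    .set("spark.modify.acls", "carol")

  val securityMgr = new SecurityManager(conf)
  val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)

  // Keys are ApplicationAccessType.VIEW_APP / MODIFY_APP; values are the
  // comma-separated user lists (the submitting user is always included).
  acls.foreach { case (accessType, users) => println(s"$accessType -> $users") }
}
```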
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index 75db8ee6d468f..2cc5abb3a890c 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -23,7 +23,10 @@ import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.{FunSuite, Matchers}
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+
class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
@@ -74,4 +77,75 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
yarnConf.get(key) should not be default.get(key)
}
+
+ test("test getApplicationAclsForYarn acls on") {
+
+ // spark acls on, just pick up default user
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.acls.enable", "true")
+
+ val securityMgr = new SecurityManager(sparkConf)
+ val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
+
+ val viewAcls = acls.get(ApplicationAccessType.VIEW_APP)
+ val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
+
+ viewAcls match {
+ case Some(vacls) => {
+ val aclSet = vacls.split(',').map(_.trim).toSet
+ assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+ }
+ case None => {
+ fail()
+ }
+ }
+ modifyAcls match {
+ case Some(macls) => {
+ val aclSet = macls.split(',').map(_.trim).toSet
+ assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+ }
+ case None => {
+ fail()
+ }
+ }
+ }
+
+ test("test getApplicationAclsForYarn acls on and specify users") {
+
+ // spark acls on, with explicit view and modify user lists
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.acls.enable", "true")
+ sparkConf.set("spark.ui.view.acls", "user1,user2")
+ sparkConf.set("spark.modify.acls", "user3,user4")
+
+ val securityMgr = new SecurityManager(sparkConf)
+ val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
+
+ val viewAcls = acls.get(ApplicationAccessType.VIEW_APP)
+ val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
+
+ viewAcls match {
+ case Some(vacls) => {
+ val aclSet = vacls.split(',').map(_.trim).toSet
+ assert(aclSet.contains("user1"))
+ assert(aclSet.contains("user2"))
+ assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+ }
+ case None => {
+ fail()
+ }
+ }
+ modifyAcls match {
+ case Some(macls) => {
+ val aclSet = macls.split(',').map(_.trim).toSet
+ assert(aclSet.contains("user3"))
+ assert(aclSet.contains("user4"))
+ assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+ }
+ case None => {
+ fail()
+ }
+ }
+
+ }
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 07ba0a4b30bd7..833be12982e71 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
-import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.{SecurityManager, SparkConf, Logging}
class ExecutorRunnable(
@@ -46,7 +46,8 @@ class ExecutorRunnable(
slaveId: String,
hostname: String,
executorMemory: Int,
- executorCores: Int)
+ executorCores: Int,
+ securityMgr: SecurityManager)
extends Runnable with ExecutorRunnableUtil with Logging {
var rpc: YarnRPC = YarnRPC.create(conf)
@@ -85,6 +86,8 @@ class ExecutorRunnable(
logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)
+ ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
+
// Send the start request to the ContainerManager
nmClient.startContainer(container, ctx)
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index ed31457b61571..5438f151ac0ad 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -20,7 +20,7 @@ package org.apache.spark.deploy.yarn
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.conf.Configuration
@@ -39,8 +39,9 @@ private[yarn] class YarnAllocationHandler(
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
- preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
- extends YarnAllocator(conf, sparkConf, args, preferredNodes) {
+ preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
+ securityMgr: SecurityManager)
+ extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) {
override protected def releaseContainer(container: Container) = {
amClient.releaseAssignedContainer(container.getId())
@@ -87,9 +88,11 @@ private[yarn] class YarnAllocationHandler(
private def addResourceRequests(numExecutors: Int) {
val containerRequests: List[ContainerRequest] =
- if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
- logDebug("numExecutors: " + numExecutors + ", host preferences: " +
- preferredHostToCount.isEmpty)
+ if (numExecutors <= 0) {
+ logDebug("numExecutors: " + numExecutors)
+ List()
+ } else if (preferredHostToCount.isEmpty) {
+ logDebug("host preferences is empty")
createResourceRequests(
AllocationType.ANY,
resource = null,
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
index e8b8d9bc722bd..54bc6b14c44ce 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.hadoop.yarn.webapp.util.WebAppUtils
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.scheduler.SplitInfo
import org.apache.spark.util.Utils
@@ -46,7 +46,8 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
sparkConf: SparkConf,
preferredNodeLocations: Map[String, Set[SplitInfo]],
uiAddress: String,
- uiHistoryAddress: String) = {
+ uiHistoryAddress: String,
+ securityMgr: SecurityManager) = {
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
@@ -55,7 +56,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
logInfo("Registering the ApplicationMaster")
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
- preferredNodeLocations)
+ preferredNodeLocations, securityMgr)
}
override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") =