Skip to content

Commit

Permalink
[SPARK-5493] [core] Add option to impersonate user.
Browse files Browse the repository at this point in the history
Hadoop has a feature that allows users to impersonate other users
when submitting applications or talking to HDFS, for example. These
impersonated users are referred generally as "proxy users".

Services such as Oozie or Hive use this feature to run applications
as the requesting user.

This change makes SparkSubmit accept a new command line option to
run the application as a proxy user. It also fixes the plumbing
of the user name through the UI (and a couple of other places) to
refer to the correct user running the application, which can be
different that `sys.props("user.name")` even without proxies (e.g.
when using kerberos).
  • Loading branch information
Marcelo Vanzin committed Feb 5, 2015
1 parent c4b1108 commit 0540d38
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 30 deletions.
3 changes: 2 additions & 1 deletion bin/utils.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ function gatherSparkSubmitOpts() {
--master | --deploy-mode | --class | --name | --jars | --packages | --py-files | --files | \
--conf | --repositories | --properties-file | --driver-memory | --driver-java-options | \
--driver-library-path | --driver-class-path | --executor-memory | --driver-cores | \
--total-executor-cores | --executor-cores | --queue | --num-executors | --archives)
--total-executor-cores | --executor-cores | --queue | --num-executors | --archives |
--proxy-user)
if [[ $# -lt 2 ]]; then
"$SUBMIT_USAGE_FUNCTION"
exit 1;
Expand Down
1 change: 1 addition & 0 deletions bin/windows-utils.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ SET opts="%opts:~1,-1% \<--conf\> \<--properties-file\> \<--driver-memory\> \<--
SET opts="%opts:~1,-1% \<--driver-library-path\> \<--driver-class-path\> \<--executor-memory\>"
SET opts="%opts:~1,-1% \<--driver-cores\> \<--total-executor-cores\> \<--executor-cores\> \<--queue\>"
SET opts="%opts:~1,-1% \<--num-executors\> \<--archives\> \<--packages\> \<--repositories\>"
SET opts="%opts:~1,-1% \<--proxy-user\>"

echo %1 | findstr %opts% >nul
if %ERRORLEVEL% equ 0 (
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.hadoop.io.Text

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.network.sasl.SecretKeyHolder
import org.apache.spark.util.Utils

/**
* Spark class responsible for security.
Expand Down Expand Up @@ -203,7 +204,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf)

// always add the current user and SPARK_USER to the viewAcls
private val defaultAclUsers = Set[String](System.getProperty("user.name", ""),
Option(System.getenv("SPARK_USER")).getOrElse("")).filter(!_.isEmpty)
Utils.getCurrentUserName())

setViewAcls(defaultAclUsers, sparkConf.get("spark.ui.view.acls", ""))
setModifyAcls(defaultAclUsers, sparkConf.get("spark.modify.acls", ""))
Expand Down
16 changes: 5 additions & 11 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

// log out Spark Version in Spark driver log
logInfo(s"Running Spark version $SPARK_VERSION")

private[spark] val conf = config.clone()
conf.validateSettings()

Expand Down Expand Up @@ -330,11 +330,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
executorEnvs ++= conf.getExecutorEnv

// Set SPARK_USER for user who is running SparkContext.
val sparkUser = Option {
Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name"))
}.getOrElse {
SparkContext.SPARK_UNKNOWN_USER
}
val sparkUser = Utils.getCurrentUserName()
executorEnvs("SPARK_USER") = sparkUser

// Create and start the scheduler
Expand Down Expand Up @@ -818,7 +814,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
vClass: Class[V],
conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
assertNotStopped()
// The call to new NewHadoopJob automatically adds security credentials to conf,
// The call to new NewHadoopJob automatically adds security credentials to conf,
// so we don't need to explicitly add them ourselves
val job = new NewHadoopJob(conf)
NewFileInputFormat.addInputPath(job, new Path(path))
Expand Down Expand Up @@ -1590,8 +1586,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
@deprecated("use defaultMinPartitions", "1.0.0")
def defaultMinSplits: Int = math.min(defaultParallelism, 2)

/**
* Default min number of partitions for Hadoop RDDs when not given by user
/**
* Default min number of partitions for Hadoop RDDs when not given by user
* Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
* The reasons for this are discussed in https://github.com/mesos/spark/pull/718
*/
Expand Down Expand Up @@ -1808,8 +1804,6 @@ object SparkContext extends Logging {

private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"

private[spark] val SPARK_UNKNOWN_USER = "<unknown>"

private[spark] val DRIVER_IDENTIFIER = "<driver>"

// The following deprecated objects have already been copied to `object AccumulatorParam` to
Expand Down
19 changes: 7 additions & 12 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,13 @@ class SparkHadoopUtil extends Logging {
* do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems
*/
def runAsSparkUser(func: () => Unit) {
val user = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER)
if (user != SparkContext.SPARK_UNKNOWN_USER) {
logDebug("running as user: " + user)
val ugi = UserGroupInformation.createRemoteUser(user)
transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})
} else {
logDebug("running as SPARK_UNKNOWN_USER")
func()
}
val user = Utils.getCurrentUserName()
logDebug("running as user: " + user)
val ugi = UserGroupInformation.createRemoteUser(user)
transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})
}

def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
Expand Down
20 changes: 15 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ package org.apache.spark.deploy
import java.io.{File, PrintStream}
import java.lang.reflect.{Modifier, InvocationTargetException}
import java.net.URL
import java.security.PrivilegedExceptionAction

import scala.collection.mutable.{ArrayBuffer, HashMap, Map}

import org.apache.hadoop.fs.Path

import org.apache.hadoop.security.UserGroupInformation
import org.apache.ivy.Ivy
import org.apache.ivy.core.LogOptions
import org.apache.ivy.core.module.descriptor.{DefaultExcludeRule, DefaultDependencyDescriptor, DefaultModuleDescriptor}
Expand Down Expand Up @@ -85,7 +86,7 @@ object SparkSubmit {
printStream.println(appArgs)
}
val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
launch(childArgs, classpath, sysProps, mainClass, appArgs)
}

/**
Expand Down Expand Up @@ -380,8 +381,8 @@ object SparkSubmit {
childClasspath: ArrayBuffer[String],
sysProps: Map[String, String],
childMainClass: String,
verbose: Boolean = false) {
if (verbose) {
appArgs: SparkSubmitArguments) {
if (appArgs.verbose) {
printStream.println(s"Main class:\n$childMainClass")
printStream.println(s"Arguments:\n${childArgs.mkString("\n")}")
printStream.println(s"System properties:\n${sysProps.mkString("\n")}")
Expand Down Expand Up @@ -424,8 +425,17 @@ object SparkSubmit {
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}

try {
mainMethod.invoke(null, childArgs.toArray)
if (appArgs.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(appArgs.proxyUser,
UserGroupInformation.getCurrentUser())
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = mainMethod.invoke(null, childArgs.toArray)
})
} else {
mainMethod.invoke(null, childArgs.toArray)
}
} catch {
case e: InvocationTargetException => e.getCause match {
case cause: Throwable => throw cause
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
var isPython: Boolean = false
var pyFiles: String = null
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
var proxyUser: String = null

/** Default properties present in the currently defined defaults file. */
lazy val defaultSparkProperties: HashMap[String, String] = {
Expand Down Expand Up @@ -339,6 +340,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
}
parse(tail)

case ("--proxy-user") :: value :: tail =>
proxyUser = value
parse(tail)

case ("--help" | "-h") :: tail =>
printUsageAndExit(0)

Expand Down Expand Up @@ -407,6 +412,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
|
| --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
|
| --proxy-user User to impersonate when submitting the application.
|
| --help, -h Show this help message and exit
| --verbose, -v Print additional debug output
|
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.log4j.PropertyConfigurator
import org.eclipse.jetty.util.MultiException
import org.json4s._
Expand Down Expand Up @@ -1986,6 +1987,16 @@ private[spark] object Utils extends Logging {
throw new SparkException("Invalid master URL: " + sparkUrl, e)
}
}

/**
* Returns the current user name. This is the currently logged in user, unless that's been
* overridden by the `SPARK_USER` environment variable.
*/
def getCurrentUserName(): String = {
Option(System.getenv("SPARK_USER"))
.getOrElse(UserGroupInformation.getCurrentUser().getUserName())
}

}

/**
Expand Down

0 comments on commit 0540d38

Please sign in to comment.