From f9ca47da811c2f198d4ace6c54649d36518d8c98 Mon Sep 17 00:00:00 2001 From: Kimoon Kim Date: Fri, 1 Sep 2017 13:15:00 -0700 Subject: [PATCH] Fix executor env to include simple authn --- .../spark/deploy/kubernetes/constants.scala | 1 - .../KubernetesClusterSchedulerBackend.scala | 28 ++++++------------- .../docker/SparkDockerImageBuilder.scala | 1 + 3 files changed, 10 insertions(+), 20 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index af44c8cb7c697..dfb4e0838113f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -60,7 +60,6 @@ package object constants { private[spark] val ENV_DRIVER_URL = "SPARK_DRIVER_URL" private[spark] val ENV_EXECUTOR_CORES = "SPARK_EXECUTOR_CORES" private[spark] val ENV_EXECUTOR_MEMORY = "SPARK_EXECUTOR_MEMORY" - private[spark] val ENV_EXECUTOR_JAVA_OPTS = "SPARK_EXECUTOR_JAVA_OPTS" private[spark] val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID" private[spark] val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID" private[spark] val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala index 3d933577b5495..35d0f8c8dc8a6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala @@ -72,14 +72,12 @@ private[spark] class KubernetesClusterSchedulerBackend( private val executorsToRemove = Collections.newSetFromMap[String]( new ConcurrentHashMap[String, java.lang.Boolean]()).asScala - private val executorExtraJavaOpts = conf.get( - org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS) private val executorExtraClasspath = conf.get( org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) private val executorJarsDownloadDir = conf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION) private val isKerberosEnabled = conf.get(KUBERNETES_KERBEROS_SUPPORT) private val maybeSimpleAuthentication = - if (isKerberosEnabled) s" -D$HADOOP_SECURITY_AUTHENTICATION=simple" else "" + if (isKerberosEnabled) Some(s"-D$HADOOP_SECURITY_AUTHENTICATION=simple") else None private val executorLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf( conf, KUBERNETES_EXECUTOR_LABEL_PREFIX, @@ -455,12 +453,6 @@ private[spark] class KubernetesClusterSchedulerBackend( val executorCpuQuantity = new QuantityBuilder(false) .withAmount(executorCores.toString) .build() - val executorJavaOpts = executorExtraJavaOpts.getOrElse("") + maybeSimpleAuthentication - val executorJavaOptsEnv = if (executorJavaOpts.nonEmpty) { - Some(new EnvVarBuilder() - .withName(ENV_EXECUTOR_JAVA_OPTS) - .withValue(executorJavaOpts) - .build()) } else None val executorExtraClasspathEnv = executorExtraClasspath.map { cp => new EnvVarBuilder() .withName(ENV_EXECUTOR_EXTRA_CLASSPATH) @@ -468,14 +460,14 @@ private[spark] class KubernetesClusterSchedulerBackend( .build() } val executorExtraJavaOptionsEnv = conf - .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS) - .map { opts => - val delimitedOpts = Utils.splitCommandString(opts) - delimitedOpts.zipWithIndex.map { - case (opt, index) => - new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build() - } - }.getOrElse(Seq.empty[EnvVar]) + .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS) + .map { opts => + val delimitedOpts = Utils.splitCommandString(opts) ++ maybeSimpleAuthentication + delimitedOpts.zipWithIndex.map { + case (opt, index) => + new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build() + } + }.getOrElse(Seq.empty[EnvVar]) val executorEnv = (Seq( (ENV_EXECUTOR_PORT, executorPort.toString), (ENV_DRIVER_URL, driverUrl), @@ -516,8 +508,6 @@ private[spark] class KubernetesClusterSchedulerBackend( .addToLimits("memory", executorMemoryLimitQuantity) .addToRequests("cpu", executorCpuQuantity) .endResources() - .addToEnv(executorExtraClasspathEnv.toSeq: _*) - .addToEnv(executorJavaOptsEnv.toSeq: _*) .addAllToEnv(executorEnv.asJava) .withPorts(requiredPorts.asJava) .build() diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala index 8b7aad95d8ca7..a210aab4b78fe 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala @@ -104,5 +104,6 @@ private[spark] class SparkDockerImageBuilder name, dockerFile, new LoggingBuildHandler()) + logInfo(s"Built docker image for $name") } }