From d3cf58f9826e5554998bbd6adcee9ca6a2df8244 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Fri, 16 Jun 2017 13:06:42 -0700 Subject: [PATCH 01/18] Adding PySpark Submit functionality. Launching Python from JVM --- README.md | 1 + .../org/apache/spark/deploy/SparkSubmit.scala | 15 +++- .../spark/deploy/kubernetes/constants.scala | 2 + .../deploy/kubernetes/submit/Client.scala | 50 ++++++++++--- ...riverInitContainerComponentsProvider.scala | 5 ++ .../DriverPodKubernetesFileMounter.scala | 74 +++++++++++++++++++ .../kubernetes/submit/ClientV2Suite.scala | 6 ++ .../src/main/docker/driver-py/Dockerfile | 48 ++++++++++++ .../src/main/docker/executor-py/Dockerfile | 46 ++++++++++++ .../src/main/docker/init-container/Dockerfile | 2 +- .../docker/resource-staging-server/Dockerfile | 3 +- .../main/docker/shuffle-service/Dockerfile | 2 +- .../kubernetes/integration-tests/pom.xml | 20 +++++ .../integration-tests/src/test/python/pi.py | 46 ++++++++++++ .../integrationtest/KubernetesSuite.scala | 37 +++++++++- .../docker/SparkDockerImageBuilder.scala | 4 + 16 files changed, 344 insertions(+), 17 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesFileMounter.scala create mode 100644 resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile create mode 100644 resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile create mode 100755 resource-managers/kubernetes/integration-tests/src/test/python/pi.py diff --git a/README.md b/README.md index cf6b4fa80242b..cb747225a11d4 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ We've been asked by an Apache Spark Committer to work outside of the Apache infr This is a collaborative effort by several folks from different companies who are interested in seeing this feature be successful. 
Companies active in this project include (alphabetically): +- Bloomberg - Google - Haiwen - Hyperpilot diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 59ccf3af24ce7..56122b50adf29 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -335,8 +335,8 @@ object SparkSubmit { (clusterManager, deployMode) match { case (KUBERNETES, CLIENT) => printErrorAndExit("Client mode is currently not supported for Kubernetes.") - case (KUBERNETES, CLUSTER) if args.isPython || args.isR => - printErrorAndExit("Kubernetes does not currently support python or R applications.") + case (KUBERNETES, CLUSTER) if args.isR => + printErrorAndExit("Kubernetes does not currently support R applications.") case (STANDALONE, CLUSTER) if args.isPython => printErrorAndExit("Cluster deploy mode is currently not supported for python " + "applications on standalone clusters.") @@ -620,8 +620,15 @@ object SparkSubmit { if (isKubernetesCluster) { childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client" - childArgs += args.primaryResource - childArgs += args.mainClass + if (args.isPython) { + childArgs += args.primaryResource + childArgs += "org.apache.spark.deploy.PythonRunner" + childArgs += args.pyFiles + } + else { + childArgs += args.primaryResource + childArgs += args.mainClass + } childArgs ++= args.childArgs } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index f2f1136e54fe4..92f051b2ac298 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -67,6 +67,8 @@ package object constants { private[spark] val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS" private[spark] val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS" private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR" + private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES" + private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY" // Bootstrapping dependencies with the init-container private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index a9699d8c34b4e..78b6a40efdcc9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -47,11 +47,14 @@ private[spark] class Client( appName: String, kubernetesResourceNamePrefix: String, kubernetesAppId: String, + mainAppResource: String, + isPython: Boolean, mainClass: String, sparkConf: SparkConf, appArgs: Array[String], sparkJars: Seq[String], sparkFiles: Seq[String], + pySparkFiles: List[String], waitForAppCompletion: Boolean, kubernetesClient: KubernetesClient, initContainerComponentsProvider: DriverInitContainerComponentsProvider, @@ -83,7 +86,14 @@ private[spark] class Client( def run(): Unit = { validateNoDuplicateFileNames(sparkJars) 
validateNoDuplicateFileNames(sparkFiles) - + if (isPython) {validateNoDuplicateFileNames(pySparkFiles)} + val arguments = if (isPython) pySparkFiles match { + case Nil => appArgs + case a::b => a match { + case _ if a==mainAppResource && b==Nil => appArgs + case _ => appArgs.drop(1) + } + } else appArgs val driverCustomLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf( sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX, @@ -135,7 +145,7 @@ private[spark] class Client( .endEnv() .addNewEnv() .withName(ENV_DRIVER_ARGS) - .withValue(appArgs.mkString(" ")) + .withValue(arguments.mkString(" ")) .endEnv() .withNewResources() .addToRequests("cpu", driverCpuQuantity) @@ -204,7 +214,7 @@ private[spark] class Client( val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map { case (confKey, confValue) => s"-D$confKey=$confValue" }.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("") - val resolvedDriverPod = podWithInitContainerAndMountedCreds.editSpec() + val resolvedDriverPodBuilder = podWithInitContainerAndMountedCreds.editSpec() .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName)) .addNewEnv() .withName(ENV_MOUNTED_CLASSPATH) @@ -216,7 +226,15 @@ private[spark] class Client( .endEnv() .endContainer() .endSpec() - .build() + val resolvedDriverPod = if (!isPython) { + resolvedDriverPodBuilder.build() + } else { + initContainerComponentsProvider + .provideDriverPodFileMounter() + .addPySparkFiles( + mainAppResource, pySparkFiles, driverContainer.getName, resolvedDriverPodBuilder) + .build() + } Utils.tryWithResource( kubernetesClient .pods() @@ -266,7 +284,7 @@ private[spark] class Client( } } -private[spark] object Client { +private[spark] object Client{ def main(args: Array[String]): Unit = { val sparkConf = new SparkConf(true) val mainAppResource = args(0) @@ -274,22 +292,28 @@ private[spark] object Client { val appArgs = args.drop(2) run(sparkConf, mainAppResource, mainClass, appArgs) } - def run( sparkConf: SparkConf, mainAppResource: String, mainClass: String, appArgs: Array[String]): Unit = { - val sparkJars = sparkConf.getOption("spark.jars") + val isPython = mainAppResource.endsWith(".py") + val sparkJars = if (isPython) Array.empty[String] else { + sparkConf.getOption("spark.jars") .map(_.split(",")) .getOrElse(Array.empty[String]) ++ Option(mainAppResource) .filterNot(_ == SparkLauncher.NO_RESOURCE) - .toSeq + .toSeq } val launchTime = System.currentTimeMillis val sparkFiles = sparkConf.getOption("spark.files") .map(_.split(",")) .getOrElse(Array.empty[String]) + val pySparkFiles: Array[String] = if (isPython) { + appArgs(0) match { + case null => Array(mainAppResource) + case _ => mainAppResource +: appArgs(0).split(",") + }} else {Array.empty[String]} val appName = sparkConf.getOption("spark.app.name").getOrElse("spark") // The resource name prefix is derived from the application name, making it easy to connect the // names of the Kubernetes resources from e.g. 
Kubectl or the Kubernetes dashboard to the @@ -302,12 +326,17 @@ private[spark] object Client { val namespace = sparkConf.get(KUBERNETES_NAMESPACE) val master = resolveK8sMaster(sparkConf.get("spark.master")) val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf) + // No reason to distribute python files that are locally baked into Docker image + def filterByFile(pFiles: Array[String]) : Array[String] = { + val LocalPattern = "(local://)(.*)" + pFiles.filter(fi => !(fi matches LocalPattern)) + } val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl( sparkConf, kubernetesResourceNamePrefix, namespace, sparkJars, - sparkFiles, + sparkFiles ++ filterByFile(pySparkFiles), sslOptionsProvider.getSslOptions) Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( master, @@ -328,11 +357,14 @@ private[spark] object Client { appName, kubernetesResourceNamePrefix, kubernetesAppId, + mainAppResource, + isPython, mainClass, sparkConf, appArgs, sparkJars, sparkFiles, + pySparkFiles.toList, waitForAppCompletion, kubernetesClient, initContainerComponentsProvider, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala index cfc61e193dcff..e9da1af35e8ca 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala @@ -41,6 +41,8 @@ private[spark] trait DriverInitContainerComponentsProvider { maybeSubmittedResourceSecrets: Option[SubmittedResourceSecrets]) : Option[SubmittedDependencySecretBuilder] def provideInitContainerBootstrap(): SparkPodInitContainerBootstrap + def provideDriverPodFileMounter(): DriverPodKubernetesFileMounter + } private[spark] class DriverInitContainerComponentsProviderImpl( @@ -202,4 +204,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl( configMapKey, resourceStagingServerSecretPlugin) } + override def provideDriverPodFileMounter(): DriverPodKubernetesFileMounter = { + new DriverPodKubernetesFileMounterImpl(filesDownloadPath) + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesFileMounter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesFileMounter.scala new file mode 100644 index 0000000000000..c28d769e5205e --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesFileMounter.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit + +import io.fabric8.kubernetes.api.model.{Container, PodBuilder} + +import org.apache.spark.deploy.kubernetes.constants._ + + /** + * Trait that is responsible for providing full file-paths dynamically after + * the filesDownloadPath has been defined. The file-names are then stored in the + * environmental variables in the driver-pod. + */ +private[spark] trait DriverPodKubernetesFileMounter { + def addPySparkFiles(mainAppResource: String, pythonFiles: List[String], + mainContainerName: String, originalPodSpec: PodBuilder) : PodBuilder +} + +private[spark] class DriverPodKubernetesFileMounterImpl(filesDownloadPath: String) + extends DriverPodKubernetesFileMounter { + val LocalPattern = "(local://)(.*)".r + val FilePattern = "(file:/)(.*)".r + def getName(file: String, separatorChar: Char) : String = { + val index: Int = file.lastIndexOf(separatorChar) + file.substring(index + 1) + } + def fileLoc(file: String) : String = file match { + case "" => "" + case LocalPattern(_, file_name) => file_name + case FilePattern(_, file_name) => filesDownloadPath + "/" + getName(file_name, '/') + case _ => filesDownloadPath + "/" + getName(file, '/') + } + def pythonFileLocations(pFiles: List[String], mainAppResource: String) : String = { + def recFileLoc(file: List[String]): List[String] = file match { + case Nil => List.empty[String] + case a::b => a match { + case _ if a==mainAppResource => recFileLoc(b) + case _ => fileLoc(a) +: recFileLoc(b) + } + } + recFileLoc(pFiles).mkString(",") + } + override def addPySparkFiles(mainAppResource: String, pythonFiles: List[String], + mainContainerName: String, + originalPodSpec: PodBuilder): PodBuilder = { + originalPodSpec + .editSpec() + .editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName)) + .addNewEnv() + .withName(ENV_PYSPARK_PRIMARY) + .withValue(fileLoc(mainAppResource)) + .endEnv() + .addNewEnv() + .withName(ENV_PYSPARK_FILES) + .withValue(pythonFileLocations(pythonFiles, mainAppResource)) + .endEnv() + .endContainer() + .endSpec() + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala index 3945bef5bcfb8..a28b1f3b4139b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala @@ -301,11 +301,14 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { APP_NAME, APP_RESOURCE_PREFIX, APP_ID, + null, + false, MAIN_CLASS, SPARK_CONF, APP_ARGS, SPARK_JARS, SPARK_FILES, + null, true, kubernetesClient, initContainerComponentsProvider, @@ -386,11 +389,14 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { APP_NAME, APP_RESOURCE_PREFIX, APP_ID, + null, + false, MAIN_CLASS, SPARK_CONF, APP_ARGS, SPARK_JARS, SPARK_FILES, + null, false, kubernetesClient, 
initContainerComponentsProvider, diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile new file mode 100644 index 0000000000000..6dcc7511c0dd9 --- /dev/null +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM spark-base + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-driver-py:latest -f dockerfiles/driver-py/Dockerfile . + +ADD examples /opt/spark/examples +ADD python /opt/spark/python + +RUN apk add --no-cache python && \ + python -m ensurepip && \ + rm -r /usr/lib/python*/ensurepip && \ + pip install --upgrade pip setuptools && \ + rm -r /root/.cache +# UNCOMMENT THE FOLLOWING TO START PIP INSTALLING PYTHON PACKAGES +# RUN apk add --update alpine-sdk python-dev +# RUN pip install numpy + +ENV PYTHON_VERSION 2.7.13 +ENV PYSPARK_PYTHON python +ENV PYSPARK_DRIVER_PYTHON python +ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH} + +CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ + if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ + if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ + if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ + if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ + exec /sbin/tini -- ${JAVA_HOME}/bin/java $SPARK_DRIVER_JAVA_OPTS -cp $SPARK_CLASSPATH \ + -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY \ + $SPARK_DRIVER_CLASS $PYSPARK_PRIMARY $PYSPARK_FILES $SPARK_DRIVER_ARGS diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile new file mode 100644 index 0000000000000..7a65a4f879376 --- /dev/null +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile @@ -0,0 +1,46 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM spark-base + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-executor-py:latest -f dockerfiles/executor-py/Dockerfile . + +ADD examples /opt/spark/examples +ADD python /opt/spark/python + +RUN apk add --no-cache python && \ + python -m ensurepip && \ + rm -r /usr/lib/python*/ensurepip && \ + pip install --upgrade pip setuptools && \ + rm -r /root/.cache +# UNCOMMENT THE FOLLOWING TO START PIP INSTALLING PYTHON PACKAGES +# RUN apk add --update alpine-sdk python-dev +# RUN pip install numpy + +ENV PYTHON_VERSION 2.7.13 +ENV PYSPARK_PYTHON python +ENV PYSPARK_DRIVER_PYTHON python +ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH} + +# TODO support spark.executor.extraClassPath +CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ + if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ + if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ + if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ + exec /sbin/tini -- ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP \ No newline at end of file diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile index 6bff06da12840..4bafe25e2608f 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile @@ -19,6 +19,6 @@ FROM spark-base # If this docker file is being used in the context of building your images from a Spark distribution, the docker build # command should be invoked from the top level directory of the Spark distribution. E.g.: -# docker build -t spark-executor:latest -f dockerfiles/executor/Dockerfile . +# docker build -t spark-init:latest -f dockerfiles/init-container/Dockerfile . 
ENTRYPOINT [ "/sbin/tini", "--", "bin/spark-class", "org.apache.spark.deploy.rest.kubernetes.KubernetesSparkDependencyDownloadInitContainer" ] diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile index c9a92fa1c5b62..9ca96be0f1a88 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile @@ -17,8 +17,9 @@ FROM spark-base + # If this docker file is being used in the context of building your images from a Spark distribution, the docker build # command should be invoked from the top level directory of the Spark distribution. E.g.: -# docker build -t spark-executor:latest -f dockerfiles/executor/Dockerfile . +# docker build -t spark-resource-staging-server:latest -f dockerfiles/resource-staging-server/Dockerfile . ENTRYPOINT [ "/sbin/tini", "--", "bin/spark-class", "org.apache.spark.deploy.rest.kubernetes.ResourceStagingServer" ] diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile index 7f4e2aa51b67d..ccb2f1a03d88c 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile @@ -19,7 +19,7 @@ FROM spark-base # If this docker file is being used in the context of building your images from a Spark distribution, the docker build # command should be invoked from the top level directory of the Spark distribution. E.g.: -# docker build -t spark-shuffle:latest -f dockerfiles/shuffle/Dockerfile . +# docker build -t spark-shuffle:latest -f dockerfiles/shuffle-service/Dockerfile . COPY examples /opt/spark/examples diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml index bbf4b02cdaaf9..a4f06bcf45808 100644 --- a/resource-managers/kubernetes/integration-tests/pom.xml +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -220,6 +220,26 @@ + + copy-integration-python + pre-integration-test + + copy-resources + + + ${project.build.directory}/docker/python + + + ${project.parent.basedir}/python + true + + ${project.parent.basedir}/python/.egg + ${project.parent.basedir}/python/dist + + + + + diff --git a/resource-managers/kubernetes/integration-tests/src/test/python/pi.py b/resource-managers/kubernetes/integration-tests/src/test/python/pi.py new file mode 100755 index 0000000000000..e3f0c4aeef1b7 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/python/pi.py @@ -0,0 +1,46 @@ +from __future__ import print_function +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# + http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys +from random import random +from operator import add + +from pyspark.sql import SparkSession + + +if __name__ == "__main__": + """ + Usage: pi [partitions] + """ + spark = SparkSession\ + .builder\ + .appName("PythonPi")\ + .getOrCreate() + + partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 + n = 100000 * partitions + + def f(_): + x = random() * 2 - 1 + y = random() * 2 - 1 + return 1 if x ** 2 + y ** 2 < 1 else 0 + + count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) + print("Pi is roughly %f" % (4.0 * count / n)) + + spark.stop() diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index e377f285eb9a6..cb0fe7332f79f 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -72,6 +72,31 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { kubernetesTestComponents.deleteNamespace() } + /** + * These tests need to be run in an environment similar to the one provided + * by make_distribution. 
Further investigation is required + */ +// test("Run PySpark Job on file from SUBMITTER") { +// assume(testBackend.name == MINIKUBE_TEST_BACKEND) +// launchStagingServer(SSLOptions(), None) +// sparkConf.set(DRIVER_DOCKER_IMAGE, +// System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest")) +// sparkConf.set(EXECUTOR_DOCKER_IMAGE, +// System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest")) +// runPySparkPiAndVerifyCompletion( +// PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION) +// } +// test("Run PySpark Job on file from CONTAINER") { +// assume(testBackend.name == MINIKUBE_TEST_BACKEND) +// // UPDATE SO THAT IT BUILDS FROM LOCAL DOCKER IMAGE +// sparkConf.set(DRIVER_DOCKER_IMAGE, +// System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest")) +// sparkConf.set(EXECUTOR_DOCKER_IMAGE, +// System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest")) +// runPySparkPiAndVerifyCompletion( +// PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION) +// } + test("Simple submission test with the resource staging server.") { assume(testBackend.name == MINIKUBE_TEST_BACKEND) @@ -223,6 +248,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { appResource, SPARK_PI_MAIN_CLASS, "Pi is roughly 3", Array.empty[String]) } + private def runPySparkPiAndVerifyCompletion( + appResource: String): Unit = { + runSparkApplicationAndVerifyCompletion( + appResource, PYSPARK_PI_MAIN_CLASS, "Pi is roughly 3", + Array(null, "5")) + } + private def runSparkApplicationAndVerifyCompletion( appResource: String, mainClass: String, @@ -305,11 +337,14 @@ private[spark] object KubernetesSuite { s"integration-tests-jars/${EXAMPLES_JAR_FILE.getName}" val CONTAINER_LOCAL_HELPER_JAR_PATH = s"local:///opt/spark/examples/" + s"integration-tests-jars/${HELPER_JAR_FILE.getName}" - val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) val SPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + ".integrationtest.jobs.SparkPiWithInfiniteWait" + val PYSPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner" + val PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION = "local:///opt/spark/" + + "examples/src/main/python/pi.py" + val PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION = "src/test/python/pi.py" val FILE_EXISTENCE_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + ".integrationtest.jobs.FileExistenceTest" val GROUP_BY_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala index 4db19478f44bc..faa29b91ff202 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala @@ -30,7 +30,9 @@ private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String, // Dockerfile paths must be relative to the build path. 
private val BASE_DOCKER_FILE = "dockerfiles/spark-base/Dockerfile" private val DRIVER_DOCKER_FILE = "dockerfiles/driver/Dockerfile" + private val DRIVERPY_DOCKER_FILE = "dockerfiles/driver-py/Dockerfile" private val EXECUTOR_DOCKER_FILE = "dockerfiles/executor/Dockerfile" + private val EXECUTORPY_DOCKER_FILE = "dockerfiles/executor-py/Dockerfile" private val SHUFFLE_SERVICE_DOCKER_FILE = "dockerfiles/shuffle-service/Dockerfile" private val INIT_CONTAINER_DOCKER_FILE = "dockerfiles/init-container/Dockerfile" private val STAGING_SERVER_DOCKER_FILE = "dockerfiles/resource-staging-server/Dockerfile" @@ -63,7 +65,9 @@ private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String, Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() } buildImage("spark-base", BASE_DOCKER_FILE) buildImage("spark-driver", DRIVER_DOCKER_FILE) + buildImage("spark-driver-py", DRIVERPY_DOCKER_FILE) buildImage("spark-executor", EXECUTOR_DOCKER_FILE) + buildImage("spark-executor-py", EXECUTORPY_DOCKER_FILE) buildImage("spark-shuffle", SHUFFLE_SERVICE_DOCKER_FILE) buildImage("spark-resource-staging-server", STAGING_SERVER_DOCKER_FILE) buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE) From bafc13c55d110c8755fc028dbe989a488e4b6869 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Fri, 16 Jun 2017 18:20:16 -0700 Subject: [PATCH 02/18] Addressing scala idioms related to PR351 --- .../scala/org/apache/spark/deploy/SparkSubmit.scala | 3 +-- .../apache/spark/deploy/kubernetes/submit/Client.scala | 10 +++++----- .../spark/deploy/kubernetes/submit/ClientV2Suite.scala | 8 ++++---- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 56122b50adf29..9256a9ddd9960 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -624,8 +624,7 @@ object SparkSubmit { childArgs += args.primaryResource childArgs += "org.apache.spark.deploy.PythonRunner" childArgs += args.pyFiles - } - else { + } else { childArgs += args.primaryResource childArgs += args.mainClass } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index 78b6a40efdcc9..63ef05455387d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -284,7 +284,7 @@ private[spark] class Client( } } -private[spark] object Client{ +private[spark] object Client extends Logging{ def main(args: Array[String]): Unit = { val sparkConf = new SparkConf(true) val mainAppResource = args(0) @@ -310,10 +310,10 @@ private[spark] object Client{ .map(_.split(",")) .getOrElse(Array.empty[String]) val pySparkFiles: Array[String] = if (isPython) { - appArgs(0) match { - case null => Array(mainAppResource) - case _ => mainAppResource +: appArgs(0).split(",") - }} else {Array.empty[String]} + Option(appArgs(0)) match { + case None => Array(mainAppResource) + case Some(a) => mainAppResource +: a.split(",") } + } else { Array.empty[String] } val appName = sparkConf.getOption("spark.app.name").getOrElse("spark") // The resource name prefix is derived from the application name, making it easy to connect the // names 
of the Kubernetes resources from e.g. Kubectl or the Kubernetes dashboard to the diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala index a28b1f3b4139b..644d2d911e269 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala @@ -301,14 +301,14 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { APP_NAME, APP_RESOURCE_PREFIX, APP_ID, - null, + "", false, MAIN_CLASS, SPARK_CONF, APP_ARGS, SPARK_JARS, SPARK_FILES, - null, + Nil, true, kubernetesClient, initContainerComponentsProvider, @@ -389,14 +389,14 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { APP_NAME, APP_RESOURCE_PREFIX, APP_ID, - null, + "", false, MAIN_CLASS, SPARK_CONF, APP_ARGS, SPARK_JARS, SPARK_FILES, - null, + Nil, false, kubernetesClient, initContainerComponentsProvider, From 59d9f0af583eafa88c183ddcd6fba9c0918aabc9 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Fri, 16 Jun 2017 18:25:17 -0700 Subject: [PATCH 03/18] Removing extends Logging which was necessary for LogInfo --- .../org/apache/spark/deploy/kubernetes/submit/Client.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index 63ef05455387d..487a971c86f34 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -284,7 +284,7 @@ private[spark] class Client( } } -private[spark] object Client extends Logging{ +private[spark] object Client { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf(true) val mainAppResource = args(0) From 4daf6347e9ff84bece9996c264d87102bbee318d Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Mon, 19 Jun 2017 18:53:17 -0700 Subject: [PATCH 04/18] Refactored code to leverage the ContainerLocalizedFileResolver --- .../deploy/kubernetes/submit/Client.scala | 34 +++++++----- .../ContainerLocalizedFilesResolver.scala | 45 +++++++++++---- ...riverInitContainerComponentsProvider.scala | 16 ++++-- .../DriverPodKubernetesFileMounter.scala | 55 ++++++------------- .../kubernetes/submit/ClientV2Suite.scala | 4 +- ...ContainerLocalizedFilesResolverSuite.scala | 22 ++++++++ .../integrationtest/KubernetesSuite.scala | 4 +- 7 files changed, 108 insertions(+), 72 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index 487a971c86f34..7b536e9fca73f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -183,10 +183,12 @@ private[spark] class Client( .bootstrapInitContainerAndVolumes(driverContainer.getName, basePod) val containerLocalizedFilesResolver = initContainerComponentsProvider - 
.provideContainerLocalizedFilesResolver() + .provideContainerLocalizedFilesResolver(mainAppResource) val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars() val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles() - + val resolvedPySparkFiles = containerLocalizedFilesResolver.resolveSubmittedPySparkFiles() + val resolvedPrimaryPySparkResource = if (!isPython) "" + else { containerLocalizedFilesResolver.resolvePrimaryResourceFile() } val executorInitContainerConfiguration = initContainerComponentsProvider .provideExecutorInitContainerConfiguration() val sparkConfWithExecutorInit = executorInitContainerConfiguration @@ -232,7 +234,10 @@ private[spark] class Client( initContainerComponentsProvider .provideDriverPodFileMounter() .addPySparkFiles( - mainAppResource, pySparkFiles, driverContainer.getName, resolvedDriverPodBuilder) + resolvedPrimaryPySparkResource, + resolvedPySparkFiles.mkString(","), + driverContainer.getName, + resolvedDriverPodBuilder) .build() } Utils.tryWithResource( @@ -298,13 +303,16 @@ private[spark] object Client { mainClass: String, appArgs: Array[String]): Unit = { val isPython = mainAppResource.endsWith(".py") - val sparkJars = if (isPython) Array.empty[String] else { - sparkConf.getOption("spark.jars") - .map(_.split(",")) - .getOrElse(Array.empty[String]) ++ - Option(mainAppResource) + // Since you might need jars for SQL UDFs in PySpark + def sparkJarFilter() : Seq[String] = isPython match { + case true => Seq.empty[String] + case false => Option(mainAppResource) .filterNot(_ == SparkLauncher.NO_RESOURCE) - .toSeq } + .toSeq + } + val sparkJars = sparkConf.getOption("spark.jars") + .map(_.split(",")) + .getOrElse(Array.empty[String]) ++ sparkJarFilter() val launchTime = System.currentTimeMillis val sparkFiles = sparkConf.getOption("spark.files") .map(_.split(",")) @@ -326,17 +334,13 @@ private[spark] object Client { val namespace = sparkConf.get(KUBERNETES_NAMESPACE) val master = resolveK8sMaster(sparkConf.get("spark.master")) val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf) - // No reason to distribute python files that are locally baked into Docker image - def filterByFile(pFiles: Array[String]) : Array[String] = { - val LocalPattern = "(local://)(.*)" - pFiles.filter(fi => !(fi matches LocalPattern)) - } val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl( sparkConf, kubernetesResourceNamePrefix, namespace, sparkJars, - sparkFiles ++ filterByFile(pySparkFiles), + sparkFiles, + pySparkFiles, sslOptionsProvider.getSslOptions) Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( master, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolver.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolver.scala index c635484c4c124..999fe6d7e38a0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolver.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolver.scala @@ -24,13 +24,19 @@ private[spark] trait ContainerLocalizedFilesResolver { def resolveSubmittedAndRemoteSparkJars(): Seq[String] def resolveSubmittedSparkJars(): Seq[String] def resolveSubmittedSparkFiles(): Seq[String] + def resolveSubmittedPySparkFiles(): 
Seq[String] + def resolvePrimaryResourceFile(): String } private[spark] class ContainerLocalizedFilesResolverImpl( sparkJars: Seq[String], sparkFiles: Seq[String], + pySparkFiles: Seq[String], + primaryPyFile: String, jarsDownloadPath: String, - filesDownloadPath: String) extends ContainerLocalizedFilesResolver { + filesDownloadPath: String + ) extends ContainerLocalizedFilesResolver { + override def resolveSubmittedAndRemoteSparkJars(): Seq[String] = { sparkJars.map { jar => @@ -53,16 +59,33 @@ private[spark] class ContainerLocalizedFilesResolverImpl( resolveSubmittedFiles(sparkFiles, filesDownloadPath) } - private def resolveSubmittedFiles(files: Seq[String], downloadPath: String): Seq[String] = { - files.map { file => - val fileUri = Utils.resolveURI(file) - Option(fileUri.getScheme).getOrElse("file") match { - case "file" => - val fileName = new File(fileUri.getPath).getName - s"$downloadPath/$fileName" - case _ => - file - } + override def resolveSubmittedPySparkFiles(): Seq[String] = { + def filterMainResource(x: String) = x match { + case `primaryPyFile` => None + case _ => Some(resolveFile(x, filesDownloadPath)) + } + pySparkFiles.flatMap(x => filterMainResource(x)) + } + + override def resolvePrimaryResourceFile(): String = { + Option(primaryPyFile) match { + case None => "" + case Some(p) => resolveFile(p, filesDownloadPath) } } + + private def resolveFile(file: String, downloadPath: String) = { + val fileUri = Utils.resolveURI(file) + Option(fileUri.getScheme).getOrElse("file") match { + case "file" => + val fileName = new File(fileUri.getPath).getName + s"$downloadPath/$fileName" + case _ => + file + } + } + + private def resolveSubmittedFiles(files: Seq[String], downloadPath: String): Seq[String] = { + files.map { file => resolveFile(file, downloadPath) } + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala index e9da1af35e8ca..ba5de3acbc897 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala @@ -33,7 +33,8 @@ private[spark] trait DriverInitContainerComponentsProvider { def provideInitContainerConfigMapBuilder( maybeSubmittedResourceIds: Option[SubmittedResourceIds]) : SparkInitContainerConfigMapBuilder - def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver + def provideContainerLocalizedFilesResolver( + mainAppResource: String) : ContainerLocalizedFilesResolver def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration def provideInitContainerSubmittedDependencyUploader( driverPodLabels: Map[String, String]): Option[SubmittedDependencyUploader] @@ -51,6 +52,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl( namespace: String, sparkJars: Seq[String], sparkFiles: Seq[String], + pySparkFiles: Seq[String], resourceStagingServerExternalSslOptions: SSLOptions) extends DriverInitContainerComponentsProvider { @@ -106,6 +108,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl( private val initContainerImage = sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE) private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) private 
val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT) + private val pySparkSubmitted = KubernetesFileUtils.getOnlySubmitterLocalFiles(pySparkFiles) override def provideInitContainerConfigMapBuilder( maybeSubmittedResourceIds: Option[SubmittedResourceIds]) @@ -133,7 +136,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl( } new SparkInitContainerConfigMapBuilderImpl( sparkJars, - sparkFiles, + sparkFiles ++ pySparkSubmitted, jarsDownloadPath, filesDownloadPath, configMapName, @@ -141,9 +144,10 @@ private[spark] class DriverInitContainerComponentsProviderImpl( submittedDependencyConfigPlugin) } - override def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver = { + override def provideContainerLocalizedFilesResolver(mainAppResource: String) + : ContainerLocalizedFilesResolver = { new ContainerLocalizedFilesResolverImpl( - sparkJars, sparkFiles, jarsDownloadPath, filesDownloadPath) + sparkJars, sparkFiles, pySparkFiles, mainAppResource, jarsDownloadPath, filesDownloadPath) } override def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration = { @@ -162,7 +166,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl( namespace, stagingServerUri, sparkJars, - sparkFiles, + sparkFiles ++ pySparkSubmitted, resourceStagingServerExternalSslOptions, RetrofitClientFactoryImpl) } @@ -205,6 +209,6 @@ private[spark] class DriverInitContainerComponentsProviderImpl( resourceStagingServerSecretPlugin) } override def provideDriverPodFileMounter(): DriverPodKubernetesFileMounter = { - new DriverPodKubernetesFileMounterImpl(filesDownloadPath) + new DriverPodKubernetesFileMounterImpl() } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesFileMounter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesFileMounter.scala index c28d769e5205e..cc0ef0eedb457 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesFileMounter.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesFileMounter.scala @@ -26,49 +26,30 @@ import org.apache.spark.deploy.kubernetes.constants._ * environmental variables in the driver-pod. 
*/ private[spark] trait DriverPodKubernetesFileMounter { - def addPySparkFiles(mainAppResource: String, pythonFiles: List[String], + def addPySparkFiles(primaryFile: String, pySparkFiles: String, mainContainerName: String, originalPodSpec: PodBuilder) : PodBuilder } -private[spark] class DriverPodKubernetesFileMounterImpl(filesDownloadPath: String) +private[spark] class DriverPodKubernetesFileMounterImpl() extends DriverPodKubernetesFileMounter { - val LocalPattern = "(local://)(.*)".r - val FilePattern = "(file:/)(.*)".r - def getName(file: String, separatorChar: Char) : String = { - val index: Int = file.lastIndexOf(separatorChar) - file.substring(index + 1) - } - def fileLoc(file: String) : String = file match { - case "" => "" - case LocalPattern(_, file_name) => file_name - case FilePattern(_, file_name) => filesDownloadPath + "/" + getName(file_name, '/') - case _ => filesDownloadPath + "/" + getName(file, '/') - } - def pythonFileLocations(pFiles: List[String], mainAppResource: String) : String = { - def recFileLoc(file: List[String]): List[String] = file match { - case Nil => List.empty[String] - case a::b => a match { - case _ if a==mainAppResource => recFileLoc(b) - case _ => fileLoc(a) +: recFileLoc(b) - } - } - recFileLoc(pFiles).mkString(",") - } - override def addPySparkFiles(mainAppResource: String, pythonFiles: List[String], - mainContainerName: String, - originalPodSpec: PodBuilder): PodBuilder = { + override def addPySparkFiles( + primaryFile: String, + pySparkFiles: String, + mainContainerName: String, + originalPodSpec: PodBuilder): PodBuilder = { + originalPodSpec .editSpec() - .editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName)) - .addNewEnv() - .withName(ENV_PYSPARK_PRIMARY) - .withValue(fileLoc(mainAppResource)) - .endEnv() - .addNewEnv() - .withName(ENV_PYSPARK_FILES) - .withValue(pythonFileLocations(pythonFiles, mainAppResource)) - .endEnv() - .endContainer() + .editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName)) + .addNewEnv() + .withName(ENV_PYSPARK_PRIMARY) + .withValue(primaryFile) + .endEnv() + .addNewEnv() + .withName(ENV_PYSPARK_FILES) + .withValue(pySparkFiles) + .endEnv() + .endContainer() .endSpec() } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala index 644d2d911e269..14990b71110da 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala @@ -171,7 +171,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { .endMetadata() } }) - when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver()) + when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver(MAIN_CLASS)) .thenReturn(containerLocalizedFilesResolver) when(initContainerComponentsProvider.provideExecutorInitContainerConfiguration()) .thenReturn(executorInitContainerConfiguration) @@ -465,3 +465,5 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { pod.getMetadata.getAnnotations.asScala == expectedAnnotations } } + + diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolverSuite.scala 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolverSuite.scala index ca5cd1fff9b74..a1eb1e91b6ef9 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolverSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolverSuite.scala @@ -29,11 +29,19 @@ class ContainerLocalizedFilesResolverSuite extends SparkFunSuite { "file:///app/files/file2.txt", "local:///app/files/file3.txt", "http://app/files/file4.txt") + private val PYSPARK_FILES = Seq( + "hdfs://localhost:9000/app/files/file1.py", + "file:///app/files/file2.py", + "local:///app/files/file3.py", + "http://app/files/file4.py") private val JARS_DOWNLOAD_PATH = "/var/data/spark-jars" private val FILES_DOWNLOAD_PATH = "/var/data/spark-files" + private val PYSPARK_PRIMARY_FILE = "file:///app/files/file4.py" private val localizedFilesResolver = new ContainerLocalizedFilesResolverImpl( SPARK_JARS, SPARK_FILES, + PYSPARK_FILES, + PYSPARK_PRIMARY_FILE, JARS_DOWNLOAD_PATH, FILES_DOWNLOAD_PATH) @@ -66,4 +74,18 @@ class ContainerLocalizedFilesResolverSuite extends SparkFunSuite { "http://app/files/file4.txt") assert(resolvedFiles === expectedResolvedFiles) } + test("Submitted PySpark files should resolve to the download path.") { + val resolvedPySparkFiles = localizedFilesResolver.resolveSubmittedPySparkFiles() + val expectedPySparkFiles = Seq( + "hdfs://localhost:9000/app/files/file1.py", + s"$FILES_DOWNLOAD_PATH/file2.py", + "local:///app/files/file3.py") + assert(resolvedPySparkFiles === expectedPySparkFiles) + } + test("Submitted PySpark Primary resource should resolve to the download path.") { + val resolvedPySparkPrimary = + localizedFilesResolver.resolvePrimaryResourceFile() + val expectedPySparkPrimary = s"$FILES_DOWNLOAD_PATH/file4.py" + assert(resolvedPySparkPrimary === expectedPySparkPrimary) + } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index cb0fe7332f79f..d18fbd74da4ca 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -342,8 +342,8 @@ private[spark] object KubernetesSuite { val SPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + ".integrationtest.jobs.SparkPiWithInfiniteWait" val PYSPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner" - val PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION = "local:///opt/spark/" + - "examples/src/main/python/pi.py" + val PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION = + "local:///opt/spark/examples/src/main/python/pi.py" val PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION = "src/test/python/pi.py" val FILE_EXISTENCE_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + ".integrationtest.jobs.FileExistenceTest" From 51105caa715edf787bcdb9af175b2c129955a3a1 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Mon, 19 Jun 2017 19:45:54 -0700 Subject: [PATCH 05/18] Modified Unit tests so that they would pass --- .../submit/ContainerLocalizedFilesResolverSuite.scala | 10 ++++++---- 1 file changed, 6 
insertions(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolverSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolverSuite.scala index a1eb1e91b6ef9..7e51abcd7b8e0 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolverSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolverSuite.scala @@ -33,10 +33,11 @@ class ContainerLocalizedFilesResolverSuite extends SparkFunSuite { "hdfs://localhost:9000/app/files/file1.py", "file:///app/files/file2.py", "local:///app/files/file3.py", - "http://app/files/file4.py") + "http://app/files/file4.py", + "file:///app/files/file5.py") private val JARS_DOWNLOAD_PATH = "/var/data/spark-jars" private val FILES_DOWNLOAD_PATH = "/var/data/spark-files" - private val PYSPARK_PRIMARY_FILE = "file:///app/files/file4.py" + private val PYSPARK_PRIMARY_FILE = "file:///app/files/file5.py" private val localizedFilesResolver = new ContainerLocalizedFilesResolverImpl( SPARK_JARS, SPARK_FILES, @@ -79,13 +80,14 @@ class ContainerLocalizedFilesResolverSuite extends SparkFunSuite { val expectedPySparkFiles = Seq( "hdfs://localhost:9000/app/files/file1.py", s"$FILES_DOWNLOAD_PATH/file2.py", - "local:///app/files/file3.py") + "local:///app/files/file3.py", + "http://app/files/file4.py") assert(resolvedPySparkFiles === expectedPySparkFiles) } test("Submitted PySpark Primary resource should resolve to the download path.") { val resolvedPySparkPrimary = localizedFilesResolver.resolvePrimaryResourceFile() - val expectedPySparkPrimary = s"$FILES_DOWNLOAD_PATH/file4.py" + val expectedPySparkPrimary = s"$FILES_DOWNLOAD_PATH/file5.py" assert(resolvedPySparkPrimary === expectedPySparkPrimary) } } From bd30f409fcab6f2d7b2a5b45ed10e5874b9029aa Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Mon, 19 Jun 2017 21:41:04 -0700 Subject: [PATCH 06/18] Modified Unit Test input to pass Unit Tests --- .../apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala index 14990b71110da..4807f162df254 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala @@ -171,7 +171,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { .endMetadata() } }) - when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver(MAIN_CLASS)) + when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver("")) .thenReturn(containerLocalizedFilesResolver) when(initContainerComponentsProvider.provideExecutorInitContainerConfiguration()) .thenReturn(executorInitContainerConfiguration) From 720776ed42ab3aac8d9fe53a5bd5db867ac05868 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Tue, 20 Jun 2017 19:13:45 -0700 Subject: [PATCH 07/18] Setup working environment for integration tests for PySpark --- .../kubernetes/integration-tests/pom.xml | 84 ++++++++++++++++++- 
.../integrationtest/KubernetesSuite.scala | 49 +++++------ .../docker/SparkDockerImageBuilder.scala | 15 ++++ 3 files changed, 123 insertions(+), 25 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml index a4f06bcf45808..cd3ccad0a2b22 100644 --- a/resource-managers/kubernetes/integration-tests/pom.xml +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -231,7 +231,6 @@ ${project.parent.basedir}/python - true ${project.parent.basedir}/python/.egg ${project.parent.basedir}/python/dist @@ -240,6 +239,89 @@ + + copy-integration-data + pre-integration-test + + copy-resources + + + ${project.build.directory}/docker/data + + + ${project.parent.basedir}/data + true + + + + + + copy-integration-licenses + pre-integration-test + + copy-resources + + + ${project.build.directory}/docker/licenses + + + ${project.parent.basedir}/licenses + true + + + + + + copy-integration-examples-jar + pre-integration-test + + copy-resources + + + ${project.build.directory}/docker/examples/jars + + + ${project.parent.basedir}/examples/target/scala-2.11/jars + true + + + + + + copy-integration-examples-src + pre-integration-test + + copy-resources + + + ${project.build.directory}/docker/examples/src/main + + + ${project.parent.basedir}/examples/src/main + true + + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + 1.6 + + + create-release-file + pre-integration-test + + run + + + + + + + diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index d18fbd74da4ca..1cb4488efeafd 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -72,30 +72,31 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { kubernetesTestComponents.deleteNamespace() } - /** - * These tests need to be run in an environment similair to the one provided - * by make_distribution. 
Further investigation is required - */ -// test("Run PySpark Job on file from SUBMITTER") { -// assume(testBackend.name == MINIKUBE_TEST_BACKEND) -// launchStagingServer(SSLOptions(), None) -// sparkConf.set(DRIVER_DOCKER_IMAGE, -// System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest")) -// sparkConf.set(EXECUTOR_DOCKER_IMAGE, -// System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest")) -// runPySparkPiAndVerifyCompletion( -// PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION) -// } -// test("Run PySpark Job on file from CONTAINER") { -// assume(testBackend.name == MINIKUBE_TEST_BACKEND) -// // UPDATE SO THAT IT BUILDS FROM LOCAL DOCKER IMAGE -// sparkConf.set(DRIVER_DOCKER_IMAGE, -// System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest")) -// sparkConf.set(EXECUTOR_DOCKER_IMAGE, -// System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest")) -// runPySparkPiAndVerifyCompletion( -// PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION) -// } + test("Run PySpark Job on file from SUBMITTER") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + + launchStagingServer(SSLOptions(), None) + sparkConf.set(DRIVER_DOCKER_IMAGE, + System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest")) + sparkConf.set(EXECUTOR_DOCKER_IMAGE, + System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest")) + + runPySparkPiAndVerifyCompletion( + PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION) + } + + test("Run PySpark Job on file from CONTAINER with spark.jar defined") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + + sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) + sparkConf.set(DRIVER_DOCKER_IMAGE, + System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest")) + sparkConf.set(EXECUTOR_DOCKER_IMAGE, + System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest")) + + runPySparkPiAndVerifyCompletion( + PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION) + } test("Simple submission test with the resource staging server.") { assume(testBackend.name == MINIKUBE_TEST_BACKEND) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala index faa29b91ff202..9c8039d2a1159 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala @@ -23,6 +23,11 @@ import com.spotify.docker.client.{DefaultDockerClient, DockerCertificates, Loggi import org.apache.http.client.utils.URIBuilder import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} +import scala.collection.JavaConverters._ + +import org.apache.spark.SparkUserAppException +import org.apache.spark.util.RedirectThread + private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String, String]) { @@ -63,6 +68,16 @@ private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String, def buildSparkDockerImages(): Unit = { Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() } + // Building Python distribution environment + 
val builder = new ProcessBuilder( + Seq("python", "setup.py", "sdist").asJava) + builder.directory(new java.io.File(s"$DOCKER_BUILD_PATH/python")) + val process = builder.start() + new RedirectThread(process.getInputStream, System.out, "redirect output").start() + val exitCode = process.waitFor() + if (exitCode != 0) { + throw new SparkUserAppException(exitCode) + } buildImage("spark-base", BASE_DOCKER_FILE) buildImage("spark-driver", DRIVER_DOCKER_FILE) buildImage("spark-driver-py", DRIVERPY_DOCKER_FILE) From 4b5f4701cb9e2a13fa7b9ef86fc613836da75f68 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Tue, 20 Jun 2017 21:14:54 -0700 Subject: [PATCH 08/18] Comment out Python thread logic until Jenkins has python in Python --- .../docker/SparkDockerImageBuilder.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala index 9c8039d2a1159..e19545885e1a4 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala @@ -69,15 +69,15 @@ private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String, def buildSparkDockerImages(): Unit = { Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() } // Building Python distribution environment - val builder = new ProcessBuilder( - Seq("python", "setup.py", "sdist").asJava) - builder.directory(new java.io.File(s"$DOCKER_BUILD_PATH/python")) - val process = builder.start() - new RedirectThread(process.getInputStream, System.out, "redirect output").start() - val exitCode = process.waitFor() - if (exitCode != 0) { - throw new SparkUserAppException(exitCode) - } +// val builder = new ProcessBuilder( +// Seq("python", "setup.py", "sdist").asJava) +// builder.directory(new java.io.File(s"$DOCKER_BUILD_PATH/python")) +// val process = builder.start() +// new RedirectThread(process.getInputStream, System.out, "redirect output").start() +// val exitCode = process.waitFor() +// if (exitCode != 0) { +// throw new SparkUserAppException(exitCode) +// } buildImage("spark-base", BASE_DOCKER_FILE) buildImage("spark-driver", DRIVER_DOCKER_FILE) buildImage("spark-driver-py", DRIVERPY_DOCKER_FILE) From 1361a2644dea630f9766b2d16363fd08959b0c3e Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Wed, 21 Jun 2017 09:48:00 -0700 Subject: [PATCH 09/18] Modifying PythonExec to pass on Jenkins --- .../docker/SparkDockerImageBuilder.scala | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala index e19545885e1a4..c8bed410fc28e 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala +++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala @@ -25,7 +25,6 @@ import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} import scala.collection.JavaConverters._ -import org.apache.spark.SparkUserAppException import org.apache.spark.util.RedirectThread @@ -69,15 +68,20 @@ private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String, def buildSparkDockerImages(): Unit = { Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() } // Building Python distribution environment -// val builder = new ProcessBuilder( -// Seq("python", "setup.py", "sdist").asJava) -// builder.directory(new java.io.File(s"$DOCKER_BUILD_PATH/python")) -// val process = builder.start() -// new RedirectThread(process.getInputStream, System.out, "redirect output").start() -// val exitCode = process.waitFor() -// if (exitCode != 0) { -// throw new SparkUserAppException(exitCode) -// } + val pythonExec = sys.env.get("PYSPARK_DRIVER_PYTHON") + .orElse(sys.env.get("PYSPARK_PYTHON")) + .getOrElse("python") + val builder = new ProcessBuilder( + Seq(pythonExec, "setup.py", "sdist").asJava) + builder.directory(new java.io.File(s"$DOCKER_BUILD_PATH/python")) + val process = builder.start() + new RedirectThread(process.getInputStream, System.out, "redirect output").start() + val exitCode = process.waitFor() + if (exitCode != 0) { + // scalastyle:off println + println(s"exitCode: $exitCode") + // scalastyle:on println + } buildImage("spark-base", BASE_DOCKER_FILE) buildImage("spark-driver", DRIVER_DOCKER_FILE) buildImage("spark-driver-py", DRIVERPY_DOCKER_FILE) From 0abc3b1c1d3e655dddb55fb9173e32a4f5b4b020 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Wed, 21 Jun 2017 10:46:59 -0700 Subject: [PATCH 10/18] Modifying python exec --- .../integrationtest/docker/SparkDockerImageBuilder.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala index c8bed410fc28e..2db67c96021b8 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala @@ -70,10 +70,11 @@ private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String, // Building Python distribution environment val pythonExec = sys.env.get("PYSPARK_DRIVER_PYTHON") .orElse(sys.env.get("PYSPARK_PYTHON")) - .getOrElse("python") + .getOrElse("/usr/bin/python") val builder = new ProcessBuilder( Seq(pythonExec, "setup.py", "sdist").asJava) builder.directory(new java.io.File(s"$DOCKER_BUILD_PATH/python")) + builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize val process = builder.start() new RedirectThread(process.getInputStream, System.out, "redirect output").start() val exitCode = process.waitFor() From 0869b077c6017980ed6b376d6a2c58d54f4e527a Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Thu, 22 Jun 2017 18:59:50 -0700 Subject: [PATCH 11/18] Added unit tests to ClientV2 
and refactored to include pyspark submission resources --- .../deploy/kubernetes/submit/Client.scala | 100 ++++++----- .../submit/PythonSubmissionResources.scala | 60 +++++++ .../kubernetes/submit/ClientV2Suite.scala | 157 ++++++++++++++++-- .../integrationtest/KubernetesSuite.scala | 14 +- .../docker/SparkDockerImageBuilder.scala | 15 +- 5 files changed, 267 insertions(+), 79 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index 7b536e9fca73f..a33740ec33c01 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -48,13 +48,10 @@ private[spark] class Client( kubernetesResourceNamePrefix: String, kubernetesAppId: String, mainAppResource: String, - isPython: Boolean, + pythonResource: Option[PythonSubmissionResources], mainClass: String, sparkConf: SparkConf, appArgs: Array[String], - sparkJars: Seq[String], - sparkFiles: Seq[String], - pySparkFiles: List[String], waitForAppCompletion: Boolean, kubernetesClient: KubernetesClient, initContainerComponentsProvider: DriverInitContainerComponentsProvider, @@ -84,16 +81,10 @@ private[spark] class Client( org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) def run(): Unit = { - validateNoDuplicateFileNames(sparkJars) - validateNoDuplicateFileNames(sparkFiles) - if (isPython) {validateNoDuplicateFileNames(pySparkFiles)} - val arguments = if (isPython) pySparkFiles match { - case Nil => appArgs - case a::b => a match { - case _ if a==mainAppResource && b==Nil => appArgs - case _ => appArgs.drop(1) - } - } else appArgs + val arguments = pythonResource match { + case Some(p) => p.arguments + case None => appArgs + } val driverCustomLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf( sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX, @@ -187,8 +178,10 @@ private[spark] class Client( val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars() val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles() val resolvedPySparkFiles = containerLocalizedFilesResolver.resolveSubmittedPySparkFiles() - val resolvedPrimaryPySparkResource = if (!isPython) "" - else { containerLocalizedFilesResolver.resolvePrimaryResourceFile() } + val resolvedPrimaryPySparkResource = pythonResource match { + case Some(p) => p.primarySparkResource(containerLocalizedFilesResolver) + case None => "" + } val executorInitContainerConfiguration = initContainerComponentsProvider .provideExecutorInitContainerConfiguration() val sparkConfWithExecutorInit = executorInitContainerConfiguration @@ -228,17 +221,15 @@ private[spark] class Client( .endEnv() .endContainer() .endSpec() - val resolvedDriverPod = if (!isPython) { - resolvedDriverPodBuilder.build() - } else { - initContainerComponentsProvider - .provideDriverPodFileMounter() - .addPySparkFiles( - resolvedPrimaryPySparkResource, - resolvedPySparkFiles.mkString(","), - driverContainer.getName, - resolvedDriverPodBuilder) - .build() + val resolvedDriverPod = pythonResource match { + case Some(p) => p.driverPod( + initContainerComponentsProvider, + resolvedPrimaryPySparkResource, 
+ resolvedPySparkFiles.mkString(","), + driverContainer.getName, + resolvedDriverPodBuilder + ) + case None => resolvedDriverPodBuilder.build() } Utils.tryWithResource( kubernetesClient @@ -276,17 +267,6 @@ private[spark] class Client( } } } - - private def validateNoDuplicateFileNames(allFiles: Seq[String]): Unit = { - val fileNamesToUris = allFiles.map { file => - (new File(Utils.resolveURI(file).getPath).getName, file) - } - fileNamesToUris.groupBy(_._1).foreach { - case (fileName, urisWithFileName) => - require(urisWithFileName.size == 1, "Cannot add multiple files with the same name, but" + - s" file name $fileName is shared by all of these URIs: $urisWithFileName") - } - } } private[spark] object Client { @@ -303,12 +283,19 @@ private[spark] object Client { mainClass: String, appArgs: Array[String]): Unit = { val isPython = mainAppResource.endsWith(".py") + val pythonResource: Option[PythonSubmissionResources] = + if (isPython) { + Option(new PythonSubmissionResources(mainAppResource, appArgs)) + } else { + None + } // Since you might need jars for SQL UDFs in PySpark - def sparkJarFilter() : Seq[String] = isPython match { - case true => Seq.empty[String] - case false => Option(mainAppResource) - .filterNot(_ == SparkLauncher.NO_RESOURCE) - .toSeq + def sparkJarFilter() : Seq[String] = pythonResource match { + case Some(p) => p.sparkJars + case None => + Option(mainAppResource) + .filterNot(_ == SparkLauncher.NO_RESOURCE) + .toSeq } val sparkJars = sparkConf.getOption("spark.jars") .map(_.split(",")) @@ -317,11 +304,13 @@ private[spark] object Client { val sparkFiles = sparkConf.getOption("spark.files") .map(_.split(",")) .getOrElse(Array.empty[String]) - val pySparkFiles: Array[String] = if (isPython) { - Option(appArgs(0)) match { - case None => Array(mainAppResource) - case Some(a) => mainAppResource +: a.split(",") } - } else { Array.empty[String] } + val pySparkFiles: Array[String] = pythonResource match { + case Some(p) => p.pySparkFiles + case None => Array.empty[String] + } + validateNoDuplicateFileNames(sparkJars) + validateNoDuplicateFileNames(sparkFiles) + if (pythonResource.isDefined) {validateNoDuplicateFileNames(pySparkFiles)} val appName = sparkConf.getOption("spark.app.name").getOrElse("spark") // The resource name prefix is derived from the application name, making it easy to connect the // names of the Kubernetes resources from e.g. 
Kubectl or the Kubernetes dashboard to the @@ -362,13 +351,10 @@ private[spark] object Client { kubernetesResourceNamePrefix, kubernetesAppId, mainAppResource, - isPython, + pythonResource, mainClass, sparkConf, appArgs, - sparkJars, - sparkFiles, - pySparkFiles.toList, waitForAppCompletion, kubernetesClient, initContainerComponentsProvider, @@ -376,4 +362,14 @@ private[spark] object Client { loggingPodStatusWatcher).run() } } + private def validateNoDuplicateFileNames(allFiles: Seq[String]): Unit = { + val fileNamesToUris = allFiles.map { file => + (new File(Utils.resolveURI(file).getPath).getName, file) + } + fileNamesToUris.groupBy(_._1).foreach { + case (fileName, urisWithFileName) => + require(urisWithFileName.size == 1, "Cannot add multiple files with the same name, but" + + s" file name $fileName is shared by all of these URIs: $urisWithFileName") + } + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala new file mode 100644 index 0000000000000..59285f7d24791 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.submit + +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} + +class PythonSubmissionResources( + private val mainAppResource: String, + private val appArgs: Array[String] ) { + + private val pyFiles: Array[String] = Option(appArgs(0)) match { + case None => Array(mainAppResource) + case Some(a) => mainAppResource +: a.split(",") + } + + def sparkJars: Seq[String] = Seq.empty[String] + + def pySparkFiles: Array[String] = pyFiles + + def arguments: Array[String] = + pyFiles.toList match { + case Nil => appArgs + case a :: b => a match { + case _ if a == mainAppResource && b == Nil => appArgs + case _ => appArgs.drop(1) + } + } + def primarySparkResource (containerLocalizedFilesResolver: ContainerLocalizedFilesResolver) + : String = containerLocalizedFilesResolver.resolvePrimaryResourceFile() + + def driverPod( + initContainerComponentsProvider: DriverInitContainerComponentsProvider, + resolvedPrimaryPySparkResource: String, + resolvedPySparkFiles: String, + driverContainerName: String, + driverPodBuilder: PodBuilder) : Pod = { + initContainerComponentsProvider + .provideDriverPodFileMounter() + .addPySparkFiles( + resolvedPrimaryPySparkResource, + resolvedPySparkFiles, + driverContainerName, + driverPodBuilder) + .build() + } +} \ No newline at end of file diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala index 4807f162df254..d0ff87e1fe15b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes.submit import java.io.File -import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, DoneablePod, HasMetadata, Pod, PodBuilder, PodList, Secret, SecretBuilder} +import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClient, Watch} import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource} import org.hamcrest.{BaseMatcher, Description} @@ -27,10 +27,10 @@ import org.mockito.Matchers.{any, anyVararg, argThat, eq => mockitoEq} import org.mockito.Mockito.{times, verify, when} import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer -import org.scalatest.BeforeAndAfter +import org.scalatest.{BeforeAndAfter, Matchers} + import scala.collection.JavaConverters._ import scala.collection.mutable - import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.kubernetes.{KubernetesExternalShuffleService, KubernetesShuffleBlockHandler, SparkPodInitContainerBootstrap} import org.apache.spark.deploy.kubernetes.config._ @@ -63,6 +63,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { private val INIT_CONTAINER_SECRET_NAME = "init-container-secret" private val INIT_CONTAINER_SECRET_DATA = Map("secret-key" -> "secret-data") private val MAIN_CLASS = "org.apache.spark.examples.SparkPi" + private val PYSPARK_APP_ARGS = Array(null, "500") private val APP_ARGS = Array("3", "20") private val SPARK_JARS = Seq( "hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar") @@ -72,6 +73,20 @@ class ClientV2Suite extends SparkFunSuite with 
BeforeAndAfter { "/var/data/spark-jars/jar1.jar", "/var/data/spark-jars/jar2.jar") private val SPARK_FILES = Seq( "hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt") + private val PYSPARK_FILES = Seq( + "hdfs://localhost:9000/app/files/file1.py", + "file:///app/files/file2.py", + "local:///app/files/file3.py", + "http://app/files/file4.py", + "file:///app/files/file5.py") + private val RESOLVED_PYSPARK_FILES = Seq( + "hdfs://localhost:9000/app/files/file1.py", + "/var/spark-data/spark-files/file2.py", + "local:///app/files/file3.py", + "http://app/files/file4.py") + private val PYSPARK_PRIMARY_FILE = "file:///app/files/file5.py" + private val RESOLVED_PYSPARK_PRIMARY_FILE = "/var/spark-data/spark-file/file5.py" + private val RESOLVED_SPARK_FILES = Seq( "hdfs://localhost:9000/app/files/file1.txt", "file:///var/data/spark-files/file2.txt") private val INIT_CONTAINER_SECRET = new SecretBuilder() @@ -140,14 +155,19 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { @Mock private var kubernetesClient: KubernetesClient = _ @Mock - private var podOps: MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]] = _ + private var podOps: MixedOperation[ + Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]] = _ private type ResourceListOps = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[ HasMetadata, java.lang.Boolean] @Mock + private var pythonSubmissionResources : PythonSubmissionResources = _ + @Mock private var resourceListOps: ResourceListOps = _ @Mock private var credentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider = _ @Mock + private var fileMounter: DriverPodKubernetesFileMounter = _ + @Mock private var credentialsMounter: DriverPodKubernetesCredentialsMounter = _ @Mock private var loggingPodStatusWatcher: LoggingPodStatusWatcher = _ @@ -171,10 +191,12 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { .endMetadata() } }) - when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver("")) + when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver(any[String])) .thenReturn(containerLocalizedFilesResolver) when(initContainerComponentsProvider.provideExecutorInitContainerConfiguration()) .thenReturn(executorInitContainerConfiguration) + when(initContainerComponentsProvider.provideDriverPodFileMounter()) + .thenReturn(fileMounter) when(submittedDependenciesSecretBuilder.build()) .thenReturn(INIT_CONTAINER_SECRET) when(initContainerConfigMapBuilder.build()) @@ -184,14 +206,63 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { override def answer(invocation: InvocationOnMock): Pod = { new PodBuilder(invocation.getArgumentAt(0, classOf[Pod])) .editMetadata() - .withUid(DRIVER_POD_UID) + .withUid(DRIVER_POD_UID) .endMetadata() - .withKind(DRIVER_POD_KIND) + .withKind(DRIVER_POD_KIND) .withApiVersion(DRIVER_POD_API_VERSION) .build() } }) when(podOps.withName(s"$APP_RESOURCE_PREFIX-driver")).thenReturn(namedPodResource) + when(pythonSubmissionResources.sparkJars).thenReturn(Seq.empty[String]) + when(pythonSubmissionResources.primarySparkResource(any())) + .thenReturn(RESOLVED_PYSPARK_PRIMARY_FILE) + when(pythonSubmissionResources.pySparkFiles).thenReturn(PYSPARK_FILES.toArray) + when(pythonSubmissionResources.arguments).thenReturn(Array(PYSPARK_FILES.mkString(","), "500")) + when(pythonSubmissionResources.driverPod( + any[DriverInitContainerComponentsProvider], + mockitoEq(RESOLVED_PYSPARK_PRIMARY_FILE), + 
mockitoEq(RESOLVED_PYSPARK_FILES.mkString(",")), + any[String], + any[PodBuilder])).thenAnswer( new Answer[Pod] { + override def answer(invocation: InvocationOnMock) : Pod = { + invocation.getArgumentAt(0, classOf[DriverInitContainerComponentsProvider]) + .provideDriverPodFileMounter().addPySparkFiles( + invocation.getArgumentAt(1, classOf[String]), + invocation.getArgumentAt(2, classOf[String]), + invocation.getArgumentAt(3, classOf[String]), + invocation.getArgumentAt(4, classOf[PodBuilder]) + ).build() + } + }) + when(fileMounter.addPySparkFiles( + mockitoEq(RESOLVED_PYSPARK_PRIMARY_FILE), + mockitoEq(RESOLVED_PYSPARK_FILES.mkString(",")), + any[String], + any())).thenAnswer( new Answer[PodBuilder] { + override def answer(invocation: InvocationOnMock) : PodBuilder = { + invocation.getArgumentAt(3, classOf[PodBuilder]) + .editSpec() + .editMatchingContainer(new ContainerNameEqualityPredicate( + invocation.getArgumentAt(2, classOf[String]))) + .addNewEnv() + .withName(ENV_PYSPARK_PRIMARY) + .withValue(invocation.getArgumentAt(0, classOf[String])) + .endEnv() + .addNewEnv() + .withName(ENV_PYSPARK_FILES) + .withValue(invocation.getArgumentAt(1, classOf[String])) + .endEnv() + .endContainer() + .endSpec() + .editMetadata() + .withUid(DRIVER_POD_UID) + .withName(s"$APP_RESOURCE_PREFIX-driver") + .endMetadata() + .withKind(DRIVER_POD_KIND) + .withApiVersion(DRIVER_POD_API_VERSION) + } + }) when(namedPodResource.watch(loggingPodStatusWatcher)).thenReturn(watch) when(containerLocalizedFilesResolver.resolveSubmittedAndRemoteSparkJars()) .thenReturn(RESOLVED_SPARK_REMOTE_AND_LOCAL_JARS) @@ -199,6 +270,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { .thenReturn(RESOLVED_SPARK_JARS) when(containerLocalizedFilesResolver.resolveSubmittedSparkFiles()) .thenReturn(RESOLVED_SPARK_FILES) + when(containerLocalizedFilesResolver.resolvePrimaryResourceFile()) + .thenReturn(RESOLVED_PYSPARK_PRIMARY_FILE) + when(containerLocalizedFilesResolver.resolveSubmittedPySparkFiles()) + .thenReturn(RESOLVED_PYSPARK_FILES) when(executorInitContainerConfiguration.configureSparkConfForExecutorInitContainer(SPARK_CONF)) .thenReturn(SPARK_CONF_WITH_EXECUTOR_INIT_CONF) when(kubernetesClient.resourceList(anyVararg[HasMetadata]())).thenReturn(resourceListOps) @@ -302,13 +377,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { APP_RESOURCE_PREFIX, APP_ID, "", - false, + None, MAIN_CLASS, SPARK_CONF, APP_ARGS, - SPARK_JARS, - SPARK_FILES, - Nil, true, kubernetesClient, initContainerComponentsProvider, @@ -317,6 +389,20 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { verify(loggingPodStatusWatcher).awaitCompletion() } + test("Mounting environmental variables correctly onto Driver Pod for PySpark Jobs") { + expectationsForNoMountedCredentials() + expectationsForNoDependencyUploader() + expectationsForNoSparkJarsOrFiles() + runAndVerifyDriverPodHasCorrectPySparkProperties() + } + + private def expectationsForNoSparkJarsOrFiles(): Unit = { + when(containerLocalizedFilesResolver.resolveSubmittedSparkFiles()) + .thenReturn(Nil) + when(containerLocalizedFilesResolver.resolveSubmittedSparkJars()) + .thenReturn(Nil) + } + private def expectationsForNoDependencyUploader(): Unit = { when(initContainerComponentsProvider .provideInitContainerSubmittedDependencyUploader(ALL_EXPECTED_LABELS)) @@ -384,19 +470,22 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { } } + private def runAndVerifyDriverPodHasCorrectPySparkProperties(): Unit = { + 
runAndVerifyPySparkPodMatchesPredicate { p => + Option(p).exists(pod => containerHasCorrectPySparkEnvs(pod)) + } + } + private def runAndVerifyPodMatchesPredicate(pred: (Pod => Boolean)): Unit = { new Client( APP_NAME, APP_RESOURCE_PREFIX, APP_ID, "", - false, + None, MAIN_CLASS, SPARK_CONF, APP_ARGS, - SPARK_JARS, - SPARK_FILES, - Nil, false, kubernetesClient, initContainerComponentsProvider, @@ -448,6 +537,17 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { expectedBasicEnvs.toSet.subsetOf(envs.toSet) } + private def containerHasCorrectPySparkEnvs(pod: Pod): Boolean = { + val driverContainer = pod.getSpec.getContainers.asScala.head + val envs = driverContainer.getEnv.asScala.map(env => (env.getName, env.getValue)) + val expectedBasicEnvs = Map( + ENV_PYSPARK_PRIMARY -> RESOLVED_PYSPARK_PRIMARY_FILE, + ENV_PYSPARK_FILES -> RESOLVED_PYSPARK_FILES.mkString(","), + ENV_DRIVER_ARGS -> (RESOLVED_PYSPARK_FILES :+ "500").mkString(",") + ) + expectedBasicEnvs.toSet.subsetOf(envs.toSet) + } + private def containerHasCorrectBasicContainerConfiguration(pod: Pod): Boolean = { val containers = pod.getSpec.getContainers.asScala containers.size == 1 && @@ -464,6 +564,33 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { BOOTSTRAPPED_POD_ANNOTATION -> TRUE) pod.getMetadata.getAnnotations.asScala == expectedAnnotations } + + private def runAndVerifyPySparkPodMatchesPredicate(pred: (Pod => Boolean)): Unit = { + new Client( + APP_NAME, + APP_RESOURCE_PREFIX, + APP_ID, + PYSPARK_PRIMARY_FILE, + Option(pythonSubmissionResources), + MAIN_CLASS, + SPARK_CONF, + PYSPARK_APP_ARGS, + false, + kubernetesClient, + initContainerComponentsProvider, + credentialsMounterProvider, + loggingPodStatusWatcher).run() + val podMatcher = new BaseMatcher[Pod] { + override def matches(o: scala.Any): Boolean = { + o match { + case p: Pod => pred(p) + case _ => false + } + } + override def describeTo(description: Description): Unit = {} + } + verify(podOps).create(argThat(podMatcher)) + } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index 1cb4488efeafd..d2082291eba22 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -76,10 +76,11 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { assume(testBackend.name == MINIKUBE_TEST_BACKEND) launchStagingServer(SSLOptions(), None) - sparkConf.set(DRIVER_DOCKER_IMAGE, - System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest")) - sparkConf.set(EXECUTOR_DOCKER_IMAGE, - System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest")) + sparkConf + .set(DRIVER_DOCKER_IMAGE, + System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest")) + .set(EXECUTOR_DOCKER_IMAGE, + System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest")) runPySparkPiAndVerifyCompletion( PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION) @@ -89,9 +90,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { assume(testBackend.name == MINIKUBE_TEST_BACKEND) 
sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) - sparkConf.set(DRIVER_DOCKER_IMAGE, + sparkConf + .set(DRIVER_DOCKER_IMAGE, System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest")) - sparkConf.set(EXECUTOR_DOCKER_IMAGE, + .set(EXECUTOR_DOCKER_IMAGE, System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest")) runPySparkPiAndVerifyCompletion( diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala index 2db67c96021b8..e240fcf953f8c 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala @@ -16,19 +16,24 @@ */ package org.apache.spark.deploy.kubernetes.integrationtest.docker +import java.io.File import java.net.URI import java.nio.file.Paths +import scala.collection.JavaConverters._ + import com.spotify.docker.client.{DefaultDockerClient, DockerCertificates, LoggingBuildHandler} import org.apache.http.client.utils.URIBuilder import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} -import scala.collection.JavaConverters._ +import org.apache.spark.internal.Logging import org.apache.spark.util.RedirectThread -private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String, String]) { + +private[spark] class SparkDockerImageBuilder + (private val dockerEnv: Map[String, String]) extends Logging{ private val DOCKER_BUILD_PATH = Paths.get("target", "docker") // Dockerfile paths must be relative to the build path. 
@@ -73,15 +78,13 @@ private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String, .getOrElse("/usr/bin/python") val builder = new ProcessBuilder( Seq(pythonExec, "setup.py", "sdist").asJava) - builder.directory(new java.io.File(s"$DOCKER_BUILD_PATH/python")) + builder.directory(new File(DOCKER_BUILD_PATH.toFile, "python")) builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize val process = builder.start() new RedirectThread(process.getInputStream, System.out, "redirect output").start() val exitCode = process.waitFor() if (exitCode != 0) { - // scalastyle:off println - println(s"exitCode: $exitCode") - // scalastyle:on println + logInfo(s"exitCode: $exitCode") } buildImage("spark-base", BASE_DOCKER_FILE) buildImage("spark-driver", DRIVER_DOCKER_FILE) From 9bf7b9daa6ed3287a2b6d18b84b214aea9a07aad Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Thu, 22 Jun 2017 19:35:15 -0700 Subject: [PATCH 12/18] Modified unit test check --- .../apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala index d0ff87e1fe15b..3180025b98788 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala @@ -542,8 +542,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { val envs = driverContainer.getEnv.asScala.map(env => (env.getName, env.getValue)) val expectedBasicEnvs = Map( ENV_PYSPARK_PRIMARY -> RESOLVED_PYSPARK_PRIMARY_FILE, - ENV_PYSPARK_FILES -> RESOLVED_PYSPARK_FILES.mkString(","), - ENV_DRIVER_ARGS -> (RESOLVED_PYSPARK_FILES :+ "500").mkString(",") + ENV_PYSPARK_FILES -> RESOLVED_PYSPARK_FILES.mkString(",") ) expectedBasicEnvs.toSet.subsetOf(envs.toSet) } From 4561194a2479ac2f1960a46dba0cb54842e1dc65 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Fri, 23 Jun 2017 09:36:46 -0700 Subject: [PATCH 13/18] Scalastyle --- .../deploy/kubernetes/submit/PythonSubmissionResources.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala index 59285f7d24791..3cc3c2c37b86f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala @@ -57,4 +57,4 @@ class PythonSubmissionResources( driverPodBuilder) .build() } -} \ No newline at end of file +} From eb1079a9ebaabd5542bf5cc3a009607089892404 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Fri, 23 Jun 2017 12:29:53 -0700 Subject: [PATCH 14/18] PR 348 file conflicts --- .../apache/spark/deploy/kubernetes/submit/Client.scala | 6 ++---- .../submit/DriverInitContainerComponentsProvider.scala | 10 +++++----- .../spark/deploy/kubernetes/submit/ClientV2Suite.scala | 6 +++--- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index 618107eb0e60c..1e5854f203493 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -194,7 +194,7 @@ private[spark] class Client( val initContainerBundler = initContainerComponentsProvider .provideInitContainerBundle(maybeSubmittedResourceIdentifiers.map(_.ids()), - resolvedSparkJars ++ resolvedSparkFiles, mainAppResource) + resolvedSparkJars ++ resolvedSparkFiles) val podWithInitContainer = initContainerBundler.map( _.sparkPodInitContainerBootstrap @@ -305,9 +305,7 @@ private[spark] object Client { val pythonResource: Option[PythonSubmissionResources] = if (isPython) { Option(new PythonSubmissionResources(mainAppResource, appArgs)) - } else { - None - } + } else None // Since you might need jars for SQL UDFs in PySpark def sparkJarFilter() : Seq[String] = pythonResource match { case Some(p) => p.sparkJars diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala index 1fa4e18cb9850..7f5a62e97abc0 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala @@ -42,7 +42,7 @@ private[spark] trait DriverInitContainerComponentsProvider { def provideInitContainerBootstrap(): SparkPodInitContainerBootstrap def provideDriverPodFileMounter(): DriverPodKubernetesFileMounter def provideInitContainerBundle(maybeSubmittedResourceIds: Option[SubmittedResourceIds], - uris: Iterable[String], mainAppResource: String): Option[InitContainerBundle] + uris: Iterable[String]): Option[InitContainerBundle] } private[spark] class DriverInitContainerComponentsProviderImpl( @@ -211,10 +211,10 @@ private[spark] class DriverInitContainerComponentsProviderImpl( } override def provideInitContainerBundle( maybeSubmittedResourceIds: Option[SubmittedResourceIds], - uris: Iterable[String], mainAppResource: String): Option[InitContainerBundle] = { - val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver(mainAppResource) - // Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs - if (KubernetesFileUtils.getNonContainerLocalFiles(uris).nonEmpty) { + uris: Iterable[String]): Option[InitContainerBundle] = { + // Bypass init-containers if `spark.jars` and `spark.files` and '--py-files' + // is empty or only has `local://` URIs + if ((KubernetesFileUtils.getNonContainerLocalFiles(uris) ++ pySparkSubmitted).nonEmpty) { Some(InitContainerBundle(provideInitContainerConfigMap(maybeSubmittedResourceIds), provideInitContainerBootstrap(), provideExecutorInitContainerConfiguration())) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala index 
9efcbff41f4d2..3f1febf205654 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala @@ -285,7 +285,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { .thenReturn(Some(submittedDependenciesSecretBuilder)) when(initContainerComponentsProvider.provideInitContainerBundle(mockitoEq( Option(SUBMITTED_RESOURCES.ids())), - mockitoEq(RESOLVED_SPARK_JARS ++ RESOLVED_SPARK_FILES), any[String])) + mockitoEq(RESOLVED_SPARK_JARS ++ RESOLVED_SPARK_FILES))) .thenReturn(Option(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP, initContainerBootstrap, executorInitContainerConfiguration))) runAndVerifyDriverPodHasCorrectProperties() @@ -403,7 +403,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { .provideSubmittedDependenciesSecretBuilder(None)) .thenReturn(None) when(initContainerComponentsProvider.provideInitContainerBundle(mockitoEq(None), - mockitoEq(RESOLVED_SPARK_JARS ++ RESOLVED_SPARK_FILES), any[String])) + mockitoEq(RESOLVED_SPARK_JARS ++ RESOLVED_SPARK_FILES))) .thenReturn(Some(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP, initContainerBootstrap, executorInitContainerConfiguration))) } @@ -465,7 +465,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { private def runAndVerifyDriverPodHasCorrectPySparkProperties(): Unit = { when(initContainerComponentsProvider.provideInitContainerBundle( - any[Option[SubmittedResourceIds]], any[Iterable[String]], any[String])) + any[Option[SubmittedResourceIds]], any[Iterable[String]])) .thenReturn(Some(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP, initContainerBootstrap, executorInitContainerConfiguration))) runAndVerifyPySparkPodMatchesPredicate { p => From 4a6b779b7ff2cc5ddad7f0bd415a3b613020a5a1 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Tue, 27 Jun 2017 18:06:47 -0700 Subject: [PATCH 15/18] Refactored unit tests and styles --- .../deploy/kubernetes/submit/Client.scala | 16 +-- .../ContainerLocalizedFilesResolver.scala | 8 +- .../submit/PythonSubmissionResources.scala | 38 +++-- .../kubernetes/submit/ClientV2Suite.scala | 42 ++---- .../PythonSubmissionResourcesSuite.scala | 133 ++++++++++++++++++ 5 files changed, 177 insertions(+), 60 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index 1e5854f203493..d583d3a0565c6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -48,7 +48,7 @@ private[spark] class Client( kubernetesResourceNamePrefix: String, kubernetesAppId: String, mainAppResource: String, - pythonResource: Option[PythonSubmissionResources], + pythonResource: Option[PythonSubmissionResourcesImpl], mainClass: String, sparkConf: SparkConf, appArgs: Array[String], @@ -188,7 +188,7 @@ private[spark] class Client( val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles() val resolvedPySparkFiles = containerLocalizedFilesResolver.resolveSubmittedPySparkFiles() val 
resolvedPrimaryPySparkResource = pythonResource match { - case Some(p) => p.primarySparkResource(containerLocalizedFilesResolver) + case Some(p) => p.primaryPySparkResource(containerLocalizedFilesResolver) case None => "" } @@ -302,9 +302,9 @@ private[spark] object Client { mainClass: String, appArgs: Array[String]): Unit = { val isPython = mainAppResource.endsWith(".py") - val pythonResource: Option[PythonSubmissionResources] = + val pythonResource: Option[PythonSubmissionResourcesImpl] = if (isPython) { - Option(new PythonSubmissionResources(mainAppResource, appArgs)) + Option(new PythonSubmissionResourcesImpl(mainAppResource, appArgs)) } else None // Since you might need jars for SQL UDFs in PySpark def sparkJarFilter() : Seq[String] = pythonResource match { @@ -321,13 +321,11 @@ private[spark] object Client { val sparkFiles = sparkConf.getOption("spark.files") .map(_.split(",")) .getOrElse(Array.empty[String]) - val pySparkFiles: Array[String] = pythonResource match { - case Some(p) => p.pySparkFiles - case None => Array.empty[String] - } + val pySparkFilesOption = pythonResource map {p => p.pySparkFiles} validateNoDuplicateFileNames(sparkJars) validateNoDuplicateFileNames(sparkFiles) - if (pythonResource.isDefined) {validateNoDuplicateFileNames(pySparkFiles)} + pySparkFilesOption foreach {b => validateNoDuplicateFileNames(b)} + val pySparkFiles = pySparkFilesOption.getOrElse(Array.empty[String]) val appName = sparkConf.getOption("spark.app.name").getOrElse("spark") // The resource name prefix is derived from the application name, making it easy to connect the // names of the Kubernetes resources from e.g. Kubectl or the Kubernetes dashboard to the diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolver.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolver.scala index 999fe6d7e38a0..7345589c59c42 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolver.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolver.scala @@ -34,8 +34,7 @@ private[spark] class ContainerLocalizedFilesResolverImpl( pySparkFiles: Seq[String], primaryPyFile: String, jarsDownloadPath: String, - filesDownloadPath: String - ) extends ContainerLocalizedFilesResolver { + filesDownloadPath: String ) extends ContainerLocalizedFilesResolver { override def resolveSubmittedAndRemoteSparkJars(): Seq[String] = { @@ -68,10 +67,7 @@ private[spark] class ContainerLocalizedFilesResolverImpl( } override def resolvePrimaryResourceFile(): String = { - Option(primaryPyFile) match { - case None => "" - case Some(p) => resolveFile(p, filesDownloadPath) - } + Option(primaryPyFile).map(p => resolveFile(p, filesDownloadPath)).getOrElse("") } private def resolveFile(file: String, downloadPath: String) = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala index 3cc3c2c37b86f..a20d93080c4e1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala +++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala @@ -18,20 +18,34 @@ package org.apache.spark.deploy.kubernetes.submit import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} -class PythonSubmissionResources( +private[spark] trait PythonSubmissionResources { + def sparkJars: Seq[String] + def pySparkFiles: Array[String] + def arguments: Array[String] + def primaryPySparkResource(containerLocalizedFilesResolver: ContainerLocalizedFilesResolver) + : String + def driverPod( + initContainerComponentsProvider: DriverInitContainerComponentsProvider, + resolvedPrimaryPySparkResource: String, + resolvedPySparkFiles: String, + driverContainerName: String, + driverPodBuilder: PodBuilder) : Pod +} + +private[spark] class PythonSubmissionResourcesImpl( private val mainAppResource: String, - private val appArgs: Array[String] ) { + private val appArgs: Array[String] ) extends PythonSubmissionResources { - private val pyFiles: Array[String] = Option(appArgs(0)) match { - case None => Array(mainAppResource) - case Some(a) => mainAppResource +: a.split(",") + private val pyFiles: Array[String] = { + (Option(appArgs(0)) map (a => mainAppResource +: a.split(","))) + .getOrElse(Array(mainAppResource)) } - def sparkJars: Seq[String] = Seq.empty[String] + override def sparkJars: Seq[String] = Seq.empty[String] - def pySparkFiles: Array[String] = pyFiles + override def pySparkFiles: Array[String] = pyFiles - def arguments: Array[String] = + override def arguments: Array[String] = { pyFiles.toList match { case Nil => appArgs case a :: b => a match { @@ -39,10 +53,12 @@ class PythonSubmissionResources( case _ => appArgs.drop(1) } } - def primarySparkResource (containerLocalizedFilesResolver: ContainerLocalizedFilesResolver) - : String = containerLocalizedFilesResolver.resolvePrimaryResourceFile() + } + override def primaryPySparkResource ( + containerLocalizedFilesResolver: ContainerLocalizedFilesResolver) : String = + containerLocalizedFilesResolver.resolvePrimaryResourceFile() - def driverPod( + override def driverPod( initContainerComponentsProvider: DriverInitContainerComponentsProvider, resolvedPrimaryPySparkResource: String, resolvedPySparkFiles: String, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala index 3f1febf205654..fe04fddc746bd 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala @@ -158,8 +158,6 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { private type ResourceListOps = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[ HasMetadata, java.lang.Boolean] @Mock - private var pythonSubmissionResources : PythonSubmissionResources = _ - @Mock private var resourceListOps: ResourceListOps = _ @Mock private var credentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider = _ @@ -189,8 +187,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { .endMetadata() } }) - when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver(any[String])) - .thenReturn(containerLocalizedFilesResolver) + when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver( + 
any[String])).thenReturn(containerLocalizedFilesResolver) when(initContainerComponentsProvider.provideDriverPodFileMounter()) .thenReturn(fileMounter) when(submittedDependenciesSecretBuilder.build()) @@ -208,27 +206,6 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { } }) when(podOps.withName(s"$APP_RESOURCE_PREFIX-driver")).thenReturn(namedPodResource) - when(pythonSubmissionResources.sparkJars).thenReturn(Seq.empty[String]) - when(pythonSubmissionResources.primarySparkResource(any())) - .thenReturn(RESOLVED_PYSPARK_PRIMARY_FILE) - when(pythonSubmissionResources.pySparkFiles).thenReturn(PYSPARK_FILES.toArray) - when(pythonSubmissionResources.arguments).thenReturn(Array(PYSPARK_FILES.mkString(","), "500")) - when(pythonSubmissionResources.driverPod( - any[DriverInitContainerComponentsProvider], - mockitoEq(RESOLVED_PYSPARK_PRIMARY_FILE), - mockitoEq(RESOLVED_PYSPARK_FILES.mkString(",")), - any[String], - any[PodBuilder])).thenAnswer( new Answer[Pod] { - override def answer(invocation: InvocationOnMock) : Pod = { - invocation.getArgumentAt(0, classOf[DriverInitContainerComponentsProvider]) - .provideDriverPodFileMounter().addPySparkFiles( - invocation.getArgumentAt(1, classOf[String]), - invocation.getArgumentAt(2, classOf[String]), - invocation.getArgumentAt(3, classOf[String]), - invocation.getArgumentAt(4, classOf[PodBuilder]) - ).build() - } - }) when(fileMounter.addPySparkFiles( mockitoEq(RESOLVED_PYSPARK_PRIMARY_FILE), mockitoEq(RESOLVED_PYSPARK_FILES.mkString(",")), @@ -240,12 +217,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { .editMatchingContainer(new ContainerNameEqualityPredicate( invocation.getArgumentAt(2, classOf[String]))) .addNewEnv() - .withName(ENV_PYSPARK_PRIMARY) - .withValue(invocation.getArgumentAt(0, classOf[String])) - .endEnv() - .addNewEnv() - .withName(ENV_PYSPARK_FILES) - .withValue(invocation.getArgumentAt(1, classOf[String])) + .withName("pyspark") + .withValue("true") .endEnv() .endContainer() .endSpec() @@ -464,6 +437,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { } private def runAndVerifyDriverPodHasCorrectPySparkProperties(): Unit = { + when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver( + mockitoEq(PYSPARK_PRIMARY_FILE))).thenReturn(containerLocalizedFilesResolver) when(initContainerComponentsProvider.provideInitContainerBundle( any[Option[SubmittedResourceIds]], any[Iterable[String]])) .thenReturn(Some(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP, @@ -538,8 +513,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { val driverContainer = pod.getSpec.getContainers.asScala.head val envs = driverContainer.getEnv.asScala.map(env => (env.getName, env.getValue)) val expectedBasicEnvs = Map( - ENV_PYSPARK_PRIMARY -> RESOLVED_PYSPARK_PRIMARY_FILE, - ENV_PYSPARK_FILES -> RESOLVED_PYSPARK_FILES.mkString(",") + "pyspark" -> "true" ) expectedBasicEnvs.toSet.subsetOf(envs.toSet) } @@ -567,7 +541,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { APP_RESOURCE_PREFIX, APP_ID, PYSPARK_PRIMARY_FILE, - Option(pythonSubmissionResources), + Option(new PythonSubmissionResourcesImpl(PYSPARK_PRIMARY_FILE, PYSPARK_APP_ARGS)), MAIN_CLASS, SPARK_CONF, PYSPARK_APP_ARGS, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala new file mode 100644 index 
0000000000000..bb83bee667ce4 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit + +import org.apache.spark.{SSLOptions, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.kubernetes.config._ +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, Pod, PodBuilder} + + + +private[spark] class PythonSubmissionResourcesSuite extends SparkFunSuite { + private val PYSPARK_FILES = Seq( + "hdfs://localhost:9000/app/files/file1.py", + "file:///app/files/file2.py", + "local:///app/files/file3.py", + "http://app/files/file4.py", + "file:///app/files/file5.py") + private val RESOLVED_PYSPARK_FILES = Seq( + "hdfs://localhost:9000/app/files/file1.py", + "/var/spark-data/spark-files/file2.py", + "local:///app/file`s/file3.py", + "http://app/files/file4.py") + private val PYSPARK_PRIMARY_FILE = "file:///app/files/file5.py" + private val RESOLVED_PYSPARK_PRIMARY_FILE = "/var/data/spark-files/file5.py" + + private val pyFilesResource = new PythonSubmissionResourcesImpl( + PYSPARK_PRIMARY_FILE, Array(PYSPARK_FILES.mkString(","), "500") + ) + private val pyResource = new PythonSubmissionResourcesImpl( + PYSPARK_PRIMARY_FILE, Array(null, "500") + ) + private val SPARK_FILES = Seq.empty[String] + private val SPARK_JARS = Seq.empty[String] + private val JARS_DOWNLOAD_PATH = "/var/data/spark-jars" + private val FILES_DOWNLOAD_PATH = "/var/data/spark-files" + private val localizedFilesResolver = new ContainerLocalizedFilesResolverImpl( + SPARK_JARS, + SPARK_FILES, + PYSPARK_FILES, + PYSPARK_PRIMARY_FILE, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH) + private val lessLocalizedFilesResolver = new ContainerLocalizedFilesResolverImpl( + SPARK_JARS, + SPARK_FILES, + Seq.empty[String], + PYSPARK_PRIMARY_FILE, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH) + private val NAMESPACE = "example_pyspark" + private val DRIVER_CONTAINER_NAME = "pyspark_container" + private val driverContainer = new ContainerBuilder() + .withName(DRIVER_CONTAINER_NAME) + .build() + private val basePodBuilder = new PodBuilder() + .withNewMetadata() + .withName("base_pod") + .endMetadata() + .withNewSpec() + .addToContainers(driverContainer) + .endSpec() + private val driverInitcontainer = new DriverInitContainerComponentsProviderImpl( + new SparkConf(true).set(KUBERNETES_NAMESPACE, NAMESPACE), + "kubeResourceName", + "namespace", + SPARK_JARS, + SPARK_FILES, + PYSPARK_PRIMARY_FILE +: PYSPARK_FILES, + SSLOptions() + ) + private val lessDriverInitcontainer = new DriverInitContainerComponentsProviderImpl( + new 
SparkConf(true).set(KUBERNETES_NAMESPACE, NAMESPACE), + "kubeResourceName", + "namespace", + SPARK_JARS, + SPARK_FILES, + Array(PYSPARK_PRIMARY_FILE), + SSLOptions() + ) + + test("Test with --py-files included") { + assert(pyFilesResource.sparkJars === Seq.empty[String]) + assert(pyFilesResource.pySparkFiles === + PYSPARK_PRIMARY_FILE +: PYSPARK_FILES) + assert(pyFilesResource.primaryPySparkResource(localizedFilesResolver) === + RESOLVED_PYSPARK_PRIMARY_FILE) + val driverPod: Pod = pyFilesResource.driverPod( + driverInitcontainer, + RESOLVED_PYSPARK_PRIMARY_FILE, + RESOLVED_PYSPARK_FILES.mkString(","), + DRIVER_CONTAINER_NAME, + basePodBuilder + ) + val driverContainer = driverPod.getSpec.getContainers.asScala.head + val envs = driverContainer.getEnv.asScala.map(env => (env.getName, env.getValue)).toMap + envs.get("PYSPARK_PRIMARY") foreach{ a => assert (a === RESOLVED_PYSPARK_PRIMARY_FILE) } + envs.get("PYSPARK_FILES") foreach{ a => assert (a === RESOLVED_PYSPARK_FILES.mkString(",")) } + } + + test("Test without --py-files") { + assert(pyResource.sparkJars === Seq.empty[String]) + assert(pyResource.pySparkFiles === Array(PYSPARK_PRIMARY_FILE)) + assert(pyResource.primaryPySparkResource(lessLocalizedFilesResolver) === + RESOLVED_PYSPARK_PRIMARY_FILE) + val driverPod: Pod = pyResource.driverPod( + lessDriverInitcontainer, + RESOLVED_PYSPARK_PRIMARY_FILE, + "", + DRIVER_CONTAINER_NAME, + basePodBuilder + ) + val driverContainer = driverPod.getSpec.getContainers.asScala.head + val envs = driverContainer.getEnv.asScala.map(env => (env.getName, env.getValue)).toMap + envs.get("PYSPARK_PRIMARY") foreach{ a => assert (a === RESOLVED_PYSPARK_PRIMARY_FILE) } + envs.get("PYSPARK_FILES") foreach{ a => assert (a === "") } + } +} \ No newline at end of file From 363919a49648a5e3c7c8735b6008a233e38ea612 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Wed, 28 Jun 2017 11:15:28 -0700 Subject: [PATCH 16/18] further scala stylzing and logic --- .../deploy/kubernetes/submit/Client.scala | 32 +++++++------------ .../submit/PythonSubmissionResources.scala | 7 ++-- .../PythonSubmissionResourcesSuite.scala | 12 +++---- 3 files changed, 21 insertions(+), 30 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index d583d3a0565c6..882672a31ad13 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -82,10 +82,7 @@ private[spark] class Client( org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) def run(): Unit = { - val arguments = pythonResource match { - case Some(p) => p.arguments - case None => appArgs - } + val arguments = (pythonResource map {p => p.arguments}).getOrElse(appArgs) val driverCustomLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf( sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX, @@ -187,11 +184,9 @@ private[spark] class Client( val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars() val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles() val resolvedPySparkFiles = containerLocalizedFilesResolver.resolveSubmittedPySparkFiles() - val resolvedPrimaryPySparkResource = pythonResource match { - case Some(p) => p.primaryPySparkResource(containerLocalizedFilesResolver) 
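
The hunk continuing below applies the same cleanup already made to arguments above: an Option pattern match is collapsed into map/getOrElse. A minimal standalone sketch of the equivalence, using a simplified stand-in trait rather than the patch's real PythonSubmissionResources:

// Illustrative only: simplified stand-in for the patch's PythonSubmissionResources trait.
trait PySparkResources {
  def arguments: Array[String]
}

object OptionRefactorSketch {
  // Both forms return the PySpark arguments when a Python resource is present
  // and fall back to the raw appArgs otherwise; the patch series moves from the
  // first form to the second.
  def viaMatch(pythonResource: Option[PySparkResources], appArgs: Array[String]): Array[String] =
    pythonResource match {
      case Some(p) => p.arguments
      case None => appArgs
    }

  def viaMap(pythonResource: Option[PySparkResources], appArgs: Array[String]): Array[String] =
    pythonResource.map(_.arguments).getOrElse(appArgs)

  // e.g. viaMatch(None, Array("500")) and viaMap(None, Array("500")) both yield Array("500").
}
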
- case None => "" - } - + val resolvedPrimaryPySparkResource = (pythonResource map { + p => p.primaryPySparkResource(containerLocalizedFilesResolver) + }).getOrElse("") val initContainerBundler = initContainerComponentsProvider .provideInitContainerBundle(maybeSubmittedResourceIdentifiers.map(_.ids()), resolvedSparkJars ++ resolvedSparkFiles) @@ -239,16 +234,15 @@ private[spark] class Client( .endEnv() .endContainer() .endSpec() - val resolvedDriverPod = pythonResource match { - case Some(p) => p.driverPod( - initContainerComponentsProvider, + val driverPodFileMounter = initContainerComponentsProvider.provideDriverPodFileMounter() + val resolvedDriverPod = (pythonResource map { + p => p.driverPod( + driverPodFileMounter, resolvedPrimaryPySparkResource, resolvedPySparkFiles.mkString(","), driverContainer.getName, resolvedDriverPodBuilder - ) - case None => resolvedDriverPodBuilder.build() - } + )}).getOrElse(resolvedDriverPodBuilder.build()) Utils.tryWithResource( kubernetesClient .pods() @@ -307,13 +301,11 @@ private[spark] object Client { Option(new PythonSubmissionResourcesImpl(mainAppResource, appArgs)) } else None // Since you might need jars for SQL UDFs in PySpark - def sparkJarFilter() : Seq[String] = pythonResource match { - case Some(p) => p.sparkJars - case None => + def sparkJarFilter() : Seq[String] = (pythonResource map { + p => p.sparkJars}).getOrElse( Option(mainAppResource) .filterNot(_ == SparkLauncher.NO_RESOURCE) - .toSeq - } + .toSeq) val sparkJars = sparkConf.getOption("spark.jars") .map(_.split(",")) .getOrElse(Array.empty[String]) ++ sparkJarFilter() diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala index a20d93080c4e1..c41542e2ea48b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala @@ -25,7 +25,7 @@ private[spark] trait PythonSubmissionResources { def primaryPySparkResource(containerLocalizedFilesResolver: ContainerLocalizedFilesResolver) : String def driverPod( - initContainerComponentsProvider: DriverInitContainerComponentsProvider, + driverPodFileMounter: DriverPodKubernetesFileMounter, resolvedPrimaryPySparkResource: String, resolvedPySparkFiles: String, driverContainerName: String, @@ -59,13 +59,12 @@ private[spark] class PythonSubmissionResourcesImpl( containerLocalizedFilesResolver.resolvePrimaryResourceFile() override def driverPod( - initContainerComponentsProvider: DriverInitContainerComponentsProvider, + driverPodFileMounter: DriverPodKubernetesFileMounter, resolvedPrimaryPySparkResource: String, resolvedPySparkFiles: String, driverContainerName: String, driverPodBuilder: PodBuilder) : Pod = { - initContainerComponentsProvider - .provideDriverPodFileMounter() + driverPodFileMounter .addPySparkFiles( resolvedPrimaryPySparkResource, resolvedPySparkFiles, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala index bb83bee667ce4..5c2305a8c61f7 100644 --- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala @@ -75,7 +75,7 @@ private[spark] class PythonSubmissionResourcesSuite extends SparkFunSuite { .withNewSpec() .addToContainers(driverContainer) .endSpec() - private val driverInitcontainer = new DriverInitContainerComponentsProviderImpl( + private val driverFileMounter = new DriverInitContainerComponentsProviderImpl( new SparkConf(true).set(KUBERNETES_NAMESPACE, NAMESPACE), "kubeResourceName", "namespace", @@ -83,8 +83,8 @@ private[spark] class PythonSubmissionResourcesSuite extends SparkFunSuite { SPARK_FILES, PYSPARK_PRIMARY_FILE +: PYSPARK_FILES, SSLOptions() - ) - private val lessDriverInitcontainer = new DriverInitContainerComponentsProviderImpl( + ).provideDriverPodFileMounter() + private val lessDriverFileMounter = new DriverInitContainerComponentsProviderImpl( new SparkConf(true).set(KUBERNETES_NAMESPACE, NAMESPACE), "kubeResourceName", "namespace", @@ -92,7 +92,7 @@ private[spark] class PythonSubmissionResourcesSuite extends SparkFunSuite { SPARK_FILES, Array(PYSPARK_PRIMARY_FILE), SSLOptions() - ) + ).provideDriverPodFileMounter() test("Test with --py-files included") { assert(pyFilesResource.sparkJars === Seq.empty[String]) @@ -101,7 +101,7 @@ private[spark] class PythonSubmissionResourcesSuite extends SparkFunSuite { assert(pyFilesResource.primaryPySparkResource(localizedFilesResolver) === RESOLVED_PYSPARK_PRIMARY_FILE) val driverPod: Pod = pyFilesResource.driverPod( - driverInitcontainer, + driverFileMounter, RESOLVED_PYSPARK_PRIMARY_FILE, RESOLVED_PYSPARK_FILES.mkString(","), DRIVER_CONTAINER_NAME, @@ -119,7 +119,7 @@ private[spark] class PythonSubmissionResourcesSuite extends SparkFunSuite { assert(pyResource.primaryPySparkResource(lessLocalizedFilesResolver) === RESOLVED_PYSPARK_PRIMARY_FILE) val driverPod: Pod = pyResource.driverPod( - lessDriverInitcontainer, + lessDriverFileMounter, RESOLVED_PYSPARK_PRIMARY_FILE, "", DRIVER_CONTAINER_NAME, From 9c7adb14c8239d3e33677bf63c542b19570d64e2 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Wed, 28 Jun 2017 13:49:04 -0700 Subject: [PATCH 17/18] Modified unit tests to be more specific towards Class in question --- .../deploy/kubernetes/submit/Client.scala | 2 +- .../submit/PythonSubmissionResources.scala | 4 +- .../kubernetes/submit/ClientV2Suite.scala | 22 ++---- .../PythonSubmissionResourcesSuite.scala | 69 +++++++------------ 4 files changed, 34 insertions(+), 63 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index 882672a31ad13..c3f4ff922b872 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -236,7 +236,7 @@ private[spark] class Client( .endSpec() val driverPodFileMounter = initContainerComponentsProvider.provideDriverPodFileMounter() val resolvedDriverPod = (pythonResource map { - p => p.driverPod( + p => p.driverPodWithPySparkEnvs( driverPodFileMounter, resolvedPrimaryPySparkResource, resolvedPySparkFiles.mkString(","), diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala index c41542e2ea48b..f7589a3c2f809 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala @@ -24,7 +24,7 @@ private[spark] trait PythonSubmissionResources { def arguments: Array[String] def primaryPySparkResource(containerLocalizedFilesResolver: ContainerLocalizedFilesResolver) : String - def driverPod( + def driverPodWithPySparkEnvs( driverPodFileMounter: DriverPodKubernetesFileMounter, resolvedPrimaryPySparkResource: String, resolvedPySparkFiles: String, @@ -58,7 +58,7 @@ private[spark] class PythonSubmissionResourcesImpl( containerLocalizedFilesResolver: ContainerLocalizedFilesResolver) : String = containerLocalizedFilesResolver.resolvePrimaryResourceFile() - override def driverPod( + override def driverPodWithPySparkEnvs( driverPodFileMounter: DriverPodKubernetesFileMounter, resolvedPrimaryPySparkResource: String, resolvedPySparkFiles: String, diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala index fe04fddc746bd..a58a37691f4eb 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala @@ -213,18 +213,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { any())).thenAnswer( new Answer[PodBuilder] { override def answer(invocation: InvocationOnMock) : PodBuilder = { invocation.getArgumentAt(3, classOf[PodBuilder]) - .editSpec() - .editMatchingContainer(new ContainerNameEqualityPredicate( - invocation.getArgumentAt(2, classOf[String]))) - .addNewEnv() - .withName("pyspark") - .withValue("true") - .endEnv() - .endContainer() - .endSpec() .editMetadata() .withUid(DRIVER_POD_UID) .withName(s"$APP_RESOURCE_PREFIX-driver") + .addToLabels("pyspark-test", "true") .endMetadata() .withKind(DRIVER_POD_KIND) .withApiVersion(DRIVER_POD_API_VERSION) @@ -510,12 +502,12 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { } private def containerHasCorrectPySparkEnvs(pod: Pod): Boolean = { - val driverContainer = pod.getSpec.getContainers.asScala.head - val envs = driverContainer.getEnv.asScala.map(env => (env.getName, env.getValue)) - val expectedBasicEnvs = Map( - "pyspark" -> "true" - ) - expectedBasicEnvs.toSet.subsetOf(envs.toSet) + val driverPodLabels = + pod.getMetadata.getLabels.asScala.map(env => (env._1.toString, env._2.toString)) + val expectedBasicLabels = Map( + "pyspark-test" -> "true", + "spark-role" -> "driver") + expectedBasicLabels.toSet.subsetOf(driverPodLabels.toSet) } private def containerHasCorrectBasicContainerConfiguration(pod: Pod): Boolean = { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala index 
5c2305a8c61f7..5483dc154fe73 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala @@ -18,13 +18,17 @@ package org.apache.spark.deploy.kubernetes.submit import org.apache.spark.{SSLOptions, SparkConf, SparkFunSuite} import org.apache.spark.deploy.kubernetes.config._ -import scala.collection.JavaConverters._ +import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model.{ContainerBuilder, Pod, PodBuilder} +import org.mockito.{Mock, MockitoAnnotations} +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfter + -private[spark] class PythonSubmissionResourcesSuite extends SparkFunSuite { +private[spark] class PythonSubmissionResourcesSuite extends SparkFunSuite with BeforeAndAfter { private val PYSPARK_FILES = Seq( "hdfs://localhost:9000/app/files/file1.py", "file:///app/files/file2.py", @@ -45,25 +49,6 @@ private[spark] class PythonSubmissionResourcesSuite extends SparkFunSuite { private val pyResource = new PythonSubmissionResourcesImpl( PYSPARK_PRIMARY_FILE, Array(null, "500") ) - private val SPARK_FILES = Seq.empty[String] - private val SPARK_JARS = Seq.empty[String] - private val JARS_DOWNLOAD_PATH = "/var/data/spark-jars" - private val FILES_DOWNLOAD_PATH = "/var/data/spark-files" - private val localizedFilesResolver = new ContainerLocalizedFilesResolverImpl( - SPARK_JARS, - SPARK_FILES, - PYSPARK_FILES, - PYSPARK_PRIMARY_FILE, - JARS_DOWNLOAD_PATH, - FILES_DOWNLOAD_PATH) - private val lessLocalizedFilesResolver = new ContainerLocalizedFilesResolverImpl( - SPARK_JARS, - SPARK_FILES, - Seq.empty[String], - PYSPARK_PRIMARY_FILE, - JARS_DOWNLOAD_PATH, - FILES_DOWNLOAD_PATH) - private val NAMESPACE = "example_pyspark" private val DRIVER_CONTAINER_NAME = "pyspark_container" private val driverContainer = new ContainerBuilder() .withName(DRIVER_CONTAINER_NAME) @@ -75,33 +60,27 @@ private[spark] class PythonSubmissionResourcesSuite extends SparkFunSuite { .withNewSpec() .addToContainers(driverContainer) .endSpec() - private val driverFileMounter = new DriverInitContainerComponentsProviderImpl( - new SparkConf(true).set(KUBERNETES_NAMESPACE, NAMESPACE), - "kubeResourceName", - "namespace", - SPARK_JARS, - SPARK_FILES, - PYSPARK_PRIMARY_FILE +: PYSPARK_FILES, - SSLOptions() - ).provideDriverPodFileMounter() - private val lessDriverFileMounter = new DriverInitContainerComponentsProviderImpl( - new SparkConf(true).set(KUBERNETES_NAMESPACE, NAMESPACE), - "kubeResourceName", - "namespace", - SPARK_JARS, - SPARK_FILES, - Array(PYSPARK_PRIMARY_FILE), - SSLOptions() - ).provideDriverPodFileMounter() + @Mock + private var driverInitContainer: DriverInitContainerComponentsProviderImpl = _ + @Mock + private var localizedFileResolver: ContainerLocalizedFilesResolverImpl = _ + before { + MockitoAnnotations.initMocks(this) + when(driverInitContainer.provideDriverPodFileMounter()).thenReturn( + new DriverPodKubernetesFileMounterImpl() + ) + when(localizedFileResolver.resolvePrimaryResourceFile()).thenReturn( + RESOLVED_PYSPARK_PRIMARY_FILE) + } test("Test with --py-files included") { assert(pyFilesResource.sparkJars === Seq.empty[String]) assert(pyFilesResource.pySparkFiles === PYSPARK_PRIMARY_FILE +: PYSPARK_FILES) - assert(pyFilesResource.primaryPySparkResource(localizedFilesResolver) === + assert(pyFilesResource.primaryPySparkResource(localizedFileResolver) === 
RESOLVED_PYSPARK_PRIMARY_FILE) - val driverPod: Pod = pyFilesResource.driverPod( - driverFileMounter, + val driverPod: Pod = pyFilesResource.driverPodWithPySparkEnvs( + driverInitContainer.provideDriverPodFileMounter(), RESOLVED_PYSPARK_PRIMARY_FILE, RESOLVED_PYSPARK_FILES.mkString(","), DRIVER_CONTAINER_NAME, @@ -116,10 +95,10 @@ private[spark] class PythonSubmissionResourcesSuite extends SparkFunSuite { test("Test without --py-files") { assert(pyResource.sparkJars === Seq.empty[String]) assert(pyResource.pySparkFiles === Array(PYSPARK_PRIMARY_FILE)) - assert(pyResource.primaryPySparkResource(lessLocalizedFilesResolver) === + assert(pyResource.primaryPySparkResource(localizedFileResolver) === RESOLVED_PYSPARK_PRIMARY_FILE) - val driverPod: Pod = pyResource.driverPod( - lessDriverFileMounter, + val driverPod: Pod = pyResource.driverPodWithPySparkEnvs( + driverInitContainer.provideDriverPodFileMounter(), RESOLVED_PYSPARK_PRIMARY_FILE, "", DRIVER_CONTAINER_NAME, From 0388aa4637235876364a9eb3447e42da594227e9 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Wed, 28 Jun 2017 14:47:49 -0700 Subject: [PATCH 18/18] Removed space delimiting for methods --- .../spark/deploy/kubernetes/submit/Client.scala | 16 ++++++++-------- .../submit/PythonSubmissionResources.scala | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index c3f4ff922b872..e61cece142e3d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -184,9 +184,9 @@ private[spark] class Client( val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars() val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles() val resolvedPySparkFiles = containerLocalizedFilesResolver.resolveSubmittedPySparkFiles() - val resolvedPrimaryPySparkResource = (pythonResource map { + val resolvedPrimaryPySparkResource = pythonResource.map { p => p.primaryPySparkResource(containerLocalizedFilesResolver) - }).getOrElse("") + }.getOrElse("") val initContainerBundler = initContainerComponentsProvider .provideInitContainerBundle(maybeSubmittedResourceIdentifiers.map(_.ids()), resolvedSparkJars ++ resolvedSparkFiles) @@ -235,14 +235,14 @@ private[spark] class Client( .endContainer() .endSpec() val driverPodFileMounter = initContainerComponentsProvider.provideDriverPodFileMounter() - val resolvedDriverPod = (pythonResource map { + val resolvedDriverPod = pythonResource.map { p => p.driverPodWithPySparkEnvs( driverPodFileMounter, resolvedPrimaryPySparkResource, resolvedPySparkFiles.mkString(","), driverContainer.getName, resolvedDriverPodBuilder - )}).getOrElse(resolvedDriverPodBuilder.build()) + )}.getOrElse(resolvedDriverPodBuilder.build()) Utils.tryWithResource( kubernetesClient .pods() @@ -301,8 +301,8 @@ private[spark] object Client { Option(new PythonSubmissionResourcesImpl(mainAppResource, appArgs)) } else None // Since you might need jars for SQL UDFs in PySpark - def sparkJarFilter() : Seq[String] = (pythonResource map { - p => p.sparkJars}).getOrElse( + def sparkJarFilter() : Seq[String] = + pythonResource.map { p => p.sparkJars}.getOrElse( Option(mainAppResource) .filterNot(_ == SparkLauncher.NO_RESOURCE) .toSeq) 
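
The suite changes above trade the concrete DriverInitContainerComponentsProviderImpl and ContainerLocalizedFilesResolverImpl instances for Mockito mocks, so the tests exercise only PythonSubmissionResourcesImpl. A condensed sketch of that setup pattern, stubbing just the single resolver call the class under test makes (illustrative, not the full suite):

package org.apache.spark.deploy.kubernetes.submit

import org.apache.spark.SparkFunSuite
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Mockito.when
import org.scalatest.BeforeAndAfter

// Condensed sketch of the mock-based setup introduced in PythonSubmissionResourcesSuite:
// the resolver is a Mockito mock, and only resolvePrimaryResourceFile() is stubbed.
private[spark] class MockedResolverSketchSuite extends SparkFunSuite with BeforeAndAfter {
  @Mock
  private var localizedFileResolver: ContainerLocalizedFilesResolverImpl = _

  before {
    MockitoAnnotations.initMocks(this)
    when(localizedFileResolver.resolvePrimaryResourceFile())
      .thenReturn("/var/data/spark-files/file5.py")
  }

  test("primary resource is resolved through the mocked resolver") {
    val resources = new PythonSubmissionResourcesImpl(
      "file:///app/files/file5.py", Array(null, "500"))
    assert(resources.primaryPySparkResource(localizedFileResolver) ===
      "/var/data/spark-files/file5.py")
  }
}
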
@@ -313,10 +313,10 @@ private[spark] object Client { val sparkFiles = sparkConf.getOption("spark.files") .map(_.split(",")) .getOrElse(Array.empty[String]) - val pySparkFilesOption = pythonResource map {p => p.pySparkFiles} + val pySparkFilesOption = pythonResource.map {p => p.pySparkFiles} validateNoDuplicateFileNames(sparkJars) validateNoDuplicateFileNames(sparkFiles) - pySparkFilesOption foreach {b => validateNoDuplicateFileNames(b)} + pySparkFilesOption.foreach {b => validateNoDuplicateFileNames(b)} val pySparkFiles = pySparkFilesOption.getOrElse(Array.empty[String]) val appName = sparkConf.getOption("spark.app.name").getOrElse("spark") // The resource name prefix is derived from the application name, making it easy to connect the diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala index f7589a3c2f809..e615cf72116e1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala @@ -37,7 +37,7 @@ private[spark] class PythonSubmissionResourcesImpl( private val appArgs: Array[String] ) extends PythonSubmissionResources { private val pyFiles: Array[String] = { - (Option(appArgs(0)) map (a => mainAppResource +: a.split(","))) + Option(appArgs(0)).map(a => mainAppResource +: a.split(",")) .getOrElse(Array(mainAppResource)) }
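
The final hunk above shows how PythonSubmissionResourcesImpl derives its pyFiles list: appArgs(0) carries the comma-separated --py-files value (null when the flag is absent) and the primary resource is always prepended. A standalone sketch of that resolution logic with values matching the suite's fixtures; the object wrapper and main method are illustrative only:

// Standalone sketch of the pyFiles resolution in PythonSubmissionResourcesImpl:
// appArgs(0) is the comma-separated --py-files string, or null when it was not supplied.
object PyFilesResolutionSketch {
  def resolvePyFiles(mainAppResource: String, appArgs: Array[String]): Array[String] =
    Option(appArgs(0)).map(a => mainAppResource +: a.split(","))
      .getOrElse(Array(mainAppResource))

  def main(args: Array[String]): Unit = {
    // With --py-files: the primary file is prepended to the submitted list.
    val withPyFiles = resolvePyFiles(
      "file:///app/files/file5.py",
      Array("hdfs://localhost:9000/app/files/file1.py,local:///app/files/file3.py", "500"))
    println(withPyFiles.mkString(", "))
    // file:///app/files/file5.py, hdfs://localhost:9000/app/files/file1.py, local:///app/files/file3.py

    // Without --py-files: only the primary file is shipped.
    val withoutPyFiles = resolvePyFiles("file:///app/files/file5.py", Array(null, "500"))
    println(withoutPyFiles.mkString(", "))
    // file:///app/files/file5.py
  }
}
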