From 612e4826f2db668c23ff1b07a1c1f56a7bcf46de Mon Sep 17 00:00:00 2001
From: Stavros Kontopoulos
Date: Thu, 14 Mar 2019 09:29:52 -0700
Subject: [PATCH] [SPARK-26742][K8S][BRANCH-2.4] Update k8s client version to 4.1.2

## What changes were proposed in this pull request?

Updates the fabric8 kubernetes-client to 4.1.2 and fixes the API breakages that come with the upgrade (hostPath volume source construction, string-typed status timestamps, the deletion-timestamp setter), plus the minikube status parsing used by the integration tests.

## How was this patch tested?

Tested with the latest minikube version and k8s 1.13.

KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Use SparkLauncher.NO_RESOURCE
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Run SparkPi with env and mount secrets.
- Run PySpark on simple pi.py example
- Run PySpark with Python2 to test a pyfiles example
- Run PySpark with Python3 to test a pyfiles example
- Run PySpark with memory customization
- Run in client mode.

Run completed in 4 minutes, 20 seconds.
Total number of tests run: 14
Suites: completed 2, aborted 0
Tests: succeeded 14, failed 0, canceled 0, ignored 0, pending 0
All tests passed.

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM 2.4.2-SNAPSHOT ............ SUCCESS [  2.980 s]
[INFO] Spark Project Tags ................................. SUCCESS [  2.880 s]
[INFO] Spark Project Local DB ............................. SUCCESS [  1.954 s]
[INFO] Spark Project Networking ........................... SUCCESS [  3.369 s]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  1.791 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [  1.845 s]
[INFO] Spark Project Launcher ............................. SUCCESS [  3.725 s]
[INFO] Spark Project Core ................................. SUCCESS [ 23.572 s]
[INFO] Spark Project Kubernetes Integration Tests 2.4.2-SNAPSHOT SUCCESS [04:25 min]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 05:08 min
[INFO] Finished at: 2019-03-06T18:03:55Z
[INFO] ------------------------------------------------------------------------
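For orientation, the central fabric8 4.x model break this patch adapts to is the loss of the one-argument HostPathVolumeSource constructor; a minimal sketch of the builder-based replacement (the volume name and host path below are illustrative, not taken from the Spark code):

    import io.fabric8.kubernetes.api.model.{HostPathVolumeSourceBuilder, VolumeBuilder}

    // With kubernetes-client 4.x the hostPath source is assembled through
    // its builder instead of `new HostPathVolumeSource(path)`:
    val hostPathVolume = new VolumeBuilder()
      .withName("test-volume")            // illustrative volume name
      .withHostPath(new HostPathVolumeSourceBuilder()
        .withPath("/tmp/data")            // illustrative host path
        .build())
      .build()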
Closes #23993 from skonto/fix-k8s-version.

Authored-by: Stavros Kontopoulos
Signed-off-by: Marcelo Vanzin
---
 dev/deps/spark-deps-hadoop-2.6                |  7 +-
 dev/deps/spark-deps-hadoop-2.7                |  7 +-
 dev/deps/spark-deps-hadoop-3.1                |  7 +-
 resource-managers/kubernetes/core/pom.xml     |  2 +-
 .../features/MountVolumesFeatureStep.scala    |  4 +-
 .../k8s/submit/LoggingPodStatusWatcher.scala  |  6 +-
 .../k8s/ExecutorLifecycleTestUtils.scala      |  2 +-
 .../kubernetes/integration-tests/pom.xml      |  2 +-
 .../backend/minikube/Minikube.scala           | 71 ++++++++++++++++---
 9 files changed, 82 insertions(+), 26 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 307040ea4f0b8..0e34af70a4acf 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -131,13 +131,14 @@ jta-1.1.jar
 jtransforms-2.4.0.jar
 jul-to-slf4j-1.7.16.jar
 kryo-shaded-4.0.2.jar
-kubernetes-client-3.0.0.jar
-kubernetes-model-2.0.0.jar
+kubernetes-client-4.1.2.jar
+kubernetes-model-4.1.2.jar
+kubernetes-model-common-4.1.2.jar
 leveldbjni-all-1.8.jar
 libfb303-0.9.3.jar
 libthrift-0.9.3.jar
 log4j-1.2.17.jar
-logging-interceptor-3.8.1.jar
+logging-interceptor-3.12.0.jar
 lz4-java-1.4.0.jar
 machinist_2.11-0.6.1.jar
 macro-compat_2.11-1.1.1.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 4a6ad3f6b32db..6b165a410ecd7 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -132,13 +132,14 @@ jta-1.1.jar
 jtransforms-2.4.0.jar
 jul-to-slf4j-1.7.16.jar
 kryo-shaded-4.0.2.jar
-kubernetes-client-3.0.0.jar
-kubernetes-model-2.0.0.jar
+kubernetes-client-4.1.2.jar
+kubernetes-model-4.1.2.jar
+kubernetes-model-common-4.1.2.jar
 leveldbjni-all-1.8.jar
 libfb303-0.9.3.jar
 libthrift-0.9.3.jar
 log4j-1.2.17.jar
-logging-interceptor-3.8.1.jar
+logging-interceptor-3.12.0.jar
 lz4-java-1.4.0.jar
 machinist_2.11-0.6.1.jar
 macro-compat_2.11-1.1.1.jar
diff --git a/dev/deps/spark-deps-hadoop-3.1 b/dev/deps/spark-deps-hadoop-3.1
index 83e243b7a01ea..1ee39023c522b 100644
--- a/dev/deps/spark-deps-hadoop-3.1
+++ b/dev/deps/spark-deps-hadoop-3.1
@@ -147,13 +147,14 @@ kerby-pkix-1.0.1.jar
 kerby-util-1.0.1.jar
 kerby-xdr-1.0.1.jar
 kryo-shaded-4.0.2.jar
-kubernetes-client-3.0.0.jar
-kubernetes-model-2.0.0.jar
+kubernetes-client-4.1.2.jar
+kubernetes-model-4.1.2.jar
+kubernetes-model-common-4.1.2.jar
 leveldbjni-all-1.8.jar
 libfb303-0.9.3.jar
 libthrift-0.9.3.jar
 log4j-1.2.17.jar
-logging-interceptor-3.8.1.jar
+logging-interceptor-3.12.0.jar
 lz4-java-1.4.0.jar
 machinist_2.11-0.6.1.jar
 macro-compat_2.11-1.1.1.jar
diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index 788e706073aa1..d534183cb6d95 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -29,7 +29,7 @@
   <name>Spark Project Kubernetes</name>
   <properties>
     <sbt.project.name>kubernetes</sbt.project.name>
-    <kubernetes.client.version>3.0.0</kubernetes.client.version>
+    <kubernetes.client.version>4.1.2</kubernetes.client.version>
   </properties>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
index bb0e2b3128efd..026b7eb774c15 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStep.scala
@@ -57,7 +57,9 @@ private[spark] class MountVolumesFeatureStep(
     val volumeBuilder = spec.volumeConf match {
       case KubernetesHostPathVolumeConf(hostPath) =>
         new VolumeBuilder()
-          .withHostPath(new HostPathVolumeSource(hostPath))
+          .withHostPath(new HostPathVolumeSourceBuilder()
+            .withPath(hostPath)
+            .build())
 
       case KubernetesPVCVolumeConf(claimName) =>
         new VolumeBuilder()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
index 173ac541626a7..4a7d3d42d23db 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import scala.collection.JavaConverters._
 
-import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod}
 import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
 import io.fabric8.kubernetes.client.Watcher.Action
 
@@ -174,7 +174,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(
     }.getOrElse(Seq(("Container state", "N/A")))
   }
 
-  private def formatTime(time: Time): String = {
-    if (time != null) time.getTime else "N/A"
+  private def formatTime(time: String): String = {
+    if (time != null && time != "") time else "N/A"
   }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
index c6b667ed85e8c..2e883623a4b1c 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorLifecycleTestUtils.scala
@@ -82,7 +82,7 @@ object ExecutorLifecycleTestUtils {
   def deletedExecutor(executorId: Long): Pod = {
     new PodBuilder(podWithAttachedContainerForId(executorId))
       .editOrNewMetadata()
-        .withNewDeletionTimestamp("523012521")
+        .withDeletionTimestamp("523012521")
       .endMetadata()
       .build()
   }
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index 47d15afeb74b3..cd02b68d0ff42 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -29,7 +29,7 @@
     <download-maven-plugin.version>1.3.0</download-maven-plugin.version>
     <exec-maven-plugin.version>1.4.0</exec-maven-plugin.version>
-    <kubernetes-client.version>3.0.0</kubernetes-client.version>
+    <kubernetes-client.version>4.1.2</kubernetes-client.version>
     <scala-maven-plugin.version>3.2.2</scala-maven-plugin.version>
     <scalatest-maven-plugin.version>1.0</scalatest-maven-plugin.version>
     <sbt.project.name>kubernetes-integration-tests</sbt.project.name>
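A note on the formatTime change above: kubernetes-client 4.x delivers container-state timestamps as plain strings rather than the old Time wrapper, and the guard must be a conjunction (a disjunction such as `time != null || time != ""` is true for every input, so null would leak through). An illustrative check, with a fabricated timestamp:

    // Mirrors the patched helper; the timestamp value is made up.
    def formatTime(time: String): String =
      if (time != null && time != "") time else "N/A"

    assert(formatTime("2019-03-06T18:03:55Z") == "2019-03-06T18:03:55Z")
    assert(formatTime(null) == "N/A")
    assert(formatTime("") == "N/A")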
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
index 6494cbc18f33e..78ef44be7fb78 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
 
-import java.io.File
 import java.nio.file.Paths
 
 import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
@@ -26,8 +25,18 @@ import org.apache.spark.internal.Logging
 
 // TODO support windows
 private[spark] object Minikube extends Logging {
-  private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60
+  private val HOST_PREFIX = "host:"
+  private val KUBELET_PREFIX = "kubelet:"
+  private val APISERVER_PREFIX = "apiserver:"
+  private val KUBECTL_PREFIX = "kubectl:"
+  private val MINIKUBE_VM_PREFIX = "minikubeVM: "
+  private val MINIKUBE_PREFIX = "minikube: "
+  private val MINIKUBE_PATH = ".minikube"
+
+  def logVersion(): Unit = {
+    logInfo(executeMinikube("version").mkString("\n"))
+  }
 
   def getMinikubeIp: String = {
     val outputs = executeMinikube("ip")
@@ -38,12 +47,21 @@ private[spark] object Minikube extends Logging {
 
   def getMinikubeStatus: MinikubeStatus.Value = {
     val statusString = executeMinikube("status")
-      .filter(line => line.contains("minikubeVM: ") || line.contains("minikube:"))
-      .head
-      .replaceFirst("minikubeVM: ", "")
-      .replaceFirst("minikube: ", "")
-    MinikubeStatus.unapply(statusString)
+    logInfo(s"Minikube status command output:\n$statusString")
+    // up to minikube version v0.30.0 use this to check for minikube status
+    val oldMinikube = statusString
+      .filter(line => line.contains(MINIKUBE_VM_PREFIX) || line.contains(MINIKUBE_PREFIX))
+
+    if (oldMinikube.isEmpty) {
+      getIfNewMinikubeStatus(statusString)
+    } else {
+      val finalStatusString = oldMinikube
+        .head
+        .replaceFirst(MINIKUBE_VM_PREFIX, "")
+        .replaceFirst(MINIKUBE_PREFIX, "")
+      MinikubeStatus.unapply(finalStatusString)
         .getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
+    }
   }
 
   def getKubernetesClient: DefaultKubernetesClient = {
@@ -52,13 +70,46 @@
     val kubernetesConf = new ConfigBuilder()
       .withApiVersion("v1")
       .withMasterUrl(kubernetesMaster)
-      .withCaCertFile(Paths.get(userHome, ".minikube", "ca.crt").toFile.getAbsolutePath)
-      .withClientCertFile(Paths.get(userHome, ".minikube", "apiserver.crt").toFile.getAbsolutePath)
-      .withClientKeyFile(Paths.get(userHome, ".minikube", "apiserver.key").toFile.getAbsolutePath)
+      .withCaCertFile(
+        Paths.get(userHome, MINIKUBE_PATH, "ca.crt").toFile.getAbsolutePath)
+      .withClientCertFile(
+        Paths.get(userHome, MINIKUBE_PATH, "apiserver.crt").toFile.getAbsolutePath)
+      .withClientKeyFile(
+        Paths.get(userHome, MINIKUBE_PATH, "apiserver.key").toFile.getAbsolutePath)
       .build()
     new DefaultKubernetesClient(kubernetesConf)
   }
 
+  // Covers minikube status output after Minikube V0.30.
+  private def getIfNewMinikubeStatus(statusString: Seq[String]): MinikubeStatus.Value = {
+    val hostString = statusString.find(_.contains(s"$HOST_PREFIX "))
+    val kubeletString = statusString.find(_.contains(s"$KUBELET_PREFIX "))
+    val apiserverString = statusString.find(_.contains(s"$APISERVER_PREFIX "))
+    val kubectlString = statusString.find(_.contains(s"$KUBECTL_PREFIX "))
+
+    if (hostString.isEmpty || kubeletString.isEmpty
+      || apiserverString.isEmpty || kubectlString.isEmpty) {
+      MinikubeStatus.NONE
+    } else {
+      val status1 = hostString.get.replaceFirst(s"$HOST_PREFIX ", "")
+      val status2 = kubeletString.get.replaceFirst(s"$KUBELET_PREFIX ", "")
+      val status3 = apiserverString.get.replaceFirst(s"$APISERVER_PREFIX ", "")
+      val status4 = kubectlString.get.replaceFirst(s"$KUBECTL_PREFIX ", "")
+      if (!status4.contains("Correctly Configured:")) {
+        MinikubeStatus.NONE
+      } else {
+        val stats = List(status1, status2, status3)
+          .map(MinikubeStatus.unapply)
+          .map(_.getOrElse(throw new IllegalStateException(s"Unknown status $statusString")))
+        if (stats.exists(_ != MinikubeStatus.RUNNING)) {
+          MinikubeStatus.NONE
+        } else {
+          MinikubeStatus.RUNNING
+        }
+      }
+    }
+  }
+
   private def executeMinikube(action: String, args: String*): Seq[String] = {
     ProcessUtils.executeProcess(
       Array("bash", "-c", s"minikube $action") ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS)
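To make the parsing contract of getIfNewMinikubeStatus concrete, a small self-contained Scala sketch follows. The status lines are fabricated examples of post-v0.30.0 `minikube status` output, and the boolean logic mirrors (but simplifies) the patched method: all three components must report Running and the kubectl line must read "Correctly Configured:" for the backend to consider the cluster RUNNING.

    object MinikubeStatusCheckSketch extends App {
      // Fabricated post-v0.30.0 `minikube status` output; the IP is made up.
      val statusLines = Seq(
        "host: Running",
        "kubelet: Running",
        "apiserver: Running",
        "kubectl: Correctly Configured: pointing to minikube-vm at 192.168.99.100")

      // Each component line must carry the value "Running" after its prefix.
      val componentsRunning = Seq("host:", "kubelet:", "apiserver:").forall { prefix =>
        statusLines.find(_.startsWith(s"$prefix "))
          .exists(_.stripPrefix(s"$prefix ") == "Running")
      }
      // kubectl must additionally be correctly configured.
      val kubectlOk = statusLines.exists(_.contains("Correctly Configured:"))

      println(if (componentsRunning && kubectlOk) "RUNNING" else "NONE")  // prints RUNNING
    }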