[SPARK-26742][K8S][BRANCH-2.4] Update k8s client version to 4.1.2
## What changes were proposed in this pull request?

Updates the fabric8 Kubernetes client to version 4.1.2 and fixes the API breakages that the upgrade introduces.

## How was this patch tested?

Tested with the latest minikube version and k8s 1.13.
KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Use SparkLauncher.NO_RESOURCE
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Run SparkPi with env and mount secrets.
- Run PySpark on simple pi.py example
- Run PySpark with Python2 to test a pyfiles example
- Run PySpark with Python3 to test a pyfiles example
- Run PySpark with memory customization
- Run in client mode.
Run completed in 4 minutes, 20 seconds.
Total number of tests run: 14
Suites: completed 2, aborted 0
Tests: succeeded 14, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM 2.4.2-SNAPSHOT ............ SUCCESS [  2.980 s]
[INFO] Spark Project Tags ................................. SUCCESS [  2.880 s]
[INFO] Spark Project Local DB ............................. SUCCESS [  1.954 s]
[INFO] Spark Project Networking ........................... SUCCESS [  3.369 s]
[INFO] Spark Project Shuffle Streaming Service ............ SUCCESS [  1.791 s]
[INFO] Spark Project Unsafe ............................... SUCCESS [  1.845 s]
[INFO] Spark Project Launcher ............................. SUCCESS [  3.725 s]
[INFO] Spark Project Core ................................. SUCCESS [ 23.572 s]
[INFO] Spark Project Kubernetes Integration Tests 2.4.2-SNAPSHOT SUCCESS [04:25 min]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 05:08 min
[INFO] Finished at: 2019-03-06T18:03:55Z
[INFO] ------------------------------------------------------------------------

Closes apache#23993 from skonto/fix-k8s-version.

Authored-by: Stavros Kontopoulos <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
Stavros Kontopoulos authored and kai-chi committed Jul 25, 2019
1 parent b4f8e88 commit 612e482
Showing 9 changed files with 82 additions and 26 deletions.
7 changes: 4 additions & 3 deletions dev/deps/spark-deps-hadoop-2.6
@@ -131,13 +131,14 @@ jta-1.1.jar
jtransforms-2.4.0.jar
jul-to-slf4j-1.7.16.jar
kryo-shaded-4.0.2.jar
-kubernetes-client-3.0.0.jar
-kubernetes-model-2.0.0.jar
+kubernetes-client-4.1.2.jar
+kubernetes-model-4.1.2.jar
+kubernetes-model-common-4.1.2.jar
leveldbjni-all-1.8.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2.17.jar
-logging-interceptor-3.8.1.jar
+logging-interceptor-3.12.0.jar
lz4-java-1.4.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
7 changes: 4 additions & 3 deletions dev/deps/spark-deps-hadoop-2.7
@@ -132,13 +132,14 @@ jta-1.1.jar
jtransforms-2.4.0.jar
jul-to-slf4j-1.7.16.jar
kryo-shaded-4.0.2.jar
-kubernetes-client-3.0.0.jar
-kubernetes-model-2.0.0.jar
+kubernetes-client-4.1.2.jar
+kubernetes-model-4.1.2.jar
+kubernetes-model-common-4.1.2.jar
leveldbjni-all-1.8.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2.17.jar
-logging-interceptor-3.8.1.jar
+logging-interceptor-3.12.0.jar
lz4-java-1.4.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
7 changes: 4 additions & 3 deletions dev/deps/spark-deps-hadoop-3.1
@@ -147,13 +147,14 @@ kerby-pkix-1.0.1.jar
kerby-util-1.0.1.jar
kerby-xdr-1.0.1.jar
kryo-shaded-4.0.2.jar
-kubernetes-client-3.0.0.jar
-kubernetes-model-2.0.0.jar
+kubernetes-client-4.1.2.jar
+kubernetes-model-4.1.2.jar
+kubernetes-model-common-4.1.2.jar
leveldbjni-all-1.8.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2.17.jar
-logging-interceptor-3.8.1.jar
+logging-interceptor-3.12.0.jar
lz4-java-1.4.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
2 changes: 1 addition & 1 deletion resource-managers/kubernetes/core/pom.xml
@@ -29,7 +29,7 @@
<name>Spark Project Kubernetes</name>
<properties>
<sbt.project.name>kubernetes</sbt.project.name>
-<kubernetes.client.version>3.0.0</kubernetes.client.version>
+<kubernetes.client.version>4.1.2</kubernetes.client.version>
</properties>

<dependencies>
@@ -57,7 +57,9 @@ private[spark] class MountVolumesFeatureStep(
val volumeBuilder = spec.volumeConf match {
case KubernetesHostPathVolumeConf(hostPath) =>
new VolumeBuilder()
-.withHostPath(new HostPathVolumeSource(hostPath))
+.withHostPath(new HostPathVolumeSourceBuilder()
+.withPath(hostPath)
+.build())

case KubernetesPVCVolumeConf(claimName) =>
new VolumeBuilder()
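The hunk above exists because the 4.x client replaces the old single-argument `HostPathVolumeSource` constructor with a fluent builder. A self-contained sketch of that builder pattern, using hypothetical stand-in classes (not the real fabric8 API), looks like this:

```scala
// Hypothetical stand-ins mirroring the fabric8 builder style; for illustration only.
final case class HostPathVolumeSource(path: String)

final class HostPathVolumeSourceBuilder {
  private var path: String = ""

  // Each with* call mutates the builder and returns it, allowing chaining.
  def withPath(p: String): HostPathVolumeSourceBuilder = {
    path = p
    this
  }

  // build() produces the immutable result object.
  def build(): HostPathVolumeSource = HostPathVolumeSource(path)
}

object BuilderDemo {
  def main(args: Array[String]): Unit = {
    val source = new HostPathVolumeSourceBuilder()
      .withPath("/tmp/data")
      .build()
    println(source.path)
  }
}
```

The chained `withPath(...).build()` shape is the same one the patched `MountVolumesFeatureStep` now uses against the real client.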
@@ -20,7 +20,7 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}

import scala.collection.JavaConverters._

-import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod}
import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

@@ -174,7 +174,7 @@ private[k8s] class LoggingPodStatusWatcherImpl(
}.getOrElse(Seq(("Container state", "N/A")))
}

-private def formatTime(time: Time): String = {
-if (time != null) time.getTime else "N/A"
+private def formatTime(time: String): String = {
+if (time != null || time != "") time else "N/A"
}
}
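In the 4.x model the timestamp arrives as a plain `String` rather than a `Time` object. A guard that treats both `null` and the empty string as missing can be sketched with `Option` (a hypothetical standalone helper, not the exact committed code):

```scala
// Null- and empty-safe timestamp formatting: fall back to "N/A"
// whenever no usable time string is available.
object TimeFormat {
  def formatTime(time: String): String =
    Option(time)            // None if time is null
      .filter(_.nonEmpty)   // drop the empty string as well
      .getOrElse("N/A")
}
```

Wrapping the nullable value in `Option` keeps the null check and the emptiness check in one expression instead of a compound conditional.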
@@ -82,7 +82,7 @@ object ExecutorLifecycleTestUtils
def deletedExecutor(executorId: Long): Pod = {
new PodBuilder(podWithAttachedContainerForId(executorId))
.editOrNewMetadata()
-.withNewDeletionTimestamp("523012521")
+.withDeletionTimestamp("523012521")
.endMetadata()
.build()
}
2 changes: 1 addition & 1 deletion resource-managers/kubernetes/integration-tests/pom.xml
@@ -29,7 +29,7 @@
<download-maven-plugin.version>1.3.0</download-maven-plugin.version>
<exec-maven-plugin.version>1.4.0</exec-maven-plugin.version>
<extraScalaTestArgs></extraScalaTestArgs>
-<kubernetes-client.version>3.0.0</kubernetes-client.version>
+<kubernetes-client.version>4.1.2</kubernetes-client.version>
<scala-maven-plugin.version>3.2.2</scala-maven-plugin.version>
<scalatest-maven-plugin.version>1.0</scalatest-maven-plugin.version>
<sbt.project.name>kubernetes-integration-tests</sbt.project.name>
@@ -16,7 +16,6 @@
*/
package org.apache.spark.deploy.k8s.integrationtest.backend.minikube

-import java.io.File
import java.nio.file.Paths

import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
@@ -26,8 +25,18 @@ import org.apache.spark.internal.Logging

// TODO support windows
private[spark] object Minikube extends Logging {

private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60
private val HOST_PREFIX = "host:"
private val KUBELET_PREFIX = "kubelet:"
private val APISERVER_PREFIX = "apiserver:"
private val KUBECTL_PREFIX = "kubectl:"
private val MINIKUBE_VM_PREFIX = "minikubeVM: "
private val MINIKUBE_PREFIX = "minikube: "
private val MINIKUBE_PATH = ".minikube"

def logVersion(): Unit = {
logInfo(executeMinikube("version").mkString("\n"))
}

def getMinikubeIp: String = {
val outputs = executeMinikube("ip")
@@ -38,12 +47,21 @@ private[spark] object Minikube extends Logging {

def getMinikubeStatus: MinikubeStatus.Value = {
val statusString = executeMinikube("status")
.filter(line => line.contains("minikubeVM: ") || line.contains("minikube:"))
.head
.replaceFirst("minikubeVM: ", "")
.replaceFirst("minikube: ", "")
MinikubeStatus.unapply(statusString)
logInfo(s"Minikube status command output:\n$statusString")
// up to minikube version v0.30.0 use this to check for minikube status
val oldMinikube = statusString
.filter(line => line.contains(MINIKUBE_VM_PREFIX) || line.contains(MINIKUBE_PREFIX))

if (oldMinikube.isEmpty) {
getIfNewMinikubeStatus(statusString)
} else {
val finalStatusString = oldMinikube
.head
.replaceFirst(MINIKUBE_VM_PREFIX, "")
.replaceFirst(MINIKUBE_PREFIX, "")
MinikubeStatus.unapply(finalStatusString)
.getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
}
}

def getKubernetesClient: DefaultKubernetesClient = {
@@ -52,13 +70,46 @@
val kubernetesConf = new ConfigBuilder()
.withApiVersion("v1")
.withMasterUrl(kubernetesMaster)
-.withCaCertFile(Paths.get(userHome, ".minikube", "ca.crt").toFile.getAbsolutePath)
-.withClientCertFile(Paths.get(userHome, ".minikube", "apiserver.crt").toFile.getAbsolutePath)
-.withClientKeyFile(Paths.get(userHome, ".minikube", "apiserver.key").toFile.getAbsolutePath)
+.withCaCertFile(
+Paths.get(userHome, MINIKUBE_PATH, "ca.crt").toFile.getAbsolutePath)
+.withClientCertFile(
+Paths.get(userHome, MINIKUBE_PATH, "apiserver.crt").toFile.getAbsolutePath)
+.withClientKeyFile(
+Paths.get(userHome, MINIKUBE_PATH, "apiserver.key").toFile.getAbsolutePath)
.build()
new DefaultKubernetesClient(kubernetesConf)
}

// Covers minikube status output after Minikube V0.30.
private def getIfNewMinikubeStatus(statusString: Seq[String]): MinikubeStatus.Value = {
val hostString = statusString.find(_.contains(s"$HOST_PREFIX "))
val kubeletString = statusString.find(_.contains(s"$KUBELET_PREFIX "))
val apiserverString = statusString.find(_.contains(s"$APISERVER_PREFIX "))
val kubectlString = statusString.find(_.contains(s"$KUBECTL_PREFIX "))

if (hostString.isEmpty || kubeletString.isEmpty
|| apiserverString.isEmpty || kubectlString.isEmpty) {
MinikubeStatus.NONE
} else {
val status1 = hostString.get.replaceFirst(s"$HOST_PREFIX ", "")
val status2 = kubeletString.get.replaceFirst(s"$KUBELET_PREFIX ", "")
val status3 = apiserverString.get.replaceFirst(s"$APISERVER_PREFIX ", "")
val status4 = kubectlString.get.replaceFirst(s"$KUBECTL_PREFIX ", "")
if (!status4.contains("Correctly Configured:")) {
MinikubeStatus.NONE
} else {
val stats = List(status1, status2, status3)
.map(MinikubeStatus.unapply)
.map(_.getOrElse(throw new IllegalStateException(s"Unknown status $statusString")))
if (stats.exists(_ != MinikubeStatus.RUNNING)) {
MinikubeStatus.NONE
} else {
MinikubeStatus.RUNNING
}
}
}
}

private def executeMinikube(action: String, args: String*): Seq[String] = {
ProcessUtils.executeProcess(
Array("bash", "-c", s"minikube $action") ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS)
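The new-style status check in `getIfNewMinikubeStatus` strips the known `host:`/`kubelet:`/`apiserver:` prefixes and only reports a usable cluster when every component is `Running`. A condensed, self-contained sketch of that parsing, with a simplified boolean result and hypothetical sample output rather than the real `MinikubeStatus` enum:

```scala
// Simplified version of the post-v0.30 minikube status parsing:
// every expected component line must be present and report "Running".
object MinikubeStatusParser {
  private val Prefixes = Seq("host:", "kubelet:", "apiserver:")

  def allRunning(statusLines: Seq[String]): Boolean =
    Prefixes.forall { prefix =>
      statusLines
        .find(_.startsWith(prefix))        // locate the component's line
        .map(_.stripPrefix(prefix).trim)   // keep only the status word
        .contains("Running")               // missing line => not running
    }
}

object StatusDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical output of `minikube status` on a healthy cluster.
    val healthy = Seq("host: Running", "kubelet: Running", "apiserver: Running")
    println(MinikubeStatusParser.allRunning(healthy))
  }
}
```

Collapsing the per-component checks into a single `forall` also makes the "any component not running means the whole cluster is unusable" rule explicit.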
