diff --git a/common/scala/build.gradle b/common/scala/build.gradle index c4ae7639605..743d36c853b 100644 --- a/common/scala/build.gradle +++ b/common/scala/build.gradle @@ -60,7 +60,7 @@ dependencies { } compile 'com.github.ben-manes.caffeine:caffeine:2.4.0' compile 'com.google.code.findbugs:jsr305:3.0.2' - compile 'io.fabric8:kubernetes-client:2.5.7' + compile 'io.fabric8:kubernetes-client:4.0.3' compile 'io.kamon:kamon-core_2.11:0.6.7' compile 'io.kamon:kamon-statsd_2.11:0.6.7' //for mesos diff --git a/core/invoker/Dockerfile b/core/invoker/Dockerfile index d1c5d8cc519..268f24a5008 100644 --- a/core/invoker/Dockerfile +++ b/core/invoker/Dockerfile @@ -4,7 +4,6 @@ FROM scala ENV DOCKER_VERSION 1.12.0 -ENV KUBERNETES_VERSION 1.6.4 RUN apk add --update openssl @@ -17,11 +16,6 @@ rm -f docker-${DOCKER_VERSION}.tgz && \ chmod +x /usr/bin/docker && \ chmod +x /usr/bin/docker-runc -# Install kubernetes client -RUN wget --no-verbose https://storage.googleapis.com/kubernetes-release/release/v${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && \ -chmod +x kubectl && \ -mv kubectl /usr/bin/kubectl - ADD build/distributions/invoker.tar ./ COPY init.sh / diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf index c471d1b7264..244220afb12 100644 --- a/core/invoker/src/main/resources/application.conf +++ b/core/invoker/src/main/resources/application.conf @@ -46,14 +46,17 @@ whisk { # Timeouts for k8s commands. Set to "Inf" to disable timeout. timeouts { run: 1 minute - rm: 1 minute - inspect: 1 minute logs: 1 minute } invoker-agent { enabled: false port: 3233 } + user-pod-node-affinity { + enabled: true + key: "openwhisk-role" + value: "invoker" + } } # Timeouts for runc commands. Set to "Inf" to disable timeout. diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala index 9a19571ea64..c6e5233062c 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala @@ -17,15 +17,12 @@ package whisk.core.containerpool.kubernetes -import java.io.{FileNotFoundException, IOException} +import java.io.IOException import java.net.SocketTimeoutException -import java.nio.file.Files -import java.nio.file.Paths import java.time.{Instant, ZoneId} import java.time.format.DateTimeFormatterBuilder import akka.actor.ActorSystem -import akka.event.Logging.{ErrorLevel, InfoLevel} import akka.http.scaladsl.model.Uri import akka.http.scaladsl.model.Uri.Path import akka.http.scaladsl.model.Uri.Query @@ -37,7 +34,6 @@ import akka.util.ByteString import io.fabric8.kubernetes.api.model._ import pureconfig.loadConfigOrThrow import whisk.common.Logging -import whisk.common.LoggingMarkers import whisk.common.TransactionId import whisk.core.ConfigKeys import whisk.core.containerpool.ContainerId @@ -68,17 +64,27 @@ import scala.util.control.NonFatal /** * Configuration for kubernetes client command timeouts. */ -case class KubernetesClientTimeoutConfig(run: Duration, rm: Duration, inspect: Duration, logs: Duration) +case class KubernetesClientTimeoutConfig(run: Duration, logs: Duration) /** * Configuration for kubernetes invoker-agent */ case class KubernetesInvokerAgentConfig(enabled: Boolean, port: Int) +/** + * Configuration for node affinity for the pods that execute user action containers + * The key,value pair should match the pair with which the invoker worker nodes + * are labeled in the Kubernetes cluster. The default pair is , + * but a deployment may override this default if needed. + */ +case class KubernetesInvokerNodeAffinity(enabled: Boolean, key: String, value: String) + /** * General configuration for kubernetes client */ -case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig, invokerAgent: KubernetesInvokerAgentConfig) +case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig, + invokerAgent: KubernetesInvokerAgentConfig, + userPodNodeAffinity: KubernetesInvokerNodeAffinity) /** * Serves as an interface to the Kubernetes API by proxying its REST API and/or invoking the kubectl CLI. @@ -101,19 +107,6 @@ class KubernetesClient( .withRequestTimeout(config.timeouts.logs.toMillis.toInt) .build()) - // Determines how to run kubectl. Failure to find a kubectl binary implies - // a failure to initialize this instance of KubernetesClient. - protected def findKubectlCmd(): String = { - val alternatives = List("/usr/bin/kubectl", "/usr/local/bin/kubectl") - val kubectlBin = Try { - alternatives.find(a => Files.isExecutable(Paths.get(a))).get - } getOrElse { - throw new FileNotFoundException(s"Couldn't locate kubectl binary (tried: ${alternatives.mkString(", ")}).") - } - kubectlBin - } - protected val kubectlCmd = Seq(findKubectlCmd) - def run(name: String, image: String, memory: ByteSize = 256.MB, @@ -124,7 +117,7 @@ class KubernetesClient( case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build() }.toSeq - val pod = new PodBuilder() + val podBuilder = new PodBuilder() .withNewMetadata() .withName(name) .addToLabels("name", name) @@ -132,6 +125,23 @@ class KubernetesClient( .endMetadata() .withNewSpec() .withRestartPolicy("Always") + if (config.userPodNodeAffinity.enabled) { + val invokerNodeAffinity = new AffinityBuilder() + .withNewNodeAffinity() + .withNewRequiredDuringSchedulingIgnoredDuringExecution() + .addNewNodeSelectorTerm() + .addNewMatchExpression() + .withKey(config.userPodNodeAffinity.key) + .withOperator("In") + .withValues(config.userPodNodeAffinity.value) + .endMatchExpression() + .endNodeSelectorTerm() + .endRequiredDuringSchedulingIgnoredDuringExecution() + .endNodeAffinity() + .build() + podBuilder.withAffinity(invokerNodeAffinity) + } + val pod = podBuilder .addNewContainer() .withNewResources() .withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava) @@ -166,11 +176,27 @@ class KubernetesClient( } def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = { - runCmd(Seq("delete", "--now", "pod", container.id.asString), config.timeouts.rm).map(_ => ()) + Future { + blocking { + kubeRestClient + .inNamespace(kubeRestClient.getNamespace) + .pods() + .withName(container.id.asString) + .delete() + } + }.map(_ => ()) } def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = { - runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), config.timeouts.rm).map(_ => ()) + Future { + blocking { + kubeRestClient + .inNamespace(kubeRestClient.getNamespace) + .pods() + .withLabel(key, value) + .delete() + } + }.map(_ => ()) } // suspend is a no-op with the basic KubernetesClient @@ -200,19 +226,6 @@ class KubernetesClient( implicit val kubernetes = this new KubernetesContainer(id, addr, workerIP, nativeContainerId) } - - protected def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = { - val cmd = kubectlCmd ++ args - val start = transid.started( - this, - LoggingMarkers.INVOKER_KUBECTL_CMD(args.head), - s"running ${cmd.mkString(" ")} (timeout: $timeout)", - logLevel = InfoLevel) - executeProcess(cmd, timeout).andThen { - case Success(_) => transid.finished(this, start) - case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel) - } - } } object KubernetesClient { diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala index 1f584d7d411..e653b4117cf 100644 --- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala @@ -76,13 +76,9 @@ class KubernetesClientTests val id = ContainerId("55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52") val container = kubernetesContainer(id) - val kubectlCommand = "kubectl" - /** Returns a KubernetesClient with a mocked result for 'executeProcess' */ def kubernetesClient(fixture: => Future[String]) = { new KubernetesClient()(global) { - override def findKubectlCmd() = kubectlCommand - override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext, as: ActorSystem) = fixture