From dcef2d0971404afb9df4b0963ab2c675262d4777 Mon Sep 17 00:00:00 2001 From: David Grove Date: Mon, 13 Aug 2018 12:06:47 -0400 Subject: [PATCH] k8s: implement invoker-node affinity and eliminate usage of kubectl 1. Upgrade to latest released version of the fabric8 Kubernetes client to get access to an implementation of node affinity. Use that implementation to optionally add a scheduling affinity to the pods created for actions to bind them to worker nodes labeled as invoker nodes. 2. implement the container removal operation via the kube rest client instead of via an exec to the kubectl cli. This eliminates the last usage of kubectl in the KubernetesClient (which will allow us to eventually remove the kubectl executable from the invoker docker image). --- common/scala/build.gradle | 2 +- .../src/main/resources/application.conf | 5 ++ .../kubernetes/KubernetesClient.scala | 80 +++++++++++-------- .../test/KubernetesClientTests.scala | 4 - 4 files changed, 51 insertions(+), 40 deletions(-) diff --git a/common/scala/build.gradle b/common/scala/build.gradle index c4ae7639605..743d36c853b 100644 --- a/common/scala/build.gradle +++ b/common/scala/build.gradle @@ -60,7 +60,7 @@ dependencies { } compile 'com.github.ben-manes.caffeine:caffeine:2.4.0' compile 'com.google.code.findbugs:jsr305:3.0.2' - compile 'io.fabric8:kubernetes-client:2.5.7' + compile 'io.fabric8:kubernetes-client:4.0.3' compile 'io.kamon:kamon-core_2.11:0.6.7' compile 'io.kamon:kamon-statsd_2.11:0.6.7' //for mesos diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf index c471d1b7264..ee1bba693d2 100644 --- a/core/invoker/src/main/resources/application.conf +++ b/core/invoker/src/main/resources/application.conf @@ -54,6 +54,11 @@ whisk { enabled: false port: 3233 } + invoker-node-affinity { + enabled: true + key: "openwhisk-role" + value: "invoker" + } } # Timeouts for runc commands. Set to "Inf" to disable timeout. diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala index 9a19571ea64..d2a20605e73 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala @@ -17,15 +17,12 @@ package whisk.core.containerpool.kubernetes -import java.io.{FileNotFoundException, IOException} +import java.io.IOException import java.net.SocketTimeoutException -import java.nio.file.Files -import java.nio.file.Paths import java.time.{Instant, ZoneId} import java.time.format.DateTimeFormatterBuilder import akka.actor.ActorSystem -import akka.event.Logging.{ErrorLevel, InfoLevel} import akka.http.scaladsl.model.Uri import akka.http.scaladsl.model.Uri.Path import akka.http.scaladsl.model.Uri.Query @@ -37,7 +34,6 @@ import akka.util.ByteString import io.fabric8.kubernetes.api.model._ import pureconfig.loadConfigOrThrow import whisk.common.Logging -import whisk.common.LoggingMarkers import whisk.common.TransactionId import whisk.core.ConfigKeys import whisk.core.containerpool.ContainerId @@ -75,10 +71,17 @@ case class KubernetesClientTimeoutConfig(run: Duration, rm: Duration, inspect: D */ case class KubernetesInvokerAgentConfig(enabled: Boolean, port: Int) +/** + * Configuration for invoker node affinity for user action containers + */ +case class KubernetesInvokerNodeAffinity(enabled: Boolean, key: String, value: String) + /** * General configuration for kubernetes client */ -case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig, invokerAgent: KubernetesInvokerAgentConfig) +case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig, + invokerAgent: KubernetesInvokerAgentConfig, + invokerNodeAffinity: KubernetesInvokerNodeAffinity) /** * Serves as an interface to the Kubernetes API by proxying its REST API and/or invoking the kubectl CLI. @@ -101,19 +104,6 @@ class KubernetesClient( .withRequestTimeout(config.timeouts.logs.toMillis.toInt) .build()) - // Determines how to run kubectl. Failure to find a kubectl binary implies - // a failure to initialize this instance of KubernetesClient. - protected def findKubectlCmd(): String = { - val alternatives = List("/usr/bin/kubectl", "/usr/local/bin/kubectl") - val kubectlBin = Try { - alternatives.find(a => Files.isExecutable(Paths.get(a))).get - } getOrElse { - throw new FileNotFoundException(s"Couldn't locate kubectl binary (tried: ${alternatives.mkString(", ")}).") - } - kubectlBin - } - protected val kubectlCmd = Seq(findKubectlCmd) - def run(name: String, image: String, memory: ByteSize = 256.MB, @@ -124,7 +114,7 @@ class KubernetesClient( case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build() }.toSeq - val pod = new PodBuilder() + val podBuilder = new PodBuilder() .withNewMetadata() .withName(name) .addToLabels("name", name) @@ -132,6 +122,23 @@ class KubernetesClient( .endMetadata() .withNewSpec() .withRestartPolicy("Always") + if (config.invokerNodeAffinity.enabled) { + val invokerNodeAffinity = new AffinityBuilder() + .withNewNodeAffinity() + .withNewRequiredDuringSchedulingIgnoredDuringExecution() + .addNewNodeSelectorTerm() + .addNewMatchExpression() + .withKey(config.invokerNodeAffinity.key) + .withOperator("In") + .withValues(config.invokerNodeAffinity.value) + .endMatchExpression() + .endNodeSelectorTerm() + .endRequiredDuringSchedulingIgnoredDuringExecution() + .endNodeAffinity() + .build() + podBuilder.withAffinity(invokerNodeAffinity) + } + val pod = podBuilder .addNewContainer() .withNewResources() .withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava) @@ -166,11 +173,27 @@ class KubernetesClient( } def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = { - runCmd(Seq("delete", "--now", "pod", container.id.asString), config.timeouts.rm).map(_ => ()) + Future { + blocking { + kubeRestClient + .inNamespace(kubeRestClient.getNamespace) + .pods() + .withName(container.id.asString) + .delete() + } + }.map(_ => ()) } def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = { - runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), config.timeouts.rm).map(_ => ()) + Future { + blocking { + kubeRestClient + .inNamespace(kubeRestClient.getNamespace) + .pods() + .withLabel(key, value) + .delete() + } + }.map(_ => ()) } // suspend is a no-op with the basic KubernetesClient @@ -200,19 +223,6 @@ class KubernetesClient( implicit val kubernetes = this new KubernetesContainer(id, addr, workerIP, nativeContainerId) } - - protected def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = { - val cmd = kubectlCmd ++ args - val start = transid.started( - this, - LoggingMarkers.INVOKER_KUBECTL_CMD(args.head), - s"running ${cmd.mkString(" ")} (timeout: $timeout)", - logLevel = InfoLevel) - executeProcess(cmd, timeout).andThen { - case Success(_) => transid.finished(this, start) - case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel) - } - } } object KubernetesClient { diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala index 1f584d7d411..e653b4117cf 100644 --- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala @@ -76,13 +76,9 @@ class KubernetesClientTests val id = ContainerId("55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52") val container = kubernetesContainer(id) - val kubectlCommand = "kubectl" - /** Returns a KubernetesClient with a mocked result for 'executeProcess' */ def kubernetesClient(fixture: => Future[String]) = { new KubernetesClient()(global) { - override def findKubectlCmd() = kubectlCommand - override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext, as: ActorSystem) = fixture