Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
k8s: implement invoker-node affinity and eliminate usage of kubectl
Browse files Browse the repository at this point in the history
1. Upgrade to latest released version of the fabric8 Kubernetes client
   to get access to an implementation of node affinity. Use that implementation
   to optionally add a scheduling affinity to the pods created for actions
   to bind them to worker nodes labeled as invoker nodes.

2. implement the container removal operation via the kube rest client
   instead of via an exec to the kubectl cli.  This eliminates the last
   usage of kubectl in the KubernetesClient (which will allow us to
   eventually remove the kubectl executable from the invoker docker
   image).
dgrove-oss committed Aug 13, 2018

Partially verified

This commit is signed with the committer’s verified signature.
spydon’s contribution has been verified via GPG key.
We cannot verify signatures from co-authors, and some of the co-authors attributed to this commit require their commits to be signed.
1 parent 7b5d805 commit dcef2d0
Showing 4 changed files with 51 additions and 40 deletions.
2 changes: 1 addition & 1 deletion common/scala/build.gradle
Original file line number Diff line number Diff line change
@@ -60,7 +60,7 @@ dependencies {
}
compile 'com.github.ben-manes.caffeine:caffeine:2.4.0'
compile 'com.google.code.findbugs:jsr305:3.0.2'
compile 'io.fabric8:kubernetes-client:2.5.7'
compile 'io.fabric8:kubernetes-client:4.0.3'
compile 'io.kamon:kamon-core_2.11:0.6.7'
compile 'io.kamon:kamon-statsd_2.11:0.6.7'
//for mesos
5 changes: 5 additions & 0 deletions core/invoker/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -54,6 +54,11 @@ whisk {
enabled: false
port: 3233
}
invoker-node-affinity {
enabled: true
key: "openwhisk-role"
value: "invoker"
}
}

# Timeouts for runc commands. Set to "Inf" to disable timeout.
Original file line number Diff line number Diff line change
@@ -17,15 +17,12 @@

package whisk.core.containerpool.kubernetes

import java.io.{FileNotFoundException, IOException}
import java.io.IOException
import java.net.SocketTimeoutException
import java.nio.file.Files
import java.nio.file.Paths
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatterBuilder

import akka.actor.ActorSystem
import akka.event.Logging.{ErrorLevel, InfoLevel}
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model.Uri.Query
@@ -37,7 +34,6 @@ import akka.util.ByteString
import io.fabric8.kubernetes.api.model._
import pureconfig.loadConfigOrThrow
import whisk.common.Logging
import whisk.common.LoggingMarkers
import whisk.common.TransactionId
import whisk.core.ConfigKeys
import whisk.core.containerpool.ContainerId
@@ -75,10 +71,17 @@ case class KubernetesClientTimeoutConfig(run: Duration, rm: Duration, inspect: D
*/
case class KubernetesInvokerAgentConfig(enabled: Boolean, port: Int)

/**
* Configuration for invoker node affinity for user action containers
*/
case class KubernetesInvokerNodeAffinity(enabled: Boolean, key: String, value: String)

/**
* General configuration for kubernetes client
*/
case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig, invokerAgent: KubernetesInvokerAgentConfig)
case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig,
invokerAgent: KubernetesInvokerAgentConfig,
invokerNodeAffinity: KubernetesInvokerNodeAffinity)

/**
* Serves as an interface to the Kubernetes API by proxying its REST API and/or invoking the kubectl CLI.
@@ -101,19 +104,6 @@ class KubernetesClient(
.withRequestTimeout(config.timeouts.logs.toMillis.toInt)
.build())

// Determines how to run kubectl. Failure to find a kubectl binary implies
// a failure to initialize this instance of KubernetesClient.
protected def findKubectlCmd(): String = {
val alternatives = List("/usr/bin/kubectl", "/usr/local/bin/kubectl")
val kubectlBin = Try {
alternatives.find(a => Files.isExecutable(Paths.get(a))).get
} getOrElse {
throw new FileNotFoundException(s"Couldn't locate kubectl binary (tried: ${alternatives.mkString(", ")}).")
}
kubectlBin
}
protected val kubectlCmd = Seq(findKubectlCmd)

def run(name: String,
image: String,
memory: ByteSize = 256.MB,
@@ -124,14 +114,31 @@ class KubernetesClient(
case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build()
}.toSeq

val pod = new PodBuilder()
val podBuilder = new PodBuilder()
.withNewMetadata()
.withName(name)
.addToLabels("name", name)
.addToLabels(labels.asJava)
.endMetadata()
.withNewSpec()
.withRestartPolicy("Always")
if (config.invokerNodeAffinity.enabled) {
val invokerNodeAffinity = new AffinityBuilder()
.withNewNodeAffinity()
.withNewRequiredDuringSchedulingIgnoredDuringExecution()
.addNewNodeSelectorTerm()
.addNewMatchExpression()
.withKey(config.invokerNodeAffinity.key)
.withOperator("In")
.withValues(config.invokerNodeAffinity.value)
.endMatchExpression()
.endNodeSelectorTerm()
.endRequiredDuringSchedulingIgnoredDuringExecution()
.endNodeAffinity()
.build()
podBuilder.withAffinity(invokerNodeAffinity)
}
val pod = podBuilder
.addNewContainer()
.withNewResources()
.withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava)
@@ -166,11 +173,27 @@ class KubernetesClient(
}

def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
runCmd(Seq("delete", "--now", "pod", container.id.asString), config.timeouts.rm).map(_ => ())
Future {
blocking {
kubeRestClient
.inNamespace(kubeRestClient.getNamespace)
.pods()
.withName(container.id.asString)
.delete()
}
}.map(_ => ())
}

def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = {
runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), config.timeouts.rm).map(_ => ())
Future {
blocking {
kubeRestClient
.inNamespace(kubeRestClient.getNamespace)
.pods()
.withLabel(key, value)
.delete()
}
}.map(_ => ())
}

// suspend is a no-op with the basic KubernetesClient
@@ -200,19 +223,6 @@ class KubernetesClient(
implicit val kubernetes = this
new KubernetesContainer(id, addr, workerIP, nativeContainerId)
}

protected def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = {
val cmd = kubectlCmd ++ args
val start = transid.started(
this,
LoggingMarkers.INVOKER_KUBECTL_CMD(args.head),
s"running ${cmd.mkString(" ")} (timeout: $timeout)",
logLevel = InfoLevel)
executeProcess(cmd, timeout).andThen {
case Success(_) => transid.finished(this, start)
case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
}
}
}

object KubernetesClient {
Original file line number Diff line number Diff line change
@@ -76,13 +76,9 @@ class KubernetesClientTests
val id = ContainerId("55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52")
val container = kubernetesContainer(id)

val kubectlCommand = "kubectl"

/** Returns a KubernetesClient with a mocked result for 'executeProcess' */
def kubernetesClient(fixture: => Future[String]) = {
new KubernetesClient()(global) {
override def findKubectlCmd() = kubectlCommand

override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext,
as: ActorSystem) =
fixture

0 comments on commit dcef2d0

Please sign in to comment.