Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

k8s: implement invoker-node affinity and eliminate usage of kubectl #3963

Merged
merged 1 commit into from
Aug 17, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ dependencies {
}
compile 'com.github.ben-manes.caffeine:caffeine:2.4.0'
compile 'com.google.code.findbugs:jsr305:3.0.2'
compile 'io.fabric8:kubernetes-client:2.5.7'
compile 'io.fabric8:kubernetes-client:4.0.3'
compile 'io.kamon:kamon-core_2.11:0.6.7'
compile 'io.kamon:kamon-statsd_2.11:0.6.7'
//for mesos
Expand Down
6 changes: 0 additions & 6 deletions core/invoker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
FROM scala

ENV DOCKER_VERSION 1.12.0
ENV KUBERNETES_VERSION 1.6.4

RUN apk add --update openssl

Expand All @@ -17,11 +16,6 @@ rm -f docker-${DOCKER_VERSION}.tgz && \
chmod +x /usr/bin/docker && \
chmod +x /usr/bin/docker-runc

# Install kubernetes client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

RUN wget --no-verbose https://storage.googleapis.com/kubernetes-release/release/v${KUBERNETES_VERSION}/bin/linux/amd64/kubectl && \
chmod +x kubectl && \
mv kubectl /usr/bin/kubectl

ADD build/distributions/invoker.tar ./

COPY init.sh /
Expand Down
7 changes: 5 additions & 2 deletions core/invoker/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,17 @@ whisk {
# Timeouts for k8s commands. Set to "Inf" to disable timeout.
timeouts {
run: 1 minute
rm: 1 minute
inspect: 1 minute
logs: 1 minute
}
invoker-agent {
enabled: false
port: 3233
}
user-pod-node-affinity {
enabled: true
key: "openwhisk-role"
value: "invoker"
}
}

# Timeouts for runc commands. Set to "Inf" to disable timeout.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@

package whisk.core.containerpool.kubernetes

import java.io.{FileNotFoundException, IOException}
import java.io.IOException
import java.net.SocketTimeoutException
import java.nio.file.Files
import java.nio.file.Paths
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatterBuilder

import akka.actor.ActorSystem
import akka.event.Logging.{ErrorLevel, InfoLevel}
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model.Uri.Query
Expand All @@ -37,7 +34,6 @@ import akka.util.ByteString
import io.fabric8.kubernetes.api.model._
import pureconfig.loadConfigOrThrow
import whisk.common.Logging
import whisk.common.LoggingMarkers
import whisk.common.TransactionId
import whisk.core.ConfigKeys
import whisk.core.containerpool.ContainerId
Expand Down Expand Up @@ -68,17 +64,27 @@ import scala.util.control.NonFatal
/**
* Configuration for kubernetes client command timeouts.
*/
case class KubernetesClientTimeoutConfig(run: Duration, rm: Duration, inspect: Duration, logs: Duration)
case class KubernetesClientTimeoutConfig(run: Duration, logs: Duration)

/**
* Configuration for the Kubernetes invoker-agent.
*
* @param enabled whether the invoker-agent is enabled
* @param port port number configured for the invoker-agent (presumably the port the agent listens on; confirm against the agent deployment)
*/
case class KubernetesInvokerAgentConfig(enabled: Boolean, port: Int)

/**
* Configuration for node affinity for the pods that execute user action containers.
*
* The (key, value) pair should match the label with which the invoker worker nodes
* are labeled in the Kubernetes cluster. The default pair is (openwhisk-role, invoker),
* but a deployment may override this default if needed.
*
* @param enabled whether a required node-affinity rule is added to the spec of user action pods
* @param key node label key used in the affinity match expression
* @param value node label value the key must match (matched with the "In" operator)
*/
case class KubernetesInvokerNodeAffinity(enabled: Boolean, key: String, value: String)

/**
* General configuration for kubernetes client
*/
case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig, invokerAgent: KubernetesInvokerAgentConfig)
case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig,
invokerAgent: KubernetesInvokerAgentConfig,
userPodNodeAffinity: KubernetesInvokerNodeAffinity)

/**
* Serves as an interface to the Kubernetes API by proxying its REST API.
Expand All @@ -101,19 +107,6 @@ class KubernetesClient(
.withRequestTimeout(config.timeouts.logs.toMillis.toInt)
.build())

// Determines how to run kubectl. Failure to find a kubectl binary implies
// a failure to initialize this instance of KubernetesClient.
protected def findKubectlCmd(): String = {
val alternatives = List("/usr/bin/kubectl", "/usr/local/bin/kubectl")
val kubectlBin = Try {
alternatives.find(a => Files.isExecutable(Paths.get(a))).get
} getOrElse {
throw new FileNotFoundException(s"Couldn't locate kubectl binary (tried: ${alternatives.mkString(", ")}).")
}
kubectlBin
}
protected val kubectlCmd = Seq(findKubectlCmd)

def run(name: String,
image: String,
memory: ByteSize = 256.MB,
Expand All @@ -124,14 +117,31 @@ class KubernetesClient(
case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build()
}.toSeq

val pod = new PodBuilder()
val podBuilder = new PodBuilder()
.withNewMetadata()
.withName(name)
.addToLabels("name", name)
.addToLabels(labels.asJava)
.endMetadata()
.withNewSpec()
.withRestartPolicy("Always")
if (config.userPodNodeAffinity.enabled) {
val invokerNodeAffinity = new AffinityBuilder()
.withNewNodeAffinity()
.withNewRequiredDuringSchedulingIgnoredDuringExecution()
.addNewNodeSelectorTerm()
.addNewMatchExpression()
.withKey(config.userPodNodeAffinity.key)
.withOperator("In")
.withValues(config.userPodNodeAffinity.value)
.endMatchExpression()
.endNodeSelectorTerm()
.endRequiredDuringSchedulingIgnoredDuringExecution()
.endNodeAffinity()
.build()
podBuilder.withAffinity(invokerNodeAffinity)
}
val pod = podBuilder
.addNewContainer()
.withNewResources()
.withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava)
Expand Down Expand Up @@ -166,11 +176,27 @@ class KubernetesClient(
}

def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = {
runCmd(Seq("delete", "--now", "pod", container.id.asString), config.timeouts.rm).map(_ => ())
Future {
blocking {
kubeRestClient
.inNamespace(kubeRestClient.getNamespace)
.pods()
.withName(container.id.asString)
.delete()
}
}.map(_ => ())
}

def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = {
runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), config.timeouts.rm).map(_ => ())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the timeouts still in use? If not: Delete from the config?

Future {
blocking {
kubeRestClient
.inNamespace(kubeRestClient.getNamespace)
.pods()
.withLabel(key, value)
.delete()
}
}.map(_ => ())
}

// suspend is a no-op with the basic KubernetesClient
Expand Down Expand Up @@ -200,19 +226,6 @@ class KubernetesClient(
implicit val kubernetes = this
new KubernetesContainer(id, addr, workerIP, nativeContainerId)
}

protected def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = {
val cmd = kubectlCmd ++ args
val start = transid.started(
this,
LoggingMarkers.INVOKER_KUBECTL_CMD(args.head),
s"running ${cmd.mkString(" ")} (timeout: $timeout)",
logLevel = InfoLevel)
executeProcess(cmd, timeout).andThen {
case Success(_) => transid.finished(this, start)
case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
}
}
}

object KubernetesClient {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,9 @@ class KubernetesClientTests
val id = ContainerId("55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52")
val container = kubernetesContainer(id)

val kubectlCommand = "kubectl"

/** Returns a KubernetesClient with a mocked result for 'executeProcess' */
def kubernetesClient(fixture: => Future[String]) = {
new KubernetesClient()(global) {
override def findKubectlCmd() = kubectlCommand

override def executeProcess(args: Seq[String], timeout: Duration)(implicit ec: ExecutionContext,
as: ActorSystem) =
fixture
Expand Down