-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
k8s: implement invoker-node affinity and eliminate usage of kubectl #3963
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,15 +17,12 @@ | |
|
||
package whisk.core.containerpool.kubernetes | ||
|
||
import java.io.{FileNotFoundException, IOException} | ||
import java.io.IOException | ||
import java.net.SocketTimeoutException | ||
import java.nio.file.Files | ||
import java.nio.file.Paths | ||
import java.time.{Instant, ZoneId} | ||
import java.time.format.DateTimeFormatterBuilder | ||
|
||
import akka.actor.ActorSystem | ||
import akka.event.Logging.{ErrorLevel, InfoLevel} | ||
import akka.http.scaladsl.model.Uri | ||
import akka.http.scaladsl.model.Uri.Path | ||
import akka.http.scaladsl.model.Uri.Query | ||
|
@@ -37,7 +34,6 @@ import akka.util.ByteString | |
import io.fabric8.kubernetes.api.model._ | ||
import pureconfig.loadConfigOrThrow | ||
import whisk.common.Logging | ||
import whisk.common.LoggingMarkers | ||
import whisk.common.TransactionId | ||
import whisk.core.ConfigKeys | ||
import whisk.core.containerpool.ContainerId | ||
|
@@ -68,17 +64,27 @@ import scala.util.control.NonFatal | |
/** | ||
* Configuration for kubernetes client command timeouts. | ||
*/ | ||
case class KubernetesClientTimeoutConfig(run: Duration, rm: Duration, inspect: Duration, logs: Duration) | ||
case class KubernetesClientTimeoutConfig(run: Duration, logs: Duration) | ||
|
||
/** | ||
* Configuration for kubernetes invoker-agent | ||
*/ | ||
case class KubernetesInvokerAgentConfig(enabled: Boolean, port: Int) | ||
|
||
/** | ||
* Configuration for node affinity for the pods that execute user action containers | ||
* The key,value pair should match the <key,value> pair with which the invoker worker nodes | ||
* are labeled in the Kubernetes cluster. The default pair is <openwhisk-role,invoker>, | ||
* but a deployment may override this default if needed. | ||
*/ | ||
case class KubernetesInvokerNodeAffinity(enabled: Boolean, key: String, value: String) | ||
|
||
/** | ||
* General configuration for kubernetes client | ||
*/ | ||
case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig, invokerAgent: KubernetesInvokerAgentConfig) | ||
case class KubernetesClientConfig(timeouts: KubernetesClientTimeoutConfig, | ||
invokerAgent: KubernetesInvokerAgentConfig, | ||
userPodNodeAffinity: KubernetesInvokerNodeAffinity) | ||
|
||
/** | ||
* Serves as an interface to the Kubernetes API by proxying its REST API and/or invoking the kubectl CLI. | ||
|
@@ -101,19 +107,6 @@ class KubernetesClient( | |
.withRequestTimeout(config.timeouts.logs.toMillis.toInt) | ||
.build()) | ||
|
||
// Determines how to run kubectl. Failure to find a kubectl binary implies | ||
// a failure to initialize this instance of KubernetesClient. | ||
protected def findKubectlCmd(): String = { | ||
val alternatives = List("/usr/bin/kubectl", "/usr/local/bin/kubectl") | ||
val kubectlBin = Try { | ||
alternatives.find(a => Files.isExecutable(Paths.get(a))).get | ||
} getOrElse { | ||
throw new FileNotFoundException(s"Couldn't locate kubectl binary (tried: ${alternatives.mkString(", ")}).") | ||
} | ||
kubectlBin | ||
} | ||
protected val kubectlCmd = Seq(findKubectlCmd) | ||
|
||
def run(name: String, | ||
image: String, | ||
memory: ByteSize = 256.MB, | ||
|
@@ -124,14 +117,31 @@ class KubernetesClient( | |
case (key, value) => new EnvVarBuilder().withName(key).withValue(value).build() | ||
}.toSeq | ||
|
||
val pod = new PodBuilder() | ||
val podBuilder = new PodBuilder() | ||
.withNewMetadata() | ||
.withName(name) | ||
.addToLabels("name", name) | ||
.addToLabels(labels.asJava) | ||
.endMetadata() | ||
.withNewSpec() | ||
.withRestartPolicy("Always") | ||
if (config.userPodNodeAffinity.enabled) { | ||
val invokerNodeAffinity = new AffinityBuilder() | ||
.withNewNodeAffinity() | ||
.withNewRequiredDuringSchedulingIgnoredDuringExecution() | ||
.addNewNodeSelectorTerm() | ||
.addNewMatchExpression() | ||
.withKey(config.userPodNodeAffinity.key) | ||
.withOperator("In") | ||
.withValues(config.userPodNodeAffinity.value) | ||
.endMatchExpression() | ||
.endNodeSelectorTerm() | ||
.endRequiredDuringSchedulingIgnoredDuringExecution() | ||
.endNodeAffinity() | ||
.build() | ||
podBuilder.withAffinity(invokerNodeAffinity) | ||
} | ||
val pod = podBuilder | ||
.addNewContainer() | ||
.withNewResources() | ||
.withLimits(Map("memory" -> new Quantity(memory.toMB + "Mi")).asJava) | ||
|
@@ -166,11 +176,27 @@ class KubernetesClient( | |
} | ||
|
||
def rm(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] = { | ||
runCmd(Seq("delete", "--now", "pod", container.id.asString), config.timeouts.rm).map(_ => ()) | ||
Future { | ||
blocking { | ||
kubeRestClient | ||
.inNamespace(kubeRestClient.getNamespace) | ||
.pods() | ||
.withName(container.id.asString) | ||
.delete() | ||
} | ||
}.map(_ => ()) | ||
} | ||
|
||
def rm(key: String, value: String, ensureUnpaused: Boolean = false)(implicit transid: TransactionId): Future[Unit] = { | ||
runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), config.timeouts.rm).map(_ => ()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are the timeouts still in use? If not: Delete from the config? |
||
Future { | ||
blocking { | ||
kubeRestClient | ||
.inNamespace(kubeRestClient.getNamespace) | ||
.pods() | ||
.withLabel(key, value) | ||
.delete() | ||
} | ||
}.map(_ => ()) | ||
} | ||
|
||
// suspend is a no-op with the basic KubernetesClient | ||
|
@@ -200,19 +226,6 @@ class KubernetesClient( | |
implicit val kubernetes = this | ||
new KubernetesContainer(id, addr, workerIP, nativeContainerId) | ||
} | ||
|
||
protected def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = { | ||
val cmd = kubectlCmd ++ args | ||
val start = transid.started( | ||
this, | ||
LoggingMarkers.INVOKER_KUBECTL_CMD(args.head), | ||
s"running ${cmd.mkString(" ")} (timeout: $timeout)", | ||
logLevel = InfoLevel) | ||
executeProcess(cmd, timeout).andThen { | ||
case Success(_) => transid.finished(this, start) | ||
case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel) | ||
} | ||
} | ||
} | ||
|
||
object KubernetesClient { | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍