Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK][K8S] Support auto build Kubernetes client from env when Kyuubi running in Pod #3663

Closed
wants to merge 15 commits into from
2 changes: 1 addition & 1 deletion .github/workflows/master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ jobs:
./build/mvn ${MVN_OPT} clean install
-pl integration-tests/kyuubi-kubernetes-it -am
-Pkubernetes-it
-Dtest=none -DwildcardSuites=org.apache.kyuubi.kubernetes.test.deployment
-Dtest=none -DwildcardSuites=org.apache.kyuubi.kubernetes.test.deployment,org.apache.kyuubi.kubernetes.test.KubernetesUtilsTest
- name: Upload test logs
if: failure()
uses: actions/upload-artifact@v2
Expand Down
3 changes: 2 additions & 1 deletion docker/helm/templates/kyuubi-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,6 @@ data:
#
kyuubi.frontend.bind.host={{ .Values.server.bind.host }}
kyuubi.frontend.bind.port={{ .Values.server.bind.port }}
kyuubi.kubernetes.namespace= {{ .Release.Namespace }}

# Details in https://kyuubi.apache.org/docs/latest/deployment/settings.html
# Details in https://kyuubi.apache.org/docs/latest/deployment/settings.html
8 changes: 8 additions & 0 deletions docs/deployment/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,15 @@ kyuubi.kinit.principal|<undefined>|Name of the Kerberos principal.|string|

Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
kyuubi.kubernetes.authenticate.ca cert file|<undefined>|kubernetes client authenticate ca cert file|string|1.7.0
kyuubi.kubernetes.authenticate.clientCertFile|<undefined>|kubernetes client authenticate client cert file|string|1.7.0
kyuubi.kubernetes.authenticate.clientKeyFile|<undefined>|kubernetes client authenticate client key file|string|1.7.0
kyuubi.kubernetes.authenticate.oauthToken|<undefined>|kubernetes client authenticate oauth token value|string|1.7.0
kyuubi.kubernetes.authenticate.oauthTokenFile|<undefined>|kubernetes client authenticate oauth token file|string|1.7.0
kyuubi.kubernetes.context|<undefined>|The desired context from your kubernetes config file used to configure the K8S client for interacting with the cluster.|string|1.6.0
kyuubi.kubernetes.master.address|<undefined>|kubernetes master address for build kubernetes client|string|1.7.0
kyuubi.kubernetes.namespace|default||string|1.7.0
kyuubi.kubernetes.trust.certificates|false|If set to true then client can submit to kubernetes cluster only with token|boolean|1.7.0


### Metadata
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.kubernetes.test

import io.fabric8.kubernetes.client.Config

import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{KUBERNETES_CONTEXT, KUBERNETES_MASTER}
import org.apache.kyuubi.util.KubernetesUtils

class KubernetesUtilsTest extends KyuubiFunSuite {

test("Test kubernetesUtils build Kubernetes client") {
val testMaster = "https://localhost:12345/"
System.setProperty(Config.KUBERNETES_MASTER_SYSTEM_PROPERTY, testMaster)
val conf = KyuubiConf()
val client1 = KubernetesUtils.buildKubernetesClient(conf)
assert(client1.nonEmpty && client1.get.getMasterUrl.toString.equals(testMaster))

// start up minikube
MiniKube.getIp
conf.set(KUBERNETES_CONTEXT.key, "minikube")
val client2 = KubernetesUtils.buildKubernetesClient(conf)
assert(client2.nonEmpty && client2.get.getMasterUrl.equals(
MiniKube.getKubernetesClient.getMasterUrl))

// user set master uri should replace uri in context
val master = "https://kyuubi-test:8443/"
conf.set(KUBERNETES_MASTER.key, master)
val client3 = KubernetesUtils.buildKubernetesClient(conf)
assert(client3.nonEmpty && client3.get.getMasterUrl.toString.equals(master))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,80 @@ object KyuubiConf {
.version("1.4.0")
.fallbackConf(FRONTEND_WORKER_KEEPALIVE_TIME)

val KUBERNETES_CONTEXT: OptionalConfigEntry[String] =
buildConf("kyuubi.kubernetes.context")
.doc("The desired context from your kubernetes config file used to configure the K8S " +
"client for interacting with the cluster.")
.version("1.6.0")
.stringConf
.createOptional

val KUBERNETES_NAMESPACE: ConfigEntry[String] =
buildConf("kyuubi.kubernetes.namespace")
.doc("The namespace that will be used for running the kyuubi pods and find engines.")
.version("1.7.0")
.stringConf
.createWithDefault("default")

val KUBERNETES_MASTER: OptionalConfigEntry[String] =
buildConf("kyuubi.kubernetes.master.address")
.doc("The internal Kubernetes master (API server) address to be used for kyuubi.")
.version("1.7.0")
.stringConf
.createOptional

val KUBERNETES_AUTHENTICATE_OAUTH_TOKEN_FILE: OptionalConfigEntry[String] =
buildConf("kyuubi.kubernetes.authenticate.oauthTokenFile")
.doc("Path to the file containing the OAuth token to use when authenticating against " +
"the Kubernetes API server. Specify this as a path as opposed to a URI " +
"(i.e. do not provide a scheme)")
.version("1.7.0")
.stringConf
.createOptional

val KUBERNETES_AUTHENTICATE_OAUTH_TOKEN: OptionalConfigEntry[String] =
buildConf("kyuubi.kubernetes.authenticate.oauthToken")
.doc("The OAuth token to use when authenticating against the Kubernetes API server. " +
"Note that unlike the other authentication options, this must be the exact string value " +
"of the token to use for the authentication.")
.version("1.7.0")
.stringConf
.createOptional

val KUBERNETES_AUTHENTICATE_CLIENT_KEY_FILE: OptionalConfigEntry[String] =
buildConf("kyuubi.kubernetes.authenticate.clientKeyFile")
.doc("Path to the client key file for connecting to the Kubernetes API server " +
"over TLS from the kyuubi. Specify this as a path as opposed to a URI " +
"(i.e. do not provide a scheme)")
.version("1.7.0")
.stringConf
.createOptional

val KUBERNETES_AUTHENTICATE_CLIENT_CERT_FILE: OptionalConfigEntry[String] =
buildConf("kyuubi.kubernetes.authenticate.clientCertFile")
.doc("Path to the client cert file for connecting to the Kubernetes API server " +
"over TLS from the kyuubi. Specify this as a path as opposed to a URI " +
"(i.e. do not provide a scheme)")
.version("1.7.0")
.stringConf
.createOptional

val KUBERNETES_AUTHENTICATE_CA_CERT_FILE: OptionalConfigEntry[String] =
buildConf("kyuubi.kubernetes.authenticate.caCertFile")
.doc("Path to the CA cert file for connecting to the Kubernetes API server " +
"over TLS from the kyuubi. Specify this as a path as opposed to a URI " +
"(i.e. do not provide a scheme)")
.version("1.7.0")
.stringConf
.createOptional

val KUBERNETES_TRUST_CERTIFICATES: ConfigEntry[Boolean] =
buildConf("kyuubi.kubernetes.trust.certificates")
.doc("If set to true then client can submit to kubernetes cluster only with token")
.version("1.7.0")
.booleanConf
.createWithDefault(false)

// ///////////////////////////////////////////////////////////////////////////////////////////////
// SQL Engine Configuration //
// ///////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -1950,14 +2024,6 @@ object KyuubiConf {
.booleanConf
.createWithDefault(true)

val KUBERNETES_CONTEXT: OptionalConfigEntry[String] =
buildConf("kyuubi.kubernetes.context")
.doc("The desired context from your kubernetes config file used to configure the K8S " +
"client for interacting with the cluster.")
.version("1.6.0")
.stringConf
.createOptional

private val serverOnlyConfEntries: Set[ConfigEntry[_]] = Set(
FRONTEND_BIND_HOST,
FRONTEND_BIND_PORT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ object ThreadUtils extends Logging {
Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
}

def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
val threadFactory = new NamedThreadFactory(prefix, daemon = true)
Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}

def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
try {
// `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
package org.apache.kyuubi.engine

import io.fabric8.kubernetes.api.model.{Pod, PodList}
import io.fabric8.kubernetes.client.{Config, DefaultKubernetesClient, KubernetesClient, KubernetesClientException}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable

import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.KUBERNETES_CONTEXT
import org.apache.kyuubi.engine.ApplicationState.{ApplicationState, FAILED, FINISHED, PENDING, RUNNING}
import org.apache.kyuubi.engine.KubernetesApplicationOperation.SPARK_APP_ID_LABEL
import org.apache.kyuubi.util.KubernetesUtils

class KubernetesApplicationOperation extends ApplicationOperation with Logging {

Expand All @@ -33,24 +34,18 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
private var jpsOperation: JpsApplicationOperation = _

override def initialize(conf: KyuubiConf): Unit = {
info("Start Initialize Kubernetes Client.")
val contextOpt = conf.get(KUBERNETES_CONTEXT)
if (contextOpt.isEmpty) {
warn("Skip Initialize Kubernetes Client, because of Context not set.")
return
}
jpsOperation = new JpsApplicationOperation
jpsOperation.initialize(conf)
kubernetesClient =
try {
val client = new DefaultKubernetesClient(Config.autoConfigure(contextOpt.get))

info("Start initializing Kubernetes Client.")
kubernetesClient = KubernetesUtils.buildKubernetesClient(conf) match {
case Some(client) =>
info(s"Initialized Kubernetes Client connect to: ${client.getMasterUrl}")
client
} catch {
case e: KubernetesClientException =>
error("Fail to init KubernetesClient for KubernetesApplicationOperation", e)
null
}
case None =>
warn("Fail to init Kubernetes Client for Kubernetes Application Operation")
null
}
}

override def isSupported(clusterManager: Option[String]): Boolean = {
Expand Down Expand Up @@ -91,8 +86,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
if (podList.size() != 0) {
val pod = podList.get(0)
val info = ApplicationInfo(
// Can't get appId, get Pod UID instead.
id = pod.getMetadata.getUid,
// spark pods always tag label `spark-app-selector:<spark-app-id>`
id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
name = pod.getMetadata.getName,
state = KubernetesApplicationOperation.toApplicationState(pod.getStatus.getPhase),
error = Option(pod.getStatus.getReason))
Expand Down Expand Up @@ -135,6 +130,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {

object KubernetesApplicationOperation extends Logging {
val LABEL_KYUUBI_UNIQUE_KEY = "kyuubi-unique-tag"
val SPARK_APP_ID_LABEL = "spark-app-selector"

def toApplicationState(state: String): ApplicationState = state match {
// https://github.com/kubernetes/kubernetes/blob/master/pkg/apis/core/types.go#L2396
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.util

import java.io.File

import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.base.Charsets
import com.google.common.io.Files
import io.fabric8.kubernetes.client.{Config, ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
import io.fabric8.kubernetes.client.Config.autoConfigure
import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory
import okhttp3.{Dispatcher, OkHttpClient}

import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._

object KubernetesUtils extends Logging {

def buildKubernetesClient(conf: KyuubiConf): Option[KubernetesClient] = {
val master = conf.get(KUBERNETES_MASTER)
val namespace = conf.get(KUBERNETES_NAMESPACE)
val serviceAccountToken =
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)).filter(_.exists)
val serviceAccountCaCrt =
Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)).filter(_.exists)

val oauthTokenFile = conf.get(KUBERNETES_AUTHENTICATE_OAUTH_TOKEN_FILE)
.map(new File(_))
.orElse(serviceAccountToken)
val oauthTokenValue = conf.get(KUBERNETES_AUTHENTICATE_OAUTH_TOKEN)

KubernetesUtils.requireNandDefined(
oauthTokenFile,
oauthTokenValue,
s"Cannot specify OAuth token through both a oauth token file and a " +
s"oauth token value.")

val caCertFile = conf
.get(KUBERNETES_AUTHENTICATE_CA_CERT_FILE)
.orElse(serviceAccountCaCrt.map(_.getAbsolutePath))
val clientKeyFile = conf.get(KUBERNETES_AUTHENTICATE_CLIENT_KEY_FILE)
val clientCertFile = conf.get(KUBERNETES_AUTHENTICATE_CLIENT_CERT_FILE)

// Allow for specifying a context used to auto-configure from the users K8S config file
val kubeContext = conf.get(KUBERNETES_CONTEXT).filter(_.nonEmpty)
info("Auto-configuring K8S client using " +
kubeContext.map("context " + _).getOrElse("current context") +
" from users K8S config file")

val config = new ConfigBuilder(autoConfigure(kubeContext.orNull))
.withApiVersion("v1")
.withOption(master) {
(master, configBuilder) => configBuilder.withMasterUrl(master)
}.withNamespace(namespace)
.withTrustCerts(conf.get(KUBERNETES_TRUST_CERTIFICATES))
.withOption(oauthTokenValue) {
(token, configBuilder) => configBuilder.withOauthToken(token)
}.withOption(oauthTokenFile) {
(file, configBuilder) =>
configBuilder.withOauthToken(Files.asCharSource(file, Charsets.UTF_8).read())
}.withOption(caCertFile) {
(file, configBuilder) => configBuilder.withCaCertFile(file)
}.withOption(clientKeyFile) {
(file, configBuilder) => configBuilder.withClientKeyFile(file)
}.withOption(clientCertFile) {
(file, configBuilder) => configBuilder.withClientCertFile(file)
}.build()

// https://github.com/fabric8io/kubernetes-client/issues/3547
val dispatcher = new Dispatcher(
ThreadUtils.newDaemonCachedThreadPool("kubernetes-dispatcher"))
val factoryWithCustomDispatcher = new OkHttpClientFactory() {
override protected def additionalConfig(builder: OkHttpClient.Builder): Unit = {
builder.dispatcher(dispatcher)
}
}

debug("Kubernetes client config: " +
new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(config))
Some(new DefaultKubernetesClient(factoryWithCustomDispatcher.createHttpClient(config), config))
}

implicit private class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
extends AnyVal {

def withOption[T](option: Option[T])(configurator: ((T, ConfigBuilder) => ConfigBuilder))
: ConfigBuilder = {
option.map { opt =>
configurator(opt, configBuilder)
}.getOrElse(configBuilder)
}
}

def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
opt2.foreach { _ => require(opt1.isEmpty, errMessage) }
}
}