This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Secure HDFS Support #414

Merged
Commits (43 total; changes shown from 42 commits)
ea9e516
Initial architecture design for HDFS support
ifilonenko Jul 15, 2017
47ea307
Minor styling
ifilonenko Jul 15, 2017
60a19ca
Added proper logic for mounting ConfigMaps
ifilonenko Jul 18, 2017
1d0175a
styling
ifilonenko Jul 18, 2017
163193a
modified otherKubernetesResource logic
ifilonenko Jul 18, 2017
8381fa6
fixed Integration tests and modified HADOOP_CONF_DIR variable to be F…
ifilonenko Jul 18, 2017
d4b1a68
setting HADOOP_CONF_DIR env variables
ifilonenko Jul 18, 2017
0bba092
Included integration tests for Stage 1
ifilonenko Jul 18, 2017
06df962
Initial Kerberos support
ifilonenko Jul 19, 2017
d7f54dd
initial Stage 2 architecture using deprecated 2.1 methods
ifilonenko Jul 21, 2017
d3c5a03
Added current, BROKEN, integration test environment for review
ifilonenko Jul 26, 2017
d7441ba
working hadoop cluster
ifilonenko Jul 28, 2017
04eed68
Using locks and monitors to ensure proper configs for setting up kerb…
ifilonenko Jul 29, 2017
62354eb
working Stage 2
ifilonenko Jul 31, 2017
514ac19
documentation
ifilonenko Aug 1, 2017
3fbf88c
Integration Stages 1,2 and 3
ifilonenko Aug 2, 2017
b321436
further testing work
ifilonenko Aug 2, 2017
b6912d2
fixing imports
ifilonenko Aug 2, 2017
c6b11f8
Stage 3 Integration tests pass
ifilonenko Aug 3, 2017
1e71ca7
uncommented SparkDockerBuilder
ifilonenko Aug 4, 2017
350c8ed
testing fix
ifilonenko Aug 4, 2017
5e4051c
handled comments and increased test hardening
ifilonenko Aug 8, 2017
8338fdb
Solve failing integration test problem and lower TIMEOUT time
ifilonenko Aug 9, 2017
d6d0945
modify security.authoization
ifilonenko Aug 9, 2017
e3f14e1
Modifying HADOOP_CONF flags
ifilonenko Aug 9, 2017
61a7414
Refactored tests and included modifications to pass all tests regardl…
ifilonenko Aug 15, 2017
7a0b4e4
Adding unit test and one more integration test
ifilonenko Aug 16, 2017
8dacb19
completed unit tests w/o UGI mocking
ifilonenko Aug 16, 2017
d9b7b50
cleanup and various small fixes
ifilonenko Aug 18, 2017
d53a50f
added back sparkdockerbuilder images
ifilonenko Aug 22, 2017
499b037
merge issues
ifilonenko Aug 31, 2017
ffe7891
address initial comments and scalastyle issues
ifilonenko Aug 31, 2017
6efa379
addresses comments from PR
ifilonenko Aug 31, 2017
6052a13
mocking hadoopUGI
ifilonenko Aug 31, 2017
f9ca47d
Fix executor env to include simple authn
kimoonkim Sep 1, 2017
91e364c
Merge remote-tracking branch 'bloomberg/secure-hdfs-support4' into pr…
kimoonkim Sep 1, 2017
4fe86f0
Merge pull request #1 from kimoonkim/pr-414
ifilonenko Sep 1, 2017
d2c8649
Fix a bug in executor env handling
kimoonkim Sep 1, 2017
4780878
Merge remote-tracking branch 'bloomberg/secure-hdfs-support4' into pr…
kimoonkim Sep 1, 2017
17f2702
Merge pull request #2 from kimoonkim/pr-414
ifilonenko Sep 1, 2017
b566fa9
Fix a bug in how the driver sets simple authn
kimoonkim Sep 1, 2017
726ff64
Merge pull request #3 from kimoonkim/pr-414
ifilonenko Sep 2, 2017
2d48613
handling Pr comments
ifilonenko Sep 19, 2017
@@ -575,7 +575,7 @@ object SparkSubmit extends CommandLineUtils {
}

// assure a keytab is available from any place in a JVM
if (clusterManager == YARN || clusterManager == LOCAL) {
if (clusterManager == YARN || clusterManager == KUBERNETES || clusterManager == LOCAL) {
if (args.principal != null) {
require(args.keytab != null, "Keytab must be specified when principal is specified")
if (!new File(args.keytab).exists()) {
44 changes: 44 additions & 0 deletions docs/running-on-kubernetes.md
@@ -783,6 +783,50 @@ from the other deployment modes. See the [configuration page](configuration.html
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.enabled</code></td>
<td>false</td>
<td>
Specify whether your job requires Kerberos authentication to access HDFS. By default, we
will assume that you will not require secure HDFS access.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.keytab</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the location of your Kerberos keytab to be used in order to access Secure HDFS. This is optional, as you
may log in by running <code>kinit</code> before running the spark-submit, and the submission client
will look within your local TGT cache to resolve this.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.principal</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the Kerberos principal that you wish to use to access Secure HDFS. This is optional, as you
may log in by running <code>kinit</code> before running the spark-submit, and the submission client
will look within your local TGT cache to resolve this.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.tokensecret.name</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the name of the secret where your existing delegation token data is stored. You must also specify the
item key <code>spark.kubernetes.kerberos.tokensecret.itemkey</code> where your data is stored on the secret.

Review comment (Member): Can you mention that this is optional in case you want to use a pre-existing secret, and that a new secret will be automatically created otherwise?

</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.tokensecret.itemkey</code></td>
<td>spark.kubernetes.kerberos.dt.label</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the label within the pre-specified secret where your existing delegation token data is stored.

Review comment (Member): s/label/data item key name/

We have a default value of <code>spark.kubernetes.kerberos.tokensecret.itemkey</code> should you not include it, but
you should always include this if you are providing a pre-existing secret containing the delegation token data.
</td>
</tr>
<tr>
<td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
<td>(none)</td>
<td>
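For reference, a minimal sketch (not part of this diff) of supplying these documented settings programmatically through SparkConf; the keytab path, principal, and secret names below are placeholders:

// Hypothetical example; values are placeholders, not defaults.
val conf = new org.apache.spark.SparkConf()
  .set("spark.kubernetes.kerberos.enabled", "true")
  .set("spark.kubernetes.kerberos.keytab", "/path/to/user.keytab")
  .set("spark.kubernetes.kerberos.principal", "user@EXAMPLE.COM")
// Or, to reuse an existing delegation-token secret instead of a keytab:
//   .set("spark.kubernetes.kerberos.tokensecret.name", "existing-dt-secret")
//   .set("spark.kubernetes.kerberos.tokensecret.itemkey", "hadoop-token-key")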
6 changes: 6 additions & 0 deletions resource-managers/kubernetes/core/pom.xml
@@ -100,6 +100,12 @@
<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
<exclusions>
Review comment (Member): Hmm. Do we still need this?

<exclusion>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import java.io.File

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, KeyToPathBuilder, PodBuilder}

import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.internal.Logging

/**
* This is separated out from the HadoopConf steps API because this component can be reused to
* set up the Hadoop Configuration for executors as well.
*/
private[spark] trait HadoopConfBootstrap {
/**
* Bootstraps a main container with the ConfigMaps containing Hadoop config files
* mounted as volumes and an ENV variable pointing to the mounted file.
*/
def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer
}

private[spark] class HadoopConfBootstrapImpl(
hadoopConfConfigMapName: String,
hadoopConfigFiles: Seq[File],
hadoopUGI: HadoopUGIUtil) extends HadoopConfBootstrap with Logging {

override def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer = {
logInfo("HADOOP_CONF_DIR defined. Mounting HDFS specific .xml files")
val keyPaths = hadoopConfigFiles.map(file =>
new KeyToPathBuilder()
.withKey(file.toPath.getFileName.toString)
.withPath(file.toPath.getFileName.toString)
Review comment (Member): Key and path are exactly the same. Is this intended? If yes, why not use a set?

Reply (Member Author): It is intended, to make my life easier when looking at the ConfigMap and seeing the data be: core-site.xml

.build()).toList
val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod)
.editSpec()
.addNewVolume()
.withName(HADOOP_FILE_VOLUME)
.withNewConfigMap()
.withName(hadoopConfConfigMapName)
.withItems(keyPaths.asJava)
.endConfigMap()
.endVolume()
.endSpec()
.build()
val mainContainerWithMountedHadoopConf = new ContainerBuilder(
originalPodWithMainContainer.mainContainer)
.addNewVolumeMount()
.withName(HADOOP_FILE_VOLUME)
.withMountPath(HADOOP_CONF_DIR_PATH)
.endVolumeMount()
.addNewEnv()
.withName(ENV_HADOOP_CONF_DIR)
.withValue(HADOOP_CONF_DIR_PATH)
.endEnv()
.addNewEnv()
.withName(ENV_SPARK_USER)
.withValue(hadoopUGI.getShortName)
.endEnv()
.build()
originalPodWithMainContainer.copy(
pod = hadoopSupportedPod,
mainContainer = mainContainerWithMountedHadoopConf)
}
}
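Below is a minimal usage sketch (not part of this diff) of how a submission step might apply this bootstrap; driverPod, driverContainer, and the ConfigMap name are hypothetical placeholders:

// Hypothetical wiring, for illustration only.
import java.io.File

val bootstrap = new HadoopConfBootstrapImpl(
  hadoopConfConfigMapName = "spark-hadoop-conf",  // placeholder name
  hadoopConfigFiles = Seq(
    new File("/etc/hadoop/conf/core-site.xml"),
    new File("/etc/hadoop/conf/hdfs-site.xml")),
  hadoopUGI = new HadoopUGIUtil)
val bootstrapped = bootstrap.bootstrapMainContainerAndVolumes(
  PodWithMainContainer(driverPod, driverContainer))
// bootstrapped.pod now mounts the ConfigMap as a volume, and
// bootstrapped.mainContainer gains HADOOP_CONF_DIR and SPARK_USER env vars.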
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import org.apache.hadoop.security.UserGroupInformation

private[spark] class HadoopUGIUtil {
def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
def getShortName: String = getCurrentUser.getShortUserName
def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled
def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation =
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
}
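This wrapper exists so that UserGroupInformation's static calls can be stubbed in unit tests (see the "mocking hadoopUGI" commit above). A minimal test sketch, assuming Mockito:

// Hypothetical test snippet, for illustration only.
import org.mockito.Mockito.{mock, when}

val ugiUtil = mock(classOf[HadoopUGIUtil])
when(ugiUtil.getShortName).thenReturn("sparkuser")
when(ugiUtil.isSecurityEnabled).thenReturn(true)
// Pass ugiUtil to HadoopConfBootstrapImpl in place of the real wrapper.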
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.deploy.kubernetes.constants._
import org.apache.spark.internal.Logging


/**
* This is separated out from the HadoopConf steps API because this component can be reused to
* mount the DT secret for executors as well.
*/
private[spark] trait KerberosTokenBootstrapConf {
Review comment (Member): Make the trait name match the file name, KerberosTokenConfBootstrap?

// Bootstraps a main container with the Secret mounted as volumes and an ENV variable
// pointing to the mounted file containing the DT for Secure HDFS interaction
def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer
}

private[spark] class KerberosTokenConfBootstrapImpl(
secretName: String,
secretItemKey: String,
userName: String) extends KerberosTokenBootstrapConf with Logging {

override def bootstrapMainContainerAndVolumes(
originalPodWithMainContainer: PodWithMainContainer)
: PodWithMainContainer = {
logInfo("Mounting HDFS DT from Secret for Secure HDFS")
val dtMountedPod = new PodBuilder(originalPodWithMainContainer.pod)
.editOrNewSpec()
.addNewVolume()
.withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
.withNewSecret()
.withSecretName(secretName)
.endSecret()
.endVolume()
.endSpec()
.build()
val mainContainerWithMountedKerberos = new ContainerBuilder(
originalPodWithMainContainer.mainContainer)
.addNewVolumeMount()
.withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
.withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
.endVolumeMount()
.addNewEnv()
.withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
.withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$secretItemKey")
.endEnv()
.addNewEnv()
.withName(ENV_SPARK_USER)
.withValue(userName)
.endEnv()
.build()
originalPodWithMainContainer.copy(
pod = dtMountedPod,
mainContainer = mainContainerWithMountedKerberos)
}
}
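A minimal usage sketch (not part of this diff); the secret name, item key, and pod/container values are hypothetical placeholders:

// Hypothetical wiring, for illustration only.
val dtBootstrap = new KerberosTokenConfBootstrapImpl(
  secretName = "existing-dt-secret",
  secretItemKey = "hadoop-token-key",
  userName = "sparkuser")
val withToken = dtBootstrap.bootstrapMainContainerAndVolumes(
  PodWithMainContainer(executorPod, executorContainer))
// The container now sets HADOOP_TOKEN_FILE_LOCATION to the mounted token
// file, which Hadoop's UserGroupInformation picks up automatically at login.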
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.{Container, Pod}

/**
* The purpose of this case class is to package together the driver pod
* with its main container, so that we can bootstrap and modify them as a
* unit instead of modifying each component separately.
*/
private[spark] case class PodWithMainContainer(
pod: Pod,
mainContainer: Container)
@@ -542,6 +542,42 @@ package object config extends Logging {

private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."

private[spark] val KUBERNETES_KERBEROS_SUPPORT =
ConfigBuilder("spark.kubernetes.kerberos.enabled")
.doc("Specify whether your job is a job that will require a Delegation Token to access HDFS")
.booleanConf
.createWithDefault(false)

private[spark] val KUBERNETES_KERBEROS_KEYTAB =
ConfigBuilder("spark.kubernetes.kerberos.keytab")
.doc("Specify the location of keytab" +
" for Kerberos in order to access Secure HDFS")
.stringConf
.createOptional

private[spark] val KUBERNETES_KERBEROS_PRINCIPAL =
ConfigBuilder("spark.kubernetes.kerberos.principal")
.doc("Specify the principal" +
" for Kerberos in order to access Secure HDFS")
.stringConf
.createOptional

private[spark] val KUBERNETES_KERBEROS_DT_SECRET_NAME =
ConfigBuilder("spark.kubernetes.kerberos.tokensecret.name")
.doc("Specify the name of the secret where " +
" your existing delegation token is stored. This removes the need" +
" for the job user to provide any keytab for launching a job")
.stringConf
.createOptional

private[spark] val KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY =
ConfigBuilder("spark.kubernetes.kerberos.tokensecret.itemkey")
.doc("Specify the item key of the data where " +
" your existing delegation token is stored. This removes the need" +
" for the job user to provide any keytab for launching a job")
.stringConf
.createOptional
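A sketch (hypothetical call site, not part of this diff) of how the submission client might consume these entries:

// Hypothetical snippet, for illustration only.
val sparkConf = new org.apache.spark.SparkConf()
val kerberosEnabled: Boolean = sparkConf.get(KUBERNETES_KERBEROS_SUPPORT)
val maybeKeytab: Option[String] = sparkConf.get(KUBERNETES_KERBEROS_KEYTAB)
val maybePrincipal: Option[String] = sparkConf.get(KUBERNETES_KERBEROS_PRINCIPAL)
// If neither a keytab/principal pair nor a token secret is supplied, the
// client falls back to the local TGT cache populated by kinit.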

private[spark] def resolveK8sMaster(rawMasterString: String): String = {
if (!rawMasterString.startsWith("k8s://")) {
throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.")
@@ -45,9 +45,6 @@ package object constants {

// Hadoop credentials secrets for the Spark app.
private[spark] val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials"
private[spark] val SPARK_APP_HADOOP_TOKEN_FILE_SECRET_NAME = "hadoop-token-file"
private[spark] val SPARK_APP_HADOOP_TOKEN_FILE_PATH =
s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$SPARK_APP_HADOOP_TOKEN_FILE_SECRET_NAME"
private[spark] val SPARK_APP_HADOOP_SECRET_VOLUME_NAME = "hadoop-secret"

// Default and fixed ports
@@ -79,6 +76,7 @@ package object constants {
private[spark] val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR"
private[spark] val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
private[spark] val ENV_SPARK_USER = "SPARK_USER"

// Bootstrapping dependencies with the init-container
private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
@@ -101,6 +99,26 @@ package object constants {
private[spark] val DEFAULT_SHUFFLE_MOUNT_NAME = "shuffle"
private[spark] val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret"

// Hadoop Configuration
private[spark] val HADOOP_FILE_VOLUME = "hadoop-properties"
private[spark] val HADOOP_CONF_DIR_PATH = "/etc/hadoop/conf"
private[spark] val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR"
private[spark] val HADOOP_CONF_DIR_LOC = "spark.kubernetes.hadoop.conf.dir"
Review comment (Member): Are these spark.kubernetes.xxx Spark config keys? I see some of them are used to pass info from the client to driver/executors. I wonder if there is a better way to pass info to them. I'm worried about a potential conflict if future public config keys pick the same names. Maybe we want to move these to config.scala using ConfigBuilder and mark them as internal so we can minimize the chance.

Reply (Member Author): This is an interesting point. This would break our system, true. But this is our current point of communication towards the executors.

private[spark] val HADOOP_CONFIG_MAP_SPARK_CONF_NAME =
"spark.kubernetes.hadoop.executor.hadoopconfigmapname"

// Kerberos Configuration
private[spark] val HADOOP_KERBEROS_SECRET_NAME =
"spark.kubernetes.kerberos.dt"
private[spark] val HADOOP_KERBEROS_CONF_SECRET =
"spark.kubernetes.kerberos.secretname"
private[spark] val HADOOP_KERBEROS_CONF_ITEM_KEY =
"spark.kubernetes.kerberos.itemkeyname"
private[spark] val KERBEROS_SECRET_LABEL_PREFIX =
"hadoop-tokens"
private[spark] val SPARK_HADOOP_PREFIX = "spark.hadoop."
private[spark] val HADOOP_SECURITY_AUTHENTICATION =
SPARK_HADOOP_PREFIX + "hadoop.security.authentication"
// Bootstrapping dependencies via a secret
private[spark] val MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH = "/etc/spark-submitted-files"

Expand Down