This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Secure HDFS Support #414

Conversation


@ifilonenko ifilonenko commented Aug 2, 2017

What changes were proposed in this pull request?

This is the ongoing work of setting up Secure HDFS interaction with Spark on K8s.
The architecture is discussed in this community-wide google doc
This initiative can be broken down into 4 stages.

STAGE 1

  • Detecting the HADOOP_CONF_DIR environment variable and using ConfigMaps to store all Hadoop config files locally, while also setting HADOOP_CONF_DIR locally in the driver / executors (see the sketch after this list)

STAGE 2

  • Grabbing the TGT from the LTC, or using a keytab + principal, and creating a DT (delegation token) that will be mounted as a secret

STAGE 3

  • Driver + Executor Logic
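
As a rough illustration of the Stage 1 approach (the sketch referenced above), the following Scala snippet shows how the files under HADOOP_CONF_DIR could be bundled into a single ConfigMap with the fabric8 client; the function name and file handling are illustrative assumptions, not code from this PR:

import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import scala.collection.JavaConverters._
import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder}

def buildHadoopConfigMap(configMapName: String, hadoopConfDir: String): ConfigMap = {
  // Key each file by its name (core-site.xml, hdfs-site.xml, ...) so the
  // ConfigMap can later be mounted as a directory at HADOOP_CONF_DIR in the pods.
  val confFileContents = new File(hadoopConfDir)
    .listFiles()
    .filter(_.isFile)
    .map { file =>
      file.getName -> new String(Files.readAllBytes(file.toPath), StandardCharsets.UTF_8)
    }.toMap
  new ConfigMapBuilder()
    .withNewMetadata()
      .withName(configMapName)
      .endMetadata()
    .addToData(confFileContents.asJava)
    .build()
}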

How was this patch tested?

  • E2E Integration tests
    • Stage 1
    • Stage 2
    • Stage 3
  • Unit tests
    • Stage 1
    • Stage 2
    • Stage 3

Docs and Error Handling?

  • Docs
  • Error Handling

@ifilonenko ifilonenko mentioned this pull request Aug 2, 2017
@ifilonenko (Member Author)

rerun unit tests please

@ifilonenko (Member Author)

rerun integration tests please

1 similar comment
@ifilonenko (Member Author)

rerun integration tests please

@ifilonenko (Member Author)

rerun unit tests please

@kimoonkim (Member)

@ifilonenko and I talked offline. I am doing a preliminary review on this end-to-end prototype. After this review, we want to break this into smaller PRs and add unit tests to them.

@ifilonenko (Member Author)

rerun integration tests please

@kimoonkim (Member) left a comment

Starting a preliminary review on this end-to-end prototype. After this review, we want to break this into smaller PRs and add unit tests to them.

The change looks good overall. I was able to follow the code relatively easily.

<td><code>spark.kubernetes.kerberos</code></td>
<td>false</td>
<td>
Specify whether your job is a job that will require a Delegation Token to access HDFS. By default, we
kimoonkim (Member):

I feel like "Delegation Token" is too much detail for this user-exposed documentation. Rewrite this to mention Kerberos, but omit "Delegation Token"?

@@ -768,6 +768,53 @@ from the other deployment modes. See the [configuration page](configuration.html
<code>myIdentifier</code>. Multiple node selector keys can be added by setting multiple configurations with this prefix.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos</code></td>
kimoonkim (Member):

Indicate this is a boolean by adding .enabled to the name? Many boolean flags have the suffix.

<td>
Assuming you have set <code>spark.kubernetes.kerberos</code> to be true. This will let you specify
the location of your Kerberos keytab to be used in order to access Secure HDFS. This is optional as you
may login by running <code>kinit -kt</code> before running the spark-submit, and the submission client
kimoonkim (Member):

s/ -kt//. -kt is for a keytab. kinit can be invoked without a keytab file, like $ kinit <USERNAME>, which also allows you to avoid using this option.

<td>
Assuming you have set <code>spark.kubernetes.kerberos</code> to be true. This will let you specify
your Kerberos principal that you wish to use to access Secure HDFS. This is optional as you
may login by running <code>kinit -kt</code> before running the spark-submit, and the submission client
kimoonkim (Member):

Ditto.

<td><code>spark.kubernetes.kerberos.tokensecret.name</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos</code> to be true. This will let you specify
kimoonkim (Member):

Mention this is optional if the user does not want to use an existing delegation token?

ifilonenko (Member Author):

What do you mean?

// Spark core providers to handle delegation token renewal
renewer = jobUserUGI.getShortUserName
logInfo(s"Renewer is: $renewer")
renewedCredentials = new Credentials(originalCredentials)
kimoonkim (Member):

s/renewedCredentials/credentials/

renewedCredentials = new Credentials(originalCredentials)
dfs.addDelegationTokens(renewer, renewedCredentials)
renewedTokens = renewedCredentials.getAllTokens.asScala
logInfo(s"Renewed tokens: ${renewedCredentials.toString}")
kimoonkim (Member):

s/Renewed //

dfs.addDelegationTokens(renewer, renewedCredentials)
renewedTokens = renewedCredentials.getAllTokens.asScala
logInfo(s"Renewed tokens: ${renewedCredentials.toString}")
logInfo(s"All renewed tokens: ${renewedTokens.mkString(",")}")
kimoonkim (Member):

s/ renewed tokens/tokens/

renewedTokens = renewedCredentials.getAllTokens.asScala
logInfo(s"Renewed tokens: ${renewedCredentials.toString}")
logInfo(s"All renewed tokens: ${renewedTokens.mkString(",")}")
logInfo(s"All renewed secret keys: ${renewedCredentials.getAllSecretKeys}")
kimoonkim (Member):

s/renewed secret/secret/
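
Taken together, the renames suggested above would leave the quoted snippet reading roughly as follows (a sketch of the suggested edits, not the merged code; dfs, renewer, and originalCredentials come from the surrounding code in the PR):

val credentials = new Credentials(originalCredentials)
dfs.addDelegationTokens(renewer, credentials)
val tokens = credentials.getAllTokens.asScala
logInfo(s"Tokens: ${credentials.toString}")
logInfo(s"All tokens: ${tokens.mkString(",")}")
logInfo(s"All secret keys: ${credentials.getAllSecretKeys}")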

.withMountPath(HADOOP_FILE_DIR)
.endVolumeMount()
.addNewEnv()
.withName(HADOOP_CONF_DIR)
kimoonkim (Member):

I am not sure setting the HADOOP_CONF_DIR env alone will make the driver and executor JVMs find these Hadoop config files. The config dir path should be included in the classpath of the JVMs when the JVMs start up. Given that our driver and executor Dockerfiles launch the JVM directly (although through tini), I don't think any software layer will pick up the HADOOP_CONF_DIR env and put it on the JVM classpath.

I think we want to modify the command line of the Dockerfiles to include the HADOOP_CONF_DIR env. Here's the executor command-line part as an example. We can add HADOOP_CONF_DIR to SPARK_CLASSPATH here, and do the same thing for the driver:


CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
    if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
    if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
    if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
    if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
    exec /sbin/tini -- ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP

@ifilonenko (Member Author)

Partial mocking of UGI functions has been done, with the exception of the FileSystem portion in the KeytabResolverStep.

Garbage collection of the secret after the job is already handled by the OwnerReference in Client.scala.

Current failures in integration tests are due to issues found after rebasing PRs. They will be addressed before this is ready for merging.

@ifilonenko (Member Author)

rerun integration tests please

1 similar comment
@ifilonenko (Member Author)

rerun integration tests please

@kimoonkim (Member) left a comment

I finished looking at the change. Looks good to me overall. Thanks for putting this together.

Left a few comments. PTAL.

Perhaps we can merge this into the target branch hdfs-kerberos-support soon after this round. Then start breaking this into multiple PRs heading to branch-2.2-kubernetes. Like one PR for the main code and another for the integration test code.

<td>spark.kubernetes.kerberos.dt.label</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to be true. This will let you specify
the label within the pre-specified secret where the data of your existing delegation token data is stored.
kimoonkim (Member):

s/label/data item key name/

<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to be true. This will let you specify
the name of the secret where your existing delegation token data is stored. You must also specify the
item key <code>spark.kubernetes.kerberos.tokensecret.itemkey</code> where your data is stored on the secret.
kimoonkim (Member):

Can you mention that this is optional, for the case where you want to use a pre-existing secret, and that a new secret will be automatically created otherwise?

@@ -100,6 +100,12 @@
<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
<exclusions>
kimoonkim (Member):

Hmm. Do we still need this?

* This is separated out from the HadoopConf steps API because this component can be reused to
* mounted the DT secret for executors as well.
*/
private[spark] trait KerberosTokenBootstrapConf {
kimoonkim (Member):

Make the trait name match the file name, KerberosTokenConfBootstrap?

val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_SPARK_CONF_NAME)
val maybeHadoopConfDir = sparkConf.getOption(HADOOP_CONF_DIR_LOC)
val maybeDTSecretName = sparkConf.getOption(HADOOP_KERBEROS_CONF_SECRET)
val maybeDTLabelName = sparkConf.getOption(HADOOP_KERBEROS_CONF_ITEM_KEY)
kimoonkim (Member):

s/LabelName/DataItem/

}})
credentials.getAllTokens.asScala.isEmpty
tokens.isEmpty
if (tokens.isEmpty) logError("Did not obtain any Delegation Tokens")
kimoonkim (Member):

I think we should throw an exception. Otherwise, it's hard to debug this downstream.
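
A minimal sketch of that suggestion (illustrative, not the PR's code), where tokens is the collection from the quoted snippet: fail fast with a SparkException so the missing-token case surfaces immediately instead of only appearing in the logs.

import org.apache.spark.SparkException

// Surface the failure to the submission client rather than logging and continuing.
if (tokens.isEmpty) {
  throw new SparkException("Did not obtain any delegation tokens for the job user")
}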

val secretDT =
new SecretBuilder()
.withNewMetadata()
.withName(HADOOP_KERBEROS_SECRET_NAME)
kimoonkim (Member):

Shouldn't we generate a unique secret name so that multiple jobs can run simultaneously using different secrets?

ifilonenko (Member Author):

In the same namespace, why would you need different secrets? I guess the problem would happen when the ownerRef destroys the secret while another job is still processing.

kimoonkim (Member):

Yes, if we have multiple jobs, it's better to have one secret per job.
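
A sketch of one way to do that (the base name, the UUID suffix, and the variable names are illustrative assumptions, not the PR's implementation): derive a per-job secret name rather than reusing a single fixed one.

import java.util.UUID
import io.fabric8.kubernetes.api.model.SecretBuilder

// Illustrative stand-in for the PR's HADOOP_KERBEROS_SECRET_NAME constant.
val baseSecretName = "hadoop-kerberos-dt-secret"
// A short unique suffix gives each submitted job its own secret, so one job's
// owner-reference cleanup cannot delete a secret another job is still using.
val uniqueSecretName = s"$baseSecretName-${UUID.randomUUID().toString.take(8)}"
val secretDT = new SecretBuilder()
  .withNewMetadata()
    .withName(uniqueSecretName)
    .endMetadata()
  .build()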

tokens.isEmpty
if (tokens.isEmpty) logError("Did not obtain any Delegation Tokens")
val data = serialize(credentials)
val renewalTime = getTokenRenewalInterval(tokens, hadoopConf).getOrElse(Long.MaxValue)
kimoonkim (Member):

s/renewalTime/renewalInterval/

hadoopConfDir)
val maybeKerberosStep =
if (isKerberosEnabled) {
maybeExistingSecret.map(secretItemKey => Some(new HadoopKerberosSecretResolverStep(
kimoonkim (Member):

s/secretItemKey/existingSecretName/

logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
interval
}.toOption}
if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
kimoonkim (Member):

Just as a note to myself, I see now that we'll return the earliest expiration time in case there are multiple tokens.

ifilonenko (Member Author):

Correct

kimoonkim (Member):

Maybe we should add a comment about this, because it wasn't easy to discover.
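
Something along these lines would make the intent explicit (a self-contained sketch of the behaviour being discussed, not the PR's code):

// When a job carries several delegation tokens, keep the smallest renewal
// interval so credentials are refreshed before the earliest token expires.
def earliestRenewalInterval(renewIntervals: Seq[Long]): Option[Long] =
  if (renewIntervals.isEmpty) None else Some(renewIntervals.min)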

currentHadoopSpec = nextStep.configureContainers(currentHadoopSpec)
}
val configMap =
new ConfigMapBuilder()
kimoonkim (Member):

Also use owner ref here so this gets deleted after the job is done?

ifilonenko (Member Author):

I believe that since all of these are added to the driverSpec's otherKubernetesResources, the ownerRef is applied after the driverPod is launched in Client.scala.
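
For reference, attaching an owner reference with the fabric8 client looks roughly like this (a hedged sketch based on the discussion above; the helper name and the use of the driver pod as the owner are assumptions, not the PR's exact code):

import io.fabric8.kubernetes.api.model.{OwnerReferenceBuilder, Pod, Secret, SecretBuilder}

// Tie a secret's lifetime to the driver pod so Kubernetes garbage-collects it
// once the pod (and therefore the job) is gone.
def addDriverOwnerReference(driverPod: Pod, dtSecret: Secret): Secret = {
  val ownerRef = new OwnerReferenceBuilder()
    .withName(driverPod.getMetadata.getName)
    .withApiVersion(driverPod.getApiVersion)
    .withUid(driverPod.getMetadata.getUid)
    .withKind(driverPod.getKind)
    .withController(true)
    .build()
  new SecretBuilder(dtSecret)
    .editOrNewMetadata()
      .addToOwnerReferences(ownerRef)
    .endMetadata()
    .build()
}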

@kimoonkim (Member)

The latest commit addressed most of my comments. Looks great to me. Thanks @ifilonenko for the work so far.

@ifilonenko (Member Author)

@erikerlandson after all tests pass, can you give the final okay before merge?

@erikerlandson (Member)

LGTM, and passing CI. This is good to merge when we're ready!

@foxish (Member) commented Sep 20, 2017

Let's merge after cutting the new release and tagging.

@ifilonenko (Member Author)

Important note: this PR will require refactoring upon merging because of the most recent commits, which renamed things and added unit tests to the KubernetesSchedulerBackend. These changes will be handled on the hdfs-kerberos-support branch directly.

@erikerlandson erikerlandson merged commit 569f73c into apache-spark-on-k8s:hdfs-kerberos-support Sep 28, 2017
ifilonenko pushed a commit to ifilonenko/spark that referenced this pull request Feb 26, 2019