Secure HDFS Support #414
Conversation
…ILE_DIR for Volume mount
…erized cluster in integration tests
rerun unit tests please
rerun integration tests please
1 similar comment
rerun integration tests please
rerun unit tests please
@ifilonenko and I talked offline. I am doing a preliminary review on this end-to-end prototype. After this review, we want to break this into smaller PRs and add unit tests to them.
rerun integration tests please
Starting a preliminary review on this end-to-end prototype. After this review, we want to break this into smaller PRs and add unit tests to them.
The change looks good overall. I was able to follow the code relatively easily.
docs/running-on-kubernetes.md
Outdated
<td><code>spark.kubernetes.kerberos</code></td>
<td>false</td>
<td>
Specify whether your job is a job that will require a Delegation Token to access HDFS. By default, we
I feel like "Delegation Token" is too much detail for this user-exposed documentation. Rewrite this to mention Kerberos, but omit "Delegation Token"?
docs/running-on-kubernetes.md
Outdated
@@ -768,6 +768,53 @@ from the other deployment modes. See the [configuration page](configuration.html)
<code>myIdentifier</code>. Multiple node selector keys can be added by setting multiple configurations with this prefix.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos</code></td>
Indicate this is a boolean by adding `.enabled` to the name? Many boolean flags have the suffix.
docs/running-on-kubernetes.md
Outdated
<td>
Assuming you have set <code>spark.kubernetes.kerberos</code> to be true. This will let you specify
the location of your Kerberos keytab to be used in order to access Secure HDFS. This is optional as you
may login by running <code>kinit -kt</code> before running the spark-submit, and the submission client
s/ -kt//. `-kt` is for keytab. `kinit` can be invoked without a keytab file, like `$ kinit <USERNAME>`, which also allows you to avoid using this option.
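For context, a minimal sketch of the two login paths this doc text describes, using Hadoop's UGI API (the helper name and the option plumbing here are hypothetical, not the PR's actual code):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation

object KerberosLoginSketch {
  // Hypothetical helper: log in from an explicit keytab + principal when both
  // are configured; otherwise fall back to the TGT already present in the
  // user's ticket cache (populated by a prior `kinit <USERNAME>`).
  def loginUGI(principal: Option[String], keytab: Option[String]): UserGroupInformation = {
    UserGroupInformation.setConfiguration(new Configuration())
    (principal, keytab) match {
      case (Some(p), Some(k)) =>
        UserGroupInformation.loginUserFromKeytabAndReturnUGI(p, k)
      case _ =>
        UserGroupInformation.getLoginUser
    }
  }
}
```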
docs/running-on-kubernetes.md
Outdated
<td>
Assuming you have set <code>spark.kubernetes.kerberos</code> to be true. This will let you specify
your Kerberos principal that you wish to use to access Secure HDFS. This is optional as you
may login by running <code>kinit -kt</code> before running the spark-submit, and the submission client
Ditto.
docs/running-on-kubernetes.md
Outdated
<td><code>spark.kubernetes.kerberos.tokensecret.name</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos</code> to be true. This will let you specify
Mention this is optional if the user does not want to use an existing delegation token?
What do you mean?
// Spark core providers to handle delegation token renewal
renewer = jobUserUGI.getShortUserName
logInfo(s"Renewer is: $renewer")
renewedCredentials = new Credentials(originalCredentials)
s/renewedCredentials/credentials/
renewedCredentials = new Credentials(originalCredentials)
dfs.addDelegationTokens(renewer, renewedCredentials)
renewedTokens = renewedCredentials.getAllTokens.asScala
logInfo(s"Renewed tokens: ${renewedCredentials.toString}")
s/Renewed //
dfs.addDelegationTokens(renewer, renewedCredentials)
renewedTokens = renewedCredentials.getAllTokens.asScala
logInfo(s"Renewed tokens: ${renewedCredentials.toString}")
logInfo(s"All renewed tokens: ${renewedTokens.mkString(",")}")
s/ renewed tokens/tokens/
renewedTokens = renewedCredentials.getAllTokens.asScala
logInfo(s"Renewed tokens: ${renewedCredentials.toString}")
logInfo(s"All renewed tokens: ${renewedTokens.mkString(",")}")
logInfo(s"All renewed secret keys: ${renewedCredentials.getAllSecretKeys}")
s/renewed secret/secret/
.withMountPath(HADOOP_FILE_DIR)
.endVolumeMount()
.addNewEnv()
.withName(HADOOP_CONF_DIR)
I am not sure setting the HADOOP_CONF_DIR env alone will make the driver and executor JVMs find these Hadoop config files. The config dir path should be included in the classpath of the JVMs when they start up. Given our driver and executor Dockerfiles launch the JVM directly (although through tini), I don't think any software layer will pick up the HADOOP_CONF_DIR env and put it on the JVM classpath.
I think we want to modify the command line of the Dockerfiles to include the HADOOP_CONF_DIR env. Here's the executor command line as an example; we can add HADOOP_CONF_DIR to SPARK_CLASSPATH here, and do the same thing for the driver:
CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
    if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
    if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
    if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
    if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
    exec /sbin/tini -- ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
Partial mocking of UGI functions has been done, with the exception of the FileSystem portion in the KeytabResolverStep. Garbage collection of the secret post-job is already handled by the Client.scala OwnerReference. Current failures in integration tests are due to issues found after rebasing PRs; they will be addressed before this is ready for merging.
Fix executor env to include simple authn
Fix a bug in executor env handling
Fix a bug in how the driver sets simple authn
rerun integration tests please
1 similar comment
rerun integration tests please
I finished looking at the change. Looks good to me overall. Thanks for putting this together.
Left a few comments. PTAL.
Perhaps we can merge this into the target branch hdfs-kerberos-support soon after this round, then start breaking this into multiple PRs heading to branch-2.2-kubernetes, like one PR for the main code and another for the integration test code.
docs/running-on-kubernetes.md
Outdated
<td>spark.kubernetes.kerberos.dt.label</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to be true. This will let you specify
the label within the pre-specified secret where the data of your existing delegation token data is stored.
s/label/data item key name/
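To illustrate the terminology: a Kubernetes secret stores a map of data item keys to base64-encoded values, so the token is addressed by a (secret name, item key) pair rather than a label. A hedged fabric8 sketch, where `existingSecretName` and `secretItemKey` are assumed names standing in for the two configured values:

```scala
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient}

// Assumed names, for illustration only.
val kubernetesClient: KubernetesClient = new DefaultKubernetesClient()
val existingSecretName = "hadoop-token-secret"
val secretItemKey = "hadoop-tokens"

// A secret's data is a key -> base64 value map; the delegation token bytes
// live under one data item key inside the named secret.
val secret = kubernetesClient.secrets().withName(existingSecretName).get()
val tokenDataBase64: String = secret.getData.get(secretItemKey)
```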
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to be true. This will let you specify
the name of the secret where your existing delegation token data is stored. You must also specify the
item key <code>spark.kubernetes.kerberos.tokensecret.itemkey</code> where your data is stored on the secret.
Can you mention that this is optional, in case you want to use a pre-existing secret, and that a new secret will be created automatically otherwise?
@@ -100,6 +100,12 @@
<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
<exclusions>
Hmm. Do we still need this?
* This is separated out from the HadoopConf steps API because this component can be reused to
* mount the DT secret for executors as well.
*/
private[spark] trait KerberosTokenBootstrapConf {
Make the trait name match the file name, KerberosTokenConfBootstrap?
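For orientation, a rough sketch of the component shape this trait suggests (the method signature and the `PodWithMainContainer` wrapper are assumptions inferred from the surrounding diff, not the PR's verified code):

```scala
import io.fabric8.kubernetes.api.model.{Container, Pod}

// Assumed wrapper carrying a pod together with the container to decorate.
case class PodWithMainContainer(pod: Pod, mainContainer: Container)

// Sketch: one reusable component that mounts the delegation-token secret
// volume and points the token-location env var at it, so the driver and
// the executors can share the same bootstrap logic.
trait KerberosTokenConfBootstrap {
  def bootstrapMainContainerAndVolumes(
      original: PodWithMainContainer): PodWithMainContainer
}
```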
val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_SPARK_CONF_NAME)
val maybeHadoopConfDir = sparkConf.getOption(HADOOP_CONF_DIR_LOC)
val maybeDTSecretName = sparkConf.getOption(HADOOP_KERBEROS_CONF_SECRET)
val maybeDTLabelName = sparkConf.getOption(HADOOP_KERBEROS_CONF_ITEM_KEY)
s/LabelName/DataItem/
}})
credentials.getAllTokens.asScala.isEmpty
tokens.isEmpty
if (tokens.isEmpty) logError("Did not obtain any Delegation Tokens")
I think we should throw an exception. Otherwise, it's hard to debug this downstream.
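A minimal sketch of the suggested change, with `tokens` standing in for the collection fetched in the diff above:

```scala
import org.apache.hadoop.security.token.Token
import org.apache.spark.SparkException

// Stand-in for the tokens gathered by addDelegationTokens above.
val tokens: Seq[Token[_]] = Seq.empty

// Fail fast at submission time rather than letting the missing-token
// condition surface as an opaque downstream authentication error.
if (tokens.isEmpty) {
  throw new SparkException("Did not obtain any delegation tokens")
}
```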
val secretDT =
  new SecretBuilder()
    .withNewMetadata()
      .withName(HADOOP_KERBEROS_SECRET_NAME)
Shouldn't we generate a unique secret name so that multiple jobs can run simultaneously using different secrets?
On the same namespace why would you have the need for different secrets? I guess the problem would happen when the ownerRef destroys the secret while another job is processing.
Yes, if we have multiple jobs, it's better to have one secret per job.
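A minimal sketch of one per-job naming scheme (the UUID suffix is hypothetical; a timestamp or other unique suffix would work equally well):

```scala
import java.util.UUID

// Constant from the diff (value assumed here for illustration).
val HADOOP_KERBEROS_SECRET_NAME = "hadoop-kerberos-dt-secret"

// Append a random suffix so each submission gets its own secret, and the
// ownerRef cleanup of one job cannot delete a secret another job still uses.
val uniqueSecretName =
  s"$HADOOP_KERBEROS_SECRET_NAME-${UUID.randomUUID().toString.take(8)}"
```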
tokens.isEmpty
if (tokens.isEmpty) logError("Did not obtain any Delegation Tokens")
val data = serialize(credentials)
val renewalTime = getTokenRenewalInterval(tokens, hadoopConf).getOrElse(Long.MaxValue)
s/renewalTime/renewalInterval/
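For reference, `serialize(credentials)` plausibly boils down to Hadoop's own token-storage writer; a sketch under that assumption:

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import org.apache.hadoop.security.Credentials

// Write the Credentials (delegation tokens + secret keys) into a byte
// array suitable for storing as a Kubernetes secret data item.
def serialize(credentials: Credentials): Array[Byte] = {
  val byteStream = new ByteArrayOutputStream
  val dataStream = new DataOutputStream(byteStream)
  credentials.writeTokenStorageToStream(dataStream)
  dataStream.flush()
  byteStream.toByteArray
}
```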
hadoopConfDir)
val maybeKerberosStep =
  if (isKerberosEnabled) {
    maybeExistingSecret.map(secretItemKey => Some(new HadoopKerberosSecretResolverStep(
s/secretItemKey/existingSecretName/
logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}") | ||
interval | ||
}.toOption} | ||
if (renewIntervals.isEmpty) None else Some(renewIntervals.min) |
Just as a note to myself, I see now that we'll return the earliest expiration time in case there are multiple tokens.
Correct
Maybe we should add a comment about this, because it wasn't easy to discover.
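Something like the following would capture the discovery (a sketch; `renewIntervals` mirrors the name in the diff and is stubbed out here):

```scala
// Per-token renewal intervals gathered above; may be empty if no token
// reported an interval.
val renewIntervals: Seq[Long] = Seq.empty[Long]

// With multiple tokens, take the minimum interval so renewal happens
// before the earliest-expiring token lapses.
val renewalInterval: Option[Long] =
  if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
```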
currentHadoopSpec = nextStep.configureContainers(currentHadoopSpec)
}
val configMap =
  new ConfigMapBuilder()
Also use owner ref here so this gets deleted after the job is done?
I believe since all of these are added to the driverSpec's `otherKubernetesResources`, the ownerref is applied after the driverPod is launched in Client.scala.
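For illustration, attaching the driver pod as owner with fabric8 looks roughly like this (a sketch of the pattern Client.scala applies to everything in `otherKubernetesResources`; `driverPod` is assumed to be the already-created driver Pod):

```scala
import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder, Pod}

// Build an owner reference pointing at the driver pod; setting it on the
// ConfigMap (or secret) makes Kubernetes garbage-collect that resource
// when the driver pod is deleted.
def ownerRefFor(driverPod: Pod): OwnerReference = new OwnerReferenceBuilder()
  .withName(driverPod.getMetadata.getName)
  .withApiVersion(driverPod.getApiVersion)
  .withUid(driverPod.getMetadata.getUid)
  .withKind(driverPod.getKind)
  .withController(true)
  .build()
```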
The latest commit addressed most of my comments. Looks great to me. Thanks @ifilonenko for the work so far.
@erikerlandson after all tests pass, can you give the final okay before merge?
LGTM, and passing CI. This is good to merge when we're ready! |
Let's merge after cutting the new release and tagging. |
Important note: this PR will require refactoring upon merging because of the most recent commits with renaming and unit test additions to the …
What changes were proposed in this pull request?
This is the on-going work of setting up Secure HDFS interaction with Spark-on-K8S. The architecture is discussed in this community-wide google doc.
This initiative can be broken down into 4 stages.
STAGE 1
- Detecting the HADOOP_CONF_DIR environmental variable and using Config Maps to store all Hadoop config files locally, while also setting HADOOP_CONF_DIR locally in the driver / executors

STAGE 2
- Grabbing the TGT from LTC or using keytabs+principal and creating a DT that will be mounted as a secret

STAGE 3
How was this patch tested?
Docs and Error Handling?