Skip to content

Commit

Permalink
[SPARK-26605][YARN] Update AM's credentials when creating tokens.
Browse files Browse the repository at this point in the history
This ensures new executors in client mode also get the new tokens,
instead of being started with potentially expired tokens.

Closes apache#23523 from vanzin/SPARK-26605.

Authored-by: Marcelo Vanzin <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
  • Loading branch information
Marcelo Vanzin committed Jan 23, 2019
1 parent 9814108 commit 10d7713
Showing 1 changed file with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ private[yarn] class AMCredentialRenewer(
val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
val ugi = doLogin()

ugi.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
startInternal(ugi, originalCreds)
}
})

ugi
}

private def startInternal(ugi: UserGroupInformation, originalCreds: Credentials): Unit = {
val tgtRenewalTask = new Runnable() {
override def run(): Unit = {
ugi.checkTGTAndReloginFromKeytab()
Expand All @@ -104,8 +114,6 @@ private[yarn] class AMCredentialRenewer(
val existing = ugi.getCredentials()
existing.mergeAll(originalCreds)
ugi.addCredentials(existing)

ugi
}

def stop(): Unit = {
Expand Down Expand Up @@ -136,8 +144,8 @@ private[yarn] class AMCredentialRenewer(
// This shouldn't really happen, since the driver should register way before tokens expire
// (or the AM should time out the application).
logWarning("Delegation tokens close to expiration but no driver has registered yet.")
SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
}
SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
} catch {
case e: Exception =>
val delay = TimeUnit.SECONDS.toMillis(sparkConf.get(CREDENTIALS_RENEWAL_RETRY_WAIT))
Expand Down

0 comments on commit 10d7713

Please sign in to comment.