[SPARK-23639][SQL] Obtain token before init metastore client in SparkSQL CLI

## What changes were proposed in this pull request?

In SparkSQLCLI, the SessionState is created before the SparkContext is instantiated. When --proxy-user is used to impersonate another user, the CLI cannot initialize a metastore client to talk to the secured metastore, because the proxy user holds no Kerberos ticket.

This PR uses the real user's UGI to obtain a delegation token on behalf of the owner before talking to the kerberized metastore.
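The realUser.doAs call in the HiveDelegationTokenProvider diff below is the heart of this fix: the token-fetching code runs under the real (non-proxy) user, whose credentials do include a Kerberos ticket. A minimal, self-contained sketch of that pattern follows; StubUgi is an illustrative stand-in for Hadoop's UserGroupInformation (an assumption for this sketch, not a Spark or Hadoop API):

```scala
import java.security.PrivilegedExceptionAction

// Stub standing in for Hadoop's UserGroupInformation -- an illustrative
// assumption, not the real Hadoop class. The real doAs runs the action
// inside the wrapped user's security context; this stub just invokes it.
class StubUgi(val userName: String) {
  def doAs[T](action: PrivilegedExceptionAction[T]): T = action.run()
}

object DoAsRealUserSketch {
  // Mirrors the shape of the provider's helper: evaluate `fn` inside
  // realUser.doAs so it sees the real user's (Kerberos) credentials
  // rather than the proxy user's empty ones.
  def doAsRealUser[T](realUser: StubUgi)(fn: => T): T = {
    realUser.doAs(new PrivilegedExceptionAction[T] {
      override def run(): T = fn
    })
  }
}
```

With the real Hadoop classes, realUser would be resolved as in the diff: `Option(currentUser.getRealUser()).getOrElse(currentUser)`, so the helper is a no-op when no impersonation is in play.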

## How was this patch tested?

Manually verified against a kerberized Hive metastore and HDFS.

Author: Kent Yao <[email protected]>

Closes #20784 from yaooqinn/SPARK-23639.
yaooqinn authored and Marcelo Vanzin committed Mar 29, 2018
1 parent 491ec11 commit a7755fd
Showing 2 changed files with 13 additions and 4 deletions.
```diff
@@ -36,7 +36,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.KEYTAB
 import org.apache.spark.util.Utils

-private[security] class HiveDelegationTokenProvider
+private[spark] class HiveDelegationTokenProvider
     extends HadoopDelegationTokenProvider with Logging {

   override def serviceName: String = "hive"
```
```diff
@@ -124,9 +124,9 @@ private[security] class HiveDelegationTokenProvider
     val currentUser = UserGroupInformation.getCurrentUser()
     val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)

-   // For some reason the Scala-generated anonymous class ends up causing an
-   // UndeclaredThrowableException, even if you annotate the method with @throws.
-   try {
+    // For some reason the Scala-generated anonymous class ends up causing an
+    // UndeclaredThrowableException, even if you annotate the method with @throws.
+    try {
       realUser.doAs(new PrivilegedExceptionAction[T]() {
         override def run(): T = fn
       })
```
```diff
@@ -34,11 +34,13 @@ import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.log4j.{Level, Logger}
 import org.apache.thrift.transport.TSocket

 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HiveDelegationTokenProvider
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveUtils
```
```diff
@@ -121,6 +123,13 @@ private[hive] object SparkSQLCLIDriver extends Logging {
       }
     }

+    val tokenProvider = new HiveDelegationTokenProvider()
+    if (tokenProvider.delegationTokensRequired(sparkConf, hadoopConf)) {
+      val credentials = new Credentials()
+      tokenProvider.obtainDelegationTokens(hadoopConf, sparkConf, credentials)
+      UserGroupInformation.getCurrentUser.addCredentials(credentials)
+    }
+
     SessionState.start(sessionState)

     // Clean up after we exit
```
