
Commit

Revert "[Bug] [connector-hive] Eanble login with kerberos for hive (a…
Browse files Browse the repository at this point in the history
…pache#6893)"

This reverts commit 566b20d.
LeonYoah committed Jun 27, 2024
1 parent 566b20d commit c042403
Showing 1 changed file with 26 additions and 27 deletions.
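
For orientation before the diff: this revert switches the proxy's public entry points (the constructor and getInstance) back from ReadonlyConfig to the shaded Typesafe Config. A minimal caller-side sketch of the restored shape follows; the ConfigFactory.parseMap usage, the option key strings, and the literal values are illustrative assumptions, not taken from this commit, and the shaded ConfigFactory location is assumed to sit next to the shaded Config import shown in the diff.

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreProxy;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class HiveMetaStoreProxyUsageSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical option values; the keys mirror the metastore uri and hive-site
        // options referenced in the diff but are assumptions here.
        Map<String, Object> options = new HashMap<>();
        options.put("metastore_uri", "thrift://metastore-host:9083");
        options.put("hive_site_path", "/etc/hive/conf/hive-site.xml");
        Config config = ConfigFactory.parseMap(options);

        // After the revert, getInstance(...) takes the shaded Typesafe Config again;
        // while apache#6893 was in place it took a ReadonlyConfig instead.
        HiveMetaStoreProxy proxy = HiveMetaStoreProxy.getInstance(config);
        proxy.addPartitions("default", "my_table", Arrays.asList("dt=2024-06-27"));
    }
}
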
File changed: org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -17,18 +17,18 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.utils;
 
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopLoginFactory;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
 import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.hive.source.config.HiveSourceOptions;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.thrift.TException;
@@ -46,33 +46,36 @@ public class HiveMetaStoreProxy {
     private HiveMetaStoreClient hiveMetaStoreClient;
     private static volatile HiveMetaStoreProxy INSTANCE = null;
 
-    private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) {
-        String metastoreUri = readonlyConfig.get(HiveSourceOptions.METASTORE_URI);
-        HiveConf hiveConf = new HiveConf();
-        hiveConf.set("hive.metastore.uris", metastoreUri);
+    private HiveMetaStoreProxy(Config config) {
+        String metastoreUri = config.getString(HiveConfig.METASTORE_URI.key());
+
         try {
-            if (StringUtils.isNotEmpty(readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH))) {
-                String hiveSitePath = readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH);
+            HiveConf hiveConf = new HiveConf();
+            hiveConf.set("hive.metastore.uris", metastoreUri);
+            if (config.hasPath(HiveConfig.HIVE_SITE_PATH.key())) {
+                String hiveSitePath = config.getString(HiveConfig.HIVE_SITE_PATH.key());
                 hiveConf.addResource(new File(hiveSitePath).toURI().toURL());
             }
-            if (HiveMetaStoreProxyUtils.enableKerberos(readonlyConfig)) {
-                Configuration hadoopConfig = new Configuration();
-                hadoopConfig.set("hadoop.security.authentication", "kerberos");
+            if (HiveMetaStoreProxyUtils.enableKerberos(config)) {
                 this.hiveMetaStoreClient =
                         HadoopLoginFactory.loginWithKerberos(
-                                hadoopConfig,
-                                readonlyConfig.get(BaseSourceConfigOptions.KRB5_PATH),
-                                readonlyConfig.get(BaseSourceConfigOptions.KERBEROS_PRINCIPAL),
-                                readonlyConfig.get(BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH),
+                                new Configuration(),
+                                TypesafeConfigUtils.getConfig(
+                                        config,
+                                        BaseSourceConfigOptions.KRB5_PATH.key(),
+                                        BaseSourceConfigOptions.KRB5_PATH.defaultValue()),
+                                config.getString(BaseSourceConfigOptions.KERBEROS_PRINCIPAL.key()),
+                                config.getString(
+                                        BaseSourceConfigOptions.KERBEROS_KEYTAB_PATH.key()),
                                 (configuration, userGroupInformation) ->
                                         new HiveMetaStoreClient(hiveConf));
                 return;
             }
-            if (HiveMetaStoreProxyUtils.enableRemoteUser(readonlyConfig)) {
+            if (HiveMetaStoreProxyUtils.enableRemoteUser(config)) {
                 this.hiveMetaStoreClient =
                         HadoopLoginFactory.loginWithRemoteUser(
                                 new Configuration(),
-                                readonlyConfig.get(BaseSourceConfigOptions.REMOTE_USER),
+                                config.getString(BaseSourceConfigOptions.REMOTE_USER.key()),
                                 (configuration, userGroupInformation) ->
                                         new HiveMetaStoreClient(hiveConf));
                 return;
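
Note on the hunk above: the reverted code built a Configuration with hadoop.security.authentication=kerberos set explicitly, while the restored code passes HadoopLoginFactory.loginWithKerberos a plain new Configuration() together with the krb5 path, principal, and keytab resolved from the Config. As a rough sketch of what such a keytab login typically amounts to with Hadoop's UserGroupInformation (an assumption about the factory's internals, not its actual code):

import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.security.UserGroupInformation;

// Illustrative only: log in from a keytab and build the metastore client as that user.
public final class KerberosLoginSketch {
    private KerberosLoginSketch() {}

    public static HiveMetaStoreClient login(
            String krb5Path, String principal, String keytabPath, HiveConf hiveConf)
            throws Exception {
        System.setProperty("java.security.krb5.conf", krb5Path);
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(hadoopConf);
        UserGroupInformation ugi =
                UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath);
        // Construct the client inside doAs so the thrift/SASL layer sees the Kerberos credentials.
        return ugi.doAs(
                (PrivilegedExceptionAction<HiveMetaStoreClient>)
                        () -> new HiveMetaStoreClient(hiveConf));
    }
}
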
@@ -91,7 +94,7 @@ private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) {
                     String.format(
                             "Using this hive uris [%s], hive conf [%s] to initialize "
                                     + "hive metastore client instance failed",
-                            metastoreUri, readonlyConfig.get(HiveSourceOptions.HIVE_SITE_PATH));
+                            metastoreUri, config.getString(HiveConfig.HIVE_SITE_PATH.key()));
             throw new HiveConnectorException(
                     HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errorMsg, e);
         } catch (Exception e) {
@@ -102,11 +105,11 @@ private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) {
         }
     }
 
-    public static HiveMetaStoreProxy getInstance(ReadonlyConfig readonlyConfig) {
+    public static HiveMetaStoreProxy getInstance(Config config) {
         if (INSTANCE == null) {
             synchronized (HiveMetaStoreProxy.class) {
                 if (INSTANCE == null) {
-                    INSTANCE = new HiveMetaStoreProxy(readonlyConfig);
+                    INSTANCE = new HiveMetaStoreProxy(config);
                 }
             }
         }
@@ -128,11 +131,7 @@ public void addPartitions(
             @NonNull String dbName, @NonNull String tableName, List<String> partitions)
             throws TException {
         for (String partition : partitions) {
-            try {
-                hiveMetaStoreClient.appendPartition(dbName, tableName, partition);
-            } catch (AlreadyExistsException e) {
-                log.warn("The partition {} are already exists", partition);
-            }
+            hiveMetaStoreClient.appendPartition(dbName, tableName, partition);
         }
     }
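
Behavioral note on the last hunk: with the try/catch removed, appendPartition will once again propagate AlreadyExistsException (a TException) when a partition is already present, instead of logging a warning and continuing. A hedged caller-side sketch of how that case could be tolerated; the helper class below is hypothetical and not part of the connector:

import java.util.List;

import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.thrift.TException;

// Hypothetical helper: append partitions but treat "already exists" as success,
// mirroring the tolerance this revert removes from the connector itself.
public final class PartitionAppender {
    private PartitionAppender() {}

    public static void appendIgnoringExisting(
            HiveMetaStoreClient client, String dbName, String tableName, List<String> partitions)
            throws TException {
        for (String partition : partitions) {
            try {
                client.appendPartition(dbName, tableName, partition);
            } catch (AlreadyExistsException e) {
                // Partition already present; skip it instead of failing the whole batch.
            }
        }
    }
}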

