From f7b6401d3cd631a851cdc0dd25166f0e3ce8edf7 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Fri, 30 Dec 2022 21:23:06 +0800 Subject: [PATCH 1/6] [Improve][Connector-V2][HdfsFile] Support kerberos authentication --- .../file/hdfs/sink/BaseHdfsFileSink.java | 6 +++++ .../file/hdfs/source/BaseHdfsFileSource.java | 6 +++++ .../seatunnel/file/config/BaseSinkConfig.java | 17 +++++++++++++ .../file/config/BaseSourceConfig.java | 18 ++++++++++++++ .../seatunnel/file/config/HadoopConf.java | 2 ++ .../sink/writer/AbstractWriteStrategy.java | 21 ++++++++++++++++ .../source/reader/AbstractReadStrategy.java | 24 +++++++++++++++++++ 7 files changed, 94 insertions(+) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java index 090f942c08b..79649fb443b 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java @@ -46,5 +46,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException { if (pluginConfig.hasPath(BaseSinkConfig.HDFS_SITE_PATH.key())) { hadoopConf.setHdfsSitePath(pluginConfig.getString(BaseSinkConfig.HDFS_SITE_PATH.key())); } + if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_PRINCIPAL.key())) { + hadoopConf.setKerberosPrincipal(pluginConfig.getString(BaseSinkConfig.KERBEROS_PRINCIPAL.key())); + } + if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key())) { + hadoopConf.setKerberosKeytabPath(pluginConfig.getString(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key())); + } } } diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java index 4e67cf9bbb1..be7fc0e8c69 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java @@ -55,6 +55,12 @@ public void prepare(Config pluginConfig) throws PrepareFailException { if (pluginConfig.hasPath(HdfsSourceConfig.HDFS_SITE_PATH.key())) { hadoopConf.setHdfsSitePath(pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_PATH.key())); } + if (pluginConfig.hasPath(HdfsSourceConfig.KERBEROS_PRINCIPAL.key())) { + hadoopConf.setKerberosPrincipal(pluginConfig.getString(HdfsSourceConfig.KERBEROS_PRINCIPAL.key())); + } + if (pluginConfig.hasPath(HdfsSourceConfig.KERBEROS_KEYTAB_PATH.key())) { + hadoopConf.setKerberosKeytabPath(pluginConfig.getString(HdfsSourceConfig.KERBEROS_KEYTAB_PATH.key())); + } try { filePaths = readStrategy.getFileNamesByPath(hadoopConf, path); } catch (IOException e) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java index 9137576d55c..5abffc13f43 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java +++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java @@ -41,26 +41,32 @@ public class BaseSinkConfig { .stringType() .noDefaultValue() .withDescription("Compression codec"); + public static final Option DATE_FORMAT = Options.key("date_format") .enumType(DateUtils.Formatter.class) .defaultValue(DateUtils.Formatter.YYYY_MM_DD) .withDescription("Date format"); + public static final Option DATETIME_FORMAT = Options.key("datetime_format") .enumType(DateTimeUtils.Formatter.class) .defaultValue(DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS) .withDescription("Datetime format"); + public static final Option TIME_FORMAT = Options.key("time_format") .enumType(TimeUtils.Formatter.class) .defaultValue(TimeUtils.Formatter.HH_MM_SS) .withDescription("Time format"); + public static final Option FILE_PATH = Options.key("path") .stringType() .noDefaultValue() .withDescription("The file path of target files"); + public static final Option FIELD_DELIMITER = Options.key("field_delimiter") .stringType() .defaultValue(DEFAULT_FIELD_DELIMITER) .withDescription("The separator between columns in a row of data. Only needed by `text` and `csv` file format"); + public static final Option ROW_DELIMITER = Options.key("row_delimiter") .stringType() .defaultValue(DEFAULT_ROW_DELIMITER) @@ -84,6 +90,7 @@ public class BaseSinkConfig { "and the final file will be placed in the partition directory. " + "Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. 
" + "`k0` is the first partition field and `v0` is the value of the first partition field."); + public static final Option IS_PARTITION_FIELD_WRITE_IN_FILE = Options.key("is_partition_field_write_in_file") .booleanType() .defaultValue(false) @@ -136,4 +143,14 @@ public class BaseSinkConfig { .stringType() .noDefaultValue() .withDescription("The path of hdfs-site.xml"); + + public static final Option KERBEROS_PRINCIPAL = Options.key("kerberos_principal") + .stringType() + .noDefaultValue() + .withDescription("Kerberos principal"); + + public static final Option KERBEROS_KEYTAB_PATH = Options.key("kerberos_keytab_path") + .stringType() + .noDefaultValue() + .withDescription("Kerberos keytab file path"); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java index d8e5bcce6d1..42ef21f2d1b 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java @@ -28,34 +28,52 @@ public class BaseSourceConfig { .objectType(FileFormat.class) .noDefaultValue() .withDescription("File type"); + public static final Option FILE_PATH = Options.key("path") .stringType() .noDefaultValue() .withDescription("The file path of source files"); + public static final Option DELIMITER = Options.key("delimiter") .stringType() .defaultValue(String.valueOf('\001')) .withDescription("The separator between columns in a row of data. 
Only needed by `text` file format"); + public static final Option DATE_FORMAT = Options.key("date_format") .enumType(DateUtils.Formatter.class) .defaultValue(DateUtils.Formatter.YYYY_MM_DD) .withDescription("Date format"); + public static final Option DATETIME_FORMAT = Options.key("datetime_format") .enumType(DateTimeUtils.Formatter.class) .defaultValue(DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS) .withDescription("Datetime format"); + public static final Option TIME_FORMAT = Options.key("time_format") .enumType(TimeUtils.Formatter.class) .defaultValue(TimeUtils.Formatter.HH_MM_SS) .withDescription("Time format"); + public static final Option PARSE_PARTITION_FROM_PATH = Options.key("parse_partition_from_path") .booleanType() .defaultValue(true) .withDescription("Whether parse partition fields from file path"); + public static final Option HDFS_SITE_PATH = Options.key("hdfs_site_path") .stringType() .noDefaultValue() .withDescription("The path of hdfs-site.xml"); + + public static final Option KERBEROS_PRINCIPAL = Options.key("kerberos_principal") + .stringType() + .noDefaultValue() + .withDescription("Kerberos principal"); + + public static final Option KERBEROS_KEYTAB_PATH = Options.key("kerberos_keytab_path") + .stringType() + .noDefaultValue() + .withDescription("Kerberos keytab file path"); + public static final Option SKIP_HEADER_ROW_NUMBER = Options.key("skip_header_row_number") .longType() .defaultValue(0L) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java index baeb7a3add8..92b2db28245 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java +++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java @@ -32,6 +32,8 @@ public class HadoopConf implements Serializable { protected Map extraOptions = new HashMap<>(); protected String hdfsNameKey; protected String hdfsSitePath; + protected String kerberosPrincipal; + protected String kerberosKeytabPath; public HadoopConf(String hdfsNameKey) { this.hdfsNameKey = hdfsNameKey; diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java index 50c1f365e5f..d081ed63493 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java @@ -43,6 +43,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,6 +84,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy { protected int partId = 0; protected int batchSize; protected int currentBatchSize = 0; + protected boolean isKerberosAuthorization = false; public AbstractWriteStrategy(FileSinkConfig fileSinkConfig) { this.fileSinkConfig = fileSinkConfig; @@ -129,6 +131,25 @@ public Configuration getConfiguration(HadoopConf hadoopConf) { configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey()); configuration.set(String.format("fs.%s.impl", 
hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl()); this.hadoopConf.setExtraOptionsForConfiguration(configuration); + if (!isKerberosAuthorization && StringUtils.isNotBlank(hadoopConf.getKerberosPrincipal())) { + // kerberos authentication and only once + if (StringUtils.isBlank(hadoopConf.getKerberosPrincipal())) { + throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, + "Kerberos keytab path is blank, please check this parameter that in your config file"); + } + configuration.set("hadoop.security.authentication", "kerberos"); + UserGroupInformation.setConfiguration(configuration); + try { + UserGroupInformation.loginUserFromKeytab(hadoopConf.getKerberosPrincipal(), + hadoopConf.getKerberosKeytabPath()); + } catch (IOException e) { + String errorMsg = String.format("Kerberos authentication failed using this " + + "principal [%s] and keytab path [%s]", + hadoopConf.getKerberosPrincipal(), hadoopConf.getKerberosKeytabPath()); + throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg, e); + } + isKerberosAuthorization = true; + } return configuration; } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java index 4c7de8a54c6..e878fa3c7ba 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java @@ -25,17 +25,21 @@ import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import 
org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.shade.com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import java.io.IOException; import java.math.BigDecimal; @@ -68,6 +72,7 @@ public abstract class AbstractReadStrategy implements ReadStrategy { protected List fileNames = new ArrayList<>(); protected boolean isMergePartition = true; protected long skipHeaderNumber = BaseSourceConfig.SKIP_HEADER_ROW_NUMBER.defaultValue(); + protected boolean isKerberosAuthorization = false; @Override public void init(HadoopConf conf) { @@ -90,6 +95,25 @@ public Configuration getConfiguration(HadoopConf hadoopConf) { configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey()); configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl()); hadoopConf.setExtraOptionsForConfiguration(configuration); + if (!isKerberosAuthorization && StringUtils.isNotBlank(hadoopConf.getKerberosPrincipal())) { + // kerberos authentication and only once + if (StringUtils.isBlank(hadoopConf.getKerberosPrincipal())) { + throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, + "Kerberos keytab path is blank, please check this parameter that in your config file"); + } + configuration.set("hadoop.security.authentication", "kerberos"); + 
UserGroupInformation.setConfiguration(configuration); + try { + UserGroupInformation.loginUserFromKeytab(hadoopConf.getKerberosPrincipal(), + hadoopConf.getKerberosKeytabPath()); + } catch (IOException e) { + String errorMsg = String.format("Kerberos authentication failed using this " + + "principal [%s] and keytab path [%s]", + hadoopConf.getKerberosPrincipal(), hadoopConf.getKerberosKeytabPath()); + throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg, e); + } + isKerberosAuthorization = true; + } return configuration; } From 224777e5c3f2df7e23681633dfa6b7ef54e9e12b Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Fri, 30 Dec 2022 21:47:55 +0800 Subject: [PATCH 2/6] [Improve][Connector-V2][Hive] Support kerberos authentication --- .../hive/utils/HiveMetaStoreProxy.java | 42 ++++++++++++++++--- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java index 0d819448838..e7a52259158 100644 --- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java +++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java @@ -17,6 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.hive.utils; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig; import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode; import 
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException; @@ -24,26 +27,56 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.thrift.TException; +import java.io.IOException; import java.util.List; import java.util.Objects; +@Slf4j public class HiveMetaStoreProxy { private final HiveMetaStoreClient hiveMetaStoreClient; private static volatile HiveMetaStoreProxy INSTANCE = null; - private HiveMetaStoreProxy(@NonNull String uris) { + private HiveMetaStoreProxy(Config config) { + String metastoreUri = config.getString(HiveConfig.METASTORE_URI.key()); HiveConf hiveConf = new HiveConf(); - hiveConf.set("hive.metastore.uris", uris); + hiveConf.set("hive.metastore.uris", metastoreUri); + if (config.hasPath(BaseSourceConfig.KERBEROS_PRINCIPAL.key()) && + config.hasPath(BaseSourceConfig.KERBEROS_KEYTAB_PATH.key())) { + String principal = config.getString(BaseSourceConfig.KERBEROS_PRINCIPAL.key()); + String keytabPath = config.getString(BaseSourceConfig.KERBEROS_KEYTAB_PATH.key()); + if (StringUtils.isBlank(principal) || StringUtils.isBlank(keytabPath)) { + String errorMsg = String.format("Kerberos principal [%s] or keytab file path [%s] is blank," + + "please check", principal, keytabPath); + throw new HiveConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg); + } + Configuration configuration = new Configuration(); + configuration.set("hadoop.security.authentication", "kerberos"); + UserGroupInformation.setConfiguration(configuration); + try { + log.info("Start Kerberos authentication 
using principal {} and keytab {}", principal, keytabPath); + UserGroupInformation.loginUserFromKeytab(principal, keytabPath); + log.info("Kerberos authentication successful"); + } catch (IOException e) { + String errorMsg = String.format("Kerberos authentication failed using this " + + "principal [%s] and keytab path [%s]", principal, keytabPath); + throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg, e); + } + } try { hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf); } catch (MetaException e) { - String errorMsg = String.format("Using this hive uris [%s] to initialize hive metastore client instance failed", uris); + String errorMsg = String.format("Using this hive uris [%s] to initialize " + + "hive metastore client instance failed", metastoreUri); throw new HiveConnectorException(HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errorMsg, e); } } @@ -52,8 +85,7 @@ public static HiveMetaStoreProxy getInstance(Config config) { if (INSTANCE == null) { synchronized (HiveMetaStoreProxy.class) { if (INSTANCE == null) { - String metastoreUri = config.getString(HiveConfig.METASTORE_URI.key()); - INSTANCE = new HiveMetaStoreProxy(metastoreUri); + INSTANCE = new HiveMetaStoreProxy(config); } } } From 23fe3e17244b2ec2eaf76c43d5258b2e043bcda8 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Fri, 30 Dec 2022 21:53:33 +0800 Subject: [PATCH 3/6] [Improve][Connector-V2][HdfsFile] Optimize code structure --- .../file/sink/writer/AbstractWriteStrategy.java | 14 ++++++++------ .../file/source/reader/AbstractReadStrategy.java | 11 +++++++---- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java index 
d081ed63493..41bd8ab0831 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java @@ -131,21 +131,23 @@ public Configuration getConfiguration(HadoopConf hadoopConf) { configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey()); configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl()); this.hadoopConf.setExtraOptionsForConfiguration(configuration); - if (!isKerberosAuthorization && StringUtils.isNotBlank(hadoopConf.getKerberosPrincipal())) { + String principal = hadoopConf.getKerberosPrincipal(); + String keytabPath = hadoopConf.getKerberosKeytabPath(); + if (!isKerberosAuthorization && StringUtils.isNotBlank(principal)) { // kerberos authentication and only once - if (StringUtils.isBlank(hadoopConf.getKerberosPrincipal())) { + if (StringUtils.isBlank(keytabPath)) { throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, "Kerberos keytab path is blank, please check this parameter that in your config file"); } configuration.set("hadoop.security.authentication", "kerberos"); UserGroupInformation.setConfiguration(configuration); try { - UserGroupInformation.loginUserFromKeytab(hadoopConf.getKerberosPrincipal(), - hadoopConf.getKerberosKeytabPath()); + log.info("Start Kerberos authentication using principal {} and keytab {}", principal, keytabPath); + UserGroupInformation.loginUserFromKeytab(principal, keytabPath); + log.info("Kerberos authentication successful"); } catch (IOException e) { String errorMsg = String.format("Kerberos authentication failed using this " + - "principal [%s] and keytab path [%s]", - hadoopConf.getKerberosPrincipal(), hadoopConf.getKerberosKeytabPath()); + "principal [%s] and 
keytab path [%s]", principal, keytabPath); throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg, e); } isKerberosAuthorization = true; diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java index e878fa3c7ba..e8d7ae50772 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java @@ -95,21 +95,24 @@ public Configuration getConfiguration(HadoopConf hadoopConf) { configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey()); configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl()); hadoopConf.setExtraOptionsForConfiguration(configuration); - if (!isKerberosAuthorization && StringUtils.isNotBlank(hadoopConf.getKerberosPrincipal())) { + String principal = hadoopConf.getKerberosPrincipal(); + String keytabPath = hadoopConf.getKerberosKeytabPath(); + if (!isKerberosAuthorization && StringUtils.isNotBlank(principal)) { // kerberos authentication and only once - if (StringUtils.isBlank(hadoopConf.getKerberosPrincipal())) { + if (StringUtils.isBlank(keytabPath)) { throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, "Kerberos keytab path is blank, please check this parameter that in your config file"); } configuration.set("hadoop.security.authentication", "kerberos"); UserGroupInformation.setConfiguration(configuration); try { + log.info("Start Kerberos authentication using principal {} and keytab 
{}", principal, keytabPath); UserGroupInformation.loginUserFromKeytab(hadoopConf.getKerberosPrincipal(), hadoopConf.getKerberosKeytabPath()); + log.info("Kerberos authentication successful"); } catch (IOException e) { String errorMsg = String.format("Kerberos authentication failed using this " + - "principal [%s] and keytab path [%s]", - hadoopConf.getKerberosPrincipal(), hadoopConf.getKerberosKeytabPath()); + "principal [%s] and keytab path [%s]", principal, keytabPath); throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg, e); } isKerberosAuthorization = true; From e4b60b5769dfc2b38c7166c9f528292478a3b480 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Fri, 30 Dec 2022 22:18:14 +0800 Subject: [PATCH 4/6] [Improve][Connector-V2][Hive & HdfsFile] Update docs --- docs/en/connector-v2/sink/HdfsFile.md | 16 ++++++++++++-- docs/en/connector-v2/sink/Hive.md | 29 +++++++++++++++++++------ docs/en/connector-v2/source/HdfsFile.md | 15 +++++++++++++ docs/en/connector-v2/source/Hive.md | 16 +++++++++----- 4 files changed, 62 insertions(+), 14 deletions(-) diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md index eed6ff210de..a46e44d87ba 100644 --- a/docs/en/connector-v2/sink/HdfsFile.md +++ b/docs/en/connector-v2/sink/HdfsFile.md @@ -50,6 +50,8 @@ By default, we use 2PC commit to ensure `exactly-once` | sink_columns | array | no | | When this parameter is empty, all fields are sink columns | | is_enable_transaction | boolean | no | true | | | batch_size | int | no | 1000000 | | +| kerberos_principal | string | no | - | +| kerberos_keytab_path | string | no | - | | | common-options | object | no | - | | | compressCodec | string | no | none | | @@ -151,6 +153,14 @@ Only support `true` now. The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. 
If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger. +### kerberos_principal [string] + +The principal of kerberos + +### kerberos_keytab_path [string] + +The keytab path of kerberos + ### compressCodec [string] Support lzo compression for text in file format. The file name ends with ".lzo.txt" . @@ -226,11 +236,13 @@ HdfsFile { - [BugFix] Fix filesystem get error ([3117](https://github.com/apache/incubator-seatunnel/pull/3117)) - [BugFix] Solved the bug of can not parse '\t' as delimiter from config file ([3083](https://github.com/apache/incubator-seatunnel/pull/3083)) -### Next version +### 2.3.0 2022-12-30 - [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258)) - When field from upstream is null it will throw NullPointerException - Sink columns mapping failed - When restore writer from states getting transaction directly failed +### Next version - [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625)) -- [Improve] Support lzo compression for text in file format ([3782](https://github.com/apache/incubator-seatunnel/pull/3782)) \ No newline at end of file +- [Improve] Support lzo compression for text in file format ([3782](https://github.com/apache/incubator-seatunnel/pull/3782)) +- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840)) \ No newline at end of file diff --git a/docs/en/connector-v2/sink/Hive.md b/docs/en/connector-v2/sink/Hive.md index 269cc1cefce..96513544a3a 100644 --- a/docs/en/connector-v2/sink/Hive.md +++ b/docs/en/connector-v2/sink/Hive.md @@ -30,12 +30,15 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | 
type | required | default value | -|----------------|--------|----------|---------------| -| table_name | string | yes | - | -| metastore_uri | string | yes | - | -| compressCodec | string | no | none | -| common-options | | no | - | +| name | type | required | default value | +|----------------------|--------|----------|---------------| +| table_name | string | yes | - | +| metastore_uri | string | yes | - | +| compressCodec | string | no | none | +| kerberos_principal | string | no | - | +| kerberos_keytab_path | string | no | - | +| common-options | | no | - | + ### table_name [string] Target Hive table name eg: db1.table1 @@ -44,6 +47,14 @@ Target Hive table name eg: db1.table1 Hive metastore uri +### kerberos_principal [string] + +The principal of kerberos + +### kerberos_keytab_path [string] + +The keytab path of kerberos + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details @@ -145,9 +156,13 @@ sink { ### 2.3.0-beta 2022-10-20 - [Improve] Hive Sink supports automatic partition repair ([3133](https://github.com/apache/incubator-seatunnel/pull/3133)) -### Next version +### 2.3.0 2022-12-30 - [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258)) - When field from upstream is null it will throw NullPointerException - Sink columns mapping failed - When restore writer from states getting transaction directly failed +### Next version + +- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840)) + diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md index 869e9859ec9..240f89146b7 100644 --- a/docs/en/connector-v2/source/HdfsFile.md +++ b/docs/en/connector-v2/source/HdfsFile.md @@ -45,6 +45,8 @@ Read all the data in a split in a pollNext call. 
What splits are read will be sa | date_format | string | no | yyyy-MM-dd | | datetime_format | string | no | yyyy-MM-dd HH:mm:ss | | time_format | string | no | HH:mm:ss | +| kerberos_principal | string | no | - | +| kerberos_keytab_path | string | no | - | | skip_header_row_number | long | no | 0 | | schema | config | no | - | | common-options | | no | - | @@ -204,6 +206,14 @@ Hdfs cluster address. The path of `hdfs-site.xml`, used to load ha configuration of namenodes +### kerberos_principal [string] + +The principal of kerberos + +### kerberos_keytab_path [string] + +The keytab path of kerberos + ### schema [Config] #### fields [Config] @@ -253,3 +263,8 @@ HdfsFile { - [BugFix] Fix the bug of incorrect path in windows environment ([2980](https://github.com/apache/incubator-seatunnel/pull/2980)) - [Improve] Support extract partition from SeaTunnelRow fields ([3085](https://github.com/apache/incubator-seatunnel/pull/3085)) - [Improve] Support parse field from file path ([2985](https://github.com/apache/incubator-seatunnel/pull/2985)) + +### Next version + +- [Improve] Support skip header for csv and txt files ([3900](https://github.com/apache/incubator-seatunnel/pull/3900)) +- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840)) \ No newline at end of file diff --git a/docs/en/connector-v2/source/Hive.md b/docs/en/connector-v2/source/Hive.md index b12b4802a4e..69df9eb9fc4 100644 --- a/docs/en/connector-v2/source/Hive.md +++ b/docs/en/connector-v2/source/Hive.md @@ -33,11 +33,13 @@ Read all the data in a split in a pollNext call. 
What splits are read will be sa ## Options -| name | type | required | default value | -|----------------|--------|----------|---------------| -| table_name | string | yes | - | -| metastore_uri | string | yes | - | -| common-options | | no | - | +| name | type | required | default value | +|----------------------|--------|----------|---------------| +| table_name | string | yes | - | +| metastore_uri | string | yes | - | +| kerberos_principal | string | no | - | +| kerberos_keytab_path | string | no | - | +| common-options | | no | - | ### table_name [string] @@ -67,3 +69,7 @@ Source plugin common parameters, please refer to [Source Common Options](common- ### 2.2.0-beta 2022-09-26 - Add Hive Source Connector + +### Next version + +- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840)) From a463e3c612342cd4f5cad943368e65a2a5039963 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Fri, 30 Dec 2022 22:19:41 +0800 Subject: [PATCH 5/6] [Improve][Connector-V2][Hive & HdfsFile] Correct parameters --- docs/en/connector-v2/sink/HdfsFile.md | 2 +- docs/en/connector-v2/sink/Hive.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md index a46e44d87ba..3790fe01b11 100644 --- a/docs/en/connector-v2/sink/HdfsFile.md +++ b/docs/en/connector-v2/sink/HdfsFile.md @@ -52,8 +52,8 @@ By default, we use 2PC commit to ensure `exactly-once` | batch_size | int | no | 1000000 | | | kerberos_principal | string | no | - | | kerberos_keytab_path | string | no | - | | +| compress_codec | string | no | none | | | common-options | object | no | - | | -| compressCodec | string | no | none | | ### fs.defaultFS [string] diff --git a/docs/en/connector-v2/sink/Hive.md b/docs/en/connector-v2/sink/Hive.md index 96513544a3a..d47621880c4 100644 --- a/docs/en/connector-v2/sink/Hive.md +++ b/docs/en/connector-v2/sink/Hive.md @@ -34,7 +34,7 @@ By default, 
we use 2PC commit to ensure `exactly-once` |----------------------|--------|----------|---------------| | table_name | string | yes | - | | metastore_uri | string | yes | - | -| compressCodec | string | no | none | +| compress_codec | string | no | none | | kerberos_principal | string | no | - | | kerberos_keytab_path | string | no | - | | common-options | | no | - | From fe13ce666d7221d98e8363359e64eeab82769de0 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Sun, 1 Jan 2023 16:45:54 +0800 Subject: [PATCH 6/6] [Improve][Connector-V2][File] Optimize code --- .../seatunnel/file/source/reader/AbstractReadStrategy.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java index e8d7ae50772..5c54794781b 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java @@ -107,8 +107,7 @@ public Configuration getConfiguration(HadoopConf hadoopConf) { UserGroupInformation.setConfiguration(configuration); try { log.info("Start Kerberos authentication using principal {} and keytab {}", principal, keytabPath); - UserGroupInformation.loginUserFromKeytab(hadoopConf.getKerberosPrincipal(), - hadoopConf.getKerberosKeytabPath()); + UserGroupInformation.loginUserFromKeytab(principal, keytabPath); log.info("Kerberos authentication successful"); } catch (IOException e) { String errorMsg = String.format("Kerberos authentication failed using this " +