Commit

[Feature][Connector-V2] Support kerberos in hive and hdfs file connector (#3840)

* [Improve][Connector-V2][HdfsFile] Support kerberos authentication

* [Improve][Connector-V2][HdfsFile] Optimize code structure
TyrantLucifer authored Jan 16, 2023
1 parent 6ab266e commit 055ad9d
Showing 12 changed files with 198 additions and 20 deletions.
18 changes: 15 additions & 3 deletions docs/en/connector-v2/sink/HdfsFile.md
@@ -50,8 +50,10 @@ By default, we use 2PC commit to ensure `exactly-once`
| sink_columns | array | no | | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true | |
| batch_size | int | no | 1000000 | |
| kerberos_principal | string | no | - | |
| kerberos_keytab_path | string | no | - | |
| compressCodec | string | no | none | |
| common-options | object | no | - | |

### fs.defaultFS [string]

@@ -151,6 +153,14 @@ Only support `true` now.

The maximum number of rows in a file. For SeaTunnel Engine, the number of rows in a file is determined jointly by `batch_size` and `checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink writer keeps writing rows to a file until the file holds more than `batch_size` rows. If `checkpoint.interval` is small, the sink writer creates a new file whenever a new checkpoint triggers.

### kerberos_principal [string]

The Kerberos principal used for authentication.

### kerberos_keytab_path [string]

The path to the Kerberos keytab file.
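
A minimal sketch of a Kerberos-enabled HdfsFile sink (the cluster address, output path, principal, and keytab below are placeholder values):

```hocon
sink {
  HdfsFile {
    fs.defaultFS = "hdfs://namenode:9000"
    path = "/tmp/seatunnel/output"
    kerberos_principal = "seatunnel@EXAMPLE.COM"
    kerberos_keytab_path = "/etc/security/keytabs/seatunnel.keytab"
  }
}
```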

### compressCodec [string]
Supports lzo compression for the text file format; the output file name ends with `.lzo.txt`.

@@ -226,11 +236,13 @@ HdfsFile {
- [BugFix] Fix filesystem get error ([3117](https://github.com/apache/incubator-seatunnel/pull/3117))
- [BugFix] Solved the bug of can not parse '\t' as delimiter from config file ([3083](https://github.com/apache/incubator-seatunnel/pull/3083))

### 2.3.0 2022-12-30
- [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
- When field from upstream is null it will throw NullPointerException
- Sink columns mapping failed
- When restore writer from states getting transaction directly failed

### Next version
- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
- [Improve] Support lzo compression for text in file format ([3782](https://github.com/apache/incubator-seatunnel/pull/3782))
- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
29 changes: 22 additions & 7 deletions docs/en/connector-v2/sink/Hive.md
@@ -30,12 +30,15 @@ By default, we use 2PC commit to ensure `exactly-once`

## Options

| name | type | required | default value |
|----------------------|--------|----------|---------------|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
| compress_codec | string | no | none |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| common-options | | no | - |

### table_name [string]

Target Hive table name, e.g. `db1.table1`
@@ -44,6 +47,14 @@

Hive metastore uri

### kerberos_principal [string]

The Kerberos principal used for authentication.

### kerberos_keytab_path [string]

The path to the Kerberos keytab file.
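
A minimal sketch of a Kerberos-enabled Hive sink (the metastore address, principal, and keytab below are placeholder values):

```hocon
sink {
  Hive {
    table_name = "db1.table1"
    metastore_uri = "thrift://metastore-host:9083"
    kerberos_principal = "hive@EXAMPLE.COM"
    kerberos_keytab_path = "/etc/security/keytabs/hive.keytab"
  }
}
```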

### common options

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
@@ -145,9 +156,13 @@ sink {
### 2.3.0-beta 2022-10-20
- [Improve] Hive Sink supports automatic partition repair ([3133](https://github.com/apache/incubator-seatunnel/pull/3133))

### 2.3.0 2022-12-30
- [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
- When field from upstream is null it will throw NullPointerException
- Sink columns mapping failed
- When restore writer from states getting transaction directly failed

### Next version

- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))

15 changes: 15 additions & 0 deletions docs/en/connector-v2/source/HdfsFile.md
@@ -45,6 +45,8 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| date_format | string | no | yyyy-MM-dd |
| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
| time_format | string | no | HH:mm:ss |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| skip_header_row_number | long | no | 0 |
| schema | config | no | - |
| common-options | | no | - |
@@ -204,6 +206,14 @@ Hdfs cluster address.

The path of `hdfs-site.xml`, used to load the HA configuration of the NameNodes.

### kerberos_principal [string]

The Kerberos principal used for authentication.

### kerberos_keytab_path [string]

The path to the Kerberos keytab file.
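
A minimal sketch of a Kerberos-enabled HdfsFile source, showing only the addressing and Kerberos-related options (all values below are placeholders):

```hocon
source {
  HdfsFile {
    fs.defaultFS = "hdfs://namenode:9000"
    path = "/apps/data/input"
    kerberos_principal = "seatunnel@EXAMPLE.COM"
    kerberos_keytab_path = "/etc/security/keytabs/seatunnel.keytab"
  }
}
```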

### schema [Config]

#### fields [Config]
@@ -253,3 +263,8 @@ HdfsFile {
- [BugFix] Fix the bug of incorrect path in windows environment ([2980](https://github.com/apache/incubator-seatunnel/pull/2980))
- [Improve] Support extract partition from SeaTunnelRow fields ([3085](https://github.com/apache/incubator-seatunnel/pull/3085))
- [Improve] Support parse field from file path ([2985](https://github.com/apache/incubator-seatunnel/pull/2985))

### Next version

- [Improve] Support skip header for csv and txt files ([3900](https://github.com/apache/incubator-seatunnel/pull/3900))
- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
16 changes: 11 additions & 5 deletions docs/en/connector-v2/source/Hive.md
@@ -33,11 +33,13 @@ Read all the data in a split in a pollNext call. What splits are read will be sa

## Options

| name | type | required | default value |
|----------------------|--------|----------|---------------|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| common-options | | no | - |
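
For illustration, a minimal sketch of a Kerberos-enabled Hive source (all values below are placeholders; the individual options are described in the sections that follow):

```hocon
source {
  Hive {
    table_name = "db1.table1"
    metastore_uri = "thrift://metastore-host:9083"
    kerberos_principal = "hive@EXAMPLE.COM"
    kerberos_keytab_path = "/etc/security/keytabs/hive.keytab"
  }
}
```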

### table_name [string]

@@ -67,3 +69,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
### 2.2.0-beta 2022-09-26

- Add Hive Source Connector

### Next version

- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
@@ -46,5 +46,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
if (pluginConfig.hasPath(BaseSinkConfig.HDFS_SITE_PATH.key())) {
hadoopConf.setHdfsSitePath(pluginConfig.getString(BaseSinkConfig.HDFS_SITE_PATH.key()));
}
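// pass the optional Kerberos settings on to the shared HadoopConf; the write strategy performs the actual login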
if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_PRINCIPAL.key())) {
hadoopConf.setKerberosPrincipal(pluginConfig.getString(BaseSinkConfig.KERBEROS_PRINCIPAL.key()));
}
if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key())) {
hadoopConf.setKerberosKeytabPath(pluginConfig.getString(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key()));
}
}
}
@@ -55,6 +55,12 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
if (pluginConfig.hasPath(HdfsSourceConfig.HDFS_SITE_PATH.key())) {
hadoopConf.setHdfsSitePath(pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_PATH.key()));
}
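// forward the optional Kerberos settings to HadoopConf before the read strategy accesses the file system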
if (pluginConfig.hasPath(HdfsSourceConfig.KERBEROS_PRINCIPAL.key())) {
hadoopConf.setKerberosPrincipal(pluginConfig.getString(HdfsSourceConfig.KERBEROS_PRINCIPAL.key()));
}
if (pluginConfig.hasPath(HdfsSourceConfig.KERBEROS_KEYTAB_PATH.key())) {
hadoopConf.setKerberosKeytabPath(pluginConfig.getString(HdfsSourceConfig.KERBEROS_KEYTAB_PATH.key()));
}
try {
filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
} catch (IOException e) {
Expand Down
@@ -41,26 +41,32 @@ public class BaseSinkConfig {
.stringType()
.noDefaultValue()
.withDescription("Compression codec");

public static final Option<DateUtils.Formatter> DATE_FORMAT = Options.key("date_format")
.enumType(DateUtils.Formatter.class)
.defaultValue(DateUtils.Formatter.YYYY_MM_DD)
.withDescription("Date format");

public static final Option<DateTimeUtils.Formatter> DATETIME_FORMAT = Options.key("datetime_format")
.enumType(DateTimeUtils.Formatter.class)
.defaultValue(DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)
.withDescription("Datetime format");

public static final Option<TimeUtils.Formatter> TIME_FORMAT = Options.key("time_format")
.enumType(TimeUtils.Formatter.class)
.defaultValue(TimeUtils.Formatter.HH_MM_SS)
.withDescription("Time format");

public static final Option<String> FILE_PATH = Options.key("path")
.stringType()
.noDefaultValue()
.withDescription("The file path of target files");

public static final Option<String> FIELD_DELIMITER = Options.key("field_delimiter")
.stringType()
.defaultValue(DEFAULT_FIELD_DELIMITER)
.withDescription("The separator between columns in a row of data. Only needed by `text` and `csv` file format");

public static final Option<String> ROW_DELIMITER = Options.key("row_delimiter")
.stringType()
.defaultValue(DEFAULT_ROW_DELIMITER)
@@ -84,6 +90,7 @@ public class BaseSinkConfig {
"and the final file will be placed in the partition directory. " +
"Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. " +
"`k0` is the first partition field and `v0` is the value of the first partition field.");

public static final Option<Boolean> IS_PARTITION_FIELD_WRITE_IN_FILE = Options.key("is_partition_field_write_in_file")
.booleanType()
.defaultValue(false)
@@ -136,4 +143,14 @@ public class BaseSinkConfig {
.stringType()
.noDefaultValue()
.withDescription("The path of hdfs-site.xml");

public static final Option<String> KERBEROS_PRINCIPAL = Options.key("kerberos_principal")
.stringType()
.noDefaultValue()
.withDescription("Kerberos principal");

public static final Option<String> KERBEROS_KEYTAB_PATH = Options.key("kerberos_keytab_path")
.stringType()
.noDefaultValue()
.withDescription("Kerberos keytab file path");
}
@@ -28,34 +28,52 @@ public class BaseSourceConfig {
.objectType(FileFormat.class)
.noDefaultValue()
.withDescription("File type");

public static final Option<String> FILE_PATH = Options.key("path")
.stringType()
.noDefaultValue()
.withDescription("The file path of source files");

public static final Option<String> DELIMITER = Options.key("delimiter")
.stringType()
.defaultValue(String.valueOf('\001'))
.withDescription("The separator between columns in a row of data. Only needed by `text` file format");

public static final Option<DateUtils.Formatter> DATE_FORMAT = Options.key("date_format")
.enumType(DateUtils.Formatter.class)
.defaultValue(DateUtils.Formatter.YYYY_MM_DD)
.withDescription("Date format");

public static final Option<DateTimeUtils.Formatter> DATETIME_FORMAT = Options.key("datetime_format")
.enumType(DateTimeUtils.Formatter.class)
.defaultValue(DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)
.withDescription("Datetime format");

public static final Option<TimeUtils.Formatter> TIME_FORMAT = Options.key("time_format")
.enumType(TimeUtils.Formatter.class)
.defaultValue(TimeUtils.Formatter.HH_MM_SS)
.withDescription("Time format");

public static final Option<Boolean> PARSE_PARTITION_FROM_PATH = Options.key("parse_partition_from_path")
.booleanType()
.defaultValue(true)
.withDescription("Whether parse partition fields from file path");

public static final Option<String> HDFS_SITE_PATH = Options.key("hdfs_site_path")
.stringType()
.noDefaultValue()
.withDescription("The path of hdfs-site.xml");

public static final Option<String> KERBEROS_PRINCIPAL = Options.key("kerberos_principal")
.stringType()
.noDefaultValue()
.withDescription("Kerberos principal");

public static final Option<String> KERBEROS_KEYTAB_PATH = Options.key("kerberos_keytab_path")
.stringType()
.noDefaultValue()
.withDescription("Kerberos keytab file path");

public static final Option<Long> SKIP_HEADER_ROW_NUMBER = Options.key("skip_header_row_number")
.longType()
.defaultValue(0L)
@@ -32,6 +32,8 @@ public class HadoopConf implements Serializable {
protected Map<String, String> extraOptions = new HashMap<>();
protected String hdfsNameKey;
protected String hdfsSitePath;
protected String kerberosPrincipal;
protected String kerberosKeytabPath;

public HadoopConf(String hdfsNameKey) {
this.hdfsNameKey = hdfsNameKey;
@@ -43,6 +43,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -83,6 +84,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
protected int partId = 0;
protected int batchSize;
protected int currentBatchSize = 0;
protected boolean isKerberosAuthorization = false;

public AbstractWriteStrategy(FileSinkConfig fileSinkConfig) {
this.fileSinkConfig = fileSinkConfig;
@@ -129,6 +131,27 @@ public Configuration getConfiguration(HadoopConf hadoopConf) {
configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
this.hadoopConf.setExtraOptionsForConfiguration(configuration);
String principal = hadoopConf.getKerberosPrincipal();
String keytabPath = hadoopConf.getKerberosKeytabPath();
if (!isKerberosAuthorization && StringUtils.isNotBlank(principal)) {
// perform Kerberos authentication only once per write strategy instance
if (StringUtils.isBlank(keytabPath)) {
throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED,
"Kerberos keytab path is blank, please check this parameter in your config file");
}
configuration.set("hadoop.security.authentication", "kerberos");
UserGroupInformation.setConfiguration(configuration);
try {
log.info("Start Kerberos authentication using principal {} and keytab {}", principal, keytabPath);
UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
log.info("Kerberos authentication successful");
} catch (IOException e) {
String errorMsg = String.format("Kerberos authentication failed using this " +
"principal [%s] and keytab path [%s]", principal, keytabPath);
throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg, e);
}
isKerberosAuthorization = true;
}
return configuration;
}
