[Feature][Connector-V2] Support kerberos in hive and hdfs file connector #3840

Merged · 6 commits · Jan 16, 2023

18 changes: 15 additions & 3 deletions docs/en/connector-v2/sink/HdfsFile.md
@@ -50,8 +50,10 @@ By default, we use 2PC commit to ensure `exactly-once`
| sink_columns | array | no | | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true | |
| batch_size | int | no | 1000000 | |
| kerberos_principal | string | no | - | |
| kerberos_keytab_path | string | no | - | |
| compress_codec | string | no | none | |
| common-options | object | no | - | |

### fs.defaultFS [string]

@@ -151,6 +153,14 @@ Only support `true` now.

The maximum number of rows in a file. For SeaTunnel Engine, the number of rows in a file is decided jointly by `batch_size` and `checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink writer will keep writing rows into one file until the rows in the file exceed `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint is triggered.
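
For illustration, a minimal sketch of how the two options interact; the values and cluster address below are placeholders, not recommendations:

```hocon
env {
  # with a large checkpoint interval, file rollover is driven by batch_size
  checkpoint.interval = 60000
}

sink {
  HdfsFile {
    fs.defaultFS = "hdfs://namenode:8020"  # placeholder address
    path = "/tmp/seatunnel/output"
    batch_size = 1000000  # roll to a new file after this many rows
  }
}
```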

### kerberos_principal [string]

The principal of kerberos authentication

### kerberos_keytab_path [string]

The keytab file path of kerberos authentication
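
For example, a hedged sketch of enabling kerberos on this sink; the principal, keytab path, and cluster address are placeholders for your environment:

```hocon
sink {
  HdfsFile {
    fs.defaultFS = "hdfs://namenode:8020"                            # placeholder
    path = "/tmp/seatunnel/output"
    kerberos_principal = "seatunnel@EXAMPLE.COM"                     # placeholder principal
    kerberos_keytab_path = "/etc/security/keytabs/seatunnel.keytab"  # placeholder keytab
  }
}
```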

### compressCodec [string]

Support lzo compression for the text file format. The file name ends with ".lzo.txt".
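
A sketch of enabling the codec, assuming `lzo` is the accepted value for the `compress_codec` key listed in the options table; the paths are placeholders:

```hocon
sink {
  HdfsFile {
    fs.defaultFS = "hdfs://namenode:8020"  # placeholder
    path = "/tmp/seatunnel/output"
    compress_codec = "lzo"  # text output files will end with ".lzo.txt"
  }
}
```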

@@ -226,11 +236,13 @@ HdfsFile {
- [BugFix] Fix filesystem get error ([3117](https://github.com/apache/incubator-seatunnel/pull/3117))
- [BugFix] Fixed the bug that '\t' could not be parsed as a delimiter from the config file ([3083](https://github.com/apache/incubator-seatunnel/pull/3083))

### 2.3.0 2022-12-30
- [BugFix] Fixed the following bugs that caused writing data to files to fail ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
  - When a field from upstream is null, a NullPointerException is thrown
  - Sink columns mapping failed
  - When restoring the writer from states, getting the transaction directly failed

### Next version
- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
- [Improve] Support lzo compression for text in file format ([3782](https://github.com/apache/incubator-seatunnel/pull/3782))
- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
29 changes: 22 additions & 7 deletions docs/en/connector-v2/sink/Hive.md
@@ -30,12 +30,15 @@ By default, we use 2PC commit to ensure `exactly-once`

## Options

| name | type | required | default value |
|----------------------|--------|----------|---------------|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
| compress_codec | string | no | none |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| common-options | | no | - |

### table_name [string]

Target Hive table name, e.g. `db1.table1`
@@ -44,6 +47,14 @@ Target Hive table name eg: db1.table1

Hive metastore uri

### kerberos_principal [string]

The principal of kerberos authentication

### kerberos_keytab_path [string]

The keytab file path of kerberos authentication
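
For example, a hedged sketch of a kerberized Hive sink; the metastore address, principal, and keytab path are placeholders:

```hocon
sink {
  Hive {
    table_name = "db1.table1"
    metastore_uri = "thrift://metastore-host:9083"              # placeholder
    kerberos_principal = "hive/metastore-host@EXAMPLE.COM"      # placeholder principal
    kerberos_keytab_path = "/etc/security/keytabs/hive.keytab"  # placeholder keytab
  }
}
```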

### common options

Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
@@ -145,9 +156,13 @@ sink {
### 2.3.0-beta 2022-10-20
- [Improve] Hive Sink supports automatic partition repair ([3133](https://github.com/apache/incubator-seatunnel/pull/3133))

### 2.3.0 2022-12-30
- [BugFix] Fixed the following bugs that caused writing data to files to fail ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
  - When a field from upstream is null, a NullPointerException is thrown
  - Sink columns mapping failed
  - When restoring the writer from states, getting the transaction directly failed

### Next version

- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))

15 changes: 15 additions & 0 deletions docs/en/connector-v2/source/HdfsFile.md
@@ -45,6 +45,8 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| date_format | string | no | yyyy-MM-dd |
| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
| time_format | string | no | HH:mm:ss |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| skip_header_row_number | long | no | 0 |
| schema | config | no | - |
| common-options | | no | - |
@@ -204,6 +206,14 @@ Hdfs cluster address.

The path of `hdfs-site.xml`, used to load the HA configuration of the NameNodes

### kerberos_principal [string]

The principal of kerberos authentication

### kerberos_keytab_path [string]

The keytab file path of kerberos authentication
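
For example, a hedged sketch of a kerberized HdfsFile source; the cluster address, principal, and keytab path are placeholders, and other required options (such as the file format) are omitted for brevity:

```hocon
source {
  HdfsFile {
    fs.defaultFS = "hdfs://namenode:8020"                            # placeholder
    path = "/apps/hive/demo/student"
    kerberos_principal = "seatunnel@EXAMPLE.COM"                     # placeholder principal
    kerberos_keytab_path = "/etc/security/keytabs/seatunnel.keytab"  # placeholder keytab
  }
}
```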

### schema [Config]

#### fields [Config]
@@ -253,3 +263,8 @@ HdfsFile {
- [BugFix] Fix the bug of incorrect path in windows environment ([2980](https://github.com/apache/incubator-seatunnel/pull/2980))
- [Improve] Support extract partition from SeaTunnelRow fields ([3085](https://github.com/apache/incubator-seatunnel/pull/3085))
- [Improve] Support parse field from file path ([2985](https://github.com/apache/incubator-seatunnel/pull/2985))

### Next version

- [Improve] Support skip header for csv and txt files ([3900](https://github.com/apache/incubator-seatunnel/pull/3900))
- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
16 changes: 11 additions & 5 deletions docs/en/connector-v2/source/Hive.md
@@ -33,11 +33,13 @@ Read all the data in a split in a pollNext call. What splits are read will be sa

## Options

| name | type | required | default value |
|----------------------|--------|----------|---------------|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| common-options | | no | - |
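
For example, a hedged sketch of a kerberized Hive source; the metastore address, principal, and keytab path below are placeholders:

```hocon
source {
  Hive {
    table_name = "db1.table1"
    metastore_uri = "thrift://metastore-host:9083"              # placeholder
    kerberos_principal = "hive/metastore-host@EXAMPLE.COM"      # placeholder principal
    kerberos_keytab_path = "/etc/security/keytabs/hive.keytab"  # placeholder keytab
  }
}
```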

### table_name [string]

@@ -67,3 +69,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
### 2.2.0-beta 2022-09-26

- Add Hive Source Connector

### Next version

- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
@@ -46,5 +46,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
        if (pluginConfig.hasPath(BaseSinkConfig.HDFS_SITE_PATH.key())) {
            hadoopConf.setHdfsSitePath(pluginConfig.getString(BaseSinkConfig.HDFS_SITE_PATH.key()));
        }
        if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_PRINCIPAL.key())) {
            hadoopConf.setKerberosPrincipal(pluginConfig.getString(BaseSinkConfig.KERBEROS_PRINCIPAL.key()));
        }
        if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key())) {
            hadoopConf.setKerberosKeytabPath(pluginConfig.getString(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key()));
        }
    }
}
@@ -55,6 +55,12 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
        if (pluginConfig.hasPath(HdfsSourceConfig.HDFS_SITE_PATH.key())) {
            hadoopConf.setHdfsSitePath(pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_PATH.key()));
        }
        if (pluginConfig.hasPath(HdfsSourceConfig.KERBEROS_PRINCIPAL.key())) {
            hadoopConf.setKerberosPrincipal(pluginConfig.getString(HdfsSourceConfig.KERBEROS_PRINCIPAL.key()));
        }
        if (pluginConfig.hasPath(HdfsSourceConfig.KERBEROS_KEYTAB_PATH.key())) {
            hadoopConf.setKerberosKeytabPath(pluginConfig.getString(HdfsSourceConfig.KERBEROS_KEYTAB_PATH.key()));
        }
        try {
            filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
        } catch (IOException e) {
@@ -41,26 +41,32 @@ public class BaseSinkConfig {
            .stringType()
            .noDefaultValue()
            .withDescription("Compression codec");

    public static final Option<DateUtils.Formatter> DATE_FORMAT = Options.key("date_format")
            .enumType(DateUtils.Formatter.class)
            .defaultValue(DateUtils.Formatter.YYYY_MM_DD)
            .withDescription("Date format");

    public static final Option<DateTimeUtils.Formatter> DATETIME_FORMAT = Options.key("datetime_format")
            .enumType(DateTimeUtils.Formatter.class)
            .defaultValue(DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)
            .withDescription("Datetime format");

    public static final Option<TimeUtils.Formatter> TIME_FORMAT = Options.key("time_format")
            .enumType(TimeUtils.Formatter.class)
            .defaultValue(TimeUtils.Formatter.HH_MM_SS)
            .withDescription("Time format");

    public static final Option<String> FILE_PATH = Options.key("path")
            .stringType()
            .noDefaultValue()
            .withDescription("The file path of target files");

    public static final Option<String> FIELD_DELIMITER = Options.key("field_delimiter")
            .stringType()
            .defaultValue(DEFAULT_FIELD_DELIMITER)
            .withDescription("The separator between columns in a row of data. Only needed by `text` and `csv` file format");

    public static final Option<String> ROW_DELIMITER = Options.key("row_delimiter")
            .stringType()
            .defaultValue(DEFAULT_ROW_DELIMITER)
@@ -84,6 +90,7 @@ public class BaseSinkConfig {
"and the final file will be placed in the partition directory. " +
"Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. " +
"`k0` is the first partition field and `v0` is the value of the first partition field.");

public static final Option<Boolean> IS_PARTITION_FIELD_WRITE_IN_FILE = Options.key("is_partition_field_write_in_file")
.booleanType()
.defaultValue(false)
@@ -136,4 +143,14 @@ public class BaseSinkConfig {
            .stringType()
            .noDefaultValue()
            .withDescription("The path of hdfs-site.xml");

    public static final Option<String> KERBEROS_PRINCIPAL = Options.key("kerberos_principal")
            .stringType()
            .noDefaultValue()
            .withDescription("Kerberos principal");

    public static final Option<String> KERBEROS_KEYTAB_PATH = Options.key("kerberos_keytab_path")
            .stringType()
            .noDefaultValue()
            .withDescription("Kerberos keytab file path");
}
@@ -28,34 +28,52 @@ public class BaseSourceConfig {
            .objectType(FileFormat.class)
            .noDefaultValue()
            .withDescription("File type");

    public static final Option<String> FILE_PATH = Options.key("path")
            .stringType()
            .noDefaultValue()
            .withDescription("The file path of source files");

    public static final Option<String> DELIMITER = Options.key("delimiter")
            .stringType()
            .defaultValue(String.valueOf('\001'))
            .withDescription("The separator between columns in a row of data. Only needed by `text` file format");

    public static final Option<DateUtils.Formatter> DATE_FORMAT = Options.key("date_format")
            .enumType(DateUtils.Formatter.class)
            .defaultValue(DateUtils.Formatter.YYYY_MM_DD)
            .withDescription("Date format");

    public static final Option<DateTimeUtils.Formatter> DATETIME_FORMAT = Options.key("datetime_format")
            .enumType(DateTimeUtils.Formatter.class)
            .defaultValue(DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)
            .withDescription("Datetime format");

    public static final Option<TimeUtils.Formatter> TIME_FORMAT = Options.key("time_format")
            .enumType(TimeUtils.Formatter.class)
            .defaultValue(TimeUtils.Formatter.HH_MM_SS)
            .withDescription("Time format");

    public static final Option<Boolean> PARSE_PARTITION_FROM_PATH = Options.key("parse_partition_from_path")
            .booleanType()
            .defaultValue(true)
            .withDescription("Whether parse partition fields from file path");

    public static final Option<String> HDFS_SITE_PATH = Options.key("hdfs_site_path")
            .stringType()
            .noDefaultValue()
            .withDescription("The path of hdfs-site.xml");

    public static final Option<String> KERBEROS_PRINCIPAL = Options.key("kerberos_principal")
            .stringType()
            .noDefaultValue()
            .withDescription("Kerberos principal");

    public static final Option<String> KERBEROS_KEYTAB_PATH = Options.key("kerberos_keytab_path")
            .stringType()
            .noDefaultValue()
            .withDescription("Kerberos keytab file path");

    public static final Option<Long> SKIP_HEADER_ROW_NUMBER = Options.key("skip_header_row_number")
            .longType()
            .defaultValue(0L)
@@ -32,6 +32,8 @@ public class HadoopConf implements Serializable {
    protected Map<String, String> extraOptions = new HashMap<>();
    protected String hdfsNameKey;
    protected String hdfsSitePath;
    protected String kerberosPrincipal;
    protected String kerberosKeytabPath;

    public HadoopConf(String hdfsNameKey) {
        this.hdfsNameKey = hdfsNameKey;
@@ -43,6 +43,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -83,6 +84,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
    protected int partId = 0;
    protected int batchSize;
    protected int currentBatchSize = 0;
    protected boolean isKerberosAuthorization = false;

    public AbstractWriteStrategy(FileSinkConfig fileSinkConfig) {
        this.fileSinkConfig = fileSinkConfig;
@@ -129,6 +131,27 @@ public Configuration getConfiguration(HadoopConf hadoopConf) {
        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
        configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
        this.hadoopConf.setExtraOptionsForConfiguration(configuration);
        String principal = hadoopConf.getKerberosPrincipal();
        String keytabPath = hadoopConf.getKerberosKeytabPath();
        if (!isKerberosAuthorization && StringUtils.isNotBlank(principal)) {
            // kerberos authentication, performed only once per write strategy
            if (StringUtils.isBlank(keytabPath)) {
                throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED,
                        "Kerberos keytab path is blank, please check this parameter in your config file");
            }
            // switch Hadoop security to kerberos before logging in from the keytab
            configuration.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(configuration);
            try {
                log.info("Start Kerberos authentication using principal {} and keytab {}", principal, keytabPath);
                UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
                log.info("Kerberos authentication successful");
            } catch (IOException e) {
                String errorMsg = String.format("Kerberos authentication failed using this " +
                        "principal [%s] and keytab path [%s]", principal, keytabPath);
                throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg, e);
            }
            isKerberosAuthorization = true;
        }
        return configuration;
    }
