[Improve] Support using multiple hadoop account #5903

Merged
1 change: 1 addition & 0 deletions docs/en/connector-v2/sink/HdfsFile.md
@@ -56,6 +56,7 @@ Output data to hdfs file
| is_enable_transaction | boolean | no | true | If `is_enable_transaction` is true, we will ensure that data will not be lost or duplicated when it is written to the target directory. Please note that if `is_enable_transaction` is `true`, we will automatically add `${transactionId}_` to the head of each file name. Only `true` is supported now. |
| batch_size | int | no | 1000000 | The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in a file is determined jointly by `batch_size` and `checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink writer will keep writing rows into a file until the number of rows in the file is larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint is triggered. |
| compress_codec | string | no | none | The compress codec of files; the supported codecs are as follows: [txt: `lzo` `none`, json: `lzo` `none`, csv: `lzo` `none`, orc: `lzo` `snappy` `lz4` `zlib` `none`, parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`]. Tip: the excel type does not support any compression format. |
| krb5_path | string | no | /etc/krb5.conf | The path of `krb5.conf`, used for Kerberos authentication |
| kerberos_principal | string | no | - | The principal of Kerberos authentication |
| kerberos_keytab_path | string | no | - | The keytab file path of Kerberos authentication |
| compress_codec | string | no | none | compress codec |
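For illustration, a minimal HdfsFile sink block wiring in the new `krb5_path`, `kerberos_principal`, and `kerberos_keytab_path` options might look like the sketch below; the cluster address, output path, principal, and keytab location are placeholder assumptions, not values from this PR:

```hocon
sink {
  HdfsFile {
    fs.defaultFS = "hdfs://namenode:8020"              # placeholder cluster address
    path = "/tmp/seatunnel/output"                     # placeholder output directory
    file_format_type = "orc"
    # Kerberos identity used by this sink instance
    krb5_path = "/etc/krb5.conf"
    kerberos_principal = "seatunnel@EXAMPLE.COM"
    kerberos_keytab_path = "/etc/security/seatunnel.keytab"
  }
}
```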
25 changes: 15 additions & 10 deletions docs/en/connector-v2/sink/Hive.md
@@ -30,16 +30,17 @@ By default, we use 2PC commit to ensure `exactly-once`

## Options

| name | type | required | default value |
|----------------------|--------|----------|---------------|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
| compress_codec | string | no | none |
| hdfs_site_path | string | no | - |
| hive_site_path | string | no | - |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| common-options | | no | - |
| name | type | required | default value |
|----------------------|--------|----------|----------------|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
| compress_codec | string | no | none |
| hdfs_site_path | string | no | - |
| hive_site_path | string | no | - |
| krb5_path | string | no | /etc/krb5.conf |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| common-options | | no | - |

### table_name [string]

@@ -55,6 +56,10 @@ The path of `hdfs-site.xml`, used to load the HA configuration of NameNodes

### hive_site_path [string]

The path of `hive-site.xml`, used to authenticate with the hive metastore

### krb5_path [string]

The path of `krb5.conf`, used to authenticate with Kerberos

### kerberos_principal [string]
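A corresponding Hive sink sketch, again with a placeholder metastore URI, principal, and paths:

```hocon
sink {
  Hive {
    table_name = "default.seatunnel_sink"              # placeholder table
    metastore_uri = "thrift://metastore:9083"          # placeholder metastore address
    hive_site_path = "/etc/hive/conf/hive-site.xml"
    krb5_path = "/etc/krb5.conf"
    kerberos_principal = "hive@EXAMPLE.COM"
    kerberos_keytab_path = "/etc/security/hive.keytab"
  }
}
```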
2 changes: 2 additions & 0 deletions docs/en/connector-v2/source/HdfsFile.md
@@ -51,6 +51,8 @@ Read data from hdfs file system.
| date_format | string | no | yyyy-MM-dd | Date type format, used to tell the connector how to convert a string to a date; supported formats: `yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`, default `yyyy-MM-dd` |
| datetime_format | string | no | yyyy-MM-dd HH:mm:ss | Datetime type format, used to tell the connector how to convert a string to a datetime; supported formats: `yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`, default `yyyy-MM-dd HH:mm:ss` |
| time_format | string | no | HH:mm:ss | Time type format, used to tell the connector how to convert a string to a time; supported formats: `HH:mm:ss` `HH:mm:ss.SSS`, default `HH:mm:ss` |
| remote_user | string | no | - | The login user used to connect to the Hadoop cluster. It is intended for remote users in RPC and won't carry any credentials. |
| krb5_path | string | no | /etc/krb5.conf | The path of `krb5.conf`, used for Kerberos authentication |
| kerberos_principal | string | no | - | The principal of Kerberos authentication |
| kerberos_keytab_path | string | no | - | The keytab file path of Kerberos authentication |
| skip_header_row_number | long | no | 0 | Skip the first few lines, but only for txt and csv files. For example, set `skip_header_row_number = 2`; then SeaTunnel will skip the first 2 lines of the source files |
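Because each connector instance now carries its own identity in `HadoopConf`, one job can in principle read under two different Hadoop accounts, which is the point of this PR. A sketch under assumed cluster addresses, paths, and principals:

```hocon
source {
  # Account 1: an insecure cluster, accessed as a plain remote user
  HdfsFile {
    fs.defaultFS = "hdfs://cluster-a:8020"
    path = "/warehouse/team_a"
    file_format_type = "orc"
    remote_user = "team_a"                             # RPC login name, no credentials
    result_table_name = "team_a_data"
  }
  # Account 2: a Kerberized cluster, accessed with principal + keytab
  HdfsFile {
    fs.defaultFS = "hdfs://cluster-b:8020"
    path = "/warehouse/team_b"
    file_format_type = "orc"
    krb5_path = "/etc/krb5.conf"
    kerberos_principal = "team_b@EXAMPLE.COM"
    kerberos_keytab_path = "/etc/security/team_b.keytab"
    result_table_name = "team_b_data"
  }
}
```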
31 changes: 18 additions & 13 deletions docs/en/connector-v2/source/Hive.md
@@ -33,19 +33,20 @@ Read all the data in a split in a pollNext call. What splits are read will be sa

## Options

| name | type | required | default value |
|-------------------------------|---------|----------|---------------|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| hdfs_site_path | string | no | - |
| hive_site_path | string | no | - |
| read_partitions | list | no | - |
| read_columns | list | no | - |
| abort_drop_partition_metadata | boolean | no | true |
| compress_codec | string | no | none |
| common-options | | no | - |
| name | type | required | default value |
|-------------------------------|---------|----------|----------------|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
| krb5_path | string | no | /etc/krb5.conf |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| hdfs_site_path | string | no | - |
| hive_site_path | string | no | - |
| read_partitions | list | no | - |
| read_columns | list | no | - |
| abort_drop_partition_metadata | boolean | no | true |
| compress_codec | string | no | none |
| common-options | | no | - |

### table_name [string]

@@ -70,6 +71,10 @@ The target partitions that user want to read from hive table, if user does not s
**Tips: Every partition in the partitions list should have the same directory depth. For example, a hive table has two partitions: par1 and par2. If the user sets it as follows:**
**`read_partitions = [par1=xxx, par1=yyy/par2=zzz]`, it is illegal**

### krb5_path [string]

The path of `krb5.conf`, used to authenticate with Kerberos

### kerberos_principal [string]

The principal of kerberos authentication
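On the Hive source side, a hedged sketch of a Kerberized read; the table, partition, principal, and paths are placeholders:

```hocon
source {
  Hive {
    table_name = "default.orders"                      # placeholder table
    metastore_uri = "thrift://metastore:9083"
    read_partitions = ["dt=2023-11-01"]
    krb5_path = "/etc/krb5.conf"
    kerberos_principal = "analyst@EXAMPLE.COM"
    kerberos_keytab_path = "/etc/security/analyst.keytab"
  }
}
```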
@@ -48,6 +48,15 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
if (pluginConfig.hasPath(BaseSinkConfig.HDFS_SITE_PATH.key())) {
hadoopConf.setHdfsSitePath(pluginConfig.getString(BaseSinkConfig.HDFS_SITE_PATH.key()));
}

if (pluginConfig.hasPath(BaseSinkConfig.REMOTE_USER.key())) {
hadoopConf.setRemoteUser(pluginConfig.getString(BaseSinkConfig.REMOTE_USER.key()));
}

if (pluginConfig.hasPath(BaseSinkConfig.KRB5_PATH.key())) {
hadoopConf.setKrb5Path(pluginConfig.getString(BaseSinkConfig.KRB5_PATH.key()));
}

if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_PRINCIPAL.key())) {
hadoopConf.setKerberosPrincipal(
pluginConfig.getString(BaseSinkConfig.KERBEROS_PRINCIPAL.key()));
@@ -55,16 +55,17 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
readStrategy =
ReadStrategyFactory.of(
pluginConfig.getString(HdfsSourceConfig.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(HdfsSourceConfig.FILE_PATH.key());
hadoopConf = new HadoopConf(pluginConfig.getString(HdfsSourceConfig.DEFAULT_FS.key()));
if (pluginConfig.hasPath(HdfsSourceConfig.HDFS_SITE_PATH.key())) {
hadoopConf.setHdfsSitePath(
pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_PATH.key()));
}

if (pluginConfig.hasPath(HdfsSourceConfig.REMOTE_USER.key())) {
hadoopConf.setRemoteUser(pluginConfig.getString(HdfsSourceConfig.REMOTE_USER.key()));
}

if (pluginConfig.hasPath(HdfsSourceConfig.KERBEROS_PRINCIPAL.key())) {
hadoopConf.setKerberosPrincipal(
pluginConfig.getString(HdfsSourceConfig.KERBEROS_PRINCIPAL.key()));
@@ -73,8 +74,13 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
hadoopConf.setKerberosKeytabPath(
pluginConfig.getString(HdfsSourceConfig.KERBEROS_KEYTAB_PATH.key()));
}
readStrategy =
ReadStrategyFactory.of(
pluginConfig.getString(HdfsSourceConfig.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
readStrategy.init(hadoopConf);
try {
filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
filePaths = readStrategy.getFileNamesByPath(path);
} catch (IOException e) {
String errorMsg = String.format("Get file list from this path [%s] failed", path);
throw new FileConnectorException(
@@ -117,7 +123,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
return;
}
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
rowType = readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
} catch (FileConnectorException e) {
String errorMsg =
String.format("Get table schema from file [%s] failed", filePaths.get(0));
@@ -36,6 +36,7 @@
<parquet-avro.version>1.12.3</parquet-avro.version>
<poi.version>4.1.2</poi.version>
<poi-ooxml.version>4.1.2</poi-ooxml.version>
<hadoop-minikdc.version>3.1.4</hadoop-minikdc.version>
</properties>

<dependencyManagement>
@@ -136,6 +137,13 @@
<artifactId>poi-ooxml</artifactId>
<version>${poi-ooxml.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<version>${hadoop-minikdc.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -209,6 +209,19 @@ public class BaseSinkConfig {
.noDefaultValue()
.withDescription("The path of hdfs-site.xml");

public static final Option<String> REMOTE_USER =
Options.key("remote_user")
.stringType()
.noDefaultValue()
.withDescription("The remote user name of hdfs");

public static final Option<String> KRB5_PATH =
Options.key("krb5_path")
.stringType()
.defaultValue("/etc/krb5.conf")
.withDescription(
"When use kerberos, we should set krb5 path file path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf");

public static final Option<String> KERBEROS_PRINCIPAL =
Options.key("kerberos_principal")
.stringType()
@@ -78,12 +78,25 @@ public class BaseSourceConfig {
.noDefaultValue()
.withDescription("The path of hdfs-site.xml");

public static final Option<String> REMOTE_USER =
Options.key("remote_user")
.stringType()
.noDefaultValue()
.withDescription("The remote user name of hdfs");

public static final Option<String> KERBEROS_PRINCIPAL =
Options.key("kerberos_principal")
.stringType()
.noDefaultValue()
.withDescription("Kerberos principal");

public static final Option<String> KRB5_PATH =
Options.key("krb5_path")
.stringType()
.defaultValue("/etc/krb5.conf")
.withDescription(
"When use kerberos, we should set krb5 path file path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf");

public static final Option<String> KERBEROS_KEYTAB_PATH =
Options.key("kerberos_keytab_path")
.stringType()
@@ -33,6 +33,10 @@ public class HadoopConf implements Serializable {
protected Map<String, String> extraOptions = new HashMap<>();
protected String hdfsNameKey;
protected String hdfsSitePath;

protected String remoteUser;

private String krb5Path;
protected String kerberosPrincipal;
protected String kerberosKeytabPath;

Expand Down
Loading
Loading