Support using multiple hadoop accounts (apache#5903)
ruanwenjun authored and chaorongzhi committed Aug 21, 2024
1 parent cb1f46d commit 344984a
Showing 47 changed files with 941 additions and 289 deletions.
1 change: 1 addition & 0 deletions docs/en/connector-v2/sink/HdfsFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Output data to hdfs file
| is_enable_transaction | boolean | no | true | If `is_enable_transaction` is true, we will ensure that data will not be lost or duplicated when it is written to the target directory. Please note that if `is_enable_transaction` is `true`, we will automatically add `${transactionId}_` to the beginning of the file name. Only `true` is supported now. |
| batch_size | int | no | 1000000 | The maximum number of rows in a file. For SeaTunnel Engine, the number of rows in a file is determined jointly by `batch_size` and `checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink writer will keep writing rows into a file until it holds more than `batch_size` rows. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint is triggered. |
| compress_codec | string | no | none | The compress codec of files; the supported codecs are as follows: [txt: `lzo` `none`, json: `lzo` `none`, csv: `lzo` `none`, orc: `lzo` `snappy` `lz4` `zlib` `none`, parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`]. Tips: the excel type does not support any compression format. |
| krb5_path | string | no | /etc/krb5.conf | The path of `krb5.conf`, used for kerberos authentication |
| kerberos_principal | string | no | - | The principal of kerberos authentication |
| kerberos_keytab_path | string | no | - | The keytab path of kerberos authentication |
| compress_codec | string | no | none | compress codec |
Expand Down
25 changes: 15 additions & 10 deletions docs/en/connector-v2/sink/Hive.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,17 @@ By default, we use 2PC commit to ensure `exactly-once`

## Options

| name | type | required | default value |
|----------------------|--------|----------|---------------|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
| compress_codec | string | no | none |
| hdfs_site_path | string | no | - |
| hive_site_path | string | no | - |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| common-options | | no | - |
| name | type | required | default value |
|----------------------|--------|----------|----------------|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
| compress_codec | string | no | none |
| hdfs_site_path | string | no | - |
| hive_site_path | string | no | - |
| krb5_path | string | no | /etc/krb5.conf |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| common-options | | no | - |

### table_name [string]

Expand All @@ -55,6 +56,10 @@ The path of `hdfs-site.xml`, used to load ha configuration of namenodes

### hive_site_path [string]

The path of `hive-site.xml`, used to authenticate to the hive metastore

### krb5_path [string]

The path of `krb5.conf`, used for kerberos authentication

### kerberos_principal [string]
Expand Down
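For readers unfamiliar with these kerberos options, here is a minimal sketch of how a connector can turn `krb5_path`, `kerberos_principal`, and `kerberos_keytab_path` into an authenticated Hadoop identity. It uses standard Hadoop `UserGroupInformation` APIs and is illustrative only, not the connector's actual implementation.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class KerberosLoginSketch {

    // Log in with a keytab and return a UGI that can run privileged actions via doAs().
    public static UserGroupInformation login(
            String krb5Path, String principal, String keytabPath) throws IOException {
        // Point the JVM at the krb5.conf describing the realm/KDC (default /etc/krb5.conf).
        System.setProperty("java.security.krb5.conf", krb5Path);

        // Switch Hadoop's security mode to kerberos before logging in.
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);

        return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath);
    }
}
```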
2 changes: 2 additions & 0 deletions docs/en/connector-v2/source/HdfsFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ Read data from hdfs file system.
| date_format | string | no | yyyy-MM-dd | Date type format, used to tell the connector how to convert string to date; supported formats are `yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`, default `yyyy-MM-dd` |
| datetime_format | string | no | yyyy-MM-dd HH:mm:ss | Datetime type format, used to tell the connector how to convert string to datetime; supported formats are `yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`, default `yyyy-MM-dd HH:mm:ss` |
| time_format | string | no | HH:mm:ss | Time type format, used to tell the connector how to convert string to time; supported formats are `HH:mm:ss` `HH:mm:ss.SSS`, default `HH:mm:ss` |
| remote_user | string | no | - | The login user used to connect to hadoop. It is intended to be used for remote users in RPC; it won't have any credentials. |
| krb5_path | string | no | /etc/krb5.conf | The path of `krb5.conf`, used for kerberos authentication |
| kerberos_principal | string | no | - | The principal of kerberos authentication |
| kerberos_keytab_path | string | no | - | The keytab path of kerberos authentication |
| skip_header_row_number | long | no | 0 | Skip the first few lines, but only for the txt and csv formats. For example, set `skip_header_row_number = 2`, then SeaTunnel will skip the first 2 lines of the source files |
Expand Down
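As a rough illustration of the new `remote_user` option (a sketch only, assuming simple authentication without kerberos), Hadoop exposes this kind of identity through `UserGroupInformation.createRemoteUser`, which carries a user name but no credentials:

```java
import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

public class RemoteUserSketch {

    // Open a FileSystem while acting as the configured remote user.
    public static FileSystem open(String defaultFs, String remoteUser)
            throws IOException, InterruptedException {
        Configuration conf = new Configuration();
        // The UGI carries only the user name used for RPC, no credentials,
        // which matches the remote_user description above.
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(remoteUser);
        return ugi.doAs(
                (PrivilegedExceptionAction<FileSystem>)
                        () -> FileSystem.get(URI.create(defaultFs), conf));
    }
}
```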
31 changes: 18 additions & 13 deletions docs/en/connector-v2/source/Hive.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,20 @@ Read all the data in a split in a pollNext call. What splits are read will be sa

## Options

| name | type | required | default value |
|-------------------------------|---------|----------|---------------|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| hdfs_site_path | string | no | - |
| hive_site_path | string | no | - |
| read_partitions | list | no | - |
| read_columns | list | no | - |
| abort_drop_partition_metadata | boolean | no | true |
| compress_codec | string | no | none |
| common-options | | no | - |
| name | type | required | default value |
|-------------------------------|---------|----------|----------------|
| table_name | string | yes | - |
| metastore_uri | string | yes | - |
| krb5_path | string | no | /etc/krb5.conf |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - |
| hdfs_site_path | string | no | - |
| hive_site_path | string | no | - |
| read_partitions | list | no | - |
| read_columns | list | no | - |
| abort_drop_partition_metadata | boolean | no | true |
| compress_codec | string | no | none |
| common-options | | no | - |

### table_name [string]

Expand All @@ -70,6 +71,10 @@ The target partitions that user want to read from hive table, if user does not s
**Tips: Every partition in the partitions list should have the same directory depth. For example, a hive table has two partitions: par1 and par2; if the user sets it as follows:**
**read_partitions = [par1=xxx, par1=yyy/par2=zzz], it is illegal**

### krb5_path [string]

The path of `krb5.conf`, used for kerberos authentication

### kerberos_principal [string]

The principal of kerberos authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
if (pluginConfig.hasPath(BaseSinkConfig.HDFS_SITE_PATH.key())) {
hadoopConf.setHdfsSitePath(pluginConfig.getString(BaseSinkConfig.HDFS_SITE_PATH.key()));
}

if (pluginConfig.hasPath(BaseSinkConfig.REMOTE_USER.key())) {
hadoopConf.setRemoteUser(pluginConfig.getString(BaseSinkConfig.REMOTE_USER.key()));
}

if (pluginConfig.hasPath(BaseSinkConfig.KRB5_PATH.key())) {
hadoopConf.setKrb5Path(pluginConfig.getString(BaseSinkConfig.KRB5_PATH.key()));
}

if (pluginConfig.hasPath(BaseSinkConfig.CORE_SITE_PATH.key())) {
hadoopConf.setCoreSitePath(pluginConfig.getString(BaseSinkConfig.CORE_SITE_PATH.key()));
}
Expand All @@ -59,12 +68,5 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
hadoopConf.setKerberosKeytabPath(
pluginConfig.getString(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key()));
}
if (pluginConfig.hasPath(BaseSinkConfig.KRB5_PATH.key())) {
hadoopConf.setKrb5Path(pluginConfig.getString(BaseSinkConfig.KRB5_PATH.key()));
}
if (pluginConfig.hasPath(BaseSinkConfig.HADOOP_USER_NAME.key())) {
hadoopConf.setHadoopUserName(
pluginConfig.getString(BaseSinkConfig.HADOOP_USER_NAME.key()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,6 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
readStrategy =
ReadStrategyFactory.of(
pluginConfig.getString(HdfsSourceConfig.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(HdfsSourceConfig.FILE_PATH.key());
hadoopConf = new HadoopConf(pluginConfig.getString(HdfsSourceConfig.DEFAULT_FS.key()));
if (pluginConfig.hasPath(BaseSinkConfig.HADOOP_USER_NAME.key())) {
Expand All @@ -72,6 +68,11 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
hadoopConf.setHdfsSitePath(
pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_PATH.key()));
}

if (pluginConfig.hasPath(HdfsSourceConfig.REMOTE_USER.key())) {
hadoopConf.setRemoteUser(pluginConfig.getString(HdfsSourceConfig.REMOTE_USER.key()));
}

if (pluginConfig.hasPath(BaseSinkConfig.CORE_SITE_PATH.key())) {
hadoopConf.setCoreSitePath(pluginConfig.getString(BaseSinkConfig.CORE_SITE_PATH.key()));
}
Expand All @@ -83,8 +84,13 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
hadoopConf.setKerberosKeytabPath(
pluginConfig.getString(HdfsSourceConfig.KERBEROS_KEYTAB_PATH.key()));
}
readStrategy =
ReadStrategyFactory.of(
pluginConfig.getString(HdfsSourceConfig.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
readStrategy.init(hadoopConf);
try {
filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
filePaths = readStrategy.getFileNamesByPath(path);
} catch (IOException e) {
String errorMsg = String.format("Get file list from this path [%s] failed", path);
throw new FileConnectorException(
Expand Down Expand Up @@ -127,7 +133,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
return;
}
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
rowType = readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0));
} catch (FileConnectorException e) {
String errorMsg =
String.format("Get table schema from file [%s] failed", filePaths.get(0));
Expand Down
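The reordering in this hunk matters because `getFileNamesByPath` and `getSeaTunnelRowTypeInfo` no longer receive the `HadoopConf` argument, so the read strategy must be initialized with it before file listing and schema inference. A minimal sketch of the call contract implied here (types are simplified placeholders, not the project's real signatures):

```java
import java.io.IOException;
import java.util.List;

// Simplified placeholder mirroring the calls made in prepare(): configure the strategy,
// bind the hadoop account/cluster settings, then list files and infer the schema.
interface ReadStrategySketch {
    void setPluginConfig(Object pluginConfig);                        // plugin options (format, ...)
    void init(Object hadoopConf);                                     // hadoop account / cluster settings
    List<String> getFileNamesByPath(String path) throws IOException;  // uses the conf set in init()
    Object getSeaTunnelRowTypeInfo(String filePath);                  // schema of the first file
}
```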
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<parquet-avro.version>1.12.3</parquet-avro.version>
<poi.version>4.1.2</poi.version>
<poi-ooxml.version>4.1.2</poi-ooxml.version>
<hadoop-minikdc.version>3.1.4</hadoop-minikdc.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -136,6 +137,13 @@
<artifactId>poi-ooxml</artifactId>
<version>${poi-ooxml.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<version>${hadoop-minikdc.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
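The new `hadoop-minikdc` test dependency is typically used to stand up an in-process KDC so the kerberos login path can be exercised without a real cluster. A hedged sketch of such a test (principal names and paths are made up for illustration, not the actual e2e test):

```java
import java.io.File;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.UserGroupInformation;

public class MiniKdcLoginSketch {
    public static void main(String[] args) throws Exception {
        File workDir = new File("target/minikdc");
        Properties kdcConf = MiniKdc.createConf();
        MiniKdc kdc = new MiniKdc(kdcConf, workDir);
        kdc.start(); // also points java.security.krb5.conf at the generated krb5.conf
        try {
            // Create a principal and export its keytab, then log in through UGI,
            // the same way the connector would with kerberos_principal/kerberos_keytab_path.
            File keytab = new File(workDir, "hdfs.keytab");
            kdc.createPrincipal(keytab, "hdfs");

            Configuration conf = new Configuration();
            conf.set("hadoop.security.authentication", "kerberos");
            UserGroupInformation.setConfiguration(conf);

            UserGroupInformation ugi =
                    UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                            "hdfs@" + kdc.getRealm(), keytab.getAbsolutePath());
            System.out.println("Logged in as: " + ugi.getUserName());
        } finally {
            kdc.stop();
        }
    }
}
```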
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,19 @@ public class BaseSinkConfig {
.noDefaultValue()
.withDescription("The path of hdfs-site.xml");

public static final Option<String> REMOTE_USER =
Options.key("remote_user")
.stringType()
.noDefaultValue()
.withDescription("The remote user name of hdfs");

public static final Option<String> KRB5_PATH =
Options.key("krb5_path")
.stringType()
.defaultValue("/etc/krb5.conf")
.withDescription(
"When use kerberos, we should set krb5 path file path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf");

public static final Option<String> CORE_SITE_PATH =
Options.key("core_site_path")
.stringType()
Expand Down Expand Up @@ -250,10 +263,4 @@ public class BaseSinkConfig {
.stringType()
.noDefaultValue()
.withDescription("hadoop user name");

public static final Option<String> KRB5_PATH =
Options.key("krb5_path")
.stringType()
.noDefaultValue()
.withDescription("krb5 file path");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ public class BaseSourceConfig {
.noDefaultValue()
.withDescription("The path of hdfs-site.xml");

public static final Option<String> REMOTE_USER =
Options.key("remote_user")
.stringType()
.noDefaultValue()
.withDescription("The remote user name of hdfs");

public static final Option<String> CORE_SITE_PATH =
Options.key("core_site_path")
.stringType()
Expand All @@ -90,6 +96,13 @@ public class BaseSourceConfig {
.noDefaultValue()
.withDescription("Kerberos principal");

public static final Option<String> KRB5_PATH =
Options.key("krb5_path")
.stringType()
.defaultValue("/etc/krb5.conf")
.withDescription(
"When use kerberos, we should set krb5 path file path such as '/seatunnel/krb5.conf' or use the default path '/etc/krb5.conf");

public static final Option<String> KERBEROS_KEYTAB_PATH =
Options.key("kerberos_keytab_path")
.stringType()
Expand Down Expand Up @@ -132,10 +145,4 @@ public class BaseSourceConfig {
.enumType(CompressFormat.class)
.defaultValue(CompressFormat.NONE)
.withDescription("Compression codec");

public static final Option<String> KRB5_PATH =
Options.key("krb5_path")
.stringType()
.noDefaultValue()
.withDescription("Krb5 file path");
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class HadoopConf implements Serializable {
protected Map<String, String> extraOptions = new HashMap<>();
protected String hdfsNameKey;
protected String hdfsSitePath;
protected String remoteUser;
protected String coreSitePath;
protected String kerberosPrincipal;
protected String kerberosKeytabPath;
Expand Down
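With `remoteUser` and the kerberos fields carried on each `HadoopConf`, different plugins in one job can address HDFS as different accounts instead of sharing one process-wide identity, which is what the commit title refers to. A rough sketch of the idea (user names are made up; this is not the connector's actual code):

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class MultipleAccountsSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        URI cluster = URI.create("hdfs://cluster");

        // Each FileSystem is bound to its own user, so a source reading as "etl_reader"
        // and a sink writing as "etl_writer" no longer have to share one hadoop account.
        FileSystem asReader = FileSystem.get(cluster, conf, "etl_reader");
        FileSystem asWriter = FileSystem.get(cluster, conf, "etl_writer");

        System.out.println(asReader.getUri() + " <-> " + asWriter.getUri());
        asReader.close();
        asWriter.close();
    }
}
```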
