diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java index 5870394a0e1..4cc2e7a54bc 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; +import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig; import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -42,5 +43,8 @@ public void prepare(Config pluginConfig) throws PrepareFailException { } super.prepare(pluginConfig); hadoopConf = new HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY)); + if (pluginConfig.hasPath(HdfsSourceConfig.HDFS_SITE_PATH.key())) { + hadoopConf.setHdfsSitePath(pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_PATH.key())); + } } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java index 75e9cea1d9e..4e67cf9bbb1 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java @@ -52,6 +52,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException { readStrategy.setPluginConfig(pluginConfig); String path = pluginConfig.getString(HdfsSourceConfig.FILE_PATH.key()); hadoopConf = new HadoopConf(pluginConfig.getString(HdfsSourceConfig.DEFAULT_FS.key())); + if (pluginConfig.hasPath(HdfsSourceConfig.HDFS_SITE_PATH.key())) { + hadoopConf.setHdfsSitePath(pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_PATH.key())); + } try { filePaths = readStrategy.getFileNamesByPath(hadoopConf, path); } catch (IOException e) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java index 2caed7ffa6a..658b0a22b2a 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java @@ -112,4 +112,8 @@ public class BaseSinkConfig { .intType() .defaultValue(DEFAULT_BATCH_SIZE) .withDescription("The batch size of each split file"); + public static final Option HDFS_SITE_PATH = Options.key("hdfs_site_path") + .stringType() + .noDefaultValue() + .withDescription("The path of hdfs-site.xml"); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java index 5ac2cf2828e..7733e604929 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java @@ -52,4 +52,8 @@ public class BaseSourceConfig { .booleanType() .defaultValue(true) .withDescription("Whether parse partition fields from file path"); + public static final Option HDFS_SITE_PATH = Options.key("hdfs_site_path") + .stringType() + .noDefaultValue() + .withDescription("The path of hdfs-site.xml"); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java index 2c59d46ee78..baeb7a3add8 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java @@ -19,6 +19,7 @@ import lombok.Data; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import java.io.Serializable; import java.util.HashMap; @@ -30,6 +31,7 @@ public class HadoopConf implements Serializable { private static final String SCHEMA = "hdfs"; protected Map extraOptions = new HashMap<>(); protected String hdfsNameKey; + protected String hdfsSitePath; public HadoopConf(String hdfsNameKey) { this.hdfsNameKey = hdfsNameKey; @@ -47,5 +49,8 @@ public void setExtraOptionsForConfiguration(Configuration configuration) { if (!extraOptions.isEmpty()) { extraOptions.forEach(configuration::set); } + if (hdfsSitePath != null) { + configuration.addResource(new Path(hdfsSitePath)); + } } }