Skip to content

Commit

Permalink
[Improve][Connector File][HdfsFile] Improved to create configuration …
Browse files Browse the repository at this point in the history
…with text content in addition to hdfs-site.xml path. (#6)
  • Loading branch information
tedshim authored Feb 14, 2024
1 parent c6aa4a6 commit 3de3a99
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
hadoopConf.setHdfsSitePath(
pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_PATH.key()));
}
if (pluginConfig.hasPath(HdfsSourceConfig.HDFS_SITE_XML_PROPERTIES.key())) {
hadoopConf.setHdfsSiteXmlProperties(
pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_XML_PROPERTIES.key()));
}
if (pluginConfig.hasPath(HdfsSourceConfig.KERBEROS_PRINCIPAL.key())) {
hadoopConf.setKerberosPrincipal(
pluginConfig.getString(HdfsSourceConfig.KERBEROS_PRINCIPAL.key()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ public class BaseSourceConfig {
.noDefaultValue()
.withDescription("The path of hdfs-site.xml");

public static final Option<String> HDFS_SITE_XML_PROPERTIES =
Options.key("hdfs_site_xml_properties")
.stringType()
.noDefaultValue()
.withDescription("Full content of hdfs-site.xml; can be used instead of path");

public static final Option<String> KERBEROS_PRINCIPAL =
Options.key("kerberos_principal")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@
import org.apache.hadoop.fs.Path;

import lombok.Data;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -33,6 +38,7 @@ public class HadoopConf implements Serializable {
protected Map<String, String> extraOptions = new HashMap<>();
protected String hdfsNameKey;
protected String hdfsSitePath;
protected String hdfsSiteXmlProperties;
protected String kerberosPrincipal;
protected String kerberosKeytabPath;

Expand All @@ -55,5 +61,13 @@ public void setExtraOptionsForConfiguration(Configuration configuration) {
if (hdfsSitePath != null) {
configuration.addResource(new Path(hdfsSitePath));
}
if (hdfsSiteXmlProperties != null) {
try (ByteArrayInputStream inputStream =
new ByteArrayInputStream(hdfsSiteXmlProperties.getBytes(StandardCharsets.UTF_8))) {
configuration.addResource(inputStream);
} catch (IOException e) {
throw new FileConnectorException(FileConnectorErrorCode.DATA_DESERIALIZE_FAILED, e.getMessage());
}
}
}
}

0 comments on commit 3de3a99

Please sign in to comment.