diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java index 862734894098..ead2d40dbd49 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergCatalogFactory.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg; import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.iceberg.config.HadoopConf; import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType; import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException; @@ -44,6 +45,8 @@ public class IcebergCatalogFactory implements Serializable { private final String warehouse; private final String uri; + private HadoopConf hadoopConf; + public IcebergCatalogFactory( @NonNull String catalogName, @NonNull IcebergCatalogType catalogType, @@ -55,8 +58,27 @@ public IcebergCatalogFactory( this.uri = uri; } + public IcebergCatalogFactory( + @NonNull String catalogName, + @NonNull IcebergCatalogType catalogType, + @NonNull String warehouse, + String uri, + HadoopConf hadoopConf) { + this.catalogName = catalogName; + this.catalogType = catalogType; + this.warehouse = warehouse; + this.uri = uri; + this.hadoopConf = hadoopConf; + } + public Catalog create() { Configuration conf = new Configuration(); + + if (hadoopConf != null) { + Map extraOptions = hadoopConf.getExtraOptions(); + extraOptions.forEach((key, value) -> conf.set(key, value)); + } + SerializableConfiguration serializableConf = new SerializableConfiguration(conf); Map properties = new HashMap<>(); properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse); diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergTableLoader.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergTableLoader.java index 554dd0bd7599..7162c8f099ee 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergTableLoader.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/IcebergTableLoader.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.iceberg; +import org.apache.seatunnel.connectors.seatunnel.iceberg.config.S3Conf; import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig; import org.apache.iceberg.CachingCatalog; @@ -73,7 +74,8 @@ public static IcebergTableLoader create(SourceConfig sourceConfig) { sourceConfig.getCatalogName(), sourceConfig.getCatalogType(), sourceConfig.getWarehouse(), - sourceConfig.getUri()); + sourceConfig.getUri(), + S3Conf.buildWithConfig(sourceConfig.getPluginConfig())); return new IcebergTableLoader( catalogFactory, TableIdentifier.of( diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java index 2f893da092bf..b8407dbe0195 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/CommonConfig.java @@ -87,6 +87,8 @@ public class CommonConfig implements Serializable { private String table; private boolean caseSensitive; + private Config pluginConfig; + public CommonConfig(Config pluginConfig) { String catalogType = checkArgumentNotNull(pluginConfig.getString(KEY_CATALOG_TYPE.key())); checkArgument( @@ -105,6 +107,8 @@ public CommonConfig(Config pluginConfig) { if (pluginConfig.hasPath(KEY_CASE_SENSITIVE.key())) { this.caseSensitive = pluginConfig.getBoolean(KEY_CASE_SENSITIVE.key()); } + + this.pluginConfig = pluginConfig; } protected T checkArgumentNotNull(T argument) { diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/HadoopConf.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/HadoopConf.java new file mode 100644 index 000000000000..2f4b8efd982e --- /dev/null +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/HadoopConf.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.iceberg.config; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import lombok.Data; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +@Data +public class HadoopConf implements Serializable { + private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem"; + private static final String SCHEMA = "hdfs"; + protected Map extraOptions = new HashMap<>(); + protected String hdfsNameKey; + protected String hdfsSitePath; + protected String kerberosPrincipal; + protected String kerberosKeytabPath; + + public HadoopConf(String hdfsNameKey) { + this.hdfsNameKey = hdfsNameKey; + } + + public String getFsHdfsImpl() { + return HDFS_IMPL; + } + + public String getSchema() { + return SCHEMA; + } + + public void setExtraOptionsForConfiguration(Configuration configuration) { + if (!extraOptions.isEmpty()) { + extraOptions.forEach(configuration::set); + } + if (hdfsSitePath != null) { + configuration.addResource(new Path(hdfsSitePath)); + } + } +} diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/S3Conf.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/S3Conf.java new file mode 100644 index 000000000000..d76104adf111 --- /dev/null +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/S3Conf.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.iceberg.config; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.common.config.CheckConfigUtil; + +import java.util.HashMap; +import java.util.Map; + +public class S3Conf extends HadoopConf { + private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem"; + private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem"; + private static final String S3A_SCHEMA = "s3a"; + private static final String DEFAULT_SCHEMA = "s3n"; + private static String SCHEMA = DEFAULT_SCHEMA; + + @Override + public String getFsHdfsImpl() { + return switchHdfsImpl(); + } + + @Override + public String getSchema() { + return SCHEMA; + } + + private S3Conf(String hdfsNameKey) { + super(hdfsNameKey); + } + + public static HadoopConf buildWithConfig(Config config) { + + HadoopConf hadoopConf = new S3Conf(config.getString(S3Config.S3_BUCKET.key())); + String bucketName = config.getString(S3Config.S3_BUCKET.key()); + if (bucketName.startsWith(S3A_SCHEMA)) { + SCHEMA = S3A_SCHEMA; + } + HashMap s3Options = new HashMap<>(); + putS3SK(s3Options, config); + if (CheckConfigUtil.isValidParam(config, S3Config.S3_PROPERTIES.key())) { + config.getObject(S3Config.S3_PROPERTIES.key()) + .forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped()))); + } + + s3Options.put( + S3Config.S3A_AWS_CREDENTIALS_PROVIDER.key(), + config.getString(S3Config.S3A_AWS_CREDENTIALS_PROVIDER.key())); + s3Options.put( + S3Config.FS_S3A_ENDPOINT.key(), config.getString(S3Config.FS_S3A_ENDPOINT.key())); + hadoopConf.setExtraOptions(s3Options); + return hadoopConf; + } + + private String switchHdfsImpl() { + switch (SCHEMA) { + case S3A_SCHEMA: + return HDFS_S3A_IMPL; + default: + return HDFS_S3N_IMPL; + } + } + + private static void putS3SK(Map s3Options, Config config) { + if (!CheckConfigUtil.isValidParam(config, S3Config.S3_ACCESS_KEY.key()) + && !CheckConfigUtil.isValidParam(config, S3Config.S3_SECRET_KEY.key())) { + return; + } + String accessKey = config.getString(S3Config.S3_ACCESS_KEY.key()); + String secretKey = config.getString(S3Config.S3_SECRET_KEY.key()); + if (S3A_SCHEMA.equals(SCHEMA)) { + s3Options.put("fs.s3a.access.key", accessKey); + s3Options.put("fs.s3a.secret.key", secretKey); + return; + } + // default s3n + s3Options.put("fs.s3n.awsAccessKeyId", accessKey); + s3Options.put("fs.s3n.awsSecretAccessKey", secretKey); + } +} diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/S3Config.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/S3Config.java new file mode 100644 index 000000000000..cd53a7fae4d9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/S3Config.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.iceberg.config; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +import java.util.Map; + +public class S3Config { + public static final Option S3_ACCESS_KEY = + Options.key("access_key") + .stringType() + .noDefaultValue() + .withDescription("S3 access key"); + public static final Option S3_SECRET_KEY = + Options.key("secret_key") + .stringType() + .noDefaultValue() + .withDescription("S3 secret key"); + public static final Option S3_BUCKET = + Options.key("bucket").stringType().noDefaultValue().withDescription("S3 bucket"); + public static final Option FS_S3A_ENDPOINT = + Options.key("fs.s3a.endpoint") + .stringType() + .noDefaultValue() + .withDescription("fs s3a endpoint"); + + public static final Option S3A_AWS_CREDENTIALS_PROVIDER = + Options.key("fs.s3a.aws.credentials.provider") + .enumType(S3aAwsCredentialsProvider.class) + .defaultValue(S3aAwsCredentialsProvider.InstanceProfileCredentialsProvider) + .withDescription("s3a aws credentials provider"); + + /** + * The current key for that config option. if you need to add a new option, you can add it here + * and refer to this: + * + *

https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html + * + *

such as: key = "fs.s3a.session.token" value = "SECRET-SESSION-TOKEN" + */ + public static final Option> S3_PROPERTIES = + Options.key("hadoop_s3_properties") + .mapType() + .noDefaultValue() + .withDescription("S3 properties"); + + public enum S3aAwsCredentialsProvider { + SimpleAWSCredentialsProvider("org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"), + + InstanceProfileCredentialsProvider("com.amazonaws.auth.InstanceProfileCredentialsProvider"); + + private String provider; + + S3aAwsCredentialsProvider(String provider) { + this.provider = provider; + } + + public String getProvider() { + return provider; + } + + @Override + public String toString() { + return provider; + } + } +} diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSourceFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSourceFactory.java index 1318ab8a9d72..20481d5fb222 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSourceFactory.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/IcebergSourceFactory.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; +import org.apache.seatunnel.connectors.seatunnel.iceberg.config.S3Config; import com.google.auto.service.AutoService; @@ -63,6 +64,15 @@ public OptionRule optionRule() { KEY_USE_SNAPSHOT_ID, KEY_USE_SNAPSHOT_TIMESTAMP, KEY_STREAM_SCAN_STRATEGY) + .optional(S3Config.S3_BUCKET) + .optional(S3Config.FS_S3A_ENDPOINT) + .optional(S3Config.S3A_AWS_CREDENTIALS_PROVIDER) + .conditional( + S3Config.S3A_AWS_CREDENTIALS_PROVIDER, + S3Config.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider, + S3Config.S3_ACCESS_KEY, + S3Config.S3_SECRET_KEY) + .optional(S3Config.S3_PROPERTIES) .build(); }