[Improve][Connector-V2][Iceberg] Support for S3 in hadoop catalog
4chicat committed Oct 27, 2023
1 parent 3f429e1 commit 7ceedd0
Showing 7 changed files with 278 additions and 1 deletion.
IcebergCatalogFactory.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.iceberg;

import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType;
import org.apache.seatunnel.connectors.seatunnel.iceberg.exception.IcebergConnectorException;

@@ -44,6 +45,8 @@ public class IcebergCatalogFactory implements Serializable {
private final String warehouse;
private final String uri;

private HadoopConf hadoopConf;

public IcebergCatalogFactory(
@NonNull String catalogName,
@NonNull IcebergCatalogType catalogType,
@@ -55,8 +58,27 @@ public IcebergCatalogFactory(
this.uri = uri;
}

public IcebergCatalogFactory(
@NonNull String catalogName,
@NonNull IcebergCatalogType catalogType,
@NonNull String warehouse,
String uri,
HadoopConf hadoopConf) {
this.catalogName = catalogName;
this.catalogType = catalogType;
this.warehouse = warehouse;
this.uri = uri;
this.hadoopConf = hadoopConf;
}

public Catalog create() {
Configuration conf = new Configuration();

if (hadoopConf != null) {
Map<String, String> extraOptions = hadoopConf.getExtraOptions();
extraOptions.forEach((key, value) -> conf.set(key, value));
}

SerializableConfiguration serializableConf = new SerializableConfiguration(conf);
Map<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
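A minimal sketch (not part of the commit) of how the new five-argument constructor threads S3 options into the catalog. The bucket, endpoint, and credential values are illustrative placeholders, and the HADOOP constant on IcebergCatalogType is assumed from the hadoop catalog type this change targets; in the real flow, S3Conf.buildWithConfig(...) below produces the HadoopConf:

    HadoopConf hadoopConf = new HadoopConf("s3a://my-bucket");
    Map<String, String> extraOptions = new HashMap<>();
    extraOptions.put("fs.s3a.endpoint", "s3.example-endpoint.com");
    extraOptions.put(
            "fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
    hadoopConf.setExtraOptions(extraOptions);

    // The new constructor leaves uri without @NonNull, so null is fine
    // for the hadoop catalog type, which needs no metastore uri.
    IcebergCatalogFactory catalogFactory =
            new IcebergCatalogFactory(
                    "seatunnel",
                    IcebergCatalogType.HADOOP,
                    "s3a://my-bucket/iceberg/warehouse",
                    null,
                    hadoopConf);
    // create() copies every entry of extraOptions into the Hadoop Configuration.
    Catalog catalog = catalogFactory.create();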
IcebergTableLoader.java
@@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.iceberg;

import org.apache.seatunnel.connectors.seatunnel.iceberg.config.S3Conf;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig;

import org.apache.iceberg.CachingCatalog;
@@ -73,7 +74,8 @@ public static IcebergTableLoader create(SourceConfig sourceConfig) {
sourceConfig.getCatalogName(),
sourceConfig.getCatalogType(),
sourceConfig.getWarehouse(),
sourceConfig.getUri());
sourceConfig.getUri(),
S3Conf.buildWithConfig(sourceConfig.getPluginConfig()));
return new IcebergTableLoader(
catalogFactory,
TableIdentifier.of(
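With the extra argument above, a source-side table load now picks up the S3 settings from the plugin config automatically. A hedged sketch, assuming SourceConfig exposes the same Config-based constructor that CommonConfig shows below:

    SourceConfig sourceConfig = new SourceConfig(pluginConfig);
    IcebergTableLoader tableLoader = IcebergTableLoader.create(sourceConfig);
    // create(...) now passes S3Conf.buildWithConfig(pluginConfig) into the
    // IcebergCatalogFactory, so the returned loader reads from an S3-backed catalog.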
CommonConfig.java
@@ -87,6 +87,8 @@ public class CommonConfig implements Serializable {
private String table;
private boolean caseSensitive;

private Config pluginConfig;

public CommonConfig(Config pluginConfig) {
String catalogType = checkArgumentNotNull(pluginConfig.getString(KEY_CATALOG_TYPE.key()));
checkArgument(
@@ -105,6 +107,8 @@ public CommonConfig(Config pluginConfig) {
if (pluginConfig.hasPath(KEY_CASE_SENSITIVE.key())) {
this.caseSensitive = pluginConfig.getBoolean(KEY_CASE_SENSITIVE.key());
}

this.pluginConfig = pluginConfig;
}

protected <T> T checkArgumentNotNull(T argument) {
HadoopConf.java
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.iceberg.config;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import lombok.Data;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@Data
public class HadoopConf implements Serializable {
private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
private static final String SCHEMA = "hdfs";
protected Map<String, String> extraOptions = new HashMap<>();
protected String hdfsNameKey;
protected String hdfsSitePath;
protected String kerberosPrincipal;
protected String kerberosKeytabPath;

public HadoopConf(String hdfsNameKey) {
this.hdfsNameKey = hdfsNameKey;
}

public String getFsHdfsImpl() {
return HDFS_IMPL;
}

public String getSchema() {
return SCHEMA;
}

public void setExtraOptionsForConfiguration(Configuration configuration) {
if (!extraOptions.isEmpty()) {
extraOptions.forEach(configuration::set);
}
if (hdfsSitePath != null) {
configuration.addResource(new Path(hdfsSitePath));
}
}
}
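HadoopConf is the generic carrier: create() above and setExtraOptionsForConfiguration(...) here both copy extraOptions into a Hadoop Configuration. A small usage sketch, with placeholder paths and keys:

    Configuration configuration = new Configuration();
    HadoopConf hadoopConf = new HadoopConf("hdfs://namenode:8020");
    hadoopConf.getExtraOptions().put("dfs.replication", "1");
    hadoopConf.setHdfsSitePath("/etc/hadoop/conf/hdfs-site.xml");
    hadoopConf.setExtraOptionsForConfiguration(configuration);
    // configuration now holds dfs.replication plus everything loaded
    // from hdfs-site.xml via addResource(...).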
S3Conf.java
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.iceberg.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.common.config.CheckConfigUtil;

import java.util.HashMap;
import java.util.Map;

public class S3Conf extends HadoopConf {
private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";
private static final String S3A_SCHEMA = "s3a";
private static final String DEFAULT_SCHEMA = "s3n";
private static String SCHEMA = DEFAULT_SCHEMA;

@Override
public String getFsHdfsImpl() {
return switchHdfsImpl();
}

@Override
public String getSchema() {
return SCHEMA;
}

private S3Conf(String hdfsNameKey) {
super(hdfsNameKey);
}

public static HadoopConf buildWithConfig(Config config) {

HadoopConf hadoopConf = new S3Conf(config.getString(S3Config.S3_BUCKET.key()));
String bucketName = config.getString(S3Config.S3_BUCKET.key());
if (bucketName.startsWith(S3A_SCHEMA)) {
SCHEMA = S3A_SCHEMA;
}
HashMap<String, String> s3Options = new HashMap<>();
putS3SK(s3Options, config);
if (CheckConfigUtil.isValidParam(config, S3Config.S3_PROPERTIES.key())) {
config.getObject(S3Config.S3_PROPERTIES.key())
.forEach((key, value) -> s3Options.put(key, String.valueOf(value.unwrapped())));
}

s3Options.put(
S3Config.S3A_AWS_CREDENTIALS_PROVIDER.key(),
config.getString(S3Config.S3A_AWS_CREDENTIALS_PROVIDER.key()));
s3Options.put(
S3Config.FS_S3A_ENDPOINT.key(), config.getString(S3Config.FS_S3A_ENDPOINT.key()));
hadoopConf.setExtraOptions(s3Options);
return hadoopConf;
}

private String switchHdfsImpl() {
switch (SCHEMA) {
case S3A_SCHEMA:
return HDFS_S3A_IMPL;
default:
return HDFS_S3N_IMPL;
}
}

private static void putS3SK(Map<String, String> s3Options, Config config) {
if (!CheckConfigUtil.isValidParam(config, S3Config.S3_ACCESS_KEY.key())
&& !CheckConfigUtil.isValidParam(config, S3Config.S3_SECRET_KEY.key())) {
return;
}
String accessKey = config.getString(S3Config.S3_ACCESS_KEY.key());
String secretKey = config.getString(S3Config.S3_SECRET_KEY.key());
if (S3A_SCHEMA.equals(SCHEMA)) {
s3Options.put("fs.s3a.access.key", accessKey);
s3Options.put("fs.s3a.secret.key", secretKey);
return;
}
// default s3n
s3Options.put("fs.s3n.awsAccessKeyId", accessKey);
s3Options.put("fs.s3n.awsSecretAccessKey", secretKey);
}
}
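A sketch of what buildWithConfig(...) produces, using the shaded typesafe ConfigFactory to stand in for a parsed plugin config (bucket, keys, and endpoint are placeholders). Note that bucket, fs.s3a.endpoint, and fs.s3a.aws.credentials.provider must all be present, since they are read unconditionally:

    Config pluginConfig =
            ConfigFactory.parseString(
                    "bucket = \"s3a://my-bucket\"\n"
                            + "access_key = \"my-access-key\"\n"
                            + "secret_key = \"my-secret-key\"\n"
                            + "fs.s3a.endpoint = \"s3.example-endpoint.com\"\n"
                            + "fs.s3a.aws.credentials.provider = "
                            + "\"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider\"");
    HadoopConf hadoopConf = S3Conf.buildWithConfig(pluginConfig);
    // Because the bucket starts with "s3a": getSchema() returns "s3a",
    // getFsHdfsImpl() returns the S3AFileSystem class name, and putS3SK(...)
    // stores the credentials under fs.s3a.access.key / fs.s3a.secret.key.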
S3Config.java
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.iceberg.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import java.util.Map;

public class S3Config {
public static final Option<String> S3_ACCESS_KEY =
Options.key("access_key")
.stringType()
.noDefaultValue()
.withDescription("S3 access key");
public static final Option<String> S3_SECRET_KEY =
Options.key("secret_key")
.stringType()
.noDefaultValue()
.withDescription("S3 secret key");
public static final Option<String> S3_BUCKET =
Options.key("bucket").stringType().noDefaultValue().withDescription("S3 bucket");
public static final Option<String> FS_S3A_ENDPOINT =
Options.key("fs.s3a.endpoint")
.stringType()
.noDefaultValue()
.withDescription("fs s3a endpoint");

public static final Option<S3aAwsCredentialsProvider> S3A_AWS_CREDENTIALS_PROVIDER =
Options.key("fs.s3a.aws.credentials.provider")
.enumType(S3aAwsCredentialsProvider.class)
.defaultValue(S3aAwsCredentialsProvider.InstanceProfileCredentialsProvider)
.withDescription("s3a aws credentials provider");

/**
* The current key for that config option. if you need to add a new option, you can add it here
* and refer to this:
*
* <p>https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html
*
* <p>such as: key = "fs.s3a.session.token" value = "SECRET-SESSION-TOKEN"
*/
public static final Option<Map<String, String>> S3_PROPERTIES =
Options.key("hadoop_s3_properties")
.mapType()
.noDefaultValue()
.withDescription("S3 properties");

public enum S3aAwsCredentialsProvider {
SimpleAWSCredentialsProvider("org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"),

InstanceProfileCredentialsProvider("com.amazonaws.auth.InstanceProfileCredentialsProvider");

private String provider;

S3aAwsCredentialsProvider(String provider) {
this.provider = provider;
}

public String getProvider() {
return provider;
}

@Override
public String toString() {
return provider;
}
}
}
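One subtlety worth noting: the enum constants are the names users put in the config, while Hadoop wants the fully qualified provider class, which is why toString() is overridden to return the class name. A quick illustration:

    S3Config.S3aAwsCredentialsProvider provider =
            S3Config.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider;
    // Enum name as written in the config file:
    System.out.println(provider.name());        // SimpleAWSCredentialsProvider
    // Value Hadoop actually needs:
    System.out.println(provider.getProvider()); // org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider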
IcebergSourceFactory.java
@@ -22,6 +22,7 @@
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.S3Config;

import com.google.auto.service.AutoService;

@@ -63,6 +64,15 @@ public OptionRule optionRule() {
KEY_USE_SNAPSHOT_ID,
KEY_USE_SNAPSHOT_TIMESTAMP,
KEY_STREAM_SCAN_STRATEGY)
.optional(S3Config.S3_BUCKET)
.optional(S3Config.FS_S3A_ENDPOINT)
.optional(S3Config.S3A_AWS_CREDENTIALS_PROVIDER)
.conditional(
S3Config.S3A_AWS_CREDENTIALS_PROVIDER,
S3Config.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider,
S3Config.S3_ACCESS_KEY,
S3Config.S3_SECRET_KEY)
.optional(S3Config.S3_PROPERTIES)
.build();
}
