[Iceberg] Support specifying S3 path as warehouse dir for Hadoop catalog #24221

Draft · wants to merge 1 commit into base: master
16 changes: 16 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -265,6 +265,22 @@ Property Name Description
Otherwise, it will be ignored.
======================================================= ============================================================= ============

Configure the `Amazon S3 <https://prestodb.io/docs/current/connector/hive.html#amazon-s3-configuration>`_
properties to specify an S3 location as the warehouse directory for the Hadoop catalog. This way, both the metadata and the data
of Iceberg tables are stored in S3. An example configuration includes:

.. code-block:: none

connector.name=iceberg
iceberg.catalog.type=hadoop
iceberg.catalog.warehouse=s3://iceberg_bucket/warehouse

hive.s3.use-instance-credentials=false
hive.s3.aws-access-key=accesskey
hive.s3.aws-secret-key=secretkey
hive.s3.endpoint=http://192.168.0.103:9878
hive.s3.path-style-access=true
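
Note: ``hive.s3.endpoint`` and ``hive.s3.path-style-access=true`` are typically only needed for
S3-compatible object stores such as MinIO or Ozone; when pointing at AWS S3 itself, these two
settings can usually be omitted.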

Configuration Properties
------------------------

@@ -39,7 +39,7 @@ public class HiveS3Module
{
private static final String EMR_FS_CLASS_NAME = "com.amazon.ws.emr.hadoop.fs.EmrFileSystem";

- private final String connectorId;
+ protected final String connectorId;

public HiveS3Module(String connectorId)
{
@@ -80,7 +80,7 @@ public AWSSecurityMappingsSupplier provideAWSSecurityMappingsSupplier(AWSSecurit
return new AWSSecurityMappingsSupplier(config.getConfigFile(), config.getRefreshPeriod());
}

- private void bindSecurityMapping(Binder binder)
+ protected void bindSecurityMapping(Binder binder)
{
if (buildConfigObject(AWSSecurityMappingConfig.class).getConfigFile().isPresent() &&
buildConfigObject(AWSSecurityMappingConfig.class).getMappingType().equals(S3)) {
@@ -89,7 +89,7 @@ private void bindSecurityMapping(Binder binder)
}
}

- private static void validateEmrFsClass()
+ protected static void validateEmrFsClass()
{
// verify that the class exists
try {
@@ -89,7 +89,7 @@ public PrestoS3ConfigurationUpdater(HiveS3Config config)
@Override
public void updateConfiguration(Configuration config)
{
- // re-map filesystem schemes to match Amazon Elastic MapReduce
+ // re-map filesystem schemes to match customized presto s3 filesystem
config.set("fs.s3.impl", PrestoS3FileSystem.class.getName());
config.set("fs.s3a.impl", PrestoS3FileSystem.class.getName());
config.set("fs.s3n.impl", PrestoS3FileSystem.class.getName());
@@ -168,16 +168,15 @@ public class PrestoS3FileSystem
private static final String DIRECTORY_SUFFIX = "_$folder$";
private static final DataSize BLOCK_SIZE = new DataSize(32, MEGABYTE);
private static final DataSize MAX_SKIP_SIZE = new DataSize(1, MEGABYTE);
- private static final String PATH_SEPARATOR = "/";
+ protected static final String PATH_SEPARATOR = "/";
private static final Duration BACKOFF_MIN_SLEEP = new Duration(1, SECONDS);
private static final int HTTP_RANGE_NOT_SATISFIABLE = 416;
private static final MediaType X_DIRECTORY_MEDIA_TYPE = MediaType.create("application", "x-directory");
private static final MediaType OCTET_STREAM_MEDIA_TYPE = MediaType.create("application", "octet-stream");
private static final Set<String> GLACIER_STORAGE_CLASSES = ImmutableSet.of(Glacier.toString(), DeepArchive.toString());

- private URI uri;
+ protected URI uri;
private Path workingDirectory;
- private AmazonS3 s3;
+ protected AmazonS3 s3;
private AWSCredentialsProvider credentialsProvider;
private File stagingDirectory;
private int maxAttempts;
@@ -396,8 +395,7 @@ private static boolean isDirectory(PrestoS3ObjectMetadata metadata)
}

return mediaType.is(X_DIRECTORY_MEDIA_TYPE) ||
- (mediaType.is(OCTET_STREAM_MEDIA_TYPE)
Contributor: Why is this condition removed?

hantangwangd (Member Author) commented on Jan 10, 2025:

Object storage systems do not necessarily return the content type application/octet-stream. For example, Ozone returns binary/octet-stream; see: https://github.com/apache/ozone/blob/master/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java#L645-L647. The sample responses in the Amazon S3 API documentation also use binary/octet-stream as the content type; see the Sample Response examples for general purpose buckets in https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html.

Moreover, I believe the remaining two conditions can already confirm that this is a directory, so I removed this condition in order to support as many object storage systems as possible.

ZacBlanco (Contributor) commented on Jan 10, 2025:

I think this makes sense, but I'm not sure if there are additional implications. If our tests pass, then I am OK with it. Though I will defer to @imjalpreet for any additional thoughts, since he is the one who implemented this check previously.

imjalpreet (Member):

@hantangwangd Could we please verify whether it is possible to create an external table on an empty directory in MinIO after removing the current condition?

I added this condition because we noticed that some systems, such as MinIO, were creating empty directories with a content type of application/octet-stream instead of application/x-directory.

For your reference: #20603

hantangwangd (Member Author):

@imjalpreet Thanks for providing the relevant note and issue link. I have verified that we can create an external table on an empty directory in MinIO after removing the condition `mediaType.is(OCTET_STREAM_MEDIA_TYPE)`; the remaining condition `metadata.isKeyNeedsPathSeparator() && objectMetadata.getContentLength() == 0` is capable of handling the directory identification problem described in issue #20310.

hantangwangd (Member Author):

Meanwhile, if we do not remove the condition `mediaType.is(OCTET_STREAM_MEDIA_TYPE)`, we get the exception "External location must be a directory" when trying to create an external table on an empty directory in some other object storage systems, such as Ozone.

Contributor: Is the empty-directory table case covered by any test cases that run in CI? If not, we should add it. Aside from that, I think this PR is good to go.

hantangwangd (Member Author): OK, I'll check it out.

hantangwangd (Member Author):

@ZacBlanco @imjalpreet I have added the test method testCreateExternalTableOnEmptyS3Directory() in BaseTestHiveInsertOverwrite; it verifies the behavior of creating an external Hive table on an empty S3 directory. Please take a look when convenient, thanks.

- && metadata.isKeyNeedsPathSeparator()
+ (metadata.isKeyNeedsPathSeparator()
&& objectMetadata.getContentLength() == 0);
}
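
For clarity, here is a minimal sketch of the `isDirectory` predicate before and after this change, paraphrased from the diff above (the accessors are as shown there; this is an illustration, not a verbatim excerpt):

```java
// Before: an empty object only counted as a directory when its content
// type was exactly application/octet-stream.
boolean directoryBefore = mediaType.is(X_DIRECTORY_MEDIA_TYPE)
        || (mediaType.is(OCTET_STREAM_MEDIA_TYPE)
        && metadata.isKeyNeedsPathSeparator()
        && objectMetadata.getContentLength() == 0);

// After: any zero-length object whose key needs a trailing path separator
// is treated as a directory regardless of content type, so stores that
// report binary/octet-stream (for example, Ozone) are also recognized.
boolean directoryAfter = mediaType.is(X_DIRECTORY_MEDIA_TYPE)
        || (metadata.isKeyNeedsPathSeparator()
        && objectMetadata.getContentLength() == 0);
```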

@@ -134,6 +134,23 @@ public void testInsertOverwritePartitionedAndBucketedExternalTable()
assertOverwritePartition(externalTableName);
}

@Test
public void testCreateExternalTableOnEmptyS3Directory()
{
String emptyDir = "test-empty-dir-" + randomTableSuffix() + "/";
this.dockerizedS3DataLake.createDirectoryOnS3(emptyDir);
String testTable = getTestTableName();
String tableName = testTable.substring(testTable.lastIndexOf('.') + 1);
computeActual(getCreateTableStatement(
tableName,
"partitioned_by=ARRAY['regionkey']",
"bucketed_by = ARRAY['nationkey']",
"bucket_count = 3",
format("external_location = 's3a://%s/%s'", this.bucketName, emptyDir)));
MaterializedResult materializedRows = computeActual("select * from " + tableName);
assertEquals(materializedRows.getRowCount(), 0);
}

protected void assertOverwritePartition(String testTable)
{
computeActual(format(
@@ -41,6 +41,7 @@ public class HiveMinIODataLake
private final String bucketName;
private final MinIOContainer minIOContainer;
private final HiveHadoopContainer hiveHadoopContainer;
private AmazonS3 s3Client;

private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final AutoCloseableCloser closer = AutoCloseableCloser.create();
@@ -89,7 +90,7 @@ public void start()
try {
this.minIOContainer.start();
this.hiveHadoopContainer.start();
- AmazonS3 s3Client = AmazonS3ClientBuilder
+ this.s3Client = AmazonS3ClientBuilder
.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
"http://localhost:" + minIOContainer.getMinioApiEndpoint().getPort(),
@@ -99,6 +100,7 @@
new BasicAWSCredentials(ACCESS_KEY, SECRET_KEY)))
.build();
s3Client.createBucket(this.bucketName);
closer.register(s3Client::shutdown);
}
finally {
isStarted.set(true);
@@ -126,6 +128,11 @@ public void stop()
}
}

public void createDirectoryOnS3(String directoryPath)
{
s3Client.putObject(this.bucketName, directoryPath, "");
}
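
(S3 object stores have no true directories; this helper writes the conventional zero-byte marker object whose key ends in a trailing slash, which is exactly the shape the revised isDirectory check above recognizes.)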

public MinIOContainer getMinio()
{
return minIOContainer;
22 changes: 22 additions & 0 deletions presto-iceberg/pom.xml
@@ -598,6 +598,28 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
@@ -26,7 +26,7 @@
import com.facebook.presto.hive.authentication.HiveAuthenticationModule;
import com.facebook.presto.hive.gcs.HiveGcsModule;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
- import com.facebook.presto.hive.s3.HiveS3Module;
+ import com.facebook.presto.iceberg.s3.IcebergS3Module;
import com.facebook.presto.plugin.base.security.AllowAllAccessControl;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PageIndexerFactory;
@@ -81,7 +81,7 @@ public static Connector createConnector(
new JsonModule(),
new IcebergCommonModule(catalogName),
new IcebergCatalogModule(catalogName, metastore),
- new HiveS3Module(catalogName),
+ new IcebergS3Module(catalogName),
new HiveGcsModule(),
new HiveAuthenticationModule(),
new CachingModule(),
@@ -0,0 +1,63 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.s3;

import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.s3.HiveS3Config;
import com.facebook.presto.hive.s3.HiveS3Module;
import com.facebook.presto.hive.s3.PrestoS3FileSystem;
import com.facebook.presto.hive.s3.PrestoS3FileSystemStats;
import com.facebook.presto.hive.s3.S3ConfigurationUpdater;
import com.facebook.presto.hive.s3.S3FileSystemType;
import com.google.inject.Binder;
import com.google.inject.Scopes;

import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
import static org.weakref.jmx.ObjectNames.generatedNameOf;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class IcebergS3Module
extends HiveS3Module
{
public IcebergS3Module(String connectorId)
{
super(connectorId);
}

@Override
protected void setup(Binder binder)
{
S3FileSystemType type = buildConfigObject(HiveClientConfig.class).getS3FileSystemType();
if (type == S3FileSystemType.PRESTO) {
bindSecurityMapping(binder);

binder.bind(S3ConfigurationUpdater.class).to(PrestoIcebergS3ConfigurationUpdater.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(HiveS3Config.class);

binder.bind(PrestoS3FileSystemStats.class).toInstance(PrestoS3FileSystem.getFileSystemStats());
newExporter(binder).export(PrestoS3FileSystemStats.class).as(generatedNameOf(PrestoS3FileSystem.class, connectorId));
}
else if (type == S3FileSystemType.EMRFS) {
validateEmrFsClass();
binder.bind(S3ConfigurationUpdater.class).to(EmrFsS3ConfigurationUpdater.class).in(Scopes.SINGLETON);
}
else if (type == S3FileSystemType.HADOOP_DEFAULT) {
// configuration is done using Hadoop configuration files
binder.bind(S3ConfigurationUpdater.class).to(HadoopDefaultConfigurationUpdater.class).in(Scopes.SINGLETON);
}
else {
throw new RuntimeException("Unknown file system type: " + type);
}
}
}
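
This module mirrors HiveS3Module's setup() but binds PrestoIcebergS3ConfigurationUpdater for the PRESTO filesystem type; the private-to-protected visibility changes in HiveS3Module and PrestoS3FileSystem earlier in this diff are what make this subclassing possible.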
@@ -0,0 +1,35 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.s3;

import com.amazonaws.AmazonClientException;
import com.facebook.presto.hive.s3.PrestoS3FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class PrestoIcebergNativeS3FileSystem
extends PrestoS3FileSystem
{
@Override
public boolean mkdirs(Path f, FsPermission permission)
{
try {
s3.putObject(getBucketName(uri), keyFromPath(f) + PATH_SEPARATOR, "");
return true;
}
catch (AmazonClientException e) {
return false;
}
}
}
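
The base PrestoS3FileSystem treats mkdirs() as an effective no-op, which suits Hive-style writes but not the Iceberg Hadoop catalog, which materializes its warehouse layout through the Hadoop FileSystem API. Below is a minimal sketch of what the override enables, assuming the scheme remapping from PrestoIcebergS3ConfigurationUpdater; the bucket, path, and class name MkdirsSketch are illustrative, and real use would also need S3 credentials in the Configuration:

```java
import com.facebook.presto.iceberg.s3.PrestoIcebergNativeS3FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class MkdirsSketch
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        // Same remapping that PrestoIcebergS3ConfigurationUpdater applies for
        // the HADOOP catalog type.
        conf.set("fs.s3a.impl", PrestoIcebergNativeS3FileSystem.class.getName());

        // The Hadoop catalog issues calls like this when laying out
        // <warehouse>/<schema>/<table>. With the override, mkdirs() writes a
        // zero-byte object whose key ends in "/", so later existence checks
        // see a directory instead of nothing.
        Path tableDir = new Path("s3a://iceberg_bucket/warehouse/tpch/orders");
        try (FileSystem fs = tableDir.getFileSystem(conf)) {
            fs.mkdirs(tableDir);
        }
    }
}
```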
@@ -0,0 +1,49 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.s3;

import com.facebook.presto.hive.s3.HiveS3Config;
import com.facebook.presto.hive.s3.PrestoS3ConfigurationUpdater;
import com.facebook.presto.iceberg.IcebergConfig;
import org.apache.hadoop.conf.Configuration;

import javax.inject.Inject;

import static com.facebook.presto.iceberg.CatalogType.HADOOP;

public class PrestoIcebergS3ConfigurationUpdater
extends PrestoS3ConfigurationUpdater
{
private final IcebergConfig icebergConfig;

@Inject
public PrestoIcebergS3ConfigurationUpdater(HiveS3Config config, IcebergConfig icebergConfig)
{
super(config);
this.icebergConfig = icebergConfig;
}

@Override
public void updateConfiguration(Configuration config)
{
super.updateConfiguration(config);

if (this.icebergConfig.getCatalogType().equals(HADOOP)) {
// re-map filesystem schemes to match customized presto s3 filesystem for `HADOOP` catalog
config.set("fs.s3.impl", PrestoIcebergNativeS3FileSystem.class.getName());
config.set("fs.s3a.impl", PrestoIcebergNativeS3FileSystem.class.getName());
config.set("fs.s3n.impl", PrestoIcebergNativeS3FileSystem.class.getName());
}
}
}
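
A design note: the scheme remapping happens only when the catalog type is HADOOP, so all other Iceberg catalog types keep the stock PrestoS3FileSystem behavior from the superclass; only the Hadoop catalog, which needs real directory markers for its warehouse layout, gets the mkdirs-capable subclass.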