[Iceberg] Support S3 path as warehouse dir for Hive and Hadoop catalogs
hantangwangd committed Dec 13, 2024
1 parent 004ee32 commit 0ed5c96
Showing 11 changed files with 295 additions and 26 deletions.
@@ -68,7 +68,6 @@
 import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;

-import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayDeque;
@@ -286,9 +285,19 @@ else if (table.getTableType().equals(EXTERNAL_TABLE)) {
         }

         if (!table.getTableType().equals(VIRTUAL_VIEW)) {
-            File location = new File(new Path(table.getStorage().getLocation()).toUri());
-            checkArgument(location.isDirectory(), "Table location is not a directory: %s", location);
-            checkArgument(location.exists(), "Table directory does not exist: %s", location);
+            try {
+                Path tableLocation = new Path(table.getStorage().getLocation());
+                FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, tableLocation);
+                if (!fileSystem.isDirectory(tableLocation)) {
+                    throw new PrestoException(HIVE_METASTORE_ERROR, "Table location is not a directory: " + tableLocation);
+                }
+                if (!fileSystem.exists(tableLocation)) {
+                    throw new PrestoException(HIVE_METASTORE_ERROR, "Table directory does not exist: " + tableLocation);
+                }
+            }
+            catch (IOException e) {
+                throw new PrestoException(HIVE_METASTORE_ERROR, "Could not validate table location", e);
+            }
         }

         writeSchemaFile("table", tableMetadataDirectory, tableCodec, new TableMetadata(table), false);
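
Note: the old check went through java.io.File, which can only represent local
filesystem paths, so an s3:// warehouse location could never pass validation.
Resolving a Hadoop FileSystem through HdfsEnvironment makes the check work for
any supported scheme. A minimal standalone sketch of the same pattern (the
path, class name, and configuration below are illustrative, not taken from
this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class LocationCheckSketch
    {
        public static void main(String[] args) throws Exception
        {
            // Hypothetical warehouse location; could equally be file:// or hdfs://.
            Path location = new Path("s3://my-bucket/warehouse");
            // Path.getFileSystem picks the FileSystem implementation for the scheme.
            FileSystem fileSystem = location.getFileSystem(new Configuration());
            if (!fileSystem.isDirectory(location)) {
                throw new IllegalArgumentException("Table location is not a directory: " + location);
            }
        }
    }
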
@@ -546,8 +546,13 @@ private boolean deleteObject(String key)
     @Override
     public boolean mkdirs(Path f, FsPermission permission)
     {
-        // no need to do anything for S3
-        return true;
+        try {
+            s3.putObject(getBucketName(uri), keyFromPath(f) + PATH_SEPARATOR, "");
+            return true;
+        }
+        catch (AmazonClientException e) {
+            return false;
+        }
     }

     private enum ListingMode {
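
Note: S3 has no physical directories, so mkdirs used to be a no-op. That
breaks catalogs (for example Iceberg's HadoopCatalog) that create the
warehouse directory and then expect it to exist. The change writes a
zero-length object whose key ends with the path separator, the usual way to
simulate an empty directory on S3. A self-contained sketch of that convention
using the AWS SDK v1 (class, bucket, and prefix names are made up):

    import com.amazonaws.AmazonClientException;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;

    public class S3MkdirSketch
    {
        // Writes an empty object whose key ends with '/'; S3 listings then
        // report the prefix as a "directory" even before any file lands in it.
        static boolean mkdir(AmazonS3 s3, String bucket, String prefix)
        {
            try {
                s3.putObject(bucket, prefix.endsWith("/") ? prefix : prefix + "/", "");
                return true;
            }
            catch (AmazonClientException e) {
                return false;
            }
        }

        public static void main(String[] args)
        {
            mkdir(AmazonS3ClientBuilder.defaultClient(), "my-bucket", "warehouse/tpch");
        }
    }
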
presto-iceberg/pom.xml (22 additions, 0 deletions)
@@ -601,6 +601,28 @@
             <scope>test</scope>
         </dependency>

+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-s3</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-core</artifactId>
@@ -29,6 +29,9 @@
 import com.facebook.presto.hive.HiveHdfsConfiguration;
 import com.facebook.presto.hive.MetastoreClientConfig;
 import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
+import com.facebook.presto.hive.s3.HiveS3Config;
+import com.facebook.presto.hive.s3.PrestoS3ConfigurationUpdater;
+import com.facebook.presto.hive.s3.S3ConfigurationUpdater;
 import com.facebook.presto.iceberg.delete.DeleteFile;
 import com.facebook.presto.metadata.CatalogMetadata;
 import com.facebook.presto.metadata.Metadata;
@@ -2057,13 +2060,19 @@ private void writeEqualityDeleteToNationTable(Table icebergTable, Map<String, Ob
         icebergTable.newRowDelta().addDeletes(writer.toDeleteFile()).commit();
     }

-    public static HdfsEnvironment getHdfsEnvironment()
+    protected HdfsEnvironment getHdfsEnvironment()
     {
         HiveClientConfig hiveClientConfig = new HiveClientConfig();
         MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
-        HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig),
-                ImmutableSet.of(),
-                hiveClientConfig);
+        HiveS3Config hiveS3Config = new HiveS3Config();
+        return getHdfsEnvironment(hiveClientConfig, metastoreClientConfig, hiveS3Config);
+    }
+
+    public static HdfsEnvironment getHdfsEnvironment(HiveClientConfig hiveClientConfig, MetastoreClientConfig metastoreClientConfig, HiveS3Config hiveS3Config)
+    {
+        S3ConfigurationUpdater s3ConfigurationUpdater = new PrestoS3ConfigurationUpdater(hiveS3Config);
+        HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig, s3ConfigurationUpdater, ignored -> {}),
+                ImmutableSet.of(), hiveClientConfig);
         return new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication());
     }

@@ -2079,24 +2088,25 @@ private Table updateTable(String tableName)

     protected Table loadTable(String tableName)
     {
-        Catalog catalog = CatalogUtil.loadCatalog(catalogType.getCatalogImpl(), "test-hive", getProperties(), new Configuration());
+        Configuration configuration = getHdfsEnvironment().getConfiguration(new HdfsContext(SESSION), getCatalogDirectory());
+        Catalog catalog = CatalogUtil.loadCatalog(catalogType.getCatalogImpl(), "test-hive", getProperties(), configuration);
         return catalog.loadTable(TableIdentifier.of("tpch", tableName));
     }

     protected Map<String, String> getProperties()
     {
-        File metastoreDir = getCatalogDirectory();
+        org.apache.hadoop.fs.Path metastoreDir = getCatalogDirectory();
         return ImmutableMap.of("warehouse", metastoreDir.toString());
     }

-    protected File getCatalogDirectory()
+    protected org.apache.hadoop.fs.Path getCatalogDirectory()
     {
         Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
         switch (catalogType) {
             case HIVE:
             case HADOOP:
             case NESSIE:
-                return getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile();
+                return new org.apache.hadoop.fs.Path(getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile().toURI());
         }

         throw new PrestoException(NOT_SUPPORTED, "Unsupported Presto Iceberg catalog type " + catalogType);
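
Note: returning org.apache.hadoop.fs.Path instead of java.io.File matters
because File mangles a URI that carries a scheme, while Path preserves it;
likewise, loadTable now hands CatalogUtil.loadCatalog the S3-aware
Configuration from HdfsEnvironment rather than a bare new Configuration().
The difference in a tiny sketch:

    // A java.io.File treats "s3://bucket/warehouse" as a local path and
    // collapses the double slash; a Hadoop Path keeps scheme and authority.
    java.io.File asFile = new java.io.File("s3://bucket/warehouse");
    System.out.println(asFile.getPath());           // s3:/bucket/warehouse
    org.apache.hadoop.fs.Path asPath = new org.apache.hadoop.fs.Path("s3://bucket/warehouse");
    System.out.println(asPath.toUri().getScheme()); // s3
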
@@ -43,6 +43,7 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
@@ -202,13 +203,11 @@ public static DistributedQueryRunner createIcebergQueryRunner(
         String catalogType = extraConnectorProperties.getOrDefault("iceberg.catalog.type", HIVE.name());
         Path icebergDataDirectory = getIcebergDataDirectoryPath(queryRunner.getCoordinator().getDataDirectory(), catalogType, format, addStorageFormatToPath);

-        Map<String, String> icebergProperties = ImmutableMap.<String, String>builder()
-                .put("iceberg.file-format", format.name())
-                .putAll(getConnectorProperties(CatalogType.valueOf(catalogType), icebergDataDirectory))
-                .putAll(extraConnectorProperties)
-                .build();
-
-        queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties);
+        Map<String, String> icebergProperties = new HashMap<>();
+        icebergProperties.put("iceberg.file-format", format.name());
+        icebergProperties.putAll(getConnectorProperties(CatalogType.valueOf(catalogType), icebergDataDirectory));
+        icebergProperties.putAll(extraConnectorProperties);
+        queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", ImmutableMap.copyOf(icebergProperties));

         if (addJmxPlugin) {
             queryRunner.installPlugin(new JmxPlugin());
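
Note: the switch from ImmutableMap.builder() to a HashMap is what allows
extraConnectorProperties to override a default such as the warehouse path:
Guava's builder rejects duplicate keys when build() is called, whereas later
puts into a HashMap simply win. A small illustration (assumes Guava on the
classpath):

    import com.google.common.collect.ImmutableMap;

    import java.util.HashMap;
    import java.util.Map;

    public class OverrideSketch
    {
        public static void main(String[] args)
        {
            // ImmutableMap.<String, String>builder().put("k", "a").put("k", "b").build()
            // throws IllegalArgumentException: multiple entries with same key.
            Map<String, String> map = new HashMap<>();
            map.put("k", "a");
            map.put("k", "b"); // last write wins
            System.out.println(ImmutableMap.copyOf(map)); // {k=b}
        }
    }
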
@@ -25,10 +25,13 @@
 import com.facebook.presto.hive.FileFormatDataSourceStats;
 import com.facebook.presto.hive.HdfsContext;
 import com.facebook.presto.hive.HdfsEnvironment;
+import com.facebook.presto.hive.HiveClientConfig;
 import com.facebook.presto.hive.HiveCompressionCodec;
 import com.facebook.presto.hive.HiveDwrfEncryptionProvider;
+import com.facebook.presto.hive.MetastoreClientConfig;
 import com.facebook.presto.hive.NodeVersion;
 import com.facebook.presto.hive.OrcFileWriterConfig;
+import com.facebook.presto.hive.s3.HiveS3Config;
 import com.facebook.presto.metadata.SessionPropertyManager;
 import com.facebook.presto.parquet.FileParquetDataSource;
 import com.facebook.presto.parquet.cache.MetadataReader;
@@ -61,6 +64,7 @@
 import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
 import static com.facebook.presto.common.type.VarcharType.VARCHAR;
 import static com.facebook.presto.iceberg.IcebergAbstractMetadata.toIcebergSchema;
+import static com.facebook.presto.iceberg.IcebergDistributedTestBase.getHdfsEnvironment;
 import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
 import static com.facebook.presto.iceberg.IcebergSessionProperties.dataSizeSessionProperty;
 import static com.facebook.presto.metadata.SessionPropertyManager.createTestingSessionPropertyManager;
@@ -113,7 +117,7 @@ public void setup() throws Exception
         this.connectorSession = session.toConnectorSession(connectorId);
         TypeManager typeManager = new TestingTypeManager();
         this.hdfsContext = new HdfsContext(connectorSession);
-        HdfsEnvironment hdfsEnvironment = IcebergDistributedTestBase.getHdfsEnvironment();
+        HdfsEnvironment hdfsEnvironment = getHdfsEnvironment(new HiveClientConfig(), new MetastoreClientConfig(), new HiveS3Config());
         this.icebergFileWriterFactory = new IcebergFileWriterFactory(hdfsEnvironment, typeManager,
                 new FileFormatDataSourceStats(), new NodeVersion("test"), new OrcFileWriterConfig(), HiveDwrfEncryptionProvider.NO_ENCRYPTION);
     }
@@ -0,0 +1,117 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.iceberg.container;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.facebook.presto.testing.containers.MinIOContainer;
+import com.facebook.presto.util.AutoCloseableCloser;
+import com.google.common.collect.ImmutableMap;
+import org.testcontainers.containers.Network;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.util.Objects.requireNonNull;
+import static org.testcontainers.containers.Network.newNetwork;
+
+public class IcebergMinIODataLake
+        implements Closeable
+{
+    public static final String ACCESS_KEY = "accesskey";
+    public static final String SECRET_KEY = "secretkey";
+
+    private final String bucketName;
+    private final String warehouseDir;
+    private final MinIOContainer minIOContainer;
+
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+    private final AutoCloseableCloser closer = AutoCloseableCloser.create();
+
+    public IcebergMinIODataLake(String bucketName, String warehouseDir)
+    {
+        this.bucketName = requireNonNull(bucketName, "bucketName is null");
+        this.warehouseDir = requireNonNull(warehouseDir, "warehouseDir is null");
+        Network network = closer.register(newNetwork());
+        this.minIOContainer = closer.register(
+                MinIOContainer.builder()
+                        .withNetwork(network)
+                        .withEnvVars(ImmutableMap.<String, String>builder()
+                                .put("MINIO_ACCESS_KEY", ACCESS_KEY)
+                                .put("MINIO_SECRET_KEY", SECRET_KEY)
+                                .build())
+                        .build());
+    }
+
+    public void start()
+    {
+        if (isStarted()) {
+            return;
+        }
+        try {
+            this.minIOContainer.start();
+            AmazonS3 s3Client = AmazonS3ClientBuilder
+                    .standard()
+                    .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
+                            "http://localhost:" + minIOContainer.getMinioApiEndpoint().getPort(),
+                            "us-east-1"))
+                    .withPathStyleAccessEnabled(true)
+                    .withCredentials(new AWSStaticCredentialsProvider(
+                            new BasicAWSCredentials(ACCESS_KEY, SECRET_KEY)))
+                    .build();
+            s3Client.createBucket(this.bucketName);
+            s3Client.putObject(this.bucketName, this.warehouseDir, "");
+        }
+        finally {
+            isStarted.set(true);
+        }
+    }
+
+    public boolean isStarted()
+    {
+        return isStarted.get();
+    }
+
+    public void stop()
+    {
+        if (!isStarted()) {
+            return;
+        }
+        try {
+            closer.close();
+        }
+        catch (Exception e) {
+            throw new RuntimeException("Failed to stop IcebergMinIODataLake", e);
+        }
+        finally {
+            isStarted.set(false);
+        }
+    }
+
+    public MinIOContainer getMinio()
+    {
+        return minIOContainer;
+    }
+
+    @Override
+    public void close()
+            throws IOException
+    {
+        stop();
+    }
+}
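
Note: a test would typically start this data lake before building the query
runner and point the Iceberg catalog's warehouse at the bucket. A rough sketch
of the wiring (the property names follow Presto's hive.s3.* and
iceberg.catalog.warehouse conventions; the exact values used by the
accompanying tests may differ):

    IcebergMinIODataLake dataLake = new IcebergMinIODataLake("test-bucket", "warehouse");
    dataLake.start();
    Map<String, String> connectorProperties = ImmutableMap.of(
            "iceberg.catalog.warehouse", "s3://test-bucket/warehouse",
            "hive.s3.endpoint", "http://localhost:" + dataLake.getMinio().getMinioApiEndpoint().getPort(),
            "hive.s3.aws-access-key", IcebergMinIODataLake.ACCESS_KEY,
            "hive.s3.aws-secret-key", IcebergMinIODataLake.SECRET_KEY,
            "hive.s3.path-style-access", "true");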