From 0ed5c9664a7711335e7d576d960019215dab6330 Mon Sep 17 00:00:00 2001 From: wangd Date: Fri, 13 Dec 2024 15:23:27 +0800 Subject: [PATCH] [Iceberg]Support s3 path as warehouse dir for hive and hadoop catalog --- .../metastore/file/FileHiveMetastore.java | 17 ++- .../presto/hive/s3/PrestoS3FileSystem.java | 9 +- presto-iceberg/pom.xml | 22 ++++ .../iceberg/IcebergDistributedTestBase.java | 26 ++-- .../presto/iceberg/IcebergQueryRunner.java | 13 +- .../presto/iceberg/TestIcebergFileWriter.java | 6 +- .../container/IcebergMinIODataLake.java | 117 ++++++++++++++++++ .../TestIcebergDistributedOnS3Hadoop.java | 100 +++++++++++++++ .../hive/TestIcebergDistributedHive.java | 2 +- .../nessie/TestIcebergDistributedNessie.java | 4 +- .../iceberg/rest/IcebergRestTestUtil.java | 5 +- 11 files changed, 295 insertions(+), 26 deletions(-) create mode 100644 presto-iceberg/src/test/java/com/facebook/presto/iceberg/container/IcebergMinIODataLake.java create mode 100644 presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergDistributedOnS3Hadoop.java diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/file/FileHiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/file/FileHiveMetastore.java index 5ed409e353669..a0f861f15cb80 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/file/FileHiveMetastore.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/file/FileHiveMetastore.java @@ -68,7 +68,6 @@ import javax.annotation.concurrent.ThreadSafe; import javax.inject.Inject; -import java.io.File; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayDeque; @@ -286,9 +285,19 @@ else if (table.getTableType().equals(EXTERNAL_TABLE)) { } if (!table.getTableType().equals(VIRTUAL_VIEW)) { - File location = new File(new Path(table.getStorage().getLocation()).toUri()); - checkArgument(location.isDirectory(), "Table location is not a directory: %s", location); - checkArgument(location.exists(), "Table directory does not exist: %s", location); + try { + Path tableLocation = new Path(table.getStorage().getLocation()); + FileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, tableLocation); + if (!fileSystem.isDirectory(tableLocation)) { + throw new PrestoException(HIVE_METASTORE_ERROR, "Table location is not a directory: " + tableLocation); + } + if (!fileSystem.exists(tableLocation)) { + throw new PrestoException(HIVE_METASTORE_ERROR, "Table directory does not exist: " + tableLocation); + } + } + catch (IOException e) { + throw new PrestoException(HIVE_METASTORE_ERROR, "Could not validate table location", e); + } } writeSchemaFile("table", tableMetadataDirectory, tableCodec, new TableMetadata(table), false); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java b/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java index 2cda2c392e956..94fdf743be663 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java @@ -546,8 +546,13 @@ private boolean deleteObject(String key) @Override public boolean mkdirs(Path f, FsPermission permission) { - // no need to do anything for S3 - return true; + try { + s3.putObject(getBucketName(uri), keyFromPath(f) + PATH_SEPARATOR, ""); + return true; + } + catch (AmazonClientException e) { + return false; + } } private enum ListingMode { diff --git a/presto-iceberg/pom.xml b/presto-iceberg/pom.xml index 7372a8956f93f..3a0bd277881b6 100644 --- a/presto-iceberg/pom.xml +++ b/presto-iceberg/pom.xml @@ -601,6 +601,28 @@ test + + org.testcontainers + testcontainers + test + + + org.slf4j + slf4j-api + + + + + + com.amazonaws + aws-java-sdk-core + + + + com.amazonaws + aws-java-sdk-s3 + + org.apache.iceberg iceberg-core diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index a20bc099fb0cb..051c8e12c0e09 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -29,6 +29,9 @@ import com.facebook.presto.hive.HiveHdfsConfiguration; import com.facebook.presto.hive.MetastoreClientConfig; import com.facebook.presto.hive.authentication.NoHdfsAuthentication; +import com.facebook.presto.hive.s3.HiveS3Config; +import com.facebook.presto.hive.s3.PrestoS3ConfigurationUpdater; +import com.facebook.presto.hive.s3.S3ConfigurationUpdater; import com.facebook.presto.iceberg.delete.DeleteFile; import com.facebook.presto.metadata.CatalogMetadata; import com.facebook.presto.metadata.Metadata; @@ -2057,13 +2060,19 @@ private void writeEqualityDeleteToNationTable(Table icebergTable, Map {}), + ImmutableSet.of(), hiveClientConfig); return new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication()); } @@ -2079,24 +2088,25 @@ private Table updateTable(String tableName) protected Table loadTable(String tableName) { - Catalog catalog = CatalogUtil.loadCatalog(catalogType.getCatalogImpl(), "test-hive", getProperties(), new Configuration()); + Configuration configuration = getHdfsEnvironment().getConfiguration(new HdfsContext(SESSION), getCatalogDirectory()); + Catalog catalog = CatalogUtil.loadCatalog(catalogType.getCatalogImpl(), "test-hive", getProperties(), configuration); return catalog.loadTable(TableIdentifier.of("tpch", tableName)); } protected Map getProperties() { - File metastoreDir = getCatalogDirectory(); + org.apache.hadoop.fs.Path metastoreDir = getCatalogDirectory(); return ImmutableMap.of("warehouse", metastoreDir.toString()); } - protected File getCatalogDirectory() + protected org.apache.hadoop.fs.Path getCatalogDirectory() { Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory(); switch (catalogType) { case HIVE: case HADOOP: case NESSIE: - return getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile(); + return new org.apache.hadoop.fs.Path(getIcebergDataDirectoryPath(dataDirectory, catalogType.name(), new IcebergConfig().getFileFormat(), false).toFile().toURI()); } throw new PrestoException(NOT_SUPPORTED, "Unsupported Presto Iceberg catalog type " + catalogType); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java index 11be7a0e9857c..729b284960f20 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergQueryRunner.java @@ -43,6 +43,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.OptionalInt; @@ -202,13 +203,11 @@ public static DistributedQueryRunner createIcebergQueryRunner( String catalogType = extraConnectorProperties.getOrDefault("iceberg.catalog.type", HIVE.name()); Path icebergDataDirectory = getIcebergDataDirectoryPath(queryRunner.getCoordinator().getDataDirectory(), catalogType, format, addStorageFormatToPath); - Map icebergProperties = ImmutableMap.builder() - .put("iceberg.file-format", format.name()) - .putAll(getConnectorProperties(CatalogType.valueOf(catalogType), icebergDataDirectory)) - .putAll(extraConnectorProperties) - .build(); - - queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", icebergProperties); + Map icebergProperties = new HashMap<>(); + icebergProperties.put("iceberg.file-format", format.name()); + icebergProperties.putAll(getConnectorProperties(CatalogType.valueOf(catalogType), icebergDataDirectory)); + icebergProperties.putAll(extraConnectorProperties); + queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", ImmutableMap.copyOf(icebergProperties)); if (addJmxPlugin) { queryRunner.installPlugin(new JmxPlugin()); diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergFileWriter.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergFileWriter.java index 734d47172ded8..44fc80cf2a01b 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergFileWriter.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergFileWriter.java @@ -25,10 +25,13 @@ import com.facebook.presto.hive.FileFormatDataSourceStats; import com.facebook.presto.hive.HdfsContext; import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveClientConfig; import com.facebook.presto.hive.HiveCompressionCodec; import com.facebook.presto.hive.HiveDwrfEncryptionProvider; +import com.facebook.presto.hive.MetastoreClientConfig; import com.facebook.presto.hive.NodeVersion; import com.facebook.presto.hive.OrcFileWriterConfig; +import com.facebook.presto.hive.s3.HiveS3Config; import com.facebook.presto.metadata.SessionPropertyManager; import com.facebook.presto.parquet.FileParquetDataSource; import com.facebook.presto.parquet.cache.MetadataReader; @@ -61,6 +64,7 @@ import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.iceberg.IcebergAbstractMetadata.toIcebergSchema; +import static com.facebook.presto.iceberg.IcebergDistributedTestBase.getHdfsEnvironment; import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG; import static com.facebook.presto.iceberg.IcebergSessionProperties.dataSizeSessionProperty; import static com.facebook.presto.metadata.SessionPropertyManager.createTestingSessionPropertyManager; @@ -113,7 +117,7 @@ public void setup() throws Exception this.connectorSession = session.toConnectorSession(connectorId); TypeManager typeManager = new TestingTypeManager(); this.hdfsContext = new HdfsContext(connectorSession); - HdfsEnvironment hdfsEnvironment = IcebergDistributedTestBase.getHdfsEnvironment(); + HdfsEnvironment hdfsEnvironment = getHdfsEnvironment(new HiveClientConfig(), new MetastoreClientConfig(), new HiveS3Config()); this.icebergFileWriterFactory = new IcebergFileWriterFactory(hdfsEnvironment, typeManager, new FileFormatDataSourceStats(), new NodeVersion("test"), new OrcFileWriterConfig(), HiveDwrfEncryptionProvider.NO_ENCRYPTION); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/container/IcebergMinIODataLake.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/container/IcebergMinIODataLake.java new file mode 100644 index 0000000000000..b5cf3302fc584 --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/container/IcebergMinIODataLake.java @@ -0,0 +1,117 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.container; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.facebook.presto.testing.containers.MinIOContainer; +import com.facebook.presto.util.AutoCloseableCloser; +import com.google.common.collect.ImmutableMap; +import org.testcontainers.containers.Network; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.Objects.requireNonNull; +import static org.testcontainers.containers.Network.newNetwork; + +public class IcebergMinIODataLake + implements Closeable +{ + public static final String ACCESS_KEY = "accesskey"; + public static final String SECRET_KEY = "secretkey"; + + private final String bucketName; + private final String warehouseDir; + private final MinIOContainer minIOContainer; + + private final AtomicBoolean isStarted = new AtomicBoolean(false); + private final AutoCloseableCloser closer = AutoCloseableCloser.create(); + + public IcebergMinIODataLake(String bucketName, String warehouseDir) + { + this.bucketName = requireNonNull(bucketName, "bucketName is null"); + this.warehouseDir = requireNonNull(warehouseDir, "warehouseDir is null"); + Network network = closer.register(newNetwork()); + this.minIOContainer = closer.register( + MinIOContainer.builder() + .withNetwork(network) + .withEnvVars(ImmutableMap.builder() + .put("MINIO_ACCESS_KEY", ACCESS_KEY) + .put("MINIO_SECRET_KEY", SECRET_KEY) + .build()) + .build()); + } + + public void start() + { + if (isStarted()) { + return; + } + try { + this.minIOContainer.start(); + AmazonS3 s3Client = AmazonS3ClientBuilder + .standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( + "http://localhost:" + minIOContainer.getMinioApiEndpoint().getPort(), + "us-east-1")) + .withPathStyleAccessEnabled(true) + .withCredentials(new AWSStaticCredentialsProvider( + new BasicAWSCredentials(ACCESS_KEY, SECRET_KEY))) + .build(); + s3Client.createBucket(this.bucketName); + s3Client.putObject(this.bucketName, this.warehouseDir, ""); + } + finally { + isStarted.set(true); + } + } + + public boolean isStarted() + { + return isStarted.get(); + } + + public void stop() + { + if (!isStarted()) { + return; + } + try { + closer.close(); + } + catch (Exception e) { + throw new RuntimeException("Failed to stop HiveMinioDataLake", e); + } + finally { + isStarted.set(false); + } + } + + public MinIOContainer getMinio() + { + return minIOContainer; + } + + @Override + public void close() + throws IOException + { + stop(); + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergDistributedOnS3Hadoop.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergDistributedOnS3Hadoop.java new file mode 100644 index 0000000000000..c613260bebe4d --- /dev/null +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hadoop/TestIcebergDistributedOnS3Hadoop.java @@ -0,0 +1,100 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.iceberg.hadoop; + +import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveClientConfig; +import com.facebook.presto.hive.MetastoreClientConfig; +import com.facebook.presto.hive.s3.HiveS3Config; +import com.facebook.presto.iceberg.IcebergDistributedTestBase; +import com.facebook.presto.iceberg.IcebergQueryRunner; +import com.facebook.presto.iceberg.container.IcebergMinIODataLake; +import com.facebook.presto.testing.QueryRunner; +import com.google.common.collect.ImmutableMap; +import com.google.common.net.HostAndPort; +import org.apache.hadoop.fs.Path; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; + +import java.net.URI; + +import static com.facebook.presto.iceberg.CatalogType.HADOOP; +import static com.facebook.presto.iceberg.container.IcebergMinIODataLake.ACCESS_KEY; +import static com.facebook.presto.iceberg.container.IcebergMinIODataLake.SECRET_KEY; +import static com.facebook.presto.tests.sql.TestTable.randomTableSuffix; +import static java.lang.String.format; + +public class TestIcebergDistributedOnS3Hadoop + extends IcebergDistributedTestBase +{ + static final String WAREHOUSE_DIR = "warehouse/"; + String bucketName = "test-iceberg-s3-" + randomTableSuffix(); + private IcebergMinIODataLake dockerizedS3DataLake; + HostAndPort hostAndPort; + + public TestIcebergDistributedOnS3Hadoop() + { + super(HADOOP); + } + + protected QueryRunner createQueryRunner() + throws Exception + { + return IcebergQueryRunner.createIcebergQueryRunner(ImmutableMap.of(), HADOOP, + ImmutableMap.of( + "iceberg.catalog.warehouse", format("s3://%s/%s", bucketName, WAREHOUSE_DIR), + "hive.s3.use-instance-credentials", "false", + "hive.s3.aws-access-key", ACCESS_KEY, + "hive.s3.aws-secret-key", SECRET_KEY, + "hive.s3.endpoint", format("http://%s:%s", hostAndPort.getHost(), hostAndPort.getPort()), + "hive.s3.path-style-access", "true")); + } + + @BeforeClass + @Override + public void init() + throws Exception + { + this.dockerizedS3DataLake = new IcebergMinIODataLake(bucketName, WAREHOUSE_DIR); + this.dockerizedS3DataLake.start(); + hostAndPort = this.dockerizedS3DataLake.getMinio().getMinioApiEndpoint(); + super.init(); + } + + @AfterClass + public void tearDown() + { + if (dockerizedS3DataLake != null) { + dockerizedS3DataLake.stop(); + } + } + + protected Path getCatalogDirectory() + { + return new Path(URI.create(format("s3://%s/%s", bucketName, WAREHOUSE_DIR))); + } + + protected HdfsEnvironment getHdfsEnvironment() + { + HiveClientConfig hiveClientConfig = new HiveClientConfig(); + MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig(); + HiveS3Config hiveS3Config = new HiveS3Config() + .setS3AwsAccessKey(ACCESS_KEY) + .setS3AwsSecretKey(SECRET_KEY) + .setS3PathStyleAccess(true) + .setS3Endpoint(format("http://%s:%s", hostAndPort.getHost(), hostAndPort.getPort())) + .setS3UseInstanceCredentials(false); + return getHdfsEnvironment(hiveClientConfig, metastoreClientConfig, hiveS3Config); + } +} diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java index 3954524b959b2..fdb1f7b487152 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergDistributedHive.java @@ -83,7 +83,7 @@ protected Table loadTable(String tableName) protected ExtendedHiveMetastore getFileHiveMetastore() { FileHiveMetastore fileHiveMetastore = new FileHiveMetastore(getHdfsEnvironment(), - getCatalogDirectory().getPath(), + getCatalogDirectory().toString(), "test"); return memoizeMetastore(fileHiveMetastore, false, 1000, 0); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergDistributedNessie.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergDistributedNessie.java index d9a5884ad52c2..c72612e799fa7 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergDistributedNessie.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergDistributedNessie.java @@ -18,11 +18,11 @@ import com.facebook.presto.testing.QueryRunner; import com.facebook.presto.testing.containers.NessieContainer; import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.fs.Path; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.io.File; import java.util.Map; import static com.facebook.presto.iceberg.CatalogType.NESSIE; @@ -43,7 +43,7 @@ protected TestIcebergDistributedNessie() @Override protected Map getProperties() { - File metastoreDir = getCatalogDirectory(); + Path metastoreDir = getCatalogDirectory(); return ImmutableMap.of("warehouse", metastoreDir.toString(), "uri", nessieContainer.getRestApiUri()); } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java index 4baf4560b28c6..193fc0a22975d 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/rest/IcebergRestTestUtil.java @@ -20,6 +20,9 @@ import com.facebook.airlift.node.NodeInfo; import com.facebook.presto.hive.HdfsContext; import com.facebook.presto.hive.HdfsEnvironment; +import com.facebook.presto.hive.HiveClientConfig; +import com.facebook.presto.hive.MetastoreClientConfig; +import com.facebook.presto.hive.s3.HiveS3Config; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.testing.TestingConnectorSession; import com.google.common.collect.ImmutableList; @@ -62,7 +65,7 @@ public static Map restConnectorProperties(String serverUri) public static TestingHttpServer getRestServer(String location) { JdbcCatalog backingCatalog = new JdbcCatalog(); - HdfsEnvironment hdfsEnvironment = getHdfsEnvironment(); + HdfsEnvironment hdfsEnvironment = getHdfsEnvironment(new HiveClientConfig(), new MetastoreClientConfig(), new HiveS3Config()); backingCatalog.setConf(hdfsEnvironment.getConfiguration(new HdfsContext(SESSION), new Path(location))); Map properties = ImmutableMap.builder()