Skip to content

Commit

Permalink
[Iceberg]Support setting warehouse data directory for Hadoop catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd committed Jan 27, 2025
1 parent bce0716 commit 5fd6b9c
Show file tree
Hide file tree
Showing 23 changed files with 1,015 additions and 109 deletions.
23 changes: 23 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,34 @@ Property Name Description
This property is required if the ``iceberg.catalog.type`` is
``hadoop``.

``iceberg.catalog.warehouse.datadir`` The catalog warehouse root data path for Iceberg tables.
Example: ``s3://iceberg_bucket/warehouse``.
If set, all tables in this Hadoop catalog will default to
saving their data and delete files in the specified root
data directory.

``iceberg.catalog.cached-catalog-num`` The number of Iceberg catalogs to cache. This property is ``10``
required if the ``iceberg.catalog.type`` is ``hadoop``.
Otherwise, it will be ignored.
======================================================= ============================================================= ============

Configure the `Amazon S3 <https://prestodb.io/docs/current/connector/hive.html#amazon-s3-configuration>`_
properties to specify a S3 location as the warehouse data directory for the Hadoop catalog. This way,
the data and delete files of Iceberg tables are stored in S3. An example configuration includes:

.. code-block:: none
connector.name=iceberg
iceberg.catalog.type=hadoop
iceberg.catalog.warehouse=hdfs://nn:8020/warehouse/path
iceberg.catalog.warehouse.datadir=s3://iceberg_bucket/warehouse
hive.s3.use-instance-credentials=false
hive.s3.aws-access-key=accesskey
hive.s3.aws-secret-key=secretkey
hive.s3.endpoint=http://192.168.0.103:9878
hive.s3.path-style-access=true
Configuration Properties
------------------------

Expand Down
22 changes: 22 additions & 0 deletions presto-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,28 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import static com.facebook.presto.hive.MetadataUtils.getDiscretePredicates;
import static com.facebook.presto.hive.MetadataUtils.getPredicate;
import static com.facebook.presto.hive.MetadataUtils.getSubfieldPredicate;
import static com.facebook.presto.hive.metastore.MetastoreUtil.TABLE_COMMENT;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_METADATA;
Expand All @@ -136,6 +137,8 @@
import static com.facebook.presto.iceberg.IcebergTableProperties.METRICS_MAX_INFERRED_COLUMN;
import static com.facebook.presto.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableProperties.WRITE_DATA_LOCATION_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableProperties.getCommitRetries;
import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion;
import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES;
Expand All @@ -150,6 +153,7 @@
import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdTimeOperator;
import static com.facebook.presto.iceberg.IcebergUtil.getTableComment;
import static com.facebook.presto.iceberg.IcebergUtil.getViewComment;
import static com.facebook.presto.iceberg.IcebergUtil.parseFormatVersion;
import static com.facebook.presto.iceberg.IcebergUtil.resolveSnapshotIdByName;
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
Expand Down Expand Up @@ -181,6 +185,12 @@
import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED;
import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;

public abstract class IcebergAbstractMetadata
Expand Down Expand Up @@ -1030,6 +1040,62 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
transaction.commitTransaction();
}

protected Map<String, String> populateTableProperties(ConnectorTableMetadata tableMetadata, com.facebook.presto.iceberg.FileFormat fileFormat, ConnectorSession session, CatalogType catalogType)
{
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builderWithExpectedSize(5);

String writeDataLocation = IcebergTableProperties.getWriteDataLocation(tableMetadata.getProperties());
if (!Strings.isNullOrEmpty(writeDataLocation)) {
if (catalogType.equals(CatalogType.HADOOP)) {
propertiesBuilder.put(WRITE_DATA_LOCATION, writeDataLocation);
}
else {
throw new PrestoException(NOT_SUPPORTED, "Not support set write_data_path on catalog: " + catalogType);
}
}
else {
Optional<String> dataLocation = getDataLocationBasedOnWarehouseDataDir(tableMetadata.getTable());
dataLocation.ifPresent(location -> propertiesBuilder.put(WRITE_DATA_LOCATION, location));
}

Integer commitRetries = getCommitRetries(tableMetadata.getProperties());
propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString());
propertiesBuilder.put(COMMIT_NUM_RETRIES, String.valueOf(commitRetries));
switch (fileFormat) {
case PARQUET:
propertiesBuilder.put(PARQUET_COMPRESSION, getCompressionCodec(session).getParquetCompressionCodec().get().toString());
break;
case ORC:
propertiesBuilder.put(ORC_COMPRESSION, getCompressionCodec(session).getOrcCompressionKind().name());
break;
}
if (tableMetadata.getComment().isPresent()) {
propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
}

String formatVersion = getFormatVersion(tableMetadata.getProperties());
verify(formatVersion != null, "Format version cannot be null");
propertiesBuilder.put(TableProperties.FORMAT_VERSION, formatVersion);

if (parseFormatVersion(formatVersion) < MIN_FORMAT_VERSION_FOR_DELETE) {
propertiesBuilder.put(TableProperties.DELETE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
}
else {
RowLevelOperationMode deleteMode = IcebergTableProperties.getDeleteMode(tableMetadata.getProperties());
propertiesBuilder.put(TableProperties.DELETE_MODE, deleteMode.modeName());
}

Integer metadataPreviousVersionsMax = IcebergTableProperties.getMetadataPreviousVersionsMax(tableMetadata.getProperties());
propertiesBuilder.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, String.valueOf(metadataPreviousVersionsMax));

Boolean metadataDeleteAfterCommit = IcebergTableProperties.isMetadataDeleteAfterCommit(tableMetadata.getProperties());
propertiesBuilder.put(METADATA_DELETE_AFTER_COMMIT_ENABLED, String.valueOf(metadataDeleteAfterCommit));

Integer metricsMaxInferredColumn = IcebergTableProperties.getMetricsMaxInferredColumn(tableMetadata.getProperties());
propertiesBuilder.put(METRICS_MAX_INFERRED_COLUMN_DEFAULTS, String.valueOf(metricsMaxInferredColumn));
return propertiesBuilder.build();
}

/**
* Deletes all the files for a specific predicate
*
Expand Down Expand Up @@ -1101,4 +1167,9 @@ protected Optional<String> getWriteDataLocation(Table table)
return Optional.empty();
}
}

protected Optional<String> getDataLocationBasedOnWarehouseDataDir(SchemaTableName schemaTableName)
{
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class IcebergConfig
private HiveCompressionCodec compressionCodec = GZIP;
private CatalogType catalogType = HIVE;
private String catalogWarehouse;
private String catalogWarehouseDataDir;
private int catalogCacheSize = 10;
private int maxPartitionsPerWriter = 100;
private List<String> hadoopConfigResources = ImmutableList.of();
Expand Down Expand Up @@ -127,6 +128,19 @@ public IcebergConfig setCatalogWarehouse(String catalogWarehouse)
return this;
}

public String getCatalogWarehouseDataDir()
{
return catalogWarehouseDataDir;
}

@Config("iceberg.catalog.warehouse.datadir")
@ConfigDescription("Iceberg catalog default root data writing directory")
public IcebergConfig setCatalogWarehouseDataDir(String catalogWarehouseDataDir)
{
this.catalogWarehouseDataDir = catalogWarehouseDataDir;
return this;
}

@Min(1)
public int getCatalogCacheSize()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getHiveIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.isIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class IcebergNativeCatalogFactory
private final String catalogName;
protected final CatalogType catalogType;
private final String catalogWarehouse;
private final String catalogWarehouseDataDir;
protected final IcebergConfig icebergConfig;

private final List<String> hadoopConfigResources;
Expand All @@ -69,6 +70,7 @@ public IcebergNativeCatalogFactory(
this.icebergConfig = requireNonNull(config, "config is null");
this.catalogType = config.getCatalogType();
this.catalogWarehouse = config.getCatalogWarehouse();
this.catalogWarehouseDataDir = config.getCatalogWarehouseDataDir();
this.hadoopConfigResources = icebergConfig.getHadoopConfigResources();
this.s3ConfigurationUpdater = requireNonNull(s3ConfigurationUpdater, "s3ConfigurationUpdater is null");
this.gcsConfigurationInitialize = requireNonNull(gcsConfigurationInitialize, "gcsConfigurationInitialize is null");
Expand All @@ -90,6 +92,11 @@ public Catalog getCatalog(ConnectorSession session)
}
}

public String getCatalogWarehouseDataDir()
{
return this.catalogWarehouseDataDir;
}

public SupportsNamespaces getNamespaces(ConnectorSession session)
{
Catalog catalog = getCatalog(session);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Stream;

import static com.facebook.presto.iceberg.CatalogType.HADOOP;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat;
import static com.facebook.presto.iceberg.IcebergTableProperties.getPartitioning;
Expand All @@ -66,7 +67,6 @@
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergView;
import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
import static com.facebook.presto.iceberg.IcebergUtil.verifyTypeSupported;
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
import static com.facebook.presto.iceberg.PartitionSpecConverter.toPrestoPartitionSpec;
Expand All @@ -89,6 +89,7 @@ public class IcebergNativeMetadata
{
private static final String VIEW_DIALECT = "presto";

private final Optional<String> warehouseDataDir;
private final IcebergNativeCatalogFactory catalogFactory;
private final CatalogType catalogType;
private final ConcurrentMap<SchemaTableName, View> icebergViews = new ConcurrentHashMap<>();
Expand All @@ -107,6 +108,7 @@ public IcebergNativeMetadata(
super(typeManager, functionResolution, rowExpressionService, commitTaskCodec, nodeVersion, filterStatsCalculatorService, statisticsFileCache);
this.catalogFactory = requireNonNull(catalogFactory, "catalogFactory is null");
this.catalogType = requireNonNull(catalogType, "catalogType is null");
this.warehouseDataDir = Optional.ofNullable(catalogFactory.getCatalogWarehouseDataDir());
}

@Override
Expand Down Expand Up @@ -316,20 +318,21 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
try {
TableIdentifier tableIdentifier = toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled());
String targetPath = getTableLocation(tableMetadata.getProperties());
Map<String, String> tableProperties = populateTableProperties(tableMetadata, fileFormat, session, catalogType);
if (!isNullOrEmpty(targetPath)) {
transaction = catalogFactory.getCatalog(session).newCreateTableTransaction(
tableIdentifier,
schema,
partitionSpec,
targetPath,
populateTableProperties(tableMetadata, fileFormat, session, catalogType));
tableProperties);
}
else {
transaction = catalogFactory.getCatalog(session).newCreateTableTransaction(
tableIdentifier,
schema,
partitionSpec,
populateTableProperties(tableMetadata, fileFormat, session, catalogType));
tableProperties);
}
}
catch (AlreadyExistsException e) {
Expand Down Expand Up @@ -379,4 +382,13 @@ public void unregisterTable(ConnectorSession clientSession, SchemaTableName sche
{
catalogFactory.getCatalog(clientSession).dropTable(toIcebergTableIdentifier(schemaTableName, catalogFactory.isNestedNamespaceEnabled()), false);
}

protected Optional<String> getDataLocationBasedOnWarehouseDataDir(SchemaTableName schemaTableName)
{
if (!catalogType.equals(HADOOP)) {
return Optional.empty();
}
return Optional.ofNullable(warehouseDataDir.map(base -> base + schemaTableName.getSchemaName() + "/" + schemaTableName.getTableName())
.orElse(null));
}
}
Loading

0 comments on commit 5fd6b9c

Please sign in to comment.