From cb449d2964130f030ae45a73dce8b558b6189ae6 Mon Sep 17 00:00:00 2001 From: pratyakshsharma Date: Thu, 8 Feb 2024 13:01:57 +0530 Subject: [PATCH] Add support for iceberg concurrent insertions Introduce new iceberg table property `commit_retries` to handle concurrent insertions across catalogs. --- .../src/main/sphinx/connector/iceberg.rst | 4 ++ .../presto/iceberg/IcebergHiveMetadata.java | 19 +------ .../iceberg/IcebergHiveMetadataFactory.java | 2 - .../presto/iceberg/IcebergNativeMetadata.java | 17 +----- .../iceberg/IcebergTableProperties.java | 13 +++++ .../facebook/presto/iceberg/IcebergUtil.java | 28 +++++++++- .../iceberg/TestIcebergSystemTables.java | 8 ++- .../iceberg/hive/TestIcebergSmokeHive.java | 56 ------------------- .../nessie/TestIcebergSystemTablesNessie.java | 8 ++- 9 files changed, 56 insertions(+), 99 deletions(-) diff --git a/presto-docs/src/main/sphinx/connector/iceberg.rst b/presto-docs/src/main/sphinx/connector/iceberg.rst index 9de464f3e90a8..60b25e5e301fb 100644 --- a/presto-docs/src/main/sphinx/connector/iceberg.rst +++ b/presto-docs/src/main/sphinx/connector/iceberg.rst @@ -254,6 +254,10 @@ Property Name Description ``format_version`` Optionally specifies the format version of the Iceberg specification to use for new tables, either ``1`` or ``2``. Defaults to ``1``. + +``commit_retries`` Determines the number of attempts for committing the metadata + in case of concurrent upsert requests, before failing. The + default value is 4. ========================================= =============================================================== The table definition below specifies format ``ORC``, partitioning by columns ``c1`` and ``c2``, diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java index 54dca640b3441..f189937825767 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadata.java @@ -99,7 +99,6 @@ import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.INSERT; import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.SELECT; import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.UPDATE; -import static com.facebook.presto.hive.metastore.MetastoreUtil.TABLE_COMMENT; import static com.facebook.presto.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet; import static com.facebook.presto.hive.metastore.MetastoreUtil.checkIfNullView; import static com.facebook.presto.hive.metastore.MetastoreUtil.createTableObjectForViewCreation; @@ -113,7 +112,6 @@ import static com.facebook.presto.iceberg.IcebergSchemaProperties.getSchemaLocation; import static com.facebook.presto.iceberg.IcebergSessionProperties.getHiveStatisticsMergeStrategy; import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat; -import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion; import static com.facebook.presto.iceberg.IcebergTableProperties.getPartitioning; import static com.facebook.presto.iceberg.IcebergTableProperties.getTableLocation; import static com.facebook.presto.iceberg.IcebergTableType.DATA; @@ -121,6 +119,7 @@ import static com.facebook.presto.iceberg.IcebergUtil.getColumns; import static com.facebook.presto.iceberg.IcebergUtil.getHiveIcebergTable; import static com.facebook.presto.iceberg.IcebergUtil.isIcebergTable; +import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties; import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns; import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties; import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields; @@ -137,8 +136,6 @@ import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static org.apache.iceberg.TableMetadata.newTableMetadata; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; -import static org.apache.iceberg.TableProperties.FORMAT_VERSION; import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH; import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION; import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION; @@ -301,20 +298,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con throw new TableAlreadyExistsException(schemaTableName); } - ImmutableMap.Builder propertiesBuilder = ImmutableMap.builderWithExpectedSize(3); FileFormat fileFormat = getFileFormat(tableMetadata.getProperties()); - propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString()); - if (tableMetadata.getComment().isPresent()) { - propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get()); - } - - String formatVersion = getFormatVersion(tableMetadata.getProperties()); - if (formatVersion != null) { - propertiesBuilder.put(FORMAT_VERSION, formatVersion); - } - - TableMetadata metadata = newTableMetadata(schema, partitionSpec, targetPath, propertiesBuilder.build()); - + TableMetadata metadata = newTableMetadata(schema, partitionSpec, targetPath, populateTableProperties(tableMetadata, fileFormat)); transaction = createTableTransaction(tableName, operations, metadata); return new IcebergWritableTableHandle( diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java index 4ef5affb4b238..d69e9447772fd 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergHiveMetadataFactory.java @@ -41,7 +41,6 @@ public class IcebergHiveMetadataFactory @Inject public IcebergHiveMetadataFactory( - IcebergConfig config, ExtendedHiveMetastore metastore, HdfsEnvironment hdfsEnvironment, TypeManager typeManager, @@ -59,7 +58,6 @@ public IcebergHiveMetadataFactory( this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null"); this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null"); this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null"); - requireNonNull(config, "config is null"); } public ConnectorMetadata create() diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java index 7ea91c3884778..ed39adbe34fdc 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergNativeMetadata.java @@ -28,7 +28,6 @@ import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.function.StandardFunctionResolution; import com.facebook.presto.spi.relation.RowExpressionService; -import com.google.common.collect.ImmutableMap; import org.apache.hadoop.fs.Path; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; @@ -46,11 +45,11 @@ import java.util.Optional; import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat; -import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion; import static com.facebook.presto.iceberg.IcebergTableProperties.getPartitioning; import static com.facebook.presto.iceberg.IcebergTableType.DATA; import static com.facebook.presto.iceberg.IcebergUtil.getColumns; import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable; +import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties; import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields; import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergNamespace; import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier; @@ -61,8 +60,6 @@ import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; -import static org.apache.iceberg.TableProperties.FORMAT_VERSION; public class IcebergNativeMetadata extends IcebergAbstractMetadata @@ -168,21 +165,11 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con Schema schema = toIcebergSchema(tableMetadata.getColumns()); PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties())); - - ImmutableMap.Builder propertiesBuilder = ImmutableMap.builder(); FileFormat fileFormat = getFileFormat(tableMetadata.getProperties()); - propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString()); - if (tableMetadata.getComment().isPresent()) { - propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get()); - } - String formatVersion = getFormatVersion(tableMetadata.getProperties()); - if (formatVersion != null) { - propertiesBuilder.put(FORMAT_VERSION, formatVersion); - } try { transaction = resourceFactory.getCatalog(session).newCreateTableTransaction( - toIcebergTableIdentifier(schemaTableName), schema, partitionSpec, propertiesBuilder.build()); + toIcebergTableIdentifier(schemaTableName), schema, partitionSpec, populateTableProperties(tableMetadata, fileFormat)); } catch (AlreadyExistsException e) { throw new TableAlreadyExistsException(schemaTableName); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java index 89ee5915b258e..295283648a477 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergTableProperties.java @@ -16,6 +16,7 @@ import com.facebook.presto.common.type.ArrayType; import com.facebook.presto.spi.session.PropertyMetadata; import com.google.common.collect.ImmutableList; +import org.apache.iceberg.TableProperties; import javax.inject.Inject; @@ -25,6 +26,7 @@ import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType; +import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty; import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty; import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Locale.ENGLISH; @@ -35,6 +37,7 @@ public class IcebergTableProperties public static final String PARTITIONING_PROPERTY = "partitioning"; public static final String LOCATION_PROPERTY = "location"; public static final String FORMAT_VERSION = "format_version"; + public static final String COMMIT_RETRIES = "commit_retries"; private final List> tableProperties; private final List> columnProperties; @@ -73,6 +76,11 @@ public IcebergTableProperties(IcebergConfig icebergConfig) "Format version for the table", null, false)) + .add(integerProperty( + COMMIT_RETRIES, + "Determines the number of attempts in case of concurrent upserts and deletes", + TableProperties.COMMIT_NUM_RETRIES_DEFAULT, + false)) .build(); columnProperties = ImmutableList.of(stringProperty( @@ -113,4 +121,9 @@ public static String getFormatVersion(Map tableProperties) { return (String) tableProperties.get(FORMAT_VERSION); } + + public static Integer getCommitRetries(Map tableProperties) + { + return (Integer) tableProperties.get(COMMIT_RETRIES); + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index 712b1a30f0a31..634f823e1c4fc 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -35,6 +35,7 @@ import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableMetadata; import com.facebook.presto.spi.Constraint; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.SchemaTableName; @@ -135,6 +136,8 @@ import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_TABLE_TIMESTAMP; import static com.facebook.presto.iceberg.IcebergSessionProperties.isMergeOnReadModeEnabled; +import static com.facebook.presto.iceberg.IcebergTableProperties.getCommitRetries; +import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion; import static com.facebook.presto.iceberg.TypeConverter.toIcebergType; import static com.facebook.presto.iceberg.TypeConverter.toPrestoType; import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier; @@ -164,9 +167,11 @@ import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES; import static org.apache.iceberg.LocationProviders.locationsFor; import static org.apache.iceberg.MetadataTableUtils.createMetadataTableInstance; +import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; import static org.apache.iceberg.TableProperties.DELETE_MODE; +import static org.apache.iceberg.TableProperties.FORMAT_VERSION; import static org.apache.iceberg.TableProperties.MERGE_MODE; import static org.apache.iceberg.TableProperties.UPDATE_MODE; import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL; @@ -820,9 +825,9 @@ private static class DeleteFilesIterator private DeleteFile currentFile; private DeleteFilesIterator(Map partitionSpecsById, - CloseableIterator fileTasks, - Optional> requestedPartitionSpec, - Optional> requestedSchema) + CloseableIterator fileTasks, + Optional> requestedPartitionSpec, + Optional> requestedSchema) { this.partitionSpecsById = partitionSpecsById; this.fileTasks = fileTasks; @@ -884,4 +889,21 @@ public void close() fileTasks = CloseableIterator.empty(); } } + + public static Map populateTableProperties(ConnectorTableMetadata tableMetadata, FileFormat fileFormat) + { + ImmutableMap.Builder propertiesBuilder = ImmutableMap.builderWithExpectedSize(4); + Integer commitRetries = getCommitRetries(tableMetadata.getProperties()); + propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString()); + propertiesBuilder.put(COMMIT_NUM_RETRIES, String.valueOf(commitRetries)); + if (tableMetadata.getComment().isPresent()) { + propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get()); + } + + String formatVersion = getFormatVersion(tableMetadata.getProperties()); + if (formatVersion != null) { + propertiesBuilder.put(FORMAT_VERSION, formatVersion); + } + return propertiesBuilder.build(); + } } diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTables.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTables.java index b014730a0828d..3a826ec81de7e 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTables.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSystemTables.java @@ -191,16 +191,18 @@ public void testPropertiesTable() { assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$properties\"", "VALUES ('key', 'varchar', '', '')," + "('value', 'varchar', '', '')"); - assertQuery("SELECT COUNT(*) FROM test_schema.\"test_table$properties\"", "VALUES 2"); + assertQuery("SELECT COUNT(*) FROM test_schema.\"test_table$properties\"", "VALUES 3"); List materializedRows = computeActual(getSession(), "SELECT * FROM test_schema.\"test_table$properties\"").getMaterializedRows(); - assertThat(materializedRows).hasSize(2); + assertThat(materializedRows).hasSize(3); assertThat(materializedRows) .anySatisfy(row -> assertThat(row) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.format.default", "PARQUET"))) .anySatisfy(row -> assertThat(row) - .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.parquet.compression-codec", "zstd"))); + .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.parquet.compression-codec", "zstd"))) + .anySatisfy(row -> assertThat(row) + .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "commit.retry.num-retries", "4"))); } @Test diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java index c6582f992df74..f614896d686a2 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/hive/TestIcebergSmokeHive.java @@ -13,7 +13,6 @@ */ package com.facebook.presto.iceberg.hive; -import com.facebook.presto.Session; import com.facebook.presto.hive.HdfsConfiguration; import com.facebook.presto.hive.HdfsConfigurationInitializer; import com.facebook.presto.hive.HdfsEnvironment; @@ -30,22 +29,12 @@ import com.facebook.presto.tests.DistributedQueryRunner; import com.google.common.collect.ImmutableSet; import org.apache.iceberg.Table; -import org.testng.annotations.Test; import java.io.File; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; -import java.util.stream.Stream; import static com.facebook.presto.hive.metastore.CachingHiveMetastore.memoizeMetastore; import static com.facebook.presto.iceberg.CatalogType.HIVE; import static java.lang.String.format; -import static org.testng.Assert.fail; public class TestIcebergSmokeHive extends IcebergDistributedSmokeTestBase @@ -55,51 +44,6 @@ public TestIcebergSmokeHive() super(HIVE); } - @Test - public void testConcurrentInsert() - { - final Session session = getSession(); - assertUpdate(session, "CREATE TABLE test_concurrent_insert (col0 INTEGER, col1 VARCHAR) WITH (format = 'ORC')"); - - int concurrency = 5; - final String[] strings = {"one", "two", "three", "four", "five"}; - final CountDownLatch countDownLatch = new CountDownLatch(concurrency); - AtomicInteger value = new AtomicInteger(0); - Set errors = new CopyOnWriteArraySet<>(); - List threads = Stream.generate(() -> new Thread(() -> { - int i = value.getAndIncrement(); - try { - getQueryRunner().execute(session, format("INSERT INTO test_concurrent_insert VALUES(%s, '%s')", i + 1, strings[i])); - } - catch (Throwable throwable) { - errors.add(throwable); - } - finally { - countDownLatch.countDown(); - } - })).limit(concurrency).collect(Collectors.toList()); - - threads.forEach(Thread::start); - - try { - final int seconds = 10; - if (!countDownLatch.await(seconds, TimeUnit.SECONDS)) { - fail(format("Failed to insert in %s seconds", seconds)); - } - if (!errors.isEmpty()) { - fail(format("Failed to insert concurrently: %s", errors.stream().map(Throwable::getMessage).collect(Collectors.joining(" & ")))); - } - assertQuery(session, "SELECT count(*) FROM test_concurrent_insert", "SELECT " + concurrency); - assertQuery(session, "SELECT * FROM test_concurrent_insert", "VALUES(1, 'one'), (2, 'two'), (3, 'three'), (4, 'four'), (5, 'five')"); - } - catch (InterruptedException e) { - fail("Interrupted when await insertion", e); - } - finally { - dropTable(session, "test_concurrent_insert"); - } - } - @Override protected String getLocation(String schema, String table) { diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSystemTablesNessie.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSystemTablesNessie.java index 20fc32f72838c..109ece3470ffa 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSystemTablesNessie.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/nessie/TestIcebergSystemTablesNessie.java @@ -88,12 +88,12 @@ public void testPropertiesTable() { assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$properties\"", "VALUES ('key', 'varchar', '', '')," + "('value', 'varchar', '', '')"); - assertQuery("SELECT COUNT(*) FROM test_schema.\"test_table$properties\"", "VALUES 5"); + assertQuery("SELECT COUNT(*) FROM test_schema.\"test_table$properties\"", "VALUES 6"); List materializedRows = computeActual(getSession(), "SELECT * FROM test_schema.\"test_table$properties\"").getMaterializedRows(); // nessie writes a "nessie.commit.id" + "gc.enabled=false" to the table properties - assertThat(materializedRows).hasSize(5); + assertThat(materializedRows).hasSize(6); assertThat(materializedRows) .anySatisfy(row -> assertThat(row) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.format.default", "PARQUET"))) @@ -102,6 +102,8 @@ public void testPropertiesTable() .anySatisfy(row -> assertThat(row) .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.parquet.compression-codec", "zstd"))) .anySatisfy(row -> assertThat(row) - .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.delete-after-commit.enabled", "false"))); + .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.delete-after-commit.enabled", "false"))) + .anySatisfy(row -> assertThat(row) + .isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "commit.retry.num-retries", "4"))); } }