Add support for Iceberg concurrent insertions
Introduce a new Iceberg table property `commit_retries` to handle
concurrent insertions across catalogs.
pratyakshsharma authored and yingsu00 committed Feb 14, 2024
1 parent c84d821 commit e579a3f
Showing 9 changed files with 56 additions and 99 deletions.
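
Context for reviewers: Iceberg commits are optimistic. A writer produces new table metadata and atomically swaps the catalog's pointer from the old metadata file to the new one; if another writer commits first, the swap fails and the loser must refresh and reapply its changes. `commit_retries` bounds how many such attempts are made before the write fails. Below is a minimal schematic of that loop (a hedged sketch, not Iceberg's actual implementation), with an AtomicReference standing in for the catalog's atomic metadata pointer:

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.UnaryOperator;

    final class OptimisticCommitSketch<T>
    {
        private final AtomicReference<T> currentMetadata;

        OptimisticCommitSketch(T initialMetadata)
        {
            this.currentMetadata = new AtomicReference<>(initialMetadata);
        }

        boolean commit(UnaryOperator<T> change, int commitRetries)
        {
            for (int attempt = 0; attempt < commitRetries; attempt++) {
                T base = currentMetadata.get();      // refresh: read the latest committed metadata
                T updated = change.apply(base);      // reapply this writer's pending changes
                if (currentMetadata.compareAndSet(base, updated)) {
                    return true;                     // atomic swap succeeded; the commit is durable
                }
                // Another writer committed first; loop and retry against the new base.
            }
            return false;                            // exhausted commit_retries attempts
        }
    }

With the default of 4, a small burst of concurrent inserts can still succeed, since each losing writer gets several more chances to win the swap against refreshed metadata.
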
4 changes: 4 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -254,6 +254,10 @@ Property Name Description
``format_version`` Optionally specifies the format version of the Iceberg
specification to use for new tables, either ``1`` or ``2``.
Defaults to ``1``.

``commit_retries`` Determines the number of attempts to commit the metadata
in case of concurrent upsert requests, before failing. The
default value is 4.
========================================= ===============================================================

The table definition below specifies format ``ORC``, partitioning by columns ``c1`` and ``c2``,
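
A usage illustration, not part of this diff: the property is set per table in the WITH clause and surfaces in the `$properties` system table under Iceberg's `commit.retry.num-retries` key. The sketch below follows the style of the connector's query-runner tests and assumes a test class extending the query framework; the table name is invented:

    @Test
    public void testCommitRetriesProperty()
    {
        // Hypothetical test, assuming a class extending AbstractTestQueryFramework.
        assertUpdate("CREATE TABLE test_commit_retries (c1 INTEGER) WITH (commit_retries = 10)");
        assertQuery(
                "SELECT value FROM \"test_commit_retries$properties\" WHERE key = 'commit.retry.num-retries'",
                "VALUES '10'");
        assertUpdate("DROP TABLE test_commit_retries");
    }
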
@@ -99,7 +99,6 @@
import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.INSERT;
import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.SELECT;
import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.UPDATE;
import static com.facebook.presto.hive.metastore.MetastoreUtil.TABLE_COMMENT;
import static com.facebook.presto.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
import static com.facebook.presto.hive.metastore.MetastoreUtil.checkIfNullView;
import static com.facebook.presto.hive.metastore.MetastoreUtil.createTableObjectForViewCreation;
@@ -113,14 +112,14 @@
import static com.facebook.presto.iceberg.IcebergSchemaProperties.getSchemaLocation;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getHiveStatisticsMergeStrategy;
import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat;
import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion;
import static com.facebook.presto.iceberg.IcebergTableProperties.getPartitioning;
import static com.facebook.presto.iceberg.IcebergTableProperties.getTableLocation;
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
import static com.facebook.presto.iceberg.IcebergUtil.createIcebergViewProperties;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getHiveIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.isIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
@@ -137,8 +136,6 @@
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.TableMetadata.newTableMetadata;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
@@ -301,20 +298,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
throw new TableAlreadyExistsException(schemaTableName);
}

ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builderWithExpectedSize(3);
FileFormat fileFormat = getFileFormat(tableMetadata.getProperties());
propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString());
if (tableMetadata.getComment().isPresent()) {
propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
}

String formatVersion = getFormatVersion(tableMetadata.getProperties());
if (formatVersion != null) {
propertiesBuilder.put(FORMAT_VERSION, formatVersion);
}

TableMetadata metadata = newTableMetadata(schema, partitionSpec, targetPath, propertiesBuilder.build());

TableMetadata metadata = newTableMetadata(schema, partitionSpec, targetPath, populateTableProperties(tableMetadata, fileFormat));
transaction = createTableTransaction(tableName, operations, metadata);

return new IcebergWritableTableHandle(
@@ -41,7 +41,6 @@ public class IcebergHiveMetadataFactory

@Inject
public IcebergHiveMetadataFactory(
IcebergConfig config,
ExtendedHiveMetastore metastore,
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
@@ -59,7 +58,6 @@ public IcebergHiveMetadataFactory(
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
requireNonNull(config, "config is null");
}

public ConnectorMetadata create()
@@ -28,7 +28,6 @@
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
@@ -46,11 +45,11 @@
import java.util.Optional;

import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat;
import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion;
import static com.facebook.presto.iceberg.IcebergTableProperties.getPartitioning;
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergNamespace;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier;
@@ -61,8 +60,6 @@
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;

public class IcebergNativeMetadata
extends IcebergAbstractMetadata
@@ -168,21 +165,11 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
Schema schema = toIcebergSchema(tableMetadata.getColumns());

PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties()));

ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
FileFormat fileFormat = getFileFormat(tableMetadata.getProperties());
propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString());
if (tableMetadata.getComment().isPresent()) {
propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
}
String formatVersion = getFormatVersion(tableMetadata.getProperties());
if (formatVersion != null) {
propertiesBuilder.put(FORMAT_VERSION, formatVersion);
}

try {
transaction = resourceFactory.getCatalog(session).newCreateTableTransaction(
toIcebergTableIdentifier(schemaTableName), schema, partitionSpec, propertiesBuilder.build());
toIcebergTableIdentifier(schemaTableName), schema, partitionSpec, populateTableProperties(tableMetadata, fileFormat));
}
catch (AlreadyExistsException e) {
throw new TableAlreadyExistsException(schemaTableName);
@@ -16,6 +16,7 @@
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.TableProperties;

import javax.inject.Inject;

@@ -25,6 +26,7 @@

import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Locale.ENGLISH;
@@ -35,6 +37,7 @@ public class IcebergTableProperties
public static final String PARTITIONING_PROPERTY = "partitioning";
public static final String LOCATION_PROPERTY = "location";
public static final String FORMAT_VERSION = "format_version";
public static final String COMMIT_RETRIES = "commit_retries";

private final List<PropertyMetadata<?>> tableProperties;
private final List<PropertyMetadata<?>> columnProperties;
@@ -73,6 +76,11 @@ public IcebergTableProperties(IcebergConfig icebergConfig)
"Format version for the table",
null,
false))
.add(integerProperty(
COMMIT_RETRIES,
"Determines the number of attempts in case of concurrent upserts and deletes",
TableProperties.COMMIT_NUM_RETRIES_DEFAULT,
false))
.build();

columnProperties = ImmutableList.of(stringProperty(
@@ -113,4 +121,9 @@ public static String getFormatVersion(Map<String, Object> tableProperties)
{
return (String) tableProperties.get(FORMAT_VERSION);
}

public static Integer getCommitRetries(Map<String, Object> tableProperties)
{
return (Integer) tableProperties.get(COMMIT_RETRIES);
}
}
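
A note on the round trip above: the SPI's typed helper means the connector never parses strings itself. The engine validates `commit_retries` as an integer and substitutes the default (Iceberg's `COMMIT_NUM_RETRIES_DEFAULT`, which is 4) when the property is absent, so the cast in `getCommitRetries` is safe. A condensed, self-contained sketch of the declare-and-read pattern (illustrative only; the real class registers many more properties):

    import com.facebook.presto.spi.session.PropertyMetadata;

    import java.util.HashMap;
    import java.util.Map;

    import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;

    final class CommitRetriesPropertySketch
    {
        static final PropertyMetadata<Integer> COMMIT_RETRIES = integerProperty(
                "commit_retries",
                "Determines the number of attempts in case of concurrent upserts and deletes",
                4,       // Iceberg's TableProperties.COMMIT_NUM_RETRIES_DEFAULT
                false);  // visible to users, not hidden

        static Integer getCommitRetries(Map<String, Object> tableProperties)
        {
            return (Integer) tableProperties.get(COMMIT_RETRIES.getName());
        }

        public static void main(String[] args)
        {
            Map<String, Object> properties = new HashMap<>();
            properties.put(COMMIT_RETRIES.getName(), 10); // as if WITH (commit_retries = 10)
            System.out.println(getCommitRetries(properties)); // prints 10
        }
    }
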
@@ -35,6 +35,7 @@
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
@@ -135,6 +136,8 @@
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_TABLE_TIMESTAMP;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isMergeOnReadModeEnabled;
import static com.facebook.presto.iceberg.IcebergTableProperties.getCommitRetries;
import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion;
import static com.facebook.presto.iceberg.TypeConverter.toIcebergType;
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier;
@@ -164,9 +167,11 @@
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES;
import static org.apache.iceberg.LocationProviders.locationsFor;
import static org.apache.iceberg.MetadataTableUtils.createMetadataTableInstance;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.DELETE_MODE;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.MERGE_MODE;
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
@@ -820,9 +825,9 @@ private static class DeleteFilesIterator
private DeleteFile currentFile;

private DeleteFilesIterator(Map<Integer, PartitionSpec> partitionSpecsById,
CloseableIterator<FileScanTask> fileTasks,
Optional<Set<Integer>> requestedPartitionSpec,
Optional<Set<Integer>> requestedSchema)
CloseableIterator<FileScanTask> fileTasks,
Optional<Set<Integer>> requestedPartitionSpec,
Optional<Set<Integer>> requestedSchema)
{
this.partitionSpecsById = partitionSpecsById;
this.fileTasks = fileTasks;
@@ -884,4 +889,21 @@ public void close()
fileTasks = CloseableIterator.empty();
}
}

public static Map<String, String> populateTableProperties(ConnectorTableMetadata tableMetadata, FileFormat fileFormat)
{
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builderWithExpectedSize(4);
Integer commitRetries = getCommitRetries(tableMetadata.getProperties());
propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString());
propertiesBuilder.put(COMMIT_NUM_RETRIES, String.valueOf(commitRetries));
if (tableMetadata.getComment().isPresent()) {
propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
}

String formatVersion = getFormatVersion(tableMetadata.getProperties());
if (formatVersion != null) {
propertiesBuilder.put(FORMAT_VERSION, formatVersion);
}
return propertiesBuilder.build();
}
}
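
The new `populateTableProperties` helper above centralizes a mapping that was previously duplicated in the Hive and native `beginCreateTable` paths. In isolation, the mapping looks roughly like the standalone sketch below (key strings as defined by `org.apache.iceberg.TableProperties` and Presto's `MetastoreUtil`; this is an illustration, not the actual `IcebergUtil` code):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Optional;

    final class TablePropertyMappingSketch
    {
        // Key strings as defined by org.apache.iceberg.TableProperties and
        // com.facebook.presto.hive.metastore.MetastoreUtil.
        static final String DEFAULT_FILE_FORMAT = "write.format.default";
        static final String COMMIT_NUM_RETRIES = "commit.retry.num-retries";
        static final String FORMAT_VERSION = "format-version";
        static final String TABLE_COMMENT = "comment";

        static Map<String, String> populate(
                String fileFormat,
                int commitRetries,
                Optional<String> comment,
                Optional<String> formatVersion)
        {
            Map<String, String> properties = new LinkedHashMap<>();
            properties.put(DEFAULT_FILE_FORMAT, fileFormat);                          // always set
            properties.put(COMMIT_NUM_RETRIES, String.valueOf(commitRetries));        // always set (new in this change)
            comment.ifPresent(value -> properties.put(TABLE_COMMENT, value));         // only if a comment was given
            formatVersion.ifPresent(value -> properties.put(FORMAT_VERSION, value));  // only if explicitly set
            return properties;
        }
    }

This is why the properties-table tests below each expect one additional row: `commit.retry.num-retries` is now always written, even when the user never sets `commit_retries`.
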
@@ -191,16 +191,18 @@ public void testPropertiesTable()
{
assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$properties\"",
"VALUES ('key', 'varchar', '', '')," + "('value', 'varchar', '', '')");
assertQuery("SELECT COUNT(*) FROM test_schema.\"test_table$properties\"", "VALUES 2");
assertQuery("SELECT COUNT(*) FROM test_schema.\"test_table$properties\"", "VALUES 3");
List<MaterializedRow> materializedRows = computeActual(getSession(),
"SELECT * FROM test_schema.\"test_table$properties\"").getMaterializedRows();

assertThat(materializedRows).hasSize(2);
assertThat(materializedRows).hasSize(3);
assertThat(materializedRows)
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.format.default", "PARQUET")))
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.parquet.compression-codec", "zstd")));
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.parquet.compression-codec", "zstd")))
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "commit.retry.num-retries", "4")));
}

@Test
@@ -13,7 +13,6 @@
*/
package com.facebook.presto.iceberg.hive;

import com.facebook.presto.Session;
import com.facebook.presto.hive.HdfsConfiguration;
import com.facebook.presto.hive.HdfsConfigurationInitializer;
import com.facebook.presto.hive.HdfsEnvironment;
@@ -30,22 +29,12 @@
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableSet;
import org.apache.iceberg.Table;
import org.testng.annotations.Test;

import java.io.File;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.facebook.presto.hive.metastore.CachingHiveMetastore.memoizeMetastore;
import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static java.lang.String.format;
import static org.testng.Assert.fail;

public class TestIcebergSmokeHive
extends IcebergDistributedSmokeTestBase
@@ -55,51 +44,6 @@ public TestIcebergSmokeHive()
super(HIVE);
}

@Test
public void testConcurrentInsert()
{
final Session session = getSession();
assertUpdate(session, "CREATE TABLE test_concurrent_insert (col0 INTEGER, col1 VARCHAR) WITH (format = 'ORC')");

int concurrency = 5;
final String[] strings = {"one", "two", "three", "four", "five"};
final CountDownLatch countDownLatch = new CountDownLatch(concurrency);
AtomicInteger value = new AtomicInteger(0);
Set<Throwable> errors = new CopyOnWriteArraySet<>();
List<Thread> threads = Stream.generate(() -> new Thread(() -> {
int i = value.getAndIncrement();
try {
getQueryRunner().execute(session, format("INSERT INTO test_concurrent_insert VALUES(%s, '%s')", i + 1, strings[i]));
}
catch (Throwable throwable) {
errors.add(throwable);
}
finally {
countDownLatch.countDown();
}
})).limit(concurrency).collect(Collectors.toList());

threads.forEach(Thread::start);

try {
final int seconds = 10;
if (!countDownLatch.await(seconds, TimeUnit.SECONDS)) {
fail(format("Failed to insert in %s seconds", seconds));
}
if (!errors.isEmpty()) {
fail(format("Failed to insert concurrently: %s", errors.stream().map(Throwable::getMessage).collect(Collectors.joining(" & "))));
}
assertQuery(session, "SELECT count(*) FROM test_concurrent_insert", "SELECT " + concurrency);
assertQuery(session, "SELECT * FROM test_concurrent_insert", "VALUES(1, 'one'), (2, 'two'), (3, 'three'), (4, 'four'), (5, 'five')");
}
catch (InterruptedException e) {
fail("Interrupted when await insertion", e);
}
finally {
dropTable(session, "test_concurrent_insert");
}
}

@Override
protected String getLocation(String schema, String table)
{
@@ -88,12 +88,12 @@ public void testPropertiesTable()
{
assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$properties\"",
"VALUES ('key', 'varchar', '', '')," + "('value', 'varchar', '', '')");
assertQuery("SELECT COUNT(*) FROM test_schema.\"test_table$properties\"", "VALUES 5");
assertQuery("SELECT COUNT(*) FROM test_schema.\"test_table$properties\"", "VALUES 6");
List<MaterializedRow> materializedRows = computeActual(getSession(),
"SELECT * FROM test_schema.\"test_table$properties\"").getMaterializedRows();

// nessie writes a "nessie.commit.id" + "gc.enabled=false" to the table properties
assertThat(materializedRows).hasSize(5);
assertThat(materializedRows).hasSize(6);
assertThat(materializedRows)
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.format.default", "PARQUET")))
@@ -102,6 +102,8 @@
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.parquet.compression-codec", "zstd")))
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.delete-after-commit.enabled", "false")));
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.delete-after-commit.enabled", "false")))
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "commit.retry.num-retries", "4")));
}
}
