Fix concurrent insertions for Iceberg tables #21250

Merged · 1 commit · Feb 14, 2024
4 changes: 4 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -254,6 +254,10 @@ Property Name Description
``format_version``                        Optionally specifies the format version of the Iceberg
                                          specification to use for new tables, either ``1`` or ``2``.
                                          Defaults to ``1``.

``commit_retries``                        Optionally specifies the number of times a metadata commit
                                          is retried after a concurrent write conflict before the
                                          operation fails. Defaults to ``4``.
========================================= ===============================================================

The table definition below specifies format ``ORC``, partitioning by columns ``c1`` and ``c2``,
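As a quick illustration of the new property (catalog, schema, and column names here are hypothetical), a table created with a larger retry budget would look like:

    CREATE TABLE iceberg.web.page_views (
        view_time TIMESTAMP,
        user_id BIGINT
    )
    WITH (format = 'ORC', commit_retries = 10);

Concurrent INSERTs into such a table each retry their metadata commit up to ten times before failing.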
IcebergHiveMetadata.java
@@ -99,7 +99,6 @@
import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.INSERT;
import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.SELECT;
import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.UPDATE;
import static com.facebook.presto.hive.metastore.MetastoreUtil.TABLE_COMMENT;
import static com.facebook.presto.hive.metastore.MetastoreUtil.buildInitialPrivilegeSet;
import static com.facebook.presto.hive.metastore.MetastoreUtil.checkIfNullView;
import static com.facebook.presto.hive.metastore.MetastoreUtil.createTableObjectForViewCreation;
@@ -113,14 +112,14 @@
import static com.facebook.presto.iceberg.IcebergSchemaProperties.getSchemaLocation;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getHiveStatisticsMergeStrategy;
import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat;
import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion;
import static com.facebook.presto.iceberg.IcebergTableProperties.getPartitioning;
import static com.facebook.presto.iceberg.IcebergTableProperties.getTableLocation;
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
import static com.facebook.presto.iceberg.IcebergUtil.createIcebergViewProperties;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getHiveIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.isIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
@@ -137,8 +136,6 @@
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.TableMetadata.newTableMetadata;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
@@ -301,20 +298,8 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
throw new TableAlreadyExistsException(schemaTableName);
}

ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builderWithExpectedSize(3);
FileFormat fileFormat = getFileFormat(tableMetadata.getProperties());
propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString());
if (tableMetadata.getComment().isPresent()) {
propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
}

String formatVersion = getFormatVersion(tableMetadata.getProperties());
if (formatVersion != null) {
propertiesBuilder.put(FORMAT_VERSION, formatVersion);
}

TableMetadata metadata = newTableMetadata(schema, partitionSpec, targetPath, propertiesBuilder.build());

TableMetadata metadata = newTableMetadata(schema, partitionSpec, targetPath, populateTableProperties(tableMetadata, fileFormat));
transaction = createTableTransaction(tableName, operations, metadata);

return new IcebergWritableTableHandle(
IcebergHiveMetadataFactory.java
@@ -41,7 +41,6 @@ public class IcebergHiveMetadataFactory

@Inject
public IcebergHiveMetadataFactory(
IcebergConfig config,
ExtendedHiveMetastore metastore,
HdfsEnvironment hdfsEnvironment,
TypeManager typeManager,
@@ -59,7 +58,6 @@ public IcebergHiveMetadataFactory(
this.commitTaskCodec = requireNonNull(commitTaskCodec, "commitTaskCodec is null");
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
this.filterStatsCalculatorService = requireNonNull(filterStatsCalculatorService, "filterStatsCalculatorService is null");
requireNonNull(config, "config is null");
}

public ConnectorMetadata create()
IcebergNativeMetadata.java
@@ -28,7 +28,6 @@
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.relation.RowExpressionService;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
@@ -46,11 +45,11 @@
import java.util.Optional;

import static com.facebook.presto.iceberg.IcebergTableProperties.getFileFormat;
import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion;
import static com.facebook.presto.iceberg.IcebergTableProperties.getPartitioning;
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergNamespace;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier;
@@ -61,8 +60,6 @@
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;

public class IcebergNativeMetadata
extends IcebergAbstractMetadata
@@ -168,21 +165,11 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
Schema schema = toIcebergSchema(tableMetadata.getColumns());

PartitionSpec partitionSpec = parsePartitionFields(schema, getPartitioning(tableMetadata.getProperties()));

ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builder();
FileFormat fileFormat = getFileFormat(tableMetadata.getProperties());
propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString());
if (tableMetadata.getComment().isPresent()) {
propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
}
String formatVersion = getFormatVersion(tableMetadata.getProperties());
if (formatVersion != null) {
propertiesBuilder.put(FORMAT_VERSION, formatVersion);
}

try {
transaction = resourceFactory.getCatalog(session).newCreateTableTransaction(
toIcebergTableIdentifier(schemaTableName), schema, partitionSpec, propertiesBuilder.build());
toIcebergTableIdentifier(schemaTableName), schema, partitionSpec, populateTableProperties(tableMetadata, fileFormat));
}
catch (AlreadyExistsException e) {
throw new TableAlreadyExistsException(schemaTableName);
IcebergTableProperties.java
@@ -16,6 +16,7 @@
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.TableProperties;

import javax.inject.Inject;

@@ -25,6 +26,7 @@

import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.common.type.VarcharType.createUnboundedVarcharType;
import static com.facebook.presto.spi.session.PropertyMetadata.integerProperty;
import static com.facebook.presto.spi.session.PropertyMetadata.stringProperty;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Locale.ENGLISH;
@@ -35,6 +37,7 @@ public class IcebergTableProperties
public static final String PARTITIONING_PROPERTY = "partitioning";
public static final String LOCATION_PROPERTY = "location";
public static final String FORMAT_VERSION = "format_version";
public static final String COMMIT_RETRIES = "commit_retries";

private final List<PropertyMetadata<?>> tableProperties;
private final List<PropertyMetadata<?>> columnProperties;
@@ -73,6 +76,11 @@ public IcebergTableProperties(IcebergConfig icebergConfig)
"Format version for the table",
null,
false))
.add(integerProperty(
COMMIT_RETRIES,
"Determines the number of attempts in case of concurrent upserts and deletes",
TableProperties.COMMIT_NUM_RETRIES_DEFAULT,
false))
.build();

columnProperties = ImmutableList.of(stringProperty(
@@ -113,4 +121,9 @@ public static String getFormatVersion(Map<String, Object> tableProperties)
{
return (String) tableProperties.get(FORMAT_VERSION);
}

public static Integer getCommitRetries(Map<String, Object> tableProperties)
{
return (Integer) tableProperties.get(COMMIT_RETRIES);
}
}
IcebergUtil.java
@@ -35,6 +35,7 @@
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
@@ -135,6 +136,8 @@
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_TABLE_TIMESTAMP;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isMergeOnReadModeEnabled;
import static com.facebook.presto.iceberg.IcebergTableProperties.getCommitRetries;
import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion;
import static com.facebook.presto.iceberg.TypeConverter.toIcebergType;
import static com.facebook.presto.iceberg.TypeConverter.toPrestoType;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier;
@@ -164,9 +167,11 @@
import static org.apache.iceberg.CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES;
import static org.apache.iceberg.LocationProviders.locationsFor;
import static org.apache.iceberg.MetadataTableUtils.createMetadataTableInstance;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.DELETE_MODE;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.MERGE_MODE;
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
@@ -820,9 +825,9 @@ private static class DeleteFilesIterator
private DeleteFile currentFile;

private DeleteFilesIterator(Map<Integer, PartitionSpec> partitionSpecsById,
CloseableIterator<FileScanTask> fileTasks,
Optional<Set<Integer>> requestedPartitionSpec,
Optional<Set<Integer>> requestedSchema)
CloseableIterator<FileScanTask> fileTasks,
Optional<Set<Integer>> requestedPartitionSpec,
Optional<Set<Integer>> requestedSchema)
{
this.partitionSpecsById = partitionSpecsById;
this.fileTasks = fileTasks;
@@ -884,4 +889,21 @@ public void close()
fileTasks = CloseableIterator.empty();
}
}

public static Map<String, String> populateTableProperties(ConnectorTableMetadata tableMetadata, FileFormat fileFormat)
{
ImmutableMap.Builder<String, String> propertiesBuilder = ImmutableMap.builderWithExpectedSize(4);
Integer commitRetries = getCommitRetries(tableMetadata.getProperties());
propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toString());
propertiesBuilder.put(COMMIT_NUM_RETRIES, String.valueOf(commitRetries));
if (tableMetadata.getComment().isPresent()) {
propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
}

String formatVersion = getFormatVersion(tableMetadata.getProperties());
if (formatVersion != null) {
propertiesBuilder.put(FORMAT_VERSION, formatVersion);
}
return propertiesBuilder.build();
}
}
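Because populateTableProperties now seeds commit.retry.num-retries into the metadata of every new table, the effective setting can be checked from SQL through the $properties system table (schema and table names hypothetical):

    SELECT * FROM test_schema."page_views$properties";
    -- should include a row: commit.retry.num-retries | 4   (the Iceberg default)

The test changes below assert exactly this extra row.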
----------------------------------------------------------------------
@@ -191,16 +191,18 @@ public void testPropertiesTable()
{
assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$properties\"",
"VALUES ('key', 'varchar', '', '')," + "('value', 'varchar', '', '')");
assertQuery("SELECT COUNT(*) FROM test_schema.\"test_table$properties\"", "VALUES 2");
assertQuery("SELECT COUNT(*) FROM test_schema.\"test_table$properties\"", "VALUES 3");
List<MaterializedRow> materializedRows = computeActual(getSession(),
"SELECT * FROM test_schema.\"test_table$properties\"").getMaterializedRows();

assertThat(materializedRows).hasSize(2);
assertThat(materializedRows).hasSize(3);
assertThat(materializedRows)
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.format.default", "PARQUET")))
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.parquet.compression-codec", "zstd")));
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.parquet.compression-codec", "zstd")))
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "commit.retry.num-retries", "4")));
}

@Test
TestIcebergSmokeHive.java
@@ -13,7 +13,6 @@
*/
package com.facebook.presto.iceberg.hive;

import com.facebook.presto.Session;
import com.facebook.presto.hive.HdfsConfiguration;
import com.facebook.presto.hive.HdfsConfigurationInitializer;
import com.facebook.presto.hive.HdfsEnvironment;
@@ -30,22 +29,12 @@
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableSet;
import org.apache.iceberg.Table;
import org.testng.annotations.Test;

import java.io.File;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static com.facebook.presto.hive.metastore.CachingHiveMetastore.memoizeMetastore;
import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static java.lang.String.format;
import static org.testng.Assert.fail;

public class TestIcebergSmokeHive
extends IcebergDistributedSmokeTestBase
@@ -55,51 +44,6 @@ public TestIcebergSmokeHive()
super(HIVE);
}

@Test
public void testConcurrentInsert()
{
final Session session = getSession();
assertUpdate(session, "CREATE TABLE test_concurrent_insert (col0 INTEGER, col1 VARCHAR) WITH (format = 'ORC')");

int concurrency = 5;
final String[] strings = {"one", "two", "three", "four", "five"};
final CountDownLatch countDownLatch = new CountDownLatch(concurrency);
AtomicInteger value = new AtomicInteger(0);
Set<Throwable> errors = new CopyOnWriteArraySet<>();
List<Thread> threads = Stream.generate(() -> new Thread(() -> {
int i = value.getAndIncrement();
try {
getQueryRunner().execute(session, format("INSERT INTO test_concurrent_insert VALUES(%s, '%s')", i + 1, strings[i]));
}
catch (Throwable throwable) {
errors.add(throwable);
}
finally {
countDownLatch.countDown();
}
})).limit(concurrency).collect(Collectors.toList());

threads.forEach(Thread::start);

try {
final int seconds = 10;
if (!countDownLatch.await(seconds, TimeUnit.SECONDS)) {
fail(format("Failed to insert in %s seconds", seconds));
}
if (!errors.isEmpty()) {
fail(format("Failed to insert concurrently: %s", errors.stream().map(Throwable::getMessage).collect(Collectors.joining(" & "))));
}
assertQuery(session, "SELECT count(*) FROM test_concurrent_insert", "SELECT " + concurrency);
assertQuery(session, "SELECT * FROM test_concurrent_insert", "VALUES(1, 'one'), (2, 'two'), (3, 'three'), (4, 'four'), (5, 'five')");
}
catch (InterruptedException e) {
fail("Interrupted when await insertion", e);
}
finally {
dropTable(session, "test_concurrent_insert");
}
}

@Override
protected String getLocation(String schema, String table)
{
----------------------------------------------------------------------
@@ -88,12 +88,12 @@ public void testPropertiesTable()
{
assertQuery("SHOW COLUMNS FROM test_schema.\"test_table$properties\"",
"VALUES ('key', 'varchar', '', '')," + "('value', 'varchar', '', '')");
assertQuery("SELECT COUNT(*) FROM test_schema.\"test_table$properties\"", "VALUES 5");
assertQuery("SELECT COUNT(*) FROM test_schema.\"test_table$properties\"", "VALUES 6");
List<MaterializedRow> materializedRows = computeActual(getSession(),
"SELECT * FROM test_schema.\"test_table$properties\"").getMaterializedRows();

// nessie writes a "nessie.commit.id" + "gc.enabled=false" to the table properties
assertThat(materializedRows).hasSize(5);
assertThat(materializedRows).hasSize(6);
assertThat(materializedRows)
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.format.default", "PARQUET")))
@@ -102,6 +102,8 @@
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.parquet.compression-codec", "zstd")))
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.delete-after-commit.enabled", "false")));
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "write.metadata.delete-after-commit.enabled", "false")))
.anySatisfy(row -> assertThat(row)
.isEqualTo(new MaterializedRow(MaterializedResult.DEFAULT_PRECISION, "commit.retry.num-retries", "4")));
}
}