From abb7d4494f7a0154c173919120f5e6bc049f9348 Mon Sep 17 00:00:00 2001 From: panyliu Date: Fri, 12 Nov 2021 17:53:25 +0800 Subject: [PATCH] Support iceberg connector concurrent insertion Referenced commits: https://github.com/apache/iceberg/commit/718b85d3046561b0d046826f2733021a9511099f, https://github.com/apache/iceberg/commit/d5443e3a34a4288441a015ab616d965557d78202 --- .../hive/metastore/CachingHiveMetastore.java | 13 ++ .../hive/metastore/ExtendedHiveMetastore.java | 11 ++ .../metastore/file/FileHiveMetastore.java | 31 ++++ .../thrift/BridgingHiveMetastore.java | 12 ++ .../hive/metastore/thrift/HiveMetastore.java | 4 + .../metastore/thrift/HiveMetastoreClient.java | 13 ++ .../metastore/thrift/ThriftHiveMetastore.java | 107 +++++++++++++ .../thrift/ThriftHiveMetastoreClient.java | 25 +++ .../thrift/ThriftHiveMetastoreStats.java | 16 ++ .../thrift/InMemoryHiveMetastore.java | 12 ++ .../thrift/MockHiveMetastoreClient.java | 22 +++ .../presto/iceberg/HiveTableOperations.java | 145 +++++++++++------- .../presto/iceberg/TestIcebergSmoke.java | 54 +++++++ 13 files changed, 413 insertions(+), 52 deletions(-) diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/CachingHiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/CachingHiveMetastore.java index e88a1b3457453..fae75c7e21688 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/CachingHiveMetastore.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/CachingHiveMetastore.java @@ -941,6 +941,19 @@ public void setPartitionLeases(MetastoreContext metastoreContext, String databas delegate.setPartitionLeases(metastoreContext, databaseName, tableName, partitionNameToLocation, leaseDuration); } + @Override + public long lock(MetastoreContext metastoreContext, String databaseName, String tableName) + { + tableCache.invalidate(getCachingKey(metastoreContext, hiveTableName(databaseName, tableName))); + return delegate.lock(metastoreContext, databaseName, tableName); + } + + @Override + public void unlock(MetastoreContext metastoreContext, long lockId) + { + delegate.unlock(metastoreContext, lockId); + } + public Set loadTablePrivileges(KeyAndContext loadTablePrivilegesKey) { return delegate.listTablePrivileges(loadTablePrivilegesKey.getContext(), loadTablePrivilegesKey.getKey().getDatabase(), loadTablePrivilegesKey.getKey().getTable(), loadTablePrivilegesKey.getKey().getPrincipal()); diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/ExtendedHiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/ExtendedHiveMetastore.java index dd66da57e4721..92b550dd861d6 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/ExtendedHiveMetastore.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/ExtendedHiveMetastore.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.hive.metastore; +import com.facebook.presto.common.NotSupportedException; import com.facebook.presto.common.predicate.Domain; import com.facebook.presto.common.type.Type; import com.facebook.presto.hive.HiveType; @@ -117,4 +118,14 @@ List getPartitionNamesWithVersionByFilter( Set listTablePrivileges(MetastoreContext metastoreContext, String databaseName, String tableName, PrestoPrincipal principal); void setPartitionLeases(MetastoreContext metastoreContext, String databaseName, String tableName, Map partitionNameToLocation, Duration leaseDuration); + + default long lock(MetastoreContext metastoreContext, String databaseName, String tableName) + { + throw new NotSupportedException("Lock is not supported by default"); + } + + default void unlock(MetastoreContext metastoreContext, long lockId) + { + throw new NotSupportedException("Unlock is not supported by default"); + } } diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/file/FileHiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/file/FileHiveMetastore.java index c6c43326724d4..1bd819e1b9fdf 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/file/FileHiveMetastore.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/file/FileHiveMetastore.java @@ -28,6 +28,7 @@ import com.facebook.presto.hive.metastore.ExtendedHiveMetastore; import com.facebook.presto.hive.metastore.HiveColumnStatistics; import com.facebook.presto.hive.metastore.HivePrivilegeInfo; +import com.facebook.presto.hive.metastore.HiveTableName; import com.facebook.presto.hive.metastore.MetastoreContext; import com.facebook.presto.hive.metastore.MetastoreUtil; import com.facebook.presto.hive.metastore.Partition; @@ -45,6 +46,8 @@ import com.facebook.presto.spi.security.PrestoPrincipal; import com.facebook.presto.spi.security.RoleGrant; import com.facebook.presto.spi.statistics.ColumnStatisticType; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -81,6 +84,7 @@ import static com.facebook.presto.hive.HiveErrorCode.HIVE_METASTORE_ERROR; import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_DROPPED_DURING_QUERY; import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.OWNERSHIP; +import static com.facebook.presto.hive.metastore.HiveTableName.hiveTableName; import static com.facebook.presto.hive.metastore.MetastoreUtil.convertPredicateToParts; import static com.facebook.presto.hive.metastore.MetastoreUtil.extractPartitionValues; import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveBasicStatistics; @@ -123,6 +127,9 @@ public class FileHiveMetastore private final HdfsContext hdfsContext; private final FileSystem metadataFileSystem; + private final BiMap lockedHiveTables = HashBiMap.create(); + private long currentLockId; + private final JsonCodec databaseCodec = JsonCodec.jsonCodec(DatabaseMetadata.class); private final JsonCodec tableCodec = JsonCodec.jsonCodec(TableMetadata.class); private final JsonCodec partitionCodec = JsonCodec.jsonCodec(PartitionMetadata.class); @@ -1005,6 +1012,30 @@ public synchronized void setPartitionLeases(MetastoreContext metastoreContext, S throw new UnsupportedOperationException("setPartitionLeases is not supported in FileHiveMetastore"); } + @Override + public synchronized long lock(MetastoreContext metastoreContext, String databaseName, String tableName) + { + HiveTableName hiveTableName = hiveTableName(databaseName, tableName); + while (lockedHiveTables.containsValue(hiveTableName)) { + try { + Thread.sleep(10); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Waiting for lock interrupted"); + } + } + long lockId = ++currentLockId; + lockedHiveTables.put(lockId, hiveTableName); + return lockId; + } + + @Override + public synchronized void unlock(MetastoreContext metastoreContext, long lockId) + { + lockedHiveTables.remove(lockId); + } + private synchronized void setTablePrivileges( MetastoreContext metastoreContext, PrestoPrincipal grantee, diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/BridgingHiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/BridgingHiveMetastore.java index 2c8bd2b90d9cf..5332a8fa51b32 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/BridgingHiveMetastore.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/BridgingHiveMetastore.java @@ -378,4 +378,16 @@ public void setPartitionLeases(MetastoreContext metastoreContext, String databas { delegate.setPartitionLeases(metastoreContext, databaseName, tableName, partitionNameToLocation, leaseDuration); } + + @Override + public long lock(MetastoreContext metastoreContext, String databaseName, String tableName) + { + return delegate.lock(metastoreContext, databaseName, tableName); + } + + @Override + public void unlock(MetastoreContext metastoreContext, long lockId) + { + delegate.unlock(metastoreContext, lockId); + } } diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/HiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/HiveMetastore.java index dbeaef63df10d..83c175c487e27 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/HiveMetastore.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/HiveMetastore.java @@ -114,6 +114,10 @@ default List getPartitionNamesWithVersionByFilter(Meta Set listTablePrivileges(MetastoreContext metastoreContext, String databaseName, String tableName, PrestoPrincipal principal); + long lock(MetastoreContext metastoreContext, String databaseName, String tableName); + + void unlock(MetastoreContext metastoreContext, long lockId); + default void setPartitionLeases(MetastoreContext metastoreContext, String databaseName, String tableName, Map partitionNameToLocation, Duration leaseDuration) { throw new UnsupportedOperationException(); diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/HiveMetastoreClient.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/HiveMetastoreClient.java index 37d233e1b99f1..e625cc0966dcd 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/HiveMetastoreClient.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/HiveMetastoreClient.java @@ -13,17 +13,21 @@ */ package com.facebook.presto.hive.metastore.thrift; +import org.apache.hadoop.hive.metastore.api.CheckLockRequest; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; import org.apache.hadoop.hive.metastore.api.HiveObjectRef; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.PrivilegeBag; import org.apache.hadoop.hive.metastore.api.Role; import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.thrift.TException; import java.io.Closeable; @@ -146,4 +150,13 @@ List listRoleGrants(String name, PrincipalType principalType void setUGI(String userName) throws TException; + + LockResponse checkLock(CheckLockRequest request) + throws TException; + + LockResponse lock(LockRequest request) + throws TException; + + void unlock(UnlockRequest request) + throws TException; } diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/ThriftHiveMetastore.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/ThriftHiveMetastore.java index c6f457e32c0c4..0ad2d425bce9b 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/ThriftHiveMetastore.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/ThriftHiveMetastore.java @@ -41,8 +41,10 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AlreadyExistsException; +import org.apache.hadoop.hive.metastore.api.CheckLockRequest; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -51,6 +53,11 @@ import org.apache.hadoop.hive.metastore.api.InvalidInputException; import org.apache.hadoop.hive.metastore.api.InvalidObjectException; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; +import org.apache.hadoop.hive.metastore.api.LockComponent; +import org.apache.hadoop.hive.metastore.api.LockLevel; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; +import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; @@ -60,6 +67,7 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.UnknownDBException; import org.apache.hadoop.hive.metastore.api.UnknownTableException; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.thrift.TException; import org.weakref.jmx.Flatten; import org.weakref.jmx.Managed; @@ -67,6 +75,7 @@ import javax.annotation.concurrent.ThreadSafe; import javax.inject.Inject; +import java.net.InetAddress; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -75,6 +84,7 @@ import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; @@ -108,6 +118,9 @@ import static java.util.stream.Collectors.toSet; import static org.apache.hadoop.hive.common.FileUtils.makePartName; import static org.apache.hadoop.hive.metastore.api.HiveObjectType.TABLE; +import static org.apache.hadoop.hive.metastore.api.LockState.ACQUIRED; +import static org.apache.hadoop.hive.metastore.api.LockState.WAITING; +import static org.apache.hadoop.hive.metastore.api.LockType.EXCLUSIVE; import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS; import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.PRIMITIVE; @@ -1279,6 +1292,100 @@ public Set listTablePrivileges(MetastoreContext metastoreCont } } + @Override + public long lock(MetastoreContext metastoreContext, String databaseName, String tableName) + { + try { + final LockComponent lockComponent = new LockComponent(EXCLUSIVE, LockLevel.TABLE, databaseName); + lockComponent.setTablename(tableName); + final LockRequest lockRequest = new LockRequest(Lists.newArrayList(lockComponent), + metastoreContext.getUsername(), + InetAddress.getLocalHost().getHostName()); + LockResponse lockResponse = stats.getLock().wrap(() -> getMetastoreClientThenCall(metastoreContext, client -> client.lock(lockRequest))).call(); + LockState state = lockResponse.getState(); + long lockId = lockResponse.getLockid(); + final AtomicBoolean acquired = new AtomicBoolean(state.equals(ACQUIRED)); + + try { + if (state.equals(WAITING)) { + retry() + .maxAttempts(Integer.MAX_VALUE - 100) + .stopOnIllegalExceptions() + .exceptionMapper(e -> { + if (e instanceof WaitingForLockException) { + // only retry on waiting for lock exception + return e; + } + else { + return new IllegalStateException(e.getMessage(), e); + } + }) + .run("lock", stats.getLock().wrap(() -> + getMetastoreClientThenCall(metastoreContext, client -> { + LockResponse response = client.checkLock(new CheckLockRequest(lockId)); + LockState newState = response.getState(); + if (newState.equals(WAITING)) { + throw new WaitingForLockException("Waiting for lock."); + } + else if (newState.equals(ACQUIRED)) { + acquired.set(true); + } + else { + throw new RuntimeException(String.format("Failed to acquire lock: %s", newState.name())); + } + return null; + }))); + } + } + finally { + if (!acquired.get()) { + unlock(metastoreContext, lockId); + } + } + + if (!acquired.get()) { + throw new RuntimeException("Failed to acquire lock"); + } + + return lockId; + } + catch (TException e) { + throw new PrestoException(HIVE_METASTORE_ERROR, e); + } + catch (Exception e) { + throw propagate(e); + } + } + + @Override + public void unlock(MetastoreContext metastoreContext, long lockId) + { + try { + retry() + .stopOnIllegalExceptions() + .run("unlock", + stats.getUnlock().wrap(() -> getMetastoreClientThenCall(metastoreContext, client -> { + client.unlock(new UnlockRequest(lockId)); + return null; + }))); + } + catch (TException e) { + throw new PrestoException(HIVE_METASTORE_ERROR, e); + } + catch (Exception e) { + throw propagate(e); + } + } + + private static class WaitingForLockException + extends RuntimeException + { + public WaitingForLockException(String message) + { + super(message); + } + } + private PrivilegeBag buildPrivilegeBag( String databaseName, String tableName, diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/ThriftHiveMetastoreClient.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/ThriftHiveMetastoreClient.java index fddefc67da263..c9c481f758e79 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/ThriftHiveMetastoreClient.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/ThriftHiveMetastoreClient.java @@ -14,6 +14,7 @@ package com.facebook.presto.hive.metastore.thrift; import com.google.common.collect.ImmutableList; +import org.apache.hadoop.hive.metastore.api.CheckLockRequest; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -26,6 +27,8 @@ import org.apache.hadoop.hive.metastore.api.GrantRevokeType; import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; import org.apache.hadoop.hive.metastore.api.HiveObjectRef; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest; @@ -36,6 +39,7 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableStatsRequest; import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; @@ -400,4 +404,25 @@ public void setUGI(String userName) { client.set_ugi(userName, new ArrayList<>()); } + + @Override + public LockResponse checkLock(CheckLockRequest request) + throws TException + { + return client.check_lock(request); + } + + @Override + public LockResponse lock(LockRequest request) + throws TException + { + return client.lock(request); + } + + @Override + public void unlock(UnlockRequest request) + throws TException + { + client.unlock(request); + } } diff --git a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/ThriftHiveMetastoreStats.java b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/ThriftHiveMetastoreStats.java index a95f9290972ff..89d2d4414da17 100644 --- a/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/ThriftHiveMetastoreStats.java +++ b/presto-hive-metastore/src/main/java/com/facebook/presto/hive/metastore/thrift/ThriftHiveMetastoreStats.java @@ -50,6 +50,8 @@ public class ThriftHiveMetastoreStats private final HiveMetastoreApiStats listRoleGrants = new HiveMetastoreApiStats(); private final HiveMetastoreApiStats createRole = new HiveMetastoreApiStats(); private final HiveMetastoreApiStats dropRole = new HiveMetastoreApiStats(); + private final HiveMetastoreApiStats lock = new HiveMetastoreApiStats(); + private final HiveMetastoreApiStats unlock = new HiveMetastoreApiStats(); @Managed @Nested @@ -274,4 +276,18 @@ public HiveMetastoreApiStats getDropRole() { return dropRole; } + + @Managed + @Nested + public HiveMetastoreApiStats getLock() + { + return lock; + } + + @Managed + @Nested + public HiveMetastoreApiStats getUnlock() + { + return unlock; + } } diff --git a/presto-hive-metastore/src/test/java/com/facebook/presto/hive/metastore/thrift/InMemoryHiveMetastore.java b/presto-hive-metastore/src/test/java/com/facebook/presto/hive/metastore/thrift/InMemoryHiveMetastore.java index 66b2f5932f8f6..2b120185ac76a 100644 --- a/presto-hive-metastore/src/test/java/com/facebook/presto/hive/metastore/thrift/InMemoryHiveMetastore.java +++ b/presto-hive-metastore/src/test/java/com/facebook/presto/hive/metastore/thrift/InMemoryHiveMetastore.java @@ -507,6 +507,18 @@ public void revokeTablePrivileges(MetastoreContext metastoreContext, String data throw new UnsupportedOperationException(); } + @Override + public long lock(MetastoreContext metastoreContext, String databaseName, String tableName) + { + throw new UnsupportedOperationException(); + } + + @Override + public void unlock(MetastoreContext metastoreContext, long lockId) + { + throw new UnsupportedOperationException(); + } + private Partition getPartitionFromInMemoryMap(MetastoreContext metastoreContext, PartitionName name) { com.facebook.presto.hive.metastore.Partition partition = partitions.get(name); diff --git a/presto-hive-metastore/src/test/java/com/facebook/presto/hive/metastore/thrift/MockHiveMetastoreClient.java b/presto-hive-metastore/src/test/java/com/facebook/presto/hive/metastore/thrift/MockHiveMetastoreClient.java index 30bfc40511167..b10bece3dcccb 100644 --- a/presto-hive-metastore/src/test/java/com/facebook/presto/hive/metastore/thrift/MockHiveMetastoreClient.java +++ b/presto-hive-metastore/src/test/java/com/facebook/presto/hive/metastore/thrift/MockHiveMetastoreClient.java @@ -24,11 +24,14 @@ import com.google.common.collect.Lists; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.CheckLockRequest; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege; import org.apache.hadoop.hive.metastore.api.HiveObjectRef; +import org.apache.hadoop.hive.metastore.api.LockRequest; +import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; @@ -39,6 +42,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.UnlockRequest; import org.apache.thrift.TException; import java.util.List; @@ -416,4 +420,22 @@ public void setUGI(String userName) { // No-op } + + @Override + public LockResponse checkLock(CheckLockRequest request) + { + throw new UnsupportedOperationException(); + } + + @Override + public LockResponse lock(LockRequest request) + { + throw new UnsupportedOperationException(); + } + + @Override + public void unlock(UnlockRequest request) + { + throw new UnsupportedOperationException(); + } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HiveTableOperations.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HiveTableOperations.java index 78bc184794095..5ddc7c8437a93 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HiveTableOperations.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/HiveTableOperations.java @@ -29,6 +29,9 @@ import com.facebook.presto.spi.SchemaTableName; import com.facebook.presto.spi.TableNotFoundException; import com.facebook.presto.spi.security.PrestoPrincipal; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableMultimap; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.mapred.FileInputFormat; @@ -53,7 +56,9 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import static com.facebook.presto.hive.HiveMetadata.TABLE_COMMENT; import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.DELETE; @@ -104,6 +109,8 @@ public class HiveTableOperations private boolean shouldRefresh = true; private int version = -1; + private static LoadingCache commitLockCache; + public HiveTableOperations( ExtendedHiveMetastore metastore, MetastoreContext metastoreContext, @@ -156,6 +163,24 @@ private HiveTableOperations( this.tableName = requireNonNull(table, "table is null"); this.owner = requireNonNull(owner, "owner is null"); this.location = requireNonNull(location, "location is null"); + //TODO: duration from config + initTableLevelLockCache(TimeUnit.MINUTES.toMillis(10)); + } + + private static synchronized void initTableLevelLockCache(long evictionTimeout) + { + if (commitLockCache == null) { + commitLockCache = CacheBuilder.newBuilder() + .expireAfterAccess(evictionTimeout, TimeUnit.MILLISECONDS) + .build( + new CacheLoader() { + @Override + public ReentrantLock load(String fullName) + { + return new ReentrantLock(); + } + }); + } } @Override @@ -208,72 +233,88 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata) String newMetadataLocation = writeNewMetadata(metadata, version + 1); - // TODO: use metastore locking - Table table; + // getting a process-level lock per table to avoid concurrent commit attempts to the same table from the same + // JVM process, which would result in unnecessary and costly HMS lock acquisition requests + Optional lockId = Optional.empty(); + ReentrantLock tableLevelMutex = commitLockCache.getUnchecked(database + "." + tableName); + tableLevelMutex.lock(); try { - if (base == null) { - String tableComment = metadata.properties().get(TABLE_COMMENT); - Map parameters = new HashMap<>(); - parameters.put("EXTERNAL", "TRUE"); - parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE); - parameters.put(METADATA_LOCATION, newMetadataLocation); - if (tableComment != null) { - parameters.put(TABLE_COMMENT, tableComment); + try { + lockId = Optional.of(metastore.lock(metastoreContext, database, tableName)); + if (base == null) { + String tableComment = metadata.properties().get(TABLE_COMMENT); + Map parameters = new HashMap<>(); + parameters.put("EXTERNAL", "TRUE"); + parameters.put(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE); + parameters.put(METADATA_LOCATION, newMetadataLocation); + if (tableComment != null) { + parameters.put(TABLE_COMMENT, tableComment); + } + Table.Builder builder = Table.builder() + .setDatabaseName(database) + .setTableName(tableName) + .setOwner(owner.orElseThrow(() -> new IllegalStateException("Owner not set"))) + .setTableType(PrestoTableType.EXTERNAL_TABLE) + .setDataColumns(toHiveColumns(metadata.schema().columns())) + .withStorage(storage -> storage.setLocation(metadata.location())) + .withStorage(storage -> storage.setStorageFormat(STORAGE_FORMAT)) + .setParameters(parameters); + table = builder.build(); } - Table.Builder builder = Table.builder() - .setDatabaseName(database) - .setTableName(tableName) - .setOwner(owner.orElseThrow(() -> new IllegalStateException("Owner not set"))) - .setTableType(PrestoTableType.EXTERNAL_TABLE) - .setDataColumns(toHiveColumns(metadata.schema().columns())) - .withStorage(storage -> storage.setLocation(metadata.location())) - .withStorage(storage -> storage.setStorageFormat(STORAGE_FORMAT)) - .setParameters(parameters); - table = builder.build(); - } - else { - Table currentTable = getTable(); - checkState(currentMetadataLocation != null, "No current metadata location for existing table"); - String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION); - if (!currentMetadataLocation.equals(metadataLocation)) { - throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s", currentMetadataLocation, metadataLocation, getSchemaTableName()); + else { + Table currentTable = getTable(); + checkState(currentMetadataLocation != null, "No current metadata location for existing table"); + String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION); + if (!currentMetadataLocation.equals(metadataLocation)) { + throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s", currentMetadataLocation, metadataLocation, getSchemaTableName()); + } + table = Table.builder(currentTable) + .setDataColumns(toHiveColumns(metadata.schema().columns())) + .withStorage(storage -> storage.setLocation(metadata.location())) + .setParameter(METADATA_LOCATION, newMetadataLocation) + .setParameter(PREVIOUS_METADATA_LOCATION, currentMetadataLocation) + .build(); } - table = Table.builder(currentTable) - .setDataColumns(toHiveColumns(metadata.schema().columns())) - .withStorage(storage -> storage.setLocation(metadata.location())) - .setParameter(METADATA_LOCATION, newMetadataLocation) - .setParameter(PREVIOUS_METADATA_LOCATION, currentMetadataLocation) - .build(); - } - } - catch (RuntimeException e) { - try { - io().deleteFile(newMetadataLocation); } - catch (RuntimeException exception) { - e.addSuppressed(exception); + catch (RuntimeException e) { + try { + io().deleteFile(newMetadataLocation); + } + catch (RuntimeException exception) { + e.addSuppressed(exception); + } + throw e; } - throw e; - } - PrestoPrincipal owner = new PrestoPrincipal(USER, table.getOwner()); - PrincipalPrivileges privileges = new PrincipalPrivileges( - ImmutableMultimap.builder() + PrestoPrincipal owner = new PrestoPrincipal(USER, table.getOwner()); + PrincipalPrivileges privileges = new PrincipalPrivileges( + ImmutableMultimap.builder() .put(table.getOwner(), new HivePrivilegeInfo(SELECT, true, owner, owner)) .put(table.getOwner(), new HivePrivilegeInfo(INSERT, true, owner, owner)) .put(table.getOwner(), new HivePrivilegeInfo(UPDATE, true, owner, owner)) .put(table.getOwner(), new HivePrivilegeInfo(DELETE, true, owner, owner)) .build(), - ImmutableMultimap.of()); - if (base == null) { - metastore.createTable(metastoreContext, table, privileges); + ImmutableMultimap.of()); + if (base == null) { + metastore.createTable(metastoreContext, table, privileges); + } + else { + metastore.replaceTable(metastoreContext, database, tableName, table, privileges); + } } - else { - metastore.replaceTable(metastoreContext, database, tableName, table, privileges); + finally { + shouldRefresh = true; + try { + lockId.ifPresent(id -> metastore.unlock(metastoreContext, id)); + } + catch (Exception e) { + log.error(e, "Failed to unlock: %s", lockId.orElse(null)); + } + finally { + tableLevelMutex.unlock(); + } } - - shouldRefresh = true; } @Override diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSmoke.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSmoke.java index 88a26b476e9f2..ac4f971f57318 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSmoke.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/TestIcebergSmoke.java @@ -23,9 +23,17 @@ import org.intellij.lang.annotations.Language; import org.testng.annotations.Test; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static com.facebook.presto.common.type.VarcharType.VARCHAR; import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner; @@ -37,6 +45,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; public class TestIcebergSmoke extends AbstractTestIntegrationSmokeTest @@ -424,6 +433,51 @@ public void testInsertIntoNotNullColumn() // TODO: To support non-null column. (NOT_NULL_COLUMN_CONSTRAINT) } + @Test + public void testConcurrentInsert() + { + final Session session = getSession(); + assertUpdate(session, "CREATE TABLE test_concurrent_insert (col0 INTEGER, col1 VARCHAR) WITH (format = 'ORC')"); + + int concurrency = 5; + final String[] strings = {"one", "two", "three", "four", "five"}; + final CountDownLatch countDownLatch = new CountDownLatch(concurrency); + AtomicInteger value = new AtomicInteger(0); + Set errors = new CopyOnWriteArraySet<>(); + List threads = Stream.generate(() -> new Thread(() -> { + int i = value.getAndIncrement(); + try { + getQueryRunner().execute(session, format("INSERT INTO test_concurrent_insert VALUES(%s, '%s')", i + 1, strings[i])); + } + catch (Throwable throwable) { + errors.add(throwable); + } + finally { + countDownLatch.countDown(); + } + })).limit(concurrency).collect(Collectors.toList()); + + threads.forEach(Thread::start); + + try { + final int seconds = 10; + if (!countDownLatch.await(seconds, TimeUnit.SECONDS)) { + fail(format("Failed to insert in %s seconds", seconds)); + } + if (!errors.isEmpty()) { + fail(format("Failed to insert concurrently: %s", errors.stream().map(Throwable::getMessage).collect(Collectors.joining(" & ")))); + } + assertQuery(session, "SELECT count(*) FROM test_concurrent_insert", "SELECT " + concurrency); + assertQuery(session, "SELECT * FROM test_concurrent_insert", "VALUES(1, 'one'), (2, 'two'), (3, 'three'), (4, 'four'), (5, 'five')"); + } + catch (InterruptedException e) { + fail("Interrupted when await insertion", e); + } + finally { + dropTable(session, "test_concurrent_insert"); + } + } + @Test public void testSchemaEvolution() {