Skip to content

Commit

Permalink
Support iceberg connector concurrent insertion
Browse files Browse the repository at this point in the history
  • Loading branch information
liupan664021 authored and beinan committed Feb 23, 2022
1 parent fe1e84d commit 59d092c
Show file tree
Hide file tree
Showing 13 changed files with 413 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,19 @@ public void setPartitionLeases(MetastoreContext metastoreContext, String databas
delegate.setPartitionLeases(metastoreContext, databaseName, tableName, partitionNameToLocation, leaseDuration);
}

@Override
public long lock(MetastoreContext metastoreContext, String databaseName, String tableName)
{
tableCache.invalidate(getCachingKey(metastoreContext, hiveTableName(databaseName, tableName)));
return delegate.lock(metastoreContext, databaseName, tableName);
}

@Override
public void unlock(MetastoreContext metastoreContext, long lockId)
{
delegate.unlock(metastoreContext, lockId);
}

public Set<HivePrivilegeInfo> loadTablePrivileges(KeyAndContext<UserTableKey> loadTablePrivilegesKey)
{
return delegate.listTablePrivileges(loadTablePrivilegesKey.getContext(), loadTablePrivilegesKey.getKey().getDatabase(), loadTablePrivilegesKey.getKey().getTable(), loadTablePrivilegesKey.getKey().getPrincipal());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.hive.metastore;

import com.facebook.presto.common.NotSupportedException;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.hive.HiveType;
Expand Down Expand Up @@ -117,4 +118,14 @@ List<PartitionNameWithVersion> getPartitionNamesWithVersionByFilter(
Set<HivePrivilegeInfo> listTablePrivileges(MetastoreContext metastoreContext, String databaseName, String tableName, PrestoPrincipal principal);

void setPartitionLeases(MetastoreContext metastoreContext, String databaseName, String tableName, Map<String, String> partitionNameToLocation, Duration leaseDuration);

default long lock(MetastoreContext metastoreContext, String databaseName, String tableName)
{
throw new NotSupportedException("Lock is not supported by default");
}

default void unlock(MetastoreContext metastoreContext, long lockId)
{
throw new NotSupportedException("Unlock is not supported by default");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.HiveColumnStatistics;
import com.facebook.presto.hive.metastore.HivePrivilegeInfo;
import com.facebook.presto.hive.metastore.HiveTableName;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.MetastoreUtil;
import com.facebook.presto.hive.metastore.Partition;
Expand All @@ -45,6 +46,8 @@
import com.facebook.presto.spi.security.PrestoPrincipal;
import com.facebook.presto.spi.security.RoleGrant;
import com.facebook.presto.spi.statistics.ColumnStatisticType;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -81,6 +84,7 @@
import static com.facebook.presto.hive.HiveErrorCode.HIVE_METASTORE_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_DROPPED_DURING_QUERY;
import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.OWNERSHIP;
import static com.facebook.presto.hive.metastore.HiveTableName.hiveTableName;
import static com.facebook.presto.hive.metastore.MetastoreUtil.convertPredicateToParts;
import static com.facebook.presto.hive.metastore.MetastoreUtil.extractPartitionValues;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveBasicStatistics;
Expand Down Expand Up @@ -123,6 +127,9 @@ public class FileHiveMetastore
private final HdfsContext hdfsContext;
private final FileSystem metadataFileSystem;

private final BiMap<Long, HiveTableName> lockedHiveTables = HashBiMap.create();
private long currentLockId;

private final JsonCodec<DatabaseMetadata> databaseCodec = JsonCodec.jsonCodec(DatabaseMetadata.class);
private final JsonCodec<TableMetadata> tableCodec = JsonCodec.jsonCodec(TableMetadata.class);
private final JsonCodec<PartitionMetadata> partitionCodec = JsonCodec.jsonCodec(PartitionMetadata.class);
Expand Down Expand Up @@ -1005,6 +1012,30 @@ public synchronized void setPartitionLeases(MetastoreContext metastoreContext, S
throw new UnsupportedOperationException("setPartitionLeases is not supported in FileHiveMetastore");
}

@Override
public synchronized long lock(MetastoreContext metastoreContext, String databaseName, String tableName)
{
HiveTableName hiveTableName = hiveTableName(databaseName, tableName);
while (lockedHiveTables.containsValue(hiveTableName)) {
try {
Thread.sleep(10);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Waiting for lock interrupted");
}
}
long lockId = ++currentLockId;
lockedHiveTables.put(lockId, hiveTableName);
return lockId;
}

@Override
public synchronized void unlock(MetastoreContext metastoreContext, long lockId)
{
lockedHiveTables.remove(lockId);
}

private synchronized void setTablePrivileges(
MetastoreContext metastoreContext,
PrestoPrincipal grantee,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,4 +378,16 @@ public void setPartitionLeases(MetastoreContext metastoreContext, String databas
{
delegate.setPartitionLeases(metastoreContext, databaseName, tableName, partitionNameToLocation, leaseDuration);
}

@Override
public long lock(MetastoreContext metastoreContext, String databaseName, String tableName)
{
return delegate.lock(metastoreContext, databaseName, tableName);
}

@Override
public void unlock(MetastoreContext metastoreContext, long lockId)
{
delegate.unlock(metastoreContext, lockId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ default List<PartitionNameWithVersion> getPartitionNamesWithVersionByFilter(Meta

Set<HivePrivilegeInfo> listTablePrivileges(MetastoreContext metastoreContext, String databaseName, String tableName, PrestoPrincipal principal);

long lock(MetastoreContext metastoreContext, String databaseName, String tableName);

void unlock(MetastoreContext metastoreContext, long lockId);

default void setPartitionLeases(MetastoreContext metastoreContext, String databaseName, String tableName, Map<String, String> partitionNameToLocation, Duration leaseDuration)
{
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,21 @@
*/
package com.facebook.presto.hive.metastore.thrift;

import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
import org.apache.hadoop.hive.metastore.api.Role;
import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.thrift.TException;

import java.io.Closeable;
Expand Down Expand Up @@ -146,4 +150,13 @@ List<RolePrincipalGrant> listRoleGrants(String name, PrincipalType principalType

void setUGI(String userName)
throws TException;

LockResponse checkLock(CheckLockRequest request)
throws TException;

LockResponse lock(LockRequest request)
throws TException;

void unlock(UnlockRequest request)
throws TException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
Expand All @@ -53,6 +55,11 @@
import org.apache.hadoop.hive.metastore.api.InvalidInputException;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockLevel;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
Expand All @@ -62,13 +69,15 @@
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.thrift.TException;
import org.weakref.jmx.Flatten;
import org.weakref.jmx.Managed;

import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;

import java.net.InetAddress;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand All @@ -77,6 +86,7 @@
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -124,6 +134,9 @@
import static java.util.stream.Collectors.toSet;
import static org.apache.hadoop.hive.common.FileUtils.makePartName;
import static org.apache.hadoop.hive.metastore.api.HiveObjectType.TABLE;
import static org.apache.hadoop.hive.metastore.api.LockState.ACQUIRED;
import static org.apache.hadoop.hive.metastore.api.LockState.WAITING;
import static org.apache.hadoop.hive.metastore.api.LockType.EXCLUSIVE;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS;

@ThreadSafe
Expand Down Expand Up @@ -1309,6 +1322,100 @@ public Set<HivePrivilegeInfo> listTablePrivileges(MetastoreContext metastoreCont
}
}

@Override
public long lock(MetastoreContext metastoreContext, String databaseName, String tableName)
{
try {
final LockComponent lockComponent = new LockComponent(EXCLUSIVE, LockLevel.TABLE, databaseName);
lockComponent.setTablename(tableName);
final LockRequest lockRequest = new LockRequest(Lists.newArrayList(lockComponent),
metastoreContext.getUsername(),
InetAddress.getLocalHost().getHostName());
LockResponse lockResponse = stats.getLock().wrap(() -> getMetastoreClientThenCall(metastoreContext, client -> client.lock(lockRequest))).call();
LockState state = lockResponse.getState();
long lockId = lockResponse.getLockid();
final AtomicBoolean acquired = new AtomicBoolean(state.equals(ACQUIRED));

try {
if (state.equals(WAITING)) {
retry()
.maxAttempts(Integer.MAX_VALUE - 100)
.stopOnIllegalExceptions()
.exceptionMapper(e -> {
if (e instanceof WaitingForLockException) {
// only retry on waiting for lock exception
return e;
}
else {
return new IllegalStateException(e.getMessage(), e);
}
})
.run("lock", stats.getLock().wrap(() ->
getMetastoreClientThenCall(metastoreContext, client -> {
LockResponse response = client.checkLock(new CheckLockRequest(lockId));
LockState newState = response.getState();
if (newState.equals(WAITING)) {
throw new WaitingForLockException("Waiting for lock.");
}
else if (newState.equals(ACQUIRED)) {
acquired.set(true);
}
else {
throw new RuntimeException(String.format("Failed to acquire lock: %s", newState.name()));
}
return null;
})));
}
}
finally {
if (!acquired.get()) {
unlock(metastoreContext, lockId);
}
}

if (!acquired.get()) {
throw new RuntimeException("Failed to acquire lock");
}

return lockId;
}
catch (TException e) {
throw new PrestoException(HIVE_METASTORE_ERROR, e);
}
catch (Exception e) {
throw propagate(e);
}
}

@Override
public void unlock(MetastoreContext metastoreContext, long lockId)
{
try {
retry()
.stopOnIllegalExceptions()
.run("unlock",
stats.getUnlock().wrap(() -> getMetastoreClientThenCall(metastoreContext, client -> {
client.unlock(new UnlockRequest(lockId));
return null;
})));
}
catch (TException e) {
throw new PrestoException(HIVE_METASTORE_ERROR, e);
}
catch (Exception e) {
throw propagate(e);
}
}

private static class WaitingForLockException
extends RuntimeException
{
public WaitingForLockException(String message)
{
super(message);
}
}

private PrivilegeBag buildPrivilegeBag(
String databaseName,
String tableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.hive.metastore.thrift;

import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
Expand All @@ -26,6 +27,8 @@
import org.apache.hadoop.hive.metastore.api.GrantRevokeType;
import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest;
Expand All @@ -36,6 +39,7 @@
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableStatsRequest;
import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
Expand Down Expand Up @@ -400,4 +404,25 @@ public void setUGI(String userName)
{
client.set_ugi(userName, new ArrayList<>());
}

@Override
public LockResponse checkLock(CheckLockRequest request)
throws TException
{
return client.check_lock(request);
}

@Override
public LockResponse lock(LockRequest request)
throws TException
{
return client.lock(request);
}

@Override
public void unlock(UnlockRequest request)
throws TException
{
client.unlock(request);
}
}
Loading

0 comments on commit 59d092c

Please sign in to comment.