Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support iceberg connector concurrent insertion #16983

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,19 @@ public void setPartitionLeases(MetastoreContext metastoreContext, String databas
delegate.setPartitionLeases(metastoreContext, databaseName, tableName, partitionNameToLocation, leaseDuration);
}

@Override
public long lock(MetastoreContext metastoreContext, String databaseName, String tableName)
{
tableCache.invalidate(getCachingKey(metastoreContext, hiveTableName(databaseName, tableName)));
return delegate.lock(metastoreContext, databaseName, tableName);
}

@Override
public void unlock(MetastoreContext metastoreContext, long lockId)
{
delegate.unlock(metastoreContext, lockId);
}

public Set<HivePrivilegeInfo> loadTablePrivileges(KeyAndContext<UserTableKey> loadTablePrivilegesKey)
{
return delegate.listTablePrivileges(loadTablePrivilegesKey.getContext(), loadTablePrivilegesKey.getKey().getDatabase(), loadTablePrivilegesKey.getKey().getTable(), loadTablePrivilegesKey.getKey().getPrincipal());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.hive.metastore;

import com.facebook.presto.common.NotSupportedException;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.hive.HiveType;
Expand Down Expand Up @@ -117,4 +118,14 @@ List<PartitionNameWithVersion> getPartitionNamesWithVersionByFilter(
Set<HivePrivilegeInfo> listTablePrivileges(MetastoreContext metastoreContext, String databaseName, String tableName, PrestoPrincipal principal);

void setPartitionLeases(MetastoreContext metastoreContext, String databaseName, String tableName, Map<String, String> partitionNameToLocation, Duration leaseDuration);

default long lock(MetastoreContext metastoreContext, String databaseName, String tableName)
{
throw new NotSupportedException("Lock is not supported by default");
}

default void unlock(MetastoreContext metastoreContext, long lockId)
{
throw new NotSupportedException("Unlock is not supported by default");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.HiveColumnStatistics;
import com.facebook.presto.hive.metastore.HivePrivilegeInfo;
import com.facebook.presto.hive.metastore.HiveTableName;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.MetastoreUtil;
import com.facebook.presto.hive.metastore.Partition;
Expand All @@ -45,6 +46,8 @@
import com.facebook.presto.spi.security.PrestoPrincipal;
import com.facebook.presto.spi.security.RoleGrant;
import com.facebook.presto.spi.statistics.ColumnStatisticType;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -81,6 +84,7 @@
import static com.facebook.presto.hive.HiveErrorCode.HIVE_METASTORE_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_PARTITION_DROPPED_DURING_QUERY;
import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.OWNERSHIP;
import static com.facebook.presto.hive.metastore.HiveTableName.hiveTableName;
import static com.facebook.presto.hive.metastore.MetastoreUtil.convertPredicateToParts;
import static com.facebook.presto.hive.metastore.MetastoreUtil.extractPartitionValues;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveBasicStatistics;
Expand Down Expand Up @@ -123,6 +127,9 @@ public class FileHiveMetastore
private final HdfsContext hdfsContext;
private final FileSystem metadataFileSystem;

private final BiMap<Long, HiveTableName> lockedHiveTables = HashBiMap.create();
private long currentLockId;

private final JsonCodec<DatabaseMetadata> databaseCodec = JsonCodec.jsonCodec(DatabaseMetadata.class);
private final JsonCodec<TableMetadata> tableCodec = JsonCodec.jsonCodec(TableMetadata.class);
private final JsonCodec<PartitionMetadata> partitionCodec = JsonCodec.jsonCodec(PartitionMetadata.class);
Expand Down Expand Up @@ -1005,6 +1012,30 @@ public synchronized void setPartitionLeases(MetastoreContext metastoreContext, S
throw new UnsupportedOperationException("setPartitionLeases is not supported in FileHiveMetastore");
}

@Override
public synchronized long lock(MetastoreContext metastoreContext, String databaseName, String tableName)
{
HiveTableName hiveTableName = hiveTableName(databaseName, tableName);
while (lockedHiveTables.containsValue(hiveTableName)) {
try {
Thread.sleep(10);
liupan664021 marked this conversation as resolved.
Show resolved Hide resolved
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Waiting for lock interrupted");
liupan664021 marked this conversation as resolved.
Show resolved Hide resolved
}
}
long lockId = ++currentLockId;
lockedHiveTables.put(lockId, hiveTableName);
liupan664021 marked this conversation as resolved.
Show resolved Hide resolved
return lockId;
}

@Override
public synchronized void unlock(MetastoreContext metastoreContext, long lockId)
{
lockedHiveTables.remove(lockId);
}

private synchronized void setTablePrivileges(
MetastoreContext metastoreContext,
PrestoPrincipal grantee,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,4 +378,16 @@ public void setPartitionLeases(MetastoreContext metastoreContext, String databas
{
delegate.setPartitionLeases(metastoreContext, databaseName, tableName, partitionNameToLocation, leaseDuration);
}

@Override
public long lock(MetastoreContext metastoreContext, String databaseName, String tableName)
{
return delegate.lock(metastoreContext, databaseName, tableName);
}

@Override
public void unlock(MetastoreContext metastoreContext, long lockId)
{
delegate.unlock(metastoreContext, lockId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ default List<PartitionNameWithVersion> getPartitionNamesWithVersionByFilter(Meta

Set<HivePrivilegeInfo> listTablePrivileges(MetastoreContext metastoreContext, String databaseName, String tableName, PrestoPrincipal principal);

long lock(MetastoreContext metastoreContext, String databaseName, String tableName);

void unlock(MetastoreContext metastoreContext, long lockId);

default void setPartitionLeases(MetastoreContext metastoreContext, String databaseName, String tableName, Map<String, String> partitionNameToLocation, Duration leaseDuration)
{
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,21 @@
*/
package com.facebook.presto.hive.metastore.thrift;

import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
import org.apache.hadoop.hive.metastore.api.Role;
import org.apache.hadoop.hive.metastore.api.RolePrincipalGrant;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.thrift.TException;

import java.io.Closeable;
Expand Down Expand Up @@ -146,4 +150,13 @@ List<RolePrincipalGrant> listRoleGrants(String name, PrincipalType principalType

void setUGI(String userName)
throws TException;

LockResponse checkLock(CheckLockRequest request)
throws TException;

LockResponse lock(LockRequest request)
throws TException;

void unlock(UnlockRequest request)
throws TException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
Expand All @@ -51,6 +53,11 @@
import org.apache.hadoop.hive.metastore.api.InvalidInputException;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockLevel;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Partition;
Expand All @@ -60,13 +67,15 @@
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.thrift.TException;
import org.weakref.jmx.Flatten;
import org.weakref.jmx.Managed;

import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;

import java.net.InetAddress;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand All @@ -75,6 +84,7 @@
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -108,6 +118,9 @@
import static java.util.stream.Collectors.toSet;
import static org.apache.hadoop.hive.common.FileUtils.makePartName;
import static org.apache.hadoop.hive.metastore.api.HiveObjectType.TABLE;
import static org.apache.hadoop.hive.metastore.api.LockState.ACQUIRED;
import static org.apache.hadoop.hive.metastore.api.LockState.WAITING;
import static org.apache.hadoop.hive.metastore.api.LockType.EXCLUSIVE;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.HIVE_FILTER_FIELD_PARAMS;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.PRIMITIVE;

Expand Down Expand Up @@ -1279,6 +1292,100 @@ public Set<HivePrivilegeInfo> listTablePrivileges(MetastoreContext metastoreCont
}
}

@Override
public long lock(MetastoreContext metastoreContext, String databaseName, String tableName)
{
try {
final LockComponent lockComponent = new LockComponent(EXCLUSIVE, LockLevel.TABLE, databaseName);
lockComponent.setTablename(tableName);
final LockRequest lockRequest = new LockRequest(Lists.newArrayList(lockComponent),
metastoreContext.getUsername(),
InetAddress.getLocalHost().getHostName());
LockResponse lockResponse = stats.getLock().wrap(() -> getMetastoreClientThenCall(metastoreContext, client -> client.lock(lockRequest))).call();
LockState state = lockResponse.getState();
long lockId = lockResponse.getLockid();
final AtomicBoolean acquired = new AtomicBoolean(state.equals(ACQUIRED));

try {
if (state.equals(WAITING)) {
retry()
.maxAttempts(Integer.MAX_VALUE - 100)
beinan marked this conversation as resolved.
Show resolved Hide resolved
.stopOnIllegalExceptions()
.exceptionMapper(e -> {
if (e instanceof WaitingForLockException) {
// only retry on waiting for lock exception
return e;
}
else {
return new IllegalStateException(e.getMessage(), e);
}
})
.run("lock", stats.getLock().wrap(() ->
getMetastoreClientThenCall(metastoreContext, client -> {
LockResponse response = client.checkLock(new CheckLockRequest(lockId));
LockState newState = response.getState();
if (newState.equals(WAITING)) {
throw new WaitingForLockException("Waiting for lock.");
}
else if (newState.equals(ACQUIRED)) {
acquired.set(true);
}
else {
throw new RuntimeException(String.format("Failed to acquire lock: %s", newState.name()));
beinan marked this conversation as resolved.
Show resolved Hide resolved
}
return null;
})));
}
}
finally {
if (!acquired.get()) {
unlock(metastoreContext, lockId);
}
}

if (!acquired.get()) {
throw new RuntimeException("Failed to acquire lock");
liupan664021 marked this conversation as resolved.
Show resolved Hide resolved
}

return lockId;
}
catch (TException e) {
throw new PrestoException(HIVE_METASTORE_ERROR, e);
}
catch (Exception e) {
throw propagate(e);
}
}

@Override
public void unlock(MetastoreContext metastoreContext, long lockId)
{
try {
retry()
.stopOnIllegalExceptions()
.run("unlock",
stats.getUnlock().wrap(() -> getMetastoreClientThenCall(metastoreContext, client -> {
client.unlock(new UnlockRequest(lockId));
liupan664021 marked this conversation as resolved.
Show resolved Hide resolved
return null;
})));
}
catch (TException e) {
throw new PrestoException(HIVE_METASTORE_ERROR, e);
}
catch (Exception e) {
throw propagate(e);
}
}

private static class WaitingForLockException
extends RuntimeException
{
public WaitingForLockException(String message)
{
super(message);
}
}

private PrivilegeBag buildPrivilegeBag(
String databaseName,
String tableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.hive.metastore.thrift;

import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
Expand All @@ -26,6 +27,8 @@
import org.apache.hadoop.hive.metastore.api.GrantRevokeType;
import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest;
Expand All @@ -36,6 +39,7 @@
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableStatsRequest;
import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
import org.apache.hadoop.hive.metastore.api.UnlockRequest;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
Expand Down Expand Up @@ -400,4 +404,25 @@ public void setUGI(String userName)
{
client.set_ugi(userName, new ArrayList<>());
}

@Override
public LockResponse checkLock(CheckLockRequest request)
throws TException
{
return client.check_lock(request);
}

@Override
public LockResponse lock(LockRequest request)
throws TException
{
return client.lock(request);
}

@Override
public void unlock(UnlockRequest request)
throws TException
{
client.unlock(request);
}
}
Loading