Skip to content

Commit

Permalink
Remove duplicate code for get Iceberg table of different catalogType
Browse files Browse the repository at this point in the history
  • Loading branch information
hantangwangd authored and tdcmeehan committed Oct 30, 2023
1 parent efdd8e0 commit 409404a
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
Expand All @@ -28,34 +26,20 @@

import javax.inject.Inject;

import static com.facebook.presto.iceberg.CatalogType.HADOOP;
import static com.facebook.presto.iceberg.CatalogType.NESSIE;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getMinimumAssignedSplitWeight;
import static com.facebook.presto.iceberg.IcebergUtil.getHiveIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
import static java.util.Objects.requireNonNull;

public class IcebergSplitManager
implements ConnectorSplitManager
{
private final IcebergTransactionManager transactionManager;
private final HdfsEnvironment hdfsEnvironment;
private final IcebergResourceFactory resourceFactory;
private final CatalogType catalogType;

@Inject
public IcebergSplitManager(
IcebergConfig config,
IcebergResourceFactory resourceFactory,
IcebergTransactionManager transactionManager,
HdfsEnvironment hdfsEnvironment)
public IcebergSplitManager(IcebergTransactionManager transactionManager)
{
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.resourceFactory = requireNonNull(resourceFactory, "resourceFactory is null");
requireNonNull(config, "config is null");
this.catalogType = config.getCatalogType();
}

@Override
Expand All @@ -72,15 +56,7 @@ public ConnectorSplitSource getSplits(
return new FixedSplitSource(ImmutableList.of());
}

Table icebergTable;
if (catalogType == HADOOP || catalogType == NESSIE) {
icebergTable = getNativeIcebergTable(resourceFactory, session, table.getSchemaTableName());
}
else {
ExtendedHiveMetastore metastore = ((IcebergHiveMetadata) transactionManager.get(transaction)).getMetastore();
icebergTable = getHiveIcebergTable(metastore, hdfsEnvironment, session, table.getSchemaTableName());
}

Table icebergTable = getIcebergTable(transactionManager.get(transaction), session, table.getSchemaTableName());
TableScan tableScan = icebergTable.newScan()
.filter(toIcebergExpression(table.getPredicate()))
.useSnapshot(table.getSnapshotId().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,11 @@
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.procedure.Procedure;
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.Table;

import javax.inject.Inject;
import javax.inject.Provider;
Expand All @@ -30,10 +27,7 @@
import static com.facebook.presto.common.block.MethodHandleUtil.methodHandle;
import static com.facebook.presto.common.type.StandardTypes.BIGINT;
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static com.facebook.presto.iceberg.CatalogType.HADOOP;
import static com.facebook.presto.iceberg.CatalogType.NESSIE;
import static com.facebook.presto.iceberg.IcebergUtil.getHiveIcebergTable;
import static com.facebook.presto.iceberg.util.IcebergPrestoModelConverters.toIcebergTableIdentifier;
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
import static java.util.Objects.requireNonNull;

public class RollbackToSnapshotProcedure
Expand All @@ -48,22 +42,11 @@ public class RollbackToSnapshotProcedure
Long.class);

private final IcebergMetadataFactory metadataFactory;
private final HdfsEnvironment hdfsEnvironment;
private final IcebergResourceFactory resourceFactory;
private final CatalogType catalogType;

@Inject
public RollbackToSnapshotProcedure(
IcebergConfig config,
IcebergMetadataFactory metadataFactory,
HdfsEnvironment hdfsEnvironment,
IcebergResourceFactory resourceFactory)
public RollbackToSnapshotProcedure(IcebergMetadataFactory metadataFactory)
{
this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.resourceFactory = requireNonNull(resourceFactory, "resourceFactory is null");
requireNonNull(config, "config is null");
this.catalogType = config.getCatalogType();
}

@Override
Expand All @@ -83,14 +66,7 @@ public void rollbackToSnapshot(ConnectorSession clientSession, String schema, St
{
SchemaTableName schemaTableName = new SchemaTableName(schema, table);
ConnectorMetadata metadata = metadataFactory.create();
Table icebergTable;
if (catalogType == HADOOP || catalogType == NESSIE) {
icebergTable = resourceFactory.getCatalog(clientSession).loadTable(toIcebergTableIdentifier(schema, table));
}
else {
ExtendedHiveMetastore metastore = ((IcebergHiveMetadata) metadata).getMetastore();
icebergTable = getHiveIcebergTable(metastore, hdfsEnvironment, clientSession, schemaTableName);
}
icebergTable.manageSnapshots().rollbackTo(snapshotId).commit();
getIcebergTable(metadata, clientSession, schemaTableName)
.manageSnapshots().rollbackTo(snapshotId).commit();
}
}

0 comments on commit 409404a

Please sign in to comment.