Skip to content

Commit

Permalink
Split Iceberg commit for new and existing table
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed Oct 15, 2021
1 parent 63d4340 commit 912dde0
Showing 1 changed file with 64 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -176,47 +176,38 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata)
return;
}

String newMetadataLocation = writeNewMetadata(metadata, version + 1);
if (base == null) {
commitNewTable(metadata);
}
else {
commitToExistingTable(base, metadata);
}

// TODO: use metastore locking
shouldRefresh = true;
}

protected void commitNewTable(TableMetadata metadata)
{
String newMetadataLocation = writeNewMetadata(metadata, version + 1);

Table table;
try {
if (base == null) {
Table.Builder builder = Table.builder()
.setDatabaseName(database)
.setTableName(tableName)
.setOwner(owner.orElseThrow(() -> new IllegalStateException("Owner not set")))
.setTableType(TableType.EXTERNAL_TABLE.name())
.setDataColumns(toHiveColumns(metadata.schema().columns()))
.withStorage(storage -> storage.setLocation(metadata.location()))
.withStorage(storage -> storage.setStorageFormat(STORAGE_FORMAT))
.setParameter("EXTERNAL", "TRUE")
.setParameter(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE)
.setParameter(METADATA_LOCATION, newMetadataLocation);
String tableComment = metadata.properties().get(TABLE_COMMENT);
if (tableComment != null) {
builder.setParameter(TABLE_COMMENT, tableComment);
}
table = builder.build();
}
else {
Table currentTable = getTable();

checkState(currentMetadataLocation != null, "No current metadata location for existing table");
String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION);
if (!currentMetadataLocation.equals(metadataLocation)) {
throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s",
currentMetadataLocation, metadataLocation, getSchemaTableName());
}

table = Table.builder(currentTable)
.setDataColumns(toHiveColumns(metadata.schema().columns()))
.withStorage(storage -> storage.setLocation(metadata.location()))
.setParameter(METADATA_LOCATION, newMetadataLocation)
.setParameter(PREVIOUS_METADATA_LOCATION, currentMetadataLocation)
.build();
Table.Builder builder = Table.builder()
.setDatabaseName(database)
.setTableName(tableName)
.setOwner(owner.orElseThrow(() -> new IllegalStateException("Owner not set")))
.setTableType(TableType.EXTERNAL_TABLE.name())
.setDataColumns(toHiveColumns(metadata.schema().columns()))
.withStorage(storage -> storage.setLocation(metadata.location()))
.withStorage(storage -> storage.setStorageFormat(STORAGE_FORMAT))
.setParameter("EXTERNAL", "TRUE")
.setParameter(TABLE_TYPE_PROP, ICEBERG_TABLE_TYPE_VALUE)
.setParameter(METADATA_LOCATION, newMetadataLocation);
String tableComment = metadata.properties().get(TABLE_COMMENT);
if (tableComment != null) {
builder.setParameter(TABLE_COMMENT, tableComment);
}
table = builder.build();
}
catch (RuntimeException e) {
try {
@@ -230,14 +221,46 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata)

PrincipalPrivileges privileges = buildInitialPrivilegeSet(table.getOwner());
HiveIdentity identity = new HiveIdentity(session);
if (base == null) {
metastore.createTable(identity, table, privileges);
metastore.createTable(identity, table, privileges);
}

protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
{
String newMetadataLocation = writeNewMetadata(metadata, version + 1);

// TODO: use metastore locking

Table table;
try {
Table currentTable = getTable();

checkState(currentMetadataLocation != null, "No current metadata location for existing table");
String metadataLocation = currentTable.getParameters().get(METADATA_LOCATION);
if (!currentMetadataLocation.equals(metadataLocation)) {
throw new CommitFailedException("Metadata location [%s] is not same as table metadata location [%s] for %s",
currentMetadataLocation, metadataLocation, getSchemaTableName());
}

table = Table.builder(currentTable)
.setDataColumns(toHiveColumns(metadata.schema().columns()))
.withStorage(storage -> storage.setLocation(metadata.location()))
.setParameter(METADATA_LOCATION, newMetadataLocation)
.setParameter(PREVIOUS_METADATA_LOCATION, currentMetadataLocation)
.build();
}
else {
metastore.replaceTable(identity, database, tableName, table, privileges);
catch (RuntimeException e) {
try {
io().deleteFile(newMetadataLocation);
}
catch (RuntimeException ex) {
e.addSuppressed(ex);
}
throw e;
}

shouldRefresh = true;
PrincipalPrivileges privileges = buildInitialPrivilegeSet(table.getOwner());
HiveIdentity identity = new HiveIdentity(session);
metastore.replaceTable(identity, database, tableName, table, privileges);
}

@Override

0 comments on commit 912dde0

Please sign in to comment.