Skip to content

Commit

Permalink
Core: Use 'delete' / 'append' if OverwriteFiles only deletes/appends …
Browse files Browse the repository at this point in the history
…data files (#10150)
  • Loading branch information
nastra authored Apr 25, 2024
1 parent 0f11f54 commit f460964
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 11 deletions.
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ protected OverwriteFiles self() {

@Override
protected String operation() {
if (deletesDataFiles() && !addsDataFiles()) {
return DataOperations.DELETE;
}

if (addsDataFiles() && !deletesDataFiles()) {
return DataOperations.APPEND;
}

return DataOperations.OVERWRITE;
}

Expand Down
46 changes: 44 additions & 2 deletions core/src/test/java/org/apache/iceberg/TestOverwrite.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ public class TestOverwrite extends TestBase {
ImmutableMap.of(1, 5L, 2, 3L), // value count
ImmutableMap.of(1, 0L, 2, 2L), // null count
null,
ImmutableMap.of(1, longToBuffer(5L)), // lower bounds
ImmutableMap.of(1, longToBuffer(9L)) // upper bounds
ImmutableMap.of(1, longToBuffer(10L)), // lower bounds
ImmutableMap.of(1, longToBuffer(14L)) // upper bounds
))
.build();

Expand Down Expand Up @@ -135,6 +135,43 @@ public void createTestTable() throws IOException {
commit(table, table.newAppend().appendFile(FILE_0_TO_4).appendFile(FILE_5_TO_9), branch);
}

@TestTemplate
public void deleteDataFilesProducesDeleteOperation() {
commit(table, table.newOverwrite().deleteFile(FILE_A).deleteFile(FILE_B), branch);
assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.DELETE);
}

@TestTemplate
public void addAndDeleteDataFilesProducesOverwriteOperation() {
commit(table, table.newOverwrite().addFile(FILE_10_TO_14).deleteFile(FILE_B), branch);
assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.OVERWRITE);
}

@TestTemplate
public void overwriteByRowFilterProducesDeleteOperation() {
commit(table, table.newOverwrite().overwriteByRowFilter(equal("date", "2018-06-08")), branch);
assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.DELETE);
}

@TestTemplate
public void addAndOverwriteByRowFilterProducesOverwriteOperation() {
commit(
table,
table
.newOverwrite()
.addFile(FILE_10_TO_14)
.overwriteByRowFilter(equal("date", "2018-06-08")),
branch);

assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.OVERWRITE);
}

@TestTemplate
public void addFilesProducesAppendOperation() {
commit(table, table.newOverwrite().addFile(FILE_10_TO_14).addFile(FILE_5_TO_9), branch);
assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.APPEND);
}

@TestTemplate
public void testOverwriteWithoutAppend() {
TableMetadata base = TestTables.readMetadata(TABLE_NAME);
Expand All @@ -145,6 +182,7 @@ public void testOverwriteWithoutAppend() {
long overwriteId = latestSnapshot(table, branch).snapshotId();

assertThat(overwriteId).isNotEqualTo(baseId);
assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.DELETE);
assertThat(latestSnapshot(table, branch).allManifests(table.io())).hasSize(1);

validateManifestEntries(
Expand Down Expand Up @@ -188,6 +226,7 @@ public void testOverwriteWithAppendOutsideOfDelete() {

long overwriteId = latestSnapshot(table, branch).snapshotId();

assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.OVERWRITE);
assertThat(overwriteId).isNotEqualTo(baseId);
assertThat(latestSnapshot(table, branch).allManifests(table.io())).hasSize(2);

Expand Down Expand Up @@ -224,6 +263,7 @@ public void testOverwriteWithMergedAppendOutsideOfDelete() {

long overwriteId = latestSnapshot(table, branch).snapshotId();

assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.OVERWRITE);
assertThat(overwriteId).isNotEqualTo(baseId);
assertThat(latestSnapshot(table, branch).allManifests(table.io())).hasSize(1);

Expand Down Expand Up @@ -255,6 +295,7 @@ public void testValidatedOverwriteWithAppendOutsideOfDelete() {
.hasMessageStartingWith("Cannot append file with rows that do not match filter");

assertThat(latestSnapshot(table, branch).snapshotId()).isEqualTo(baseId);
assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.APPEND);
}

@TestTemplate
Expand All @@ -275,6 +316,7 @@ public void testValidatedOverwriteWithAppendOutsideOfDeleteMetrics() {
.hasMessageStartingWith("Cannot append file with rows that do not match filter");

assertThat(latestSnapshot(base, branch).snapshotId()).isEqualTo(baseId);
assertThat(latestSnapshot(table, branch).operation()).isEqualTo(DataOperations.APPEND);
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,9 @@ protected void validateCopyOnWrite(
String changedPartitionCount,
String deletedDataFiles,
String addedDataFiles) {
String operation = null == addedDataFiles && null != deletedDataFiles ? DELETE : OVERWRITE;
validateSnapshot(
snapshot, OVERWRITE, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
snapshot, operation, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
}

protected void validateMergeOnRead(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.apache.iceberg.DataOperations.DELETE;
import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
Expand Down Expand Up @@ -424,8 +425,10 @@ public void testDeleteWithArbitraryPartitionPredicates() {
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 4 snapshots", 4, Iterables.size(table.snapshots()));

// should be an overwrite since cannot be executed using a metadata operation
// should be a "delete" instead of an "overwrite" as only data files have been removed (COW) /
// delete files have been added (MOR)
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
assertThat(currentSnapshot.operation()).isEqualTo(DELETE);
if (mode(table) == COPY_ON_WRITE) {
validateCopyOnWrite(currentSnapshot, "1", "1", null);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,8 +571,20 @@ public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption() throws
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(dataAcrossSnapshots);

DataFile dataFile =
DataFiles.builder(table.spec())
.withPath(temp.newFile().toString())
.withFileSizeInBytes(10)
.withRecordCount(1)
.withFormat(FileFormat.PARQUET)
.build();

// this should create a snapshot with type overwrite.
table.newOverwrite().overwriteByRowFilter(Expressions.greaterThan("id", 4)).commit();
table
.newOverwrite()
.addFile(dataFile)
.overwriteByRowFilter(Expressions.greaterThan("id", 4))
.commit();

// check pre-condition - that the above overwrite operation on the table resulted in a snapshot
// of type OVERWRITE.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,9 @@ protected void validateCopyOnWrite(
String changedPartitionCount,
String deletedDataFiles,
String addedDataFiles) {
String operation = null == addedDataFiles && null != deletedDataFiles ? DELETE : OVERWRITE;
validateSnapshot(
snapshot, OVERWRITE, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
snapshot, operation, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
}

protected void validateMergeOnRead(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.apache.iceberg.DataOperations.DELETE;
import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
Expand Down Expand Up @@ -592,8 +593,10 @@ public void testDeleteWithArbitraryPartitionPredicates() {
Table table = validationCatalog.loadTable(tableIdent);
Assert.assertEquals("Should have 4 snapshots", 4, Iterables.size(table.snapshots()));

// should be an overwrite since cannot be executed using a metadata operation
// should be a "delete" instead of an "overwrite" as only data files have been removed (COW) /
// delete files have been added (MOR)
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
assertThat(currentSnapshot.operation()).isEqualTo(DELETE);
if (mode(table) == COPY_ON_WRITE) {
validateCopyOnWrite(currentSnapshot, "1", "1", null);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,20 @@ public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption() throws
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(dataAcrossSnapshots);

DataFile dataFile =
DataFiles.builder(table.spec())
.withPath(temp.newFile().toString())
.withFileSizeInBytes(10)
.withRecordCount(1)
.withFormat(FileFormat.PARQUET)
.build();

// this should create a snapshot with type overwrite.
table.newOverwrite().overwriteByRowFilter(Expressions.greaterThan("id", 4)).commit();
table
.newOverwrite()
.addFile(dataFile)
.overwriteByRowFilter(Expressions.greaterThan("id", 4))
.commit();

// check pre-condition - that the above overwrite operation on the table resulted in a snapshot
// of type OVERWRITE.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,9 @@ protected void validateCopyOnWrite(
String changedPartitionCount,
String deletedDataFiles,
String addedDataFiles) {
String operation = null == addedDataFiles && null != deletedDataFiles ? DELETE : OVERWRITE;
validateSnapshot(
snapshot, OVERWRITE, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
snapshot, operation, changedPartitionCount, deletedDataFiles, null, addedDataFiles);
}

protected void validateMergeOnRead(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.extensions;

import static org.apache.iceberg.DataOperations.DELETE;
import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
import static org.apache.iceberg.SnapshotSummary.ADD_POS_DELETE_FILES_PROP;
import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
Expand Down Expand Up @@ -590,8 +591,10 @@ public void testDeleteWithArbitraryPartitionPredicates() {
Table table = validationCatalog.loadTable(tableIdent);
assertThat(table.snapshots()).as("Should have 4 snapshots").hasSize(4);

// should be an overwrite since cannot be executed using a metadata operation
// should be a "delete" instead of an "overwrite" as only data files have been removed (COW) /
// delete files have been added (MOR)
Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
assertThat(currentSnapshot.operation()).isEqualTo(DELETE);
if (mode(table) == COPY_ON_WRITE) {
validateCopyOnWrite(currentSnapshot, "1", "1", null);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,8 +656,20 @@ public void testReadStreamWithSnapshotTypeDeleteAndSkipOverwriteOption() throws
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(dataAcrossSnapshots);

DataFile dataFile =
DataFiles.builder(table.spec())
.withPath(File.createTempFile("junit", null, temp.toFile()).getPath())
.withFileSizeInBytes(10)
.withRecordCount(1)
.withFormat(FileFormat.PARQUET)
.build();

// this should create a snapshot with type overwrite.
table.newOverwrite().overwriteByRowFilter(Expressions.greaterThan("id", 4)).commit();
table
.newOverwrite()
.addFile(dataFile)
.overwriteByRowFilter(Expressions.greaterThan("id", 4))
.commit();

// check pre-condition - that the above overwrite operation on the table resulted in a snapshot
// of type OVERWRITE.
Expand Down

0 comments on commit f460964

Please sign in to comment.