Core: Exclude deleted content file in RewriteTablePathUtil copy plan (#…
dramaticlly authored Feb 7, 2025
1 parent bc10617 commit e91655b
Showing 2 changed files with 122 additions and 25 deletions.
21 changes: 15 additions & 6 deletions core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
@@ -360,7 +360,10 @@ private static RewriteResult<DataFile> writeDataFileEntry(
     DataFile newDataFile =
         DataFiles.builder(spec).copy(entry.file()).withPath(targetDataFilePath).build();
     appendEntryWithFile(entry, writer, newDataFile);
-    result.copyPlan().add(Pair.of(sourceDataFilePath, newDataFile.location()));
+    // keep deleted data file entries but exclude them from copyPlan
+    if (entry.isLive()) {
+      result.copyPlan().add(Pair.of(sourceDataFilePath, newDataFile.location()));
+    }
     return result;
   }
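For context outside this diff: entry.isLive() reflects Iceberg's manifest-entry status model, where an entry is EXISTING (0), ADDED (1), or DELETED (2) and only the first two are live. A minimal sketch of that contract, using a hypothetical stand-in class rather than the real ManifestEntry:

// IsLiveSketch.java -- illustrative only, not code from this commit
public class IsLiveSketch {
  enum Status { EXISTING, ADDED, DELETED } // ordinals 0, 1, 2

  // DELETED entries remain in the rewritten manifest so snapshot history
  // stays intact, but they must not be scheduled for a physical file copy.
  static boolean isLive(Status status) {
    return status == Status.ADDED || status == Status.EXISTING;
  }

  public static void main(String[] args) {
    System.out.println(isLive(Status.ADDED)); // true -> path pair goes into the copy plan
    System.out.println(isLive(Status.DELETED)); // false -> entry kept, copy skipped
  }
}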

@@ -387,16 +390,22 @@ private static RewriteResult<DeleteFile> writeDeleteFileEntry(
             .withMetrics(metricsWithTargetPath)
             .build();
         appendEntryWithFile(entry, writer, movedFile);
-        result
-            .copyPlan()
-            .add(Pair.of(stagingPath(file.location(), stagingLocation), movedFile.location()));
+        // keep deleted position delete entries but exclude them from copyPlan
+        if (entry.isLive()) {
+          result
+              .copyPlan()
+              .add(Pair.of(stagingPath(file.location(), stagingLocation), movedFile.location()));
+        }
         result.toRewrite().add(file);
         return result;
       case EQUALITY_DELETES:
         DeleteFile eqDeleteFile = newEqualityDeleteEntry(file, spec, sourcePrefix, targetPrefix);
         appendEntryWithFile(entry, writer, eqDeleteFile);
-        // No need to rewrite equality delete files as they do not contain absolute file paths.
-        result.copyPlan().add(Pair.of(file.location(), eqDeleteFile.location()));
+        // keep deleted equality delete entries but exclude them from copyPlan
+        if (entry.isLive()) {
+          // No need to rewrite equality delete files as they do not contain absolute file paths.
+          result.copyPlan().add(Pair.of(file.location(), eqDeleteFile.location()));
+        }
         return result;

       default:
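Both hunks feed result.copyPlan(), a list of Pair<sourcePath, targetPath> entries that the rewrite action executes after planning. A rough sketch of how such a plan could be consumed with a FileIO, assuming paths resolvable by the same io; the CopyPlanSketch class and copyAll helper are hypothetical, not the action's actual copy code:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.util.Pair;

class CopyPlanSketch {
  // Copy every planned (source, target) pair. After this change, pairs for
  // non-live (deleted) entries are never planned, so their bytes never move.
  static void copyAll(FileIO io, Iterable<Pair<String, String>> copyPlan) throws IOException {
    for (Pair<String, String> filePair : copyPlan) {
      try (InputStream in = io.newInputFile(filePair.first()).newStream();
          OutputStream out = io.newOutputFile(filePair.second()).create()) {
        in.transferTo(out); // stream bytes from source to target
      }
    }
  }
}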
126 changes: 107 additions & 19 deletions
@@ -28,6 +28,7 @@
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
@@ -40,13 +41,15 @@
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StaticTableOperations;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.actions.ActionsProvider;
+import org.apache.iceberg.actions.ExpireSnapshots;
 import org.apache.iceberg.actions.RewriteTablePath;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.FileHelpers;
@@ -57,13 +60,15 @@
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.SparkCatalog;
 import org.apache.iceberg.spark.TestBase;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.spark.SparkEnv;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.Dataset;
@@ -105,14 +110,14 @@ protected ActionsProvider actions() {
   private final String backupNs = "backupns";

   @BeforeEach
-  public void setupTableLocation() throws Exception {
+  public void setupTableLocation() {
     this.tableLocation = tableDir.toFile().toURI().toString();
     this.table = createATableWith2Snapshots(tableLocation);
     createNameSpaces();
   }

   @AfterEach
-  public void cleanupTableSetup() throws Exception {
+  public void cleanupTableSetup() {
     dropNameSpaces();
   }

@@ -126,6 +131,11 @@ private Table createTableWithSnapshots(String location, int snapshotNumber) {

   protected Table createTableWithSnapshots(
       String location, int snapshotNumber, Map<String, String> properties) {
+    return createTableWithSnapshots(location, snapshotNumber, properties, "append");
+  }
+
+  private Table createTableWithSnapshots(
+      String location, int snapshotNumber, Map<String, String> properties, String mode) {
     Table newTable = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, location);

     List<ThreeColumnRecord> records =
@@ -134,7 +144,7 @@ protected Table createTableWithSnapshots(
     Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);

     for (int i = 0; i < snapshotNumber; i++) {
-      df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(location);
+      df.select("c1", "c2", "c3").write().format("iceberg").mode(mode).save(location);
     }

     return newTable;
@@ -478,10 +488,13 @@ public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception {
     Table sourceTable = createTableWithSnapshots(location, 2);
     // expire the first snapshot
     Table staticTable = newStaticTable(location + "metadata/v2.metadata.json", table.io());
-    actions()
-        .expireSnapshots(sourceTable)
-        .expireSnapshotId(staticTable.currentSnapshot().snapshotId())
-        .execute();
+    int expiredManifestListCount = 1;
+    ExpireSnapshots.Result expireResult =
+        actions()
+            .expireSnapshots(sourceTable)
+            .expireSnapshotId(staticTable.currentSnapshot().snapshotId())
+            .execute();
+    assertThat(expireResult.deletedManifestListsCount()).isEqualTo(expiredManifestListCount);

     // create 100 more snapshots
     List<ThreeColumnRecord> records =
@@ -492,17 +505,25 @@ public void testFullTableRewritePathWithDeletedVersionFiles() throws Exception {
     }
     sourceTable.refresh();

+    // each iteration generates 1 version file, 1 manifest list, 1 manifest and 1 data file
+    int totalIteration = 102;
+    // v1/v2/v3.metadata.json has been deleted in v104.metadata.json, and there is no way to find
+    // the first snapshot from the version file history
+    int missingVersionFile = 1;
     RewriteTablePath.Result result =
         actions()
             .rewriteTablePath(sourceTable)
             .stagingLocation(stagingLocation())
             .rewriteLocationPrefix(location, targetTableLocation())
             .execute();

-    checkFileNum(101, 101, 101, 406, result);
+    checkFileNum(
+        totalIteration - missingVersionFile,
+        totalIteration - expiredManifestListCount,
+        totalIteration,
+        totalIteration * 4 - missingVersionFile - expiredManifestListCount,
+        result);
   }
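Worked through, the parameterized call preserves the old total: 102 iterations × 4 files per iteration = 408, minus the 1 unreachable version file and the 1 expired manifest list = 406. The manifest count becomes 102 where the old literal call asserted 101; that lines up with the checkFileNum change further down, whose new isManifest predicate also matches *-m1.avro files instead of only *-m0.avro.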

@@ -533,14 +554,77 @@ public void testExpireSnapshotBeforeRewrite() throws Exception {
     checkFileNum(4, 1, 2, 9, result);
   }

+  @Test
+  public void testRewritePathWithNonLiveEntry() throws Exception {
+    String location = newTableLocation();
+    // first overwrite generates 1 manifest and 1 data file
+    // each subsequent overwrite on an unpartitioned table generates 2 manifests and 1 data file
+    Table tableWith3Snaps = createTableWithSnapshots(location, 3, Maps.newHashMap(), "overwrite");
+
+    Snapshot oldest = SnapshotUtil.oldestAncestor(tableWith3Snaps);
+    String oldestDataFilePath =
+        Iterables.getOnlyElement(
+                tableWith3Snaps.snapshot(oldest.snapshotId()).addedDataFiles(tableWith3Snaps.io()))
+            .location();
+    String deletedDataFilePathInTargetLocation =
+        String.format("%sdata/%s", targetTableLocation(), fileName(oldestDataFilePath));
+
+    // expire the oldest snapshot and remove the oldest DataFile
+    ExpireSnapshots.Result expireResult =
+        actions().expireSnapshots(tableWith3Snaps).expireSnapshotId(oldest.snapshotId()).execute();
+    assertThat(expireResult)
+        .as("Should delete 1 data file in root snapshot")
+        .extracting(
+            ExpireSnapshots.Result::deletedManifestListsCount,
+            ExpireSnapshots.Result::deletedManifestsCount,
+            ExpireSnapshots.Result::deletedDataFilesCount)
+        .contains(1L, 1L, 1L);
+
+    RewriteTablePath.Result result =
+        actions()
+            .rewriteTablePath(tableWith3Snaps)
+            .stagingLocation(stagingLocation())
+            .rewriteLocationPrefix(tableWith3Snaps.location(), targetTableLocation())
+            .execute();
+
+    // 5 version files: 1 table creation, 3 overwrites, and 1 snapshot expiration
+    // the 3 overwrites generate 3 manifest lists and 5 manifests with 3 data files
+    // snapshot expiration removed 1 of each
+    checkFileNum(5, 2, 4, 13, result);
+
+    // copy the metadata files and data files
+    copyTableFiles(result);
+
+    // expect the deleted data file to be excluded from rewrite and copy
+    List<String> copiedDataFiles =
+        spark
+            .read()
+            .format("iceberg")
+            .load(targetTableLocation() + "#all_files")
+            .select("file_path")
+            .as(Encoders.STRING())
+            .collectAsList();
+    assertThat(copiedDataFiles).hasSize(2).doesNotContain(deletedDataFilePathInTargetLocation);
+
+    // expect manifest entries to still contain the deleted entry
+    List<String> copiedEntries =
+        spark
+            .read()
+            .format("iceberg")
+            .load(targetTableLocation() + "#all_entries")
+            .filter("status == 2")
+            .select("data_file.file_path")
+            .as(Encoders.STRING())
+            .collectAsList();
+    assertThat(copiedEntries).contains(deletedDataFilePathInTargetLocation);
+  }

   @Test
   public void testStartSnapshotWithoutValidSnapshot() throws Exception {
     // expire one snapshot
     actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();

-    assertThat(((List) table.snapshots()).size())
-        .withFailMessage("1 out 2 snapshot has been removed")
-        .isEqualTo(1);
+    assertThat(table.snapshots()).hasSize(1);

     RewriteTablePath.Result result =
         actions()
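A note on the status == 2 filter in the new test: manifest entries in Iceberg carry a status of 0 (EXISTING), 1 (ADDED), or 2 (DELETED), so the all_entries metadata table query selects exactly the non-live entries that the rewrite now keeps in the copied manifests while leaving their data files out of the copy plan.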
@@ -573,7 +657,7 @@ public void testMoveTheVersionExpireSnapshot() throws Exception {
   }

   @Test
-  public void testMoveVersionWithInvalidSnapshots() throws Exception {
+  public void testMoveVersionWithInvalidSnapshots() {
     // expire one snapshot
     actions().expireSnapshots(table).expireSnapshotId(table.currentSnapshot().parentId()).execute();

@@ -940,16 +1024,20 @@ protected void checkFileNum(
             .load(result.fileListLocation())
             .as(Encoders.STRING())
             .collectAsList();
-    assertThat(filesToMove.stream().filter(f -> f.endsWith(".metadata.json")).count())
-        .withFailMessage("Wrong rebuilt version file count")
+    Predicate<String> isManifest = f -> f.endsWith("-m0.avro") || f.endsWith("-m1.avro");
+    Predicate<String> isManifestList = f -> f.contains("snap-") && f.endsWith(".avro");
+    Predicate<String> isMetadataJSON = f -> f.endsWith(".metadata.json");
+
+    assertThat(filesToMove.stream().filter(isMetadataJSON).count())
+        .as("Wrong rebuilt version file count")
         .isEqualTo(versionFileCount);
-    assertThat(filesToMove.stream().filter(f -> f.contains("snap-")).count())
-        .withFailMessage("Wrong rebuilt Manifest list file count")
+    assertThat(filesToMove.stream().filter(isManifestList).count())
+        .as("Wrong rebuilt manifest list file count")
         .isEqualTo(manifestListCount);
-    assertThat(filesToMove.stream().filter(f -> f.endsWith("-m0.avro")).count())
-        .withFailMessage("Wrong rebuilt Manifest file file count")
+    assertThat(filesToMove.stream().filter(isManifest).count())
+        .as("Wrong rebuilt manifest file count")
         .isEqualTo(manifestFileCount);
-    assertThat(filesToMove.size()).withFailMessage("Wrong total file count").isEqualTo(totalCount);
+    assertThat(filesToMove.size()).as("Wrong total file count").isEqualTo(totalCount);
   }

protected String newTableLocation() throws IOException {
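One behavioral detail of the assertion cleanup above: AssertJ's withFailMessage replaces the entire failure message, while as prefixes a description and still reports the expected and actual values, which makes count mismatches in these checks easier to diagnose.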
