Skip to content

Commit

Permalink
Core: Support DVs in DeleteFileIndex (#11467)
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored Nov 5, 2024
1 parent 67ee082 commit 5bd314b
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 7 deletions.
62 changes: 55 additions & 7 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
Expand Down Expand Up @@ -70,6 +71,7 @@ class DeleteFileIndex {
private final PartitionMap<EqualityDeletes> eqDeletesByPartition;
private final PartitionMap<PositionDeletes> posDeletesByPartition;
private final Map<String, PositionDeletes> posDeletesByPath;
private final Map<String, DeleteFile> dvByPath;
private final boolean hasEqDeletes;
private final boolean hasPosDeletes;
private final boolean isEmpty;
Expand All @@ -78,13 +80,16 @@ private DeleteFileIndex(
EqualityDeletes globalDeletes,
PartitionMap<EqualityDeletes> eqDeletesByPartition,
PartitionMap<PositionDeletes> posDeletesByPartition,
Map<String, PositionDeletes> posDeletesByPath) {
Map<String, PositionDeletes> posDeletesByPath,
Map<String, DeleteFile> dvByPath) {
this.globalDeletes = globalDeletes;
this.eqDeletesByPartition = eqDeletesByPartition;
this.posDeletesByPartition = posDeletesByPartition;
this.posDeletesByPath = posDeletesByPath;
this.dvByPath = dvByPath;
this.hasEqDeletes = globalDeletes != null || eqDeletesByPartition != null;
this.hasPosDeletes = posDeletesByPartition != null || posDeletesByPath != null;
this.hasPosDeletes =
posDeletesByPartition != null || posDeletesByPath != null || dvByPath != null;
this.isEmpty = !hasEqDeletes && !hasPosDeletes;
}

Expand Down Expand Up @@ -125,6 +130,10 @@ public Iterable<DeleteFile> referencedDeleteFiles() {
}
}

if (dvByPath != null) {
deleteFiles = Iterables.concat(deleteFiles, dvByPath.values());
}

return deleteFiles;
}

Expand All @@ -143,9 +152,16 @@ DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {

DeleteFile[] global = findGlobalDeletes(sequenceNumber, file);
DeleteFile[] eqPartition = findEqPartitionDeletes(sequenceNumber, file);
DeleteFile[] posPartition = findPosPartitionDeletes(sequenceNumber, file);
DeleteFile[] posPath = findPathDeletes(sequenceNumber, file);
return concat(global, eqPartition, posPartition, posPath);
DeleteFile dv = findDV(sequenceNumber, file);
if (dv != null && global == null && eqPartition == null) {
return new DeleteFile[] {dv};
} else if (dv != null) {
return concat(global, eqPartition, new DeleteFile[] {dv});
} else {
DeleteFile[] posPartition = findPosPartitionDeletes(sequenceNumber, file);
DeleteFile[] posPath = findPathDeletes(sequenceNumber, file);
return concat(global, eqPartition, posPartition, posPath);
}
}

private DeleteFile[] findGlobalDeletes(long seq, DataFile dataFile) {
Expand Down Expand Up @@ -180,6 +196,22 @@ private DeleteFile[] findPathDeletes(long seq, DataFile dataFile) {
return deletes == null ? EMPTY_DELETES : deletes.filter(seq);
}

private DeleteFile findDV(long seq, DataFile dataFile) {
if (dvByPath == null) {
return null;
}

DeleteFile dv = dvByPath.get(dataFile.location());
if (dv != null) {
ValidationException.check(
dv.dataSequenceNumber() >= seq,
"DV data sequence number (%s) must be greater than or equal to data file sequence number (%s)",
dv.dataSequenceNumber(),
seq);
}
return dv;
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
private static boolean canContainEqDeletesForFile(
DataFile dataFile, EqualityDeleteFile deleteFile) {
Expand Down Expand Up @@ -434,11 +466,16 @@ DeleteFileIndex build() {
PartitionMap<EqualityDeletes> eqDeletesByPartition = PartitionMap.create(specsById);
PartitionMap<PositionDeletes> posDeletesByPartition = PartitionMap.create(specsById);
Map<String, PositionDeletes> posDeletesByPath = Maps.newHashMap();
Map<String, DeleteFile> dvByPath = Maps.newHashMap();

for (DeleteFile file : files) {
switch (file.content()) {
case POSITION_DELETES:
add(posDeletesByPath, posDeletesByPartition, file);
if (ContentFileUtil.isDV(file)) {
add(dvByPath, file);
} else {
add(posDeletesByPath, posDeletesByPartition, file);
}
break;
case EQUALITY_DELETES:
add(globalDeletes, eqDeletesByPartition, file);
Expand All @@ -453,7 +490,18 @@ DeleteFileIndex build() {
globalDeletes.isEmpty() ? null : globalDeletes,
eqDeletesByPartition.isEmpty() ? null : eqDeletesByPartition,
posDeletesByPartition.isEmpty() ? null : posDeletesByPartition,
posDeletesByPath.isEmpty() ? null : posDeletesByPath);
posDeletesByPath.isEmpty() ? null : posDeletesByPath,
dvByPath.isEmpty() ? null : dvByPath);
}

private void add(Map<String, DeleteFile> dvByPath, DeleteFile dv) {
String path = dv.referencedDataFile();
DeleteFile existingDV = dvByPath.putIfAbsent(path, dv);
if (existingDV != null) {
throw new ValidationException(
"Can't index multiple DVs for %s: %s and %s",
path, ContentFileUtil.dvDesc(dv), ContentFileUtil.dvDesc(existingDV));
}
}

private void add(
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
Expand Down Expand Up @@ -84,4 +85,17 @@ public static String referencedDataFileLocation(DeleteFile deleteFile) {
CharSequence location = referencedDataFile(deleteFile);
return location != null ? location.toString() : null;
}

public static boolean isDV(DeleteFile deleteFile) {
return deleteFile.format() == FileFormat.PUFFIN;
}

public static String dvDesc(DeleteFile deleteFile) {
return String.format(
"DV{location=%s, offset=%s, length=%s, referencedDataFile=%s}",
deleteFile.location(),
deleteFile.contentOffset(),
deleteFile.contentSizeInBytes(),
deleteFile.referencedDataFile());
}
}
52 changes: 52 additions & 0 deletions core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,24 @@
import static org.apache.iceberg.expressions.Expressions.equal;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.iceberg.DeleteFileIndex.EqualityDeletes;
import org.apache.iceberg.DeleteFileIndex.PositionDeletes;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.ContentFileUtil;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

Expand Down Expand Up @@ -621,4 +625,52 @@ public void testEqualityDeletesGroup() {
// it should not be possible to add more elements upon indexing
assertThatThrownBy(() -> group.add(SPEC, file1)).isInstanceOf(IllegalStateException.class);
}

@TestTemplate
public void testMixDeleteFilesAndDVs() {
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);

List<DeleteFile> deletes =
Arrays.asList(
withDataSequenceNumber(1, partitionedPosDeletes(SPEC, FILE_A.partition())),
withDataSequenceNumber(2, newDV(FILE_A)),
withDataSequenceNumber(1, partitionedPosDeletes(SPEC, FILE_B.partition())),
withDataSequenceNumber(2, partitionedPosDeletes(SPEC, FILE_B.partition())));

DeleteFileIndex index = DeleteFileIndex.builderFor(deletes).specsById(table.specs()).build();

DeleteFile[] fileADeletes = index.forDataFile(0, FILE_A);
assertThat(fileADeletes).as("Only DV should apply to FILE_A").hasSize(1);
assertThat(ContentFileUtil.isDV(fileADeletes[0])).isTrue();
assertThat(fileADeletes[0].referencedDataFile()).isEqualTo(FILE_A.location());

DeleteFile[] fileBDeletes = index.forDataFile(0, FILE_B);
assertThat(fileBDeletes).as("Two delete files should apply to FILE_B").hasSize(2);
assertThat(ContentFileUtil.isDV(fileBDeletes[0])).isFalse();
assertThat(ContentFileUtil.isDV(fileBDeletes[1])).isFalse();
}

@TestTemplate
public void testMultipleDVs() {
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);

DeleteFile dv1 = withDataSequenceNumber(1, newDV(FILE_A));
DeleteFile dv2 = withDataSequenceNumber(2, newDV(FILE_A));
List<DeleteFile> dvs = Arrays.asList(dv1, dv2);

assertThatThrownBy(() -> DeleteFileIndex.builderFor(dvs).specsById(table.specs()).build())
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Can't index multiple DVs for %s", FILE_A.location());
}

@TestTemplate
public void testInvalidDVSequenceNumber() {
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
DeleteFile dv = withDataSequenceNumber(1, newDV(FILE_A));
List<DeleteFile> dvs = Collections.singletonList(dv);
DeleteFileIndex index = DeleteFileIndex.builderFor(dvs).specsById(table.specs()).build();
assertThatThrownBy(() -> index.forDataFile(2, FILE_A))
.isInstanceOf(ValidationException.class)
.hasMessageContaining("must be greater than or equal to data file sequence number");
}
}

0 comments on commit 5bd314b

Please sign in to comment.