Skip to content

Commit

Permalink
API, Core: Add data file reference to DeleteFile (#11443)
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored Nov 2, 2024
1 parent d368a5f commit d9b9768
Show file tree
Hide file tree
Showing 17 changed files with 157 additions and 15 deletions.
11 changes: 9 additions & 2 deletions api/src/main/java/org/apache/iceberg/DataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,18 @@ public interface DataFile extends ContentFile<DataFile> {
Types.NestedField SORT_ORDER_ID =
optional(140, "sort_order_id", IntegerType.get(), "Sort order ID");
Types.NestedField SPEC_ID = optional(141, "spec_id", IntegerType.get(), "Partition spec ID");
// Optional string field (ID 143) holding the data file location that a delete file references.
// NOTE(review): ID 142 is not assigned anywhere in the visible code, so it appears to be
// intentionally skipped or reserved - confirm before reusing it.
Types.NestedField REFERENCED_DATA_FILE =
optional(
143,
"referenced_data_file",
StringType.get(),
"Fully qualified location (URI with FS scheme) of a data file that all deletes reference");

int PARTITION_ID = 102;
String PARTITION_NAME = "partition";
String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";

// NEXT ID TO ASSIGN: 142
// NEXT ID TO ASSIGN: 144

static StructType getType(StructType partitionType) {
// IDs start at 100 to leave room for changes to ManifestEntry
Expand All @@ -124,7 +130,8 @@ static StructType getType(StructType partitionType) {
KEY_METADATA,
SPLIT_OFFSETS,
EQUALITY_IDS,
SORT_ORDER_ID);
SORT_ORDER_ID,
REFERENCED_DATA_FILE);
}

/**
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/org/apache/iceberg/DeleteFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,15 @@ public interface DeleteFile extends ContentFile<DeleteFile> {
default List<Long> splitOffsets() {
return null;
}

/**
* Returns the location of a data file that all deletes reference.
*
* <p>The referenced data file is required for deletion vectors and may be optionally captured for
* position delete files that apply to only one data file. This method always returns null for
* equality delete files.
*
* <p>This default implementation returns null; implementations that record a referenced data
* file are expected to override it.
*
* @return the referenced data file location, or null if none is recorded
*/
default String referencedDataFile() {
return null;
}
}
17 changes: 16 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public PartitionData copy() {
private int[] equalityIds = null;
private byte[] keyMetadata = null;
private Integer sortOrderId;
private String referencedDataFile = null;

// cached schema
private transient Schema avroSchema = null;
Expand Down Expand Up @@ -108,6 +109,7 @@ public PartitionData copy() {
DataFile.SPLIT_OFFSETS,
DataFile.EQUALITY_IDS,
DataFile.SORT_ORDER_ID,
DataFile.REFERENCED_DATA_FILE,
MetadataColumns.ROW_POSITION);

/** Used by Avro reflection to instantiate this class when reading manifest files. */
Expand Down Expand Up @@ -149,7 +151,8 @@ public PartitionData copy() {
List<Long> splitOffsets,
int[] equalityFieldIds,
Integer sortOrderId,
ByteBuffer keyMetadata) {
ByteBuffer keyMetadata,
String referencedDataFile) {
super(BASE_TYPE.fields().size());
this.partitionSpecId = specId;
this.content = content;
Expand Down Expand Up @@ -178,6 +181,7 @@ public PartitionData copy() {
this.equalityIds = equalityFieldIds;
this.sortOrderId = sortOrderId;
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
this.referencedDataFile = referencedDataFile;
}

/**
Expand Down Expand Up @@ -230,6 +234,7 @@ public PartitionData copy() {
this.sortOrderId = toCopy.sortOrderId;
this.dataSequenceNumber = toCopy.dataSequenceNumber;
this.fileSequenceNumber = toCopy.fileSequenceNumber;
this.referencedDataFile = toCopy.referencedDataFile;
}

/** Constructor for Java serialization. */
Expand Down Expand Up @@ -339,6 +344,9 @@ protected <T> void internalSet(int pos, T value) {
this.sortOrderId = (Integer) value;
return;
case 17:
this.referencedDataFile = value != null ? value.toString() : null;
return;
case 18:
this.fileOrdinal = (long) value;
return;
default:
Expand Down Expand Up @@ -388,6 +396,8 @@ private Object getByPos(int basePos) {
case 16:
return sortOrderId;
case 17:
return referencedDataFile;
case 18:
return fileOrdinal;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + basePos);
Expand Down Expand Up @@ -514,6 +524,10 @@ public Integer sortOrderId() {
return sortOrderId;
}

/**
* Returns the referenced data file location held by this file, or null if it was never set.
*
* @return the value of the {@code referencedDataFile} field, possibly null
*/
public String referencedDataFile() {
return referencedDataFile;
}

/**
 * Copies a map into a serializable map, optionally restricting the copy to a set of keys.
 *
 * @param map the map to copy
 * @param keys keys to pass to {@code SerializableMap.filteredCopyOf}, or null to copy the map
 *     with {@code SerializableMap.copyOf}
 * @return the copy produced by {@code SerializableMap}
 */
private static <K, V> Map<K, V> copyMap(Map<K, V> map, Set<K> keys) {
  if (keys != null) {
    return SerializableMap.filteredCopyOf(map, keys);
  }

  return SerializableMap.copyOf(map);
}
Expand Down Expand Up @@ -565,6 +579,7 @@ public String toString() {
.add("sort_order_id", sortOrderId)
.add("data_sequence_number", dataSequenceNumber == null ? "null" : dataSequenceNumber)
.add("file_sequence_number", fileSequenceNumber == null ? "null" : fileSequenceNumber)
.add("referenced_data_file", referencedDataFile == null ? "null" : referencedDataFile)
.toString();
}
}
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/BaseScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
"partition",
"key_metadata",
"split_offsets",
"referenced_data_file",
"equality_ids");

protected static final List<String> DELETE_SCAN_WITH_STATS_COLUMNS =
Expand Down
13 changes: 12 additions & 1 deletion core/src/main/java/org/apache/iceberg/ContentFileParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ContentFileParser {
private static final String SPLIT_OFFSETS = "split-offsets";
private static final String EQUALITY_IDS = "equality-ids";
private static final String SORT_ORDER_ID = "sort-order-id";
private static final String REFERENCED_DATA_FILE = "referenced-data-file";

private ContentFileParser() {}

Expand Down Expand Up @@ -109,6 +110,14 @@ static void toJson(ContentFile<?> contentFile, PartitionSpec spec, JsonGenerator
generator.writeNumberField(SORT_ORDER_ID, contentFile.sortOrderId());
}

if (contentFile instanceof DeleteFile) {
DeleteFile deleteFile = (DeleteFile) contentFile;

if (deleteFile.referencedDataFile() != null) {
generator.writeStringField(REFERENCED_DATA_FILE, deleteFile.referencedDataFile());
}
}

generator.writeEndObject();
}

Expand Down Expand Up @@ -145,6 +154,7 @@ static ContentFile<?> fromJson(JsonNode jsonNode, PartitionSpec spec) {
List<Long> splitOffsets = JsonUtil.getLongListOrNull(SPLIT_OFFSETS, jsonNode);
int[] equalityFieldIds = JsonUtil.getIntArrayOrNull(EQUALITY_IDS, jsonNode);
Integer sortOrderId = JsonUtil.getIntOrNull(SORT_ORDER_ID, jsonNode);
String referencedDataFile = JsonUtil.getStringOrNull(REFERENCED_DATA_FILE, jsonNode);

if (fileContent == FileContent.DATA) {
return new GenericDataFile(
Expand All @@ -169,7 +179,8 @@ static ContentFile<?> fromJson(JsonNode jsonNode, PartitionSpec spec) {
equalityFieldIds,
sortOrderId,
splitOffsets,
keyMetadata);
keyMetadata,
referencedDataFile);
}
}

Expand Down
13 changes: 12 additions & 1 deletion core/src/main/java/org/apache/iceberg/FileMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public static class Builder {
private ByteBuffer keyMetadata = null;
private Integer sortOrderId = null;
private List<Long> splitOffsets = null;
private String referencedDataFile = null;

Builder(PartitionSpec spec) {
this.spec = spec;
Expand Down Expand Up @@ -220,6 +221,15 @@ public Builder withSortOrder(SortOrder newSortOrder) {
return this;
}

/**
 * Sets the location of the data file referenced by the delete file being built.
 *
 * @param newReferencedDataFile the data file location, or null to clear any value set earlier
 * @return this builder, for call chaining
 */
public Builder withReferencedDataFile(CharSequence newReferencedDataFile) {
  // Store the value as a String; a null argument resets the field to null.
  this.referencedDataFile =
      newReferencedDataFile == null ? null : newReferencedDataFile.toString();
  return this;
}

public DeleteFile build() {
Preconditions.checkArgument(filePath != null, "File path is required");
if (format == null) {
Expand Down Expand Up @@ -262,7 +272,8 @@ public DeleteFile build() {
equalityFieldIds,
sortOrderId,
splitOffsets,
keyMetadata);
keyMetadata,
referencedDataFile);
}
}
}
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/GenericDataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
splitOffsets,
null /* no equality field IDs */,
sortOrderId,
keyMetadata);
keyMetadata,
null /* no referenced data file */);
}

/**
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
int[] equalityFieldIds,
Integer sortOrderId,
List<Long> splitOffsets,
ByteBuffer keyMetadata) {
ByteBuffer keyMetadata,
String referencedDataFile) {
super(
specId,
content,
Expand All @@ -66,7 +67,8 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
splitOffsets,
equalityFieldIds,
sortOrderId,
keyMetadata);
keyMetadata,
referencedDataFile);
}

/**
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -923,5 +923,10 @@ public List<Integer> equalityFieldIds() {
public Integer sortOrderId() {
return deleteFile.sortOrderId();
}

/** Returns the referenced data file location reported by the wrapped {@code deleteFile}. */
@Override
public String referencedDataFile() {
return deleteFile.referencedDataFile();
}
}
}
9 changes: 8 additions & 1 deletion core/src/main/java/org/apache/iceberg/V2Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ static Types.StructType fileType(Types.StructType partitionType) {
DataFile.KEY_METADATA,
DataFile.SPLIT_OFFSETS,
DataFile.EQUALITY_IDS,
DataFile.SORT_ORDER_ID);
DataFile.SORT_ORDER_ID,
DataFile.REFERENCED_DATA_FILE);
}

static class IndexedManifestEntry<F extends ContentFile<F>>
Expand Down Expand Up @@ -448,6 +449,12 @@ public Object get(int pos) {
return wrapped.equalityFieldIds();
case 15:
return wrapped.sortOrderId();
case 16:
if (wrapped instanceof DeleteFile) {
return ((DeleteFile) wrapped).referencedDataFile();
} else {
return null;
}
}
throw new IllegalArgumentException("Unknown field ordinal: " + pos);
}
Expand Down
9 changes: 8 additions & 1 deletion core/src/main/java/org/apache/iceberg/V3Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ static Types.StructType fileType(Types.StructType partitionType) {
DataFile.KEY_METADATA,
DataFile.SPLIT_OFFSETS,
DataFile.EQUALITY_IDS,
DataFile.SORT_ORDER_ID);
DataFile.SORT_ORDER_ID,
DataFile.REFERENCED_DATA_FILE);
}

static class IndexedManifestEntry<F extends ContentFile<F>>
Expand Down Expand Up @@ -448,6 +449,12 @@ public Object get(int pos) {
return wrapped.equalityFieldIds();
case 15:
return wrapped.sortOrderId();
case 16:
if (wrapped.content() == FileContent.POSITION_DELETES) {
return ((DeleteFile) wrapped).referencedDataFile();
} else {
return null;
}
}
throw new IllegalArgumentException("Unknown field ordinal: " + pos);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public static CharSequence referencedDataFile(DeleteFile deleteFile) {
return null;
}

if (deleteFile.referencedDataFile() != null) {
return deleteFile.referencedDataFile();
}

int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId();
Type pathType = MetadataColumns.DELETE_FILE_PATH.type();

Expand Down
12 changes: 12 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,18 @@ protected DeleteFile newDeleteFile(int specId, String partitionPath) {
.build();
}

/**
 * Builds a position delete file that references the given data file.
 *
 * <p>The delete file uses the data file's partition spec and partition, a randomly generated
 * path, a size of 10 bytes and a record count of 1.
 *
 * @param dataFile the data file whose location is recorded as the referenced data file
 * @return a new position delete file pointing at {@code dataFile}
 */
protected DeleteFile newDeleteFileWithRef(DataFile dataFile) {
  PartitionSpec spec = table.specs().get(dataFile.specId());
  // A random UUID keeps the delete file path unique across calls.
  String deletePath = "/path/to/delete-" + UUID.randomUUID() + ".parquet";

  return FileMetadata.deleteFileBuilder(spec)
      .ofPositionDeletes()
      .withPath(deletePath)
      .withFileSizeInBytes(10)
      .withPartition(dataFile.partition())
      .withReferencedDataFile(dataFile.location())
      .withRecordCount(1)
      .build();
}

protected DeleteFile newEqualityDeleteFile(int specId, String partitionPath, int... fieldIds) {
PartitionSpec spec = table.specs().get(specId);
return FileMetadata.deleteFileBuilder(spec)
Expand Down
32 changes: 30 additions & 2 deletions core/src/test/java/org/apache/iceberg/TestContentFileParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,33 @@ private static Stream<Arguments> provideSpecAndDeleteFile() {
Arguments.of(
TestBase.SPEC,
deleteFileWithAllOptional(TestBase.SPEC),
deleteFileJsonWithAllOptional(TestBase.SPEC)));
deleteFileJsonWithAllOptional(TestBase.SPEC)),
Arguments.of(
TestBase.SPEC, deleteFileWithDataRef(TestBase.SPEC), deleteFileWithDataRefJson()));
}

/**
 * Creates a position delete file fixture that carries only the required fields plus a
 * referenced data file location.
 *
 * @param spec the partition spec used for the spec ID and the partition tuple type
 * @return a delete file whose referenced data file is {@code /path/to/data/file.parquet}
 */
private static DeleteFile deleteFileWithDataRef(PartitionSpec spec) {
  PartitionData partition = new PartitionData(spec.partitionType());
  partition.set(0, 4);

  // Only the record count is populated; the remaining metrics are left null.
  Metrics recordCountOnly = new Metrics(10L, null, null, null, null);

  return new GenericDeleteFile(
      spec.specId(),
      FileContent.POSITION_DELETES,
      "/path/to/delete.parquet",
      FileFormat.PARQUET,
      partition,
      1234,
      recordCountOnly,
      null /* no equality field IDs */,
      null /* no sort order ID */,
      null /* no split offsets */,
      null /* no key metadata */,
      "/path/to/data/file.parquet");
}

/**
 * Returns the expected JSON for the delete file fixture that carries a referenced data file.
 *
 * @return a single-line JSON object ending with the {@code referenced-data-file} field
 */
private static String deleteFileWithDataRefJson() {
  // One fragment per JSON field; no fragment contains a comma, so joining on "," is exact.
  return String.join(
      ",",
      "{\"spec-id\":0",
      "\"content\":\"POSITION_DELETES\"",
      "\"file-path\":\"/path/to/delete.parquet\"",
      "\"file-format\":\"PARQUET\"",
      "\"partition\":{\"1000\":4}",
      "\"file-size-in-bytes\":1234",
      "\"record-count\":10",
      "\"referenced-data-file\":\"/path/to/data/file.parquet\"}");
}

private static DeleteFile deleteFileWithRequiredOnly(PartitionSpec spec) {
Expand All @@ -234,6 +260,7 @@ private static DeleteFile deleteFileWithRequiredOnly(PartitionSpec spec) {
null,
null,
null,
null,
null);
}

Expand Down Expand Up @@ -273,7 +300,8 @@ private static DeleteFile deleteFileWithAllOptional(PartitionSpec spec) {
new int[] {3},
1,
Collections.singletonList(128L),
ByteBuffer.wrap(new byte[16]));
ByteBuffer.wrap(new byte[16]),
null);
}

private static String deleteFileJsonWithRequiredOnly(PartitionSpec spec) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public class TestManifestEncryption {
EQUALITY_ID_ARR,
SORT_ORDER_ID,
null,
CONTENT_KEY_METADATA);
CONTENT_KEY_METADATA,
null);

private static final EncryptionManager ENCRYPTION_MANAGER =
EncryptionTestHelpers.createEncryptionManager();
Expand Down
Loading

0 comments on commit d9b9768

Please sign in to comment.