Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API, Core: Add content offset and size to DeleteFile #11446

Merged
merged 3 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/src/main/java/org/apache/iceberg/AddedRowsScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.util.List;
import org.apache.iceberg.util.ScanTaskUtil;

/**
* A scan task for inserts generated by adding a data file to the table.
Expand Down Expand Up @@ -55,7 +56,7 @@ default ChangelogOperation operation() {

@Override
default long sizeBytes() {
return length() + deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum();
return length() + ScanTaskUtil.contentSizeInBytes(deletes());
}

@Override
Expand Down
15 changes: 13 additions & 2 deletions api/src/main/java/org/apache/iceberg/DataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,21 @@ public interface DataFile extends ContentFile<DataFile> {
"referenced_data_file",
StringType.get(),
"Fully qualified location (URI with FS scheme) of a data file that all deletes reference");
Types.NestedField CONTENT_OFFSET =
optional(
144, "content_offset", LongType.get(), "The offset in the file where the content starts");
Types.NestedField CONTENT_SIZE =
optional(
145,
"content_size_in_bytes",
LongType.get(),
"The length of referenced content stored in the file");

int PARTITION_ID = 102;
String PARTITION_NAME = "partition";
String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";

// NEXT ID TO ASSIGN: 144
// NEXT ID TO ASSIGN: 146

static StructType getType(StructType partitionType) {
// IDs start at 100 to leave room for changes to ManifestEntry
Expand All @@ -131,7 +140,9 @@ static StructType getType(StructType partitionType) {
SPLIT_OFFSETS,
EQUALITY_IDS,
SORT_ORDER_ID,
REFERENCED_DATA_FILE);
REFERENCED_DATA_FILE,
CONTENT_OFFSET,
CONTENT_SIZE);
}

/**
Expand Down
22 changes: 22 additions & 0 deletions api/src/main/java/org/apache/iceberg/DeleteFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,26 @@ default List<Long> splitOffsets() {
default String referencedDataFile() {
return null;
}

/**
* Returns the offset in the file where the content starts.
*
* <p>The content offset is required for deletion vectors and points to the start of the deletion
* vector blob in the Puffin file, enabling direct access. This method always returns null for
* equality and position delete files.
*/
default Long contentOffset() {
return null;
}

/**
* Returns the length of referenced content stored in the file.
*
* <p>The content size is required for deletion vectors and indicates the size of the deletion
* vector blob in the Puffin file, enabling direct access. This method always returns null for
* equality and position delete files.
*/
default Long contentSizeInBytes() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.util.List;
import org.apache.iceberg.util.ScanTaskUtil;

/**
* A scan task for deletes generated by removing a data file from the table.
Expand Down Expand Up @@ -54,7 +55,7 @@ default ChangelogOperation operation() {

@Override
default long sizeBytes() {
return length() + existingDeletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum();
return length() + ScanTaskUtil.contentSizeInBytes(existingDeletes());
}

@Override
Expand Down
7 changes: 4 additions & 3 deletions api/src/main/java/org/apache/iceberg/DeletedRowsScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.util.List;
import org.apache.iceberg.util.ScanTaskUtil;

/**
* A scan task for deletes generated by adding delete files to the table.
Expand Down Expand Up @@ -63,9 +64,9 @@ default ChangelogOperation operation() {

@Override
default long sizeBytes() {
return length()
+ addedDeletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum()
+ existingDeletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum();
long addedDeletesSize = ScanTaskUtil.contentSizeInBytes(addedDeletes());
long existingDeletesSize = ScanTaskUtil.contentSizeInBytes(existingDeletes());
return length() + addedDeletesSize + existingDeletesSize;
}

@Override
Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/org/apache/iceberg/FileFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

/** Enum of supported file formats. */
public enum FileFormat {
PUFFIN("puffin", false),
ORC("orc", true),
PARQUET("parquet", true),
AVRO("avro", true),
Expand Down
3 changes: 2 additions & 1 deletion api/src/main/java/org/apache/iceberg/FileScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.iceberg;

import java.util.List;
import org.apache.iceberg.util.ScanTaskUtil;

/** A scan task over a range of bytes in a single data file. */
public interface FileScanTask extends ContentScanTask<DataFile>, SplittableScanTask<FileScanTask> {
Expand All @@ -36,7 +37,7 @@ default Schema schema() {

@Override
default long sizeBytes() {
return length() + deletes().stream().mapToLong(ContentFile::fileSizeInBytes).sum();
return length() + ScanTaskUtil.contentSizeInBytes(deletes());
}

@Override
Expand Down
7 changes: 4 additions & 3 deletions api/src/main/java/org/apache/iceberg/util/DeleteFileSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,14 @@ public boolean equals(Object o) {
}

DeleteFileWrapper that = (DeleteFileWrapper) o;
// this needs to be updated once deletion vector support is added
return Objects.equals(file.location(), that.file.location());
return Objects.equals(file.location(), that.file.location())
&& Objects.equals(file.contentOffset(), that.file.contentOffset())
&& Objects.equals(file.contentSizeInBytes(), that.file.contentSizeInBytes());
}

@Override
public int hashCode() {
return Objects.hashCode(file.location());
return Objects.hash(file.location(), file.contentOffset(), file.contentSizeInBytes());
}

@Override
Expand Down
50 changes: 50 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/ScanTaskUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.util;

import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;

public class ScanTaskUtil {

private ScanTaskUtil() {}

public static long contentSizeInBytes(ContentFile<?> file) {
if (file.content() == FileContent.DATA) {
return file.fileSizeInBytes();
} else {
DeleteFile deleteFile = (DeleteFile) file;
return isDV(deleteFile) ? deleteFile.contentSizeInBytes() : deleteFile.fileSizeInBytes();
}
}

public static long contentSizeInBytes(Iterable<? extends ContentFile<?>> files) {
long size = 0L;
for (ContentFile<?> file : files) {
size += contentSizeInBytes(file);
}
return size;
}

private static boolean isDV(DeleteFile deleteFile) {
return deleteFile.format() == FileFormat.PUFFIN;
}
}
56 changes: 56 additions & 0 deletions api/src/test/java/org/apache/iceberg/util/TestScanTaskUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.util;

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class TestScanTaskUtil {

@Test
public void testContentSize() {
DeleteFile dv1 = mockDV("dv1.puffin", 20L, 25L, "data1.parquet");
DeleteFile dv2 = mockDV("dv2.puffin", 4L, 15L, "data2.parquet");

long size1 = ScanTaskUtil.contentSizeInBytes(ImmutableList.of());
assertThat(size1).isEqualTo(0);

long size2 = ScanTaskUtil.contentSizeInBytes(ImmutableList.of(dv1));
assertThat(size2).isEqualTo(25L);

long size3 = ScanTaskUtil.contentSizeInBytes(ImmutableList.of(dv1, dv2));
assertThat(size3).isEqualTo(40L);
}

private static DeleteFile mockDV(
String location, long contentOffset, long contentSize, String referencedDataFile) {
DeleteFile mockFile = Mockito.mock(DeleteFile.class);
Mockito.when(mockFile.format()).thenReturn(FileFormat.PUFFIN);
Mockito.when(mockFile.location()).thenReturn(location);
Mockito.when(mockFile.contentOffset()).thenReturn(contentOffset);
Mockito.when(mockFile.contentSizeInBytes()).thenReturn(contentSize);
Mockito.when(mockFile.referencedDataFile()).thenReturn(referencedDataFile);
return mockFile;
}
}
32 changes: 31 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public PartitionData copy() {
private byte[] keyMetadata = null;
private Integer sortOrderId;
private String referencedDataFile = null;
private Long contentOffset = null;
private Long contentSizeInBytes = null;

// cached schema
private transient Schema avroSchema = null;
Expand Down Expand Up @@ -110,6 +112,8 @@ public PartitionData copy() {
DataFile.EQUALITY_IDS,
DataFile.SORT_ORDER_ID,
DataFile.REFERENCED_DATA_FILE,
DataFile.CONTENT_OFFSET,
DataFile.CONTENT_SIZE,
MetadataColumns.ROW_POSITION);

/** Used by Avro reflection to instantiate this class when reading manifest files. */
Expand Down Expand Up @@ -152,7 +156,9 @@ public PartitionData copy() {
int[] equalityFieldIds,
Integer sortOrderId,
ByteBuffer keyMetadata,
String referencedDataFile) {
String referencedDataFile,
Long contentOffset,
Long contentSizeInBytes) {
super(BASE_TYPE.fields().size());
this.partitionSpecId = specId;
this.content = content;
Expand Down Expand Up @@ -182,6 +188,8 @@ public PartitionData copy() {
this.sortOrderId = sortOrderId;
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
this.referencedDataFile = referencedDataFile;
this.contentOffset = contentOffset;
this.contentSizeInBytes = contentSizeInBytes;
}

/**
Expand Down Expand Up @@ -235,6 +243,8 @@ public PartitionData copy() {
this.dataSequenceNumber = toCopy.dataSequenceNumber;
this.fileSequenceNumber = toCopy.fileSequenceNumber;
this.referencedDataFile = toCopy.referencedDataFile;
this.contentOffset = toCopy.contentOffset;
this.contentSizeInBytes = toCopy.contentSizeInBytes;
}

/** Constructor for Java serialization. */
Expand Down Expand Up @@ -347,6 +357,12 @@ protected <T> void internalSet(int pos, T value) {
this.referencedDataFile = value != null ? value.toString() : null;
return;
case 18:
this.contentOffset = (Long) value;
return;
case 19:
this.contentSizeInBytes = (Long) value;
return;
case 20:
this.fileOrdinal = (long) value;
return;
default:
Expand Down Expand Up @@ -398,6 +414,10 @@ private Object getByPos(int basePos) {
case 17:
return referencedDataFile;
case 18:
return contentOffset;
case 19:
return contentSizeInBytes;
case 20:
return fileOrdinal;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + basePos);
Expand Down Expand Up @@ -528,6 +548,14 @@ public String referencedDataFile() {
return referencedDataFile;
}

public Long contentOffset() {
return contentOffset;
}

public Long contentSizeInBytes() {
return contentSizeInBytes;
}

private static <K, V> Map<K, V> copyMap(Map<K, V> map, Set<K> keys) {
return keys == null ? SerializableMap.copyOf(map) : SerializableMap.filteredCopyOf(map, keys);
}
Expand Down Expand Up @@ -580,6 +608,8 @@ public String toString() {
.add("data_sequence_number", dataSequenceNumber == null ? "null" : dataSequenceNumber)
.add("file_sequence_number", fileSequenceNumber == null ? "null" : fileSequenceNumber)
.add("referenced_data_file", referencedDataFile == null ? "null" : referencedDataFile)
.add("content_offset", contentOffset == null ? "null" : contentOffset)
.add("content_size_in_bytes", contentSizeInBytes == null ? "null" : contentSizeInBytes)
.toString();
}
}
9 changes: 3 additions & 6 deletions core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.util.ScanTaskUtil;

public class BaseFileScanTask extends BaseContentScanTask<FileScanTask, DataFile>
implements FileScanTask {
Expand Down Expand Up @@ -79,7 +80,7 @@ private long deletesSizeBytes() {
if (deletesSizeBytes == 0L && deletes.length > 0) {
long size = 0L;
for (DeleteFile deleteFile : deletes) {
size += deleteFile.fileSizeInBytes();
size += ScanTaskUtil.contentSizeInBytes(deleteFile);
}
this.deletesSizeBytes = size;
}
Expand Down Expand Up @@ -180,11 +181,7 @@ public SplitScanTask merge(ScanTask other) {

private long deletesSizeBytes() {
if (deletesSizeBytes == 0L && fileScanTask.filesCount() > 1) {
long size = 0L;
for (DeleteFile deleteFile : fileScanTask.deletes()) {
size += deleteFile.fileSizeInBytes();
}
this.deletesSizeBytes = size;
this.deletesSizeBytes = ScanTaskUtil.contentSizeInBytes(fileScanTask.deletes());
}

return deletesSizeBytes;
Expand Down
Loading