Core, Flink, Spark: Test DVs with format-version=3 (#11485)
nastra authored Nov 13, 2024
1 parent 0280885 commit e06b069
Showing 13 changed files with 624 additions and 373 deletions.
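The recurring pattern in this commit: tests that previously wrote v2 position-delete files now branch on the table's format version and write a deletion vector (DV) when the version is 3 or higher. A minimal sketch of that branch, mirroring the posDelete helper added to TestMetadataTableFilters below (FileGenerationUtil.generateDV and generatePositionDeleteFile are the test fixtures the diff itself uses):

// v3 tables represent position deletes as deletion vectors (DVs) stored in
// Puffin files; v2 tables keep classic position-delete files.
private DeleteFile posDelete(Table table, DataFile dataFile) {
  return formatVersion >= 3
      ? FileGenerationUtil.generateDV(table, dataFile)
      : FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
}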
core/src/main/java/org/apache/iceberg/BaseContentScanTask.java
@@ -23,6 +23,7 @@
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.ScanTaskUtil;

abstract class BaseContentScanTask<ThisT extends ContentScanTask<F>, F extends ContentFile<F>>
implements ContentScanTask<F>, SplittableScanTask<ThisT> {
@@ -82,7 +83,7 @@ public long start() {

@Override
public long length() {
return file.fileSizeInBytes();
return ScanTaskUtil.contentSizeInBytes(file);
}

@Override
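The switch from file.fileSizeInBytes() to ScanTaskUtil.contentSizeInBytes(file) matters for DVs because several DVs can share one Puffin file: a scan task's length should reflect the blob it actually reads, not the containing file. A hedged sketch of what ScanTaskUtil.contentSizeInBytes presumably does, assuming DeleteFile.contentSizeInBytes() is the nullable blob size recorded for DVs (a sketch, not the verbatim implementation):

// Prefer the recorded content (blob) size when present, as it is for DVs;
// otherwise fall back to the size of the whole file.
static long contentSizeInBytes(ContentFile<?> file) {
  if (file instanceof DeleteFile) {
    DeleteFile deleteFile = (DeleteFile) file;
    if (deleteFile.contentSizeInBytes() != null) { // set for DVs
      return deleteFile.contentSizeInBytes();
    }
  }
  return file.fileSizeInBytes();
}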
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
@@ -176,6 +176,8 @@ public boolean canMerge(ScanTask other) {
@Override
public SplitScanTask merge(ScanTask other) {
SplitScanTask that = (SplitScanTask) other;
// don't use deletesSizeBytes() here so that deletesSizeBytes is only calculated once after
// merging rather than for each task before merging
return new SplitScanTask(offset, len + that.length(), fileScanTask, deletesSizeBytes);
}

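The comment in merge() is about avoiding repeated work: each SplitScanTask caches its deletes size lazily, and merge() forwards the raw field instead of calling the accessor, so a chain of merges computes the size at most once, on the merged task. A sketch of that memoization pattern with an illustrative accessor body (only the field name and the merge() call above come from the diff):

// Illustrative lazy accessor: computed on first use, then cached. Passing
// the raw field through merge() keeps merging free of this computation.
private long deletesSizeBytes = 0L;

private long deletesSizeBytes() {
  if (deletesSizeBytes == 0L) {
    long size = 0L;
    for (DeleteFile deleteFile : fileScanTask.deletes()) {
      size += ScanTaskUtil.contentSizeInBytes(deleteFile);
    }
    this.deletesSizeBytes = size;
  }
  return deletesSizeBytes;
}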
68 changes: 27 additions & 41 deletions core/src/test/java/org/apache/iceberg/TestMetadataTableFilters.java
@@ -52,18 +52,26 @@ protected static List<Object> parameters() {
return Arrays.asList(
new Object[] {1, MetadataTableType.DATA_FILES},
new Object[] {2, MetadataTableType.DATA_FILES},
new Object[] {3, MetadataTableType.DATA_FILES},
new Object[] {2, MetadataTableType.DELETE_FILES},
new Object[] {3, MetadataTableType.DELETE_FILES},
new Object[] {1, MetadataTableType.FILES},
new Object[] {2, MetadataTableType.FILES},
new Object[] {3, MetadataTableType.FILES},
new Object[] {1, MetadataTableType.ALL_DATA_FILES},
new Object[] {2, MetadataTableType.ALL_DATA_FILES},
new Object[] {3, MetadataTableType.ALL_DATA_FILES},
new Object[] {2, MetadataTableType.ALL_DELETE_FILES},
new Object[] {3, MetadataTableType.ALL_DELETE_FILES},
new Object[] {1, MetadataTableType.ALL_FILES},
new Object[] {2, MetadataTableType.ALL_FILES},
new Object[] {3, MetadataTableType.ALL_FILES},
new Object[] {1, MetadataTableType.ENTRIES},
new Object[] {2, MetadataTableType.ENTRIES},
new Object[] {3, MetadataTableType.ENTRIES},
new Object[] {1, MetadataTableType.ALL_ENTRIES},
new Object[] {2, MetadataTableType.ALL_ENTRIES});
new Object[] {2, MetadataTableType.ALL_ENTRIES},
new Object[] {3, MetadataTableType.ALL_ENTRIES});
}

@BeforeEach
@@ -76,9 +84,9 @@ public void setupTable() throws Exception {
table.newFastAppend().appendFile(FILE_D).commit();
table.newFastAppend().appendFile(FILE_B).commit();

if (formatVersion == 2) {
table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
table.newRowDelta().addDeletes(FILE_B_DELETES).commit();
if (formatVersion >= 2) {
table.newRowDelta().addDeletes(fileADeletes()).commit();
table.newRowDelta().addDeletes(fileBDeletes()).commit();
table.newRowDelta().addDeletes(FILE_C2_DELETES).commit();
table.newRowDelta().addDeletes(FILE_D2_DELETES).commit();
}
@@ -366,7 +374,7 @@ public void testPartitionSpecEvolutionRemovalV1() {

@TestTemplate
public void testPartitionSpecEvolutionRemovalV2() {
assumeThat(formatVersion).isEqualTo(2);
assumeThat(formatVersion).isGreaterThanOrEqualTo(2);

// Change spec and add two data and delete files each
table.updateSpec().removeField(Expressions.bucket("data", 16)).addField("id").commit();
@@ -388,27 +396,13 @@ public void testPartitionSpecEvolutionRemovalV2() {
.withPartitionPath("id=11")
.build();

DeleteFile delete10 =
FileMetadata.deleteFileBuilder(newSpec)
.ofPositionDeletes()
.withPath("/path/to/data-10-deletes.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("id=10")
.withRecordCount(1)
.build();
DeleteFile delete11 =
FileMetadata.deleteFileBuilder(newSpec)
.ofPositionDeletes()
.withPath("/path/to/data-11-deletes.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("id=11")
.withRecordCount(1)
.build();
DeleteFile delete10 = posDelete(table, data10);
DeleteFile delete11 = posDelete(table, data11);

table.newFastAppend().appendFile(data10).commit();
table.newFastAppend().appendFile(data11).commit();

if (formatVersion == 2) {
if (formatVersion >= 2) {
table.newRowDelta().addDeletes(delete10).commit();
table.newRowDelta().addDeletes(delete11).commit();
}
@@ -447,6 +441,12 @@ public void testPartitionSpecEvolutionRemovalV2() {
assertThat(tasks).hasSize(expectedScanTaskCount(3));
}

private DeleteFile posDelete(Table table, DataFile dataFile) {
return formatVersion >= 3
? FileGenerationUtil.generateDV(table, dataFile)
: FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
}

@TestTemplate
public void testPartitionSpecEvolutionAdditiveV1() {
assumeThat(formatVersion).isEqualTo(1);
@@ -514,8 +514,8 @@ public void testPartitionSpecEvolutionAdditiveV1() {
}

@TestTemplate
public void testPartitionSpecEvolutionAdditiveV2() {
assumeThat(formatVersion).isEqualTo(2);
public void testPartitionSpecEvolutionAdditiveV2AndAbove() {
assumeThat(formatVersion).isGreaterThanOrEqualTo(2);

// Change spec and add two data and delete files each
table.updateSpec().addField("id").commit();
@@ -537,27 +537,13 @@ public void testPartitionSpecEvolutionAdditiveV2() {
.withPartitionPath("data_bucket=1/id=11")
.build();

DeleteFile delete10 =
FileMetadata.deleteFileBuilder(newSpec)
.ofPositionDeletes()
.withPath("/path/to/data-10-deletes.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=0/id=10")
.withRecordCount(1)
.build();
DeleteFile delete11 =
FileMetadata.deleteFileBuilder(newSpec)
.ofPositionDeletes()
.withPath("/path/to/data-11-deletes.parquet")
.withFileSizeInBytes(10)
.withPartitionPath("data_bucket=1/id=11")
.withRecordCount(1)
.build();
DeleteFile delete10 = posDelete(table, data10);
DeleteFile delete11 = posDelete(table, data11);

table.newFastAppend().appendFile(data10).commit();
table.newFastAppend().appendFile(data11).commit();

if (formatVersion == 2) {
if (formatVersion >= 2) {
table.newRowDelta().addDeletes(delete10).commit();
table.newRowDelta().addDeletes(delete11).commit();
}
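Note that setupTable() above now calls fileADeletes() and fileBDeletes() instead of using the FILE_A_DELETES and FILE_B_DELETES constants directly. Those helpers are not part of this diff; presumably they live in the shared test base and branch on the format version the same way posDelete does, along these lines (FILE_A_DV is a hypothetical name for the DV fixture):

// Assumed shape of the helper referenced in setupTable(), not shown here:
protected DeleteFile fileADeletes() {
  return formatVersion >= 3 ? FILE_A_DV : FILE_A_DELETES; // FILE_A_DV: hypothetical fixture
}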
core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -1733,7 +1733,7 @@ public void testFilesTableEstimateSize() throws Exception {
assertEstimatedRowCount(new AllDataFilesTable(table), 4);
assertEstimatedRowCount(new AllFilesTable(table), 4);

if (formatVersion == 2) {
if (formatVersion >= 2) {
assertEstimatedRowCount(new DeleteFilesTable(table), 4);
assertEstimatedRowCount(new AllDeleteFilesTable(table), 4);
}
98 changes: 80 additions & 18 deletions data/src/test/java/org/apache/iceberg/data/FileHelpers.java
@@ -23,9 +23,12 @@
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.BaseDVFileWriter;
import org.apache.iceberg.deletes.DVFileWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -35,6 +38,8 @@
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.FileWriterFactory;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
@@ -47,21 +52,53 @@ public static Pair<DeleteFile, CharSequenceSet> writeDeleteFile(
return writeDeleteFile(table, out, null, deletes);
}

public static Pair<DeleteFile, CharSequenceSet> writeDeleteFile(
Table table, OutputFile out, List<Pair<CharSequence, Long>> deletes, int formatVersion)
throws IOException {
return writeDeleteFile(table, out, null, deletes, formatVersion);
}

public static Pair<DeleteFile, CharSequenceSet> writeDeleteFile(
Table table, OutputFile out, StructLike partition, List<Pair<CharSequence, Long>> deletes)
throws IOException {
FileWriterFactory<Record> factory = GenericFileWriterFactory.builderFor(table).build();
return writeDeleteFile(table, out, partition, deletes, 2);
}

PositionDeleteWriter<Record> writer =
factory.newPositionDeleteWriter(encrypt(out), table.spec(), partition);
PositionDelete<Record> posDelete = PositionDelete.create();
try (Closeable toClose = writer) {
for (Pair<CharSequence, Long> delete : deletes) {
writer.write(posDelete.set(delete.first(), delete.second(), null));
public static Pair<DeleteFile, CharSequenceSet> writeDeleteFile(
Table table,
OutputFile out,
StructLike partition,
List<Pair<CharSequence, Long>> deletes,
int formatVersion)
throws IOException {
if (formatVersion >= 3) {
OutputFileFactory fileFactory =
OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null);
try (DVFileWriter closeableWriter = writer) {
for (Pair<CharSequence, Long> delete : deletes) {
closeableWriter.delete(
delete.first().toString(), delete.second(), table.spec(), partition);
}
}

return Pair.of(
Iterables.getOnlyElement(writer.result().deleteFiles()),
writer.result().referencedDataFiles());
} else {
FileWriterFactory<Record> factory = GenericFileWriterFactory.builderFor(table).build();

PositionDeleteWriter<Record> writer =
factory.newPositionDeleteWriter(encrypt(out), table.spec(), partition);
PositionDelete<Record> posDelete = PositionDelete.create();
try (Closeable toClose = writer) {
for (Pair<CharSequence, Long> delete : deletes) {
writer.write(posDelete.set(delete.first(), delete.second(), null));
}
}
}

return Pair.of(writer.toDeleteFile(), writer.referencedDataFiles());
return Pair.of(writer.toDeleteFile(), writer.referencedDataFiles());
}
}

public static DeleteFile writeDeleteFile(
Expand Down Expand Up @@ -121,18 +158,43 @@ public static DataFile writeDataFile(
public static DeleteFile writePosDeleteFile(
Table table, OutputFile out, StructLike partition, List<PositionDelete<?>> deletes)
throws IOException {
FileWriterFactory<Record> factory =
GenericFileWriterFactory.builderFor(table).positionDeleteRowSchema(table.schema()).build();
return writePosDeleteFile(table, out, partition, deletes, 2);
}

PositionDeleteWriter<?> writer =
factory.newPositionDeleteWriter(encrypt(out), table.spec(), partition);
try (Closeable toClose = writer) {
for (PositionDelete delete : deletes) {
writer.write(delete);
public static DeleteFile writePosDeleteFile(
Table table,
OutputFile out,
StructLike partition,
List<PositionDelete<?>> deletes,
int formatVersion)
throws IOException {
if (formatVersion >= 3) {
OutputFileFactory fileFactory =
OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build();
DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null);
try (DVFileWriter closeableWriter = writer) {
for (PositionDelete<?> delete : deletes) {
closeableWriter.delete(delete.path().toString(), delete.pos(), table.spec(), partition);
}
}
}

return writer.toDeleteFile();
return Iterables.getOnlyElement(writer.result().deleteFiles());
} else {
FileWriterFactory<Record> factory =
GenericFileWriterFactory.builderFor(table)
.positionDeleteRowSchema(table.schema())
.build();

PositionDeleteWriter<?> writer =
factory.newPositionDeleteWriter(encrypt(out), table.spec(), partition);
try (Closeable toClose = writer) {
for (PositionDelete delete : deletes) {
writer.write(delete);
}
}

return writer.toDeleteFile();
}
}

private static EncryptedOutputFile encrypt(OutputFile out) {
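Two details of the new FileHelpers overloads are easy to miss. First, the old four-argument writeDeleteFile and the old writePosDeleteFile delegate with formatVersion = 2, so existing callers keep their v2 behavior. Second, on the v3 path the supplied OutputFile is never used: BaseDVFileWriter creates its own Puffin file through the OutputFileFactory. A hedged usage sketch (table, out, dataFile, and the row positions are illustrative):

// Delete rows 0 and 3 of dataFile; the helper writes a classic
// position-delete file for formatVersion 2 and a DV for formatVersion 3.
List<Pair<CharSequence, Long>> deletes =
    Arrays.asList(
        Pair.of(dataFile.location(), 0L),
        Pair.of(dataFile.location(), 3L));

Pair<DeleteFile, CharSequenceSet> result =
    FileHelpers.writeDeleteFile(table, out, null, deletes, formatVersion);
table.newRowDelta().addDeletes(result.first()).commit();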
@@ -25,6 +25,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
@@ -46,6 +47,7 @@
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.RewriteDataFilesActionResult;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -77,6 +79,9 @@ public class TestRewriteDataFilesAction extends CatalogTestBase {
@Parameter(index = 2)
private FileFormat format;

@Parameter(index = 3)
private int formatVersion;

private Table icebergTableUnPartitioned;
private Table icebergTablePartitioned;
private Table icebergTableWithPk;
@@ -87,15 +92,17 @@ protected TableEnvironment getTableEnv() {
return super.getTableEnv();
}

@Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}")
@Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, formatVersion={3}")
public static List<Object[]> parameters() {
List<Object[]> parameters = Lists.newArrayList();
for (FileFormat format :
new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}) {
for (Object[] catalogParams : CatalogTestBase.parameters()) {
String catalogName = (String) catalogParams[0];
Namespace baseNamespace = (Namespace) catalogParams[1];
parameters.add(new Object[] {catalogName, baseNamespace, format});
for (int version : Arrays.asList(2, 3)) {
String catalogName = (String) catalogParams[0];
Namespace baseNamespace = (Namespace) catalogParams[1];
parameters.add(new Object[] {catalogName, baseNamespace, format, version});
}
}
}
return parameters;
@@ -111,21 +118,21 @@ public void before() {
sql("USE CATALOG %s", catalogName);
sql("USE %s", DATABASE);
sql(
"CREATE TABLE %s (id int, data varchar) with ('write.format.default'='%s')",
TABLE_NAME_UNPARTITIONED, format.name());
"CREATE TABLE %s (id int, data varchar) with ('write.format.default'='%s', '%s'='%s')",
TABLE_NAME_UNPARTITIONED, format.name(), TableProperties.FORMAT_VERSION, formatVersion);
icebergTableUnPartitioned =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_UNPARTITIONED));

sql(
"CREATE TABLE %s (id int, data varchar,spec varchar) "
+ " PARTITIONED BY (data,spec) with ('write.format.default'='%s')",
TABLE_NAME_PARTITIONED, format.name());
+ " PARTITIONED BY (data,spec) with ('write.format.default'='%s', '%s'='%s')",
TABLE_NAME_PARTITIONED, format.name(), TableProperties.FORMAT_VERSION, formatVersion);
icebergTablePartitioned =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_PARTITIONED));

sql(
"CREATE TABLE %s (id int, data varchar, PRIMARY KEY(`id`) NOT ENFORCED) with ('write.format.default'='%s', 'format-version'='2')",
TABLE_NAME_WITH_PK, format.name());
"CREATE TABLE %s (id int, data varchar, PRIMARY KEY(`id`) NOT ENFORCED) with ('write.format.default'='%s', '%s'='%s')",
TABLE_NAME_WITH_PK, format.name(), TableProperties.FORMAT_VERSION, formatVersion);
icebergTableWithPk =
validationCatalog.loadTable(TableIdentifier.of(icebergNamespace, TABLE_NAME_WITH_PK));
}
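For reference, TableProperties.FORMAT_VERSION is the "format-version" table property, and the primary-key table's DDL previously hard-coded 'format-version'='2'. With format = PARQUET and formatVersion = 3, the first templated DDL above resolves to roughly (the table name is illustrative):

CREATE TABLE t_unpartitioned (id int, data varchar)
with ('write.format.default'='parquet', 'format-version'='3')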