Skip to content

Commit

Permalink
Core: Adapt commit, scan, and snapshot stats for DVs (#11464)
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi authored Nov 5, 2024
1 parent 5bd314b commit 549674b
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 3 deletions.
25 changes: 23 additions & 2 deletions core/src/main/java/org/apache/iceberg/SnapshotSummary.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.ScanTaskUtil;

public class SnapshotSummary {
Expand All @@ -36,6 +37,8 @@ public class SnapshotSummary {
public static final String REMOVED_EQ_DELETE_FILES_PROP = "removed-equality-delete-files";
public static final String ADD_POS_DELETE_FILES_PROP = "added-position-delete-files";
public static final String REMOVED_POS_DELETE_FILES_PROP = "removed-position-delete-files";
public static final String ADDED_DVS_PROP = "added-dvs";
public static final String REMOVED_DVS_PROP = "removed-dvs";
public static final String REMOVED_DELETE_FILES_PROP = "removed-delete-files";
public static final String TOTAL_DELETE_FILES_PROP = "total-delete-files";
public static final String ADDED_RECORDS_PROP = "added-records";
Expand Down Expand Up @@ -222,6 +225,8 @@ private static class UpdateMetrics {
private int removedEqDeleteFiles = 0;
private int addedPosDeleteFiles = 0;
private int removedPosDeleteFiles = 0;
private int addedDVs = 0;
private int removedDVs = 0;
private int addedDeleteFiles = 0;
private int removedDeleteFiles = 0;
private long addedRecords = 0L;
Expand All @@ -243,6 +248,8 @@ void clear() {
this.removedPosDeleteFiles = 0;
this.addedDeleteFiles = 0;
this.removedDeleteFiles = 0;
this.addedDVs = 0;
this.removedDVs = 0;
this.addedRecords = 0L;
this.deletedRecords = 0L;
this.addedPosDeletes = 0L;
Expand All @@ -262,6 +269,8 @@ void addTo(ImmutableMap.Builder<String, String> builder) {
removedPosDeleteFiles > 0, builder, REMOVED_POS_DELETE_FILES_PROP, removedPosDeleteFiles);
setIf(addedDeleteFiles > 0, builder, ADDED_DELETE_FILES_PROP, addedDeleteFiles);
setIf(removedDeleteFiles > 0, builder, REMOVED_DELETE_FILES_PROP, removedDeleteFiles);
setIf(addedDVs > 0, builder, ADDED_DVS_PROP, addedDVs);
setIf(removedDVs > 0, builder, REMOVED_DVS_PROP, removedDVs);
setIf(addedRecords > 0, builder, ADDED_RECORDS_PROP, addedRecords);
setIf(deletedRecords > 0, builder, DELETED_RECORDS_PROP, deletedRecords);

Expand All @@ -283,8 +292,13 @@ void addedFile(ContentFile<?> file) {
this.addedRecords += file.recordCount();
break;
case POSITION_DELETES:
DeleteFile deleteFile = (DeleteFile) file;
if (ContentFileUtil.isDV(deleteFile)) {
this.addedDVs += 1;
} else {
this.addedPosDeleteFiles += 1;
}
this.addedDeleteFiles += 1;
this.addedPosDeleteFiles += 1;
this.addedPosDeletes += file.recordCount();
break;
case EQUALITY_DELETES:
Expand All @@ -306,8 +320,13 @@ void removedFile(ContentFile<?> file) {
this.deletedRecords += file.recordCount();
break;
case POSITION_DELETES:
DeleteFile deleteFile = (DeleteFile) file;
if (ContentFileUtil.isDV(deleteFile)) {
this.removedDVs += 1;
} else {
this.removedPosDeleteFiles += 1;
}
this.removedDeleteFiles += 1;
this.removedPosDeleteFiles += 1;
this.removedPosDeletes += file.recordCount();
break;
case EQUALITY_DELETES:
Expand Down Expand Up @@ -344,6 +363,8 @@ void merge(UpdateMetrics other) {
this.removedEqDeleteFiles += other.removedEqDeleteFiles;
this.addedPosDeleteFiles += other.addedPosDeleteFiles;
this.removedPosDeleteFiles += other.removedPosDeleteFiles;
this.addedDVs += other.addedDVs;
this.removedDVs += other.removedDVs;
this.addedDeleteFiles += other.addedDeleteFiles;
this.removedDeleteFiles += other.removedDeleteFiles;
this.addedSize += other.addedSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ public interface CommitMetricsResult {
String ADDED_DELETE_FILES = "added-delete-files";
String ADDED_EQ_DELETE_FILES = "added-equality-delete-files";
String ADDED_POS_DELETE_FILES = "added-positional-delete-files";
String ADDED_DVS = "added-dvs";
String REMOVED_POS_DELETE_FILES = "removed-positional-delete-files";
String REMOVED_DVS = "removed-dvs";
String REMOVED_EQ_DELETE_FILES = "removed-equality-delete-files";
String REMOVED_DELETE_FILES = "removed-delete-files";
String TOTAL_DELETE_FILES = "total-delete-files";
Expand Down Expand Up @@ -75,6 +77,12 @@ public interface CommitMetricsResult {
/**
 * Counter that {@code from(...)} fills in from the
 * {@link SnapshotSummary#ADD_POS_DELETE_FILES_PROP} entry of the snapshot summary.
 *
 * @return the counter result, possibly {@code null}
 */
@Nullable
CounterResult addedPositionalDeleteFiles();

/**
 * Counter that {@code from(...)} fills in from the {@link SnapshotSummary#ADDED_DVS_PROP} entry
 * of the snapshot summary; it is serialized under the {@link #ADDED_DVS} key.
 *
 * <p>NOTE(review): presumably declared as a default method with {@code @Value.Default} so that
 * existing implementations and builder call sites keep working without setting it — confirm.
 *
 * @return the counter result, {@code null} unless a value is supplied
 */
@Nullable
@Value.Default
default CounterResult addedDVs() {
return null;
}

/**
 * Counter that {@code from(...)} fills in from the
 * {@link SnapshotSummary#REMOVED_DELETE_FILES_PROP} entry of the snapshot summary.
 *
 * @return the counter result, possibly {@code null}
 */
@Nullable
CounterResult removedDeleteFiles();

Expand All @@ -84,6 +92,12 @@ public interface CommitMetricsResult {
/**
 * Counter that {@code from(...)} fills in from the
 * {@link SnapshotSummary#REMOVED_POS_DELETE_FILES_PROP} entry of the snapshot summary.
 *
 * @return the counter result, possibly {@code null}
 */
@Nullable
CounterResult removedPositionalDeleteFiles();

/**
 * Counter that {@code from(...)} fills in from the {@link SnapshotSummary#REMOVED_DVS_PROP} entry
 * of the snapshot summary; it is serialized under the {@link #REMOVED_DVS} key.
 *
 * <p>NOTE(review): presumably declared as a default method with {@code @Value.Default} so that
 * existing implementations and builder call sites keep working without setting it — confirm.
 *
 * @return the counter result, {@code null} unless a value is supplied
 */
@Nullable
@Value.Default
default CounterResult removedDVs() {
return null;
}

/**
 * Counter that {@code from(...)} fills in from the
 * {@link SnapshotSummary#TOTAL_DELETE_FILES_PROP} entry of the snapshot summary.
 *
 * @return the counter result, possibly {@code null}
 */
@Nullable
CounterResult totalDeleteFiles();

Expand Down Expand Up @@ -136,13 +150,15 @@ static CommitMetricsResult from(
.addedDeleteFiles(counterFrom(snapshotSummary, SnapshotSummary.ADDED_DELETE_FILES_PROP))
.addedPositionalDeleteFiles(
counterFrom(snapshotSummary, SnapshotSummary.ADD_POS_DELETE_FILES_PROP))
.addedDVs(counterFrom(snapshotSummary, SnapshotSummary.ADDED_DVS_PROP))
.addedEqualityDeleteFiles(
counterFrom(snapshotSummary, SnapshotSummary.ADD_EQ_DELETE_FILES_PROP))
.removedDeleteFiles(counterFrom(snapshotSummary, SnapshotSummary.REMOVED_DELETE_FILES_PROP))
.removedEqualityDeleteFiles(
counterFrom(snapshotSummary, SnapshotSummary.REMOVED_EQ_DELETE_FILES_PROP))
.removedPositionalDeleteFiles(
counterFrom(snapshotSummary, SnapshotSummary.REMOVED_POS_DELETE_FILES_PROP))
.removedDVs(counterFrom(snapshotSummary, SnapshotSummary.REMOVED_DVS_PROP))
.totalDeleteFiles(counterFrom(snapshotSummary, SnapshotSummary.TOTAL_DELETE_FILES_PROP))
.addedRecords(counterFrom(snapshotSummary, SnapshotSummary.ADDED_RECORDS_PROP))
.removedRecords(counterFrom(snapshotSummary, SnapshotSummary.DELETED_RECORDS_PROP))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ static void toJson(CommitMetricsResult metrics, JsonGenerator gen) throws IOExce
CounterResultParser.toJson(metrics.addedPositionalDeleteFiles(), gen);
}

if (null != metrics.addedDVs()) {
gen.writeFieldName(CommitMetricsResult.ADDED_DVS);
CounterResultParser.toJson(metrics.addedDVs(), gen);
}

if (null != metrics.removedDeleteFiles()) {
gen.writeFieldName(CommitMetricsResult.REMOVED_DELETE_FILES);
CounterResultParser.toJson(metrics.removedDeleteFiles(), gen);
Expand All @@ -91,6 +96,11 @@ static void toJson(CommitMetricsResult metrics, JsonGenerator gen) throws IOExce
CounterResultParser.toJson(metrics.removedPositionalDeleteFiles(), gen);
}

if (null != metrics.removedDVs()) {
gen.writeFieldName(CommitMetricsResult.REMOVED_DVS);
CounterResultParser.toJson(metrics.removedDVs(), gen);
}

if (null != metrics.removedEqualityDeleteFiles()) {
gen.writeFieldName(CommitMetricsResult.REMOVED_EQ_DELETE_FILES);
CounterResultParser.toJson(metrics.removedEqualityDeleteFiles(), gen);
Expand Down Expand Up @@ -186,10 +196,12 @@ static CommitMetricsResult fromJson(JsonNode json) {
CounterResultParser.fromJson(CommitMetricsResult.ADDED_EQ_DELETE_FILES, json))
.addedPositionalDeleteFiles(
CounterResultParser.fromJson(CommitMetricsResult.ADDED_POS_DELETE_FILES, json))
.addedDVs(CounterResultParser.fromJson(CommitMetricsResult.ADDED_DVS, json))
.removedEqualityDeleteFiles(
CounterResultParser.fromJson(CommitMetricsResult.REMOVED_EQ_DELETE_FILES, json))
.removedPositionalDeleteFiles(
CounterResultParser.fromJson(CommitMetricsResult.REMOVED_POS_DELETE_FILES, json))
.removedDVs(CounterResultParser.fromJson(CommitMetricsResult.REMOVED_DVS, json))
.removedDeleteFiles(
CounterResultParser.fromJson(CommitMetricsResult.REMOVED_DELETE_FILES, json))
.totalDeleteFiles(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public abstract class ScanMetrics {
public static final String INDEXED_DELETE_FILES = "indexed-delete-files";
public static final String EQUALITY_DELETE_FILES = "equality-delete-files";
public static final String POSITIONAL_DELETE_FILES = "positional-delete-files";
public static final String DVS = "dvs";

public static ScanMetrics noop() {
return ScanMetrics.of(MetricsContext.nullMetrics());
Expand Down Expand Up @@ -127,6 +128,11 @@ public Counter positionalDeleteFiles() {
return metricsContext().counter(POSITIONAL_DELETE_FILES);
}

/**
 * Counter obtained from this object's metrics context under the {@link #DVS} name.
 *
 * <p>Declared as an Immutables derived attribute, like the other counters in this class.
 *
 * @return the counter registered as {@code "dvs"}
 */
@Value.Derived
public Counter dvs() {
return metricsContext().counter(DVS);
}

/**
 * Builds a {@link ScanMetrics} instance whose counters and timers come from the given context.
 *
 * @param metricsContext the metrics context handed to the generated builder
 * @return a new {@code ScanMetrics} built by {@code ImmutableScanMetrics}
 */
public static ScanMetrics of(MetricsContext metricsContext) {
return ImmutableScanMetrics.builder().metricsContext(metricsContext).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ public interface ScanMetricsResult {
/**
 * Result of the {@code positionalDeleteFiles()} counter of the source {@link ScanMetrics}, as
 * copied by {@code fromScanMetrics(...)}.
 *
 * @return the counter result, possibly {@code null}
 */
@Nullable
CounterResult positionalDeleteFiles();

/**
 * Result of the {@code dvs()} counter of the source {@link ScanMetrics}, as copied by
 * {@code fromScanMetrics(...)}.
 *
 * <p>NOTE(review): presumably declared as a default method with {@code @Value.Default} so that
 * existing implementations and builder call sites keep working without setting it — confirm.
 *
 * @return the counter result, {@code null} unless a value is supplied
 */
@Nullable
@Value.Default
default CounterResult dvs() {
return null;
}

static ScanMetricsResult fromScanMetrics(ScanMetrics scanMetrics) {
Preconditions.checkArgument(null != scanMetrics, "Invalid scan metrics: null");
return ImmutableScanMetricsResult.builder()
Expand All @@ -93,6 +99,7 @@ static ScanMetricsResult fromScanMetrics(ScanMetrics scanMetrics) {
.indexedDeleteFiles(CounterResult.fromCounter(scanMetrics.indexedDeleteFiles()))
.equalityDeleteFiles(CounterResult.fromCounter(scanMetrics.equalityDeleteFiles()))
.positionalDeleteFiles(CounterResult.fromCounter(scanMetrics.positionalDeleteFiles()))
.dvs(CounterResult.fromCounter(scanMetrics.dvs()))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ static void toJson(ScanMetricsResult metrics, JsonGenerator gen) throws IOExcept
CounterResultParser.toJson(metrics.positionalDeleteFiles(), gen);
}

if (null != metrics.dvs()) {
gen.writeFieldName(ScanMetrics.DVS);
CounterResultParser.toJson(metrics.dvs(), gen);
}

gen.writeEndObject();
}

Expand Down Expand Up @@ -159,6 +164,7 @@ static ScanMetricsResult fromJson(JsonNode json) {
.equalityDeleteFiles(CounterResultParser.fromJson(ScanMetrics.EQUALITY_DELETE_FILES, json))
.positionalDeleteFiles(
CounterResultParser.fromJson(ScanMetrics.POSITIONAL_DELETE_FILES, json))
.dvs(CounterResultParser.fromJson(ScanMetrics.DVS, json))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.ScanTaskUtil;

public class ScanMetricsUtil {
Expand All @@ -31,7 +32,11 @@ public static void indexedDeleteFile(ScanMetrics metrics, DeleteFile deleteFile)
metrics.indexedDeleteFiles().increment();

if (deleteFile.content() == FileContent.POSITION_DELETES) {
metrics.positionalDeleteFiles().increment();
if (ContentFileUtil.isDV(deleteFile)) {
metrics.dvs().increment();
} else {
metrics.positionalDeleteFiles().increment();
}
} else if (deleteFile.content() == FileContent.EQUALITY_DELETES) {
metrics.equalityDeleteFiles().increment();
}
Expand Down
72 changes: 72 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestSnapshotSummary.java
Original file line number Diff line number Diff line change
Expand Up @@ -358,4 +358,76 @@ public void rewriteWithDeletesAndDuplicates() {
.containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, "20")
.containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "1");
}

@TestTemplate
public void testFileSizeSummaryWithDVs() {
  // this scenario only applies when the table format version is 3 or newer
  assumeThat(formatVersion).isGreaterThanOrEqualTo(3);

  // two independent row-delta commits, each adding a single DV
  DeleteFile firstDV = newDV(FILE_A);
  table.newRowDelta().addDeletes(firstDV).commit();

  DeleteFile secondDV = newDV(FILE_B);
  table.newRowDelta().addDeletes(secondDV).commit();

  // the current snapshot is the second commit: "added-*" values cover secondDV only,
  // while "total-*" values cover both DVs
  Map<String, String> addOnlySummary = table.currentSnapshot().summary();
  long secondDVRecords = secondDV.recordCount();
  long secondDVSize = secondDV.contentSizeInBytes();
  long bothDVRecords = firstDV.recordCount() + secondDV.recordCount();
  long bothDVSize = firstDV.contentSizeInBytes() + secondDV.contentSizeInBytes();

  // nothing was removed and no regular position delete files were involved
  assertThat(addOnlySummary)
      .hasSize(12)
      .doesNotContainKey(SnapshotSummary.ADD_POS_DELETE_FILES_PROP)
      .doesNotContainKey(SnapshotSummary.REMOVED_POS_DELETE_FILES_PROP)
      .doesNotContainKey(SnapshotSummary.REMOVED_DELETE_FILES_PROP)
      .doesNotContainKey(SnapshotSummary.REMOVED_DVS_PROP)
      .doesNotContainKey(SnapshotSummary.REMOVED_POS_DELETES_PROP)
      .doesNotContainKey(SnapshotSummary.REMOVED_FILE_SIZE_PROP);

  // per-commit additions
  assertThat(addOnlySummary)
      .containsEntry(SnapshotSummary.ADDED_DELETE_FILES_PROP, "1")
      .containsEntry(SnapshotSummary.ADDED_DVS_PROP, "1")
      .containsEntry(SnapshotSummary.ADDED_POS_DELETES_PROP, String.valueOf(secondDVRecords))
      .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, String.valueOf(secondDVSize));

  // running totals
  assertThat(addOnlySummary)
      .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "2")
      .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, String.valueOf(bothDVRecords))
      .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, String.valueOf(bothDVSize))
      .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "0")
      .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
      .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "0")
      .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "1");

  // third commit: drop both existing DVs and add a new one in a single row delta
  DeleteFile replacementDV = newDV(FILE_A);
  table
      .newRowDelta()
      .removeDeletes(firstDV)
      .removeDeletes(secondDV)
      .addDeletes(replacementDV)
      .validateFromSnapshot(table.currentSnapshot().snapshotId())
      .commit();

  // after the swap only replacementDV remains, so it supplies both "added" and "total" values
  Map<String, String> swapSummary = table.currentSnapshot().summary();
  long replacementRecords = replacementDV.recordCount();
  long droppedRecords = firstDV.recordCount() + secondDV.recordCount();
  long replacementSize = replacementDV.contentSizeInBytes();
  long droppedSize = firstDV.contentSizeInBytes() + secondDV.contentSizeInBytes();

  // still no regular position delete file counters
  assertThat(swapSummary)
      .hasSize(16)
      .doesNotContainKey(SnapshotSummary.ADD_POS_DELETE_FILES_PROP)
      .doesNotContainKey(SnapshotSummary.REMOVED_POS_DELETE_FILES_PROP);

  // per-commit additions and removals
  assertThat(swapSummary)
      .containsEntry(SnapshotSummary.ADDED_DELETE_FILES_PROP, "1")
      .containsEntry(SnapshotSummary.REMOVED_DELETE_FILES_PROP, "2")
      .containsEntry(SnapshotSummary.ADDED_DVS_PROP, "1")
      .containsEntry(SnapshotSummary.REMOVED_DVS_PROP, "2")
      .containsEntry(SnapshotSummary.ADDED_POS_DELETES_PROP, String.valueOf(replacementRecords))
      .containsEntry(SnapshotSummary.REMOVED_POS_DELETES_PROP, String.valueOf(droppedRecords))
      .containsEntry(SnapshotSummary.ADDED_FILE_SIZE_PROP, String.valueOf(replacementSize))
      .containsEntry(SnapshotSummary.REMOVED_FILE_SIZE_PROP, String.valueOf(droppedSize));

  // running totals
  assertThat(swapSummary)
      .containsEntry(SnapshotSummary.TOTAL_DELETE_FILES_PROP, "1")
      .containsEntry(SnapshotSummary.TOTAL_POS_DELETES_PROP, String.valueOf(replacementRecords))
      .containsEntry(SnapshotSummary.TOTAL_FILE_SIZE_PROP, String.valueOf(replacementSize))
      .containsEntry(SnapshotSummary.TOTAL_DATA_FILES_PROP, "0")
      .containsEntry(SnapshotSummary.TOTAL_EQ_DELETES_PROP, "0")
      .containsEntry(SnapshotSummary.TOTAL_RECORDS_PROP, "0")
      .containsEntry(SnapshotSummary.CHANGED_PARTITION_COUNT_PROP, "2");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public void roundTripSerde() {
.put(SnapshotSummary.ADDED_DELETE_FILES_PROP, "4")
.put(SnapshotSummary.ADD_EQ_DELETE_FILES_PROP, "5")
.put(SnapshotSummary.ADD_POS_DELETE_FILES_PROP, "6")
.put(SnapshotSummary.ADDED_DVS_PROP, "1")
.put(SnapshotSummary.REMOVED_DVS_PROP, "4")
.put(SnapshotSummary.REMOVED_POS_DELETE_FILES_PROP, "7")
.put(SnapshotSummary.REMOVED_EQ_DELETE_FILES_PROP, "8")
.put(SnapshotSummary.REMOVED_DELETE_FILES_PROP, "9")
Expand Down Expand Up @@ -101,6 +103,8 @@ public void roundTripSerde() {
assertThat(result.addedDeleteFiles().value()).isEqualTo(4L);
assertThat(result.addedEqualityDeleteFiles().value()).isEqualTo(5L);
assertThat(result.addedPositionalDeleteFiles().value()).isEqualTo(6L);
assertThat(result.addedDVs().value()).isEqualTo(1L);
assertThat(result.removedDVs().value()).isEqualTo(4L);
assertThat(result.removedPositionalDeleteFiles().value()).isEqualTo(7L);
assertThat(result.removedEqualityDeleteFiles().value()).isEqualTo(8L);
assertThat(result.removedDeleteFiles().value()).isEqualTo(9L);
Expand Down Expand Up @@ -153,6 +157,10 @@ public void roundTripSerde() {
+ " \"unit\" : \"count\",\n"
+ " \"value\" : 6\n"
+ " },\n"
+ " \"added-dvs\" : {\n"
+ " \"unit\" : \"count\",\n"
+ " \"value\" : 1\n"
+ " },\n"
+ " \"removed-delete-files\" : {\n"
+ " \"unit\" : \"count\",\n"
+ " \"value\" : 9\n"
Expand All @@ -161,6 +169,10 @@ public void roundTripSerde() {
+ " \"unit\" : \"count\",\n"
+ " \"value\" : 7\n"
+ " },\n"
+ " \"removed-dvs\" : {\n"
+ " \"unit\" : \"count\",\n"
+ " \"value\" : 4\n"
+ " },\n"
+ " \"removed-equality-delete-files\" : {\n"
+ " \"unit\" : \"count\",\n"
+ " \"value\" : 8\n"
Expand Down
Loading

0 comments on commit 549674b

Please sign in to comment.