diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index dce6d4a995bd..ef578a82f3fa 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -52,7 +52,6 @@ public class BaseRewriteManifests extends SnapshotProducer private static final String REPLACED_MANIFESTS_COUNT = "manifests-replaced"; private static final String PROCESSED_ENTRY_COUNT = "entries-processed"; - private final TableOperations ops; private final Map specsById; private final long manifestTargetSizeBytes; @@ -74,10 +73,10 @@ public class BaseRewriteManifests extends SnapshotProducer BaseRewriteManifests(TableOperations ops) { super(ops); - this.ops = ops; - this.specsById = ops.current().specsById(); + this.specsById = ops().current().specsById(); this.manifestTargetSizeBytes = - ops.current() + ops() + .current() .propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT); } @@ -153,8 +152,8 @@ public RewriteManifests addManifest(ManifestFile manifest) { } private ManifestFile copyManifest(ManifestFile manifest) { - TableMetadata current = ops.current(); - InputFile toCopy = ops.io().newInputFile(manifest); + TableMetadata current = ops().current(); + InputFile toCopy = ops().io().newInputFile(manifest); EncryptedOutputFile newFile = newManifestOutputFile(); return ManifestFiles.copyRewriteManifest( current.formatVersion(), @@ -168,7 +167,7 @@ private ManifestFile copyManifest(ManifestFile manifest) { @Override public List apply(TableMetadata base, Snapshot snapshot) { - List currentManifests = base.currentSnapshot().allManifests(ops.io()); + List currentManifests = base.currentSnapshot().allManifests(ops().io()); Set currentManifestSet = ImmutableSet.copyOf(currentManifests); validateDeletedManifests(currentManifestSet, base.currentSnapshot().snapshotId()); @@ -246,7 +245,7 @@ private void performRewrite(List currentManifests) { } else { rewrittenManifests.add(manifest); try (ManifestReader reader = - ManifestFiles.read(manifest, ops.io(), ops.current().specsById()) + ManifestFiles.read(manifest, ops().io(), ops().current().specsById()) .select(Collections.singletonList("*"))) { reader .liveEntries() diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 1e2f6fe0d90d..51c0d5926fdb 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -34,7 +34,6 @@ /** {@link AppendFiles Append} implementation that adds a new manifest file for the write. */ class FastAppend extends SnapshotProducer implements AppendFiles { private final String tableName; - private final TableOperations ops; private final PartitionSpec spec; private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder(); private final DataFileSet newFiles = DataFileSet.create(); @@ -46,8 +45,7 @@ class FastAppend extends SnapshotProducer implements AppendFiles { FastAppend(String tableName, TableOperations ops) { super(ops); this.tableName = tableName; - this.ops = ops; - this.spec = ops.current().spec(); + this.spec = ops().current().spec(); } @Override @@ -69,7 +67,8 @@ protected String operation() { @Override protected Map summary() { summaryBuilder.setPartitionSummaryLimit( - ops.current() + ops() + .current() .propertyAsInt( TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT)); @@ -118,8 +117,8 @@ public FastAppend appendManifest(ManifestFile manifest) { } private ManifestFile copyManifest(ManifestFile manifest) { - TableMetadata current = ops.current(); - InputFile toCopy = ops.io().newInputFile(manifest); + TableMetadata current = ops().current(); + InputFile toCopy = ops().io().newInputFile(manifest); EncryptedOutputFile newManifestFile = newManifestOutputFile(); return ManifestFiles.copyAppendManifest( current.formatVersion(), @@ -151,7 +150,7 @@ public List apply(TableMetadata base, Snapshot snapshot) { Iterables.addAll(manifests, appendManifestsWithMetadata); if (snapshot != null) { - manifests.addAll(snapshot.allManifests(ops.io())); + manifests.addAll(snapshot.allManifests(ops().io())); } return manifests; @@ -160,7 +159,7 @@ public List apply(TableMetadata base, Snapshot snapshot) { @Override public Object updateEvent() { long snapshotId = snapshotId(); - Snapshot snapshot = ops.current().snapshot(snapshotId); + Snapshot snapshot = ops().current().snapshot(snapshotId); long sequenceNumber = snapshot.sequenceNumber(); return new CreateSnapshotEvent( tableName, operation(), snapshotId, sequenceNumber, snapshot.summary()); diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 41f0ad00178c..75dd7410115e 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -77,7 +77,6 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.DELETE, DataOperations.REPLACE); private final String tableName; - private final TableOperations ops; private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder(); private final ManifestMergeManager mergeManager; private final ManifestFilterManager filterManager; @@ -108,7 +107,6 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { MergingSnapshotProducer(String tableName, TableOperations ops) { super(ops); this.tableName = tableName; - this.ops = ops; long targetSizeBytes = ops.current() .propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT); @@ -246,7 +244,7 @@ protected void add(DataFile file) { } private PartitionSpec spec(int specId) { - return ops.current().spec(specId); + return ops().current().spec(specId); } /** Add a delete file to the new snapshot. */ @@ -304,7 +302,7 @@ protected void validateNewDeleteFile(DeleteFile file) { } private int formatVersion() { - return ops.current().formatVersion(); + return ops().current().formatVersion(); } /** Add all files in a manifest to the new snapshot. */ @@ -322,8 +320,8 @@ protected void add(ManifestFile manifest) { } private ManifestFile copyManifest(ManifestFile manifest) { - TableMetadata current = ops.current(); - InputFile toCopy = ops.io().newInputFile(manifest); + TableMetadata current = ops().current(); + InputFile toCopy = ops().io().newInputFile(manifest); EncryptedOutputFile newManifestFile = newManifestOutputFile(); return ManifestFiles.copyAppendManifest( current.formatVersion(), @@ -427,7 +425,7 @@ private CloseableIterable> addedDataFiles( Set newSnapshots = history.second(); ManifestGroup manifestGroup = - new ManifestGroup(ops.io(), manifests, ImmutableList.of()) + new ManifestGroup(ops().io(), manifests, ImmutableList.of()) .caseSensitive(caseSensitive) .filterManifestEntries(entry -> newSnapshots.contains(entry.snapshotId())) .specsById(base.specsById()) @@ -590,7 +588,7 @@ protected DeleteFileIndex addedDeleteFiles( Snapshot parent) { // if there is no current table state, return empty delete file index if (parent == null || base.formatVersion() < 2) { - return DeleteFileIndex.builderFor(ops.io(), ImmutableList.of()) + return DeleteFileIndex.builderFor(ops().io(), ImmutableList.of()) .specsById(base.specsById()) .build(); } @@ -698,7 +696,7 @@ private CloseableIterable> deletedDataFiles( Set newSnapshots = history.second(); ManifestGroup manifestGroup = - new ManifestGroup(ops.io(), manifests, ImmutableList.of()) + new ManifestGroup(ops().io(), manifests, ImmutableList.of()) .caseSensitive(caseSensitive) .filterManifestEntries(entry -> newSnapshots.contains(entry.snapshotId())) .filterManifestEntries(entry -> entry.status().equals(ManifestEntry.Status.DELETED)) @@ -737,10 +735,10 @@ private DeleteFileIndex buildDeleteFileIndex( Expression dataFilter, PartitionSet partitionSet) { DeleteFileIndex.Builder builder = - DeleteFileIndex.builderFor(ops.io(), deleteManifests) + DeleteFileIndex.builderFor(ops().io(), deleteManifests) .afterSequenceNumber(startingSequenceNumber) .caseSensitive(caseSensitive) - .specsById(ops.current().specsById()); + .specsById(ops().current().specsById()); if (dataFilter != null) { builder.filterData(dataFilter); @@ -778,7 +776,7 @@ protected void validateDataFilesExist( Set newSnapshots = history.second(); ManifestGroup matchingDeletesGroup = - new ManifestGroup(ops.io(), manifests, ImmutableList.of()) + new ManifestGroup(ops().io(), manifests, ImmutableList.of()) .filterManifestEntries( entry -> entry.status() != ManifestEntry.Status.ADDED @@ -836,7 +834,7 @@ protected void validateAddedDVs( private void validateAddedDVs( ManifestFile manifest, Expression conflictDetectionFilter, Set newSnapshotIds) { try (CloseableIterable> entries = - ManifestFiles.readDeleteManifest(manifest, ops.io(), ops.current().specsById()) + ManifestFiles.readDeleteManifest(manifest, ops().io(), ops().current().specsById()) .filterRows(conflictDetectionFilter) .caseSensitive(caseSensitive) .liveEntries()) { @@ -875,13 +873,13 @@ private Pair, Set> validationHistory( if (matchingOperations.contains(currentSnapshot.operation())) { newSnapshots.add(currentSnapshot.snapshotId()); if (content == ManifestContent.DATA) { - for (ManifestFile manifest : currentSnapshot.dataManifests(ops.io())) { + for (ManifestFile manifest : currentSnapshot.dataManifests(ops().io())) { if (manifest.snapshotId() == currentSnapshot.snapshotId()) { manifests.add(manifest); } } } else { - for (ManifestFile manifest : currentSnapshot.deleteManifests(ops.io())) { + for (ManifestFile manifest : currentSnapshot.deleteManifests(ops().io())) { if (manifest.snapshotId() == currentSnapshot.snapshotId()) { manifests.add(manifest); } @@ -902,7 +900,8 @@ private Pair, Set> validationHistory( @Override protected Map summary() { summaryBuilder.setPartitionSummaryLimit( - ops.current() + ops() + .current() .propertyAsInt( TableProperties.WRITE_PARTITION_SUMMARY_LIMIT, TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT)); @@ -915,7 +914,7 @@ public List apply(TableMetadata base, Snapshot snapshot) { List filtered = filterManager.filterManifests( SnapshotUtil.schemaFor(base, targetBranch()), - snapshot != null ? snapshot.dataManifests(ops.io()) : null); + snapshot != null ? snapshot.dataManifests(ops().io()) : null); long minDataSequenceNumber = filtered.stream() .map(ManifestFile::minSequenceNumber) @@ -929,7 +928,7 @@ public List apply(TableMetadata base, Snapshot snapshot) { List filteredDeletes = deleteFilterManager.filterManifests( SnapshotUtil.schemaFor(base, targetBranch()), - snapshot != null ? snapshot.deleteManifests(ops.io()) : null); + snapshot != null ? snapshot.deleteManifests(ops().io()) : null); // only keep manifests that have live data files or that were written by this commit Predicate shouldKeep = @@ -959,7 +958,7 @@ public List apply(TableMetadata base, Snapshot snapshot) { @Override public Object updateEvent() { long snapshotId = snapshotId(); - Snapshot justSaved = ops.refresh().snapshot(snapshotId); + Snapshot justSaved = ops().refresh().snapshot(snapshotId); long sequenceNumber = TableMetadata.INVALID_SEQUENCE_NUMBER; Map summary; if (justSaved == null) { @@ -1086,7 +1085,7 @@ private List newDeleteFilesAsManifests() { if (cachedNewDeleteManifests.isEmpty()) { newDeleteFilesBySpec.forEach( (specId, deleteFiles) -> { - PartitionSpec spec = ops.current().spec(specId); + PartitionSpec spec = ops().current().spec(specId); List newDeleteManifests = writeDeleteManifests(deleteFiles, spec); cachedNewDeleteManifests.addAll(newDeleteManifests); }); @@ -1099,7 +1098,7 @@ private List newDeleteFilesAsManifests() { private class DataFileFilterManager extends ManifestFilterManager { private DataFileFilterManager() { - super(ops.current().specsById(), MergingSnapshotProducer.this::workerPool); + super(ops().current().specsById(), MergingSnapshotProducer.this::workerPool); } @Override @@ -1136,7 +1135,7 @@ protected long snapshotId() { @Override protected PartitionSpec spec(int specId) { - return ops.current().spec(specId); + return ops().current().spec(specId); } @Override @@ -1157,7 +1156,7 @@ protected ManifestReader newManifestReader(ManifestFile manifest) { private class DeleteFileFilterManager extends ManifestFilterManager { private DeleteFileFilterManager() { - super(ops.current().specsById(), MergingSnapshotProducer.this::workerPool); + super(ops().current().specsById(), MergingSnapshotProducer.this::workerPool); } @Override @@ -1194,7 +1193,7 @@ protected long snapshotId() { @Override protected PartitionSpec spec(int specId) { - return ops.current().spec(specId); + return ops().current().spec(specId); } @Override diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 45b71d654344..f5a7e99b684e 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -159,6 +159,10 @@ public ThisT scanManifestsWith(ExecutorService executorService) { return self(); } + protected TableOperations ops() { + return ops; + } + protected CommitMetrics commitMetrics() { if (commitMetrics == null) { this.commitMetrics = CommitMetrics.of(new DefaultMetricsContext());