Skip to content

Commit

Permalink
Core: Inherited classes from SnapshotProducer has TableOperations red…
Browse files Browse the repository at this point in the history
…undantly as member (#11578)
  • Loading branch information
gaborkaszab authored Nov 19, 2024
1 parent 568940f commit e71e3cb
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 40 deletions.
15 changes: 7 additions & 8 deletions core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests>
private static final String REPLACED_MANIFESTS_COUNT = "manifests-replaced";
private static final String PROCESSED_ENTRY_COUNT = "entries-processed";

private final TableOperations ops;
private final Map<Integer, PartitionSpec> specsById;
private final long manifestTargetSizeBytes;

Expand All @@ -74,10 +73,10 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests>

BaseRewriteManifests(TableOperations ops) {
super(ops);
this.ops = ops;
this.specsById = ops.current().specsById();
this.specsById = ops().current().specsById();
this.manifestTargetSizeBytes =
ops.current()
ops()
.current()
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
}

Expand Down Expand Up @@ -153,8 +152,8 @@ public RewriteManifests addManifest(ManifestFile manifest) {
}

private ManifestFile copyManifest(ManifestFile manifest) {
TableMetadata current = ops.current();
InputFile toCopy = ops.io().newInputFile(manifest);
TableMetadata current = ops().current();
InputFile toCopy = ops().io().newInputFile(manifest);
EncryptedOutputFile newFile = newManifestOutputFile();
return ManifestFiles.copyRewriteManifest(
current.formatVersion(),
Expand All @@ -168,7 +167,7 @@ private ManifestFile copyManifest(ManifestFile manifest) {

@Override
public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
List<ManifestFile> currentManifests = base.currentSnapshot().allManifests(ops.io());
List<ManifestFile> currentManifests = base.currentSnapshot().allManifests(ops().io());
Set<ManifestFile> currentManifestSet = ImmutableSet.copyOf(currentManifests);

validateDeletedManifests(currentManifestSet, base.currentSnapshot().snapshotId());
Expand Down Expand Up @@ -246,7 +245,7 @@ private void performRewrite(List<ManifestFile> currentManifests) {
} else {
rewrittenManifests.add(manifest);
try (ManifestReader<DataFile> reader =
ManifestFiles.read(manifest, ops.io(), ops.current().specsById())
ManifestFiles.read(manifest, ops().io(), ops().current().specsById())
.select(Collections.singletonList("*"))) {
reader
.liveEntries()
Expand Down
15 changes: 7 additions & 8 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
/** {@link AppendFiles Append} implementation that adds a new manifest file for the write. */
class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final String tableName;
private final TableOperations ops;
private final PartitionSpec spec;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final DataFileSet newFiles = DataFileSet.create();
Expand All @@ -46,8 +45,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
FastAppend(String tableName, TableOperations ops) {
super(ops);
this.tableName = tableName;
this.ops = ops;
this.spec = ops.current().spec();
this.spec = ops().current().spec();
}

@Override
Expand All @@ -69,7 +67,8 @@ protected String operation() {
@Override
protected Map<String, String> summary() {
summaryBuilder.setPartitionSummaryLimit(
ops.current()
ops()
.current()
.propertyAsInt(
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT,
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT));
Expand Down Expand Up @@ -118,8 +117,8 @@ public FastAppend appendManifest(ManifestFile manifest) {
}

private ManifestFile copyManifest(ManifestFile manifest) {
TableMetadata current = ops.current();
InputFile toCopy = ops.io().newInputFile(manifest);
TableMetadata current = ops().current();
InputFile toCopy = ops().io().newInputFile(manifest);
EncryptedOutputFile newManifestFile = newManifestOutputFile();
return ManifestFiles.copyAppendManifest(
current.formatVersion(),
Expand Down Expand Up @@ -151,7 +150,7 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
Iterables.addAll(manifests, appendManifestsWithMetadata);

if (snapshot != null) {
manifests.addAll(snapshot.allManifests(ops.io()));
manifests.addAll(snapshot.allManifests(ops().io()));
}

return manifests;
Expand All @@ -160,7 +159,7 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
@Override
public Object updateEvent() {
long snapshotId = snapshotId();
Snapshot snapshot = ops.current().snapshot(snapshotId);
Snapshot snapshot = ops().current().snapshot(snapshotId);
long sequenceNumber = snapshot.sequenceNumber();
return new CreateSnapshotEvent(
tableName, operation(), snapshotId, sequenceNumber, snapshot.summary());
Expand Down
47 changes: 23 additions & 24 deletions core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
ImmutableSet.of(DataOperations.OVERWRITE, DataOperations.DELETE, DataOperations.REPLACE);

private final String tableName;
private final TableOperations ops;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final ManifestMergeManager<DataFile> mergeManager;
private final ManifestFilterManager<DataFile> filterManager;
Expand Down Expand Up @@ -108,7 +107,6 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
MergingSnapshotProducer(String tableName, TableOperations ops) {
super(ops);
this.tableName = tableName;
this.ops = ops;
long targetSizeBytes =
ops.current()
.propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
Expand Down Expand Up @@ -246,7 +244,7 @@ protected void add(DataFile file) {
}

private PartitionSpec spec(int specId) {
return ops.current().spec(specId);
return ops().current().spec(specId);
}

/** Add a delete file to the new snapshot. */
Expand Down Expand Up @@ -304,7 +302,7 @@ protected void validateNewDeleteFile(DeleteFile file) {
}

private int formatVersion() {
return ops.current().formatVersion();
return ops().current().formatVersion();
}

/** Add all files in a manifest to the new snapshot. */
Expand All @@ -322,8 +320,8 @@ protected void add(ManifestFile manifest) {
}

private ManifestFile copyManifest(ManifestFile manifest) {
TableMetadata current = ops.current();
InputFile toCopy = ops.io().newInputFile(manifest);
TableMetadata current = ops().current();
InputFile toCopy = ops().io().newInputFile(manifest);
EncryptedOutputFile newManifestFile = newManifestOutputFile();
return ManifestFiles.copyAppendManifest(
current.formatVersion(),
Expand Down Expand Up @@ -427,7 +425,7 @@ private CloseableIterable<ManifestEntry<DataFile>> addedDataFiles(
Set<Long> newSnapshots = history.second();

ManifestGroup manifestGroup =
new ManifestGroup(ops.io(), manifests, ImmutableList.of())
new ManifestGroup(ops().io(), manifests, ImmutableList.of())
.caseSensitive(caseSensitive)
.filterManifestEntries(entry -> newSnapshots.contains(entry.snapshotId()))
.specsById(base.specsById())
Expand Down Expand Up @@ -590,7 +588,7 @@ protected DeleteFileIndex addedDeleteFiles(
Snapshot parent) {
// if there is no current table state, return empty delete file index
if (parent == null || base.formatVersion() < 2) {
return DeleteFileIndex.builderFor(ops.io(), ImmutableList.of())
return DeleteFileIndex.builderFor(ops().io(), ImmutableList.of())
.specsById(base.specsById())
.build();
}
Expand Down Expand Up @@ -698,7 +696,7 @@ private CloseableIterable<ManifestEntry<DataFile>> deletedDataFiles(
Set<Long> newSnapshots = history.second();

ManifestGroup manifestGroup =
new ManifestGroup(ops.io(), manifests, ImmutableList.of())
new ManifestGroup(ops().io(), manifests, ImmutableList.of())
.caseSensitive(caseSensitive)
.filterManifestEntries(entry -> newSnapshots.contains(entry.snapshotId()))
.filterManifestEntries(entry -> entry.status().equals(ManifestEntry.Status.DELETED))
Expand Down Expand Up @@ -737,10 +735,10 @@ private DeleteFileIndex buildDeleteFileIndex(
Expression dataFilter,
PartitionSet partitionSet) {
DeleteFileIndex.Builder builder =
DeleteFileIndex.builderFor(ops.io(), deleteManifests)
DeleteFileIndex.builderFor(ops().io(), deleteManifests)
.afterSequenceNumber(startingSequenceNumber)
.caseSensitive(caseSensitive)
.specsById(ops.current().specsById());
.specsById(ops().current().specsById());

if (dataFilter != null) {
builder.filterData(dataFilter);
Expand Down Expand Up @@ -778,7 +776,7 @@ protected void validateDataFilesExist(
Set<Long> newSnapshots = history.second();

ManifestGroup matchingDeletesGroup =
new ManifestGroup(ops.io(), manifests, ImmutableList.of())
new ManifestGroup(ops().io(), manifests, ImmutableList.of())
.filterManifestEntries(
entry ->
entry.status() != ManifestEntry.Status.ADDED
Expand Down Expand Up @@ -836,7 +834,7 @@ protected void validateAddedDVs(
private void validateAddedDVs(
ManifestFile manifest, Expression conflictDetectionFilter, Set<Long> newSnapshotIds) {
try (CloseableIterable<ManifestEntry<DeleteFile>> entries =
ManifestFiles.readDeleteManifest(manifest, ops.io(), ops.current().specsById())
ManifestFiles.readDeleteManifest(manifest, ops().io(), ops().current().specsById())
.filterRows(conflictDetectionFilter)
.caseSensitive(caseSensitive)
.liveEntries()) {
Expand Down Expand Up @@ -875,13 +873,13 @@ private Pair<List<ManifestFile>, Set<Long>> validationHistory(
if (matchingOperations.contains(currentSnapshot.operation())) {
newSnapshots.add(currentSnapshot.snapshotId());
if (content == ManifestContent.DATA) {
for (ManifestFile manifest : currentSnapshot.dataManifests(ops.io())) {
for (ManifestFile manifest : currentSnapshot.dataManifests(ops().io())) {
if (manifest.snapshotId() == currentSnapshot.snapshotId()) {
manifests.add(manifest);
}
}
} else {
for (ManifestFile manifest : currentSnapshot.deleteManifests(ops.io())) {
for (ManifestFile manifest : currentSnapshot.deleteManifests(ops().io())) {
if (manifest.snapshotId() == currentSnapshot.snapshotId()) {
manifests.add(manifest);
}
Expand All @@ -902,7 +900,8 @@ private Pair<List<ManifestFile>, Set<Long>> validationHistory(
@Override
protected Map<String, String> summary() {
summaryBuilder.setPartitionSummaryLimit(
ops.current()
ops()
.current()
.propertyAsInt(
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT,
TableProperties.WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT));
Expand All @@ -915,7 +914,7 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
List<ManifestFile> filtered =
filterManager.filterManifests(
SnapshotUtil.schemaFor(base, targetBranch()),
snapshot != null ? snapshot.dataManifests(ops.io()) : null);
snapshot != null ? snapshot.dataManifests(ops().io()) : null);
long minDataSequenceNumber =
filtered.stream()
.map(ManifestFile::minSequenceNumber)
Expand All @@ -929,7 +928,7 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
List<ManifestFile> filteredDeletes =
deleteFilterManager.filterManifests(
SnapshotUtil.schemaFor(base, targetBranch()),
snapshot != null ? snapshot.deleteManifests(ops.io()) : null);
snapshot != null ? snapshot.deleteManifests(ops().io()) : null);

// only keep manifests that have live data files or that were written by this commit
Predicate<ManifestFile> shouldKeep =
Expand Down Expand Up @@ -959,7 +958,7 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
@Override
public Object updateEvent() {
long snapshotId = snapshotId();
Snapshot justSaved = ops.refresh().snapshot(snapshotId);
Snapshot justSaved = ops().refresh().snapshot(snapshotId);
long sequenceNumber = TableMetadata.INVALID_SEQUENCE_NUMBER;
Map<String, String> summary;
if (justSaved == null) {
Expand Down Expand Up @@ -1086,7 +1085,7 @@ private List<ManifestFile> newDeleteFilesAsManifests() {
if (cachedNewDeleteManifests.isEmpty()) {
newDeleteFilesBySpec.forEach(
(specId, deleteFiles) -> {
PartitionSpec spec = ops.current().spec(specId);
PartitionSpec spec = ops().current().spec(specId);
List<ManifestFile> newDeleteManifests = writeDeleteManifests(deleteFiles, spec);
cachedNewDeleteManifests.addAll(newDeleteManifests);
});
Expand All @@ -1099,7 +1098,7 @@ private List<ManifestFile> newDeleteFilesAsManifests() {

private class DataFileFilterManager extends ManifestFilterManager<DataFile> {
private DataFileFilterManager() {
super(ops.current().specsById(), MergingSnapshotProducer.this::workerPool);
super(ops().current().specsById(), MergingSnapshotProducer.this::workerPool);
}

@Override
Expand Down Expand Up @@ -1136,7 +1135,7 @@ protected long snapshotId() {

@Override
protected PartitionSpec spec(int specId) {
return ops.current().spec(specId);
return ops().current().spec(specId);
}

@Override
Expand All @@ -1157,7 +1156,7 @@ protected ManifestReader<DataFile> newManifestReader(ManifestFile manifest) {

private class DeleteFileFilterManager extends ManifestFilterManager<DeleteFile> {
private DeleteFileFilterManager() {
super(ops.current().specsById(), MergingSnapshotProducer.this::workerPool);
super(ops().current().specsById(), MergingSnapshotProducer.this::workerPool);
}

@Override
Expand Down Expand Up @@ -1194,7 +1193,7 @@ protected long snapshotId() {

@Override
protected PartitionSpec spec(int specId) {
return ops.current().spec(specId);
return ops().current().spec(specId);
}

@Override
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ public ThisT scanManifestsWith(ExecutorService executorService) {
return self();
}

protected TableOperations ops() {
return ops;
}

protected CommitMetrics commitMetrics() {
if (commitMetrics == null) {
this.commitMetrics = CommitMetrics.of(new DefaultMetricsContext());
Expand Down

0 comments on commit e71e3cb

Please sign in to comment.