Skip to content

Commit

Permalink
Fixing tests for incremental cleaning and savepoint interplay
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan committed Feb 12, 2024
1 parent 265cef9 commit d13ebd4
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
public class CleanPlanActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, K, O, Option<HoodieCleanerPlan>> {

private static final Logger LOG = LoggerFactory.getLogger(CleanPlanActionExecutor.class);

private final Option<Map<String, String>> extraMetadata;

public CleanPlanActionExecutor(HoodieEngineContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -115,7 +113,7 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieT
LOG.info("Load all partitions and files into file system view in advance.");
fileSystemView.loadAllPartitions();
}
// collect savepointed timestamps to be used with incremental cleaning. For non-partitioned and metadata table, we may not need this.
// collect savepointed timestamps to be assist with incremental cleaning. For non-partitioned and metadata table, we may not need this.
this.savepointedTimestamps = hoodieTable.isMetadataTable() ? Collections.EMPTY_LIST : (hoodieTable.isPartitioned() ? hoodieTable.getSavepointTimestamps().stream().collect(Collectors.toList())
: Collections.EMPTY_LIST);
}
Expand Down Expand Up @@ -222,41 +220,42 @@ private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata
HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp()))
.flatMap(this::getPartitionsForInstants).distinct().collect(Collectors.toList());

// If any savepoint is removed b/w previous clean and this clean planning, lets include the partitions of interest
// If any savepoint is removed b/w previous clean and this clean planning, lets include the partitions of interest.
// for metadata table and non partitioned table, we do not need this additional processing.
if (hoodieTable.isMetadataTable() || !hoodieTable.isPartitioned()) {
return incrementalPartitions;
}

List<String> partitionsFromDeletedSavepoints = getPartitionsFromDeletedSavepoint(cleanMetadata);
LOG.info("Including partitions part of savepointed commits which was removed after last known clean " + partitionsFromDeletedSavepoints.toString());
List<String> partitionsOfInterest = new ArrayList<>(incrementalPartitions);
partitionsOfInterest.addAll(partitionsFromDeletedSavepoints);
return incrementalPartitions.stream().distinct().collect(Collectors.toList());
}

private List<String> getPartitionsFromDeletedSavepoint(HoodieCleanMetadata cleanMetadata) {
List<String> savepointedTimestampsFromLastClean = Arrays.stream(cleanMetadata.getExtraMetadata()
.getOrDefault(SAVEPOINTED_TIMESTAMPS, StringUtils.EMPTY_STRING).split(","))
.filter(partition -> !StringUtils.isNullOrEmpty(partition)).collect(Collectors.toList());
if (savepointedTimestampsFromLastClean.isEmpty()) {
return incrementalPartitions;
return Collections.emptyList();
}
// check for any savepointed removed in latest compared to previous saved list
List<String> removedSavepointedTimestamps = new ArrayList<>(savepointedTimestampsFromLastClean);
removedSavepointedTimestamps.removeAll(savepointedTimestamps);
if (removedSavepointedTimestamps.isEmpty()) {
return incrementalPartitions;
return Collections.emptyList();
}

// fetch list of partitions from the removed savepoints and add it to return list

List<String> partitionsFromRemovedSavepoints = removedSavepointedTimestamps.stream().flatMap(savepointCommit -> {
Option<HoodieInstant> instantOption = hoodieTable.getCompletedCommitsTimeline()
.filter(instant -> instant.getTimestamp().equals(savepointCommit)).firstInstant();
return removedSavepointedTimestamps.stream().flatMap(savepointCommit -> {
Option<HoodieInstant> instantOption = hoodieTable.getCompletedCommitsTimeline().filter(instant -> instant.getTimestamp().equals(savepointCommit)).firstInstant();
if (!instantOption.isPresent()) {
LOG.warn("Skipping to process a commit for which savepoint was removed as the instant moved to archived timeline already");
}
HoodieInstant instant = instantOption.get();
return getPartitionsForInstants(instant);
}).collect(Collectors.toList());

LOG.info("Including partitions part of savepointed commits which was removed after last known clean " + removedSavepointedTimestamps.toString());
List<String> partitionsOfInterest = new ArrayList<>(incrementalPartitions);
partitionsOfInterest.addAll(partitionsFromRemovedSavepoints);
return incrementalPartitions.stream().distinct().collect(Collectors.toList());
}

/**
Expand Down
Loading

0 comments on commit d13ebd4

Please sign in to comment.