Skip to content

Commit

Permalink
fix incorrect method
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan Vexler committed Jan 16, 2024
1 parent 57a0846 commit 9cfce03
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
Expand Down Expand Up @@ -517,21 +518,9 @@ public Option<HoodieInstant> getLastClusterCommit() {

@Override
public Option<HoodieInstant> getLastPendingClusterCommit() {
return Option.fromJavaOptional(getCommitsTimeline().filter(s -> s.getAction().equalsIgnoreCase(HoodieTimeline.REPLACE_COMMIT_ACTION))
return Option.fromJavaOptional(filterPendingReplaceTimeline()
.getReverseOrderedInstants()
.filter(i -> {
try {
if (!i.isCompleted()) {
HoodieCommitMetadata metadata = TimelineUtils.getCommitMetadata(i, this);
return metadata.getOperationType().equals(WriteOperationType.CLUSTER);
} else {
return false;
}
} catch (IOException e) {
LOG.warn("Unable to read commit metadata for " + i + " due to " + e.getMessage());
return false;
}
}).findFirst());
.filter(i -> ClusteringUtils.isPendingClusteringInstant(this, i)).findFirst());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ public static boolean isClusteringCommit(HoodieTableMetaClient metaClient, Hoodi

/**
* Get requested replace metadata from timeline.
* @param metaClient
* @param timeline
* @param pendingReplaceInstant
* @return
* @throws IOException
*/
private static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) throws IOException {
private static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadata(HoodieTimeline timeline, HoodieInstant pendingReplaceInstant) throws IOException {
final HoodieInstant requestedInstant;
if (!pendingReplaceInstant.isRequested()) {
// inflight replacecommit files don't have clustering plan.
Expand All @@ -97,7 +97,7 @@ private static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadat
} else {
requestedInstant = pendingReplaceInstant;
}
Option<byte[]> content = metaClient.getActiveTimeline().getInstantDetails(requestedInstant);
Option<byte[]> content = timeline.getInstantDetails(requestedInstant);
if (!content.isPresent() || content.get().length == 0) {
// few operations create requested file without any content. Assume these are not clustering
return Option.empty();
Expand All @@ -112,8 +112,18 @@ private static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadat
* @return
*/
public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) {
return getClusteringPlan(metaClient.getActiveTimeline(), pendingReplaceInstant);
}

/**
* Get Clustering plan from timeline.
* @param timeline
* @param pendingReplaceInstant
* @return
*/
public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTimeline timeline, HoodieInstant pendingReplaceInstant) {
try {
Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata = getRequestedReplaceMetadata(metaClient, pendingReplaceInstant);
Option<HoodieRequestedReplaceMetadata> requestedReplaceMetadata = getRequestedReplaceMetadata(timeline, pendingReplaceInstant);
if (requestedReplaceMetadata.isPresent() && WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.get().getOperationType())) {
return Option.of(Pair.of(pendingReplaceInstant, requestedReplaceMetadata.get().getClusteringPlan()));
}
Expand Down Expand Up @@ -235,6 +245,10 @@ public static boolean isPendingClusteringInstant(HoodieTableMetaClient metaClien
return getClusteringPlan(metaClient, instant).isPresent();
}

public static boolean isPendingClusteringInstant(HoodieTimeline timeline, HoodieInstant instant) {
return getClusteringPlan(timeline, instant).isPresent();
}

/**
* Returns the earliest instant to retain.
* Make sure the clustering instant won't be archived before cleaned, and the earliest inflight clustering instant has a previous commit.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ public void testClusteringPlanMultipleInstants() throws Exception {
validateClusteringInstant(fileIds1, partitionPath1, clusterTime1, fileGroupToInstantMap);
validateClusteringInstant(fileIds2, partitionPath1, clusterTime, fileGroupToInstantMap);
validateClusteringInstant(fileIds3, partitionPath1, clusterTime, fileGroupToInstantMap);
Option<HoodieInstant> lastPendingClustering = metaClient.getActiveTimeline().getLastPendingClusterCommit();
assertTrue(lastPendingClustering.isPresent());
assertEquals("2", lastPendingClustering.get().getTimestamp());
}

// replacecommit.inflight doesn't have clustering plan.
Expand Down

0 comments on commit 9cfce03

Please sign in to comment.