Skip to content

Commit

Permalink
[Improve][Zeta] Move checkpoint notify complete in checkpoint stage (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Jul 31, 2023
1 parent 1db9f45 commit bc28eb1
Showing 1 changed file with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -695,14 +695,6 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed
completedCheckpoint.getCheckpointTimestamp(),
completedCheckpoint.getCompletedTimestamp());
final long checkpointId = completedCheckpoint.getCheckpointId();
pendingCheckpoints.remove(checkpointId);
pendingCounter.decrementAndGet();
if (pendingCheckpoints.size() + 1 == coordinatorConfig.getMaxConcurrentCheckpoints()) {
// latest checkpoint completed time > checkpoint interval
if (notFinalCheckpoint(completedCheckpoint.getCheckpointType())) {
scheduleTriggerPendingCheckpoint(0L);
}
}
completedCheckpoints.addLast(completedCheckpoint);
try {
byte[] states = serializer.serialize(completedCheckpoint);
Expand Down Expand Up @@ -742,6 +734,14 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed
completedCheckpoint.getJobId());
latestCompletedCheckpoint = completedCheckpoint;
notifyCompleted(completedCheckpoint);
pendingCheckpoints.remove(checkpointId);
pendingCounter.decrementAndGet();
if (pendingCheckpoints.size() + 1 == coordinatorConfig.getMaxConcurrentCheckpoints()) {
// latest checkpoint completed time > checkpoint interval
if (notFinalCheckpoint(completedCheckpoint.getCheckpointType())) {
scheduleTriggerPendingCheckpoint(0L);
}
}
if (isCompleted()) {
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
if (latestCompletedCheckpoint.getCheckpointType().equals(SAVEPOINT_TYPE)) {
Expand Down

0 comments on commit bc28eb1

Please sign in to comment.