Skip to content

Commit

Permalink
improve job clean code
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 committed Feb 8, 2023
1 parent 65f5a16 commit 9762f88
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI
runningJobStateIMap,
runningJobStateTimestampsIMap,
ownedSlotProfilesIMap,
runningJobInfoIMap,
engineConfig);

jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp());
Expand All @@ -227,7 +228,7 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI
logger.info(String.format(
"The restore %s is in an end state %s, store the job info to JobHistory and clear the job running time info",
jobFullName, jobStatus));
removeJobIMap(jobMaster);
jobMaster.cleanJob();
return;
}

Expand All @@ -251,7 +252,7 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI
jobMaster.run();
} finally {
// storage job state info to HistoryStorage
onJobDone(jobMaster, jobId);
runningJobMasterMap.remove(jobId);
}
});
return;
Expand All @@ -265,7 +266,7 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI
jobMaster.getPhysicalPlan().getPipelineList().forEach(SubPlan::restorePipelineState);
jobMaster.run();
} finally {
onJobDone(jobMaster, jobId);
runningJobMasterMap.remove(jobId);
}
});
}
Expand Down Expand Up @@ -335,7 +336,9 @@ public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInf
getJobHistoryService(),
runningJobStateIMap,
runningJobStateTimestampsIMap,
ownedSlotProfilesIMap, engineConfig);
ownedSlotProfilesIMap,
runningJobInfoIMap,
engineConfig);
executorService.submit(() -> {
try {
runningJobInfoIMap.put(jobId, new JobInfo(System.currentTimeMillis(), jobImmutableInformation));
Expand All @@ -353,7 +356,7 @@ public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInf
try {
jobMaster.run();
} finally {
onJobDone(jobMaster, jobId);
runningJobMasterMap.remove(jobId);
}
});
return new PassiveCompletableFuture<>(voidCompletableFuture);
Expand All @@ -372,37 +375,6 @@ public PassiveCompletableFuture<Void> savePoint(long jobId) {
return new PassiveCompletableFuture<>(voidCompletableFuture);
}

private void onJobDone(JobMaster jobMaster, long jobId) {
// storage job state and metrics to HistoryStorage
jobHistoryService.storeJobInfo(jobId, runningJobMasterMap.get(jobId).getJobDAGInfo());
jobHistoryService.storeFinishedJobState(jobMaster);
removeJobIMap(jobMaster);
runningJobMasterMap.remove(jobId);
}

private void removeJobIMap(JobMaster jobMaster) {
Long jobId = jobMaster.getJobImmutableInformation().getJobId();
runningJobStateTimestampsIMap.remove(jobId);

jobMaster.getPhysicalPlan().getPipelineList().forEach(pipeline -> {
runningJobStateIMap.remove(pipeline.getPipelineLocation());
runningJobStateTimestampsIMap.remove(pipeline.getPipelineLocation());
pipeline.getCoordinatorVertexList().forEach(coordinator -> {
runningJobStateIMap.remove(coordinator.getTaskGroupLocation());
runningJobStateTimestampsIMap.remove(coordinator.getTaskGroupLocation());
});

pipeline.getPhysicalVertexList().forEach(task -> {
runningJobStateIMap.remove(task.getTaskGroupLocation());
runningJobStateTimestampsIMap.remove(task.getTaskGroupLocation());
});
});

// These should be deleted at the end.
runningJobStateIMap.remove(jobId);
runningJobInfoIMap.remove(jobId);
}

public PassiveCompletableFuture<JobResult> waitForJobComplete(long jobId) {
JobMaster runningJobMaster = runningJobMasterMap.get(jobId);
if (runningJobMaster == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.PipelineStatus;
Expand Down Expand Up @@ -133,14 +134,18 @@ public class JobMaster {

private Map<Integer, CheckpointPlan> checkpointPlanMap;

private final IMap<Long, JobInfo> runningJobInfoIMap;

public JobMaster(@NonNull Data jobImmutableInformationData,
@NonNull NodeEngine nodeEngine,
@NonNull ExecutorService executorService,
@NonNull ResourceManager resourceManager,
@NonNull JobHistoryService jobHistoryService,
@NonNull IMap runningJobStateIMap,
@NonNull IMap runningJobStateTimestampsIMap,
@NonNull IMap ownedSlotProfilesIMap, EngineConfig engineConfig) {
@NonNull IMap ownedSlotProfilesIMap,
@NonNull IMap<Long, JobInfo> runningJobInfoIMap,
EngineConfig engineConfig) {
this.jobImmutableInformationData = jobImmutableInformationData;
this.nodeEngine = nodeEngine;
this.executorService = executorService;
Expand All @@ -151,6 +156,7 @@ public JobMaster(@NonNull Data jobImmutableInformationData,
this.jobHistoryService = jobHistoryService;
this.runningJobStateIMap = runningJobStateIMap;
this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
this.runningJobInfoIMap = runningJobInfoIMap;
this.engineConfig = engineConfig;
}

Expand Down Expand Up @@ -216,9 +222,9 @@ public void initStateFuture() {
jobStatusFuture.whenComplete(withTryCatch(LOGGER, (v, t) -> {
// We need not handle t, Because we will not return t from physicalPlan
if (JobStatus.FAILING.equals(v.getStatus())) {
cleanJob();
physicalPlan.updateJobState(JobStatus.FAILING, JobStatus.FAILED);
}
cleanJob();
jobMasterCompleteFuture.complete(new JobResult(physicalPlan.getJobStatus(), v.getError()));
}));
}
Expand Down Expand Up @@ -255,6 +261,28 @@ public void handleCheckpointError(long pipelineId, Throwable e) {
});
}

private void removeJobIMap() {
Long jobId = getJobImmutableInformation().getJobId();
runningJobStateTimestampsIMap.remove(jobId);

getPhysicalPlan().getPipelineList().forEach(pipeline -> {
runningJobStateIMap.remove(pipeline.getPipelineLocation());
runningJobStateTimestampsIMap.remove(pipeline.getPipelineLocation());
pipeline.getCoordinatorVertexList().forEach(coordinator -> {
runningJobStateIMap.remove(coordinator.getTaskGroupLocation());
runningJobStateTimestampsIMap.remove(coordinator.getTaskGroupLocation());
});

pipeline.getPhysicalVertexList().forEach(task -> {
runningJobStateIMap.remove(task.getTaskGroupLocation());
runningJobStateTimestampsIMap.remove(task.getTaskGroupLocation());
});
});

runningJobStateIMap.remove(jobId);
runningJobInfoIMap.remove(jobId);
}

public JobDAGInfo getJobDAGInfo() {
if (jobDAGInfo == null) {
jobDAGInfo = DAGUtils.getJobDAGInfo(logicalDag, jobImmutableInformation, isPhysicalDAGIInfo);
Expand All @@ -276,7 +304,9 @@ public void releasePipelineResource(SubPlan subPlan) {
}

public void cleanJob() {
// TODO Add some job clean operation
jobHistoryService.storeJobInfo(jobImmutableInformation.getJobId(), getJobDAGInfo());
jobHistoryService.storeFinishedJobState(this);
removeJobIMap();
}

public Address queryTaskGroupAddress(long taskGroupId) {
Expand Down

0 comments on commit 9762f88

Please sign in to comment.