From 9762f888be7890616acb8571747ed1e0d56ff963 Mon Sep 17 00:00:00 2001 From: gaojun Date: Wed, 8 Feb 2023 14:53:28 +0800 Subject: [PATCH 1/2] improve job clean code --- .../engine/server/CoordinatorService.java | 44 ++++--------------- .../engine/server/master/JobMaster.java | 36 +++++++++++++-- 2 files changed, 41 insertions(+), 39 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index 5e68e661b36..2f9620d877f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -212,6 +212,7 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI runningJobStateIMap, runningJobStateTimestampsIMap, ownedSlotProfilesIMap, + runningJobInfoIMap, engineConfig); jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp()); @@ -227,7 +228,7 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI logger.info(String.format( "The restore %s is in an end state %s, store the job info to JobHistory and clear the job running time info", jobFullName, jobStatus)); - removeJobIMap(jobMaster); + jobMaster.cleanJob(); return; } @@ -251,7 +252,7 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI jobMaster.run(); } finally { // storage job state info to HistoryStorage - onJobDone(jobMaster, jobId); + runningJobMasterMap.remove(jobId); } }); return; @@ -265,7 +266,7 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI jobMaster.getPhysicalPlan().getPipelineList().forEach(SubPlan::restorePipelineState); jobMaster.run(); } finally { - onJobDone(jobMaster, jobId); + runningJobMasterMap.remove(jobId); } }); } @@ -335,7 +336,9 @@ public PassiveCompletableFuture submitJob(long jobId, Data jobImmutableInf getJobHistoryService(), runningJobStateIMap, runningJobStateTimestampsIMap, - ownedSlotProfilesIMap, engineConfig); + ownedSlotProfilesIMap, + runningJobInfoIMap, + engineConfig); executorService.submit(() -> { try { runningJobInfoIMap.put(jobId, new JobInfo(System.currentTimeMillis(), jobImmutableInformation)); @@ -353,7 +356,7 @@ public PassiveCompletableFuture submitJob(long jobId, Data jobImmutableInf try { jobMaster.run(); } finally { - onJobDone(jobMaster, jobId); + runningJobMasterMap.remove(jobId); } }); return new PassiveCompletableFuture<>(voidCompletableFuture); @@ -372,37 +375,6 @@ public PassiveCompletableFuture savePoint(long jobId) { return new PassiveCompletableFuture<>(voidCompletableFuture); } - private void onJobDone(JobMaster jobMaster, long jobId) { - // storage job state and metrics to HistoryStorage - jobHistoryService.storeJobInfo(jobId, runningJobMasterMap.get(jobId).getJobDAGInfo()); - jobHistoryService.storeFinishedJobState(jobMaster); - removeJobIMap(jobMaster); - runningJobMasterMap.remove(jobId); - } - - private void removeJobIMap(JobMaster jobMaster) { - Long jobId = jobMaster.getJobImmutableInformation().getJobId(); - runningJobStateTimestampsIMap.remove(jobId); - - jobMaster.getPhysicalPlan().getPipelineList().forEach(pipeline -> { - runningJobStateIMap.remove(pipeline.getPipelineLocation()); - runningJobStateTimestampsIMap.remove(pipeline.getPipelineLocation()); - pipeline.getCoordinatorVertexList().forEach(coordinator -> { - runningJobStateIMap.remove(coordinator.getTaskGroupLocation()); - runningJobStateTimestampsIMap.remove(coordinator.getTaskGroupLocation()); - }); - - pipeline.getPhysicalVertexList().forEach(task -> { - runningJobStateIMap.remove(task.getTaskGroupLocation()); - runningJobStateTimestampsIMap.remove(task.getTaskGroupLocation()); - }); - }); - - // These should be deleted at the end. - runningJobStateIMap.remove(jobId); - runningJobInfoIMap.remove(jobId); - } - public PassiveCompletableFuture waitForJobComplete(long jobId) { JobMaster runningJobMaster = runningJobMasterMap.get(jobId); if (runningJobMaster == null) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index c9352461ff3..56ae3ca1f4b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -36,6 +36,7 @@ import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; import org.apache.seatunnel.engine.core.job.JobDAGInfo; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; +import org.apache.seatunnel.engine.core.job.JobInfo; import org.apache.seatunnel.engine.core.job.JobResult; import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.core.job.PipelineStatus; @@ -133,6 +134,8 @@ public class JobMaster { private Map checkpointPlanMap; + private final IMap runningJobInfoIMap; + public JobMaster(@NonNull Data jobImmutableInformationData, @NonNull NodeEngine nodeEngine, @NonNull ExecutorService executorService, @@ -140,7 +143,9 @@ public JobMaster(@NonNull Data jobImmutableInformationData, @NonNull JobHistoryService jobHistoryService, @NonNull IMap runningJobStateIMap, @NonNull IMap runningJobStateTimestampsIMap, - @NonNull IMap ownedSlotProfilesIMap, EngineConfig engineConfig) { + @NonNull IMap ownedSlotProfilesIMap, + @NonNull IMap runningJobInfoIMap, + EngineConfig engineConfig) { this.jobImmutableInformationData = jobImmutableInformationData; this.nodeEngine = nodeEngine; this.executorService = executorService; @@ -151,6 +156,7 @@ public JobMaster(@NonNull Data jobImmutableInformationData, this.jobHistoryService = jobHistoryService; this.runningJobStateIMap = runningJobStateIMap; this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap; + this.runningJobInfoIMap = runningJobInfoIMap; this.engineConfig = engineConfig; } @@ -216,9 +222,9 @@ public void initStateFuture() { jobStatusFuture.whenComplete(withTryCatch(LOGGER, (v, t) -> { // We need not handle t, Because we will not return t from physicalPlan if (JobStatus.FAILING.equals(v.getStatus())) { - cleanJob(); physicalPlan.updateJobState(JobStatus.FAILING, JobStatus.FAILED); } + cleanJob(); jobMasterCompleteFuture.complete(new JobResult(physicalPlan.getJobStatus(), v.getError())); })); } @@ -255,6 +261,28 @@ public void handleCheckpointError(long pipelineId, Throwable e) { }); } + private void removeJobIMap() { + Long jobId = getJobImmutableInformation().getJobId(); + runningJobStateTimestampsIMap.remove(jobId); + + getPhysicalPlan().getPipelineList().forEach(pipeline -> { + runningJobStateIMap.remove(pipeline.getPipelineLocation()); + runningJobStateTimestampsIMap.remove(pipeline.getPipelineLocation()); + pipeline.getCoordinatorVertexList().forEach(coordinator -> { + runningJobStateIMap.remove(coordinator.getTaskGroupLocation()); + runningJobStateTimestampsIMap.remove(coordinator.getTaskGroupLocation()); + }); + + pipeline.getPhysicalVertexList().forEach(task -> { + runningJobStateIMap.remove(task.getTaskGroupLocation()); + runningJobStateTimestampsIMap.remove(task.getTaskGroupLocation()); + }); + }); + + runningJobStateIMap.remove(jobId); + runningJobInfoIMap.remove(jobId); + } + public JobDAGInfo getJobDAGInfo() { if (jobDAGInfo == null) { jobDAGInfo = DAGUtils.getJobDAGInfo(logicalDag, jobImmutableInformation, isPhysicalDAGIInfo); @@ -276,7 +304,9 @@ public void releasePipelineResource(SubPlan subPlan) { } public void cleanJob() { - // TODO Add some job clean operation + jobHistoryService.storeJobInfo(jobImmutableInformation.getJobId(), getJobDAGInfo()); + jobHistoryService.storeFinishedJobState(this); + removeJobIMap(); } public Address queryTaskGroupAddress(long taskGroupId) { From a9715a5e6e1b2d011c09fdab9df29113b0f372b0 Mon Sep 17 00:00:00 2001 From: gaojun Date: Wed, 8 Feb 2023 17:40:22 +0800 Subject: [PATCH 2/2] Improve clean job --- .../engine/server/CoordinatorService.java | 20 ++++++++++++++----- .../engine/server/master/JobMaster.java | 3 ++- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index 2f9620d877f..9957b2437d8 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -251,8 +251,11 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI jobMaster.cancelJob(); jobMaster.run(); } finally { - // storage job state info to HistoryStorage - runningJobMasterMap.remove(jobId); + // voidCompletableFuture will be cancelled when zeta master node shutdown to simulate master failure, + // don't update runningJobMasterMap is this case. + if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) { + runningJobMasterMap.remove(jobId); + } } }); return; @@ -266,7 +269,11 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI jobMaster.getPhysicalPlan().getPipelineList().forEach(SubPlan::restorePipelineState); jobMaster.run(); } finally { - runningJobMasterMap.remove(jobId); + // voidCompletableFuture will be cancelled when zeta master node shutdown to simulate master failure, + // don't update runningJobMasterMap is this case. + if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) { + runningJobMasterMap.remove(jobId); + } } }); } @@ -298,7 +305,6 @@ private void clearCoordinatorService() { try { executorService.awaitTermination(20, TimeUnit.SECONDS); - runningJobMasterMap = new ConcurrentHashMap<>(); } catch (InterruptedException e) { throw new SeaTunnelEngineException("wait clean executor service error", e); } @@ -356,7 +362,11 @@ public PassiveCompletableFuture submitJob(long jobId, Data jobImmutableInf try { jobMaster.run(); } finally { - runningJobMasterMap.remove(jobId); + // voidCompletableFuture will be cancelled when zeta master node shutdown to simulate master failure, + // don't update runningJobMasterMap is this case. + if (!jobMaster.getJobMasterCompleteFuture().isCancelled()) { + runningJobMasterMap.remove(jobId); + } } }); return new PassiveCompletableFuture<>(voidCompletableFuture); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 56ae3ca1f4b..20675104326 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -224,8 +224,9 @@ public void initStateFuture() { if (JobStatus.FAILING.equals(v.getStatus())) { physicalPlan.updateJobState(JobStatus.FAILING, JobStatus.FAILED); } + JobResult jobResult = new JobResult(physicalPlan.getJobStatus(), v.getError()); cleanJob(); - jobMasterCompleteFuture.complete(new JobResult(physicalPlan.getJobStatus(), v.getError())); + jobMasterCompleteFuture.complete(jobResult); })); }