diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index f5e5022014c..5e68e661b36 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -214,10 +214,12 @@ private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull JobI ownedSlotProfilesIMap, engineConfig); + jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp()); try { - jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp()); + jobMaster.initCheckPointManager(); } catch (Exception e) { - throw new SeaTunnelEngineException(String.format("Job id %s init JobMaster failed", jobId), e); + jobMaster.cancelJob(); + throw new SeaTunnelEngineException(String.format("Job id %s init CheckPointManager failed", jobId), e); } String jobFullName = jobMaster.getPhysicalPlan().getJobFullName(); @@ -339,6 +341,7 @@ public PassiveCompletableFuture submitJob(long jobId, Data jobImmutableInf runningJobInfoIMap.put(jobId, new JobInfo(System.currentTimeMillis(), jobImmutableInformation)); runningJobMasterMap.put(jobId, jobMaster); jobMaster.init(runningJobInfoIMap.get(jobId).getInitializationTimestamp()); + jobMaster.initCheckPointManager(); } catch (Throwable e) { logger.severe(String.format("submit job %s error %s ", jobId, ExceptionUtils.getMessage(e))); voidCompletableFuture.completeExceptionally(e); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java index dee54f55513..29b5328a257 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java @@ -144,8 +144,11 @@ public void addPipelineEndCallback(SubPlan subPlan) { future.thenAcceptAsync(pipelineState -> { try { // Notify checkpoint manager when the pipeline end, Whether the pipeline will be restarted or not - jobMaster.getCheckpointManager() - .listenPipelineRetry(subPlan.getPipelineLocation().getPipelineId(), subPlan.getPipelineState()).join(); + if (jobMaster.getCheckpointManager() != null) { + jobMaster.getCheckpointManager() + .listenPipelineRetry(subPlan.getPipelineLocation().getPipelineId(), subPlan.getPipelineState()) + .join(); + } if (PipelineStatus.CANCELED.equals(pipelineState.getPipelineStatus())) { if (canRestorePipeline(subPlan)) { subPlan.restorePipeline(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java index b882244588f..f21d49c262c 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java @@ -262,7 +262,9 @@ public void cancelPipeline() { } private void cancelCheckpointCoordinator() { - jobMaster.getCheckpointManager().listenPipelineRetry(pipelineId, PipelineStatus.CANCELING).join(); + if (jobMaster.getCheckpointManager() != null) { + jobMaster.getCheckpointManager().listenPipelineRetry(pipelineId, PipelineStatus.CANCELING).join(); + } } private void cancelPipelineTasks() { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 0e967a2bc60..c9352461ff3 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.common.utils.RetryUtils; import org.apache.seatunnel.common.utils.SeaTunnelException; +import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.EngineConfig; import org.apache.seatunnel.engine.common.config.server.CheckpointConfig; @@ -130,6 +131,8 @@ public class JobMaster { private boolean isRunning = true; + private Map checkpointPlanMap; + public JobMaster(@NonNull Data jobImmutableInformationData, @NonNull NodeEngine nodeEngine, @NonNull ExecutorService executorService, @@ -151,7 +154,7 @@ public JobMaster(@NonNull Data jobImmutableInformationData, this.engineConfig = engineConfig; } - public void init(long initializationTimestamp) throws Exception { + public void init(long initializationTimestamp) { jobImmutableInformation = nodeEngine.getSerializationService().toObject( jobImmutableInformationData); LOGGER.info(String.format("Init JobMaster for Job %s (%s) ", jobImmutableInformation.getJobConfig().getName(), @@ -162,8 +165,6 @@ public void init(long initializationTimestamp) throws Exception { classLoader = new SeaTunnelChildFirstClassLoader(jobImmutableInformation.getPluginJarsUrls()); logicalDag = CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(), classLoader, jobImmutableInformation.getLogicalDag()); - CheckpointConfig checkpointConfig = mergeEnvAndEngineConfig(engineConfig.getCheckpointConfig(), - jobImmutableInformation.getJobConfig().getEnvOptions()); final Tuple2> planTuple = PlanUtils.fromLogicalDAG(logicalDag, nodeEngine, @@ -176,14 +177,20 @@ public void init(long initializationTimestamp) throws Exception { engineConfig.getQueueType()); this.physicalPlan = planTuple.f0(); this.physicalPlan.setJobMaster(this); + this.checkpointPlanMap = planTuple.f1(); + this.initStateFuture(); + } + + public void initCheckPointManager() throws CheckpointStorageException { + CheckpointConfig checkpointConfig = mergeEnvAndEngineConfig(engineConfig.getCheckpointConfig(), + jobImmutableInformation.getJobConfig().getEnvOptions()); this.checkpointManager = new CheckpointManager( jobImmutableInformation.getJobId(), jobImmutableInformation.isStartWithSavePoint(), nodeEngine, this, - planTuple.f1(), + checkpointPlanMap, checkpointConfig); - this.initStateFuture(); } // TODO replace it after ReadableConfig Support parse yaml format, then use only one config to read engine and env config. @@ -323,8 +330,9 @@ public List getCurrJobMetrics(Collection { if (taskGroupLocation.getJobId() == this.getJobImmutableInformation().getJobId()) { try { - RawJobMetrics rawJobMetrics = (RawJobMetrics) NodeEngineUtil.sendOperationToMemberNode(nodeEngine, - new GetTaskGroupMetricsOperation(taskGroupLocation), slotProfile.getWorker()).get(); + RawJobMetrics rawJobMetrics = + (RawJobMetrics) NodeEngineUtil.sendOperationToMemberNode(nodeEngine, + new GetTaskGroupMetricsOperation(taskGroupLocation), slotProfile.getWorker()).get(); metrics.add(rawJobMetrics); } catch (Exception e) { throw new SeaTunnelException(e.getMessage()); @@ -336,7 +344,8 @@ public List getCurrJobMetrics(Collection currJobMetrics = this.getCurrJobMetrics(Collections.singleton(this.getOwnedSlotProfiles(pipelineLocation))); + List currJobMetrics = + this.getCurrJobMetrics(Collections.singleton(this.getOwnedSlotProfiles(pipelineLocation))); JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(currJobMetrics); long jobId = this.getJobImmutableInformation().getJobId(); synchronized (this) { @@ -346,8 +355,9 @@ public void savePipelineMetricsToHistory(PipelineLocation pipelineLocation) { this.cleanTaskGroupContext(pipelineLocation); } - public void removeMetricsContext(PipelineLocation pipelineLocation, PipelineStatus pipelineStatus){ - if (pipelineStatus.equals(PipelineStatus.FINISHED) && !checkpointManager.isSavePointEnd() || pipelineStatus.equals(PipelineStatus.CANCELED)){ + public void removeMetricsContext(PipelineLocation pipelineLocation, PipelineStatus pipelineStatus) { + if (pipelineStatus.equals(PipelineStatus.FINISHED) && !checkpointManager.isSavePointEnd() || + pipelineStatus.equals(PipelineStatus.CANCELED)) { IMap map = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS); map.keySet().stream().filter(taskLocation -> { @@ -399,7 +409,7 @@ public void updateTaskExecutionState(TaskExecutionState taskExecutionState) { /** * Execute savePoint, which will cause the job to end. */ - public CompletableFuture savePoint(){ + public CompletableFuture savePoint() { PassiveCompletableFuture[] passiveCompletableFutures = checkpointManager.triggerSavepoints(); return CompletableFuture.allOf(passiveCompletableFutures); @@ -414,7 +424,8 @@ public void setOwnedSlotProfiles(@NonNull PipelineLocation pipelineLocation, @NonNull Map pipelineOwnedSlotProfiles) { ownedSlotProfilesIMap.put(pipelineLocation, pipelineOwnedSlotProfiles); try { - RetryUtils.retryWithException(() -> pipelineOwnedSlotProfiles.equals(ownedSlotProfilesIMap.get(pipelineLocation)), + RetryUtils.retryWithException( + () -> pipelineOwnedSlotProfiles.equals(ownedSlotProfilesIMap.get(pipelineLocation)), new RetryUtils.RetryMaterial(20, true, exception -> exception instanceof NullPointerException && isRunning, 1000)); } catch (Exception e) {