diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java index 1ee0ae53efa..b8a43566fcf 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ExceptionUtils.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.common.utils; +import lombok.NonNull; + import java.io.PrintWriter; import java.io.StringWriter; @@ -38,4 +40,27 @@ public static String getMessage(Throwable e) { throw new RuntimeException("Failed to print exception logs", e1); } } + + /** + * Returns the innermost cause of {@code e}, or {@code e} itself when it has no cause. + * + * The chain is walked iteratively, so a very deep chain cannot overflow the stack. A second + * cursor advancing two links per step ends the walk if the causes ever form a cycle. + * + * @param e the throwable to unwrap, must not be null + * @return the last throwable reached by following {@link Throwable#getCause()} + */ + public static Throwable getRootException(@NonNull Throwable e) { + Throwable root = e; + Throwable probe = e; + while (root.getCause() != null) { + root = root.getCause(); + probe = probe == null || probe.getCause() == null ? null : probe.getCause().getCause(); + if (probe == root) { + // cyclic cause chain: stop here instead of looping forever + break; + } + } + return root; + } } diff --git a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/ExceptionUtilsTest.java b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/ExceptionUtilsTest.java new file mode 100644 index 00000000000..d0e036f0994 --- /dev/null +++ b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/ExceptionUtilsTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.common.utils; + +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ExceptionUtilsTest { + @Test + public void testGetRootException() { + Exception exception = + new UnsupportedOperationException( + new SeaTunnelException( + new SeaTunnelRuntimeException( + CommonErrorCode.CLASS_NOT_FOUND, "class not fount"))); + Throwable throwable = ExceptionUtils.getRootException(exception); + Assertions.assertTrue(throwable instanceof SeaTunnelRuntimeException); + } +} diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java index 9331591b547..0e877f6412a 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; +import org.apache.seatunnel.engine.common.utils.ExceptionUtil; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.job.Job; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; @@ -35,7 +36,6 @@ import org.apache.commons.lang3.StringUtils; import com.hazelcast.client.impl.protocol.ClientMessage; -import com.hazelcast.core.OperationTimeoutException; import com.hazelcast.logging.ILogger; import 
com.hazelcast.logging.Logger; import lombok.NonNull; @@ -104,8 +104,7 @@ public JobStatus waitForJobComplete() { 100000, true, exception -> - exception.getCause() - instanceof OperationTimeoutException, + ExceptionUtil.isOperationNeedRetryException(exception), Constant.OPERATION_RETRY_SLEEP)); if (jobResult == null) { throw new SeaTunnelEngineException("failed to fetch job result"); diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java index e2503fcb0f5..e0714349392 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java @@ -16,6 +16,7 @@ package org.apache.seatunnel.engine.common.utils; +import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.common.utils.function.ConsumerWithException; import org.apache.seatunnel.common.utils.function.RunnableWithException; import org.apache.seatunnel.common.utils.function.SupplierWithException; @@ -27,6 +28,8 @@ import com.hazelcast.client.impl.protocol.ClientExceptionFactory; import com.hazelcast.client.impl.protocol.ClientProtocolErrorCodes; +import com.hazelcast.core.HazelcastInstanceNotActiveException; +import com.hazelcast.core.OperationTimeoutException; import com.hazelcast.instance.impl.OutOfMemoryErrorDispatcher; import lombok.NonNull; @@ -142,4 +145,11 @@ public static R sneaky(SupplierWithException supp // This method wouldn't be executed. 
throw new RuntimeException("Never throw here."); } + + public static boolean isOperationNeedRetryException(@NonNull Throwable e) { + Throwable exception = ExceptionUtils.getRootException(e); + return exception instanceof HazelcastInstanceNotActiveException + || exception instanceof InterruptedException + || exception instanceof OperationTimeoutException; + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java index ffbcea0e115..6bb67973250 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java @@ -19,10 +19,10 @@ import org.apache.seatunnel.common.utils.RetryUtils; import org.apache.seatunnel.engine.common.Constant; +import org.apache.seatunnel.engine.common.utils.ExceptionUtil; import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter; import org.apache.seatunnel.engine.core.job.PipelineStatus; -import com.hazelcast.core.HazelcastInstanceNotActiveException; import com.hazelcast.map.IMap; import com.hazelcast.spi.impl.NodeEngine; @@ -56,7 +56,7 @@ public void start() throws Exception { new RetryUtils.RetryMaterial( Constant.OPERATION_RETRY_TIME, true, - exception -> exception instanceof HazelcastInstanceNotActiveException, + exception -> ExceptionUtil.isOperationNeedRetryException(exception), Constant.OPERATION_RETRY_SLEEP)); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java index 4a81fdf6373..ed3ec5c8c86 
100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlan.java @@ -140,10 +140,6 @@ public void addPipelineEndCallback(SubPlan subPlan) { pipelineState -> { try { if (PipelineStatus.CANCELED.equals(pipelineState.getPipelineStatus())) { - if (subPlan.canRestorePipeline()) { - subPlan.restorePipeline(); - return; - } canceledPipelineNum.incrementAndGet(); if (makeJobEndWhenPipelineEnded) { LOGGER.info( @@ -154,14 +150,6 @@ public void addPipelineEndCallback(SubPlan subPlan) { } } else if (PipelineStatus.FAILED.equals( pipelineState.getPipelineStatus())) { - if (subPlan.canRestorePipeline()) { - LOGGER.info( - String.format( - "Can restore pipeline %s", - subPlan.getPipelineFullName())); - subPlan.restorePipeline(); - return; - } failedPipelineNum.incrementAndGet(); errorBySubPlan.compareAndSet(null, pipelineState.getThrowableMsg()); if (makeJobEndWhenPipelineEnded) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java index 586afbec330..20923cc2bdf 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java @@ -18,7 +18,9 @@ package org.apache.seatunnel.engine.server.dag.physical; import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.common.utils.RetryUtils; import org.apache.seatunnel.engine.common.Constant; +import org.apache.seatunnel.engine.common.utils.ExceptionUtil; import 
org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.apache.seatunnel.engine.server.SeaTunnelServer; @@ -100,6 +102,8 @@ public class PhysicalVertex { private JobMaster jobMaster; + private volatile ExecutionState currExecutionState = ExecutionState.CREATED; + public PhysicalVertex( int subTaskGroupIndex, @NonNull ExecutorService executorService, @@ -135,6 +139,8 @@ public PhysicalVertex( runningJobStateIMap.put(taskGroupLocation, ExecutionState.CREATED); } + this.currExecutionState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation); + this.nodeEngine = nodeEngine; if (LOGGER.isFineEnabled() || LOGGER.isFinestEnabled()) { this.taskFullName = @@ -169,18 +175,18 @@ public PhysicalVertex( public PassiveCompletableFuture initStateFuture() { this.taskFuture = new CompletableFuture<>(); - ExecutionState executionState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation); - if (executionState != null) { + this.currExecutionState = (ExecutionState) runningJobStateIMap.get(taskGroupLocation); + if (currExecutionState != null) { LOGGER.info( String.format( "The task %s is in state %s when init state future", - taskFullName, executionState)); + taskFullName, currExecutionState)); } // if the task state is RUNNING // We need to check the real running status of Task from taskExecutionServer. // Because the state may be RUNNING when the cluster is restarted, but the Task no longer // exists. - if (ExecutionState.RUNNING.equals(executionState)) { + if (ExecutionState.RUNNING.equals(currExecutionState)) { if (!checkTaskGroupIsExecuting(taskGroupLocation)) { updateTaskState(ExecutionState.RUNNING, ExecutionState.FAILED); this.taskFuture.complete( @@ -188,10 +194,10 @@ public PassiveCompletableFuture initStateFuture() { } } // If the task state is CANCELING we need call noticeTaskExecutionServiceCancel(). 
- else if (ExecutionState.CANCELING.equals(executionState)) { + else if (ExecutionState.CANCELING.equals(currExecutionState)) { noticeTaskExecutionServiceCancel(); - } else if (executionState.isEndState()) { - this.taskFuture.complete(new TaskExecutionState(taskGroupLocation, executionState)); + } else if (currExecutionState.isEndState()) { + this.taskFuture.complete(new TaskExecutionState(taskGroupLocation, currExecutionState)); } return new PassiveCompletableFuture<>(this.taskFuture); } @@ -248,7 +254,7 @@ private SlotProfile getOwnedSlotProfilesByTaskGroup( return null; } - private TaskDeployState deployOnLocal(@NonNull SlotProfile slotProfile) { + private TaskDeployState deployOnLocal(@NonNull SlotProfile slotProfile) throws Exception { return deployInternal( taskGroupImmutableInformation -> { SeaTunnelServer server = nodeEngine.getService(SeaTunnelServer.SERVICE_NAME); @@ -259,7 +265,7 @@ private TaskDeployState deployOnLocal(@NonNull SlotProfile slotProfile) { }); } - private TaskDeployState deployOnRemote(@NonNull SlotProfile slotProfile) { + private TaskDeployState deployOnRemote(@NonNull SlotProfile slotProfile) throws Exception { return deployInternal( taskGroupImmutableInformation -> { try { @@ -300,9 +306,7 @@ private TaskDeployState deployInternal( TaskGroupImmutableInformation taskGroupImmutableInformation = getTaskGroupImmutableInformation(); synchronized (this) { - ExecutionState currentState = - (ExecutionState) runningJobStateIMap.get(taskGroupLocation); - if (ExecutionState.DEPLOYING.equals(currentState)) { + if (ExecutionState.DEPLOYING.equals(currExecutionState)) { TaskDeployState state = taskGroupConsumer.apply(taskGroupImmutableInformation); updateTaskState(ExecutionState.DEPLOYING, ExecutionState.RUNNING); return state; @@ -339,22 +343,40 @@ private boolean turnToEndState(@NonNull ExecutionState endState) { return false; } // consistency check - ExecutionState currentState = - (ExecutionState) runningJobStateIMap.get(taskGroupLocation); - if 
(currentState.equals(endState)) { + if (currExecutionState.equals(endState)) { return true; } - if (currentState.isEndState()) { + if (currExecutionState.isEndState()) { String message = String.format( "Task %s is already in terminal state %s", - taskFullName, currentState); + taskFullName, currExecutionState); LOGGER.warning(message); return false; } - updateStateTimestamps(endState); - runningJobStateIMap.set(taskGroupLocation, endState); + try { + RetryUtils.retryWithException( + () -> { + updateStateTimestamps(endState); + runningJobStateIMap.set(taskGroupLocation, endState); + return null; + }, + new RetryUtils.RetryMaterial( + Constant.OPERATION_RETRY_TIME, + true, + exception -> ExceptionUtil.isOperationNeedRetryException(exception), + Constant.OPERATION_RETRY_SLEEP)); + } catch (Exception e) { + LOGGER.warning(ExceptionUtils.getMessage(e)); + // If master/worker node done, The job will restore and fix the state from + // TaskExecutionService + LOGGER.warning( + String.format( + "Set %s state %s to Imap failed, skip.", + getTaskFullName(), endState)); + } + this.currExecutionState = endState; LOGGER.info(String.format("%s turn to end state %s.", taskFullName, endState)); return true; } @@ -362,11 +384,11 @@ private boolean turnToEndState(@NonNull ExecutionState endState) { public boolean updateTaskState( @NonNull ExecutionState current, @NonNull ExecutionState targetState) { - LOGGER.fine( - String.format( - "Try to update the task %s state from %s to %s", - taskFullName, current, targetState)); synchronized (this) { + LOGGER.fine( + String.format( + "Try to update the task %s state from %s to %s", + taskFullName, current, targetState)); // consistency check if (current.isEndState()) { String message = "Task is trying to leave terminal state " + current; @@ -396,9 +418,30 @@ public boolean updateTaskState( } // now do the actual state transition - if (current.equals(runningJobStateIMap.get(taskGroupLocation))) { - updateStateTimestamps(targetState); - 
runningJobStateIMap.set(taskGroupLocation, targetState); + if (current.equals(currExecutionState)) { + try { + RetryUtils.retryWithException( + () -> { + updateStateTimestamps(targetState); + runningJobStateIMap.set(taskGroupLocation, targetState); + return null; + }, + new RetryUtils.RetryMaterial( + Constant.OPERATION_RETRY_TIME, + true, + exception -> + ExceptionUtil.isOperationNeedRetryException(exception), + Constant.OPERATION_RETRY_SLEEP)); + } catch (Exception e) { + LOGGER.warning(ExceptionUtils.getMessage(e)); + // If master/worker node done, The job will restore and fix the state from + // TaskExecutionService + LOGGER.warning( + String.format( + "Set %s state %s to Imap failed, skip.", + getTaskFullName(), targetState)); + } + this.currExecutionState = targetState; LOGGER.info( String.format( "%s turn from state %s to %s.", @@ -408,7 +451,7 @@ public boolean updateTaskState( LOGGER.warning( String.format( "The task %s state in Imap is %s, not equals expected state %s", - taskFullName, runningJobStateIMap.get(taskGroupLocation), current)); + taskFullName, currExecutionState, current)); return false; } } @@ -425,11 +468,6 @@ public void cancel() { } else if (ExecutionState.CANCELING.equals(runningJobStateIMap.get(taskGroupLocation))) { noticeTaskExecutionServiceCancel(); } - - LOGGER.info( - String.format( - "can not cancel task %s because it is in state %s ", - taskFullName, getExecutionState())); } @SuppressWarnings("checkstyle:MagicNumber") @@ -490,7 +528,7 @@ private void updateStateTimestamps(@NonNull ExecutionState targetState) { } public ExecutionState getExecutionState() { - return (ExecutionState) runningJobStateIMap.get(taskGroupLocation); + return currExecutionState; } private void resetExecutionState() { @@ -504,8 +542,28 @@ private void resetExecutionState() { LOGGER.severe(message); throw new IllegalStateException(message); } - updateStateTimestamps(ExecutionState.CREATED); - runningJobStateIMap.set(taskGroupLocation, ExecutionState.CREATED); 
+ try { + RetryUtils.retryWithException( + () -> { + updateStateTimestamps(ExecutionState.CREATED); + runningJobStateIMap.set(taskGroupLocation, ExecutionState.CREATED); + return null; + }, + new RetryUtils.RetryMaterial( + Constant.OPERATION_RETRY_TIME, + true, + exception -> ExceptionUtil.isOperationNeedRetryException(exception), + Constant.OPERATION_RETRY_SLEEP)); + } catch (Exception e) { + LOGGER.warning(ExceptionUtils.getMessage(e)); + // If master/worker node done, The job will restore and fix the state from + // TaskExecutionService + LOGGER.warning( + String.format( + "Set %s state %s to Imap failed, skip.", + getTaskFullName(), ExecutionState.CREATED)); + } + this.currExecutionState = ExecutionState.CREATED; LOGGER.info( String.format("%s turn to state %s.", taskFullName, ExecutionState.CREATED)); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java index 1540db61f1d..bc9e3e2aaef 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java @@ -18,6 +18,9 @@ package org.apache.seatunnel.engine.server.dag.physical; import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.common.utils.RetryUtils; +import org.apache.seatunnel.engine.common.Constant; +import org.apache.seatunnel.engine.common.utils.ExceptionUtil; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.apache.seatunnel.engine.core.job.PipelineExecutionState; @@ -97,6 +100,8 @@ public class SubPlan { private final Object restoreLock = new Object(); + private volatile PipelineStatus 
currPipelineStatus = PipelineStatus.INITIALIZING; + public SubPlan( int pipelineId, int totalPipelineNum, @@ -130,6 +135,8 @@ public SubPlan( runningJobStateIMap.put(pipelineLocation, PipelineStatus.CREATED); } + this.currPipelineStatus = (PipelineStatus) runningJobStateIMap.get(pipelineLocation); + this.pipelineFullName = String.format( "Job %s (%s), Pipeline: [(%d/%d)]", @@ -181,67 +188,32 @@ private void addPhysicalVertexCallBack(PassiveCompletableFuture 0) { - pipelineStatus = PipelineStatus.FAILED; - // we don't care the checkpoint error reason when the task is - // failed. - jobMaster - .getCheckpointManager() - .cancelCheckpoint(getPipelineId()) - .join(); - } else if (canceledTaskNum.get() > 0) { - pipelineStatus = PipelineStatus.CANCELED; - CheckpointCoordinatorState checkpointCoordinatorState = - jobMaster - .getCheckpointManager() - .cancelCheckpoint(getPipelineId()) - .join(); - if (CheckpointCoordinatorStatus.FAILED.equals( - checkpointCoordinatorState - .getCheckpointCoordinatorStatus())) { - pipelineStatus = PipelineStatus.FAILED; - errorByPhysicalVertex.compareAndSet( - null, checkpointCoordinatorState.getThrowableMsg()); - } - } else { - pipelineStatus = PipelineStatus.FINISHED; - CheckpointCoordinatorState checkpointCoordinatorState = - jobMaster - .getCheckpointManager() - .waitCheckpointCoordinatorComplete(getPipelineId()) - .join(); - - if (CheckpointCoordinatorStatus.FAILED.equals( - checkpointCoordinatorState - .getCheckpointCoordinatorStatus())) { - pipelineStatus = PipelineStatus.FAILED; - errorByPhysicalVertex.compareAndSet( - null, checkpointCoordinatorState.getThrowableMsg()); - } else if (CheckpointCoordinatorStatus.CANCELED.equals( - checkpointCoordinatorState - .getCheckpointCoordinatorStatus())) { - pipelineStatus = PipelineStatus.CANCELED; - errorByPhysicalVertex.compareAndSet( - null, checkpointCoordinatorState.getThrowableMsg()); - } - } - - if (!checkNeedRestore(pipelineStatus)) { - subPlanDone(pipelineStatus); - } - - 
turnToEndState(pipelineStatus); + PipelineStatus pipelineEndState = getPipelineEndState(); LOGGER.info( String.format( "%s end with state %s", - this.pipelineFullName, pipelineStatus)); - - pipelineFuture.complete( - new PipelineExecutionState( - pipelineId, - pipelineStatus, - errorByPhysicalVertex.get())); + this.pipelineFullName, pipelineEndState)); + + if (!checkNeedRestore(pipelineEndState)) { + subPlanDone(pipelineEndState); + turnToEndState(pipelineEndState); + pipelineFuture.complete( + new PipelineExecutionState( + pipelineId, + pipelineEndState, + errorByPhysicalVertex.get())); + } else { + turnToEndState(pipelineEndState); + if (prepareRestorePipeline()) { + restorePipeline(); + } else { + pipelineFuture.complete( + new PipelineExecutionState( + pipelineId, + pipelineEndState, + errorByPhysicalVertex.get())); + } + } } } catch (Throwable e) { LOGGER.severe( @@ -255,6 +227,46 @@ private void addPhysicalVertexCallBack(PassiveCompletableFuture 0) { + pipelineStatus = PipelineStatus.FAILED; + // we don't care the checkpoint error reason when the task is + // failed. 
+ jobMaster.getCheckpointManager().cancelCheckpoint(getPipelineId()).join(); + } else if (canceledTaskNum.get() > 0) { + pipelineStatus = PipelineStatus.CANCELED; + CheckpointCoordinatorState checkpointCoordinatorState = + jobMaster.getCheckpointManager().cancelCheckpoint(getPipelineId()).join(); + if (CheckpointCoordinatorStatus.FAILED.equals( + checkpointCoordinatorState.getCheckpointCoordinatorStatus())) { + pipelineStatus = PipelineStatus.FAILED; + errorByPhysicalVertex.compareAndSet( + null, checkpointCoordinatorState.getThrowableMsg()); + } + } else { + pipelineStatus = PipelineStatus.FINISHED; + CheckpointCoordinatorState checkpointCoordinatorState = + jobMaster + .getCheckpointManager() + .waitCheckpointCoordinatorComplete(getPipelineId()) + .join(); + + if (CheckpointCoordinatorStatus.FAILED.equals( + checkpointCoordinatorState.getCheckpointCoordinatorStatus())) { + pipelineStatus = PipelineStatus.FAILED; + errorByPhysicalVertex.compareAndSet( + null, checkpointCoordinatorState.getThrowableMsg()); + } else if (CheckpointCoordinatorStatus.CANCELED.equals( + checkpointCoordinatorState.getCheckpointCoordinatorStatus())) { + pipelineStatus = PipelineStatus.CANCELED; + errorByPhysicalVertex.compareAndSet( + null, checkpointCoordinatorState.getThrowableMsg()); + } + } + return pipelineStatus; + } + private boolean checkNeedRestore(PipelineStatus pipelineStatus) { return canRestorePipeline() && !PipelineStatus.FINISHED.equals(pipelineStatus); } @@ -270,23 +282,32 @@ private void notifyCheckpointManagerPipelineEnd(PipelineStatus pipelineStatus) { .join(); } - private void subPlanDone(PipelineStatus pipelineStatus) { - jobMaster.savePipelineMetricsToHistory(getPipelineLocation()); - jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus); - jobMaster.releasePipelineResource(this); - notifyCheckpointManagerPipelineEnd(pipelineStatus); + private void subPlanDone(PipelineStatus pipelineStatus) throws Exception { + RetryUtils.retryWithException( + () -> { 
+ jobMaster.savePipelineMetricsToHistory(getPipelineLocation()); + jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus); + jobMaster.releasePipelineResource(this); + notifyCheckpointManagerPipelineEnd(pipelineStatus); + return null; + }, + new RetryUtils.RetryMaterial( + Constant.OPERATION_RETRY_TIME, + true, + exception -> ExceptionUtil.isOperationNeedRetryException(exception), + Constant.OPERATION_RETRY_SLEEP)); } public boolean canRestorePipeline() { return jobMaster.isNeedRestore() && getPipelineRestoreNum() < PIPELINE_MAX_RESTORE_NUM; } - private void turnToEndState(@NonNull PipelineStatus endState) { + private void turnToEndState(@NonNull PipelineStatus endState) throws Exception { synchronized (this) { // consistency check - PipelineStatus current = (PipelineStatus) runningJobStateIMap.get(pipelineLocation); - if (current.isEndState() && !endState.isEndState()) { - String message = "Pipeline is trying to leave terminal state " + current; + if (this.currPipelineStatus.isEndState() && !endState.isEndState()) { + String message = + "Pipeline is trying to leave terminal state " + this.currPipelineStatus; LOGGER.severe(message); throw new IllegalStateException(message); } @@ -299,14 +320,23 @@ private void turnToEndState(@NonNull PipelineStatus endState) { // we must update runningJobStateTimestampsIMap first and then can update // runningJobStateIMap - updateStateTimestamps(endState); - - runningJobStateIMap.set(pipelineLocation, endState); + RetryUtils.retryWithException( + () -> { + updateStateTimestamps(endState); + runningJobStateIMap.set(pipelineLocation, endState); + return null; + }, + new RetryUtils.RetryMaterial( + Constant.OPERATION_RETRY_TIME, + true, + exception -> ExceptionUtil.isOperationNeedRetryException(exception), + Constant.OPERATION_RETRY_SLEEP)); + this.currPipelineStatus = endState; } } public boolean updatePipelineState( - @NonNull PipelineStatus current, @NonNull PipelineStatus targetState) { + @NonNull PipelineStatus 
current, @NonNull PipelineStatus targetState) throws Exception { synchronized (this) { // consistency check if (current.isEndState()) { @@ -345,8 +375,18 @@ public boolean updatePipelineState( // we must update runningJobStateTimestampsIMap first and then can update // runningJobStateIMap - updateStateTimestamps(targetState); - runningJobStateIMap.set(pipelineLocation, targetState); + RetryUtils.retryWithException( + () -> { + updateStateTimestamps(targetState); + runningJobStateIMap.set(pipelineLocation, targetState); + return null; + }, + new RetryUtils.RetryMaterial( + Constant.OPERATION_RETRY_TIME, + true, + exception -> ExceptionUtil.isOperationNeedRetryException(exception), + Constant.OPERATION_RETRY_SLEEP)); + this.currPipelineStatus = targetState; return true; } else { return false; @@ -436,15 +476,11 @@ private CompletableFuture cancelTask(@NonNull PhysicalVertex task) { }, executorService); } - LOGGER.info( - String.format( - "can not cancel task %s because it is in state %s ", - task.getTaskFullName(), task.getExecutionState())); return null; } /** Before restore a pipeline, the pipeline must do reset */ - private synchronized void reset() { + private synchronized void reset() throws Exception { resetPipelineState(); finishedTaskNum.set(0); canceledTaskNum.set(0); @@ -463,23 +499,38 @@ private void updateStateTimestamps(@NonNull PipelineStatus targetState) { runningJobStateTimestampsIMap.set(pipelineLocation, stateTimestamps); } - private void resetPipelineState() { - PipelineStatus pipelineState = getPipelineState(); - if (!pipelineState.isEndState()) { - String message = - String.format( - "%s reset state failed, only end state can be reset, current is %s", - getPipelineFullName(), pipelineState); - LOGGER.severe(message); - throw new IllegalStateException(message); - } + private void resetPipelineState() throws Exception { + RetryUtils.retryWithException( + () -> { + PipelineStatus pipelineState = getPipelineState(); + if (!pipelineState.isEndState()) 
{ + String message = + String.format( + "%s reset state failed, only end state can be reset, current is %s", + getPipelineFullName(), pipelineState); + LOGGER.severe(message); + throw new IllegalStateException(message); + } - updateStateTimestamps(PipelineStatus.CREATED); - runningJobStateIMap.set(pipelineLocation, PipelineStatus.CREATED); + updateStateTimestamps(PipelineStatus.CREATED); + runningJobStateIMap.set(pipelineLocation, PipelineStatus.CREATED); + // mirror the state just written to the IMap in the locally cached status + this.currPipelineStatus = PipelineStatus.CREATED; + return null; + }, + new RetryUtils.RetryMaterial( + Constant.OPERATION_RETRY_TIME, + true, + exception -> ExceptionUtil.isOperationNeedRetryException(exception), + Constant.OPERATION_RETRY_SLEEP)); } - /** restore the pipeline when pipeline failed or canceled by error. */ - public void restorePipeline() { + /** + * Reset the pipeline and task state and init the state future again. + * + * @return {@code true} if the restore can go ahead, {@code false} if the reset failed + */ + private boolean prepareRestorePipeline() { synchronized (restoreLock) { try { pipelineRestoreNum++; @@ -497,9 +548,28 @@ public void restorePipeline() { } reset(); jobMaster.getPhysicalPlan().addPipelineEndCallback(this); + return true; + } catch (Throwable e) { + LOGGER.severe( + String.format( + "Prepare restore pipeline %s failed with exception: %s", + pipelineFullName, ExceptionUtils.getMessage(e))); + if (this.currPipelineStatus.isEndState()) { + // restore failed + return false; + } + jobMaster.getPhysicalPlan().addPipelineEndCallback(this); + return true; + } + } + } + + /** restore the pipeline when pipeline failed or canceled by error. 
*/ + public void restorePipeline() { + synchronized (restoreLock) { + try { if (jobMaster.getCheckpointManager().isCompletedPipeline(pipelineId)) { forcePipelineFinish(); - return; } jobMaster.getCheckpointManager().reportedPipelineRunning(pipelineId, false); reSchedulerPipelineFuture = jobMaster.reSchedulerPipeline(this); @@ -560,7 +626,7 @@ public String getPipelineFullName() { } public PipelineStatus getPipelineState() { - return (PipelineStatus) runningJobStateIMap.get(pipelineLocation); + return this.currPipelineStatus; } public PipelineLocation getPipelineLocation() { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java index fc6aff87f9a..ab5ad8f826f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java @@ -291,8 +291,20 @@ private CompletableFuture deployTask(PhysicalVertex task, SlotProfile slot private void deployPipeline( @NonNull SubPlan pipeline, Map slotProfiles) { - if (pipeline.updatePipelineState(PipelineStatus.SCHEDULED, PipelineStatus.DEPLOYING)) { - + boolean changeStateSuccess = false; + try { + changeStateSuccess = + pipeline.updatePipelineState( + PipelineStatus.SCHEDULED, PipelineStatus.DEPLOYING); + } catch (Exception e) { + log.warn( + "{} turn to state {} failed, cancel pipeline", + pipeline.getPipelineFullName(), + PipelineStatus.DEPLOYING, + e); + pipeline.cancelPipeline(); + } + if (changeStateSuccess) { try { List> deployCoordinatorFuture = pipeline.getCoordinatorVertexList().stream()