diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java index eae7361ec72..ad41ae983c9 100644 --- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java +++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java @@ -32,6 +32,8 @@ import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; +import org.apache.seatunnel.engine.core.job.JobResult; import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.server.SeaTunnelNodeContext; @@ -182,7 +184,12 @@ public void execute() throws CommandExecuteException { seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(), TimeUnit.SECONDS); // wait for job complete - jobStatus = clientJobProxy.waitForJobComplete(); + JobResult jobResult = clientJobProxy.waitForJobCompleteV2(); + jobStatus = jobResult.getStatus(); + if (StringUtils.isNotEmpty(jobResult.getError()) + || jobResult.getStatus().equals(JobStatus.FAILED)) { + throw new SeaTunnelEngineException(jobResult.getError()); + } // get job end time endTime = LocalDateTime.now(); // get job statistic information when job finished diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java index 41b985bc9a5..fe47b1988c7 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java @@ -49,7 +49,7 @@ public class SeaTunnelContainer extends AbstractTestContainer { private static final String JDK_DOCKER_IMAGE = "openjdk:8"; private static final String CLIENT_SHELL = "seatunnel.sh"; private static final String SERVER_SHELL = "seatunnel-cluster.sh"; - private GenericContainer server; + protected GenericContainer server; @Override public void startUp() throws Exception { diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java new file mode 100644 index 00000000000..ce54ba84c29 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e; + +import org.apache.seatunnel.e2e.common.util.ContainerUtil; + +import org.apache.commons.lang3.StringUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerLoggerFactory; +import org.testcontainers.utility.MountableFile; + +import java.io.IOException; +import java.nio.file.Paths; +import java.util.Arrays; + +import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH; + +public class JobClientJobProxyIT extends SeaTunnelContainer { + private static final String JDK_DOCKER_IMAGE = "openjdk:8"; + private static final String SERVER_SHELL = "seatunnel-cluster.sh"; + + @Override + @BeforeAll + public void startUp() throws Exception { + this.server = + new GenericContainer<>(getDockerImage()) + .withNetwork(NETWORK) + .withCommand( + ContainerUtil.adaptPathForWin( + Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL).toString())) + .withNetworkAliases("server") + .withExposedPorts() + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger( + "seatunnel-engine:" + JDK_DOCKER_IMAGE))) + .waitingFor(Wait.forListeningPort()); + copySeaTunnelStarterToContainer(server); + server.setExposedPorts(Arrays.asList(5801)); + server.withCopyFileToContainer( + MountableFile.forHostPath( + PROJECT_ROOT_PATH + + "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"), + Paths.get(SEATUNNEL_HOME, "config").toString()); + + // use seatunnel_fixed_slot_num.yaml replace seatunnel.yaml in container + server.withCopyFileToContainer( + MountableFile.forHostPath( + PROJECT_ROOT_PATH + + "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml"), + Paths.get(SEATUNNEL_HOME, "config/seatunnel.yaml").toString()); + + server.withCopyFileToContainer( + MountableFile.forHostPath( + PROJECT_ROOT_PATH + + "/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"), + Paths.get(SEATUNNEL_HOME, "lib/seatunnel-hadoop3-3.1.4-uber.jar").toString()); + server.start(); + // execute extra commands + executeExtraCommands(server); + } + + @Test + public void testJobFailedWillThrowException() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelJob("/batch_slot_not_enough.conf"); + Assertions.assertNotEquals(0, execResult.getExitCode()); + Assertions.assertTrue( + StringUtils.isNotBlank(execResult.getStderr()) + && execResult + .getStderr() + .contains( + "org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException: can't apply resource request")); + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java new file mode 100644 index 00000000000..8f7b459c488 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelSlotIT.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.e2e; + +import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.common.config.DeployMode; +import org.apache.seatunnel.engine.client.SeaTunnelClient; +import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; +import org.apache.seatunnel.engine.client.job.ClientJobProxy; +import org.apache.seatunnel.engine.common.config.ConfigProvider; +import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.instance.impl.HazelcastInstanceImpl; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public class SeaTunnelSlotIT { + @Test + public void testSlotNotEnough() throws Exception { + HazelcastInstanceImpl node1 = null; + SeaTunnelClient engineClient = null; + + try { + String testClusterName = "testSlotNotEnough"; + SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); + seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName); + // slot num is 3 + seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false); + seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setSlotNum(3); + + node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); + + // client config + Common.setDeployMode(DeployMode.CLIENT); + String filePath = TestUtils.getResource("batch_slot_not_enough.conf"); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName(testClusterName); + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + clientConfig.setClusterName(testClusterName); + engineClient = new SeaTunnelClient(clientConfig); + ClientJobExecutionEnvironment jobExecutionEnv = + engineClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig); + + final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + CompletableFuture objectCompletableFuture = + CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete); + Awaitility.await() + .atMost(600000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Thread.sleep(2000); + Assertions.assertTrue( + objectCompletableFuture.isDone() + && JobStatus.FAILED.equals( + objectCompletableFuture.get())); + }); + + } finally { + if (engineClient != null) { + engineClient.shutdown(); + } + + if (node1 != null) { + node1.shutdown(); + } + } + } + + @Test + public void testSlotEnough() throws Exception { + HazelcastInstanceImpl node1 = null; + SeaTunnelClient engineClient = null; + + try { + String testClusterName = "testSlotEnough"; + SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); + seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName); + // slot num is 10 + seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false); + seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setSlotNum(10); + + node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig); + + // client config + Common.setDeployMode(DeployMode.CLIENT); + String filePath = TestUtils.getResource("batch_slot_not_enough.conf"); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName(testClusterName); + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + clientConfig.setClusterName(testClusterName); + engineClient = new SeaTunnelClient(clientConfig); + ClientJobExecutionEnvironment jobExecutionEnv = + engineClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig); + + final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + CompletableFuture objectCompletableFuture = + CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete); + Awaitility.await() + .atMost(600000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Thread.sleep(2000); + Assertions.assertTrue( + objectCompletableFuture.isDone() + && JobStatus.FINISHED.equals( + objectCompletableFuture.get())); + }); + + } finally { + if (engineClient != null) { + engineClient.shutdown(); + } + + if (node1 != null) { + node1.shutdown(); + } + } + } +} diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_slot_not_enough.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_slot_not_enough.conf new file mode 100644 index 00000000000..99e93d6c004 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_slot_not_enough.conf @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + job.mode = "BATCH" + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake" + parallelism = 4 + schema = { + fields { + name = "string" + age = "int" + } + } + } +} + +transform { +} + +sink { + console { + source_table_name="fake" + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml new file mode 100644 index 00000000000..91736ce34a7 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +seatunnel: + engine: + history-job-expire-minutes: 1 + backup-count: 2 + queue-type: blockingqueue + print-execution-info-interval: 10 + slot-service: + dynamic-slot: false + slot-num: 3 + checkpoint: + interval: 300000 + timeout: 100000 + storage: + type: localfile + max-retained: 3 + plugin-config: + namespace: /tmp/seatunnel/checkpoint_snapshot/ diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java index ceec9b33dc1..21802c52156 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java @@ -33,8 +33,6 @@ import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec; import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec; -import org.apache.commons.lang3.StringUtils; - import com.hazelcast.client.impl.protocol.ClientMessage; import com.hazelcast.logging.ILogger; import com.hazelcast.logging.Logger; @@ -117,10 +115,6 @@ public JobResult waitForJobCompleteV2() { throw new RuntimeException(e); } LOGGER.info(String.format("Job (%s) end with state %s", jobId, jobResult.getStatus())); - if (StringUtils.isNotEmpty(jobResult.getError()) - || jobResult.getStatus().equals(JobStatus.FAILED)) { - throw new SeaTunnelEngineException(jobResult.getError()); - } return jobResult; } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java index 3d4ee7593bb..52fba142054 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java @@ -29,6 +29,7 @@ public interface Job { JobStatus getJobStatus(); + @Deprecated default JobStatus waitForJobComplete() { return waitForJobCompleteV2().getStatus(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java index 0f9141ed00b..c1e7f975c41 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java @@ -231,6 +231,15 @@ private PipelineStatus getPipelineEndState() { errorByPhysicalVertex.compareAndSet( null, checkpointCoordinatorState.getThrowableMsg()); } + + // Because the pipeline state must update by tasks, If the pipeline can not get enough + // slot, the pipeline state will turn to Failing and then cancel all tasks in this + // pipeline. + // Because the tasks never run, so the tasks will complete with CANCELED. But the actual + // status of the pipeline should be FAILED + if (getPipelineState().equals(PipelineStatus.FAILING)) { + pipelineStatus = PipelineStatus.FAILED; + } } else { pipelineStatus = PipelineStatus.FINISHED; CheckpointCoordinatorState checkpointCoordinatorState = @@ -322,10 +331,11 @@ public synchronized void updatePipelineState(@NonNull PipelineStatus targetState // now do the actual state transition // we must update runningJobStateTimestampsIMap first and then can update // runningJobStateIMap + PipelineStatus finalTargetState = targetState; RetryUtils.retryWithException( () -> { - updateStateTimestamps(targetState); - runningJobStateIMap.set(pipelineLocation, targetState); + updateStateTimestamps(finalTargetState); + runningJobStateIMap.set(pipelineLocation, finalTargetState); return null; }, new RetryUtils.RetryMaterial( @@ -614,11 +624,13 @@ private synchronized void stateProcess() { case CANCELING: coordinatorVertexList.forEach( task -> { + task.startPhysicalVertex(); task.cancel(); }); physicalVertexList.forEach( task -> { + task.startPhysicalVertex(); task.cancel(); }); break;