diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java index f8891da7401..5fa521eb504 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java @@ -19,16 +19,19 @@ import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; +import org.apache.seatunnel.common.utils.RetryUtils; import org.apache.seatunnel.engine.client.SeaTunnelClient; import org.apache.seatunnel.engine.client.job.ClientJobProxy; import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; +import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.common.utils.ExceptionUtil; +import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.job.JobResult; import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; -import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -42,6 +45,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import static org.awaitility.Awaitility.await; + @Slf4j public class JobExecutionIT { @@ -86,8 +91,7 @@ public void testExecuteJob() throws Exception { return clientJobProxy.waitForJobComplete(); }); - Awaitility.await() - .atMost(600000, TimeUnit.MILLISECONDS) + await().atMost(600000, TimeUnit.MILLISECONDS) .untilAsserted( () -> Assertions.assertTrue( @@ -121,8 +125,7 @@ public void cancelJobTest() throws Exception { Thread.sleep(1000); clientJobProxy.cancelJob(); - Awaitility.await() - .atMost(20000, TimeUnit.MILLISECONDS) + await().atMost(20000, TimeUnit.MILLISECONDS) .untilAsserted( () -> Assertions.assertTrue( @@ -145,11 +148,36 @@ public void testGetErrorInfo() throws ExecutionException, InterruptedException { final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); JobStatus jobStatus = clientJobProxy.getJobStatus(); while (jobStatus == JobStatus.RUNNING) { - Thread.sleep(1 * 1000L); + Thread.sleep(1000L); jobStatus = clientJobProxy.getJobStatus(); } - CompletableFuture future = clientJobProxy.doWaitForJobComplete(); - JobResult result = future.get(); + + CompletableFuture completableFuture = + CompletableFuture.supplyAsync( + () -> { + try { + return RetryUtils.retryWithException( + () -> { + PassiveCompletableFuture jobFuture = + clientJobProxy.doWaitForJobComplete(); + return jobFuture.get(); + }, + new RetryUtils.RetryMaterial( + 100000, + true, + exception -> + ExceptionUtil.isOperationNeedRetryException( + exception), + Constant.OPERATION_RETRY_SLEEP)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + await().atMost(600000, TimeUnit.MILLISECONDS) + .untilAsserted(() -> Assertions.assertTrue(completableFuture.isDone())); + + JobResult result = completableFuture.get(); Assertions.assertEquals(result.getStatus(), JobStatus.FAILED); Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException")); }