From 8b9a8665d6567e1ce673e948dd22b83bac5d28be Mon Sep 17 00:00:00 2001 From: hailin0 Date: Tue, 5 Dec 2023 19:36:54 +0800 Subject: [PATCH] [Feature][Zeta] Add waitForJobCompleteV2 api --- .../apache/seatunnel/engine/client/job/ClientJobProxy.java | 4 ++-- .../main/java/org/apache/seatunnel/engine/core/job/Job.java | 6 +++++- .../org/apache/seatunnel/engine/core/job/JobResult.java | 4 ++++ 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java index 0e877f6412a..ceec9b33dc1 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java @@ -91,7 +91,7 @@ private void submitJob(JobImmutableInformation jobImmutableInformation) { * @return The job final status */ @Override - public JobStatus waitForJobComplete() { + public JobResult waitForJobCompleteV2() { try { jobResult = RetryUtils.retryWithException( @@ -121,7 +121,7 @@ public JobStatus waitForJobComplete() { || jobResult.getStatus().equals(JobStatus.FAILED)) { throw new SeaTunnelEngineException(jobResult.getError()); } - return jobResult.getStatus(); + return jobResult; } public JobResult getJobResultCache() { diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java index 88b690b9674..3d4ee7593bb 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java @@ -29,5 +29,9 @@ public interface Job { JobStatus getJobStatus(); - JobStatus waitForJobComplete(); + default JobStatus waitForJobComplete() { + return waitForJobCompleteV2().getStatus(); + } + + JobResult waitForJobCompleteV2(); } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobResult.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobResult.java index b2bf1536c34..5f946ffe45c 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobResult.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobResult.java @@ -30,4 +30,8 @@ public class JobResult implements Serializable { @NonNull private JobStatus status; private String error; + + public JobResult(@NonNull JobStatus status) { + this.status = status; + } }