diff --git a/docs/en/seatunnel-engine/rest-api.md b/docs/en/seatunnel-engine/rest-api.md
index 2f44421a3d6..3f8cf910ea0 100644
--- a/docs/en/seatunnel-engine/rest-api.md
+++ b/docs/en/seatunnel-engine/rest-api.md
@@ -238,3 +238,27 @@ network:
------------------------------------------------------------------------------------------
+### Stop Job.
+
+
+POST
/hazelcast/rest/maps/stop-job
(Returns jobId if job stoped successfully.)
+
+#### Body
+
+```json
+{
+ "jobId": 733584788375666689,
+ "isStopWithSavePoint": false # if job is stopped with save point
+}
+```
+
+#### Responses
+
+```json
+{
+"jobId": 733584788375666689
+}
+```
+
+
+
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index d38d1c732f1..d896ec17bf7 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -32,9 +32,9 @@
import org.apache.seatunnel.engine.server.rest.RestConstant;
import org.awaitility.Awaitility;
-import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.hazelcast.client.config.ClientConfig;
@@ -57,8 +57,8 @@ public class RestApiIT {
private static HazelcastInstanceImpl hazelcastInstance;
- @BeforeAll
- static void beforeClass() throws Exception {
+ @BeforeEach
+ void beforeClass() throws Exception {
String testClusterName = TestUtils.getClusterName("RestApiIT");
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
@@ -136,10 +136,124 @@ public void testSystemMonitoringInformation() {
@Test
public void testSubmitJob() {
+ String jobId = submitJob("BATCH").getBody().jsonPath().getString("jobId");
+ SeaTunnelServer seaTunnelServer =
+ (SeaTunnelServer)
+ hazelcastInstance
+ .node
+ .getNodeExtension()
+ .createExtensionServices()
+ .get(Constant.SEATUNNEL_SERVICE_NAME);
+ JobStatus jobStatus =
+ seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId));
+ Assertions.assertEquals(JobStatus.RUNNING, jobStatus);
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.FINISHED,
+ seaTunnelServer
+ .getCoordinatorService()
+ .getJobStatus(Long.parseLong(jobId))));
+ }
+
+ @Test
+ public void testStopJob() {
+ String jobId = submitJob("STREAMING").getBody().jsonPath().getString("jobId");
+ SeaTunnelServer seaTunnelServer =
+ (SeaTunnelServer)
+ hazelcastInstance
+ .node
+ .getNodeExtension()
+ .createExtensionServices()
+ .get(Constant.SEATUNNEL_SERVICE_NAME);
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.RUNNING,
+ seaTunnelServer
+ .getCoordinatorService()
+ .getJobStatus(Long.parseLong(jobId))));
+
+ String parameters = "{" + "\"jobId\":" + jobId + "," + "\"isStopWithSavePoint\":true}";
+
+ given().body(parameters)
+ .post(
+ HOST
+ + hazelcastInstance
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ + RestConstant.STOP_JOB_URL)
+ .then()
+ .statusCode(200)
+ .body("jobId", equalTo(jobId));
+
+ Awaitility.await()
+ .atMost(6, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.FINISHED,
+ seaTunnelServer
+ .getCoordinatorService()
+ .getJobStatus(Long.parseLong(jobId))));
+
+ String jobId2 = submitJob("STREAMING").getBody().jsonPath().getString("jobId");
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.RUNNING,
+ seaTunnelServer
+ .getCoordinatorService()
+ .getJobStatus(Long.parseLong(jobId2))));
+ parameters = "{" + "\"jobId\":" + jobId2 + "," + "\"isStopWithSavePoint\":false}";
+
+ given().body(parameters)
+ .post(
+ HOST
+ + hazelcastInstance
+ .getCluster()
+ .getLocalMember()
+ .getAddress()
+ .getPort()
+ + RestConstant.STOP_JOB_URL)
+ .then()
+ .statusCode(200)
+ .body("jobId", equalTo(jobId2));
+
+ Awaitility.await()
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ JobStatus.CANCELED,
+ seaTunnelServer
+ .getCoordinatorService()
+ .getJobStatus(Long.parseLong(jobId2))));
+ }
+
+ @AfterEach
+ void afterClass() {
+ if (hazelcastInstance != null) {
+ hazelcastInstance.shutdown();
+ }
+ }
+
+ private Response submitJob(String jobMode) {
String requestBody =
"{\n"
+ " \"env\": {\n"
- + " \"job.mode\": \"batch\"\n"
+ + " \"job.mode\": \""
+ + jobMode
+ + "\"\n"
+ " },\n"
+ " \"source\": [\n"
+ " {\n"
@@ -181,32 +295,6 @@ public void testSubmitJob() {
+ parameters);
response.then().statusCode(200).body("jobName", equalTo("test"));
- String jobId = response.getBody().jsonPath().getString("jobId");
- SeaTunnelServer seaTunnelServer =
- (SeaTunnelServer)
- hazelcastInstance
- .node
- .getNodeExtension()
- .createExtensionServices()
- .get(Constant.SEATUNNEL_SERVICE_NAME);
- JobStatus jobStatus =
- seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId));
- Assertions.assertEquals(JobStatus.RUNNING, jobStatus);
- Awaitility.await()
- .atMost(2, TimeUnit.MINUTES)
- .untilAsserted(
- () ->
- Assertions.assertEquals(
- JobStatus.FINISHED,
- seaTunnelServer
- .getCoordinatorService()
- .getJobStatus(Long.parseLong(jobId))));
- }
-
- @AfterAll
- static void afterClass() {
- if (hazelcastInstance != null) {
- hazelcastInstance.shutdown();
- }
+ return response;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index 7776d592b8f..c3178e36725 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -19,10 +19,33 @@
public class RestConstant {
+ public static final String JOB_ID = "jobId";
+
+ public static final String JOB_NAME = "jobName";
+
+ public static final String IS_START_WITH_SAVE_POINT = "isStartWithSavePoint";
+
+ public static final String IS_STOP_WITH_SAVE_POINT = "isStopWithSavePoint";
+
+ public static final String JOB_STATUS = "jobStatus";
+
+ public static final String CREATE_TIME = "createTime";
+
+ public static final String ENV_OPTIONS = "envOptions";
+
+ public static final String JOB_DAG = "jobDag";
+
+ public static final String PLUGIN_JARS_URLS = "pluginJarsUrls";
+
+ public static final String JAR_PATH = "jarPath";
+
+ public static final String METRICS = "metrics";
public static final String RUNNING_JOBS_URL = "/hazelcast/rest/maps/running-jobs";
public static final String RUNNING_JOB_URL = "/hazelcast/rest/maps/running-job";
public static final String SUBMIT_JOB_URL = "/hazelcast/rest/maps/submit-job";
public static final String SYSTEM_MONITORING_INFORMATION =
"/hazelcast/rest/maps/system-monitoring-information";
+
+ public static final String STOP_JOB_URL = "/hazelcast/rest/maps/stop-job";
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 4c1debd6f87..9addb8d8ec8 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -203,8 +203,8 @@ private Map getJobMetrics(String jobMetrics) {
} catch (JsonProcessingException | NullPointerException e) {
return metricsMap;
}
- metricsMap.put("sourceReceivedCount", sourceReadCount);
- metricsMap.put("sinkWriteCount", sinkWriteCount);
+ metricsMap.put(SOURCE_RECEIVED_COUNT, sourceReadCount);
+ metricsMap.put(SINK_WRITE_COUNT, sinkWriteCount);
return metricsMap;
}
@@ -243,28 +243,33 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) {
JobStatus jobStatus = getSeaTunnelServer().getCoordinatorService().getJobStatus(jobId);
jobInfoJson
- .add("jobId", String.valueOf(jobId))
- .add("jobName", logicalDag.getJobConfig().getName())
- .add("jobStatus", jobStatus.toString())
- .add("envOptions", JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
+ .add(RestConstant.JOB_ID, String.valueOf(jobId))
+ .add(RestConstant.JOB_NAME, logicalDag.getJobConfig().getName())
+ .add(RestConstant.JOB_STATUS, jobStatus.toString())
.add(
- "createTime",
+ RestConstant.ENV_OPTIONS,
+ JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
+ .add(
+ RestConstant.CREATE_TIME,
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(new Date(jobImmutableInformation.getCreateTime())))
- .add("jobDag", logicalDag.getLogicalDagAsJson())
+ .add(RestConstant.JOB_DAG, logicalDag.getLogicalDagAsJson())
.add(
- "pluginJarsUrls",
+ RestConstant.PLUGIN_JARS_URLS,
(JsonValue)
jobImmutableInformation.getPluginJarsUrls().stream()
.map(
url -> {
JsonObject jarUrl = new JsonObject();
- jarUrl.add("jarPath", url.toString());
+ jarUrl.add(
+ RestConstant.JAR_PATH, url.toString());
return jarUrl;
})
.collect(JsonArray::new, JsonArray::add, JsonArray::add))
- .add("isStartWithSavePoint", jobImmutableInformation.isStartWithSavePoint())
- .add("metrics", JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
+ .add(
+ RestConstant.IS_START_WITH_SAVE_POINT,
+ jobImmutableInformation.isStartWithSavePoint())
+ .add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
return jobInfoJson;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
index e0edd932032..66a9131f65b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java
@@ -20,6 +20,7 @@
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -42,6 +43,7 @@
import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_400;
import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
+import static org.apache.seatunnel.engine.server.rest.RestConstant.STOP_JOB_URL;
import static org.apache.seatunnel.engine.server.rest.RestConstant.SUBMIT_JOB_URL;
public class RestHttpPostCommandProcessor extends HttpCommandProcessor {
@@ -66,6 +68,8 @@ public void handle(HttpPostCommand httpPostCommand) {
try {
if (uri.startsWith(SUBMIT_JOB_URL)) {
handleSubmitJob(httpPostCommand, uri);
+ } else if (uri.startsWith(STOP_JOB_URL)) {
+ handleStopJob(httpPostCommand, uri);
} else {
original.handle(httpPostCommand);
}
@@ -89,26 +93,17 @@ private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri)
throws IllegalArgumentException {
Map requestParams = new HashMap<>();
RestUtil.buildRequestParams(requestParams, uri);
- byte[] requestBody = httpPostCommand.getData();
- if (requestBody.length == 0) {
- throw new IllegalArgumentException("Request body is empty.");
- }
- JsonNode requestBodyJsonNode;
- try {
- requestBodyJsonNode = RestUtil.convertByteToJsonNode(requestBody);
- } catch (IOException e) {
- throw new IllegalArgumentException("Invalid JSON format in request body.");
- }
- Config config = RestUtil.buildConfig(requestBodyJsonNode);
+ Config config = RestUtil.buildConfig(requestHandle(httpPostCommand));
JobConfig jobConfig = new JobConfig();
- jobConfig.setName(requestParams.get("jobName"));
+ jobConfig.setName(requestParams.get(RestConstant.JOB_NAME));
JobImmutableInformationEnv jobImmutableInformationEnv =
new JobImmutableInformationEnv(
jobConfig,
config,
textCommandService.getNode(),
- Boolean.parseBoolean(requestParams.get("isStartWithSavePoint")),
- Long.parseLong(requestParams.get("jobId")));
+ Boolean.parseBoolean(
+ requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT)),
+ Long.parseLong(requestParams.get(RestConstant.JOB_ID)));
JobImmutableInformation jobImmutableInformation = jobImmutableInformationEnv.build();
CoordinatorService coordinatorService = getSeaTunnelServer().getCoordinatorService();
Data data =
@@ -125,11 +120,52 @@ private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri)
Long jobId = jobImmutableInformationEnv.getJobId();
this.prepareResponse(
httpPostCommand,
- new JsonObject().add("jobId", jobId).add("jobName", requestParams.get("jobName")));
+ new JsonObject()
+ .add(RestConstant.JOB_ID, jobId)
+ .add(RestConstant.JOB_NAME, requestParams.get(RestConstant.JOB_NAME)));
+ }
+
+ private void handleStopJob(HttpPostCommand httpPostCommand, String uri) {
+ Map map = JsonUtils.toMap(requestHandle(httpPostCommand));
+ boolean isStopWithSavePoint = false;
+ if (map.get(RestConstant.JOB_ID) == null) {
+ throw new IllegalArgumentException("jobId cannot be empty.");
+ }
+ long jobId = Long.parseLong(map.get(RestConstant.JOB_ID).toString());
+ if (map.get(RestConstant.IS_STOP_WITH_SAVE_POINT) != null) {
+ isStopWithSavePoint =
+ Boolean.parseBoolean(map.get(RestConstant.IS_STOP_WITH_SAVE_POINT).toString());
+ }
+
+ CoordinatorService coordinatorService = getSeaTunnelServer().getCoordinatorService();
+
+ if (isStopWithSavePoint) {
+ coordinatorService.savePoint(jobId);
+ } else {
+ coordinatorService.cancelJob(jobId);
+ }
+
+ this.prepareResponse(
+ httpPostCommand,
+ new JsonObject().add(RestConstant.JOB_ID, map.get(RestConstant.JOB_ID).toString()));
}
@Override
public void handleRejection(HttpPostCommand httpPostCommand) {
handle(httpPostCommand);
}
+
+ private JsonNode requestHandle(HttpPostCommand httpPostCommand) {
+ byte[] requestBody = httpPostCommand.getData();
+ if (requestBody.length == 0) {
+ throw new IllegalArgumentException("Request body is empty.");
+ }
+ JsonNode requestBodyJsonNode;
+ try {
+ requestBodyJsonNode = RestUtil.convertByteToJsonNode(requestBody);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Invalid JSON format in request body.");
+ }
+ return requestBodyJsonNode;
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
index d3761366d09..51c50d85a2b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java
@@ -24,6 +24,7 @@
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
+import org.apache.seatunnel.engine.server.rest.RestConstant;
import com.hazelcast.internal.util.StringUtil;
@@ -40,9 +41,9 @@ public static JsonNode convertByteToJsonNode(byte[] byteData) throws IOException
}
public static void buildRequestParams(Map requestParams, String uri) {
- requestParams.put("jobId", null);
- requestParams.put("jobName", Constants.LOGO);
- requestParams.put("isStartWithSavePoint", String.valueOf(false));
+ requestParams.put(RestConstant.JOB_ID, null);
+ requestParams.put(RestConstant.JOB_NAME, Constants.LOGO);
+ requestParams.put(RestConstant.IS_START_WITH_SAVE_POINT, String.valueOf(false));
uri = StringUtil.stripTrailingSlash(uri);
if (!uri.contains("?")) {
return;