diff --git a/docs/en/seatunnel-engine/rest-api.md b/docs/en/seatunnel-engine/rest-api.md index 2f44421a3d6..3f8cf910ea0 100644 --- a/docs/en/seatunnel-engine/rest-api.md +++ b/docs/en/seatunnel-engine/rest-api.md @@ -238,3 +238,27 @@ network: ------------------------------------------------------------------------------------------ +### Stop Job. + +
+POST /hazelcast/rest/maps/stop-job (Returns jobId if job stoped successfully.) + +#### Body + +```json +{ + "jobId": 733584788375666689, + "isStopWithSavePoint": false # if job is stopped with save point +} +``` + +#### Responses + +```json +{ +"jobId": 733584788375666689 +} +``` + +
+ diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java index d38d1c732f1..d896ec17bf7 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java @@ -32,9 +32,9 @@ import org.apache.seatunnel.engine.server.rest.RestConstant; import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import com.hazelcast.client.config.ClientConfig; @@ -57,8 +57,8 @@ public class RestApiIT { private static HazelcastInstanceImpl hazelcastInstance; - @BeforeAll - static void beforeClass() throws Exception { + @BeforeEach + void beforeClass() throws Exception { String testClusterName = TestUtils.getClusterName("RestApiIT"); SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig(); seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName); @@ -136,10 +136,124 @@ public void testSystemMonitoringInformation() { @Test public void testSubmitJob() { + String jobId = submitJob("BATCH").getBody().jsonPath().getString("jobId"); + SeaTunnelServer seaTunnelServer = + (SeaTunnelServer) + hazelcastInstance + .node + .getNodeExtension() + .createExtensionServices() + .get(Constant.SEATUNNEL_SERVICE_NAME); + JobStatus jobStatus = + seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId)); + Assertions.assertEquals(JobStatus.RUNNING, jobStatus); + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + Assertions.assertEquals( + JobStatus.FINISHED, + seaTunnelServer + .getCoordinatorService() + .getJobStatus(Long.parseLong(jobId)))); + } + + @Test + public void testStopJob() { + String jobId = submitJob("STREAMING").getBody().jsonPath().getString("jobId"); + SeaTunnelServer seaTunnelServer = + (SeaTunnelServer) + hazelcastInstance + .node + .getNodeExtension() + .createExtensionServices() + .get(Constant.SEATUNNEL_SERVICE_NAME); + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + Assertions.assertEquals( + JobStatus.RUNNING, + seaTunnelServer + .getCoordinatorService() + .getJobStatus(Long.parseLong(jobId)))); + + String parameters = "{" + "\"jobId\":" + jobId + "," + "\"isStopWithSavePoint\":true}"; + + given().body(parameters) + .post( + HOST + + hazelcastInstance + .getCluster() + .getLocalMember() + .getAddress() + .getPort() + + RestConstant.STOP_JOB_URL) + .then() + .statusCode(200) + .body("jobId", equalTo(jobId)); + + Awaitility.await() + .atMost(6, TimeUnit.MINUTES) + .untilAsserted( + () -> + Assertions.assertEquals( + JobStatus.FINISHED, + seaTunnelServer + .getCoordinatorService() + .getJobStatus(Long.parseLong(jobId)))); + + String jobId2 = submitJob("STREAMING").getBody().jsonPath().getString("jobId"); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + Assertions.assertEquals( + JobStatus.RUNNING, + seaTunnelServer + .getCoordinatorService() + .getJobStatus(Long.parseLong(jobId2)))); + parameters = "{" + "\"jobId\":" + jobId2 + "," + "\"isStopWithSavePoint\":false}"; + + given().body(parameters) + .post( + HOST + + hazelcastInstance + .getCluster() + .getLocalMember() + .getAddress() + .getPort() + + RestConstant.STOP_JOB_URL) + .then() + .statusCode(200) + .body("jobId", equalTo(jobId2)); + + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + Assertions.assertEquals( + JobStatus.CANCELED, + seaTunnelServer + .getCoordinatorService() + .getJobStatus(Long.parseLong(jobId2)))); + } + + @AfterEach + void afterClass() { + if (hazelcastInstance != null) { + hazelcastInstance.shutdown(); + } + } + + private Response submitJob(String jobMode) { String requestBody = "{\n" + " \"env\": {\n" - + " \"job.mode\": \"batch\"\n" + + " \"job.mode\": \"" + + jobMode + + "\"\n" + " },\n" + " \"source\": [\n" + " {\n" @@ -181,32 +295,6 @@ public void testSubmitJob() { + parameters); response.then().statusCode(200).body("jobName", equalTo("test")); - String jobId = response.getBody().jsonPath().getString("jobId"); - SeaTunnelServer seaTunnelServer = - (SeaTunnelServer) - hazelcastInstance - .node - .getNodeExtension() - .createExtensionServices() - .get(Constant.SEATUNNEL_SERVICE_NAME); - JobStatus jobStatus = - seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId)); - Assertions.assertEquals(JobStatus.RUNNING, jobStatus); - Awaitility.await() - .atMost(2, TimeUnit.MINUTES) - .untilAsserted( - () -> - Assertions.assertEquals( - JobStatus.FINISHED, - seaTunnelServer - .getCoordinatorService() - .getJobStatus(Long.parseLong(jobId)))); - } - - @AfterAll - static void afterClass() { - if (hazelcastInstance != null) { - hazelcastInstance.shutdown(); - } + return response; } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java index 7776d592b8f..c3178e36725 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java @@ -19,10 +19,33 @@ public class RestConstant { + public static final String JOB_ID = "jobId"; + + public static final String JOB_NAME = "jobName"; + + public static final String IS_START_WITH_SAVE_POINT = "isStartWithSavePoint"; + + public static final String IS_STOP_WITH_SAVE_POINT = "isStopWithSavePoint"; + + public static final String JOB_STATUS = "jobStatus"; + + public static final String CREATE_TIME = "createTime"; + + public static final String ENV_OPTIONS = "envOptions"; + + public static final String JOB_DAG = "jobDag"; + + public static final String PLUGIN_JARS_URLS = "pluginJarsUrls"; + + public static final String JAR_PATH = "jarPath"; + + public static final String METRICS = "metrics"; public static final String RUNNING_JOBS_URL = "/hazelcast/rest/maps/running-jobs"; public static final String RUNNING_JOB_URL = "/hazelcast/rest/maps/running-job"; public static final String SUBMIT_JOB_URL = "/hazelcast/rest/maps/submit-job"; public static final String SYSTEM_MONITORING_INFORMATION = "/hazelcast/rest/maps/system-monitoring-information"; + + public static final String STOP_JOB_URL = "/hazelcast/rest/maps/stop-job"; } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java index 4c1debd6f87..9addb8d8ec8 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java @@ -203,8 +203,8 @@ private Map getJobMetrics(String jobMetrics) { } catch (JsonProcessingException | NullPointerException e) { return metricsMap; } - metricsMap.put("sourceReceivedCount", sourceReadCount); - metricsMap.put("sinkWriteCount", sinkWriteCount); + metricsMap.put(SOURCE_RECEIVED_COUNT, sourceReadCount); + metricsMap.put(SINK_WRITE_COUNT, sinkWriteCount); return metricsMap; } @@ -243,28 +243,33 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) { JobStatus jobStatus = getSeaTunnelServer().getCoordinatorService().getJobStatus(jobId); jobInfoJson - .add("jobId", String.valueOf(jobId)) - .add("jobName", logicalDag.getJobConfig().getName()) - .add("jobStatus", jobStatus.toString()) - .add("envOptions", JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions())) + .add(RestConstant.JOB_ID, String.valueOf(jobId)) + .add(RestConstant.JOB_NAME, logicalDag.getJobConfig().getName()) + .add(RestConstant.JOB_STATUS, jobStatus.toString()) .add( - "createTime", + RestConstant.ENV_OPTIONS, + JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions())) + .add( + RestConstant.CREATE_TIME, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") .format(new Date(jobImmutableInformation.getCreateTime()))) - .add("jobDag", logicalDag.getLogicalDagAsJson()) + .add(RestConstant.JOB_DAG, logicalDag.getLogicalDagAsJson()) .add( - "pluginJarsUrls", + RestConstant.PLUGIN_JARS_URLS, (JsonValue) jobImmutableInformation.getPluginJarsUrls().stream() .map( url -> { JsonObject jarUrl = new JsonObject(); - jarUrl.add("jarPath", url.toString()); + jarUrl.add( + RestConstant.JAR_PATH, url.toString()); return jarUrl; }) .collect(JsonArray::new, JsonArray::add, JsonArray::add)) - .add("isStartWithSavePoint", jobImmutableInformation.isStartWithSavePoint()) - .add("metrics", JsonUtil.toJsonObject(getJobMetrics(jobMetrics))); + .add( + RestConstant.IS_START_WITH_SAVE_POINT, + jobImmutableInformation.isStartWithSavePoint()) + .add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(jobMetrics))); return jobInfoJson; } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java index e0edd932032..66a9131f65b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; @@ -42,6 +43,7 @@ import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_400; import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500; +import static org.apache.seatunnel.engine.server.rest.RestConstant.STOP_JOB_URL; import static org.apache.seatunnel.engine.server.rest.RestConstant.SUBMIT_JOB_URL; public class RestHttpPostCommandProcessor extends HttpCommandProcessor { @@ -66,6 +68,8 @@ public void handle(HttpPostCommand httpPostCommand) { try { if (uri.startsWith(SUBMIT_JOB_URL)) { handleSubmitJob(httpPostCommand, uri); + } else if (uri.startsWith(STOP_JOB_URL)) { + handleStopJob(httpPostCommand, uri); } else { original.handle(httpPostCommand); } @@ -89,26 +93,17 @@ private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri) throws IllegalArgumentException { Map requestParams = new HashMap<>(); RestUtil.buildRequestParams(requestParams, uri); - byte[] requestBody = httpPostCommand.getData(); - if (requestBody.length == 0) { - throw new IllegalArgumentException("Request body is empty."); - } - JsonNode requestBodyJsonNode; - try { - requestBodyJsonNode = RestUtil.convertByteToJsonNode(requestBody); - } catch (IOException e) { - throw new IllegalArgumentException("Invalid JSON format in request body."); - } - Config config = RestUtil.buildConfig(requestBodyJsonNode); + Config config = RestUtil.buildConfig(requestHandle(httpPostCommand)); JobConfig jobConfig = new JobConfig(); - jobConfig.setName(requestParams.get("jobName")); + jobConfig.setName(requestParams.get(RestConstant.JOB_NAME)); JobImmutableInformationEnv jobImmutableInformationEnv = new JobImmutableInformationEnv( jobConfig, config, textCommandService.getNode(), - Boolean.parseBoolean(requestParams.get("isStartWithSavePoint")), - Long.parseLong(requestParams.get("jobId"))); + Boolean.parseBoolean( + requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT)), + Long.parseLong(requestParams.get(RestConstant.JOB_ID))); JobImmutableInformation jobImmutableInformation = jobImmutableInformationEnv.build(); CoordinatorService coordinatorService = getSeaTunnelServer().getCoordinatorService(); Data data = @@ -125,11 +120,52 @@ private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri) Long jobId = jobImmutableInformationEnv.getJobId(); this.prepareResponse( httpPostCommand, - new JsonObject().add("jobId", jobId).add("jobName", requestParams.get("jobName"))); + new JsonObject() + .add(RestConstant.JOB_ID, jobId) + .add(RestConstant.JOB_NAME, requestParams.get(RestConstant.JOB_NAME))); + } + + private void handleStopJob(HttpPostCommand httpPostCommand, String uri) { + Map map = JsonUtils.toMap(requestHandle(httpPostCommand)); + boolean isStopWithSavePoint = false; + if (map.get(RestConstant.JOB_ID) == null) { + throw new IllegalArgumentException("jobId cannot be empty."); + } + long jobId = Long.parseLong(map.get(RestConstant.JOB_ID).toString()); + if (map.get(RestConstant.IS_STOP_WITH_SAVE_POINT) != null) { + isStopWithSavePoint = + Boolean.parseBoolean(map.get(RestConstant.IS_STOP_WITH_SAVE_POINT).toString()); + } + + CoordinatorService coordinatorService = getSeaTunnelServer().getCoordinatorService(); + + if (isStopWithSavePoint) { + coordinatorService.savePoint(jobId); + } else { + coordinatorService.cancelJob(jobId); + } + + this.prepareResponse( + httpPostCommand, + new JsonObject().add(RestConstant.JOB_ID, map.get(RestConstant.JOB_ID).toString())); } @Override public void handleRejection(HttpPostCommand httpPostCommand) { handle(httpPostCommand); } + + private JsonNode requestHandle(HttpPostCommand httpPostCommand) { + byte[] requestBody = httpPostCommand.getData(); + if (requestBody.length == 0) { + throw new IllegalArgumentException("Request body is empty."); + } + JsonNode requestBodyJsonNode; + try { + requestBodyJsonNode = RestUtil.convertByteToJsonNode(requestBody); + } catch (IOException e) { + throw new IllegalArgumentException("Invalid JSON format in request body."); + } + return requestBodyJsonNode; + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java index d3761366d09..51c50d85a2b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.common.Constants; import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.core.starter.utils.ConfigBuilder; +import org.apache.seatunnel.engine.server.rest.RestConstant; import com.hazelcast.internal.util.StringUtil; @@ -40,9 +41,9 @@ public static JsonNode convertByteToJsonNode(byte[] byteData) throws IOException } public static void buildRequestParams(Map requestParams, String uri) { - requestParams.put("jobId", null); - requestParams.put("jobName", Constants.LOGO); - requestParams.put("isStartWithSavePoint", String.valueOf(false)); + requestParams.put(RestConstant.JOB_ID, null); + requestParams.put(RestConstant.JOB_NAME, Constants.LOGO); + requestParams.put(RestConstant.IS_START_WITH_SAVE_POINT, String.valueOf(false)); uri = StringUtil.stripTrailingSlash(uri); if (!uri.contains("?")) { return;