Skip to content

Commit

Permalink
[Feature][Zeta][REST-API] Stop a running job. (#5512)
Browse files Browse the repository at this point in the history
  • Loading branch information
liugddx authored Oct 4, 2023
1 parent 70a3980 commit 8d2fafc
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 62 deletions.
24 changes: 24 additions & 0 deletions docs/en/seatunnel-engine/rest-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,27 @@ network:

------------------------------------------------------------------------------------------

### Stop Job.

<details>
<summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-job</b></code> <code>(Returns jobId if job stoped successfully.)</code></summary>

#### Body

```json
{
"jobId": 733584788375666689,
"isStopWithSavePoint": false # if job is stopped with save point
}
```

#### Responses

```json
{
"jobId": 733584788375666689
}
```

</details>

Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import org.apache.seatunnel.engine.server.rest.RestConstant;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import com.hazelcast.client.config.ClientConfig;
Expand All @@ -57,8 +57,8 @@ public class RestApiIT {

private static HazelcastInstanceImpl hazelcastInstance;

@BeforeAll
static void beforeClass() throws Exception {
@BeforeEach
void beforeClass() throws Exception {
String testClusterName = TestUtils.getClusterName("RestApiIT");
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
Expand Down Expand Up @@ -136,10 +136,124 @@ public void testSystemMonitoringInformation() {

@Test
public void testSubmitJob() {
String jobId = submitJob("BATCH").getBody().jsonPath().getString("jobId");
SeaTunnelServer seaTunnelServer =
(SeaTunnelServer)
hazelcastInstance
.node
.getNodeExtension()
.createExtensionServices()
.get(Constant.SEATUNNEL_SERVICE_NAME);
JobStatus jobStatus =
seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId));
Assertions.assertEquals(JobStatus.RUNNING, jobStatus);
Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FINISHED,
seaTunnelServer
.getCoordinatorService()
.getJobStatus(Long.parseLong(jobId))));
}

@Test
public void testStopJob() {
String jobId = submitJob("STREAMING").getBody().jsonPath().getString("jobId");
SeaTunnelServer seaTunnelServer =
(SeaTunnelServer)
hazelcastInstance
.node
.getNodeExtension()
.createExtensionServices()
.get(Constant.SEATUNNEL_SERVICE_NAME);
Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.RUNNING,
seaTunnelServer
.getCoordinatorService()
.getJobStatus(Long.parseLong(jobId))));

String parameters = "{" + "\"jobId\":" + jobId + "," + "\"isStopWithSavePoint\":true}";

given().body(parameters)
.post(
HOST
+ hazelcastInstance
.getCluster()
.getLocalMember()
.getAddress()
.getPort()
+ RestConstant.STOP_JOB_URL)
.then()
.statusCode(200)
.body("jobId", equalTo(jobId));

Awaitility.await()
.atMost(6, TimeUnit.MINUTES)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FINISHED,
seaTunnelServer
.getCoordinatorService()
.getJobStatus(Long.parseLong(jobId))));

String jobId2 = submitJob("STREAMING").getBody().jsonPath().getString("jobId");

Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.RUNNING,
seaTunnelServer
.getCoordinatorService()
.getJobStatus(Long.parseLong(jobId2))));
parameters = "{" + "\"jobId\":" + jobId2 + "," + "\"isStopWithSavePoint\":false}";

given().body(parameters)
.post(
HOST
+ hazelcastInstance
.getCluster()
.getLocalMember()
.getAddress()
.getPort()
+ RestConstant.STOP_JOB_URL)
.then()
.statusCode(200)
.body("jobId", equalTo(jobId2));

Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.CANCELED,
seaTunnelServer
.getCoordinatorService()
.getJobStatus(Long.parseLong(jobId2))));
}

@AfterEach
void afterClass() {
if (hazelcastInstance != null) {
hazelcastInstance.shutdown();
}
}

private Response submitJob(String jobMode) {
String requestBody =
"{\n"
+ " \"env\": {\n"
+ " \"job.mode\": \"batch\"\n"
+ " \"job.mode\": \""
+ jobMode
+ "\"\n"
+ " },\n"
+ " \"source\": [\n"
+ " {\n"
Expand Down Expand Up @@ -181,32 +295,6 @@ public void testSubmitJob() {
+ parameters);

response.then().statusCode(200).body("jobName", equalTo("test"));
String jobId = response.getBody().jsonPath().getString("jobId");
SeaTunnelServer seaTunnelServer =
(SeaTunnelServer)
hazelcastInstance
.node
.getNodeExtension()
.createExtensionServices()
.get(Constant.SEATUNNEL_SERVICE_NAME);
JobStatus jobStatus =
seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId));
Assertions.assertEquals(JobStatus.RUNNING, jobStatus);
Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FINISHED,
seaTunnelServer
.getCoordinatorService()
.getJobStatus(Long.parseLong(jobId))));
}

@AfterAll
static void afterClass() {
if (hazelcastInstance != null) {
hazelcastInstance.shutdown();
}
return response;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,33 @@

public class RestConstant {

public static final String JOB_ID = "jobId";

public static final String JOB_NAME = "jobName";

public static final String IS_START_WITH_SAVE_POINT = "isStartWithSavePoint";

public static final String IS_STOP_WITH_SAVE_POINT = "isStopWithSavePoint";

public static final String JOB_STATUS = "jobStatus";

public static final String CREATE_TIME = "createTime";

public static final String ENV_OPTIONS = "envOptions";

public static final String JOB_DAG = "jobDag";

public static final String PLUGIN_JARS_URLS = "pluginJarsUrls";

public static final String JAR_PATH = "jarPath";

public static final String METRICS = "metrics";
public static final String RUNNING_JOBS_URL = "/hazelcast/rest/maps/running-jobs";
public static final String RUNNING_JOB_URL = "/hazelcast/rest/maps/running-job";
public static final String SUBMIT_JOB_URL = "/hazelcast/rest/maps/submit-job";

public static final String SYSTEM_MONITORING_INFORMATION =
"/hazelcast/rest/maps/system-monitoring-information";

public static final String STOP_JOB_URL = "/hazelcast/rest/maps/stop-job";
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ private Map<String, Long> getJobMetrics(String jobMetrics) {
} catch (JsonProcessingException | NullPointerException e) {
return metricsMap;
}
metricsMap.put("sourceReceivedCount", sourceReadCount);
metricsMap.put("sinkWriteCount", sinkWriteCount);
metricsMap.put(SOURCE_RECEIVED_COUNT, sourceReadCount);
metricsMap.put(SINK_WRITE_COUNT, sinkWriteCount);

return metricsMap;
}
Expand Down Expand Up @@ -243,28 +243,33 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) {
JobStatus jobStatus = getSeaTunnelServer().getCoordinatorService().getJobStatus(jobId);

jobInfoJson
.add("jobId", String.valueOf(jobId))
.add("jobName", logicalDag.getJobConfig().getName())
.add("jobStatus", jobStatus.toString())
.add("envOptions", JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
.add(RestConstant.JOB_ID, String.valueOf(jobId))
.add(RestConstant.JOB_NAME, logicalDag.getJobConfig().getName())
.add(RestConstant.JOB_STATUS, jobStatus.toString())
.add(
"createTime",
RestConstant.ENV_OPTIONS,
JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
.add(
RestConstant.CREATE_TIME,
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(new Date(jobImmutableInformation.getCreateTime())))
.add("jobDag", logicalDag.getLogicalDagAsJson())
.add(RestConstant.JOB_DAG, logicalDag.getLogicalDagAsJson())
.add(
"pluginJarsUrls",
RestConstant.PLUGIN_JARS_URLS,
(JsonValue)
jobImmutableInformation.getPluginJarsUrls().stream()
.map(
url -> {
JsonObject jarUrl = new JsonObject();
jarUrl.add("jarPath", url.toString());
jarUrl.add(
RestConstant.JAR_PATH, url.toString());
return jarUrl;
})
.collect(JsonArray::new, JsonArray::add, JsonArray::add))
.add("isStartWithSavePoint", jobImmutableInformation.isStartWithSavePoint())
.add("metrics", JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
.add(
RestConstant.IS_START_WITH_SAVE_POINT,
jobImmutableInformation.isStartWithSavePoint())
.add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));

return jobInfoJson;
}
Expand Down
Loading

0 comments on commit 8d2fafc

Please sign in to comment.