Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Zeta][REST-API] Stop a running job. #5512

Merged
merged 5 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions docs/en/seatunnel-engine/rest-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,27 @@ network:

------------------------------------------------------------------------------------------

### Stop Job.

<details>
<summary><code>POST</code> <code><b>/hazelcast/rest/maps/stop-job</b></code> <code>(Returns jobId if job stoped successfully.)</code></summary>

#### Body

```json
{
"jobId": 733584788375666689,
"isStopWithSavePoint": false # if job is stopped with save point
}
```

#### Responses

```json
{
"jobId": 733584788375666689
}
```

</details>

Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import org.apache.seatunnel.engine.server.rest.RestConstant;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import com.hazelcast.client.config.ClientConfig;
Expand All @@ -57,8 +57,8 @@ public class RestApiIT {

private static HazelcastInstanceImpl hazelcastInstance;

@BeforeAll
static void beforeClass() throws Exception {
@BeforeEach
void beforeClass() throws Exception {
String testClusterName = TestUtils.getClusterName("RestApiIT");
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
seaTunnelConfig.getHazelcastConfig().setClusterName(testClusterName);
Expand Down Expand Up @@ -136,10 +136,124 @@ public void testSystemMonitoringInformation() {

@Test
public void testSubmitJob() {
String jobId = submitJob("BATCH").getBody().jsonPath().getString("jobId");
SeaTunnelServer seaTunnelServer =
(SeaTunnelServer)
hazelcastInstance
.node
.getNodeExtension()
.createExtensionServices()
.get(Constant.SEATUNNEL_SERVICE_NAME);
JobStatus jobStatus =
seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId));
Assertions.assertEquals(JobStatus.RUNNING, jobStatus);
Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FINISHED,
seaTunnelServer
.getCoordinatorService()
.getJobStatus(Long.parseLong(jobId))));
}

@Test
public void testStopJob() {
String jobId = submitJob("STREAMING").getBody().jsonPath().getString("jobId");
SeaTunnelServer seaTunnelServer =
(SeaTunnelServer)
hazelcastInstance
.node
.getNodeExtension()
.createExtensionServices()
.get(Constant.SEATUNNEL_SERVICE_NAME);
Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.RUNNING,
seaTunnelServer
.getCoordinatorService()
.getJobStatus(Long.parseLong(jobId))));

String parameters = "{" + "\"jobId\":" + jobId + "," + "\"isStopWithSavePoint\":true}";

given().body(parameters)
.post(
HOST
+ hazelcastInstance
.getCluster()
.getLocalMember()
.getAddress()
.getPort()
+ RestConstant.STOP_JOB_URL)
.then()
.statusCode(200)
.body("jobId", equalTo(jobId));

Awaitility.await()
.atMost(6, TimeUnit.MINUTES)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FINISHED,
seaTunnelServer
.getCoordinatorService()
.getJobStatus(Long.parseLong(jobId))));

String jobId2 = submitJob("STREAMING").getBody().jsonPath().getString("jobId");

Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.RUNNING,
seaTunnelServer
.getCoordinatorService()
.getJobStatus(Long.parseLong(jobId2))));
parameters = "{" + "\"jobId\":" + jobId2 + "," + "\"isStopWithSavePoint\":false}";

given().body(parameters)
.post(
HOST
+ hazelcastInstance
.getCluster()
.getLocalMember()
.getAddress()
.getPort()
+ RestConstant.STOP_JOB_URL)
.then()
.statusCode(200)
.body("jobId", equalTo(jobId2));

Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.CANCELED,
seaTunnelServer
.getCoordinatorService()
.getJobStatus(Long.parseLong(jobId2))));
}

@AfterEach
void afterClass() {
if (hazelcastInstance != null) {
hazelcastInstance.shutdown();
}
}

private Response submitJob(String jobMode) {
String requestBody =
"{\n"
+ " \"env\": {\n"
+ " \"job.mode\": \"batch\"\n"
+ " \"job.mode\": \""
+ jobMode
+ "\"\n"
+ " },\n"
+ " \"source\": [\n"
+ " {\n"
Expand Down Expand Up @@ -181,32 +295,6 @@ public void testSubmitJob() {
+ parameters);

response.then().statusCode(200).body("jobName", equalTo("test"));
String jobId = response.getBody().jsonPath().getString("jobId");
SeaTunnelServer seaTunnelServer =
(SeaTunnelServer)
hazelcastInstance
.node
.getNodeExtension()
.createExtensionServices()
.get(Constant.SEATUNNEL_SERVICE_NAME);
JobStatus jobStatus =
seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId));
Assertions.assertEquals(JobStatus.RUNNING, jobStatus);
Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.untilAsserted(
() ->
Assertions.assertEquals(
JobStatus.FINISHED,
seaTunnelServer
.getCoordinatorService()
.getJobStatus(Long.parseLong(jobId))));
}

@AfterAll
static void afterClass() {
if (hazelcastInstance != null) {
hazelcastInstance.shutdown();
}
return response;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,33 @@

public class RestConstant {

public static final String JOB_ID = "jobId";

public static final String JOB_NAME = "jobName";

public static final String IS_START_WITH_SAVE_POINT = "isStartWithSavePoint";

public static final String IS_STOP_WITH_SAVE_POINT = "isStopWithSavePoint";

public static final String JOB_STATUS = "jobStatus";

public static final String CREATE_TIME = "createTime";

public static final String ENV_OPTIONS = "envOptions";

public static final String JOB_DAG = "jobDag";

public static final String PLUGIN_JARS_URLS = "pluginJarsUrls";

public static final String JAR_PATH = "jarPath";

public static final String METRICS = "metrics";
public static final String RUNNING_JOBS_URL = "/hazelcast/rest/maps/running-jobs";
public static final String RUNNING_JOB_URL = "/hazelcast/rest/maps/running-job";
public static final String SUBMIT_JOB_URL = "/hazelcast/rest/maps/submit-job";

public static final String SYSTEM_MONITORING_INFORMATION =
"/hazelcast/rest/maps/system-monitoring-information";

public static final String STOP_JOB_URL = "/hazelcast/rest/maps/stop-job";
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,8 @@ private Map<String, Long> getJobMetrics(String jobMetrics) {
} catch (JsonProcessingException | NullPointerException e) {
return metricsMap;
}
metricsMap.put("sourceReceivedCount", sourceReadCount);
metricsMap.put("sinkWriteCount", sinkWriteCount);
metricsMap.put(SOURCE_RECEIVED_COUNT, sourceReadCount);
metricsMap.put(SINK_WRITE_COUNT, sinkWriteCount);

return metricsMap;
}
Expand Down Expand Up @@ -243,28 +243,33 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) {
JobStatus jobStatus = getSeaTunnelServer().getCoordinatorService().getJobStatus(jobId);

jobInfoJson
.add("jobId", String.valueOf(jobId))
.add("jobName", logicalDag.getJobConfig().getName())
.add("jobStatus", jobStatus.toString())
.add("envOptions", JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
.add(RestConstant.JOB_ID, String.valueOf(jobId))
.add(RestConstant.JOB_NAME, logicalDag.getJobConfig().getName())
.add(RestConstant.JOB_STATUS, jobStatus.toString())
.add(
"createTime",
RestConstant.ENV_OPTIONS,
JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
.add(
RestConstant.CREATE_TIME,
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(new Date(jobImmutableInformation.getCreateTime())))
.add("jobDag", logicalDag.getLogicalDagAsJson())
.add(RestConstant.JOB_DAG, logicalDag.getLogicalDagAsJson())
.add(
"pluginJarsUrls",
RestConstant.PLUGIN_JARS_URLS,
(JsonValue)
jobImmutableInformation.getPluginJarsUrls().stream()
.map(
url -> {
JsonObject jarUrl = new JsonObject();
jarUrl.add("jarPath", url.toString());
jarUrl.add(
RestConstant.JAR_PATH, url.toString());
return jarUrl;
})
.collect(JsonArray::new, JsonArray::add, JsonArray::add))
.add("isStartWithSavePoint", jobImmutableInformation.isStartWithSavePoint())
.add("metrics", JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));
.add(
RestConstant.IS_START_WITH_SAVE_POINT,
jobImmutableInformation.isStartWithSavePoint())
.add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));

return jobInfoJson;
}
Expand Down
Loading