Skip to content

Commit

Permalink
[Feature] [rest-api] get finished jobs info (apache#5949)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: gdliu3 <[email protected]>
  • Loading branch information
2 people authored and DESKTOP-GHPCOV0\dingaolong committed Dec 19, 2023
1 parent 11ccb00 commit a06bcdb
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 1 deletion.
32 changes: 32 additions & 0 deletions docs/en/seatunnel-engine/rest-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,38 @@ network:

------------------------------------------------------------------------------------------

### Return all finished Jobs Info.

<details>
<summary><code>GET</code> <code><b>/hazelcast/rest/maps/finished-jobs-info/:state</b></code> <code>(Return all finished Jobs Info.)</code></summary>

#### Parameters

> | name | type | data type | description |
> |-------|----------|-----------|------------------------------------------------------------------|
> | state | optional | string | finished job status. `FINISHED`,`CANCELED`,`FAILED`,`UNKNOWABLE` |

#### Responses

```json
[
{
"jobId": "",
"jobName": "",
"jobStatus": "",
"errorMsg": null,
"createTime": "",
"finishTime": "",
"jobDag": "",
"metrics": ""
}
]
```

</details>

------------------------------------------------------------------------------------------

### Returns system monitoring information.

<details>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static io.restassured.RestAssured.given;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -151,6 +152,7 @@ public void testSystemMonitoringInformation() {

@Test
public void testSubmitJob() {
AtomicInteger i = new AtomicInteger();
Arrays.asList(node2, node1)
.forEach(
instance -> {
Expand Down Expand Up @@ -187,6 +189,30 @@ public void testSubmitJob() {
.getJobStatus(
Long.parseLong(
jobId))));

given().get(
HOST
+ instance.getCluster()
.getLocalMember()
.getAddress()
.getPort()
+ RestConstant.FINISHED_JOBS_INFO
+ "/FINISHED")
.then()
.statusCode(200)
.body("[" + i.get() + "].jobName", equalTo("test测试"))
.body("[" + i.get() + "].errorMsg", equalTo(null))
.body(
"[" + i.get() + "].jobDag.jobId",
equalTo(Long.parseLong(jobId)))
.body(
"[" + i.get() + "].metrics.SourceReceivedCount",
equalTo("100"))
.body(
"[" + i.get() + "].metrics.SinkWriteCount",
equalTo("100"))
.body("[" + i.get() + "].jobStatus", equalTo("FINISHED"));
i.getAndIncrement();
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class RestConstant {

public static final String CREATE_TIME = "createTime";

public static final String FINISH_TIME = "finishTime";

public static final String ENV_OPTIONS = "envOptions";

public static final String JOB_DAG = "jobDag";
Expand All @@ -39,9 +41,13 @@ public class RestConstant {

public static final String JAR_PATH = "jarPath";

public static final String ERROR_MSG = "errorMsg";

public static final String METRICS = "metrics";
public static final String RUNNING_JOBS_URL = "/hazelcast/rest/maps/running-jobs";
public static final String RUNNING_JOB_URL = "/hazelcast/rest/maps/running-job";

public static final String FINISHED_JOBS_INFO = "/hazelcast/rest/maps/finished-jobs";
public static final String SUBMIT_JOB_URL = "/hazelcast/rest/maps/submit-job";
public static final String ENCRYPT_CONFIG = "/hazelcast/rest/maps/encrypt-config";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor;
import org.apache.seatunnel.engine.server.master.JobHistoryService.JobState;
import org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;

Expand All @@ -41,6 +45,7 @@
import com.hazelcast.internal.ascii.TextCommandService;
import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
import com.hazelcast.internal.ascii.rest.HttpGetCommand;
import com.hazelcast.internal.json.Json;
import com.hazelcast.internal.json.JsonArray;
import com.hazelcast.internal.json.JsonObject;
import com.hazelcast.internal.json.JsonValue;
Expand All @@ -52,13 +57,15 @@

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
import static org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO;
import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOBS_URL;
import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOB_URL;
import static org.apache.seatunnel.engine.server.rest.RestConstant.SYSTEM_MONITORING_INFORMATION;
Expand Down Expand Up @@ -92,6 +99,8 @@ public void handle(HttpGetCommand httpGetCommand) {
try {
if (uri.startsWith(RUNNING_JOBS_URL)) {
handleRunningJobsInfo(httpGetCommand);
} else if (uri.startsWith(FINISHED_JOBS_INFO)) {
handleFinishedJobsInfo(httpGetCommand, uri);
} else if (uri.startsWith(RUNNING_JOB_URL)) {
handleJobInfoById(httpGetCommand, uri);
} else if (uri.startsWith(SYSTEM_MONITORING_INFORMATION)) {
Expand Down Expand Up @@ -167,6 +176,67 @@ private void handleRunningJobsInfo(HttpGetCommand command) {
this.prepareResponse(command, jobs);
}

private void handleFinishedJobsInfo(HttpGetCommand command, String uri) {

uri = StringUtil.stripTrailingSlash(uri);
int indexEnd = uri.indexOf('/', URI_MAPS.length());
String state = uri.substring(indexEnd + 1);

IMap<Long, JobState> finishedJob =
this.textCommandService
.getNode()
.getNodeEngine()
.getHazelcastInstance()
.getMap(Constant.IMAP_FINISHED_JOB_STATE);

IMap<Long, JobMetrics> finishedJobMetrics =
this.textCommandService
.getNode()
.getNodeEngine()
.getHazelcastInstance()
.getMap(Constant.IMAP_FINISHED_JOB_METRICS);

IMap<Long, JobDAGInfo> finishedJobDAGInfo =
this.textCommandService
.getNode()
.getNodeEngine()
.getHazelcastInstance()
.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO);

JsonArray jobs =
finishedJob.values().stream()
.filter(
jobState -> {
if (state.isEmpty()) {
return true;
}
return jobState.getJobStatus()
.name()
.equals(state.toUpperCase());
})
.sorted(Comparator.comparing(JobState::getFinishTime))
.map(
jobState -> {
Long jobId = jobState.getJobId();
String jobMetrics =
getSeaTunnelServer()
.getCoordinatorService()
.getJobMetrics(jobId)
.toJsonString();
JobDAGInfo jobDAGInfo = finishedJobDAGInfo.get(jobId);

return convertToJson(
jobState,
jobMetrics,
Json.parse(JsonUtils.toJsonString(jobDAGInfo))
.asObject(),
jobId);
})
.collect(JsonArray::new, JsonArray::add, JsonArray::add);

this.prepareResponse(command, jobs);
}

private void handleJobInfoById(HttpGetCommand command, String uri) {
uri = StringUtil.stripTrailingSlash(uri);
int indexEnd = uri.indexOf('/', URI_MAPS.length());
Expand All @@ -181,7 +251,7 @@ private void handleJobInfoById(HttpGetCommand command, String uri) {
.getMap(Constant.IMAP_RUNNING_JOB_INFO)
.get(Long.valueOf(jobId));

if (!"".equals(jobId) && jobInfo != null) {
if (!jobId.isEmpty() && jobInfo != null) {
this.prepareResponse(command, convertToJson(jobInfo, Long.parseLong(jobId)));
} else {
this.prepareResponse(command, new JsonObject());
Expand Down Expand Up @@ -293,4 +363,26 @@ private JsonObject convertToJson(JobInfo jobInfo, long jobId) {

return jobInfoJson;
}

private JsonObject convertToJson(
JobState jobState, String jobMetrics, JsonObject jobDAGInfo, long jobId) {
JsonObject jobInfoJson = new JsonObject();
jobInfoJson
.add(RestConstant.JOB_ID, String.valueOf(jobId))
.add(RestConstant.JOB_NAME, jobState.getJobName())
.add(RestConstant.JOB_STATUS, jobState.getJobStatus().toString())
.add(RestConstant.ERROR_MSG, jobState.getErrorMessage())
.add(
RestConstant.CREATE_TIME,
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(new Date(jobState.getSubmitTime())))
.add(
RestConstant.FINISH_TIME,
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(new Date(jobState.getFinishTime())))
.add(RestConstant.JOB_DAG, jobDAGInfo)
.add(RestConstant.METRICS, JsonUtil.toJsonObject(getJobMetrics(jobMetrics)));

return jobInfoJson;
}
}

0 comments on commit a06bcdb

Please sign in to comment.