From 48439cd8b8738242d2ef1de569acb7a5a96ff6f0 Mon Sep 17 00:00:00 2001 From: Fangzhibin <2535030577@qq.com> Date: Sun, 16 Jul 2023 11:40:41 +0800 Subject: [PATCH 1/3] [Feature][Zeta][REST-API]Add REST API To Submit Job --- .../core/starter/utils/ConfigBuilder.java | 6 + .../seatunnel/engine/e2e/RestApiIT.java | 62 ++++++- .../parse/MultipleTableJobConfigParser.java | 13 ++ .../engine/server/NodeExtension.java | 2 + .../server/checkpoint/CheckpointPlan.java | 3 +- .../job/JobImmutableInformationEnv.java | 160 ++++++++++++++++++ .../engine/server/rest/RestConstant.java | 1 + .../rest/RestHttpPostCommandProcessor.java | 134 +++++++++++++++ .../engine/server/utils/RestUtil.java | 65 +++++++ 9 files changed, 444 insertions(+), 2 deletions(-) create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java index ed66b550a04..ad063acac8a 100644 --- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java +++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java @@ -69,6 +69,12 @@ public static Config of(@NonNull Path filePath) { return config; } + public static Config of(@NonNull Map objectMap) { + log.info("Loading config file from objectMap"); + Config config = ConfigFactory.parseMap(objectMap); + return ConfigShadeUtils.decryptConfig(config); + } + public static Config of(@NonNull ConfigAdapter configAdapter, @NonNull Path filePath) { log.info("With config adapter spi {}", configAdapter.getClass().getName()); try { diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java index 5f4e97ac8d5..9ac1251565c 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java @@ -17,15 +17,19 @@ package org.apache.seatunnel.engine.e2e; +import io.restassured.response.Response; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; import org.apache.seatunnel.engine.client.SeaTunnelClient; import org.apache.seatunnel.engine.client.job.ClientJobProxy; import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; +import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.core.job.JobInfo; import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.server.SeaTunnelServer; import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; import org.apache.seatunnel.engine.server.rest.RestConstant; @@ -39,6 +43,8 @@ import com.hazelcast.instance.impl.HazelcastInstanceImpl; import lombok.extern.slf4j.Slf4j; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; import static io.restassured.RestAssured.given; @@ -114,7 +120,6 @@ public void testGetRunningJobs() { .body("[0].jobName", equalTo("fake_to_file")) .body("[0].jobStatus", equalTo("RUNNING")); } - @Test public void testSystemMonitoringInformation() { given().get( @@ -131,6 +136,61 @@ public void testSystemMonitoringInformation() { .statusCode(200); } + @Test + public void testSubmitJob(){ + String requestBody = "{\n" + + " \"env\": {\n" + + " \"job.mode\": \"batch\"\n" + + " },\n" + + " \"source\": [\n" + + " {\n" + + " \"plugin_name\": \"FakeSource\",\n" + + " \"result_table_name\": \"fake\",\n" + + " \"row.num\": 100,\n" + + " \"schema\": {\n" + + " \"fields\": {\n" + + " \"name\": \"string\",\n" + + " \"age\": \"int\",\n" + + " \"card\": \"int\"\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"transform\": [\n" + + " ],\n" + + " \"sink\": [\n" + + " {\n" + + " \"plugin_name\": \"Console\",\n" + + " \"source_table_name\": [\"fake\"]\n" + + " }\n" + + " ]\n" + + "}"; + String parameters = "jobId=1&jobName=test&isStartWithSavePoint=false"; + //Only jobName is compared because jobId is randomly generated if isStartWithSavePoint is false + Response response = given().body(requestBody).post( + HOST + + hazelcastInstance + .getCluster() + .getLocalMember() + .getAddress() + .getPort() + + RestConstant.SUBMIT_JOB_URL + + "?" + + parameters); + + response.then() + .statusCode(200) + .body("jobName", equalTo("test")); + String jobId = response.getBody().jsonPath().getString("jobId"); + JobInfo jobInfo = + (JobInfo)hazelcastInstance.getMap(Constant.IMAP_RUNNING_JOB_INFO) + .get(Long.valueOf(jobId)); + SeaTunnelServer seaTunnelServer = (SeaTunnelServer)hazelcastInstance.node.getNodeExtension().createExtensionServices().get(Constant.SEATUNNEL_SERVICE_NAME); + JobStatus jobStatus = seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId)); + Assertions.assertEquals( + JobStatus.RUNNING, jobStatus); + + } @AfterAll static void afterClass() { if (hazelcastInstance != null) { diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index 86c0f3c94f5..748056186c9 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -130,6 +130,19 @@ public MultipleTableJobConfigParser( new JobConfigParser(idGenerator, commonPluginJars, isStartWithSavePoint); } + public MultipleTableJobConfigParser( + Config seaTunnelJobConfig, + IdGenerator idGenerator, + JobConfig jobConfig, + List commonPluginJars) { + this.idGenerator = idGenerator; + this.jobConfig = jobConfig; + this.commonPluginJars = commonPluginJars; + this.seaTunnelJobConfig = seaTunnelJobConfig; + this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); + this.fallbackParser = new JobConfigParser(idGenerator, commonPluginJars); + } + public ImmutablePair, Set> parse() { List sourceConfigs = TypesafeConfigUtils.getConfigList( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java index d4137955c8b..37e00cffab2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor; import org.apache.seatunnel.engine.server.log.Log4j2HttpPostCommandProcessor; import org.apache.seatunnel.engine.server.rest.RestHttpGetCommandProcessor; +import org.apache.seatunnel.engine.server.rest.RestHttpPostCommandProcessor; import com.hazelcast.cluster.ClusterState; import com.hazelcast.instance.impl.DefaultNodeExtension; @@ -79,6 +80,7 @@ public TextCommandService createTextCommandService() { register(HTTP_GET, new Log4j2HttpGetCommandProcessor(this)); register(HTTP_POST, new Log4j2HttpPostCommandProcessor(this)); register(HTTP_GET, new RestHttpGetCommandProcessor(this)); + register(HTTP_POST, new RestHttpPostCommandProcessor(this)); } }; } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java index 80b04df54a2..a9fcd52e97d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; /** checkpoint plan info */ @@ -63,7 +64,7 @@ public class CheckpointPlan { public static final class Builder { private final Set pipelineSubtasks = new CopyOnWriteArraySet<>(); private final Set startingSubtasks = new CopyOnWriteArraySet<>(); - private final Map pipelineActions = new CopyOnWriteHashMap<>(); + private final Map pipelineActions = new ConcurrentHashMap<>(); private final Map>> subtaskActions = new CopyOnWriteHashMap<>(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java new file mode 100644 index 00000000000..cf3ee92b714 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.job; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.api.common.JobContext; +import org.apache.seatunnel.api.env.EnvCommonOptions; +import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.common.utils.FileUtils; +import org.apache.seatunnel.engine.common.Constant; +import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; +import org.apache.seatunnel.engine.common.utils.IdGenerator; +import org.apache.seatunnel.engine.core.dag.actions.Action; +import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; +import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator; +import org.apache.seatunnel.engine.core.job.JobImmutableInformation; +import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser; + +import org.apache.commons.lang3.tuple.ImmutablePair; + +import com.hazelcast.instance.impl.Node; +import com.hazelcast.logging.ILogger; +import com.hazelcast.logging.Logger; +import com.hazelcast.spi.impl.NodeEngineImpl; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public class JobImmutableInformationEnv { + private static final ILogger LOGGER = Logger.getLogger(JobImmutableInformationEnv.class); + + private final boolean isStartWithSavePoint; + + private final JobConfig jobConfig; + + private final List actions = new ArrayList<>(); + + private final Set jarUrls = new HashSet<>(); + + private final List commonPluginJars = new ArrayList<>(); + + private final Config seaTunnelJobConfig; + + private final IdGenerator idGenerator; + + private final NodeEngineImpl nodeEngine; + + private final Long jobId; + + public JobImmutableInformationEnv( + JobConfig jobConfig, + Config seaTunnelJobConfig, + Node node, + boolean isStartWithSavePoint, + Long jobId) { + this.jobConfig = jobConfig; + this.isStartWithSavePoint = isStartWithSavePoint; + this.idGenerator = new IdGenerator(); + this.seaTunnelJobConfig = seaTunnelJobConfig; + this.nodeEngine = node.getNodeEngine(); + this.jobConfig.setJobContext( + new JobContext( + isStartWithSavePoint + ? jobId + : nodeEngine + .getHazelcastInstance() + .getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME) + .newId())); + this.jobId = Long.valueOf(jobConfig.getJobContext().getJobId()); + this.commonPluginJars.addAll(searchPluginJars()); + this.commonPluginJars.addAll( + new ArrayList<>( + Common.getThirdPartyJars( + jobConfig + .getEnvOptions() + .getOrDefault(EnvCommonOptions.JARS.key(), "") + .toString()) + .stream() + .map(Path::toUri) + .map( + uri -> { + try { + return uri.toURL(); + } catch (MalformedURLException e) { + throw new SeaTunnelEngineException( + "the uri of jar illegal:" + uri, e); + } + }) + .collect(Collectors.toList()))); + LOGGER.info("add common jar in plugins :" + commonPluginJars); + } + + public Long getJobId() { + return jobId; + } + + private Set searchPluginJars() { + try { + if (Files.exists(Common.pluginRootDir())) { + return new HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir())); + } + } catch (IOException | SeaTunnelEngineException e) { + LOGGER.warning( + String.format("Can't search plugin jars in %s.", Common.pluginRootDir()), e); + } + return Collections.emptySet(); + } + + private MultipleTableJobConfigParser getJobConfigParser() { + return new MultipleTableJobConfigParser( + seaTunnelJobConfig, idGenerator, jobConfig, commonPluginJars); + } + + private LogicalDagGenerator getLogicalDagGenerator() { + return new LogicalDagGenerator(actions, jobConfig, idGenerator); + } + + private LogicalDag getLogicalDag() { + ImmutablePair, Set> immutablePair = getJobConfigParser().parse(); + actions.addAll(immutablePair.getLeft()); + jarUrls.addAll(immutablePair.getRight()); + return getLogicalDagGenerator().generate(); + } + + public JobImmutableInformation build() { + return new JobImmutableInformation( + Long.parseLong(jobConfig.getJobContext().getJobId()), + jobConfig.getName(), + isStartWithSavePoint, + nodeEngine.getSerializationService().toData(getLogicalDag()), + jobConfig, + new ArrayList<>(jarUrls)); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java index 0a5d8437be3..7776d592b8f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java @@ -21,6 +21,7 @@ public class RestConstant { public static final String RUNNING_JOBS_URL = "/hazelcast/rest/maps/running-jobs"; public static final String RUNNING_JOB_URL = "/hazelcast/rest/maps/running-job"; + public static final String SUBMIT_JOB_URL = "/hazelcast/rest/maps/submit-job"; public static final String SYSTEM_MONITORING_INFORMATION = "/hazelcast/rest/maps/system-monitoring-information"; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java new file mode 100644 index 00000000000..bc6f7fb5bd5 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.rest; + +import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.engine.common.Constant; +import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.core.job.JobImmutableInformation; +import org.apache.seatunnel.engine.server.CoordinatorService; +import org.apache.seatunnel.engine.server.SeaTunnelServer; +import org.apache.seatunnel.engine.server.job.JobImmutableInformationEnv; +import org.apache.seatunnel.engine.server.log.Log4j2HttpPostCommandProcessor; +import org.apache.seatunnel.engine.server.utils.RestUtil; + +import com.hazelcast.internal.ascii.TextCommandService; +import com.hazelcast.internal.ascii.rest.HttpCommandProcessor; +import com.hazelcast.internal.ascii.rest.HttpPostCommand; +import com.hazelcast.internal.json.JsonObject; +import com.hazelcast.internal.serialization.Data; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_400; +import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500; +import static org.apache.seatunnel.engine.server.rest.RestConstant.SUBMIT_JOB_URL; + +public class RestHttpPostCommandProcessor extends HttpCommandProcessor { + private final Log4j2HttpPostCommandProcessor original; + + public RestHttpPostCommandProcessor(TextCommandService textCommandService) { + this(textCommandService, new Log4j2HttpPostCommandProcessor(textCommandService)); + } + + protected RestHttpPostCommandProcessor( + TextCommandService textCommandService, + Log4j2HttpPostCommandProcessor log4j2HttpPostCommandProcessor) { + super( + textCommandService, + textCommandService.getNode().getLogger(Log4j2HttpPostCommandProcessor.class)); + this.original = log4j2HttpPostCommandProcessor; + } + + @Override + public void handle(HttpPostCommand httpPostCommand) { + String uri = httpPostCommand.getURI(); + try { + if (uri.startsWith(SUBMIT_JOB_URL)) { + handleSubmitJob(httpPostCommand, uri); + } else { + original.handle(httpPostCommand); + } + } catch (IllegalArgumentException e) { + prepareResponse(SC_400, httpPostCommand, exceptionResponse(e)); + } catch (Throwable e) { + logger.warning("An error occurred while handling request " + httpPostCommand, e); + prepareResponse(SC_500, httpPostCommand, exceptionResponse(e)); + } + + this.textCommandService.sendResponse(httpPostCommand); + } + + private SeaTunnelServer getSeaTunnelServer() { + Map extensionServices = + this.textCommandService.getNode().getNodeExtension().createExtensionServices(); + return (SeaTunnelServer) extensionServices.get(Constant.SEATUNNEL_SERVICE_NAME); + } + + private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri) + throws IllegalArgumentException { + Map requestParams = new HashMap<>(); + RestUtil.buildRequestParams(requestParams, uri); + byte[] requestBody = httpPostCommand.getData(); + if (requestBody.length == 0) { + throw new IllegalArgumentException("Request body is empty."); + } + JsonNode requestBodyJsonNode; + try { + requestBodyJsonNode = RestUtil.convertByteToJsonNode(requestBody); + } catch (IOException e) { + throw new IllegalArgumentException("Invalid JSON format in request body."); + } + Config config = RestUtil.buildConfig(requestBodyJsonNode); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName(requestParams.get("jobName")); + JobImmutableInformationEnv jobImmutableInformationEnv = + new JobImmutableInformationEnv( + jobConfig, + config, + textCommandService.getNode(), + Boolean.parseBoolean(requestParams.get("isStartWithSavePoint")), + Long.parseLong(requestParams.get("jobId"))); + JobImmutableInformation jobImmutableInformation = jobImmutableInformationEnv.build(); + CoordinatorService coordinatorService = getSeaTunnelServer().getCoordinatorService(); + Data data = + textCommandService + .getNode() + .nodeEngine + .getSerializationService() + .toData(jobImmutableInformation); + PassiveCompletableFuture voidPassiveCompletableFuture = + coordinatorService.submitJob(Long.parseLong(jobConfig.getJobContext().getJobId()), data); + voidPassiveCompletableFuture.join(); + + Long jobId = jobImmutableInformationEnv.getJobId(); + this.prepareResponse(httpPostCommand, new JsonObject() + .add("jobId", jobId) + .add("jobName", requestParams.get("jobName"))); + } + + @Override + public void handleRejection(HttpPostCommand httpPostCommand) { + handle(httpPostCommand); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java new file mode 100644 index 00000000000..d3761366d09 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/RestUtil.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.utils; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.common.Constants; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.core.starter.utils.ConfigBuilder; + +import com.hazelcast.internal.util.StringUtil; + +import java.io.IOException; +import java.util.Map; + +public class RestUtil { + private RestUtil() {} + + private static final ObjectMapper objectMapper = new ObjectMapper(); + + public static JsonNode convertByteToJsonNode(byte[] byteData) throws IOException { + return objectMapper.readTree(byteData); + } + + public static void buildRequestParams(Map requestParams, String uri) { + requestParams.put("jobId", null); + requestParams.put("jobName", Constants.LOGO); + requestParams.put("isStartWithSavePoint", String.valueOf(false)); + uri = StringUtil.stripTrailingSlash(uri); + if (!uri.contains("?")) { + return; + } + int indexEnd = uri.indexOf('?'); + try { + for (String s : uri.substring(indexEnd + 1).split("&")) { + String[] param = s.split("="); + requestParams.put(param[0], param[1]); + } + } catch (IndexOutOfBoundsException e) { + throw new IllegalArgumentException("Invalid Params format in Params."); + } + } + + public static Config buildConfig(JsonNode jsonNode) { + Map objectMap = JsonUtils.toMap(jsonNode); + return ConfigBuilder.of(objectMap); + } +} From 6c7ffe854a6cc03591185534dbc12359ddb7a451 Mon Sep 17 00:00:00 2001 From: Fangzhibin <2535030577@qq.com> Date: Tue, 18 Jul 2023 15:29:57 +0800 Subject: [PATCH 2/3] add doc for this feature --- docs/en/seatunnel-engine/rest-api.md | 58 +++++++++ .../seatunnel/engine/e2e/RestApiIT.java | 122 ++++++++++-------- .../server/checkpoint/CheckpointPlan.java | 3 +- .../rest/RestHttpPostCommandProcessor.java | 11 +- 4 files changed, 132 insertions(+), 62 deletions(-) diff --git a/docs/en/seatunnel-engine/rest-api.md b/docs/en/seatunnel-engine/rest-api.md index 2edec3496ad..2f44421a3d6 100644 --- a/docs/en/seatunnel-engine/rest-api.md +++ b/docs/en/seatunnel-engine/rest-api.md @@ -180,3 +180,61 @@ network: ------------------------------------------------------------------------------------------ +### Submit Job. + +
+POST /hazelcast/rest/maps/submit-job (Returns jobId and jobName if job submitted successfully.) + +#### Parameters + +> | name | type | data type | description | +> |----------------------|----------|-----------|-----------------------------------| +> | jobId | optional | string | job id | +> | jobName | optional | string | job name | +> | isStartWithSavePoint | optional | string | if job is started with save point | + +#### Body + +```json +{ + "env": { + "job.mode": "batch" + }, + "source": [ + { + "plugin_name": "FakeSource", + "result_table_name": "fake", + "row.num": 100, + "schema": { + "fields": { + "name": "string", + "age": "int", + "card": "int" + } + } + } + ], + "transform": [ + ], + "sink": [ + { + "plugin_name": "Console", + "source_table_name": ["fake"] + } + ] +} +``` + +#### Responses + +```json +{ + "jobId": 733584788375666689, + "jobName": "rest_api_test" +} +``` + +
+ +------------------------------------------------------------------------------------------ + diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java index 9ac1251565c..d38d1c732f1 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java @@ -17,7 +17,6 @@ package org.apache.seatunnel.engine.e2e; -import io.restassured.response.Response; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; import org.apache.seatunnel.engine.client.SeaTunnelClient; @@ -27,7 +26,6 @@ import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; -import org.apache.seatunnel.engine.core.job.JobInfo; import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.server.SeaTunnelServer; import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; @@ -41,10 +39,9 @@ import com.hazelcast.client.config.ClientConfig; import com.hazelcast.instance.impl.HazelcastInstanceImpl; +import io.restassured.response.Response; import lombok.extern.slf4j.Slf4j; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.TimeUnit; import static io.restassured.RestAssured.given; @@ -120,6 +117,7 @@ public void testGetRunningJobs() { .body("[0].jobName", equalTo("fake_to_file")) .body("[0].jobStatus", equalTo("RUNNING")); } + @Test public void testSystemMonitoringInformation() { given().get( @@ -137,60 +135,74 @@ public void testSystemMonitoringInformation() { } @Test - public void testSubmitJob(){ - String requestBody = "{\n" + - " \"env\": {\n" + - " \"job.mode\": \"batch\"\n" + - " },\n" + - " \"source\": [\n" + - " {\n" + - " \"plugin_name\": \"FakeSource\",\n" + - " \"result_table_name\": \"fake\",\n" + - " \"row.num\": 100,\n" + - " \"schema\": {\n" + - " \"fields\": {\n" + - " \"name\": \"string\",\n" + - " \"age\": \"int\",\n" + - " \"card\": \"int\"\n" + - " }\n" + - " }\n" + - " }\n" + - " ],\n" + - " \"transform\": [\n" + - " ],\n" + - " \"sink\": [\n" + - " {\n" + - " \"plugin_name\": \"Console\",\n" + - " \"source_table_name\": [\"fake\"]\n" + - " }\n" + - " ]\n" + - "}"; + public void testSubmitJob() { + String requestBody = + "{\n" + + " \"env\": {\n" + + " \"job.mode\": \"batch\"\n" + + " },\n" + + " \"source\": [\n" + + " {\n" + + " \"plugin_name\": \"FakeSource\",\n" + + " \"result_table_name\": \"fake\",\n" + + " \"row.num\": 100,\n" + + " \"schema\": {\n" + + " \"fields\": {\n" + + " \"name\": \"string\",\n" + + " \"age\": \"int\",\n" + + " \"card\": \"int\"\n" + + " }\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"transform\": [\n" + + " ],\n" + + " \"sink\": [\n" + + " {\n" + + " \"plugin_name\": \"Console\",\n" + + " \"source_table_name\": [\"fake\"]\n" + + " }\n" + + " ]\n" + + "}"; String parameters = "jobId=1&jobName=test&isStartWithSavePoint=false"; - //Only jobName is compared because jobId is randomly generated if isStartWithSavePoint is false - Response response = given().body(requestBody).post( - HOST - + hazelcastInstance - .getCluster() - .getLocalMember() - .getAddress() - .getPort() - + RestConstant.SUBMIT_JOB_URL - + "?" - + parameters); - - response.then() - .statusCode(200) - .body("jobName", equalTo("test")); + // Only jobName is compared because jobId is randomly generated if isStartWithSavePoint is + // false + Response response = + given().body(requestBody) + .post( + HOST + + hazelcastInstance + .getCluster() + .getLocalMember() + .getAddress() + .getPort() + + RestConstant.SUBMIT_JOB_URL + + "?" + + parameters); + + response.then().statusCode(200).body("jobName", equalTo("test")); String jobId = response.getBody().jsonPath().getString("jobId"); - JobInfo jobInfo = - (JobInfo)hazelcastInstance.getMap(Constant.IMAP_RUNNING_JOB_INFO) - .get(Long.valueOf(jobId)); - SeaTunnelServer seaTunnelServer = (SeaTunnelServer)hazelcastInstance.node.getNodeExtension().createExtensionServices().get(Constant.SEATUNNEL_SERVICE_NAME); - JobStatus jobStatus = seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId)); - Assertions.assertEquals( - JobStatus.RUNNING, jobStatus); - + SeaTunnelServer seaTunnelServer = + (SeaTunnelServer) + hazelcastInstance + .node + .getNodeExtension() + .createExtensionServices() + .get(Constant.SEATUNNEL_SERVICE_NAME); + JobStatus jobStatus = + seaTunnelServer.getCoordinatorService().getJobStatus(Long.parseLong(jobId)); + Assertions.assertEquals(JobStatus.RUNNING, jobStatus); + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .untilAsserted( + () -> + Assertions.assertEquals( + JobStatus.FINISHED, + seaTunnelServer + .getCoordinatorService() + .getJobStatus(Long.parseLong(jobId)))); } + @AfterAll static void afterClass() { if (hazelcastInstance != null) { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java index a9fcd52e97d..80b04df54a2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlan.java @@ -28,7 +28,6 @@ import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; /** checkpoint plan info */ @@ -64,7 +63,7 @@ public class CheckpointPlan { public static final class Builder { private final Set pipelineSubtasks = new CopyOnWriteArraySet<>(); private final Set startingSubtasks = new CopyOnWriteArraySet<>(); - private final Map pipelineActions = new ConcurrentHashMap<>(); + private final Map pipelineActions = new CopyOnWriteHashMap<>(); private final Map>> subtaskActions = new CopyOnWriteHashMap<>(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java index bc6f7fb5bd5..e0edd932032 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpPostCommandProcessor.java @@ -17,12 +17,12 @@ package org.apache.seatunnel.engine.server.rest; -import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode; import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.apache.seatunnel.engine.server.CoordinatorService; import org.apache.seatunnel.engine.server.SeaTunnelServer; @@ -118,13 +118,14 @@ private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri) .getSerializationService() .toData(jobImmutableInformation); PassiveCompletableFuture voidPassiveCompletableFuture = - coordinatorService.submitJob(Long.parseLong(jobConfig.getJobContext().getJobId()), data); + coordinatorService.submitJob( + Long.parseLong(jobConfig.getJobContext().getJobId()), data); voidPassiveCompletableFuture.join(); Long jobId = jobImmutableInformationEnv.getJobId(); - this.prepareResponse(httpPostCommand, new JsonObject() - .add("jobId", jobId) - .add("jobName", requestParams.get("jobName"))); + this.prepareResponse( + httpPostCommand, + new JsonObject().add("jobId", jobId).add("jobName", requestParams.get("jobName"))); } @Override From a2a6958ae56db6e811f1f781fe54d9183b3a5d21 Mon Sep 17 00:00:00 2001 From: Fangzhibin <2535030577@qq.com> Date: Fri, 4 Aug 2023 16:10:48 +0800 Subject: [PATCH 3/3] refactor abstract AbstractJobEnvironment for JobImmutableInformationEnv and JobExecutionEnvironment --- .../client/job/JobExecutionEnvironment.java | 91 +------------- .../core/job/AbstractJobEnvironment.java | 114 ++++++++++++++++++ .../parse/MultipleTableJobConfigParser.java | 7 +- .../job/JobImmutableInformationEnv.java | 92 +------------- 4 files changed, 130 insertions(+), 174 deletions(-) create mode 100644 seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java index bf3169e4c80..3f870c61216 100644 --- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java +++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java @@ -18,55 +18,19 @@ package org.apache.seatunnel.engine.client.job; import org.apache.seatunnel.api.common.JobContext; -import org.apache.seatunnel.api.env.EnvCommonOptions; -import org.apache.seatunnel.common.config.Common; -import org.apache.seatunnel.common.utils.FileUtils; import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient; import org.apache.seatunnel.engine.common.config.JobConfig; -import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; -import org.apache.seatunnel.engine.common.utils.IdGenerator; -import org.apache.seatunnel.engine.core.dag.actions.Action; -import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; -import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator; +import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser; -import org.apache.commons.lang3.tuple.ImmutablePair; - -import com.hazelcast.logging.ILogger; -import com.hazelcast.logging.Logger; - -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; - -public class JobExecutionEnvironment { - - private static final ILogger LOGGER = Logger.getLogger(JobExecutionEnvironment.class); - - private final boolean isStartWithSavePoint; - - private final JobConfig jobConfig; - - private final List actions = new ArrayList<>(); - - private final Set jarUrls = new HashSet<>(); - private final List commonPluginJars = new ArrayList<>(); +public class JobExecutionEnvironment extends AbstractJobEnvironment { private final String jobFilePath; - private final IdGenerator idGenerator; - private final SeaTunnelHazelcastClient seaTunnelHazelcastClient; private final JobClient jobClient; @@ -78,35 +42,12 @@ public JobExecutionEnvironment( SeaTunnelHazelcastClient seaTunnelHazelcastClient, boolean isStartWithSavePoint, Long jobId) { - this.jobConfig = jobConfig; + super(jobConfig, isStartWithSavePoint); this.jobFilePath = jobFilePath; - this.idGenerator = new IdGenerator(); this.seaTunnelHazelcastClient = seaTunnelHazelcastClient; this.jobClient = new JobClient(seaTunnelHazelcastClient); - this.isStartWithSavePoint = isStartWithSavePoint; this.jobConfig.setJobContext( new JobContext(isStartWithSavePoint ? jobId : jobClient.getNewJobId())); - this.commonPluginJars.addAll(searchPluginJars()); - this.commonPluginJars.addAll( - new ArrayList<>( - Common.getThirdPartyJars( - jobConfig - .getEnvOptions() - .getOrDefault(EnvCommonOptions.JARS.key(), "") - .toString()) - .stream() - .map(Path::toUri) - .map( - uri -> { - try { - return uri.toURL(); - } catch (MalformedURLException e) { - throw new SeaTunnelEngineException( - "the uri of jar illegal:" + uri, e); - } - }) - .collect(Collectors.toList()))); - LOGGER.info("add common jar in plugins :" + commonPluginJars); } public JobExecutionEnvironment( @@ -117,27 +58,12 @@ public JobExecutionEnvironment( } /** Search all jars in SEATUNNEL_HOME/plugins */ - private Set searchPluginJars() { - try { - if (Files.exists(Common.pluginRootDir())) { - return new HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir())); - } - } catch (IOException | SeaTunnelEngineException e) { - LOGGER.warning( - String.format("Can't search plugin jars in %s.", Common.pluginRootDir()), e); - } - return Collections.emptySet(); - } - - private MultipleTableJobConfigParser getJobConfigParser() { + @Override + protected MultipleTableJobConfigParser getJobConfigParser() { return new MultipleTableJobConfigParser( jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint); } - private LogicalDagGenerator getLogicalDagGenerator() { - return new LogicalDagGenerator(actions, jobConfig, idGenerator); - } - public ClientJobProxy execute() throws ExecutionException, InterruptedException { JobImmutableInformation jobImmutableInformation = new JobImmutableInformation( @@ -150,11 +76,4 @@ public ClientJobProxy execute() throws ExecutionException, InterruptedException return jobClient.createJobProxy(jobImmutableInformation); } - - private LogicalDag getLogicalDag() { - ImmutablePair, Set> immutablePair = getJobConfigParser().parse(); - actions.addAll(immutablePair.getLeft()); - jarUrls.addAll(immutablePair.getRight()); - return getLogicalDagGenerator().generate(); - } } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java new file mode 100644 index 00000000000..3509903c088 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/AbstractJobEnvironment.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.core.job; + +import org.apache.seatunnel.api.env.EnvCommonOptions; +import org.apache.seatunnel.common.config.Common; +import org.apache.seatunnel.common.utils.FileUtils; +import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; +import org.apache.seatunnel.engine.common.utils.IdGenerator; +import org.apache.seatunnel.engine.core.dag.actions.Action; +import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; +import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator; +import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser; + +import org.apache.commons.lang3.tuple.ImmutablePair; + +import com.hazelcast.logging.ILogger; +import com.hazelcast.logging.Logger; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public abstract class AbstractJobEnvironment { + protected static ILogger LOGGER = null; + + protected final boolean isStartWithSavePoint; + + protected final List actions = new ArrayList<>(); + protected final Set jarUrls = new HashSet<>(); + + protected final JobConfig jobConfig; + + protected final IdGenerator idGenerator; + + protected final List commonPluginJars = new ArrayList<>(); + + public AbstractJobEnvironment(JobConfig jobConfig, boolean isStartWithSavePoint) { + LOGGER = Logger.getLogger(getClass().getName()); + this.jobConfig = jobConfig; + this.isStartWithSavePoint = isStartWithSavePoint; + this.idGenerator = new IdGenerator(); + this.commonPluginJars.addAll(searchPluginJars()); + this.commonPluginJars.addAll( + new ArrayList<>( + Common.getThirdPartyJars( + jobConfig + .getEnvOptions() + .getOrDefault(EnvCommonOptions.JARS.key(), "") + .toString()) + .stream() + .map(Path::toUri) + .map( + uri -> { + try { + return uri.toURL(); + } catch (MalformedURLException e) { + throw new SeaTunnelEngineException( + "the uri of jar illegal:" + uri, e); + } + }) + .collect(Collectors.toList()))); + LOGGER.info("add common jar in plugins :" + commonPluginJars); + } + + protected Set searchPluginJars() { + try { + if (Files.exists(Common.pluginRootDir())) { + return new HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir())); + } + } catch (IOException | SeaTunnelEngineException e) { + LOGGER.warning( + String.format("Can't search plugin jars in %s.", Common.pluginRootDir()), e); + } + return Collections.emptySet(); + } + + protected abstract MultipleTableJobConfigParser getJobConfigParser(); + + protected LogicalDagGenerator getLogicalDagGenerator() { + return new LogicalDagGenerator(actions, jobConfig, idGenerator); + } + + protected LogicalDag getLogicalDag() { + ImmutablePair, Set> immutablePair = getJobConfigParser().parse(); + actions.addAll(immutablePair.getLeft()); + jarUrls.addAll(immutablePair.getRight()); + return getLogicalDagGenerator().generate(); + } +} diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index 748056186c9..ee2505286f8 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -134,13 +134,16 @@ public MultipleTableJobConfigParser( Config seaTunnelJobConfig, IdGenerator idGenerator, JobConfig jobConfig, - List commonPluginJars) { + List commonPluginJars, + boolean isStartWithSavePoint) { this.idGenerator = idGenerator; this.jobConfig = jobConfig; this.commonPluginJars = commonPluginJars; + this.isStartWithSavePoint = isStartWithSavePoint; this.seaTunnelJobConfig = seaTunnelJobConfig; this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env")); - this.fallbackParser = new JobConfigParser(idGenerator, commonPluginJars); + this.fallbackParser = + new JobConfigParser(idGenerator, commonPluginJars, isStartWithSavePoint); } public ImmutablePair, Set> parse() { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java index cf3ee92b714..4dd72e31cb8 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/job/JobImmutableInformationEnv.java @@ -20,55 +20,20 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.api.common.JobContext; -import org.apache.seatunnel.api.env.EnvCommonOptions; -import org.apache.seatunnel.common.config.Common; -import org.apache.seatunnel.common.utils.FileUtils; import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.JobConfig; -import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException; -import org.apache.seatunnel.engine.common.utils.IdGenerator; -import org.apache.seatunnel.engine.core.dag.actions.Action; -import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; -import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator; +import org.apache.seatunnel.engine.core.job.AbstractJobEnvironment; import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser; -import org.apache.commons.lang3.tuple.ImmutablePair; - import com.hazelcast.instance.impl.Node; -import com.hazelcast.logging.ILogger; -import com.hazelcast.logging.Logger; import com.hazelcast.spi.impl.NodeEngineImpl; -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -public class JobImmutableInformationEnv { - private static final ILogger LOGGER = Logger.getLogger(JobImmutableInformationEnv.class); - - private final boolean isStartWithSavePoint; - - private final JobConfig jobConfig; - - private final List actions = new ArrayList<>(); - - private final Set jarUrls = new HashSet<>(); - - private final List commonPluginJars = new ArrayList<>(); +public class JobImmutableInformationEnv extends AbstractJobEnvironment { private final Config seaTunnelJobConfig; - private final IdGenerator idGenerator; - private final NodeEngineImpl nodeEngine; private final Long jobId; @@ -79,9 +44,7 @@ public JobImmutableInformationEnv( Node node, boolean isStartWithSavePoint, Long jobId) { - this.jobConfig = jobConfig; - this.isStartWithSavePoint = isStartWithSavePoint; - this.idGenerator = new IdGenerator(); + super(jobConfig, isStartWithSavePoint); this.seaTunnelJobConfig = seaTunnelJobConfig; this.nodeEngine = node.getNodeEngine(); this.jobConfig.setJobContext( @@ -93,59 +56,16 @@ public JobImmutableInformationEnv( .getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME) .newId())); this.jobId = Long.valueOf(jobConfig.getJobContext().getJobId()); - this.commonPluginJars.addAll(searchPluginJars()); - this.commonPluginJars.addAll( - new ArrayList<>( - Common.getThirdPartyJars( - jobConfig - .getEnvOptions() - .getOrDefault(EnvCommonOptions.JARS.key(), "") - .toString()) - .stream() - .map(Path::toUri) - .map( - uri -> { - try { - return uri.toURL(); - } catch (MalformedURLException e) { - throw new SeaTunnelEngineException( - "the uri of jar illegal:" + uri, e); - } - }) - .collect(Collectors.toList()))); - LOGGER.info("add common jar in plugins :" + commonPluginJars); } public Long getJobId() { return jobId; } - private Set searchPluginJars() { - try { - if (Files.exists(Common.pluginRootDir())) { - return new HashSet<>(FileUtils.searchJarFiles(Common.pluginRootDir())); - } - } catch (IOException | SeaTunnelEngineException e) { - LOGGER.warning( - String.format("Can't search plugin jars in %s.", Common.pluginRootDir()), e); - } - return Collections.emptySet(); - } - - private MultipleTableJobConfigParser getJobConfigParser() { + @Override + protected MultipleTableJobConfigParser getJobConfigParser() { return new MultipleTableJobConfigParser( - seaTunnelJobConfig, idGenerator, jobConfig, commonPluginJars); - } - - private LogicalDagGenerator getLogicalDagGenerator() { - return new LogicalDagGenerator(actions, jobConfig, idGenerator); - } - - private LogicalDag getLogicalDag() { - ImmutablePair, Set> immutablePair = getJobConfigParser().parse(); - actions.addAll(immutablePair.getLeft()); - jarUrls.addAll(immutablePair.getRight()); - return getLogicalDagGenerator().generate(); + seaTunnelJobConfig, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint); } public JobImmutableInformation build() {