Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] Change class name of AbstractJobEnvironment implement #5671

Merged
merged 1 commit into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
import org.apache.seatunnel.core.starter.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.JobMetricsRunner;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
Expand Down Expand Up @@ -123,7 +123,7 @@ public void execute() throws CommandExecuteException {
Path configFile = FileUtils.getConfigPath(clientCommandArgs);
checkConfigExist(configFile);
JobConfig jobConfig = new JobConfig();
JobExecutionEnvironment jobExecutionEnv;
ClientJobExecutionEnvironment jobExecutionEnv;
jobConfig.setName(clientCommandArgs.getJobName());
if (null != clientCommandArgs.getRestoreJobId()) {
jobExecutionEnv =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
Expand Down Expand Up @@ -108,7 +108,7 @@ public void testBatchJobRunOkIn2Node() throws ExecutionException, InterruptedExc
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -230,7 +230,7 @@ public void testStreamJobRunOkIn2Node() throws ExecutionException, InterruptedEx
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -325,7 +325,7 @@ public void testBatchJobRestoreIn2NodeWorkerDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -422,7 +422,7 @@ public void testStreamJobRestoreIn2NodeWorkerDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -540,7 +540,7 @@ public void testBatchJobRestoreIn2NodeMasterDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -642,7 +642,7 @@ public void testStreamJobRestoreIn2NodeMasterDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -810,7 +810,7 @@ public void testStreamJobRestoreInAllNodeDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
Long jobId = clientJobProxy.getJobId();
Expand Down Expand Up @@ -1028,7 +1028,7 @@ public void testStreamJobRestoreFromOssInAllNodeDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
Long jobId = clientJobProxy.getJobId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
Expand Down Expand Up @@ -115,7 +115,7 @@ public void testTwoPipelineBatchJobRunOkIn2Node()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -243,7 +243,7 @@ public void testTwoPipelineStreamJobRunOkIn2Node()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -341,7 +341,7 @@ public void testTwoPipelineBatchJobRestoreIn2NodeWorkerDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -456,7 +456,7 @@ public void testTwoPipelineStreamJobRestoreIn2NodeWorkerDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -586,7 +586,7 @@ public void testTwoPipelineBatchJobRestoreIn2NodeMasterDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down Expand Up @@ -696,7 +696,7 @@ public void testTwoPipelineStreamJobRestoreIn2NodeMasterDown()
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.core.job.JobResult;
Expand Down Expand Up @@ -76,7 +76,7 @@ public void testExecuteJob() throws Exception {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig);

final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
Expand All @@ -103,7 +103,7 @@ public void cancelJobTest() throws Exception {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig);

final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
Expand Down Expand Up @@ -132,7 +132,7 @@ public void testGetErrorInfo() throws ExecutionException, InterruptedException {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig);
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
CompletableFuture<JobStatus> completableFuture =
Expand Down Expand Up @@ -177,7 +177,7 @@ public void testExpiredJobWasDeleted() throws Exception {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig);

final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
Expand Down Expand Up @@ -71,7 +71,7 @@ void beforeClass() throws Exception {
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(testClusterName);
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(filePath, jobConfig);

clientJobProxy = jobExecutionEnv.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
Expand Down Expand Up @@ -130,7 +130,7 @@ public void enableWriteHeader(String file_format_type, String headerWrite, Strin
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv =
ClientJobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(testResources.getRight(), jobConfig);
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.seatunnel.engine.client;

import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.JobClient;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.JobMetricsRunner.JobMetricsSummary;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
Expand Down Expand Up @@ -48,15 +48,15 @@ public SeaTunnelClient(@NonNull ClientConfig clientConfig) {
}

@Override
public JobExecutionEnvironment createExecutionContext(
public ClientJobExecutionEnvironment createExecutionContext(
@NonNull String filePath, @NonNull JobConfig jobConfig) {
return new JobExecutionEnvironment(jobConfig, filePath, hazelcastClient);
return new ClientJobExecutionEnvironment(jobConfig, filePath, hazelcastClient);
}

@Override
public JobExecutionEnvironment restoreExecutionContext(
public ClientJobExecutionEnvironment restoreExecutionContext(
@NonNull String filePath, @NonNull JobConfig jobConfig, @NonNull Long jobId) {
return new JobExecutionEnvironment(jobConfig, filePath, hazelcastClient, true, jobId);
return new ClientJobExecutionEnvironment(jobConfig, filePath, hazelcastClient, true, jobId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@

package org.apache.seatunnel.engine.client;

import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.JobClient;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import org.apache.seatunnel.engine.common.config.JobConfig;

import lombok.NonNull;

public interface SeaTunnelClientInstance {

JobExecutionEnvironment createExecutionContext(
ClientJobExecutionEnvironment createExecutionContext(
@NonNull String filePath, @NonNull JobConfig config);

JobExecutionEnvironment restoreExecutionContext(
ClientJobExecutionEnvironment restoreExecutionContext(
@NonNull String filePath, @NonNull JobConfig config, @NonNull Long jobId);

JobClient createJobClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;

public class JobExecutionEnvironment extends AbstractJobEnvironment {
public class ClientJobExecutionEnvironment extends AbstractJobEnvironment {

private final String jobFilePath;

Expand All @@ -36,7 +36,7 @@ public class JobExecutionEnvironment extends AbstractJobEnvironment {
private final JobClient jobClient;

/** If the JobId is not empty, it is used to restore job from savePoint */
public JobExecutionEnvironment(
public ClientJobExecutionEnvironment(
JobConfig jobConfig,
String jobFilePath,
SeaTunnelHazelcastClient seaTunnelHazelcastClient,
Expand All @@ -50,7 +50,7 @@ public JobExecutionEnvironment(
new JobContext(isStartWithSavePoint ? jobId : jobClient.getNewJobId()));
}

public JobExecutionEnvironment(
public ClientJobExecutionEnvironment(
JobConfig jobConfig,
String jobFilePath,
SeaTunnelHazelcastClient seaTunnelHazelcastClient) {
Expand Down
Loading