Skip to content

Commit

Permalink
[Hotfix][Zeta] Fix zeta scheduler bug (apache#6050)
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 authored and chaorongzhi committed Aug 21, 2024
1 parent 953120d commit 9dce598
Show file tree
Hide file tree
Showing 9 changed files with 347 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;

Expand Down Expand Up @@ -182,7 +184,12 @@ public void execute() throws CommandExecuteException {
seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(),
TimeUnit.SECONDS);
// wait for job complete
jobStatus = clientJobProxy.waitForJobComplete();
JobResult jobResult = clientJobProxy.waitForJobCompleteV2();
jobStatus = jobResult.getStatus();
if (StringUtils.isNotEmpty(jobResult.getError())
|| jobResult.getStatus().equals(JobStatus.FAILED)) {
throw new SeaTunnelEngineException(jobResult.getError());
}
// get job end time
endTime = LocalDateTime.now();
// get job statistic information when job finished
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class SeaTunnelContainer extends AbstractTestContainer {
private static final String JDK_DOCKER_IMAGE = "openjdk:8";
private static final String CLIENT_SHELL = "seatunnel.sh";
private static final String SERVER_SHELL = "seatunnel-cluster.sh";
private GenericContainer<?> server;
protected GenericContainer<?> server;

@Override
public void startUp() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.e2e;

import org.apache.seatunnel.e2e.common.util.ContainerUtil;

import org.apache.commons.lang3.StringUtils;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerLoggerFactory;
import org.testcontainers.utility.MountableFile;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Arrays;

import static org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;

/**
 * End-to-end test checking that the SeaTunnel client reports a failure (non-zero exit code and
 * the engine error on stderr) when the submitted job fails on the server.
 *
 * <p>The server container is started with {@code seatunnel_fixed_slot_num.yaml} copied over
 * {@code config/seatunnel.yaml}, so the job submitted by the test runs against that engine
 * configuration instead of the default one.
 */
public class JobClientJobProxyIT extends SeaTunnelContainer {
    private static final String JDK_DOCKER_IMAGE = "openjdk:8";
    private static final String SERVER_SHELL = "seatunnel-cluster.sh";

    /** Port exposed by the server container. */
    private static final int SERVER_PORT = 5801;

    /** Location of the e2e test resources, relative to the project root. */
    private static final String E2E_RESOURCES_DIR =
            "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/";

    /** File name of the shaded hadoop jar copied into the container's {@code lib} directory. */
    private static final String HADOOP_UBER_JAR = "seatunnel-hadoop3-3.1.4-uber.jar";

    /** Error text the client is expected to print when the job cannot get its resources. */
    private static final String NO_ENOUGH_RESOURCE_ERROR =
            "org.apache.seatunnel.engine.server.resourcemanager.NoEnoughResourceException: can't apply resource request";

    /**
     * Builds and starts the server container, replacing the default {@code seatunnel.yaml} with
     * {@code seatunnel_fixed_slot_num.yaml}.
     *
     * @throws Exception if the container cannot be prepared or started
     */
    @Override
    @BeforeAll
    public void startUp() throws Exception {
        this.server =
                new GenericContainer<>(getDockerImage())
                        .withNetwork(NETWORK)
                        .withCommand(
                                ContainerUtil.adaptPathForWin(
                                        Paths.get(SEATUNNEL_HOME, "bin", SERVER_SHELL).toString()))
                        .withNetworkAliases("server")
                        .withExposedPorts()
                        .withLogConsumer(
                                new Slf4jLogConsumer(
                                        DockerLoggerFactory.getLogger(
                                                "seatunnel-engine:" + JDK_DOCKER_IMAGE)))
                        .waitingFor(Wait.forListeningPort());
        copySeaTunnelStarterToContainer(server);
        server.setExposedPorts(Arrays.asList(SERVER_PORT));

        // Copy the whole test resources directory into the container's config directory.
        server.withCopyFileToContainer(
                MountableFile.forHostPath(PROJECT_ROOT_PATH + E2E_RESOURCES_DIR),
                Paths.get(SEATUNNEL_HOME, "config").toString());

        // Use seatunnel_fixed_slot_num.yaml in place of seatunnel.yaml inside the container.
        server.withCopyFileToContainer(
                MountableFile.forHostPath(
                        PROJECT_ROOT_PATH + E2E_RESOURCES_DIR + "seatunnel_fixed_slot_num.yaml"),
                Paths.get(SEATUNNEL_HOME, "config/seatunnel.yaml").toString());

        server.withCopyFileToContainer(
                MountableFile.forHostPath(
                        PROJECT_ROOT_PATH
                                + "/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/"
                                + HADOOP_UBER_JAR),
                Paths.get(SEATUNNEL_HOME, "lib/" + HADOOP_UBER_JAR).toString());
        server.start();
        // execute extra commands
        executeExtraCommands(server);
    }

    /**
     * Submits {@code /batch_slot_not_enough.conf} and expects the client to exit with a non-zero
     * code and to print the {@code NoEnoughResourceException} message on stderr.
     *
     * @throws IOException if executing the job command in the container fails
     * @throws InterruptedException if the thread is interrupted while the command runs
     */
    @Test
    public void testJobFailedWillThrowException() throws IOException, InterruptedException {
        Container.ExecResult execResult = executeSeaTunnelJob("/batch_slot_not_enough.conf");
        String stderr = execResult.getStderr();

        // Include stderr in the failure messages so a failing run can be diagnosed from the
        // test report alone.
        Assertions.assertNotEquals(
                0,
                execResult.getExitCode(),
                "Client should exit with a non-zero code when the job fails; stderr: " + stderr);
        Assertions.assertTrue(
                StringUtils.isNotBlank(stderr) && stderr.contains(NO_ENOUGH_RESOURCE_ERROR),
                "Client stderr should contain '" + NO_ENOUGH_RESOURCE_ERROR + "' but was: " + stderr);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.e2e;

import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * Runs the same batch job against an in-process server configured with a fixed (non-dynamic)
 * number of slots and checks the final job status for a small and a larger slot count.
 */
public class SeaTunnelSlotIT {
    /** Job definition shared by both tests; only the server's slot count differs. */
    private static final String JOB_CONF = "batch_slot_not_enough.conf";

    /** Upper bound on how long a test waits for the job to reach a final state. */
    private static final long JOB_TIMEOUT_MINUTES = 10;

    /** How often the job future is polled for completion. */
    private static final long POLL_INTERVAL_SECONDS = 2;

    /** With 3 fixed slots the job is expected to end as {@link JobStatus#FAILED}. */
    @Test
    public void testSlotNotEnough() throws Exception {
        assertJobEndsWith("testSlotNotEnough", 3, JobStatus.FAILED);
    }

    /** With 10 fixed slots the job is expected to end as {@link JobStatus#FINISHED}. */
    @Test
    public void testSlotEnough() throws Exception {
        assertJobEndsWith("testSlotEnough", 10, JobStatus.FINISHED);
    }

    /**
     * Starts a server node with {@code slotNum} fixed slots, submits {@link #JOB_CONF} through a
     * client and asserts the status the job ends with.
     *
     * @param clusterName name used for the Hazelcast cluster, the client and the job
     * @param slotNum number of slots configured on the server (dynamic slots are disabled)
     * @param expectedStatus status the job must end with
     * @throws Exception if the server or client cannot be started, or waiting for the job fails
     */
    private static void assertJobEndsWith(
            String clusterName, int slotNum, JobStatus expectedStatus) throws Exception {
        HazelcastInstanceImpl node = null;
        SeaTunnelClient engineClient = null;

        try {
            SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
            seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
            seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setDynamicSlot(false);
            seaTunnelConfig.getEngineConfig().getSlotServiceConfig().setSlotNum(slotNum);

            node = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

            // client config
            Common.setDeployMode(DeployMode.CLIENT);
            String filePath = TestUtils.getResource(JOB_CONF);
            JobConfig jobConfig = new JobConfig();
            jobConfig.setName(clusterName);

            ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
            clientConfig.setClusterName(clusterName);
            engineClient = new SeaTunnelClient(clientConfig);
            ClientJobExecutionEnvironment jobExecutionEnv =
                    engineClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);

            final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
            // waitForJobComplete() is deprecated; it is defined as
            // waitForJobCompleteV2().getStatus(), so call that directly.
            CompletableFuture<JobStatus> statusFuture =
                    CompletableFuture.supplyAsync(
                            () -> clientJobProxy.waitForJobCompleteV2().getStatus());

            // Wait only for completion, then assert the status separately: a job that ends in
            // an unexpected state fails the test right away with an "expected X but was Y"
            // message instead of polling until the timeout expires.
            Awaitility.await()
                    .atMost(JOB_TIMEOUT_MINUTES, TimeUnit.MINUTES)
                    .pollInterval(POLL_INTERVAL_SECONDS, TimeUnit.SECONDS)
                    .until(statusFuture::isDone);
            Assertions.assertEquals(expectedStatus, statusFuture.get());
        } finally {
            if (engineClient != null) {
                engineClient.shutdown();
            }

            if (node != null) {
                node.shutdown();
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of batch processing in seatunnel config
######

env {
# You can set engine configuration here
job.mode = "BATCH"
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
FakeSource {
result_table_name = "fake"
parallelism = 4
schema = {
fields {
name = "string"
age = "int"
}
}
}
}

transform {
}

sink {
console {
source_table_name="fake"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

seatunnel:
engine:
history-job-expire-minutes: 1
backup-count: 2
queue-type: blockingqueue
print-execution-info-interval: 10
slot-service:
dynamic-slot: false
slot-num: 3
checkpoint:
interval: 300000
timeout: 100000
storage:
type: localfile
max-retained: 3
plugin-config:
namespace: /tmp/seatunnel/checkpoint_snapshot/
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSubmitJobCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelWaitForJobCompleteCodec;

import org.apache.commons.lang3.StringUtils;

import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
Expand Down Expand Up @@ -117,10 +115,6 @@ public JobResult waitForJobCompleteV2() {
throw new RuntimeException(e);
}
LOGGER.info(String.format("Job (%s) end with state %s", jobId, jobResult.getStatus()));
if (StringUtils.isNotEmpty(jobResult.getError())
|| jobResult.getStatus().equals(JobStatus.FAILED)) {
throw new SeaTunnelEngineException(jobResult.getError());
}
return jobResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public interface Job {

JobStatus getJobStatus();

/**
 * Returns the status of the result produced by {@link #waitForJobCompleteV2()}, discarding
 * everything else that result carries.
 *
 * @return the {@link JobStatus} taken from the result of {@link #waitForJobCompleteV2()}
 * @deprecated call {@link #waitForJobCompleteV2()} instead and read the status from its result.
 */
@Deprecated
default JobStatus waitForJobComplete() {
return waitForJobCompleteV2().getStatus();
}
Expand Down
Loading

0 comments on commit 9dce598

Please sign in to comment.