Skip to content

Commit

Permalink
[Hotfix][Zeta] Fix taskgroup failed log lost (apache#7241)
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 authored Jul 23, 2024
1 parent b0fe432 commit fe0c477
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

import com.google.auto.service.AutoService;

import java.util.List;

import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;

@AutoService(Factory.class)
Expand All @@ -50,6 +52,9 @@ public class InMemorySinkFactory
public static final Option<String> ASSERT_OPTIONS_VALUE =
Options.key("assert_options_value").stringType().noDefaultValue();

/**
 * Optional list of strings read from the {@code throw_runtime_exception_list} key; it has no
 * default value. When the option is present, {@code InMemorySinkWriter#write} throws a
 * {@link RuntimeException} whose message is taken from this list.
 */
public static final Option<List<String>> THROW_RUNTIME_EXCEPTION_LIST =
Options.key("throw_runtime_exception_list").listType().noDefaultValue();

@Override
public String factoryIdentifier() {
return "InMemory";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public class InMemorySinkWriter
// use a daemon thread to test classloader leak
private static final Thread THREAD;

// Index of the message most recently taken from THROW_RUNTIME_EXCEPTION_LIST. write()
// increments it before every throw, so the initial -1 makes the first throw use element 0.
// NOTE(review): this is static and unsynchronized, and nothing visible here bounds it by the
// list size - confirm the configured list is long enough for every failure the test provokes.
private static int restoreCount = -1;

static {
// use the daemon thread to always hold the classloader
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Expand Down Expand Up @@ -80,6 +82,12 @@ public void write(SeaTunnelRow element) throws IOException {
if (config.get(InMemorySinkFactory.THROW_OUT_OF_MEMORY)) {
throw new OutOfMemoryError();
}

if (config.getOptional(InMemorySinkFactory.THROW_RUNTIME_EXCEPTION_LIST).isPresent()) {
restoreCount++;
throw new RuntimeException(
config.get(InMemorySinkFactory.THROW_RUNTIME_EXCEPTION_LIST).get(restoreCount));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@
package org.apache.seatunnel.engine.e2e;

import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;

import org.awaitility.Awaitility;
Expand All @@ -31,6 +37,7 @@
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

@Slf4j
Expand Down Expand Up @@ -89,4 +96,70 @@ public void getClusterHealthMetrics() {
}
}
}

/**
 * Checks that the error message of a failing sink is still present in the final job result.
 *
 * <p>The test starts one server node, submits the job described by
 * {@code stream_fake_to_inmemory_with_runtime_list.conf}, and then waits until the job has
 * finished with status {@link JobStatus#FAILED} and an error text that contains
 * {@code "runtime error 4"}.
 *
 * @throws Exception if the cluster, the client or the job submission fails unexpectedly
 */
@Test
public void testTaskGroupErrorMsgLost() throws Exception {
    final String clusterBaseName = "Test_TaskGroupErrorMsgLost";

    HazelcastInstanceImpl serverNode = null;
    SeaTunnelClient client = null;

    SeaTunnelConfig serverConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
    serverConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName(clusterBaseName));
    serverConfig.getEngineConfig().setClassloaderCacheMode(true);

    try {
        serverNode = SeaTunnelServerStarter.createHazelcastInstance(serverConfig);

        // The lambda below needs an effectively final reference to the node.
        HazelcastInstanceImpl startedNode = serverNode;
        Awaitility.await()
                .atMost(10, TimeUnit.SECONDS)
                .untilAsserted(
                        () -> {
                            int memberCount = startedNode.getCluster().getMembers().size();
                            Assertions.assertEquals(1, memberCount);
                        });

        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
        clientConfig.setClusterName(TestUtils.getClusterName(clusterBaseName));
        client = new SeaTunnelClient(clientConfig);

        String jobFile = TestUtils.getResource("stream_fake_to_inmemory_with_runtime_list.conf");
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(clusterBaseName);

        ClientJobExecutionEnvironment executionEnv =
                client.createExecutionContext(jobFile, jobConfig, serverConfig);
        final ClientJobProxy jobProxy = executionEnv.execute();

        // Wait for job completion on another thread so this thread can poll the outcome.
        CompletableFuture<PassiveCompletableFuture<JobResult>> completion =
                CompletableFuture.supplyAsync(() -> jobProxy.doWaitForJobComplete());

        Awaitility.await()
                .atMost(120, TimeUnit.SECONDS)
                .untilAsserted(
                        () -> {
                            Thread.sleep(2000);
                            Assertions.assertTrue(completion.isDone());

                            JobResult outcome = completion.get().get();
                            Assertions.assertEquals(JobStatus.FAILED, outcome.getStatus());

                            boolean hasExpectedMessage =
                                    outcome.getError().contains("runtime error 4");
                            Assertions.assertTrue(hasExpectedMessage);
                        });

    } finally {
        // Close the client before shutting down the node it is connected to.
        if (client != null) {
            client.close();
        }

        if (serverNode != null) {
            serverNode.shutdown();
        }
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

# Job-level settings: a streaming job that runs with a parallelism of 1.
env {
parallelism = 1
job.mode = "STREAMING"
# NOTE(review): presumably milliseconds - confirm against the engine's checkpoint documentation.
checkpoint.interval = 5000
}

source {
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
# It publishes rows with a string field `name` and an int field `age` as the table "fake".
FakeSource {
result_table_name = "fake"
row.num = 100
split.num = 5
schema = {
fields {
name = "string"
age = "int"
}
}
parallelism = 1
}
}

transform {
}

sink {
# InMemory test sink reading from the table "fake". throw_runtime_exception_list supplies the
# messages of the RuntimeExceptions the sink writer throws from write().
# NOTE(review): "runtime error1" has no space before the digit, unlike the other entries -
# confirm whether that is intentional.
InMemory {
source_table_name="fake"
throw_runtime_exception_list=["runtime error1", "runtime error 2", "runtime error 3", "runtime error 4"]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public PassiveCompletableFuture<TaskExecutionState> initStateFuture() {
}
} else if (ExecutionState.DEPLOYING.equals(currExecutionState)) {
if (!checkTaskGroupIsExecuting(taskGroupLocation)) {
updateTaskState(ExecutionState.RUNNING);
updateTaskState(ExecutionState.FAILING);
}
}
return new PassiveCompletableFuture<>(this.taskFuture);
Expand Down Expand Up @@ -485,6 +485,8 @@ private void resetExecutionState() {
() -> {
updateStateTimestamps(ExecutionState.CREATED);
runningJobStateIMap.set(taskGroupLocation, ExecutionState.CREATED);
// reset the errorByPhysicalVertex
errorByPhysicalVertex = new AtomicReference<>();
return null;
},
new RetryUtils.RetryMaterial(
Expand Down

0 comments on commit fe0c477

Please sign in to comment.