diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java index 1686514a15b..3132d93a169 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Handover.java @@ -30,7 +30,10 @@ public final class Handover implements Closeable { new LinkedBlockingQueue<>(DEFAULT_QUEUE_SIZE); private Throwable error; - public boolean isEmpty() { + public boolean isEmpty() throws Exception { + if (error != null) { + rethrowException(error, error.getMessage()); + } return blockingQueue.isEmpty(); } diff --git a/seatunnel-common/src/test/java/org/apache/seatunnel/common/HandoverTest.java b/seatunnel-common/src/test/java/org/apache/seatunnel/common/HandoverTest.java new file mode 100644 index 00000000000..199a2d4e723 --- /dev/null +++ b/seatunnel-common/src/test/java/org/apache/seatunnel/common/HandoverTest.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.common; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class HandoverTest { + + @Test + public void testThrowExceptionWhenQueueIsEmtpy() { + Handover handover = new Handover<>(); + handover.reportError(new RuntimeException("test")); + Assertions.assertThrows(RuntimeException.class, handover::isEmpty); + } +} diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java index 6593137aff7..87d2b7b7c9f 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java @@ -31,7 +31,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Map; public class KafkaRecordEmitter @@ -71,13 +70,14 @@ public void emitRecord( // consumerRecord.offset + 1 is the offset commit to Kafka and also the start offset // for the next run splitState.setCurrentOffset(consumerRecord.offset() + 1); - } catch (IOException e) { + } catch (Exception e) { if (this.messageFormatErrorHandleWay == MessageFormatErrorHandleWay.SKIP) { logger.warn( "Deserialize message failed, skip this message, message: {}", new String(consumerRecord.value())); + } else { + throw e; } - throw e; } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index ffc97f4dd33..4a57cbdbd35 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java @@ -291,7 +291,7 @@ public void testSourceKafkaJsonFormatErrorHandleWaySkipToConsole(TestContainer c DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER, null); - generateTestData(row -> serializer.serializeRow(row), 0, 100); + generateTestData(serializer::serializeRow, 0, 100); Container.ExecResult execResult = container.executeJob( "/kafka/kafkasource_format_error_handle_way_skip_to_console.conf"); @@ -308,11 +308,11 @@ public void testSourceKafkaJsonFormatErrorHandleWayFailToConsole(TestContainer c DEFAULT_FORMAT, DEFAULT_FIELD_DELIMITER, null); - generateTestData(row -> serializer.serializeRow(row), 0, 100); + generateTestData(serializer::serializeRow, 0, 100); Container.ExecResult execResult = container.executeJob( "/kafka/kafkasource_format_error_handle_way_fail_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + Assertions.assertEquals(1, execResult.getExitCode(), execResult.getStderr()); } @TestTemplate diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf index d2a0f05354d..dd1390d1679 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_fail_to_console.conf @@ -37,6 +37,7 @@ source { result_table_name = "kafka_table" start_mode = "earliest" format_error_handle_way = fail + format = text schema = { fields { id = bigint diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf index 88b6098b5e5..a34856d31b1 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafkasource_format_error_handle_way_skip_to_console.conf @@ -37,6 +37,7 @@ source { result_table_name = "kafka_table" start_mode = "earliest" format_error_handle_way = skip + format = text schema = { fields { id = bigint diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/SeaTunnelInputPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/SeaTunnelInputPartitionReader.java index 26e554b7fdb..4b651ec0535 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/SeaTunnelInputPartitionReader.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/SeaTunnelInputPartitionReader.java @@ -34,7 +34,11 @@ public SeaTunnelInputPartitionReader(ParallelBatchPartitionReader partitionReade @Override public boolean next() throws IOException { - return partitionReader.next(); + try { + return partitionReader.next(); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Override diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/ParallelBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/ParallelBatchPartitionReader.java index 20240638702..e20dca09d51 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/ParallelBatchPartitionReader.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/ParallelBatchPartitionReader.java @@ -84,7 +84,7 @@ protected String getEnumeratorThreadName() { return String.format("parallel-split-enumerator-executor-%s", subtaskId); } - public boolean next() throws IOException { + public boolean next() throws Exception { prepare(); while (running && handover.isEmpty()) { try { diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/ParallelBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/ParallelBatchPartitionReader.java index 27ab9f42d2c..5ca32d6775a 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/ParallelBatchPartitionReader.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/ParallelBatchPartitionReader.java @@ -84,7 +84,7 @@ protected String getEnumeratorThreadName() { return String.format("parallel-split-enumerator-executor-%s", subtaskId); } - public boolean next() throws IOException { + public boolean next() throws Exception { prepare(); while (running && handover.isEmpty()) { try { diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java index e9c9268a463..9841a0dfbdd 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/SeaTunnelBatchPartitionReader.java @@ -32,7 +32,11 @@ public SeaTunnelBatchPartitionReader(ParallelBatchPartitionReader partitionReade @Override public boolean next() throws IOException { - return partitionReader.next(); + try { + return partitionReader.next(); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Override diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatchPartitionReader.java index 61d466d946d..597be91d675 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatchPartitionReader.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/SeaTunnelMicroBatchPartitionReader.java @@ -34,7 +34,11 @@ public SeaTunnelMicroBatchPartitionReader(ParallelBatchPartitionReader partition @Override public boolean next() throws IOException { - return partitionReader.next(); + try { + return partitionReader.next(); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Override