From aec5c437246d92b471b8e2a78d76c2f0a4563d1a Mon Sep 17 00:00:00 2001 From: zhouyao Date: Fri, 21 Oct 2022 15:25:06 +0800 Subject: [PATCH 1/6] Support setting read starting offset or time at startup config --- docs/en/connector-v2/source/kafka.md | 25 +++- .../seatunnel/kafka/config/Config.java | 15 +++ .../seatunnel/kafka/config/StartMode.java | 45 +++++++ .../kafka/source/ConsumerMetadata.java | 32 +++++ .../seatunnel/kafka/source/KafkaSource.java | 56 ++++++++- .../source/KafkaSourceSplitEnumerator.java | 119 +++++++++++++----- 6 files changed, 256 insertions(+), 36 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/StartMode.java diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md index 425817ca512..d644bc9ac69 100644 --- a/docs/en/connector-v2/source/kafka.md +++ b/docs/en/connector-v2/source/kafka.md @@ -18,7 +18,7 @@ Source connector for Apache Kafka. ## Options | name | type | required | default value | -| -------------------- | ------- | -------- | ------------------------ | +|----------------------|---------| -------- |--------------------------| | topic | String | yes | - | | bootstrap.servers | String | yes | - | | pattern | Boolean | no | false | @@ -28,6 +28,9 @@ Source connector for Apache Kafka. | common-options | | no | - | | schema | | no | - | | format | String | no | json | +| start_mode | String | no | group_offsets | +| start_mode.offsets | | no | | +| start_mode.timestamp | Long | no | | ### topic [string] @@ -66,6 +69,24 @@ The structure of the data, including field names and field types. Data format. The default format is json. Optional text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option. 
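For example, a text-format source might look like the following sketch (illustrative only; the topic name and delimiter here are placeholders, not part of this patch):

```hocon
source {
  Kafka {
    bootstrap.servers = "localhost:9092"
    topic = "example_topic"
    format = text
    field_delimiter = "|"
  }
}
```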
+### start_mode
+The initial consumption mode of the consumer. There are several types:
+[earliest],[group_offsets],[latest],[specific_offsets]
+
+### start_mode.timestamp
+The start timestamp in epoch milliseconds, required when the consumption mode is timestamp.
+
+### start_mode.offsets
+The start offset of each partition, required when the consumption mode is specific_offsets,
+for example:
+
+```hocon
+start_mode.offsets = {
+    info-0 = 70
+    info-1 = 10
+    info-2 = 10
+}
+```
+
 ## Example
 
 ### Simple
 
@@ -84,7 +105,7 @@ source {
     format = text
     field_delimiter = "#"
     topic = "topic_1,topic_2,topic_3"
-    bootstrap.server = "localhost:9092"
+    bootstrap.servers = "localhost:9092"
     kafka.max.poll.records = 500
     kafka.client.id = client_1
   }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 2dedcd13df6..f9c28d5463f 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -90,4 +90,19 @@ public class Config {
      */
     public static final String PARTITION_KEY = "partition_key";
 
+    /**
+     * The initial consumption mode of the consumer
+     */
+    public static final String START_MODE = "start_mode";
+
+    /**
+     * The start timestamp in epoch milliseconds, required when start_mode is timestamp
+     */
+    public static final String START_MODE_TIMESTAMP = "start_mode.timestamp";
+
+    /**
+     * The start offset of each partition, required when start_mode is specific_offsets
+     */
+    public static final String START_MODE_OFFSETS = "start_mode.offsets";
+
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/StartMode.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/StartMode.java
new file mode 100644
index 00000000000..11e3aa03a8c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/StartMode.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.kafka.config;
+
+public enum StartMode {
+
+    EARLIEST("earliest"),
+
+    GROUP_OFFSETS("group_offsets"),
+
+    LATEST("latest"),
+
+    TIMESTAMP("timestamp"),
+
+    SPECIFIC_OFFSETS("specific_offsets");
+
+    private String mode;
+
+    StartMode(String mode) {
+        this.mode = mode;
+    }
+
+    public String getMode() {
+        return mode;
+    }
+
+    @Override
+    public String toString() {
+        return mode;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
index ee942aaefc7..615d89416ee 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/ConsumerMetadata.java
@@ -17,7 +17,12 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.source;
 
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
+
+import org.apache.kafka.common.TopicPartition;
+
 import java.io.Serializable;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -31,6 +36,9 @@ public class ConsumerMetadata implements Serializable {
     private Properties properties;
     private String consumerGroup;
     private boolean commitOnCheckpoint = false;
+    private StartMode startMode = StartMode.GROUP_OFFSETS;
+    private Map<TopicPartition, Long> specificStartOffsets;
+    private Long startOffsetsTimestamp;
 
     public boolean isCommitOnCheckpoint() {
         return commitOnCheckpoint;
@@ -79,4 +87,28 @@ public String getConsumerGroup() {
     public void setConsumerGroup(String consumerGroup) {
         this.consumerGroup = consumerGroup;
     }
+
+    public StartMode getStartMode() {
+        return startMode;
+    }
+
+    public void setStartMode(StartMode startMode) {
+        this.startMode = startMode;
+    }
+
+    public Map<TopicPartition, Long> getSpecificStartOffsets() {
+        return specificStartOffsets;
+    }
+
+    public void setSpecificStartOffsets(Map<TopicPartition, Long> specificStartOffsets) {
+        this.specificStartOffsets = specificStartOffsets;
+    }
+
+    public Long getStartOffsetsTimestamp() {
+        return startOffsetsTimestamp;
+    }
+
+    public void setStartOffsetsTimestamp(Long startOffsetsTimestamp) {
+        this.startOffsetsTimestamp = startOffsetsTimestamp;
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 3c608f4a59b..07882ea49d8 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -26,6 +26,9 @@
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHEMA;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_OFFSETS;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE_TIMESTAMP;
 import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 
 import org.apache.seatunnel.api.common.JobContext;
@@ -42,15 +45,22 @@
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 import org.apache.seatunnel.format.text.TextDeserializationSchema;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
 
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.auto.service.AutoService;
+import org.apache.kafka.common.TopicPartition;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
 @AutoService(SeaTunnelSource.class)
@@ -96,6 +106,40 @@ public void prepare(Config config) throws PrepareFailException {
             this.metadata.setCommitOnCheckpoint(config.getBoolean(COMMIT_ON_CHECKPOINT));
         }
 
+        if (config.hasPath(START_MODE)) {
+            StartMode startMode = StartMode.valueOf(config.getString(START_MODE).toUpperCase());
+            this.metadata.setStartMode(startMode);
+            switch (startMode) {
+                case TIMESTAMP:
+                    long startOffsetsTimestamp = config.getLong(START_MODE_TIMESTAMP);
+                    long currentTimestamp = System.currentTimeMillis();
+                    if (startOffsetsTimestamp < 0 || startOffsetsTimestamp > currentTimestamp) {
+                        throw new IllegalArgumentException("The value of start_mode.timestamp must not be less than 0 or greater than the current time");
+                    }
+                    this.metadata.setStartOffsetsTimestamp(startOffsetsTimestamp);
+                    break;
+                case SPECIFIC_OFFSETS:
+                    Config offsets = config.getConfig(START_MODE_OFFSETS);
+                    ConfigRenderOptions options = ConfigRenderOptions.concise();
+                    String offsetsJson = offsets.root().render(options);
+                    if (offsetsJson == null) {
+                        throw new IllegalArgumentException("start mode is " + StartMode.SPECIFIC_OFFSETS + " but no specific offsets were specified.");
+                    }
+                    Map<TopicPartition, Long> specificStartOffsets = new HashMap<>();
+                    ObjectNode jsonNodes = JsonUtils.parseObject(offsetsJson);
+                    jsonNodes.fieldNames().forEachRemaining(key -> {
+                        int splitIndex = key.lastIndexOf("-"); // a topic name may itself contain '-'
+                        long offset = jsonNodes.get(key).asLong();
+                        TopicPartition topicPartition = new TopicPartition(key.substring(0, splitIndex), Integer.parseInt(key.substring(splitIndex + 1)));
+                        specificStartOffsets.put(topicPartition, offset);
+                    });
+                    this.metadata.setSpecificStartOffsets(specificStartOffsets);
+                    break;
+                default:
+                    break;
+            }
+        }
+
         TypesafeConfigUtils.extractSubConfig(config, "kafka.", false).entrySet().forEach(e -> {
             this.metadata.getProperties().put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
         });
@@ -144,9 +188,9 @@ private void setDeserialization(Config config) {
                     delimiter = config.getString(FIELD_DELIMITER);
                 }
                 deserializationSchema = TextDeserializationSchema.builder()
-                    .seaTunnelRowType(typeInfo)
-                    .delimiter(delimiter)
-                    .build();
+                        .seaTunnelRowType(typeInfo)
+                        .delimiter(delimiter)
+                        .build();
             } else {
                 // TODO: use format SPI
                 throw new UnsupportedOperationException("Unsupported format: " + format);
@@ -154,9 +198,9 @@ private void setDeserialization(Config config) {
         } else {
             typeInfo = SeaTunnelSchema.buildSimpleTextSchema();
             this.deserializationSchema =
TextDeserializationSchema.builder()
-                .seaTunnelRowType(typeInfo)
-                .delimiter(String.valueOf('\002'))
-                .build();
+                    .seaTunnelRowType(typeInfo)
+                    .delimiter(String.valueOf('\002'))
+                    .build();
         }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 0852fe8fdd6..0390536a864 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -23,7 +23,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
@@ -33,7 +33,6 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -51,14 +50,14 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> {
     private final Context<KafkaSourceSplit> context;
     private AdminClient adminClient;
 
-    private Set<KafkaSourceSplit> pendingSplit;
-    private final Set<KafkaSourceSplit> assignedSplit;
+    private Map<TopicPartition, KafkaSourceSplit> pendingSplit;
+    private final Map<TopicPartition, KafkaSourceSplit> assignedSplit;
 
     KafkaSourceSplitEnumerator(ConsumerMetadata metadata, Context<KafkaSourceSplit> context) {
         this.metadata = metadata;
         this.context = context;
-        this.assignedSplit = new HashSet<>();
-        this.pendingSplit = new HashSet<>();
+        this.assignedSplit = new HashMap<>();
+        this.pendingSplit = new HashMap<>();
     }
 
     KafkaSourceSplitEnumerator(ConsumerMetadata metadata, Context<KafkaSourceSplit> context,
@@ -74,13 +73,45 @@ public void open() {
 
     @Override
     public void run() throws ExecutionException, InterruptedException {
         getTopicInfo().forEach(split -> {
-            if (!assignedSplit.contains(split)) {
-                pendingSplit.add(split);
+            if (!assignedSplit.containsKey(split.getTopicPartition())) {
+                if (!pendingSplit.containsKey(split.getTopicPartition())) {
+                    pendingSplit.put(split.getTopicPartition(), split);
+                }
             }
         });
+        setPartitionStartOffset();
         assignSplit();
     }
 
+    private void setPartitionStartOffset() throws ExecutionException, InterruptedException {
+        Collection<TopicPartition> topicPartitions = pendingSplit.keySet();
+        Map<TopicPartition, Long> topicPartitionOffsets = null;
+        switch (metadata.getStartMode()) {
+            case EARLIEST:
+                topicPartitionOffsets = listOffsets(topicPartitions, OffsetSpec.earliest());
+                break;
+            case GROUP_OFFSETS:
+                topicPartitionOffsets = listConsumerGroupOffsets(topicPartitions);
+                break;
+            case LATEST:
+                topicPartitionOffsets = listOffsets(topicPartitions, OffsetSpec.earliest());
+                break;
+            case TIMESTAMP:
+                topicPartitionOffsets = listOffsets(topicPartitions, OffsetSpec.forTimestamp(metadata.getStartOffsetsTimestamp()));
+                break;
+            case SPECIFIC_OFFSETS:
+                topicPartitionOffsets = metadata.getSpecificStartOffsets();
+                break;
+            default:
+                break;
+        }
+        topicPartitionOffsets.entrySet().forEach(entry -> {
+            if (pendingSplit.containsKey(entry.getKey())) {
+                pendingSplit.get(entry.getKey()).setStartOffset(entry.getValue());
+            }
+        });
+    }
+
     @Override
     public void close() throws IOException {
         if (this.adminClient != null) {
@@
-91,20 +122,20 @@ public void close() throws IOException {
 
     @Override
     public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
-            pendingSplit.addAll(convertToNextSplit(splits));
+            pendingSplit.putAll(convertToNextSplit(splits));
             assignSplit();
         }
     }
 
-    private Collection<KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> splits) {
+    private Map<TopicPartition, KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> splits) {
         try {
-            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> listOffsets =
-                getKafkaPartitionLatestOffset(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()));
+            Map<TopicPartition, Long> listOffsets =
+                listOffsets(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()), OffsetSpec.earliest());
             splits.forEach(split -> {
                 split.setStartOffset(split.getEndOffset() + 1);
-                split.setEndOffset(listOffsets.get(split.getTopicPartition()).offset());
+                split.setEndOffset(listOffsets.get(split.getTopicPartition()));
             });
-            return splits;
+            return splits.stream().collect(Collectors.toMap(split -> split.getTopicPartition(), split -> split));
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -129,7 +160,7 @@ public void registerReader(int subtaskId) {
 
     @Override
     public KafkaSourceState snapshotState(long checkpointId) throws Exception {
-        return new KafkaSourceState(assignedSplit);
+        return new KafkaSourceState(assignedSplit.values().stream().collect(Collectors.toSet()));
     }
 
     @Override
@@ -158,34 +189,28 @@ private Set<KafkaSourceSplit> getTopicInfo() throws ExecutionException, InterruptedException {
         Collection<TopicPartition> partitions = adminClient.describeTopics(topics).all().get().values().stream().flatMap(t -> t.partitions().stream()
             .map(p -> new TopicPartition(t.name(), p.partition()))).collect(Collectors.toSet());
-        return getKafkaPartitionLatestOffset(partitions).entrySet().stream().map(partition -> {
-            KafkaSourceSplit split = new KafkaSourceSplit(partition.getKey());
-            split.setEndOffset(partition.getValue().offset());
+        return partitions.stream().map(partition -> {
+            KafkaSourceSplit split = new KafkaSourceSplit(partition);
             return split;
         }).collect(Collectors.toSet());
     }
 
-    private Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> getKafkaPartitionLatestOffset(Collection<TopicPartition> partitions) throws InterruptedException, ExecutionException {
-        return adminClient.listOffsets(partitions.stream().collect(Collectors.toMap(p -> p, p -> OffsetSpec.latest())))
-            .all().get();
-    }
-
     private void assignSplit() {
         Map<Integer, List<KafkaSourceSplit>> readySplit = new HashMap<>(Common.COLLECTION_SIZE);
         for (int taskID = 0; taskID < context.currentParallelism(); taskID++) {
             readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
         }
 
-        pendingSplit.forEach(s -> {
-            if (!assignedSplit.contains(s)) {
-                readySplit.get(getSplitOwner(s.getTopicPartition(), context.currentParallelism()))
-                    .add(s);
+        pendingSplit.entrySet().forEach(s -> {
+            if (!assignedSplit.containsKey(s.getKey())) {
+                readySplit.get(getSplitOwner(s.getKey(), context.currentParallelism()))
+                    .add(s.getValue());
             }
         });
 
         readySplit.forEach(context::assignSplit);
-        assignedSplit.addAll(pendingSplit);
+        assignedSplit.putAll(pendingSplit);
         pendingSplit.clear();
     }
 
@@ -195,4 +220,42 @@ private static int getSplitOwner(TopicPartition tp, int numReaders) {
         return (startIndex + tp.partition()) % numReaders;
     }
 
+    private Map<TopicPartition, Long> listOffsets(Collection<TopicPartition> partitions, OffsetSpec offsetSpec) throws ExecutionException, InterruptedException {
+        Map<TopicPartition, OffsetSpec> topicPartitionOffsets = partitions.stream().collect(Collectors.toMap(partition -> partition, __ -> offsetSpec));
+
+        return adminClient.listOffsets(topicPartitionOffsets).all()
+            .thenApply(
+                result -> {
+                    Map<TopicPartition, Long> offsets = new HashMap<>();
+                    result.forEach(
+                        (tp, offsetsResultInfo) -> {
+                            if (offsetsResultInfo != null) {
+                                offsets.put(tp, offsetsResultInfo.offset());
+                            }
+                        });
+                    return offsets;
+                })
+            .get();
+    }
+
+    public Map<TopicPartition, Long> listConsumerGroupOffsets(Collection<TopicPartition> partitions) throws ExecutionException, InterruptedException {
+        ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions().topicPartitions(new ArrayList<>(partitions));
+        return adminClient
+            .listConsumerGroupOffsets(metadata.getConsumerGroup(), options)
+            .partitionsToOffsetAndMetadata()
+            .thenApply(
+                result -> {
+                    Map<TopicPartition, Long> offsets = new HashMap<>();
+                    result.forEach(
+                        (tp, oam) -> {
+                            if (oam != null) {
+                                offsets.put(tp, oam.offset());
+                            }
+                        });
+                    return offsets;
+                })
+            .get();
+    }
+
 }

From 1d48d85b521029cbb539f72b8235ded5bc2b400e Mon Sep 17 00:00:00 2001
From: zhouyao
Date: Fri, 21 Oct 2022 16:12:05 +0800
Subject: [PATCH 2/6] checkstyle

---
 .../seatunnel/connectors/seatunnel/kafka/config/StartMode.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/StartMode.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/StartMode.java
index 11e3aa03a8c..21ec4ffb22a 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/StartMode.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/StartMode.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.connectors.seatunnel.kafka.config;
 
 public enum StartMode {

From 507c184327244895a700f966600d1c584ce89933 Mon Sep 17 00:00:00 2001
From: zhouyao
Date: Wed, 26 Oct 2022 18:20:00 +0800
Subject: [PATCH 3/6] [Improve][Connector-V2-kafka] Support setting read
 starting offset or time at startup config

---
 .../seatunnel/kafka/source/KafkaSourceSplitEnumerator.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 0390536a864..e814161c949 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -94,7 +94,7 @@ private void setPartitionStartOffset() throws ExecutionException, InterruptedException {
                 topicPartitionOffsets = listConsumerGroupOffsets(topicPartitions);
                 break;
             case LATEST:
-                topicPartitionOffsets = listOffsets(topicPartitions, OffsetSpec.earliest());
+                topicPartitionOffsets = listOffsets(topicPartitions, OffsetSpec.latest());
                 break;
             case TIMESTAMP:
                 topicPartitionOffsets = listOffsets(topicPartitions, OffsetSpec.forTimestamp(metadata.getStartOffsetsTimestamp()));
@@ -130,7 +130,7 @@ public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
     private Map<TopicPartition, KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> splits) {
         try {
             Map<TopicPartition, Long> listOffsets =
-                listOffsets(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()), OffsetSpec.earliest());
+
listOffsets(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()), OffsetSpec.latest()); splits.forEach(split -> { split.setStartOffset(split.getEndOffset() + 1); split.setEndOffset(listOffsets.get(split.getTopicPartition())); From 0fc7bf45c935313c5f776e799e8f0b9061a603f7 Mon Sep 17 00:00:00 2001 From: zhouyao Date: Mon, 31 Oct 2022 18:51:55 +0800 Subject: [PATCH 4/6] [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config --- docs/en/connector-v2/source/kafka.md | 2 +- .../source/KafkaSourceSplitEnumerator.java | 2 + .../v2/kafka/KafkaSourceJsonToConsoleIT.java | 65 +----------- .../KafkaSourceStartConfigToConsoleIT.java | 81 +++++++++++++++ .../v2/kafka/KafkaSourceTextToConsoleIT.java | 66 +------------ .../e2e/flink/v2/kafka/KafkaTestBaseIT.java | 73 ++++++++++++++ .../kafkasource_earliest_to_console.conf | 75 ++++++++++++++ .../kafkasource_group_offset_to_console.conf | 75 ++++++++++++++ .../kafka/kafkasource_latest_to_console.conf | 75 ++++++++++++++ ...fkasource_specific_offsets_to_console.conf | 79 +++++++++++++++ .../kafkasource_timestamp_to_console.conf | 76 ++++++++++++++ .../v2/kafka/KafkaSourceJsonToConsoleIT.java | 62 +----------- .../KafkaSourceStartConfigToConsoleIT.java | 98 +++++++++++++++++++ .../v2/kafka/KafkaSourceTextToConsoleIT.java | 63 +----------- .../e2e/spark/v2/kafka/KafkaTestBaseIT.java | 90 +++++++++++++++++ .../kafkasource_earliest_to_console.conf | 75 ++++++++++++++ .../kafkasource_group_offset_to_console.conf | 75 ++++++++++++++ .../kafka/kafkasource_json_to_console.conf | 2 +- .../kafka/kafkasource_latest_to_console.conf | 75 ++++++++++++++ ...fkasource_specific_offsets_to_console.conf | 79 +++++++++++++++ .../kafka/kafkasource_text_to_console.conf | 2 +- .../kafkasource_timestamp_to_console.conf | 76 ++++++++++++++ 22 files changed, 1117 insertions(+), 249 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceStartConfigToConsoleIT.java create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf create mode 
100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf
 create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf
 create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf
 create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf

diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md
index d644bc9ac69..966ebd972c0 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -71,7 +71,7 @@ If you customize the delimiter, add the "field_delimiter" option.
 
 ### start_mode
 The initial consumption mode of the consumer. There are several types:
-[earliest],[group_offsets],[latest],[specific_offsets]
+[earliest],[group_offsets],[latest],[specific_offsets],[timestamp]
 
 ### start_mode.timestamp
 The start timestamp in epoch milliseconds, required when the consumption mode is timestamp.
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index e814161c949..59c1aba7faf 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -189,8 +189,10 @@ private Set<KafkaSourceSplit> getTopicInfo() throws ExecutionException, InterruptedException {
         Collection<TopicPartition> partitions = adminClient.describeTopics(topics).all().get().values().stream().flatMap(t -> t.partitions().stream()
             .map(p -> new TopicPartition(t.name(), p.partition()))).collect(Collectors.toSet());
+        Map<TopicPartition, Long> latestOffsets = listOffsets(partitions, OffsetSpec.latest());
         return partitions.stream().map(partition -> {
             KafkaSourceSplit split = new KafkaSourceSplit(partition);
+            split.setEndOffset(latestOffsets.get(split.getTopicPartition()));
             return split;
         }).collect(Collectors.toSet());
     }
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java
index e3468e8dd34..c1499b14edb 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceJsonToConsoleIT.java
@@ -27,69 +27,28 @@
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.testcontainers.containers.Container; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.shaded.com.google.common.collect.Lists; -import org.testcontainers.shaded.org.awaitility.Awaitility; -import org.testcontainers.utility.DockerImageName; -import org.testcontainers.utility.DockerLoggerFactory; import java.io.IOException; import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.Collections; -import java.util.Properties; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; /** * This test case is used to verify that the kafka source is able to send data to the console. - * Make sure the SeaTunnel job can submit successfully on spark engine. + * Make sure the SeaTunnel job can submit successfully on flink engine. */ @Slf4j -public class KafkaSourceJsonToConsoleIT extends FlinkContainer { - - private static final int KAFKA_PORT = 9093; - - private static final String KAFKA_HOST = "kafkaCluster"; - - private KafkaProducer producer; - - private KafkaContainer kafkaContainer; - - @BeforeEach - public void startKafkaContainer() { - kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) - .withNetwork(NETWORK) - .withNetworkAliases(KAFKA_HOST) - .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1"))); - kafkaContainer.setPortBindings(Lists.newArrayList( - String.format("%s:%s", KAFKA_PORT, KAFKA_PORT))); - Startables.deepStart(Stream.of(kafkaContainer)).join(); - log.info("Kafka container started"); - Awaitility.given().ignoreExceptions() - .atLeast(100, TimeUnit.MILLISECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .atMost(180, TimeUnit.SECONDS) - .untilAsserted(() -> initKafkaProducer()); - generateTestData(); - } +public class KafkaSourceJsonToConsoleIT extends KafkaTestBaseIT { @SuppressWarnings("checkstyle:Indentation") - private void generateTestData() { + protected void generateTestData() { SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType( new String[]{ @@ -160,22 +119,4 @@ public void testKafkaSourceJsonToConsole() throws IOException, InterruptedExcept Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); } - private void initKafkaProducer() { - Properties props = new Properties(); - String bootstrapServers = kafkaContainer.getBootstrapServers(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - producer = new KafkaProducer<>(props); - } - - @AfterEach - public void close() { - if (producer != null) { - producer.close(); - } - if (kafkaContainer != null) { - kafkaContainer.close(); - } - } } diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java new file 
mode 100644 index 00000000000..dd511817b03 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java @@ -0,0 +1,81 @@ +package org.apache.seatunnel.e2e.flink.v2.kafka; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; + +import java.io.IOException; + +@Slf4j +public class KafkaSourceStartConfigToConsoleIT extends KafkaTestBaseIT { + @Override + protected void generateTestData() { + generateStepTestData(0, 100); + } + + private void generateStepTestData(int start, int end) { + + SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType( + new String[]{ + "id" + }, + new SeaTunnelDataType[]{ + BasicType.LONG_TYPE + } + ); + + DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType); + for (int i = start; i < end; i++) { + SeaTunnelRow row = new SeaTunnelRow( + new Object[]{ + Long.valueOf(i) + }); + ProducerRecord producerRecord = serializer.serializeRow(row); + producer.send(producerRecord); + } + } + + @Test + public void testKafka() throws IOException, InterruptedException { + testKafkaLatestToConsole(); + testKafkaEarliestToConsole(); + testKafkaSpecificOffsetsToConsole(); + testKafkaGroupOffsetsToConsole(); + testKafkaTimestampToConsole(); + } + + public void testKafkaLatestToConsole() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_latest_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaEarliestToConsole() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_earliest_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaSpecificOffsetsToConsole() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_specific_offsets_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaGroupOffsetsToConsole() throws IOException, InterruptedException { + generateStepTestData(100, 150); + Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_group_offset_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaTimestampToConsole() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelFlinkJob("/kafka/kafkasource_timestamp_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + +} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java index f0855b4551d..bfe37078db2 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceTextToConsoleIT.java @@ -26,70 +26,28 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.e2e.flink.FlinkContainer; import org.apache.seatunnel.format.text.TextSerializationSchema; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.testcontainers.containers.Container; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.shaded.com.google.common.collect.Lists; -import org.testcontainers.shaded.org.awaitility.Awaitility; -import org.testcontainers.utility.DockerImageName; -import org.testcontainers.utility.DockerLoggerFactory; import java.io.IOException; import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.Collections; -import java.util.Properties; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; /** * This test case is used to verify that the kafka source is able to send data to the console. - * Make sure the SeaTunnel job can submit successfully on spark engine. + * Make sure the SeaTunnel job can submit successfully on flink engine. 
*/ @Slf4j -public class KafkaSourceTextToConsoleIT extends FlinkContainer { - - private static final int KAFKA_PORT = 9093; - - private static final String KAFKA_HOST = "kafkaCluster"; - - private KafkaProducer producer; - - private KafkaContainer kafkaContainer; - - @BeforeEach - public void startKafkaContainer() { - kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) - .withNetwork(NETWORK) - .withNetworkAliases(KAFKA_HOST) - .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1"))); - kafkaContainer.setPortBindings(Lists.newArrayList( - String.format("%s:%s", KAFKA_PORT, KAFKA_PORT))); - Startables.deepStart(Stream.of(kafkaContainer)).join(); - log.info("Kafka container started"); - Awaitility.given().ignoreExceptions() - .atLeast(100, TimeUnit.MILLISECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .atMost(180, TimeUnit.SECONDS) - .untilAsserted(() -> initKafkaProducer()); - generateTestData(); - } - +public class KafkaSourceTextToConsoleIT extends KafkaTestBaseIT { @SuppressWarnings("checkstyle:Indentation") - private void generateTestData() { + protected void generateTestData() { SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType( new String[]{ @@ -163,22 +121,4 @@ public void testKafkaSourceTextToConsole() throws IOException, InterruptedExcept Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); } - private void initKafkaProducer() { - Properties props = new Properties(); - String bootstrapServers = kafkaContainer.getBootstrapServers(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - producer = new KafkaProducer<>(props); - } - - @AfterEach - public void close() { - if (producer != null) { - producer.close(); - } - if (kafkaContainer != null) { - kafkaContainer.close(); - } - } } diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java new file mode 100644 index 00000000000..9941249e3da --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java @@ -0,0 +1,73 @@ +package org.apache.seatunnel.e2e.flink.v2.kafka; + +import org.apache.seatunnel.e2e.flink.FlinkContainer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.com.google.common.collect.Lists; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +@Slf4j +public class KafkaTestBaseIT extends FlinkContainer { + protected static final int KAFKA_PORT = 9093; + + 
protected static final String KAFKA_HOST = "kafkaCluster";
+
+    protected KafkaProducer<byte[], byte[]> producer;
+
+    protected KafkaContainer kafkaContainer;
+
+    @BeforeEach
+    public void startKafkaContainer() {
+        kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
+            .withNetwork(NETWORK)
+            .withNetworkAliases(KAFKA_HOST)
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));
+        kafkaContainer.setPortBindings(Lists.newArrayList(
+            String.format("%s:%s", KAFKA_PORT, KAFKA_PORT)));
+        Startables.deepStart(Stream.of(kafkaContainer)).join();
+        log.info("Kafka container started");
+        Awaitility.given().ignoreExceptions()
+            .atLeast(100, TimeUnit.MILLISECONDS)
+            .pollInterval(500, TimeUnit.MILLISECONDS)
+            .atMost(180, TimeUnit.SECONDS)
+            .untilAsserted(() -> initKafkaProducer());
+        generateTestData();
+    }
+
+    protected void initKafkaProducer() {
+        Properties props = new Properties();
+        String bootstrapServers = kafkaContainer.getBootstrapServers();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producer = new KafkaProducer<>(props);
+    }
+
+    @SuppressWarnings("checkstyle:Indentation")
+    protected void generateTestData() {
+
+    }
+
+    @AfterEach
+    public void close() {
+        if (producer != null) {
+            producer.close();
+        }
+        if (kafkaContainer != null) {
+            kafkaContainer.close();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
new file mode 100644
index 00000000000..97f9ddb5db1
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
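+#
+# start_mode = earliest below makes the job read from the beginning of the topic,
+# so the Assert rules expect every id produced by the e2e test setup (0 through 99).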
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9093" + topic = "test_topic" + result_table_name = "kafka_table" + # The default format is json, which is optional + format = json + start_mode = earliest + schema = { + fields { + id = bigint + } + } + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource +} + +transform { + } + +sink { + Console {} + + Assert { + rules = + { + field_rules = [ + { + field_name = id + field_type = long + field_value = [ + + { + rule_type = MIN + rule_value = 0 + }, + { + rule_type = MAX + rule_value = 99 + } + ] + } + ] + } + } + } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf new file mode 100644 index 00000000000..2a7f9828c2a --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
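+#
+# start_mode = group_offsets below resumes from the offsets committed by the consumer
+# group; the e2e test produces a new batch (ids 100 through 149) before this job runs,
+# and the Assert rules expect exactly that range.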
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9093" + topic = "test_topic" + result_table_name = "kafka_table" + # The default format is json, which is optional + format = json + start_mode = group_offsets + schema = { + fields { + id = bigint + } + } + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource +} + +transform { + } + +sink { + Console {} + + Assert { + rules = + { + field_rules = [ + { + field_name = id + field_type = long + field_value = [ + + { + rule_type = MIN + rule_value = 100 + }, + { + rule_type = MAX + rule_value = 149 + } + ] + } + ] + } + } + } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf new file mode 100644 index 00000000000..736104ea079 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
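+#
+# start_mode = latest below makes the job begin from the latest offset recorded at
+# startup; the Assert rules pin the id range the e2e run expects for this case.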
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9093" + topic = "test_topic" + result_table_name = "kafka_table" + # The default format is json, which is optional + format = json + start_mode = latest + schema = { + fields { + id = bigint + } + } + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource +} + +transform { + } + +sink { + Console {} + + Assert { + rules = + { + field_rules = [ + { + field_name = id + field_type = long + field_value = [ + + { + rule_type = MIN + rule_value = 99 + }, + { + rule_type = MAX + rule_value = 99 + } + ] + } + ] + } + } + } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf new file mode 100644 index 00000000000..b7575632705 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf @@ -0,0 +1,79 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
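+#
+# start_mode = specific_offsets below starts partition test_topic-0 at offset 50,
+# so the Assert rules expect ids 50 through 99.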
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9093" + topic = "test_topic" + result_table_name = "kafka_table" + # The default format is json, which is optional + format = json + start_mode = specific_offsets + schema = { + fields { + id = bigint + } + } + + start_mode.offsets = { + test_topic-0 = 50 + } + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource +} + +transform { + } + +sink { + Console {} + + Assert { + rules = + { + field_rules = [ + { + field_name = id + field_type = long + field_value = [ + + { + rule_type = MIN + rule_value = 50 + }, + { + rule_type = MAX + rule_value = 99 + } + ] + } + ] + } + } + } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf new file mode 100644 index 00000000000..b491c654618 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf @@ -0,0 +1,76 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
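+#
+# start_mode = timestamp below starts from the first offset whose record timestamp is
+# at or after start_mode.timestamp; the configured value is earlier than the test data,
+# so the Assert rules expect all produced ids (0 through 149).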
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9093" + topic = "test_topic" + result_table_name = "kafka_table" + # The default format is json, which is optional + format = json + start_mode = timestamp + schema = { + fields { + id = bigint + } + } + start_mode.timestamp = 1667179890315 + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource +} + +transform { + } + +sink { + Console {} + + Assert { + rules = + { + field_rules = [ + { + field_name = id + field_type = long + field_value = [ + + { + rule_type = MIN + rule_value = 0 + }, + { + rule_type = MAX + rule_value = 149 + } + ] + } + ] + } + } + } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java index 8b9a425a9e2..de6abf8290f 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceJsonToConsoleIT.java @@ -27,68 +27,28 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer; -import org.apache.seatunnel.e2e.spark.SparkContainer; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.testcontainers.containers.Container; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.shaded.com.google.common.collect.Lists; -import org.testcontainers.shaded.org.awaitility.Awaitility; -import org.testcontainers.utility.DockerImageName; import java.io.IOException; import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.Collections; -import java.util.Properties; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; /** * This test case is used to verify that the kafka source is able to send data to the console. * Make sure the SeaTunnel job can submit successfully on spark engine. 
*/ @SuppressWarnings("checkstyle:EmptyLineSeparator") @Slf4j -public class KafkaSourceJsonToConsoleIT extends SparkContainer { - - private static final int KAFKA_PORT = 9093; - - private static final String KAFKA_HOST = "kafkaCluster"; - - private KafkaProducer producer; - - private KafkaContainer kafkaContainer; - - @BeforeEach - public void startKafkaContainer() { - kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) - .withNetwork(NETWORK) - .withNetworkAliases(KAFKA_HOST) - .withLogConsumer(new Slf4jLogConsumer(log)); - kafkaContainer.setPortBindings(Lists.newArrayList( - String.format("%s:%s", KAFKA_PORT, KAFKA_PORT))); - Startables.deepStart(Stream.of(kafkaContainer)).join(); - log.info("Kafka container started"); - Awaitility.given().ignoreExceptions() - .atLeast(100, TimeUnit.MILLISECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .atMost(180, TimeUnit.SECONDS) - .untilAsserted(() -> initKafkaProducer()); - generateTestData(); - } +public class KafkaSourceJsonToConsoleIT extends KafkaTestBaseIT { @SuppressWarnings("checkstyle:Indentation") - private void generateTestData() { + protected void generateTestData() { SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType( new String[]{ @@ -159,22 +119,4 @@ public void testKafkaSource() throws IOException, InterruptedException { Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); } - private void initKafkaProducer() { - Properties props = new Properties(); - String bootstrapServers = kafkaContainer.getBootstrapServers(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - producer = new KafkaProducer<>(props); - } - - @AfterEach - public void close() { - if (producer != null) { - producer.close(); - } - if (kafkaContainer != null) { - kafkaContainer.close(); - } - } } diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceStartConfigToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceStartConfigToConsoleIT.java new file mode 100644 index 00000000000..3ff1faff3ba --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceStartConfigToConsoleIT.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.spark.v2.kafka; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; + +import java.io.IOException; + +@Slf4j +public class KafkaSourceStartConfigToConsoleIT extends KafkaTestBaseIT { + @Override + protected void generateTestData() { + generateStepTestData(0, 100); + } + + private void generateStepTestData(int start, int end) { + + SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType( + new String[]{ + "id" + }, + new SeaTunnelDataType[]{ + BasicType.LONG_TYPE + } + ); + + DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic", seatunnelRowType); + for (int i = start; i < end; i++) { + SeaTunnelRow row = new SeaTunnelRow( + new Object[]{ + Long.valueOf(i) + }); + ProducerRecord producerRecord = serializer.serializeRow(row); + producer.send(producerRecord); + } + } + + @Test + public void testKafka() throws IOException, InterruptedException { + testKafkaLatestToConsole(); + testKafkaEarliestToConsole(); + testKafkaSpecificOffsetsToConsole(); + testKafkaGroupOffsetsToConsole(); + testKafkaTimestampToConsole(); + } + + public void testKafkaLatestToConsole() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_latest_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaEarliestToConsole() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_earliest_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaSpecificOffsetsToConsole() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_specific_offsets_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaGroupOffsetsToConsole() throws IOException, InterruptedException { + generateStepTestData(100, 150); + Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_group_offset_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + + public void testKafkaTimestampToConsole() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelSparkJob("/kafka/kafkasource_timestamp_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + } + +} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java index c2842557cb3..f4575f4e8ff 100644 --- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaSourceTextToConsoleIT.java @@ -26,70 +26,29 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.e2e.spark.SparkContainer; import org.apache.seatunnel.format.text.TextSerializationSchema; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.testcontainers.containers.Container; -import org.testcontainers.containers.output.Slf4jLogConsumer; -import org.testcontainers.lifecycle.Startables; -import org.testcontainers.shaded.com.google.common.collect.Lists; -import org.testcontainers.shaded.org.awaitility.Awaitility; -import org.testcontainers.utility.DockerImageName; -import org.testcontainers.utility.DockerLoggerFactory; import java.io.IOException; import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.Collections; -import java.util.Properties; -import java.util.concurrent.TimeUnit; -import java.util.stream.Stream; /** * This test case is used to verify that the kafka source is able to send data to the console. * Make sure the SeaTunnel job can submit successfully on spark engine. 
*/ @Slf4j -public class KafkaSourceTextToConsoleIT extends SparkContainer { - - private static final int KAFKA_PORT = 9093; - - private static final String KAFKA_HOST = "kafkaCluster"; - - private KafkaProducer producer; - - private KafkaContainer kafkaContainer; - - @BeforeEach - public void startKafkaContainer() { - kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) - .withNetwork(NETWORK) - .withNetworkAliases(KAFKA_HOST) - .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1"))); - kafkaContainer.setPortBindings(Lists.newArrayList( - String.format("%s:%s", KAFKA_PORT, KAFKA_PORT))); - Startables.deepStart(Stream.of(kafkaContainer)).join(); - log.info("Kafka container started"); - Awaitility.given().ignoreExceptions() - .atLeast(100, TimeUnit.MILLISECONDS) - .pollInterval(500, TimeUnit.MILLISECONDS) - .atMost(180, TimeUnit.SECONDS) - .untilAsserted(() -> initKafkaProducer()); - generateTestData(); - } +public class KafkaSourceTextToConsoleIT extends KafkaTestBaseIT { @SuppressWarnings("checkstyle:Indentation") - private void generateTestData() { + protected void generateTestData() { SeaTunnelRowType seatunnelRowType = new SeaTunnelRowType( new String[]{ @@ -163,22 +122,4 @@ public void testKafkaSourceTextToConsole() throws IOException, InterruptedExcept Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); } - private void initKafkaProducer() { - Properties props = new Properties(); - String bootstrapServers = kafkaContainer.getBootstrapServers(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); - producer = new KafkaProducer<>(props); - } - - @AfterEach - public void close() { - if (producer != null) { - producer.close(); - } - if (kafkaContainer != null) { - kafkaContainer.close(); - } - } } diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java new file mode 100644 index 00000000000..f8447179422 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.spark.v2.kafka; + +import org.apache.seatunnel.e2e.spark.SparkContainer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.com.google.common.collect.Lists; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; + +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +@Slf4j +public class KafkaTestBaseIT extends SparkContainer { + protected static final int KAFKA_PORT = 9093; + + protected static final String KAFKA_HOST = "kafkaCluster"; + + protected KafkaProducer producer; + + protected KafkaContainer kafkaContainer; + + @BeforeEach + public void startKafkaContainer() { + kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) + .withNetwork(NETWORK) + .withNetworkAliases(KAFKA_HOST) + .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1"))); + kafkaContainer.setPortBindings(Lists.newArrayList( + String.format("%s:%s", KAFKA_PORT, KAFKA_PORT))); + Startables.deepStart(Stream.of(kafkaContainer)).join(); + log.info("Kafka container started"); + Awaitility.given().ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(() -> initKafkaProducer()); + generateTestData(); + } + + protected void initKafkaProducer() { + Properties props = new Properties(); + String bootstrapServers = kafkaContainer.getBootstrapServers(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + producer = new KafkaProducer<>(props); + } + + @SuppressWarnings("checkstyle:Indentation") + protected void generateTestData() { + + } + + @AfterEach + public void close() { + if (producer != null) { + producer.close(); + } + if (kafkaContainer != null) { + kafkaContainer.close(); + } + } +} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf new file mode 100644 index 00000000000..aabb06a3061 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set spark configuration here + job.name = "SeaTunnel" + source.parallelism = 1 + job.mode = "BATCH" +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9093" + topic = "test_topic" + result_table_name = "kafka_table" + # The default format is json, which is optional + format = json + start_mode = earliest + schema = { + fields { + id = bigint + } + } + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource +} + +transform { + } + +sink { + Console {} + + Assert { + rules = + { + field_rules = [ + { + field_name = id + field_type = long + field_value = [ + + { + rule_type = MIN + rule_value = 0 + }, + { + rule_type = MAX + rule_value = 99 + } + ] + } + ] + } + } + } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf new file mode 100644 index 00000000000..e3edd31e931 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set spark configuration here + job.name = "SeaTunnel" + source.parallelism = 1 + job.mode = "BATCH" +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9093" + topic = "test_topic" + result_table_name = "kafka_table" + # The default format is json, which is optional + format = json + start_mode = group_offsets + schema = { + fields { + id = bigint + } + } + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource +} + +transform { + } + +sink { + Console {} + + Assert { + rules = + { + field_rules = [ + { + field_name = id + field_type = long + field_value = [ + + { + rule_type = MIN + rule_value = 100 + }, + { + rule_type = MAX + rule_value = 149 + } + ] + } + ] + } + } + } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf index 91c92023722..0a1f5df1401 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf @@ -20,7 +20,7 @@ env { # You can set spark configuration here - spark.app.name = "SeaTunnel" + job.name = "SeaTunnel" source.parallelism = 1 job.mode = "BATCH" } diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf new file mode 100644 index 00000000000..c974fc3a352 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set spark configuration here + job.name = "SeaTunnel" + source.parallelism = 1 + job.mode = "BATCH" +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9093" + topic = "test_topic" + result_table_name = "kafka_table" + # The default format is json, which is optional + format = json + start_mode = latest + schema = { + fields { + id = bigint + } + } + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource +} + +transform { + } + +sink { + Console {} + + Assert { + rules = + { + field_rules = [ + { + field_name = id + field_type = long + field_value = [ + + { + rule_type = MIN + rule_value = 99 + }, + { + rule_type = MAX + rule_value = 99 + } + ] + } + ] + } + } + } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf new file mode 100644 index 00000000000..9db55defb52 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf @@ -0,0 +1,79 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set spark configuration here + job.name = "SeaTunnel" + source.parallelism = 1 + job.mode = "BATCH" +} + +source { + Kafka { + bootstrap.servers = "kafkaCluster:9093" + topic = "test_topic" + result_table_name = "kafka_table" + # The default format is json, which is optional + format = json + start_mode = specific_offsets + schema = { + fields { + id = bigint + } + } + + start_mode.offsets = { + test_topic-0 = 50 + } + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource +} + +transform { + } + +sink { + Console {} + + Assert { + rules = + { + field_rules = [ + { + field_name = id + field_type = long + field_value = [ + + { + rule_type = MIN + rule_value = 50 + }, + { + rule_type = MAX + rule_value = 99 + } + ] + } + ] + } + } + } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf index 369f34a2f22..e132a62fbfe 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf @@ -20,7 +20,7 @@ env { # You can set spark configuration here - spark.app.name = "SeaTunnel" + job.name = "SeaTunnel" source.parallelism = 1 job.mode = "BATCH" } diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf new file mode 100644 index 00000000000..c91a8529bf3 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf @@ -0,0 +1,76 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set spark configuration here
+  job.name = "SeaTunnel"
+  source.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Kafka {
+    bootstrap.servers = "kafkaCluster:9093"
+    topic = "test_topic"
+    result_table_name = "kafka_table"
+    # The default format is json, which is optional
+    format = json
+    start_mode = timestamp
+    schema = {
+      fields {
+        id = bigint
+      }
+    }
+    start_mode.timestamp = 1667179890315
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/source/KafkaSource
+}
+
+transform {
+  }
+
+sink {
+  Console {}
+
+  Assert {
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = id
+            field_type = long
+            field_value = [
+
+              {
+                rule_type = MIN
+                rule_value = 0
+              },
+              {
+                rule_type = MAX
+                rule_value = 149
+              }
+            ]
+          }
+        ]
+      }
+  }
+  }
\ No newline at end of file

From 73b06d1f752f96218209ffc39d4c19283c734c7b Mon Sep 17 00:00:00 2001
From: zhouyao
Date: Tue, 1 Nov 2022 09:00:14 +0800
Subject: [PATCH 5/6] [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config

---
 .../KafkaSourceStartConfigToConsoleIT.java | 17 +++++++++++++++++
 .../e2e/flink/v2/kafka/KafkaTestBaseIT.java | 17 +++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
index dd511817b03..1d7225ae88e 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaSourceStartConfigToConsoleIT.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + package org.apache.seatunnel.e2e.flink.v2.kafka; import org.apache.seatunnel.api.table.type.BasicType; diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java index 9941249e3da..0c339cccc00 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-kafka-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/kafka/KafkaTestBaseIT.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.seatunnel.e2e.flink.v2.kafka; import org.apache.seatunnel.e2e.flink.FlinkContainer; From 1595f4cb7716fb56c04683221f3b682364b61d7f Mon Sep 17 00:00:00 2001 From: zhouyao Date: Tue, 1 Nov 2022 09:43:25 +0800 Subject: [PATCH 6/6] [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config --- .../org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java | 2 +- .../apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java | 2 +- .../test/resources/kafka/kafkasource_earliest_to_console.conf | 2 +- .../resources/kafka/kafkasource_group_offset_to_console.conf | 2 +- .../src/test/resources/kafka/kafkasource_json_to_console.conf | 2 +- .../src/test/resources/kafka/kafkasource_latest_to_console.conf | 2 +- .../kafka/kafkasource_specific_offsets_to_console.conf | 2 +- .../src/test/resources/kafka/kafkasource_text_to_console.conf | 2 +- .../test/resources/kafka/kafkasource_timestamp_to_console.conf | 2 +- 9 files changed, 9 insertions(+), 9 deletions(-) diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java index 431df9205e9..117daf78e82 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaContainer.java @@ -35,7 +35,7 @@ public class KafkaContainer extends GenericContainer { private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka"); - public static final int KAFKA_PORT = 9093; + public static final int KAFKA_PORT = 9094; public static final int 
ZOOKEEPER_PORT = 2181; diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java index f8447179422..b6d5354a98a 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/kafka/KafkaTestBaseIT.java @@ -38,7 +38,7 @@ @Slf4j public class KafkaTestBaseIT extends SparkContainer { - protected static final int KAFKA_PORT = 9093; + protected static final int KAFKA_PORT = 9094; protected static final String KAFKA_HOST = "kafkaCluster"; diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf index aabb06a3061..c1674e19267 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_earliest_to_console.conf @@ -27,7 +27,7 @@ env { source { Kafka { - bootstrap.servers = "kafkaCluster:9093" + bootstrap.servers = "kafkaCluster:9094" topic = "test_topic" result_table_name = "kafka_table" # The default format is json, which is optional diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf index e3edd31e931..336d364c0bf 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_group_offset_to_console.conf @@ -27,7 +27,7 @@ env { source { Kafka { - bootstrap.servers = "kafkaCluster:9093" + bootstrap.servers = "kafkaCluster:9094" topic = "test_topic" result_table_name = "kafka_table" # The default format is json, which is optional diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf index 0a1f5df1401..cf5c67743c1 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_json_to_console.conf @@ -28,7 +28,7 @@ env { source { Kafka { - bootstrap.servers = "kafkaCluster:9093" + bootstrap.servers = "kafkaCluster:9094" topic = "test_topic" result_table_name = "kafka_table" kafka.auto.offset.reset = "earliest" diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf index c974fc3a352..0c20ebc9a7a 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_latest_to_console.conf @@ -27,7 +27,7 @@ env { source { Kafka { - bootstrap.servers = "kafkaCluster:9093" + bootstrap.servers = "kafkaCluster:9094" topic = "test_topic" result_table_name = "kafka_table" # The default format is json, which is optional diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf index 9db55defb52..8f6f00a688e 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_specific_offsets_to_console.conf @@ -27,7 +27,7 @@ env { source { Kafka { - bootstrap.servers = "kafkaCluster:9093" + bootstrap.servers = "kafkaCluster:9094" topic = "test_topic" result_table_name = "kafka_table" # The default format is json, which is optional diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf index e132a62fbfe..94af38e644f 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_text_to_console.conf @@ -28,7 +28,7 @@ env { source { Kafka { - bootstrap.servers = "kafkaCluster:9093" + bootstrap.servers = "kafkaCluster:9094" topic = "test_topic" result_table_name = "kafka_table" kafka.auto.offset.reset = "earliest" diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf index c91a8529bf3..e7ec35c7d6c 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-kafka-spark-e2e/src/test/resources/kafka/kafkasource_timestamp_to_console.conf @@ -27,7 +27,7 @@ env { source { Kafka { - bootstrap.servers = "kafkaCluster:9093" + bootstrap.servers = "kafkaCluster:9094" topic = "test_topic" result_table_name = "kafka_table" # The default format is json, which is optional
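
Taken together, the e2e configs in this series exercise all five `start_mode` values (`earliest`, `latest`, `group_offsets`, `specific_offsets`, `timestamp`). For orientation, the sketch below shows how each mode maps onto the plain Kafka consumer API. It is an illustration only, not the connector's split-enumerator code; the broker address, group id, class name, and the idea of demonstrating every mode in one method are assumptions of the example, while the topic, offset, and timestamp values are taken from the configs above.

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class StartModeSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed address
        props.put("group.id", "start-mode-sketch");       // assumed group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Partition taken from the configs above; a real job would first
            // resolve all partitions of the topic.
            TopicPartition tp = new TopicPartition("test_topic", 0);
            List<TopicPartition> tps = Collections.singletonList(tp);
            consumer.assign(tps);

            // In practice exactly one of the following blocks runs, selected by
            // the resolved start_mode; they appear together here for comparison.

            // start_mode = earliest: rewind to the first available offset.
            consumer.seekToBeginning(tps);

            // start_mode = latest: jump to the log end, so only records
            // produced after startup are read.
            consumer.seekToEnd(tps);

            // start_mode = specific_offsets: explicit per-partition offsets,
            // mirroring `start_mode.offsets = { test_topic-0 = 50 }`.
            consumer.seek(tp, 50L);

            // start_mode = timestamp: resolve a millisecond timestamp to the
            // earliest offset whose record timestamp is >= it, then seek.
            Map<TopicPartition, Long> query = new HashMap<>();
            query.put(tp, 1667179890315L);
            Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(query);
            OffsetAndTimestamp oat = found.get(tp);
            if (oat != null) { // null when no record is at least that recent
                consumer.seek(tp, oat.offset());
            }

            // start_mode = group_offsets: perform no seek at all; the consumer
            // resumes from the group's committed offsets (the default behavior).
            consumer.poll(Duration.ofMillis(100));
        }
    }
}
```

This is also what the Assert blocks in the configs encode: with ids 0..99 produced before the job starts and 100..149 produced afterwards, `earliest` should observe ids 0..99, `specific_offsets` starting at offset 50 should observe only 50..99, and the `group_offsets` run that follows the first batch should pick up at 100..149.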