[Bugfix][Kafka] In Kafka streaming mode, the stop offset should be Long.MAX_VALUE
Carl-Zhou-CN committed Nov 15, 2024
1 parent a2a961b commit a42e8f0
Showing 4 changed files with 178 additions and 85 deletions.
1 change: 1 addition & 0 deletions docs/en/connector-v2/source/kafka.md
@@ -59,6 +59,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repository.
### Simple

> This example reads the data of Kafka's topic_1, topic_2, and topic_3 and prints it to the client. If you have not yet installed and deployed SeaTunnel, follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) to install and deploy it, then follow the instructions in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job.
> In batch mode, it will consume continuously until it reaches the maximum offset.
```hocon
# Defining the runtime environment
# ...
```
KafkaSourceSplitEnumerator.java (package org.apache.seatunnel.connectors.seatunnel.kafka.source)
@@ -37,7 +37,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -89,7 +88,7 @@ public class KafkaSourceSplitEnumerator
}

@VisibleForTesting
protected KafkaSourceSplitEnumerator(
public KafkaSourceSplitEnumerator(
AdminClient adminClient,
Map<TopicPartition, KafkaSourceSplit> pendingSplit,
Map<TopicPartition, KafkaSourceSplit> assignedSplit) {
@@ -102,6 +101,16 @@ protected KafkaSourceSplitEnumerator(
this.assignedSplit = assignedSplit;
}

@VisibleForTesting
public KafkaSourceSplitEnumerator(
AdminClient adminClient,
Map<TopicPartition, KafkaSourceSplit> pendingSplit,
Map<TopicPartition, KafkaSourceSplit> assignedSplit,
boolean isStreamingMode) {
this(adminClient, pendingSplit, assignedSplit);
this.isStreamingMode = isStreamingMode;
}

@Override
public void open() {
if (discoveryIntervalMillis > 0) {
@@ -209,7 +218,7 @@ public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
private Map<TopicPartition, ? extends KafkaSourceSplit> convertToNextSplit(
List<KafkaSourceSplit> splits) {
try {
Map<TopicPartition, Long> listOffsets =
Map<TopicPartition, Long> latestOffsets =
listOffsets(
splits.stream()
.map(KafkaSourceSplit::getTopicPartition)
@@ -219,7 +228,10 @@ public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
splits.forEach(
split -> {
split.setStartOffset(split.getEndOffset() + 1);
split.setEndOffset(listOffsets.get(split.getTopicPartition()));
split.setEndOffset(
isStreamingMode
? Long.MAX_VALUE
: latestOffsets.get(split.getTopicPartition()));
});
return splits.stream()
.collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, split -> split));
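The heart of the fix: when a split is handed back to the enumerator, its new end offset now depends on the execution mode instead of on whatever listOffsets returns. Previously, listOffsets short-circuited to an empty map in streaming mode, so the old `listOffsets.get(...)` lookup yielded a null end offset rather than Long.MAX_VALUE. A minimal sketch of the rule (names follow the diff; this is an illustration, not the enumerator's actual code):

```java
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

final class StopOffsets {
    // Streaming jobs never stop, so the split's stop offset is unbounded.
    // Batch jobs stop at the latest offset observed when the split was built.
    static long resolve(
            boolean isStreamingMode,
            TopicPartition partition,
            Map<TopicPartition, Long> latestOffsets) {
        return isStreamingMode ? Long.MAX_VALUE : latestOffsets.get(partition);
    }
}
```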
@@ -311,8 +323,9 @@ private Set<KafkaSourceSplit> getTopicInfo() throws ExecutionException, InterruptedException {
TablePath tablePath = topicMappingTablePathMap.get(partition.topic());
KafkaSourceSplit split = new KafkaSourceSplit(tablePath, partition);
split.setEndOffset(
latestOffsets.getOrDefault(
split.getTopicPartition(), Long.MAX_VALUE));
isStreamingMode
? Long.MAX_VALUE
: latestOffsets.get(partition));
return split;
})
.collect(Collectors.toSet());
@@ -352,10 +365,6 @@ private Map<TopicPartition, Long> listOffsets(
Collection<TopicPartition> partitions, OffsetSpec offsetSpec)
throws ExecutionException, InterruptedException {

if (isStreamingMode) {
return Collections.emptyMap();
}

Map<TopicPartition, OffsetSpec> topicPartitionOffsets =
partitions.stream()
.collect(Collectors.toMap(partition -> partition, __ -> offsetSpec));
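With the streaming short-circuit removed (the deleted isStreamingMode block above), listOffsets always queries the broker, and the streaming/batch decision lives at the call sites. For context, a minimal sketch of the same lookup against the standard Kafka AdminClient API, assuming OffsetSpec.latest() as in getTopicInfo:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

final class LatestOffsets {
    // Resolve the latest offset of every partition in a single admin request.
    static Map<TopicPartition, Long> fetch(
            AdminClient adminClient, Collection<TopicPartition> partitions)
            throws ExecutionException, InterruptedException {
        Map<TopicPartition, OffsetSpec> request =
                partitions.stream()
                        .collect(Collectors.toMap(p -> p, p -> OffsetSpec.latest()));
        return adminClient.listOffsets(request).all().get().entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
    }
}
```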
@@ -403,7 +412,8 @@ private void discoverySplits() throws ExecutionException, InterruptedException {
assignSplit();
}

private void fetchPendingPartitionSplit() throws ExecutionException, InterruptedException {
@VisibleForTesting
public void fetchPendingPartitionSplit() throws ExecutionException, InterruptedException {
getTopicInfo()
.forEach(
split -> {
KafkaSourceSplitEnumeratorTest.java (new file)
@@ -0,0 +1,156 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.clients.admin;
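// Note: this test lives in the Kafka admin package, presumably so it can construct
// package-private result types such as ListOffsetsResult.ListOffsetsResultInfo.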

import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

class KafkaSourceSplitEnumeratorTest {

AdminClient adminClient = Mockito.mock(KafkaAdminClient.class);
// The single partition every test fixture operates on.
TopicPartition partition = new TopicPartition("test", 0);

@BeforeEach
void init() {

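// Stub listOffsets(...) so the test partition reports a latest offset of 0.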
Mockito.when(adminClient.listOffsets(Mockito.any(java.util.Map.class)))
.thenReturn(
new ListOffsetsResult(
new HashMap<
TopicPartition,
KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>() {
{
put(
partition,
KafkaFuture.completedFuture(
new ListOffsetsResult.ListOffsetsResultInfo(
0, 0, Optional.of(0))));
}
}));
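// Stub describeTopics(...) so topic discovery sees a single-partition topic.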
Mockito.when(adminClient.describeTopics(Mockito.any(java.util.Collection.class)))
.thenReturn(
DescribeTopicsResult.ofTopicNames(
new HashMap<String, KafkaFuture<TopicDescription>>() {
{
put(
partition.topic(),
KafkaFuture.completedFuture(
new TopicDescription(
partition.topic(),
false,
Collections.singletonList(
new TopicPartitionInfo(
0,
null,
Collections
.emptyList(),
Collections
.emptyList())))));
}
}));
}

@Test
void addSplitsBack() {
// A split handed back in batch mode should be re-queued with the latest
// offset (0 in this fixture) as its end offset.
Map<TopicPartition, KafkaSourceSplit> assignedSplit =
new HashMap<TopicPartition, KafkaSourceSplit>() {
{
put(partition, new KafkaSourceSplit(null, partition));
}
};
Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
List<KafkaSourceSplit> splits = Arrays.asList(new KafkaSourceSplit(null, partition));
KafkaSourceSplitEnumerator enumerator =
new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit);
enumerator.addSplitsBack(splits, 1);
Assertions.assertEquals(splits.size(), pendingSplit.size());
Assertions.assertNull(assignedSplit.get(partition));
Assertions.assertEquals(0L, pendingSplit.get(partition).getEndOffset());
}

@Test
void addStreamingSplitsBack() {
// A split handed back in streaming mode should be re-queued with an
// unbounded end offset.
Map<TopicPartition, KafkaSourceSplit> assignedSplit =
new HashMap<TopicPartition, KafkaSourceSplit>() {
{
put(partition, new KafkaSourceSplit(null, partition));
}
};
Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
List<KafkaSourceSplit> splits =
Collections.singletonList(new KafkaSourceSplit(null, partition));
KafkaSourceSplitEnumerator enumerator =
new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true);
enumerator.addSplitsBack(splits, 1);
Assertions.assertEquals(splits.size(), pendingSplit.size());
Assertions.assertNull(assignedSplit.get(partition));
Assertions.assertEquals(Long.MAX_VALUE, pendingSplit.get(partition).getEndOffset());
}

@Test
void addStreamingSplits() throws ExecutionException, InterruptedException {
// Newly discovered splits in streaming mode should get an unbounded end offset.
Map<TopicPartition, KafkaSourceSplit> assignedSplit = new HashMap<>();
Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
List<KafkaSourceSplit> splits =
Collections.singletonList(new KafkaSourceSplit(null, partition));
KafkaSourceSplitEnumerator enumerator =
new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true);
enumerator.fetchPendingPartitionSplit();
Assertions.assertEquals(splits.size(), pendingSplit.size());
Assertions.assertNotNull(pendingSplit.get(partition));
Assertions.assertEquals(Long.MAX_VALUE, pendingSplit.get(partition).getEndOffset());
}

@Test
void addSplits() throws ExecutionException, InterruptedException {
// Newly discovered splits in batch mode should stop at the latest offset
// (0 in this fixture).
Map<TopicPartition, KafkaSourceSplit> assignedSplit = new HashMap<>();
Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
List<KafkaSourceSplit> splits =
Collections.singletonList(new KafkaSourceSplit(null, partition));
KafkaSourceSplitEnumerator enumerator =
new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, false);
enumerator.fetchPendingPartitionSplit();
Assertions.assertEquals(splits.size(), pendingSplit.size());
Assertions.assertNotNull(pendingSplit.get(partition));
Assertions.assertEquals(0L, pendingSplit.get(partition).getEndOffset());
}
}

This file was deleted.
