[Feature][Kafka] Support multi-table source read #5992

Merged · 97 commits · May 22, 2024

Commits
All 97 commits are by zhilinli123.

- 683ba97 first commit (Dec 11, 2023)
- 5bbbc40 first commit (Dec 11, 2023)
- 97ffb2b [Feature][Kafka] Support multi-table source read (Dec 11, 2023)
- 77eec17 [Feature][Kafka] Support multi-table source read (Dec 12, 2023)
- 2ecbc0c [Feature][Kafka] Support multi-table source read (Dec 12, 2023)
- 0c6445f [Feature][Kafka] Support multi-table source read (Dec 12, 2023)
- aa37aa1 [Feature][Kafka] Support multi-table source read (Dec 12, 2023)
- 798533e [Feature][Kafka] Support multi-table source read (Dec 12, 2023)
- 3996bec [Feature][Kafka] Support multi-table source read (Dec 12, 2023)
- 3735d22 [Feature][Kafka] Support multi-table source read (Dec 12, 2023)
- b40fdf7 [Feature][Kafka] Support multi-table source read (Dec 12, 2023)
- 77e3764 [Feature][Kafka] Support multi-table source read (Dec 12, 2023)
- 5c9ef49 fix null (Dec 12, 2023)
- 8b59238 add e2e & doc (Dec 12, 2023)
- c2bbc3b add e2e & doc (Dec 12, 2023)
- 73f1705 add e2e & doc (Dec 12, 2023)
- 85f98b1 add e2e & doc (Dec 12, 2023)
- 0dfd4d3 Merge remote-tracking branch 'origin/dev' into feature-multi-table-kafka (Dec 13, 2023)
- 2ebcf87 fix (Dec 13, 2023)
- fcdd775 add doc desc (Dec 13, 2023)
- fd26df7 add doc desc (Dec 13, 2023)
- 4453fb8 Merge remote-tracking branch 'origin/dev' into feature-multi-table-kafka (Dec 14, 2023)
- 86f9d16 add doc desc (Dec 14, 2023)
- d35037a Merge remote-tracking branch 'origin/dev' into feature-multi-table-kafka (Dec 14, 2023)
- 9763a80 add doc desc (Dec 14, 2023)
- fd9b077 add doc desc (Dec 15, 2023)
- 7fa08a2 Merge remote-tracking branch 'origin/dev' into feature-multi-table-kafka (Dec 17, 2023)
- 12c6417 fix (Dec 17, 2023)
- 066d6dd fix (Dec 17, 2023)
- 7dd1e06 Merge branch 'apache:dev' into feature-multi-table-kafka (Dec 18, 2023)
- f700166 Merge remote-tracking branch 'origin/dev' into feature-multi-table-kafka (Jan 5, 2024)
- 5f9026f fix ci error (Jan 5, 2024)
- 59e7bb0 fix ci error (Jan 6, 2024)
- b30d705 Merge remote-tracking branch 'origin/dev' into feature-multi-table-kafka (Jan 8, 2024)
- a3246f8 Merge branch 'apache:dev' into feature-multi-table-kafka (Jan 19, 2024)
- cd68129 Merge remote-tracking branch 'origin/dev' into feature-multi-table-kafka (Feb 3, 2024)
- 9ac89db fix (Feb 3, 2024)
- 7dac06b Merge remote-tracking branch 'origin/dev' into feature-multi-table-kafka (Feb 29, 2024)
- 7aad106 fix (Mar 1, 2024)
- ea67928 fix (Mar 5, 2024)
- a832999 Merge branch 'apache:dev' into feature-multi-table-kafka (Mar 5, 2024)
- b717932 Merge remote-tracking branch 'origin/dev' into feature-multi-table-kafka (Mar 5, 2024)
- 5157541 Merge remote-tracking branch 'origin/feature-multi-table-kafka' into … (Mar 5, 2024)
- c953284 fix (Mar 5, 2024)
- 082e252 fix (Mar 5, 2024)
- f6c7c9b fix (Mar 6, 2024)
- c911e60 Merge remote-tracking branch 'origin/dev' into feature-multi-table-kafka (Mar 6, 2024)
- 2fca6c9 fix (Mar 6, 2024)
- c357cf9 fix (Mar 6, 2024)
- 1012062 fix (Mar 6, 2024)
- 42a251d fix (Mar 12, 2024)
- 2ab9137 Merge remote-tracking branch 'origin/dev' into feature-multi-table-kafka (Mar 14, 2024)
- a1ec1e2 Merge remote-tracking branch 'origin/dev' into feature-multi-table-kafka (Apr 8, 2024)
- 07cb3e9 fix (Apr 9, 2024)
- 87ee211 Merge remote-tracking branch 'origin/dev' into feature-multi-table-kafka (Apr 9, 2024)
- 2149149 fix (Apr 9, 2024)
- cabb092 fix (Apr 9, 2024)
- 86f5176 fix (Apr 10, 2024)
- 751531e fix (Apr 12, 2024)
- 0aae5ce fix (Apr 15, 2024)
- 35d13e7 fix (Apr 19, 2024)
- 041ce8b fix (Apr 19, 2024)
- 0fdd8ca fix (Apr 19, 2024)
- 4b7af7c fix (Apr 19, 2024)
- c2056c5 fix (Apr 19, 2024)
- cfc76d8 fix (Apr 19, 2024)
- 0bb4cf2 fix (Apr 20, 2024)
- 683d1d8 fix (Apr 20, 2024)
- fc4c24a Merge branch 'apache:dev' into feature-multi-table-kafka (Apr 22, 2024)
- 1433f00 fix (Apr 22, 2024)
- 9d67361 Merge remote-tracking branch 'origin/feature-multi-table-kafka' into … (Apr 22, 2024)
- ab09701 fix (Apr 28, 2024)
- 23eb0c4 Merge branch 'apache:dev' into feature-multi-table-kafka (Apr 28, 2024)
- 7bb6353 fix (Apr 28, 2024)
- 00328fc Merge remote-tracking branch 'origin/feature-multi-table-kafka' into … (Apr 28, 2024)
- 89cb3fe fix (May 6, 2024)
- 2fb352b Merge branch 'apache:dev' into feature-multi-table-kafka (May 6, 2024)
- d622845 Merge remote-tracking branch 'origin/feature-multi-table-kafka' into … (May 7, 2024)
- 0cde5c8 fix (May 9, 2024)
- 78ae347 Merge remote-tracking branch 'origin/feature-multi-table-kafka' into … (May 9, 2024)
- 925adcb fix (May 9, 2024)
- 4129369 fix (May 10, 2024)
- d68ed26 fix (May 10, 2024)
- 4a75154 fix (May 10, 2024)
- 24d1592 fix (May 10, 2024)
- 65587e0 fix (May 10, 2024)
- 0dbd15c fix (May 11, 2024)
- 763bbc2 fix (May 11, 2024)
- 1b9c192 fix (May 11, 2024)
- d4fe2d5 fix (May 13, 2024)
- 878d471 fix (May 13, 2024)
- 49942aa fix (May 13, 2024)
- ac2f925 fix (May 14, 2024)
- e05b350 Merge branch 'apache:dev' into feature-multi-table-kafka (May 14, 2024)
- 49bda79 fix (May 14, 2024)
- 72be5f5 Merge remote-tracking branch 'origin/feature-multi-table-kafka' into … (May 14, 2024)
- 56c900c fix (May 14, 2024)
Changes from all commits
62 changes: 62 additions & 0 deletions docs/en/connector-v2/source/kafka.md
@@ -35,6 +35,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repository
| Name | Type | Required | Default | Description |
|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. |
| table_list | Map | No | - | Topic list config You can configure only one `table_list` and one `topic` at the same time |
Review comment (Member):

Suggested change:
- | table_list | Map | No | - | Topic list config You can configure only one `table_list` and one `topic` at the same time |
+ | table_list | Map | No | - | Topic list config. You can configure only one `table_list` or one `topic` at the same time |
| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. |
| pattern | Boolean | No | false | If `pattern` is set to `true`, `topic` is treated as a regular expression for topic names, and the consumer subscribes to all topics whose names match it. |
| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. |
@@ -180,3 +181,64 @@ source {
}
```

### Multiple Kafka Source

> This example parses Kafka topics in different formats and writes them to the same PostgreSQL table, performing upsert operations based on `id`.

```hocon

env {
execution.parallelism = 1
job.mode = "BATCH"
}

source {
Kafka {
bootstrap.servers = "kafka_e2e:9092"
table_list = [
{
topic = "^test-ogg-sou.*"
pattern = "true"
consumer.group = "ogg_multi_group"
start_mode = earliest
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "string"
}
},
format = ogg_json
},
{
topic = "test-cdc_mds"
start_mode = earliest
schema = {
fields {
id = "int"
name = "string"
description = "string"
weight = "string"
}
},
format = canal_json
}
]
}
}

sink {
Jdbc {
driver = org.postgresql.Driver
url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
user = test
password = test
generate_sink_sql = true
database = test
table = public.sink
primary_keys = ["id"]
}
}
```

AmazonSqsSource.java
@@ -26,6 +26,7 @@
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -66,6 +67,7 @@ public class AmazonSqsSource extends AbstractSingleSplitSource<SeaTunnelRow>
private AmazonSqsSourceOptions amazonSqsSourceOptions;
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
private SeaTunnelRowType typeInfo;
private CatalogTable catalogTable;

@Override
public String getPluginName() {
@@ -84,9 +86,9 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
amazonSqsSourceOptions = new AmazonSqsSourceOptions(pluginConfig);
typeInfo = CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();

this.amazonSqsSourceOptions = new AmazonSqsSourceOptions(pluginConfig);
this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
this.typeInfo = catalogTable.getSeaTunnelRowType();
setDeserialization(pluginConfig);
}

@@ -109,11 +111,11 @@ public AbstractSingleSplitReader<SeaTunnelRow> createReader(

private void setDeserialization(Config config) {
if (config.hasPath(TableSchemaOptions.SCHEMA.key())) {
typeInfo = CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
MessageFormat format = ReadonlyConfig.fromConfig(config).get(FORMAT);
switch (format) {
case JSON:
deserializationSchema = new JsonDeserializationSchema(false, false, typeInfo);
deserializationSchema =
new JsonDeserializationSchema(catalogTable, false, false);
break;
case TEXT:
String delimiter = DEFAULT_FIELD_DELIMITER;
@@ -128,7 +130,7 @@ private void setDeserialization(Config config) {
break;
case CANAL_JSON:
deserializationSchema =
CanalJsonDeserializationSchema.builder(typeInfo)
CanalJsonDeserializationSchema.builder(catalogTable)
.setIgnoreParseErrors(true)
.build();
break;
@@ -138,7 +140,8 @@
includeSchema = config.getBoolean(DEBEZIUM_RECORD_INCLUDE_SCHEMA.key());
}
deserializationSchema =
new DebeziumJsonDeserializationSchema(typeInfo, true, includeSchema);
new DebeziumJsonDeserializationSchema(
catalogTable, true, includeSchema);
break;
default:
throw new SeaTunnelJsonFormatException(
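The same constructor migration repeats in the Fake, Sheets, Google Sheets test, and HTTP files below: deserializers now receive a `CatalogTable` first instead of a trailing `SeaTunnelRowType`. A standalone sketch of the idea, using hypothetical stand-in types rather than SeaTunnel's actual classes:

```java
// Hypothetical sketch, not SeaTunnel's real API: passing the whole table
// descriptor (instead of a bare row type) lets a deserializer report which
// table its rows belong to, which downstream multi-table routing needs.
public class CatalogTableMigrationSketch {

    static final class TableDescriptor {
        final String tablePath; // e.g. "public.sink"
        final String[] fieldNames;

        TableDescriptor(String tablePath, String[] fieldNames) {
            this.tablePath = tablePath;
            this.fieldNames = fieldNames;
        }
    }

    static final class JsonRowDeserializer {
        private final TableDescriptor table;
        private final boolean failOnMissingField;
        private final boolean ignoreParseErrors;

        // New-style constructor: table first, behavior flags after, mirroring
        // `new JsonDeserializationSchema(catalogTable, false, false)` above.
        JsonRowDeserializer(
                TableDescriptor table, boolean failOnMissingField, boolean ignoreParseErrors) {
            this.table = table;
            this.failOnMissingField = failOnMissingField;
            this.ignoreParseErrors = ignoreParseErrors;
        }

        String producedTable() {
            return table.tablePath;
        }
    }

    public static void main(String[] args) {
        TableDescriptor table = new TableDescriptor("public.sink", new String[] {"id", "name"});
        JsonRowDeserializer deser = new JsonRowDeserializer(table, false, false);
        System.out.println(deser.producedTable()); // public.sink
    }
}
```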
FakeDataGenerator.java
@@ -52,8 +52,7 @@ public FakeDataGenerator(FakeConfig fakeConfig) {
this.jsonDeserializationSchema =
fakeConfig.getFakeRows() == null
? null
: new JsonDeserializationSchema(
false, false, catalogTable.getSeaTunnelRowType());
: new JsonDeserializationSchema(catalogTable, false, false);
this.fakeDataRandomUtils = new FakeDataRandomUtils(fakeConfig);
}

SheetsSource.java
@@ -24,6 +24,7 @@
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -46,6 +47,7 @@
public class SheetsSource extends AbstractSingleSplitSource<SeaTunnelRow> {

private SeaTunnelRowType seaTunnelRowType;
private CatalogTable catalogTable;

private SheetsParameters sheetsParameters;

@@ -75,12 +77,13 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
}
this.sheetsParameters = new SheetsParameters().buildWithConfig(pluginConfig);
if (pluginConfig.hasPath(TableSchemaOptions.SCHEMA.key())) {
this.seaTunnelRowType =
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
this.catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
} else {
this.seaTunnelRowType = CatalogTableUtil.buildSimpleTextSchema();
this.catalogTable = CatalogTableUtil.buildSimpleTextTable();
}
this.deserializationSchema = new JsonDeserializationSchema(false, false, seaTunnelRowType);

this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
this.deserializationSchema = new JsonDeserializationSchema(catalogTable, false, false);
}

@Override
GoogleSheetsDeserializerTest.java
@@ -18,6 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.google.sheets.deserialize;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -39,8 +41,11 @@ public class GoogleSheetsDeserializerTest {
public void testJsonParseError() {
SeaTunnelRowType schema =
new SeaTunnelRowType(new String[] {"name"}, new SeaTunnelDataType[] {STRING_TYPE});

CatalogTable catalogTables = CatalogTableUtil.getCatalogTable("", "", "", "", schema);

final DeserializationSchema<SeaTunnelRow> deser =
new JsonDeserializationSchema(false, false, schema);
new JsonDeserializationSchema(catalogTables, false, false);
final GoogleSheetsDeserializer googleSheetsDeser =
new GoogleSheetsDeserializer(schema.getFieldNames(), deser);
List<Object> row = new ArrayList<>();
HTTP connector source
@@ -127,8 +127,7 @@ protected void buildSchemaWithConfig(Config pluginConfig) {
switch (format) {
case JSON:
this.deserializationSchema =
new JsonDeserializationSchema(
false, false, catalogTable.getSeaTunnelRowType());
new JsonDeserializationSchema(catalogTable, false, false);
if (pluginConfig.hasPath(HttpConfig.JSON_FIELD.key())) {
jsonField =
getJsonField(pluginConfig.getConfig(HttpConfig.JSON_FIELD.key()));
Config.java (Kafka connector)
@@ -177,4 +177,10 @@ public class Config {
.defaultValue(KafkaSemantics.NON)
.withDescription(
"Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.");
public static final Option<List<Map<String, Object>>> TABLE_LIST =
Options.key("table_list")
.type(new TypeReference<List<Map<String, Object>>>() {})
.noDefaultValue()
.withDescription(
"Topic list config. You can configure only one `table_list` or one `topic` at the same time");
}
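Since `table_list` is typed as `List<Map<String, Object>>`, each entry can carry its own topic, pattern flag, format, and consumer group. A rough, self-contained sketch of how such entries might expand into per-topic metadata (plain Java with hypothetical types; the connector's actual parsing lives elsewhere):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Standalone sketch with hypothetical types: expanding `table_list`
// entries (each a Map of options) into one metadata object per topic.
public class TableListParsingSketch {

    static final class TopicMeta {
        final String topic;
        final boolean pattern;
        final String format;

        TopicMeta(String topic, boolean pattern, String format) {
            this.topic = topic;
            this.pattern = pattern;
            this.format = format;
        }
    }

    static List<TopicMeta> expand(List<Map<String, Object>> tableList) {
        List<TopicMeta> result = new ArrayList<>();
        for (Map<String, Object> entry : tableList) {
            String topic = (String) entry.get("topic");
            boolean pattern =
                    Boolean.parseBoolean(String.valueOf(entry.getOrDefault("pattern", "false")));
            String format = (String) entry.getOrDefault("format", "json");
            result.add(new TopicMeta(topic, pattern, format));
        }
        return result;
    }

    public static void main(String[] args) {
        // Mirrors the two table_list entries from the kafka.md example above.
        List<TopicMeta> metas =
                expand(List.of(
                        Map.of("topic", "^test-ogg-sou.*", "pattern", "true", "format", "ogg_json"),
                        Map.of("topic", "test-cdc_mds", "format", "canal_json")));
        metas.forEach(m ->
                System.out.println(m.topic + " pattern=" + m.pattern + " format=" + m.format));
    }
}
```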
ConsumerMetadata.java
@@ -17,6 +17,9 @@

package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;

import org.apache.kafka.common.TopicPartition;
@@ -33,11 +36,11 @@ public class ConsumerMetadata implements Serializable {

private String topic;
private boolean isPattern = false;
private String bootstrapServers;
private Properties properties;
private String consumerGroup;
private boolean commitOnCheckpoint = false;
private StartMode startMode = StartMode.GROUP_OFFSETS;
private Map<TopicPartition, Long> specificStartOffsets;
private Long startOffsetsTimestamp;
private DeserializationSchema<SeaTunnelRow> deserializationSchema;
private CatalogTable catalogTable;
}
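`ConsumerMetadata` now carries a per-topic `DeserializationSchema` and `CatalogTable`, while connection-level settings move to `KafkaSourceConfig`. A minimal standalone sketch of what that enables, with plain functions standing in for the real deserializers: once each topic owns a deserializer, the reader can dispatch every polled record by its topic name.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Standalone sketch (hypothetical, not the connector's reader): each topic
// maps to its own deserializer, so records are routed by topic name.
public class PerTopicDispatchSketch {
    public static void main(String[] args) {
        Map<String, Function<byte[], String>> deserializerByTopic = new HashMap<>();
        deserializerByTopic.put(
                "test-ogg-sou-1",
                bytes -> "ogg_json row: " + new String(bytes, StandardCharsets.UTF_8));
        deserializerByTopic.put(
                "test-cdc_mds",
                bytes -> "canal_json row: " + new String(bytes, StandardCharsets.UTF_8));

        byte[] payload = "{\"id\": 1}".getBytes(StandardCharsets.UTF_8);
        // A real reader would take the topic from the polled ConsumerRecord.
        System.out.println(deserializerByTopic.get("test-cdc_mds").apply(payload));
    }
}
```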
KafkaConsumerThread.java
@@ -37,15 +37,15 @@ public class KafkaConsumerThread implements Runnable {

private final LinkedBlockingQueue<Consumer<KafkaConsumer<byte[], byte[]>>> tasks;

public KafkaConsumerThread(ConsumerMetadata metadata) {
public KafkaConsumerThread(KafkaSourceConfig kafkaSourceConfig, ConsumerMetadata metadata) {
this.metadata = metadata;
this.tasks = new LinkedBlockingQueue<>();
this.consumer =
initConsumer(
this.metadata.getBootstrapServers(),
this.metadata.getConsumerGroup(),
this.metadata.getProperties(),
!this.metadata.isCommitOnCheckpoint());
kafkaSourceConfig.getBootstrap(),
metadata.getConsumerGroup(),
kafkaSourceConfig.getProperties(),
kafkaSourceConfig.isCommitOnCheckpoint());
}

@Override
@@ -64,7 +64,9 @@ public void run() {
}
} finally {
try {
consumer.close();
if (consumer != null) {
consumer.close();
}
} catch (Throwable t) {
throw new KafkaConnectorException(KafkaConnectorErrorCode.CONSUMER_CLOSE_FAILED, t);
}
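The new constructor signature splits responsibilities: broker address, client properties, and commit behavior come from the shared `KafkaSourceConfig`, while `ConsumerMetadata` stays per-topic. A hypothetical sketch of that wiring (illustrative names, not the connector's real classes):

```java
import java.util.Properties;

// Hypothetical sketch mirroring KafkaConsumerThread(kafkaSourceConfig, metadata)
// above: one thread per topic/group, all sharing the same broker settings.
public class ConsumerThreadWiringSketch {

    static final class SharedSettings {
        final String bootstrapServers;
        final Properties properties;
        final boolean commitOnCheckpoint;

        SharedSettings(String bootstrapServers, Properties properties, boolean commitOnCheckpoint) {
            this.bootstrapServers = bootstrapServers;
            this.properties = properties;
            this.commitOnCheckpoint = commitOnCheckpoint;
        }
    }

    static final class TopicMeta {
        final String topic;
        final String consumerGroup;

        TopicMeta(String topic, String consumerGroup) {
            this.topic = topic;
            this.consumerGroup = consumerGroup;
        }
    }

    public static void main(String[] args) {
        SharedSettings shared = new SharedSettings("kafka_e2e:9092", new Properties(), false);
        TopicMeta meta = new TopicMeta("test-cdc_mds", "ogg_multi_group");
        System.out.println("consumer for " + meta.topic + " @ " + shared.bootstrapServers);
    }
}
```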
KafkaSource.java
@@ -29,9 +29,8 @@
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;

import com.google.common.collect.Lists;

import java.util.List;
import java.util.stream.Collectors;

public class KafkaSource
implements SeaTunnelSource<SeaTunnelRow, KafkaSourceSplit, KafkaSourceState>,
@@ -59,37 +58,32 @@ public String getPluginName() {

@Override
public List<CatalogTable> getProducedCatalogTables() {
return Lists.newArrayList(kafkaSourceConfig.getCatalogTable());
return kafkaSourceConfig.getMapMetadata().values().stream()
.map(ConsumerMetadata::getCatalogTable)
.collect(Collectors.toList());
}

@Override
public SourceReader<SeaTunnelRow, KafkaSourceSplit> createReader(
SourceReader.Context readerContext) {
return new KafkaSourceReader(
kafkaSourceConfig.getMetadata(),
kafkaSourceConfig.getDeserializationSchema(),
kafkaSourceConfig,
readerContext,
kafkaSourceConfig.getMessageFormatErrorHandleWay());
}

@Override
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> createEnumerator(
SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext) {
return new KafkaSourceSplitEnumerator(
kafkaSourceConfig.getMetadata(),
enumeratorContext,
kafkaSourceConfig.getDiscoveryIntervalMillis());
return new KafkaSourceSplitEnumerator(kafkaSourceConfig, enumeratorContext, null);
}

@Override
public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> restoreEnumerator(
SourceSplitEnumerator.Context<KafkaSourceSplit> enumeratorContext,
KafkaSourceState checkpointState) {
return new KafkaSourceSplitEnumerator(
kafkaSourceConfig.getMetadata(),
enumeratorContext,
checkpointState,
kafkaSourceConfig.getDiscoveryIntervalMillis());
kafkaSourceConfig, enumeratorContext, checkpointState);
}

@Override
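`getProducedCatalogTables()` now derives its result from the per-topic metadata map instead of a single configured table. A standalone sketch of that stream, with a hypothetical `Meta` type standing in for `ConsumerMetadata` and `String` standing in for `CatalogTable`:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Standalone sketch of the getProducedCatalogTables() pattern above:
// collect one produced table per entry in the per-topic metadata map.
public class ProducedTablesSketch {

    static final class Meta {
        private final String catalogTable;

        Meta(String catalogTable) {
            this.catalogTable = catalogTable;
        }

        String getCatalogTable() {
            return catalogTable;
        }
    }

    public static void main(String[] args) {
        Map<String, Meta> mapMetadata = new LinkedHashMap<>();
        mapMetadata.put("^test-ogg-sou.*", new Meta("test.ogg_table"));
        mapMetadata.put("test-cdc_mds", new Meta("test.cdc_table"));

        List<String> produced =
                mapMetadata.values().stream()
                        .map(Meta::getCatalogTable)
                        .collect(Collectors.toList());
        System.out.println(produced); // [test.ogg_table, test.cdc_table]
    }
}
```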