From 2c60ddefb06b65bb1b49f11395929e6d81b05c54 Mon Sep 17 00:00:00 2001
From: CosmosNi <627165587@qq.com>
Date: Mon, 9 Sep 2024 16:52:12 +0800
Subject: [PATCH 1/4] [Feature][kafka] Add arg poll.timeout for interval poll messages

---
 docs/en/connector-v2/source/kafka.md                        | 3 ++-
 docs/zh/connector-v2/source/Kafka.md                        | 3 ++-
 .../seatunnel/connectors/seatunnel/kafka/config/Config.java | 6 ++++++
 .../seatunnel/kafka/source/KafkaPartitionSplitReader.java   | 6 ++++--
 .../seatunnel/kafka/source/KafkaSourceConfig.java           | 3 +++
 5 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md
index e9259fae484..26b6731e430 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -40,6 +40,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
 | pattern | Boolean | No | false | If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. |
 | consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. |
 | commit_on_checkpoint | Boolean | No | true | If true the consumer's offset will be periodically committed in the background. |
+| poll.timeout | Long | No | 10000L | The interval for poll messages. |
 | kafka.config | Map | No | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). |
 | schema | Config | No | - | The structure of the data, including field names and field types. |
 | format | String | No | json | Data format. The default format is json. Optional text format, canal_json, debezium_json, maxwell_json, ogg_json, avro and protobuf. If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. Some format details please refer [formats](../formats) |
@@ -291,4 +292,4 @@ source {
     result_table_name = "kafka_table"
   }
 }
-```
\ No newline at end of file
+```

diff --git a/docs/zh/connector-v2/source/Kafka.md b/docs/zh/connector-v2/source/Kafka.md
index 8f65e92e924..7a25416b3f8 100644
--- a/docs/zh/connector-v2/source/Kafka.md
+++ b/docs/zh/connector-v2/source/Kafka.md
@@ -40,6 +40,7 @@
 | pattern | Boolean | 否 | false | 如果 `pattern` 设置为 `true`,则会使用指定的正则表达式匹配并订阅主题。 |
 | consumer.group | String | 否 | SeaTunnel-Consumer-Group | `Kafka 消费者组 ID`,用于区分不同的消费者组。 |
 | commit_on_checkpoint | Boolean | 否 | true | 如果为 true,消费者的偏移量将会定期在后台提交。 |
+| poll.timeout | Long | 否 | 10000L | kafka主动拉取时间间隔。 |
 | kafka.config | Map | 否 | - | 除了上述必要参数外,用户还可以指定多个非强制的消费者客户端参数,覆盖 [Kafka 官方文档](https://kafka.apache.org/documentation.html#consumerconfigs) 中指定的所有消费者参数。 |
 | schema | Config | 否 | - | 数据结构,包括字段名称和字段类型。 |
 | format | String | 否 | json | 数据格式。默认格式为 json。可选格式包括 text, canal_json, debezium_json, ogg_json, maxwell_json, avro 和 protobuf。默认字段分隔符为 ", "。如果自定义分隔符,添加 "field_delimiter" 选项。如果使用 canal 格式,请参考 [canal-json](../formats/canal-json.md) 了解详细信息。如果使用 debezium 格式,请参考 [debezium-json](../formats/debezium-json.md)。一些Format的详细信息请参考 [formats](../formats) |
@@ -285,4 +286,4 @@ source {
     result_table_name = "kafka_table"
   }
 }
-```
\ No newline at end of file
+```

diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index a907c9bc212..293821e0edc 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -162,6 +162,12 @@ public class Config {
                     .withDescription(
                             "The interval for dynamically discovering topics and partitions.");
 
+    public static final Option<Long> KEY_POLL_TIMEOUT =
+            Options.key("poll.timeout")
+                    .longType()
+                    .defaultValue(10000L)
+                    .withDescription("The interval (in milliseconds) for polling messages.");
+
     public static final Option<MessageFormatErrorHandleWay> MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION =
             Options.key("format_error_handle_way")
                     .enumType(MessageFormatErrorHandleWay.class)

diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
index 8bca82999c7..d7f0dd0d8d2 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaPartitionSplitReader.java
@@ -62,7 +62,6 @@ public class KafkaPartitionSplitReader
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaPartitionSplitReader.class);
 
-    private static final long POLL_TIMEOUT = 10000L;
     private static final String CLIENT_ID_PREFIX = "seatunnel";
 
     private final KafkaSourceConfig kafkaSourceConfig;
@@ -74,6 +73,8 @@ public class KafkaPartitionSplitReader
 
     private final Set<String> emptySplits = new HashSet<>();
 
+    private final long pollTimeout;
+
     public KafkaPartitionSplitReader(
             KafkaSourceConfig kafkaSourceConfig, SourceReader.Context context) {
         this.kafkaSourceConfig = kafkaSourceConfig;
@@ -81,13 +82,14 @@ public KafkaPartitionSplitReader(
         this.stoppingOffsets = new HashMap<>();
         this.groupId =
                 kafkaSourceConfig.getProperties().getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+        this.pollTimeout = kafkaSourceConfig.getPollTimeout();
     }
 
     @Override
     public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOException {
         ConsumerRecords<byte[], byte[]> consumerRecords;
         try {
-            consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
+            consumerRecords = consumer.poll(Duration.ofMillis(pollTimeout));
         } catch (WakeupException | IllegalStateException e) {
             // IllegalStateException will be thrown if the consumer is not assigned any partitions.
             // This happens if all assigned partitions are invalid or empty (starting offset >=

diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 960a0184029..0f645d72182 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -70,6 +70,7 @@
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_POLL_TIMEOUT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PROTOBUF_MESSAGE_NAME;
@@ -90,6 +91,7 @@ public class KafkaSourceConfig implements Serializable {
     @Getter private final long discoveryIntervalMillis;
     @Getter private final MessageFormatErrorHandleWay messageFormatErrorHandleWay;
     @Getter private final String consumerGroup;
+    @Getter private final long pollTimeout;
 
     public KafkaSourceConfig(ReadonlyConfig readonlyConfig) {
         this.bootstrap = readonlyConfig.get(BOOTSTRAP_SERVERS);
@@ -99,6 +101,7 @@ public KafkaSourceConfig(ReadonlyConfig readonlyConfig) {
         this.discoveryIntervalMillis = readonlyConfig.get(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS);
         this.messageFormatErrorHandleWay =
                 readonlyConfig.get(MESSAGE_FORMAT_ERROR_HANDLE_WAY_OPTION);
+        this.pollTimeout = readonlyConfig.get(KEY_POLL_TIMEOUT);
         this.consumerGroup = readonlyConfig.get(CONSUMER_GROUP);
     }
 

From 46f4b330ce925d6a1ec348cfc0c2b593c2454cbc Mon Sep 17 00:00:00 2001
From: CosmosNi <627165587@qq.com>
Date: Mon, 9 Sep 2024 17:54:16 +0800
Subject: [PATCH 2/4] [Feature][kafka] Add arg poll.timeout for interval poll messages

---
 docs/en/connector-v2/source/kafka.md | 40 ++++++++++++++--------------
 docs/zh/connector-v2/source/Kafka.md |  2 +-
 2 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md
index 26b6731e430..8f1d9d223c0 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -32,27 +32,27 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
 
 ## Source Options
 
-| Name | Type | Required | Default | Description |
-|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. |
-| table_list | Map | No | - | Topic list config You can configure only one `table_list` and one `topic` at the same time |
-| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. |
-| pattern | Boolean | No | false | If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. |
-| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. |
-| commit_on_checkpoint | Boolean | No | true | If true the consumer's offset will be periodically committed in the background. |
-| poll.timeout | Long | No | 10000L | The interval for poll messages. |
-| kafka.config | Map | No | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). |
-| schema | Config | No | - | The structure of the data, including field names and field types. |
-| format | String | No | json | Data format. The default format is json. Optional text format, canal_json, debezium_json, maxwell_json, ogg_json, avro and protobuf. If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. Some format details please refer [formats](../formats) |
-| format_error_handle_way | String | No | fail | The processing method of data format error. The default value is fail, and the optional value is (fail, skip). When fail is selected, data format error will block and an exception will be thrown. When skip is selected, data format error will skip this line data. |
-| field_delimiter | String | No | , | Customize the field delimiter for data format. |
+| Name | Type | Required | Default | Description |
+|-------------------------------------|---------------------------------------------------------------------------|----------|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. |
+| table_list | Map | No | - | Topic list config You can configure only one `table_list` and one `topic` at the same time |
+| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. |
+| pattern | Boolean | No | false | If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. |
+| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. |
+| commit_on_checkpoint | Boolean | No | true | If true the consumer's offset will be periodically committed in the background. |
+| poll.timeout | Long | No | 10000L[millis] | The interval for poll messages. |
+| kafka.config | Map | No | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). |
+| schema | Config | No | - | The structure of the data, including field names and field types. |
+| format | String | No | json | Data format. The default format is json. Optional text format, canal_json, debezium_json, maxwell_json, ogg_json, avro and protobuf. If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. Some format details please refer [formats](../formats) |
+| format_error_handle_way | String | No | fail | The processing method of data format error. The default value is fail, and the optional value is (fail, skip). When fail is selected, data format error will block and an exception will be thrown. When skip is selected, data format error will skip this line data. |
+| field_delimiter | String | No | , | Customize the field delimiter for data format. |
 | start_mode | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | No | group_offsets | The initial consumption pattern of consumers. |
-| start_mode.offsets | Config | No | - | The offset required for consumption mode to be specific_offsets. |
-| start_mode.timestamp | Long | No | - | The time required for consumption mode to be "timestamp". |
-| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. |
-| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details |
-| protobuf_message_name | String | No | - | Effective when the format is set to protobuf, specifies the Message name |
-| protobuf_schema | String | No | - | Effective when the format is set to protobuf, specifies the Schema definition |
+| start_mode.offsets | Config | No | - | The offset required for consumption mode to be specific_offsets. |
+| start_mode.timestamp | Long | No | - | The time required for consumption mode to be "timestamp". |
+| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. |
+| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details |
+| protobuf_message_name | String | No | - | Effective when the format is set to protobuf, specifies the Message name |
+| protobuf_schema | String | No | - | Effective when the format is set to protobuf, specifies the Schema definition |
 
 ## Task Example

diff --git a/docs/zh/connector-v2/source/Kafka.md b/docs/zh/connector-v2/source/Kafka.md
index 7a25416b3f8..5a7700b2c4b 100644
--- a/docs/zh/connector-v2/source/Kafka.md
+++ b/docs/zh/connector-v2/source/Kafka.md
@@ -40,7 +40,7 @@
 | pattern | Boolean | 否 | false | 如果 `pattern` 设置为 `true`,则会使用指定的正则表达式匹配并订阅主题。 |
 | consumer.group | String | 否 | SeaTunnel-Consumer-Group | `Kafka 消费者组 ID`,用于区分不同的消费者组。 |
 | commit_on_checkpoint | Boolean | 否 | true | 如果为 true,消费者的偏移量将会定期在后台提交。 |
-| poll.timeout | Long | 否 | 10000L | kafka主动拉取时间间隔。 |
+| poll.timeout | Long | 否 | 10000L[millis] | kafka主动拉取时间间隔。 |
 | kafka.config | Map | 否 | - | 除了上述必要参数外,用户还可以指定多个非强制的消费者客户端参数,覆盖 [Kafka 官方文档](https://kafka.apache.org/documentation.html#consumerconfigs) 中指定的所有消费者参数。 |
 | schema | Config | 否 | - | 数据结构,包括字段名称和字段类型。 |
 | format | String | 否 | json | 数据格式。默认格式为 json。可选格式包括 text, canal_json, debezium_json, ogg_json, maxwell_json, avro 和 protobuf。默认字段分隔符为 ", "。如果自定义分隔符,添加 "field_delimiter" 选项。如果使用 canal 格式,请参考 [canal-json](../formats/canal-json.md) 了解详细信息。如果使用 debezium 格式,请参考 [debezium-json](../formats/debezium-json.md)。一些Format的详细信息请参考 [formats](../formats) |

From 93d52f4f2a4b419d929e559a6a4ed8888e6c7db8 Mon Sep 17 00:00:00 2001
From: CosmosNi <40288034+CosmosNi@users.noreply.github.com>
Date: Mon, 9 Sep 2024 18:16:24 +0800
Subject: [PATCH 3/4] Update docs/zh/connector-v2/source/Kafka.md

Co-authored-by: Jia Fan
---
 docs/zh/connector-v2/source/Kafka.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/zh/connector-v2/source/Kafka.md b/docs/zh/connector-v2/source/Kafka.md
index 5a7700b2c4b..44e27215564 100644
--- a/docs/zh/connector-v2/source/Kafka.md
+++ b/docs/zh/connector-v2/source/Kafka.md
@@ -40,7 +40,7 @@
 | pattern | Boolean | 否 | false | 如果 `pattern` 设置为 `true`,则会使用指定的正则表达式匹配并订阅主题。 |
 | consumer.group | String | 否 | SeaTunnel-Consumer-Group | `Kafka 消费者组 ID`,用于区分不同的消费者组。 |
 | commit_on_checkpoint | Boolean | 否 | true | 如果为 true,消费者的偏移量将会定期在后台提交。 |
-| poll.timeout | Long | 否 | 10000L[millis] | kafka主动拉取时间间隔。 |
+| poll.timeout | Long | 否 | 10000 | kafka主动拉取时间间隔(毫秒)。 |
 | kafka.config | Map | 否 | - | 除了上述必要参数外,用户还可以指定多个非强制的消费者客户端参数,覆盖 [Kafka 官方文档](https://kafka.apache.org/documentation.html#consumerconfigs) 中指定的所有消费者参数。 |
 | schema | Config | 否 | - | 数据结构,包括字段名称和字段类型。 |
 | format | String | 否 | json | 数据格式。默认格式为 json。可选格式包括 text, canal_json, debezium_json, ogg_json, maxwell_json, avro 和 protobuf。默认字段分隔符为 ", "。如果自定义分隔符,添加 "field_delimiter" 选项。如果使用 canal 格式,请参考 [canal-json](../formats/canal-json.md) 了解详细信息。如果使用 debezium 格式,请参考 [debezium-json](../formats/debezium-json.md)。一些Format的详细信息请参考 [formats](../formats) |
From 4da34d4c31b29e7d7769fe7f1b087ddfe11947c6 Mon Sep 17 00:00:00 2001
From: CosmosNi <40288034+CosmosNi@users.noreply.github.com>
Date: Mon, 9 Sep 2024 18:16:32 +0800
Subject: [PATCH 4/4] Update docs/en/connector-v2/source/kafka.md

Co-authored-by: Jia Fan
---
 docs/en/connector-v2/source/kafka.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md
index 8f1d9d223c0..90c183c2c13 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -40,7 +40,7 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
 | pattern | Boolean | No | false | If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. |
 | consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. |
 | commit_on_checkpoint | Boolean | No | true | If true the consumer's offset will be periodically committed in the background. |
-| poll.timeout | Long | No | 10000L[millis] | The interval for poll messages. |
+| poll.timeout | Long | No | 10000 | The interval (in milliseconds) for polling messages. |
 | kafka.config | Map | No | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). |
 | schema | Config | No | - | The structure of the data, including field names and field types. |
 | format | String | No | json | Data format. The default format is json. Optional text format, canal_json, debezium_json, maxwell_json, ogg_json, avro and protobuf. If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details.If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. Some format details please refer [formats](../formats) |
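Reviewer note (not part of the patches above): a minimal source config exercising the new option might look like the sketch below. The broker address, topic name, and the 5000 value are illustrative placeholders, not values taken from this series.

```
# Sketch only: bootstrap.servers, topic, and the 5000 ms value are placeholders.
source {
  Kafka {
    bootstrap.servers = "localhost:9092"
    topic = "topic-1"
    consumer.group = "SeaTunnel-Consumer-Group"
    # New option from this series: how long, in milliseconds, a single
    # consumer.poll() call may block inside KafkaPartitionSplitReader.fetch().
    # Falls back to the default of 10000 when omitted.
    poll.timeout = 5000
    result_table_name = "kafka_table"
  }
}
```

With this set, fetch() calls consumer.poll(Duration.ofMillis(5000)) instead of the previously hard-coded 10000 ms POLL_TIMEOUT constant.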