From add2936aafe9e2f18402b3e973219dc16bd0e451 Mon Sep 17 00:00:00 2001 From: jarvis Date: Mon, 8 Apr 2024 08:02:59 +0800 Subject: [PATCH 1/3] [Fix][Kafka-Sink] fix kafka sink factory option rule --- .../seatunnel/kafka/sink/KafkaSinkFactory.java | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java index 3fbf6bb99bd..fe6965132d2 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java @@ -23,12 +23,9 @@ import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config; -import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat; import com.google.auto.service.AutoService; -import java.util.Arrays; - @AutoService(Factory.class) public class KafkaSinkFactory implements TableSinkFactory { @Override @@ -39,17 +36,9 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(Config.FORMAT, Config.BOOTSTRAP_SERVERS) - .conditional( - Config.FORMAT, - Arrays.asList( - MessageFormat.JSON, - MessageFormat.CANAL_JSON, - MessageFormat.TEXT, - MessageFormat.OGG_JSON, - MessageFormat.AVRO), - Config.TOPIC) + .required(Config.TOPIC, Config.BOOTSTRAP_SERVERS) .optional( + Config.FORMAT, Config.KAFKA_CONFIG, Config.ASSIGN_PARTITIONS, Config.TRANSACTION_PREFIX, From f622f5559135a929094e630bc79c3203122c416d Mon Sep 17 00:00:00 2001 From: jarvis Date: Mon, 8 Apr 2024 15:39:52 +0800 Subject: [PATCH 2/3] [Fix][Kafka-Sink] fix kafka sink factory option rule --- .../seatunnel/kafka/sink/KafkaSinkFactory.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java index fe6965132d2..c373bba07e2 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java @@ -23,9 +23,12 @@ import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config; +import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat; import com.google.auto.service.AutoService; +import java.util.Arrays; + @AutoService(Factory.class) public class KafkaSinkFactory implements TableSinkFactory { @Override @@ -45,6 +48,17 @@ public OptionRule optionRule() { Config.SEMANTICS, Config.PARTITION, Config.PARTITION_KEY_FIELDS) + .conditional( + Config.FORMAT, + Arrays.asList( + MessageFormat.JSON, + MessageFormat.TEXT, + MessageFormat.CANAL_JSON, + MessageFormat.DEBEZIUM_JSON, + MessageFormat.COMPATIBLE_DEBEZIUM_JSON, + MessageFormat.COMPATIBLE_KAFKA_CONNECT_JSON, + MessageFormat.OGG_JSON, + MessageFormat.AVRO)) .build(); } From b5f87cd1c318cbda97d4c50c50fecbba669e2563 Mon Sep 17 00:00:00 2001 From: jarvis Date: Fri, 12 Apr 2024 10:46:39 +0800 Subject: [PATCH 3/3] remove conditional option --- .../seatunnel/kafka/sink/KafkaSinkFactory.java | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java index c373bba07e2..fe6965132d2 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java @@ -23,12 +23,9 @@ import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config; -import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat; import com.google.auto.service.AutoService; -import java.util.Arrays; - @AutoService(Factory.class) public class KafkaSinkFactory implements TableSinkFactory { @Override @@ -48,17 +45,6 @@ public OptionRule optionRule() { Config.SEMANTICS, Config.PARTITION, Config.PARTITION_KEY_FIELDS) - .conditional( - Config.FORMAT, - Arrays.asList( - MessageFormat.JSON, - MessageFormat.TEXT, - MessageFormat.CANAL_JSON, - MessageFormat.DEBEZIUM_JSON, - MessageFormat.COMPATIBLE_DEBEZIUM_JSON, - MessageFormat.COMPATIBLE_KAFKA_CONNECT_JSON, - MessageFormat.OGG_JSON, - MessageFormat.AVRO)) .build(); }