diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java index 3fbf6bb99bd..fe6965132d2 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java @@ -23,12 +23,9 @@ import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config; -import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat; import com.google.auto.service.AutoService; -import java.util.Arrays; - @AutoService(Factory.class) public class KafkaSinkFactory implements TableSinkFactory { @Override @@ -39,17 +36,9 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(Config.FORMAT, Config.BOOTSTRAP_SERVERS) - .conditional( - Config.FORMAT, - Arrays.asList( - MessageFormat.JSON, - MessageFormat.CANAL_JSON, - MessageFormat.TEXT, - MessageFormat.OGG_JSON, - MessageFormat.AVRO), - Config.TOPIC) + .required(Config.TOPIC, Config.BOOTSTRAP_SERVERS) .optional( + Config.FORMAT, Config.KAFKA_CONFIG, Config.ASSIGN_PARTITIONS, Config.TRANSACTION_PREFIX,