diff --git a/cdc/sink/producer/kafka/config.go b/cdc/sink/producer/kafka/config.go index b5f1b3f93f9..8099a2814d8 100644 --- a/cdc/sink/producer/kafka/config.go +++ b/cdc/sink/producer/kafka/config.go @@ -188,18 +188,6 @@ func CompleteConfigsAndOpts(sinkURI *url.URL, producerConfig *Config, replicaCon producerConfig.AutoCreate = autoCreate } - s = params.Get("enable-tidb-extension") - if s != "" { - _, err := strconv.ParseBool(s) - if err != nil { - return err - } - if replicaConfig.Sink.Protocol != "canal-json" { - return cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New("enable-tidb-extension only support canal-json protocol")) - } - opts["enable-tidb-extension"] = s - } - s = params.Get("dial-timeout") if s != "" { a, err := time.ParseDuration(s) diff --git a/cdc/sink/producer/kafka/config_test.go b/cdc/sink/producer/kafka/config_test.go index 2a10aaf4ab5..05d297d0be7 100644 --- a/cdc/sink/producer/kafka/config_test.go +++ b/cdc/sink/producer/kafka/config_test.go @@ -183,29 +183,6 @@ func (s *kafkaSuite) TestCompleteConfigByOpts(c *check.C) { cfg = NewConfig() err = CompleteConfigsAndOpts(sinkURI, cfg, config.GetDefaultReplicaConfig(), opts) c.Assert(errors.Cause(err), check.ErrorMatches, ".*invalid partition num.*") - - // Use enable-tidb-extension on other protocols. - uri = "kafka://127.0.0.1:9092/abc?kafka-version=2.6.0&partition-num=1&enable-tidb-extension=true" - sinkURI, err = url.Parse(uri) - c.Assert(err, check.IsNil) - cfg = NewConfig() - err = CompleteConfigsAndOpts(sinkURI, cfg, config.GetDefaultReplicaConfig(), opts) - c.Assert(errors.Cause(err), check.ErrorMatches, ".*enable-tidb-extension only support canal-json protocol.*") - - // Test enable-tidb-extension. - uri = "kafka://127.0.0.1:9092/abc?enable-tidb-extension=true&protocol=canal-json" - sinkURI, err = url.Parse(uri) - c.Assert(err, check.IsNil) - cfg = NewConfig() - opts = make(map[string]string) - err = CompleteConfigsAndOpts(sinkURI, cfg, config.GetDefaultReplicaConfig(), opts) - c.Assert(err, check.IsNil) - expectedOpts = map[string]string{ - "enable-tidb-extension": "true", - } - for k, v := range opts { - c.Assert(v, check.Equals, expectedOpts[k]) - } } func (s *kafkaSuite) TestSetPartitionNum(c *check.C) {