From 58f1ae476aeac154fb7cc0b7ae5ec222b99455d1 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Tue, 11 Jan 2022 19:17:14 +0800 Subject: [PATCH] fix mq. --- cdc/sink/mq.go | 4 +-- cdc/sink/producer/kafka/kafka.go | 48 ++++++++++++--------------- cdc/sink/producer/kafka/kafka_test.go | 10 +++--- 3 files changed, 28 insertions(+), 34 deletions(-) diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 3701ba60ca9..999a1402c85 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -412,9 +412,7 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri") } - var protocol codec.Protocol - protocol.FromString(replicaConfig.Sink.Protocol) - producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, protocol, config, errCh) + producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, config, errCh) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index b9e194e61de..eab55b678bf 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -387,7 +387,7 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error { } } -func topicPreProcess(topic string, protocol codec.Protocol, config *Config, saramaConfig *sarama.Config) error { +func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) error { // FIXME: find a way to remove this failpoint for workload the unit test failpoint.Inject("SkipTopicAutoCreate", func() { failpoint.Return(nil) @@ -413,17 +413,15 @@ func topicPreProcess(topic string, protocol codec.Protocol, config *Config, sara // make sure that topic's `max.message.bytes` is not less than given `max-message-bytes` // else the producer will send message that too large to make topic reject, then changefeed would error. // only the default `open protocol` and `craft protocol` use `max-message-bytes`, so check this for them. - if protocol == codec.ProtocolDefault || protocol == codec.ProtocolCraft { - topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info) - if err != nil { - return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) - } - if topicMaxMessageBytes < config.MaxMessageBytes { - return cerror.ErrKafkaInvalidConfig.GenWithStack( - "topic already exist, and topic's max.message.bytes(%d) less than max-message-bytes(%d)."+ - "Please make sure `max-message-bytes` not greater than topic `max.message.bytes`", - topicMaxMessageBytes, config.MaxMessageBytes) - } + topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info) + if err != nil { + return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) + } + if topicMaxMessageBytes < config.MaxMessageBytes { + return cerror.ErrKafkaInvalidConfig.GenWithStack( + "topic already exist, and topic's max.message.bytes(%d) less than max-message-bytes(%d)."+ + "Please make sure `max-message-bytes` not greater than topic `max.message.bytes`", + topicMaxMessageBytes, config.MaxMessageBytes) } // no need to create the topic, but we would have to log user if they found enter wrong topic name later @@ -446,19 +444,17 @@ func topicPreProcess(topic string, protocol codec.Protocol, config *Config, sara // when try to create the topic, we don't know how to set the `max.message.bytes` for the topic. // Kafka would create the topic with broker's `message.max.bytes`, // we have to make sure it's not greater than `max-message-bytes` for the default open protocol. - if protocol == codec.ProtocolDefault || protocol == codec.ProtocolCraft { - brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin) - if err != nil { - log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration") - return errors.Trace(err) - } + brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin) + if err != nil { + log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration") + return errors.Trace(err) + } - if brokerMessageMaxBytes < config.MaxMessageBytes { - return cerror.ErrKafkaInvalidConfig.GenWithStack( - "broker's message.max.bytes(%d) less than max-message-bytes(%d)"+ - "Please make sure `max-message-bytes` not greater than broker's `message.max.bytes`", - brokerMessageMaxBytes, config.MaxMessageBytes) - } + if brokerMessageMaxBytes < config.MaxMessageBytes { + return cerror.ErrKafkaInvalidConfig.GenWithStack( + "broker's message.max.bytes(%d) less than max-message-bytes(%d)"+ + "Please make sure `max-message-bytes` not greater than broker's `message.max.bytes`", + brokerMessageMaxBytes, config.MaxMessageBytes) } // topic not created yet, and user does not specify the `partition-num` in the sink uri. @@ -487,14 +483,14 @@ func topicPreProcess(topic string, protocol codec.Protocol, config *Config, sara var newSaramaConfigImpl = newSaramaConfig // NewKafkaSaramaProducer creates a kafka sarama producer -func NewKafkaSaramaProducer(ctx context.Context, topic string, protocol codec.Protocol, config *Config, errCh chan error) (*kafkaSaramaProducer, error) { +func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) { log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config)) cfg, err := newSaramaConfigImpl(ctx, config) if err != nil { return nil, err } - if err := topicPreProcess(topic, protocol, config, cfg); err != nil { + if err := topicPreProcess(topic, config, cfg); err != nil { return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } diff --git a/cdc/sink/producer/kafka/kafka_test.go b/cdc/sink/producer/kafka/kafka_test.go index b0226b58bad..8f0c1b40d33 100644 --- a/cdc/sink/producer/kafka/kafka_test.go +++ b/cdc/sink/producer/kafka/kafka_test.go @@ -152,7 +152,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate") }() - producer, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh) + producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh) c.Assert(err, check.IsNil) c.Assert(producer.GetPartitionNum(), check.Equals, int32(2)) for i := 0; i < 100; i++ { @@ -279,7 +279,7 @@ func (s *kafkaSuite) TestTopicPreProcess(c *check.C) { config.BrokerEndpoints = []string{""} cfg.Metadata.Retry.Max = 1 - err = topicPreProcess(topic, codec.ProtocolDefault, config, cfg) + err = topicPreProcess(topic, config, cfg) c.Assert(errors.Cause(err), check.Equals, sarama.ErrOutOfBrokers) } @@ -348,7 +348,7 @@ func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) { config.BrokerEndpoints = []string{"127.0.0.1:1111"} topic := "topic" c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil) - _, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh) + _, err := NewKafkaSaramaProducer(ctx, topic, config, errCh) c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate") } @@ -393,7 +393,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { }() errCh := make(chan error, 1) - producer, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh) + producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh) defer func() { _ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate") err := producer.Close() @@ -458,7 +458,7 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) { c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil) errCh := make(chan error, 1) - producer, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh) + producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh) defer func() { err := producer.Close() c.Assert(err, check.IsNil)