Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix mq.
Browse files Browse the repository at this point in the history
3AceShowHand committed Jan 11, 2022
1 parent 70a6df8 commit 58f1ae4
Showing 3 changed files with 28 additions and 34 deletions.
4 changes: 1 addition & 3 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
@@ -412,9 +412,7 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri")
}

var protocol codec.Protocol
protocol.FromString(replicaConfig.Sink.Protocol)
producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, protocol, config, errCh)
producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, config, errCh)
if err != nil {
return nil, errors.Trace(err)
}
48 changes: 22 additions & 26 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
@@ -387,7 +387,7 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
}
}

func topicPreProcess(topic string, protocol codec.Protocol, config *Config, saramaConfig *sarama.Config) error {
func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) error {
// FIXME: find a way to remove this failpoint for workload the unit test
failpoint.Inject("SkipTopicAutoCreate", func() {
failpoint.Return(nil)
@@ -413,17 +413,15 @@ func topicPreProcess(topic string, protocol codec.Protocol, config *Config, sara
// make sure that topic's `max.message.bytes` is not less than given `max-message-bytes`
// else the producer will send message that too large to make topic reject, then changefeed would error.
// only the default `open protocol` and `craft protocol` use `max-message-bytes`, so check this for them.
if protocol == codec.ProtocolDefault || protocol == codec.ProtocolCraft {
topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
if topicMaxMessageBytes < config.MaxMessageBytes {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
"topic already exist, and topic's max.message.bytes(%d) less than max-message-bytes(%d)."+
"Please make sure `max-message-bytes` not greater than topic `max.message.bytes`",
topicMaxMessageBytes, config.MaxMessageBytes)
}
topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
if topicMaxMessageBytes < config.MaxMessageBytes {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
"topic already exist, and topic's max.message.bytes(%d) less than max-message-bytes(%d)."+
"Please make sure `max-message-bytes` not greater than topic `max.message.bytes`",
topicMaxMessageBytes, config.MaxMessageBytes)
}

// no need to create the topic, but we would have to log user if they found enter wrong topic name later
@@ -446,19 +444,17 @@ func topicPreProcess(topic string, protocol codec.Protocol, config *Config, sara
// when try to create the topic, we don't know how to set the `max.message.bytes` for the topic.
// Kafka would create the topic with broker's `message.max.bytes`,
// we have to make sure it's not greater than `max-message-bytes` for the default open protocol.
if protocol == codec.ProtocolDefault || protocol == codec.ProtocolCraft {
brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin)
if err != nil {
log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration")
return errors.Trace(err)
}
brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin)
if err != nil {
log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration")
return errors.Trace(err)
}

if brokerMessageMaxBytes < config.MaxMessageBytes {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
"broker's message.max.bytes(%d) less than max-message-bytes(%d)"+
"Please make sure `max-message-bytes` not greater than broker's `message.max.bytes`",
brokerMessageMaxBytes, config.MaxMessageBytes)
}
if brokerMessageMaxBytes < config.MaxMessageBytes {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
"broker's message.max.bytes(%d) less than max-message-bytes(%d)"+
"Please make sure `max-message-bytes` not greater than broker's `message.max.bytes`",
brokerMessageMaxBytes, config.MaxMessageBytes)
}

// topic not created yet, and user does not specify the `partition-num` in the sink uri.
@@ -487,14 +483,14 @@ func topicPreProcess(topic string, protocol codec.Protocol, config *Config, sara
var newSaramaConfigImpl = newSaramaConfig

// NewKafkaSaramaProducer creates a kafka sarama producer
func NewKafkaSaramaProducer(ctx context.Context, topic string, protocol codec.Protocol, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
cfg, err := newSaramaConfigImpl(ctx, config)
if err != nil {
return nil, err
}

if err := topicPreProcess(topic, protocol, config, cfg); err != nil {
if err := topicPreProcess(topic, config, cfg); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

10 changes: 5 additions & 5 deletions cdc/sink/producer/kafka/kafka_test.go
Original file line number Diff line number Diff line change
@@ -152,7 +152,7 @@ func (s *kafkaSuite) TestSaramaProducer(c *check.C) {
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
}()

producer, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh)
producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh)
c.Assert(err, check.IsNil)
c.Assert(producer.GetPartitionNum(), check.Equals, int32(2))
for i := 0; i < 100; i++ {
@@ -279,7 +279,7 @@ func (s *kafkaSuite) TestTopicPreProcess(c *check.C) {
config.BrokerEndpoints = []string{""}
cfg.Metadata.Retry.Max = 1

err = topicPreProcess(topic, codec.ProtocolDefault, config, cfg)
err = topicPreProcess(topic, config, cfg)
c.Assert(errors.Cause(err), check.Equals, sarama.ErrOutOfBrokers)
}

@@ -348,7 +348,7 @@ func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) {
config.BrokerEndpoints = []string{"127.0.0.1:1111"}
topic := "topic"
c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
_, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh)
_, err := NewKafkaSaramaProducer(ctx, topic, config, errCh)
c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*")
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
}
@@ -393,7 +393,7 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) {
}()

errCh := make(chan error, 1)
producer, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh)
producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate")
err := producer.Close()
@@ -458,7 +458,7 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) {
c.Assert(failpoint.Enable("github.com/pingcap/tiflow/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)

errCh := make(chan error, 1)
producer, err := NewKafkaSaramaProducer(ctx, topic, codec.ProtocolDefault, config, errCh)
producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh)
defer func() {
err := producer.Close()
c.Assert(err, check.IsNil)

0 comments on commit 58f1ae4

Please sign in to comment.