cdc/sink: Kafka support user set configuration (pingcap#4512)
3AceShowHand committed Feb 22, 2022
1 parent bd34fd4 commit b6aef04
Showing 5 changed files with 70 additions and 34 deletions.
12 changes: 0 additions & 12 deletions cdc/processor/processor.go
@@ -1048,17 +1048,6 @@ func (p *processor) Close() error {
}
p.cancel()
p.wg.Wait()
<<<<<<< HEAD
// mark tables share the same cdcContext with its original table, don't need to cancel
failpoint.Inject("processorStopDelay", nil)
resolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
resolvedTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
checkpointTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
checkpointTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
syncTableNumGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorErrorCounter.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
=======

if p.newSchedulerEnabled {
if p.agent == nil {
@@ -1071,7 +1060,6 @@ func (p *processor) Close() error {
}

// sink close might be time-consuming, do it the last.
>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. (#4359))
if p.sinkManager != nil {
// pass a canceled context is ok here, since we don't need to wait Close
ctx, cancel := context.WithCancel(context.Background())
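The comment near the end of this hunk relies on a small Go idiom: handing Close a context that is already canceled, so the call cannot block waiting on ctx.Done(). A minimal sketch of that idiom follows; closer stands in for something like p.sinkManager.Close and is not the real TiCDC type.

package sketch

import "context"

// closeWithoutWaiting hands the closer a context that is canceled up front,
// so any wait inside it that selects on ctx.Done() returns immediately
// instead of blocking the caller.
func closeWithoutWaiting(closer func(ctx context.Context) error) error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel before calling Close: we only need it to start, not to be waited on
	return closer(ctx)
}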
4 changes: 0 additions & 4 deletions cdc/sink/mq.go
@@ -259,13 +259,9 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
}

k.statistics.AddDDLCount()
<<<<<<< HEAD
log.Debug("emit ddl event", zap.String("query", ddl.Query), zap.Uint64("commit-ts", ddl.CommitTs), zap.Int32("partition", partition))
=======
log.Debug("emit ddl event", zap.String("query", ddl.Query),
zap.Uint64("commitTs", ddl.CommitTs), zap.Int32("partition", partition),
zap.String("changefeed", k.id), zap.Any("role", k.role))
>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. (#4359))
err = k.writeToProducer(ctx, msg, codec.EncoderNeedSyncWrite, partition)
return errors.Trace(err)
}
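The resolved log call keeps the event data but adds the changefeed ID and role as structured fields, the usual zap pattern for telling logs from different changefeeds apart. A tiny self-contained illustration, with placeholder values and field names taken from the diff:

package main

import "go.uber.org/zap"

func main() {
	log, _ := zap.NewDevelopment()
	defer log.Sync()

	// Same shape as the resolved call in mq.go: event data plus changefeed
	// and role fields so per-changefeed logs can be filtered. Values here
	// are placeholders, not real TiCDC data.
	log.Debug("emit ddl event",
		zap.String("query", "CREATE TABLE t (id INT PRIMARY KEY)"),
		zap.Uint64("commitTs", 425403406345633794),
		zap.Int32("partition", 0),
		zap.String("changefeed", "changefeed-test"),
		zap.Any("role", "processor"))
}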
38 changes: 35 additions & 3 deletions cdc/sink/producer/kafka/config.go
@@ -46,6 +46,11 @@ type Config struct {
SaslScram *security.SaslScram
// control whether to create topic
AutoCreate bool

// Timeout for sarama `config.Net` configurations, default to `10s`
DialTimeout time.Duration
WriteTimeout time.Duration
ReadTimeout time.Duration
}

// NewConfig returns a default Kafka configuration
@@ -59,6 +64,9 @@ func NewConfig() *Config {
Credential: &security.Credential{},
SaslScram: &security.SaslScram{},
AutoCreate: true,
DialTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
ReadTimeout: 10 * time.Second,
}
}

@@ -197,6 +205,33 @@ func CompleteConfigsAndOpts(sinkURI *url.URL, producerConfig *Config, replicaConfig
opts["enable-tidb-extension"] = s
}

s = params.Get("dial-timeout")
if s != "" {
a, err := time.ParseDuration(s)
if err != nil {
return err
}
producerConfig.DialTimeout = a
}

s = params.Get("write-timeout")
if s != "" {
a, err := time.ParseDuration(s)
if err != nil {
return err
}
producerConfig.WriteTimeout = a
}

s = params.Get("read-timeout")
if s != "" {
a, err := time.ParseDuration(s)
if err != nil {
return err
}
producerConfig.ReadTimeout = a
}

return nil
}

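The three blocks above differ only in the parameter name and the target field. A hypothetical helper, not part of this patch, could fold the repetition into one function, roughly:

package sketch

import (
	"net/url"
	"time"
)

// getDurationParam reads one query parameter and, when it is present, parses
// it as a time.Duration into *dst; when the parameter is absent, *dst keeps
// its default, mirroring the three blocks in CompleteConfigsAndOpts above.
func getDurationParam(params url.Values, key string, dst *time.Duration) error {
	s := params.Get(key)
	if s == "" {
		return nil
	}
	d, err := time.ParseDuration(s)
	if err != nil {
		return err
	}
	*dst = d
	return nil
}

Each block would then shrink to a single call such as getDurationParam(params, "dial-timeout", &producerConfig.DialTimeout).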
@@ -233,8 +268,6 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
// This Timeout is useless if the `RefreshMetadata` time cost is less than it.
config.Metadata.Timeout = 1 * time.Minute

<<<<<<< HEAD
=======
// Admin.Retry take effect on `ClusterAdmin` related operations,
// only `CreateTopic` for cdc now. set the `Timeout` to `1m` to make CI stable.
config.Admin.Retry.Max = 5
@@ -258,7 +291,6 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config.Net.WriteTimeout = c.WriteTimeout
config.Net.ReadTimeout = c.ReadTimeout

>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. (#4359))
config.Producer.Partitioner = sarama.NewManualPartitioner
config.Producer.MaxMessageBytes = c.MaxMessageBytes
config.Producer.Return.Successes = true
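End to end, a user opts into these settings through query parameters on the Kafka sink URI: CompleteConfigsAndOpts copies them into Config, and newSaramaConfig forwards them to sarama's config.Net fields. The broker address and topic below are placeholders; any parameter that is omitted keeps the 10s default set in NewConfig:

kafka://127.0.0.1:9092/my-topic?dial-timeout=5s&read-timeout=30s&write-timeout=1m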
37 changes: 35 additions & 2 deletions cdc/sink/producer/kafka/config_test.go
@@ -17,6 +17,7 @@ import (
"context"
"fmt"
"net/url"
"time"

"github.com/Shopify/sarama"
"github.com/pingcap/check"
@@ -35,7 +36,6 @@ func (s *kafkaSuite) TestNewSaramaConfig(c *check.C) {
config.Version = "invalid"
_, err := newSaramaConfigImpl(ctx, config)
c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*")

ctx = util.SetOwnerInCtx(ctx)
config.Version = "2.6.0"
config.ClientID = "^invalid$"
@@ -84,8 +84,41 @@
c.Assert(cfg.Net.SASL.Mechanism, check.Equals, sarama.SASLMechanism("SCRAM-SHA-256"))
}

func (s *kafkaSuite) TestConfigTimeouts(c *check.C) {
defer testleak.AfterTest(c)()

cfg := NewConfig()
c.Assert(cfg.DialTimeout, check.Equals, 10*time.Second)
c.Assert(cfg.ReadTimeout, check.Equals, 10*time.Second)
c.Assert(cfg.WriteTimeout, check.Equals, 10*time.Second)

saramaConfig, err := newSaramaConfig(context.Background(), cfg)
c.Assert(err, check.IsNil)
c.Assert(saramaConfig.Net.DialTimeout, check.Equals, cfg.DialTimeout)
c.Assert(saramaConfig.Net.WriteTimeout, check.Equals, cfg.WriteTimeout)
c.Assert(saramaConfig.Net.ReadTimeout, check.Equals, cfg.ReadTimeout)

uri := "kafka://127.0.0.1:9092/kafka-test?dial-timeout=5s&read-timeout=1000ms" +
"&write-timeout=2m"
sinkURI, err := url.Parse(uri)
c.Assert(err, check.IsNil)
opts := make(map[string]string)
err = CompleteConfigsAndOpts(sinkURI, cfg, config.GetDefaultReplicaConfig(), opts)
c.Assert(err, check.IsNil)

c.Assert(cfg.DialTimeout, check.Equals, 5*time.Second)
c.Assert(cfg.ReadTimeout, check.Equals, 1000*time.Millisecond)
c.Assert(cfg.WriteTimeout, check.Equals, 2*time.Minute)

saramaConfig, err = newSaramaConfig(context.Background(), cfg)
c.Assert(err, check.IsNil)
c.Assert(saramaConfig.Net.DialTimeout, check.Equals, 5*time.Second)
c.Assert(saramaConfig.Net.ReadTimeout, check.Equals, 1000*time.Millisecond)
c.Assert(saramaConfig.Net.WriteTimeout, check.Equals, 2*time.Minute)
}

func (s *kafkaSuite) TestCompleteConfigByOpts(c *check.C) {
defer testleak.AfterTest(c)
defer testleak.AfterTest(c)()
cfg := NewConfig()

// Normal config.
13 changes: 0 additions & 13 deletions cdc/sink/producer/kafka/kafka.go
@@ -219,18 +219,6 @@ func (k *kafkaSaramaProducer) Close() error {
return nil
}
k.producersReleased = true
<<<<<<< HEAD
// In fact close sarama sync client doesn't return any error.
// But close async client returns error if error channel is not empty, we
// don't populate this error to the upper caller, just add a log here.
err1 := k.syncClient.Close()
err2 := k.asyncClient.Close()
if err1 != nil {
log.Error("close sync client with error", zap.Error(err1))
}
if err2 != nil {
log.Error("close async client with error", zap.Error(err2))
=======

// `client` is mainly used by `asyncProducer` to fetch metadata and other related
// operations. When we close the `kafkaSaramaProducer`, TiCDC no need to make sure
@@ -267,7 +255,6 @@ func (k *kafkaSaramaProducer) Close() error {
} else {
log.Info("sync client closed", zap.Duration("duration", time.Since(start)),
zap.String("changefeed", k.id), zap.Any("role", k.role))
>>>>>>> 1c1015b01 (sink(cdc): kafka producer use default configuration. (#4359))
}
return nil
}
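The resolved Close path above times each client shutdown and logs failures instead of returning them, since the producer is being torn down regardless. A rough sketch of that shutdown pattern, with the sarama producer interfaces and zap logger as the only assumptions (it is not the exact TiCDC implementation):

package sketch

import (
	"time"

	"github.com/Shopify/sarama"
	"go.uber.org/zap"
)

// closeProducers sketches the pattern: time each Close, log any error, and
// never propagate it to the caller, because the producer is being discarded.
func closeProducers(syncProducer sarama.SyncProducer, asyncProducer sarama.AsyncProducer, log *zap.Logger) {
	start := time.Now()
	if err := asyncProducer.Close(); err != nil {
		log.Error("close async producer failed", zap.Error(err))
	} else {
		log.Info("async producer closed", zap.Duration("duration", time.Since(start)))
	}

	start = time.Now()
	if err := syncProducer.Close(); err != nil {
		log.Error("close sync producer failed", zap.Error(err))
	} else {
		log.Info("sync producer closed", zap.Duration("duration", time.Since(start)))
	}
}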
