Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#4517
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Feb 15, 2022
1 parent d9af323 commit f87b657
Show file tree
Hide file tree
Showing 3 changed files with 4,915 additions and 111 deletions.
86 changes: 86 additions & 0 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,20 @@ type kafkaSaramaProducer struct {
// clientLock is used to protect concurrent access of asyncClient and syncClient.
// Since we don't close these two clients (which have an input chan) from the
// sender routine, data race or send on closed chan could happen.
<<<<<<< HEAD
clientLock sync.RWMutex
asyncClient sarama.AsyncProducer
syncClient sarama.SyncProducer
// producersReleased records whether asyncClient and syncClient have been closed properly
=======
clientLock sync.RWMutex
admin kafka.ClusterAdminClient
client sarama.Client
asyncProducer sarama.AsyncProducer
syncProducer sarama.SyncProducer

// producersReleased records whether asyncProducer and syncProducer have been closed properly
>>>>>>> 28fe713de (cdc/sink: kafka sink integrate broker level metrics (#4517))
producersReleased bool
topic string
partitionNum int32
Expand Down Expand Up @@ -357,6 +367,21 @@ func (k *kafkaSaramaProducer) Close() error {
} else {
log.Info("sync client closed", zap.Duration("duration", time.Since(start)))
}
<<<<<<< HEAD
=======

start = time.Now()
if err := k.admin.Close(); err != nil {
log.Warn("close kafka cluster admin with error", zap.Error(err),
zap.Duration("duration", time.Since(start)),
zap.String("changefeed", k.id), zap.Any("role", k.role))
} else {
log.Info("kafka cluster admin closed", zap.Duration("duration", time.Since(start)),
zap.String("changefeed", k.id), zap.Any("role", k.role))
}

k.metricsMonitor.Cleanup()
>>>>>>> 28fe713de (cdc/sink: kafka sink integrate broker level metrics (#4517))
return nil
}

Expand Down Expand Up @@ -402,9 +427,70 @@ func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
<<<<<<< HEAD
defer func() {
if err := admin.Close(); err != nil {
log.Warn("close kafka cluster admin failed", zap.Error(err))
=======

if err := validateAndCreateTopic(admin, topic, config, cfg, opts); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

client, err := sarama.NewClient(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

asyncProducer, err := sarama.NewAsyncProducerFromClient(client)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

syncProducer, err := sarama.NewSyncProducerFromClient(client)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

notifier := new(notify.Notifier)
flushedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
k := &kafkaSaramaProducer{
admin: admin,
client: client,
asyncProducer: asyncProducer,
syncProducer: syncProducer,
topic: topic,
partitionNum: config.PartitionNum,
partitionOffset: make([]struct {
flushed uint64
sent uint64
}, config.PartitionNum),
flushedNotifier: notifier,
flushedReceiver: flushedReceiver,
closeCh: make(chan struct{}),
failpointCh: make(chan error, 1),
closing: kafkaProducerRunning,

id: changefeedID,
role: role,

metricsMonitor: NewSaramaMetricsMonitor(cfg.MetricRegistry,
util.CaptureAddrFromCtx(ctx), changefeedID, admin),
}
go func() {
if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled {
select {
case <-ctx.Done():
return
case errCh <- err:
default:
log.Error("error channel is full", zap.Error(err),
zap.String("changefeed", k.id), zap.Any("role", role))
}
>>>>>>> 28fe713de (cdc/sink: kafka sink integrate broker level metrics (#4517))
}
}()

Expand Down
Loading

0 comments on commit f87b657

Please sign in to comment.