Skip to content

Commit

Permalink
cdc/metrics: Integrate sarama producer metrics (#4520)
Browse files Browse the repository at this point in the history
close #4561
  • Loading branch information
3AceShowHand authored Feb 11, 2022
1 parent 7bcfae4 commit 8a709d7
Show file tree
Hide file tree
Showing 7 changed files with 1,357 additions and 172 deletions.
2 changes: 2 additions & 0 deletions cdc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tiflow/cdc/puller"
redowriter "github.com/pingcap/tiflow/cdc/redo/writer"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/cdc/sink/producer/kafka"
"github.com/pingcap/tiflow/cdc/sorter"
"github.com/pingcap/tiflow/cdc/sorter/leveldb"
"github.com/pingcap/tiflow/cdc/sorter/memory"
Expand Down Expand Up @@ -59,4 +60,5 @@ func init() {
leveldb.InitMetrics(registry)
redowriter.InitMetrics(registry)
db.InitMetrics(registry)
kafka.InitMetrics(registry)
}
9 changes: 6 additions & 3 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -90,7 +91,9 @@ func ddlSinkInitializer(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeF
return errors.Trace(err)
}

s, err := sink.New(ctx, id, info.SinkURI, filter, info.Config, info.Opts, a.errCh)
stdCtx := util.PutChangefeedIDInCtx(ctx, id)
stdCtx = util.PutRoleInCtx(stdCtx, util.RoleOwner)
s, err := sink.New(stdCtx, id, info.SinkURI, filter, info.Config, info.Opts, a.errCh)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -99,13 +102,13 @@ func ddlSinkInitializer(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeF
if !info.SyncPointEnabled {
return nil
}
syncPointStore, err := sink.NewSyncpointStore(ctx, id, info.SinkURI)
syncPointStore, err := sink.NewSyncpointStore(stdCtx, id, info.SinkURI)
if err != nil {
return errors.Trace(err)
}
a.syncPointStore = syncPointStore

if err := a.syncPointStore.CreateSynctable(ctx); err != nil {
if err := a.syncPointStore.CreateSynctable(stdCtx); err != nil {
return errors.Trace(err)
}
return nil
Expand Down
15 changes: 15 additions & 0 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ import (
const (
// defaultPartitionNum specifies the default number of partitions when we create the topic.
defaultPartitionNum = 3

// flushMetricsInterval specifies the interval of refresh sarama metrics.
flushMetricsInterval = 5 * time.Second
)

const (
Expand Down Expand Up @@ -75,6 +78,8 @@ type kafkaSaramaProducer struct {

role util.Role
id model.ChangeFeedID

metricsMonitor *saramaMetricsMonitor
}

type kafkaProducerClosingFlag = int32
Expand Down Expand Up @@ -256,6 +261,8 @@ func (k *kafkaSaramaProducer) Close() error {
log.Info("sync client closed", zap.Duration("duration", time.Since(start)),
zap.String("changefeed", k.id), zap.Any("role", k.role))
}

k.metricsMonitor.Cleanup()
return nil
}

Expand All @@ -266,12 +273,17 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
zap.String("changefeed", k.id), zap.Any("role", k.role))
k.stop()
}()

ticker := time.NewTicker(flushMetricsInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-k.closeCh:
return nil
case <-ticker.C:
k.metricsMonitor.CollectMetrics()
case err := <-k.failpointCh:
log.Warn("receive from failpoint chan", zap.Error(err),
zap.String("changefeed", k.id), zap.Any("role", k.role))
Expand Down Expand Up @@ -366,6 +378,9 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config,

id: changefeedID,
role: role,

metricsMonitor: NewSaramaMetricsMonitor(cfg.MetricRegistry,
util.CaptureAddrFromCtx(ctx), changefeedID),
}
go func() {
if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled {
Expand Down
118 changes: 118 additions & 0 deletions cdc/sink/producer/kafka/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/rcrowley/go-metrics"
)

var (
// batch-size
batchSizeGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "kafka_producer_batch_size",
Help: "the number of bytes sent per partition per request for all topics",
}, []string{"capture", "changefeed"})

// record-send-rate
recordSendRateGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "kafka_producer_record_send_rate",
Help: "Records/second sent to all topics",
}, []string{"capture", "changefeed"})

// records-per-request
recordPerRequestGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "kafka_producer_records_per_request",
Help: "the number of records sent per request for all topics",
}, []string{"capture", "changefeed"})

// compression-ratio
compressionRatioGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "kafka_producer_compression_ratio",
Help: "the compression ratio times 100 of record batches for all topics",
}, []string{"capture", "changefeed"})
)

// InitMetrics registers all metrics in this file
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(batchSizeGauge)
registry.MustRegister(recordSendRateGauge)
registry.MustRegister(recordPerRequestGauge)
registry.MustRegister(compressionRatioGauge)
}

// sarama metrics names, see https://pkg.go.dev/github.com/Shopify/sarama#pkg-overview
const (
batchSizeMetricName = "batch-size"
recordSendRateMetricName = "record-send-rate"
recordPerRequestMetricName = "records-per-request"
compressionRatioMetricName = "compression-ratio"
)

type saramaMetricsMonitor struct {
captureAddr string
changefeedID string

registry metrics.Registry
}

// CollectMetrics collect all monitored metrics
func (sm *saramaMetricsMonitor) CollectMetrics() {
batchSizeMetric := sm.registry.Get(batchSizeMetricName)
if histogram, ok := batchSizeMetric.(metrics.Histogram); ok {
batchSizeGauge.WithLabelValues(sm.captureAddr, sm.changefeedID).Set(histogram.Mean())
}

recordSendRateMetric := sm.registry.Get(recordSendRateMetricName)
if meter, ok := recordSendRateMetric.(metrics.Meter); ok {
recordSendRateGauge.WithLabelValues(sm.captureAddr, sm.changefeedID).Set(meter.Rate1())
}

recordPerRequestMetric := sm.registry.Get(recordPerRequestMetricName)
if histogram, ok := recordPerRequestMetric.(metrics.Histogram); ok {
recordPerRequestGauge.WithLabelValues(sm.captureAddr, sm.changefeedID).Set(histogram.Mean())
}

compressionRatioMetric := sm.registry.Get(compressionRatioMetricName)
if histogram, ok := compressionRatioMetric.(metrics.Histogram); ok {
compressionRatioGauge.WithLabelValues(sm.captureAddr, sm.changefeedID).Set(histogram.Mean())
}
}

func NewSaramaMetricsMonitor(registry metrics.Registry, captureAddr, changefeedID string) *saramaMetricsMonitor {
return &saramaMetricsMonitor{
captureAddr: captureAddr,
changefeedID: changefeedID,
registry: registry,
}
}

func (sm *saramaMetricsMonitor) Cleanup() {
batchSizeGauge.DeleteLabelValues(sm.captureAddr, sm.changefeedID)
recordSendRateGauge.DeleteLabelValues(sm.captureAddr, sm.changefeedID)
recordPerRequestGauge.DeleteLabelValues(sm.captureAddr, sm.changefeedID)
compressionRatioGauge.DeleteLabelValues(sm.captureAddr, sm.changefeedID)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_model v0.2.0
github.com/r3labs/diff v1.1.0
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
github.com/shopspring/decimal v1.3.0
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.2.1
Expand Down
Loading

0 comments on commit 8a709d7

Please sign in to comment.