cdc/metrics: Integrate sarama producer metrics #4520

Merged

48 commits
ed177f2 add a metrics file. (3AceShowHand, Feb 8, 2022)
bb6e2a6 add basic definition of sarama metrics. (3AceShowHand, Feb 8, 2022)
eaa8cf2 add basic definition of sarama metrics. (3AceShowHand, Feb 8, 2022)
4d44eaf add new for support sarama metrics. (3AceShowHand, Feb 8, 2022)
1e9b167 only producer metrics amt. (3AceShowHand, Feb 8, 2022)
0195c1c add metrics. (3AceShowHand, Feb 8, 2022)
f474909 fix metrics. (3AceShowHand, Feb 8, 2022)
2c090cb add more metrics. (3AceShowHand, Feb 8, 2022)
4c2502b add more metrics. (3AceShowHand, Feb 8, 2022)
020609e add some label. (3AceShowHand, Feb 8, 2022)
4f02741 add metrics. (3AceShowHand, Feb 8, 2022)
9d661d9 fix the metrics. (3AceShowHand, Feb 9, 2022)
5a914b5 refine. (3AceShowHand, Feb 9, 2022)
25fe874 nitpick (3AceShowHand, Feb 9, 2022)
381b37d add basic metrics. (3AceShowHand, Feb 9, 2022)
99625a0 add basic sarama producer grafana. (3AceShowHand, Feb 9, 2022)
202c083 fix . (3AceShowHand, Feb 9, 2022)
5937508 fix. (3AceShowHand, Feb 9, 2022)
4ab8e1a add deleteLabelValus method. (3AceShowHand, Feb 9, 2022)
de0a429 add rawSaramaMetrics. (3AceShowHand, Feb 9, 2022)
d88c44f merge master, fix conflicts. (3AceShowHand, Feb 9, 2022)
a409013 remove unncessary code . (3AceShowHand, Feb 9, 2022)
69dd4e8 add compression-ratio to metrics. (3AceShowHand, Feb 9, 2022)
e26eba1 only support metrics for producer now. (3AceShowHand, Feb 10, 2022)
42b5072 remove printMetrics. (3AceShowHand, Feb 10, 2022)
7b23547 fix no captures found. (3AceShowHand, Feb 10, 2022)
0c77be4 remove captureAddr from create changefeed option. (3AceShowHand, Feb 10, 2022)
96653f0 fix cleanup metrics. (3AceShowHand, Feb 10, 2022)
b53c4b2 remove fix. (3AceShowHand, Feb 10, 2022)
f7b8a4c fix on metrics. (3AceShowHand, Feb 10, 2022)
1847c3b add new kafka sink row. (3AceShowHand, Feb 10, 2022)
d6d990e refine grafana json file. (3AceShowHand, Feb 10, 2022)
f1d70ba hard code the metrics fetch. (3AceShowHand, Feb 10, 2022)
e7a313d hard code the metrics fetch. (3AceShowHand, Feb 10, 2022)
92e73e7 fix for compression ratio metrics. (3AceShowHand, Feb 10, 2022)
ee8d4de put changefeed id and role into ddl sink. (3AceShowHand, Feb 10, 2022)
91646e5 put changefeed id and role into ddl sink. (3AceShowHand, Feb 10, 2022)
853bd32 put changefeed into context . (3AceShowHand, Feb 10, 2022)
01c50f1 defer the ticker. (3AceShowHand, Feb 10, 2022)
f962a70 fix lint. (3AceShowHand, Feb 10, 2022)
05b4755 Merge branch 'master' into integrate-sarama-producer-metrics (3AceShowHand, Feb 11, 2022)
4794bed set metric name as constant. (3AceShowHand, Feb 11, 2022)
aed5fb0 use metric name directly. (3AceShowHand, Feb 11, 2022)
ca27355 remove unncessary code change (3AceShowHand, Feb 11, 2022)
768457a remove unncessary code change (3AceShowHand, Feb 11, 2022)
4087c2a fix etcd test . (3AceShowHand, Feb 11, 2022)
c4e1491 Merge branch 'master' into integrate-sarama-producer-metrics (ti-chi-bot, Feb 11, 2022)
bb510b3 Merge branch 'master' into integrate-sarama-producer-metrics (ti-chi-bot, Feb 11, 2022)
2 changes: 2 additions & 0 deletions cdc/metrics.go
@@ -22,6 +22,7 @@ import (
"github.com/pingcap/tiflow/cdc/puller"
redowriter "github.com/pingcap/tiflow/cdc/redo/writer"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/cdc/sink/producer/kafka"
"github.com/pingcap/tiflow/cdc/sorter"
"github.com/pingcap/tiflow/cdc/sorter/leveldb"
"github.com/pingcap/tiflow/cdc/sorter/memory"
@@ -59,4 +60,5 @@ func init() {
leveldb.InitMetrics(registry)
redowriter.InitMetrics(registry)
db.InitMetrics(registry)
kafka.InitMetrics(registry)
}
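
Each component package exposes an InitMetrics(registry) hook, and adding kafka.InitMetrics to this init() is what makes the new producer gauges visible on the registry TiCDC exposes to Prometheus. Below is a minimal, standalone sketch of the same registration pattern; the gauge name, help text, and listen address are illustrative and not taken from TiCDC.

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// exampleGauge stands in for gauges such as kafka_producer_batch_size.
var exampleGauge = prometheus.NewGauge(prometheus.GaugeOpts{
	Namespace: "ticdc",
	Subsystem: "sink",
	Name:      "example_gauge",
	Help:      "illustrative gauge, registered the same way as the kafka producer metrics",
})

// InitMetrics mirrors the per-package registration hook used by TiCDC components.
func InitMetrics(registry *prometheus.Registry) {
	registry.MustRegister(exampleGauge)
}

func main() {
	registry := prometheus.NewRegistry()
	InitMetrics(registry)
	exampleGauge.Set(1)

	// Serve everything registered on this registry, analogous to TiCDC's metrics endpoint.
	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe("127.0.0.1:2112", nil)
}
```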
9 changes: 6 additions & 3 deletions cdc/owner/ddl_sink.go
@@ -27,6 +27,7 @@ import (
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)

@@ -90,7 +91,9 @@ func ddlSinkInitializer(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeF
return errors.Trace(err)
}

s, err := sink.New(ctx, id, info.SinkURI, filter, info.Config, info.Opts, a.errCh)
stdCtx := util.PutChangefeedIDInCtx(ctx, id)
stdCtx = util.PutRoleInCtx(stdCtx, util.RoleOwner)
s, err := sink.New(stdCtx, id, info.SinkURI, filter, info.Config, info.Opts, a.errCh)
if err != nil {
return errors.Trace(err)
}
@@ -99,13 +102,13 @@ func ddlSinkInitializer(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeF
if !info.SyncPointEnabled {
return nil
}
syncPointStore, err := sink.NewSyncpointStore(ctx, id, info.SinkURI)
syncPointStore, err := sink.NewSyncpointStore(stdCtx, id, info.SinkURI)
if err != nil {
return errors.Trace(err)
}
a.syncPointStore = syncPointStore

if err := a.syncPointStore.CreateSynctable(ctx); err != nil {
if err := a.syncPointStore.CreateSynctable(stdCtx); err != nil {
return errors.Trace(err)
}
return nil
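
The stdCtx built in this initializer is how the changefeed ID and the owner role reach the sink and, eventually, the Kafka producer's metric labels. The following self-contained sketch illustrates that context-plumbing pattern using plain context.WithValue as a stand-in for the util.PutChangefeedIDInCtx / util.PutRoleInCtx helpers; the key names and the getter are illustrative, not the actual util API.

```go
package main

import (
	"context"
	"fmt"
)

type ctxKey string

const (
	changefeedIDKey ctxKey = "changefeedID" // stand-in for the key used by util.PutChangefeedIDInCtx
	roleKey         ctxKey = "role"         // stand-in for the key used by util.PutRoleInCtx
)

// putChangefeedIDInCtx mimics util.PutChangefeedIDInCtx: it returns a child
// context carrying the changefeed ID.
func putChangefeedIDInCtx(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, changefeedIDKey, id)
}

// putRoleInCtx mimics util.PutRoleInCtx for the owner/processor role.
func putRoleInCtx(ctx context.Context, role string) context.Context {
	return context.WithValue(ctx, roleKey, role)
}

// changefeedIDFromCtx mimics the matching getter: it reads the ID back,
// falling back to "" when the value was never set.
func changefeedIDFromCtx(ctx context.Context) string {
	id, _ := ctx.Value(changefeedIDKey).(string)
	return id
}

func main() {
	ctx := putChangefeedIDInCtx(context.Background(), "test-changefeed")
	ctx = putRoleInCtx(ctx, "owner")
	// Downstream code (e.g. the Kafka producer) reads these values back to label its metrics.
	fmt.Println(changefeedIDFromCtx(ctx)) // prints: test-changefeed
}
```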
15 changes: 15 additions & 0 deletions cdc/sink/producer/kafka/kafka.go
@@ -39,6 +39,9 @@ import (
const (
// defaultPartitionNum specifies the default number of partitions when we create the topic.
defaultPartitionNum = 3

// flushMetricsInterval specifies how often sarama metrics are collected and flushed to Prometheus.
flushMetricsInterval = 5 * time.Second
)

const (
@@ -75,6 +78,8 @@ type kafkaSaramaProducer struct {

role util.Role
id model.ChangeFeedID

metricsMonitor *saramaMetricsMonitor
}

type kafkaProducerClosingFlag = int32
@@ -256,6 +261,8 @@ func (k *kafkaSaramaProducer) Close() error {
log.Info("sync client closed", zap.Duration("duration", time.Since(start)),
zap.String("changefeed", k.id), zap.Any("role", k.role))
}

k.metricsMonitor.Cleanup()
return nil
}

@@ -266,12 +273,17 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
zap.String("changefeed", k.id), zap.Any("role", k.role))
k.stop()
}()

ticker := time.NewTicker(flushMetricsInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-k.closeCh:
return nil
case <-ticker.C:
k.metricsMonitor.CollectMetrics()
case err := <-k.failpointCh:
log.Warn("receive from failpoint chan", zap.Error(err),
zap.String("changefeed", k.id), zap.Any("role", k.role))
@@ -366,6 +378,9 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config,

id: changefeedID,
role: role,

metricsMonitor: NewSaramaMetricsMonitor(cfg.MetricRegistry,
util.CaptureAddrFromCtx(ctx), changefeedID),
}
go func() {
if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled {
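
The registry handed to NewSaramaMetricsMonitor is sarama's own: sarama.NewConfig() pre-populates Config.MetricRegistry, and the producer records histograms and meters such as "batch-size" and "record-send-rate" into it while sending. A short sketch of that wiring follows; newMonitorFromSaramaConfig is a hypothetical helper that is not part of this PR, and it assumes it sits in the same package as the monitor defined in cdc/sink/producer/kafka/metrics.go below.

```go
package kafka

import (
	"github.com/Shopify/sarama"
)

// newMonitorFromSaramaConfig is a hypothetical helper showing where the
// go-metrics registry handed to the monitor comes from: sarama.NewConfig()
// pre-populates Config.MetricRegistry, and the running producer fills it
// with histograms/meters like "batch-size" and "record-send-rate".
func newMonitorFromSaramaConfig(captureAddr, changefeedID string) (*sarama.Config, *saramaMetricsMonitor) {
	cfg := sarama.NewConfig()
	monitor := NewSaramaMetricsMonitor(cfg.MetricRegistry, captureAddr, changefeedID)
	return cfg, monitor
}
```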
118 changes: 118 additions & 0 deletions cdc/sink/producer/kafka/metrics.go
@@ -0,0 +1,118 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/rcrowley/go-metrics"
)

var (
// batch-size
batchSizeGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "kafka_producer_batch_size",
Help: "the number of bytes sent per partition per request for all topics",
}, []string{"capture", "changefeed"})

// record-send-rate
recordSendRateGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "kafka_producer_record_send_rate",
Help: "Records/second sent to all topics",
}, []string{"capture", "changefeed"})

// records-per-request
recordPerRequestGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "kafka_producer_records_per_request",
Help: "the number of records sent per request for all topics",
}, []string{"capture", "changefeed"})

// compression-ratio
compressionRatioGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "kafka_producer_compression_ratio",
Help: "the compression ratio times 100 of record batches for all topics",
}, []string{"capture", "changefeed"})
)

// InitMetrics registers all metrics in this file
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(batchSizeGauge)
registry.MustRegister(recordSendRateGauge)
registry.MustRegister(recordPerRequestGauge)
registry.MustRegister(compressionRatioGauge)
}

// sarama metrics names, see https://pkg.go.dev/github.com/Shopify/sarama#pkg-overview
const (
batchSizeMetricName = "batch-size"
recordSendRateMetricName = "record-send-rate"
recordPerRequestMetricName = "records-per-request"
compressionRatioMetricName = "compression-ratio"
)

type saramaMetricsMonitor struct {
captureAddr string
changefeedID string

registry metrics.Registry
}

// CollectMetrics collects all monitored metrics
func (sm *saramaMetricsMonitor) CollectMetrics() {
batchSizeMetric := sm.registry.Get(batchSizeMetricName)
if histogram, ok := batchSizeMetric.(metrics.Histogram); ok {
batchSizeGauge.WithLabelValues(sm.captureAddr, sm.changefeedID).Set(histogram.Mean())
}

recordSendRateMetric := sm.registry.Get(recordSendRateMetricName)
if meter, ok := recordSendRateMetric.(metrics.Meter); ok {
recordSendRateGauge.WithLabelValues(sm.captureAddr, sm.changefeedID).Set(meter.Rate1())
}

recordPerRequestMetric := sm.registry.Get(recordPerRequestMetricName)
if histogram, ok := recordPerRequestMetric.(metrics.Histogram); ok {
recordPerRequestGauge.WithLabelValues(sm.captureAddr, sm.changefeedID).Set(histogram.Mean())
}

compressionRatioMetric := sm.registry.Get(compressionRatioMetricName)
if histogram, ok := compressionRatioMetric.(metrics.Histogram); ok {
compressionRatioGauge.WithLabelValues(sm.captureAddr, sm.changefeedID).Set(histogram.Mean())
}
}

// NewSaramaMetricsMonitor creates a monitor that reads sarama metrics from the given registry
func NewSaramaMetricsMonitor(registry metrics.Registry, captureAddr, changefeedID string) *saramaMetricsMonitor {
return &saramaMetricsMonitor{
captureAddr: captureAddr,
changefeedID: changefeedID,
registry: registry,
}
}

// Cleanup removes the labelled metric series created by this monitor
func (sm *saramaMetricsMonitor) Cleanup() {
batchSizeGauge.DeleteLabelValues(sm.captureAddr, sm.changefeedID)
recordSendRateGauge.DeleteLabelValues(sm.captureAddr, sm.changefeedID)
recordPerRequestGauge.DeleteLabelValues(sm.captureAddr, sm.changefeedID)
compressionRatioGauge.DeleteLabelValues(sm.captureAddr, sm.changefeedID)
}
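
To trace the whole path end to end, the following standalone example (not part of the PR, assumed to live next to metrics.go in a _test.go file) feeds a go-metrics registry by hand the way sarama would at runtime, and lets CollectMetrics translate it into the Prometheus gauges defined above. The capture address, changefeed ID, and sample values are made up; testutil is only used to read the gauge back.

```go
package kafka

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/rcrowley/go-metrics"
)

// Example_saramaMetricsMonitor simulates sarama's internal metrics and checks
// that CollectMetrics mirrors them onto the Prometheus gauges defined above.
func Example_saramaMetricsMonitor() {
	registry := metrics.NewRegistry()

	// Simulate what sarama does internally while producing messages.
	batchSize := metrics.GetOrRegisterHistogram(batchSizeMetricName, registry, metrics.NewUniformSample(1024))
	batchSize.Update(512)
	batchSize.Update(1024)

	monitor := NewSaramaMetricsMonitor(registry, "127.0.0.1:8300", "test-changefeed")
	monitor.CollectMetrics()

	// The histogram mean is now exposed on the labelled Prometheus gauge.
	fmt.Println(testutil.ToFloat64(batchSizeGauge.WithLabelValues("127.0.0.1:8300", "test-changefeed")))

	// Deleting the labelled series when the producer closes avoids stale metrics.
	monitor.Cleanup()
	// Output: 768
}
```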
1 change: 1 addition & 0 deletions go.mod
@@ -63,6 +63,7 @@ require (
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_model v0.2.0
github.com/r3labs/diff v1.1.0
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
github.com/shopspring/decimal v1.3.0
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.2.1