Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics: add data flow metrics (#2763) #2831

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cdc/entry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,18 @@ var (
Help: "Bucketed histogram of processing time (s) of unmarshal and mount in mounter.",
Buckets: prometheus.ExponentialBuckets(0.000001, 10, 10),
}, []string{"capture", "changefeed"})
totalRowsCountGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "mounter",
Name: "total_rows_count",
Help: "The total count of rows that are processed by mounter",
}, []string{"capture", "changefeed"})
)

// InitMetrics registers all metrics in this file
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(mounterInputChanSizeGauge)
registry.MustRegister(mountDuration)
registry.MustRegister(totalRowsCountGauge)
}
6 changes: 6 additions & 0 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ func (m *mounterImpl) codecWorker(ctx context.Context, index int) error {
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricMountDuration := mountDuration.WithLabelValues(captureAddr, changefeedID)
metricTotalRows := totalRowsCountGauge.WithLabelValues(captureAddr, changefeedID)
defer func() {
mountDuration.DeleteLabelValues(captureAddr, changefeedID)
totalRowsCountGauge.DeleteLabelValues(captureAddr, changefeedID)
}()

for {
var pEvent *model.PolymorphicEvent
Expand All @@ -138,6 +143,7 @@ func (m *mounterImpl) codecWorker(ctx context.Context, index int) error {
pEvent.RawKV.OldValue = nil
pEvent.PrepareFinished()
metricMountDuration.Observe(time.Since(startTime).Seconds())
metricTotalRows.Inc()
}
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,7 @@ func runProcessor(
cancel()
return nil, errors.Trace(err)
}
sinkManager := sink.NewManager(ctx, s, errCh, checkpointTs)
sinkManager := sink.NewManager(ctx, s, errCh, checkpointTs, captureInfo.AdvertiseAddr, changefeedID)
processor, err := newProcessor(ctx, pdCli, grpcPool, session, info, sinkManager,
changefeedID, captureInfo, checkpointTs, errCh, flushCheckpointInterval)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/ticdc/cdc/sink"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/pipeline"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -150,6 +151,7 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err

func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicEvent) error {
if event == nil || event.Row == nil {
log.Warn("skip emit empty rows", zap.Any("event", event))
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
return errors.Trace(err)
}
checkpointTs := p.changefeed.Info.GetCheckpointTs(p.changefeed.Status)
p.sinkManager = sink.NewManager(stdCtx, s, errCh, checkpointTs)
captureAddr := ctx.GlobalVars().CaptureInfo.AdvertiseAddr
p.sinkManager = sink.NewManager(stdCtx, s, errCh, checkpointTs, captureAddr, p.changefeedID)
p.initialized = true
log.Info("run processor", cdcContext.ZapFieldCapture(ctx), cdcContext.ZapFieldChangefeed(ctx))
return nil
Expand Down
31 changes: 26 additions & 5 deletions cdc/sink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

Expand All @@ -42,16 +43,26 @@ type Manager struct {
flushMu sync.Mutex

drawbackChan chan drawbackMsg

captureAddr string
changefeedID model.ChangeFeedID
metricsTableSinkTotalRows prometheus.Counter
}

// NewManager creates a new Sink manager
func NewManager(ctx context.Context, backendSink Sink, errCh chan error, checkpointTs model.Ts) *Manager {
func NewManager(
ctx context.Context, backendSink Sink, errCh chan error, checkpointTs model.Ts,
captureAddr string, changefeedID model.ChangeFeedID,
) *Manager {
drawbackChan := make(chan drawbackMsg, 16)
return &Manager{
backendSink: newBufferSink(ctx, backendSink, errCh, checkpointTs, drawbackChan),
checkpointTs: checkpointTs,
tableSinks: make(map[model.TableID]*tableSink),
drawbackChan: drawbackChan,
backendSink: newBufferSink(ctx, backendSink, errCh, checkpointTs, drawbackChan),
checkpointTs: checkpointTs,
tableSinks: make(map[model.TableID]*tableSink),
drawbackChan: drawbackChan,
captureAddr: captureAddr,
changefeedID: changefeedID,
metricsTableSinkTotalRows: tableSinkTotalRowsCountCounter.WithLabelValues(captureAddr, changefeedID),
}
}

Expand All @@ -74,6 +85,7 @@ func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts)

// Close closes the Sink manager and backend Sink, this method can be reentrantly called
func (m *Manager) Close(ctx context.Context) error {
tableSinkTotalRowsCountCounter.DeleteLabelValues(m.captureAddr, m.changefeedID)
return m.backendSink.Close(ctx)
}

Expand Down Expand Up @@ -142,6 +154,7 @@ func (t *tableSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTab

func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
t.buffer = append(t.buffer, rows...)
t.manager.metricsTableSinkTotalRows.Add(float64(len(rows)))
return nil
}

Expand Down Expand Up @@ -227,6 +240,13 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) {
metricFlushDuration := flushRowChangedDuration.WithLabelValues(advertiseAddr, changefeedID, "Flush")
metricEmitRowDuration := flushRowChangedDuration.WithLabelValues(advertiseAddr, changefeedID, "EmitRow")
metricBufferSize := bufferChanSizeGauge.WithLabelValues(advertiseAddr, changefeedID)
metricTotalRows := bufferSinkTotalRowsCountCounter.WithLabelValues(advertiseAddr, changefeedID)
defer func() {
flushRowChangedDuration.DeleteLabelValues(advertiseAddr, changefeedID, "Flush")
flushRowChangedDuration.DeleteLabelValues(advertiseAddr, changefeedID, "EmitRow")
bufferChanSizeGauge.DeleteLabelValues(advertiseAddr, changefeedID)
bufferSinkTotalRowsCountCounter.DeleteLabelValues(advertiseAddr, changefeedID)
}()
for {
select {
case <-ctx.Done():
Expand All @@ -247,6 +267,7 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) {
i := sort.Search(len(rows), func(i int) bool {
return rows[i].CommitTs > resolvedTs
})
metricTotalRows.Add(float64(i))

start := time.Now()
err := b.Sink.EmitRowChangedEvents(ctx, rows[:i]...)
Expand Down
8 changes: 4 additions & 4 deletions cdc/sink/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *managerSuite) TestManagerRandom(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh := make(chan error, 16)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "")
defer manager.Close(ctx)
goroutineNum := 10
rowNum := 100
Expand Down Expand Up @@ -146,7 +146,7 @@ func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh := make(chan error, 16)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "")
defer manager.Close(ctx)
goroutineNum := 200
var wg sync.WaitGroup
Expand Down Expand Up @@ -231,7 +231,7 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) {
defer cancel()

errCh := make(chan error, 16)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "")
defer manager.Close(ctx)

tableID := int64(49)
Expand Down Expand Up @@ -284,7 +284,7 @@ func (s *managerSuite) TestManagerError(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh := make(chan error, 16)
manager := NewManager(ctx, &errorSink{C: c}, errCh, 0)
manager := NewManager(ctx, &errorSink{C: c}, errCh, 0, "", "")
defer manager.Close(ctx)
sink := manager.CreateTableSink(1, 0)
err := sink.EmitRowChangedEvents(ctx, &model.RowChangedEvent{
Expand Down
22 changes: 20 additions & 2 deletions cdc/sink/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ var (
Namespace: "ticdc",
Subsystem: "sink",
Name: "total_rows_count",
Help: "totla count of rows",
Help: "The total count of rows that are processed by sink",
}, []string{"capture", "changefeed"})
totalFlushedRowsCountGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "total_flushed_rows_count",
Help: "totla count of flushed rows",
Help: "The total count of rows that are flushed by sink",
}, []string{"capture", "changefeed"})
flushRowChangedDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand All @@ -93,6 +93,22 @@ var (
Name: "buffer_chan_size",
Help: "size of row changed event buffer channel in sink manager",
}, []string{"capture", "changefeed"})

tableSinkTotalRowsCountCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "table_sink_total_rows_count",
Help: "The total count of rows that are processed by table sink",
}, []string{"capture", "changefeed"})

bufferSinkTotalRowsCountCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "buffer_sink_total_rows_count",
Help: "The total count of rows that are processed by buffer sink",
}, []string{"capture", "changefeed"})
)

// InitMetrics registers all metrics in this file
Expand All @@ -107,4 +123,6 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(totalFlushedRowsCountGauge)
registry.MustRegister(flushRowChangedDuration)
registry.MustRegister(bufferChanSizeGauge)
registry.MustRegister(tableSinkTotalRowsCountCounter)
registry.MustRegister(bufferSinkTotalRowsCountCounter)
}
9 changes: 7 additions & 2 deletions cdc/sink/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,16 @@ func NewStatistics(ctx context.Context, name string, opts map[string]string) *St
statistics.metricExecErrCnt = executionErrorCounter.WithLabelValues(statistics.captureAddr, statistics.changefeedID)

// Flush metrics in background for better accuracy and efficiency.
captureAddr, changefeedID := statistics.captureAddr, statistics.changefeedID
ticker := time.NewTicker(flushMetricsInterval)
metricTotalRows := totalRowsCountGauge.WithLabelValues(statistics.captureAddr, statistics.changefeedID)
metricTotalFlushedRows := totalFlushedRowsCountGauge.WithLabelValues(statistics.captureAddr, statistics.changefeedID)
go func() {
defer ticker.Stop()
metricTotalRows := totalRowsCountGauge.WithLabelValues(captureAddr, changefeedID)
metricTotalFlushedRows := totalFlushedRowsCountGauge.WithLabelValues(captureAddr, changefeedID)
defer func() {
totalRowsCountGauge.DeleteLabelValues(captureAddr, changefeedID)
totalFlushedRowsCountGauge.DeleteLabelValues(captureAddr, changefeedID)
}()
for {
select {
case <-ctx.Done():
Expand Down
Loading