Skip to content

Commit

Permalink
metrics: add data flow metrics (#2763) (#2829)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Sep 15, 2021
1 parent 11e574c commit 756dfc9
Show file tree
Hide file tree
Showing 10 changed files with 1,337 additions and 321 deletions.
8 changes: 8 additions & 0 deletions cdc/entry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,18 @@ var (
Help: "Bucketed histogram of processing time (s) of unmarshal and mount in mounter.",
Buckets: prometheus.ExponentialBuckets(0.000001, 10, 10),
}, []string{"capture", "changefeed"})
totalRowsCountGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "mounter",
Name: "total_rows_count",
Help: "The total count of rows that are processed by mounter",
}, []string{"capture", "changefeed"})
)

// InitMetrics registers all metrics in this file
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(mounterInputChanSizeGauge)
registry.MustRegister(mountDuration)
registry.MustRegister(totalRowsCountGauge)
}
6 changes: 6 additions & 0 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ func (m *mounterImpl) codecWorker(ctx context.Context, index int) error {
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)
metricMountDuration := mountDuration.WithLabelValues(captureAddr, changefeedID)
metricTotalRows := totalRowsCountGauge.WithLabelValues(captureAddr, changefeedID)
defer func() {
mountDuration.DeleteLabelValues(captureAddr, changefeedID)
totalRowsCountGauge.DeleteLabelValues(captureAddr, changefeedID)
}()

for {
var pEvent *model.PolymorphicEvent
Expand All @@ -136,6 +141,7 @@ func (m *mounterImpl) codecWorker(ctx context.Context, index int) error {
pEvent.RawKV.OldValue = nil
pEvent.PrepareFinished()
metricMountDuration.Observe(time.Since(startTime).Seconds())
metricTotalRows.Inc()
}
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1435,7 +1435,7 @@ func runProcessor(
cancel()
return nil, errors.Trace(err)
}
sinkManager := sink.NewManager(ctx, s, errCh, checkpointTs)
sinkManager := sink.NewManager(ctx, s, errCh, checkpointTs, captureInfo.AdvertiseAddr, changefeedID)
processor, err := newProcessor(ctx, pdCli, grpcPool, session, info, sinkManager,
changefeedID, captureInfo, checkpointTs, errCh, flushCheckpointInterval)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/ticdc/cdc/sink"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/pipeline"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -150,6 +151,7 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err

func (n *sinkNode) emitEvent(ctx pipeline.NodeContext, event *model.PolymorphicEvent) error {
if event == nil || event.Row == nil {
log.Warn("skip emit empty rows", zap.Any("event", event))
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
return errors.Trace(err)
}
checkpointTs := p.changefeed.Info.GetCheckpointTs(p.changefeed.Status)
p.sinkManager = sink.NewManager(stdCtx, s, errCh, checkpointTs)
captureAddr := ctx.GlobalVars().CaptureInfo.AdvertiseAddr
p.sinkManager = sink.NewManager(stdCtx, s, errCh, checkpointTs, captureAddr, p.changefeedID)
p.initialized = true
log.Info("run processor", cdcContext.ZapFieldCapture(ctx), cdcContext.ZapFieldChangefeed(ctx))
return nil
Expand Down
31 changes: 26 additions & 5 deletions cdc/sink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

Expand All @@ -42,16 +43,26 @@ type Manager struct {
flushMu sync.Mutex

drawbackChan chan drawbackMsg

captureAddr string
changefeedID model.ChangeFeedID
metricsTableSinkTotalRows prometheus.Counter
}

// NewManager creates a new Sink manager
func NewManager(ctx context.Context, backendSink Sink, errCh chan error, checkpointTs model.Ts) *Manager {
func NewManager(
ctx context.Context, backendSink Sink, errCh chan error, checkpointTs model.Ts,
captureAddr string, changefeedID model.ChangeFeedID,
) *Manager {
drawbackChan := make(chan drawbackMsg, 16)
return &Manager{
backendSink: newBufferSink(ctx, backendSink, errCh, checkpointTs, drawbackChan),
checkpointTs: checkpointTs,
tableSinks: make(map[model.TableID]*tableSink),
drawbackChan: drawbackChan,
backendSink: newBufferSink(ctx, backendSink, errCh, checkpointTs, drawbackChan),
checkpointTs: checkpointTs,
tableSinks: make(map[model.TableID]*tableSink),
drawbackChan: drawbackChan,
captureAddr: captureAddr,
changefeedID: changefeedID,
metricsTableSinkTotalRows: tableSinkTotalRowsCountCounter.WithLabelValues(captureAddr, changefeedID),
}
}

Expand All @@ -74,6 +85,7 @@ func (m *Manager) CreateTableSink(tableID model.TableID, checkpointTs model.Ts)

// Close closes the Sink manager and backend Sink, this method can be reentrantly called
func (m *Manager) Close(ctx context.Context) error {
tableSinkTotalRowsCountCounter.DeleteLabelValues(m.captureAddr, m.changefeedID)
return m.backendSink.Close(ctx)
}

Expand Down Expand Up @@ -142,6 +154,7 @@ func (t *tableSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTab

func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
t.buffer = append(t.buffer, rows...)
t.manager.metricsTableSinkTotalRows.Add(float64(len(rows)))
return nil
}

Expand Down Expand Up @@ -227,6 +240,13 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) {
metricFlushDuration := flushRowChangedDuration.WithLabelValues(advertiseAddr, changefeedID, "Flush")
metricEmitRowDuration := flushRowChangedDuration.WithLabelValues(advertiseAddr, changefeedID, "EmitRow")
metricBufferSize := bufferChanSizeGauge.WithLabelValues(advertiseAddr, changefeedID)
metricTotalRows := bufferSinkTotalRowsCountCounter.WithLabelValues(advertiseAddr, changefeedID)
defer func() {
flushRowChangedDuration.DeleteLabelValues(advertiseAddr, changefeedID, "Flush")
flushRowChangedDuration.DeleteLabelValues(advertiseAddr, changefeedID, "EmitRow")
bufferChanSizeGauge.DeleteLabelValues(advertiseAddr, changefeedID)
bufferSinkTotalRowsCountCounter.DeleteLabelValues(advertiseAddr, changefeedID)
}()
for {
select {
case <-ctx.Done():
Expand All @@ -247,6 +267,7 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) {
i := sort.Search(len(rows), func(i int) bool {
return rows[i].CommitTs > resolvedTs
})
metricTotalRows.Add(float64(i))

start := time.Now()
err := b.Sink.EmitRowChangedEvents(ctx, rows[:i]...)
Expand Down
8 changes: 4 additions & 4 deletions cdc/sink/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *managerSuite) TestManagerRandom(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh := make(chan error, 16)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "")
defer manager.Close(ctx)
goroutineNum := 10
rowNum := 100
Expand Down Expand Up @@ -146,7 +146,7 @@ func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh := make(chan error, 16)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "")
defer manager.Close(ctx)
goroutineNum := 200
var wg sync.WaitGroup
Expand Down Expand Up @@ -231,7 +231,7 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) {
defer cancel()

errCh := make(chan error, 16)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "")
defer manager.Close(ctx)

tableID := int64(49)
Expand Down Expand Up @@ -284,7 +284,7 @@ func (s *managerSuite) TestManagerError(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh := make(chan error, 16)
manager := NewManager(ctx, &errorSink{C: c}, errCh, 0)
manager := NewManager(ctx, &errorSink{C: c}, errCh, 0, "", "")
defer manager.Close(ctx)
sink := manager.CreateTableSink(1, 0)
err := sink.EmitRowChangedEvents(ctx, &model.RowChangedEvent{
Expand Down
22 changes: 20 additions & 2 deletions cdc/sink/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ var (
Namespace: "ticdc",
Subsystem: "sink",
Name: "total_rows_count",
Help: "totla count of rows",
Help: "The total count of rows that are processed by sink",
}, []string{"capture", "changefeed"})
totalFlushedRowsCountGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "total_flushed_rows_count",
Help: "totla count of flushed rows",
Help: "The total count of rows that are flushed by sink",
}, []string{"capture", "changefeed"})
flushRowChangedDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand All @@ -93,6 +93,22 @@ var (
Name: "buffer_chan_size",
Help: "size of row changed event buffer channel in sink manager",
}, []string{"capture", "changefeed"})

tableSinkTotalRowsCountCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "table_sink_total_rows_count",
Help: "The total count of rows that are processed by table sink",
}, []string{"capture", "changefeed"})

bufferSinkTotalRowsCountCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "buffer_sink_total_rows_count",
Help: "The total count of rows that are processed by buffer sink",
}, []string{"capture", "changefeed"})
)

// InitMetrics registers all metrics in this file
Expand All @@ -107,4 +123,6 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(totalFlushedRowsCountGauge)
registry.MustRegister(flushRowChangedDuration)
registry.MustRegister(bufferChanSizeGauge)
registry.MustRegister(tableSinkTotalRowsCountCounter)
registry.MustRegister(bufferSinkTotalRowsCountCounter)
}
9 changes: 7 additions & 2 deletions cdc/sink/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,16 @@ func NewStatistics(ctx context.Context, name string, opts map[string]string) *St
statistics.metricExecErrCnt = executionErrorCounter.WithLabelValues(statistics.captureAddr, statistics.changefeedID)

// Flush metrics in background for better accuracy and efficiency.
captureAddr, changefeedID := statistics.captureAddr, statistics.changefeedID
ticker := time.NewTicker(flushMetricsInterval)
metricTotalRows := totalRowsCountGauge.WithLabelValues(statistics.captureAddr, statistics.changefeedID)
metricTotalFlushedRows := totalFlushedRowsCountGauge.WithLabelValues(statistics.captureAddr, statistics.changefeedID)
go func() {
defer ticker.Stop()
metricTotalRows := totalRowsCountGauge.WithLabelValues(captureAddr, changefeedID)
metricTotalFlushedRows := totalFlushedRowsCountGauge.WithLabelValues(captureAddr, changefeedID)
defer func() {
totalRowsCountGauge.DeleteLabelValues(captureAddr, changefeedID)
totalFlushedRowsCountGauge.DeleteLabelValues(captureAddr, changefeedID)
}()
for {
select {
case <-ctx.Done():
Expand Down
Loading

0 comments on commit 756dfc9

Please sign in to comment.