[to #246] Cherry pick #247,#249 to cdc-release-v1.0 (#252)
* [fix #246] fix unified sorter (#247)

* fix unified sorter

Signed-off-by: zeminzhou <[email protected]>

* remove debug log

Signed-off-by: zeminzhou <[email protected]>

* add metric for the input event of puller

Signed-off-by: zeminzhou <[email protected]>

* remove puller input metric

Signed-off-by: zeminzhou <[email protected]>

* add back

Signed-off-by: zeminzhou <[email protected]>

Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: pingyu <[email protected]>

* fix keyspan sink metrics (#249)

Signed-off-by: pingyu <[email protected]>

Signed-off-by: pingyu <[email protected]>

* fix gh action

Signed-off-by: pingyu <[email protected]>

Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: pingyu <[email protected]>
Co-authored-by: zzm <[email protected]>
pingyu and zeminzhou authored Oct 9, 2022
1 parent ff608e7 commit dba5fe8
Showing 11 changed files with 38 additions and 64 deletions.
8 changes: 6 additions & 2 deletions .github/workflows/ci-cdc.yml
@@ -1,10 +1,14 @@
name: TiKV-CDC
on:
push:
-branches: main
+branches:
+- main
+- 'cdc-release-**'
paths: cdc/**
pull_request:
-branches: main
+branches:
+- main
+- 'cdc-release-**'
paths: cdc/**

permissions:
2 changes: 0 additions & 2 deletions cdc/cdc/model/kv_gen.go

Some generated files are not rendered by default.

3 changes: 0 additions & 3 deletions cdc/cdc/processor/pipeline/puller.go
@@ -18,14 +18,12 @@ import (
"strconv"

"github.com/pingcap/errors"
-"github.com/pingcap/log"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/migration/cdc/cdc/model"
"github.com/tikv/migration/cdc/cdc/puller"
"github.com/tikv/migration/cdc/pkg/pipeline"
"github.com/tikv/migration/cdc/pkg/regionspan"
"github.com/tikv/migration/cdc/pkg/util"
-"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

@@ -86,7 +84,6 @@ func (n *pullerNode) InitWithWaitGroup(ctx pipeline.NodeContext, wg *errgroup.Gr
metricKeySpanResolvedTsGauge.Set(float64(oracle.ExtractPhysical(rawKV.CRTs)))
}
pEvent := model.NewPolymorphicEvent(rawKV)
-log.Debug("[TRACE] pullerNode.SendToNextNode", zap.Any("event", pEvent))
ctx.SendToNextNode(pipeline.PolymorphicEventMessage(pEvent))
}
}
11 changes: 0 additions & 11 deletions cdc/cdc/processor/pipeline/sink.go
@@ -141,24 +141,19 @@ func (n *sinkNode) flushSink(ctx context.Context, resolvedTs model.Ts) (err erro
err = n.stop(ctx)
}
}()
-requestResolvedTs := resolvedTs
if resolvedTs > n.barrierTs {
resolvedTs = n.barrierTs
}
if resolvedTs > n.targetTs {
resolvedTs = n.targetTs
}
-log.Debug("[TRACE] sink.flushSink", zap.Uint64("requestResolvedTs", requestResolvedTs), zap.Uint64("resultResolvedTs", resolvedTs), zap.Uint64("barrierTs", n.barrierTs),
-zap.Uint64("targetTs", n.targetTs), zap.Uint64("checkpointTs", n.CheckpointTs()))
if resolvedTs <= n.CheckpointTs() {
return nil
}
if err := n.emitRow2Sink(ctx); err != nil {
return errors.Trace(err)
}
checkpointTs, err := n.sink.FlushChangedEvents(ctx, n.keyspanID, resolvedTs)
-log.Debug("[TRACE] sinkNode.sink.FlushChangedEvents", zap.Uint64("keyspanID", n.keyspanID), zap.Uint64("resolvedTs", resolvedTs), zap.Uint64("checkpointTs", checkpointTs),
-zap.Uint64("n.CheckpointTs()", n.CheckpointTs()), zap.Error(err))
if err != nil {
return errors.Trace(err)
}
@@ -185,7 +180,6 @@ func (n *sinkNode) emitEvent(ctx context.Context, event *model.PolymorphicEvent)
log.Warn("skip emit nil event", zap.Any("event", event))
return nil
}
-log.Debug("[TRACE] sinkNode.emitEvent", zap.Any("event", event))

n.eventBuffer = append(n.eventBuffer, event)

@@ -229,9 +223,6 @@ func (n *sinkNode) emitRow2Sink(ctx context.Context) error {
time.Sleep(10 * time.Second)
panic("ProcessorSyncResolvedPreEmit")
})
-if len(n.rawKVBuffer) > 0 {
-log.Debug("[TRACE] sinkNode.emitRow2Sink", zap.Any("n.rawKVBuffer", n.rawKVBuffer))
-}
err := n.sink.EmitChangedEvents(ctx, n.rawKVBuffer...)
if err != nil {
return errors.Trace(err)
@@ -247,7 +238,6 @@ func (n *sinkNode) Receive(ctx pipeline.NodeContext) error {
}

func (n *sinkNode) HandleMessage(ctx context.Context, msg pipeline.Message) (bool, error) {
-log.Debug("[TRACE] sinkNode.HandleMessage", zap.Any("msg", msg))
if n.status == KeySpanStatusStopped {
return false, cerror.ErrKeySpanProcessorStoppedSafely.GenWithStackByArgs()
}
@@ -261,7 +251,6 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pipeline.Message) (boo
failpoint.Inject("ProcessorSyncResolvedError", func() {
failpoint.Return(false, errors.New("processor sync resolved injected error"))
})
-log.Debug("[TRACE] sinkNode.flushSink", zap.Uint64("msg.PolymorphicEvent.CRTs", msg.PolymorphicEvent.CRTs))
if err := n.flushSink(ctx, msg.PolymorphicEvent.CRTs); err != nil {
return false, errors.Trace(err)
}
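The trace logging removed above was instrumenting the resolved-ts clamping in `flushSink`. For readers following the diff, here is a minimal, runnable sketch of that control flow; `flushUpTo` and its parameters are illustrative names, not part of the actual codebase. The requested resolved ts is capped by the barrier ts and the target ts, and the flush is skipped when the result does not advance past the current checkpoint.

```go
package main

import "fmt"

// flushUpTo mirrors the clamping structure of sinkNode.flushSink above
// (names are illustrative): the requested resolved ts is capped by barrierTs
// and targetTs, and no flush happens unless the result advances past the
// current checkpoint.
func flushUpTo(resolvedTs, barrierTs, targetTs, checkpointTs uint64,
	flush func(uint64) (uint64, error)) (uint64, error) {
	if resolvedTs > barrierTs {
		resolvedTs = barrierTs
	}
	if resolvedTs > targetTs {
		resolvedTs = targetTs
	}
	if resolvedTs <= checkpointTs {
		return checkpointTs, nil // nothing new to flush
	}
	return flush(resolvedTs)
}

func main() {
	ckpt, _ := flushUpTo(450, 430, 500, 400, func(ts uint64) (uint64, error) {
		return ts, nil // pretend the sink flushed everything up to ts
	})
	fmt.Println(ckpt) // 430: clamped to the barrier ts
}
```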
3 changes: 0 additions & 3 deletions cdc/cdc/processor/processor.go
@@ -521,9 +521,6 @@ func (p *processor) handlePosition(currentTs int64) {
p.metricCheckpointTsLagGauge.Set(float64(currentTs-checkpointPhyTs) / 1e3)
p.metricCheckpointTsGauge.Set(float64(checkpointPhyTs))

-log.Debug("[TRACE] processor.handlePosition", zap.Uint64("minResolvedTs", minResolvedTs), zap.Uint64("minCheckpointTs", minCheckpointTs),
-zap.Int64("resolvedPhyTs", resolvedPhyTs), zap.Int64("checkpointPhyTs", checkpointPhyTs))

// minResolvedTs and minCheckpointTs may be less than the global resolved ts and global checkpoint ts when a new keyspan is added, because the startTs of the new keyspan is less than the global checkpoint ts.
if minResolvedTs != p.changefeed.TaskPositions[p.captureInfo.ID].ResolvedTs ||
minCheckpointTs != p.changefeed.TaskPositions[p.captureInfo.ID].CheckPointTs {
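The `handlePosition` context above converts TSO timestamps into physical milliseconds and reports checkpoint lag in seconds (the `/ 1e3`). As a small illustration, assuming the `tikv/client-go/v2/oracle` helpers already imported elsewhere in this diff (the use of `GetPhysical`/`ComposeTS` here is an assumption for the example, not taken from this commit):

```go
package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

func main() {
	// A TSO timestamp packs a physical millisecond timestamp in its high bits.
	physical := oracle.GetPhysical(time.Now().Add(-3 * time.Second))
	ts := oracle.ComposeTS(physical, 0)

	// Same shape as handlePosition: extract the physical part of the
	// checkpoint ts and report the lag against "now" in seconds.
	checkpointPhyTs := oracle.ExtractPhysical(ts)
	currentPhyTs := oracle.GetPhysical(time.Now())
	lagSeconds := float64(currentPhyTs-checkpointPhyTs) / 1e3

	fmt.Printf("checkpoint lag: %.1fs\n", lagSeconds) // ~3.0s
}
```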
14 changes: 7 additions & 7 deletions cdc/cdc/puller/metrics.go
@@ -18,19 +18,19 @@ import (
)

var (
-kvEventCounter = prometheus.NewCounterVec(
+inputEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tikv_cdc",
Subsystem: "puller",
-Name: "kv_event_count",
+Name: "input_event_count",
Help: "The number of events received from kv client event channel",
}, []string{"capture", "changefeed", "type"})
-txnCollectCounter = prometheus.NewCounterVec(
+outputEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "tikv_cdc",
Subsystem: "puller",
-Name: "txn_collect_event_count",
-Help: "The number of events received from txn collector",
+Name: "output_event_count",
+Help: "The number of events sent to sorter",
}, []string{"capture", "changefeed", "type"})
pullerResolvedTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
@@ -66,8 +66,8 @@ var (

// InitMetrics registers all metrics in this file
func InitMetrics(registry *prometheus.Registry) {
-registry.MustRegister(kvEventCounter)
-registry.MustRegister(txnCollectCounter)
+registry.MustRegister(inputEventCounter)
+registry.MustRegister(outputEventCounter)
registry.MustRegister(pullerResolvedTsGauge)
registry.MustRegister(memBufferSizeGauge)
registry.MustRegister(outputChanSizeHistogram)
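For context, the renamed metrics follow the usual prometheus/client_golang pattern: a `CounterVec` is declared with fixed label names, registered once, and then bound to concrete label values on the hot path. A minimal sketch is below; the metric and label names mirror the diff above, while the registry setup and example label values are illustrative only.

```go
package main

import "github.com/prometheus/client_golang/prometheus"

var (
	// Events received from the kv client event channel.
	inputEventCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "tikv_cdc",
			Subsystem: "puller",
			Name:      "input_event_count",
			Help:      "The number of events received from kv client event channel",
		}, []string{"capture", "changefeed", "type"})
	// Events handed off to the sorter.
	outputEventCounter = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "tikv_cdc",
			Subsystem: "puller",
			Name:      "output_event_count",
			Help:      "The number of events sent to sorter",
		}, []string{"capture", "changefeed", "type"})
)

func main() {
	registry := prometheus.NewRegistry()
	registry.MustRegister(inputEventCounter, outputEventCounter)

	// Bind the label values once, then increment on the hot path.
	inputKv := inputEventCounter.WithLabelValues("127.0.0.1:8600", "test-changefeed", "kv")
	inputKv.Inc()

	// Dropping the label values on shutdown avoids leaking stale series.
	inputEventCounter.DeleteLabelValues("127.0.0.1:8600", "test-changefeed", "kv")
}
```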
27 changes: 16 additions & 11 deletions cdc/cdc/puller/puller.go
@@ -123,17 +123,19 @@ func (p *pullerImpl) Run(ctx context.Context) error {
metricOutputChanSize := outputChanSizeHistogram.WithLabelValues(captureAddr, changefeedID)
metricEventChanSize := eventChanSizeHistogram.WithLabelValues(captureAddr, changefeedID)
metricPullerResolvedTs := pullerResolvedTsGauge.WithLabelValues(captureAddr, changefeedID)
-metricTxnCollectCounterKv := txnCollectCounter.WithLabelValues(captureAddr, changefeedID, "kv")
-metricTxnCollectCounterResolved := txnCollectCounter.WithLabelValues(captureAddr, changefeedID, "resolved")
+metricPullerInputCounterKv := inputEventCounter.WithLabelValues(captureAddr, changefeedID, "kv")
+metricPullerInputCounterResolved := inputEventCounter.WithLabelValues(captureAddr, changefeedID, "resolved")
+metricPullerOutputCounterKv := outputEventCounter.WithLabelValues(captureAddr, changefeedID, "kv")
+metricPullerOutputCounterResolved := outputEventCounter.WithLabelValues(captureAddr, changefeedID, "resolved")
defer func() {
outputChanSizeHistogram.DeleteLabelValues(captureAddr, changefeedID)
eventChanSizeHistogram.DeleteLabelValues(captureAddr, changefeedID)
memBufferSizeGauge.DeleteLabelValues(captureAddr, changefeedID)
pullerResolvedTsGauge.DeleteLabelValues(captureAddr, changefeedID)
-kvEventCounter.DeleteLabelValues(captureAddr, changefeedID, "kv")
-kvEventCounter.DeleteLabelValues(captureAddr, changefeedID, "resolved")
-txnCollectCounter.DeleteLabelValues(captureAddr, changefeedID, "kv")
-txnCollectCounter.DeleteLabelValues(captureAddr, changefeedID, "resolved")
+inputEventCounter.DeleteLabelValues(captureAddr, changefeedID, "kv")
+inputEventCounter.DeleteLabelValues(captureAddr, changefeedID, "resolved")
+outputEventCounter.DeleteLabelValues(captureAddr, changefeedID, "kv")
+outputEventCounter.DeleteLabelValues(captureAddr, changefeedID, "resolved")
}()
g.Go(func() error {
for {
@@ -170,6 +172,11 @@ func (p *pullerImpl) Run(ctx context.Context) error {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case p.outputCh <- raw:
+if raw.OpType == model.OpTypeResolved {
+metricPullerOutputCounterResolved.Inc()
+} else {
+metricPullerOutputCounterKv.Inc()
+}
}
return nil
}
@@ -184,28 +191,26 @@ func (p *pullerImpl) Run(ctx context.Context) error {
return errors.Trace(ctx.Err())
}

-log.Debug("[TRACE] revcive region feed event", zap.Stringer("event", e))

if e.Val != nil {
+metricPullerInputCounterKv.Inc()

e.Val.Sequence = p.eventSeq
p.eventSeq += 1
-metricTxnCollectCounterKv.Inc()
if err := output(e.Val); err != nil {
return errors.Trace(err)
}
} else if e.Resolved != nil {
-metricTxnCollectCounterResolved.Inc()
if !regionspan.IsSubSpan(e.Resolved.Span, p.spans...) {
log.Panic("the resolved span is not in the total span",
zap.Reflect("resolved", e.Resolved),
zap.Uint64("keyspanID", keyspanID),
zap.Reflect("spans", p.spans),
)
}
+metricPullerInputCounterResolved.Inc()
// Forward is called in a single thread
p.tsTracker.Forward(e.Resolved.Span, e.Resolved.ResolvedTs)
resolvedTs := p.tsTracker.Frontier()
-log.Debug("[TRACE] puller.tsTracker.Forward", zap.Any("e.Span", e.Resolved.Span), zap.Uint64("e.ResolvedTs", e.Resolved.ResolvedTs), zap.Uint64("tsTracker.Frontier", resolvedTs))
if resolvedTs > 0 && !initialized {
// Advancing to a non-zero value means the puller level
// resolved ts is initialized.
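The changes above count each event twice: once when it arrives from the kv client (`input_event_count`) and once when it is actually pushed into the channel consumed by the sorter (`output_event_count`), split by `kv` vs `resolved` type. A stripped-down, library-style sketch of that flow follows; every type and name here is a simplified stand-in, not the real `pullerImpl` API.

```go
package puller

import "context"

// OpType distinguishes data events from resolved-ts events, mirroring
// model.OpTypeResolved in the diff above.
type OpType int

const (
	OpTypePut OpType = iota
	OpTypeResolved
)

// Event is a simplified stand-in for the kv client / puller event types.
type Event struct {
	OpType OpType
}

// Counter is the minimal surface of a bound prometheus counter.
type Counter interface{ Inc() }

type puller struct {
	eventCh  <-chan *Event // fed by the kv client
	outputCh chan<- *Event // drained by the sorter

	inputKv, inputResolved   Counter
	outputKv, outputResolved Counter
}

// run mirrors the accounting added in pullerImpl.Run: events are counted on
// arrival from the kv client and again once they are handed to the sorter.
func (p *puller) run(ctx context.Context) error {
	for {
		var e *Event
		select {
		case <-ctx.Done():
			return ctx.Err()
		case e = <-p.eventCh:
		}

		// input_event_count: bumped as soon as the event is received.
		if e.OpType == OpTypeResolved {
			p.inputResolved.Inc()
		} else {
			p.inputKv.Inc()
		}

		// output_event_count: bumped only after the event is accepted by
		// the sorter's input channel.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case p.outputCh <- e:
			if e.OpType == OpTypeResolved {
				p.outputResolved.Inc()
			} else {
				p.outputKv.Inc()
			}
		}
	}
}
```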
4 changes: 0 additions & 4 deletions cdc/cdc/sink/buffer_sink.go
@@ -168,9 +168,6 @@ func (b *bufferSink) runOnce(ctx context.Context, state *runState) (bool, error)
}

func (b *bufferSink) EmitChangedEvents(ctx context.Context, rawKVEntries ...*model.RawKVEntry) error {
-if len(rawKVEntries) > 0 {
-log.Debug("[TRACE] bufferSink.EmitChangedEvents", zap.Any("rawKVEntries", rawKVEntries))
-}
select {
case <-ctx.Done():
return ctx.Err()
@@ -187,7 +184,6 @@ func (b *bufferSink) EmitChangedEvents(ctx context.Context, rawKVEntries ...*mod
}

func (b *bufferSink) FlushChangedEvents(ctx context.Context, keyspanID model.KeySpanID, resolvedTs uint64) (uint64, error) {
-log.Debug("[TRACE] bufferSink.FlushChangedEvents", zap.Uint64("keyspanID", keyspanID), zap.Uint64("resolvedTs", resolvedTs))
select {
case <-ctx.Done():
return b.getKeySpanCheckpointTs(keyspanID), ctx.Err()
4 changes: 0 additions & 4 deletions cdc/cdc/sink/keyspan_sink.go
@@ -31,9 +31,6 @@ type keyspanSink struct {
var _ Sink = (*keyspanSink)(nil)

func (t *keyspanSink) EmitChangedEvents(ctx context.Context, rawKVEntries ...*model.RawKVEntry) error {
-if len(rawKVEntries) > 0 {
-log.Debug("[TRACE] keyspanSink.EmitChangedEvents", zap.Any("rawKVEntries", rawKVEntries))
-}
t.buffer = append(t.buffer, rawKVEntries...)
t.manager.metricsKeySpanSinkTotalEvents.Add(float64(len(rawKVEntries)))
return nil
@@ -43,7 +40,6 @@ func (t *keyspanSink) EmitChangedEvents(ctx context.Context, rawKVEntries ...*mo
// is required to be no more than global resolvedTs, keyspan barrierTs and keyspan
// redo log watermarkTs.
func (t *keyspanSink) FlushChangedEvents(ctx context.Context, keyspanID model.KeySpanID, resolvedTs uint64) (uint64, error) {
-log.Debug("[TRACE] bufferSink.FlushChangedEvents", zap.Uint64("keyspanID", keyspanID), zap.Uint64("resolvedTs", resolvedTs))
if keyspanID != t.keyspanID {
log.Panic("inconsistent keyspan sink",
zap.Uint64("keyspanID", keyspanID), zap.Uint64("sinkKeySpanID", t.keyspanID))
8 changes: 0 additions & 8 deletions cdc/cdc/sink/tikv.go
@@ -145,7 +145,6 @@ func (k *tikvSink) dispatch(entry *model.RawKVEntry) uint32 {
}

func (k *tikvSink) EmitChangedEvents(ctx context.Context, rawKVEntries ...*model.RawKVEntry) error {
-log.Debug("[TRACE] tikvSink.EmitChangedEvents", zap.Any("rawKVEntries", rawKVEntries))
entriesCount := 0

for _, rawKVEntry := range rawKVEntries {
@@ -166,8 +165,6 @@ func (k *tikvSink) EmitChangedEvents(ctx context.Context, rawKVEntries ...*model
}

func (k *tikvSink) FlushChangedEvents(ctx context.Context, keyspanID model.KeySpanID, resolvedTs uint64) (uint64, error) {
-log.Debug("[TRACE] tikvSink::FlushChangedEvents", zap.Uint64("resolvedTs", resolvedTs), zap.Uint64("checkpointTs", k.checkpointTs))

if resolvedTs <= k.checkpointTs {
return k.checkpointTs, nil
}
@@ -282,8 +279,6 @@ func extractEntry(entry *model.RawKVEntry, now uint64) (opType model.OpType, key
}

func (b *tikvBatcher) Append(entry *model.RawKVEntry) {
-log.Debug("[TRACE] tikvBatch::Append", zap.Any("event", entry))

if len(b.Batches) == 0 {
b.now = b.getNow()
}
@@ -357,7 +352,6 @@ func (k *tikvSink) runWorker(ctx context.Context, workerIdx uint32) error {
if err != nil {
return 0, err
}
-log.Debug("[TRACE] tikvSink::flushToTiKV", zap.Int("thisBatchSize", thisBatchSize), zap.Any("batch", batch))
}
batcher.Reset()
return thisBatchSize, nil
@@ -380,7 +374,6 @@ func (k *tikvSink) runWorker(ctx context.Context, workerIdx uint32) error {
}
if e.rawKVEntry == nil {
if e.resolvedTs != 0 {
-log.Debug("[TRACE] tikvSink::runWorker push workerResolvedTs", zap.Uint32("workerIdx", workerIdx), zap.Uint64("event.resolvedTs", e.resolvedTs))
if err := flushToTiKV(); err != nil {
return errors.Trace(err)
}
@@ -390,7 +383,6 @@ func (k *tikvSink) runWorker(ctx context.Context, workerIdx uint32) error {
}
continue
}
-log.Debug("[TRACE] tikvSink::runWorker append event", zap.Uint32("workerIdx", workerIdx), zap.Any("event", e.rawKVEntry))
batcher.Append(e.rawKVEntry)

if batcher.ByteSize() >= defaultTiKVBatchBytesLimit {
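The `tikv.go` context shows the sink worker pattern: entries are appended to a batcher and flushed to TiKV either when a resolved-ts event arrives or when the batch grows past a byte-size limit. Below is a condensed, self-contained sketch of that pattern; all types, the limit value, and the flush callback are hypothetical stand-ins, not the real `tikvSink`.

```go
package main

import "fmt"

type entry struct {
	key, value []byte
	resolvedTs uint64 // non-zero means "resolved-ts event", carrying no payload
}

type batcher struct {
	batch []entry
	bytes int
}

func (b *batcher) Append(e entry) {
	b.batch = append(b.batch, e)
	b.bytes += len(e.key) + len(e.value)
}

func (b *batcher) ByteSize() int { return b.bytes }

func (b *batcher) Reset() {
	b.batch = b.batch[:0]
	b.bytes = 0
}

const batchBytesLimit = 4 * 1024 * 1024 // illustrative, not the real default

func runWorker(events <-chan entry, flush func([]entry) error) error {
	var b batcher
	for e := range events {
		// A resolved-ts event forces everything buffered so far out to TiKV.
		if e.resolvedTs != 0 {
			if err := flush(b.batch); err != nil {
				return err
			}
			b.Reset()
			continue
		}
		b.Append(e)
		// Flush early once the batch grows past the byte-size limit.
		if b.ByteSize() >= batchBytesLimit {
			if err := flush(b.batch); err != nil {
				return err
			}
			b.Reset()
		}
	}
	return nil
}

func main() {
	events := make(chan entry, 3)
	events <- entry{key: []byte("k1"), value: []byte("v1")}
	events <- entry{key: []byte("k2"), value: []byte("v2")}
	events <- entry{resolvedTs: 42}
	close(events)

	_ = runWorker(events, func(batch []entry) error {
		fmt.Printf("flushing %d entries\n", len(batch))
		return nil
	})
}
```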
18 changes: 9 additions & 9 deletions cdc/metrics/grafana/tikv-cdc.json
@@ -3421,7 +3421,7 @@
"steppedLine": false,
"targets": [
{
-"expr": "sum (rate(tikv_cdc_puller_txn_collect_event_count{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"}[1m])) by (changefeed, capture, type)",
+"expr": "sum (rate(tikv_cdc_puller_output_event_count{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"}[1m])) by (changefeed, capture, type)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{changefeed}}-{{capture}}-{{type}}",
@@ -3515,7 +3515,7 @@
"steppedLine": false,
"targets": [
{
-"expr": "sum(tikv_cdc_puller_txn_collect_event_count{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"}) by (changefeed, capture, type)",
+"expr": "sum(tikv_cdc_puller_output_event_count{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"}) by (changefeed, capture, type)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{changefeed}}-{{capture}}-{{type}}",
@@ -3757,7 +3757,7 @@
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
-"description": "The number of events that table sorter outputs to buffer sink per second",
+"description": "The number of events that sorter outputs to keyspan sink per second",
"fill": 1,
"fillGradient": 0,
"gridPos": {
@@ -3797,7 +3797,7 @@
"steppedLine": false,
"targets": [
{
-"expr": "sum(rate(tikv_cdc_sink_table_sink_total_rows_count{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"}[1m])) by (changefeed, capture)",
+"expr": "sum(rate(tikv_cdc_sink_keyspan_sink_total_event_count{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"}[1m])) by (changefeed, capture)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{changefeed}}-{{capture}}",
@@ -3808,7 +3808,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
-"title": "Table sink output events/s",
+"title": "Keyspan sink output events/s",
"tooltip": {
"shared": true,
"sort": 0,
@@ -3851,7 +3851,7 @@
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
-"description": "The total number of events that table sinks emit",
+"description": "The total number of events that keyspan sinks emit",
"fill": 1,
"fillGradient": 0,
"gridPos": {
@@ -3891,7 +3891,7 @@
"steppedLine": false,
"targets": [
{
-"expr": "sum(tikv_cdc_sink_table_sink_total_rows_count{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"}) by (changefeed, capture)",
+"expr": "sum(tikv_cdc_sink_keyspan_sink_total_event_count{tidb_cluster=\"$tidb_cluster\", changefeed=~\"$changefeed\", capture=~\"$capture\"}) by (changefeed, capture)",
"format": "time_series",
"intervalFactor": 1,
"legendFormat": "{{changefeed}}-{{capture}}",
@@ -3902,7 +3902,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
-"title": "Table sink output events",
+"title": "Keyspan sink output events",
"tooltip": {
"shared": true,
"sort": 0,
@@ -13413,4 +13413,4 @@
"title": "Test-Cluster-TiKV-CDC",
"uid": "HXILPi3nz",
"version": 1
-}
+}
