diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go
index 3032c41fcaa..0368d27a758 100644
--- a/cdc/processor/pipeline/sorter.go
+++ b/cdc/processor/pipeline/sorter.go
@@ -140,14 +140,6 @@ func (n *sorterNode) StartActorNode(ctx pipeline.NodeContext, isTableActorMode b
 		ctx.Throw(errors.Trace(eventSorter.Run(stdCtx)))
 		return nil
 	})
-	n.eg.Go(func() error {
-		// Since the flowController is implemented by `Cond`, it is not cancelable
-		// by a context. We need to listen on cancellation and aborts the flowController
-		// manually.
-		<-stdCtx.Done()
-		n.flowController.Abort()
-		return nil
-	})
 	n.eg.Go(func() error {
 		lastSentResolvedTs := uint64(0)
 		lastSendResolvedTsTime := time.Now() // the time at which we last sent a resolved-ts.
@@ -314,6 +306,10 @@ func (n *sorterNode) Destroy(ctx pipeline.NodeContext) error {
 			log.Warn("schedule table cleanup task failed", zap.Error(err))
 		}
 	}
+	// Since the flowController is implemented by `Cond`, it is not cancelable by a context,
+	// and a background goroutine may stay blocked on it.
+	// We need to abort the flowController manually in the nodeRunner.
+	n.flowController.Abort()
 	return n.eg.Wait()
 }

diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index e4f9251acd3..b66f7831089 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -363,6 +363,7 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR
 		p.handleWorkload()
 	}
 	p.doGCSchemaStorage(ctx)
+	p.metricSyncTableNumGauge.Set(float64(len(p.tables)))

 	if p.newSchedulerEnabled {
 		if err := p.agent.Tick(ctx); err != nil {
@@ -974,17 +975,6 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
 		sink,
 		p.changefeed.Info.GetTargetTs(),
 	)
-	p.wg.Add(1)
-	p.metricSyncTableNumGauge.Inc()
-	go func() {
-		table.Wait()
-		p.wg.Done()
-		p.metricSyncTableNumGauge.Dec()
-		log.Debug("Table pipeline exited", zap.Int64("tableID", tableID),
-			cdcContext.ZapFieldChangefeed(ctx),
-			zap.String("name", table.Name()),
-			zap.Any("replicaInfo", replicaInfo))
-	}()

 	if p.redoManager.Enabled() {
 		p.redoManager.AddTable(tableID, replicaInfo.StartTs)
diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go
index a04831a0eba..ed698dec057 100644
--- a/cdc/puller/puller.go
+++ b/cdc/puller/puller.go
@@ -139,21 +139,11 @@ func (p *pullerImpl) Run(ctx context.Context) error {
 		txnCollectCounter.DeleteLabelValues(captureAddr, changefeedID, "kv")
 		txnCollectCounter.DeleteLabelValues(captureAddr, changefeedID, "resolved")
 	}()
-	g.Go(func() error {
-		for {
-			select {
-			case <-ctx.Done():
-				return nil
-			case <-time.After(15 * time.Second):
-				metricEventChanSize.Observe(float64(len(eventCh)))
-				metricOutputChanSize.Observe(float64(len(p.outputCh)))
-				metricPullerResolvedTs.Set(float64(oracle.ExtractPhysical(atomic.LoadUint64(&p.resolvedTs))))
-			}
-		}
-	})

 	lastResolvedTs := p.checkpointTs
 	g.Go(func() error {
+		metricsTicker := time.NewTicker(15 * time.Second)
+		defer metricsTicker.Stop()
 		output := func(raw *model.RawKVEntry) error {
 			// even after https://github.com/pingcap/tiflow/pull/2038, kv client
 			// could still miss region change notification, which leads to resolved
@@ -181,6 +171,11 @@ func (p *pullerImpl) Run(ctx context.Context) error {
 		for {
 			var e model.RegionFeedEvent
 			select {
+			case <-metricsTicker.C:
+				metricEventChanSize.Observe(float64(len(eventCh)))
+				metricOutputChanSize.Observe(float64(len(p.outputCh)))
+				metricPullerResolvedTs.Set(float64(oracle.ExtractPhysical(atomic.LoadUint64(&p.resolvedTs))))
+				continue
 			case e = <-eventCh:
 			case <-ctx.Done():
 				return errors.Trace(ctx.Err())
diff --git a/cdc/sorter/unified/unified_sorter.go b/cdc/sorter/unified/unified_sorter.go
index e8c1770a8f9..42a0394e0ec 100644
--- a/cdc/sorter/unified/unified_sorter.go
+++ b/cdc/sorter/unified/unified_sorter.go
@@ -114,7 +114,7 @@ func (s *Sorter) Run(ctx context.Context) error {

 	defer close(s.closeCh)

-	finish := util.MonitorCancelLatency(ctx, "Unified Sorter")
+	finish, startCancel := util.MonitorCancelLatency(ctx, "Unified Sorter")
 	defer finish()

 	ctx = context.WithValue(ctx, ctxKey{}, s)
@@ -163,6 +163,7 @@ func (s *Sorter) Run(ctx context.Context) error {
 		select {
 		case <-subctx.Done():
+			startCancel()
 			return errors.Trace(subctx.Err())
 		case err := <-heapSorterErrCh:
 			return errors.Trace(err)
diff --git a/pkg/util/cancel_monitor.go b/pkg/util/cancel_monitor.go
index fb4953bc9cb..3e22ce4d288 100644
--- a/pkg/util/cancel_monitor.go
+++ b/pkg/util/cancel_monitor.go
@@ -21,12 +21,14 @@ import (
 	"go.uber.org/zap"
 )

-// MonitorCancelLatency monitors the latency from ctx being cancelled and the returned function being called
-func MonitorCancelLatency(ctx context.Context, identifier string) func() {
+// MonitorCancelLatency monitors the latency from ctx being cancelled.
+// The first returned function should be called when the cancellation is done.
+// The second returned function should be called to mark that the cancellation has started; it starts a
+// background goroutine that monitors the latency until finish is called or the cancellation is done.
+func MonitorCancelLatency(ctx context.Context, identifier string) (func(), func()) {
 	finishedCh := make(chan struct{})
-	go func() {
-		select {
-		case <-ctx.Done():
+	start := func() {
+		go func() {
 			log.Debug("MonitorCancelLatency: Cancelled", zap.String("identifier", identifier))
 			ticker := time.NewTicker(time.Second)
 			defer ticker.Stop()
@@ -43,10 +45,9 @@ func MonitorCancelLatency(ctx context.Context, identifier string) func() {
 						zap.Int("duration", elapsed), zap.Error(ctx.Err()))
 				}
 			}
-		case <-finishedCh:
-		}
-	}()
+		}()
+	}
 	return func() {
 		close(finishedCh)
-	}
+	}, start
 }
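
Note on the puller change above: the dedicated metrics goroutine (which slept on time.After and had to be shut down separately) is folded into the existing event loop as an extra ticker case. Below is a minimal, self-contained sketch of that pattern; it is not the tiflow code, and the names processEvents, events, and reportQueueLen are illustrative only.

package main

import (
	"context"
	"fmt"
	"time"
)

// processEvents samples metrics from inside the main receive loop via a ticker case,
// instead of running a second goroutine that wakes up with time.After. The ticker is
// stopped when the loop exits, so nothing is left behind to leak.
func processEvents(ctx context.Context, events <-chan int, reportQueueLen func(int)) error {
	metricsTicker := time.NewTicker(15 * time.Second)
	defer metricsTicker.Stop()

	for {
		select {
		case <-metricsTicker.C:
			// Periodic sampling shares the loop; continue so no event handling runs this turn.
			reportQueueLen(len(events))
			continue
		case e := <-events:
			fmt.Println("handling event", e)
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	events := make(chan int, 4)
	events <- 1
	events <- 2
	_ = processEvents(ctx, events, func(n int) { fmt.Println("queue length:", n) })
}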
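
Note on the MonitorCancelLatency change above: the monitor goroutine is no longer spawned eagerly to block on ctx.Done(); the caller instead receives a second function and arms the monitor only once cancellation actually begins, as the unified sorter hunk does with startCancel(). A minimal sketch of the intended call pattern follows, assuming a simplified stand-in monitorCancelLatency and an illustrative run/worker (not the tiflow implementation).

package main

import (
	"context"
	"log"
	"time"
)

// monitorCancelLatency is a simplified stand-in for the revised util.MonitorCancelLatency:
// finish is called when shutdown has completed, start is called when cancellation begins.
// The watchdog goroutine only exists between start() and finish(), rather than blocking
// on ctx.Done() for the whole lifetime of the component.
func monitorCancelLatency(ctx context.Context, identifier string) (finish func(), start func()) {
	finishedCh := make(chan struct{})
	start = func() {
		go func() {
			ticker := time.NewTicker(time.Second)
			defer ticker.Stop()
			elapsed := 0
			for {
				select {
				case <-finishedCh:
					return
				case <-ticker.C:
					elapsed++
					log.Printf("%s: cancellation still in progress after %ds (ctx err: %v)",
						identifier, elapsed, ctx.Err())
				}
			}
		}()
	}
	finish = func() { close(finishedCh) }
	return finish, start
}

func run(ctx context.Context) error {
	finish, startCancel := monitorCancelLatency(ctx, "example worker")
	defer finish()

	// ... normal work would go here ...
	<-ctx.Done()
	// Mirror the unified sorter hunk: arm the latency watchdog only once the
	// context is actually cancelled and teardown starts.
	startCancel()
	return ctx.Err()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	if err := run(ctx); err != nil {
		log.Printf("run exited: %v", err)
	}
}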