Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
buchuitoudegou authored Jan 24, 2022
2 parents 6080a27 + 0236882 commit be529fc
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 41 deletions.
12 changes: 4 additions & 8 deletions cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,6 @@ func (n *sorterNode) StartActorNode(ctx pipeline.NodeContext, isTableActorMode b
ctx.Throw(errors.Trace(eventSorter.Run(stdCtx)))
return nil
})
n.eg.Go(func() error {
// Since the flowController is implemented by `Cond`, it is not cancelable
// by a context. We need to listen on cancellation and aborts the flowController
// manually.
<-stdCtx.Done()
n.flowController.Abort()
return nil
})
n.eg.Go(func() error {
lastSentResolvedTs := uint64(0)
lastSendResolvedTsTime := time.Now() // the time at which we last sent a resolved-ts.
Expand Down Expand Up @@ -314,6 +306,10 @@ func (n *sorterNode) Destroy(ctx pipeline.NodeContext) error {
log.Warn("schedule table cleanup task failed", zap.Error(err))
}
}
// Since the flowController is implemented by `Cond`, it is not cancelable by a context;
// the flowController will be blocked in a background goroutine.
// We need to abort the flowController manually in the nodeRunner.
n.flowController.Abort()
return n.eg.Wait()
}

Expand Down
12 changes: 1 addition & 11 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR
p.handleWorkload()
}
p.doGCSchemaStorage(ctx)
p.metricSyncTableNumGauge.Set(float64(len(p.tables)))

if p.newSchedulerEnabled {
if err := p.agent.Tick(ctx); err != nil {
Expand Down Expand Up @@ -974,17 +975,6 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
sink,
p.changefeed.Info.GetTargetTs(),
)
p.wg.Add(1)
p.metricSyncTableNumGauge.Inc()
go func() {
table.Wait()
p.wg.Done()
p.metricSyncTableNumGauge.Dec()
log.Debug("Table pipeline exited", zap.Int64("tableID", tableID),
cdcContext.ZapFieldChangefeed(ctx),
zap.String("name", table.Name()),
zap.Any("replicaInfo", replicaInfo))
}()

if p.redoManager.Enabled() {
p.redoManager.AddTable(tableID, replicaInfo.StartTs)
Expand Down
19 changes: 7 additions & 12 deletions cdc/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,11 @@ func (p *pullerImpl) Run(ctx context.Context) error {
txnCollectCounter.DeleteLabelValues(captureAddr, changefeedID, "kv")
txnCollectCounter.DeleteLabelValues(captureAddr, changefeedID, "resolved")
}()
g.Go(func() error {
for {
select {
case <-ctx.Done():
return nil
case <-time.After(15 * time.Second):
metricEventChanSize.Observe(float64(len(eventCh)))
metricOutputChanSize.Observe(float64(len(p.outputCh)))
metricPullerResolvedTs.Set(float64(oracle.ExtractPhysical(atomic.LoadUint64(&p.resolvedTs))))
}
}
})

lastResolvedTs := p.checkpointTs
g.Go(func() error {
metricsTicker := time.NewTicker(15 * time.Second)
defer metricsTicker.Stop()
output := func(raw *model.RawKVEntry) error {
// even after https://github.com/pingcap/tiflow/pull/2038, kv client
// could still miss region change notification, which leads to resolved
Expand Down Expand Up @@ -181,6 +171,11 @@ func (p *pullerImpl) Run(ctx context.Context) error {
for {
var e model.RegionFeedEvent
select {
case <-metricsTicker.C:
metricEventChanSize.Observe(float64(len(eventCh)))
metricOutputChanSize.Observe(float64(len(p.outputCh)))
metricPullerResolvedTs.Set(float64(oracle.ExtractPhysical(atomic.LoadUint64(&p.resolvedTs))))
continue
case e = <-eventCh:
case <-ctx.Done():
return errors.Trace(ctx.Err())
Expand Down
3 changes: 2 additions & 1 deletion cdc/sorter/unified/unified_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *Sorter) Run(ctx context.Context) error {

defer close(s.closeCh)

finish := util.MonitorCancelLatency(ctx, "Unified Sorter")
finish, startCancel := util.MonitorCancelLatency(ctx, "Unified Sorter")
defer finish()

ctx = context.WithValue(ctx, ctxKey{}, s)
Expand Down Expand Up @@ -163,6 +163,7 @@ func (s *Sorter) Run(ctx context.Context) error {

select {
case <-subctx.Done():
startCancel()
return errors.Trace(subctx.Err())
case err := <-heapSorterErrCh:
return errors.Trace(err)
Expand Down
19 changes: 10 additions & 9 deletions pkg/util/cancel_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"go.uber.org/zap"
)

// MonitorCancelLatency monitors the latency from ctx being cancelled and the returned function being called
func MonitorCancelLatency(ctx context.Context, identifier string) func() {
// MonitorCancelLatency monitors the latency from ctx being cancelled.
// The first returned function should be called when the cancellation is done.
// The second returned function should be called to mark that the cancellation has started; it will start a
// background goroutine to monitor the latency until finish is called or the cancellation is done.
func MonitorCancelLatency(ctx context.Context, identifier string) (func(), func()) {
finishedCh := make(chan struct{})
go func() {
select {
case <-ctx.Done():
start := func() {
go func() {
log.Debug("MonitorCancelLatency: Cancelled", zap.String("identifier", identifier))
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
Expand All @@ -43,10 +45,9 @@ func MonitorCancelLatency(ctx context.Context, identifier string) func() {
zap.Int("duration", elapsed), zap.Error(ctx.Err()))
}
}
case <-finishedCh:
}
}()
}()
}
return func() {
close(finishedCh)
}
}, start
}

0 comments on commit be529fc

Please sign in to comment.