Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[close #343]fix cdc residue metrics #342

Merged
merged 7 commits into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cdc/cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func (c *Capture) run(stdCtx context.Context) error {
// (recoverable errors are intercepted in the processor tick)
// so we should also stop the processor and let capture restart or exit
processorErr = c.runEtcdWorker(ctx, c.processorManager, globalState, processorFlushInterval)
c.processorManager.SyncClose()
log.Info("the processor routine has exited", zap.Error(processorErr))
}()
wg.Add(1)
Expand Down Expand Up @@ -326,6 +327,7 @@ func (c *Capture) campaignOwner(ctx cdcContext.Context) error {
err = c.runEtcdWorker(ownerCtx, owner, orchestrator.NewGlobalState(), ownerFlushInterval)
c.setOwner(nil)
log.Info("run owner exited", zap.Error(err))
owner.CloseAllChangefeeds(ownerCtx)

// TODO: fix invalid resign
// When exiting normally, cancel will be called to make `owner routine`
Expand Down
18 changes: 12 additions & 6 deletions cdc/cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,17 +188,23 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState)
}
}
if atomic.LoadInt32(&o.closed) != 0 {
for changefeedID, cfReactor := range o.changefeeds {
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: changefeedID,
})
cfReactor.Close(ctx)
}
o.CloseAllChangefeeds(ctx)
return state, cerror.ErrReactorFinished.GenWithStackByArgs()
}
return state, nil
}

// CloseAllCaptures close all changefeeds.
// Note: Please be careful to call this method!
func (o *Owner) CloseAllChangefeeds(ctx cdcContext.Context) {
for changefeedID, cfReactor := range o.changefeeds {
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: changefeedID,
})
cfReactor.Close(ctx)
}
}

// EnqueueJob enqueues an admin job into an internal queue, and the Owner will handle the job in the next tick
func (o *Owner) EnqueueJob(adminJob model.AdminJob) {
o.pushOwnerJob(&ownerJob{
Expand Down
8 changes: 8 additions & 0 deletions cdc/cdc/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ func (m *Manager) sendCommand(tp commandTp, payload interface{}) chan struct{} {
return cmd.done
}

// SyncClose closes all processors
// Note: This method must not be called with `Tick`!!!
func (m *Manager) SyncClose() {
for changefeedID := range m.processors {
m.closeProcessor(changefeedID)
}
}

func (m *Manager) handleCommand() error {
var cmd *command
select {
Expand Down
5 changes: 5 additions & 0 deletions cdc/cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,10 @@ func (p *processor) removeKeySpan(keyspan keyspanpipeline.KeySpanPipeline, keysp
}

func (p *processor) Close() error {
if !p.initialized {
return nil
}

for _, tbl := range p.keyspans {
tbl.Cancel()
}
Expand All @@ -675,6 +679,7 @@ func (p *processor) Close() error {
}
}

p.initialized = false
return nil
}

Expand Down
5 changes: 4 additions & 1 deletion cdc/cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ func newProcessor4Test(
createKeySpanPipeline func(ctx cdcContext.Context, keyspanID model.KeySpanID, replicaInfo *model.KeySpanReplicaInfo) (keyspanpipeline.KeySpanPipeline, error),
) *processor {
p := newProcessor(ctx)
p.lazyInit = func(ctx cdcContext.Context) error { return nil }
p.lazyInit = func(ctx cdcContext.Context) error {
p.initialized = true
return nil
}
p.sinkManager = &sink.Manager{}
p.createKeySpanPipeline = createKeySpanPipeline
return p
Expand Down