Skip to content

Commit

Permalink
Fix data race in coordinator (#957)
Browse files Browse the repository at this point in the history
close #956
  • Loading branch information
hongyunyan authored Jan 27, 2025
1 parent a9806ae commit 38ec3a4
Showing 1 changed file with 8 additions and 3 deletions.
11 changes: 8 additions & 3 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ type Controller struct {
nodeChanged bool
nodeManager *watcher.NodeManager

taskScheduler threadpool.ThreadPool
taskHandlers []*threadpool.TaskHandle
messageCenter messaging.MessageCenter
taskScheduler threadpool.ThreadPool
taskHandlerMutex sync.Mutex // protect taskHandlers
taskHandlers []*threadpool.TaskHandle
messageCenter messaging.MessageCenter

updatedChangefeedCh chan map[common.ChangeFeedID]*changefeed.Changefeed
stateChangedCh chan *ChangefeedStateChangeEvent
Expand Down Expand Up @@ -387,13 +388,17 @@ func (c *Controller) FinishBootstrap(runningChangefeeds map[common.ChangeFeedID]
}

// start operator and scheduler
c.taskHandlerMutex.Lock()
defer c.taskHandlerMutex.Unlock()
c.taskHandlers = append(c.taskHandlers, c.scheduler.Start(c.taskScheduler)...)
operatorControllerHandle := c.taskScheduler.Submit(c.operatorController, time.Now())
c.taskHandlers = append(c.taskHandlers, operatorControllerHandle)
c.bootstrapped.Store(true)
}

func (c *Controller) Stop() {
c.taskHandlerMutex.Lock()
defer c.taskHandlerMutex.Unlock()
for _, h := range c.taskHandlers {
h.Cancel()
}
Expand Down

0 comments on commit 38ec3a4

Please sign in to comment.