diff --git a/pkg/causality/conflict_detector.go b/pkg/causality/conflict_detector.go index d7ddbdf922e..b8520a65acf 100644 --- a/pkg/causality/conflict_detector.go +++ b/pkg/causality/conflict_detector.go @@ -16,8 +16,8 @@ package causality import ( "sync" - "github.com/pingcap/tiflow/engine/pkg/containers" "github.com/pingcap/tiflow/pkg/causality/internal" + "github.com/pingcap/tiflow/pkg/chann" "go.uber.org/atomic" ) @@ -37,8 +37,8 @@ type ConflictDetector[Worker worker[Txn], Txn txnEvent] struct { nextWorkerID atomic.Int64 // Used to run a background goroutine to GC or notify nodes. - notifiedNodes *containers.SliceQueue[func()] - garbageNodes *containers.SliceQueue[txnFinishedEvent] + notifiedNodes *chann.Chann[func()] + garbageNodes *chann.Chann[txnFinishedEvent] wg sync.WaitGroup closeCh chan struct{} } @@ -57,8 +57,8 @@ func NewConflictDetector[Worker worker[Txn], Txn txnEvent]( workers: workers, slots: internal.NewSlots[*internal.Node](numSlots), numSlots: numSlots, - notifiedNodes: containers.NewSliceQueue[func()](), - garbageNodes: containers.NewSliceQueue[txnFinishedEvent](), + notifiedNodes: chann.New[func()](), + garbageNodes: chann.New[txnFinishedEvent](), closeCh: make(chan struct{}), } @@ -80,12 +80,12 @@ func (d *ConflictDetector[Worker, Txn]) Add(txn Txn) { node.OnResolved = func(workerID int64) { unlock := func() { node.Remove() - d.garbageNodes.Push(txnFinishedEvent{node, conflictKeys}) + d.garbageNodes.In() <- txnFinishedEvent{node, conflictKeys} } d.sendToWorker(txn, unlock, workerID) } node.RandWorkerID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.workers)) } - node.OnNotified = func(callback func()) { d.notifiedNodes.Push(callback) } + node.OnNotified = func(callback func()) { d.notifiedNodes.In() <- callback } d.slots.Add(node, conflictKeys) } @@ -96,24 +96,20 @@ func (d *ConflictDetector[Worker, Txn]) Close() { } func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() { + defer func() { + d.notifiedNodes.Close() + d.garbageNodes.Close() + }() for { select { case <-d.closeCh: return - case <-d.notifiedNodes.C: - for { - notifiyCallback, ok := d.notifiedNodes.Pop() - if !ok { - break - } + case notifiyCallback := <-d.notifiedNodes.Out(): + if notifiyCallback != nil { notifiyCallback() } - case <-d.garbageNodes.C: - for { - event, ok := d.garbageNodes.Pop() - if !ok { - break - } + case event := <-d.garbageNodes.Out(): + if event.node != nil { d.slots.Free(event.node, event.conflictKeys) } }