From 98c1f250af3f45c4f7a2665d9298b73d48b83d37 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Tue, 14 Feb 2023 15:41:59 +0800 Subject: [PATCH 1/3] use chann.Chann --- cmd/cdc/main.go | 3 +++ pkg/causality/conflict_detector.go | 30 +++++++++++------------------- 2 files changed, 14 insertions(+), 19 deletions(-) diff --git a/cmd/cdc/main.go b/cmd/cdc/main.go index 9097cb6b245..4b36afa2e98 100644 --- a/cmd/cdc/main.go +++ b/cmd/cdc/main.go @@ -14,10 +14,13 @@ package main import ( + "runtime" + _ "github.com/pingcap/tidb/types/parser_driver" "github.com/pingcap/tiflow/pkg/cmd" ) func main() { + runtime.SetMutexProfileFraction(5) cmd.Run() } diff --git a/pkg/causality/conflict_detector.go b/pkg/causality/conflict_detector.go index d7ddbdf922e..3741d670e70 100644 --- a/pkg/causality/conflict_detector.go +++ b/pkg/causality/conflict_detector.go @@ -16,8 +16,8 @@ package causality import ( "sync" - "github.com/pingcap/tiflow/engine/pkg/containers" "github.com/pingcap/tiflow/pkg/causality/internal" + "github.com/pingcap/tiflow/pkg/chann" "go.uber.org/atomic" ) @@ -37,8 +37,8 @@ type ConflictDetector[Worker worker[Txn], Txn txnEvent] struct { nextWorkerID atomic.Int64 // Used to run a background goroutine to GC or notify nodes. - notifiedNodes *containers.SliceQueue[func()] - garbageNodes *containers.SliceQueue[txnFinishedEvent] + notifiedNodes *chann.Chann[func()] + garbageNodes *chann.Chann[txnFinishedEvent] wg sync.WaitGroup closeCh chan struct{} } @@ -57,8 +57,8 @@ func NewConflictDetector[Worker worker[Txn], Txn txnEvent]( workers: workers, slots: internal.NewSlots[*internal.Node](numSlots), numSlots: numSlots, - notifiedNodes: containers.NewSliceQueue[func()](), - garbageNodes: containers.NewSliceQueue[txnFinishedEvent](), + notifiedNodes: chann.New[func()](), + garbageNodes: chann.New[txnFinishedEvent](), closeCh: make(chan struct{}), } @@ -80,12 +80,12 @@ func (d *ConflictDetector[Worker, Txn]) Add(txn Txn) { node.OnResolved = func(workerID int64) { unlock := func() { node.Remove() - d.garbageNodes.Push(txnFinishedEvent{node, conflictKeys}) + d.garbageNodes.In() <- txnFinishedEvent{node, conflictKeys} } d.sendToWorker(txn, unlock, workerID) } node.RandWorkerID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.workers)) } - node.OnNotified = func(callback func()) { d.notifiedNodes.Push(callback) } + node.OnNotified = func(callback func()) { d.notifiedNodes.In() <- callback } d.slots.Add(node, conflictKeys) } @@ -100,20 +100,12 @@ func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() { select { case <-d.closeCh: return - case <-d.notifiedNodes.C: - for { - notifiyCallback, ok := d.notifiedNodes.Pop() - if !ok { - break - } + case notifiyCallback := <-d.notifiedNodes.Out(): + if notifiyCallback != nil { notifiyCallback() } - case <-d.garbageNodes.C: - for { - event, ok := d.garbageNodes.Pop() - if !ok { - break - } + case event := <-d.garbageNodes.Out(): + if event.node != nil { d.slots.Free(event.node, event.conflictKeys) } } From a36f4ecfa756630ce0ee2d4e88213e3b8bf874ca Mon Sep 17 00:00:00 2001 From: sdojjy Date: Tue, 14 Feb 2023 19:46:42 +0800 Subject: [PATCH 2/3] use chann.Chann --- cmd/cdc/main.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/cmd/cdc/main.go b/cmd/cdc/main.go index 4b36afa2e98..9097cb6b245 100644 --- a/cmd/cdc/main.go +++ b/cmd/cdc/main.go @@ -14,13 +14,10 @@ package main import ( - "runtime" - _ "github.com/pingcap/tidb/types/parser_driver" "github.com/pingcap/tiflow/pkg/cmd" ) func main() { - runtime.SetMutexProfileFraction(5) cmd.Run() } From 170502fea6f7e9a5a81bf25698cfa56aa48cb3b7 Mon Sep 17 00:00:00 2001 From: sdojjy Date: Wed, 15 Feb 2023 11:31:39 +0800 Subject: [PATCH 3/3] close unbounded chan --- pkg/causality/conflict_detector.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/causality/conflict_detector.go b/pkg/causality/conflict_detector.go index 3741d670e70..b8520a65acf 100644 --- a/pkg/causality/conflict_detector.go +++ b/pkg/causality/conflict_detector.go @@ -96,6 +96,10 @@ func (d *ConflictDetector[Worker, Txn]) Close() { } func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() { + defer func() { + d.notifiedNodes.Close() + d.garbageNodes.Close() + }() for { select { case <-d.closeCh: