sink: reduce lock competition during flushBackendSink (#2617) (#2759)
ti-chi-bot authored Sep 26, 2021
1 parent be3daaa commit 29a1df4
Showing 2 changed files with 92 additions and 2 deletions.
14 changes: 12 additions & 2 deletions cdc/sink/manager.go
@@ -40,7 +40,8 @@ type Manager struct {
 	tableSinks   map[model.TableID]*tableSink
 	tableSinksMu sync.Mutex
 
-	flushMu sync.Mutex
+	flushMu  sync.Mutex
+	flushing int64
 
 	drawbackChan chan drawbackMsg
 
@@ -106,8 +107,17 @@ func (m *Manager) getMinEmittedTs() model.Ts {
 }
 
 func (m *Manager) flushBackendSink(ctx context.Context) (model.Ts, error) {
+	// NOTICE: Because every table sink tries to flush the backend sink,
+	// there is heavy lock contention and blocking under high concurrency.
+	// Use flushing as a lightweight try-lock to reduce that contention.
+	if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) {
+		return m.getCheckpointTs(), nil
+	}
 	m.flushMu.Lock()
-	defer m.flushMu.Unlock()
+	defer func() {
+		m.flushMu.Unlock()
+		atomic.StoreInt64(&m.flushing, 0)
+	}()
 	minEmittedTs := m.getMinEmittedTs()
 	checkpointTs, err := m.backendSink.FlushRowChangedEvents(ctx, minEmittedTs)
 	if err != nil {
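For readers who want the technique in isolation, the sketch below is a minimal, self-contained illustration of the same CAS-based try-lock idea. It is not the TiCDC code: the flusher type, its fields, and the flush method are hypothetical stand-ins. Goroutines that lose the CompareAndSwap skip the expensive flush and return the last published checkpoint instead of queueing on the mutex.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// flusher is a hypothetical stand-in for the sink manager: flushing acts as a
// lightweight try-lock in front of the mutex that guards the real flush work.
type flusher struct {
	mu           sync.Mutex
	flushing     int64
	checkpointTs uint64
}

func (f *flusher) flush(ts uint64) uint64 {
	// Only one goroutine wins the CAS; all others return immediately with the
	// last published checkpoint instead of blocking on mu.
	if !atomic.CompareAndSwapInt64(&f.flushing, 0, 1) {
		return atomic.LoadUint64(&f.checkpointTs)
	}
	f.mu.Lock()
	defer func() {
		f.mu.Unlock()
		atomic.StoreInt64(&f.flushing, 0)
	}()
	// Stand-in for the real flush: advance and publish the checkpoint.
	atomic.StoreUint64(&f.checkpointTs, ts)
	return ts
}

func main() {
	f := &flusher{}
	var wg sync.WaitGroup
	for i := 1; i <= 1000; i++ {
		wg.Add(1)
		go func(ts uint64) {
			defer wg.Done()
			f.flush(ts)
		}(uint64(i))
	}
	wg.Wait()
	fmt.Println("last checkpoint:", atomic.LoadUint64(&f.checkpointTs))
}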
80 changes: 80 additions & 0 deletions cdc/sink/manager_test.go
@@ -19,6 +19,7 @@ import (
 	"math/rand"
 	"sync"
 	"sync/atomic"
+	"testing"
 	"time"
 
 	"github.com/pingcap/errors"
@@ -247,6 +248,85 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) {
 	c.Assert(err, check.IsNil)
 }
 
+// Run the benchmark
+// go test -benchmem -run='^$' -bench '^(BenchmarkManagerFlushing)$' github.com/pingcap/ticdc/cdc/sink
+func BenchmarkManagerFlushing(b *testing.B) {
+	ctx, cancel := context.WithCancel(context.Background())
+	errCh := make(chan error, 16)
+	manager := NewManager(ctx, &checkSink{}, errCh, 0, "", "")
+
+	// Init table sinks.
+	goroutineNum := 2000
+	rowNum := 2000
+	var wg sync.WaitGroup
+	tableSinks := make([]Sink, goroutineNum)
+	for i := 0; i < goroutineNum; i++ {
+		i := i
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			tableSinks[i] = manager.CreateTableSink(model.TableID(i), 0)
+		}()
+	}
+	wg.Wait()
+
+	// Concurrently emit events.
+	for i := 0; i < goroutineNum; i++ {
+		i := i
+		tableSink := tableSinks[i]
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for j := 1; j < rowNum; j++ {
+				err := tableSink.EmitRowChangedEvents(context.Background(), &model.RowChangedEvent{
+					Table:    &model.TableName{TableID: int64(i)},
+					CommitTs: uint64(j),
+				})
+				if err != nil {
+					b.Error(err)
+				}
+			}
+		}()
+	}
+	wg.Wait()
+
+	// All tables are flushed concurrently, except table 0.
+	for i := 1; i < goroutineNum; i++ {
+		i := i
+		tableSink := tableSinks[i]
+		go func() {
+			for j := 1; j < rowNum; j++ {
+				if j%2 == 0 {
+					_, err := tableSink.FlushRowChangedEvents(context.Background(), uint64(j))
+					if err != nil {
+						b.Error(err)
+					}
+				}
+			}
+		}()
+	}
+
+	b.ResetTimer()
+	// Table 0 flushes.
+	tableSink := tableSinks[0]
+	for i := 0; i < b.N; i++ {
+		_, err := tableSink.FlushRowChangedEvents(context.Background(), uint64(rowNum))
+		if err != nil {
+			b.Error(err)
+		}
+	}
+	b.StopTimer()
+
+	cancel()
+	_ = manager.Close(ctx)
+	close(errCh)
+	for err := range errCh {
+		if err != nil {
+			b.Error(err)
+		}
+	}
+}
+
 type errorSink struct {
 	*check.C
 }
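As a usage note (a workflow assumption, not something stated in the commit): the benchmark above can be run with -count on both this commit and its parent be3daaa, and the two result sets compared with Go's benchstat tool, to quantify how much the lightweight try-lock shortens the timed flush loop while the other tables flush concurrently.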
