Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc/sink: Sink manager manage checkpoint per table #3621

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ba2c2fd
refine Sink interface
sdojjy Nov 25, 2021
26070d9
fix ut
sdojjy Nov 25, 2021
f388718
fix lint
sdojjy Nov 25, 2021
51b754a
fix lint
sdojjy Nov 25, 2021
112b4e0
sink manager manage checkpointTs per table
sdojjy Nov 25, 2021
bf10cc1
Merge branch 'master' into sink-manager-manage-checkpoint-per-table
sdojjy Nov 25, 2021
aaa2a9d
buffer sink manage checkpointTs per table
sdojjy Nov 26, 2021
6bccf2f
Merge remote-tracking branch 'origin/sink-manager-manage-checkpoint-p…
sdojjy Nov 26, 2021
dbd09d3
sink manager manage checkpointTs per table
sdojjy Nov 26, 2021
e189647
buffer sink manage checkpointTs per table
sdojjy Nov 26, 2021
fc0d687
sink manager manage checkpointTs per table
sdojjy Nov 26, 2021
4008fce
Merge branch 'master' into sink-manager-manage-checkpoint-per-table
sdojjy Nov 26, 2021
3421986
fix lint
sdojjy Nov 29, 2021
594593e
Merge branch 'master' into sink-manager-manage-checkpoint-per-table
sdojjy Nov 30, 2021
78f2628
address comment
sdojjy Nov 30, 2021
d87f6e7
address comment
sdojjy Nov 30, 2021
6e34487
Merge branch 'master' into sink-manager-manage-checkpoint-per-table
sdojjy Nov 30, 2021
d3444c1
fix ut
sdojjy Nov 30, 2021
6cd2eaa
fix ut
sdojjy Nov 30, 2021
2227e75
fix ut
sdojjy Nov 30, 2021
e75d9d0
Merge remote-tracking branch 'upstream/master' into sink-manager-mana…
sdojjy Nov 30, 2021
6037cd1
Merge branch 'master' into sink-manager-manage-checkpoint-per-table
ti-chi-bot Nov 30, 2021
3c35bdd
Merge branch 'master' into sink-manager-manage-checkpoint-per-table
ti-chi-bot Nov 30, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions cdc/sink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
redo "github.com/pingcap/ticdc/cdc/redo"
"github.com/pingcap/ticdc/cdc/redo"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
Expand All @@ -34,10 +34,11 @@ const (

// Manager manages table sinks, maintains the relationship between table sinks and backendSink.
type Manager struct {
backendSink Sink
checkpointTs model.Ts
tableSinks map[model.TableID]*tableSink
tableSinksMu sync.Mutex
backendSink Sink
tableCheckpointTsMap sync.Map
tableSinks map[model.TableID]*tableSink
tableSinksMu sync.Mutex
changeFeedCheckpointTs uint64

flushMu sync.Mutex
flushing int64
Expand All @@ -57,7 +58,7 @@ func NewManager(
drawbackChan := make(chan drawbackMsg, 16)
return &Manager{
backendSink: newBufferSink(ctx, backendSink, errCh, checkpointTs, drawbackChan),
checkpointTs: checkpointTs,
changeFeedCheckpointTs: checkpointTs,
tableSinks: make(map[model.TableID]*tableSink),
drawbackChan: drawbackChan,
captureAddr: captureAddr,
Expand Down Expand Up @@ -90,15 +91,15 @@ func (m *Manager) Close(ctx context.Context) error {
return m.backendSink.Close(ctx)
}

func (m *Manager) getMinEmittedTs() model.Ts {
func (m *Manager) getMinEmittedTs(tableID model.TableID) model.Ts {
m.tableSinksMu.Lock()
defer m.tableSinksMu.Unlock()
if len(m.tableSinks) == 0 {
return m.getCheckpointTs()
return m.getCheckpointTs(tableID)
}
minTs := model.Ts(math.MaxUint64)
for _, tableSink := range m.tableSinks {
resolvedTs := tableSink.getResolvedTs()
for _, tblSink := range m.tableSinks {
resolvedTs := tblSink.getResolvedTs()
if minTs > resolvedTs {
minTs = resolvedTs
}
Expand All @@ -111,19 +112,19 @@ func (m *Manager) flushBackendSink(ctx context.Context, tableID model.TableID) (
// which will cause a lot of lock contention and blocking in high concurrency cases.
// So here we use flushing as a lightweight lock to reduce that lock contention.
if !atomic.CompareAndSwapInt64(&m.flushing, 0, 1) {
return m.getCheckpointTs(), nil
return m.getCheckpointTs(tableID), nil
}
m.flushMu.Lock()
defer func() {
m.flushMu.Unlock()
atomic.StoreInt64(&m.flushing, 0)
}()
minEmittedTs := m.getMinEmittedTs()
minEmittedTs := m.getMinEmittedTs(tableID)
checkpointTs, err := m.backendSink.FlushRowChangedEvents(ctx, tableID, minEmittedTs)
if err != nil {
return m.getCheckpointTs(), errors.Trace(err)
return m.getCheckpointTs(tableID), errors.Trace(err)
}
atomic.StoreUint64(&m.checkpointTs, checkpointTs)
m.tableCheckpointTsMap.Store(tableID, checkpointTs)
return checkpointTs, nil
}

Expand All @@ -145,8 +146,13 @@ func (m *Manager) destroyTableSink(ctx context.Context, tableID model.TableID) e
return m.backendSink.Barrier(ctx, tableID)
}

func (m *Manager) getCheckpointTs() uint64 {
return atomic.LoadUint64(&m.checkpointTs)
func (m *Manager) getCheckpointTs(tableID model.TableID) uint64 {
checkPoints, ok := m.tableCheckpointTsMap.Load(tableID)
if ok {
return checkPoints.(uint64)
}
// This function is called when something unexpected happens; returning the changefeed-level checkpoint is safe.
return m.changeFeedCheckpointTs
sdojjy marked this conversation as resolved.
Show resolved Hide resolved
sdojjy marked this conversation as resolved.
Show resolved Hide resolved
}

type drawbackMsg struct {
Expand Down
39 changes: 25 additions & 14 deletions cdc/sink/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,17 @@ var _ = check.Suite(&managerSuite{})

type checkSink struct {
*check.C
rows []*model.RowChangedEvent
rows map[model.TableID][]*model.RowChangedEvent
rowsMu sync.Mutex
lastResolvedTs uint64
lastResolvedTs map[model.TableID]uint64
}

// newCheckSink builds a checkSink bound to the given check.C, with its
// per-table row buffer and per-table last-resolved-ts map allocated empty so
// they can be written to immediately.
func newCheckSink(c *check.C) *checkSink {
	sink := new(checkSink)
	sink.C = c
	sink.rows = map[model.TableID][]*model.RowChangedEvent{}
	sink.lastResolvedTs = map[model.TableID]uint64{}
	return sink
}

func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
Expand All @@ -47,7 +55,9 @@ func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTab
func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
c.rows = append(c.rows, rows...)
for _, row := range rows {
c.rows[row.Table.TableID] = append(c.rows[row.Table.TableID], row)
}
return nil
}

Expand All @@ -59,20 +69,21 @@ func (c *checkSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
var newRows []*model.RowChangedEvent
for _, row := range c.rows {
if row.CommitTs <= c.lastResolvedTs {
return c.lastResolvedTs, errors.Errorf("commit-ts(%d) is not greater than lastResolvedTs(%d)", row.CommitTs, c.lastResolvedTs)
rows := c.rows[tableID]
for _, row := range rows {
if row.CommitTs <= c.lastResolvedTs[tableID] {
return c.lastResolvedTs[tableID], errors.Errorf("commit-ts(%d) is not greater than lastResolvedTs(%d)", row.CommitTs, c.lastResolvedTs)
}
if row.CommitTs > resolvedTs {
newRows = append(newRows, row)
}
}

c.Assert(c.lastResolvedTs, check.LessEqual, resolvedTs)
c.lastResolvedTs = resolvedTs
c.rows = newRows
c.Assert(c.lastResolvedTs[tableID], check.LessEqual, resolvedTs)
c.lastResolvedTs[tableID] = resolvedTs
c.rows[tableID] = newRows

return c.lastResolvedTs, nil
return c.lastResolvedTs[tableID], nil
}

func (c *checkSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
Expand All @@ -92,7 +103,7 @@ func (s *managerSuite) TestManagerRandom(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh := make(chan error, 16)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "")
manager := NewManager(ctx, newCheckSink(c), errCh, 0, "", "")
defer manager.Close(ctx)
goroutineNum := 10
rowNum := 100
Expand Down Expand Up @@ -147,7 +158,7 @@ func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh := make(chan error, 16)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "")
manager := NewManager(ctx, newCheckSink(c), errCh, 0, "", "")
defer manager.Close(ctx)
goroutineNum := 200
var wg sync.WaitGroup
Expand Down Expand Up @@ -234,7 +245,7 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) {
defer cancel()

errCh := make(chan error, 16)
manager := NewManager(ctx, &checkSink{C: c}, errCh, 0, "", "")
manager := NewManager(ctx, newCheckSink(c), errCh, 0, "", "")
defer manager.Close(ctx)

tableID := int64(49)
Expand All @@ -255,7 +266,7 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) {
func BenchmarkManagerFlushing(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
errCh := make(chan error, 16)
manager := NewManager(ctx, &checkSink{}, errCh, 0, "", "")
manager := NewManager(ctx, newCheckSink(nil), errCh, 0, "", "")

// Init table sinks.
goroutineNum := 2000
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/table_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (t *tableSink) FlushRowChangedEvents(ctx context.Context, tableID model.Tab

err := t.manager.backendSink.EmitRowChangedEvents(ctx, resolvedRows...)
if err != nil {
return t.manager.getCheckpointTs(), errors.Trace(err)
return t.manager.getCheckpointTs(tableID), errors.Trace(err)
}
atomic.StoreUint64(&t.emittedTs, resolvedTs)
ckpt, err := t.flushRedoLogs(ctx, resolvedTs)
Expand All @@ -85,7 +85,7 @@ func (t *tableSink) flushRedoLogs(ctx context.Context, resolvedTs uint64) (uint6
if t.redoManager.Enabled() {
err := t.redoManager.FlushLog(ctx, t.tableID, resolvedTs)
if err != nil {
return t.manager.getCheckpointTs(), err
return t.manager.getCheckpointTs(t.tableID), err
}
}
return 0, nil
Expand Down