Skip to content

Commit

Permalink
Merge branch 'release-5.3' into cherry-pick-4038-to-release-5.3
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 30, 2021
2 parents 4e6244e + c7c01b5 commit a9063b3
Show file tree
Hide file tree
Showing 43 changed files with 1,475 additions and 956 deletions.
2 changes: 1 addition & 1 deletion cdc/owner/async_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (m *mockSink) Close(ctx context.Context) error {
return nil
}

func (m *mockSink) Barrier(ctx context.Context) error {
func (m *mockSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}

Expand Down
10 changes: 6 additions & 4 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ func (s *TableStatus) Store(new TableStatus) {
}

type sinkNode struct {
sink sink.Sink
status TableStatus
sink sink.Sink
status TableStatus
tableID model.TableID

resolvedTs model.Ts
checkpointTs model.Ts
Expand All @@ -78,8 +79,9 @@ type sinkNode struct {
flowController tableFlowController
}

func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
return &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
Expand Down Expand Up @@ -136,7 +138,7 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err
if err := n.emitRow2Sink(ctx); err != nil {
return errors.Trace(err)
}
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, resolvedTs)
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolvedTs)
if err != nil {
return errors.Trace(err)
}
Expand Down
20 changes: 10 additions & 10 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
panic("unreachable")
}

func (s *mockSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (s *mockSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) {
s.received = append(s.received, struct {
resolvedTs model.Ts
row *model.RowChangedEvent
Expand All @@ -92,7 +92,7 @@ func (s *mockSink) Close(ctx context.Context) error {
return nil
}

func (s *mockSink) Barrier(ctx context.Context) error {
func (s *mockSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}

Expand Down Expand Up @@ -137,7 +137,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
})

// test stop at targetTs
node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand All @@ -163,7 +163,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
c.Assert(node.CheckpointTs(), check.Equals, uint64(10))

// test the stop at ts command
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand All @@ -186,7 +186,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
c.Assert(node.CheckpointTs(), check.Equals, uint64(2))

// test the stop at ts command is after then resolvedTs and checkpointTs is greater than stop ts
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand Down Expand Up @@ -223,7 +223,7 @@ func (s *outputSuite) TestStopStatus(c *check.C) {
})

closeCh := make(chan interface{}, 1)
node := newSinkNode(&mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)
c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
Expand Down Expand Up @@ -258,7 +258,7 @@ func (s *outputSuite) TestManyTs(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand Down Expand Up @@ -379,7 +379,7 @@ func (s *outputSuite) TestIgnoreEmptyRowChangeEvent(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// empty row, no Columns and PreColumns.
Expand All @@ -399,7 +399,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenEnableOldValue(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// nil row.
Expand Down Expand Up @@ -458,7 +458,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// nil row.
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func NewTablePipeline(ctx cdcContext.Context,

p := pipeline.NewPipeline(ctx, 500*time.Millisecond, runnerSize, defaultOutputChannelSize)
sorterNode := newSorterNode(tableName, tableID, replicaInfo.StartTs, flowController, mounter)
sinkNode := newSinkNode(sink, replicaInfo.StartTs, targetTs, flowController)
sinkNode := newSinkNode(tableID, sink, replicaInfo.StartTs, targetTs, flowController)

p.AppendNode(ctx, "puller", newPullerNode(tableID, replicaInfo, tableName))
p.AppendNode(ctx, "sorter", sorterNode)
Expand Down
4 changes: 3 additions & 1 deletion cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ func (p *processor) tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR
if err := p.lazyInit(ctx); err != nil {
return nil, errors.Trace(err)
}
// sink manager will return this checkpointTs to sink node if sink node resolvedTs flush failed
p.sinkManager.UpdateChangeFeedCheckpointTs(state.Info.GetCheckpointTs(state.Status))
if err := p.handleTableOperation(ctx); err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -705,7 +707,7 @@ func (p *processor) createTablePipelineImpl(ctx cdcContext.Context, tableID mode
}
markTableID = tableInfo.ID
return nil
}, retry.WithBackoffMaxDelay(50), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20))
}, retry.WithBackoffBaseDelay(50), retry.WithBackoffMaxDelay(60*1000), retry.WithMaxTries(20))
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 2 additions & 0 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
tablepipeline "github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/sink"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
Expand All @@ -48,6 +49,7 @@ func newProcessor4Test(
) *processor {
p := newProcessor(ctx)
p.lazyInit = func(ctx cdcContext.Context) error { return nil }
p.sinkManager = &sink.Manager{}
p.redoManager = redo.NewDisabledManager()
p.createTablePipeline = createTablePipeline
p.schemaStorage = &mockSchemaStorage{c: c}
Expand Down
6 changes: 2 additions & 4 deletions cdc/sink/black_hole.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func newBlackHoleSink(ctx context.Context, opts map[string]string) *blackHoleSin

type blackHoleSink struct {
statistics *Statistics
checkpointTs uint64
accumulated uint64
lastAccumulated uint64
}
Expand All @@ -46,7 +45,7 @@ func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model
return nil
}

func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, _ model.TableID, resolvedTs uint64) (uint64, error) {
log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolvedTs))
err := b.statistics.RecordBatchExecution(func() (int, error) {
// TODO: add some random replication latency
Expand All @@ -56,7 +55,6 @@ func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, resolvedTs ui
return int(batchSize), nil
})
b.statistics.PrintStatus(ctx)
atomic.StoreUint64(&b.checkpointTs, resolvedTs)
return resolvedTs, err
}

Expand All @@ -79,6 +77,6 @@ func (b *blackHoleSink) Close(ctx context.Context) error {
return nil
}

func (b *blackHoleSink) Barrier(ctx context.Context) error {
func (b *blackHoleSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}
58 changes: 41 additions & 17 deletions cdc/sink/buffer_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ import (

type bufferSink struct {
Sink
checkpointTs uint64
buffer map[model.TableID][]*model.RowChangedEvent
bufferMu sync.Mutex
flushTsChan chan uint64
drawbackChan chan drawbackMsg
changeFeedCheckpointTs uint64
tableCheckpointTsMap sync.Map
buffer map[model.TableID][]*model.RowChangedEvent
bufferMu sync.Mutex
flushTsChan chan flushMsg
drawbackChan chan drawbackMsg
}

func newBufferSink(
Expand All @@ -42,14 +43,14 @@ func newBufferSink(
errCh chan error,
checkpointTs model.Ts,
drawbackChan chan drawbackMsg,
) Sink {
) *bufferSink {
sink := &bufferSink{
Sink: backendSink,
// buffer shares the same flow control with table sink
buffer: make(map[model.TableID][]*model.RowChangedEvent),
checkpointTs: checkpointTs,
flushTsChan: make(chan uint64, 128),
drawbackChan: drawbackChan,
buffer: make(map[model.TableID][]*model.RowChangedEvent),
changeFeedCheckpointTs: checkpointTs,
flushTsChan: make(chan flushMsg, 128),
drawbackChan: drawbackChan,
}
go sink.run(ctx, errCh)
return sink
Expand Down Expand Up @@ -81,8 +82,9 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) {
delete(b.buffer, drawback.tableID)
b.bufferMu.Unlock()
close(drawback.callback)
case resolvedTs := <-b.flushTsChan:
case flushEvent := <-b.flushTsChan:
b.bufferMu.Lock()
resolvedTs := flushEvent.resolvedTs
// find all rows before resolvedTs and emit to backend sink
for tableID, rows := range b.buffer {
i := sort.Search(len(rows), func(i int) bool {
Expand All @@ -109,14 +111,15 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) {
b.bufferMu.Unlock()

start := time.Now()
checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, resolvedTs)
tableID := flushEvent.tableID
checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, flushEvent.tableID, resolvedTs)
if err != nil {
if errors.Cause(err) != context.Canceled {
errCh <- err
}
return
}
atomic.StoreUint64(&b.checkpointTs, checkpointTs)
b.tableCheckpointTsMap.Store(tableID, checkpointTs)

dur := time.Since(start)
metricFlushDuration.Observe(dur.Seconds())
Expand Down Expand Up @@ -146,11 +149,32 @@ func (b *bufferSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Ro
return nil
}

func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
select {
case <-ctx.Done():
return atomic.LoadUint64(&b.checkpointTs), ctx.Err()
case b.flushTsChan <- resolvedTs:
return b.getTableCheckpointTs(tableID), ctx.Err()
case b.flushTsChan <- flushMsg{
tableID: tableID,
resolvedTs: resolvedTs,
}:
}
return atomic.LoadUint64(&b.checkpointTs), nil
return b.getTableCheckpointTs(tableID), nil
}

type flushMsg struct {
tableID model.TableID
resolvedTs uint64
}

func (b *bufferSink) getTableCheckpointTs(tableID model.TableID) uint64 {
checkPoints, ok := b.tableCheckpointTsMap.Load(tableID)
if ok {
return checkPoints.(uint64)
}
return atomic.LoadUint64(&b.changeFeedCheckpointTs)
}

// UpdateChangeFeedCheckpointTs update the changeFeedCheckpointTs every processor tick
func (b *bufferSink) UpdateChangeFeedCheckpointTs(checkpointTs uint64) {
atomic.StoreUint64(&b.changeFeedCheckpointTs, checkpointTs)
}
Loading

0 comments on commit a9063b3

Please sign in to comment.