dispatcher: refactor mysql sink and dispatcher (pingcap#890)
* fix import in the kafka sink

* mysql sink runs the worker with the context explicitly.

* refactor the dispatcher

* simplify the condition

* move format vector type check out

* also print the rule
3AceShowHand authored Jan 16, 2025
1 parent 082560c commit f43d142
Showing 12 changed files with 94 additions and 83 deletions.
68 changes: 33 additions & 35 deletions downstreamadapter/dispatcher/dispatcher.go
@@ -57,7 +57,7 @@ All dispatchers will communicate with the Maintainer about self progress and whe
Because Sink does not flush events to the downstream in strict order.
the dispatcher can't send event to Sink continuously all the time,
-1. The ddl event/sync point event can be send to Sink only when the previous event has beed flushed to downstream successfully.
+1. The ddl event/sync point event can be sent to Sink only when the previous event has been flushed to downstream successfully.
2. Only when the ddl event/sync point event is flushed to downstream successfully, the dispatcher can send the following event to Sink.
3. For the cross table ddl event/sync point event, dispatcher needs to negotiate with the maintainer to decide whether and when send it to Sink.
@@ -92,13 +92,13 @@ type Dispatcher struct {
// Because when the dispatcher scheduled or the node restarts, there may be some dml events to receive twice.
// So we need to use `Replace` to avoid duplicate key error.
// Table Trigger Event Dispatcher doesn't need this, because it doesn't deal with dml events.
-creatationPDTs uint64
+creationPDTs uint64
// componentStatus is the status of the dispatcher, such as working, removing, stopped.
componentStatus *ComponentStateWithMutex
// the config of filter
filterConfig *eventpb.FilterConfig

-// tableInfo is the latest table info of the dispatcher
+// tableInfo is the latest table info of the dispatcher's corresponding table.
tableInfo *common.TableInfo

// shared by the event dispatcher manager
@@ -175,7 +175,7 @@ func NewDispatcher(
schemaID: schemaID,
schemaIDToDispatchers: schemaIDToDispatchers,
resendTaskMap: newResendTaskMap(),
-creatationPDTs: currentPdTs,
+creationPDTs: currentPdTs,
errCh: errCh,
}

@@ -273,7 +273,7 @@ func (d *Dispatcher) HandleDispatcherStatus(dispatcherStatus *heartbeatpb.Dispat
}

// HandleEvents can batch handle events about resolvedTs Event and DML Event.
-// While for DDLEvent and SyncPointEvent, they should be handled singly,
+// While for DDLEvent and SyncPointEvent, they should be handled separately,
// because they are block events.
// We ensure we only will receive one event when it's ddl event or sync point event
// by setting them with different event types in DispatcherEventsHandler.GetType
@@ -312,10 +312,10 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba
return block
}
block = true
-dml.ReplicatingTs = d.creatationPDTs
+dml.ReplicatingTs = d.creationPDTs
dml.AssembleRows(d.tableInfo)
dml.AddPostFlushFunc(func() {
-// Considering dml event in sink may be write to downstream not in order,
+// Considering dml event in sink may be written to downstream not in order,
// thus, we use tableProgress.Empty() to ensure these events are flushed to downstream completely
// and wake dynamic stream to handle the next events.
if d.tableProgress.Empty() {
@@ -329,32 +329,32 @@
log.Panic("ddl event should only be singly handled", zap.Any("dispatcherID", d.id))
}
block = true
-event := event.(*commonEvent.DDLEvent)
-// Update the table info of the dispatcher, when it receive ddl event.
-d.tableInfo = event.TableInfo
+ddl := event.(*commonEvent.DDLEvent)
+// Update the table info of the dispatcher, when it receives ddl event.
+d.tableInfo = ddl.TableInfo
log.Info("dispatcher receive ddl event",
zap.Stringer("dispatcher", d.id),
zap.String("query", event.Query),
zap.Int64("table", event.TableID),
zap.String("query", ddl.Query),
zap.Int64("table", ddl.TableID),
zap.Uint64("commitTs", event.GetCommitTs()),
zap.Uint64("seq", event.GetSeq()))
-event.AddPostFlushFunc(func() {
+ddl.AddPostFlushFunc(func() {
if d.tableSchemaStore != nil {
-d.tableSchemaStore.AddEvent(event)
+d.tableSchemaStore.AddEvent(ddl)
}
wakeCallback()
})
-d.dealWithBlockEvent(event)
+d.dealWithBlockEvent(ddl)
case commonEvent.TypeSyncPointEvent:
if len(dispatcherEvents) != 1 {
log.Panic("sync point event should only be singly handled", zap.Any("dispatcherID", d.id))
}
block = true
-event := event.(*commonEvent.SyncPointEvent)
-event.AddPostFlushFunc(func() {
+syncPoint := event.(*commonEvent.SyncPointEvent)
+syncPoint.AddPostFlushFunc(func() {
wakeCallback()
})
-d.dealWithBlockEvent(event)
+d.dealWithBlockEvent(syncPoint)
case commonEvent.TypeHandshakeEvent:
log.Warn("Receive handshake event unexpectedly",
zap.Stringer("dispatcher", d.id), zap.Any("event", event))
@@ -407,21 +407,22 @@ func (d *Dispatcher) shouldBlock(event commonEvent.BlockEvent) bool {
switch event.GetType() {
case commonEvent.TypeDDLEvent:
ddlEvent := event.(*commonEvent.DDLEvent)
-if ddlEvent.BlockedTables != nil {
-switch ddlEvent.GetBlockedTables().InfluenceType {
-case commonEvent.InfluenceTypeNormal:
-if len(ddlEvent.GetBlockedTables().TableIDs) > 1 {
-return true
-} else if !isCompleteSpan(d.tableSpan) {
-// if the table is split, even the blockTable only itself, it should block
-return true
-}
-return false
-case commonEvent.InfluenceTypeDB, commonEvent.InfluenceTypeAll:
+if ddlEvent.BlockedTables == nil {
+return false
+}
+switch ddlEvent.GetBlockedTables().InfluenceType {
+case commonEvent.InfluenceTypeNormal:
+if len(ddlEvent.GetBlockedTables().TableIDs) > 1 {
+return true
+}
+if !isCompleteSpan(d.tableSpan) {
+// if the table is split, even the blockTable only itself, it should block
+return true
+}
+return false
+case commonEvent.InfluenceTypeDB, commonEvent.InfluenceTypeAll:
+return true
}
return false
case commonEvent.TypeSyncPointEvent:
return true
default:
@@ -598,9 +599,8 @@ func (d *Dispatcher) GetFilterConfig() *eventpb.FilterConfig {
func (d *Dispatcher) GetSyncPointInterval() time.Duration {
if d.syncPointConfig != nil {
return d.syncPointConfig.SyncPointInterval
-} else {
-return time.Duration(0)
-}
+return time.Duration(0)
}

func (d *Dispatcher) Remove() {
@@ -627,16 +627,14 @@ func (d *Dispatcher) addToStatusDynamicStream() {
func (d *Dispatcher) TryClose() (w heartbeatpb.Watermark, ok bool) {
// If sink is normal(not meet error), we need to wait all the events in sink to flushed downstream successfully.
// If sink is not normal, we can close the dispatcher immediately.
-if (d.sink.IsNormal() && d.tableProgress.Empty()) || !d.sink.IsNormal() {
+if !d.sink.IsNormal() || d.tableProgress.Empty() {
w.CheckpointTs = d.GetCheckpointTs()
w.ResolvedTs = d.GetResolvedTs()

d.componentStatus.Set(heartbeatpb.ComponentState_Stopped)

if d.IsTableTriggerEventDispatcher() {
d.tableSchemaStore.Clear()
}

return w, true
}
return w, false
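Two of the rewrites in this file are pure simplifications. shouldBlock replaces a nested if/switch with an early-return guard clause, and TryClose replaces (d.sink.IsNormal() && d.tableProgress.Empty()) || !d.sink.IsNormal() with !d.sink.IsNormal() || d.tableProgress.Empty(). The latter relies on the identity (A && B) || !A == !A || B; a throwaway sketch (not part of the change) that checks it exhaustively:

package main

import "fmt"

func main() {
    // Check (A && B) || !A == !A || B over all four input combinations.
    for _, a := range []bool{false, true} {
        for _, b := range []bool{false, true} {
            before := (a && b) || !a
            after := !a || b
            fmt.Printf("A=%v B=%v before=%v after=%v equal=%v\n", a, b, before, after, before == after)
        }
    }
}

Every combination prints equal=true, so the simplified condition preserves behavior.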
@@ -19,7 +19,6 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/downstreamadapter/dispatcher"
"github.com/pingcap/ticdc/downstreamadapter/eventcollector"
@@ -31,6 +30,7 @@ import (
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/pdutil"
@@ -184,7 +184,7 @@ func NewEventDispatcherManager(
go func() {
defer wg.Done()
err = manager.sink.Run(ctx)
-if err != nil && errors.Cause(err) != context.Canceled {
+if err != nil && !errors.Is(errors.Cause(err), context.Canceled) {
select {
case <-ctx.Done():
return
@@ -460,7 +460,7 @@ func (e *EventDispatcherManager) collectErrors(ctx context.Context) {
case <-ctx.Done():
return
case err := <-e.errCh:
-if errors.Cause(err) != context.Canceled {
+if !errors.Is(errors.Cause(err), context.Canceled) {
log.Error("Event Dispatcher Manager Meets Error",
zap.String("changefeedID", e.changefeedID.String()),
zap.Error(err))
@@ -477,7 +477,7 @@ func (e *EventDispatcherManager) collectErrors(ctx context.Context) {
e.heartbeatRequestQueue.Enqueue(&HeartBeatRequestWithTargetID{TargetID: e.GetMaintainerID(), Request: &message})

// resend message until the event dispatcher manager is closed
-// the first error is matter most, so we just need to resend it continuely and ignore the other errors.
+// the first error is matter most, so we just need to resend it continue and ignore the other errors.
ticker := time.NewTicker(time.Second * 5)
for {
select {
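This file also switches from pingcap/errors to ticdc's pkg/errors and compares against context.Canceled with errors.Is instead of ==. The difference matters once the cancellation error is wrapped with extra context; a minimal sketch using the standard library (assuming pkg/errors exposes a compatible Is, as the diff suggests):

package main

import (
    "context"
    "errors"
    "fmt"
)

func main() {
    // A sink often returns the cancellation wrapped with extra context.
    err := fmt.Errorf("mysql sink stopped: %w", context.Canceled)

    fmt.Println(err == context.Canceled)          // false: equality misses wrapped errors
    fmt.Println(errors.Is(err, context.Canceled)) // true: Is walks the wrap chain
}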
5 changes: 2 additions & 3 deletions downstreamadapter/dispatchermanager/helper.go
@@ -451,9 +451,8 @@ func (h *CheckpointTsMessageHandler) Handle(eventDispatcherManager *EventDispatc
panic("invalid message count")
}
checkpointTsMessage := messages[0]
if eventDispatcherManager.tableTriggerEventDispatcher != nil && eventDispatcherManager.sink.SinkType() != common.MysqlSinkType {
tableTriggerEventDispatcher := eventDispatcherManager.tableTriggerEventDispatcher
tableTriggerEventDispatcher.HandleCheckpointTs(checkpointTsMessage.CheckpointTs)
if eventDispatcherManager.tableTriggerEventDispatcher != nil {
eventDispatcherManager.tableTriggerEventDispatcher.HandleCheckpointTs(checkpointTsMessage.CheckpointTs)
}
return false
}
@@ -14,6 +14,7 @@
package partition

import (
"go.uber.org/zap"
"strings"

"github.com/pingcap/log"
@@ -49,7 +50,7 @@ func GetPartitionGenerator(rule string, scheme string, indexName string, columns
return newKeyPartitionGenerator(rule)
}

log.Warn("the partition dispatch rule is not default/ts/table/index-value/columns," +
" use the default rule instead.")
log.Warn("the partition dispatch rule is not default/ts/table/index-value/columns,"+
" use the default rule instead.", zap.String("rule", rule))
return newTablePartitionGenerator()
}
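This is the "also print the rule" commit: the warning previously said an unknown rule was seen but never said which one; now the rule travels as a structured zap field on the log entry. A small sketch of the pattern, assuming the pingcap/log global Warn (the rule value here is hypothetical):

package main

import (
    "github.com/pingcap/log"
    "go.uber.org/zap"
)

func main() {
    rule := "not-a-real-rule" // hypothetical value for illustration
    // The message stays fixed; the variable part is attached as a field,
    // which keeps logs grep-able and machine-parseable.
    log.Warn("the partition dispatch rule is not default/ts/table/index-value/columns,"+
        " use the default rule instead.", zap.String("rule", rule))
}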
9 changes: 5 additions & 4 deletions downstreamadapter/sink/kafka_sink.go
@@ -19,15 +19,14 @@ import (
"net/url"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/downstreamadapter/sink/helper/topicmanager"
"github.com/pingcap/ticdc/downstreamadapter/worker"
"github.com/pingcap/ticdc/downstreamadapter/worker/producer"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/sink/kafka"
"github.com/pingcap/ticdc/pkg/sink/util"
Expand All @@ -48,7 +47,9 @@ type KafkaSink struct {
statistics *metrics.Statistics
metricsCollector kafka.MetricsCollector

-isNormal uint32 // if sink is normal, isNormal is 1, otherwise is 0
+// isNormal means the sink does not meet error.
+// if sink is normal, isNormal is 1, otherwise is 0
+isNormal uint32
ctx context.Context
}

@@ -86,7 +87,7 @@ func newKafkaSink(
statistics := metrics.NewStatistics(changefeedID, "KafkaSink")
asyncProducer, err := kafkaComponent.Factory.AsyncProducer(ctx)
if err != nil {
-return nil, cerror.WrapError(cerror.ErrKafkaNewProducer, err)
+return nil, errors.WrapError(errors.ErrKafkaNewProducer, err)
}
dmlProducer := producer.NewKafkaDMLProducer(changefeedID, asyncProducer)
dmlWorker := worker.NewKafkaDMLWorker(
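The isNormal field keeps its uint32 encoding (1 = healthy, 0 = errored) precisely so it can be read and written atomically from concurrent worker goroutines without a mutex. A minimal sketch of the convention, with names assumed for illustration:

package main

import (
    "fmt"
    "sync/atomic"
)

// sinkState mirrors the flag convention: 1 while healthy, 0 after an error.
type sinkState struct {
    isNormal uint32
}

func (s *sinkState) markError() { atomic.StoreUint32(&s.isNormal, 0) }

func (s *sinkState) IsNormal() bool { return atomic.LoadUint32(&s.isNormal) == 1 }

func main() {
    s := &sinkState{isNormal: 1}
    fmt.Println(s.IsNormal()) // true
    s.markError()             // e.g. invoked when a produce callback reports an error
    fmt.Println(s.IsNormal()) // false
}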
15 changes: 8 additions & 7 deletions downstreamadapter/sink/mysql_sink.go
@@ -65,7 +65,7 @@ func verifyMySQLSink(
if err != nil {
return err
}
-db.Close()
+_ = db.Close()
return nil
}

@@ -99,18 +99,19 @@ func newMysqlSinkWithDBAndConfig(
statistics: stat,
isNormal: 1,
}
+formatVectorType := mysql.ShouldFormatVectorType(db, cfg)
for i := 0; i < workerCount; i++ {
-mysqlSink.dmlWorker[i] = worker.NewMysqlDMLWorker(ctx, db, cfg, i, changefeedID, stat)
+mysqlSink.dmlWorker[i] = worker.NewMysqlDMLWorker(ctx, db, cfg, i, changefeedID, stat, formatVectorType)
}
-mysqlSink.ddlWorker = worker.NewMysqlDDLWorker(ctx, db, cfg, mysqlSink.changefeedID, stat)
+mysqlSink.ddlWorker = worker.NewMysqlDDLWorker(ctx, db, cfg, changefeedID, stat, formatVectorType)
return mysqlSink
}

func (s *MysqlSink) Run(ctx context.Context) error {
-g, _ := errgroup.WithContext(ctx)
+g, ctx := errgroup.WithContext(ctx)
for i := 0; i < s.workerCount; i++ {
g.Go(func() error {
-return s.dmlWorker[i].Run()
+return s.dmlWorker[i].Run(ctx)
})
}
err := g.Wait()
@@ -136,7 +137,7 @@ func (s *MysqlSink) AddDMLEvent(event *commonEvent.DMLEvent) {
// directly dividing by the number of buckets may cause unevenness between buckets.
// Therefore, we first take the modulus of the prime number and then take the modulus of the bucket.
index := int64(event.PhysicalTableID) % prime % int64(s.workerCount)
-s.dmlWorker[index].GetEventChan() <- event
+s.dmlWorker[index].AddDMLEvent(event)
}

func (s *MysqlSink) PassBlockEvent(event commonEvent.BlockEvent) {
Expand All @@ -152,7 +153,7 @@ func (s *MysqlSink) WriteBlockEvent(event commonEvent.BlockEvent) error {
return nil
}

-func (s *MysqlSink) AddCheckpointTs(ts uint64) {}
+func (s *MysqlSink) AddCheckpointTs(_ uint64) {}

func (s *MysqlSink) GetStartTsList(
tableIds []int64,
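AddDMLEvent keeps the prime-then-bucket modulus described in the comment above: table IDs come from a global sequence in steps of 1 or 2, so id % workerCount alone can leave buckets starved (with 4 workers and even IDs, only buckets 0 and 2 are ever hit). Reducing modulo a prime first spreads the IDs across all residues. A small sketch; the prime value below is an assumption for illustration, not the constant defined in the sink:

package main

import "fmt"

const prime = 31 // assumed value for illustration; the sink defines its own constant

func bucket(physicalTableID int64, workerCount int) int64 {
    return physicalTableID % prime % int64(workerCount)
}

func main() {
    direct := make([]int, 4)
    viaPrime := make([]int, 4)
    for id := int64(0); id < 1000; id += 2 { // even-spaced IDs, as the comment describes
        direct[id%4]++
        viaPrime[bucket(id, 4)]++
    }
    fmt.Println(direct)   // [250 0 250 0]: two buckets never used
    fmt.Println(viaPrime) // roughly even across all four buckets
}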
18 changes: 11 additions & 7 deletions downstreamadapter/worker/mysql_worker.go
@@ -31,7 +31,6 @@ import (

// MysqlDMLWorker is used to flush the dml event downstream
type MysqlDMLWorker struct {
-ctx context.Context
changefeedID common.ChangeFeedID

eventChan chan *commonEvent.DMLEvent
Expand All @@ -48,10 +47,10 @@ func NewMysqlDMLWorker(
id int,
changefeedID common.ChangeFeedID,
statistics *metrics.Statistics,
+formatVectorType bool,
) *MysqlDMLWorker {
return &MysqlDMLWorker{
-ctx: ctx,
-mysqlWriter: mysql.NewMysqlWriter(ctx, db, config, changefeedID, statistics),
+mysqlWriter: mysql.NewMysqlWriter(ctx, db, config, changefeedID, statistics, formatVectorType),
id: id,
maxRows: config.MaxTxnRow,
eventChan: make(chan *commonEvent.DMLEvent, 16),
Expand All @@ -63,7 +62,7 @@ func (w *MysqlDMLWorker) GetEventChan() chan *commonEvent.DMLEvent {
return w.eventChan
}

-func (w *MysqlDMLWorker) Run() error {
+func (w *MysqlDMLWorker) Run(ctx context.Context) error {
namespace := w.changefeedID.Namespace()
changefeed := w.changefeedID.Name()

Expand All @@ -83,8 +82,8 @@ func (w *MysqlDMLWorker) Run() error {
for {
needFlush := false
select {
-case <-w.ctx.Done():
-return errors.Trace(w.ctx.Err())
+case <-ctx.Done():
+return errors.Trace(ctx.Err())
case txnEvent := <-w.eventChan:
events = append(events, txnEvent)
rows += int(txnEvent.Len())
@@ -135,6 +134,10 @@ func (w *MysqlDMLWorker) Close() {
w.mysqlWriter.Close()
}

+func (w *MysqlDMLWorker) AddDMLEvent(event *commonEvent.DMLEvent) {
+w.eventChan <- event
+}

// MysqlDDLWorker is use to flush the ddl event and sync point eventdownstream
type MysqlDDLWorker struct {
changefeedID common.ChangeFeedID
Expand All @@ -147,10 +150,11 @@ func NewMysqlDDLWorker(
config *mysql.MysqlConfig,
changefeedID common.ChangeFeedID,
statistics *metrics.Statistics,
+formatVectorType bool,
) *MysqlDDLWorker {
return &MysqlDDLWorker{
changefeedID: changefeedID,
-mysqlWriter: mysql.NewMysqlWriter(ctx, db, config, changefeedID, statistics),
+mysqlWriter: mysql.NewMysqlWriter(ctx, db, config, changefeedID, statistics, formatVectorType),
}
}

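The Run signature change is the heart of the "mysql sink runs the worker with the context explicitly" commits: instead of each worker capturing a context at construction time (the removed ctx field), MysqlSink.Run now keeps the errgroup-derived context (g, ctx := errgroup.WithContext(ctx)) and hands it to every worker, so the first worker that fails cancels its siblings and g.Wait() surfaces that first error. A minimal sketch of the pattern:

package main

import (
    "context"
    "errors"
    "fmt"

    "golang.org/x/sync/errgroup"
)

func main() {
    g, ctx := errgroup.WithContext(context.Background())

    g.Go(func() error { return errors.New("worker 1 failed") })

    g.Go(func() error {
        <-ctx.Done() // cancelled as soon as worker 1 returns its error
        return ctx.Err()
    })

    // Wait returns the first non-nil error; the sibling's context.Canceled is dropped.
    fmt.Println(g.Wait()) // worker 1 failed
}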