Commit

Merge branch 'bump-tidb' of github.com:okJiang/tiflow into bump-tidb
okJiang committed Nov 15, 2022
2 parents 9e292a3 + c8279b8 commit 36b0652
Showing 35 changed files with 476 additions and 335 deletions.
31 changes: 15 additions & 16 deletions cdc/capture/capture.go
@@ -28,6 +28,7 @@ import (
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/processor"
"github.com/pingcap/tiflow/cdc/processor/pipeline/system"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/factory"
ssystem "github.com/pingcap/tiflow/cdc/sorter/db/system"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
@@ -36,7 +37,6 @@ import (
"github.com/pingcap/tiflow/pkg/migrate"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/p2p"
sortmgr "github.com/pingcap/tiflow/pkg/sorter/manager"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
@@ -92,12 +92,12 @@ type captureImpl struct {
EtcdClient etcd.CDCEtcdClient
tableActorSystem *system.System

- // useEventSortEngine indicates whether to use the new pull based sort engine or
+ // useSortEngine indicates whether to use the new pull based sort engine or
// the old push based sorter system. the latter will be removed after all sorter
// have been transformed into pull based sort engine.
- useEventSortEngine bool
+ useSortEngine bool
sorterSystem *ssystem.System
- sortEngineManager *sortmgr.EventSortEngineManager
+ sortEngineFactory *factory.SortEngineFactory

// MessageServer is the receiver of the messages from the other nodes.
// It should be recreated each time the capture is restarted.
@@ -130,7 +130,7 @@ func NewCapture(pdEndpoints []string,
etcdClient etcd.CDCEtcdClient,
grpcService *p2p.ServerWrapper,
tableActorSystem *system.System,
- sortEngineManager *sortmgr.EventSortEngineManager,
+ sortEngineMangerFactory *factory.SortEngineFactory,
sorterSystem *ssystem.System,
) Capture {
conf := config.GetGlobalServerConfig()
@@ -146,9 +146,9 @@
newOwner: owner.NewOwner,
info: &model.CaptureInfo{},

- useEventSortEngine: sortEngineManager != nil,
+ useSortEngine: sortEngineMangerFactory != nil,
- sortEngineManager: sortEngineManager,
+ sortEngineFactory: sortEngineMangerFactory,
sorterSystem: sorterSystem,

migrator: migrate.NewMigrator(etcdClient, pdEndpoints, conf),
}
@@ -311,14 +311,13 @@ func (c *captureImpl) run(stdCtx context.Context) error {

g, stdCtx := errgroup.WithContext(stdCtx)
ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
CaptureInfo: c.info,
EtcdClient: c.EtcdClient,
TableActorSystem: c.tableActorSystem,
MessageServer: c.MessageServer,
MessageRouter: c.MessageRouter,
-
SorterSystem: c.sorterSystem,
- SortEngineManager: c.sortEngineManager,
+ SortEngineFactory: c.sortEngineFactory,
})

g.Go(func() error {
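Aside: with this wiring, `useSortEngine` is not an independent option; it is derived from whether a `SortEngineFactory` was injected. A minimal sketch of that pattern, using hypothetical simplified types in place of the real tiflow ones:

```go
package main

import "fmt"

// SortEngineFactory is a hypothetical stand-in for the real factory type.
type SortEngineFactory struct{}

type capture struct {
	useSortEngine     bool
	sortEngineFactory *SortEngineFactory
}

// newCapture derives the feature flag from the injected dependency,
// mirroring `useSortEngine: sortEngineMangerFactory != nil` in the diff above.
func newCapture(f *SortEngineFactory) *capture {
	return &capture{
		useSortEngine:     f != nil,
		sortEngineFactory: f,
	}
}

func main() {
	fmt.Println(newCapture(nil).useSortEngine)                  // false: old sorter system path
	fmt.Println(newCapture(&SortEngineFactory{}).useSortEngine) // true: pull-based engine path
}
```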
50 changes: 0 additions & 50 deletions cdc/processor/pipeline/puller.go
@@ -23,7 +23,6 @@ import (
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/pipeline"
"github.com/pingcap/tiflow/pkg/regionspan"
"github.com/pingcap/tiflow/pkg/sorter"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"golang.org/x/sync/errgroup"
@@ -107,52 +106,3 @@ func (n *pullerNode) startWithSorterNode(ctx pipeline.NodeContext,
n.cancel = cancel
return nil
}

- func (n *pullerNode) startWithEventSortEngine(ctx pipeline.NodeContext,
- up *upstream.Upstream, wg *errgroup.Group,
- eventSortEngine sorter.EventSortEngine,
- ) error {
- n.wg = wg
- ctxC, cancel := context.WithCancel(ctx)
- ctxC = contextutil.PutCaptureAddrInCtx(ctxC, ctx.GlobalVars().CaptureInfo.AdvertiseAddr)
- ctxC = contextutil.PutRoleInCtx(ctxC, util.RoleProcessor)
- kvCfg := config.GetGlobalServerConfig().KVClient
- // NOTICE: always pull the old value internally
- // See also: https://github.com/pingcap/tiflow/issues/2301.
- n.plr = puller.New(
- ctxC,
- up.PDClient,
- up.GrpcPool,
- up.RegionCache,
- up.KVStorage,
- up.PDClock,
- n.startTs,
- n.tableSpan(),
- kvCfg,
- n.changefeed,
- n.tableID,
- n.tableName,
- )
- n.wg.Go(func() error {
- ctx.Throw(errors.Trace(n.plr.Run(ctxC)))
- return nil
- })
- n.wg.Go(func() error {
- for {
- select {
- case <-ctxC.Done():
- return nil
- case rawKV := <-n.plr.Output():
- if rawKV == nil {
- continue
- }
- pEvent := model.NewPolymorphicEvent(rawKV)
- if err := eventSortEngine.Add(n.tableID, pEvent); err != nil {
- return err
- }
- }
- }
- })
- n.cancel = cancel
- return nil
- }
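The deleted `startWithEventSortEngine` was essentially a pump goroutine: drain the puller's output channel into the sort engine until cancellation, skipping nil events. A standalone sketch of that loop (generic and simplified; not tiflow code):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pump drains in into add until ctx is cancelled. It mirrors the select loop
// of the removed startWithEventSortEngine, including the nil-skip: the source
// channel may carry nil placeholders that must not reach the consumer.
func pump[E any](ctx context.Context, in <-chan *E, add func(*E) error) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case ev := <-in:
			if ev == nil {
				continue // skip nil events, like the rawKV == nil check above
			}
			if err := add(ev); err != nil {
				return err
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	in := make(chan *int, 2)
	v := 42
	in <- &v
	in <- nil // skipped, like a nil rawKV from the puller

	_ = pump(ctx, in, func(e *int) error { fmt.Println(*e); return nil })
}
```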
109 changes: 28 additions & 81 deletions cdc/processor/pipeline/table_actor.go
@@ -34,7 +34,6 @@ import (
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
pmessage "github.com/pingcap/tiflow/pkg/pipeline/message"
"github.com/pingcap/tiflow/pkg/sorter"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/tikv/client-go/v2/oracle"
uberatomic "go.uber.org/atomic"
@@ -78,12 +77,7 @@ type tableActor struct {
// contains all nodes except pullerNode
nodes []*ActorNode

- // If useEventSortEngine is true eventSortEngine will be used, otherwise sortNode will be used.
- //
- // TODO(qupeng): adjust it after all sorters are transformed to EventSortEngine.
- useEventSortEngine bool
- eventSortEngine sorter.EventSortEngine
sortNode *sorterNode

// states of table actor
started bool
@@ -137,10 +131,6 @@ func NewTableActor(
// All sub-goroutines should be spawn in this wait group.
wg, cctx := errgroup.WithContext(ctx)

- // TODO(qupeng): adjust it after all sorters are transformed to EventSortEngine.
- debugConfig := serverConfig.GetGlobalServerConfig().Debug
- useEventSortEngine := debugConfig.EnablePullBasedSink && debugConfig.EnableDBSorter
-
table := &tableActor{
// all errors in table actor will be reported to processor
reportErr: cdcCtx.Throw,
@@ -162,9 +152,7 @@
targetTs: targetTs,
started: false,

- useEventSortEngine: useEventSortEngine,
- eventSortEngine: nil,
sortNode: nil,

changefeedID: changefeedVars.ID,
changefeedVars: changefeedVars,
@@ -263,9 +251,7 @@ func (t *tableActor) handleDataMsg(ctx context.Context) error {
}

func (t *tableActor) handleBarrierMsg(ctx context.Context, barrierTs model.Ts) error {
- if !t.useEventSortEngine {
- t.sortNode.updateBarrierTs(barrierTs)
- }
+ t.sortNode.updateBarrierTs(barrierTs)
return t.sinkNode.updateBarrierTs(ctx, barrierTs)
}

@@ -311,24 +297,12 @@ func (t *tableActor) start(sdtTableContext context.Context) error {

flowController := flowcontrol.NewTableFlowController(t.memoryQuota,
t.redoManager.Enabled(), splitTxn)
- if !t.useEventSortEngine {
- sorterNode := newSorterNode(t.tableName, t.tableID,
- t.replicaInfo.StartTs, flowController,
- t.mg, &t.state, t.changefeedID, t.redoManager.Enabled(),
- t.upstream.PDClient,
- )
- t.sortNode = sorterNode
- } else {
- engine, err := t.globalVars.SortEngineManager.Create(t.changefeedVars.ID)
- if err != nil {
- log.Error("create sort engine fail",
- zap.String("namespace", t.changefeedID.Namespace),
- zap.String("changefeed", t.changefeedID.ID),
- zap.Error(err))
- return err
- }
- t.eventSortEngine = engine
- }
+ sorterNode := newSorterNode(t.tableName, t.tableID,
+ t.replicaInfo.StartTs, flowController,
+ t.mg, &t.state, t.changefeedID, t.redoManager.Enabled(),
+ t.upstream.PDClient,
+ )
+ t.sortNode = sorterNode

sortActorNodeContext := newContext(sdtTableContext, t.tableName,
t.globalVars.TableActorSystem.Router(),
@@ -397,13 +371,10 @@ func (t *tableActor) stop() {
}
atomic.StoreUint32(&t.stopped, stopped)

- if !t.useEventSortEngine && t.sortNode != nil {
+ if t.sortNode != nil {
// releaseResource will send a message to sorter router
t.sortNode.releaseResource()
}
- if t.useEventSortEngine && t.eventSortEngine != nil {
- t.eventSortEngine.RemoveTable(t.tableID)
- }

t.cancel()
if t.sinkNode != nil && !t.sinkStopped.Load() {
@@ -445,9 +416,6 @@ func (t *tableActor) ResolvedTs() model.Ts {
if t.redoManager.Enabled() {
return t.redoManager.GetResolvedTs(t.tableID)
}
- if t.useEventSortEngine {
- return t.eventSortEngine.GetResolvedTs(t.tableID)
- }
return t.sortNode.ResolvedTs()
}

@@ -518,16 +486,14 @@ func (t *tableActor) Stats() tablepb.Stats {
},
}

- if !t.useEventSortEngine {
sorterStats := t.sortNode.sorter.Stats()
stats.StageCheckpoints["sorter-ingress"] = tablepb.Checkpoint{
CheckpointTs: sorterStats.CheckpointTsIngress,
ResolvedTs: sorterStats.ResolvedTsIngress,
}
stats.StageCheckpoints["sorter-egress"] = tablepb.Checkpoint{
CheckpointTs: sorterStats.CheckpointTsEgress,
ResolvedTs: sorterStats.ResolvedTsEgress,
}
- }

return stats
@@ -570,48 +536,29 @@ func (t *tableActor) Wait() {

// MemoryConsumption return the memory consumption in bytes
func (t *tableActor) MemoryConsumption() uint64 {
- if !t.useEventSortEngine {
- return t.sortNode.flowController.GetConsumption()
- }
- // TODO(qupeng): sink manager should handle this.
- return 0
+ return t.sortNode.flowController.GetConsumption()
}

func (t *tableActor) Start(ts model.Ts) {
- if !t.useEventSortEngine {
if atomic.CompareAndSwapInt32(&t.sortNode.started, 0, 1) {
t.sortNode.startTsCh <- ts
close(t.sortNode.startTsCh)
}
- } else {
- t.eventSortEngine.AddTable(t.tableID)
- }
}

func (t *tableActor) RemainEvents() int64 {
- if !t.useEventSortEngine {
- return t.sortNode.remainEvent()
- }
- // TODO(qupeng): record it in sort engine and sinkmanager.
- return 0
+ return t.sortNode.remainEvent()
}

// for ut
var startPuller = func(t *tableActor, ctx *actorNodeContext) error {
- if !t.useEventSortEngine {
- return t.pullerNode.startWithSorterNode(ctx, t.upstream, t.wg, t.sortNode)
- }
- return t.pullerNode.startWithEventSortEngine(ctx, t.upstream, t.wg, t.eventSortEngine)
+ return t.pullerNode.startWithSorterNode(ctx, t.upstream, t.wg, t.sortNode)
}

var startSorter = func(t *tableActor, ctx *actorNodeContext) error {
- if !t.useEventSortEngine {
- eventSorter, err := createSorter(ctx, t.tableName, t.tableID)
- if err != nil {
- return errors.Trace(err)
- }
- return t.sortNode.start(ctx, t.wg, t.actorID, t.router, eventSorter)
- }
- t.eventSortEngine.AddTable(t.tableID)
- return nil
+ eventSorter, err := createSorter(ctx, t.tableName, t.tableID)
+ if err != nil {
+ return errors.Trace(err)
+ }
+ return t.sortNode.start(ctx, t.wg, t.actorID, t.router, eventSorter)
}
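The `// for ut` comment explains why `startPuller` and `startSorter` are package-level `var` functions rather than methods: unit tests can swap them for stubs. A minimal sketch of that seam, using hypothetical names:

```go
package main

import "fmt"

// startSorter is a package-level function variable rather than a method, so a
// unit test can replace it with a stub (the "// for ut" pattern above).
var startSorter = func(name string) error {
	fmt.Println("starting real sorter for", name)
	return nil
}

func runTable(name string) error {
	return startSorter(name)
}

func main() {
	// Production path uses the real implementation.
	_ = runTable("t1")

	// In a test, swap in a stub and restore the original afterwards.
	orig := startSorter
	startSorter = func(name string) error { return nil }
	defer func() { startSorter = orig }()
	_ = runTable("t2")
}
```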
6 changes: 3 additions & 3 deletions cdc/processor/processor.go
@@ -1086,9 +1086,9 @@ func (p *processor) Close(ctx cdcContext.Context) error {
zap.Duration("duration", time.Since(start)))
}

- sortEngineManager := ctx.GlobalVars().SortEngineManager
- if sortEngineManager != nil {
- if err := sortEngineManager.Drop(p.changefeedID); err != nil {
+ engineFactory := ctx.GlobalVars().SortEngineFactory
+ if engineFactory != nil {
+ if err := engineFactory.Drop(p.changefeedID); err != nil {
log.Error("drop event sort engine fail",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
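The `Close` path follows a common cleanup idiom: read the factory from global state, nil-check it (it is only set when the pull-based engine is enabled), and log a structured error if `Drop` fails. A self-contained sketch of that idiom, with a hypothetical factory type standing in for the real one:

```go
package main

import (
	"errors"

	"go.uber.org/zap"
)

// engineFactory is a hypothetical stand-in for the tiflow SortEngineFactory.
type engineFactory struct{}

// Drop pretends to release the per-changefeed engine resources.
func (f *engineFactory) Drop(changefeedID string) error {
	return errors.New("drop failed")
}

func closeChangefeed(logger *zap.Logger, f *engineFactory, namespace, id string) {
	if f == nil {
		return // factory is only present when the pull-based engine is enabled
	}
	if err := f.Drop(id); err != nil {
		// Structured fields keep the log searchable per changefeed, as in the diff.
		logger.Error("drop event sort engine fail",
			zap.String("namespace", namespace),
			zap.String("changefeed", id),
			zap.Error(err))
	}
}

func main() {
	closeChangefeed(zap.NewExample(), &engineFactory{}, "default", "cf-1")
}
```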