Skip to content

Commit

Permalink
Ignore duplicate tables (pingcap#300)
Browse files Browse the repository at this point in the history
* ignore duplicate table

* ignore duplicate table
  • Loading branch information
sdojjy authored Sep 18, 2024
1 parent 6fcadcb commit 95e36be
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 4 deletions.
11 changes: 11 additions & 0 deletions maintainer/barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"time"

"github.com/flowbehappy/tigate/pkg/node"
"github.com/pingcap/log"
"go.uber.org/zap"

"github.com/flowbehappy/tigate/heartbeatpb"
"github.com/flowbehappy/tigate/pkg/common"
Expand Down Expand Up @@ -274,13 +276,19 @@ func (b *BarrierEvent) scheduleBlockEvent() []*messaging.TargetMessage {
switch b.dropDispatchers.InfluenceType {
case heartbeatpb.InfluenceType_DB:
for _, stm := range b.scheduler.GetTasksBySchemaID(b.dropDispatchers.SchemaID) {
log.Info(" remove table",
zap.String("changefeed", b.cfID),
zap.String("table", stm.ID.String()))
msg := b.scheduler.RemoveTask(stm)
if msg != nil {
msgs = append(msgs, msg)
}
}
case heartbeatpb.InfluenceType_Normal:
for _, stm := range b.dropTasks {
log.Info(" remove table",
zap.String("changefeed", b.cfID),
zap.String("table", stm.ID.String()))
msg := b.scheduler.RemoveTask(stm)
if msg != nil {
msgs = append(msgs, msg)
Expand All @@ -291,6 +299,9 @@ func (b *BarrierEvent) scheduleBlockEvent() []*messaging.TargetMessage {
}
}
for _, add := range b.newTables {
log.Info(" add new table",
zap.String("changefeed", b.cfID),
zap.Int64("table", add.TableID))
b.scheduler.AddNewTable(common.Table{
SchemaID: add.SchemaID,
TableID: add.TableID,
Expand Down
16 changes: 14 additions & 2 deletions maintainer/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,11 @@ import (
"sort"
"time"

"github.com/flowbehappy/tigate/pkg/node"

"github.com/flowbehappy/tigate/heartbeatpb"
"github.com/flowbehappy/tigate/maintainer/split"
"github.com/flowbehappy/tigate/pkg/common"
"github.com/flowbehappy/tigate/pkg/messaging"
"github.com/flowbehappy/tigate/pkg/node"
"github.com/flowbehappy/tigate/scheduler"
"github.com/flowbehappy/tigate/utils"
"github.com/flowbehappy/tigate/utils/heap"
Expand All @@ -45,6 +44,8 @@ type Scheduler struct {
nodeTasks map[node.ID]map[common.DispatcherID]*scheduler.StateMachine
// group the tasks by schema id
schemaTasks map[int64]map[common.DispatcherID]*scheduler.StateMachine
// tables
tableTasks map[int64]struct{}
// totalMaps holds all state maps, absent, committing, working and removing
totalMaps []map[common.DispatcherID]*scheduler.StateMachine
bootstrapped bool
Expand All @@ -70,6 +71,7 @@ func NewScheduler(changefeedID string,
s := &Scheduler{
nodeTasks: make(map[node.ID]map[common.DispatcherID]*scheduler.StateMachine),
schemaTasks: make(map[int64]map[common.DispatcherID]*scheduler.StateMachine),
tableTasks: make(map[int64]struct{}),
startCheckpointTs: checkpointTs,
changefeedID: changefeedID,
bootstrapped: false,
Expand Down Expand Up @@ -104,6 +106,13 @@ func (s *Scheduler) GetAllNodes() []node.ID {
}

func (s *Scheduler) AddNewTable(table common.Table, startTs uint64) {
_, ok := s.tableTasks[table.TableID]
if ok {
log.Warn("table already add, ignore",
zap.String("changefeed", s.changefeedID),
zap.Int64("table", table.TableID))
return
}
span := spanz.TableIDToComparableSpan(table.TableID)
tableSpan := &heartbeatpb.TableSpan{
TableID: table.TableID,
Expand Down Expand Up @@ -217,6 +226,7 @@ func (s *Scheduler) RemoveTask(stm *scheduler.StateMachine) *messaging.TargetMes
zap.String("id", id.String()))
delete(s.Absent(), id)
delete(s.schemaTasks[replica.SchemaID], id)
delete(s.tableTasks, replica.Span.TableID)
return nil
}
oldState := stm.State
Expand Down Expand Up @@ -591,6 +601,7 @@ func (s *Scheduler) addNewSpans(schemaID, tableID int64,
s.schemaTasks[schemaID] = schemaMap
}
schemaMap[dispatcherID] = stm
s.tableTasks[tableID] = struct{}{}
}
}

Expand All @@ -612,6 +623,7 @@ func (s *Scheduler) tryMoveTask(dispatcherID common.DispatcherID,
delete(m, dispatcherID)
}
delete(s.schemaTasks[stm.Inferior.(*ReplicaSet).SchemaID], dispatcherID)
delete(s.tableTasks, stm.Inferior.(*ReplicaSet).Span.TableID)
}
// keep node task map is updated
if modifyNodeMap && oldPrimary != stm.Primary {
Expand Down
3 changes: 1 addition & 2 deletions maintainer/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ import (
"testing"
"time"

"github.com/flowbehappy/tigate/pkg/node"

"github.com/flowbehappy/tigate/heartbeatpb"
"github.com/flowbehappy/tigate/pkg/common"
"github.com/flowbehappy/tigate/pkg/node"
"github.com/flowbehappy/tigate/scheduler"
"github.com/flowbehappy/tigate/utils"
"github.com/pingcap/tiflow/cdc/model"
Expand Down

0 comments on commit 95e36be

Please sign in to comment.