Skip to content

Commit

Permalink
owner/(ticdc): fix the disorder problem for ddl events execution (#5408)
Browse files Browse the repository at this point in the history
close #5406
  • Loading branch information
zhaoxinyu authored May 13, 2022
1 parent f3bf091 commit 2b270b2
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 43 deletions.
1 change: 1 addition & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ type DDLEvent struct {
PreTableInfo *SimpleTableInfo `msg:"pre-table-info"`
Query string `msg:"query"`
Type model.ActionType `msg:"-"`
Done bool `msg:"-"`
}

// RedoDDLEvent represents DDL event used in redo log persistent
Expand Down
15 changes: 3 additions & 12 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type changefeed struct {
// a DDL job asynchronously. After the DDL job has been executed,
// ddlEventCache will be set to nil. ddlEventCache contains more than
// one event for a rename tables DDL job.
ddlEventCache map[*model.DDLEvent]bool
ddlEventCache []*model.DDLEvent
// currentTableNames is the table names that the changefeed is watching.
// And it contains only the tables of the ddl that have been processed.
// The ones that have not been executed yet do not have.
Expand Down Expand Up @@ -569,10 +569,7 @@ func (c *changefeed) asyncExecDDLJob(ctx cdcContext.Context,
if err != nil {
return false, errors.Trace(err)
}
c.ddlEventCache = make(map[*model.DDLEvent]bool)
for _, event := range ddlEvents {
c.ddlEventCache[event] = false
}
c.ddlEventCache = ddlEvents
for _, ddlEvent := range ddlEvents {
if c.redoManager.Enabled() {
err = c.redoManager.EmitDDLEvent(ctx, ddlEvent)
Expand All @@ -584,17 +581,11 @@ func (c *changefeed) asyncExecDDLJob(ctx cdcContext.Context,
}

jobDone := true
for event, done := range c.ddlEventCache {
if done {
continue
}
for _, event := range c.ddlEventCache {
eventDone, err := c.asyncExecDDLEvent(ctx, event)
if err != nil {
return false, err
}
if eventDone {
c.ddlEventCache[event] = true
}
jobDone = jobDone && eventDone
}

Expand Down
34 changes: 17 additions & 17 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,12 +805,12 @@ func TestExecRenameTablesDDL(t *testing.T) {
oldTableIDs = append(oldTableIDs, job.TableID)
}
require.Nil(t, err)
require.Equal(t, done, false)
require.Equal(t, false, done)
require.Equal(t, expectedDDL, mockDDLSink.ddlExecuting.Query)
mockDDLSink.ddlDone = true
done, err = cf.asyncExecDDLJob(ctx, job)
require.Nil(t, err)
require.Equal(t, done, true)
require.Equal(t, true, done)
require.Equal(t, expectedDDL, mockDDLSink.ddlExecuting.Query)
}

Expand Down Expand Up @@ -852,19 +852,19 @@ func TestExecRenameTablesDDL(t *testing.T) {
mockDDLSink.recordDDLHistory = true
done, err := cf.asyncExecDDLJob(ctx, job)
require.Nil(t, err)
require.Equal(t, done, false)
require.Equal(t, false, done)
require.Len(t, mockDDLSink.ddlHistory, 2)
require.Equal(t, mockDDLSink.ddlHistory[0],
"RENAME TABLE `test1`.`tb1` TO `test2`.`tb10`")
require.Equal(t, mockDDLSink.ddlHistory[1],
"RENAME TABLE `test2`.`tb2` TO `test1`.`tb20`")
require.Equal(t, "RENAME TABLE `test1`.`tb1` TO `test2`.`tb10`",
mockDDLSink.ddlHistory[0])
require.Equal(t, "RENAME TABLE `test2`.`tb2` TO `test1`.`tb20`",
mockDDLSink.ddlHistory[1])

// mock all of the rename table statements have been done
mockDDLSink.resetDDLDone = false
mockDDLSink.ddlDone = true
done, err = cf.asyncExecDDLJob(ctx, job)
require.Nil(t, err)
require.Equal(t, done, true)
require.Equal(t, true, done)
}

func TestExecDropTablesDDL(t *testing.T) {
Expand All @@ -886,12 +886,12 @@ func TestExecDropTablesDDL(t *testing.T) {
job := helper.DDL2Job(actualDDL)
done, err := cf.asyncExecDDLJob(ctx, job)
require.Nil(t, err)
require.Equal(t, done, false)
require.Equal(t, false, done)
require.Equal(t, expectedDDL, mockDDLSink.ddlExecuting.Query)
mockDDLSink.ddlDone = true
done, err = cf.asyncExecDDLJob(ctx, job)
require.Nil(t, err)
require.Equal(t, done, true)
require.Equal(t, true, done)
}

execCreateStmt("create database test1",
Expand All @@ -909,12 +909,12 @@ func TestExecDropTablesDDL(t *testing.T) {
execDropStmt := func(job *timodel.Job, expectedDDL string) {
done, err := cf.asyncExecDDLJob(ctx, job)
require.Nil(t, err)
require.Equal(t, done, false)
require.Equal(t, false, done)
require.Equal(t, mockDDLSink.ddlExecuting.Query, expectedDDL)
mockDDLSink.ddlDone = true
done, err = cf.asyncExecDDLJob(ctx, job)
require.Nil(t, err)
require.Equal(t, done, true)
require.Equal(t, true, done)
}

execDropStmt(jobs[0], "DROP TABLE `test1`.`tb2`")
Expand All @@ -940,12 +940,12 @@ func TestExecDropViewsDDL(t *testing.T) {
job := helper.DDL2Job(actualDDL)
done, err := cf.asyncExecDDLJob(ctx, job)
require.Nil(t, err)
require.Equal(t, done, false)
require.Equal(t, false, done)
require.Equal(t, expectedDDL, mockDDLSink.ddlExecuting.Query)
mockDDLSink.ddlDone = true
done, err = cf.asyncExecDDLJob(ctx, job)
require.Nil(t, err)
require.Equal(t, done, true)
require.Equal(t, true, done)
}
execCreateStmt("create database test1",
"CREATE DATABASE `test1`")
Expand All @@ -970,12 +970,12 @@ func TestExecDropViewsDDL(t *testing.T) {
execDropStmt := func(job *timodel.Job, expectedDDL string) {
done, err := cf.asyncExecDDLJob(ctx, job)
require.Nil(t, err)
require.Equal(t, done, false)
require.Equal(t, mockDDLSink.ddlExecuting.Query, expectedDDL)
require.Equal(t, false, done)
require.Equal(t, expectedDDL, mockDDLSink.ddlExecuting.Query)
mockDDLSink.ddlDone = true
done, err = cf.asyncExecDDLJob(ctx, job)
require.Nil(t, err)
require.Equal(t, done, true)
require.Equal(t, true, done)
}

execDropStmt(jobs[0], "DROP VIEW `test1`.`view2`")
Expand Down
21 changes: 7 additions & 14 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ type ddlSinkImpl struct {
checkpointTs model.Ts
currentTableNames []model.TableName
}
// ddlFinishedTsMap is used to check whether a ddl event in a ddl job has
// been executed successfully.
ddlFinishedTsMap sync.Map
// ddlSentTsMap is used to check whether a ddl event in a ddl job has been
// sent to `ddlCh` successfully.
ddlSentTsMap map[*model.DDLEvent]model.Ts
Expand Down Expand Up @@ -198,10 +195,10 @@ func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *m
zap.String("changefeed", ctx.ChangefeedVars().ID.ID),
zap.Bool("ignored", err != nil),
zap.Any("ddl", ddl))
s.ddlFinishedTsMap.Store(ddl, ddl.CommitTs)
// Force emitting checkpoint ts when a ddl event is finished.
// Otherwise, a kafka consumer may not execute that ddl event.
s.mu.Lock()
ddl.Done = true
checkpointTs := s.mu.checkpointTs
if checkpointTs == 0 || checkpointTs <= lastCheckpointTs {
s.mu.Unlock()
Expand Down Expand Up @@ -242,22 +239,18 @@ func (s *ddlSinkImpl) emitCheckpointTs(ts uint64, tableNames []model.TableName)
// and CommitTs. So in emitDDLEvent, we get the DDL finished ts of an event
// from a map in order to check whether that event is finshed or not.
func (s *ddlSinkImpl) emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) {
var ddlFinishedTs model.Ts

ts, ok := s.ddlFinishedTsMap.Load(ddl)
if ok {
ddlFinishedTs = ts.(model.Ts)
}
if ddl.CommitTs <= ddlFinishedTs {
s.mu.Lock()
if ddl.Done {
// the DDL event is executed successfully, and done is true
log.Info("ddl already executed",
zap.String("namespace", ctx.ChangefeedVars().ID.Namespace),
zap.String("changefeed", ctx.ChangefeedVars().ID.ID),
zap.Uint64("ddlFinishedTs", ddlFinishedTs), zap.Any("DDL", ddl))
s.ddlFinishedTsMap.Delete(ddl)
zap.Any("DDL", ddl))
delete(s.ddlSentTsMap, ddl)
s.mu.Unlock()
return true, nil
}
s.mu.Unlock()

ddlSentTs := s.ddlSentTsMap[ddl]
if ddl.CommitTs <= ddlSentTs {
Expand All @@ -282,7 +275,7 @@ func (s *ddlSinkImpl) emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent)
zap.String("namespace", ctx.ChangefeedVars().ID.Namespace),
zap.String("changefeed", ctx.ChangefeedVars().ID.ID),
zap.Uint64("ddlSentTs", ddlSentTs),
zap.Uint64("ddlFinishedTs", ddlFinishedTs), zap.Any("DDL", ddl))
zap.Any("DDL", ddl))
// if this hit, we think that ddlCh is full,
// just return false and send the ddl in the next round.
}
Expand Down

0 comments on commit 2b270b2

Please sign in to comment.