Skip to content

Commit

Permalink
adjust
Browse files Browse the repository at this point in the history
  • Loading branch information
liuzix committed Aug 23, 2021
1 parent b3066a5 commit b37220c
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
10 changes: 6 additions & 4 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,11 @@ LOOP:
c.barriers.Update(ddlJobBarrier, checkpointTs)
c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs())
var err error
c.schema, err = newSchemaWrap4Owner(ctx.GlobalVars().KVStorage, checkpointTs, c.state.Info.Config)
// Note that (checkpointTs == ddl.FinishedTs) DOES NOT imply that the DDL has been completed executed.
// So we need to process all DDLs from the range [checkpointTs, ...), but since the semantics of start-ts requires
// the lower bound of an open interval, i.e. (startTs, ...), we pass checkpointTs-1 as the start-ts to initialize
// the schema cache.
c.schema, err = newSchemaWrap4Owner(ctx.GlobalVars().KVStorage, checkpointTs-1, c.state.Info.Config)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -221,9 +225,7 @@ LOOP:
if err != nil {
return errors.Trace(err)
}
// Since we wait for checkpoint == ddlJob.FinishTs before executing the DDL,
// when there is a recovery, there is no guarantee that the DDL at the checkpoint
// has been executed. So we need to start the DDL puller from (checkpoint-1).
// Refer to the previous comment on why we use (checkpointTs-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
if err != nil {
return errors.Trace(err)
Expand Down
9 changes: 3 additions & 6 deletions cdc/owner/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,12 @@ func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *con
var meta *timeta.Meta
if kvStorage != nil {
var err error
meta, err = kv.GetSnapshotMeta(kvStorage, startTs-1)
meta, err = kv.GetSnapshotMeta(kvStorage, startTs)
if err != nil {
return nil, errors.Trace(err)
}
}
// We do a snapshot read of the metadata from TiKV at (startTs-1) instead of startTs,
// because the DDL puller might send a DDL at startTs, which would cause schema conflicts if
// the DDL's result is already contained in the snapshot.
schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs-1, config.ForceReplicate)
schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs, config.ForceReplicate)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -61,7 +58,7 @@ func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *con
schemaSnapshot: schemaSnap,
filter: f,
config: config,
ddlHandledTs: startTs - 1,
ddlHandledTs: startTs,
}, nil
}

Expand Down

0 comments on commit b37220c

Please sign in to comment.