diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go
index d75ba223479..0281fcf82f2 100644
--- a/cdc/owner/changefeed.go
+++ b/cdc/owner/changefeed.go
@@ -207,7 +207,11 @@ LOOP:
 	c.barriers.Update(ddlJobBarrier, checkpointTs)
 	c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs())
 	var err error
-	c.schema, err = newSchemaWrap4Owner(ctx.GlobalVars().KVStorage, checkpointTs, c.state.Info.Config)
+	// Note that (checkpointTs == ddl.FinishedTs) DOES NOT imply that the DDL has been fully executed.
+	// So we need to process all DDLs in the range [checkpointTs, ...), but since start-ts is interpreted as
+	// the exclusive lower bound of an open interval, i.e. (startTs, ...), we pass checkpointTs-1 as the
+	// start-ts to initialize the schema cache.
+	c.schema, err = newSchemaWrap4Owner(ctx.GlobalVars().KVStorage, checkpointTs-1, c.state.Info.Config)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -221,9 +225,7 @@ LOOP:
 	if err != nil {
 		return errors.Trace(err)
 	}
-	// Since we wait for checkpoint == ddlJob.FinishTs before executing the DDL,
-	// when there is a recovery, there is no guarantee that the DDL at the checkpoint
-	// has been executed. So we need to start the DDL puller from (checkpoint-1).
+	// Refer to the previous comment on why we use (checkpointTs-1).
 	c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
 	if err != nil {
 		return errors.Trace(err)
diff --git a/cdc/owner/schema.go b/cdc/owner/schema.go
index 174daabc9a9..f518cdada16 100644
--- a/cdc/owner/schema.go
+++ b/cdc/owner/schema.go
@@ -41,15 +41,12 @@ func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *con
 	var meta *timeta.Meta
 	if kvStorage != nil {
 		var err error
-		meta, err = kv.GetSnapshotMeta(kvStorage, startTs-1)
+		meta, err = kv.GetSnapshotMeta(kvStorage, startTs)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
 	}
-	// We do a snapshot read of the metadata from TiKV at (startTs-1) instead of startTs,
-	// because the DDL puller might send a DDL at startTs, which would cause schema conflicts if
-	// the DDL's result is already contained in the snapshot.
-	schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs-1, config.ForceReplicate)
+	schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs, config.ForceReplicate)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -61,7 +58,7 @@ func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *con
 		schemaSnapshot: schemaSnap,
 		filter:         f,
 		config:         config,
-		ddlHandledTs:   startTs - 1,
+		ddlHandledTs:   startTs,
 	}, nil
 }
 
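
To illustrate the off-by-one reasoning in the added comment, here is a minimal, self-contained sketch. The `shouldEmit` helper is hypothetical and only models the open-interval semantics of start-ts described in the patch; it is not part of the TiCDC codebase.

```go
package main

import "fmt"

// shouldEmit models the semantics described in the patch comment: a puller
// started at startTs only emits events whose commit timestamp is strictly
// greater than startTs, i.e. events in the open interval (startTs, ...).
// This helper is hypothetical and exists only to illustrate the reasoning.
func shouldEmit(startTs, commitTs uint64) bool {
	return commitTs > startTs
}

func main() {
	checkpointTs := uint64(100)
	ddlFinishedTs := checkpointTs // a DDL whose FinishedTs equals the checkpoint

	// Starting from checkpointTs would skip the DDL at the checkpoint, even
	// though it may not have been fully executed before the restart.
	fmt.Println(shouldEmit(checkpointTs, ddlFinishedTs)) // false

	// Starting from checkpointTs-1 replays that DDL, which is why the patch
	// passes checkpointTs-1 to both newSchemaWrap4Owner and newDDLPuller.
	fmt.Println(shouldEmit(checkpointTs-1, ddlFinishedTs)) // true
}
```

With the caller now passing checkpointTs-1, the schema snapshot is also read at that same startTs, so the replayed DDL at checkpointTs is not yet contained in the snapshot and can be applied without conflict, which is why the extra startTs-1 adjustments inside newSchemaWrap4Owner are removed.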