Skip to content

Commit

Permalink
owner: fix schema snapshot read from TiKV (#2604) (#2608)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Aug 23, 2021
1 parent 4c1896a commit 692c242
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 11 deletions.
10 changes: 6 additions & 4 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,11 @@ LOOP:
c.barriers.Update(ddlJobBarrier, checkpointTs)
c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs())
var err error
c.schema, err = newSchemaWrap4Owner(ctx.GlobalVars().KVStorage, checkpointTs, c.state.Info.Config)
// Note that (checkpointTs == ddl.FinishedTs) DOES NOT imply that the DDL has been completed executed.
// So we need to process all DDLs from the range [checkpointTs, ...), but since the semantics of start-ts requires
// the lower bound of an open interval, i.e. (startTs, ...), we pass checkpointTs-1 as the start-ts to initialize
// the schema cache.
c.schema, err = newSchemaWrap4Owner(ctx.GlobalVars().KVStorage, checkpointTs-1, c.state.Info.Config)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -221,9 +225,7 @@ LOOP:
if err != nil {
return errors.Trace(err)
}
// Since we wait for checkpoint == ddlJob.FinishTs before executing the DDL,
// when there is a recovery, there is no guarantee that the DDL at the checkpoint
// has been executed. So we need to start the DDL puller from (checkpoint-1).
// Refer to the previous comment on why we use (checkpointTs-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
if err != nil {
return errors.Trace(err)
Expand Down
5 changes: 1 addition & 4 deletions cdc/owner/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *con
return nil, errors.Trace(err)
}
}
// We do a snapshot read of the metadata from TiKV at (startTs-1) instead of startTs,
// because the DDL puller might send a DDL at startTs, which would cause schema conflicts if
// the DDL's result is already contained in the snapshot.
schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs-1, config.ForceReplicate)
schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs, config.ForceReplicate)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions tests/ddl_reentrant/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ function ddl_test() {

echo $ddl > ${WORK_DIR}/ddl_temp.sql
ensure 10 check_ddl_executed "${WORK_DIR}/cdc.log" "${WORK_DIR}/ddl_temp.sql" true
ddl_start_ts=$(grep "Execute DDL succeeded" ${WORK_DIR}/cdc.log|tail -n 1|grep -oE '"StartTs\\":[0-9]{18}'|awk -F: '{print $(NF)}')
ddl_finished_ts=$(grep "Execute DDL succeeded" ${WORK_DIR}/cdc.log|tail -n 1|grep -oE '"CommitTs\\":[0-9]{18}'|awk -F: '{print $(NF)}')
cdc cli changefeed remove --changefeed-id=${changefeedid}
changefeedid=$(cdc cli changefeed create --no-confirm --start-ts=${ddl_start_ts} --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')
echo "create new changefeed ${changefeedid} from ${ddl_start_ts}"
changefeedid=$(cdc cli changefeed create --no-confirm --start-ts=${ddl_finished_ts} --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')
echo "create new changefeed ${changefeedid} from ${ddl_finished_ts}"
ensure 10 check_ts_forward $changefeedid
ensure 10 check_ddl_executed "${WORK_DIR}/cdc.log" "${WORK_DIR}/ddl_temp.sql" $is_reentrant
}
Expand Down

0 comments on commit 692c242

Please sign in to comment.