
owner: fix schema snapshot read from TiKV #2604

Merged 6 commits on Aug 23, 2021
10 changes: 6 additions & 4 deletions cdc/owner/changefeed.go
@@ -207,7 +207,11 @@ LOOP:
 c.barriers.Update(ddlJobBarrier, checkpointTs)
 c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs())
 var err error
-c.schema, err = newSchemaWrap4Owner(ctx.GlobalVars().KVStorage, checkpointTs, c.state.Info.Config)
+// Note that (checkpointTs == ddl.FinishedTs) DOES NOT imply that the DDL has been completely executed.
+// So we need to process all DDLs in the range [checkpointTs, ...). Since start-ts denotes the lower bound
+// of an open interval, i.e. (startTs, ...), we pass checkpointTs-1 as the start-ts to initialize the
+// schema cache.
+c.schema, err = newSchemaWrap4Owner(ctx.GlobalVars().KVStorage, checkpointTs-1, c.state.Info.Config)
 if err != nil {
 return errors.Trace(err)
 }
@@ -221,9 +225,7 @@ LOOP:
 if err != nil {
 return errors.Trace(err)
 }
-// Since we wait for checkpoint == ddlJob.FinishTs before executing the DDL,
-// when there is a recovery, there is no guarantee that the DDL at the checkpoint
-// has been executed. So we need to start the DDL puller from (checkpoint-1).
+// Refer to the previous comment on why we use (checkpointTs-1).
 c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
 if err != nil {
 return errors.Trace(err)
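To make the interval arithmetic in the new comment concrete, here is a minimal, self-contained sketch of the boundary case; `ddlJob` and `jobsAfter` are hypothetical stand-ins, not TiCDC APIs. Because start-ts is the open lower bound of (startTs, ...), a DDL whose FinishedTs equals checkpointTs is only re-processed after a restart if the owner starts from checkpointTs-1.

```go
package main

import "fmt"

// ddlJob stands in for the real DDL job; only FinishedTs matters here.
type ddlJob struct {
	Name       string
	FinishedTs uint64
}

// jobsAfter mimics an open-interval filter over (startTs, ...):
// only jobs with FinishedTs strictly greater than startTs are returned.
func jobsAfter(jobs []ddlJob, startTs uint64) []ddlJob {
	var out []ddlJob
	for _, j := range jobs {
		if j.FinishedTs > startTs {
			out = append(out, j)
		}
	}
	return out
}

func main() {
	jobs := []ddlJob{{"add-column", 100}, {"drop-index", 120}}
	checkpointTs := uint64(100) // equal to the first DDL's FinishedTs

	fmt.Println(jobsAfter(jobs, checkpointTs))   // [{drop-index 120}] -- the DDL at ts=100 is lost
	fmt.Println(jobsAfter(jobs, checkpointTs-1)) // [{add-column 100} {drop-index 120}] -- re-included
}
```

The same reasoning applies to both call sites in this file: the schema cache and the DDL puller must start from the same checkpointTs-1 so they agree on which DDLs still need processing.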
5 changes: 1 addition & 4 deletions cdc/owner/schema.go
@@ -46,10 +46,7 @@ func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *con
 return nil, errors.Trace(err)
 }
 }
-// We do a snapshot read of the metadata from TiKV at (startTs-1) instead of startTs,
-// because the DDL puller might send a DDL at startTs, which would cause schema conflicts if
-// the DDL's result is already contained in the snapshot.
-schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs-1, config.ForceReplicate)
+schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs, config.ForceReplicate)
 if err != nil {
 return nil, errors.Trace(err)
 }
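A hedged sketch of the off-by-one this hunk removes, assuming (as the PR title suggests) that the elided code above reads the TiKV meta at the caller-supplied startTs: before the change the meta was read at startTs while the schema snapshot was built at startTs-1, and now that the owner passes checkpointTs-1, both reads use the same timestamp. The functions below are illustrative only, not TiCDC code.

```go
package main

import "fmt"

// tsPair groups the two timestamps that must agree: the ts of the TiKV meta
// read and the ts used to build the schema snapshot from that meta.
type tsPair struct{ metaTs, snapTs uint64 }

// before: the owner passed checkpointTs, and the helper subtracted 1 only
// when building the snapshot, so the two reads disagreed by one ts.
func before(checkpointTs uint64) tsPair {
	startTs := checkpointTs
	return tsPair{metaTs: startTs, snapTs: startTs - 1}
}

// after: the owner passes checkpointTs-1 and the helper uses startTs for
// both, so the meta read and the snapshot are consistent.
func after(checkpointTs uint64) tsPair {
	startTs := checkpointTs - 1
	return tsPair{metaTs: startTs, snapTs: startTs}
}

func main() {
	fmt.Println(before(100)) // {100 99} -- inconsistent
	fmt.Println(after(100))  // {99 99}  -- consistent
}
```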
6 changes: 3 additions & 3 deletions tests/ddl_reentrant/run.sh
@@ -103,10 +103,10 @@ function ddl_test() {
 
 echo $ddl > ${WORK_DIR}/ddl_temp.sql
 ensure 10 check_ddl_executed "${WORK_DIR}/cdc.log" "${WORK_DIR}/ddl_temp.sql" true
-ddl_start_ts=$(grep "Execute DDL succeeded" ${WORK_DIR}/cdc.log|tail -n 1|grep -oE '"StartTs\\":[0-9]{18}'|awk -F: '{print $(NF)}')
+ddl_finished_ts=$(grep "Execute DDL succeeded" ${WORK_DIR}/cdc.log|tail -n 1|grep -oE '"CommitTs\\":[0-9]{18}'|awk -F: '{print $(NF)}')
 cdc cli changefeed remove --changefeed-id=${changefeedid}
-changefeedid=$(cdc cli changefeed create --no-confirm --start-ts=${ddl_start_ts} --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')
-echo "create new changefeed ${changefeedid} from ${ddl_start_ts}"
+changefeedid=$(cdc cli changefeed create --no-confirm --start-ts=${ddl_finished_ts} --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')
+echo "create new changefeed ${changefeedid} from ${ddl_finished_ts}"
 ensure 10 check_ts_forward $changefeedid
 ensure 10 check_ddl_executed "${WORK_DIR}/cdc.log" "${WORK_DIR}/ddl_temp.sql" $is_reentrant
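For readers who prefer not to parse the grep/awk chain, here is a hedged Go rendering of the same extraction the updated test performs; `lastCommitTs` is a hypothetical helper, and the escaped-JSON log format is an assumption carried over from the script's regex. It keeps the last "Execute DDL succeeded" line and pulls out the 18-digit CommitTs that the new changefeed is started from.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// lastCommitTs mirrors the script's pipeline: keep lines containing
// "Execute DDL succeeded", take the last one, and extract the 18-digit
// CommitTs from its escaped-JSON payload.
func lastCommitTs(log string) string {
	var last string
	for _, line := range strings.Split(log, "\n") {
		if strings.Contains(line, "Execute DDL succeeded") {
			last = line
		}
	}
	re := regexp.MustCompile(`"CommitTs\\":([0-9]{18})`)
	m := re.FindStringSubmatch(last)
	if m == nil {
		return ""
	}
	return m[1]
}

func main() {
	// Hypothetical log excerpt; the ts values are made up for illustration.
	log := `[info] Execute DDL succeeded {"job":"{\"StartTs\":426812345678901233,\"CommitTs\":426812345678901234}"}`
	fmt.Println(lastCommitTs(log)) // 426812345678901234
}
```

Resuming from the DDL's CommitTs (its FinishedTs) rather than its StartTs is what exercises the boundary fixed in this PR: the new changefeed starts exactly at checkpointTs == ddl.FinishedTs, and the owner must still re-process that DDL.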