ticdc: Support create table ddl only appear in tidb_ddl_history instead of tidb_ddl_job table #10907

Merged · 18 commits · May 7, 2024
88 changes: 76 additions & 12 deletions cdc/entry/mounter.go
@@ -38,6 +38,7 @@
cerror "github.com/pingcap/tiflow/pkg/errors"
pfilter "github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/integrity"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@@ -64,6 +65,19 @@
PreRowExist bool
}

// DDLTableInfo contains the table info of `tidb_ddl_job` and `tidb_ddl_history`,
// and the column ids of `job_meta` in these two tables.
type DDLTableInfo struct {
// DDLJobTable is used to parse all DDL jobs except `create table`.
DDLJobTable *model.TableInfo
// JobMetaColumnIDinJobTable holds the column id of `job_meta` in table `tidb_ddl_job`.
JobMetaColumnIDinJobTable int64
// DDLHistoryTable is used only to parse the `create table` DDL job.
DDLHistoryTable *model.TableInfo
// JobMetaColumnIDinHistoryTable holds the column id of `job_meta` in table `tidb_ddl_history`.
JobMetaColumnIDinHistoryTable int64
}

// Mounter is used to parse SQL events from KV events
type Mounter interface {
// DecodeEvent accepts `model.PolymorphicEvent` with `RawKVEntry` filled and
@@ -298,39 +312,89 @@
return bytes.HasPrefix(rawKV.Key, metaPrefix)
}

// ParseDDLJob parses the job from the raw KV entry. id is the column id of `job_meta`.
func ParseDDLJob(tblInfo *model.TableInfo, rawKV *model.RawKVEntry, id int64) (*timodel.Job, error) {
// ParseDDLJob parses the job from the raw KV entry.
func ParseDDLJob(rawKV *model.RawKVEntry, ddlTableInfo *DDLTableInfo) (*timodel.Job, error) {

var v []byte
var datum types.Datum

// for test case only
if bytes.HasPrefix(rawKV.Key, metaPrefix) {
// old queue base job.
v = rawKV.Value
} else {
// DDL job comes from `tidb_ddl_job` table after we support concurrent DDL. We should decode the job from the column.
recordID, err := tablecodec.DecodeRowKey(rawKV.Key)
job, err := parseJob(v, rawKV.StartTs, rawKV.CRTs, false)
if err != nil || job == nil {
job, err = parseJob(v, rawKV.StartTs, rawKV.CRTs, true)
}
return job, err

}

recordID, err := tablecodec.DecodeRowKey(rawKV.Key)
if err != nil {
return nil, errors.Trace(err)
}

tableID := tablecodec.DecodeTableID(rawKV.Key)

// parse it with tidb_ddl_job
if tableID == spanz.JobTableID {
row, err := decodeRow(rawKV.Value, recordID, ddlTableInfo.DDLJobTable, time.UTC)
if err != nil {
return nil, errors.Trace(err)
}
row, err := decodeRow(rawKV.Value, recordID, tblInfo, time.UTC)
datum = row[ddlTableInfo.JobMetaColumnIDinJobTable]
v = datum.GetBytes()

return parseJob(v, rawKV.StartTs, rawKV.CRTs, false)
} else if tableID == spanz.JobHistoryID {
// parse it with tidb_ddl_history
row, err := decodeRow(rawKV.Value, recordID, ddlTableInfo.DDLHistoryTable, time.UTC)
if err != nil {
return nil, errors.Trace(err)
}
datum := row[id]
datum = row[ddlTableInfo.JobMetaColumnIDinHistoryTable]
v = datum.GetBytes()

return parseJob(v, rawKV.StartTs, rawKV.CRTs, true)
}

return parseJob(v, rawKV.StartTs, rawKV.CRTs)
return nil, fmt.Errorf("invalid tableID %v in rawKV.Key", tableID)
}

// parseJob unmarshal the job from "v".
func parseJob(v []byte, startTs, CRTs uint64) (*timodel.Job, error) {
// fromHistoryTable is used to distinguish whether the job comes from tidb_ddl_job or tidb_ddl_history.
// We need to be compatible with two modes: enable_fast_create_table=on and enable_fast_create_table=off.
// When enable_fast_create_table=on, `create table` is only inserted into tidb_ddl_history after it executes successfully.
// When enable_fast_create_table=off, `create table`, like other DDLs, is first inserted into tidb_ddl_job,
// and then inserted into tidb_ddl_history after it executes successfully.
// In both modes, all other DDLs are first inserted into tidb_ddl_job, and then inserted into tidb_ddl_history once they execute successfully.
//
// To be compatible with both modes, we get the `create table` DDL from tidb_ddl_history, and all DDLs from tidb_ddl_job.
// When enable_fast_create_table=off, we therefore receive each `create table` DDL twice (once from tidb_ddl_history, once from tidb_ddl_job).
// Because `handleJob` skips repeated DDLs, it is safe to receive `create table` twice.
// Besides, the `create table` from tidb_ddl_job always has an earlier commitTs than the one from tidb_ddl_history.
// Therefore, we always use the commitTs of the DDL from `tidb_ddl_job` as StartTs, which ensures we can get all the DMLs.
func parseJob(v []byte, startTs, CRTs uint64, fromHistoryTable bool) (*timodel.Job, error) {
var job timodel.Job
err := json.Unmarshal(v, &job)
if err != nil {
return nil, errors.Trace(err)
}
if !job.IsDone() {
return nil, nil

if fromHistoryTable {
// We only want the `create table` DDL from tidb_ddl_history, so all other DDLs are discarded.
// We only want jobs in `JobStateSynced`, which means the DDL job finished successfully.
// Besides, to satisfy the subsequent processing,
// we set the job state to Done so that it will be replayed in schemaStorage.
if job.Type != timodel.ActionCreateTable || job.State != timodel.JobStateSynced {
return nil, nil
}
job.State = timodel.JobStateDone
} else {
// We need to get all finished DDL jobs from tidb_ddl_job.
if !job.IsDone() {
return nil, nil
}
}

// FinishedTS is only set when the job is synced,
// but we can use the entry's ts here
job.StartTS = startTs
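To make the compatibility rules in the `parseJob` comment concrete, here is a minimal, self-contained sketch of the filtering it describes. The `Job`, `JobState`, and `ActionType` below are hypothetical stand-ins, not the real `timodel` definitions:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical stand-ins for timodel.Job and friends; the real types carry
// many more fields.
type JobState int
type ActionType int

const (
	JobStateNone   JobState = iota
	JobStateDone            // finished, not yet synced
	JobStateSynced          // finished and synced
)

const ActionCreateTable ActionType = 0

type Job struct {
	Type  ActionType `json:"type"`
	State JobState   `json:"state"`
}

func (j *Job) IsDone() bool { return j.State == JobStateDone }

// parseJob mirrors the rules above: rows from tidb_ddl_history only yield
// synced `create table` jobs (downgraded to Done so schemaStorage replays
// them), while rows from tidb_ddl_job yield any finished job.
func parseJob(v []byte, fromHistoryTable bool) (*Job, error) {
	var job Job
	if err := json.Unmarshal(v, &job); err != nil {
		return nil, err
	}
	if fromHistoryTable {
		if job.Type != ActionCreateTable || job.State != JobStateSynced {
			return nil, nil // discard everything except synced `create table`
		}
		job.State = JobStateDone
		return &job, nil
	}
	if !job.IsDone() {
		return nil, nil // unfinished jobs are skipped
	}
	return &job, nil
}

func main() {
	// A synced create-table row, as it would arrive from tidb_ddl_history.
	job, err := parseJob([]byte(`{"type":0,"state":2}`), true)
	fmt.Println(job, err) // &{0 1} <nil>: kept, state downgraded to Done
}
```

Downgrading the synced history-table job back to `Done` is what lets the existing replay path in schemaStorage treat it like any other finished job.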
38 changes: 27 additions & 11 deletions cdc/puller/ddl_puller.go
@@ -75,11 +75,8 @@
resolvedTs uint64
schemaVersion int64
filter filter.Filter
// ddlJobsTable is initialized when receive the first concurrent DDL job.
// It holds the info of table `tidb_ddl_jobs` of upstream TiDB.
ddlJobsTable *model.TableInfo
// It holds the column id of `job_meta` in table `tidb_ddl_jobs`.
jobMetaColumnID int64
// ddlTableInfo is initialized when the first concurrent DDL job is received.
ddlTableInfo *entry.DDLTableInfo
// outputCh sends the DDL job entries to the caller.
outputCh chan *model.DDLJobEntry
}
@@ -239,13 +236,14 @@
if rawKV.OpType != model.OpTypePut {
return nil, nil
}
if p.ddlJobsTable == nil && !entry.IsLegacyFormatJob(rawKV) {
err := p.initJobTableMeta()
if p.ddlTableInfo == nil && !entry.IsLegacyFormatJob(rawKV) {
err := p.initDDLTableInfo()
if err != nil {
return nil, errors.Trace(err)
}
}
return entry.ParseDDLJob(p.ddlJobsTable, rawKV, p.jobMetaColumnID)

return entry.ParseDDLJob(rawKV, p.ddlTableInfo)
}

func (p *ddlJobPullerImpl) getResolvedTs() uint64 {
@@ -256,7 +254,7 @@
atomic.StoreUint64(&p.resolvedTs, ts)
}

func (p *ddlJobPullerImpl) initJobTableMeta() error {
func (p *ddlJobPullerImpl) initDDLTableInfo() error {
version, err := p.kvStorage.CurrentVersion(tidbkv.GlobalTxnScope)
if err != nil {
return errors.Trace(err)
@@ -277,6 +275,8 @@
if err != nil {
return errors.Trace(err)
}

// for tidb_ddl_job
tableInfo, err := findTableByName(tbls, "tidb_ddl_job")
if err != nil {
return errors.Trace(err)
@@ -287,8 +287,24 @@
return errors.Trace(err)
}

p.ddlJobsTable = model.WrapTableInfo(db.ID, db.Name.L, 0, tableInfo)
p.jobMetaColumnID = col.ID
p.ddlTableInfo = &entry.DDLTableInfo{}
p.ddlTableInfo.DDLJobTable = model.WrapTableInfo(db.ID, db.Name.L, 0, tableInfo)
p.ddlTableInfo.JobMetaColumnIDinJobTable = col.ID

// for tidb_ddl_history
historyTableInfo, err := findTableByName(tbls, "tidb_ddl_history")
if err != nil {
return errors.Trace(err)
}

historyTableCol, err := findColumnByName(historyTableInfo.Columns, "job_meta")
if err != nil {
return errors.Trace(err)
}

p.ddlTableInfo.DDLHistoryTable = model.WrapTableInfo(db.ID, db.Name.L, 0, historyTableInfo)
p.ddlTableInfo.JobMetaColumnIDinHistoryTable = historyTableCol.ID
return nil
}

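For reference, a simplified sketch of the by-name lookup that `initDDLTableInfo` now performs twice, once per system table. The types and ids here are placeholders (the real `findTableByName`/`findColumnByName` operate on `timodel` snapshot types, and the system tables have reserved ids):

```go
package main

import "fmt"

// Placeholder schema types, standing in for timodel.TableInfo/ColumnInfo.
type ColumnInfo struct {
	ID   int64
	Name string
}

type TableInfo struct {
	ID      int64
	Name    string
	Columns []ColumnInfo
}

func findTableByName(tbls []*TableInfo, name string) (*TableInfo, error) {
	for _, t := range tbls {
		if t.Name == name {
			return t, nil
		}
	}
	return nil, fmt.Errorf("table %q not found in mysql schema", name)
}

func findColumnByName(cols []ColumnInfo, name string) (*ColumnInfo, error) {
	for i := range cols {
		if cols[i].Name == name {
			return &cols[i], nil
		}
	}
	return nil, fmt.Errorf("column %q not found", name)
}

func main() {
	// Placeholder ids; the real ones are reserved constants in TiDB.
	tbls := []*TableInfo{
		{ID: 101, Name: "tidb_ddl_job", Columns: []ColumnInfo{{ID: 2, Name: "job_meta"}}},
		{ID: 102, Name: "tidb_ddl_history", Columns: []ColumnInfo{{ID: 2, Name: "job_meta"}}},
	}
	for _, name := range []string{"tidb_ddl_job", "tidb_ddl_history"} {
		t, err := findTableByName(tbls, name)
		if err != nil {
			panic(err)
		}
		c, err := findColumnByName(t.Columns, "job_meta")
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s: table id %d, job_meta column id %d\n", name, t.ID, c.ID)
	}
}
```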
9 changes: 8 additions & 1 deletion pkg/spanz/span.go
@@ -29,6 +29,8 @@
const (
// JobTableID is the id of `tidb_ddl_job`.
JobTableID = ddl.JobTableID
// JobHistoryID is the id of `tidb_ddl_history`.
JobHistoryID = ddl.HistoryTableID
)

// UpperBoundKey represents the maximum value.
@@ -62,12 +64,17 @@

// GetAllDDLSpan return all cdc interested spans for DDL.
func GetAllDDLSpan() []tablepb.Span {
spans := make([]tablepb.Span, 0, 1)
spans := make([]tablepb.Span, 0, 2)
start, end := GetTableRange(JobTableID)
spans = append(spans, tablepb.Span{
StartKey: ToComparableKey(start),
EndKey: ToComparableKey(end),
})
start, end = GetTableRange(JobHistoryID)
spans = append(spans, tablepb.Span{
StartKey: ToComparableKey(start),
EndKey: ToComparableKey(end),
})
return spans
}

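With this change the DDL puller watches two key ranges instead of one. A rough sketch of how a per-table span can be derived, assuming the simplified fact that TiDB table keys start with `'t'` plus the 8-byte big-endian table id (the real encoding lives in `tablecodec`/`GetTableRange`):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// tablePrefix builds the simplified key prefix for one table:
// 't' followed by the 8-byte big-endian table id.
func tablePrefix(tableID int64) []byte {
	buf := make([]byte, 9)
	buf[0] = 't'
	binary.BigEndian.PutUint64(buf[1:], uint64(tableID))
	return buf
}

type Span struct{ StartKey, EndKey []byte }

// getAllDDLSpan mirrors the new GetAllDDLSpan: one span per DDL system table,
// each covering [prefix(id), prefix(id+1)).
func getAllDDLSpan(jobTableID, historyTableID int64) []Span {
	spans := make([]Span, 0, 2)
	for _, id := range []int64{jobTableID, historyTableID} {
		spans = append(spans, Span{
			StartKey: tablePrefix(id),
			EndKey:   tablePrefix(id + 1),
		})
	}
	return spans
}

func main() {
	// Placeholder ids; the real JobTableID/HistoryTableID are reserved
	// constants in TiDB's ddl package.
	for _, s := range getAllDDLSpan(101, 102) {
		fmt.Printf("span [% x, % x)\n", s.StartKey, s.EndKey)
	}
}
```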
47 changes: 45 additions & 2 deletions tests/integration_tests/batch_add_table/run.sh
@@ -8,13 +8,54 @@ WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function run() {
function run_with_fast_create_table() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

run_sql "set global tidb_enable_fast_create_table=on" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/prepare.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

TOPIC_NAME="ticdc-batch-add-table-test-$RANDOM"
case $SINK_TYPE in
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) SINK_URI="file://$WORK_DIR/storage_test/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true" ;;
pulsar)
run_pulsar_cluster $WORK_DIR normal
SINK_URI="pulsar://127.0.0.1:6650/$TOPIC_NAME?protocol=canal-json&enable-tidb-extension=true"
;;
*) SINK_URI="mysql://normal:[email protected]:3306/" ;;
esac
run_cdc_cli changefeed create --sink-uri="$SINK_URI"
case $SINK_TYPE in
kafka) run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
storage) run_storage_consumer $WORK_DIR $SINK_URI "" "" ;;
pulsar) run_pulsar_consumer --upstream-uri $SINK_URI ;;
esac

run_sql_file $CUR/data/test.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
# sync_diff can't check non-existent tables, so we first check that the expected tables are created downstream
check_table_exists batch_add_table.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

cleanup_process $CDC_BINARY
}

function run_without_fast_create_table() {
rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

run_sql "set global tidb_enable_fast_create_table=off" ${UP_TIDB_HOST} ${UP_TIDB_PORT}

run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql_file $CUR/data/prepare.sql ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

@@ -46,6 +87,8 @@ function run() {
}

trap stop_tidb_cluster EXIT
run $*
run_without_fast_create_table $*
stop_tidb_cluster
run_with_fast_create_table $*
check_logs $WORK_DIR
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
25 changes: 25 additions & 0 deletions tests/integration_tests/multi_source/run.sh
@@ -41,12 +41,14 @@ function prepare() {

trap stop_tidb_cluster EXIT
# storage is not supported yet.
# test without fast create table
if [ "$SINK_TYPE" != "storage" ]; then
# TODO(dongmen): enable pulsar in the future.
if [ "$SINK_TYPE" == "pulsar" ]; then
exit 0
fi
prepare $*
run_sql "set global tidb_enable_fast_create_table=off" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
cd "$(dirname "$0")"
set -o pipefail
GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log
@@ -55,6 +57,29 @@ if [ "$SINK_TYPE" != "storage" ]; then
check_table_exists mark.finish_mark_2 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark_3 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark_4 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark_5 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_sync_diff $WORK_DIR $CUR/diff_config.toml
cleanup_process $CDC_BINARY
check_logs $WORK_DIR
fi
# test with fast create table
if [ "$SINK_TYPE" != "storage" ]; then
# TODO(dongmen): enable pulsar in the future.
if [ "$SINK_TYPE" == "pulsar" ]; then
exit 0
fi
prepare $*
run_sql "set global tidb_enable_fast_create_table=on" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
cd "$(dirname "$0")"
set -o pipefail
GO111MODULE=on go run main.go -config ./config.toml 2>&1 | tee $WORK_DIR/tester.log
check_table_exists mark.finish_mark_0 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark_1 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark_2 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark_3 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark_4 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark_5 ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_table_exists mark.finish_mark ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} 300
check_sync_diff $WORK_DIR $CUR/diff_config.toml
cleanup_process $CDC_BINARY