Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
*: read downstream table before CREATE TABLE to fix IF NOT EXISTS (#1915
Browse files Browse the repository at this point in the history
)
  • Loading branch information
lance6716 authored Jul 26, 2021
1 parent 4bea97f commit 04ca3a4
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 15 deletions.
1 change: 1 addition & 0 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1529,6 +1529,7 @@ func (l *Loader) cleanDumpFiles() {
var lastErr error
for f := range files {
if strings.HasSuffix(f, ".sql") {
// TODO: table structure files are not used now, but we plan to used them in future so not delete them
if strings.HasSuffix(f, "-schema-create.sql") || strings.HasSuffix(f, "-schema.sql") {
continue
}
Expand Down
18 changes: 11 additions & 7 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1389,11 +1389,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
if s.cfg.Mode == config.ModeAll && fresh {
delLoadTask = true
flushCheckpoint = true
err = s.loadTableStructureFromDump(ctx)
if err != nil {
tctx.L().Warn("error happened when load table structure from dump files", zap.Error(err))
cleanDumpFile = false
}
// TODO: loadTableStructureFromDump in future
} else {
cleanDumpFile = false
}
Expand Down Expand Up @@ -2519,8 +2515,9 @@ func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter.
var (
shouldExecDDLOnSchemaTracker bool
shouldSchemaExist bool
shouldTableExistNum int // tableNames[:shouldTableExistNum] should exist
shouldRefTableExistNum int // tableNames[1:shouldTableExistNum] should exist, since first one is "caller table"
shouldTableExistNum int // tableNames[:shouldTableExistNum] should exist
shouldRefTableExistNum int // tableNames[1:shouldTableExistNum] should exist, since first one is "caller table"
tryFetchDownstreamTable bool // to make sure if not exists will execute correctly
)

switch node := stmt.(type) {
Expand All @@ -2544,6 +2541,7 @@ func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter.
shouldSchemaExist = true
// for CREATE TABLE LIKE/AS, the reference tables should exist
shouldRefTableExistNum = len(srcTables)
tryFetchDownstreamTable = true
case *ast.DropTableStmt:
shouldExecDDLOnSchemaTracker = true
if err := s.checkpoint.DeleteTablePoint(ec.tctx, srcTable.Schema, srcTable.Name); err != nil {
Expand Down Expand Up @@ -2598,6 +2596,11 @@ func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter.
}
}

if tryFetchDownstreamTable {
// ignore table not exists error, just try to fetch table from downstream.
_, _ = s.getTable(ec.tctx, srcTables[0].Schema, srcTables[0].Name, targetTables[0].Schema, targetTables[0].Name)
}

if shouldExecDDLOnSchemaTracker {
if err := s.schemaTracker.Exec(ec.tctx.Ctx, usedSchema, sql); err != nil {
ec.tctx.L().Error("cannot track DDL", zap.String("schema", usedSchema), zap.String("statement", sql), log.WrapStringerField("location", ec.currentLocation), log.ShortError(err))
Expand Down Expand Up @@ -2658,6 +2661,7 @@ func (s *Syncer) genRouter() error {
return nil
}

//nolint:unused
func (s *Syncer) loadTableStructureFromDump(ctx context.Context) error {
logger := s.tctx.L()

Expand Down
11 changes: 10 additions & 1 deletion tests/downstream_more_column/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ function run() {
run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where c1<100;" "count(1): 2"

run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"Column count doesn't match value count" 1
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"operate-schema set -s mysql-replica-01 test -d ${db} -t ${tb} $cur/data/schema.sql" \
"\"result\": true" 2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task test"
# check incremental data
run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where c1>100 and c1<1000;" "count(1): 2"

Expand All @@ -47,7 +56,7 @@ function run() {
"\"result\": true" 2

run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
# coulmn count doesn't match
# Column count doesn't match value count
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"Column count doesn't match value count" 1
Expand Down
5 changes: 1 addition & 4 deletions tests/shardddl1/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ function DM_001_CASE() {
# schema tracker could track per table without error
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"result\": true" 2 \
"\"synced\": true" 1
# only downstream sees a duplicate error, but currently ignored by DM
check_log_contain_with_retry "Duplicate column name 'new_col1'" $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log
"Duplicate column name 'new_col1'" 1
}

function DM_001() {
Expand Down
6 changes: 3 additions & 3 deletions tests/shardddl1_1/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,11 @@ function DM_027_CASE() {
run_sql_source1 "insert into ${shardddl1}.${tb2} values (4)"
run_sql_source1 "insert into ${shardddl1}.${tb3} values (5,6)"
# we now haven't checked table struct when create sharding table
# there should be a error message like "Unknown column 'val' in 'field list'", "unknown column val"
# but different TiDB version output different message. so we only roughly match here
# and we'll attach IF NOT EXISTS to every CREATE TABLE and fetch downstream table first, so downstream table strcuture
# is in use indeed. This leads to the error below.
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"nknown column" 1 # ignore case for first letter
"Column count doesn't match value count: 1 (columns) vs 2 (values)" 1
}

function DM_027() {
Expand Down

0 comments on commit 04ca3a4

Please sign in to comment.