Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM: support inject at dml when open GTID #4848

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion dm/syncer/err-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ func (h *Holder) MatchAndApply(startLocation, endLocation binlog.Location, curre
h.mu.Lock()
defer h.mu.Unlock()

h.logger.Info("try to match and apply a operator", zap.Stringer("startlocation", startLocation), zap.Stringer("endlocation", endLocation), zap.Any("currentEvent", currentEvent))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
h.logger.Info("try to match and apply a operator", zap.Stringer("startlocation", startLocation), zap.Stringer("endlocation", endLocation), zap.Any("currentEvent", currentEvent))
h.logger.Info("try to match and apply an operator", zap.Stringer("startlocation", startLocation), zap.Stringer("endlocation", endLocation), zap.Any("currentEvent", currentEvent))


key := startLocation.Position.String()
operator, ok := h.operators[key]
if !ok {
Expand Down Expand Up @@ -215,7 +217,7 @@ func (h *Holder) MatchAndApply(startLocation, endLocation binlog.Location, curre
}
}

h.logger.Info("match and apply a operator", zap.Stringer("startlocation", startLocation), zap.Stringer("endlocation", endLocation), zap.Stringer("operator", operator))
h.logger.Info("match and apply a operator", zap.Stringer("startlocation", startLocation), zap.Stringer("endlocation", endLocation), zap.Any("currentEvent", currentEvent), zap.Stringer("operator", operator))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
h.logger.Info("match and apply a operator", zap.Stringer("startlocation", startLocation), zap.Stringer("endlocation", endLocation), zap.Any("currentEvent", currentEvent), zap.Stringer("operator", operator))
h.logger.Info("match and apply an operator", zap.Stringer("startlocation", startLocation), zap.Stringer("endlocation", endLocation), zap.Any("currentEvent", currentEvent), zap.Stringer("operator", operator))


return true, operator.op
}
Expand Down
26 changes: 11 additions & 15 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1783,9 +1783,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) {

currentLocation = nextLocation
lastLocation = nextLocation
s.tctx.L().Info("shardingReSync not allResolved", zap.Stringer("currentLocation", currentLocation), zap.Stringer("lastLocation", lastLocation))
} else {
currentLocation = savedGlobalLastLocation
lastLocation = savedGlobalLastLocation // restore global last pos
s.tctx.L().Info("shardingReSync allResolved",
zap.Stringer("currentLocation", currentLocation), zap.Stringer("lastLocation", lastLocation))
}
// if suffix>0, we are replacing error
s.isReplacingOrInjectingErr = currentLocation.Suffix != 0
Expand Down Expand Up @@ -1978,27 +1981,27 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
startLocation = binlog.Location{}
switch ev := e.Event.(type) {
case *replication.QueryEvent, *replication.RowsEvent:
startSuffix := currentLocation.Suffix
startLocation = binlog.InitLocation(
mysql.Position{
Name: lastLocation.Position.Name,
Pos: e.Header.LogPos - e.Header.EventSize,
},
lastLocation.GetGTID(),
)
startLocation.Suffix = currentLocation.Suffix

endSuffix := startLocation.Suffix
if s.isReplacingOrInjectingErr {
endSuffix++
}
currentLocation = binlog.InitLocation(
mysql.Position{
Name: lastLocation.Position.Name,
Pos: e.Header.LogPos,
},
lastLocation.GetGTID(),
)
currentLocation.Suffix = endSuffix

if s.isReplacingOrInjectingErr {
startLocation.Suffix = startSuffix
currentLocation.Suffix = startSuffix + 1
}

if queryEvent, ok := ev.(*replication.QueryEvent); ok {
err = currentLocation.SetGTID(queryEvent.GSet)
Expand Down Expand Up @@ -2049,12 +2052,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
continue
}
}
s.tctx.L().Info("now info!!!", zap.Stringer("startlocation", startLocation), zap.Stringer("currentLocation", currentLocation), zap.Any("Event", e))
// set endLocation.Suffix=0 of last replace or inject event
if currentLocation.Suffix > 0 && e.Header.EventSize > 0 {
currentLocation.Suffix = 0
s.isReplacingOrInjectingErr = false
s.locations.reset(currentLocation)
if !s.errOperatorHolder.IsInject(startLocation) {
currentLocation.Suffix = 0
// replace operator need redirect to currentLocation
if err = s.streamerController.RedirectStreamer(s.runCtx, currentLocation); err != nil {
return err
Expand Down Expand Up @@ -2321,14 +2325,6 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
}
targetTable := s.route(sourceTable)

*ec.currentLocation = binlog.InitLocation(
mysql.Position{
Name: ec.lastLocation.Position.Name,
Pos: ec.header.LogPos,
},
ec.lastLocation.GetGTID(),
)

if ec.shardingReSync != nil {
ec.shardingReSync.currLocation = *ec.currentLocation
if binlog.CompareLocation(ec.shardingReSync.currLocation, ec.shardingReSync.latestLocation, s.cfg.EnableGTID) >= 0 {
Expand Down
6 changes: 3 additions & 3 deletions dm/tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ function run_case() {

eval ${init_table_cmd}

truncate -s 0 $WORK_DIR/master/log/dm-master.log
truncate -s 0 $WORK_DIR/worker1/log/dm-worker.log
truncate -s 0 $WORK_DIR/worker2/log/dm-worker.log
# truncate -s 0 $WORK_DIR/master/log/dm-master.log
# truncate -s 0 $WORK_DIR/worker1/log/dm-worker.log
# truncate -s 0 $WORK_DIR/worker2/log/dm-worker.log

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $cur/conf/${task_conf}.yaml --remove-meta"
Expand Down
50 changes: 50 additions & 0 deletions dm/tests/handle_error/conf/single-source-no-sharding2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
---
name: test
task-mode: all
is-sharding: false
meta-schema: "dm_meta"

target-database:
host: "127.0.0.1"
port: 4000
user: "test"
password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="

mysql-instances:
- source-id: "mysql-replica-02"
block-allow-list: "instance"
route-rules: ["sharding-table-rules","sharding-schema-rules"]
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

block-allow-list:
instance:
do-dbs: ["handle_error"]

routes:
sharding-table-rules:
schema-pattern: "handle_error*"
target-schema: "handle_error"
table-pattern: "tb*"
target-table: "tb"
sharding-schema-rules:
schema-pattern: "handle_error*"
target-schema: "handle_error"

mydumpers:
global:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
12 changes: 6 additions & 6 deletions dm/tests/handle_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -370,17 +370,17 @@ function DM_INJECT_DML_ERROR_CASE() {

function DM_INJECT_ERROR() {
# inject at ddl
run_case INJECT_DDL_ERROR "single-source-no-sharding" \
run_case INJECT_DDL_ERROR "single-source-no-sharding2" \
"run_sql_source1 \"create table ${db}.${tb1} (a int unique, b int);\"" \
"clean_table" ""
# inject at dml
run_case INJECT_DML_ERROR "single-source-no-sharding" \
run_case INJECT_DML_ERROR "single-source-no-sharding2" \
"run_sql_source1 \"create table ${db}.${tb1} (a int unique, b varchar(10));\"" \
"clean_table" ""
}

function DM_LIST_ERROR_CASE() {
run_sql_source1 "insert into ${db}.${tb1} values(11);"
run_sql_source1 "insert into ${db}.${tb1} values(1, 1);"
run_sql_source1 "alter table ${db}.${tb1} add column c int default 100; alter table ${db}.${tb1} add primary key (c)"
run_sql_source1 "alter table ${db}.${tb1} modify c varchar(10);"
run_sql_source1 "alter table ${db}.${tb1} modify c varchar(20);"
Expand All @@ -391,7 +391,7 @@ function DM_LIST_ERROR_CASE() {
"Unsupported modify column: this column has primary key flag" 1

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"binlog test" \
"binlog list test" \
"\"msg\": \"\[\]\"" 1

first_pos1=$(get_start_pos 127.0.0.1:$MASTER_PORT $source1)
Expand Down Expand Up @@ -427,13 +427,13 @@ function DM_LIST_ERROR_CASE() {
"binlog skip test" \
"\"result\": true" 2

run_sql_source1 "insert into ${db}.${tb1} values(1,1,2.2);"
run_sql_source1 "insert into ${db}.${tb1} values(2,1,2.2);"

run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where c = 2.2;" "count(1): 1"
}

function DM_LIST_ERROR() {
run_case INJECT_DDL_ERROR "single-source-no-sharding" \
run_case LIST_ERROR "single-source-no-sharding" \
"run_sql_source1 \"create table ${db}.${tb1} (a int unique, b int);\"" \
"clean_table" ""
}
Expand Down