Skip to content

Commit

Permalink
syncer(dm): enhance error handling to support inject at DML events (#4259)
Browse files Browse the repository at this point in the history
close #4260
  • Loading branch information
WizardXiao authored Feb 8, 2022
1 parent c4f8055 commit 5459951
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 73 deletions.
47 changes: 17 additions & 30 deletions dm/syncer/err-operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@ import (
// Operator contains an operation for specified binlog pos
// used by `handle-error`.
type Operator struct {
uuid string // add a UUID, make it more friendly to be traced in log
op pb.ErrorOp
isAllInjected bool // is all injected, used by inject
events []*replication.BinlogEvent // startLocation -> events
originReq *pb.HandleWorkerErrorRequest
uuid string // add a UUID, make it more friendly to be traced in log
op pb.ErrorOp
events []*replication.BinlogEvent // ddls -> events
originReq *pb.HandleWorkerErrorRequest
}

// newOperator creates a new operator with a random UUID.
Expand Down Expand Up @@ -130,18 +129,6 @@ func (h *Holder) GetBehindCommands(pos string) []*pb.HandleWorkerErrorRequest {
return res
}

// SetHasAllInjected flags the operator registered under startLocation's binlog
// position as having all of its events injected. It is a no-op when no
// operator is registered for that position.
func (h *Holder) SetHasAllInjected(startLocation binlog.Location) {
	// Take the write lock: the matched operator is mutated below.
	h.mu.Lock()
	defer h.mu.Unlock()

	// Operators are keyed by the string form of the binlog position.
	if target, found := h.operators[startLocation.Position.String()]; found {
		target.isAllInjected = true
	}
}

func (h *Holder) IsInject(startLocation binlog.Location) bool {
h.mu.RLock()
defer h.mu.RUnlock()
Expand Down Expand Up @@ -186,7 +173,7 @@ func (h *Holder) GetEvent(startLocation binlog.Location) (*replication.BinlogEve
}

// MatchAndApply tries to match operation for event by location and apply it on replace events.
func (h *Holder) MatchAndApply(startLocation, endLocation binlog.Location, realEventHeaderTS uint32) (bool, pb.ErrorOp) {
func (h *Holder) MatchAndApply(startLocation, endLocation binlog.Location, currentEvent *replication.BinlogEvent) (bool, pb.ErrorOp) {
h.mu.Lock()
defer h.mu.Unlock()

Expand All @@ -196,10 +183,6 @@ func (h *Holder) MatchAndApply(startLocation, endLocation binlog.Location, realE
return false, pb.ErrorOp_InvalidErrorOp
}

if operator.isAllInjected {
return false, pb.ErrorOp_InvalidErrorOp
}

if operator.op == pb.ErrorOp_Replace || operator.op == pb.ErrorOp_Inject {
if len(operator.events) == 0 {
// this should not happen
Expand All @@ -209,22 +192,26 @@ func (h *Holder) MatchAndApply(startLocation, endLocation binlog.Location, realE
// set LogPos as start position
for _, ev := range operator.events {
ev.Header.LogPos = startLocation.Position.Pos
ev.Header.Timestamp = realEventHeaderTS
ev.Header.Timestamp = currentEvent.Header.Timestamp
if e, ok := ev.Event.(*replication.QueryEvent); ok {
if startLocation.GetGTID() != nil {
e.GSet = startLocation.GetGTID().Origin()
}
}
}

// set the last replace event as end position
e := operator.events[len(operator.events)-1]
e.Header.EventSize = endLocation.Position.Pos - startLocation.Position.Pos
e.Header.LogPos = endLocation.Position.Pos
if e, ok := e.Event.(*replication.QueryEvent); ok {
if endLocation.GetGTID() != nil {
e.GSet = endLocation.GetGTID().Origin()
if operator.op == pb.ErrorOp_Replace {
// set the last replace event as end position
e := operator.events[len(operator.events)-1]
e.Header.EventSize = endLocation.Position.Pos - startLocation.Position.Pos
e.Header.LogPos = endLocation.Position.Pos
if e, ok := e.Event.(*replication.QueryEvent); ok {
if endLocation.GetGTID() != nil {
e.GSet = endLocation.GetGTID().Origin()
}
}
} else if operator.op == pb.ErrorOp_Inject {
operator.events = append(operator.events, currentEvent)
}
}

Expand Down
26 changes: 8 additions & 18 deletions dm/syncer/err-operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ func (o *testOperatorSuite) TestOperator(c *C) {
// skip event
err = h.Set(&pb.HandleWorkerErrorRequest{Op: pb.ErrorOp_Skip, BinlogPos: startLocation.Position.String()}, nil)
c.Assert(err, IsNil)
apply, op := h.MatchAndApply(startLocation, endLocation, event1.Header.Timestamp)
apply, op := h.MatchAndApply(startLocation, endLocation, event1)
c.Assert(apply, IsTrue)
c.Assert(op, Equals, pb.ErrorOp_Skip)

// overwrite operator
err = h.Set(&pb.HandleWorkerErrorRequest{Op: pb.ErrorOp_Replace, BinlogPos: startLocation.Position.String()}, []*replication.BinlogEvent{event1, event2})
c.Assert(err, IsNil)
apply, op = h.MatchAndApply(startLocation, endLocation, event2.Header.Timestamp)
apply, op = h.MatchAndApply(startLocation, endLocation, event2)
c.Assert(apply, IsTrue)
c.Assert(op, Equals, pb.ErrorOp_Replace)

Expand Down Expand Up @@ -126,7 +126,7 @@ func (o *testOperatorSuite) TestOperator(c *C) {
// revert exist operator
err = h.Set(&pb.HandleWorkerErrorRequest{Op: pb.ErrorOp_Revert, BinlogPos: startLocation.Position.String()}, nil)
c.Assert(err, IsNil)
apply, op = h.MatchAndApply(startLocation, endLocation, event1.Header.Timestamp)
apply, op = h.MatchAndApply(startLocation, endLocation, event1)
c.Assert(apply, IsFalse)
c.Assert(op, Equals, pb.ErrorOp_InvalidErrorOp)

Expand All @@ -139,17 +139,17 @@ func (o *testOperatorSuite) TestOperator(c *C) {
// test removeOutdated
flushLocation := startLocation
c.Assert(h.RemoveOutdated(flushLocation), IsNil)
apply, op = h.MatchAndApply(startLocation, endLocation, event1.Header.Timestamp)
apply, op = h.MatchAndApply(startLocation, endLocation, event1)
c.Assert(apply, IsTrue)
c.Assert(op, Equals, pb.ErrorOp_Replace)

flushLocation = endLocation
c.Assert(h.RemoveOutdated(flushLocation), IsNil)
apply, op = h.MatchAndApply(startLocation, endLocation, event1.Header.Timestamp)
apply, op = h.MatchAndApply(startLocation, endLocation, event1)
c.Assert(apply, IsFalse)
c.Assert(op, Equals, pb.ErrorOp_InvalidErrorOp)

apply, op = h.MatchAndApply(endLocation, nextLocation, event1.Header.Timestamp)
apply, op = h.MatchAndApply(endLocation, nextLocation, event1)
c.Assert(apply, IsTrue)
c.Assert(op, Equals, pb.ErrorOp_Replace)
}
Expand Down Expand Up @@ -211,20 +211,10 @@ func (o *testOperatorSuite) TestInjectOperator(c *C) {
c.Assert(isInject, IsFalse)

// test MatchAndApply
apply, op := h.MatchAndApply(startLocation, endLocation, event2.Header.Timestamp)
apply, op := h.MatchAndApply(startLocation, endLocation, event2)
c.Assert(apply, IsTrue)
c.Assert(op, Equals, pb.ErrorOp_Inject)
apply, op = h.MatchAndApply(endLocation, nextLocation, event2.Header.Timestamp)
c.Assert(apply, IsFalse)
c.Assert(op, Equals, pb.ErrorOp_InvalidErrorOp)

// test set all injected
h.SetHasAllInjected(endLocation)
apply, op = h.MatchAndApply(startLocation, endLocation, event2.Header.Timestamp)
c.Assert(apply, IsTrue)
c.Assert(op, Equals, pb.ErrorOp_Inject)
h.SetHasAllInjected(startLocation)
apply, op = h.MatchAndApply(startLocation, endLocation, event2.Header.Timestamp)
apply, op = h.MatchAndApply(endLocation, nextLocation, event2)
c.Assert(apply, IsFalse)
c.Assert(op, Equals, pb.ErrorOp_InvalidErrorOp)
}
Expand Down
2 changes: 1 addition & 1 deletion dm/syncer/handle_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (s *Syncer) HandleError(ctx context.Context, req *pb.HandleWorkerErrorReque
return "", fmt.Errorf("source '%s' has no error", s.cfg.SourceID)
}

if !isQueryEvent {
if !isQueryEvent && req.Op != pb.ErrorOp_Inject {
return "", fmt.Errorf("only support to handle ddl error currently, see https://docs.pingcap.com/tidb-data-migration/stable/error-handling for other errors")
}
pos = startLocation.Position.String()
Expand Down
36 changes: 17 additions & 19 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1877,11 +1877,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) {

tctx.L().Debug("receive binlog event", zap.Reflect("header", e.Header))

// TODO: support all event
// support QueryEvent and RowsEvent
// we calculate startLocation and endLocation(currentLocation) for Query and Rows events here
// set startLocation empty for other events to avoid misuse
startLocation = binlog.Location{}
if ev, ok := e.Event.(*replication.QueryEvent); ok {
switch ev := e.Event.(type) {
case *replication.QueryEvent, *replication.RowsEvent:
startLocation = binlog.InitLocation(
mysql.Position{
Name: lastLocation.Position.Name,
Expand All @@ -1904,19 +1905,22 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
)
currentLocation.Suffix = endSuffix

err = currentLocation.SetGTID(ev.GSet)
if err != nil {
return terror.Annotatef(err, "fail to record GTID %v", ev.GSet)
if queryEvent, ok := ev.(*replication.QueryEvent); ok {
err = currentLocation.SetGTID(queryEvent.GSet)
if err != nil {
return terror.Annotatef(err, "fail to record GTID %v", queryEvent.GSet)
}
}

if !s.isReplacingOrInjectingErr {
apply, op := s.errOperatorHolder.MatchAndApply(startLocation, currentLocation, e.Header.Timestamp)
apply, op := s.errOperatorHolder.MatchAndApply(startLocation, currentLocation, e)
if apply {
if op == pb.ErrorOp_Replace || op == pb.ErrorOp_Inject {
s.isReplacingOrInjectingErr = true
// revert currentLocation to startLocation
currentLocation = startLocation
} else if op == pb.ErrorOp_Skip {
queryEvent := ev.(*replication.QueryEvent)
ec := eventContext{
tctx: tctx,
header: e.Header,
Expand All @@ -1925,9 +1929,9 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
lastLocation: &lastLocation,
}
var sourceTbls map[string]map[string]struct{}
sourceTbls, err = s.trackOriginDDL(ev, ec)
sourceTbls, err = s.trackOriginDDL(queryEvent, ec)
if err != nil {
tctx.L().Warn("failed to track query when handle-error skip", zap.Error(err), zap.ByteString("sql", ev.Query))
tctx.L().Warn("failed to track query when handle-error skip", zap.Error(err), zap.ByteString("sql", queryEvent.Query))
}

s.saveGlobalPoint(currentLocation)
Expand All @@ -1951,21 +1955,15 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}
}
// set endLocation.Suffix=0 of last replace or inject event
// also redirect stream to next event
if currentLocation.Suffix > 0 && e.Header.EventSize > 0 {
currentLocation.Suffix = 0
s.isReplacingOrInjectingErr = false
s.locations.reset(currentLocation)
if s.errOperatorHolder.IsInject(startLocation) {
s.errOperatorHolder.SetHasAllInjected(startLocation)
// reset event as startLocation, avoid to be marked in checkpoint
currentLocation.Position.Pos = startLocation.Position.Pos
err = s.streamerController.RedirectStreamer(tctx, startLocation)
} else {
err = s.streamerController.RedirectStreamer(tctx, currentLocation)
}
if err != nil {
return err
if !s.errOperatorHolder.IsInject(startLocation) {
// a replace operator needs to redirect the streamer to currentLocation
if err = s.streamerController.RedirectStreamer(tctx, currentLocation); err != nil {
return err
}
}
}
}
Expand Down
11 changes: 6 additions & 5 deletions dm/tests/handle_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -365,17 +365,18 @@ function DM_INJECT_DML_ERROR_CASE() {
"binlog inject test alter table ${db}.${tb1} drop index b;alter table ${db}.${tb1} add unique(c);" \
"\"result\": true" 2

run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where d = 2;" "count(1): 1"
run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where b = 2;" "count(1): 2"
}

function DM_INJECT_ERROR() {
# inject at ddl
run_case INJECT_DDL_ERROR "single-source-no-sharding" \
"run_sql_source1 \"create table ${db}.${tb1} (a int unique, b int);\"" \
"clean_table" ""
# TODO inject dml, because get dml error position is not supported.
# run_case INJECT_DML_ERROR "single-source-no-sharding" \
# "run_sql_source1 \"create table ${db}.${tb1} (a int unique, b varchar(10));\"" \
# "clean_table" ""
# inject at dml
run_case INJECT_DML_ERROR "single-source-no-sharding" \
"run_sql_source1 \"create table ${db}.${tb1} (a int unique, b varchar(10));\"" \
"clean_table" ""
}

function DM_LIST_ERROR_CASE() {
Expand Down

0 comments on commit 5459951

Please sign in to comment.