From 5459951cc902021092cbc3ce5f9c2bbc2c4e9bf4 Mon Sep 17 00:00:00 2001 From: WizardXiao <89761062+WizardXiao@users.noreply.github.com> Date: Tue, 8 Feb 2022 14:19:35 +0800 Subject: [PATCH] syncer(dm): enhance error handling about add inject at dml event (#4259) close pingcap/tiflow#4260 --- dm/syncer/err-operator/operator.go | 47 +++++++++---------------- dm/syncer/err-operator/operator_test.go | 26 +++++--------- dm/syncer/handle_error.go | 2 +- dm/syncer/syncer.go | 36 +++++++++---------- dm/tests/handle_error/run.sh | 11 +++--- 5 files changed, 49 insertions(+), 73 deletions(-) diff --git a/dm/syncer/err-operator/operator.go b/dm/syncer/err-operator/operator.go index 2ce26470ef9..e99e57c4e44 100644 --- a/dm/syncer/err-operator/operator.go +++ b/dm/syncer/err-operator/operator.go @@ -34,11 +34,10 @@ import ( // Operator contains an operation for specified binlog pos // used by `handle-error`. type Operator struct { - uuid string // add a UUID, make it more friendly to be traced in log - op pb.ErrorOp - isAllInjected bool // is all injected, used by inject - events []*replication.BinlogEvent // startLocation -> events - originReq *pb.HandleWorkerErrorRequest + uuid string // add a UUID, make it more friendly to be traced in log + op pb.ErrorOp + events []*replication.BinlogEvent // ddls -> events + originReq *pb.HandleWorkerErrorRequest } // newOperator creates a new operator with a random UUID. @@ -130,18 +129,6 @@ func (h *Holder) GetBehindCommands(pos string) []*pb.HandleWorkerErrorRequest { return res } -func (h *Holder) SetHasAllInjected(startLocation binlog.Location) { - h.mu.Lock() - defer h.mu.Unlock() - - key := startLocation.Position.String() - operator, ok := h.operators[key] - if !ok { - return - } - operator.isAllInjected = true -} - func (h *Holder) IsInject(startLocation binlog.Location) bool { h.mu.RLock() defer h.mu.RUnlock() @@ -186,7 +173,7 @@ func (h *Holder) GetEvent(startLocation binlog.Location) (*replication.BinlogEve } // MatchAndApply tries to match operation for event by location and apply it on replace events. -func (h *Holder) MatchAndApply(startLocation, endLocation binlog.Location, realEventHeaderTS uint32) (bool, pb.ErrorOp) { +func (h *Holder) MatchAndApply(startLocation, endLocation binlog.Location, currentEvent *replication.BinlogEvent) (bool, pb.ErrorOp) { h.mu.Lock() defer h.mu.Unlock() @@ -196,10 +183,6 @@ func (h *Holder) MatchAndApply(startLocation, endLocation binlog.Location, realE return false, pb.ErrorOp_InvalidErrorOp } - if operator.isAllInjected { - return false, pb.ErrorOp_InvalidErrorOp - } - if operator.op == pb.ErrorOp_Replace || operator.op == pb.ErrorOp_Inject { if len(operator.events) == 0 { // this should not happen @@ -209,7 +192,7 @@ func (h *Holder) MatchAndApply(startLocation, endLocation binlog.Location, realE // set LogPos as start position for _, ev := range operator.events { ev.Header.LogPos = startLocation.Position.Pos - ev.Header.Timestamp = realEventHeaderTS + ev.Header.Timestamp = currentEvent.Header.Timestamp if e, ok := ev.Event.(*replication.QueryEvent); ok { if startLocation.GetGTID() != nil { e.GSet = startLocation.GetGTID().Origin() @@ -217,14 +200,18 @@ func (h *Holder) MatchAndApply(startLocation, endLocation binlog.Location, realE } } - // set the last replace event as end position - e := operator.events[len(operator.events)-1] - e.Header.EventSize = endLocation.Position.Pos - startLocation.Position.Pos - e.Header.LogPos = endLocation.Position.Pos - if e, ok := e.Event.(*replication.QueryEvent); ok { - if endLocation.GetGTID() != nil { - e.GSet = endLocation.GetGTID().Origin() + if operator.op == pb.ErrorOp_Replace { + // set the last replace event as end position + e := operator.events[len(operator.events)-1] + e.Header.EventSize = endLocation.Position.Pos - startLocation.Position.Pos + e.Header.LogPos = endLocation.Position.Pos + if e, ok := e.Event.(*replication.QueryEvent); ok { + if endLocation.GetGTID() != nil { + e.GSet = endLocation.GetGTID().Origin() + } } + } else if operator.op == pb.ErrorOp_Inject { + operator.events = append(operator.events, currentEvent) } } diff --git a/dm/syncer/err-operator/operator_test.go b/dm/syncer/err-operator/operator_test.go index 929e91b113b..8dba3cccf63 100644 --- a/dm/syncer/err-operator/operator_test.go +++ b/dm/syncer/err-operator/operator_test.go @@ -86,14 +86,14 @@ func (o *testOperatorSuite) TestOperator(c *C) { // skip event err = h.Set(&pb.HandleWorkerErrorRequest{Op: pb.ErrorOp_Skip, BinlogPos: startLocation.Position.String()}, nil) c.Assert(err, IsNil) - apply, op := h.MatchAndApply(startLocation, endLocation, event1.Header.Timestamp) + apply, op := h.MatchAndApply(startLocation, endLocation, event1) c.Assert(apply, IsTrue) c.Assert(op, Equals, pb.ErrorOp_Skip) // overwrite operator err = h.Set(&pb.HandleWorkerErrorRequest{Op: pb.ErrorOp_Replace, BinlogPos: startLocation.Position.String()}, []*replication.BinlogEvent{event1, event2}) c.Assert(err, IsNil) - apply, op = h.MatchAndApply(startLocation, endLocation, event2.Header.Timestamp) + apply, op = h.MatchAndApply(startLocation, endLocation, event2) c.Assert(apply, IsTrue) c.Assert(op, Equals, pb.ErrorOp_Replace) @@ -126,7 +126,7 @@ func (o *testOperatorSuite) TestOperator(c *C) { // revert exist operator err = h.Set(&pb.HandleWorkerErrorRequest{Op: pb.ErrorOp_Revert, BinlogPos: startLocation.Position.String()}, nil) c.Assert(err, IsNil) - apply, op = h.MatchAndApply(startLocation, endLocation, event1.Header.Timestamp) + apply, op = h.MatchAndApply(startLocation, endLocation, event1) c.Assert(apply, IsFalse) c.Assert(op, Equals, pb.ErrorOp_InvalidErrorOp) @@ -139,17 +139,17 @@ func (o *testOperatorSuite) TestOperator(c *C) { // test removeOutdated flushLocation := startLocation c.Assert(h.RemoveOutdated(flushLocation), IsNil) - apply, op = h.MatchAndApply(startLocation, endLocation, event1.Header.Timestamp) + apply, op = h.MatchAndApply(startLocation, endLocation, event1) c.Assert(apply, IsTrue) c.Assert(op, Equals, pb.ErrorOp_Replace) flushLocation = endLocation c.Assert(h.RemoveOutdated(flushLocation), IsNil) - apply, op = h.MatchAndApply(startLocation, endLocation, event1.Header.Timestamp) + apply, op = h.MatchAndApply(startLocation, endLocation, event1) c.Assert(apply, IsFalse) c.Assert(op, Equals, pb.ErrorOp_InvalidErrorOp) - apply, op = h.MatchAndApply(endLocation, nextLocation, event1.Header.Timestamp) + apply, op = h.MatchAndApply(endLocation, nextLocation, event1) c.Assert(apply, IsTrue) c.Assert(op, Equals, pb.ErrorOp_Replace) } @@ -211,20 +211,10 @@ func (o *testOperatorSuite) TestInjectOperator(c *C) { c.Assert(isInject, IsFalse) // test MatchAndApply - apply, op := h.MatchAndApply(startLocation, endLocation, event2.Header.Timestamp) + apply, op := h.MatchAndApply(startLocation, endLocation, event2) c.Assert(apply, IsTrue) c.Assert(op, Equals, pb.ErrorOp_Inject) - apply, op = h.MatchAndApply(endLocation, nextLocation, event2.Header.Timestamp) - c.Assert(apply, IsFalse) - c.Assert(op, Equals, pb.ErrorOp_InvalidErrorOp) - - // test set all injected - h.SetHasAllInjected(endLocation) - apply, op = h.MatchAndApply(startLocation, endLocation, event2.Header.Timestamp) - c.Assert(apply, IsTrue) - c.Assert(op, Equals, pb.ErrorOp_Inject) - h.SetHasAllInjected(startLocation) - apply, op = h.MatchAndApply(startLocation, endLocation, event2.Header.Timestamp) + apply, op = h.MatchAndApply(endLocation, nextLocation, event2) c.Assert(apply, IsFalse) c.Assert(op, Equals, pb.ErrorOp_InvalidErrorOp) } diff --git a/dm/syncer/handle_error.go b/dm/syncer/handle_error.go index 5103974aafa..413115ea171 100644 --- a/dm/syncer/handle_error.go +++ b/dm/syncer/handle_error.go @@ -39,7 +39,7 @@ func (s *Syncer) HandleError(ctx context.Context, req *pb.HandleWorkerErrorReque return "", fmt.Errorf("source '%s' has no error", s.cfg.SourceID) } - if !isQueryEvent { + if !isQueryEvent && req.Op != pb.ErrorOp_Inject { return "", fmt.Errorf("only support to handle ddl error currently, see https://docs.pingcap.com/tidb-data-migration/stable/error-handling for other errors") } pos = startLocation.Position.String() diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 0c0ca3d9392..c1e9be052b6 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -1877,11 +1877,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) { tctx.L().Debug("receive binlog event", zap.Reflect("header", e.Header)) - // TODO: support all event + // support QueryEvent and RowsEvent // we calculate startLocation and endLocation(currentLocation) for Query event here // set startLocation empty for other events to avoid misuse startLocation = binlog.Location{} - if ev, ok := e.Event.(*replication.QueryEvent); ok { + switch ev := e.Event.(type) { + case *replication.QueryEvent, *replication.RowsEvent: startLocation = binlog.InitLocation( mysql.Position{ Name: lastLocation.Position.Name, @@ -1904,19 +1905,22 @@ func (s *Syncer) Run(ctx context.Context) (err error) { ) currentLocation.Suffix = endSuffix - err = currentLocation.SetGTID(ev.GSet) - if err != nil { - return terror.Annotatef(err, "fail to record GTID %v", ev.GSet) + if queryEvent, ok := ev.(*replication.QueryEvent); ok { + err = currentLocation.SetGTID(queryEvent.GSet) + if err != nil { + return terror.Annotatef(err, "fail to record GTID %v", queryEvent.GSet) + } } if !s.isReplacingOrInjectingErr { - apply, op := s.errOperatorHolder.MatchAndApply(startLocation, currentLocation, e.Header.Timestamp) + apply, op := s.errOperatorHolder.MatchAndApply(startLocation, currentLocation, e) if apply { if op == pb.ErrorOp_Replace || op == pb.ErrorOp_Inject { s.isReplacingOrInjectingErr = true // revert currentLocation to startLocation currentLocation = startLocation } else if op == pb.ErrorOp_Skip { + queryEvent := ev.(*replication.QueryEvent) ec := eventContext{ tctx: tctx, header: e.Header, @@ -1925,9 +1929,9 @@ func (s *Syncer) Run(ctx context.Context) (err error) { lastLocation: &lastLocation, } var sourceTbls map[string]map[string]struct{} - sourceTbls, err = s.trackOriginDDL(ev, ec) + sourceTbls, err = s.trackOriginDDL(queryEvent, ec) if err != nil { - tctx.L().Warn("failed to track query when handle-error skip", zap.Error(err), zap.ByteString("sql", ev.Query)) + tctx.L().Warn("failed to track query when handle-error skip", zap.Error(err), zap.ByteString("sql", queryEvent.Query)) } s.saveGlobalPoint(currentLocation) @@ -1951,21 +1955,15 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } } // set endLocation.Suffix=0 of last replace or inject event - // also redirect stream to next event if currentLocation.Suffix > 0 && e.Header.EventSize > 0 { currentLocation.Suffix = 0 s.isReplacingOrInjectingErr = false s.locations.reset(currentLocation) - if s.errOperatorHolder.IsInject(startLocation) { - s.errOperatorHolder.SetHasAllInjected(startLocation) - // reset event as startLocation, avoid to be marked in checkpoint - currentLocation.Position.Pos = startLocation.Position.Pos - err = s.streamerController.RedirectStreamer(tctx, startLocation) - } else { - err = s.streamerController.RedirectStreamer(tctx, currentLocation) - } - if err != nil { - return err + if !s.errOperatorHolder.IsInject(startLocation) { + // replace operator need redirect to currentLocation + if err = s.streamerController.RedirectStreamer(tctx, currentLocation); err != nil { + return err + } } } } diff --git a/dm/tests/handle_error/run.sh b/dm/tests/handle_error/run.sh index 4e6b43ddbad..3b4417544e6 100644 --- a/dm/tests/handle_error/run.sh +++ b/dm/tests/handle_error/run.sh @@ -365,17 +365,18 @@ function DM_INJECT_DML_ERROR_CASE() { "binlog inject test alter table ${db}.${tb1} drop index b;alter table ${db}.${tb1} add unique(c);" \ "\"result\": true" 2 - run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where d = 2;" "count(1): 1" + run_sql_tidb_with_retry "select count(1) from ${db}.${tb} where b = 2;" "count(1): 2" } function DM_INJECT_ERROR() { + # inject at ddl run_case INJECT_DDL_ERROR "single-source-no-sharding" \ "run_sql_source1 \"create table ${db}.${tb1} (a int unique, b int);\"" \ "clean_table" "" - # TODO inject dml, because get dml error position is not supported. - # run_case INJECT_DML_ERROR "single-source-no-sharding" \ - # "run_sql_source1 \"create table ${db}.${tb1} (a int unique, b varchar(10));\"" \ - # "clean_table" "" + # inject at dml + run_case INJECT_DML_ERROR "single-source-no-sharding" \ + "run_sql_source1 \"create table ${db}.${tb1} (a int unique, b varchar(10));\"" \ + "clean_table" "" } function DM_LIST_ERROR_CASE() {