diff --git a/dm/pkg/schema/tracker.go b/dm/pkg/schema/tracker.go index a9e22857156..898c7b101b3 100644 --- a/dm/pkg/schema/tracker.go +++ b/dm/pkg/schema/tracker.go @@ -381,13 +381,13 @@ func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string dti, ok := tr.dsTracker.tableInfos[tableID] if !ok { tctx.Logger.Info("Downstream schema tracker init. ", zap.String("tableID", tableID)) - ti, err := tr.getTableInfoByCreateStmt(tctx, tableID) + downstreamTI, err := tr.getTableInfoByCreateStmt(tctx, tableID) if err != nil { tctx.Logger.Error("Init dowstream schema info error. ", zap.String("tableID", tableID), zap.Error(err)) return nil, err } - dti = GetDownStreamTi(ti, originTi) + dti = GetDownStreamTI(downstreamTI, originTi) tr.dsTracker.tableInfos[tableID] = dti } return dti, nil @@ -396,9 +396,14 @@ func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string // GetAvailableDownStreamUKIndexInfo gets available downstream UK whose data is not null. // note. this function will not init downstreamTrack. func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []interface{}) *model.IndexInfo { - dti, ok := tr.dsTracker.tableInfos[tableID] + dti := tr.dsTracker.tableInfos[tableID] + + return GetIdentityUKByData(dti, data) +} - if !ok || len(dti.AvailableUKIndexList) == 0 { +// GetIdentityUKByData gets available downstream UK whose data is not null. +func GetIdentityUKByData(downstreamTI *DownstreamTableInfo, data []interface{}) *model.IndexInfo { + if downstreamTI == nil || len(downstreamTI.AvailableUKIndexList) == 0 { return nil } // func for check data is not null @@ -406,7 +411,7 @@ func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []inte return data[i] != nil } - for _, uk := range dti.AvailableUKIndexList { + for _, uk := range downstreamTI.AvailableUKIndexList { // check uk's column data is not null if isSpecifiedIndexColumn(uk, fn) { return uk @@ -483,8 +488,8 @@ func (tr *Tracker) initDownStreamSQLModeAndParser(tctx *tcontext.Context) error return nil } -// GetDownStreamTi constructs downstreamTable index cache by tableinfo. -func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo { +// GetDownStreamTI constructs downstreamTable index cache by tableinfo. +func GetDownStreamTI(downstreamTI *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo { var ( absoluteUKIndexInfo *model.IndexInfo availableUKIndexList = []*model.IndexInfo{} @@ -494,10 +499,10 @@ func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *Downstream // func for check not null constraint fn := func(i int) bool { - return mysql.HasNotNullFlag(ti.Columns[i].Flag) + return mysql.HasNotNullFlag(downstreamTI.Columns[i].Flag) } - for i, idx := range ti.Indices { + for i, idx := range downstreamTI.Indices { if !idx.Primary && !idx.Unique { continue } @@ -520,7 +525,7 @@ func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *Downstream // handle pk exceptional case. // e.g. "create table t(a int primary key, b int)". 
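Two things happen in the tracker.go hunks above: `ti` is renamed to `downstreamTI` for clarity, and the UK-selection logic is extracted into the package-level `GetIdentityUKByData`, which tolerates a nil `*DownstreamTableInfo` so that `pkg/sqlmodel` (added below) can call it without holding a `Tracker`. A minimal, self-contained sketch of the selection rule, using simplified stand-ins for the real `model.IndexInfo`/`schema.DownstreamTableInfo` types:

```go
package main

import "fmt"

// Simplified stand-ins for model.IndexInfo and schema.DownstreamTableInfo;
// Offsets point into the row, mirroring IndexColumn.Offset.
type IndexInfo struct {
	Name    string
	Offsets []int
}

type DownstreamTableInfo struct {
	AvailableUKIndexList []*IndexInfo
}

// getIdentityUKByData mirrors the contract of the extracted helper: it
// tolerates nil info (e.g. a cache miss) and returns the first UK whose
// columns are all non-NULL in this row, or nil if none qualifies.
func getIdentityUKByData(info *DownstreamTableInfo, data []interface{}) *IndexInfo {
	if info == nil || len(info.AvailableUKIndexList) == 0 {
		return nil
	}
	for _, uk := range info.AvailableUKIndexList {
		allNotNull := true
		for _, off := range uk.Offsets {
			if data[off] == nil {
				allNotNull = false
				break
			}
		}
		if allNotNull {
			return uk
		}
	}
	return nil
}

func main() {
	info := &DownstreamTableInfo{AvailableUKIndexList: []*IndexInfo{
		{Name: "uk_a", Offsets: []int{0}},
		{Name: "uk_b", Offsets: []int{1}},
	}}
	fmt.Println(getIdentityUKByData(info, []interface{}{nil, 2}).Name) // uk_b
	fmt.Println(getIdentityUKByData(nil, nil) == nil)                  // true
}
```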
if !hasPk { - exPk := redirectIndexKeys(handlePkExCase(ti), originTi) + exPk := redirectIndexKeys(handlePkExCase(downstreamTI), originTi) if exPk != nil { absoluteUKIndexInfo = exPk absoluteUKPosition = len(availableUKIndexList) @@ -534,7 +539,7 @@ func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *Downstream } return &DownstreamTableInfo{ - TableInfo: ti, + TableInfo: downstreamTI, AbsoluteUKIndexInfo: absoluteUKIndexInfo, AvailableUKIndexList: availableUKIndexList, } diff --git a/dm/syncer/causality.go b/dm/syncer/causality.go index ab5eb8e8699..72b7546797b 100644 --- a/dm/syncer/causality.go +++ b/dm/syncer/causality.go @@ -20,6 +20,7 @@ import ( "go.uber.org/zap" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/dm/syncer/metrics" ) @@ -79,7 +80,7 @@ func (c *causality) run() { c.relation.gc(j.flushSeq) continue default: - keys := j.dml.identifyKeys(c.sessCtx) + keys := j.dml.CausalityKeys() // detectConflict before add if c.detectConflict(keys) { @@ -87,8 +88,8 @@ func (c *causality) run() { c.outCh <- newConflictJob(c.workerCount) c.relation.clear() } - j.dml.key = c.add(keys) - c.logger.Debug("key for keys", zap.String("key", j.dml.key), zap.Strings("keys", keys)) + j.dmlQueueKey = c.add(keys) + c.logger.Debug("key for keys", zap.String("key", j.dmlQueueKey), zap.Strings("keys", keys)) } metrics.ConflictDetectDurationHistogram.WithLabelValues(c.task, c.source).Observe(time.Since(startTime).Seconds()) diff --git a/dm/syncer/causality_test.go b/dm/syncer/causality_test.go index 1354ca7a979..1109b8c1b80 100644 --- a/dm/syncer/causality_test.go +++ b/dm/syncer/causality_test.go @@ -83,7 +83,7 @@ func (s *testSyncerSuite) TestCasuality(c *C) { Length: types.UnspecifiedLength, }}, } - downTi := schema.GetDownStreamTi(ti, ti) + downTi := schema.GetDownStreamTI(ti, ti) c.Assert(downTi, NotNil) jobCh := make(chan *job, 10) @@ -152,7 +152,7 @@ func (s *testSyncerSuite) TestCasualityWithPrefixIndex(c *C) { schemaStr := "create table t (c1 text, c2 int unique, unique key c1(c1(3)));" ti, err := createTableInfo(p, se, int64(0), schemaStr) c.Assert(err, IsNil) - downTi := schema.GetDownStreamTi(ti, ti) + downTi := schema.GetDownStreamTI(ti, ti) c.Assert(downTi, NotNil) c.Assert(len(downTi.AvailableUKIndexList) == 2, IsTrue) tiIndex := downTi.AvailableUKIndexList[0] diff --git a/dm/syncer/compactor.go b/dm/syncer/compactor.go index 62c907f037d..7e2be747c3e 100644 --- a/dm/syncer/compactor.go +++ b/dm/syncer/compactor.go @@ -89,17 +89,17 @@ func (c *compactor) run() { // set safeMode when receive first job if len(c.buffer) == 0 { - c.safeMode = j.dml.safeMode + c.safeMode = j.safeMode } // if dml has no PK/NOT NULL UK, do not compact it. - if j.dml.identifyColumns() == nil { + if !j.dml.HasNotNullUniqueIdx() { c.buffer = append(c.buffer, j) continue } // if update job update its identify keys, turn it into delete + insert - if j.dml.op == update && j.dml.updateIdentify() { - delDML, insertDML := updateToDelAndInsert(j.dml) + if j.dml.IsIdentityUpdated() { + delDML, insertDML := j.dml.Split() delJob := j.clone() delJob.tp = del delJob.dml = delDML @@ -142,7 +142,7 @@ func (c *compactor) flushBuffer() { if j != nil { // set safemode for all jobs by first job in buffer. 
// or safemode for insert(delete + insert = insert with safemode) - j.dml.safeMode = c.safeMode || j.dml.safeMode + j.safeMode = c.safeMode || j.safeMode c.outCh <- j } } @@ -162,7 +162,7 @@ func (c *compactor) flushBuffer() { // DELETE + UPDATE => X _| // . func (c *compactor) compactJob(j *job) { - tableName := j.dml.targetTableID + tableName := j.dml.TargetTableID() tableKeyMap, ok := c.keyMap[tableName] if !ok { // do not alloc a large buffersize, otherwise if the downstream latency is low @@ -171,7 +171,7 @@ func (c *compactor) compactJob(j *job) { tableKeyMap = c.keyMap[tableName] } - key := j.dml.identifyKey() + key := j.dml.IdentityKey() failpoint.Inject("DownstreamIdentifyKeyCheckInCompact", func(v failpoint.Value) { value, err := strconv.Atoi(key) @@ -197,20 +197,17 @@ func (c *compactor) compactJob(j *job) { if prevJob.tp == insert { // INSERT + UPDATE => INSERT j.tp = insert - j.dml.oldValues = nil - j.dml.originOldValues = nil - j.dml.op = insert + j.dml.Reduce(prevJob.dml) // DELETE + INSERT + UPDATE => INSERT with safemode - j.dml.safeMode = prevJob.dml.safeMode + j.safeMode = prevJob.safeMode } else if prevJob.tp == update { // UPDATE + UPDATE => UPDATE - j.dml.oldValues = prevJob.dml.oldValues - j.dml.originOldValues = prevJob.dml.originOldValues + j.dml.Reduce(prevJob.dml) } case insert: if prevJob.tp == del { // DELETE + INSERT => INSERT with safemode - j.dml.safeMode = true + j.safeMode = true } case del: // do nothing because anything + DELETE => DELETE diff --git a/dm/syncer/compactor_test.go b/dm/syncer/compactor_test.go index 06ad3993791..506f9581a7f 100644 --- a/dm/syncer/compactor_test.go +++ b/dm/syncer/compactor_test.go @@ -91,7 +91,7 @@ func (s *testSyncerSuite) TestCompactJob(c *C) { Length: types.UnspecifiedLength, }}, } - downTi := schema.GetDownStreamTi(ti, ti) + downTi := schema.GetDownStreamTI(ti, ti) c.Assert(downTi, NotNil) var dml *DML @@ -208,7 +208,7 @@ func (s *testSyncerSuite) TestCompactorSafeMode(c *C) { Length: types.UnspecifiedLength, }}, } - downTi := schema.GetDownStreamTi(ti, ti) + downTi := schema.GetDownStreamTI(ti, ti) c.Assert(downTi, NotNil) testCases := []struct { diff --git a/dm/syncer/dml.go b/dm/syncer/dml.go index e5f5e8acc8f..40b57f0a6f6 100644 --- a/dm/syncer/dml.go +++ b/dm/syncer/dml.go @@ -32,11 +32,13 @@ import ( "github.com/shopspring/decimal" "go.uber.org/zap" + cdcmodel "github.com/pingcap/tiflow/cdc/model" tcontext "github.com/pingcap/tiflow/dm/pkg/context" "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/dm/pkg/schema" "github.com/pingcap/tiflow/dm/pkg/terror" "github.com/pingcap/tiflow/dm/pkg/utils" + "github.com/pingcap/tiflow/pkg/sqlmodel" ) // this type is used to generate DML SQL, opType is used to mark type in binlog. @@ -68,14 +70,12 @@ func (op dmlOpType) String() (str string) { // genDMLParam stores pruned columns, data as well as the original columns, data, index. 
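The hunk that follows removes the pruned data/columns fields this comment mentions: rows now stay aligned with the full source table info, and generated columns are instead skipped at SQL-generation time (see `genInsertSQL` in `pkg/sqlmodel/row_change.go` below). A standalone sketch of that skip rule; `col` and `insertColumnList` are hypothetical names, not part of the patch:

```go
package main

import (
	"fmt"
	"strings"
)

// col is a toy stand-in for model.ColumnInfo.
type col struct {
	name      string
	generated bool
}

// insertColumnList skips generated columns while walking the full column
// list, so value offsets stay aligned with the table info and no separate
// "pruned" copy of the row is needed.
func insertColumnList(cols []col) string {
	names := make([]string, 0, len(cols))
	for _, c := range cols {
		if c.generated {
			continue // generated columns never appear in INSERT
		}
		names = append(names, "`"+c.name+"`")
	}
	return "(" + strings.Join(names, ",") + ")"
}

func main() {
	cols := []col{{name: "id"}, {name: "full_name", generated: true}, {name: "age"}}
	fmt.Println(insertColumnList(cols)) // (`id`,`age`)
}
```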
type genDMLParam struct { - targetTableID string // as a key in map like `schema`.`table` - sourceTable *filter.Table // origin table - safeMode bool // only used in update - data [][]interface{} // pruned data - originalData [][]interface{} // all data - columns []*model.ColumnInfo // pruned columns - sourceTableInfo *model.TableInfo // all table info - extendData [][]interface{} // all data include extend data + sourceTable *filter.Table // origin table + targetTable *filter.Table + safeMode bool // only used in update + originalData [][]interface{} // all data + sourceTableInfo *model.TableInfo // all table info + extendData [][]interface{} // all data include extend data } // extractValueFromData adjust the values obtained from go-mysql so that @@ -117,15 +117,13 @@ func extractValueFromData(data []interface{}, columns []*model.ColumnInfo, sourc return value } -func (s *Syncer) genAndFilterInsertDMLs(tctx *tcontext.Context, param *genDMLParam, filterExprs []expression.Expression) ([]*DML, error) { +func (s *Syncer) genAndFilterInsertDMLs(tctx *tcontext.Context, param *genDMLParam, filterExprs []expression.Expression) ([]*sqlmodel.RowChange, error) { var ( - tableID = param.targetTableID - dataSeq = param.data + tableID = utils.GenTableID(param.targetTable) originalDataSeq = param.originalData - columns = param.columns ti = param.sourceTableInfo extendData = param.extendData - dmls = make([]*DML, 0, len(dataSeq)) + dmls = make([]*sqlmodel.RowChange, 0, len(originalDataSeq)) ) // if downstream pk/uk(not null) exits, then use downstream pk/uk(not null) @@ -133,23 +131,18 @@ func (s *Syncer) genAndFilterInsertDMLs(tctx *tcontext.Context, param *genDMLPar if err != nil { return nil, err } - downstreamIndexColumns := downstreamTableInfo.AbsoluteUKIndexInfo if extendData != nil { originalDataSeq = extendData } RowLoop: - for dataIdx, data := range dataSeq { - if len(data) != len(columns) { - return nil, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(len(columns), len(data)) + for dataIdx, data := range originalDataSeq { + if len(data) != len(ti.Columns) { + return nil, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(len(ti.Columns), len(data)) } - value := extractValueFromData(data, columns, ti) - originalValue := value - if len(columns) != len(ti.Columns) { - originalValue = extractValueFromData(originalDataSeq[dataIdx], ti.Columns, ti) - } + originalValue := extractValueFromData(originalDataSeq[dataIdx], ti.Columns, ti) for _, expr := range filterExprs { skip, err := SkipDMLByExpression(s.sessCtx, originalValue, expr, ti.Columns) @@ -162,11 +155,17 @@ RowLoop: } } - if downstreamIndexColumns == nil { - downstreamIndexColumns = s.schemaTracker.GetAvailableDownStreamUKIndexInfo(tableID, value) - } - - dmls = append(dmls, newDML(insert, param.safeMode, tableID, param.sourceTable, nil, value, nil, originalValue, columns, ti, downstreamIndexColumns, downstreamTableInfo)) + rowChange := sqlmodel.NewRowChange( + &cdcmodel.TableName{Schema: param.sourceTable.Schema, Table: param.sourceTable.Name}, + &cdcmodel.TableName{Schema: param.targetTable.Schema, Table: param.targetTable.Name}, + nil, + originalValue, + param.sourceTableInfo, + downstreamTableInfo.TableInfo, + s.sessCtx, + ) + rowChange.SetIdentifyInfo(downstreamTableInfo) + dmls = append(dmls, rowChange) } return dmls, nil @@ -177,15 +176,13 @@ func (s *Syncer) genAndFilterUpdateDMLs( param *genDMLParam, oldValueFilters []expression.Expression, newValueFilters []expression.Expression, -) ([]*DML, error) { +) ([]*sqlmodel.RowChange, error) { 
var ( - tableID = param.targetTableID - data = param.data + tableID = utils.GenTableID(param.targetTable) originalData = param.originalData - columns = param.columns ti = param.sourceTableInfo extendData = param.extendData - dmls = make([]*DML, 0, len(data)/2) + dmls = make([]*sqlmodel.RowChange, 0, len(originalData)/2) ) // if downstream pk/uk(not null) exits, then use downstream pk/uk(not null) @@ -193,38 +190,26 @@ func (s *Syncer) genAndFilterUpdateDMLs( if err != nil { return nil, err } - downstreamIndexColumns := downstreamTableInfo.AbsoluteUKIndexInfo if extendData != nil { originalData = extendData } RowLoop: - for i := 0; i < len(data); i += 2 { - oldData := data[i] - changedData := data[i+1] + for i := 0; i < len(originalData); i += 2 { oriOldData := originalData[i] oriChangedData := originalData[i+1] - if len(oldData) != len(changedData) { - return nil, terror.ErrSyncerUnitDMLOldNewValueMismatch.Generate(len(oldData), len(changedData)) + if len(oriOldData) != len(oriChangedData) { + return nil, terror.ErrSyncerUnitDMLOldNewValueMismatch.Generate(len(oriOldData), len(oriChangedData)) } - if len(oldData) != len(columns) { - return nil, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(len(columns), len(oldData)) + if len(oriOldData) != len(ti.Columns) { + return nil, terror.ErrSyncerUnitDMLColumnNotMatch.Generate(len(ti.Columns), len(oriOldData)) } - oldValues := extractValueFromData(oldData, columns, ti) - changedValues := extractValueFromData(changedData, columns, ti) - - var oriOldValues, oriChangedValues []interface{} - if len(columns) == len(ti.Columns) { - oriOldValues = oldValues - oriChangedValues = changedValues - } else { - oriOldValues = extractValueFromData(oriOldData, ti.Columns, ti) - oriChangedValues = extractValueFromData(oriChangedData, ti.Columns, ti) - } + oriOldValues := extractValueFromData(oriOldData, ti.Columns, ti) + oriChangedValues := extractValueFromData(oriChangedData, ti.Columns, ti) for j := range oldValueFilters { // AND logic @@ -244,23 +229,30 @@ RowLoop: } } - if downstreamIndexColumns == nil { - downstreamIndexColumns = s.schemaTracker.GetAvailableDownStreamUKIndexInfo(tableID, oriOldValues) - } + rowChange := sqlmodel.NewRowChange( + &cdcmodel.TableName{Schema: param.sourceTable.Schema, Table: param.sourceTable.Name}, + &cdcmodel.TableName{Schema: param.targetTable.Schema, Table: param.targetTable.Name}, + oriOldValues, + oriChangedValues, + param.sourceTableInfo, + downstreamTableInfo.TableInfo, + s.sessCtx, + ) + rowChange.SetIdentifyInfo(downstreamTableInfo) + dmls = append(dmls, rowChange) - dmls = append(dmls, newDML(update, param.safeMode, param.targetTableID, param.sourceTable, oldValues, changedValues, oriOldValues, oriChangedValues, columns, ti, downstreamIndexColumns, downstreamTableInfo)) } return dmls, nil } -func (s *Syncer) genAndFilterDeleteDMLs(tctx *tcontext.Context, param *genDMLParam, filterExprs []expression.Expression) ([]*DML, error) { +func (s *Syncer) genAndFilterDeleteDMLs(tctx *tcontext.Context, param *genDMLParam, filterExprs []expression.Expression) ([]*sqlmodel.RowChange, error) { var ( - tableID = param.targetTableID + tableID = utils.GenTableID(param.targetTable) dataSeq = param.originalData ti = param.sourceTableInfo extendData = param.extendData - dmls = make([]*DML, 0, len(dataSeq)) + dmls = make([]*sqlmodel.RowChange, 0, len(dataSeq)) ) // if downstream pk/uk(not null) exits, then use downstream pk/uk(not null) @@ -268,7 +260,6 @@ func (s *Syncer) genAndFilterDeleteDMLs(tctx *tcontext.Context, param *genDMLPar if 
err != nil { return nil, err } - downstreamIndexColumns := downstreamTableInfo.AbsoluteUKIndexInfo if extendData != nil { dataSeq = extendData @@ -293,11 +284,18 @@ RowLoop: } } - if downstreamIndexColumns == nil { - downstreamIndexColumns = s.schemaTracker.GetAvailableDownStreamUKIndexInfo(tableID, value) - } + rowChange := sqlmodel.NewRowChange( + &cdcmodel.TableName{Schema: param.sourceTable.Schema, Table: param.sourceTable.Name}, + &cdcmodel.TableName{Schema: param.targetTable.Schema, Table: param.targetTable.Name}, + value, + nil, + param.sourceTableInfo, + downstreamTableInfo.TableInfo, + s.sessCtx, + ) + rowChange.SetIdentifyInfo(downstreamTableInfo) + dmls = append(dmls, rowChange) - dmls = append(dmls, newDML(del, false, param.targetTableID, param.sourceTable, nil, value, nil, value, ti.Columns, ti, downstreamIndexColumns, downstreamTableInfo)) } return dmls, nil diff --git a/dm/syncer/dml_test.go b/dm/syncer/dml_test.go index a7748abb4bf..148c2538eff 100644 --- a/dm/syncer/dml_test.go +++ b/dm/syncer/dml_test.go @@ -224,7 +224,7 @@ func (s *testSyncerSuite) TestGenMultipleKeys(c *C) { ti, err := createTableInfo(p, se, int64(i+1), tc.schema) assert(err, IsNil) - dti := schema.GetDownStreamTi(ti, ti) + dti := schema.GetDownStreamTI(ti, ti) assert(dti, NotNil) keys := genMultipleKeys(sessCtx, dti, ti, tc.values, "table") assert(keys, DeepEquals, tc.keys) @@ -619,7 +619,7 @@ func (s *testSyncerSuite) TestTruncateIndexValues(c *C) { } ti, err := createTableInfo(p, se, int64(i+1), tc.schema) assert(err, IsNil) - dti := schema.GetDownStreamTi(ti, ti) + dti := schema.GetDownStreamTI(ti, ti) assert(dti, NotNil) assert(dti.AvailableUKIndexList, NotNil) cols := make([]*model.ColumnInfo, 0, len(dti.AvailableUKIndexList[0].Columns)) diff --git a/dm/syncer/dml_worker.go b/dm/syncer/dml_worker.go index 051bb140f05..f32d899d7e2 100644 --- a/dm/syncer/dml_worker.go +++ b/dm/syncer/dml_worker.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tiflow/dm/pkg/utils" "github.com/pingcap/tiflow/dm/syncer/dbconn" "github.com/pingcap/tiflow/dm/syncer/metrics" + "github.com/pingcap/tiflow/pkg/sqlmodel" ) // DMLWorker is used to sync dml. 
@@ -132,10 +133,10 @@ func (w *DMLWorker) run() { j.flushWg.Wait() w.addCountFunc(true, adminQueueName, j.tp, 1, j.targetTable) default: - queueBucket := int(utils.GenHashKey(j.dml.key)) % w.workerCount + queueBucket := int(utils.GenHashKey(j.dmlQueueKey)) % w.workerCount w.addCountFunc(false, queueBucketMapping[queueBucket], j.tp, 1, j.targetTable) startTime := time.Now() - w.logger.Debug("queue for key", zap.Int("queue", queueBucket), zap.String("key", j.dml.key)) + w.logger.Debug("queue for key", zap.Int("queue", queueBucket), zap.String("key", j.dmlQueueKey)) jobChs[queueBucket] <- j metrics.AddJobDurationHistogram.WithLabelValues(j.tp.String(), w.task, queueBucketMapping[queueBucket], w.source).Observe(time.Since(startTime).Seconds()) } @@ -199,7 +200,7 @@ func (w *DMLWorker) executeBatchJobs(queueID int, jobs []*job) { affect int db = w.toDBConns[queueID] err error - dmls = make([]*DML, 0, len(jobs)) + dmls = make([]*sqlmodel.RowChange, 0, len(jobs)) ) defer func() { @@ -227,10 +228,7 @@ func (w *DMLWorker) executeBatchJobs(queueID int, jobs []*job) { } }) - for _, j := range jobs { - dmls = append(dmls, j.dml) - } - queries, args := w.genSQLs(dmls) + queries, args := w.genSQLs(jobs) failpoint.Inject("WaitUserCancel", func(v failpoint.Value) { t := v.(int) time.Sleep(time.Duration(t) * time.Second) @@ -248,15 +246,15 @@ func (w *DMLWorker) executeBatchJobs(queueID int, jobs []*job) { } // genSQLs generate SQLs in single row mode or multiple rows mode. -func (w *DMLWorker) genSQLs(dmls []*DML) ([]string, [][]interface{}) { +func (w *DMLWorker) genSQLs(jobs []*job) ([]string, [][]interface{}) { if w.multipleRows { return genDMLsWithSameOp(dmls) } queries := make([]string, 0, len(dmls)) args := make([][]interface{}, 0, len(dmls)) - for _, dml := range dmls { - query, arg := dml.genSQL() + for _, j := range jobs { + query, arg := j.dml.genSQL() queries = append(queries, query...) args = append(args, arg...) } diff --git a/dm/syncer/job.go b/dm/syncer/job.go index d6161b587da..c7ada2a41d3 100644 --- a/dm/syncer/job.go +++ b/dm/syncer/job.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb-tools/pkg/filter" "github.com/pingcap/tiflow/dm/pkg/binlog" + "github.com/pingcap/tiflow/pkg/sqlmodel" ) type opType byte @@ -83,7 +84,9 @@ type job struct { // sql example: drop table `s1`.`t1`, `s2`.`t2`. 
sourceTbls map[string][]*filter.Table targetTable *filter.Table - dml *DML + dml *sqlmodel.RowChange + dmlQueueKey string + safeMode bool retry bool location binlog.Location // location of last received (ROTATE / QUERY / XID) event, for global/table checkpoint startLocation binlog.Location // start location of the sql in binlog, for handle_error @@ -112,13 +115,14 @@ func (j *job) String() string { return fmt.Sprintf("tp: %s, flushSeq: %d, dml: %s, ddls: %s, last_location: %s, start_location: %s, current_location: %s", j.tp, j.flushSeq, dmlStr, j.ddls, j.location, j.startLocation, j.currentLocation) } -func newDMLJob(tp opType, sourceTable, targetTable *filter.Table, dml *DML, ec *eventContext) *job { +func newDMLJob(tp opType, sourceTable, targetTable *filter.Table, dml *sqlmodel.RowChange, ec *eventContext) *job { return &job{ tp: tp, sourceTbls: map[string][]*filter.Table{sourceTable.Schema: {sourceTable}}, targetTable: targetTable, dml: dml, retry: true, + safeMode: ec.safeMode, location: *ec.lastLocation, startLocation: *ec.startLocation, diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index ab7e97fe8ee..ebbfa624c36 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -73,6 +73,7 @@ import ( sm "github.com/pingcap/tiflow/dm/syncer/safe-mode" "github.com/pingcap/tiflow/dm/syncer/shardddl" "github.com/pingcap/tiflow/pkg/errorutil" + "github.com/pingcap/tiflow/pkg/sqlmodel" ) var ( @@ -2245,26 +2246,15 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err } extRows := generateExtendColumn(originRows, s.tableRouter, sourceTable, s.cfg.SourceID) - rows := originRows - if extRows != nil { - rows = extRows - } - - prunedColumns, prunedRows, err := pruneGeneratedColumnDML(tableInfo, rows) - if err != nil { - return err - } var ( - dmls []*DML + dmls []*sqlmodel.RowChange jobType opType ) param := &genDMLParam{ - targetTableID: utils.GenTableID(targetTable), - data: prunedRows, + targetTable: targetTable, originalData: originRows, - columns: prunedColumns, sourceTableInfo: tableInfo, sourceTable: sourceTable, extendData: extRows, diff --git a/pkg/sqlmodel/causality.go b/pkg/sqlmodel/causality.go new file mode 100644 index 00000000000..a688fe01318 --- /dev/null +++ b/pkg/sqlmodel/causality.go @@ -0,0 +1,158 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlmodel + +import ( + "fmt" + "strconv" + "strings" + + timodel "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/tablecodec" + "go.uber.org/zap" + + "github.com/pingcap/tiflow/dm/pkg/log" + "github.com/pingcap/tiflow/dm/pkg/utils" +) + +// CausalityKeys returns all string representation of causality keys. If two row changes has the same causality keys, +// they must be replicated sequentially. +func (r *RowChange) CausalityKeys() []string { + r.lazyInitIdentityInfo() + + ret := make([]string, 0, 1) + if r.preValues != nil { + ret = append(ret, r.getCausalityString(r.preValues)...) 
+	}
+	if r.postValues != nil {
+		ret = append(ret, r.getCausalityString(r.postValues)...)
+	}
+	return ret
+}
+
+func columnValue2String(value interface{}) string {
+	var data string
+	switch v := value.(type) {
+	case nil:
+		data = "null"
+	case bool:
+		if v {
+			data = "1"
+		} else {
+			data = "0"
+		}
+	case int:
+		data = strconv.FormatInt(int64(v), 10)
+	case int8:
+		data = strconv.FormatInt(int64(v), 10)
+	case int16:
+		data = strconv.FormatInt(int64(v), 10)
+	case int32:
+		data = strconv.FormatInt(int64(v), 10)
+	case int64:
+		data = strconv.FormatInt(v, 10)
+	case uint8:
+		data = strconv.FormatUint(uint64(v), 10)
+	case uint16:
+		data = strconv.FormatUint(uint64(v), 10)
+	case uint32:
+		data = strconv.FormatUint(uint64(v), 10)
+	case uint64:
+		data = strconv.FormatUint(v, 10)
+	case float32:
+		data = strconv.FormatFloat(float64(v), 'f', -1, 32)
+	case float64:
+		data = strconv.FormatFloat(v, 'f', -1, 64)
+	case string:
+		data = v
+	case []byte:
+		data = string(v)
+	default:
+		data = fmt.Sprintf("%v", v)
+	}
+
+	return data
+}
+
+func genKeyString(table string, columns []*timodel.ColumnInfo, values []interface{}) string {
+	var buf strings.Builder
+	for i, data := range values {
+		if data == nil {
+			log.L().Debug("ignore null value", zap.String("column", columns[i].Name.O), zap.String("table", table))
+			continue // ignore `null` value.
+		}
+		// one column key looks like: `column_val.column_name.`
+		buf.WriteString(columnValue2String(data))
+		buf.WriteString(".")
+		buf.WriteString(columns[i].Name.O)
+		buf.WriteString(".")
+	}
+	if buf.Len() == 0 {
+		log.L().Debug("all values are nil, no key generated", zap.String("table", table))
+		return "" // all values are `null`.
+	}
+	buf.WriteString(table)
+	return buf.String()
+}
+
+// truncateIndexValues truncates prefix-index columns in data to their index length.
+func truncateIndexValues(
+	ctx sessionctx.Context,
+	ti *timodel.TableInfo,
+	indexColumns *timodel.IndexInfo,
+	tiColumns []*timodel.ColumnInfo,
+	data []interface{},
+) []interface{} {
+	values := make([]interface{}, 0, len(indexColumns.Columns))
+	datums, err := utils.AdjustBinaryProtocolForDatum(ctx, data, tiColumns)
+	if err != nil {
+		log.L().Warn("adjust binary protocol for datum error", zap.Error(err))
+		return data
+	}
+	tablecodec.TruncateIndexValues(ti, indexColumns, datums)
+	for _, datum := range datums {
+		values = append(values, datum.GetValue())
+	}
+	return values
+}
+
+func (r *RowChange) getCausalityString(values []interface{}) []string {
+	pkAndUks := r.identityInfo.AvailableUKIndexList
+	if len(pkAndUks) == 0 {
+		// the table has no PK/UK, so all values of the row constitute the causality key
+		return []string{genKeyString(r.sourceTable.String(), r.sourceTableInfo.Columns, values)}
+	}
+
+	ret := make([]string, 0, len(pkAndUks))
+
+	for _, indexCols := range pkAndUks {
+		cols, vals := getColsAndValuesOfIdx(r.sourceTableInfo.Columns, indexCols, values)
+		// handle prefix index
+		truncVals := truncateIndexValues(r.tiSessionCtx, r.sourceTableInfo, indexCols, cols, vals)
+		key := genKeyString(r.sourceTable.String(), cols, truncVals)
+		if len(key) > 0 { // ignore `null` value.
+			ret = append(ret, key)
+		} else {
+			log.L().Debug("ignore empty key", zap.String("table", r.sourceTable.String()))
+		}
+	}
+
+	if len(ret) == 0 {
+		// all UK values are NULL, so fall back to all values of the row as the causality key
+		return []string{genKeyString(r.sourceTable.String(), r.sourceTableInfo.Columns, values)}
+	}
+
+	return ret
+}
diff --git a/pkg/sqlmodel/compact.go b/pkg/sqlmodel/compact.go
new file mode 100644
index 00000000000..a986e9a3071
--- /dev/null
+++ b/pkg/sqlmodel/compact.go
@@ -0,0 +1,150 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlmodel
+
+import (
+	"fmt"
+	"strings"
+
+	"go.uber.org/zap"
+
+	"github.com/pingcap/tiflow/dm/pkg/log"
+)
+
+// HasNotNullUniqueIdx returns true when the target table structure has a PK or UK whose columns are all NOT NULL.
+func (r *RowChange) HasNotNullUniqueIdx() bool {
+	r.lazyInitIdentityInfo()
+
+	return r.identityInfo.AbsoluteUKIndexInfo != nil
+}
+
+// IdentityValues returns the two groups of values that can be used to identify the row. That is to say, if two row
+// changes have the same IdentityValues, they are changes of the same row. We can use this property to replicate only
+// the latest changes of one row.
+// We always use the same index for the same table structure to get IdentityValues.
+// The two groups returned come from preValues and postValues respectively.
+func (r *RowChange) IdentityValues() ([]interface{}, []interface{}) {
+	r.lazyInitIdentityInfo()
+
+	indexInfo := r.identityInfo.AbsoluteUKIndexInfo
+	if indexInfo == nil {
+		return r.preValues, r.postValues
+	}
+
+	pre := make([]interface{}, 0, len(indexInfo.Columns))
+	post := make([]interface{}, 0, len(indexInfo.Columns))
+
+	for _, column := range indexInfo.Columns {
+		if r.preValues != nil {
+			pre = append(pre, r.preValues[column.Offset])
+		}
+		if r.postValues != nil {
+			post = append(post, r.postValues[column.Offset])
+		}
+	}
+	return pre, post
+}
+
+// IsIdentityUpdated returns whether this UPDATE changed the identity values of the row.
+func (r *RowChange) IsIdentityUpdated() bool {
+	if r.tp != RowChangeUpdate {
+		return false
+	}
+
+	r.lazyInitIdentityInfo()
+	pre, post := r.IdentityValues()
+	if len(pre) != len(post) {
+		// should not happen
+		return true
+	}
+	for i := range pre {
+		if pre[i] != post[i] {
+			return true
+		}
+	}
+	return false
+}
+
+// genKey generates a key from values, e.g. "a.1.b".
+func genKey(values []interface{}) string {
+	builder := new(strings.Builder)
+	for i, v := range values {
+		if i != 0 {
+			builder.WriteString(".")
+		}
+		fmt.Fprintf(builder, "%v", v)
+	}
+
+	return builder.String()
+}
+
+// IdentityKey returns a string generated by IdentityValues. If RowChange.IsIdentityUpdated, the behaviour is undefined.
+func (r *RowChange) IdentityKey() string {
+	pre, post := r.IdentityValues()
+	if pre != nil {
+		return genKey(pre)
+	}
+	return genKey(post)
+}
+
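`IdentityKey` above is what the compactor keys its per-table maps on; `Reduce` and `Split` below are the merge primitives it calls. A toy sketch of the resulting compaction loop, under simplified types (the real compactor in dm/syncer/compactor.go also tracks buffer positions and safe mode):

```go
package main

import "fmt"

type changeType int

const (
	insertChange changeType = iota + 1
	updateChange
	deleteChange
)

// change is a toy stand-in for RowChange: an identity key plus a type.
type change struct {
	key string
	tp  changeType
}

// compactStream keeps only the surviving change per identity key,
// mirroring INSERT+UPDATE=>INSERT, UPDATE+UPDATE=>UPDATE and
// anything+DELETE=>DELETE from the compactor's merge table.
func compactStream(changes []*change) map[string]*change {
	latest := make(map[string]*change)
	for _, c := range changes {
		if prev, ok := latest[c.key]; ok && prev.tp == insertChange && c.tp == updateChange {
			c.tp = insertChange // INSERT + UPDATE => INSERT with the new values
		}
		latest[c.key] = c
	}
	return latest
}

func main() {
	out := compactStream([]*change{
		{key: "1", tp: insertChange},
		{key: "1", tp: updateChange}, // folded into the INSERT
		{key: "2", tp: deleteChange},
	})
	fmt.Println(out["1"].tp == insertChange, out["2"].tp == deleteChange) // true true
}
```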
+// Reduce merges two row changes of the same row into one row change, e.g., INSERT{1} + UPDATE{1 -> 2} -> INSERT{2}.
+// The receiver is changed in-place.
+func (r *RowChange) Reduce(preRowChange *RowChange) {
+	if r.IdentityKey() != preRowChange.IdentityKey() {
+		log.L().DPanic("reduce row change failed, identity key not match",
+			zap.String("preID", preRowChange.IdentityKey()),
+			zap.String("curID", r.IdentityKey()))
+		return
+	}
+
+	// special handling for INSERT + DELETE -> DELETE
+	if r.tp == RowChangeDelete && preRowChange.tp == RowChangeInsert {
+		return
+	}
+
+	r.preValues = preRowChange.preValues
+	r.calculateType()
+}
+
+// Split splits the current RowChangeUpdate into one RowChangeDelete and one RowChangeInsert. The behaviour is
+// undefined for other types of RowChange.
+func (r *RowChange) Split() (*RowChange, *RowChange) {
+	if r.tp != RowChangeUpdate {
+		log.L().DPanic("Split should only be called on RowChangeUpdate",
+			zap.Stringer("row change", r))
+		return nil, nil
+	}
+
+	pre := &RowChange{
+		sourceTable:     r.sourceTable,
+		targetTable:     r.targetTable,
+		preValues:       r.preValues,
+		sourceTableInfo: r.sourceTableInfo,
+		targetTableInfo: r.targetTableInfo,
+		tiSessionCtx:    r.tiSessionCtx,
+		tp:              RowChangeDelete,
+		identityInfo:    r.identityInfo,
+	}
+	post := &RowChange{
+		sourceTable:     r.sourceTable,
+		targetTable:     r.targetTable,
+		postValues:      r.postValues,
+		sourceTableInfo: r.sourceTableInfo,
+		targetTableInfo: r.targetTableInfo,
+		tiSessionCtx:    r.tiSessionCtx,
+		tp:              RowChangeInsert,
+		identityInfo:    r.identityInfo,
+	}
+
+	return pre, post
+}
diff --git a/pkg/sqlmodel/row_change.go b/pkg/sqlmodel/row_change.go
new file mode 100644
index 00000000000..c7253b83feb
--- /dev/null
+++ b/pkg/sqlmodel/row_change.go
@@ -0,0 +1,372 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sqlmodel
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/pingcap/failpoint"
+	timodel "github.com/pingcap/tidb/parser/model"
+	"github.com/pingcap/tidb/sessionctx"
+	"go.uber.org/zap"
+
+	cdcmodel "github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/dm/pkg/log"
+	"github.com/pingcap/tiflow/dm/pkg/schema"
+	"github.com/pingcap/tiflow/dm/pkg/utils"
+	"github.com/pingcap/tiflow/pkg/quotes"
+)
+
+// RowChangeType is the type of a row change.
+type RowChangeType int
+
+// these consts represent types of row change.
+const (
+	RowChangeNull RowChangeType = iota
+	RowChangeInsert
+	RowChangeUpdate
+	RowChangeDelete
+)
+
+func (t RowChangeType) String() string {
+	switch t {
+	case RowChangeInsert:
+		return "ChangeInsert"
+	case RowChangeUpdate:
+		return "ChangeUpdate"
+	case RowChangeDelete:
+		return "ChangeDelete"
+	}
+
+	return ""
+}
+
+// RowChange represents a row change; it can be further converted into DML SQL.
+type RowChange struct {
+	sourceTable *cdcmodel.TableName
+	targetTable *cdcmodel.TableName
+
+	preValues  []interface{} // values to be updated for UPDATE and values to be deleted for DELETE
+	postValues []interface{} // values to be inserted for INSERT and the updated values for UPDATE
+
+	sourceTableInfo *timodel.TableInfo
+	targetTableInfo *timodel.TableInfo
+
+	tiSessionCtx sessionctx.Context
+
+	tp           RowChangeType
+	identityInfo *schema.DownstreamTableInfo // this field can be set from outside or lazily initialized
+}
+
+// NewRowChange creates a new RowChange.
+// These parameters can be nil:
+// - targetTable: when same as sourceTable or not applicable
+// - preValues: when INSERT
+// - postValues: when DELETE
+// - targetTableInfo: when same as sourceTableInfo or not applicable
+// - tiSessionCtx: when the default session context (UTC timezone) is enough
+// The arguments must not be changed after being assigned to RowChange; any modification (like converting []byte to
+// string) should be done before NewRowChange.
+func NewRowChange(
+	sourceTable *cdcmodel.TableName,
+	targetTable *cdcmodel.TableName,
+	preValues []interface{},
+	postValues []interface{},
+	sourceTableInfo *timodel.TableInfo,
+	downstreamTableInfo *timodel.TableInfo,
+	tiCtx sessionctx.Context,
+) *RowChange {
+	ret := &RowChange{
+		sourceTable:     sourceTable,
+		preValues:       preValues,
+		postValues:      postValues,
+		sourceTableInfo: sourceTableInfo,
+	}
+
+	if targetTable != nil {
+		ret.targetTable = targetTable
+	} else {
+		ret.targetTable = sourceTable
+	}
+
+	if downstreamTableInfo != nil {
+		ret.targetTableInfo = downstreamTableInfo
+	} else {
+		ret.targetTableInfo = sourceTableInfo
+	}
+
+	if tiCtx != nil {
+		ret.tiSessionCtx = tiCtx
+	} else {
+		ret.tiSessionCtx = utils.NewSessionCtx(nil)
+	}
+
+	ret.calculateType()
+
+	return ret
+}
+
+func (r *RowChange) calculateType() {
+	switch {
+	case r.preValues == nil && r.postValues != nil:
+		r.tp = RowChangeInsert
+	case r.preValues != nil && r.postValues != nil:
+		r.tp = RowChangeUpdate
+	case r.preValues != nil && r.postValues == nil:
+		r.tp = RowChangeDelete
+	default:
+		log.L().DPanic("preValues and postValues can't both be nil", zap.Stringer("sourceTable", r.sourceTable))
+	}
+}
+
+// String implements the Stringer interface.
+func (r *RowChange) String() string {
+	return fmt.Sprintf("type: %s, source table: %s, target table: %s, preValues: %v, postValues: %v",
+		r.tp, r.sourceTable, r.targetTable, r.preValues, r.postValues)
+}
+
+// TargetTableID returns an ID string for the target table.
+func (r *RowChange) TargetTableID() string {
+	return r.targetTable.QuoteString()
+}
+
+// SetIdentifyInfo lets the caller calculate and cache identityInfo, so that it does not need to be lazily
+// initialized in every RowChange.
+func (r *RowChange) SetIdentifyInfo(info *schema.DownstreamTableInfo) {
+	r.identityInfo = info
+}
+
+func (r *RowChange) lazyInitIdentityInfo() {
+	if r.identityInfo != nil {
+		return
+	}
+
+	r.identityInfo = schema.GetDownStreamTI(r.targetTableInfo, r.sourceTableInfo)
+}
+
+func getColsAndValuesOfIdx(columns []*timodel.ColumnInfo, indexColumns *timodel.IndexInfo, data []interface{}) ([]*timodel.ColumnInfo, []interface{}) {
+	cols := make([]*timodel.ColumnInfo, 0, len(indexColumns.Columns))
+	values := make([]interface{}, 0, len(indexColumns.Columns))
+	for _, column := range indexColumns.Columns {
+		cols = append(cols, columns[column.Offset])
+		values = append(values, data[column.Offset])
+	}
+
+	return cols, values
+}
+
+// whereColumnsAndValues returns the columns and values that identify the row.
+func (r *RowChange) whereColumnsAndValues() ([]string, []interface{}) { + r.lazyInitIdentityInfo() + + uniqueIndex := r.identityInfo.AbsoluteUKIndexInfo + if uniqueIndex == nil { + uniqueIndex = schema.GetIdentityUKByData(r.identityInfo, r.preValues) + } + + columns, values := r.sourceTableInfo.Columns, r.preValues + if uniqueIndex != nil { + columns, values = getColsAndValuesOfIdx(r.sourceTableInfo.Columns, uniqueIndex, values) + } + + columnNames := make([]string, 0, len(columns)) + for _, column := range columns { + columnNames = append(columnNames, column.Name.O) + } + + failpoint.Inject("DownstreamTrackerWhereCheck", func() { + if r.tp == RowChangeUpdate { + log.L().Info("UpdateWhereColumnsCheck", zap.String("Columns", fmt.Sprintf("%v", columnNames))) + } else if r.tp == RowChangeDelete { + log.L().Info("DeleteWhereColumnsCheck", zap.String("Columns", fmt.Sprintf("%v", columnNames))) + } + }) + + return columnNames, values +} + +// genWhere generates where clause for UPDATE and DELETE to identify the row. +// the SQL part is written to `buf` and the args part is returned. +func (r *RowChange) genWhere(buf *strings.Builder) []interface{} { + whereColumns, whereValues := r.whereColumnsAndValues() + + for i, col := range whereColumns { + if i != 0 { + buf.WriteString(" AND ") + } + buf.WriteString(quotes.QuoteName(col)) + if whereValues[i] == nil { + buf.WriteString(" IS ?") + } else { + buf.WriteString(" = ?") + } + } + return whereValues +} + +func (r *RowChange) genDeleteSQL() (string, []interface{}) { + if r.tp != RowChangeDelete && r.tp != RowChangeUpdate { + log.L().DPanic("illegal type for genDeleteSQL", + zap.String("source table", r.sourceTable.String()), + zap.Stringer("change type", r.tp)) + return "", nil + } + + var buf strings.Builder + buf.Grow(1024) + buf.WriteString("DELETE FROM ") + buf.WriteString(r.targetTable.QuoteString()) + buf.WriteString(" WHERE ") + whereArgs := r.genWhere(&buf) + buf.WriteString(" LIMIT 1") + + return buf.String(), whereArgs +} + +func (r *RowChange) genUpdateSQL() (string, []interface{}) { + if r.tp != RowChangeUpdate { + log.L().DPanic("illegal type for genUpdateSQL", + zap.String("source table", r.sourceTable.String()), + zap.Stringer("change type", r.tp)) + return "", nil + } + + var buf strings.Builder + buf.Grow(2048) + buf.WriteString("UPDATE ") + buf.WriteString(r.targetTable.QuoteString()) + buf.WriteString(" SET ") + + args := make([]interface{}, 0, len(r.preValues)+len(r.postValues)) + for i, col := range r.sourceTableInfo.Columns { + if col.IsGenerated() { + continue + } + + if i == len(r.sourceTableInfo.Columns)-1 { + fmt.Fprintf(&buf, "%s = ?", quotes.QuoteName(col.Name.O)) + } else { + fmt.Fprintf(&buf, "%s = ?, ", quotes.QuoteName(col.Name.O)) + } + args = append(args, r.postValues[i]) + } + + buf.WriteString(" WHERE ") + whereArgs := r.genWhere(&buf) + buf.WriteString(" LIMIT 1") + + args = append(args, whereArgs...) 
+	return buf.String(), args
+}
+
+func (r *RowChange) genInsertSQL(tp DMLType) (string, []interface{}) {
+	if r.tp != RowChangeInsert && r.tp != RowChangeUpdate {
+		log.L().DPanic("illegal type for genInsertSQL",
+			zap.String("source table", r.sourceTable.String()),
+			zap.Stringer("change type", r.tp))
+		return "", nil
+	}
+
+	var buf strings.Builder
+	buf.Grow(1024)
+	if tp == DMLReplace {
+		buf.WriteString("REPLACE INTO ")
+	} else {
+		buf.WriteString("INSERT INTO ")
+	}
+	buf.WriteString(r.targetTable.QuoteString())
+	buf.WriteString(" (")
+
+	columnNum := 0
+	for i, col := range r.sourceTableInfo.Columns {
+		if col.IsGenerated() {
+			continue
+		}
+
+		columnNum++
+		buf.WriteString(quotes.QuoteName(col.Name.O))
+		if i != len(r.sourceTableInfo.Columns)-1 {
+			buf.WriteByte(',')
+		} else {
+			buf.WriteByte(')')
+		}
+	}
+
+	buf.WriteString(" VALUES (")
+
+	// placeholders
+	i := 0
+	for ; i < columnNum-1; i++ {
+		buf.WriteString("?,")
+	}
+	if i == columnNum-1 {
+		buf.WriteString("?)")
+	}
+
+	if tp == DMLInsertOnDuplicateUpdate {
+		buf.WriteString(" ON DUPLICATE KEY UPDATE ")
+		for j, col := range r.sourceTableInfo.Columns {
+			colName := quotes.QuoteName(col.Name.O)
+			buf.WriteString(colName + "=VALUES(" + colName + ")")
+			if j != len(r.sourceTableInfo.Columns)-1 {
+				buf.WriteByte(',')
+			}
+		}
+	}
+	return buf.String(), r.postValues
+}
+
+// DMLType is the type of DML SQL.
+type DMLType int
+
+// these consts represent types of DML SQL.
+const (
+	DMLNull DMLType = iota
+	DMLInsert
+	DMLReplace
+	DMLInsertOnDuplicateUpdate
+	DMLUpdate
+	DMLDelete
+)
+
+func (t DMLType) String() string {
+	switch t {
+	case DMLInsert:
+		return "DMLInsert"
+	case DMLReplace:
+		return "DMLReplace"
+	case DMLUpdate:
+		return "DMLUpdate"
+	case DMLInsertOnDuplicateUpdate:
+		return "DMLInsertOnDuplicateUpdate"
+	case DMLDelete:
+		return "DMLDelete"
+	}
+
+	return ""
+}
+
+// GenSQL generates the DML SQL and its arguments for the given DML type.
+func (r *RowChange) GenSQL(tp DMLType) (string, []interface{}) {
+	switch tp {
+	case DMLInsert, DMLReplace, DMLInsertOnDuplicateUpdate:
+		return r.genInsertSQL(tp)
+	case DMLUpdate:
+		return r.genUpdateSQL()
+	case DMLDelete:
+		return r.genDeleteSQL()
+	}
+	log.L().DPanic("illegal type for GenSQL",
+		zap.String("source table", r.sourceTable.String()),
+		zap.Stringer("DML type", tp))
+	return "", nil
+}
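A closing usage note: callers choose a DMLType per change and pass it to GenSQL. The mapping below is a plausible safe-mode policy rather than code from this PR; DM's syncer turns safe-mode INSERTs into REPLACEs and splits safe-mode identity-changing UPDATEs via Split into DELETE plus REPLACE:

```go
package main

import "fmt"

// Local toy redefinitions so the sketch is self-contained; the real consts
// live in pkg/sqlmodel.
type RowChangeType int

const (
	RowChangeInsert RowChangeType = iota + 1
	RowChangeUpdate
	RowChangeDelete
)

type DMLType int

const (
	DMLInsert DMLType = iota + 1
	DMLReplace
	DMLUpdate
	DMLDelete
)

// pickDMLType maps a change type plus the job's safe-mode flag to the
// DMLType handed to GenSQL: safe-mode INSERTs become REPLACEs so retried
// binlog events stay idempotent.
func pickDMLType(tp RowChangeType, safeMode bool) DMLType {
	switch tp {
	case RowChangeInsert:
		if safeMode {
			return DMLReplace
		}
		return DMLInsert
	case RowChangeUpdate:
		return DMLUpdate
	default:
		return DMLDelete
	}
}

func main() {
	fmt.Println(pickDMLType(RowChangeInsert, true) == DMLReplace) // true
}
```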