Skip to content

Commit

Permalink
save work
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Jan 16, 2022
1 parent 750816f commit a2ac57d
Show file tree
Hide file tree
Showing 14 changed files with 961 additions and 164 deletions.
27 changes: 16 additions & 11 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,13 +381,13 @@ func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string
dti, ok := tr.dsTracker.tableInfos[tableID]
if !ok {
tctx.Logger.Info("Downstream schema tracker init. ", zap.String("tableID", tableID))
ti, err := tr.getTableInfoByCreateStmt(tctx, tableID)
downstreamTI, err := tr.getTableInfoByCreateStmt(tctx, tableID)
if err != nil {
tctx.Logger.Error("Init dowstream schema info error. ", zap.String("tableID", tableID), zap.Error(err))
return nil, err
}

dti = GetDownStreamTi(ti, originTi)
dti = GetDownStreamTI(downstreamTI, originTi)
tr.dsTracker.tableInfos[tableID] = dti
}
return dti, nil
Expand All @@ -396,17 +396,22 @@ func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string
// GetAvailableDownStreamUKIndexInfo gets available downstream UK whose data is not null.
// note. this function will not init downstreamTrack.
func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []interface{}) *model.IndexInfo {
dti, ok := tr.dsTracker.tableInfos[tableID]
dti := tr.dsTracker.tableInfos[tableID]

return GetIdentityUKByData(dti, data)
}

if !ok || len(dti.AvailableUKIndexList) == 0 {
// GetIdentityUKByData gets available downstream UK whose data is not null.
func GetIdentityUKByData(downstreamTI *DownstreamTableInfo, data []interface{}) *model.IndexInfo {
if downstreamTI == nil || len(downstreamTI.AvailableUKIndexList) == 0 {
return nil
}
// func for check data is not null
fn := func(i int) bool {
return data[i] != nil
}

for _, uk := range dti.AvailableUKIndexList {
for _, uk := range downstreamTI.AvailableUKIndexList {
// check uk's column data is not null
if isSpecifiedIndexColumn(uk, fn) {
return uk
Expand Down Expand Up @@ -483,8 +488,8 @@ func (tr *Tracker) initDownStreamSQLModeAndParser(tctx *tcontext.Context) error
return nil
}

// GetDownStreamTi constructs downstreamTable index cache by tableinfo.
func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo {
// GetDownStreamTI constructs downstreamTable index cache by tableinfo.
func GetDownStreamTI(downstreamTI *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo {
var (
absoluteUKIndexInfo *model.IndexInfo
availableUKIndexList = []*model.IndexInfo{}
Expand All @@ -494,10 +499,10 @@ func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *Downstream

// func for check not null constraint
fn := func(i int) bool {
return mysql.HasNotNullFlag(ti.Columns[i].Flag)
return mysql.HasNotNullFlag(downstreamTI.Columns[i].Flag)
}

for i, idx := range ti.Indices {
for i, idx := range downstreamTI.Indices {
if !idx.Primary && !idx.Unique {
continue
}
Expand All @@ -520,7 +525,7 @@ func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *Downstream
// handle pk exceptional case.
// e.g. "create table t(a int primary key, b int)".
if !hasPk {
exPk := redirectIndexKeys(handlePkExCase(ti), originTi)
exPk := redirectIndexKeys(handlePkExCase(downstreamTI), originTi)
if exPk != nil {
absoluteUKIndexInfo = exPk
absoluteUKPosition = len(availableUKIndexList)
Expand All @@ -534,7 +539,7 @@ func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *Downstream
}

return &DownstreamTableInfo{
TableInfo: ti,
TableInfo: downstreamTI,
AbsoluteUKIndexInfo: absoluteUKIndexInfo,
AvailableUKIndexList: availableUKIndexList,
}
Expand Down
7 changes: 4 additions & 3 deletions dm/syncer/causality.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.uber.org/zap"

"github.com/pingcap/tidb/sessionctx"

"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/syncer/metrics"
)
Expand Down Expand Up @@ -79,16 +80,16 @@ func (c *causality) run() {
c.relation.gc(j.flushSeq)
continue
default:
keys := j.dml.identifyKeys(c.sessCtx)
keys := j.dml.CausalityKeys()

// detectConflict before add
if c.detectConflict(keys) {
c.logger.Debug("meet causality key, will generate a conflict job to flush all sqls", zap.Strings("keys", keys))
c.outCh <- newConflictJob(c.workerCount)
c.relation.clear()
}
j.dml.key = c.add(keys)
c.logger.Debug("key for keys", zap.String("key", j.dml.key), zap.Strings("keys", keys))
j.dmlQueueKey = c.add(keys)
c.logger.Debug("key for keys", zap.String("key", j.dmlQueueKey), zap.Strings("keys", keys))
}
metrics.ConflictDetectDurationHistogram.WithLabelValues(c.task, c.source).Observe(time.Since(startTime).Seconds())

Expand Down
4 changes: 2 additions & 2 deletions dm/syncer/causality_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *testSyncerSuite) TestCasuality(c *C) {
Length: types.UnspecifiedLength,
}},
}
downTi := schema.GetDownStreamTi(ti, ti)
downTi := schema.GetDownStreamTI(ti, ti)
c.Assert(downTi, NotNil)

jobCh := make(chan *job, 10)
Expand Down Expand Up @@ -152,7 +152,7 @@ func (s *testSyncerSuite) TestCasualityWithPrefixIndex(c *C) {
schemaStr := "create table t (c1 text, c2 int unique, unique key c1(c1(3)));"
ti, err := createTableInfo(p, se, int64(0), schemaStr)
c.Assert(err, IsNil)
downTi := schema.GetDownStreamTi(ti, ti)
downTi := schema.GetDownStreamTI(ti, ti)
c.Assert(downTi, NotNil)
c.Assert(len(downTi.AvailableUKIndexList) == 2, IsTrue)
tiIndex := downTi.AvailableUKIndexList[0]
Expand Down
25 changes: 11 additions & 14 deletions dm/syncer/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,17 @@ func (c *compactor) run() {

// set safeMode when receive first job
if len(c.buffer) == 0 {
c.safeMode = j.dml.safeMode
c.safeMode = j.safeMode
}
// if dml has no PK/NOT NULL UK, do not compact it.
if j.dml.identifyColumns() == nil {
if !j.dml.HasNotNullUniqueIdx() {
c.buffer = append(c.buffer, j)
continue
}

// if update job update its identify keys, turn it into delete + insert
if j.dml.op == update && j.dml.updateIdentify() {
delDML, insertDML := updateToDelAndInsert(j.dml)
if j.dml.IsIdentityUpdated() {
delDML, insertDML := j.dml.Split()
delJob := j.clone()
delJob.tp = del
delJob.dml = delDML
Expand Down Expand Up @@ -142,7 +142,7 @@ func (c *compactor) flushBuffer() {
if j != nil {
// set safemode for all jobs by first job in buffer.
// or safemode for insert(delete + insert = insert with safemode)
j.dml.safeMode = c.safeMode || j.dml.safeMode
j.safeMode = c.safeMode || j.safeMode
c.outCh <- j
}
}
Expand All @@ -162,7 +162,7 @@ func (c *compactor) flushBuffer() {
// DELETE + UPDATE => X _|
// .
func (c *compactor) compactJob(j *job) {
tableName := j.dml.targetTableID
tableName := j.dml.TargetTableID()
tableKeyMap, ok := c.keyMap[tableName]
if !ok {
// do not alloc a large buffersize, otherwise if the downstream latency is low
Expand All @@ -171,7 +171,7 @@ func (c *compactor) compactJob(j *job) {
tableKeyMap = c.keyMap[tableName]
}

key := j.dml.identifyKey()
key := j.dml.IdentityKey()

failpoint.Inject("DownstreamIdentifyKeyCheckInCompact", func(v failpoint.Value) {
value, err := strconv.Atoi(key)
Expand All @@ -197,20 +197,17 @@ func (c *compactor) compactJob(j *job) {
if prevJob.tp == insert {
// INSERT + UPDATE => INSERT
j.tp = insert
j.dml.oldValues = nil
j.dml.originOldValues = nil
j.dml.op = insert
j.dml.Reduce(prevJob.dml)
// DELETE + INSERT + UPDATE => INSERT with safemode
j.dml.safeMode = prevJob.dml.safeMode
j.safeMode = prevJob.safeMode
} else if prevJob.tp == update {
// UPDATE + UPDATE => UPDATE
j.dml.oldValues = prevJob.dml.oldValues
j.dml.originOldValues = prevJob.dml.originOldValues
j.dml.Reduce(prevJob.dml)
}
case insert:
if prevJob.tp == del {
// DELETE + INSERT => INSERT with safemode
j.dml.safeMode = true
j.safeMode = true
}
case del:
// do nothing because anything + DELETE => DELETE
Expand Down
4 changes: 2 additions & 2 deletions dm/syncer/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *testSyncerSuite) TestCompactJob(c *C) {
Length: types.UnspecifiedLength,
}},
}
downTi := schema.GetDownStreamTi(ti, ti)
downTi := schema.GetDownStreamTI(ti, ti)
c.Assert(downTi, NotNil)

var dml *DML
Expand Down Expand Up @@ -208,7 +208,7 @@ func (s *testSyncerSuite) TestCompactorSafeMode(c *C) {
Length: types.UnspecifiedLength,
}},
}
downTi := schema.GetDownStreamTi(ti, ti)
downTi := schema.GetDownStreamTI(ti, ti)
c.Assert(downTi, NotNil)

testCases := []struct {
Expand Down
Loading

0 comments on commit a2ac57d

Please sign in to comment.