syncer(dm): use DML library #4313

Merged · 16 commits · Feb 11, 2022
4 changes: 2 additions & 2 deletions dm/pkg/schema/tracker.go
@@ -397,13 +397,13 @@ func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string
 	dti, ok := tr.dsTracker.tableInfos[tableID]
 	if !ok {
 		tctx.Logger.Info("Downstream schema tracker init. ", zap.String("tableID", tableID))
-		ti, err := tr.getTableInfoByCreateStmt(tctx, tableID)
+		downstreamTI, err := tr.getTableInfoByCreateStmt(tctx, tableID)
 		if err != nil {
 			tctx.Logger.Error("Init downstream schema info error. ", zap.String("tableID", tableID), zap.Error(err))
 			return nil, err
 		}

-		dti = GetDownStreamTI(ti, originTi)
+		dti = GetDownStreamTI(downstreamTI, originTi)
 		tr.dsTracker.tableInfos[tableID] = dti
 	}
 	return dti, nil
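Note on the tracker change: the hunk is just a rename (`ti` → `downstreamTI`, so the downstream table info is not confused with the upstream `ti` used elsewhere), but the surrounding function is a cache-on-miss lookup. A minimal sketch of that shape, with hypothetical stand-in types rather than the tracker's real ones:

```go
package main

import "fmt"

// tableInfo and tracker are hypothetical stand-ins for the schema-tracker
// types; only the caching shape mirrors GetDownStreamTableInfo above.
type tableInfo struct{ createStmt string }

type tracker struct {
	tableInfos map[string]*tableInfo
}

// get returns the cached downstream info for tableID, building and
// memoizing it on the first miss.
func (t *tracker) get(tableID string, build func(string) (*tableInfo, error)) (*tableInfo, error) {
	if dti, ok := t.tableInfos[tableID]; ok {
		return dti, nil
	}
	dti, err := build(tableID) // stands in for getTableInfoByCreateStmt
	if err != nil {
		return nil, err
	}
	t.tableInfos[tableID] = dti
	return dti, nil
}

func main() {
	tr := &tracker{tableInfos: map[string]*tableInfo{}}
	dti, err := tr.get("`test`.`tb`", func(id string) (*tableInfo, error) {
		return &tableInfo{createStmt: "CREATE TABLE " + id + " (a INT PRIMARY KEY)"}, nil
	})
	fmt.Println(dti.createStmt, err)
}
```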
7 changes: 4 additions & 3 deletions dm/syncer/causality.go
@@ -20,6 +20,7 @@ import (
"go.uber.org/zap"

"github.com/pingcap/tidb/sessionctx"

"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/syncer/metrics"
)
@@ -79,16 +80,16 @@ func (c *causality) run() {
 			c.relation.gc(j.flushSeq)
 			continue
 		default:
-			keys := j.dml.identifyKeys(c.sessCtx)
+			keys := j.dml.CausalityKeys()

 			// detectConflict before add
 			if c.detectConflict(keys) {
 				c.logger.Debug("meet causality key, will generate a conflict job to flush all sqls", zap.Strings("keys", keys))
 				c.outCh <- newConflictJob(c.workerCount)
 				c.relation.clear()
 			}
-			j.dml.key = c.add(keys)
-			c.logger.Debug("key for keys", zap.String("key", j.dml.key), zap.Strings("keys", keys))
+			j.dmlQueueKey = c.add(keys)
+			c.logger.Debug("key for keys", zap.String("key", j.dmlQueueKey), zap.Strings("keys", keys))
 		}
 		metrics.ConflictDetectDurationHistogram.WithLabelValues(c.task, c.source).Observe(time.Since(startTime).Seconds())

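For context on the two renames above (`CausalityKeys()` replacing `identifyKeys(sessCtx)`, and the queue key moving from the DML onto the job as `dmlQueueKey`): this loop routes each change by its causality keys and emits a conflict job when keys already belong to different workers. A toy, self-contained sketch of that detect/flush idea — simplified routing, not DM's actual relation implementation:

```go
package main

import "fmt"

// causality is a toy version of DM's conflict detector: it remembers which
// worker queue first saw each causality key.
type causality struct {
	relation map[string]int // causality key -> queue index that owns it
	next     int            // next queue for keys never seen before
	queues   int            // worker count
}

// detectConflict reports whether keys are already owned by two different
// queues, i.e. routing this change to any single queue could break ordering.
func (c *causality) detectConflict(keys []string) bool {
	owner := -1
	for _, k := range keys {
		if q, ok := c.relation[k]; ok {
			if owner != -1 && q != owner {
				return true
			}
			owner = q
		}
	}
	return false
}

// add routes a change: on conflict it "flushes" (clearing all relations, which
// stands in for the conflict job in the diff), then assigns every key to one queue.
func (c *causality) add(keys []string) int {
	if c.detectConflict(keys) {
		c.relation = map[string]int{} // flush: all in-flight changes are done
	}
	queue := -1
	for _, k := range keys {
		if q, ok := c.relation[k]; ok {
			queue = q
			break
		}
	}
	if queue == -1 {
		queue = c.next % c.queues
		c.next++
	}
	for _, k := range keys {
		c.relation[k] = queue
	}
	return queue
}

func main() {
	c := &causality{relation: map[string]int{}, queues: 4}
	fmt.Println(c.add([]string{"1.a.tb"}))           // fresh keys -> queue 0
	fmt.Println(c.add([]string{"2.a.tb"}))           // fresh keys -> queue 1
	fmt.Println(c.add([]string{"2.a.tb", "1.a.tb"})) // keys span queues 0 and 1 -> conflict, flush, reassign
}
```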
143 changes: 23 additions & 120 deletions dm/syncer/causality_test.go
@@ -15,21 +15,19 @@ package syncer

 import (
 	"math"
+	"testing"
 	"time"

 	. "github.com/pingcap/check"
-	"github.com/pingcap/tidb-tools/pkg/filter"
-	"github.com/pingcap/tidb/parser"
-	"github.com/pingcap/tidb/parser/model"
-	"github.com/pingcap/tidb/parser/types"
-	"github.com/pingcap/tidb/util/mock"
+	"github.com/stretchr/testify/require"

+	cdcmodel "github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/dm/dm/config"
 	"github.com/pingcap/tiflow/dm/pkg/binlog"
 	tcontext "github.com/pingcap/tiflow/dm/pkg/context"
 	"github.com/pingcap/tiflow/dm/pkg/log"
-	"github.com/pingcap/tiflow/dm/pkg/schema"
 	"github.com/pingcap/tiflow/dm/pkg/utils"
+	"github.com/pingcap/tiflow/pkg/sqlmodel"
 )

 func (s *testSyncerSuite) TestDetectConflict(c *C) {
@@ -65,26 +63,11 @@ func (s *testSyncerSuite) TestDetectConflict(c *C) {
 	c.Assert(ca.relation.len(), Equals, 0)
 }

-func (s *testSyncerSuite) TestCasuality(c *C) {
-	p := parser.New()
-	se := mock.NewContext()
+func TestCausality(t *testing.T) {
+	t.Parallel()

 	schemaStr := "create table tb(a int primary key, b int unique);"
-	ti, err := createTableInfo(p, se, int64(0), schemaStr)
-	c.Assert(err, IsNil)
-	tiIndex := &model.IndexInfo{
-		Table:   ti.Name,
-		Unique:  true,
-		Primary: true,
-		State:   model.StatePublic,
-		Tp:      model.IndexTypeBtree,
-		Columns: []*model.IndexColumn{{
-			Name:   ti.Columns[0].Name,
-			Offset: ti.Columns[0].Offset,
-			Length: types.UnspecifiedLength,
-		}},
-	}
-	downTi := schema.GetDownStreamTI(ti, ti)
-	c.Assert(downTi, NotNil)
+	ti := mockTableInfo(t, schemaStr)

 	jobCh := make(chan *job, 10)
 	syncer := &Syncer{
@@ -100,124 +83,44 @@
 	}
 	causalityCh := causalityWrap(jobCh, syncer)
 	testCases := []struct {
-		op      opType
-		oldVals []interface{}
-		vals    []interface{}
+		preVals  []interface{}
+		postVals []interface{}
 	}{
 		{
-			op:   insert,
-			vals: []interface{}{1, 2},
+			postVals: []interface{}{1, 2},
 		},
 		{
-			op:   insert,
-			vals: []interface{}{2, 3},
+			postVals: []interface{}{2, 3},
 		},
 		{
-			op:      update,
-			oldVals: []interface{}{2, 3},
-			vals:    []interface{}{3, 4},
+			preVals:  []interface{}{2, 3},
+			postVals: []interface{}{3, 4},
 		},
 		{
-			op:   del,
-			vals: []interface{}{1, 2},
+			preVals: []interface{}{1, 2},
 		},
 		{
-			op:   insert,
-			vals: []interface{}{1, 3},
+			postVals: []interface{}{1, 3},
 		},
 	}
-	results := []opType{insert, insert, update, del, conflict, insert}
-	table := &filter.Table{Schema: "test", Name: "t1"}
+	results := []opType{dml, dml, dml, dml, conflict, dml}
+	table := &cdcmodel.TableName{Schema: "test", Table: "t1"}
 	location := binlog.NewLocation("")
 	ec := &eventContext{startLocation: &location, currentLocation: &location, lastLocation: &location}

 	for _, tc := range testCases {
-		job := newDMLJob(tc.op, table, table, newDML(tc.op, false, "", table, tc.oldVals, tc.vals, tc.oldVals, tc.vals, ti.Columns, ti, tiIndex, downTi), ec)
+		change := sqlmodel.NewRowChange(table, nil, tc.preVals, tc.postVals, ti, nil, nil)
+		job := newDMLJob(change, ec)
 		jobCh <- job
 	}

-	c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
+	require.Eventually(t, func() bool {
 		return len(causalityCh) == len(results)
-	}), IsTrue)
+	}, 3*time.Second, 100*time.Millisecond)

 	for _, op := range results {
 		job := <-causalityCh
-		c.Assert(job.tp, Equals, op)
-	}
-}
-
-func (s *testSyncerSuite) TestCasualityWithPrefixIndex(c *C) {
-	p := parser.New()
-	se := mock.NewContext()
-	schemaStr := "create table t (c1 text, c2 int unique, unique key c1(c1(3)));"
-	ti, err := createTableInfo(p, se, int64(0), schemaStr)
-	c.Assert(err, IsNil)
-	downTi := schema.GetDownStreamTI(ti, ti)
-	c.Assert(downTi, NotNil)
-	c.Assert(len(downTi.AvailableUKIndexList) == 2, IsTrue)
-	tiIndex := downTi.AvailableUKIndexList[0]
-
-	jobCh := make(chan *job, 10)
-	syncer := &Syncer{
-		cfg: &config.SubTaskConfig{
-			SyncerConfig: config.SyncerConfig{
-				QueueSize: 1024,
-			},
-			Name:     "task",
-			SourceID: "source",
-		},
-		tctx:    tcontext.Background().WithLogger(log.L()),
-		sessCtx: utils.NewSessionCtx(map[string]string{"time_zone": "UTC"}),
-	}
-	causalityCh := causalityWrap(jobCh, syncer)
-	testCases := []struct {
-		op      opType
-		oldVals []interface{}
-		vals    []interface{}
-	}{
-		{
-			op:   insert,
-			vals: []interface{}{"1234", 1},
-		},
-		{
-			op:   insert,
-			vals: []interface{}{"2345", 2},
-		},
-		{
-			op:      update,
-			oldVals: []interface{}{"2345", 2},
-			vals:    []interface{}{"2345", 3},
-		},
-		{
-			op:   del,
-			vals: []interface{}{"1234", 1},
-		},
-		{
-			op:   insert,
-			vals: []interface{}{"2345", 1},
-		},
-	}
-	results := []opType{insert, insert, update, del, conflict, insert}
-	resultKeys := []string{"123.c1.", "234.c1.", "234.c1.", "123.c1.", "conflict", "234.c1."}
-	table := &filter.Table{Schema: "test", Name: "t1"}
-	location := binlog.NewLocation("")
-	ec := &eventContext{startLocation: &location, currentLocation: &location, lastLocation: &location}
-
-	for _, tc := range testCases {
-		job := newDMLJob(tc.op, table, table, newDML(tc.op, false, "", table, tc.oldVals, tc.vals, tc.oldVals, tc.vals, ti.Columns, ti, tiIndex, downTi), ec)
-		jobCh <- job
-	}
-
-	c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
-		return len(causalityCh) == len(results)
-	}), IsTrue)
-
-	for i, op := range results {
-		job := <-causalityCh
-		if job.tp != conflict {
-			c.Assert(job.dml.key, Equals, resultKeys[i])
-		}
-		c.Assert(job.tp, Equals, op)
+		require.Equal(t, op, job.tp)
 	}
 }

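The rewritten test doubles as a compact reference for the new sqlmodel surface this PR adopts. Reading the calls above, the operation kind is implied by which row images are present rather than passed as an opType; the nil arguments look like optional downstream/session parameters, though that is inferred from this diff rather than from the library's docs. A sketch mirroring the test's calls:

```go
package syncer_test

import (
	timodel "github.com/pingcap/tidb/parser/model"

	cdcmodel "github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/pkg/sqlmodel"
)

// exampleRowChanges mirrors TestCausality above: post-image only reads as an
// insert, both images as an update, pre-image only as a delete.
func exampleRowChanges(ti *timodel.TableInfo) [][]string {
	table := &cdcmodel.TableName{Schema: "test", Table: "t1"}

	ins := sqlmodel.NewRowChange(table, nil, nil, []interface{}{1, 2}, ti, nil, nil)
	upd := sqlmodel.NewRowChange(table, nil, []interface{}{2, 3}, []interface{}{3, 4}, ti, nil, nil)
	del := sqlmodel.NewRowChange(table, nil, []interface{}{1, 2}, nil, ti, nil, nil)

	// Causality keys now come straight off the change (see causality.go above);
	// no session context argument is needed any more.
	return [][]string{ins.CausalityKeys(), upd.CausalityKeys(), del.CausalityKeys()}
}
```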
12 changes: 6 additions & 6 deletions dm/syncer/checkpoint_flush_worker.go
@@ -37,11 +37,11 @@ type checkpointFlushTask struct {
 }

 type checkpointFlushWorker struct {
-	input        chan *checkpointFlushTask
-	cp           CheckPoint
-	execError    *atomic.Error
-	afterFlushFn func(task *checkpointFlushTask) error
-	addCountFunc func(bool, string, opType, int64, *filter.Table)
+	input              chan *checkpointFlushTask
+	cp                 CheckPoint
+	execError          *atomic.Error
+	afterFlushFn       func(task *checkpointFlushTask) error
+	updateJobMetricsFn func(bool, string, *job)
 }

 // Add adds a new flush checkpoint job.
@@ -60,7 +60,7 @@ func (w *checkpointFlushWorker) Run(ctx *tcontext.Context) {
 		if isAsyncFlush {
 			task.asyncflushJob.flushWg.Wait()

-			w.addCountFunc(true, adminQueueName, task.asyncflushJob.tp, 1, task.asyncflushJob.targetTable)
+			w.updateJobMetricsFn(true, adminQueueName, task.asyncflushJob)
 			ctx.L().Info("async flush checkpoint snapshot job has been processed by dml worker, about to flush checkpoint snapshot", zap.Int64("job sequence", task.asyncflushJob.flushSeq), zap.Int("snapshot_id", task.snapshotInfo.id))
 		} else {
 			ctx.L().Info("about to sync flush checkpoint snapshot", zap.Int("snapshot_id", task.snapshotInfo.id))
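On the checkpoint-flush change: `addCountFunc` took the job pre-unpacked into five arguments, while `updateJobMetricsFn` receives the `*job` and lets the metrics layer pull the fields it needs. A hypothetical before/after sketch (simplified types, not the PR's code):

```go
package main

import "fmt"

// job is a simplified stand-in for DM's job struct, which carries an opType,
// a target table, a row count, and more.
type job struct {
	tp    string
	table string
	rows  int64
}

// Before: the worker unpacked the job field by field for the metrics layer.
func addCount(finished bool, queue string, tp string, n int64, table string) {
	fmt.Printf("finished=%v queue=%s tp=%s n=%d table=%s\n", finished, queue, tp, n, table)
}

// After: the worker hands over the whole job; the metrics layer extracts what
// it needs, so new metric dimensions don't ripple through every call site.
func updateJobMetrics(finished bool, queue string, j *job) {
	fmt.Printf("finished=%v queue=%s tp=%s n=%d table=%s\n", finished, queue, j.tp, j.rows, j.table)
}

func main() {
	j := &job{tp: "flush", table: "`test`.`t1`", rows: 1}
	addCount(true, "q_0", j.tp, j.rows, j.table)
	updateJobMetrics(true, "q_0", j)
}
```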