Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dm/syncer: multiple rows use downstream schema #3308

Merged
merged 10 commits into from
Nov 9, 2021
Merged
50 changes: 28 additions & 22 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ type Tracker struct {
type downstreamTracker struct {
downstreamConn *dbconn.DBConn // downstream connection
stmtParser *parser.Parser // statement parser
tableInfos map[string]*downstreamTableInfo // downstream table infos
tableInfos map[string]*DownstreamTableInfo // downstream table infos
}

// downstreamTableInfo contains tableinfo and index cache.
type downstreamTableInfo struct {
tableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree
indexCache *model.IndexInfo // index cache include pk/uk(not null)
availableUKCache []*model.IndexInfo // index cache include uks(data not null)
// DownstreamTableInfo contains tableinfo and index cache.
type DownstreamTableInfo struct {
TableInfo *model.TableInfo // tableInfo which comes from parse create statement syntaxtree
IndexCache *model.IndexInfo // index cache include pk/uk(not null)
WizardXiao marked this conversation as resolved.
Show resolved Hide resolved
AvailableUKCache []*model.IndexInfo // index cache include uks(data not null)
}

// NewTracker creates a new tracker. `sessionCfg` will be set as tracker's session variables if specified, or retrieve
Expand Down Expand Up @@ -181,7 +181,7 @@ func NewTracker(ctx context.Context, task string, sessionCfg map[string]string,
// init downstreamTracker
dsTracker := &downstreamTracker{
downstreamConn: downstreamConn,
tableInfos: make(map[string]*downstreamTableInfo),
tableInfos: make(map[string]*DownstreamTableInfo),
}

return &Tracker{
Expand Down Expand Up @@ -377,42 +377,42 @@ func (tr *Tracker) GetSystemVar(name string) (string, bool) {

// GetDownStreamIndexInfo gets downstream PK/UK(not null) Index.
// note. this function will init downstreamTrack's table info.
WizardXiao marked this conversation as resolved.
Show resolved Hide resolved
func (tr *Tracker) GetDownStreamIndexInfo(tctx *tcontext.Context, tableID string, originTi *model.TableInfo) (*model.IndexInfo, error) {
func (tr *Tracker) GetDownStreamIndexAndTableInfo(tctx *tcontext.Context, tableID string, originTi *model.TableInfo) (*model.IndexInfo, *DownstreamTableInfo, error) {
WizardXiao marked this conversation as resolved.
Show resolved Hide resolved
dti, ok := tr.dsTracker.tableInfos[tableID]
if !ok {
tctx.Logger.Info("Downstream schema tracker init. ", zap.String("tableID", tableID))
ti, err := tr.getTableInfoByCreateStmt(tctx, tableID)
if err != nil {
tctx.Logger.Error("Init dowstream schema info error. ", zap.String("tableID", tableID), zap.Error(err))
return nil, err
return nil, nil, err
}

dti = getDownStreamTi(ti, originTi)
dti = GetDownStreamTi(ti, originTi)
tr.dsTracker.tableInfos[tableID] = dti
}
return dti.indexCache, nil
return dti.IndexCache, dti, nil
}

// GetAvailableDownStreamUKIndexInfo gets available downstream UK whose data is not null.
// note. this function will not init downstreamTrack.
func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []interface{}) *model.IndexInfo {
dti, ok := tr.dsTracker.tableInfos[tableID]

if !ok || len(dti.availableUKCache) == 0 {
if !ok || len(dti.AvailableUKCache) == 0 {
return nil
}
// func for check data is not null
fn := func(i int) bool {
return data[i] != nil
}

for i, uk := range dti.availableUKCache {
for _, uk := range dti.AvailableUKCache {
// check uk's column data is not null
if isSpecifiedIndexColumn(uk, fn) {
if i != 0 {
// exchange available uk to the first of the array to reduce judgements for next row
dti.availableUKCache[0], dti.availableUKCache[i] = dti.availableUKCache[i], dti.availableUKCache[0]
}
// if i != 0 {
WizardXiao marked this conversation as resolved.
Show resolved Hide resolved
// // exchange available uk to the first of the array to reduce judgements for next row
// dti.availableUKCache[0], dti.availableUKCache[i] = dti.availableUKCache[i], dti.availableUKCache[0]
// }
WizardXiao marked this conversation as resolved.
Show resolved Hide resolved
return uk
}
}
Expand Down Expand Up @@ -488,7 +488,7 @@ func (tr *Tracker) initDownStreamSQLModeAndParser(tctx *tcontext.Context) error
}

// getDownStreamTi constructs downstreamTable index cache by tableinfo.
func getDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *downstreamTableInfo {
func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo {
var (
indexCache *model.IndexInfo
availableUKCache = make([]*model.IndexInfo, 0, len(ti.Indices))
Expand All @@ -509,6 +509,9 @@ func getDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *downstream
continue
}
if idx.Primary {
if indexCache != nil {
availableUKCache = append(availableUKCache, indexCache)
}
WizardXiao marked this conversation as resolved.
Show resolved Hide resolved
indexCache = indexRedirect
hasPk = true
} else if idx.Unique {
Expand All @@ -526,14 +529,17 @@ func getDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *downstream
if !hasPk {
exPk := redirectIndexKeys(handlePkExCase(ti), originTi)
if exPk != nil {
if indexCache != nil {
availableUKCache = append(availableUKCache, indexCache)
}
indexCache = exPk
}
}

return &downstreamTableInfo{
tableInfo: ti,
indexCache: indexCache,
availableUKCache: availableUKCache,
return &DownstreamTableInfo{
TableInfo: ti,
IndexCache: indexCache,
AvailableUKCache: availableUKCache,
}
}

Expand Down
58 changes: 29 additions & 29 deletions dm/pkg/schema/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int, b int, c varchar(10))"))
indexinfo, err := tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err := tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
_, ok := tracker.dsTracker.tableInfos[tableID]
c.Assert(ok, IsTrue)
Expand All @@ -614,7 +614,7 @@ func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int, b int, c varchar(10), PRIMARY KEY (c))"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
_, ok = tracker.dsTracker.tableInfos[tableID]
c.Assert(ok, IsTrue)
Expand All @@ -624,7 +624,7 @@ func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int primary key, b int, c varchar(10))"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
_, ok = tracker.dsTracker.tableInfos[tableID]
c.Assert(ok, IsTrue)
Expand All @@ -635,7 +635,7 @@ func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int, b int, c varchar(10), PRIMARY KEY (a,b))"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
_, ok = tracker.dsTracker.tableInfos[tableID]
c.Assert(ok, IsTrue)
Expand All @@ -646,7 +646,7 @@ func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int unique not null, b int, c varchar(10))"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
_, ok = tracker.dsTracker.tableInfos[tableID]
c.Assert(ok, IsTrue)
Expand All @@ -657,31 +657,31 @@ func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int unique, b int, c varchar(10))"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
dti, ok := tracker.dsTracker.tableInfos[tableID]
c.Assert(ok, IsTrue)
c.Assert(indexinfo, IsNil)
c.Assert(dti.availableUKCache, NotNil)
c.Assert(dti.AvailableUKCache, NotNil)
delete(tracker.dsTracker.tableInfos, tableID)

// downstream has uks
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int unique, b int unique, c varchar(10) unique not null)"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
dti, ok = tracker.dsTracker.tableInfos[tableID]
c.Assert(ok, IsTrue)
c.Assert(indexinfo, NotNil)
c.Assert(len(dti.availableUKCache) == 2, IsTrue)
c.Assert(len(dti.AvailableUKCache) == 2, IsTrue)
delete(tracker.dsTracker.tableInfos, tableID)

// downstream has pk and uk, pk has priority
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int unique not null , b int, c varchar(10), PRIMARY KEY (c))"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
c.Assert(indexinfo.Primary, IsTrue)
delete(tracker.dsTracker.tableInfos, tableID)
Expand All @@ -690,7 +690,7 @@ func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int , d int PRIMARY KEY, c varchar(10), b int unique not null)"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
c.Assert(indexinfo, NotNil)
c.Assert(indexinfo.Primary, IsFalse)
Expand All @@ -699,18 +699,18 @@ func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int , d int PRIMARY KEY, c varchar(10), b int unique)"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
c.Assert(indexinfo, IsNil)
dti, ok = tracker.dsTracker.tableInfos[tableID]
c.Assert(ok, IsTrue)
c.Assert(len(dti.availableUKCache) == 1, IsTrue)
c.Assert(len(dti.AvailableUKCache) == 1, IsTrue)
delete(tracker.dsTracker.tableInfos, tableID)

mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int , d int PRIMARY KEY, c varchar(10), b int)"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
c.Assert(indexinfo, IsNil)
delete(tracker.dsTracker.tableInfos, tableID)
Expand All @@ -719,7 +719,7 @@ func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int , d int unique not null, c varchar(10), b int unique not null)"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
c.Assert(indexinfo, NotNil)
c.Assert(indexinfo.Columns[0].Name.L == "b", IsTrue)
Expand All @@ -728,18 +728,18 @@ func (s *trackerSuite) TestGetDownStreamIndexInfo(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int , d int unique not null, c varchar(10), b int unique)"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
c.Assert(indexinfo, IsNil)
dti, ok = tracker.dsTracker.tableInfos[tableID]
c.Assert(ok, IsTrue)
c.Assert(len(dti.availableUKCache) == 1, IsTrue)
c.Assert(len(dti.AvailableUKCache) == 1, IsTrue)
delete(tracker.dsTracker.tableInfos, tableID)

mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int , d int unique not null, c varchar(10), b int)"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
c.Assert(indexinfo, IsNil)
delete(tracker.dsTracker.tableInfos, tableID)
Expand Down Expand Up @@ -777,7 +777,7 @@ func (s *trackerSuite) TestGetAvailableDownStreanUKIndexInfo(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int, b int, c varchar(10))"))
indexinfo, err := tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err := tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
c.Assert(indexinfo, IsNil)
data := []interface{}{1, 2, 3}
Expand All @@ -789,7 +789,7 @@ func (s *trackerSuite) TestGetAvailableDownStreanUKIndexInfo(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int unique, b int, c varchar(10))"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
c.Assert(indexinfo, IsNil)
data = []interface{}{nil, 2, 3}
Expand All @@ -801,7 +801,7 @@ func (s *trackerSuite) TestGetAvailableDownStreanUKIndexInfo(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int unique, b int, c varchar(10))"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
c.Assert(indexinfo, IsNil)
data = []interface{}{1, 2, 3}
Expand All @@ -813,7 +813,7 @@ func (s *trackerSuite) TestGetAvailableDownStreanUKIndexInfo(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int, b int, c varchar(10), unique key(a, b))"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
c.Assert(indexinfo, IsNil)
data = []interface{}{1, nil, 3}
Expand All @@ -824,7 +824,7 @@ func (s *trackerSuite) TestGetAvailableDownStreanUKIndexInfo(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int, b int, c varchar(10), unique key(a, b))"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
c.Assert(indexinfo, IsNil)
data = []interface{}{1, nil, nil}
Expand All @@ -836,7 +836,7 @@ func (s *trackerSuite) TestGetAvailableDownStreanUKIndexInfo(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int, b int, c varchar(10), unique key(a, b))"))
indexinfo, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
indexinfo, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
c.Assert(indexinfo, IsNil)
data = []interface{}{1, 2, 3}
Expand Down Expand Up @@ -876,7 +876,7 @@ func (s *trackerSuite) TestReTrackDownStreamIndex(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int, b int, c varchar(10), PRIMARY KEY (a,b))"))
_, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
_, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
_, ok := tracker.dsTracker.tableInfos[tableID]
c.Assert(ok, IsTrue)
Expand All @@ -890,7 +890,7 @@ func (s *trackerSuite) TestReTrackDownStreamIndex(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int, b int, c varchar(10), PRIMARY KEY (a,b))"))
_, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
_, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
_, ok = tracker.dsTracker.tableInfos[tableID]
c.Assert(ok, IsTrue)
Expand All @@ -902,7 +902,7 @@ func (s *trackerSuite) TestReTrackDownStreamIndex(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int primary key, b int, c varchar(10))"))
_, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
_, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
_, ok = tracker.dsTracker.tableInfos[tableID]
c.Assert(ok, IsTrue)
Expand All @@ -916,7 +916,7 @@ func (s *trackerSuite) TestReTrackDownStreamIndex(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int, b int, c varchar(10), PRIMARY KEY (a,b))"))
_, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
_, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
_, ok = tracker.dsTracker.tableInfos[tableID]
c.Assert(ok, IsTrue)
Expand All @@ -928,7 +928,7 @@ func (s *trackerSuite) TestReTrackDownStreamIndex(c *C) {
mock.ExpectQuery("SHOW CREATE TABLE " + tableID).WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow("test", "create table t(a int primary key, b int, c varchar(10))"))
_, err = tracker.GetDownStreamIndexInfo(tcontext.Background(), tableID, oriTi)
_, _, err = tracker.GetDownStreamIndexAndTableInfo(tcontext.Background(), tableID, oriTi)
c.Assert(err, IsNil)
_, ok = tracker.dsTracker.tableInfos[tableID]
c.Assert(ok, IsTrue)
Expand Down
Loading