This is an automated cherry-pick of pingcap#3345
Signed-off-by: ti-chi-bot <[email protected]>
lance6716 authored and ti-chi-bot committed Dec 17, 2021
1 parent 9c0bfed commit d5c9c66
Showing 4 changed files with 271 additions and 2 deletions.
1 change: 1 addition & 0 deletions dm/dm/worker/server.go
@@ -89,6 +89,7 @@ func NewServer(cfg *Config) *Server {
}

// Start starts serving.
+// This function should only exit when it can't dial DM-master; for other errors it should not exit.
func (s *Server) Start() error {
    log.L().Info("starting dm-worker server")
    RegistryMetrics()
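The comment above sets the new lifecycle contract for Start: failing to dial DM-master is the only fatal error. A minimal sketch of that policy, assuming hypothetical helpers dialMaster and serveOnce in place of the real keepalive/serve calls:

// Sketch only: dialMaster and serveOnce are hypothetical placeholders,
// not the real DM-worker API; the point is which errors terminate Start.
func (s *Server) runStartPolicy() error {
    if err := s.dialMaster(); err != nil { // hypothetical helper
        return err // unreachable DM-master: the one case where we exit
    }
    for {
        if err := s.serveOnce(); err != nil { // hypothetical helper
            // e.g. a failure to read the relay checkpoint: log and keep running
            log.L().Error("non-fatal dm-worker error", zap.Error(err))
            continue
        }
        return nil
    }
}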
4 changes: 2 additions & 2 deletions dm/dm/worker/source_worker.go
@@ -301,11 +301,11 @@ func (w *SourceWorker) EnableRelay() (err error) {
    defer dcancel()
    minLoc, err1 := getMinLocInAllSubTasks(dctx, subTaskCfgs)
    if err1 != nil {
-       return err1
+       w.l.Error("meet error when EnableRelay", zap.Error(err1))
    }

    if minLoc != nil {
-       log.L().Info("get min location in all subtasks", zap.Stringer("location", *minLoc))
+       w.l.Info("get min location in all subtasks", zap.Stringer("location", *minLoc))
        w.cfg.RelayBinLogName = binlog.AdjustPosition(minLoc.Position).Name
        w.cfg.RelayBinlogGTID = minLoc.GTIDSetStr()
        // set UUIDSuffix when bound to a source
227 changes: 227 additions & 0 deletions dm/pkg/schema/tracker.go
@@ -345,3 +345,230 @@ func (tr *Tracker) CreateTableIfNotExists(table *filter.Table, ti *model.TableInfo
func (tr *Tracker) GetSystemVar(name string) (string, bool) {
    return tr.se.GetSessionVars().GetSystemVar(name)
}
<<<<<<< HEAD
=======

// GetDownStreamTableInfo gets downstream table info.
// Note: this function will init the downstream tracker's table info.
func (tr *Tracker) GetDownStreamTableInfo(tctx *tcontext.Context, tableID string, originTi *model.TableInfo) (*DownstreamTableInfo, error) {
    dti, ok := tr.dsTracker.tableInfos[tableID]
    if !ok {
        tctx.Logger.Info("Downstream schema tracker init. ", zap.String("tableID", tableID))
        ti, err := tr.getTableInfoByCreateStmt(tctx, tableID)
        if err != nil {
            tctx.Logger.Error("Init downstream schema info error. ", zap.String("tableID", tableID), zap.Error(err))
            return nil, err
        }

        dti = GetDownStreamTi(ti, originTi)
        tr.dsTracker.tableInfos[tableID] = dti
    }
    return dti, nil
}
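On the caller side, only the first lookup for a table pays a downstream round trip; later lookups are map hits. A short sketch (the schema/table names and the wrapper function are illustrative, not part of this change):

func loadDownstreamTwice(tr *Tracker, tctx *tcontext.Context, originTi *model.TableInfo) error {
    tableID := utils.GenTableID(&filter.Table{Schema: "db", Name: "tbl"}) // illustrative names
    // first call: runs SHOW CREATE TABLE downstream and caches the result
    if _, err := tr.GetDownStreamTableInfo(tctx, tableID, originTi); err != nil {
        return err
    }
    // second call: served from tr.dsTracker.tableInfos, no downstream query
    _, err := tr.GetDownStreamTableInfo(tctx, tableID, originTi)
    return err
}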

// GetAvailableDownStreamUKIndexInfo gets an available downstream UK whose data is not null.
// Note: this function will not init the downstream tracker.
func (tr *Tracker) GetAvailableDownStreamUKIndexInfo(tableID string, data []interface{}) *model.IndexInfo {
    dti, ok := tr.dsTracker.tableInfos[tableID]

    if !ok || len(dti.AvailableUKIndexList) == 0 {
        return nil
    }
    // fn checks that the data at column offset i is not null
    fn := func(i int) bool {
        return data[i] != nil
    }

    for _, uk := range dti.AvailableUKIndexList {
        // check that none of the UK's column data is null
        if isSpecifiedIndexColumn(uk, fn) {
            return uk
        }
    }
    return nil
}
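A typical consumer is DML generation: for a given row, the first UK whose columns are all non-NULL can serve as the WHERE key. A hypothetical sketch:

// pickWhereKey is a hypothetical helper, not part of this change.
func pickWhereKey(tr *Tracker, tableID string, rowValues []interface{}) []*model.IndexColumn {
    if uk := tr.GetAvailableDownStreamUKIndexInfo(tableID, rowValues); uk != nil {
        return uk.Columns // every column here is non-NULL in rowValues
    }
    return nil // caller falls back to matching on the whole row
}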

// RemoveDownstreamSchema removes a schema or table from the downstream tracker.
func (tr *Tracker) RemoveDownstreamSchema(tctx *tcontext.Context, targetTables []*filter.Table) {
    if len(targetTables) == 0 {
        return
    }

    for _, targetTable := range targetTables {
        tableID := utils.GenTableID(targetTable)
        _, ok := tr.dsTracker.tableInfos[tableID]
        if !ok {
            // handle the case where only a schema (no table name) is given
            if targetTable.Schema != "" && targetTable.Name == "" {
                for k := range tr.dsTracker.tableInfos {
                    if strings.HasPrefix(k, tableID+".") {
                        delete(tr.dsTracker.tableInfos, k)
                        tctx.Logger.Info("Remove downstream schema tracker", zap.String("tableID", k))
                    }
                }
            }
        } else {
            delete(tr.dsTracker.tableInfos, tableID)
            tctx.Logger.Info("Remove downstream schema tracker", zap.String("tableID", tableID))
        }
    }
}

// getTableInfoByCreateStmt gets the downstream tableInfo via a "SHOW CREATE TABLE" stmt.
func (tr *Tracker) getTableInfoByCreateStmt(tctx *tcontext.Context, tableID string) (*model.TableInfo, error) {
    if tr.dsTracker.stmtParser == nil {
        err := tr.initDownStreamSQLModeAndParser(tctx)
        if err != nil {
            return nil, err
        }
    }
    createStr, err := utils.GetTableCreateSQL(tctx.Ctx, tr.dsTracker.downstreamConn.BaseConn.DBConn, tableID)
    if err != nil {
        return nil, dmterror.ErrSchemaTrackerCannotFetchDownstreamCreateTableStmt.Delegate(err, tableID)
    }

    tctx.Logger.Info("Show create table info", zap.String("tableID", tableID), zap.String("create string", createStr))
    // parse the create table stmt.
    stmtNode, err := tr.dsTracker.stmtParser.ParseOneStmt(createStr, "", "")
    if err != nil {
        return nil, dmterror.ErrSchemaTrackerInvalidCreateTableStmt.Delegate(err, createStr)
    }

    ti, err := ddl.MockTableInfo(mock.NewContext(), stmtNode.(*ast.CreateTableStmt), mockTableID)
    if err != nil {
        return nil, dmterror.ErrSchemaTrackerCannotMockDownstreamTable.Delegate(err, createStr)
    }
    return ti, nil
}

// initDownStreamSQLModeAndParser inits the downstream tracker's SQL mode and parser with the default sql_mode.
func (tr *Tracker) initDownStreamSQLModeAndParser(tctx *tcontext.Context) error {
    setSQLMode := fmt.Sprintf("SET SESSION SQL_MODE = '%s'", mysql.DefaultSQLMode)
    _, err := tr.dsTracker.downstreamConn.ExecuteSQL(tctx, []string{setSQLMode})
    if err != nil {
        return dmterror.ErrSchemaTrackerCannotSetDownstreamSQLMode.Delegate(err, mysql.DefaultSQLMode)
    }
    stmtParser, err := utils.GetParserFromSQLModeStr(mysql.DefaultSQLMode)
    if err != nil {
        return dmterror.ErrSchemaTrackerCannotInitDownstreamParser.Delegate(err, mysql.DefaultSQLMode)
    }
    tr.dsTracker.stmtParser = stmtParser
    return nil
}

// GetDownStreamTi constructs the downstream table's index cache from its tableinfo.
func GetDownStreamTi(ti *model.TableInfo, originTi *model.TableInfo) *DownstreamTableInfo {
    var (
        absoluteUKIndexInfo  *model.IndexInfo
        availableUKIndexList = []*model.IndexInfo{}
        hasPk                = false
        absoluteUKPosition   = -1
    )

    // fn checks whether column i carries a NOT NULL constraint
    fn := func(i int) bool {
        return mysql.HasNotNullFlag(ti.Columns[i].Flag)
    }

    for i, idx := range ti.Indices {
        if !idx.Primary && !idx.Unique {
            continue
        }
        indexRedirect := redirectIndexKeys(idx, originTi)
        if indexRedirect == nil {
            continue
        }
        availableUKIndexList = append(availableUKIndexList, indexRedirect)
        if idx.Primary {
            absoluteUKIndexInfo = indexRedirect
            absoluteUKPosition = i
            hasPk = true
        } else if absoluteUKIndexInfo == nil && isSpecifiedIndexColumn(idx, fn) {
            // otherwise fall back to the first NOT NULL unique key
            absoluteUKIndexInfo = indexRedirect
            absoluteUKPosition = i
        }
    }

    // handle the exceptional primary key case,
    // e.g. "create table t(a int primary key, b int)", where the PK is stored
    // as a column flag rather than in ti.Indices.
    if !hasPk {
        exPk := redirectIndexKeys(handlePkExCase(ti), originTi)
        if exPk != nil {
            absoluteUKIndexInfo = exPk
            absoluteUKPosition = len(availableUKIndexList)
            availableUKIndexList = append(availableUKIndexList, absoluteUKIndexInfo)
        }
    }

    // move absoluteUKIndexInfo to the front of availableUKIndexList
    if absoluteUKPosition != -1 && len(availableUKIndexList) > 1 {
        availableUKIndexList[0], availableUKIndexList[absoluteUKPosition] = availableUKIndexList[absoluteUKPosition], availableUKIndexList[0]
    }

    return &DownstreamTableInfo{
        TableInfo:            ti,
        AbsoluteUKIndexInfo:  absoluteUKIndexInfo,
        AvailableUKIndexList: availableUKIndexList,
    }
}
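To make the selection order concrete, a test-style sketch in the same package (import paths for parser/ddl/mock vary across TiDB versions; the mock table ID is arbitrary): with no primary key, the NOT NULL unique key becomes the absolute UK and is moved to the front of the list.

func ExampleGetDownStreamTi() {
    p := parser.New()
    stmt, err := p.ParseOneStmt("create table t(a int, b int not null unique, c int unique)", "", "")
    if err != nil {
        panic(err)
    }
    ti, err := ddl.MockTableInfo(mock.NewContext(), stmt.(*ast.CreateTableStmt), 111) // arbitrary ID
    if err != nil {
        panic(err)
    }
    dti := GetDownStreamTi(ti, ti) // downstream and origin identical here
    // b's NOT NULL unique key is the absolute UK; c's nullable UK stays in
    // the list as a runtime candidate for GetAvailableDownStreamUKIndexInfo.
    fmt.Println(dti.AbsoluteUKIndexInfo.Columns[0].Name.O)
    // Output: b
}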

// redirectIndexKeys redirects the index's column offsets to the origin tableinfo.
func redirectIndexKeys(index *model.IndexInfo, originTi *model.TableInfo) *model.IndexInfo {
    if index == nil || originTi == nil {
        return nil
    }

    columns := make([]*model.IndexColumn, 0, len(index.Columns))
    for _, key := range index.Columns {
        originColumn := model.FindColumnInfo(originTi.Columns, key.Name.L)
        if originColumn == nil {
            return nil
        }
        column := &model.IndexColumn{
            Name:   key.Name,
            Offset: originColumn.Offset,
            Length: key.Length,
        }
        columns = append(columns, column)
    }
    return &model.IndexInfo{
        Table:   index.Table,
        Unique:  index.Unique,
        Primary: index.Primary,
        State:   index.State,
        Tp:      index.Tp,
        Columns: columns,
    }
}
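The offset rewrite matters when the origin (upstream) column order differs from the downstream's. A small same-package sketch with hand-built infos (all names illustrative):

func exampleRedirectIndexKeys() {
    // upstream stores columns as (b, a); the downstream index on "a" carries
    // offset 0, so after redirection it must point at offset 1.
    originTi := &model.TableInfo{
        Name: model.NewCIStr("t"),
        Columns: []*model.ColumnInfo{
            {Name: model.NewCIStr("b"), Offset: 0, State: model.StatePublic},
            {Name: model.NewCIStr("a"), Offset: 1, State: model.StatePublic},
        },
    }
    downstreamIdx := &model.IndexInfo{
        Name:    model.NewCIStr("uk_a"),
        Unique:  true,
        Columns: []*model.IndexColumn{{Name: model.NewCIStr("a"), Offset: 0}},
    }
    redirected := redirectIndexKeys(downstreamIdx, originTi)
    fmt.Println(redirected.Columns[0].Offset) // prints 1
}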

// handlePkExCase handles the exceptional primary key case,
// e.g. "create table t(a int primary key, b int)".
func handlePkExCase(ti *model.TableInfo) *model.IndexInfo {
    if pk := ti.GetPkColInfo(); pk != nil {
        return &model.IndexInfo{
            Table:   ti.Name,
            Unique:  true,
            Primary: true,
            State:   model.StatePublic,
            Tp:      model.IndexTypeBtree,
            Columns: []*model.IndexColumn{{
                Name:   pk.Name,
                Offset: pk.Offset,
                Length: types.UnspecifiedLength,
            }},
        }
    }
    return nil
}

// isSpecifiedIndexColumn checks that all of the index's columns match 'fn'.
func isSpecifiedIndexColumn(index *model.IndexInfo, fn func(i int) bool) bool {
    for _, col := range index.Columns {
        if !fn(col.Offset) {
            return false
        }
    }
    return true
}
>>>>>>> 8814deda7 (dm/worker: don't exit when failed to read checkpoint in relay (#3345))
41 changes: 41 additions & 0 deletions dm/tests/new_relay/run.sh
@@ -43,6 +43,46 @@ function test_cant_dail_upstream() {
cleanup_data $TEST_NAME
}

function test_cant_dail_downstream() {
    cleanup_data $TEST_NAME
    cleanup_process

    run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
    check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
    run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

    cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
    dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
        "start-relay -s $SOURCE_ID1 worker1" \
        "\"result\": true" 1
    dmctl_start_task_standalone $cur/conf/dm-task.yaml "--remove-meta"

    kill_dm_worker
    # kill tidb
    pkill -hup tidb-server 2>/dev/null || true
    wait_process_exit tidb-server

    run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

    # make sure DM-worker doesn't exit
    sleep 2
    run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
        "query-status -s $SOURCE_ID1" \
        "\"relayCatchUpMaster\": true" 1 \
        "dial tcp 127.0.0.1:4000: connect: connection refused" 1

    # restart tidb
    run_tidb_server 4000 $TIDB_PASSWORD
    sleep 2

    cleanup_process
    cleanup_data $TEST_NAME
}

function test_kill_dump_connection() {
    cleanup_data $TEST_NAME
    cleanup_process
@@ -83,6 +123,7 @@ function test_kill_dump_connection() {
}

function run() {
    test_cant_dail_downstream
    test_cant_dail_upstream

    export GO_FAILPOINTS="github.com/pingcap/ticdc/dm/relay/ReportRelayLogSpaceInBackground=return(1)"
