Merge remote-tracking branch 'pingcap-origin/master'
buchuitoudegou committed Jan 24, 2022
2 parents 8b0ebbb + 239a21f commit 6080a27
Showing 13 changed files with 219 additions and 104 deletions.
8 changes: 5 additions & 3 deletions deployments/ticdc/docker/integration-test.Dockerfile
@@ -39,9 +39,11 @@ RUN yum install -y \
RUN wget http://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
RUN yum install -y epel-release-latest-7.noarch.rpm
RUN yum --enablerepo=epel install -y s3cmd
RUN wget -i -c http://dev.mysql.com/get/mysql57-community-release-el7-10.noarch.rpm
RUN yum install -y mysql57-community-release-el7-10.noarch.rpm
RUN yum install -y mysql-server
# Install mysql client.
RUN rpm -ivh https://repo.mysql.com/mysql57-community-release-el7-11.noarch.rpm
# See: https://support.cpanel.net/hc/en-us/articles/4419382481815?input_string=gpg+keys+problem+with+mysql+5.7
RUN rpm --import https://repo.mysql.com/RPM-GPG-KEY-mysql-2022
RUN yum install mysql-community-client.x86_64 -y

# Copy go form downloader.
COPY --from=downloader /usr/local/go /usr/local/go
66 changes: 33 additions & 33 deletions dm/dm/master/scheduler/scheduler.go
@@ -220,27 +220,27 @@ func (s *Scheduler) Start(pCtx context.Context, etcdCli *clientv3.Client) (err e
s.reset() // reset previous status.

// recover previous status from etcd.
err = s.recoverSources(etcdCli)
err = s.recoverSources()
if err != nil {
return err
}
err = s.recoverSubTasks(etcdCli)
err = s.recoverSubTasks()
if err != nil {
return err
}
err = s.recoverRelayConfigs(etcdCli)
err = s.recoverRelayConfigs()
if err != nil {
return err
}

var loadTaskRev int64
loadTaskRev, err = s.recoverLoadTasks(etcdCli, false)
loadTaskRev, err = s.recoverLoadTasks(false)
if err != nil {
return err
}

var rev int64
rev, err = s.recoverWorkersBounds(etcdCli)
rev, err = s.recoverWorkersBounds()
if err != nil {
return err
}
@@ -266,7 +266,7 @@ func (s *Scheduler) Start(pCtx context.Context, etcdCli *clientv3.Client) (err e
// starting to observe status of DM-worker instances.
// TODO: handle fatal error from observeWorkerEvent
//nolint:errcheck
s.observeWorkerEvent(ctx, etcdCli, rev1)
s.observeWorkerEvent(ctx, rev1)
}(rev)

s.wg.Add(1)
Expand All @@ -275,7 +275,7 @@ func (s *Scheduler) Start(pCtx context.Context, etcdCli *clientv3.Client) (err e
// starting to observe load task.
// TODO: handle fatal error from observeLoadTask
//nolint:errcheck
s.observeLoadTask(ctx, etcdCli, rev1)
s.observeLoadTask(ctx, rev1)
}(loadTaskRev)

s.started.Store(true) // started now
@@ -1578,14 +1578,14 @@ func (s *Scheduler) Started() bool {
}

// recoverSourceCfgs recovers history source configs and expectant relay stages from etcd.
func (s *Scheduler) recoverSources(cli *clientv3.Client) error {
func (s *Scheduler) recoverSources() error {
// get all source configs.
cfgM, _, err := ha.GetSourceCfg(cli, "", 0)
cfgM, _, err := ha.GetSourceCfg(s.etcdCli, "", 0)
if err != nil {
return err
}
// get all relay stages.
stageM, _, err := ha.GetAllRelayStage(cli)
stageM, _, err := ha.GetAllRelayStage(s.etcdCli)
if err != nil {
return err
}
@@ -1602,14 +1602,14 @@ func (s *Scheduler) recoverSources(cli *clientv3.Client) error {
}

// recoverSubTasks recovers history subtask configs and expectant subtask stages from etcd.
func (s *Scheduler) recoverSubTasks(cli *clientv3.Client) error {
func (s *Scheduler) recoverSubTasks() error {
// get all subtask configs.
cfgMM, _, err := ha.GetAllSubTaskCfg(cli)
cfgMM, _, err := ha.GetAllSubTaskCfg(s.etcdCli)
if err != nil {
return err
}
// get all subtask stages.
stageMM, _, err := ha.GetAllSubTaskStage(cli)
stageMM, _, err := ha.GetAllSubTaskStage(s.etcdCli)
if err != nil {
return err
}
@@ -1637,8 +1637,8 @@ func (s *Scheduler) recoverSubTasks(cli *clientv3.Client) error {
// This function also removes conflicting relay schedule types, which means if a source has both `enable-relay` and
// (source, worker) relay config, we remove the latter.
// should be called after recoverSources.
func (s *Scheduler) recoverRelayConfigs(cli *clientv3.Client) error {
relayWorkers, _, err := ha.GetAllRelayConfig(cli)
func (s *Scheduler) recoverRelayConfigs() error {
relayWorkers, _, err := ha.GetAllRelayConfig(s.etcdCli)
if err != nil {
return err
}
@@ -1651,7 +1651,7 @@ func (s *Scheduler) recoverRelayConfigs(cli *clientv3.Client) error {
}
if sourceCfg.EnableRelay {
// current etcd max-txn-op is 2048
_, err2 := ha.DeleteRelayConfig(cli, utils.SetToSlice(workers)...)
_, err2 := ha.DeleteRelayConfig(s.etcdCli, utils.SetToSlice(workers)...)
if err2 != nil {
return err2
}
@@ -1664,12 +1664,12 @@ func (s *Scheduler) recoverRelayConfigs(cli *clientv3.Client) error {
}

// recoverLoadTasks recovers history load workers from etcd.
func (s *Scheduler) recoverLoadTasks(cli *clientv3.Client, needLock bool) (int64, error) {
func (s *Scheduler) recoverLoadTasks(needLock bool) (int64, error) {
if needLock {
s.mu.Lock()
defer s.mu.Unlock()
}
loadTasks, rev, err := ha.GetAllLoadTask(cli)
loadTasks, rev, err := ha.GetAllLoadTask(s.etcdCli)
if err != nil {
return 0, err
}
@@ -1680,30 +1680,30 @@ func (s *Scheduler) recoverLoadTasks(cli *clientv3.Client, needLock bool) (int64

// recoverWorkersBounds recovers history DM-worker info and status from etcd.
// and it also recovers the bound/unbound relationship.
func (s *Scheduler) recoverWorkersBounds(cli *clientv3.Client) (int64, error) {
func (s *Scheduler) recoverWorkersBounds() (int64, error) {
// 1. get all history base info.
// it should no new DM-worker registered between this call and the below `GetKeepAliveWorkers`,
// because no DM-master leader are handling DM-worker register requests.
wim, _, err := ha.GetAllWorkerInfo(cli)
wim, _, err := ha.GetAllWorkerInfo(s.etcdCli)
if err != nil {
return 0, err
}

// 2. get all history bound relationships.
// it should no new bound relationship added between this call and the below `GetKeepAliveWorkers`,
// because no DM-master leader are doing the scheduler.
sbm, _, err := ha.GetSourceBound(cli, "")
sbm, _, err := ha.GetSourceBound(s.etcdCli, "")
if err != nil {
return 0, err
}
lastSourceBoundM, _, err := ha.GetLastSourceBounds(cli)
lastSourceBoundM, _, err := ha.GetLastSourceBounds(s.etcdCli)
if err != nil {
return 0, err
}
s.lastBound = lastSourceBoundM

// 3. get all history offline status.
kam, rev, err := ha.GetKeepAliveWorkers(cli)
kam, rev, err := ha.GetKeepAliveWorkers(s.etcdCli)
if err != nil {
return 0, err
}
@@ -1767,15 +1767,15 @@ func (s *Scheduler) recoverWorkersBounds(cli *clientv3.Client) (int64, error) {
for name := range sbm {
invalidSourceBounds = append(invalidSourceBounds, name)
}
_, err = ha.DeleteSourceBound(cli, invalidSourceBounds...)
_, err = ha.DeleteSourceBound(s.etcdCli, invalidSourceBounds...)
if err != nil {
return 0, err
}
}

// 6. put trigger source bounds info to etcd to order dm-workers to start source
if len(boundsToTrigger) > 0 {
_, err = ha.PutSourceBound(cli, boundsToTrigger...)
_, err = ha.PutSourceBound(s.etcdCli, boundsToTrigger...)
if err != nil {
return 0, err
}
@@ -1791,12 +1791,12 @@ func (s *Scheduler) recoverWorkersBounds(cli *clientv3.Client) (int64, error) {
return rev, nil
}

func (s *Scheduler) resetWorkerEv(cli *clientv3.Client) (int64, error) {
func (s *Scheduler) resetWorkerEv() (int64, error) {
s.mu.Lock()
defer s.mu.Unlock()

rwm := s.workers
kam, rev, err := ha.GetKeepAliveWorkers(cli)
kam, rev, err := ha.GetKeepAliveWorkers(s.etcdCli)
if err != nil {
return 0, err
}
@@ -1856,7 +1856,7 @@ func (s *Scheduler) handleWorkerEv(ctx context.Context, evCh <-chan ha.WorkerEve
}

// nolint:dupl
func (s *Scheduler) observeWorkerEvent(ctx context.Context, etcdCli *clientv3.Client, rev int64) error {
func (s *Scheduler) observeWorkerEvent(ctx context.Context, rev int64) error {
var wg sync.WaitGroup
for {
workerEvCh := make(chan ha.WorkerEvent, 10)
@@ -1870,7 +1870,7 @@ func (s *Scheduler) observeWorkerEvent(ctx context.Context, etcdCli *clientv3.Cl
close(workerErrCh)
wg.Done()
}()
ha.WatchWorkerEvent(ctx1, etcdCli, rev+1, workerEvCh, workerErrCh)
ha.WatchWorkerEvent(ctx1, s.etcdCli, rev+1, workerEvCh, workerErrCh)
}()
err := s.handleWorkerEv(ctx1, workerEvCh, workerErrCh)
cancel1()
@@ -1884,7 +1884,7 @@ func (s *Scheduler) observeWorkerEvent(ctx context.Context, etcdCli *clientv3.Cl
case <-ctx.Done():
return nil
case <-time.After(500 * time.Millisecond):
rev, err = s.resetWorkerEv(etcdCli)
rev, err = s.resetWorkerEv()
if err != nil {
log.L().Error("resetWorkerEv is failed, will retry later", zap.Error(err), zap.Int("retryNum", retryNum))
}
@@ -2311,7 +2311,7 @@ func (s *Scheduler) SetWorkerClientForTest(name string, mockCli workerrpc.Client
}

// nolint:dupl
func (s *Scheduler) observeLoadTask(ctx context.Context, etcdCli *clientv3.Client, rev int64) error {
func (s *Scheduler) observeLoadTask(ctx context.Context, rev int64) error {
var wg sync.WaitGroup
for {
loadTaskCh := make(chan ha.LoadTask, 10)
@@ -2325,7 +2325,7 @@ func (s *Scheduler) observeLoadTask(ctx context.Context, etcdCli *clientv3.Clien
close(loadTaskErrCh)
wg.Done()
}()
ha.WatchLoadTask(ctx1, etcdCli, rev+1, loadTaskCh, loadTaskErrCh)
ha.WatchLoadTask(ctx1, s.etcdCli, rev+1, loadTaskCh, loadTaskErrCh)
}()
err := s.handleLoadTask(ctx1, loadTaskCh, loadTaskErrCh)
cancel1()
@@ -2339,7 +2339,7 @@ func (s *Scheduler) observeLoadTask(ctx context.Context, etcdCli *clientv3.Clien
case <-ctx.Done():
return nil
case <-time.After(500 * time.Millisecond):
rev, err = s.recoverLoadTasks(etcdCli, true)
rev, err = s.recoverLoadTasks(true)
if err != nil {
log.L().Error("resetLoadTask is failed, will retry later", zap.Error(err), zap.Int("retryNum", retryNum))
}
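All of the scheduler.go changes above follow one pattern: the *clientv3.Client argument is dropped from the recover*/observe*/reset* helpers, which now read the client from the Scheduler itself as s.etcdCli. A minimal sketch of that pattern, assuming Start stores the incoming client in an etcdCli field (that assignment is not visible in the hunks shown) and assuming the etcd v3.5 client import path; the struct is heavily simplified and the key name is illustrative:

```go
package scheduler

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// Scheduler keeps the etcd client as a field so helpers no longer need it
// passed in explicitly (simplified; the real struct has many more fields).
type Scheduler struct {
	etcdCli *clientv3.Client
}

// Start stores the client once; later recover/observe calls read s.etcdCli.
func (s *Scheduler) Start(ctx context.Context, etcdCli *clientv3.Client) error {
	s.etcdCli = etcdCli
	return s.recoverSources() // was s.recoverSources(etcdCli) before this commit
}

// recoverSources takes no client argument and uses the stored one.
func (s *Scheduler) recoverSources() error {
	_, err := s.etcdCli.Get(context.Background(), "illustrative-key")
	return err
}
```

The test updates below follow from the same change: call sites such as s.observeWorkerEvent(ctx2, startRev) and s.recoverSources() simply drop the etcdTestCli argument.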
18 changes: 9 additions & 9 deletions dm/dm/master/scheduler/scheduler_test.go
@@ -986,7 +986,7 @@ func (t *testScheduler) TestWatchWorkerEventEtcdCompact(c *C) {
wg.Add(1)
go func() {
defer wg.Done()
c.Assert(s.observeWorkerEvent(ctx2, etcdTestCli, startRev), IsNil)
c.Assert(s.observeWorkerEvent(ctx2, startRev), IsNil)
}()
// step 5.3: wait for scheduler to restart handleWorkerEvent, then start a new worker
time.Sleep(time.Second)
@@ -1008,7 +1008,7 @@ func (t *testScheduler) TestWatchWorkerEventEtcdCompact(c *C) {
wg.Add(1)
go func() {
defer wg.Done()
c.Assert(s.observeWorkerEvent(ctx3, etcdTestCli, startRev), IsNil)
c.Assert(s.observeWorkerEvent(ctx3, startRev), IsNil)
}()
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
bounds := s.BoundSources()
@@ -1722,7 +1722,7 @@ func (t *testScheduler) TestWatchLoadTask(c *C) {
wg.Add(1)
go func() {
defer wg.Done()
c.Assert(s.observeLoadTask(ctx1, etcdTestCli, startRev), IsNil)
c.Assert(s.observeLoadTask(ctx1, startRev), IsNil)
}()

// put task2, source1, worker1
@@ -1813,9 +1813,9 @@ func (t *testScheduler) TestWorkerHasDiffRelayAndBound(c *C) {
go ha.KeepAlive(ctx, etcdTestCli, workerName1, keepAlive)

// bootstrap
c.Assert(s.recoverSources(etcdTestCli), IsNil)
c.Assert(s.recoverRelayConfigs(etcdTestCli), IsNil)
_, err = s.recoverWorkersBounds(etcdTestCli)
c.Assert(s.recoverSources(), IsNil)
c.Assert(s.recoverRelayConfigs(), IsNil)
_, err = s.recoverWorkersBounds()
c.Assert(err, IsNil)

// check
@@ -1876,9 +1876,9 @@ func (t *testScheduler) TestUpgradeCauseConflictRelayType(c *C) {
go ha.KeepAlive(ctx, etcdTestCli, workerName2, keepAlive)

// bootstrap
c.Assert(s.recoverSources(etcdTestCli), IsNil)
c.Assert(s.recoverRelayConfigs(etcdTestCli), IsNil)
_, err = s.recoverWorkersBounds(etcdTestCli)
c.Assert(s.recoverSources(), IsNil)
c.Assert(s.recoverRelayConfigs(), IsNil)
_, err = s.recoverWorkersBounds()
c.Assert(err, IsNil)

// check when the relay config is conflicting with source config, relay config should be deleted
15 changes: 14 additions & 1 deletion dm/loader/checkpoint.go
@@ -25,6 +25,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/conn"
tcontext "github.com/pingcap/tiflow/dm/pkg/context"
"github.com/pingcap/tiflow/dm/pkg/cputil"
fr "github.com/pingcap/tiflow/dm/pkg/func-rollback"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"

@@ -101,7 +102,18 @@ type RemoteCheckPoint struct {
}

func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id string) (CheckPoint, error) {
db, dbConns, err := createConns(tctx, cfg, cfg.Name, cfg.SourceID, 1)
var err error
var db *conn.BaseDB
var dbConns []*DBConn

rollbackHolder := fr.NewRollbackHolder("loader")
defer func() {
if err != nil {
rollbackHolder.RollbackReverseOrder()
}
}()

db, dbConns, err = createConns(tctx, cfg, cfg.Name, cfg.SourceID, 1)
if err != nil {
return nil, err
}
@@ -116,6 +128,7 @@ func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id s
logger: tctx.L().WithFields(zap.String("component", "remote checkpoint")),
}
cp.restoringFiles.pos = make(map[string]map[string]FilePosSet)
rollbackHolder.Add(fr.FuncRollback{Name: "CloseRemoteCheckPoint", Fn: cp.Close})

err = cp.prepare(tctx)
if err != nil {
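The newRemoteCheckPoint change above guards construction with a rollback holder: each successfully created resource registers a cleanup, and if a later step fails the deferred RollbackReverseOrder releases everything in reverse order (here, closing the DB connections from createConns) instead of leaking them. A self-contained sketch of that idea; FuncRollback and RollbackHolder are re-implemented below for illustration rather than imported from dm/pkg/func-rollback, whose exact API is only partly visible in this diff:

```go
package main

import "fmt"

// FuncRollback pairs a name with a cleanup function (mirrors the fr.FuncRollback
// usage above; this sketch does not import the DM package).
type FuncRollback struct {
	Name string
	Fn   func()
}

// RollbackHolder collects cleanups and can run them newest-first.
type RollbackHolder struct{ fns []FuncRollback }

func (h *RollbackHolder) Add(f FuncRollback) { h.fns = append(h.fns, f) }

// RollbackReverseOrder undoes completed setup steps in reverse order.
func (h *RollbackHolder) RollbackReverseOrder() {
	for i := len(h.fns) - 1; i >= 0; i-- {
		h.fns[i].Fn()
	}
}

// openResource stands in for steps like createConns or preparing the checkpoint table.
func openResource(name string, fail bool) (func(), error) {
	if fail {
		return nil, fmt.Errorf("open %s failed", name)
	}
	return func() { fmt.Println("closed", name) }, nil
}

func main() {
	var err error
	holder := &RollbackHolder{}
	defer func() {
		if err != nil {
			holder.RollbackReverseOrder() // same shape as the deferred block in newRemoteCheckPoint
		}
	}()

	closeConns, err := openResource("db conns", false)
	if err != nil {
		return
	}
	holder.Add(FuncRollback{Name: "CloseConns", Fn: closeConns})

	_, err = openResource("checkpoint table", true) // a later step fails...
	if err != nil {
		return // ...and the deferred rollback closes the earlier conns
	}
}
```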