From 0dc1690393391bf58091529cf4a187efd7bf1ca0 Mon Sep 17 00:00:00 2001 From: leoppro Date: Mon, 31 May 2021 14:49:55 +0800 Subject: [PATCH 01/17] update --- cdc/owner.go | 2 +- cdc/owner/changefeed.go | 423 +++++++++++++++++++++++++++++++++++ cdc/owner/changefeed_test.go | 308 +++++++++++++++++++++++++ cdc/owner/metrics.go | 55 +++++ cmd/client_changefeed.go | 2 +- cmd/util.go | 4 +- go.sum | 1 - pkg/errors/errors.go | 4 + pkg/errors/helper.go | 19 +- pkg/util/gc_service.go | 6 +- pkg/util/gc_service_test.go | 11 +- 11 files changed, 822 insertions(+), 13 deletions(-) create mode 100644 cdc/owner/changefeed.go create mode 100644 cdc/owner/changefeed_test.go create mode 100644 cdc/owner/metrics.go diff --git a/cdc/owner.go b/cdc/owner.go index a26dbf315e3..5a393db8bf9 100644 --- a/cdc/owner.go +++ b/cdc/owner.go @@ -301,7 +301,7 @@ func (o *Owner) newChangeFeed( log.Info("Find new changefeed", zap.Stringer("info", info), zap.String("changefeed", id), zap.Uint64("checkpoint ts", checkpointTs)) if info.Config.CheckGCSafePoint { - err := util.CheckSafetyOfStartTs(ctx, o.pdClient, checkpointTs) + err := util.CheckSafetyOfStartTs(ctx, o.pdClient, id, checkpointTs) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go new file mode 100644 index 00000000000..709f4019320 --- /dev/null +++ b/cdc/owner/changefeed.go @@ -0,0 +1,423 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package owner + +import ( + "context" + "reflect" + "sync" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + timodel "github.com/pingcap/parser/model" + "github.com/pingcap/ticdc/cdc/model" + cdcContext "github.com/pingcap/ticdc/pkg/context" + cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/util" + "github.com/pingcap/tidb/sessionctx/binloginfo" + "github.com/pingcap/tidb/store/tikv/oracle" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +type changefeed struct { + id model.ChangeFeedID + state *model.ChangefeedReactorState + + scheduler *scheduler + barriers *barriers + feedStateManager *feedStateManager + gcManager *gcManager + + schema *schemaWrap4Owner + sink AsyncSink + ddlPuller DDLPuller + initialized bool + ddlEventCache *model.DDLEvent + + errCh chan error + cancel context.CancelFunc + wg sync.WaitGroup + + metricsChangefeedCheckpointTsGauge prometheus.Gauge + metricsChangefeedCheckpointTsLagGauge prometheus.Gauge + + newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) + newSink func(ctx cdcContext.Context) (AsyncSink, error) +} + +func newChangefeed(id model.ChangeFeedID, gcManager *gcManager) *changefeed { + c := &changefeed{ + id: id, + scheduler: newScheduler(), + barriers: newBarriers(), + feedStateManager: new(feedStateManager), + gcManager: gcManager, + + errCh: make(chan error, defaultErrChSize), + cancel: func() {}, + + newDDLPuller: newDDLPuller, + } + c.newSink = newAsyncSink + return c +} + +func newChangefeed4Test( + id model.ChangeFeedID, gcManager *gcManager, + newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error), + newSink func(ctx cdcContext.Context) (AsyncSink, error), +) *changefeed { + c := newChangefeed(id, gcManager) + c.newDDLPuller = newDDLPuller + c.newSink = newSink + return c +} + +func (c *changefeed) Tick(ctx cdcContext.Context, state 
*model.ChangefeedReactorState, captures map[model.CaptureID]*model.CaptureInfo) { + ctx = cdcContext.WithErrorHandler(ctx, func(err error) error { + c.errCh <- errors.Trace(err) + return nil + }) + if err := c.tick(ctx, state, captures); err != nil { + log.Error("an error occurred in Owner", zap.String("changefeedID", c.state.ID), zap.Error(err), zap.Stringer("tp", reflect.TypeOf(err))) + var code string + if rfcCode, ok := cerror.RFCCode(err); ok { + code = string(rfcCode) + } else { + code = string(cerror.ErrOwnerUnknown.RFCCode()) + } + c.feedStateManager.HandleError(&model.RunningError{ + Addr: util.CaptureAddrFromCtx(ctx), + Code: code, + Message: err.Error(), + }) + c.releaseResources() + } +} + +func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactorState, captures map[model.CaptureID]*model.CaptureInfo) error { + c.state = state + c.feedStateManager.Tick(state) + if !c.feedStateManager.ShouldRunning() { + c.releaseResources() + return nil + } + + checkpointTs := c.state.Info.GetCheckpointTs(c.state.Status) + if err := c.gcManager.CheckStaleCheckpointTs(ctx, checkpointTs); err != nil { + return errors.Trace(err) + } + if !c.preCheck(captures) { + return nil + } + if err := c.initialize(ctx); err != nil { + return errors.Trace(err) + } + + select { + case err := <-c.errCh: + return errors.Trace(err) + default: + } + + c.sink.EmitCheckpointTs(ctx, checkpointTs) + barrierTs, err := c.handleBarrier(ctx) + if err != nil { + return errors.Trace(err) + } + shouldUpdateState, err := c.scheduler.Tick(c.state, c.schema.AllPhysicalTables(), captures) + if err != nil { + return errors.Trace(err) + } + if shouldUpdateState { + c.updateStatus(barrierTs) + } + return nil +} + +func (c *changefeed) initialize(ctx cdcContext.Context) error { + if c.initialized { + return nil + } + // empty the errCh +LOOP: + for { + select { + case <-c.errCh: + default: + break LOOP + } + } + startTs := c.state.Info.GetCheckpointTs(c.state.Status) + 
log.Info("initialize changefeed", zap.String("changefeed", c.state.ID), + zap.Stringer("info", c.state.Info), + zap.Uint64("checkpoint ts", startTs)) + failpoint.Inject("NewChangefeedNoRetryError", func() { + failpoint.Return(cerror.ErrStartTsBeforeGC.GenWithStackByArgs(startTs-300, startTs)) + }) + + failpoint.Inject("NewChangefeedRetryError", func() { + failpoint.Return(errors.New("failpoint injected retriable error")) + }) + + if c.state.Info.Config.CheckGCSafePoint { + err := util.CheckSafetyOfStartTs(ctx, ctx.GlobalVars().PDClient, c.state.ID, startTs) + if err != nil { + return errors.Trace(err) + } + } + c.barriers.Update(ddlJobBarrier, startTs) + c.barriers.Update(syncPointBarrier, startTs) + c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs()) + var err error + c.schema, err = newSchemaWrap4Owner(ctx.GlobalVars().KVStorage, startTs, c.state.Info.Config) + if err != nil { + return errors.Trace(err) + } + cancelCtx, cancel := cdcContext.WithCancel(ctx) + c.cancel = cancel + c.sink, err = c.newSink(cancelCtx) + if err != nil { + return errors.Trace(err) + } + err = c.sink.Initialize(cancelCtx, c.schema.SinkTableInfos()) + if err != nil { + return errors.Trace(err) + } + c.ddlPuller, err = c.newDDLPuller(cancelCtx, startTs) + if err != nil { + return errors.Trace(err) + } + c.wg.Add(1) + go func() { + defer c.wg.Done() + ctx.Throw(c.ddlPuller.Run(cancelCtx)) + }() + + // init metrics + c.metricsChangefeedCheckpointTsGauge = changefeedCheckpointTsGauge.WithLabelValues(c.id) + c.metricsChangefeedCheckpointTsLagGauge = changefeedCheckpointTsLagGauge.WithLabelValues(c.id) + c.initialized = true + return nil +} + +func (c *changefeed) releaseResources() { + if !c.initialized { + return + } + log.Info("close changefeed", zap.String("changefeed", c.state.ID), + zap.Stringer("info", c.state.Info)) + c.cancel() + c.cancel = func() {} + c.ddlPuller.Close() + c.schema = nil + if err := c.sink.Close(); err != nil { + log.Warn("release the owner resources failed", 
zap.String("changefeedID", c.state.ID), zap.Error(err)) + } + c.wg.Wait() + changefeedCheckpointTsGauge.DeleteLabelValues(c.id) + changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id) + c.metricsChangefeedCheckpointTsGauge = nil + c.metricsChangefeedCheckpointTsLagGauge = nil + c.initialized = false +} + +func (c *changefeed) preCheck(captures map[model.CaptureID]*model.CaptureInfo) (passCheck bool) { + passCheck = true + if c.state.Status == nil { + c.state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + if status == nil { + status = &model.ChangeFeedStatus{ + ResolvedTs: c.state.Info.StartTs, + CheckpointTs: c.state.Info.StartTs, + AdminJobType: model.AdminNone, + } + return status, true, nil + } + return status, false, nil + }) + passCheck = false + } + for captureID := range captures { + if _, exist := c.state.TaskStatuses[captureID]; !exist { + c.state.PatchTaskStatus(captureID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) { + if status == nil { + status = new(model.TaskStatus) + return status, true, nil + } + return status, false, nil + }) + passCheck = false + } + } + for captureID := range c.state.TaskStatuses { + if _, exist := captures[captureID]; !exist { + c.state.PatchTaskStatus(captureID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) { + return nil, status != nil, nil + }) + passCheck = false + } + } + + for captureID := range c.state.TaskPositions { + if _, exist := captures[captureID]; !exist { + c.state.PatchTaskPosition(captureID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return nil, position != nil, nil + }) + passCheck = false + } + } + for captureID := range c.state.Workloads { + if _, exist := captures[captureID]; !exist { + c.state.PatchTaskWorkload(captureID, func(workload model.TaskWorkload) (model.TaskWorkload, bool, error) { + return nil, workload != nil, nil + }) + passCheck = false + } + } + return +} + +func (c *changefeed) 
handleBarrier(ctx cdcContext.Context) (uint64, error) { + barrierTp, barrierTs := c.barriers.Min() + blocked := (barrierTs == c.state.Status.CheckpointTs) && (barrierTs == c.state.Status.ResolvedTs) + if blocked && c.state.Info.SyncPointEnabled { + if err := c.sink.SinkSyncpoint(ctx, barrierTs); err != nil { + return 0, errors.Trace(err) + } + } + switch barrierTp { + case ddlJobBarrier: + ddlResolvedTs, ddlJob := c.ddlPuller.FrontDDL() + if ddlJob == nil || ddlResolvedTs != barrierTs { + c.barriers.Update(ddlJobBarrier, ddlResolvedTs) + return barrierTs, nil + } + if !blocked { + return barrierTs, nil + } + done, err := c.asyncExecDDL(ctx, ddlJob) + if err != nil { + return 0, errors.Trace(err) + } + if !done { + return barrierTs, nil + } + c.ddlPuller.PopFrontDDL() + newDDLResolvedTs, _ := c.ddlPuller.FrontDDL() + c.barriers.Update(ddlJobBarrier, newDDLResolvedTs) + + case syncPointBarrier: + if !c.state.Info.SyncPointEnabled { + c.barriers.Remove(syncPointBarrier) + return barrierTs, nil + } + if !blocked { + return barrierTs, nil + } + nextSyncPointTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(barrierTs).Add(c.state.Info.SyncPointInterval)) + c.barriers.Update(syncPointBarrier, nextSyncPointTs) + + case finishBarrier: + if !blocked { + return barrierTs, nil + } + c.feedStateManager.MarkFinished() + default: + log.Panic("Unknown barrier type", zap.Int("barrier type", int(barrierTp))) + } + return barrierTs, nil +} + +func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (done bool, err error) { + if job.BinlogInfo == nil { + log.Warn("ignore the invalid DDL job", zap.Reflect("job", job)) + return true, nil + } + cyclicConfig := c.state.Info.Config.Cyclic + if cyclicConfig.IsEnabled() && !cyclicConfig.SyncDDL { + return true, nil + } + if c.ddlEventCache == nil || c.ddlEventCache.CommitTs != job.BinlogInfo.FinishedTS { + ddlEvent, err := c.schema.BuildDDLEvent(job) + if err != nil { + return false, errors.Trace(err) + } + err = 
c.schema.HandleDDL(job) + if err != nil { + return false, errors.Trace(err) + } + ddlEvent.Query = binloginfo.AddSpecialComment(ddlEvent.Query) + c.ddlEventCache = ddlEvent + } + if job.BinlogInfo.TableInfo != nil && c.schema.IsIneligibleTableID(job.BinlogInfo.TableInfo.ID) { + log.Warn("ignore the DDL job of ineligible table", zap.Reflect("job", job)) + return true, nil + } + done, err = c.sink.EmitDDLEvent(ctx, c.ddlEventCache) + if err != nil { + return false, err + } + if done { + c.ddlEventCache = nil + } + return done, nil +} + +func (c *changefeed) updateStatus(barrierTs model.Ts) { + resolvedTs := barrierTs + for _, position := range c.state.TaskPositions { + if resolvedTs > position.ResolvedTs { + resolvedTs = position.ResolvedTs + } + } + for _, taskStatus := range c.state.TaskStatuses { + for _, opt := range taskStatus.Operation { + if resolvedTs > opt.BoundaryTs { + resolvedTs = opt.BoundaryTs + } + } + } + checkpointTs := resolvedTs + for _, position := range c.state.TaskPositions { + if checkpointTs > position.CheckPointTs { + checkpointTs = position.CheckPointTs + } + } + c.state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + changed := false + if status.ResolvedTs != resolvedTs { + status.ResolvedTs = resolvedTs + changed = true + } + if status.CheckpointTs != checkpointTs { + status.CheckpointTs = checkpointTs + changed = true + } + return status, changed, nil + }) + + phyTs := oracle.ExtractPhysical(checkpointTs) + c.metricsChangefeedCheckpointTsGauge.Set(float64(phyTs)) + // It is more accurate to get tso from PD, but in most cases we have + // deployed NTP service, a little bias is acceptable here. 
+ c.metricsChangefeedCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-phyTs) / 1e3) +} + +func (c *changefeed) Close() { + c.releaseResources() +} diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go new file mode 100644 index 00000000000..a6931631c9b --- /dev/null +++ b/cdc/owner/changefeed_test.go @@ -0,0 +1,308 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package owner + +import ( + "context" + "sync/atomic" + "time" + + "github.com/pingcap/check" + "github.com/pingcap/errors" + timodel "github.com/pingcap/parser/model" + "github.com/pingcap/ticdc/cdc/entry" + "github.com/pingcap/ticdc/cdc/model" + cdcContext "github.com/pingcap/ticdc/pkg/context" + "github.com/pingcap/ticdc/pkg/orchestrator" + "github.com/pingcap/ticdc/pkg/util/testleak" + "github.com/pingcap/tidb/store/tikv/oracle" +) + +type mockDDLPuller struct { + // DDLPuller + resolvedTs model.Ts + ddlQueue []*timodel.Job +} + +func (m *mockDDLPuller) FrontDDL() (uint64, *timodel.Job) { + if len(m.ddlQueue) > 0 { + return m.ddlQueue[0].BinlogInfo.FinishedTS, m.ddlQueue[0] + } + return m.resolvedTs, nil +} + +func (m *mockDDLPuller) PopFrontDDL() (uint64, *timodel.Job) { + if len(m.ddlQueue) > 0 { + job := m.ddlQueue[0] + m.ddlQueue = m.ddlQueue[1:] + return job.BinlogInfo.FinishedTS, job + } + return m.resolvedTs, nil +} + +func (m *mockDDLPuller) Close() {} + +func (m *mockDDLPuller) Run(ctx cdcContext.Context) error { + <-ctx.Done() + return nil +} + +type 
mockAsyncSink struct { + // AsyncSink + ddlExecuting *model.DDLEvent + ddlDone bool + checkpointTs model.Ts + syncPoint model.Ts + syncPointHis []model.Ts +} + +func (m *mockAsyncSink) EmitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) { + m.ddlExecuting = ddl + defer func() { m.ddlDone = false }() + return m.ddlDone, nil +} + +func (m *mockAsyncSink) SinkSyncpoint(ctx cdcContext.Context, checkpointTs uint64) error { + if checkpointTs == m.syncPoint { + return nil + } + m.syncPoint = checkpointTs + m.syncPointHis = append(m.syncPointHis, checkpointTs) + return nil +} + +func (m *mockAsyncSink) Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error { + return nil +} + +func (m *mockAsyncSink) EmitCheckpointTs(ctx cdcContext.Context, ts uint64) { + atomic.StoreUint64(&m.checkpointTs, ts) +} + +func (m *mockAsyncSink) Close() error { + return nil +} + +var _ = check.Suite(&changefeedSuite{}) + +type changefeedSuite struct { +} + +func createChangefeed4Test(ctx cdcContext.Context, c *check.C) (*changefeed, *model.ChangefeedReactorState, + map[model.CaptureID]*model.CaptureInfo, *orchestrator.ReactorStateTester) { + ctx.GlobalVars().PDClient = &mockPDClient{updateServiceGCSafePointFunc: func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + return safePoint, nil + }} + gcManager := newGCManager() + cf := newChangefeed4Test(ctx.ChangefeedVars().ID, gcManager, func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) { + return &mockDDLPuller{resolvedTs: startTs - 1}, nil + }, func(ctx cdcContext.Context) (AsyncSink, error) { + return &mockAsyncSink{}, nil + }) + state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID) + tester := orchestrator.NewReactorStateTester(c, state, nil) + state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + c.Assert(info, check.IsNil) + info = ctx.ChangefeedVars().Info + return info, true, nil + }) + 
tester.MustApplyPatches() + captures := map[model.CaptureID]*model.CaptureInfo{ctx.GlobalVars().CaptureInfo.ID: ctx.GlobalVars().CaptureInfo} + return cf, state, captures, tester +} + +func (s *changefeedSuite) TestPreCheck(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewBackendContext4Test(true) + cf, state, captures, tester := createChangefeed4Test(ctx, c) + cf.Tick(ctx, state, captures) + tester.MustApplyPatches() + c.Assert(state.Status, check.NotNil) + c.Assert(state.TaskStatuses, check.HasKey, ctx.GlobalVars().CaptureInfo.ID) + + // test clean the meta data of offline capture + offlineCaputreID := "offline-capture" + state.PatchTaskStatus(offlineCaputreID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) { + return new(model.TaskStatus), true, nil + }) + state.PatchTaskPosition(offlineCaputreID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return new(model.TaskPosition), true, nil + }) + state.PatchTaskWorkload(offlineCaputreID, func(workload model.TaskWorkload) (model.TaskWorkload, bool, error) { + return make(model.TaskWorkload), true, nil + }) + tester.MustApplyPatches() + + cf.Tick(ctx, state, captures) + tester.MustApplyPatches() + c.Assert(state.Status, check.NotNil) + c.Assert(state.TaskStatuses, check.HasKey, ctx.GlobalVars().CaptureInfo.ID) + c.Assert(state.TaskStatuses, check.Not(check.HasKey), offlineCaputreID) + c.Assert(state.TaskPositions, check.Not(check.HasKey), offlineCaputreID) + c.Assert(state.Workloads, check.Not(check.HasKey), offlineCaputreID) +} + +func (s *changefeedSuite) TestInitialize(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewBackendContext4Test(true) + cf, state, captures, tester := createChangefeed4Test(ctx, c) + // pre check + cf.Tick(ctx, state, captures) + tester.MustApplyPatches() + + // initialize + cf.Tick(ctx, state, captures) + tester.MustApplyPatches() + c.Assert(state.Status.CheckpointTs, check.Equals, 
ctx.ChangefeedVars().Info.StartTs) +} + +func (s *changefeedSuite) TestHandleError(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewBackendContext4Test(true) + cf, state, captures, tester := createChangefeed4Test(ctx, c) + // pre check + cf.Tick(ctx, state, captures) + tester.MustApplyPatches() + + // initialize + cf.Tick(ctx, state, captures) + tester.MustApplyPatches() + + cf.errCh <- errors.New("fake error") + // handle error + cf.Tick(ctx, state, captures) + tester.MustApplyPatches() + c.Assert(state.Status.CheckpointTs, check.Equals, ctx.ChangefeedVars().Info.StartTs) + c.Assert(state.Info.Error.Message, check.Equals, "fake error") +} + +func (s *changefeedSuite) TestExecDDL(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewBackendContext4Test(true) + cf, state, captures, tester := createChangefeed4Test(ctx, c) + helper := entry.NewSchemaTestHelper(c) + tickThreeTime := func() { + cf.Tick(ctx, state, captures) + tester.MustApplyPatches() + cf.Tick(ctx, state, captures) + tester.MustApplyPatches() + cf.Tick(ctx, state, captures) + tester.MustApplyPatches() + } + // pre check and initialize + tickThreeTime() + + // ddl puller resolved ts grow uo + mockDDLPuller := cf.ddlPuller.(*mockDDLPuller) + mockDDLPuller.resolvedTs += 1000 + mockAsyncSink := cf.sink.(*mockAsyncSink) + // three tick to make sure all barriers set in initialize is handled + tickThreeTime() + c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs) + + // handle create database + job := helper.DDL2Job("create database test1") + mockDDLPuller.resolvedTs += 1000 + job.BinlogInfo.FinishedTS = mockDDLPuller.resolvedTs + mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job) + tickThreeTime() + c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs) + c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "create database test1") + + // executing the ddl finished + mockAsyncSink.ddlDone = true + 
mockDDLPuller.resolvedTs += 1000 + tickThreeTime() + c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs) + + // handle create table + job = helper.DDL2Job("create table test1.test1(id int primary key)") + mockDDLPuller.resolvedTs += 1000 + job.BinlogInfo.FinishedTS = mockDDLPuller.resolvedTs + mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job) + tickThreeTime() + c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs) + c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "create table test1.test1(id int primary key)") + + // executing the ddl finished + mockAsyncSink.ddlDone = true + mockDDLPuller.resolvedTs += 1000 + tickThreeTime() + c.Assert(state.TaskStatuses[ctx.GlobalVars().CaptureInfo.ID].Tables, check.HasKey, job.TableID) +} + +func (s *changefeedSuite) TestSyncPoint(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewBackendContext4Test(true) + ctx.ChangefeedVars().Info.SyncPointEnabled = true + ctx.ChangefeedVars().Info.SyncPointInterval = 1 * time.Second + cf, state, captures, tester := createChangefeed4Test(ctx, c) + + // pre check + cf.Tick(ctx, state, captures) + tester.MustApplyPatches() + + // initialize + cf.Tick(ctx, state, captures) + tester.MustApplyPatches() + + mockDDLPuller := cf.ddlPuller.(*mockDDLPuller) + mockAsyncSink := cf.sink.(*mockAsyncSink) + c.Assert(mockAsyncSink.syncPoint, check.Equals, state.Status.CheckpointTs) + // add 5s to resolvedTs + mockDDLPuller.resolvedTs = oracle.GoTimeToTS(oracle.GetTimeFromTS(mockDDLPuller.resolvedTs).Add(5 * time.Second)) + // tick 20 times + for i := 0; i <= 20; i++ { + cf.Tick(ctx, state, captures) + tester.MustApplyPatches() + } + + syncPointDurations := make([]time.Duration, len(mockAsyncSink.syncPointHis)) + startGoTime := oracle.GetTimeFromTS(ctx.ChangefeedVars().Info.StartTs) + for i, syncPoint := range mockAsyncSink.syncPointHis { + syncPointDurations[i] = oracle.GetTimeFromTS(syncPoint).Sub(startGoTime) + } + 
c.Assert(syncPointDurations, check.DeepEquals, + []time.Duration{ + 0, 1 * time.Second, 2 * time.Second, 3 * time.Second, 4 * time.Second, + // the last sync point shoule be equal to the last checkpoint ts + oracle.GetTimeFromTS(state.Status.CheckpointTs).Sub(startGoTime), + }) +} + +func (s *changefeedSuite) TestFinished(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewBackendContext4Test(true) + ctx.ChangefeedVars().Info.TargetTs = ctx.ChangefeedVars().Info.StartTs + 1000 + cf, state, captures, tester := createChangefeed4Test(ctx, c) + + // pre check + cf.Tick(ctx, state, captures) + tester.MustApplyPatches() + + // initialize + cf.Tick(ctx, state, captures) + tester.MustApplyPatches() + + mockDDLPuller := cf.ddlPuller.(*mockDDLPuller) + mockDDLPuller.resolvedTs += 2000 + // tick many times to make sure the change feed is stopped + for i := 0; i <= 10; i++ { + cf.Tick(ctx, state, captures) + tester.MustApplyPatches() + } + + c.Assert(state.Status.CheckpointTs, check.Equals, state.Info.TargetTs) + c.Assert(state.Info.State, check.Equals, model.StateFinished) +} diff --git a/cdc/owner/metrics.go b/cdc/owner/metrics.go new file mode 100644 index 00000000000..9375c047041 --- /dev/null +++ b/cdc/owner/metrics.go @@ -0,0 +1,55 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package owner + +import "github.com/prometheus/client_golang/prometheus" + +var ( + changefeedCheckpointTsGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "owner", + Name: "checkpoint_ts", + Help: "checkpoint ts of changefeeds", + }, []string{"changefeed"}) + changefeedCheckpointTsLagGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "owner", + Name: "checkpoint_ts_lag", + Help: "checkpoint ts lag of changefeeds", + }, []string{"changefeed"}) + ownershipCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "owner", + Name: "ownership_counter", + Help: "The counter of ownership increases every 5 seconds on a owner capture", + }) + ownerMaintainTableNumGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "owner", + Name: "maintain_table_num", + Help: "number of replicated tables maintained in owner", + }, []string{"changefeed", "capture", "type"}) +) + +// InitMetrics registers all metrics used in owner +func InitMetrics(registry *prometheus.Registry) { + registry.MustRegister(changefeedCheckpointTsGauge) + registry.MustRegister(changefeedCheckpointTsLagGauge) + registry.MustRegister(ownershipCounter) + registry.MustRegister(ownerMaintainTableNumGauge) +} diff --git a/cmd/client_changefeed.go b/cmd/client_changefeed.go index 7b1e6fe0d8d..b8fc3bc4e89 100644 --- a/cmd/client_changefeed.go +++ b/cmd/client_changefeed.go @@ -259,7 +259,7 @@ func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate } startTs = oracle.ComposeTS(ts, logical) } - if err := verifyStartTs(ctx, startTs); err != nil { + if err := verifyStartTs(ctx, changefeedID, startTs); err != nil { return nil, err } if err := confirmLargeDataGap(ctx, cmd, startTs); err != nil { diff --git a/cmd/util.go b/cmd/util.go index d1b0cdb0e52..63d7581d462 100644 --- a/cmd/util.go +++ b/cmd/util.go @@ -217,11 +217,11 @@ func 
jsonPrint(cmd *cobra.Command, v interface{}) error { return nil } -func verifyStartTs(ctx context.Context, startTs uint64) error { +func verifyStartTs(ctx context.Context, changefeedID string, startTs uint64) error { if disableGCSafePointCheck { return nil } - return util.CheckSafetyOfStartTs(ctx, pdCli, startTs) + return util.CheckSafetyOfStartTs(ctx, pdCli, changefeedID, startTs) } func verifyTargetTs(ctx context.Context, startTs, targetTs uint64) error { diff --git a/go.sum b/go.sum index 8b987a1815d..ba196c7c1e2 100644 --- a/go.sum +++ b/go.sum @@ -678,7 +678,6 @@ github.com/unrolled/render v1.0.1/go.mod h1:gN9T0NhL4Bfbwu8ann7Ry/TGHYfosul+J0ob github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= github.com/urfave/negroni v0.3.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= -github.com/valyala/gozstd v1.7.0 h1:Ljh5c9zboqLhwTI33al32R72iCZfn0mCbVGcFWbGwRQ= github.com/valyala/gozstd v1.7.0/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ= github.com/vmihailenco/msgpack/v4 v4.3.11/go.mod h1:gborTTJjAo/GWTqqRjrLCn9pgNN+NXzzngzBKDPIqw4= github.com/vmihailenco/msgpack/v5 v5.0.0-beta.1 h1:d71/KA0LhvkrJ/Ok+Wx9qK7bU8meKA1Hk0jpVI5kJjk= diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index ca68b700dec..2633674d0cb 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -155,6 +155,7 @@ var ( ErrCaptureRegister = errors.Normalize("capture register to etcd failed", errors.RFCCodeText("CDC:ErrCaptureRegister")) ErrNewProcessorFailed = errors.Normalize("new processor failed", errors.RFCCodeText("CDC:ErrNewProcessorFailed")) ErrProcessorUnknown = errors.Normalize("processor running unknown error", errors.RFCCodeText("CDC:ErrProcessorUnknown")) + ErrOwnerUnknown = errors.Normalize("owner running unknown error", errors.RFCCodeText("CDC:ErrOwnerUnknown")) ErrProcessorTableNotFound = errors.Normalize("table not 
found in processor cache", errors.RFCCodeText("CDC:ErrProcessorTableNotFound")) ErrProcessorEtcdWatch = errors.Normalize("etcd watch returns error", errors.RFCCodeText("CDC:ErrProcessorEtcdWatch")) ErrProcessorSortDir = errors.Normalize("sort dir error", errors.RFCCodeText("CDC:ErrProcessorSortDir")) @@ -179,6 +180,7 @@ var ( ErrUpdateServiceSafepointFailed = errors.Normalize("updating service safepoint failed", errors.RFCCodeText("CDC:ErrUpdateServiceSafepointFailed")) ErrStartTsBeforeGC = errors.Normalize("fail to create changefeed because start-ts %d is earlier than GC safepoint at %d", errors.RFCCodeText("CDC:ErrStartTsBeforeGC")) ErrSnapshotLostByGC = errors.Normalize("fail to create or maintain changefeed due to snapshot loss caused by GC. checkpoint-ts %d is earlier than GC safepoint at %d", errors.RFCCodeText("CDC:ErrSnapshotLostByGC")) + ErrNotOwner = errors.Normalize("this capture is not a owner", errors.RFCCodeText("CDC:ErrNotOwner")) ErrTableListenReplicated = errors.Normalize("A table is being replicated by at least two processors(%s, %s), please report a bug", errors.RFCCodeText("CDC:ErrTableListenReplicated")) // EtcdWorker related errors. Internal use only. // ErrEtcdTryAgain is used by a PatchFunc to force a transaction abort. @@ -211,6 +213,8 @@ var ( // processor errors ErrTableProcessorStoppedSafely = errors.Normalize("table processor stopped safely", errors.RFCCodeText("CDC:ErrTableProcessorStoppedSafely")) + // owner errors + ErrOwnerChangedUnexpectedly = errors.Normalize("owner changed unexpectedly", errors.RFCCodeText("CDC:ErrOwnerChangedUnexpectedly")) // owner related errors ErrOwnerInconsistentStates = errors.Normalize("owner encountered inconsistent state. report a bug if this happens frequently. 
%s", errors.RFCCodeText("CDC:ErrOwnerInconsistentStates")) diff --git a/pkg/errors/helper.go b/pkg/errors/helper.go index 140bd6f60a0..2d50f52602a 100644 --- a/pkg/errors/helper.go +++ b/pkg/errors/helper.go @@ -31,16 +31,31 @@ func WrapError(rfcError *errors.Error, err error) error { // ChangefeedFastFailError checks the error, returns true if it is meaningless // to retry on this error func ChangefeedFastFailError(err error) bool { - return ErrStartTsBeforeGC.Equal(errors.Cause(err)) + return ErrStartTsBeforeGC.Equal(errors.Cause(err)) || ErrSnapshotLostByGC.Equal(errors.Cause(err)) } // ChangefeedFastFailErrorCode checks the error, returns true if it is meaningless // to retry on this error func ChangefeedFastFailErrorCode(errCode errors.RFCErrorCode) bool { switch errCode { - case ErrStartTsBeforeGC.RFCCode(): + case ErrStartTsBeforeGC.RFCCode(), ErrSnapshotLostByGC.RFCCode(): return true default: return false } } + +// RFCCode returns a RFCCode from an error +func RFCCode(err error) (errors.RFCErrorCode, bool) { + type rfcCoder interface { + RFCCode() errors.RFCErrorCode + } + if terr, ok := err.(rfcCoder); ok { + return terr.RFCCode(), true + } + err = errors.Cause(err) + if terr, ok := err.(rfcCoder); ok { + return terr.RFCCode(), true + } + return "", false +} diff --git a/pkg/util/gc_service.go b/pkg/util/gc_service.go index 24b6fb2cf5c..28f260b94de 100644 --- a/pkg/util/gc_service.go +++ b/pkg/util/gc_service.go @@ -23,15 +23,15 @@ import ( const ( // cdcChangefeedCreatingServiceGCSafePointID is service GC safe point ID - cdcChangefeedCreatingServiceGCSafePointID = "ticdc-changefeed-creating" + cdcChangefeedCreatingServiceGCSafePointID = "ticdc-creating-" // cdcChangefeedCreatingServiceGCSafePointTTL is service GC safe point TTL cdcChangefeedCreatingServiceGCSafePointTTL = 10 * 60 // 10 mins ) // CheckSafetyOfStartTs checks if the startTs less than the minimum of Service-GC-Ts // and this function will update the service GC to startTs -func 
CheckSafetyOfStartTs(ctx context.Context, pdCli pd.Client, startTs uint64) error { - minServiceGCTs, err := pdCli.UpdateServiceGCSafePoint(ctx, cdcChangefeedCreatingServiceGCSafePointID, +func CheckSafetyOfStartTs(ctx context.Context, pdCli pd.Client, changefeedID string, startTs uint64) error { + minServiceGCTs, err := pdCli.UpdateServiceGCSafePoint(ctx, cdcChangefeedCreatingServiceGCSafePointID+changefeedID, cdcChangefeedCreatingServiceGCSafePointTTL, startTs) if err != nil { return errors.Trace(err) diff --git a/pkg/util/gc_service_test.go b/pkg/util/gc_service_test.go index 322f1903eb5..84bbf3a82b6 100644 --- a/pkg/util/gc_service_test.go +++ b/pkg/util/gc_service_test.go @@ -34,13 +34,18 @@ func (s *gcServiceSuite) TestCheckSafetyOfStartTs(c *check.C) { defer testleak.AfterTest(c)() ctx := context.Background() s.pdCli.UpdateServiceGCSafePoint(ctx, "service1", 10, 60) //nolint:errcheck - err := CheckSafetyOfStartTs(ctx, s.pdCli, 50) + err := CheckSafetyOfStartTs(ctx, s.pdCli, "changefeed1", 50) c.Assert(err.Error(), check.Equals, "[CDC:ErrStartTsBeforeGC]fail to create changefeed because start-ts 50 is earlier than GC safepoint at 60") s.pdCli.UpdateServiceGCSafePoint(ctx, "service2", 10, 80) //nolint:errcheck s.pdCli.UpdateServiceGCSafePoint(ctx, "service3", 10, 70) //nolint:errcheck - err = CheckSafetyOfStartTs(ctx, s.pdCli, 65) + err = CheckSafetyOfStartTs(ctx, s.pdCli, "changefeed2", 65) c.Assert(err, check.IsNil) - c.Assert(s.pdCli.serviceSafePoint, check.DeepEquals, map[string]uint64{"service1": 60, "service2": 80, "service3": 70, "ticdc-changefeed-creating": 65}) + c.Assert(s.pdCli.serviceSafePoint, check.DeepEquals, map[string]uint64{ + "service1": 60, + "service2": 80, + "service3": 70, + "ticdc-creating-changefeed2": 65, + }) } type mockPdClientForServiceGCSafePoint struct { From 0fe1adbb2100a82ba8d5dc9ed2925babbc86baba Mon Sep 17 00:00:00 2001 From: leoppro Date: Mon, 31 May 2021 15:03:48 +0800 Subject: [PATCH 02/17] update --- 
cdc/entry/schema_storage.go | 5 +++++ cdc/owner/async_sink.go | 6 ++++++ cdc/owner/schema.go | 14 ++++++++++++-- pkg/context/context.go | 4 +++- 4 files changed, 26 insertions(+), 3 deletions(-) diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index acceb46e9da..81aa493d962 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -100,6 +100,11 @@ func (s *SingleSchemaSnapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo, // NewSingleSchemaSnapshotFromMeta creates a new single schema snapshot from a tidb meta func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*SingleSchemaSnapshot, error) { + if meta == nil { + snap := newEmptySchemaSnapshot(explicitTables) + snap.currentTs = currentTs + return snap, nil + } return newSchemaSnapshotFromMeta(meta, currentTs, explicitTables) } diff --git a/cdc/owner/async_sink.go b/cdc/owner/async_sink.go index 008978c0cd1..b6af446cc94 100644 --- a/cdc/owner/async_sink.go +++ b/cdc/owner/async_sink.go @@ -57,6 +57,8 @@ type asyncSinkImpl struct { checkpointTs model.Ts + lastSyncPoint model.Ts + ddlCh chan *model.DDLEvent ddlFinishedTs model.Ts ddlSentTs model.Ts @@ -169,6 +171,10 @@ func (s *asyncSinkImpl) EmitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent } func (s *asyncSinkImpl) SinkSyncpoint(ctx cdcContext.Context, checkpointTs uint64) error { + if checkpointTs == s.lastSyncPoint { + return nil + } + s.lastSyncPoint = checkpointTs // TODO implement async sink syncpoint return s.syncpointStore.SinkSyncpoint(ctx, ctx.ChangefeedVars().ID, checkpointTs) } diff --git a/cdc/owner/schema.go b/cdc/owner/schema.go index 8b9fde783f0..feb70898920 100644 --- a/cdc/owner/schema.go +++ b/cdc/owner/schema.go @@ -34,6 +34,7 @@ type schemaWrap4Owner struct { config *config.ReplicaConfig allPhysicalTablesCache []model.TableID + ddlHandledTs model.Ts } func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *config.ReplicaConfig) 
(*schemaWrap4Owner, error) { @@ -45,7 +46,7 @@ func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *con return nil, errors.Trace(err) } } - schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs, config.ForceReplicate) + schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs-1, config.ForceReplicate) if err != nil { return nil, errors.Trace(err) } @@ -57,6 +58,7 @@ func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *con schemaSnapshot: schemaSnap, filter: f, config: config, + ddlHandledTs: startTs - 1, }, nil } @@ -84,8 +86,16 @@ func (s *schemaWrap4Owner) AllPhysicalTables() []model.TableID { } func (s *schemaWrap4Owner) HandleDDL(job *timodel.Job) error { + if job.BinlogInfo.FinishedTS <= s.ddlHandledTs { + return nil + } s.allPhysicalTablesCache = nil - return s.schemaSnapshot.HandleDDL(job) + err := s.schemaSnapshot.HandleDDL(job) + if err != nil { + return errors.Trace(err) + } + s.ddlHandledTs = job.BinlogInfo.FinishedTS + return nil } func (s *schemaWrap4Owner) IsIneligibleTableID(tableID model.TableID) bool { diff --git a/pkg/context/context.go b/pkg/context/context.go index 323a446499c..71546b235c6 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/ticdc/cdc/model" "github.com/pingcap/ticdc/pkg/config" tidbkv "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/store/tikv/oracle" pd "github.com/tikv/pd/client" "go.uber.org/zap" ) @@ -184,7 +185,8 @@ func NewBackendContext4Test(withChangefeedVars bool) Context { ctx = WithChangefeedVars(ctx, &ChangefeedVars{ ID: "changefeed-id-test", Info: &model.ChangeFeedInfo{ - Config: config.GetDefaultReplicaConfig(), + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), }, }) } From f0ab604ba0ff80bc1fde4432750084e184f0a854 Mon Sep 17 00:00:00 2001 From: leoppro Date: Mon, 31 May 2021 15:24:45 +0800 Subject: [PATCH 03/17] update 
--- errors.toml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/errors.toml b/errors.toml index e39d13b9b5b..f61216ea429 100755 --- a/errors.toml +++ b/errors.toml @@ -496,6 +496,11 @@ error = ''' received event regionID %v, requestID %v from %v, but neither pending region nor running region was found ''' +["CDC:ErrNotOwner"] +error = ''' +this capture is not a owner +''' + ["CDC:ErrOldValueNotEnabled"] error = ''' old value is not enabled @@ -511,6 +516,11 @@ error = ''' owner campaign key deleted ''' +["CDC:ErrOwnerChangedUnexpectedly"] +error = ''' +owner changed unexpectedly +''' + ["CDC:ErrOwnerChangefeedNotFound"] error = ''' changefeed %s not found in owner cache @@ -531,6 +541,11 @@ error = ''' owner sort dir ''' +["CDC:ErrOwnerUnknown"] +error = ''' +owner running unknown error +''' + ["CDC:ErrPDBatchLoadRegions"] error = ''' pd batch load regions failed From 15f90d649679eeb2b97ff5d7d99db6c3fe3d7fd6 Mon Sep 17 00:00:00 2001 From: leoppro Date: Mon, 31 May 2021 15:26:39 +0800 Subject: [PATCH 04/17] update --- go.sum | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/go.sum b/go.sum index ba196c7c1e2..2f635261983 100644 --- a/go.sum +++ b/go.sum @@ -107,6 +107,7 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa h1:OaNxuTZr7kxeODyLWsRMC+OD03aFUH+mW6r2d+MWa5Y= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= +github.com/codahale/hdrhistogram v0.9.0 h1:9GjrtRI+mLEFPtTfR/AZhcxp+Ii8NZYWq5104FbZQY0= github.com/codahale/hdrhistogram v0.9.0/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c= github.com/coocood/bbloom v0.0.0-20190830030839-58deb6228d64 
h1:W1SHiII3e0jVwvaQFglwu3kS9NLxOeTpvik7MbKCyuQ= @@ -186,9 +187,11 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4 github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/frankban/quicktest v1.10.2 h1:19ARM85nVi4xH7xPXuc5eM/udya5ieh7b/Sv+d844Tk= github.com/frankban/quicktest v1.10.2/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= github.com/frankban/quicktest v1.11.1 h1:stwUsXhUGliQs9t0ZS39BWCltFdOHgABiIlihop8AD4= github.com/frankban/quicktest v1.11.1/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsouza/fake-gcs-server v1.17.0 h1:OeH75kBZcZa3ZE+zz/mFdJ2btt9FgqfjI7gIh9+5fvk= github.com/fsouza/fake-gcs-server v1.17.0/go.mod h1:D1rTE4YCyHFNa99oyJJ5HyclvN/0uQR+pM/VdlL83bw= @@ -461,6 +464,7 @@ github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8 github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3bPQ/0cuYh2H4rkg0tytX/07k= github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8= github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q= +github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/oleiade/reflections v1.0.1/go.mod 
h1:rdFxbxq4QXVZWj0F+e9jqjDkc7dbp97vkRixKo2JR60= @@ -507,6 +511,7 @@ github.com/pingcap/errors v0.11.5-0.20201029093017-5a7df2af2ac7/go.mod h1:G7x87l github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3 h1:LllgC9eGfqzkfubMgjKIDyZYaa609nNWAyNZtpy2B3M= github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3/go.mod h1:G7x87le1poQzLB/TqvTJI2ILrSgobnq4Ut7luOwvfvI= github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI= +github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMtVcOkjUcuQKh+YrluSo7+7YMCQSzy30= github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk= github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd h1:I8IeI8MNiZVKnwuXhcIIzz6pREcOSbq18Q31KYIzFVM= github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd/go.mod h1:IVF+ijPSMZVtx2oIqxAg7ur6EyixtTYfOHwpfmlhqI4= @@ -521,6 +526,7 @@ github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e h1:oUMZ6X/Kpaoxfej github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= +github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 h1:M+DNpOu/I3uDmwee6vcnoPd6GgSMqND4gxvDQ/W584U= github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= @@ -678,6 +684,7 @@ github.com/unrolled/render v1.0.1/go.mod h1:gN9T0NhL4Bfbwu8ann7Ry/TGHYfosul+J0ob 
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= github.com/urfave/negroni v0.3.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= +github.com/valyala/gozstd v1.7.0 h1:Ljh5c9zboqLhwTI33al32R72iCZfn0mCbVGcFWbGwRQ= github.com/valyala/gozstd v1.7.0/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ= github.com/vmihailenco/msgpack/v4 v4.3.11/go.mod h1:gborTTJjAo/GWTqqRjrLCn9pgNN+NXzzngzBKDPIqw4= github.com/vmihailenco/msgpack/v5 v5.0.0-beta.1 h1:d71/KA0LhvkrJ/Ok+Wx9qK7bU8meKA1Hk0jpVI5kJjk= @@ -692,6 +699,7 @@ github.com/xitongsys/parquet-go v1.5.1/go.mod h1:xUxwM8ELydxh4edHGegYq1pA8NnMKDx github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457 h1:tBbuFCtyJNKT+BFAv6qjvTFpVdy97IYNaBwGUXifIUs= github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457/go.mod h1:pheqtXeHQFzxJk45lRQ0UIGIivKnLXvialZSFWs81A8= github.com/xitongsys/parquet-go-source v0.0.0-20190524061010-2b72cbee77d5/go.mod h1:xxCx7Wpym/3QCo6JhujJX51dzSXrwmb0oH6FQb39SEA= +github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0 h1:a742S4V5A15F93smuVxA60LQWsrCnN8bKeWDBARU1/k= github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0/go.mod h1:HYhIKsdns7xz80OgkbgJYrtQY7FjHWHKH6cvN7+czGE= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yahoo/athenz v1.8.55 h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0= @@ -1005,6 +1013,7 @@ gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c/go.mo gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= From b6af45fa8d303bf2a56ff711b7f98e3451212dc2 Mon Sep 17 00:00:00 2001 From: leoppro Date: Mon, 31 May 2021 17:17:39 +0800 Subject: [PATCH 05/17] update --- cdc/owner/changefeed.go | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 709f4019320..3f53cc1875b 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -42,15 +42,25 @@ type changefeed struct { feedStateManager *feedStateManager gcManager *gcManager - schema *schemaWrap4Owner - sink AsyncSink - ddlPuller DDLPuller - initialized bool + schema *schemaWrap4Owner + sink AsyncSink + ddlPuller DDLPuller + initialized bool + + // only used for asyncExecDDL function + // the ddlEventCache is not nil when the changefeed is executing the DDL event asynchronously + // when the DDL event is executed, the ddlEventCache will be set to nil ddlEventCache *model.DDLEvent errCh chan error cancel context.CancelFunc - wg sync.WaitGroup + + // the changefeed will start some backend goroutine in initialize function, + // such as DDLPuller, Sink, etc. + // the wait group is used to manager those backend goroutine. + // but the wait group only manager the DDLPuller for now. + // TODO: manager the Sink and another backend goroutine. 
+ wg sync.WaitGroup metricsChangefeedCheckpointTsGauge prometheus.Gauge metricsChangefeedCheckpointTsLagGauge prometheus.Gauge @@ -153,7 +163,9 @@ func (c *changefeed) initialize(ctx cdcContext.Context) error { if c.initialized { return nil } - // empty the errCh + // clean the errCh + // when the changefeed is resumed after stopped, the changefeed instance will be reuse, + // so we should make sure that the errCh is empty when the changefeed restarting LOOP: for { select { @@ -236,6 +248,9 @@ func (c *changefeed) releaseResources() { c.initialized = false } +// preCheck makes sure the metadata is enough to run the tick +// if the metadata is not complete, for example, the ChangeFeedStatus is nil, +// this function will create the lost metadata and skip this tick. func (c *changefeed) preCheck(captures map[model.CaptureID]*model.CaptureInfo) (passCheck bool) { passCheck = true if c.state.Status == nil { From 9bd73db5e81be7d72db604203448d7ec4dae7156 Mon Sep 17 00:00:00 2001 From: leoppro Date: Mon, 31 May 2021 17:18:14 +0800 Subject: [PATCH 06/17] Apply suggestions from code review Co-authored-by: Zixiong Liu --- cdc/owner/changefeed.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 709f4019320..b64e5ec65c0 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -226,7 +226,7 @@ func (c *changefeed) releaseResources() { c.ddlPuller.Close() c.schema = nil if err := c.sink.Close(); err != nil { - log.Warn("release the owner resources failed", zap.String("changefeedID", c.state.ID), zap.Error(err)) + log.Warn("Closing sink failed in Owner", zap.String("changefeedID", c.state.ID), zap.Error(err)) } c.wg.Wait() changefeedCheckpointTsGauge.DeleteLabelValues(c.id) @@ -236,7 +236,7 @@ func (c *changefeed) releaseResources() { c.initialized = false } -func (c *changefeed) preCheck(captures map[model.CaptureID]*model.CaptureInfo) (passCheck bool) { +func (c *changefeed) 
preflightCheck(captures map[model.CaptureID]*model.CaptureInfo) (ok bool) { passCheck = true if c.state.Status == nil { c.state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { From 43ab5ed31a57a060ba1420ca98d27d2420aab86e Mon Sep 17 00:00:00 2001 From: leoppro Date: Mon, 31 May 2021 17:29:03 +0800 Subject: [PATCH 07/17] Update cdc/entry/schema_storage.go --- cdc/entry/schema_storage.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/entry/schema_storage.go b/cdc/entry/schema_storage.go index 81aa493d962..e8f0505d4e1 100644 --- a/cdc/entry/schema_storage.go +++ b/cdc/entry/schema_storage.go @@ -100,6 +100,7 @@ func (s *SingleSchemaSnapshot) PreTableInfo(job *timodel.Job) (*model.TableInfo, // NewSingleSchemaSnapshotFromMeta creates a new single schema snapshot from a tidb meta func NewSingleSchemaSnapshotFromMeta(meta *timeta.Meta, currentTs uint64, explicitTables bool) (*SingleSchemaSnapshot, error) { + // meta is nil only in unit tests if meta == nil { snap := newEmptySchemaSnapshot(explicitTables) snap.currentTs = currentTs From 0001b60ed7dd51c939e4ba09aa6e1d43b34d202a Mon Sep 17 00:00:00 2001 From: leoppro Date: Mon, 31 May 2021 17:30:21 +0800 Subject: [PATCH 08/17] Apply suggestions from code review Co-authored-by: Zixiong Liu --- cdc/owner/changefeed.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 818534f9118..3b71a6fb3b2 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -48,18 +48,18 @@ type changefeed struct { initialized bool // only used for asyncExecDDL function - // the ddlEventCache is not nil when the changefeed is executing the DDL event asynchronously - // when the DDL event is executed, the ddlEventCache will be set to nil + // ddlEventCache is not nil when the changefeed is executing a DDL event asynchronously + // After the DDL event has been executed, ddlEventCache will be set 
to nil. ddlEventCache *model.DDLEvent errCh chan error cancel context.CancelFunc - // the changefeed will start some backend goroutine in initialize function, + // The changefeed will start some backend goroutines in the function `initialize`, // such as DDLPuller, Sink, etc. - // the wait group is used to manager those backend goroutine. - // but the wait group only manager the DDLPuller for now. - // TODO: manager the Sink and another backend goroutine. + // `wg` is used to manage those backend goroutines. + // But it only manages the DDLPuller for now. + // TODO: manage the Sink and other backend goroutines. wg sync.WaitGroup metricsChangefeedCheckpointTsGauge prometheus.Gauge @@ -164,8 +164,8 @@ func (c *changefeed) initialize(ctx cdcContext.Context) error { return nil } // clean the errCh - // when the changefeed is resumed after stopped, the changefeed instance will be reuse, - // so we should make sure that the errCh is empty when the changefeed restarting + // When the changefeed is resumed after being stopped, the changefeed instance will be reused, + // So we should make sure that the errCh is empty when the changefeed is restarting LOOP: for { select { @@ -248,9 +248,9 @@ func (c *changefeed) releaseResources() { c.initialized = false } -// preflightCheck makes sure the metadata is enough to run the tick -// if the metadata is not complete, for example, the ChangeFeedStatus is nil, -// this function will create the lost metadata and skip this tick. +// preflightCheck makes sure that the metadata in Etcd is complete enough to run the tick. +// If the metadata is not complete, such as when the ChangeFeedStatus is nil, +// this function will reconstruct the lost metadata and skip this tick. 
func (c *changefeed) preflightCheck(captures map[model.CaptureID]*model.CaptureInfo) (ok bool) { ok = true if c.state.Status == nil { @@ -428,7 +428,7 @@ func (c *changefeed) updateStatus(barrierTs model.Ts) { phyTs := oracle.ExtractPhysical(checkpointTs) c.metricsChangefeedCheckpointTsGauge.Set(float64(phyTs)) - // It is more accurate to get tso from PD, but in most cases we have + // It is more accurate to get tso from PD, but in most cases since we have // deployed NTP service, a little bias is acceptable here. c.metricsChangefeedCheckpointTsLagGauge.Set(float64(oracle.GetPhysical(time.Now())-phyTs) / 1e3) } From 244992745deb2d92e4ba4317a79560329db1938f Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 1 Jun 2021 10:24:15 +0800 Subject: [PATCH 09/17] update --- cdc/owner/changefeed.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 3b71a6fb3b2..db1b0de837c 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -15,7 +15,6 @@ package owner import ( "context" - "reflect" "sync" "time" @@ -103,7 +102,7 @@ func (c *changefeed) Tick(ctx cdcContext.Context, state *model.ChangefeedReactor return nil }) if err := c.tick(ctx, state, captures); err != nil { - log.Error("an error occurred in Owner", zap.String("changefeedID", c.state.ID), zap.Error(err), zap.Stringer("tp", reflect.TypeOf(err))) + log.Error("an error occurred in Owner", zap.String("changefeedID", c.state.ID), zap.Error(err)) var code string if rfcCode, ok := cerror.RFCCode(err); ok { code = string(rfcCode) @@ -192,8 +191,10 @@ LOOP: return errors.Trace(err) } } + if c.state.Info.SyncPointEnabled { + c.barriers.Update(syncPointBarrier, startTs) + } c.barriers.Update(ddlJobBarrier, startTs) - c.barriers.Update(syncPointBarrier, startTs) c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs()) var err error c.schema, err = newSchemaWrap4Owner(ctx.GlobalVars().KVStorage, startTs, c.state.Info.Config) @@ 
-337,10 +338,6 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) { c.barriers.Update(ddlJobBarrier, newDDLResolvedTs) case syncPointBarrier: - if !c.state.Info.SyncPointEnabled { - c.barriers.Remove(syncPointBarrier) - return barrierTs, nil - } if !blocked { return barrierTs, nil } From 6b31d42773bfa1d4fae59e60041d3db6e0c7f2ab Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 1 Jun 2021 10:36:57 +0800 Subject: [PATCH 10/17] fix tidy --- go.sum | 8 -------- 1 file changed, 8 deletions(-) diff --git a/go.sum b/go.sum index 2f635261983..8b987a1815d 100644 --- a/go.sum +++ b/go.sum @@ -107,7 +107,6 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa h1:OaNxuTZr7kxeODyLWsRMC+OD03aFUH+mW6r2d+MWa5Y= github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8= -github.com/codahale/hdrhistogram v0.9.0 h1:9GjrtRI+mLEFPtTfR/AZhcxp+Ii8NZYWq5104FbZQY0= github.com/codahale/hdrhistogram v0.9.0/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c= github.com/coocood/bbloom v0.0.0-20190830030839-58deb6228d64 h1:W1SHiII3e0jVwvaQFglwu3kS9NLxOeTpvik7MbKCyuQ= @@ -187,11 +186,9 @@ github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4 github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= -github.com/frankban/quicktest v1.10.2 h1:19ARM85nVi4xH7xPXuc5eM/udya5ieh7b/Sv+d844Tk= github.com/frankban/quicktest v1.10.2/go.mod 
h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= github.com/frankban/quicktest v1.11.1 h1:stwUsXhUGliQs9t0ZS39BWCltFdOHgABiIlihop8AD4= github.com/frankban/quicktest v1.11.1/go.mod h1:K+q6oSqb0W0Ininfk863uOk1lMy69l/P6txr3mVT54s= -github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsouza/fake-gcs-server v1.17.0 h1:OeH75kBZcZa3ZE+zz/mFdJ2btt9FgqfjI7gIh9+5fvk= github.com/fsouza/fake-gcs-server v1.17.0/go.mod h1:D1rTE4YCyHFNa99oyJJ5HyclvN/0uQR+pM/VdlL83bw= @@ -464,7 +461,6 @@ github.com/ngaut/pools v0.0.0-20180318154953-b7bc8c42aac7/go.mod h1:iWMfgwqYW+e8 github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef h1:K0Fn+DoFqNqktdZtdV3bPQ/0cuYh2H4rkg0tytX/07k= github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo6LNmIvEWzsW1hbBQfpUO4JWnuQRmva8= github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q= -github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/oleiade/reflections v1.0.1/go.mod h1:rdFxbxq4QXVZWj0F+e9jqjDkc7dbp97vkRixKo2JR60= @@ -511,7 +507,6 @@ github.com/pingcap/errors v0.11.5-0.20201029093017-5a7df2af2ac7/go.mod h1:G7x87l github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3 h1:LllgC9eGfqzkfubMgjKIDyZYaa609nNWAyNZtpy2B3M= github.com/pingcap/errors v0.11.5-0.20201126102027-b0a155152ca3/go.mod h1:G7x87le1poQzLB/TqvTJI2ILrSgobnq4Ut7luOwvfvI= github.com/pingcap/failpoint v0.0.0-20191029060244-12f4ac2fd11d/go.mod h1:DNS3Qg7bEDhU6EXNHF+XSv/PGznQaMJ5FWvctpm6pQI= -github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce 
h1:Y1kCxlCtlPTMtVcOkjUcuQKh+YrluSo7+7YMCQSzy30= github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk= github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd h1:I8IeI8MNiZVKnwuXhcIIzz6pREcOSbq18Q31KYIzFVM= github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd/go.mod h1:IVF+ijPSMZVtx2oIqxAg7ur6EyixtTYfOHwpfmlhqI4= @@ -526,7 +521,6 @@ github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e h1:oUMZ6X/Kpaoxfej github.com/pingcap/kvproto v0.0.0-20210429093846-65f54a202d7e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= -github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8 h1:M+DNpOu/I3uDmwee6vcnoPd6GgSMqND4gxvDQ/W584U= github.com/pingcap/log v0.0.0-20201112100606-8f1e84a3abc8/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4 h1:ERrF0fTuIOnwfGbt71Ji3DKbOEaP189tjym50u8gpC8= github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= @@ -699,7 +693,6 @@ github.com/xitongsys/parquet-go v1.5.1/go.mod h1:xUxwM8ELydxh4edHGegYq1pA8NnMKDx github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457 h1:tBbuFCtyJNKT+BFAv6qjvTFpVdy97IYNaBwGUXifIUs= github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457/go.mod h1:pheqtXeHQFzxJk45lRQ0UIGIivKnLXvialZSFWs81A8= github.com/xitongsys/parquet-go-source v0.0.0-20190524061010-2b72cbee77d5/go.mod h1:xxCx7Wpym/3QCo6JhujJX51dzSXrwmb0oH6FQb39SEA= -github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0 h1:a742S4V5A15F93smuVxA60LQWsrCnN8bKeWDBARU1/k= github.com/xitongsys/parquet-go-source 
v0.0.0-20200817004010-026bad9b25d0/go.mod h1:HYhIKsdns7xz80OgkbgJYrtQY7FjHWHKH6cvN7+czGE= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yahoo/athenz v1.8.55 h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0= @@ -1013,7 +1006,6 @@ gopkg.in/alecthomas/kingpin.v3-unstable v3.0.0-20180810215634-df19058c872c/go.mo gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b h1:QRR6H1YWRnHb4Y/HeNFCTJLFVxaq6wH4YuVdsUOr75U= gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= From e7695b0aec8443535ee7b2435b6425ec6d78d0a7 Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 1 Jun 2021 11:52:50 +0800 Subject: [PATCH 11/17] fix test leak --- cdc/owner/changefeed_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index a6931631c9b..1ef9fc93b5c 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -157,6 +157,7 @@ func (s *changefeedSuite) TestInitialize(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(true) cf, state, captures, tester := createChangefeed4Test(ctx, c) + defer cf.Close() // pre check cf.Tick(ctx, state, captures) tester.MustApplyPatches() @@ -171,6 +172,7 @@ func (s *changefeedSuite) TestHandleError(c *check.C) { defer testleak.AfterTest(c)() ctx := 
cdcContext.NewBackendContext4Test(true) cf, state, captures, tester := createChangefeed4Test(ctx, c) + defer cf.Close() // pre check cf.Tick(ctx, state, captures) tester.MustApplyPatches() @@ -191,6 +193,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) { defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(true) cf, state, captures, tester := createChangefeed4Test(ctx, c) + defer cf.Close() helper := entry.NewSchemaTestHelper(c) tickThreeTime := func() { cf.Tick(ctx, state, captures) @@ -248,6 +251,7 @@ func (s *changefeedSuite) TestSyncPoint(c *check.C) { ctx.ChangefeedVars().Info.SyncPointEnabled = true ctx.ChangefeedVars().Info.SyncPointInterval = 1 * time.Second cf, state, captures, tester := createChangefeed4Test(ctx, c) + defer cf.Close() // pre check cf.Tick(ctx, state, captures) @@ -286,6 +290,7 @@ func (s *changefeedSuite) TestFinished(c *check.C) { ctx := cdcContext.NewBackendContext4Test(true) ctx.ChangefeedVars().Info.TargetTs = ctx.ChangefeedVars().Info.StartTs + 1000 cf, state, captures, tester := createChangefeed4Test(ctx, c) + defer cf.Close() // pre check cf.Tick(ctx, state, captures) From 6c9bfe39ef075193c295985d64b0d36fbe25833e Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 1 Jun 2021 12:20:58 +0800 Subject: [PATCH 12/17] fix leak test --- cdc/owner/changefeed_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index 1ef9fc93b5c..648dcdc7b0f 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -195,6 +195,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) { cf, state, captures, tester := createChangefeed4Test(ctx, c) defer cf.Close() helper := entry.NewSchemaTestHelper(c) + defer helper.Close() tickThreeTime := func() { cf.Tick(ctx, state, captures) tester.MustApplyPatches() From 8ac4349b55756e7137cac0c29e860454d7d6b280 Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 1 Jun 2021 17:21:32 +0800 Subject: [PATCH 13/17] update 
--- cdc/owner/metrics.go | 7 ++ cdc/owner/owner.go | 271 ++++++++++++++++++++++++++++++++++++++++ cdc/owner/owner_test.go | 175 ++++++++++++++++++++++++++ 3 files changed, 453 insertions(+) create mode 100644 cdc/owner/owner.go create mode 100644 cdc/owner/owner_test.go diff --git a/cdc/owner/metrics.go b/cdc/owner/metrics.go index 9375c047041..0b1ae01d765 100644 --- a/cdc/owner/metrics.go +++ b/cdc/owner/metrics.go @@ -46,6 +46,13 @@ var ( }, []string{"changefeed", "capture", "type"}) ) +const ( + // total tables that have been dispatched to a single processor + maintainTableTypeTotal string = "total" + // tables that are dispatched to a processor and have not been finished yet + maintainTableTypeWip string = "wip" +) + // InitMetrics registers all metrics used in owner func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(changefeedCheckpointTsGauge) diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go new file mode 100644 index 00000000000..5a182160050 --- /dev/null +++ b/cdc/owner/owner.go @@ -0,0 +1,271 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package owner + +import ( + "context" + "net/http" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + cdcContext "github.com/pingcap/ticdc/pkg/context" + cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/orchestrator" + "go.uber.org/zap" +) + +type ownerJobType int + +// All AdminJob types +const ( + ownerJobTypeRebalance ownerJobType = iota + ownerJobTypeManualSchedule + ownerJobTypeAdminJob + ownerJobTypeDebugInfo +) + +type ownerJob struct { + tp ownerJobType + changefeedID model.ChangeFeedID + + // for ManualSchedule only + targetCaptureID model.CaptureID + // for ManualSchedule only + tableID model.TableID + + // for Admin Job only + adminJob *model.AdminJob + + // for debug info only + httpWriter http.ResponseWriter + + done chan struct{} +} + +// Owner manages many changefeeds +// All published functions is THREAD-SAFE, except for Tick, Tick is only used for etcd worker +type Owner struct { + changefeeds map[model.ChangeFeedID]*changefeed + + gcManager *gcManager + + ownerJobQueue []*ownerJob + ownerJobQueueMu sync.Mutex + lastTickTime time.Time + + close int32 + + newChangefeed func(id model.ChangeFeedID, gcManager *gcManager) *changefeed +} + +// NewOwner creates a new Owner +func NewOwner() *Owner { + return &Owner{ + changefeeds: make(map[model.ChangeFeedID]*changefeed), + gcManager: newGCManager(), + lastTickTime: time.Now(), + newChangefeed: newChangefeed, + } +} + +// NewOwner4Test creates a new Owner for test +func NewOwner4Test( + newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error), + newSink func(ctx cdcContext.Context) (AsyncSink, error)) *Owner { + o := NewOwner() + o.newChangefeed = func(id model.ChangeFeedID, gcManager *gcManager) *changefeed { + return newChangefeed4Test(id, gcManager, newDDLPuller, newSink) + } + return o +} + +// Tick implements the 
Reactor interface +func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) (nextState orchestrator.ReactorState, err error) { + failpoint.Inject("owner-run-with-error", func() { + failpoint.Return(nil, errors.New("owner run with injected error")) + }) + failpoint.Inject("sleep-in-owner-tick", nil) + ctx := stdCtx.(cdcContext.Context) + state := rawState.(*model.GlobalReactorState) + o.updateMetrics(state) + state.CheckCaptureAlive(ctx.GlobalVars().CaptureInfo.ID) + err = o.gcManager.updateGCSafePoint(ctx, state) + if err != nil { + return nil, errors.Trace(err) + } + o.handleJob() + for changefeedID, changefeedState := range state.Changefeeds { + if changefeedState.Info == nil { + o.cleanUpChangefeed(changefeedState) + continue + } + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ + ID: changefeedID, + Info: changefeedState.Info, + }) + cfReactor, exist := o.changefeeds[changefeedID] + if !exist { + cfReactor = o.newChangefeed(changefeedID, o.gcManager) + o.changefeeds[changefeedID] = cfReactor + } + cfReactor.Tick(ctx, changefeedState, state.Captures) + } + if len(o.changefeeds) != len(state.Changefeeds) { + for changefeedID, cfReactor := range o.changefeeds { + if _, exist := state.Changefeeds[changefeedID]; exist { + continue + } + cfReactor.Close() + delete(o.changefeeds, changefeedID) + } + } + if atomic.LoadInt32(&o.close) != 0 { + for _, cfReactor := range o.changefeeds { + cfReactor.Close() + } + return state, cerror.ErrReactorFinished.GenWithStackByArgs() + } + return state, nil +} + +// EnqueueJob enqueues an admin job into an internal queue, and the Owner will handle the job in the next tick +func (o *Owner) EnqueueJob(adminJob model.AdminJob) { + o.pushOwnerJob(&ownerJob{ + tp: ownerJobTypeAdminJob, + adminJob: &adminJob, + changefeedID: adminJob.CfID, + done: make(chan struct{}), + }) +} + +// TriggerRebalance triggers a rebalance for the specified changefeed +func (o *Owner) TriggerRebalance(cfID
model.ChangeFeedID) { + o.pushOwnerJob(&ownerJob{ + tp: ownerJobTypeRebalance, + changefeedID: cfID, + done: make(chan struct{}), + }) +} + +// ManualSchedule moves a table from a capture to another capture +func (o *Owner) ManualSchedule(cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID) { + o.pushOwnerJob(&ownerJob{ + tp: ownerJobTypeManualSchedule, + changefeedID: cfID, + targetCaptureID: toCapture, + tableID: tableID, + done: make(chan struct{}), + }) +} + +// WriteDebugInfo writes debug info into the specified http writer +func (o *Owner) WriteDebugInfo(w http.ResponseWriter) { + o.pushOwnerJob(&ownerJob{ + tp: ownerJobTypeDebugInfo, + httpWriter: w, + done: make(chan struct{}), + }) +} + +// AsyncStop stops the owner asynchronously +func (o *Owner) AsyncStop() { + atomic.StoreInt32(&o.close, 1) +} + +func (o *Owner) cleanUpChangefeed(state *model.ChangefeedReactorState) { + state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + return nil, info != nil, nil + }) + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + return nil, status != nil, nil + }) + for captureID := range state.TaskStatuses { + state.PatchTaskStatus(captureID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) { + return nil, status != nil, nil + }) + } + for captureID := range state.TaskPositions { + state.PatchTaskPosition(captureID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return nil, position != nil, nil + }) + } + for captureID := range state.Workloads { + state.PatchTaskWorkload(captureID, func(workload model.TaskWorkload) (model.TaskWorkload, bool, error) { + return nil, workload != nil, nil + }) + } +} + +func (o *Owner) updateMetrics(state *model.GlobalReactorState) { + // Keep the value of prometheus expression `rate(counter)` = 1 + // Please also change alert rule in ticdc.rules.yml when change the expression value. 
+ now := time.Now() + ownershipCounter.Add(float64(now.Sub(o.lastTickTime) / time.Second)) + o.lastTickTime = now + + ownerMaintainTableNumGauge.Reset() + for changefeedID, changefeedState := range state.Changefeeds { + for captureID, captureInfo := range state.Captures { + taskStatus, exist := changefeedState.TaskStatuses[captureID] + if !exist { + continue + } + ownerMaintainTableNumGauge.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeTotal).Set(float64(len(taskStatus.Tables))) + ownerMaintainTableNumGauge.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeWip).Set(float64(len(taskStatus.Operation))) + } + } +} + +func (o *Owner) handleJob() { + jobs := o.takeOnwerJobs() + for _, job := range jobs { + changefeedID := job.changefeedID + cfReactor, exist := o.changefeeds[changefeedID] + if !exist { + log.Warn("changefeed not found when handle a job", zap.Reflect("job", job)) + continue + } + switch job.tp { + case ownerJobTypeAdminJob: + cfReactor.feedStateManager.PushAdminJob(job.adminJob) + case ownerJobTypeManualSchedule: + cfReactor.scheduler.MoveTable(job.tableID, job.targetCaptureID) + case ownerJobTypeRebalance: + cfReactor.scheduler.Rebalance() + case ownerJobTypeDebugInfo: + panic("unimplemented") // TODO + } + close(job.done) + } +} + +func (o *Owner) takeOnwerJobs() []*ownerJob { + o.ownerJobQueueMu.Lock() + defer o.ownerJobQueueMu.Unlock() + + jobs := o.ownerJobQueue + o.ownerJobQueue = nil + return jobs +} + +func (o *Owner) pushOwnerJob(job *ownerJob) { + o.ownerJobQueueMu.Lock() + defer o.ownerJobQueueMu.Unlock() + o.ownerJobQueue = append(o.ownerJobQueue, job) +} diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go new file mode 100644 index 00000000000..b542c51545e --- /dev/null +++ b/cdc/owner/owner_test.go @@ -0,0 +1,175 @@ +// Copyright 2021 PingCAP, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package owner + +import ( + "context" + "time" + + "github.com/pingcap/check" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" + cdcContext "github.com/pingcap/ticdc/pkg/context" + "github.com/pingcap/ticdc/pkg/etcd" + "github.com/pingcap/ticdc/pkg/orchestrator" + "github.com/pingcap/tidb/store/tikv/oracle" +) + +var _ = check.Suite(&ownerSuite{}) + +type ownerSuite struct { +} + +func createOwner4Test(ctx cdcContext.Context, c *check.C) (*Owner, *model.GlobalReactorState, *orchestrator.ReactorStateTester) { + ctx.GlobalVars().PDClient = &mockPDClient{updateServiceGCSafePointFunc: func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + return safePoint, nil + }} + cf := NewOwner4Test(func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) { + return &mockDDLPuller{resolvedTs: startTs - 1}, nil + }, func(ctx cdcContext.Context) (AsyncSink, error) { + return &mockAsyncSink{}, nil + }) + state := model.NewGlobalState().(*model.GlobalReactorState) + tester := orchestrator.NewReactorStateTester(c, state, nil) + + // set captures + cdcKey := etcd.CDCKey{ + Tp: etcd.CDCKeyTypeCapture, + CaptureID: ctx.GlobalVars().CaptureInfo.ID, + } + captureBytes, err := ctx.GlobalVars().CaptureInfo.Marshal() + c.Assert(err, check.IsNil) + tester.MustUpdate(cdcKey.String(), captureBytes) + return cf, state, tester +} + +func (s *ownerSuite) TestCreateRemoveChangefeed(c *check.C) { + ctx := 
cdcContext.NewBackendContext4Test(false) + owner, state, tester := createOwner4Test(ctx, c) + changefeedID := "test-changefeed" + changefeedInfo := &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), + } + changefeedStr, err := changefeedInfo.Marshal() + c.Assert(err, check.IsNil) + cdcKey := etcd.CDCKey{ + Tp: etcd.CDCKeyTypeChangefeedInfo, + ChangefeedID: changefeedID, + } + tester.MustUpdate(cdcKey.String(), []byte(changefeedStr)) + _, err = owner.Tick(ctx, state) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + c.Assert(owner.changefeeds, check.HasKey, changefeedID) + + // delete changefeed info key to remove changefeed + tester.MustUpdate(cdcKey.String(), nil) + // this tick to clean the leak info of the removed changefeed + _, err = owner.Tick(ctx, state) + c.Assert(err, check.IsNil) + // this tick to remove the changefeed state in memory + tester.MustApplyPatches() + _, err = owner.Tick(ctx, state) + c.Assert(err, check.IsNil) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + c.Assert(owner.changefeeds, check.Not(check.HasKey), changefeedID) + c.Assert(state.Changefeeds, check.Not(check.HasKey), changefeedID) +} + +func (s *ownerSuite) TestStopChangefeed(c *check.C) { + ctx := cdcContext.NewBackendContext4Test(false) + owner, state, tester := createOwner4Test(ctx, c) + changefeedID := "test-changefeed" + changefeedInfo := &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), + } + changefeedStr, err := changefeedInfo.Marshal() + c.Assert(err, check.IsNil) + cdcKey := etcd.CDCKey{ + Tp: etcd.CDCKeyTypeChangefeedInfo, + ChangefeedID: changefeedID, + } + tester.MustUpdate(cdcKey.String(), []byte(changefeedStr)) + _, err = owner.Tick(ctx, state) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + c.Assert(owner.changefeeds, check.HasKey, changefeedID) + + // remove changefeed forcibly + owner.EnqueueJob(model.AdminJob{ + CfID:
changefeedID, + Type: model.AdminRemove, + Opts: &model.AdminJobOption{ + ForceRemove: true, + }, + }) + + // this tick to clean the leak info of the removed changefeed + _, err = owner.Tick(ctx, state) + c.Assert(err, check.IsNil) + c.Assert(err, check.IsNil) + // this tick to remove the changefeed state in memory + tester.MustApplyPatches() + _, err = owner.Tick(ctx, state) + c.Assert(err, check.IsNil) + c.Assert(err, check.IsNil) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + c.Assert(owner.changefeeds, check.Not(check.HasKey), changefeedID) + c.Assert(state.Changefeeds, check.Not(check.HasKey), changefeedID) +} + +func (s *ownerSuite) TestAdminJob(c *check.C) { + ctx := cdcContext.NewBackendContext4Test(false) + owner, _, _ := createOwner4Test(ctx, c) + owner.EnqueueJob(model.AdminJob{ + CfID: "test-changefeed1", + Type: model.AdminResume, + }) + owner.TriggerRebalance("test-changefeed2") + owner.ManualSchedule("test-changefeed3", "test-caputre1", 10) + owner.WriteDebugInfo(nil) + + // remove job.done, it's hard to check deep equals + jobs := owner.takeOnwerJobs() + for _, job := range jobs { + c.Assert(job.done, check.NotNil) + job.done = nil + } + c.Assert(jobs, check.DeepEquals, []*ownerJob{ + { + tp: ownerJobTypeAdminJob, + adminJob: &model.AdminJob{ + CfID: "test-changefeed1", + Type: model.AdminResume, + }, + changefeedID: "test-changefeed1", + }, { + tp: ownerJobTypeRebalance, + changefeedID: "test-changefeed2", + }, { + tp: ownerJobTypeManualSchedule, + changefeedID: "test-changefeed3", + targetCaptureID: "test-caputre1", + tableID: 10, + }, { + tp: ownerJobTypeDebugInfo, + httpWriter: nil, + }, + }) + c.Assert(owner.takeOnwerJobs(), check.HasLen, 0) +} From 0d828cfb1e324c38b12a8ae09003c348e3164dc1 Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 1 Jun 2021 17:52:34 +0800 Subject: [PATCH 14/17] update test --- cdc/owner/owner_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go
index b542c51545e..0c537fdd747 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -23,6 +23,7 @@ import ( cdcContext "github.com/pingcap/ticdc/pkg/context" "github.com/pingcap/ticdc/pkg/etcd" "github.com/pingcap/ticdc/pkg/orchestrator" + "github.com/pingcap/ticdc/pkg/util/testleak" "github.com/pingcap/tidb/store/tikv/oracle" ) @@ -55,6 +56,7 @@ func createOwner4Test(ctx cdcContext.Context, c *check.C) (*Owner, *model.Global } func (s *ownerSuite) TestCreateRemoveChangefeed(c *check.C) { + defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(false) owner, state, tester := createOwner4Test(ctx, c) changefeedID := "test-changefeed" @@ -90,6 +92,7 @@ func (s *ownerSuite) TestCreateRemoveChangefeed(c *check.C) { } func (s *ownerSuite) TestStopChangefeed(c *check.C) { + defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(false) owner, state, tester := createOwner4Test(ctx, c) changefeedID := "test-changefeed" @@ -134,6 +137,7 @@ func (s *ownerSuite) TestStopChangefeed(c *check.C) { } func (s *ownerSuite) TestAdminJob(c *check.C) { + defer testleak.AfterTest(c)() ctx := cdcContext.NewBackendContext4Test(false) owner, _, _ := createOwner4Test(ctx, c) owner.EnqueueJob(model.AdminJob{ From 4c7a45b265fd650a244d39b27fbf1c54bf8e524e Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 1 Jun 2021 21:25:08 +0800 Subject: [PATCH 15/17] Update cdc/owner/owner.go Co-authored-by: amyangfei --- cdc/owner/owner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 5a182160050..95408ecc30c 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -59,7 +59,7 @@ type ownerJob struct { } // Owner manages many changefeeds -// All published functions is THREAD-SAFE, except for Tick, Tick is only used for etcd worker +// All public functions are THREAD-SAFE, except for Tick, Tick is only used for etcd worker type Owner struct { changefeeds 
map[model.ChangeFeedID]*changefeed From 0a90a14138dfff4c470c831d70d41e4ba2ee171f Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 1 Jun 2021 21:56:56 +0800 Subject: [PATCH 16/17] update --- cdc/owner/owner.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 95408ecc30c..914ddc96c74 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -69,7 +69,7 @@ type Owner struct { ownerJobQueueMu sync.Mutex lastTickTime time.Time - close int32 + closed int32 newChangefeed func(id model.ChangeFeedID, gcManager *gcManager) *changefeed } @@ -135,7 +135,7 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) delete(o.changefeeds, changefeedID) } } - if atomic.LoadInt32(&o.close) != 0 { + if atomic.LoadInt32(&o.closed) != 0 { for _, cfReactor := range o.changefeeds { cfReactor.Close() } @@ -185,7 +185,7 @@ func (o *Owner) WriteDebugInfo(w http.ResponseWriter) { // AsyncStop stops the owner asynchronously func (o *Owner) AsyncStop() { - atomic.StoreInt32(&o.close, 1) + atomic.StoreInt32(&o.closed, 1) } func (o *Owner) cleanUpChangefeed(state *model.ChangefeedReactorState) { From 91401f47f613c3306346fe0e63e60715566f5f7b Mon Sep 17 00:00:00 2001 From: leoppro Date: Wed, 2 Jun 2021 11:57:12 +0800 Subject: [PATCH 17/17] update --- cdc/owner/owner.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 914ddc96c74..232373e64ff 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -65,9 +65,10 @@ type Owner struct { gcManager *gcManager - ownerJobQueue []*ownerJob ownerJobQueueMu sync.Mutex - lastTickTime time.Time + ownerJobQueue []*ownerJob + + lastTickTime time.Time closed int32 @@ -109,7 +110,7 @@ func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) if err != nil { return nil, errors.Trace(err) } - o.handleJob() + o.handleJobs() for changefeedID, changefeedState := range 
state.Changefeeds { if changefeedState.Info == nil { o.cleanUpChangefeed(changefeedState) @@ -232,7 +233,7 @@ func (o *Owner) updateMetrics(state *model.GlobalReactorState) { } } -func (o *Owner) handleJob() { +func (o *Owner) handleJobs() { jobs := o.takeOnwerJobs() for _, job := range jobs { changefeedID := job.changefeedID