From dd9218afc134233f6d53f8086501bc28e6e6a32c Mon Sep 17 00:00:00 2001 From: leoppro Date: Wed, 2 Jun 2021 13:08:21 +0800 Subject: [PATCH] new_owner: a owner implement, the owner is a manager of changefeeds (#1909) --- cdc/owner/metrics.go | 7 ++ cdc/owner/owner.go | 272 ++++++++++++++++++++++++++++++++++++++++ cdc/owner/owner_test.go | 179 ++++++++++++++++++++++++++ 3 files changed, 458 insertions(+) create mode 100644 cdc/owner/owner.go create mode 100644 cdc/owner/owner_test.go diff --git a/cdc/owner/metrics.go b/cdc/owner/metrics.go index 9375c047041..0b1ae01d765 100644 --- a/cdc/owner/metrics.go +++ b/cdc/owner/metrics.go @@ -46,6 +46,13 @@ var ( }, []string{"changefeed", "capture", "type"}) ) +const ( + // total tables that have been dispatched to a single processor + maintainTableTypeTotal string = "total" + // tables that are dispatched to a processor and have not been finished yet + maintainTableTypeWip string = "wip" +) + // InitMetrics registers all metrics used in owner func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(changefeedCheckpointTsGauge) diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go new file mode 100644 index 00000000000..232373e64ff --- /dev/null +++ b/cdc/owner/owner.go @@ -0,0 +1,272 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package owner + +import ( + "context" + "net/http" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + cdcContext "github.com/pingcap/ticdc/pkg/context" + cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/orchestrator" + "go.uber.org/zap" +) + +type ownerJobType int + +// All AdminJob types +const ( + ownerJobTypeRebalance ownerJobType = iota + ownerJobTypeManualSchedule + ownerJobTypeAdminJob + ownerJobTypeDebugInfo +) + +type ownerJob struct { + tp ownerJobType + changefeedID model.ChangeFeedID + + // for ManualSchedule only + targetCaptureID model.CaptureID + // for ManualSchedule only + tableID model.TableID + + // for Admin Job only + adminJob *model.AdminJob + + // for debug info only + httpWriter http.ResponseWriter + + done chan struct{} +} + +// Owner manages many changefeeds +// All public functions are THREAD-SAFE, except for Tick, Tick is only used for etcd worker +type Owner struct { + changefeeds map[model.ChangeFeedID]*changefeed + + gcManager *gcManager + + ownerJobQueueMu sync.Mutex + ownerJobQueue []*ownerJob + + lastTickTime time.Time + + closed int32 + + newChangefeed func(id model.ChangeFeedID, gcManager *gcManager) *changefeed +} + +// NewOwner creates a new Owner +func NewOwner() *Owner { + return &Owner{ + changefeeds: make(map[model.ChangeFeedID]*changefeed), + gcManager: newGCManager(), + lastTickTime: time.Now(), + newChangefeed: newChangefeed, + } +} + +// NewOwner4Test creates a new Owner for test +func NewOwner4Test( + newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error), + newSink func(ctx cdcContext.Context) (AsyncSink, error)) *Owner { + o := NewOwner() + o.newChangefeed = func(id model.ChangeFeedID, gcManager *gcManager) *changefeed { + return newChangefeed4Test(id, gcManager, newDDLPuller, newSink) + } + return o +} + +// Tick implements the Reactor interface +func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) (nextState orchestrator.ReactorState, err error) { + failpoint.Inject("owner-run-with-error", func() { + failpoint.Return(nil, errors.New("owner run with injected error")) + }) + failpoint.Inject("sleep-in-owner-tick", nil) + ctx := stdCtx.(cdcContext.Context) + state := rawState.(*model.GlobalReactorState) + o.updateMetrics(state) + state.CheckCaptureAlive(ctx.GlobalVars().CaptureInfo.ID) + err = o.gcManager.updateGCSafePoint(ctx, state) + if err != nil { + return nil, errors.Trace(err) + } + o.handleJobs() + for changefeedID, changefeedState := range state.Changefeeds { + if changefeedState.Info == nil { + o.cleanUpChangefeed(changefeedState) + continue + } + ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{ + ID: changefeedID, + Info: changefeedState.Info, + }) + cfReactor, exist := o.changefeeds[changefeedID] + if !exist { + cfReactor = o.newChangefeed(changefeedID, o.gcManager) + o.changefeeds[changefeedID] = cfReactor + } + cfReactor.Tick(ctx, changefeedState, state.Captures) + } + if len(o.changefeeds) != len(state.Changefeeds) { + for changefeedID, cfReactor := range o.changefeeds { + if _, exist := state.Changefeeds[changefeedID]; exist { + continue + } + cfReactor.Close() + delete(o.changefeeds, changefeedID) + } + } + if atomic.LoadInt32(&o.closed) != 0 { + for _, cfReactor := range o.changefeeds { + cfReactor.Close() + } + return state, cerror.ErrReactorFinished.GenWithStackByArgs() + } + return state, nil +} + +// EnqueueJob enqueues a admin job into a internal queue, and the Owner will handle the job in the next tick +func (o *Owner) EnqueueJob(adminJob model.AdminJob) { + o.pushOwnerJob(&ownerJob{ + tp: ownerJobTypeAdminJob, + adminJob: &adminJob, + changefeedID: adminJob.CfID, + done: make(chan struct{}), + }) +} + +// TriggerRebalance triggers a rebalance for the specified changefeed +func (o *Owner) TriggerRebalance(cfID model.ChangeFeedID) { + o.pushOwnerJob(&ownerJob{ + tp: ownerJobTypeRebalance, + changefeedID: cfID, + done: make(chan struct{}), + }) +} + +// ManualSchedule moves a table from a capture to another capture +func (o *Owner) ManualSchedule(cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID) { + o.pushOwnerJob(&ownerJob{ + tp: ownerJobTypeManualSchedule, + changefeedID: cfID, + targetCaptureID: toCapture, + tableID: tableID, + done: make(chan struct{}), + }) +} + +// WriteDebugInfo writes debug info into the specified http writer +func (o *Owner) WriteDebugInfo(w http.ResponseWriter) { + o.pushOwnerJob(&ownerJob{ + tp: ownerJobTypeDebugInfo, + httpWriter: w, + done: make(chan struct{}), + }) +} + +// AsyncStop stops the owner asynchronously +func (o *Owner) AsyncStop() { + atomic.StoreInt32(&o.closed, 1) +} + +func (o *Owner) cleanUpChangefeed(state *model.ChangefeedReactorState) { + state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + return nil, info != nil, nil + }) + state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + return nil, status != nil, nil + }) + for captureID := range state.TaskStatuses { + state.PatchTaskStatus(captureID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) { + return nil, status != nil, nil + }) + } + for captureID := range state.TaskPositions { + state.PatchTaskPosition(captureID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) { + return nil, position != nil, nil + }) + } + for captureID := range state.Workloads { + state.PatchTaskWorkload(captureID, func(workload model.TaskWorkload) (model.TaskWorkload, bool, error) { + return nil, workload != nil, nil + }) + } +} + +func (o *Owner) updateMetrics(state *model.GlobalReactorState) { + // Keep the value of prometheus expression `rate(counter)` = 1 + // Please also change alert rule in ticdc.rules.yml when change the expression value. + now := time.Now() + ownershipCounter.Add(float64(now.Sub(o.lastTickTime) / time.Second)) + o.lastTickTime = now + + ownerMaintainTableNumGauge.Reset() + for changefeedID, changefeedState := range state.Changefeeds { + for captureID, captureInfo := range state.Captures { + taskStatus, exist := changefeedState.TaskStatuses[captureID] + if !exist { + continue + } + ownerMaintainTableNumGauge.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeTotal).Set(float64(len(taskStatus.Tables))) + ownerMaintainTableNumGauge.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeWip).Set(float64(len(taskStatus.Operation))) + } + } +} + +func (o *Owner) handleJobs() { + jobs := o.takeOnwerJobs() + for _, job := range jobs { + changefeedID := job.changefeedID + cfReactor, exist := o.changefeeds[changefeedID] + if !exist { + log.Warn("changefeed not found when handle a job", zap.Reflect("job", job)) + continue + } + switch job.tp { + case ownerJobTypeAdminJob: + cfReactor.feedStateManager.PushAdminJob(job.adminJob) + case ownerJobTypeManualSchedule: + cfReactor.scheduler.MoveTable(job.tableID, job.targetCaptureID) + case ownerJobTypeRebalance: + cfReactor.scheduler.Rebalance() + case ownerJobTypeDebugInfo: + panic("unimplemented") // TODO + } + close(job.done) + } +} + +func (o *Owner) takeOnwerJobs() []*ownerJob { + o.ownerJobQueueMu.Lock() + defer o.ownerJobQueueMu.Unlock() + + jobs := o.ownerJobQueue + o.ownerJobQueue = nil + return jobs +} + +func (o *Owner) pushOwnerJob(job *ownerJob) { + o.ownerJobQueueMu.Lock() + defer o.ownerJobQueueMu.Unlock() + o.ownerJobQueue = append(o.ownerJobQueue, job) +} diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go new file mode 100644 index 00000000000..0c537fdd747 --- /dev/null +++ b/cdc/owner/owner_test.go @@ -0,0 +1,179 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package owner + +import ( + "context" + "time" + + "github.com/pingcap/check" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" + cdcContext "github.com/pingcap/ticdc/pkg/context" + "github.com/pingcap/ticdc/pkg/etcd" + "github.com/pingcap/ticdc/pkg/orchestrator" + "github.com/pingcap/ticdc/pkg/util/testleak" + "github.com/pingcap/tidb/store/tikv/oracle" +) + +var _ = check.Suite(&ownerSuite{}) + +type ownerSuite struct { +} + +func createOwner4Test(ctx cdcContext.Context, c *check.C) (*Owner, *model.GlobalReactorState, *orchestrator.ReactorStateTester) { + ctx.GlobalVars().PDClient = &mockPDClient{updateServiceGCSafePointFunc: func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + return safePoint, nil + }} + cf := NewOwner4Test(func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) { + return &mockDDLPuller{resolvedTs: startTs - 1}, nil + }, func(ctx cdcContext.Context) (AsyncSink, error) { + return &mockAsyncSink{}, nil + }) + state := model.NewGlobalState().(*model.GlobalReactorState) + tester := orchestrator.NewReactorStateTester(c, state, nil) + + // set captures + cdcKey := etcd.CDCKey{ + Tp: etcd.CDCKeyTypeCapture, + CaptureID: ctx.GlobalVars().CaptureInfo.ID, + } + captureBytes, err := ctx.GlobalVars().CaptureInfo.Marshal() + c.Assert(err, check.IsNil) + tester.MustUpdate(cdcKey.String(), captureBytes) + return cf, state, tester +} + +func (s *ownerSuite) TestCreateRemoveChangefeed(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewBackendContext4Test(false) + owner, state, tester := createOwner4Test(ctx, c) + changefeedID := "test-changefeed" + changefeedInfo := &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), + } + changefeedStr, err := changefeedInfo.Marshal() + c.Assert(err, check.IsNil) + cdcKey := etcd.CDCKey{ + Tp: etcd.CDCKeyTypeChangefeedInfo, + ChangefeedID: changefeedID, + } + tester.MustUpdate(cdcKey.String(), []byte(changefeedStr)) + _, err = owner.Tick(ctx, state) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + c.Assert(owner.changefeeds, check.HasKey, changefeedID) + + // delete changefeed info key to remove changefeed + tester.MustUpdate(cdcKey.String(), nil) + // this tick to clean the leak info fo the removed changefeed + _, err = owner.Tick(ctx, state) + c.Assert(err, check.IsNil) + // this tick to remove the changefeed state in memory + tester.MustApplyPatches() + _, err = owner.Tick(ctx, state) + c.Assert(err, check.IsNil) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + c.Assert(owner.changefeeds, check.Not(check.HasKey), changefeedID) + c.Assert(state.Changefeeds, check.Not(check.HasKey), changefeedID) +} + +func (s *ownerSuite) TestStopChangefeed(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewBackendContext4Test(false) + owner, state, tester := createOwner4Test(ctx, c) + changefeedID := "test-changefeed" + changefeedInfo := &model.ChangeFeedInfo{ + StartTs: oracle.GoTimeToTS(time.Now()), + Config: config.GetDefaultReplicaConfig(), + } + changefeedStr, err := changefeedInfo.Marshal() + c.Assert(err, check.IsNil) + cdcKey := etcd.CDCKey{ + Tp: etcd.CDCKeyTypeChangefeedInfo, + ChangefeedID: changefeedID, + } + tester.MustUpdate(cdcKey.String(), []byte(changefeedStr)) + _, err = owner.Tick(ctx, state) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + c.Assert(owner.changefeeds, check.HasKey, changefeedID) + + // remove changefeed forcibly + owner.EnqueueJob(model.AdminJob{ + CfID: changefeedID, + Type: model.AdminRemove, + Opts: &model.AdminJobOption{ + ForceRemove: true, + }, + }) + + // this tick to clean the leak info fo the removed changefeed + _, err = owner.Tick(ctx, state) + c.Assert(err, check.IsNil) + c.Assert(err, check.IsNil) + // this tick to remove the changefeed state in memory + tester.MustApplyPatches() + _, err = owner.Tick(ctx, state) + c.Assert(err, check.IsNil) + c.Assert(err, check.IsNil) + tester.MustApplyPatches() + c.Assert(err, check.IsNil) + c.Assert(owner.changefeeds, check.Not(check.HasKey), changefeedID) + c.Assert(state.Changefeeds, check.Not(check.HasKey), changefeedID) +} + +func (s *ownerSuite) TestAdminJob(c *check.C) { + defer testleak.AfterTest(c)() + ctx := cdcContext.NewBackendContext4Test(false) + owner, _, _ := createOwner4Test(ctx, c) + owner.EnqueueJob(model.AdminJob{ + CfID: "test-changefeed1", + Type: model.AdminResume, + }) + owner.TriggerRebalance("test-changefeed2") + owner.ManualSchedule("test-changefeed3", "test-caputre1", 10) + owner.WriteDebugInfo(nil) + + // remove job.done, it's hard to check deep equals + jobs := owner.takeOnwerJobs() + for _, job := range jobs { + c.Assert(job.done, check.NotNil) + job.done = nil + } + c.Assert(jobs, check.DeepEquals, []*ownerJob{ + { + tp: ownerJobTypeAdminJob, + adminJob: &model.AdminJob{ + CfID: "test-changefeed1", + Type: model.AdminResume, + }, + changefeedID: "test-changefeed1", + }, { + tp: ownerJobTypeRebalance, + changefeedID: "test-changefeed2", + }, { + tp: ownerJobTypeManualSchedule, + changefeedID: "test-changefeed3", + targetCaptureID: "test-caputre1", + tableID: 10, + }, { + tp: ownerJobTypeDebugInfo, + httpWriter: nil, + }, + }) + c.Assert(owner.takeOnwerJobs(), check.HasLen, 0) +}