Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new_owner: a owner implement, the owner is a manager of changefeeds #1909

Merged
merged 27 commits into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cdc/owner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ var (
}, []string{"changefeed", "capture", "type"})
)

const (
// total tables that have been dispatched to a single processor
maintainTableTypeTotal string = "total"
// tables that are dispatched to a processor and have not been finished yet
maintainTableTypeWip string = "wip"
)

// InitMetrics registers all metrics used in owner
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(changefeedCheckpointTsGauge)
Expand Down
272 changes: 272 additions & 0 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package owner

import (
"context"
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator"
"go.uber.org/zap"
)

type ownerJobType int

// All AdminJob types
const (
ownerJobTypeRebalance ownerJobType = iota
ownerJobTypeManualSchedule
ownerJobTypeAdminJob
ownerJobTypeDebugInfo
)

type ownerJob struct {
tp ownerJobType
changefeedID model.ChangeFeedID

// for ManualSchedule only
targetCaptureID model.CaptureID
// for ManualSchedule only
tableID model.TableID

// for Admin Job only
adminJob *model.AdminJob

// for debug info only
httpWriter http.ResponseWriter

done chan struct{}
}

// Owner manages many changefeeds
// All public functions are THREAD-SAFE, except for Tick, Tick is only used for etcd worker
type Owner struct {
changefeeds map[model.ChangeFeedID]*changefeed

gcManager *gcManager

ownerJobQueueMu sync.Mutex
zier-one marked this conversation as resolved.
Show resolved Hide resolved
ownerJobQueue []*ownerJob

lastTickTime time.Time

closed int32

newChangefeed func(id model.ChangeFeedID, gcManager *gcManager) *changefeed
}

// NewOwner creates a new Owner
func NewOwner() *Owner {
return &Owner{
changefeeds: make(map[model.ChangeFeedID]*changefeed),
gcManager: newGCManager(),
lastTickTime: time.Now(),
newChangefeed: newChangefeed,
}
}

// NewOwner4Test creates a new Owner for test
func NewOwner4Test(
newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error),
newSink func(ctx cdcContext.Context) (AsyncSink, error)) *Owner {
o := NewOwner()
o.newChangefeed = func(id model.ChangeFeedID, gcManager *gcManager) *changefeed {
return newChangefeed4Test(id, gcManager, newDDLPuller, newSink)
}
return o
}

// Tick implements the Reactor interface
func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) (nextState orchestrator.ReactorState, err error) {
failpoint.Inject("owner-run-with-error", func() {
failpoint.Return(nil, errors.New("owner run with injected error"))
})
failpoint.Inject("sleep-in-owner-tick", nil)
ctx := stdCtx.(cdcContext.Context)
state := rawState.(*model.GlobalReactorState)
o.updateMetrics(state)
state.CheckCaptureAlive(ctx.GlobalVars().CaptureInfo.ID)
err = o.gcManager.updateGCSafePoint(ctx, state)
if err != nil {
return nil, errors.Trace(err)
}
o.handleJobs()
for changefeedID, changefeedState := range state.Changefeeds {
if changefeedState.Info == nil {
o.cleanUpChangefeed(changefeedState)
continue
}
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: changefeedID,
Info: changefeedState.Info,
})
cfReactor, exist := o.changefeeds[changefeedID]
if !exist {
cfReactor = o.newChangefeed(changefeedID, o.gcManager)
o.changefeeds[changefeedID] = cfReactor
}
cfReactor.Tick(ctx, changefeedState, state.Captures)
}
if len(o.changefeeds) != len(state.Changefeeds) {
for changefeedID, cfReactor := range o.changefeeds {
if _, exist := state.Changefeeds[changefeedID]; exist {
continue
}
cfReactor.Close()
delete(o.changefeeds, changefeedID)
zier-one marked this conversation as resolved.
Show resolved Hide resolved
}
}
if atomic.LoadInt32(&o.closed) != 0 {
for _, cfReactor := range o.changefeeds {
cfReactor.Close()
}
return state, cerror.ErrReactorFinished.GenWithStackByArgs()
}
return state, nil
}

// EnqueueJob enqueues a admin job into a internal queue, and the Owner will handle the job in the next tick
func (o *Owner) EnqueueJob(adminJob model.AdminJob) {
o.pushOwnerJob(&ownerJob{
tp: ownerJobTypeAdminJob,
adminJob: &adminJob,
changefeedID: adminJob.CfID,
done: make(chan struct{}),
})
}

// TriggerRebalance triggers a rebalance for the specified changefeed
func (o *Owner) TriggerRebalance(cfID model.ChangeFeedID) {
o.pushOwnerJob(&ownerJob{
tp: ownerJobTypeRebalance,
changefeedID: cfID,
done: make(chan struct{}),
})
}

// ManualSchedule moves a table from a capture to another capture
func (o *Owner) ManualSchedule(cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID) {
o.pushOwnerJob(&ownerJob{
tp: ownerJobTypeManualSchedule,
changefeedID: cfID,
targetCaptureID: toCapture,
tableID: tableID,
done: make(chan struct{}),
})
}

// WriteDebugInfo writes debug info into the specified http writer
func (o *Owner) WriteDebugInfo(w http.ResponseWriter) {
o.pushOwnerJob(&ownerJob{
tp: ownerJobTypeDebugInfo,
httpWriter: w,
done: make(chan struct{}),
})
}

// AsyncStop stops the owner asynchronously
func (o *Owner) AsyncStop() {
atomic.StoreInt32(&o.closed, 1)
}

func (o *Owner) cleanUpChangefeed(state *model.ChangefeedReactorState) {
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
return nil, info != nil, nil
})
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
return nil, status != nil, nil
})
for captureID := range state.TaskStatuses {
state.PatchTaskStatus(captureID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) {
return nil, status != nil, nil
})
}
for captureID := range state.TaskPositions {
state.PatchTaskPosition(captureID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return nil, position != nil, nil
})
}
for captureID := range state.Workloads {
state.PatchTaskWorkload(captureID, func(workload model.TaskWorkload) (model.TaskWorkload, bool, error) {
return nil, workload != nil, nil
})
}
}

func (o *Owner) updateMetrics(state *model.GlobalReactorState) {
// Keep the value of prometheus expression `rate(counter)` = 1
// Please also change alert rule in ticdc.rules.yml when change the expression value.
now := time.Now()
ownershipCounter.Add(float64(now.Sub(o.lastTickTime) / time.Second))
zier-one marked this conversation as resolved.
Show resolved Hide resolved
o.lastTickTime = now

ownerMaintainTableNumGauge.Reset()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the new owner can control the removal of capture and changefeed(or by comparing memory data with etcd data), can we just cleanup the related metrics when the owner executing removal operation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Theoretically, it is possible that we need to maintain the metadata of the last tick and compare it with the current tick

for changefeedID, changefeedState := range state.Changefeeds {
for captureID, captureInfo := range state.Captures {
taskStatus, exist := changefeedState.TaskStatuses[captureID]
if !exist {
continue
}
ownerMaintainTableNumGauge.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeTotal).Set(float64(len(taskStatus.Tables)))
ownerMaintainTableNumGauge.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeWip).Set(float64(len(taskStatus.Operation)))
}
}
}

func (o *Owner) handleJobs() {
jobs := o.takeOnwerJobs()
for _, job := range jobs {
changefeedID := job.changefeedID
cfReactor, exist := o.changefeeds[changefeedID]
if !exist {
log.Warn("changefeed not found when handle a job", zap.Reflect("job", job))
continue
}
switch job.tp {
case ownerJobTypeAdminJob:
cfReactor.feedStateManager.PushAdminJob(job.adminJob)
case ownerJobTypeManualSchedule:
cfReactor.scheduler.MoveTable(job.tableID, job.targetCaptureID)
case ownerJobTypeRebalance:
cfReactor.scheduler.Rebalance()
case ownerJobTypeDebugInfo:
panic("unimplemented") // TODO
}
close(job.done)
}
}

func (o *Owner) takeOnwerJobs() []*ownerJob {
o.ownerJobQueueMu.Lock()
defer o.ownerJobQueueMu.Unlock()

jobs := o.ownerJobQueue
o.ownerJobQueue = nil
return jobs
}

func (o *Owner) pushOwnerJob(job *ownerJob) {
o.ownerJobQueueMu.Lock()
defer o.ownerJobQueueMu.Unlock()
o.ownerJobQueue = append(o.ownerJobQueue, job)
}
Loading