Skip to content

Commit

Permalink
new_owner: a owner implement, the owner is a manager of changefeeds (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro authored and leoppro committed Jun 22, 2021
1 parent 5034850 commit dd9218a
Show file tree
Hide file tree
Showing 3 changed files with 458 additions and 0 deletions.
7 changes: 7 additions & 0 deletions cdc/owner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ var (
}, []string{"changefeed", "capture", "type"})
)

const (
// total tables that have been dispatched to a single processor
maintainTableTypeTotal string = "total"
// tables that are dispatched to a processor and have not been finished yet
maintainTableTypeWip string = "wip"
)

// InitMetrics registers all metrics used in owner
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(changefeedCheckpointTsGauge)
Expand Down
272 changes: 272 additions & 0 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package owner

import (
"context"
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/orchestrator"
"go.uber.org/zap"
)

type ownerJobType int

// All AdminJob types
const (
ownerJobTypeRebalance ownerJobType = iota
ownerJobTypeManualSchedule
ownerJobTypeAdminJob
ownerJobTypeDebugInfo
)

type ownerJob struct {
tp ownerJobType
changefeedID model.ChangeFeedID

// for ManualSchedule only
targetCaptureID model.CaptureID
// for ManualSchedule only
tableID model.TableID

// for Admin Job only
adminJob *model.AdminJob

// for debug info only
httpWriter http.ResponseWriter

done chan struct{}
}

// Owner manages many changefeeds
// All public functions are THREAD-SAFE, except for Tick, Tick is only used for etcd worker
type Owner struct {
changefeeds map[model.ChangeFeedID]*changefeed

gcManager *gcManager

ownerJobQueueMu sync.Mutex
ownerJobQueue []*ownerJob

lastTickTime time.Time

closed int32

newChangefeed func(id model.ChangeFeedID, gcManager *gcManager) *changefeed
}

// NewOwner creates a new Owner
func NewOwner() *Owner {
return &Owner{
changefeeds: make(map[model.ChangeFeedID]*changefeed),
gcManager: newGCManager(),
lastTickTime: time.Now(),
newChangefeed: newChangefeed,
}
}

// NewOwner4Test creates a new Owner for test
func NewOwner4Test(
newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error),
newSink func(ctx cdcContext.Context) (AsyncSink, error)) *Owner {
o := NewOwner()
o.newChangefeed = func(id model.ChangeFeedID, gcManager *gcManager) *changefeed {
return newChangefeed4Test(id, gcManager, newDDLPuller, newSink)
}
return o
}

// Tick implements the Reactor interface
func (o *Owner) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) (nextState orchestrator.ReactorState, err error) {
failpoint.Inject("owner-run-with-error", func() {
failpoint.Return(nil, errors.New("owner run with injected error"))
})
failpoint.Inject("sleep-in-owner-tick", nil)
ctx := stdCtx.(cdcContext.Context)
state := rawState.(*model.GlobalReactorState)
o.updateMetrics(state)
state.CheckCaptureAlive(ctx.GlobalVars().CaptureInfo.ID)
err = o.gcManager.updateGCSafePoint(ctx, state)
if err != nil {
return nil, errors.Trace(err)
}
o.handleJobs()
for changefeedID, changefeedState := range state.Changefeeds {
if changefeedState.Info == nil {
o.cleanUpChangefeed(changefeedState)
continue
}
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: changefeedID,
Info: changefeedState.Info,
})
cfReactor, exist := o.changefeeds[changefeedID]
if !exist {
cfReactor = o.newChangefeed(changefeedID, o.gcManager)
o.changefeeds[changefeedID] = cfReactor
}
cfReactor.Tick(ctx, changefeedState, state.Captures)
}
if len(o.changefeeds) != len(state.Changefeeds) {
for changefeedID, cfReactor := range o.changefeeds {
if _, exist := state.Changefeeds[changefeedID]; exist {
continue
}
cfReactor.Close()
delete(o.changefeeds, changefeedID)
}
}
if atomic.LoadInt32(&o.closed) != 0 {
for _, cfReactor := range o.changefeeds {
cfReactor.Close()
}
return state, cerror.ErrReactorFinished.GenWithStackByArgs()
}
return state, nil
}

// EnqueueJob enqueues a admin job into a internal queue, and the Owner will handle the job in the next tick
func (o *Owner) EnqueueJob(adminJob model.AdminJob) {
o.pushOwnerJob(&ownerJob{
tp: ownerJobTypeAdminJob,
adminJob: &adminJob,
changefeedID: adminJob.CfID,
done: make(chan struct{}),
})
}

// TriggerRebalance triggers a rebalance for the specified changefeed
func (o *Owner) TriggerRebalance(cfID model.ChangeFeedID) {
o.pushOwnerJob(&ownerJob{
tp: ownerJobTypeRebalance,
changefeedID: cfID,
done: make(chan struct{}),
})
}

// ManualSchedule moves a table from a capture to another capture
func (o *Owner) ManualSchedule(cfID model.ChangeFeedID, toCapture model.CaptureID, tableID model.TableID) {
o.pushOwnerJob(&ownerJob{
tp: ownerJobTypeManualSchedule,
changefeedID: cfID,
targetCaptureID: toCapture,
tableID: tableID,
done: make(chan struct{}),
})
}

// WriteDebugInfo writes debug info into the specified http writer
func (o *Owner) WriteDebugInfo(w http.ResponseWriter) {
o.pushOwnerJob(&ownerJob{
tp: ownerJobTypeDebugInfo,
httpWriter: w,
done: make(chan struct{}),
})
}

// AsyncStop stops the owner asynchronously
func (o *Owner) AsyncStop() {
atomic.StoreInt32(&o.closed, 1)
}

func (o *Owner) cleanUpChangefeed(state *model.ChangefeedReactorState) {
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
return nil, info != nil, nil
})
state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
return nil, status != nil, nil
})
for captureID := range state.TaskStatuses {
state.PatchTaskStatus(captureID, func(status *model.TaskStatus) (*model.TaskStatus, bool, error) {
return nil, status != nil, nil
})
}
for captureID := range state.TaskPositions {
state.PatchTaskPosition(captureID, func(position *model.TaskPosition) (*model.TaskPosition, bool, error) {
return nil, position != nil, nil
})
}
for captureID := range state.Workloads {
state.PatchTaskWorkload(captureID, func(workload model.TaskWorkload) (model.TaskWorkload, bool, error) {
return nil, workload != nil, nil
})
}
}

func (o *Owner) updateMetrics(state *model.GlobalReactorState) {
// Keep the value of prometheus expression `rate(counter)` = 1
// Please also change alert rule in ticdc.rules.yml when change the expression value.
now := time.Now()
ownershipCounter.Add(float64(now.Sub(o.lastTickTime) / time.Second))
o.lastTickTime = now

ownerMaintainTableNumGauge.Reset()
for changefeedID, changefeedState := range state.Changefeeds {
for captureID, captureInfo := range state.Captures {
taskStatus, exist := changefeedState.TaskStatuses[captureID]
if !exist {
continue
}
ownerMaintainTableNumGauge.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeTotal).Set(float64(len(taskStatus.Tables)))
ownerMaintainTableNumGauge.WithLabelValues(changefeedID, captureInfo.AdvertiseAddr, maintainTableTypeWip).Set(float64(len(taskStatus.Operation)))
}
}
}

func (o *Owner) handleJobs() {
jobs := o.takeOnwerJobs()
for _, job := range jobs {
changefeedID := job.changefeedID
cfReactor, exist := o.changefeeds[changefeedID]
if !exist {
log.Warn("changefeed not found when handle a job", zap.Reflect("job", job))
continue
}
switch job.tp {
case ownerJobTypeAdminJob:
cfReactor.feedStateManager.PushAdminJob(job.adminJob)
case ownerJobTypeManualSchedule:
cfReactor.scheduler.MoveTable(job.tableID, job.targetCaptureID)
case ownerJobTypeRebalance:
cfReactor.scheduler.Rebalance()
case ownerJobTypeDebugInfo:
panic("unimplemented") // TODO
}
close(job.done)
}
}

func (o *Owner) takeOnwerJobs() []*ownerJob {
o.ownerJobQueueMu.Lock()
defer o.ownerJobQueueMu.Unlock()

jobs := o.ownerJobQueue
o.ownerJobQueue = nil
return jobs
}

func (o *Owner) pushOwnerJob(job *ownerJob) {
o.ownerJobQueueMu.Lock()
defer o.ownerJobQueueMu.Unlock()
o.ownerJobQueue = append(o.ownerJobQueue, job)
}
Loading

0 comments on commit dd9218a

Please sign in to comment.