metrics(ticdc): add some log and metrics to owner and processorManage… #4402

Merged
33 changes: 32 additions & 1 deletion cdc/owner/changefeed.go
@@ -17,6 +17,7 @@ import (
"context"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -72,6 +73,7 @@ type changefeed struct {
metricsChangefeedCheckpointTsLagGauge prometheus.Gauge
metricsChangefeedResolvedTsGauge prometheus.Gauge
metricsChangefeedResolvedTsLagGauge prometheus.Gauge
metricChangefeedTickDuration prometheus.Observer

newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error)
newSink func() DDLSink
@@ -109,12 +111,25 @@ func newChangefeed4Test(
}

func (c *changefeed) Tick(ctx cdcContext.Context, state *orchestrator.ChangefeedReactorState, captures map[model.CaptureID]*model.CaptureInfo) {
startTime := time.Now()

ctx = cdcContext.WithErrorHandler(ctx, func(err error) error {
c.errCh <- errors.Trace(err)
return nil
})
state.CheckCaptureAlive(ctx.GlobalVars().CaptureInfo.ID)
if err := c.tick(ctx, state, captures); err != nil {
err := c.tick(ctx, state, captures)

// The tick duration is recorded only if the changefeed has completed initialization
if c.initialized {
costTime := time.Since(startTime)
if costTime > changefeedLogsWarnDuration {
log.Warn("changefeed tick took too long", zap.String("changefeed", c.id), zap.Duration("duration", costTime))
}
c.metricChangefeedTickDuration.Observe(costTime.Seconds())
}

if err != nil {
log.Error("an error occurred in Owner", zap.String("changefeed", c.state.ID), zap.Error(err))
var code string
if rfcCode, ok := cerror.RFCCode(err); ok {
@@ -185,7 +200,12 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
// So we return here.
return nil
}
startTime := time.Now()
newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(ctx, c.state, c.schema.AllPhysicalTables(), captures)
costTime := time.Since(startTime)
if costTime > schedulerLogsWarnDuration {
log.Warn("scheduler tick took too long", zap.String("changefeed", c.id), zap.Duration("duration", costTime))
}
if err != nil {
return errors.Trace(err)
}
@@ -296,6 +316,7 @@ LOOP:
c.metricsChangefeedCheckpointTsLagGauge = changefeedCheckpointTsLagGauge.WithLabelValues(c.id)
c.metricsChangefeedResolvedTsGauge = changefeedResolvedTsGauge.WithLabelValues(c.id)
c.metricsChangefeedResolvedTsLagGauge = changefeedResolvedTsLagGauge.WithLabelValues(c.id)
c.metricChangefeedTickDuration = changefeedTickDuration.WithLabelValues(c.id)

// create scheduler
c.scheduler, err = c.newScheduler(ctx, checkpointTs)
@@ -338,6 +359,9 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
c.metricsChangefeedResolvedTsGauge = nil
c.metricsChangefeedResolvedTsLagGauge = nil

changefeedTickDuration.DeleteLabelValues(c.id)
c.metricChangefeedTickDuration = nil

c.initialized = false
}

@@ -548,7 +572,14 @@ func (c *changefeed) updateStatus(currentTs int64, checkpointTs, resolvedTs mode
}

func (c *changefeed) Close(ctx cdcContext.Context) {
startTime := time.Now()

c.releaseResources(ctx)
costTime := time.Since(startTime)
if costTime > changefeedLogsWarnDuration {
log.Warn("changefeed close took too long", zap.String("changefeed", c.id), zap.Duration("duration", costTime))
}
changefeedCloseDuration.Observe(costTime.Seconds())
}

func (c *changefeed) GetInfoProvider() schedulerv2.InfoProvider {
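
Taken together, the additions to Tick and Close above follow a single pattern: time the call, warn if it exceeds a fixed threshold, and record the duration in a histogram. Below is a minimal, self-contained sketch of that pattern; the names slowThreshold, tickDuration, and timedTick are illustrative, not from this PR, and the canonical prometheus import path is used:

package main

import (
	"log"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// slowThreshold plays the role of changefeedLogsWarnDuration.
const slowThreshold = 1 * time.Second

// tickDuration plays the role of changefeedTickDuration, minus the per-changefeed label.
var tickDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
	Namespace: "demo",
	Subsystem: "owner",
	Name:      "tick_duration",
	Help:      "Bucketed histogram of tick time (s).",
	Buckets:   prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
})

func timedTick(doTick func() error) error {
	start := time.Now()
	err := doTick() // run the real work first, then account for its cost
	cost := time.Since(start)
	if cost > slowThreshold {
		log.Printf("tick took too long: %v", cost)
	}
	tickDuration.Observe(cost.Seconds())
	return err
}

func main() {
	prometheus.MustRegister(tickDuration)
	_ = timedTick(func() error { time.Sleep(20 * time.Millisecond); return nil })
}

As in the diff, the error is handled only after the duration is recorded, so slow failing ticks are counted too.
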
29 changes: 28 additions & 1 deletion cdc/owner/metrics.go
@@ -13,7 +13,11 @@

package owner

import "github.com/prometheus/client_golang/prometheus"
import (
"time"

"github.com/prometheus/client_golang/prometheus"
)

var (
changefeedCheckpointTsGauge = prometheus.NewGaugeVec(
@@ -65,13 +69,34 @@ var (
Name: "status",
Help: "The status of changefeeds",
}, []string{"changefeed"})
changefeedTickDuration = prometheus.NewHistogramVec(
[Contributor review comment] Maybe add a screenshot after the metric is applied, to make things clearer; will the dashboard template be added later?

prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "changefeed_tick_duration",
Help: "Bucketed histogram of owner tick changefeed reactor time (s).",
Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
}, []string{"changefeed"})
changefeedCloseDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "changefeed_close_duration",
Help: "Bucketed histogram of owner close changefeed reactor time (s).",
Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
})
)

const (
// total tables that have been dispatched to a single processor
maintainTableTypeTotal string = "total"
// tables that are dispatched to a processor and have not been finished yet
maintainTableTypeWip string = "wip"
// When heavy operations (such as network IO and serialization) take too much time, the program
// should print a warning log, and, if necessary, the duration should be exposed externally
// through monitoring.
changefeedLogsWarnDuration = 1 * time.Second
schedulerLogsWarnDuration = 1 * time.Second
)

// InitMetrics registers all metrics used in owner
@@ -83,4 +108,6 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(ownershipCounter)
registry.MustRegister(ownerMaintainTableNumGauge)
registry.MustRegister(changefeedStatusGauge)
registry.MustRegister(changefeedTickDuration)
registry.MustRegister(changefeedCloseDuration)
}
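
For context on the bucket layout used in both histograms above: ExponentialBuckets(0.01, 2, 18) yields 18 upper bounds that start at 10 ms and double each step, so the largest bound is 0.01 * 2^17 ≈ 1310.72 s (about 22 minutes). A tiny illustrative snippet (not part of the PR) that prints the bounds:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Prints 18 bounds in seconds: 0.01 0.02 0.04 ... 1310.72
	for _, upper := range prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18) {
		fmt.Printf("%g ", upper)
	}
	fmt.Println()
}
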
21 changes: 17 additions & 4 deletions cdc/processor/manager.go
@@ -27,6 +27,7 @@ import (
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

@@ -36,6 +37,7 @@ const (
commandTpUnknow commandTp = iota //nolint:varcheck,deadcode
commandTpClose
commandTpWriteDebugInfo
processorLogsWarnDuration = 1 * time.Second
)

type command struct {
@@ -53,16 +55,19 @@ type Manager struct {
newProcessor func(cdcContext.Context) *processor

enableNewScheduler bool

metricProcessorCloseDuration prometheus.Observer
}

// NewManager creates a new processor manager
func NewManager() *Manager {
conf := config.GetGlobalServerConfig()
return &Manager{
processors: make(map[model.ChangeFeedID]*processor),
commandQueue: make(chan *command, 4),
newProcessor: newProcessor,
enableNewScheduler: conf.Debug.EnableNewScheduler,
processors: make(map[model.ChangeFeedID]*processor),
commandQueue: make(chan *command, 4),
newProcessor: newProcessor,
enableNewScheduler: conf.Debug.EnableNewScheduler,
metricProcessorCloseDuration: processorCloseDuration.WithLabelValues(conf.AdvertiseAddr),
}
}

@@ -129,7 +134,15 @@ func (m *Manager) Tick(stdCtx context.Context, state orchestrator.ReactorState)

func (m *Manager) closeProcessor(changefeedID model.ChangeFeedID) {
if processor, exist := m.processors[changefeedID]; exist {
startTime := time.Now()
captureID := processor.captureInfo.ID
err := processor.Close()
costTime := time.Since(startTime)
if costTime > processorLogsWarnDuration {
log.Warn("processor close took too long", zap.String("changefeed", changefeedID),
zap.String("capture", captureID), zap.Duration("duration", costTime))
}
m.metricProcessorCloseDuration.Observe(costTime.Seconds())
if err != nil {
log.Warn("failed to close processor",
zap.String("changefeed", changefeedID),
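
A design choice worth noting in NewManager above: the labeled Observer is resolved once at construction time via WithLabelValues(conf.AdvertiseAddr) and cached on the struct, rather than performing the label lookup on every observation in the hot path. A minimal sketch of that pattern, assuming illustrative names (closeDuration, worker):

package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// closeDuration stands in for processorCloseDuration: one series per capture.
var closeDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "close_duration",
		Help:    "Time (s) spent closing a worker.",
		Buckets: prometheus.DefBuckets,
	}, []string{"capture"})

type worker struct {
	// Resolved once in the constructor; no label lookup per observation.
	metricCloseDuration prometheus.Observer
}

func newWorker(advertiseAddr string) *worker {
	return &worker{metricCloseDuration: closeDuration.WithLabelValues(advertiseAddr)}
}

func (w *worker) close() {
	start := time.Now()
	// ... release resources here ...
	w.metricCloseDuration.Observe(time.Since(start).Seconds())
}

func main() {
	prometheus.MustRegister(closeDuration)
	newWorker("10.0.10.1:8300").close()
}
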
18 changes: 18 additions & 0 deletions cdc/processor/metrics.go
@@ -81,6 +81,22 @@ var (
Name: "schema_storage_gc_ts",
Help: "the TS of the currently maintained oldest snapshot in SchemaStorage",
}, []string{"changefeed", "capture"})
processorTickDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "processor_tick_duration",
Help: "Bucketed histogram of processorManager tick processor time (s).",
Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
}, []string{"changefeed", "capture"})
processorCloseDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "processor_close_duration",
Help: "Bucketed histogram of processorManager close processor time (s).",
Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
}, []string{"capture"})
)

// InitMetrics registers all metrics used in processor
@@ -94,4 +110,6 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(syncTableNumGauge)
registry.MustRegister(processorErrorCounter)
registry.MustRegister(processorSchemaStorageGcTsGauge)
registry.MustRegister(processorTickDuration)
registry.MustRegister(processorCloseDuration)
}
12 changes: 12 additions & 0 deletions cdc/processor/processor.go
@@ -92,6 +92,7 @@ type processor struct {
metricSyncTableNumGauge prometheus.Gauge
metricSchemaStorageGcTsGauge prometheus.Gauge
metricProcessorErrorCounter prometheus.Counter
metricProcessorTickDuration prometheus.Observer
}

// checkReadyForMessages checks whether all necessary Etcd keys have been established.
@@ -246,6 +247,7 @@ func newProcessor(ctx cdcContext.Context) *processor {
metricSyncTableNumGauge: syncTableNumGauge.WithLabelValues(changefeedID, advertiseAddr),
metricProcessorErrorCounter: processorErrorCounter.WithLabelValues(changefeedID, advertiseAddr),
metricSchemaStorageGcTsGauge: processorSchemaStorageGcTsGauge.WithLabelValues(changefeedID, advertiseAddr),
metricProcessorTickDuration: processorTickDuration.WithLabelValues(changefeedID, advertiseAddr),
}
p.createTablePipeline = p.createTablePipelineImpl
p.lazyInit = p.lazyInitImpl
@@ -280,13 +282,22 @@ func isProcessorIgnorableError(err error) bool {
// The `state` parameter is sent by the etcd worker; it must be a snapshot of the KVs in etcd.
// The main logic of the processor is in this function, including the calculation of many kinds of ts, maintaining table pipelines, error handling, etc.
func (p *processor) Tick(ctx cdcContext.Context, state *orchestrator.ChangefeedReactorState) (orchestrator.ReactorState, error) {
startTime := time.Now()
p.changefeed = state
state.CheckCaptureAlive(ctx.GlobalVars().CaptureInfo.ID)
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: state.ID,
Info: state.Info,
})
_, err := p.tick(ctx, state)

costTime := time.Since(startTime)
if costTime > processorLogsWarnDuration {
log.Warn("processor tick took too long", zap.String("changefeed", p.changefeedID),
zap.String("capture", ctx.GlobalVars().CaptureInfo.ID), zap.Duration("duration", costTime))
}
p.metricProcessorTickDuration.Observe(costTime.Seconds())

if err == nil {
return state, nil
}
@@ -1058,6 +1069,7 @@ func (p *processor) Close() error {
syncTableNumGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorErrorCounter.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorTickDuration.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
if p.sinkManager != nil {
// pass a canceled context is ok here, since we don't need to wait Close
ctx, cancel := context.WithCancel(context.Background())
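
The processor changes above complete the lifecycle for per-changefeed series: acquire the labeled Observer in newProcessor, observe it in Tick, and delete the label values in Close so a removed changefeed does not leave stale series behind. A compact sketch of that lifecycle, with tickDuration and the literal IDs as assumed stand-ins:

package main

import "github.com/prometheus/client_golang/prometheus"

// tickDuration stands in for processorTickDuration.
var tickDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "tick_duration",
		Help:    "Time (s) spent per tick.",
		Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
	}, []string{"changefeed", "capture"})

func main() {
	prometheus.MustRegister(tickDuration)

	// newProcessor: resolve the labeled series once.
	obs := tickDuration.WithLabelValues("cf-1", "10.0.10.1:8300")

	// Tick: record each tick's cost in seconds.
	obs.Observe(0.042)

	// Close: drop the series so per-changefeed label values do not leak.
	tickDuration.DeleteLabelValues("cf-1", "10.0.10.1:8300")
}
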
1 change: 1 addition & 0 deletions cdc/sink/sink.go
@@ -41,6 +41,7 @@ type Sink interface {

// TryEmitRowChangedEvents is thread-safe and non-blocking.
TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error)

// EmitDDLEvent sends DDL Event to Sink
// EmitDDLEvent should execute DDL to downstream synchronously
//
8 changes: 0 additions & 8 deletions pkg/orchestrator/interfaces.go
@@ -67,11 +67,3 @@ func (s *SingleDataPatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map
}
return nil
}

// MultiDataPatch represents an update to many keys
type MultiDataPatch func(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error

// Patch implements the DataPatch interface
func (m MultiDataPatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error {
return m(valueMap, changedSet)
}