sink(ticdc): optimize the code structure of sink module (#5139)
close #5138

CharlesCheung96 authored Apr 20, 2022
1 parent 79b6dd8 commit 062cd4b
Showing 75 changed files with 260 additions and 213 deletions.
4 changes: 2 additions & 2 deletions cdc/metrics.go
@@ -21,8 +21,8 @@ import (
 	tablepipeline "github.com/pingcap/tiflow/cdc/processor/pipeline"
 	"github.com/pingcap/tiflow/cdc/puller"
 	redowriter "github.com/pingcap/tiflow/cdc/redo/writer"
-	"github.com/pingcap/tiflow/cdc/sink"
-	"github.com/pingcap/tiflow/cdc/sink/producer/kafka"
+	sink "github.com/pingcap/tiflow/cdc/sink/metrics"
+	"github.com/pingcap/tiflow/cdc/sink/mq/producer/kafka"
 	"github.com/pingcap/tiflow/cdc/sorter"
 	"github.com/pingcap/tiflow/cdc/sorter/leveldb"
 	"github.com/pingcap/tiflow/cdc/sorter/memory"
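The `sink` alias in cdc/metrics.go now points at the relocated metrics package, so the existing registration call keeps compiling. A minimal sketch of the affected call site after the move; the wrapper function name is illustrative, while `InitMetrics` comes from the relocated package shown further down in this diff:

```go
package cdc

import (
	sink "github.com/pingcap/tiflow/cdc/sink/metrics"
	"github.com/prometheus/client_golang/prometheus"
)

// registerSinkMetrics sketches the call site that motivates the aliased
// import: the identifier `sink` is unchanged, only the import path moved
// from cdc/sink to cdc/sink/metrics.
func registerSinkMetrics(registry *prometheus.Registry) {
	sink.InitMetrics(registry)
}
```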
5 changes: 3 additions & 2 deletions cdc/owner/ddl_sink.go
@@ -24,6 +24,7 @@ import (
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/sink"
+	"github.com/pingcap/tiflow/cdc/sink/mysql"
 	cdcContext "github.com/pingcap/tiflow/pkg/context"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/filter"
@@ -56,7 +57,7 @@ type DDLSink interface {
 
 type ddlSinkImpl struct {
 	lastSyncPoint  model.Ts
-	syncPointStore sink.SyncpointStore
+	syncPointStore mysql.SyncpointStore
 
 	// It is used to record the checkpointTs and the names of the table at that time.
 	mu struct {
@@ -107,7 +108,7 @@ func ddlSinkInitializer(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeF
 	if !info.SyncPointEnabled {
 		return nil
 	}
-	syncPointStore, err := sink.NewSyncpointStore(stdCtx, id, info.SinkURI)
+	syncPointStore, err := mysql.NewSyncpointStore(stdCtx, id, info.SinkURI)
 	if err != nil {
 		return errors.Trace(err)
 	}
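For context, the syncpoint store is now constructed through the mysql sub-package. A hedged sketch of the call path, assuming the truncated parameter type in the hunk header is model.ChangeFeedID and the sink URI is a plain string; the wrapper function is illustrative, while the constructor and interface names come from the hunks above:

```go
package owner

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/cdc/sink/mysql"
)

// newSyncpointStore sketches the post-refactor path: the MySQL-backed
// syncpoint store lives in cdc/sink/mysql rather than the sink root package.
func newSyncpointStore(ctx context.Context, id model.ChangeFeedID, sinkURI string) (mysql.SyncpointStore, error) {
	store, err := mysql.NewSyncpointStore(ctx, id, sinkURI)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return store, nil
}
```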
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/table.go
@@ -22,7 +22,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/redo"
 	"github.com/pingcap/tiflow/cdc/sink"
-	"github.com/pingcap/tiflow/cdc/sink/common"
+	"github.com/pingcap/tiflow/cdc/sink/flowcontrol"
 	serverConfig "github.com/pingcap/tiflow/pkg/config"
 	cdcContext "github.com/pingcap/tiflow/pkg/context"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
@@ -196,7 +196,7 @@ func NewTablePipeline(ctx cdcContext.Context,
 		zap.String("tableName", tableName),
 		zap.Int64("tableID", tableID),
 		zap.Uint64("quota", perTableMemoryQuota))
-	flowController := common.NewTableFlowController(perTableMemoryQuota)
+	flowController := flowcontrol.NewTableFlowController(perTableMemoryQuota)
 	config := ctx.ChangefeedVars().Info.Config
 	cyclicEnabled := config.Cyclic != nil && config.Cyclic.IsEnabled()
 	runnerSize := defaultRunnersSize
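The table flow controller keeps its constructor; only the package path changes from cdc/sink/common to cdc/sink/flowcontrol. A small sketch of a call site under the new import; the wrapper and the returned pointer type are assumptions inferred from the constructor name and the uint64 quota used in the surrounding log call:

```go
package pipeline

import "github.com/pingcap/tiflow/cdc/sink/flowcontrol"

// newFlowController sketches the renamed import: per-table back-pressure is
// still created from the per-table memory quota, now under flowcontrol.
func newFlowController(perTableMemoryQuota uint64) *flowcontrol.TableFlowController {
	return flowcontrol.NewTableFlowController(perTableMemoryQuota)
}
```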
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/table_actor.go
@@ -25,7 +25,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/redo"
 	"github.com/pingcap/tiflow/cdc/sink"
-	"github.com/pingcap/tiflow/cdc/sink/common"
+	"github.com/pingcap/tiflow/cdc/sink/flowcontrol"
 	"github.com/pingcap/tiflow/pkg/actor"
 	"github.com/pingcap/tiflow/pkg/actor/message"
 	serverConfig "github.com/pingcap/tiflow/pkg/config"
@@ -263,7 +263,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {
 		zap.String("tableName", t.tableName),
 		zap.Uint64("quota", t.memoryQuota))
 
-	flowController := common.NewTableFlowController(t.memoryQuota)
+	flowController := flowcontrol.NewTableFlowController(t.memoryQuota)
 	sorterNode := newSorterNode(t.tableName, t.tableID,
 		t.replicaInfo.StartTs, flowController,
 		t.mounter, t.replicaConfig,
5 changes: 3 additions & 2 deletions cdc/processor/processor.go
@@ -32,6 +32,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/puller"
 	"github.com/pingcap/tiflow/cdc/redo"
 	"github.com/pingcap/tiflow/cdc/sink"
+	"github.com/pingcap/tiflow/cdc/sink/metrics"
 	"github.com/pingcap/tiflow/cdc/sorter/memory"
 	"github.com/pingcap/tiflow/pkg/config"
 	cdcContext "github.com/pingcap/tiflow/pkg/context"
@@ -480,8 +481,8 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
 		}
 		opts[mark.OptCyclicConfig] = cyclicCfg
 	}
-	opts[sink.OptChangefeedID] = p.changefeed.ID
-	opts[sink.OptCaptureAddr] = ctx.GlobalVars().CaptureInfo.AdvertiseAddr
+	opts[metrics.OptChangefeedID] = p.changefeed.ID
+	opts[metrics.OptCaptureAddr] = ctx.GlobalVars().CaptureInfo.AdvertiseAddr
 	log.Info("processor try new sink", zap.String("changefeed", p.changefeed.ID))
 
 	start := time.Now()
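The sink option keys for the changefeed ID and capture address are now exported from the metrics package rather than the sink root. A hedged sketch of building the options map with the relocated constants; the helper function and the string-keyed map are assumptions, while the two keys come from the hunk above:

```go
package processor

import "github.com/pingcap/tiflow/cdc/sink/metrics"

// buildSinkOpts sketches the relocated option keys: OptChangefeedID and
// OptCaptureAddr moved from the sink package to cdc/sink/metrics.
func buildSinkOpts(changefeedID, captureAddr string) map[string]string {
	return map[string]string{
		metrics.OptChangefeedID: changefeedID,
		metrics.OptCaptureAddr:  captureAddr,
	}
}
```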
7 changes: 4 additions & 3 deletions cdc/sink/black_hole.go
@@ -19,19 +19,20 @@ import (
 
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/sink/metrics"
 	"go.uber.org/zap"
 )
 
 // newBlackHoleSink creates a black hole sink
 func newBlackHoleSink(ctx context.Context) *blackHoleSink {
 	return &blackHoleSink{
-		// use `sinkTypeDB` to record metrics
-		statistics: NewStatistics(ctx, sinkTypeDB),
+		// use `SinkTypeDB` to record metrics
+		statistics: metrics.NewStatistics(ctx, metrics.SinkTypeDB),
 	}
 }
 
 type blackHoleSink struct {
-	statistics      *Statistics
+	statistics      *metrics.Statistics
 	accumulated     uint64
 	lastAccumulated uint64
 }
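The per-sink statistics helper and the sink-type constant are now exported from cdc/sink/metrics. A minimal sketch of constructing the helper under the new names, mirroring the black hole sink above; the wrapper function is illustrative:

```go
package sink

import (
	"context"

	"github.com/pingcap/tiflow/cdc/sink/metrics"
)

// newDBStatistics sketches the relocated helper: both NewStatistics and the
// SinkTypeDB constant are exported from the metrics package after the move.
func newDBStatistics(ctx context.Context) *metrics.Statistics {
	return metrics.NewStatistics(ctx, metrics.SinkTypeDB)
}
```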
8 changes: 4 additions & 4 deletions cdc/sink/buffer_sink.go
@@ -23,6 +23,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/sink/metrics"
 	"github.com/pingcap/tiflow/pkg/util"
 	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
@@ -64,13 +65,12 @@ type runState struct {
 	metricTotalRows prometheus.Counter
 }
 
-func (b *bufferSink) run(ctx context.Context, errCh chan error) {
-	changefeedID := util.ChangefeedIDFromCtx(ctx)
+func (b *bufferSink) run(ctx context.Context, changefeedID string, errCh chan error) {
 	state := runState{
-		metricTotalRows: bufferSinkTotalRowsCountCounter.WithLabelValues(changefeedID),
+		metricTotalRows: metrics.BufferSinkTotalRowsCountCounter.WithLabelValues(changefeedID),
 	}
 	defer func() {
-		bufferSinkTotalRowsCountCounter.DeleteLabelValues(changefeedID)
+		metrics.BufferSinkTotalRowsCountCounter.DeleteLabelValues(changefeedID)
 	}()
 
 	for {
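After this change, `run` no longer recovers the changefeed ID from the context via util.ChangefeedIDFromCtx; the caller passes it in, and the row counter comes from the exported metrics collector. A hedged sketch of the new call shape; the wrapper function is illustrative, while the run signature matches the hunk above:

```go
package sink

import "context"

// startBufferSink sketches the updated wiring: the changefeed ID is threaded
// through explicitly instead of being read from a context value inside run.
func startBufferSink(ctx context.Context, b *bufferSink, changefeedID string) <-chan error {
	errCh := make(chan error, 1)
	go b.run(ctx, changefeedID, errCh)
	return errCh
}
```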
7 changes: 4 additions & 3 deletions cdc/sink/buffer_sink_test.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/sink/metrics"
 	"github.com/stretchr/testify/require"
 )

@@ -39,7 +40,7 @@ func TestFlushTable(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.TODO())
 	defer cancel()
 	b := newBufferSink(newBlackHoleSink(ctx), 5, make(chan drawbackMsg))
-	go b.run(ctx, make(chan error))
+	go b.run(ctx, "", make(chan error))
 
 	require.Equal(t, uint64(5), b.getTableCheckpointTs(2))
 	require.Nil(t, b.EmitRowChangedEvents(ctx))
@@ -83,7 +84,7 @@ func TestFlushFailed(t *testing.T) {
 
 	ctx, cancel := context.WithCancel(context.TODO())
 	b := newBufferSink(newBlackHoleSink(ctx), 5, make(chan drawbackMsg))
-	go b.run(ctx, make(chan error))
+	go b.run(ctx, "", make(chan error))
 
 	checkpoint, err := b.FlushRowChangedEvents(ctx, 3, 8)
 	require.True(t, checkpoint <= 8)
@@ -121,7 +122,7 @@ func BenchmarkRun(b *testing.B) {
 	defer cancel()
 
 	state := runState{
-		metricTotalRows: bufferSinkTotalRowsCountCounter.WithLabelValues(b.Name()),
+		metricTotalRows: metrics.BufferSinkTotalRowsCountCounter.WithLabelValues(b.Name()),
 	}
 
 	for exp := 0; exp < 9; exp++ {
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package common
+package flowcontrol
 
 import (
 	"sync"
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package common
+package flowcontrol
 
 import (
 	"context"
59 changes: 38 additions & 21 deletions cdc/sink/metrics.go → cdc/sink/metrics/metrics.go
@@ -11,83 +11,100 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package sink
+package metrics
 
 import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
 var (
-	execBatchHistogram = prometheus.NewHistogramVec(
+	// ExecBatchHistogram records batch size of a txn.
+	ExecBatchHistogram = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: "ticdc",
 			Subsystem: "sink",
 			Name: "txn_batch_size",
 			Help: "Bucketed histogram of batch size of a txn.",
 			Buckets: prometheus.ExponentialBuckets(1, 2, 18),
 		}, []string{"changefeed", "type"}) // type is for `sinkType`
-	execTxnHistogram = prometheus.NewHistogramVec(
+
+	// ExecTxnHistogram records the execution time of a txn.
+	ExecTxnHistogram = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: "ticdc",
 			Subsystem: "sink",
 			Name: "txn_exec_duration",
 			Help: "Bucketed histogram of processing time (s) of a txn.",
 			Buckets: prometheus.ExponentialBuckets(0.002 /* 2 ms */, 2, 18),
 		}, []string{"changefeed", "type"}) // type is for `sinkType`
-	execDDLHistogram = prometheus.NewHistogramVec(
+
+	// ExecDDLHistogram records the execution time of a DDL.
+	ExecDDLHistogram = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: "ticdc",
 			Subsystem: "sink",
 			Name: "ddl_exec_duration",
 			Help: "Bucketed histogram of processing time (s) of a ddl.",
 			Buckets: prometheus.ExponentialBuckets(0.01, 2, 18),
 		}, []string{"changefeed"})
-	executionErrorCounter = prometheus.NewCounterVec(
+
+	// ExecutionErrorCounter is the counter of execution errors.
+	ExecutionErrorCounter = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: "ticdc",
 			Subsystem: "sink",
 			Name: "execution_error",
 			Help: "total count of execution errors",
 		}, []string{"changefeed"})
-	conflictDetectDurationHis = prometheus.NewHistogramVec(
+
+	// ConflictDetectDurationHis records the duration of detecting conflict.
+	ConflictDetectDurationHis = prometheus.NewHistogramVec(
 		prometheus.HistogramOpts{
 			Namespace: "ticdc",
 			Subsystem: "sink",
 			Name: "conflict_detect_duration",
 			Help: "Bucketed histogram of conflict detect time (s) for single DML statement",
 			Buckets: prometheus.ExponentialBuckets(0.001 /* 1 ms */, 2, 20),
 		}, []string{"changefeed"})
-	bucketSizeCounter = prometheus.NewCounterVec(
+
+	// BucketSizeCounter is the counter of bucket size.
+	BucketSizeCounter = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: "ticdc",
 			Subsystem: "sink",
 			Name: "bucket_size",
 			Help: "size of the DML bucket",
 		}, []string{"changefeed", "bucket"})
-	totalRowsCountGauge = prometheus.NewGaugeVec(
+
+	// TotalRowsCountGauge is the total number of rows that are processed by sink.
+	TotalRowsCountGauge = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: "ticdc",
 			Subsystem: "sink",
 			Name: "total_rows_count",
 			Help: "The total count of rows that are processed by sink",
 		}, []string{"changefeed"})
-	totalFlushedRowsCountGauge = prometheus.NewGaugeVec(
+
+	// TotalFlushedRowsCountGauge is the total count of rows that are flushed to sink.
+	TotalFlushedRowsCountGauge = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: "ticdc",
 			Subsystem: "sink",
 			Name: "total_flushed_rows_count",
 			Help: "The total count of rows that are flushed by sink",
 		}, []string{"changefeed"})
 
-	tableSinkTotalRowsCountCounter = prometheus.NewCounterVec(
+	// TableSinkTotalRowsCountCounter is the total count of rows that are processed by sink.
+	TableSinkTotalRowsCountCounter = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: "ticdc",
 			Subsystem: "sink",
 			Name: "table_sink_total_rows_count",
 			Help: "The total count of rows that are processed by table sink",
 		}, []string{"changefeed"})
 
-	bufferSinkTotalRowsCountCounter = prometheus.NewCounterVec(
+	// BufferSinkTotalRowsCountCounter is the total count of rows that are processed by buffer sink.
+	BufferSinkTotalRowsCountCounter = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Namespace: "ticdc",
 			Subsystem: "sink",
@@ -98,14 +115,14 @@ var (
 
 // InitMetrics registers all metrics in this file
 func InitMetrics(registry *prometheus.Registry) {
-	registry.MustRegister(execBatchHistogram)
-	registry.MustRegister(execTxnHistogram)
-	registry.MustRegister(execDDLHistogram)
-	registry.MustRegister(executionErrorCounter)
-	registry.MustRegister(conflictDetectDurationHis)
-	registry.MustRegister(bucketSizeCounter)
-	registry.MustRegister(totalRowsCountGauge)
-	registry.MustRegister(totalFlushedRowsCountGauge)
-	registry.MustRegister(tableSinkTotalRowsCountCounter)
-	registry.MustRegister(bufferSinkTotalRowsCountCounter)
+	registry.MustRegister(ExecBatchHistogram)
+	registry.MustRegister(ExecTxnHistogram)
+	registry.MustRegister(ExecDDLHistogram)
+	registry.MustRegister(ExecutionErrorCounter)
+	registry.MustRegister(ConflictDetectDurationHis)
+	registry.MustRegister(BucketSizeCounter)
+	registry.MustRegister(TotalRowsCountGauge)
+	registry.MustRegister(TotalFlushedRowsCountGauge)
+	registry.MustRegister(TableSinkTotalRowsCountCounter)
+	registry.MustRegister(BufferSinkTotalRowsCountCounter)
 }
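With the collectors exported, other sink sub-packages can register and update them through the metrics package directly. A hedged usage sketch against a fresh Prometheus registry; the changefeed label value is illustrative, while InitMetrics, the collector names, and the "changefeed" label come from the declarations above:

```go
package main

import (
	"github.com/pingcap/tiflow/cdc/sink/metrics"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	registry := prometheus.NewRegistry()

	// Register every collector declared in cdc/sink/metrics.
	metrics.InitMetrics(registry)

	// Exported collectors are updated through the usual Prometheus API;
	// the "changefeed" label matches the declarations above.
	metrics.TotalRowsCountGauge.WithLabelValues("test-changefeed").Add(128)
	metrics.ExecutionErrorCounter.WithLabelValues("test-changefeed").Inc()
}
```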