Skip to content

Commit

Permalink
[fix #241] fix flow control (#242)
Browse files Browse the repository at this point in the history
* fix flow control

Signed-off-by: zeminzhou <[email protected]>

* remove barrier

Signed-off-by: zeminzhou <[email protected]>

* KeySpanFlowController -> ChangefeedFlowController

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* rename NewKeySpanMemoryQuota -> NewChangefeedMemoryQuota

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* fix ut

Signed-off-by: zeminzhou <[email protected]>

* fix metric for flow-control

Signed-off-by: zeminzhou <[email protected]>

* fix metric for flow-control

Signed-off-by: zeminzhou <[email protected]>

* add metric for flowController consume

Signed-off-by: zeminzhou <[email protected]>

* fix comment

Signed-off-by: zeminzhou <[email protected]>

* fix comment

Signed-off-by: zeminzhou <[email protected]>

Signed-off-by: zeminzhou <[email protected]>
  • Loading branch information
zeminzhou authored Oct 11, 2022
1 parent bf349c5 commit 21754a5
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 73 deletions.
8 changes: 4 additions & 4 deletions cdc/cdc/processor/pipeline/keyspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type keyspanPipelineImpl struct {

// TODO find a better name or avoid using an interface
// We use an interface here for ease in unit testing.
type keyspanFlowController interface {
type changefeedFlowController interface {
Consume(commitTs uint64, size uint64, blockCallBack func() error) error
Release(resolvedTs uint64)
Abort()
Expand Down Expand Up @@ -179,15 +179,15 @@ func NewKeySpanPipeline(ctx cdcContext.Context,
replConfig: replConfig,
}

perKeySpanMemoryQuota := serverConfig.GetGlobalServerConfig().PerKeySpanMemoryQuota
perChangefeedMemoryQuota := serverConfig.GetGlobalServerConfig().PerChangefeedMemoryQuota

log.Debug("creating keyspan flow controller",
zap.String("changefeed-id", ctx.ChangefeedVars().ID),
zap.String("keyspan-name", keyspanName),
zap.Uint64("keyspan-id", keyspanID),
zap.Uint64("quota", perKeySpanMemoryQuota))
zap.Uint64("quota", perChangefeedMemoryQuota))

flowController := common.NewKeySpanFlowController(perKeySpanMemoryQuota)
flowController := common.NewChangefeedFlowController(perChangefeedMemoryQuota)
runnerSize := defaultRunnersSize

p := pipeline.NewPipeline(ctx, 500*time.Millisecond, runnerSize, defaultOutputChannelSize)
Expand Down
15 changes: 12 additions & 3 deletions cdc/cdc/processor/pipeline/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,28 @@ var (
Name: "txn_count",
Help: "txn count received/executed by this processor",
}, []string{"type", "changefeed", "capture"})
keyspanMemoryHistogram = prometheus.NewHistogramVec(
changefeedMemoryHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tikv_cdc",
Subsystem: "processor",
Name: "keyspan_memory_consumption",
Name: "changefeed_memory_consumption",
Help: "estimated memory consumption for a keyspan after the sorter",
Buckets: prometheus.ExponentialBuckets(1*1024*1024 /* mb */, 2, 10),
}, []string{"changefeed", "capture"})
flowControllerDurationHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tikv_cdc",
Subsystem: "processor",
Name: "flow_controller_duration",
Help: "bucketed histogram of processing time (s) of flowController consume",
Buckets: prometheus.ExponentialBuckets(0.002 /* 2 ms */, 2, 18),
}, []string{"type", "changefeed", "capture"})
)

// InitMetrics registers all metrics used in processor
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(keyspanResolvedTsGauge)
registry.MustRegister(txnCounter)
registry.MustRegister(keyspanMemoryHistogram)
registry.MustRegister(changefeedMemoryHistogram)
registry.MustRegister(flowControllerDurationHistogram)
}
4 changes: 2 additions & 2 deletions cdc/cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ type sinkNode struct {
eventBuffer []*model.PolymorphicEvent
rawKVBuffer []*model.RawKVEntry

flowController keyspanFlowController
flowController changefeedFlowController

replicaConfig *config.ReplicaConfig
isKeySpanActorMode bool
}

func newSinkNode(keyspanID model.KeySpanID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController keyspanFlowController) *sinkNode {
func newSinkNode(keyspanID model.KeySpanID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController changefeedFlowController) *sinkNode {
return &sinkNode{
keyspanID: keyspanID,
sink: sink,
Expand Down
40 changes: 34 additions & 6 deletions cdc/cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type sorterNode struct {
keyspanID model.KeySpanID
keyspanName string // quoted keyspan, used in metircs only

// for per-keyspan flow control
flowController keyspanFlowController
// for per-changefeed flow control
flowController changefeedFlowController

eg *errgroup.Group
cancel context.CancelFunc
Expand All @@ -57,7 +57,7 @@ type sorterNode struct {

func newSorterNode(
keyspanName string, keyspanID model.KeySpanID, startTs model.Ts,
flowController keyspanFlowController, replConfig *config.ReplicaConfig,
flowController changefeedFlowController, replConfig *config.ReplicaConfig,
) *sorterNode {
return &sorterNode{
keyspanName: keyspanName,
Expand Down Expand Up @@ -117,7 +117,8 @@ func (n *sorterNode) StartActorNode(ctx pipeline.NodeContext, eg *errgroup.Group
lastSendResolvedTsTime := time.Now() // the time at which we last sent a resolved-ts.
lastCRTs := uint64(0) // the commit-ts of the last row changed we sent.

metricsKeySpanMemoryHistogram := keyspanMemoryHistogram.WithLabelValues(ctx.ChangefeedVars().ID, ctx.GlobalVars().CaptureInfo.AdvertiseAddr)
metricsChangefeedMemoryHistogram := changefeedMemoryHistogram.WithLabelValues(ctx.ChangefeedVars().ID, ctx.GlobalVars().CaptureInfo.AdvertiseAddr)
metricsMemoryConsumeHistogram := flowControllerDurationHistogram.WithLabelValues("consume", ctx.ChangefeedVars().ID, ctx.GlobalVars().CaptureInfo.AdvertiseAddr)
metricsTicker := time.NewTicker(flushMemoryMetricsDuration)
defer metricsTicker.Stop()

Expand All @@ -126,7 +127,7 @@ func (n *sorterNode) StartActorNode(ctx pipeline.NodeContext, eg *errgroup.Group
case <-stdCtx.Done():
return nil
case <-metricsTicker.C:
metricsKeySpanMemoryHistogram.Observe(float64(n.flowController.GetConsumption()))
metricsChangefeedMemoryHistogram.Observe(float64(n.flowController.GetConsumption()))
case msg, ok := <-eventSorter.Output():
if !ok {
// sorter output channel closed
Expand All @@ -151,6 +152,33 @@ func (n *sorterNode) StartActorNode(ctx pipeline.NodeContext, eg *errgroup.Group
}
}

// We calculate memory consumption by PolymorphicEvent size.
size := uint64(msg.RawKV.ApproximateDataSize())
// NOTE we allow the quota to be exceeded if blocking means interrupting a transaction.
// Otherwise the pipeline would deadlock.
startTime := time.Now()
err := n.flowController.Consume(commitTs, size, func() error {
if lastCRTs > lastSentResolvedTs {
// If we are blocking, we send a Resolved Event here to elicit a sink-flush.
// Not sending a Resolved Event here will very likely deadlock the pipeline.
lastSentResolvedTs = lastCRTs
lastSendResolvedTsTime = time.Now()
ctx.SendToNextNode(pipeline.PolymorphicEventMessage(model.NewResolvedPolymorphicEvent(0, lastCRTs, n.keyspanID)))
}
return nil
})
if err != nil {
if cerror.ErrFlowControllerAborted.Equal(err) {
log.Info("flow control cancelled for keyspan",
zap.Uint64("keyspanID", n.keyspanID),
zap.String("keyspanName", n.keyspanName))
} else {
ctx.Throw(err)
}
return nil
}
metricsMemoryConsumeHistogram.Observe(time.Since(startTime).Seconds())

lastCRTs = commitTs
} else {
// handle OpTypeResolved
Expand Down Expand Up @@ -198,7 +226,7 @@ func (n *sorterNode) TryHandleDataMessage(ctx context.Context, msg pipeline.Mess
}

func (n *sorterNode) Destroy(ctx pipeline.NodeContext) error {
defer keyspanMemoryHistogram.DeleteLabelValues(ctx.ChangefeedVars().ID, ctx.GlobalVars().CaptureInfo.AdvertiseAddr)
defer changefeedMemoryHistogram.DeleteLabelValues(ctx.ChangefeedVars().ID, ctx.GlobalVars().CaptureInfo.AdvertiseAddr)
n.cancel()
return n.eg.Wait()
}
Expand Down
48 changes: 24 additions & 24 deletions cdc/cdc/sink/common/flow_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
"go.uber.org/zap"
)

// KeySpanMemoryQuota is designed to curb the total memory consumption of processing
// the event streams in a keyspan.
// A higher-level controller more suikeyspan for direct use by the processor is KeySpanFlowController.
type KeySpanMemoryQuota struct {
// ChangefeedMemoryQuota is designed to curb the total memory consumption of processing
// the event streams in a changefeed.
// A higher-level controller more suitable for direct use by the processor is ChangefeedFlowController.
type ChangefeedMemoryQuota struct {
Quota uint64 // should not be changed once intialized

IsAborted uint32
Expand All @@ -38,10 +38,10 @@ type KeySpanMemoryQuota struct {
cond *sync.Cond
}

// NewKeySpanMemoryQuota creates a new KeySpanMemoryQuota
// NewChangefeedMemoryQuota creates a new ChangefeedMemoryQuota
// quota: max advised memory consumption in bytes.
func NewKeySpanMemoryQuota(quota uint64) *KeySpanMemoryQuota {
ret := &KeySpanMemoryQuota{
func NewChangefeedMemoryQuota(quota uint64) *ChangefeedMemoryQuota {
ret := &ChangefeedMemoryQuota{
Quota: quota,
mu: sync.Mutex{},
Consumed: 0,
Expand All @@ -55,7 +55,7 @@ func NewKeySpanMemoryQuota(quota uint64) *KeySpanMemoryQuota {
// block until enough memory has been freed up by Release.
// blockCallBack will be called if the function will block.
// Should be used with care to prevent deadlock.
func (c *KeySpanMemoryQuota) ConsumeWithBlocking(nBytes uint64, blockCallBack func() error) error {
func (c *ChangefeedMemoryQuota) ConsumeWithBlocking(nBytes uint64, blockCallBack func() error) error {
if nBytes >= c.Quota {
return cerrors.ErrFlowControllerEventLargerThanQuota.GenWithStackByArgs(nBytes, c.Quota)
}
Expand Down Expand Up @@ -91,7 +91,7 @@ func (c *KeySpanMemoryQuota) ConsumeWithBlocking(nBytes uint64, blockCallBack fu

// ForceConsume is called when blocking is not acceptable and the limit can be violated
// for the sake of avoiding deadlock. It merely records the increased memory consumption.
func (c *KeySpanMemoryQuota) ForceConsume(nBytes uint64) error {
func (c *ChangefeedMemoryQuota) ForceConsume(nBytes uint64) error {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -104,12 +104,12 @@ func (c *KeySpanMemoryQuota) ForceConsume(nBytes uint64) error {
}

// Release is called when a chunk of memory is done being used.
func (c *KeySpanMemoryQuota) Release(nBytes uint64) {
func (c *ChangefeedMemoryQuota) Release(nBytes uint64) {
c.mu.Lock()

if c.Consumed < nBytes {
c.mu.Unlock()
log.Panic("KeySpanMemoryQuota: releasing more than consumed, report a bug",
log.Panic("ChangefeedMemoryQuota: releasing more than consumed, report a bug",
zap.Uint64("consumed", c.Consumed),
zap.Uint64("released", nBytes))
}
Expand All @@ -125,22 +125,22 @@ func (c *KeySpanMemoryQuota) Release(nBytes uint64) {
}

// Abort interrupts any ongoing ConsumeWithBlocking call
func (c *KeySpanMemoryQuota) Abort() {
func (c *ChangefeedMemoryQuota) Abort() {
atomic.StoreUint32(&c.IsAborted, 1)
c.cond.Signal()
}

// GetConsumption returns the current memory consumption
func (c *KeySpanMemoryQuota) GetConsumption() uint64 {
func (c *ChangefeedMemoryQuota) GetConsumption() uint64 {
c.mu.Lock()
defer c.mu.Unlock()

return c.Consumed
}

// KeySpanFlowController provides a convenient interface to control the memory consumption of a per keyspan event stream
type KeySpanFlowController struct {
memoryQuota *KeySpanMemoryQuota
// ChangefeedFlowController provides a convenient interface to control the memory consumption of a per changefeed event stream
type ChangefeedFlowController struct {
memoryQuota *ChangefeedMemoryQuota

mu sync.Mutex
queue deque.Deque
Expand All @@ -153,17 +153,17 @@ type commitTsSizeEntry struct {
Size uint64
}

// NewKeySpanFlowController creates a new KeySpanFlowController
func NewKeySpanFlowController(quota uint64) *KeySpanFlowController {
return &KeySpanFlowController{
memoryQuota: NewKeySpanMemoryQuota(quota),
// NewChangefeedFlowController creates a new ChangefeedFlowController
func NewChangefeedFlowController(quota uint64) *ChangefeedFlowController {
return &ChangefeedFlowController{
memoryQuota: NewChangefeedMemoryQuota(quota),
queue: deque.NewDeque(),
}
}

// Consume is called when an event has arrived for being processed by the sink.
// It will handle transaction boundaries automatically, and will not block intra-transaction.
func (c *KeySpanFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error {
func (c *ChangefeedFlowController) Consume(commitTs uint64, size uint64, blockCallBack func() error) error {
lastCommitTs := atomic.LoadUint64(&c.lastCommitTs)

if commitTs < lastCommitTs {
Expand Down Expand Up @@ -201,7 +201,7 @@ func (c *KeySpanFlowController) Consume(commitTs uint64, size uint64, blockCallB
}

// Release is called when all events committed before resolvedTs have been freed from memory.
func (c *KeySpanFlowController) Release(resolvedTs uint64) {
func (c *ChangefeedFlowController) Release(resolvedTs uint64) {
var nBytesToRelease uint64

c.mu.Lock()
Expand All @@ -219,11 +219,11 @@ func (c *KeySpanFlowController) Release(resolvedTs uint64) {
}

// Abort interrupts any ongoing Consume call
func (c *KeySpanFlowController) Abort() {
func (c *ChangefeedFlowController) Abort() {
c.memoryQuota.Abort()
}

// GetConsumption returns the current memory consumption
func (c *KeySpanFlowController) GetConsumption() uint64 {
func (c *ChangefeedFlowController) GetConsumption() uint64 {
return c.memoryQuota.GetConsumption()
}
22 changes: 11 additions & 11 deletions cdc/cdc/sink/common/flow_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *mockCallBacker) cb() error {
func (s *flowControlSuite) TestMemoryQuotaBasic(c *check.C) {
defer testleak.AfterTest(c)()

controller := NewKeySpanMemoryQuota(1024)
controller := NewChangefeedMemoryQuota(1024)
sizeCh := make(chan uint64, 1024)
var (
wg sync.WaitGroup
Expand Down Expand Up @@ -89,7 +89,7 @@ func (s *flowControlSuite) TestMemoryQuotaBasic(c *check.C) {
func (s *flowControlSuite) TestMemoryQuotaForceConsume(c *check.C) {
defer testleak.AfterTest(c)()

controller := NewKeySpanMemoryQuota(1024)
controller := NewChangefeedMemoryQuota(1024)
sizeCh := make(chan uint64, 1024)
var (
wg sync.WaitGroup
Expand Down Expand Up @@ -137,7 +137,7 @@ func (s *flowControlSuite) TestMemoryQuotaForceConsume(c *check.C) {
func (s *flowControlSuite) TestMemoryQuotaAbort(c *check.C) {
defer testleak.AfterTest(c)()

controller := NewKeySpanMemoryQuota(1024)
controller := NewChangefeedMemoryQuota(1024)
var wg sync.WaitGroup
wg.Add(1)
go func() {
Expand All @@ -162,7 +162,7 @@ func (s *flowControlSuite) TestMemoryQuotaAbort(c *check.C) {
func (s *flowControlSuite) TestMemoryQuotaReleaseZero(c *check.C) {
defer testleak.AfterTest(c)()

controller := NewKeySpanMemoryQuota(1024)
controller := NewChangefeedMemoryQuota(1024)
controller.Release(0)
}

Expand All @@ -178,7 +178,7 @@ func (s *flowControlSuite) TestFlowControlBasic(c *check.C) {
defer cancel()
errg, ctx := errgroup.WithContext(ctx)
mockedRowsCh := make(chan *commitTsSizeEntry, 1024)
flowController := NewKeySpanFlowController(2048)
flowController := NewChangefeedFlowController(2048)

errg.Go(func() error {
lastCommitTs := uint64(1)
Expand Down Expand Up @@ -288,7 +288,7 @@ func (s *flowControlSuite) TestFlowControlAbort(c *check.C) {
defer testleak.AfterTest(c)()

callBacker := &mockCallBacker{}
controller := NewKeySpanFlowController(1024)
controller := NewChangefeedFlowController(1024)
var wg sync.WaitGroup
wg.Add(1)
go func() {
Expand Down Expand Up @@ -318,7 +318,7 @@ func (s *flowControlSuite) TestFlowControlCallBack(c *check.C) {
defer cancel()
errg, ctx := errgroup.WithContext(ctx)
mockedRowsCh := make(chan *commitTsSizeEntry, 1024)
flowController := NewKeySpanFlowController(512)
flowController := NewChangefeedFlowController(512)

errg.Go(func() error {
lastCommitTs := uint64(1)
Expand Down Expand Up @@ -421,7 +421,7 @@ func (s *flowControlSuite) TestFlowControlCallBackNotBlockingRelease(c *check.C)
defer testleak.AfterTest(c)()

var wg sync.WaitGroup
controller := NewKeySpanFlowController(512)
controller := NewChangefeedFlowController(512)
wg.Add(1)

ctx, cancel := context.WithCancel(context.TODO())
Expand Down Expand Up @@ -463,7 +463,7 @@ func (s *flowControlSuite) TestFlowControlCallBackError(c *check.C) {
defer testleak.AfterTest(c)()

var wg sync.WaitGroup
controller := NewKeySpanFlowController(512)
controller := NewChangefeedFlowController(512)
wg.Add(1)

ctx, cancel := context.WithCancel(context.TODO())
Expand Down Expand Up @@ -492,7 +492,7 @@ func (s *flowControlSuite) TestFlowControlCallBackError(c *check.C) {
func (s *flowControlSuite) TestFlowControlConsumeLargerThanQuota(c *check.C) {
defer testleak.AfterTest(c)()

controller := NewKeySpanFlowController(1024)
controller := NewChangefeedFlowController(1024)
err := controller.Consume(1, 2048, func() error {
c.Fatalf("unreachable")
return nil
Expand All @@ -505,7 +505,7 @@ func BenchmarkKeySpanFlowController(B *testing.B) {
defer cancel()
errg, ctx := errgroup.WithContext(ctx)
mockedRowsCh := make(chan *commitTsSizeEntry, 102400)
flowController := NewKeySpanFlowController(20 * 1024 * 1024) // 20M
flowController := NewChangefeedFlowController(20 * 1024 * 1024) // 20M

errg.Go(func() error {
lastCommitTs := uint64(1)
Expand Down
Loading

0 comments on commit 21754a5

Please sign in to comment.