[fix #241] fix flow control #242
Signed-off-by: zeminzhou <[email protected]>
/cc @pingyu @haojinming PTAL~
Codecov Report
Additional details and impacted files
@@             Coverage Diff              @@
##              main       #242     +/-   ##
==============================================
- Coverage   61.3330%   61.0277%  -0.3053%
==============================================
  Files           239        239
  Lines         20180      20199       +19
==============================================
- Hits          12377      12327       -50
- Misses         6666       6755       +89
+ Partials       1137       1117       -20
* This pull request uses carry forward flags.
zap.Uint64("keyspanID", n.keyspanID),
zap.String("keyspanName", n.keyspanName))
} else {
ctx.Throw(err)
Does this cause panic?
It does not cause a panic; it just throws an error to the parent nodes.
I suggest adding test cases to verify consuming and releasing (including integration and e2e tests). If a release is missed, flowController.Consume will block the data flow.
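To illustrate why a missed release matters, here is a minimal sketch of a quota-based flow controller and a helper that a test could use to detect blocking. The type memoryQuota and the helpers consume, release, and consumeReturnsWithin are hypothetical names invented for this example; they are a simplified stand-in and not the flow controller this PR touches.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// memoryQuota is a hypothetical, simplified stand-in for a flow controller.
// It is not the project's actual implementation.
type memoryQuota struct {
	mu       sync.Mutex
	cond     *sync.Cond
	quota    uint64
	consumed uint64
}

func newMemoryQuota(quota uint64) *memoryQuota {
	q := &memoryQuota{quota: quota}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// consume blocks until size bytes fit within the quota.
// A request larger than the whole quota would block forever in this sketch.
func (q *memoryQuota) consume(size uint64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for q.consumed+size > q.quota {
		q.cond.Wait()
	}
	q.consumed += size
}

// release returns size bytes to the quota and wakes blocked consumers.
func (q *memoryQuota) release(size uint64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if size > q.consumed {
		size = q.consumed
	}
	q.consumed -= size
	q.cond.Broadcast()
}

// consumeReturnsWithin reports whether consume(size) finished before timeout.
// On timeout the waiting goroutine is left blocked, which is acceptable for
// a short-lived test but would be a leak in long-running code.
func consumeReturnsWithin(q *memoryQuota, size uint64, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		q.consume(size)
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	// Missed release: 80 of 100 bytes stay consumed, so 40 more cannot fit.
	missed := newMemoryQuota(100)
	missed.consume(80)
	fmt.Println("returns without release:", consumeReturnsWithin(missed, 40, 50*time.Millisecond))

	// Paired release: the quota is returned, so the next consume proceeds.
	paired := newMemoryQuota(100)
	paired.consume(80)
	paired.release(80)
	fmt.Println("returns after release:", consumeReturnsWithin(paired, 40, time.Second))
}
```

A test along these lines only shows the blocking behaviour in isolation; the integration and e2e tests requested above would still be needed to check that every code path in the pipeline pairs its consume with a release.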
cdc/cdc/processor/pipeline/sorter.go
Outdated
@@ -188,6 +212,7 @@ func (n *sorterNode) TryHandleDataMessage(ctx context.Context, msg pipeline.Mess
zap.Uint64("resolvedTs", resolvedTs),
zap.Uint64("oldResolvedTs", oldResolvedTs))
}
atomic.StoreUint64(&n.resolvedTs, rawKV.CRTs)
Why? The preceding oldResolvedTs := atomic.SwapUint64(&n.resolvedTs, resolvedTs) should already have done the assignment.
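The point about atomic.SwapUint64 can be checked with a small standalone example: the swap stores the new value and returns the previous one, so a following store of the same value is redundant. The helper name advanceResolvedTs and the timestamps are made up for illustration.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// advanceResolvedTs stores next into *ts and returns the value it replaced.
// The name is hypothetical; only atomic.SwapUint64 comes from the discussion.
func advanceResolvedTs(ts *uint64, next uint64) (old uint64) {
	return atomic.SwapUint64(ts, next)
}

func main() {
	var resolvedTs uint64 = 100
	old := advanceResolvedTs(&resolvedTs, 120)
	// After the swap the variable already holds the new value.
	fmt.Println(old, atomic.LoadUint64(&resolvedTs)) // prints "100 120"
}
```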
Removed, thanks! PTAL~
@@ -151,6 +151,30 @@ func (n *sorterNode) StartActorNode(ctx pipeline.NodeContext, eg *errgroup.Group
}
}

// We calculate memory consumption by PolymorphicEvent size.
About PerKeySpanMemoryQuota:
- The default value 10MB may be too small. 1GB may be fine.
- I think we can change the config item name per-keyspan-memory-quota to per-changefeed-memory-quota, as users have no idea what keyspan is.
- It would be better to default to a proportion of total system memory, for ease of use. Refer to MaxMemoryPresure. (Not a must in this PR.)
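The third suggestion, defaulting the quota to a proportion of total system memory, could look roughly like the sketch below. The function quotaFromTotalMemory, the 30 percent figure, and the 16 GiB total are all assumptions made for illustration; how the total memory is obtained is left out, and nothing here reflects how the project actually computes its defaults.

```go
package main

import "fmt"

// quotaFromTotalMemory returns percent% of totalBytes, clamping percent to 100.
// Hypothetical helper: the caller is assumed to know the total system memory.
func quotaFromTotalMemory(totalBytes, percent uint64) uint64 {
	if percent > 100 {
		percent = 100
	}
	// Divide first so the multiplication cannot overflow uint64.
	// This loses precision for totals below 100 bytes, which is irrelevant here.
	return totalBytes / 100 * percent
}

func main() {
	const gib = 1 << 30
	total := uint64(16 * gib) // hypothetical machine with 16 GiB of memory
	quota := quotaFromTotalMemory(total, 30)
	fmt.Printf("memory quota: %d bytes\n", quota)
}
```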
Thanks for your suggestions. I made the following changes:
- changed 10MB to 1GB;
- changed per-keyspan-memory-quota to per-changefeed-memory-quota.
size := uint64(msg.RawKV.ApproximateDataSize())
// NOTE we allow the quota to be exceeded if blocking means interrupting a transaction.
// Otherwise the pipeline would deadlock.
err := n.flowController.Consume(commitTs, size, func() error {
I suggest adding metrics to observe the blocking duration and the Consume failure count. It seems Consume should not fail unless flowcontrol is aborted.
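One way to picture the requested instrumentation is a thin wrapper that times each call and counts failures. The sketch below uses only the standard library: consumeStats, observe, and errAborted are hypothetical names, and in the real code the two numbers would presumably feed a Prometheus histogram and counter rather than plain integers.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// errAborted is a hypothetical error standing in for an aborted flow control.
var errAborted = errors.New("flow control aborted")

// consumeStats is a hypothetical stand-in for real metrics: it accumulates
// the time spent inside consume calls and the number of failed calls.
type consumeStats struct {
	blockedNanos int64
	failures     int64
}

// observe runs consume, records how long it took, and counts a failure
// when it returns an error. The error is passed through unchanged.
func (s *consumeStats) observe(consume func() error) error {
	start := time.Now()
	err := consume()
	atomic.AddInt64(&s.blockedNanos, int64(time.Since(start)))
	if err != nil {
		atomic.AddInt64(&s.failures, 1)
	}
	return err
}

func main() {
	stats := &consumeStats{}

	// A call that blocks for a while and then succeeds.
	_ = stats.observe(func() error {
		time.Sleep(10 * time.Millisecond)
		return nil
	})

	// A call that fails, as it might when flow control is aborted.
	_ = stats.observe(func() error { return errAborted })

	blocked := time.Duration(atomic.LoadInt64(&stats.blockedNanos))
	fmt.Println("failures:", atomic.LoadInt64(&stats.failures))
	fmt.Println("blocked at least 10ms:", blocked >= 10*time.Millisecond)
}
```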
LGTM
Rest LGTM.
Help: "estimated memory consumption for a keyspan after the sorter",
Buckets: prometheus.ExponentialBuckets(1*1024*1024 /* mb */, 2, 10),
}, []string{"changefeed", "capture"})
flowControllerConsumeHistogram = prometheus.NewHistogramVec(
I suggest naming it flowControllerDurationHistogram and adding a tag type = consume for the Consume interface. Then we can add more tags for other interfaces later if necessary.
added, thanks! PTAL~
@@ -2965,7 +2965,7 @@
 "steppedLine": false,
 "targets": [
 {
-"expr": "sum(rate(tikv_cdc_processor_table_memory_consumption_sum{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[30s]) / rate(tikv_cdc_processor_table_memory_consumption_count{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[30s])) by (capture)",
+"expr": "sum(rate(tikv_cdc_processor_changefeed_memory_consumption_sum{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[30s]) / rate(tikv_cdc_processor_changefeed_memory_consumption_count{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[30s])) by (capture)",
Please also paste screenshots of these two metrics.
pasted, PTAL -> #242 (comment)
/verify
LGTM~
Signed-off-by: zeminzhou [email protected]
What problem does this PR solve?
Fix flow control
Issue Number: close #241
Problem Description: TBD
What is changed and how does it work?
Fix flow control
Code changes
Check List for Tests
This PR has been tested by at least one of the following methods: