
[fix #241] fix flow control #242

Merged
merged 15 commits into tikv:main from fix-flow-control on Oct 11, 2022

Conversation

zeminzhou (Contributor)

Signed-off-by: zeminzhou [email protected]

What problem does this PR solve?

Fix flow control

Issue Number: close #241

Problem Description: TBD

What is changed and how does it work?

Fix flow control

Code changes

  • Has exported function/method change

Check List for Tests

This PR has been tested by at least one of the following methods:

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)

Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
zeminzhou (Contributor Author) commented Sep 28, 2022

/cc @pingyu @haojinming PTAL~

codecov bot commented Sep 28, 2022

Codecov Report

Merging #242 (9f4a2bf) into main (bf349c5) will decrease coverage by 0.3052%.
The diff coverage is 6.8181%.

Additional details and impacted files


@@               Coverage Diff                @@
##               main       #242        +/-   ##
================================================
- Coverage   61.3330%   61.0277%   -0.3053%     
================================================
  Files           239        239                
  Lines         20180      20199        +19     
================================================
- Hits          12377      12327        -50     
- Misses         6666       6755        +89     
+ Partials       1137       1117        -20     
Flag | Coverage Δ | Carryforward flag
---- | ---------- | -----------------
br   | 60.2203% <ø> (ø) | Carriedforward from 8d809a8
cdc  | 61.3984% <6.8181%> (-0.4461%) ⬇️ |

*This pull request uses carry forward flags.

Impacted Files | Coverage Δ
-------------- | ----------
cdc/cdc/processor/pipeline/keyspan.go | 0.0000% <0.0000%> (ø)
cdc/cdc/processor/pipeline/metrics.go | 0.0000% <0.0000%> (ø)
cdc/cdc/processor/pipeline/sorter.go | 30.8333% <0.0000%> (-5.4412%) ⬇️
cdc/cdc/sink/common/flow_control.go | 0.0000% <0.0000%> (ø)
cdc/cdc/processor/pipeline/sink.go | 63.0434% <100.0000%> (ø)
cdc/pkg/config/server_config.go | 55.6818% <100.0000%> (ø)
cdc/cdc/sorter/unified/file_backend.go | 33.7719% <0.0000%> (-20.1755%) ⬇️
cdc/pkg/pdtime/acquirer.go | 58.1395% <0.0000%> (-6.9768%) ⬇️
cdc/pkg/orchestrator/etcd_worker.go | 78.5714% <0.0000%> (-0.8404%) ⬇️
cdc/cdc/sorter/unified/unified_sorter.go | 91.6666% <0.0000%> (-0.6945%) ⬇️
... and 1 more

zap.Uint64("keyspanID", n.keyspanID),
zap.String("keyspanName", n.keyspanName))
} else {
ctx.Throw(err)
Contributor:

Does this cause a panic?

Contributor Author:

It does not cause a panic; it just throws the error to the parent nodes.
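
For readers following the thread, here is a minimal, self-contained sketch of this non-panicking error path. The sentinel error and `throw` helper below are stand-ins for the real `cerror.ErrFlowControllerAborted` and `ctx.Throw`, which this sketch does not reproduce exactly:

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for the real sentinel error (cerror.ErrFlowControllerAborted).
var errFlowControllerAborted = errors.New("flow controller is aborted")

// throw is a stand-in for ctx.Throw: it forwards the error to the parent
// nodes of the pipeline rather than panicking.
func throw(err error) {
	fmt.Println("error forwarded to parent nodes:", err)
}

// handleConsumeError mirrors the branch in the hunk above: an aborted flow
// controller is an expected shutdown signal and is only logged, while any
// other error is thrown to the parents.
func handleConsumeError(err error) {
	if errors.Is(err, errFlowControllerAborted) {
		fmt.Println("flow control aborted, ignoring")
		return
	}
	throw(err)
}

func main() {
	handleConsumeError(errFlowControllerAborted)
	handleConsumeError(errors.New("unexpected sink error"))
}
```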

pingyu (Collaborator) left a comment:

Suggest adding some test cases to verify the consuming & releasing (including integration & e2e tests). If we miss a release, flowController.Consume will block the data flow.
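
To make the hazard concrete, here is a minimal, runnable sketch of a byte-counting quota with a blocking `Consume` and a `Release` that wakes waiters. It is an illustration only, not the PR's flowController (which also tracks commit timestamps and supports forced consumption):

```go
package main

import (
	"fmt"
	"sync"
)

// memoryQuota is a minimal sketch of the consume/release pattern under review.
type memoryQuota struct {
	mu    sync.Mutex
	cond  *sync.Cond
	used  uint64
	quota uint64
}

func newMemoryQuota(quota uint64) *memoryQuota {
	q := &memoryQuota{quota: quota}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Consume blocks until `size` bytes fit under the quota. This is exactly why
// a missing Release is dangerous: nothing would ever wake the waiters.
func (q *memoryQuota) Consume(size uint64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for q.used+size > q.quota {
		q.cond.Wait()
	}
	q.used += size
}

// Release returns bytes to the quota and wakes blocked consumers.
func (q *memoryQuota) Release(size uint64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.used -= size
	q.cond.Broadcast()
}

func main() {
	q := newMemoryQuota(1024)
	q.Consume(512)
	go q.Release(512) // without this, the next Consume would block forever
	q.Consume(1024)
	fmt.Println("done")
}
```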

@@ -188,6 +212,7 @@ func (n *sorterNode) TryHandleDataMessage(ctx context.Context, msg pipeline.Mess
zap.Uint64("resolvedTs", resolvedTs),
zap.Uint64("oldResolvedTs", oldResolvedTs))
}
atomic.StoreUint64(&n.resolvedTs, rawKV.CRTs)
Collaborator:

Why? The preceding oldResolvedTs := atomic.SwapUint64(&n.resolvedTs, resolvedTs) should already have done the assignment.

Contributor Author:

Removed, thanks! PTAL~
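
For reference, `atomic.SwapUint64` already writes the new value and returns the old one, so the removed `StoreUint64` would only have repeated the assignment:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var resolvedTs uint64 = 100

	// SwapUint64 stores 200 and returns the previous value in one atomic
	// step; a follow-up StoreUint64(&resolvedTs, 200) would be redundant.
	oldResolvedTs := atomic.SwapUint64(&resolvedTs, 200)

	fmt.Println(oldResolvedTs)                  // 100
	fmt.Println(atomic.LoadUint64(&resolvedTs)) // 200
}
```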

@@ -151,6 +151,30 @@ func (n *sorterNode) StartActorNode(ctx pipeline.NodeContext, eg *errgroup.Group
}
}

// We calculate memory consumption by PolymorphicEvent size.
Collaborator:

About PerKeySpanMemoryQuota:

  1. The default value of 10MB may be too small; 1GB may be fine.

  2. I think we can rename the config item per-keyspan-memory-quota to per-changefeed-memory-quota, as users have no idea what a keyspan is.

  3. It would be better to default to a proportion of total system memory, for ease of use. Refer to MaxMemoryPressure. (Not a must in this PR.)

Contributor Author:

Thanks for your suggestion. I did the following:

  1. changed 10MB to 1GB;
  2. changed per-keyspan-memory-quota to per-changefeed-memory-quota.
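
A minimal sketch of what the renamed config item could look like; the struct and field names here are illustrative assumptions, not copied from cdc/pkg/config/server_config.go:

```go
package main

import "fmt"

// SorterConfig is a hypothetical holder for the renamed item.
type SorterConfig struct {
	// PerChangefeedMemoryQuota caps the memory the sorter may hold for a
	// single changefeed before flow control starts blocking upstream reads.
	// The default was raised from 10 MiB to 1 GiB in this PR.
	PerChangefeedMemoryQuota uint64 `toml:"per-changefeed-memory-quota" json:"per-changefeed-memory-quota"`
}

func defaultSorterConfig() *SorterConfig {
	return &SorterConfig{
		PerChangefeedMemoryQuota: 1 * 1024 * 1024 * 1024, // 1 GiB
	}
}

func main() {
	fmt.Printf("%+v\n", *defaultSorterConfig())
}
```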

size := uint64(msg.RawKV.ApproximateDataSize())
// NOTE we allow the quota to be exceeded if blocking means interrupting a transaction.
// Otherwise the pipeline would deadlock.
err := n.flowController.Consume(commitTs, size, func() error {
haojinming (Contributor) commented Oct 10, 2022:

Suggest adding a metric to observe the blocking duration and the Consume failure count. It seems Consume should not fail unless the flow controller is aborted.

Contributor Author:

[screenshot of the added metric]
added, thanks! PTAL~
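
Beyond the new metric, the callback in the hunk above encodes the deadlock-avoidance rule spelled out in its comment. The sketch below illustrates that contract under simplifying assumptions (a bare byte counter, same-commitTs events treated as one transaction); it is not the PR's implementation:

```go
package main

import "fmt"

// flowController is a deliberately tiny illustration: events that share a
// commitTs belong to one transaction, and blocking in the middle of a
// transaction would deadlock the pipeline, so the quota may be exceeded.
type flowController struct {
	used, quota  uint64
	lastCommitTs uint64
}

func (c *flowController) Consume(commitTs, size uint64, beforeBlock func() error) error {
	if c.used+size > c.quota {
		if commitTs == c.lastCommitTs {
			// Same transaction: force-consume beyond the quota rather than
			// blocking and deadlocking the pipeline.
		} else {
			// About to block: let the caller flush downstream first so that
			// Release can eventually free some quota. (The real code would
			// block here until enough quota is released.)
			if err := beforeBlock(); err != nil {
				return err
			}
		}
	}
	c.used += size
	c.lastCommitTs = commitTs
	return nil
}

func main() {
	c := &flowController{quota: 1024}
	_ = c.Consume(100, 900, func() error { return nil })
	// Second event of the same transaction (commitTs 100): the quota is
	// exceeded on purpose instead of blocking mid-transaction.
	_ = c.Consume(100, 512, func() error { return nil })
	fmt.Println(c.used) // 1412, above the 1024 quota
}
```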

zeminzhou (Contributor Author):

[screenshot of the two memory-consumption metrics]

haojinming (Contributor) left a comment:

LGTM

pingyu (Collaborator) left a comment:

Rest LGTM.

Help: "estimated memory consumption for a keyspan after the sorter",
Buckets: prometheus.ExponentialBuckets(1*1024*1024 /* mb */, 2, 10),
}, []string{"changefeed", "capture"})
flowControllerConsumeHistogram = prometheus.NewHistogramVec(
Collaborator:

Suggest naming it flowControllerDurationHistogram and adding a tag type = consume for the Consume interface. Then we can add more tags for other interfaces later if necessary.

Contributor Author:

added, thanks! PTAL~
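
A sketch of the suggested shape, with a single duration histogram and a type label; the namespace, help text, and buckets below are illustrative assumptions, not the merged code:

```go
package main

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// One duration histogram with a "type" label ("consume" for now), leaving
// room for other flow-controller interfaces to reuse it later.
var flowControllerDurationHistogram = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "tikv_cdc",
		Subsystem: "processor",
		Name:      "flow_controller_duration",
		Help:      "bucketed histogram of the duration of flow controller operations",
		Buckets:   prometheus.ExponentialBuckets(0.001, 2, 16), // 1 ms .. ~32 s
	}, []string{"changefeed", "capture", "type"})

func main() {
	prometheus.MustRegister(flowControllerDurationHistogram)

	start := time.Now()
	// ... flowController.Consume(...) would run here ...
	flowControllerDurationHistogram.
		WithLabelValues("cf-1", "capture-1", "consume").
		Observe(time.Since(start).Seconds())
}
```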

@@ -2965,7 +2965,7 @@
"steppedLine": false,
"targets": [
{
"expr": "sum(rate(tikv_cdc_processor_table_memory_consumption_sum{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[30s]) / rate(tikv_cdc_processor_table_memory_consumption_count{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[30s])) by (capture)",
"expr": "sum(rate(tikv_cdc_processor_changefeed_memory_consumption_sum{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[30s]) / rate(tikv_cdc_processor_changefeed_memory_consumption_count{tidb_cluster=\"$tidb_cluster\", capture=~\"$capture\"}[30s])) by (capture)",
Collaborator:

Please also paste screenshots of these two metrics.

Contributor Author:

pasted, PTAL -> #242 (comment)

pingyu (Collaborator) commented Oct 10, 2022

/verify

Signed-off-by: zeminzhou <[email protected]>
Signed-off-by: zeminzhou <[email protected]>
pingyu (Collaborator) left a comment:

LGTM~

pingyu merged commit 21754a5 into tikv:main on Oct 11, 2022
zeminzhou deleted the fix-flow-control branch on October 17, 2022 13:20
Successfully merging this pull request may close these issues.

cdc: OOM happened when creating 20 changefeeds (#241)