Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink(cdc): kafka producer use default configuration. #4359

Merged

Conversation

3AceShowHand
Copy link
Contributor

@3AceShowHand 3AceShowHand commented Jan 17, 2022

What problem does this PR solve?

Issue Number: close #4383, close #4499

If the network condition between the TiCDC and Kafka is not good enough, let producer get responses from Kafka as soon as possible, to prevent waste too much time.

What is changed and how it works?

change kafka producer to use default configurations, to prevent wast too much time on network communication with a Kafka cluster in bad network conditions.

  • set Net.DialTimeout, Net.WriteTimeout, Net.ReadTimeout to 10s
  • set Metadata.Retry.Max to 1, to make RefreshMatadata return fast

Check List

Tests

  • Manual test (add detailed scripts or steps below)
./bin/cdc cli changefeed create --pd=http://127.0.0.1:2379 --sink-uri="mysql://[email protected]:3306/"

./bin/cdc cli changefeed create --pd=http://127.0.0.1:2379 --sink-uri="kafka://127.0.0.1:9092/kafka-test?protocol=open-protocol"

./bin/go-tpc tpcc -H 127.0.0.1 -P 4000 -D workload --warehouses 50 -T 4 prepare

wait a few moment ... then kill the kafka by `kill -9`
[2022/01/17 18:54:33.796 +08:00] [ERROR] [changefeed.go:118] ["an error occurred in Owner"] [changefeed=4b62a15d-5da7-4f48-a40f-608e40456373] [error="[CDC:ErrKafkaSendMessage]kafka send message failed: kafka: Failed to deliver 3 messages."] [errorVerbose="[CDC:ErrKafkaSendMessage]kafka send message failed: kafka: Failed to deliver 3 messages.\ngithub.com/pingcap/errors.AddStack\n\tgithub.com/pingcap/[email protected]/errors.go:174\ngithub.com/pingcap/errors.(*Error).GenWithStackByArgs\n\tgithub.com/pingcap/[email protected]/normalize.go:164\ngithub.com/pingcap/tiflow/pkg/errors.WrapError\n\tgithub.com/pingcap/tiflow/pkg/errors/helper.go:30\ngithub.com/pingcap/tiflow/cdc/sink/producer/kafka.(*kafkaSaramaProducer).SyncBroadcastMessage\n\tgithub.com/pingcap/tiflow/cdc/sink/producer/kafka/kafka.go:135\ngithub.com/pingcap/tiflow/cdc/sink.(*mqSink).writeToProducer\n\tgithub.com/pingcap/tiflow/cdc/sink/mq.go:382\ngithub.com/pingcap/tiflow/cdc/sink.(*mqSink).EmitCheckpointTs\n\tgithub.com/pingcap/tiflow/cdc/sink/mq.go:219\ngithub.com/pingcap/tiflow/cdc/owner.(*ddlSinkImpl).run.func1\n\tgithub.com/pingcap/tiflow/cdc/owner/ddl_sink.go:149\nruntime.goexit\n\truntime/asm_arm64.s:1133"]
[2022/01/17 18:54:33.796 +08:00] [INFO] [changefeed.go:315] ["close changefeed"] [changefeed=4b62a15d-5da7-4f48-a40f-608e40456373] [info="{\"sink-uri\":\"kafka://***/asd123?protocol=open-protocol\",\"opts\":{\"max-message-bytes\":\"1048588\"},\"create-time\":\"2022-01-17T18:51:53.105199+08:00\",\"start-ts\":430549686829711362,\"target-ts\":0,\"admin-job-type\":0,\"sort-engine\":\"unified\",\"sort-dir\":\"\",\"config\":{\"case-sensitive\":true,\"enable-old-value\":true,\"force-replicate\":false,\"check-gc-safe-point\":true,\"filter\":{\"rules\":[\"*.*\"],\"ignore-txn-start-ts\":null},\"mounter\":{\"worker-num\":16},\"sink\":{\"dispatchers\":null,\"protocol\":\"open-protocol\",\"column-selectors\":null},\"cyclic-replication\":{\"enable\":false,\"replica-id\":0,\"filter-replica-ids\":null,\"id-buckets\":0,\"sync-ddl\":false},\"scheduler\":{\"type\":\"table-number\",\"polling-time\":-1},\"consistent\":{\"level\":\"none\",\"max-log-size\":64,\"flush-interval\":1000,\"storage\":\"\"}},\"state\":\"normal\",\"error\":null,\"sync-point-enabled\":false,\"sync-point-interval\":600000000000,\"creator-version\":\"v5.4.0-master-dirty\"}"] [isRemoved=false]
[2022/01/17 18:54:33.796 +08:00] [INFO] [ddl_puller.go:195] ["Close the ddl puller"]
[2022/01/17 18:54:33.796 +08:00] [INFO] [kafka.go:196] ["kafka producer closing..."]
[2022/01/17 18:54:33.797 +08:00] [INFO] [kafka.go:221] ["async client closed"] [duration=255.625µs]
[2022/01/17 18:54:33.864 +08:00] [INFO] [region_range_lock.go:370] ["unlocked range"] [lockID=1] [regionID=4] [startKey=6d44444c4a6f624cff69ff737400000000ff0000f90000000000ff00006c0000000000fa] [endKey=6d44444c4a6f624cff69ff737400000000ff0000f90000000000ff00006d0000000000fa] [checkpointTs=430549728786382855]
[2022/01/17 18:54:33.865 +08:00] [INFO] [client.go:1087] ["stream to store closed"] [addr=127.0.0.1:20160] [storeID=1]
[2022/01/17 18:54:33.888 +08:00] [INFO] [region_range_lock.go:370] ["unlocked range"] [lockID=2] [regionID=4] [startKey=6d44444c4a6f6241ff64ff644964784c69ff7374ff0000000000ff000000f700000000ff0000006c00000000fb] [endKey=6d44444c4a6f6241ff64ff644964784c69ff7374ff0000000000ff000000f700000000ff0000006d00000000fb] [checkpointTs=430549728786382855]
[2022/01/17 18:54:33.889 +08:00] [INFO] [client.go:1087] ["stream to store closed"] [addr=127.0.0.1:20160] [storeID=1]
[2022/01/17 18:54:34.568 +08:00] [INFO] [kafka.go:228] ["sync client closed"] [duration=771.709333ms]

as shown in the log above, the owner can be closed in around 1 second.

10s later, the changefeed restart, and failed in around 1 second again.
18s later, the changefeed restart again....

the restart logic is handled by feed_state_manager.

When the Kafka cluster is in a bad network condition, this configuration will detect it very fast, to prevent wasting too much time. But this would also have a false-negative case, such as the user's network is just not fast enough...

[2022/01/17 18:54:33.674 +08:00] [INFO] [processor.go:1074] ["processor try to close the sinkManager"] [changefeed=4b62a15d-5da7-4f48-a40f-608e40456373]
[2022/01/17 18:54:33.677 +08:00] [INFO] [manager.go:89] ["sinkManager try close bufSink"] [changefeed=4b62a15d-5da7-4f48-a40f-608e40456373]
[2022/01/17 18:54:55.254 +08:00] [ERROR] [kafka.go:219] ["close async client with error"] [error="kafka: Failed to deliver 383 messages."] [duration=21.576894875s]
[2022/01/17 18:54:55.254 +08:00] [INFO] [kafka.go:228] ["sync client closed"] [duration=41.084µs]
[2022/01/17 18:54:55.254 +08:00] [INFO] [manager.go:98] ["close bufSink success"] [changefeed=4b62a15d-5da7-4f48-a40f-608e40456373] [duration=21.577101583s]
[2022/01/17 18:54:55.254 +08:00] [INFO] [processor.go:1083] ["processor close sinkManager success"] [changefeed=4b62a15d-5da7-4f48-a40f-608e40456373] [duration=21.577510917s]
18:54:33 ~ 18:54:55, processor is closing the first time...

18:54:33 ~ 18:54:34, owner closed the first time
18:54:44 ~ 18:54:55, owner retry to start the first time
18:55:04 ~ 18:55:05, owner retry to start the second time.

It looks that the processor should be fully closed before the owner tries to initialize the changefeed again. Or, the owner shouldn't try to initialize the changefeed before the processor is not fully closed.

Code changes

Side effects

Related changes

  • Need to cherry-pick to the release branch

Release note

None

@ti-chi-bot
Copy link
Member

ti-chi-bot commented Jan 17, 2022

[REVIEW NOTIFICATION]

This pull request has been approved by:

  • hi-rustin
  • overvenus

To complete the pull request process, please ask the reviewers in the list to review by filling /cc @reviewer in the comment.
After your PR has acquired the required number of LGTMs, you can assign this pull request to the committer in the list by filling /assign @committer in the comment to help you merge this pull request.

The full list of commands accepted by this bot can be found here.

Reviewer can indicate their review by submitting an approval review.
Reviewer can cancel approval by submitting a request changes review.

@ti-chi-bot ti-chi-bot added do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. size/S Denotes a PR that changes 10-29 lines, ignoring generated files. size/XS Denotes a PR that changes 0-9 lines, ignoring generated files. and removed size/S Denotes a PR that changes 10-29 lines, ignoring generated files. labels Jan 17, 2022
@3AceShowHand 3AceShowHand force-pushed the kafka-use-default-configuration branch from 8c97d96 to 39da317 Compare January 17, 2022 10:39
@codecov-commenter
Copy link

codecov-commenter commented Jan 17, 2022

Codecov Report

Merging #4359 (553b507) into master (9607554) will decrease coverage by 0.0775%.
The diff coverage is 59.9718%.

Flag Coverage Δ
cdc 60.3456% <67.4744%> (+0.4233%) ⬆️
dm 51.5144% <50.7086%> (-0.5145%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

@@               Coverage Diff                @@
##             master      #4359        +/-   ##
================================================
- Coverage   55.6402%   55.5626%   -0.0776%     
================================================
  Files           494        502         +8     
  Lines         61283      62362      +1079     
================================================
+ Hits          34098      34650       +552     
- Misses        23750      24251       +501     
- Partials       3435       3461        +26     

@3AceShowHand
Copy link
Contributor Author

/run-all-tests

@3AceShowHand
Copy link
Contributor Author

/run-dm-integration-test
/run-kafka-integration-test

@ti-chi-bot ti-chi-bot added size/M Denotes a PR that changes 30-99 lines, ignoring generated files. and removed size/XS Denotes a PR that changes 0-9 lines, ignoring generated files. labels Jan 17, 2022
@3AceShowHand
Copy link
Contributor Author

/run-all-tests

@3AceShowHand
Copy link
Contributor Author

/run-kafka-integration-test

1 similar comment
@purelind
Copy link
Collaborator

/run-kafka-integration-test

@ti-chi-bot ti-chi-bot added size/S Denotes a PR that changes 10-29 lines, ignoring generated files. and removed size/M Denotes a PR that changes 30-99 lines, ignoring generated files. labels Jan 17, 2022
@3AceShowHand
Copy link
Contributor Author

/run-all-tests

@3AceShowHand
Copy link
Contributor Author

/run-kafka-integration-test

@3AceShowHand
Copy link
Contributor Author

/run-all-tests

@ti-chi-bot ti-chi-bot added the status/can-merge Indicates a PR has been approved by a committer. label Feb 9, 2022
@Rustin170506
Copy link
Member

/run-dm-integration-test

@Rustin170506
Copy link
Member

/run-kafka-integration-test

1 similar comment
@3AceShowHand
Copy link
Contributor Author

/run-kafka-integration-test

@Rustin170506
Copy link
Member

/run-kafka-integration-test

+1

@3AceShowHand
Copy link
Contributor Author

/run-kafka-integration-test

@Rustin170506
Copy link
Member

/run-kafka-integration-test /tidb=pr/32081

1 similar comment
@Rustin170506
Copy link
Member

/run-kafka-integration-test /tidb=pr/32081

@ti-chi-bot ti-chi-bot merged commit 1c1015b into pingcap:master Feb 9, 2022
ti-chi-bot pushed a commit to ti-chi-bot/tiflow that referenced this pull request Feb 9, 2022
@ti-chi-bot
Copy link
Member

In response to a cherrypick label: new pull request created: #4541.

@ti-chi-bot
Copy link
Member

In response to a cherrypick label: new pull request created: #4542.

ti-chi-bot pushed a commit to ti-chi-bot/tiflow that referenced this pull request Feb 9, 2022
@ti-chi-bot
Copy link
Member

In response to a cherrypick label: new pull request created: #4543.

ti-chi-bot pushed a commit to ti-chi-bot/tiflow that referenced this pull request Feb 9, 2022
@ti-chi-bot
Copy link
Member

In response to a cherrypick label: new pull request created: #4544.

ti-chi-bot pushed a commit to ti-chi-bot/tiflow that referenced this pull request Feb 9, 2022
@ti-chi-bot
Copy link
Member

In response to a cherrypick label: new pull request created: #4545.

ti-chi-bot pushed a commit to ti-chi-bot/tiflow that referenced this pull request Feb 9, 2022
@ti-chi-bot
Copy link
Member

In response to a cherrypick label: new pull request created: #4546.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
component/sink Sink component. needs-cherry-pick-release-4.0 Should cherry pick this PR to release-4.0 branch. needs-cherry-pick-release-5.0 Should cherry pick this PR to release-5.0 branch. needs-cherry-pick-release-5.1 Should cherry pick this PR to release-5.1 branch. needs-cherry-pick-release-5.2 Should cherry pick this PR to release-5.2 branch. needs-cherry-pick-release-5.3 Should cherry pick this PR to release-5.3 branch. needs-cherry-pick-release-5.4 Should cherry pick this PR to release-5.4 branch. release-note-none Denotes a PR that doesn't merit a release note. size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. status/can-merge Indicates a PR has been approved by a committer. status/LGT2 Indicates that a PR has LGTM 2. type/enhancement The issue or PR belongs to an enhancement.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Unclear max-message-bytes warning adjust Kafka producer's configuration to use reasonable values
6 participants