-
Notifications
You must be signed in to change notification settings - Fork 289
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kafka(ticdc): reset the admin client to fix broken pipe #8228
Conversation
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
/run-all-tests |
/run-all-tests |
/run-all-tests |
_ = a.client.Close() | ||
a.client = newClient | ||
|
||
return errors.New("retry after reset") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we always return an error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if return nil here, the retry logic will exit, this return an error is on purpose.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps it is more appropriate to return this error outside of reset?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's very strange that a func always returns an error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we move it outside, we still have to write one error when retry, no matter reset return error or not.
pkg/sink/kafka/admin.go
Outdated
brokers []*sarama.Broker | ||
err error | ||
) | ||
err = retry.Do(ctx, func() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all the retry logic is almost the same, can you refine it to reuse same code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, Could the retry logic be extracted to a common place?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, we can use high order function to solve this.
I'm wondering what is the route cause of this "broken pipe" error? Is the original issue related to this sarama issue (IBM/sarama#2173)? Maybe it's a sarama bug which hasn't been fixed thoroughly. |
yes, it's a know bug issue last for a few years, which has not been addressed yet. |
/run-all-tests |
1 similar comment
/run-all-tests |
/run-all-tests |
/merge |
This pull request has been accepted and is ready to merge. Commit hash: 1969a7f
|
/run-kafka-integration-test |
In response to a cherrypick label: new pull request created to branch |
What problem does this PR solve?
Issue Number: close #8225, close #8223
What is changed and how it works?
AdminClient
may meetwrite: broken pipe
, it's a know issue have not been addressed yet.admin client
if meetbroken pipe
error.Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note