Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Kafka's ConsumerRebalanceListener #601

Merged
merged 1 commit into from
Jun 16, 2020

Conversation

pcasaes
Copy link
Contributor

@pcasaes pcasaes commented Jun 11, 2020

Details #577

Had to create a custom listener interface that also receives the KafkaConsumer beside the assigned topics with partitions. This is necessary for use cases that involves seeking to specific offsets during rebalance/initialization.

@cescoffier cescoffier self-requested a review June 11, 2020 08:41
@cescoffier cescoffier linked an issue Jun 11, 2020 that may be closed by this pull request
Copy link
Contributor

@cescoffier cescoffier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazing! Thanks.

I've made a few comments, but nothing critical.
I'm wondering if this feature does not deserve its own section in the doc. Something explaining how it's used and link to the Kafka documentation. WDYT?

@cescoffier
Copy link
Contributor

Please rebase. I should fix the CI issue.

@pcasaes
Copy link
Contributor Author

pcasaes commented Jun 11, 2020

I thought the same thing. I wanted to to get the code out for review first.

@pcasaes
Copy link
Contributor Author

pcasaes commented Jun 11, 2020

@cescoffier Added an example in the docs. I thought occurred to me though. We could dispense with the consumer-rebalance-listener.name configuration if we were to use the same channel-name on the listener. What do you think?

Pro: less configuration work.
Con: Multiple inbound connectors won't be able to share a listener.

@pcasaes pcasaes force-pushed the master branch 2 times, most recently from af2a5bc to 30f6247 Compare June 11, 2020 22:47
Copy link
Contributor

@cescoffier cescoffier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More a question than a comment. I wonder if the listener should return Uni<Void> to indicate when it completes or fails. However, that would not fit very well with the callback registered on the consumer.

At the moment, it requires verifying when the action has completed and I'm not totally sure about failures.

@pcasaes
Copy link
Contributor Author

pcasaes commented Jun 12, 2020

I was able to get it working but the solution isn't very elegant. It doesn't solve the fundamental problem which is that we need to wait for the partition assigned Uni to complete before continuing with the poll. I'll take a look at kafka mutiny module and see what can be done.

@pcasaes
Copy link
Contributor Author

pcasaes commented Jun 13, 2020

@cescoffier I got it working well enough. The KafkaConsumer class has pause and resume methods which made the work possible.

Did you get chance to think about my comments on #601 (comment) ?

@pcasaes pcasaes force-pushed the master branch 2 times, most recently from 48c43d1 to b0d6ac5 Compare June 14, 2020 01:15
@cescoffier
Copy link
Contributor

@cescoffier Added an example in the docs. I thought occurred to me though. We could dispense with the consumer-rebalance-listener.name configuration if we were to use the same channel-name on the listener. What do you think?

Pro: less configuration work.
Con: Multiple inbound connectors won't be able to share a listener.

@pcasaes Interesting idea. The questions are:

  1. do we want to use a default one (if there is one exposed without a name)?
  2. do we want to use one having the channel name as name?

About 1:

I'm not sure, it's a fairly advanced use case, you probably don't want to use it unconsciously. So, I would say no: it must have a name.

About 2:

I like the idea. At least the developer is aware of it. It does not reduce the flexibility as you can still have a specific name and configure the attribute in that case. So, you can still have a listener shared by multiple channels.

The question is about channel name or topic name. As it's quite Kafka specific, I would say that it should use the topic name, and not the channel name.

So:

  1. if the configuration attribute is not set and there is a listener named after the topic name you are consuming, you use that one. A log message should be added to indicate the choice.
  2. if the configuration attribute is set and there is no listener with that name, you fail.
  3. if the configuration attribute is set and there is a listener with that name, you use that one.

WDYT?

@pcasaes
Copy link
Contributor Author

pcasaes commented Jun 14, 2020

I gave it some thought. I understand why we wouldn't want to use the channel name but the topic name doesn't seem quite right either. You could conceivably set up two connectors against the same topic. How about the consumer group? That makes more sense. It's in the name after all: consumer rebalance listener. And setting up two consumers instances on the same group but with different setups seems incorrect.

EDIT: In a future PR we might want to allow a consumer to read from multiple topics (kafka consumers allow for this) so using the group id will make us future proof.

@pcasaes
Copy link
Contributor Author

pcasaes commented Jun 14, 2020

@cescoffier I think it's done. Let me know what you think.

Copy link
Contributor

@cescoffier cescoffier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The consumer group Id is the right thing to do.

About the pause/resume, so, it should be paused and resumed by the connector. The connector should also handle failures and probably "fails stop" in this case.

@pcasaes
Copy link
Contributor Author

pcasaes commented Jun 14, 2020

Went ahead and moved the pause/resume to the connector. In the case of a failure kafka will handle it. If a consumer instance is paused for a certain amount of time it will trigger a consumer rebalance.

@pcasaes pcasaes force-pushed the master branch 3 times, most recently from 23f4916 to 8504e83 Compare June 14, 2020 20:44
@pcasaes
Copy link
Contributor Author

pcasaes commented Jun 14, 2020

@cescoffier

  1. Used the logger for all logs

  2. Removed the merging Multi (that was a relic from an older attempt).

  3. We have to resume the consumer for it to trigger a rebalance
    This one was crucial and was discovered by adding a test for this case. So if the rebalance Uni results in an error we wait session.timeout.ms + max.poll.interval.ms before resuming the consumer.

@pcasaes
Copy link
Contributor Author

pcasaes commented Jun 15, 2020

I think I can do better with shorter retries.

@pcasaes pcasaes force-pushed the master branch 2 times, most recently from aab7c92 to c2d1571 Compare June 15, 2020 04:42
@pcasaes
Copy link
Contributor Author

pcasaes commented Jun 15, 2020

@cescoffier
If assigning fails will keep retrying using an exponential back off until the kafka consumer session can be considered timed out. According to the documentation it's max.poll.interval.ms or max.poll.interval.ms + session.timeout.ms if group.instance.id is not null. Once timed out we'll resume the consumer which will trigger a new consumer re-balance. Added tests to both cases, success on a simple retry and success only after triggering a new consumer re-balance.

Copy link
Contributor

@cescoffier cescoffier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic sounds good. I've made a few cosmetic comments.
I believe one test is missing (when the configured listener cannot be found).

@pcasaes
Copy link
Contributor Author

pcasaes commented Jun 15, 2020

All done

@cescoffier cescoffier self-requested a review June 16, 2020 07:41
@cescoffier
Copy link
Contributor

Amazing! Thanks for the hard work @pcasaes!

@cescoffier cescoffier merged commit 7fc71ee into smallrye:master Jun 16, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add support for Kafka's ConsumerRebalanceListener
2 participants