-
Notifications
You must be signed in to change notification settings - Fork 182
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for Kafka's ConsumerRebalanceListener #601
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Amazing! Thanks.
I've made a few comments, but nothing critical.
I'm wondering if this feature does not deserve its own section in the doc. Something explaining how it's used and link to the Kafka documentation. WDYT?
...ion/src/main/doc/modules/connectors/partials/META-INF/connector/smallrye-kafka-incoming.adoc
Outdated
Show resolved
Hide resolved
...ctive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConnector.java
Outdated
Show resolved
Hide resolved
...kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConsumerRebalanceListener.java
Show resolved
Hide resolved
Please rebase. I should fix the CI issue. |
I thought the same thing. I wanted to to get the code out for review first. |
@cescoffier Added an example in the docs. I thought occurred to me though. We could dispense with the Pro: less configuration work. |
af2a5bc
to
30f6247
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More a question than a comment. I wonder if the listener should return Uni<Void>
to indicate when it completes or fails. However, that would not fit very well with the callback registered on the consumer.
At the moment, it requires verifying when the action has completed and I'm not totally sure about failures.
...on/src/main/doc/modules/kafka/examples/inbound/KafkaRebalancedConsumerRebalanceListener.java
Outdated
Show resolved
Hide resolved
...on/src/main/doc/modules/kafka/examples/inbound/KafkaRebalancedConsumerRebalanceListener.java
Outdated
Show resolved
Hide resolved
...on/src/main/doc/modules/kafka/examples/inbound/KafkaRebalancedConsumerRebalanceListener.java
Outdated
Show resolved
Hide resolved
I was able to get it working but the solution isn't very elegant. It doesn't solve the fundamental problem which is that we need to wait for the partition assigned Uni to complete before continuing with the poll. I'll take a look at kafka mutiny module and see what can be done. |
@cescoffier I got it working well enough. The KafkaConsumer class has pause and resume methods which made the work possible. Did you get chance to think about my comments on #601 (comment) ? |
48c43d1
to
b0d6ac5
Compare
@pcasaes Interesting idea. The questions are:
About 1: I'm not sure, it's a fairly advanced use case, you probably don't want to use it unconsciously. So, I would say no: it must have a name. About 2: I like the idea. At least the developer is aware of it. It does not reduce the flexibility as you can still have a specific name and configure the attribute in that case. So, you can still have a listener shared by multiple channels. The question is about channel name or topic name. As it's quite Kafka specific, I would say that it should use the topic name, and not the channel name. So:
WDYT? |
...on/src/main/doc/modules/kafka/examples/inbound/KafkaRebalancedConsumerRebalanceListener.java
Outdated
Show resolved
Hide resolved
...on/src/main/doc/modules/kafka/examples/inbound/KafkaRebalancedConsumerRebalanceListener.java
Outdated
Show resolved
Hide resolved
...kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConsumerRebalanceListener.java
Outdated
Show resolved
Hide resolved
...kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConsumerRebalanceListener.java
Outdated
Show resolved
Hide resolved
...ve-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/i18n/KafkaLogging.java
Outdated
Show resolved
Hide resolved
...ive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java
Outdated
Show resolved
Hide resolved
I gave it some thought. I understand why we wouldn't want to use the channel name but the topic name doesn't seem quite right either. You could conceivably set up two connectors against the same topic. How about the consumer group? That makes more sense. It's in the name after all: consumer rebalance listener. And setting up two consumers instances on the same group but with different setups seems incorrect. EDIT: In a future PR we might want to allow a consumer to read from multiple topics (kafka consumers allow for this) so using the group id will make us future proof. |
@cescoffier I think it's done. Let me know what you think. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The consumer group Id is the right thing to do.
About the pause/resume, so, it should be paused and resumed by the connector. The connector should also handle failures and probably "fails stop" in this case.
...on/src/main/doc/modules/kafka/examples/inbound/KafkaRebalancedConsumerRebalanceListener.java
Outdated
Show resolved
Hide resolved
...on/src/main/doc/modules/kafka/examples/inbound/KafkaRebalancedConsumerRebalanceListener.java
Outdated
Show resolved
Hide resolved
...ive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java
Outdated
Show resolved
Hide resolved
Went ahead and moved the pause/resume to the connector. In the case of a failure kafka will handle it. If a consumer instance is paused for a certain amount of time it will trigger a consumer rebalance. |
23f4916
to
8504e83
Compare
|
I think I can do better with shorter retries. |
aab7c92
to
c2d1571
Compare
@cescoffier |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic sounds good. I've made a few cosmetic comments.
I believe one test is missing (when the configured listener cannot be found).
documentation/src/main/doc/modules/kafka/pages/consumer-rebalance-listener.adoc
Show resolved
Hide resolved
...kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConsumerRebalanceListener.java
Outdated
Show resolved
Hide resolved
...kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConsumerRebalanceListener.java
Outdated
Show resolved
Hide resolved
...kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConsumerRebalanceListener.java
Outdated
Show resolved
Hide resolved
...kafka/src/main/java/io/smallrye/reactive/messaging/kafka/KafkaConsumerRebalanceListener.java
Outdated
Show resolved
Hide resolved
...ive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java
Show resolved
Hide resolved
...ive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java
Outdated
Show resolved
Hide resolved
...ive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java
Show resolved
Hide resolved
...ive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/impl/KafkaSource.java
Outdated
Show resolved
Hide resolved
...tive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/KafkaSourceTest.java
Show resolved
Hide resolved
All done |
Amazing! Thanks for the hard work @pcasaes! |
Details #577
Had to create a custom listener interface that also receives the KafkaConsumer beside the assigned topics with partitions. This is necessary for use cases that involves seeking to specific offsets during rebalance/initialization.