Add support for Kafka's ConsumerRebalanceListener #577

Closed
pcasaes opened this issue Jun 1, 2020 · 7 comments · Fixed by #601

Labels
kafka on-roadmap The issue is part of the roadmap

Comments

@pcasaes
Contributor

pcasaes commented Jun 1, 2020

Sometimes the earliest and latest offset reset policies (auto.offset.reset) don't quite cut it, not to mention some interesting exactly-once delivery cases, as described in Kafka: The Definitive Guide. There would be a fair bit of work involved, though. From what I can tell from the code, the subscription is started during configuration, so the listener would somehow have to be registered before subscribing. Fortunately, the Vert.x Kafka client implementation already supports this.
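To make the ordering constraint concrete, here is a small stdlib-only model, not the kafka-clients API. RebalanceListener and MiniConsumer are hypothetical stand-ins that loosely mirror Kafka's subscribe(topics, listener) contract: the listener is handed over together with the subscription, and the first assignment callback fires inside poll(). Topics are omitted and partitions are plain ints for brevity.

```java
import java.util.*;

/** Hypothetical, simplified stand-in for Kafka's ConsumerRebalanceListener (partitions are ints here). */
interface RebalanceListener {
    void onPartitionsRevoked(Collection<Integer> partitions);
    void onPartitionsAssigned(Collection<Integer> partitions);
}

/** Toy consumer: the first assignment happens lazily inside poll(), as for a group member. */
class MiniConsumer {
    private final int partitionCount;
    private final Map<Integer, Long> positions = new TreeMap<>();
    private RebalanceListener listener;
    private boolean subscribed;
    private boolean assigned;

    MiniConsumer(int partitionCount) {
        this.partitionCount = partitionCount;
    }

    // The listener is handed over together with the subscription, so it cannot miss the
    // first assignment that the next poll() triggers.
    void subscribe(RebalanceListener listener) {
        this.listener = listener;
        this.subscribed = true;
    }

    void seek(int partition, long offset) {
        positions.put(partition, offset);
    }

    Map<Integer, Long> positions() {
        return positions;
    }

    void poll() {
        if (!subscribed) throw new IllegalStateException("poll() before subscribe()");
        if (!assigned) {
            assigned = true;
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < partitionCount; p++) {
                partitions.add(p);
                positions.put(p, 0L); // default start position, like an "earliest" reset
            }
            if (listener != null) listener.onPartitionsAssigned(partitions);
        }
        // Fetching records from the current positions would happen here.
    }
}
```

Because the callback runs after the default positions are set, any seek() the listener performs overrides the reset policy for that partition, which is exactly the hook needed when earliest and latest are not enough.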

@cescoffier
Contributor

Could you give me a bit more detail? Do you mean adding the ability to use a custom "poller"?

@cescoffier
Contributor

I finally understand...
https://www.learningjournal.guru/courses/kafka/kafka-foundation-training/rebalance-listener/ provides a good example.

It's definitely something we should add.
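The pattern usually shown for this listener is "commit what you have processed before the partitions are taken away", so the next owner resumes where this consumer stopped. The sketch below models that with plain maps; CommitOnRevoke and its methods are hypothetical names, and the committed map only stands in for the group's committed offsets in Kafka.

```java
import java.util.*;

/** Stdlib-only sketch of the "commit on revoke" pattern; names are illustrative. */
class CommitOnRevoke {
    // Next offset to consume per partition (last processed offset + 1), which is what Kafka commits.
    private final Map<Integer, Long> pending = new HashMap<>();
    // Stand-in for the offsets committed to the consumer group.
    final Map<Integer, Long> committed = new HashMap<>();

    void recordProcessed(int partition, long offset) {
        pending.put(partition, offset + 1);
    }

    // Called before partitions move to another member: flush progress for exactly those partitions.
    void onPartitionsRevoked(Collection<Integer> partitions) {
        for (int p : partitions) {
            Long next = pending.remove(p);
            if (next != null) committed.put(p, next);
        }
    }

    // The next owner resumes from the committed offset, or falls back to the reset policy.
    long startingOffset(int partition, long resetOffset) {
        return committed.getOrDefault(partition, resetOffset);
    }
}
```

Committing offset + 1 rather than the last processed offset matters: the committed value is the position the next consumer starts reading from.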

@cescoffier cescoffier added kafka on-roadmap The issue is part of the roadmap labels Jun 3, 2020
@pcasaes
Contributor Author

pcasaes commented Jun 4, 2020

That's a good example. The listener is always called on the first poll, within the poll loop thread. It is also called whenever there's a rebalance, which can be caused by an unhealthy consumer, but also by adding or removing consumers or by increasing the number of partitions.
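To see why membership changes reach the listener of an existing consumer, here is a toy model that assigns partitions round-robin over sorted member ids and diffs the before/after assignment. This is an assumption for illustration only: Kafka's real assignors and rebalance protocols compute assignments differently.

```java
import java.util.*;

/** Toy rebalance model: round-robin assignment plus the revoked/assigned diff per member. */
class RebalanceDiff {
    static Map<String, Set<Integer>> assign(List<String> members, int partitions) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);
        Map<String, Set<Integer>> result = new TreeMap<>();
        for (String m : sorted) result.put(m, new TreeSet<>());
        for (int p = 0; p < partitions; p++) {
            result.get(sorted.get(p % sorted.size())).add(p);
        }
        return result;
    }

    /** Partitions a member loses: what onPartitionsRevoked has to deal with. */
    static Set<Integer> revoked(Set<Integer> before, Set<Integer> after) {
        Set<Integer> r = new TreeSet<>(before);
        r.removeAll(after);
        return r;
    }

    /** Partitions a member gains: where onPartitionsAssigned could seek. */
    static Set<Integer> assigned(Set<Integer> before, Set<Integer> after) {
        Set<Integer> a = new TreeSet<>(after);
        a.removeAll(before);
        return a;
    }
}
```

With four partitions, adding member "b" next to "a" moves partitions 1 and 3 from "a" to "b"; growing the topic from four to six partitions hands the new partitions 4 and 5 to the members as well. With Kafka's eager protocol a member is told that all of its partitions are revoked before reassignment, while the incremental cooperative protocol only revokes the partitions that actually move, which is closer to this diff.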

One example I have is a Kafka-backed in-memory cache. The topic has a retention period, but I don't want to wait for Kafka to clean up the partitions. I also don't want my warm-up to take too long by loading from earliest, since that will definitely include data that is too old. I can use the rebalance listener to do an offset-by-timestamp lookup (Kafka maintains a time index for that) and then set the offsets accordingly.
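The lookup described above follows the semantics KafkaConsumer#offsetsForTimes documents: for each partition, the earliest offset whose timestamp is greater than or equal to the target, or nothing if no such record exists (in which case seeking to the end is a natural fallback). The stdlib-only sketch below models that with a binary search; TimestampSeek, warmUpStart and the parallel offset/timestamp arrays are hypothetical, and it assumes timestamps are non-decreasing with offset, which real topics do not guarantee.

```java
/** Stdlib-only model of an offset-for-timestamp lookup used to pick a warm-up start position. */
class TimestampSeek {
    /** Returns the first offset whose timestamp is >= target, or endOffset if there is none. */
    static long offsetForTime(long[] offsets, long[] timestamps, long target, long endOffset) {
        int lo = 0, hi = timestamps.length; // search the half-open range [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] < target) lo = mid + 1; else hi = mid;
        }
        return lo < offsets.length ? offsets[lo] : endOffset;
    }

    /** Start position for a cache warm-up that only wants records newer than maxAgeMs. */
    static long warmUpStart(long[] offsets, long[] timestamps, long nowMs, long maxAgeMs, long endOffset) {
        return offsetForTime(offsets, timestamps, nowMs - maxAgeMs, endOffset);
    }
}
```

In a real listener, onPartitionsAssigned would perform this lookup per assigned partition and call seek with the result, so the cache skips records older than the cutoff instead of replaying from earliest.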

@cescoffier
Contributor

It should be rather simple to integrate.

@cescoffier
Contributor

So, I was thinking of integrating it as follows:

  • The ConsumerRebalanceListener would be exposed as a bean, possibly @Named if needed
  • If @Named is used, add an attribute to configure the name
  • The Kafka connector would retrieve the bean using the bean manager
  • In the Kafka source, use the subscribe method that accepts the listener, like:
    this.consumer.getDelegate().unwrap().subscribe(topic, listener);
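One way the lookup in this plan could behave is sketched below, with plain maps standing in for the CDI bean manager. The resolution rules (a configured name must match a named bean, otherwise at most one unnamed bean is accepted) and the ListenerResolver name are assumptions for illustration, not the connector's actual behavior.

```java
import java.util.*;

/** Hypothetical listener resolution rules; maps and lists stand in for CDI bean lookup. */
class ListenerResolver {
    static <L> Optional<L> resolve(Map<String, L> namedBeans, List<L> unnamedBeans, String configuredName) {
        if (configuredName != null) {
            // A configured name must match a @Named bean; failing fast beats silently ignoring it.
            L bean = namedBeans.get(configuredName);
            if (bean == null) {
                throw new IllegalStateException("No rebalance listener named '" + configuredName + "'");
            }
            return Optional.of(bean);
        }
        if (unnamedBeans.size() > 1) {
            throw new IllegalStateException("Ambiguous rebalance listener: configure a name");
        }
        // Zero beans: subscribe without a listener; one bean: use it.
        return unnamedBeans.stream().findFirst();
    }
}
```

When the result is present, the source would use the two-argument subscribe shown in the last bullet; when it is empty, it would keep the plain subscribe it uses today.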

@pcasaes fancy a PR? I can guide you.

@pcasaes
Contributor Author

pcasaes commented Jun 10, 2020

Sounds good. I'll give it a shot.

@cescoffier
Contributor

Thanks @pcasaes, don't hesitate to ping me if you have any questions.

@cescoffier cescoffier linked a pull request Jun 11, 2020 that will close this issue

2 participants