KIP-62 / KAFKA-3888: Allow consumer to send heartbeats from a background thread #948
Hey @jeffwidman do you plan to do any work on this? If not, I can probably take it on over the weekend.
No, I'm afraid I won't have time to work on this anytime soon... if you have time, go for it! Only comment--and you probably already know this--is that as far as implementation goes, the closer it mirrors the Java consumer (without the bugs 😉 ), the better IMHO. Among other things, it makes config tuning advice from the Kafka mailing list etc. easier to translate to kafka-python.
We were just discussing this internally at my day job... since Python has the GIL, how would this background thread ever issue a heartbeat if the main consumer thread is CPU-bound in its message processing? I.e., the main thread would occupy the CPU the whole time, and the background thread wouldn't get scheduled in to issue the heartbeat unless the message processing was I/O bound.
Hey @jeffwidman
I agree that in most cases the GIL would not prevent the heartbeat background thread from doing its job.
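For intuition, here is a minimal sketch (not from this thread; all names and timings are hypothetical) of why a daemon heartbeat thread still runs while the main thread is CPU-bound: CPython switches between threads every few milliseconds, so the background thread gets scheduled anyway. The main exception would be a long C-extension call that never releases the GIL.

```python
import threading
import time

def heartbeat_loop(stop_event, interval=3.0):
    # Stand-in for a heartbeat sender; in the real client this would talk to
    # the group coordinator over the network.
    while not stop_event.is_set():
        print("heartbeat at", time.strftime("%X"))
        stop_event.wait(interval)

stop = threading.Event()
hb = threading.Thread(target=heartbeat_loop, args=(stop,), daemon=True)
hb.start()

# CPU-bound "message processing" in the main thread; the daemon thread above
# still gets scheduled because CPython switches threads every few milliseconds.
total = 0
for i in range(30_000_000):
    total += i * i

stop.set()
hb.join()
print("done processing, total =", total)
```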
@tvoinarovskyi are you still planning on working on this? It's not something I can tackle in the near term, just wanted to check in to see where you're at on it, as it would be a very useful addition...
@jeffwidman Hey, sorry, but it doesn't seem possible in the near term. I did look at how the Java client does it; it's quite a hassle with locking, and I'm not sure I can do it properly in my limited free time.
@jeffwidman @Drizzt1991 - is there a pull request for this? We would quite like to leverage KIP-62 functionality in our organization.
@oscarbatori Sorry, nothing too good to show...
Is there any workaround for this problem? I just switched to kafka-python.
@pvanderlinden You could do a quite elaborate setup using pause() and poll().
@tvoinarovskyi I don't see a way to poll Kafka though; if I call poll it will actually fetch and return messages, which I don't want, of course. I just noticed that if I increase session_timeout_ms I also need to increase request_timeout_ms, which means failures will take even longer to be detected on that side as well. The processing is in Python, with I/O waits for an external program, and some in C modules for Python (scikit-learn/numpy).
@pvanderlinden The idea is that you pause() the partitions and keep calling poll() while you process; it won't return records while paused, but it keeps the consumer active in the group.
@tvoinarovskyi So every time you consume a message you pause all partitions, start a thread which calls poll, then after processing you unpause and stop the thread, consume, and start all over again?
@patricklucas Let me put up a snippet of code. I wanted to put it together before anyway. Will be back in an hour or so)
Thanks, that would be very useful. |
@pvanderlinden Ok, so for starters, it's not exactly easy code here. https://gist.github.com/tvoinarovskyi/05a5d083a0f96cae3e9b4c2af580be74 It's not production code, only something I got working and tested locally in 2 hours, so it will need some polishing. I did include all the hard parts, like:
But it has some drawbacks:
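For readers who don't want to dig through the gist, here is a minimal sketch of the pause/poll pattern described above. This is not the gist's code; the topic, group, and process() helper are hypothetical, and it glosses over the hard parts (rebalance listeners, thread safety, error handling):

```python
import threading
from kafka import KafkaConsumer

def process(message):
    # Hypothetical long-running handler (external program, scikit-learn, ...).
    pass

consumer = KafkaConsumer('my-topic', group_id='my-group',
                         enable_auto_commit=False)

def keep_alive(stop_event):
    # While the partitions are paused, poll() returns no records but still
    # services heartbeats and group membership.
    while not stop_event.is_set():
        consumer.poll(timeout_ms=1000)

for message in consumer:
    consumer.pause(*consumer.assignment())
    stop = threading.Event()
    keeper = threading.Thread(target=keep_alive, args=(stop,))
    keeper.start()
    try:
        process(message)
    finally:
        stop.set()
        keeper.join()
        consumer.resume(*consumer.assignment())
        consumer.commit()
```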
By the way, @dpkp, @jeffwidman maybe you have a link to a good implementation of a consumer that uses this pattern?
That is quite complex; maybe the background thread for heartbeats is the better way to go, then.
Just ran into major problems again. With a fetch size of 1, a session timeout of 20 seconds, and an average processing time of 10 seconds, I still see the error happening about every 15 minutes and bumping the offset back again. I might need to switch back to the other Python client, which sort of worked.
I would like to start working on this; we have multiple services at my day job that are impacted by this (and I just realized there may be more due to #1039). Unfortunately, I've never really worked with Java, so I'll be a bit slow trying to figure this out. @tvoinarovskyi do you have any pointers on where to look in the Java code for where to start changing things compared to how they are in kafka-python?
@jeffwidman Cool, so you can use https://github.com/apache/kafka/pull/1627/files to understand the parts that need to be altered, but I prefer doing the changes right from the current kafka-python code. I would approach this task with something like:
And hey, don't worry about breaking something, you will have another couple of eyes here =) When you start, you can create a WIP PR and I'll help out with more pointers, or take some part off your hands.
You can omit some hard parts on the first iteration of this:
So you can concentrate only on the background heartbeat thread itself.
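As a rough illustration only (this is not kafka-python's eventual implementation, and every name below is hypothetical), the Java client's approach boils down to a daemon thread that shares a lock with the coordinator, heartbeats on an interval, and stays quiet while a rebalance is in progress:

```python
import threading
import time

class StubCoordinator:
    """Hypothetical stand-in for the group coordinator object."""
    def __init__(self):
        self.lock = threading.Condition()
    def rebalance_in_progress(self):
        return False
    def send_heartbeat(self):
        print("heartbeat", time.strftime("%X"))

class HeartbeatThread(threading.Thread):
    def __init__(self, coordinator, interval_ms=3000):
        super().__init__(daemon=True)
        self.coordinator = coordinator
        self.interval = interval_ms / 1000.0
        self.closed = False

    def close(self):
        with self.coordinator.lock:
            self.closed = True
            self.coordinator.lock.notify_all()

    def run(self):
        while True:
            with self.coordinator.lock:
                if self.closed:
                    return
                if self.coordinator.rebalance_in_progress():
                    # Don't heartbeat in the middle of a join/sync.
                    self.coordinator.lock.wait(self.interval)
                    continue
                self.coordinator.send_heartbeat()
                self.coordinator.lock.wait(self.interval)

coordinator = StubCoordinator()
hb = HeartbeatThread(coordinator)
hb.start()
time.sleep(10)   # the main thread would be processing messages here
hb.close()
```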
I worked on this over the weekend. There's a subtle issue that needs some attention. KIP-62 relies on new JoinGroup api versions. The v1 JoinGroup api supports passing both a session_timeout_ms and a rebalance_timeout_ms (aka max_poll_interval_ms). The session timeout tells the group coordinator how long to wait between heartbeats before marking the consumer dead and forcing a rebalance. The rebalance timeout tells the group coordinator how long to keep the rebalance operation open so that the consumer can complete a poll() loop, commit offsets, and begin to rejoin. The question is how best to configure this when we're connecting to an older broker that doesn't support the new JoinGroup api version. My current thinking is that we should require session_timeout_ms == max_poll_interval_ms when using api_version < 0.10.1, and that we should only use the smaller default session_timeout_ms of 10000 if api_version >= 0.10.1. This would allow consumers to use the background thread w/ older brokers, though I don't think it would help actual behavior because session_timeout_ms would still need to be configured as the worst-case poll() duration.
@dpkp how about just preventing max_poll_interval_ms from being set for old broker versions? The behaviour for session_timeout_ms seems fine as you described.
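To make the distinction between the two timeouts concrete, here is a hedged example of how they might be tuned on a broker that supports JoinGroup v1; all values and the topic name are made up, and the old-broker restrictions are exactly what the comments above are debating:

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers='localhost:9092',
    group_id='my-group',
    session_timeout_ms=10000,      # bounds the gap between background heartbeats
    heartbeat_interval_ms=3000,    # how often the heartbeat thread pings the coordinator
    max_poll_interval_ms=600000,   # worst-case time allowed between poll() calls
)
```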
Isn't this already solved by #1266? |
Yes! |
Allows consumers to take their time processing messages without being timed out from their consumer group.
max_poll_records is a decent workaround for most of the pain here, but it'd still be nice to add this for consumers that have inconsistent message processing times. Related issues: #872, #544
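As an illustration of that workaround (the topic, group, values, and handle() function below are hypothetical), capping max_poll_records keeps each processing loop short, so the consumer returns to poll(), and therefore to the group protocol, before its session times out:

```python
from kafka import KafkaConsumer

def handle(message):
    # Hypothetical per-message handler.
    pass

consumer = KafkaConsumer(
    'my-topic',
    group_id='my-group',
    max_poll_records=10,       # small batches keep each processing loop short
    session_timeout_ms=30000,
)

for message in consumer:       # the iterator fetches in small batches under the hood
    handle(message)
```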