
erlkaf_consumer stop timeout #21

Closed
thusnjak opened this issue May 21, 2020 · 10 comments

@thusnjak

Hello,

after upgrading to erlkaf 2.0.0 (commit 1d706c6) we started experiencing consumer stop timeouts on consumer group rebalance.

Our consumer group has only one topic with 36 partitions, and originally we have 2 application instances consuming 18 partitions each. When there is more load we scale horizontally up to 9 application instances and the consumer group rebalances.

Sometimes, when a rebalance occurs, a consumer times out on stop (erlkaf_consumer.erl:50) and the gen_server crashes. That by itself wouldn't be a problem, but the consumer group rebalance then continues in a loop and our app never recovers: the consumers just keep starting and crashing, and a stable group rebalance never happens.
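For reference, the stop path we are hitting appears to be the standard bounded wait for a process to go down. A minimal sketch of that pattern (illustrative only; stop_consumer/1 and the stop message are made-up names, not the actual erlkaf_consumer code) would be:

stop_consumer(Pid) ->
    %% Illustrative sketch, not erlkaf's implementation: ask the partition
    %% consumer to stop and wait a bounded time for it to terminate. If it
    %% is still busy inside handle_message/2, the receive times out and the
    %% caller crashes, which triggers yet another rebalance.
    Ref = erlang:monitor(process, Pid),
    Pid ! stop,
    receive
        {'DOWN', Ref, process, Pid, _Reason} ->
            ok
    after 5000 ->
        erlang:demonitor(Ref, [flush]),
        exit({stop_timeout, Pid})
    end.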

erlkaf_timeout.txt

  • shows the behaviour with the original erlkaf code: in the first rebalance 15/18 consumers stop within the 5-second timeout and then the crash occurs. Partitions are reassigned and shortly afterwards, in the second rebalance, 14/18 consumers stop gracefully and the gen_server crashes again... this keeps happening in a loop and the application never recovers to stable operation.

erlkaf_timeout_extended.txt

  • shows the behaviour when the timeout in erlkaf_consumer.erl:50 is increased to 60 seconds. This allows all the consumers to stop and a successful rebalance to complete, but the timestamps clearly show that some consumers stop instantly while, in the worst case, we have to wait over 30 seconds for the last one to stop.

This never happened with erlkaf 1.1.9, which we used for quite a while. What is different?

@silviucpp
Owner

Hello,

In erlkaf 2.0.0 we did a major rewrite to improve the behavior when you have a lot of topics (https://github.com/silviucpp/erlkaf#upgrading-from-v1x-to-v200).
We are still not using 2.0.0 in production because it has a lot of internal changes. I will investigate the problem you are describing soon.

@silviucpp
Owner

Hello,

Looking at the logs, I think the problem is that one of your callbacks is crashing during init/4, which forces the entire consumer group to rebalance in a loop.

Is this the case?

Silviu

@silviucpp
Owner

I might be wrong :) The logs are in a pretty strange format :) I'll dig more.

@silviucpp
Owner

@thusnjak Can you please identify where the consumer spends so much time when it is being stopped?

When a rebalance occurs:

  1. The consumer group will try to stop all consumers in the group by sending a stop message to each child process: https://github.com/silviucpp/erlkaf/blob/master/src/erlkaf_consumer.erl#L45

2a. Once that stop message is received by the gen_server (if it's idle), it will call:

https://github.com/silviucpp/erlkaf/blob/master/src/erlkaf_consumer.erl#L128

2b. If the consumer is in the middle of processing messages, it will check whether a stop was received after returning from the app callback (your handle_message/2 callback):

https://github.com/silviucpp/erlkaf/blob/master/src/erlkaf_consumer.erl#L193

If your handle_message/2 callback takes more than 5 seconds, that would explain the issue... or maybe it's a bug in our code.

It would be nice if you could add some logs and identify which function takes such a large amount of time.
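For example, a minimal way to get such timings (illustrative only; process_batch/2 stands in for your own processing code and is not an erlkaf API) could be:

handle_message(Msgs, State) ->
    %% timer:tc/1 returns {Microseconds, Result} for the wrapped fun, so
    %% every batch logs how long the actual processing took.
    {Micros, NewState} = timer:tc(fun() -> process_batch(Msgs, State) end),
    io:format("handle_message took ~p ms~n", [Micros div 1000]),
    {ok, NewState}.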

Silviu

@silviucpp
Owner

So I think I replicated the problem:

handle_message(Msgs, State) ->
    case Msgs of
        #erlkaf_msg{topic = Topic, partition = Partition, offset = Offset} ->
            io:format("handle_message topic: ~p partition: ~p offset: ~p state: ~p ~n", [Topic, Partition, Offset, State]);
        _ ->
            io:format("handle_message BATCH count: ~p state: ~p ~n", [length(Msgs), State])
    end,

    timer:sleep(30000),

    io:format("handle_message FINISHED ~n", []),

    {ok, State}.

I created a handle_message that adds a delay of 30 seconds, and that seems to replicate the problem. I will try to find out exactly why and fix it, but it would be nice if you could confirm that your handle_message also takes more than 5 seconds to return.

silviucpp pushed a commit that referenced this issue May 27, 2020
@silviucpp
Owner

Please try the latest master and let me know if you can still replicate the issue.

@thusnjak
Author

Hi,

thank you for your quick response.

We did some crash testing and it all seemed fine. The fix was deployed to our staging today, so most likely we'll know for sure by Monday.

I'll let you know.

Thanks,
BR

T

@silviucpp
Owner

Thanks a lot, I'm waiting for your feedback.

Silviu

@thusnjak
Author

thusnjak commented Jun 2, 2020

So far we haven't noticed any problems. We'll keep monitoring and let you know if any strange behaviour happens.

Thank you.
Tin

@thusnjak thusnjak closed this as completed Jun 2, 2020
@thusnjak
Author

thusnjak commented Jul 1, 2020

We went into production with this and didn't experience any problems so far :) Tx
