Always restart non-group worker processes that exited abnormally #107
Conversation
The new behaviour is less surprising and consistent with the way group consumer workers behave. Fixes bbalser#104
@bbalser can tell me if I'm missing something here, but I believe this is not a change we want to support, because it would likely result in duplicate processing of messages, and being able to recover from an abnormal worker exit is a better fit for a group consumer.
The un-grouped consumer has a deliberate restart policy of :temporary precisely because there is no group consumer manager to track offset state independently of the worker. When the worker dies, it takes its progress through the topic with it and can only restart back at the beginning, at the begin_offset value that was fed in through the worker's init_opts.
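(For illustration only: a minimal, self-contained sketch of the restart-policy distinction being described, using a plain Supervisor and GenServer rather than Elsa's actual modules. The Sketch.* names are hypothetical.)

defmodule Sketch.ConsumerWorker do
  use GenServer

  # init_opts would carry the topic, partition and the begin_offset fed in at start.
  def start_link(init_opts), do: GenServer.start_link(__MODULE__, init_opts)

  @impl GenServer
  def init(init_opts) do
    # A real worker would subscribe here, starting at init_opts[:begin_offset].
    {:ok, Map.new(init_opts)}
  end
end

defmodule Sketch.ConsumerSupervisor do
  use Supervisor

  def start_link(init_opts), do: Supervisor.start_link(__MODULE__, init_opts)

  @impl Supervisor
  def init(init_opts) do
    children = [
      %{
        id: :topic_consumer_worker,
        start: {Sketch.ConsumerWorker, :start_link, [init_opts]},
        # :temporary  -> never restarted after an abnormal exit (current behaviour;
        #                consumption silently stops and in-flight progress is lost)
        # :permanent  -> always restarted, but only from the original begin_offset,
        #                so messages consumed before the crash are delivered again
        restart: :temporary
      }
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end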
Your example update to the integration test correctly indicates that restarting the worker will allow it to process the additional messages received (message3 and message4), but it misses that when the worker restarted it also received (and re-processed) message1 and message2 a second time. If you update the test as follows it will still pass, as an illustration:
start_supervised!(
  {Elsa.Supervisor,
   connection: :name1,
   endpoints: @brokers,
   consumer: [
     topic: topic,
     handler: Testing.ExampleMessageHandlerWithState,
     handler_init_args: %{pid: self()},
     begin_offset: :earliest
   ]}
)

send_messages(topic, ["message1"])
send_messages(topic, ["message2"])

assert_receive {:message, %{topic: ^topic, value: "message1"}}, 5_000
assert_receive {:message, %{topic: ^topic, value: "message2"}}, 5_000

kill_worker(topic)

send_messages(topic, ["message3"])
send_messages(topic, ["message4"])

# message1 and message2 were already processed, but get picked up again
# when the worker restarts with the original begin_offset
assert_receive {:message, %{topic: ^topic, value: "message1"}}, 5_000
assert_receive {:message, %{topic: ^topic, value: "message2"}}, 5_000
assert_receive {:message, %{topic: ^topic, value: "message3"}}, 5_000
assert_receive {:message, %{topic: ^topic, value: "message4"}}, 5_000
Sometimes you want this type of behavior in Kafka, and duplicate data is an acceptable price to pay for the delivery guarantee, but I don't think we want that in a "dumb" consumer. I believe the expected behavior for a dumb consumer is that it will drop messages, and restarting the topic consumer in that case would lead to some really difficult-to-identify data integrity bugs if you aren't explicitly expecting the potential for duplicate data, the way you would be with a group consumer tracking topic offsets.
@jeffgrunewald You make a good point about the duplicate message handling (and I should update the test to clarify this). The consumer must be built in a way that it can deal with "at least once" delivery. In my opinion, this is still an acceptable situation. Here are some points that come to mind:
EDIT: Perhaps making the worker crash trigger a crash of the entire …
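(A hedged illustration of the "at least once" point above, not Elsa's API: if the restarted worker replays messages from the original begin_offset, a handler that deduplicates, here keyed on the message value although a real handler would more likely key on topic, partition and offset, turns the re-delivery into a no-op. In Elsa this logic would sit inside the message handler, e.g. something like Testing.ExampleMessageHandlerWithState; the module and function names below are hypothetical.)

defmodule Sketch.IdempotentHandler do
  @moduledoc false

  # A hypothetical in-memory dedup store; production code would persist
  # processed offsets somewhere durable rather than keep them in a MapSet.
  def init_state, do: %{seen: MapSet.new()}

  def handle_batch(messages, %{seen: seen} = state) do
    {fresh, seen} =
      Enum.reduce(messages, {[], seen}, fn %{value: value} = message, {acc, seen_acc} ->
        if MapSet.member?(seen_acc, value) do
          # Already processed before the worker crashed; drop the duplicate.
          {acc, seen_acc}
        else
          {[message | acc], MapSet.put(seen_acc, value)}
        end
      end)

    fresh
    |> Enum.reverse()
    |> Enum.each(&do_work/1)

    {:ack, %{state | seen: seen}}
  end

  defp do_work(message), do: IO.inspect(message, label: "processing")
end

Usage would be: start with state = Sketch.IdempotentHandler.init_state() and feed each consumed batch through handle_batch/2; a batch replayed after a worker restart then only re-triggers do_work/1 for values that were not already seen.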