
Always restart non-group worker processes that exited abnormally #107

Open · jtrees wants to merge 1 commit into master from fix-worker-process-dying-without-restart

Conversation


@jtrees jtrees commented Apr 25, 2023

The new behaviour is less surprising and consistent with the way group consumer workers behave.

Fixes #104

@jtrees force-pushed the fix-worker-process-dying-without-restart branch from 2c04b64 to b1d0d5c on April 25, 2023 at 10:19
@jtrees force-pushed the fix-worker-process-dying-without-restart branch from b1d0d5c to dbcf417 on April 25, 2023 at 10:22
@jeffgrunewald jeffgrunewald (Collaborator) left a comment


@bbalser can tell me if I'm missing something here, but I believe this is not a change we want to support: it would likely result in duplicate processing of messages, and recovering from an abnormal worker exit is a scenario better suited to a group consumer.

The un-grouped consumer has a deliberate restart policy of :temporary precisely because there is no group consumer manager to track the offset state independently of the worker. When the worker dies, it takes its progress through the topic with it and can only restart back at the begin_offset value that was fed into the worker's init_opts.
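
For context, this is roughly what that distinction looks like in a plain Elixir supervisor. It is a minimal sketch with hypothetical worker modules (Example.TopicWorker, Example.GroupWorker), not Elsa's actual supervision tree:

    defmodule Example.ConsumerSupervisor do
      # Sketch only: hypothetical worker modules, not Elsa internals.
      use Supervisor

      def start_link(init_arg),
        do: Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)

      @impl true
      def init(_init_arg) do
        children = [
          # :temporary - the worker is never restarted by the supervisor, so any
          # progress tracked only inside the worker is lost when it crashes
          Supervisor.child_spec(
            {Example.TopicWorker, begin_offset: :earliest},
            restart: :temporary
          ),

          # always restarted - roughly what this PR proposes for non-group workers;
          # on restart the worker re-reads from its configured begin_offset and may
          # re-process messages
          Supervisor.child_spec({Example.GroupWorker, []}, restart: :permanent)
        ]

        Supervisor.init(children, strategy: :one_for_one)
      end
    end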

Your example update to the integration test correctly shows that restarting the worker allows it to process the additional messages (message3 and message4), but it misses that when the worker restarted it also received (and re-processed) message1 and message2 a second time. If you update the test as follows, it will still pass, as an illustration:

    start_supervised!(
      {Elsa.Supervisor,
       connection: :name1,
       endpoints: @brokers,
       consumer: [
         topic: topic,
         handler: Testing.ExampleMessageHandlerWithState,
         handler_init_args: %{pid: self()},
         begin_offset: :earliest
       ]}
    )

    send_messages(topic, ["message1"])
    send_messages(topic, ["message2"])

    assert_receive {:message, %{topic: ^topic, value: "message1"}}, 5_000
    assert_receive {:message, %{topic: ^topic, value: "message2"}}, 5_000

    kill_worker(topic)

    send_messages(topic, ["message3"])
    send_messages(topic, ["message4"])

    # <= message1 and message2 were already processed, but get picked up again
    #    because the worker restarted at the original begin_offset
    assert_receive {:message, %{topic: ^topic, value: "message1"}}, 5_000
    assert_receive {:message, %{topic: ^topic, value: "message2"}}, 5_000
    assert_receive {:message, %{topic: ^topic, value: "message3"}}, 5_000
    assert_receive {:message, %{topic: ^topic, value: "message4"}}, 5_000

Sometimes you want this type of behavior in Kafka, and duplicate data is an acceptable price to pay for the delivery guarantee, but I don't think we want that in a "dumb" consumer. I believe the expected behavior for a dumb consumer is that it drops messages on an abnormal exit; restarting the topic consumer instead would lead to some really difficult-to-identify data integrity bugs if you aren't explicitly expecting the potential for duplicate data, the way you would with a group consumer tracking topic offsets.
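
If duplicates are accepted, the handler effectively has to be idempotent. As a rough sketch of what that means on the library user's side (this module is hypothetical and not part of Elsa; it only assumes each delivered message exposes topic, partition, and offset fields):

    defmodule Example.Dedup do
      @moduledoc """
      Sketch only: drops messages whose {topic, partition, offset} has already been
      handled, so duplicates delivered after a restart from begin_offset are skipped.
      """

      @doc "Returns {fresh_messages, updated_seen_set}."
      def reject_seen(messages, seen \\ MapSet.new()) do
        Enum.flat_map_reduce(messages, seen, fn message, acc ->
          key = {message.topic, message.partition, message.offset}

          if MapSet.member?(acc, key) do
            # already processed once - drop it
            {[], acc}
          else
            {[message], MapSet.put(acc, key)}
          end
        end)
      end
    end

In practice you would more likely deduplicate on a business identifier and persist the seen set somewhere durable, but the shape of the problem is the same.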

@jtrees jtrees (Author) commented May 10, 2023

@jeffgrunewald You make a good point about the duplicate message handling (and I should update the test to make this clear). The consumer has to be built so that it can deal with "at least once" delivery.

In my opinion, this is still an acceptable situation. Here are some points that come to mind:

  • If the Elsa.Supervisor dies and is restarted by its parent supervisor, we run into this situation anyway, so we need to handle it either way.
  • Currently the failure state (a crashed worker) is invisible from the outside (at least via public APIs). Even if an "auto-restart" is undesirable as a default, there should at least be some kind of feedback, or options to handle the scenario in other ways. I think an opinionated default (which will work for some folks) is better than the unhandled status quo, which (AFAIK) works for no one.

EDIT: Perhaps making a worker crash trigger a crash of the entire Elsa.Supervisor is a viable alternative? Then it would be up to the user of the library to decide how to deal with it.
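
To illustrate the "up to the user" angle: if a worker crash did bring down Elsa.Supervisor, the library user could express their preferred policy in their own supervision tree. Here is a sketch reusing the option shape from the integration test above; the endpoint, topic, handler module, and restart limits are placeholders:

    # Sketch only: user-side supervision of Elsa.Supervisor. The endpoint, topic,
    # handler module, and restart limits below are placeholders.
    children = [
      {Elsa.Supervisor,
       connection: :name1,
       endpoints: [localhost: 9092],
       consumer: [
         topic: "some-topic",
         handler: MyApp.MessageHandler,
         handler_init_args: %{},
         begin_offset: :earliest
       ]}
    ]

    # If Elsa.Supervisor crashed whenever a worker died, this supervisor would
    # restart it according to whatever policy the user configures here.
    Supervisor.start_link(children, strategy: :one_for_one, max_restarts: 3, max_seconds: 5)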

Successfully merging this pull request may close these issues.

Bug: Non-group workers are not restarted after crash