-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement shared client #133
Conversation
lib/broadway_kafka/brod_client.ex
Outdated
|
||
case init(init_opts) do | ||
{:error, message} -> | ||
raise ArgumentError, "invalid options given to #{__MODULE__}.init/1, " <> message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this module is not quite public, so I would rather include the user module instead in the error message. like invalid options were given to Broadway.start_link.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have implemented the same error message that is being raised currently on
broadway_kafka/lib/broadway_kafka/producer.ex
Line 238 in 3227bf0
raise ArgumentError, "invalid options given to #{inspect(client)}.init/1, " <> message |
Should I update both places?
lib/broadway_kafka/brod_client.ex
Outdated
raise ArgumentError, "invalid options given to #{__MODULE__}.init/1, " <> message | ||
|
||
{:ok, config} -> | ||
{child_specs(config), broadway_opts} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of returning the options, should we return the initialized state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so. Since this callback is directly related to broadway's prepare_for_start/2
, I think make sense they have the same return data.
IMO, the fact that our brod_client calls init/1
here is an implementation detail that should not leak to the producer module.
Do you see some advantages in returning the config data instead of broadway options?
If we return the initalized state here, how should we handle it on the the producer module? Gonna add it on the broadway options there? What you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should move this whole callback to the producer, and have the logic in the producer.
BrodClient is completely an implementation detail, it is only replaceable for tests, so I think going with the callback on the client and validating twice is adding extra indirection.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done @josevalim!
Moved most of the logic from the brod_client (now its only returns the child_spec) to the producer and reuse the first initialization in order to avoid unnecessary validation.
Thanks for the feedback! 😄
Let me know if I can improve the solution in any other way!
lib/broadway_kafka/producer.ex
Outdated
raise ArgumentError, "invalid options given to #{client}.init/1, " <> message | ||
|
||
{:ok, config} = result -> | ||
{client.shared_client_child_spec(config), result} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of adding one additional callback, change client.init
to return {:ok, child_specs, config}
or {:error, mesage}
. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about doing this but was worried about potential problems with some other clients implementation, because it is a breaking change on the current kafka_client interface. If this is not a problem I agree 100%.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The whole kafka client API is private :) we only use it for tests, feel free to break it. In any case, we can ship it as a new minor version, if we want to be safe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That was not clear to me at all! 🤣 . Now everything makes sense! Thanks!
lib/broadway_kafka/producer.ex
Outdated
{producer_mod, producer_opts} = opts[:producer][:module] | ||
|
||
{extra_child_specs, initialized_client_config} = | ||
if producer_opts[:shared_client] do |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of calling init
only if :shared_client
, let's always call init
here instead of calling init
when the producer starts.
Applied the proposed changes @josevalim!
|
Tests are failing, can you please take a look? The linting one is quite old, so feel free to bump the Elixir version to make the formatter happier. |
Sure! I've tested everything locally and everything passed. Gonna try to reproduce it. Maybe it is flaky somehow. |
@josevalim, I could not reproduce the integration test problem neither locally or on CI. But my guess is that using the same topic/group might have impacted in some way, also I increased the waiting time of the integration test, since on the failed run we missed only 1 message (999 expected 1000). So in summary the changes were:
|
💚 💙 💜 💛 ❤️ |
Context: #132
First attempt to solve the related issue. Let me know what you think! 😄
Some considerations:
init/1
callback implemented by the client because I need it there in order to return the proper child_spec on the new added callback. Is this difference ok?shared_client
isfalse
the clients are being started under brod's supervisor but when it istrue
it is started under broadway supervision tree. Is that ok? I had a tricky bug because of this difference. (left a comment on the code about it)