-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement shared client #133
Changes from 2 commits
9367887
f8d8c64
8a9cbcc
a623269
5f8fd46
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -49,6 +49,10 @@ defmodule BroadwayKafka.Producer do | |
When set to `:reset`, the starting offset will be dictated by the `:offset_reset_policy` option, either | ||
starting from the `:earliest` or the `:latest` offsets of the topic. Default is `:assigned`. | ||
|
||
* `:shared_client` - Optional. A boolean that defines how many clients will be started. | ||
If `true`, only one shared client will be started for all producers, if `false` each producer | ||
will have it's own client. Default is `false` | ||
|
||
* `:group_config` - Optional. A list of options used to configure the group | ||
coordinator. See the ["Group config options"](#module-group-config-options) section below for a list of all available | ||
options. | ||
|
@@ -233,7 +237,7 @@ defmodule BroadwayKafka.Producer do | |
|
||
client = opts[:client] || BroadwayKafka.BrodClient | ||
|
||
case client.init(opts) do | ||
case opts[:initialized_client_config] || client.init(opts) do | ||
{:error, message} -> | ||
raise ArgumentError, "invalid options given to #{inspect(client)}.init/1, " <> message | ||
|
||
|
@@ -246,7 +250,9 @@ defmodule BroadwayKafka.Producer do | |
|> drain_after_revoke_table_init!() | ||
|
||
prefix = get_in(config, [:client_config, :client_id_prefix]) | ||
client_id = :"#{prefix}#{Module.concat([producer_name, Client])}" | ||
|
||
client_id = | ||
config[:shared_client_id] || :"#{prefix}#{Module.concat([producer_name, Client])}" | ||
|
||
max_demand = | ||
with [{_first, processor_opts}] <- opts[:broadway][:processors], | ||
|
@@ -271,7 +277,8 @@ defmodule BroadwayKafka.Producer do | |
demand: 0, | ||
shutting_down?: false, | ||
buffer: :queue.new(), | ||
max_demand: max_demand | ||
max_demand: max_demand, | ||
shared_client: config.shared_client | ||
} | ||
|
||
{:producer, connect(state)} | ||
|
@@ -509,7 +516,29 @@ defmodule BroadwayKafka.Producer do | |
|> Keyword.put(:processors, [updated_processor_entry | other_processors_entries]) | ||
|> Keyword.put(:batchers, updated_batchers_entries) | ||
|
||
{allocators, updated_opts} | ||
{producer_mod, producer_opts} = opts[:producer][:module] | ||
|
||
{extra_child_specs, initialized_client_config} = | ||
if producer_opts[:shared_client] do | ||
client = producer_opts[:client] || BroadwayKafka.BrodClient | ||
|
||
case client.init(Keyword.put(producer_opts, :broadway, opts)) do | ||
{:error, message} -> | ||
raise ArgumentError, "invalid options given to #{client}.init/1, " <> message | ||
|
||
{:ok, config} = result -> | ||
{client.shared_client_child_spec(config), result} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of adding one additional callback, change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought about doing this but was worried about potential problems with some other clients implementation, because it is a breaking change on the current kafka_client interface. If this is not a problem I agree 100%. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The whole kafka client API is private :) we only use it for tests, feel free to break it. In any case, we can ship it as a new minor version, if we want to be safe. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That was not clear to me at all! 🤣 . Now everything makes sense! Thanks! |
||
end | ||
else | ||
{[], nil} | ||
end | ||
|
||
new_producer_opts = | ||
Keyword.put(producer_opts, :initialized_client_config, initialized_client_config) | ||
|
||
updated_opts = put_in(updated_opts, [:producer, :module], {producer_mod, new_producer_opts}) | ||
|
||
{allocators ++ extra_child_specs, updated_opts} | ||
end | ||
|
||
@impl :brod_group_member | ||
|
@@ -547,7 +576,11 @@ defmodule BroadwayKafka.Producer do | |
def terminate(_reason, state) do | ||
%{client: client, group_coordinator: group_coordinator, client_id: client_id} = state | ||
group_coordinator && Process.exit(group_coordinator, :shutdown) | ||
client.disconnect(client_id) | ||
|
||
if state.shared_client == false do | ||
client.disconnect(client_id) | ||
end | ||
|
||
:ok | ||
end | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of calling
init
only if:shared_client
, let's always callinit
here instead of callinginit
when the producer starts.