-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement shared client #133
Changes from 1 commit
9367887
f8d8c64
8a9cbcc
a623269
5f8fd46
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,6 +47,8 @@ defmodule BroadwayKafka.BrodClient do | |
|
||
@default_begin_offset :assigned | ||
|
||
@default_shared_client false | ||
|
||
@impl true | ||
def init(opts) do | ||
with {:ok, hosts} <- validate(opts, :hosts, required: true), | ||
|
@@ -62,6 +64,8 @@ defmodule BroadwayKafka.BrodClient do | |
validate(opts, :offset_reset_policy, default: @default_offset_reset_policy), | ||
{:ok, begin_offset} <- | ||
validate(opts, :begin_offset, default: @default_begin_offset), | ||
{:ok, shared_client} <- | ||
validate(opts, :shared_client, default: @default_shared_client), | ||
{:ok, group_config} <- validate_group_config(opts), | ||
{:ok, fetch_config} <- validate_fetch_config(opts), | ||
{:ok, client_config} <- validate_client_config(opts) do | ||
|
@@ -77,14 +81,16 @@ defmodule BroadwayKafka.BrodClient do | |
begin_offset: begin_offset, | ||
group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config], | ||
fetch_config: Map.new(fetch_config || []), | ||
client_config: client_config | ||
client_config: client_config, | ||
shared_client: shared_client, | ||
shared_client_id: build_shared_client_id(opts) | ||
}} | ||
end | ||
end | ||
|
||
@impl true | ||
def setup(stage_pid, client_id, callback_module, config) do | ||
with :ok <- :brod.start_client(config.hosts, client_id, config.client_config), | ||
with :ok <- do_start_brod_client(config.hosts, client_id, config.client_config), | ||
{:ok, group_coordinator} <- | ||
start_link_group_coordinator(stage_pid, client_id, callback_module, config) do | ||
Process.monitor(client_id) | ||
|
@@ -147,6 +153,20 @@ defmodule BroadwayKafka.BrodClient do | |
end | ||
end | ||
|
||
@impl true | ||
def prepare_for_start(broadway_opts) do | ||
{_, producer_opts} = broadway_opts[:producer][:module] | ||
init_opts = Keyword.put(producer_opts, :broadway, broadway_opts) | ||
|
||
case init(init_opts) do | ||
{:error, message} -> | ||
raise ArgumentError, "invalid options given to #{__MODULE__}.init/1, " <> message | ||
|
||
{:ok, config} -> | ||
{child_specs(config), broadway_opts} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of returning the options, should we return the initialized state? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think so. Since this callback is directly related to broadway's IMO, the fact that our brod_client calls Do you see some advantages in returning the config data instead of broadway options? If we return the initalized state here, how should we handle it on the the producer module? Gonna add it on the broadway options there? What you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should move this whole callback to the producer, and have the logic in the producer. BrodClient is completely an implementation detail, it is only replaceable for tests, so I think going with the callback on the client and validating twice is adding extra indirection. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done @josevalim! Moved most of the logic from the brod_client (now its only returns the child_spec) to the producer and reuse the first initialization in order to avoid unnecessary validation. Thanks for the feedback! 😄 Let me know if I can improve the solution in any other way! |
||
end | ||
end | ||
|
||
defp lookup_offset(hosts, topic, partition, policy, client_config) do | ||
case :brod.resolve_offset(hosts, topic, partition, policy, client_config) do | ||
{:ok, offset} -> | ||
|
@@ -174,6 +194,19 @@ defmodule BroadwayKafka.BrodClient do | |
) | ||
end | ||
|
||
defp child_specs(%{shared_client: false} = _config), do: [] | ||
|
||
defp child_specs(%{shared_client: true} = config) do | ||
[ | ||
%{ | ||
id: config.shared_client_id, | ||
start: | ||
{:brod, :start_link_client, | ||
[config.hosts, config.shared_client_id, config.client_config]} | ||
} | ||
] | ||
end | ||
|
||
defp validate(opts, key, options \\ []) when is_list(opts) do | ||
has_key = Keyword.has_key?(opts, key) | ||
required = Keyword.get(options, :required, false) | ||
|
@@ -268,6 +301,9 @@ defmodule BroadwayKafka.BrodClient do | |
defp validate_option(:client_id_prefix, value) when not is_binary(value), | ||
do: validation_error(:client_id_prefix, "a string", value) | ||
|
||
defp validate_option(:shared_client, value) when not is_boolean(value), | ||
do: validation_error(:shared_client, "a boolean", value) | ||
|
||
defp validate_option(:sasl, :undefined), | ||
do: {:ok, :undefined} | ||
|
||
|
@@ -387,4 +423,29 @@ defmodule BroadwayKafka.BrodClient do | |
end | ||
|
||
defp parse_hosts(hosts), do: hosts | ||
|
||
defp build_shared_client_id(opts) do | ||
if opts[:shared_client] do | ||
prefix = get_in(opts, [:client_config, :client_id_prefix]) | ||
broadway_name = opts[:broadway][:name] | ||
:"#{prefix}#{Module.concat(broadway_name, SharedClient)}" | ||
end | ||
end | ||
|
||
defp do_start_brod_client(hosts, client_id, client_config) do | ||
case :brod.start_client(hosts, client_id, client_config) do | ||
:ok -> | ||
:ok | ||
|
||
# Because we are starting the client on the broadway supervison tree | ||
# instead of the :brod supervisor, the already_started error | ||
# is not properly handled by :brod.start_client/3 for shared clients | ||
# So we must handle it here. | ||
{:error, {{:already_started, _}, _}} -> | ||
:ok | ||
|
||
error -> | ||
error | ||
end | ||
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this module is not quite public, so I would rather include the user module instead in the error message. like invalid options were given to Broadway.start_link.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have implemented the same error message that is being raised currently on
broadway_kafka/lib/broadway_kafka/producer.ex
Line 238 in 3227bf0
Should I update both places?