Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement shared client #133

Merged
merged 5 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 63 additions & 2 deletions lib/broadway_kafka/brod_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ defmodule BroadwayKafka.BrodClient do

@default_begin_offset :assigned

@default_shared_client false

@impl true
def init(opts) do
with {:ok, hosts} <- validate(opts, :hosts, required: true),
Expand All @@ -62,6 +64,8 @@ defmodule BroadwayKafka.BrodClient do
validate(opts, :offset_reset_policy, default: @default_offset_reset_policy),
{:ok, begin_offset} <-
validate(opts, :begin_offset, default: @default_begin_offset),
{:ok, shared_client} <-
validate(opts, :shared_client, default: @default_shared_client),
{:ok, group_config} <- validate_group_config(opts),
{:ok, fetch_config} <- validate_fetch_config(opts),
{:ok, client_config} <- validate_client_config(opts) do
Expand All @@ -77,14 +81,16 @@ defmodule BroadwayKafka.BrodClient do
begin_offset: begin_offset,
group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config],
fetch_config: Map.new(fetch_config || []),
client_config: client_config
client_config: client_config,
shared_client: shared_client,
shared_client_id: build_shared_client_id(opts)
}}
end
end

@impl true
def setup(stage_pid, client_id, callback_module, config) do
with :ok <- :brod.start_client(config.hosts, client_id, config.client_config),
with :ok <- do_start_brod_client(config.hosts, client_id, config.client_config),
{:ok, group_coordinator} <-
start_link_group_coordinator(stage_pid, client_id, callback_module, config) do
Process.monitor(client_id)
Expand Down Expand Up @@ -147,6 +153,20 @@ defmodule BroadwayKafka.BrodClient do
end
end

@impl true
def prepare_for_start(broadway_opts) do
{_, producer_opts} = broadway_opts[:producer][:module]
init_opts = Keyword.put(producer_opts, :broadway, broadway_opts)

case init(init_opts) do
{:error, message} ->
raise ArgumentError, "invalid options given to #{__MODULE__}.init/1, " <> message
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this module is not quite public, so I would rather include the user module instead in the error message. like invalid options were given to Broadway.start_link.

Copy link
Contributor Author

@oliveigah oliveigah Oct 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have implemented the same error message that is being raised currently on

raise ArgumentError, "invalid options given to #{inspect(client)}.init/1, " <> message

Should I update both places?


{:ok, config} ->
{child_specs(config), broadway_opts}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of returning the options, should we return the initialized state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so. Since this callback is directly related to broadway's prepare_for_start/2, I think make sense they have the same return data.

IMO, the fact that our brod_client calls init/1 here is an implementation detail that should not leak to the producer module.

Do you see some advantages in returning the config data instead of broadway options?

If we return the initalized state here, how should we handle it on the the producer module? Gonna add it on the broadway options there? What you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should move this whole callback to the producer, and have the logic in the producer.

BrodClient is completely an implementation detail, it is only replaceable for tests, so I think going with the callback on the client and validating twice is adding extra indirection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done @josevalim!

Moved most of the logic from the brod_client (now its only returns the child_spec) to the producer and reuse the first initialization in order to avoid unnecessary validation.

Thanks for the feedback! 😄

Let me know if I can improve the solution in any other way!

end
end

defp lookup_offset(hosts, topic, partition, policy, client_config) do
case :brod.resolve_offset(hosts, topic, partition, policy, client_config) do
{:ok, offset} ->
Expand Down Expand Up @@ -174,6 +194,19 @@ defmodule BroadwayKafka.BrodClient do
)
end

defp child_specs(%{shared_client: false} = _config), do: []

defp child_specs(%{shared_client: true} = config) do
[
%{
id: config.shared_client_id,
start:
{:brod, :start_link_client,
[config.hosts, config.shared_client_id, config.client_config]}
}
]
end

defp validate(opts, key, options \\ []) when is_list(opts) do
has_key = Keyword.has_key?(opts, key)
required = Keyword.get(options, :required, false)
Expand Down Expand Up @@ -268,6 +301,9 @@ defmodule BroadwayKafka.BrodClient do
defp validate_option(:client_id_prefix, value) when not is_binary(value),
do: validation_error(:client_id_prefix, "a string", value)

defp validate_option(:shared_client, value) when not is_boolean(value),
do: validation_error(:shared_client, "a boolean", value)

defp validate_option(:sasl, :undefined),
do: {:ok, :undefined}

Expand Down Expand Up @@ -387,4 +423,29 @@ defmodule BroadwayKafka.BrodClient do
end

defp parse_hosts(hosts), do: hosts

defp build_shared_client_id(opts) do
if opts[:shared_client] do
prefix = get_in(opts, [:client_config, :client_id_prefix])
broadway_name = opts[:broadway][:name]
:"#{prefix}#{Module.concat(broadway_name, SharedClient)}"
end
end

defp do_start_brod_client(hosts, client_id, client_config) do
case :brod.start_client(hosts, client_id, client_config) do
:ok ->
:ok

# Because we are starting the client on the broadway supervison tree
# instead of the :brod supervisor, the already_started error
# is not properly handled by :brod.start_client/3 for shared clients
# So we must handle it here.
{:error, {{:already_started, _}, _}} ->
:ok

error ->
error
end
end
end
10 changes: 9 additions & 1 deletion lib/broadway_kafka/kafka_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ defmodule BroadwayKafka.KafkaClient do
offset_commit_on_ack: boolean,
topics: [:brod.topic()],
group_config: keyword,
client_config: keyword
client_config: keyword,
shared_client: boolean(),
shared_client_id: atom() | nil
}

@typep offset_reset_policy :: :earliest | :latest
Expand Down Expand Up @@ -52,4 +54,10 @@ defmodule BroadwayKafka.KafkaClient do
@callback update_topics(:brod.group_coordinator(), [:brod.topic()]) :: :ok
@callback connected?(:brod.client()) :: boolean
@callback disconnect(:brod.client()) :: :ok

@callback prepare_for_start(broadway_opts :: keyword()) ::
{[child_spec], updated_opts :: keyword()}
when child_spec: :supervisor.child_spec() | {module, any} | module

@optional_callbacks prepare_for_start: 1
end
29 changes: 25 additions & 4 deletions lib/broadway_kafka/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ defmodule BroadwayKafka.Producer do
When set to `:reset`, the starting offset will be dictated by the `:offset_reset_policy` option, either
starting from the `:earliest` or the `:latest` offsets of the topic. Default is `:assigned`.

* `:shared_client` - Optional. A boolean that defines how many clients will be started.
If `true`, only one shared client will be started for all producers, if `false` each producer
will have it's own client. Default is `false`

* `:group_config` - Optional. A list of options used to configure the group
coordinator. See the ["Group config options"](#module-group-config-options) section below for a list of all available
options.
Expand Down Expand Up @@ -246,7 +250,9 @@ defmodule BroadwayKafka.Producer do
|> drain_after_revoke_table_init!()

prefix = get_in(config, [:client_config, :client_id_prefix])
client_id = :"#{prefix}#{Module.concat([producer_name, Client])}"

client_id =
config[:shared_client_id] || :"#{prefix}#{Module.concat([producer_name, Client])}"

max_demand =
with [{_first, processor_opts}] <- opts[:broadway][:processors],
Expand All @@ -271,7 +277,8 @@ defmodule BroadwayKafka.Producer do
demand: 0,
shutting_down?: false,
buffer: :queue.new(),
max_demand: max_demand
max_demand: max_demand,
shared_client: config[:shared_client]
}

{:producer, connect(state)}
Expand Down Expand Up @@ -509,7 +516,17 @@ defmodule BroadwayKafka.Producer do
|> Keyword.put(:processors, [updated_processor_entry | other_processors_entries])
|> Keyword.put(:batchers, updated_batchers_entries)

{allocators, updated_opts}
{_, kafka_producer_opts} = opts[:producer][:module]
client = kafka_producer_opts[:client] || BroadwayKafka.BrodClient

{client_child_specs, updated_opts} =
if function_exported?(client, :prepare_for_start, 1) do
client.prepare_for_start(updated_opts)
else
{[], updated_opts}
end

{allocators ++ client_child_specs, updated_opts}
end

@impl :brod_group_member
Expand Down Expand Up @@ -547,7 +564,11 @@ defmodule BroadwayKafka.Producer do
def terminate(_reason, state) do
%{client: client, group_coordinator: group_coordinator, client_id: client_id} = state
group_coordinator && Process.exit(group_coordinator, :shutdown)
client.disconnect(client_id)

if state.shared_client == false do
client.disconnect(client_id)
end

:ok
end

Expand Down
83 changes: 83 additions & 0 deletions test/brod_client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,89 @@ defmodule BroadwayKafka.BrodClientTest do

assert {:ok, %{client_config: [query_api_versions: false]}} = BrodClient.init(opts)
end

test ":shared_client is an optional boolean" do
opts = Keyword.put(@opts, :shared_client, "true")

assert BrodClient.init(opts) ==
{:error, "expected :shared_client to be a boolean, got: \"true\""}

opts =
@opts
|> Keyword.put(:shared_client, true)
|> Keyword.put(:broadway, name: :my_broadway_name)
|> put_in([:client_config, :client_id_prefix], "my_prefix.")

assert {:ok, %{shared_client: true}} = BrodClient.init(opts)
end

test "return shared_client_id when :shared_client is true" do
opts =
@opts
|> Keyword.put(:shared_client, true)
|> Keyword.put(:broadway, name: :my_broadway_name)
|> put_in([:client_config, :client_id_prefix], "my_prefix.")

assert {:ok,
%{
shared_client: true,
shared_client_id: :"my_prefix.Elixir.my_broadway_name.SharedClient"
}} =
BrodClient.init(opts)

opts =
@opts
|> Keyword.put(:shared_client, false)
|> Keyword.put(:broadway, name: :my_broadway_name)
|> put_in([:client_config, :client_id_prefix], "my_prefix.")

assert {:ok,
%{
shared_client: false,
shared_client_id: nil
}} =
BrodClient.init(opts)
end
end

describe "prepare for start" do
test "should return an empty list and unchanged opts when shared_client is not true" do
broadway_opts = [
name: :my_broadway,
producer: [
module: {BroadwayKafka.Producer, @opts}
]
]

assert {[], ^broadway_opts} = BrodClient.prepare_for_start(broadway_opts)
end

test "should return :brod_client child spec and unchanged opts when shared_client is true" do
module_opts =
@opts
|> Keyword.put(:shared_client, true)
|> Keyword.put(:client_config, client_id_prefix: "my_prefix.")

broadway_opts = [
name: :my_broadway,
producer: [
module: {BroadwayKafka.Producer, module_opts}
]
]

assert {child_specs, ^broadway_opts} = BrodClient.prepare_for_start(broadway_opts)

assert [
%{
id: shared_client_id,
start: {:brod, :start_link_client, [hosts, shared_client_id, client_config]}
}
] = child_specs

assert [{:host, 9092}] = hosts
assert :"my_prefix.Elixir.my_broadway.SharedClient" = shared_client_id
assert [client_id_prefix: "my_prefix."] = client_config
end
end

defmodule FakeSaslMechanismPlugin do
Expand Down
Loading