Skip to content

Commit

Permalink
Implement shared client (#133)
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah authored Oct 11, 2023
1 parent 3227bf0 commit 1cadc01
Show file tree
Hide file tree
Showing 7 changed files with 552 additions and 105 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ jobs:
elixir: 1.7.4
otp: 21.3.8.17
- pair:
elixir: 1.11.3
otp: 23.2.5
elixir: 1.15.6
otp: 26.1.1
lint: lint
steps:
- uses: actions/checkout@v2
Expand Down
78 changes: 63 additions & 15 deletions lib/broadway_kafka/brod_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ defmodule BroadwayKafka.BrodClient do

@default_begin_offset :assigned

@default_shared_client false

@impl true
def init(opts) do
with {:ok, hosts} <- validate(opts, :hosts, required: true),
Expand All @@ -62,29 +64,34 @@ defmodule BroadwayKafka.BrodClient do
validate(opts, :offset_reset_policy, default: @default_offset_reset_policy),
{:ok, begin_offset} <-
validate(opts, :begin_offset, default: @default_begin_offset),
{:ok, shared_client} <-
validate(opts, :shared_client, default: @default_shared_client),
{:ok, group_config} <- validate_group_config(opts),
{:ok, fetch_config} <- validate_fetch_config(opts),
{:ok, client_config} <- validate_client_config(opts) do
{:ok,
%{
hosts: parse_hosts(hosts),
group_id: group_id,
topics: topics,
receive_interval: receive_interval,
reconnect_timeout: reconnect_timeout,
offset_commit_on_ack: offset_commit_on_ack,
offset_reset_policy: offset_reset_policy,
begin_offset: begin_offset,
group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config],
fetch_config: Map.new(fetch_config || []),
client_config: client_config
}}
config = %{
hosts: parse_hosts(hosts),
group_id: group_id,
topics: topics,
receive_interval: receive_interval,
reconnect_timeout: reconnect_timeout,
offset_commit_on_ack: offset_commit_on_ack,
offset_reset_policy: offset_reset_policy,
begin_offset: begin_offset,
group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config],
fetch_config: Map.new(fetch_config || []),
client_config: client_config,
shared_client: shared_client,
shared_client_id: build_shared_client_id(opts)
}

{:ok, shared_client_child_spec(config), config}
end
end

@impl true
def setup(stage_pid, client_id, callback_module, config) do
with :ok <- :brod.start_client(config.hosts, client_id, config.client_config),
with :ok <- do_start_brod_client(config.hosts, client_id, config.client_config),
{:ok, group_coordinator} <-
start_link_group_coordinator(stage_pid, client_id, callback_module, config) do
Process.monitor(client_id)
Expand Down Expand Up @@ -147,6 +154,19 @@ defmodule BroadwayKafka.BrodClient do
end
end

defp shared_client_child_spec(%{shared_client: false}), do: []

defp shared_client_child_spec(%{shared_client: true} = config) do
[
%{
id: config.shared_client_id,
start:
{:brod, :start_link_client,
[config.hosts, config.shared_client_id, config.client_config]}
}
]
end

defp lookup_offset(hosts, topic, partition, policy, client_config) do
case :brod.resolve_offset(hosts, topic, partition, policy, client_config) do
{:ok, offset} ->
Expand Down Expand Up @@ -268,6 +288,9 @@ defmodule BroadwayKafka.BrodClient do
defp validate_option(:client_id_prefix, value) when not is_binary(value),
do: validation_error(:client_id_prefix, "a string", value)

defp validate_option(:shared_client, value) when not is_boolean(value),
do: validation_error(:shared_client, "a boolean", value)

defp validate_option(:sasl, :undefined),
do: {:ok, :undefined}

Expand Down Expand Up @@ -387,4 +410,29 @@ defmodule BroadwayKafka.BrodClient do
end

defp parse_hosts(hosts), do: hosts

defp build_shared_client_id(opts) do
if opts[:shared_client] do
prefix = get_in(opts, [:client_config, :client_id_prefix])
broadway_name = opts[:broadway][:name]
:"#{prefix}#{Module.concat(broadway_name, SharedClient)}"
end
end

defp do_start_brod_client(hosts, client_id, client_config) do
case :brod.start_client(hosts, client_id, client_config) do
:ok ->
:ok

# Because we are starting the client on the broadway supervison tree
# instead of the :brod supervisor, the already_started error
# is not properly handled by :brod.start_client/3 for shared clients
# So we must handle it here.
{:error, {{:already_started, _}, _}} ->
:ok

error ->
error
end
end
end
4 changes: 3 additions & 1 deletion lib/broadway_kafka/kafka_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ defmodule BroadwayKafka.KafkaClient do
offset_commit_on_ack: boolean,
topics: [:brod.topic()],
group_config: keyword,
client_config: keyword
client_config: keyword,
shared_client: boolean(),
shared_client_id: atom() | nil
}

@typep offset_reset_policy :: :earliest | :latest
Expand Down
101 changes: 61 additions & 40 deletions lib/broadway_kafka/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ defmodule BroadwayKafka.Producer do
When set to `:reset`, the starting offset will be dictated by the `:offset_reset_policy` option, either
starting from the `:earliest` or the `:latest` offsets of the topic. Default is `:assigned`.
* `:shared_client` - Optional. A boolean that defines how many clients will be started.
If `true`, only one shared client will be started for all producers, if `false` each producer
will have it's own client. Default is `false`
* `:group_config` - Optional. A list of options used to configure the group
coordinator. See the ["Group config options"](#module-group-config-options) section below for a list of all available
options.
Expand Down Expand Up @@ -231,51 +235,48 @@ defmodule BroadwayKafka.Producer do
def init(opts) do
Process.flag(:trap_exit, true)

client = opts[:client] || BroadwayKafka.BrodClient
config = opts[:initialized_client_config]

case client.init(opts) do
{:error, message} ->
raise ArgumentError, "invalid options given to #{inspect(client)}.init/1, " <> message
draining_after_revoke_flag =
self()
|> drain_after_revoke_table_name!()
|> drain_after_revoke_table_init!()

{:ok, config} ->
{_, producer_name} = Process.info(self(), :registered_name)
prefix = get_in(config, [:client_config, :client_id_prefix])

draining_after_revoke_flag =
self()
|> drain_after_revoke_table_name!()
|> drain_after_revoke_table_init!()
{_, producer_name} = Process.info(self(), :registered_name)

prefix = get_in(config, [:client_config, :client_id_prefix])
client_id = :"#{prefix}#{Module.concat([producer_name, Client])}"
client_id =
config[:shared_client_id] || :"#{prefix}#{Module.concat([producer_name, Client])}"

max_demand =
with [{_first, processor_opts}] <- opts[:broadway][:processors],
max_demand when is_integer(max_demand) <- processor_opts[:max_demand] do
max_demand
else
_ -> 10
end
max_demand =
with [{_first, processor_opts}] <- opts[:broadway][:processors],
max_demand when is_integer(max_demand) <- processor_opts[:max_demand] do
max_demand
else
_ -> 10
end

state = %{
client: client,
client_id: client_id,
group_coordinator: nil,
receive_timer: nil,
receive_interval: config.receive_interval,
reconnect_timeout: config.reconnect_timeout,
acks: Acknowledger.new(),
config: config,
allocator_names: allocator_names(opts[:broadway]),
revoke_caller: nil,
draining_after_revoke_flag: draining_after_revoke_flag,
demand: 0,
shutting_down?: false,
buffer: :queue.new(),
max_demand: max_demand
}
state = %{
client: opts[:client] || BroadwayKafka.BrodClient,
client_id: client_id,
group_coordinator: nil,
receive_timer: nil,
receive_interval: config.receive_interval,
reconnect_timeout: config.reconnect_timeout,
acks: Acknowledger.new(),
config: config,
allocator_names: allocator_names(opts[:broadway]),
revoke_caller: nil,
draining_after_revoke_flag: draining_after_revoke_flag,
demand: 0,
shutting_down?: false,
buffer: :queue.new(),
max_demand: max_demand,
shared_client: config.shared_client
}

{:producer, connect(state)}
end
{:producer, connect(state)}
end

defp allocator_names(broadway_config) do
Expand Down Expand Up @@ -509,7 +510,23 @@ defmodule BroadwayKafka.Producer do
|> Keyword.put(:processors, [updated_processor_entry | other_processors_entries])
|> Keyword.put(:batchers, updated_batchers_entries)

{allocators, updated_opts}
{producer_mod, producer_opts} = opts[:producer][:module]

client = producer_opts[:client] || BroadwayKafka.BrodClient

case client.init(Keyword.put(producer_opts, :broadway, opts)) do
{:error, message} ->
raise ArgumentError, "invalid options given to #{client}.init/1, " <> message

{:ok, extra_child_specs, config} ->
new_producer_opts =
Keyword.put(producer_opts, :initialized_client_config, config)

updated_opts =
put_in(updated_opts, [:producer, :module], {producer_mod, new_producer_opts})

{allocators ++ extra_child_specs, updated_opts}
end
end

@impl :brod_group_member
Expand Down Expand Up @@ -547,7 +564,11 @@ defmodule BroadwayKafka.Producer do
def terminate(_reason, state) do
%{client: client, group_coordinator: group_coordinator, client_id: client_id} = state
group_coordinator && Process.exit(group_coordinator, :shutdown)
client.disconnect(client_id)

if state.shared_client == false do
client.disconnect(client_id)
end

:ok
end

Expand Down
Loading

0 comments on commit 1cadc01

Please sign in to comment.