fix(dispatcher): remove from timeout ref when retries occur
oliveigah committed May 5, 2024
1 parent 1fc125e commit 5732688
Showing 2 changed files with 65 additions and 69 deletions.
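
In short: when a produce request comes back with retriable errors, the dispatcher now drops that request's entry from its timeouts map in the same state update that schedules the retry. A condensed, self-contained sketch of that pattern follows; the RetrySketch module name and the hard-coded delay are illustrative only (the real code, shown in the diff below, takes the delay from p_config.retry_ms and lives inside the dispatcher's handle_info callback).

defmodule RetrySketch do
  # Illustrative delay; the dispatcher uses p_config.retry_ms.
  @retry_ms 1_000

  # Schedules a re-dispatch of `req_ref` and returns the updated state:
  # the request data stays under :requests, while the entry under :timeouts
  # is deleted, matching the fix described in the commit message.
  def schedule_retry(state, req_ref, new_req_data) do
    Process.send_after(self(), {:dispatch, req_ref}, @retry_ms)

    %{
      state
      | requests: Map.put(state.requests, req_ref, new_req_data),
        timeouts: Map.delete(state.timeouts, req_ref)
    }
  end
end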
lib/klife/producer/dispatcher.ex — 128 changes: 62 additions & 66 deletions

@@ -146,7 +146,8 @@ defmodule Klife.Producer.Dispatcher do
     %__MODULE__{
       batcher_pid: batcher_pid,
       requests: requests,
-      broker_id: broker_id
+      broker_id: broker_id,
+      timeouts: timeouts
     } = state

     data =
@@ -180,75 +181,70 @@ defmodule Klife.Producer.Dispatcher do
         end)
       end)

-    case failure_list do
-      [] ->
-        send(batcher_pid, {:request_completed, pool_idx})
-        {:noreply, remove_request(state, req_ref)}
-
-      error_list ->
-        # TODO: Enhance specific code error handling
-        # TODO: One major problem with the current implementantion is that
-        # one bad topic can hold the in flight request spot for a long time
-        # can it be handled better without a new producer?
-        grouped_errors =
-          Enum.group_by(error_list, fn {topic, partition, error_code, _base_offset} ->
-            cond do
-              error_code in @delivery_discard_codes ->
-                Logger.warning("""
-                Fatal error while producing message. Message will be discarded!
-                topic: #{topic}
-                partition: #{partition}
-                error_code: #{error_code}
-                cluster: #{cluster_name}
-                broker_id: #{broker_id}
-                producer_name: #{producer_name}
-                """)
-
-                :discard
-
-              true ->
-                Logger.warning("""
-                Error while producing message. Message will be retried!
-                topic: #{topic}
-                partition: #{partition}
-                error_code: #{error_code}
-                cluster: #{cluster_name}
-                broker_id: #{broker_id}
-                producer_name: #{producer_name}
-                """)
-
-                :retry
-            end
-          end)
-
-        to_discard = grouped_errors[:discard] || []
-
-        Enum.each(to_discard, fn {topic, partition, error_code, _base_offset} ->
-          delivery_confirmation_pids
-          |> Map.get({topic, partition}, [])
-          |> Enum.reverse()
-          |> Enum.each(fn {pid, _batch_offset} ->
-            send(pid, {:klife_produce_sync, :error, error_code})
-          end)
-        end)
-
-        to_drop_list = List.flatten([success_list, to_discard])
-        to_drop_keys = Enum.map(to_drop_list, fn {t, p, _, _} -> {t, p} end)
-        new_batch_to_send = Map.drop(batch_to_send, to_drop_keys)
-
-        if Map.keys(new_batch_to_send) == [] do
-          send(batcher_pid, {:request_completed, pool_idx})
-          {:noreply, remove_request(state, req_ref)}
-        else
-          Process.send_after(self(), {:dispatch, req_ref}, p_config.retry_ms)
-          new_req_data = %{data | batch_to_send: new_batch_to_send}
-          new_state = %{state | requests: Map.put(requests, req_ref, new_req_data)}
-          {:noreply, new_state}
-        end
-    end
+    grouped_errors =
+      Enum.group_by(failure_list, fn {topic, partition, error_code, _base_offset} ->
+        cond do
+          error_code in @delivery_discard_codes ->
+            Logger.warning("""
+            Fatal error while producing message. Message will be discarded!
+            topic: #{topic}
+            partition: #{partition}
+            error_code: #{error_code}
+            cluster: #{cluster_name}
+            broker_id: #{broker_id}
+            producer_name: #{producer_name}
+            """)
+
+            :discard
+
+          true ->
+            Logger.warning("""
+            Error while producing message. Message will be retried!
+            topic: #{topic}
+            partition: #{partition}
+            error_code: #{error_code}
+            cluster: #{cluster_name}
+            broker_id: #{broker_id}
+            producer_name: #{producer_name}
+            """)
+
+            :retry
+        end
+      end)
+
+    to_discard = grouped_errors[:discard] || []
+    to_retry = grouped_errors[:retry] || []
+
+    Enum.each(to_discard, fn {topic, partition, error_code, _base_offset} ->
+      delivery_confirmation_pids
+      |> Map.get({topic, partition}, [])
+      |> Enum.reverse()
+      |> Enum.each(fn {pid, _batch_offset} ->
+        send(pid, {:klife_produce_sync, :error, error_code})
+      end)
+    end)
+
+    to_retry_keys = Enum.map(to_retry, fn {t, p, _, _} -> {t, p} end)
+    new_batch_to_send = Map.take(batch_to_send, to_retry_keys)
+
+    if new_batch_to_send == %{} do
+      send(batcher_pid, {:request_completed, pool_idx})
+      {:noreply, remove_request(state, req_ref)}
+    else
+      Process.send_after(self(), {:dispatch, req_ref}, p_config.retry_ms)
+      new_req_data = %{data | batch_to_send: new_batch_to_send}
+
+      new_state = %{
+        state
+        | requests: Map.put(requests, req_ref, new_req_data),
+          timeouts: Map.delete(timeouts, req_ref)
+      }
+
+      {:noreply, new_state}
+    end
   end
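
For reference, the :discard/:retry split added above can be exercised on its own. A minimal sketch; the error codes and the local delivery_discard_codes list are placeholders, since the dispatcher keeps the real list in its @delivery_discard_codes module attribute.

# Placeholder codes, for illustration only.
delivery_discard_codes = [87]

failure_list = [
  # {topic, partition, error_code, base_offset}
  {"topic_a", 0, 87, 100},
  {"topic_a", 1, 7, 200}
]

grouped =
  Enum.group_by(failure_list, fn {_topic, _partition, error_code, _base_offset} ->
    if error_code in delivery_discard_codes, do: :discard, else: :retry
  end)

# grouped => %{discard: [{"topic_a", 0, 87, 100}], retry: [{"topic_a", 1, 7, 200}]}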

lib/klife/producer/producer.ex — 6 changes: 3 additions & 3 deletions

@@ -141,15 +141,15 @@ defmodule Klife.Producer
       end)
     end)
     |> List.flatten()
-    |> Enum.each(fn %{topic_name: t_name, partition_idx: p_idx, batcher_id: d_id} ->
+    |> Enum.each(fn %{topic_name: t_name, partition_idx: p_idx, batcher_id: b_id} ->
       # Used when a record is produced by a non default producer
       # in this case the proper batcher_id won't be present at
       # main metadata ets table, therefore we need a way to
       # find out it's value.
-      put_batcher_id(cluster_name, producer_name, t_name, p_idx, d_id)
+      put_batcher_id(cluster_name, producer_name, t_name, p_idx, b_id)

       if ProducerController.get_default_producer(cluster_name, t_name, p_idx) == producer_name do
-        ProducerController.update_batcher_id(cluster_name, t_name, p_idx, d_id)
+        ProducerController.update_batcher_id(cluster_name, t_name, p_idx, b_id)
       end
     end)
   end
