Skip to content

Commit

Permalink
feat: add boolean flag 'share_leader_conn' in consumer config
Browse files Browse the repository at this point in the history
Set to `true' to consume less TCP connections towards Kafka,
but may lead to higher fetch latency. This is because Kafka can
ony accumulate messages for the oldest fetch request, later
requests behind it may get blocked until `max_wait_time' expires
for the oldest one
  • Loading branch information
zmstone committed Sep 21, 2024
1 parent 87b1296 commit c186c86
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 29 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

- 4.2.0
- Add `share_leader_conn` consumer config (default `false`) to optimize fetch latency.

- 4.1.1
- Upgrade `kafka_protocol` from version 4.1.5 to 4.1.9.

Expand Down
1 change: 1 addition & 0 deletions src/brod.erl
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@
| {offset_reset_policy, brod_consumer:offset_reset_policy()}
| {size_stat_window, non_neg_integer()}
| {isolation_level, brod_consumer:isolation_level()}
| {share_leader_conn, boolean()}
].
%% Consumer configuration.
%%
Expand Down
11 changes: 10 additions & 1 deletion src/brod_client.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%%
%%% Copyright (c) 2015-2021 Klarna Bank AB (publ)
%%% Copyright (c) 2022-2024 kafka4beam contributors
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,6 +30,7 @@
, get_group_coordinator/2
, get_transactional_coordinator/2
, get_leader_connection/3
, get_bootstrap/1
, get_metadata/2
, get_metadata_safe/2
, get_partitions_count/2
Expand Down Expand Up @@ -227,6 +228,10 @@ stop_consumer(Client, TopicName) ->
get_leader_connection(Client, Topic, Partition) ->
safe_gen_call(Client, {get_leader_connection, Topic, Partition}, infinity).

-spec get_bootstrap(client()) -> {ok, brod:bootstrap()} | {error, any()}.
get_bootstrap(Client) ->
safe_gen_call(Client, get_bootstrap, infinity).

%% @doc Get connection to a kafka broker.
%%
%% Return already established connection towards the broker,
Expand Down Expand Up @@ -388,6 +393,10 @@ handle_call({stop_consumer, Topic}, _From, State) ->
handle_call({get_leader_connection, Topic, Partition}, _From, State) ->
{Result, NewState} = do_get_leader_connection(State, Topic, Partition),
{reply, Result, NewState};
handle_call(get_bootstrap, _From, State) ->
#{bootstrap_endpoints := Endpoints} = State,
ConnConfig = conn_config(State),
{reply, {ok, {Endpoints, ConnConfig}}, State};
handle_call({get_connection, Host, Port}, _From, State) ->
{Result, NewState} = maybe_connect(State, {Host, Port}),
{reply, Result, NewState};
Expand Down
104 changes: 76 additions & 28 deletions src/brod_consumer.erl
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
%%% Copyright (c) 2014-2021 Klarna Bank AB (publ)
%%% Copyright (c) 2022-2024 kafka4beam contributors
%%%
%%% Licensed under the Apache License, Version 2.0 (the "License");
%%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -93,7 +94,10 @@
-type pending_acks() :: #pending_acks{}.
-type isolation_level() :: kpro:isolation_level().

-record(state, { bootstrap :: pid() | brod:bootstrap()
-define(GET_FROM_CLIENT, get).
-define(IGNORE, ignore).
-record(state, { client_pid :: ?IGNORE | pid()
, bootstrap :: ?IGNORE | ?GET_FROM_CLIENT | brod:bootstrap()
, connection :: ?undef | pid()
, topic :: binary()
, partition :: integer()
Expand Down Expand Up @@ -136,6 +140,7 @@
-define(INIT_CONNECTION, init_connection).
-define(DEFAULT_AVG_WINDOW, 5).
-define(DEFAULT_ISOLATION_LEVEL, ?kpro_read_committed).
-define(DEFAULT_SHARE_LEADER_CONN, false).

%%%_* APIs =====================================================================
%% @equiv start_link(ClientPid, Topic, Partition, Config, [])
Expand Down Expand Up @@ -220,6 +225,16 @@ start_link(Bootstrap, Topic, Partition, Config) ->
%% and `read_committed' to get only the records from committed
%% transactions</li>
%%
%% <li>`share_leader_conn': (optional, default = `false')
%%
%% Whether or not share the partition leader connection with
%% other producers or consumers.
%% Set to `true' to consume less TCP connections towards Kafka,
%% but may lead to higher fetch latency. This is because Kafka can
%% ony accumulate messages for the oldest fetch request, later
%% requests behind it may get blocked until `max_wait_time' expires
%% for the oldest one</li>
%%
%% </ul>
%% @end
-spec start_link(pid() | brod:bootstrap(),
Expand Down Expand Up @@ -286,7 +301,7 @@ get_connection(Pid) ->

%%%_* gen_server callbacks =====================================================

init({Bootstrap, Topic, Partition, Config}) ->
init({Bootstrap0, Topic, Partition, Config}) ->
erlang:process_flag(trap_exit, true),
Cfg = fun(Name, Default) ->
proplists:get_value(Name, Config, Default)
Expand All @@ -300,15 +315,33 @@ init({Bootstrap, Topic, Partition, Config}) ->
BeginOffset = Cfg(begin_offset, ?DEFAULT_BEGIN_OFFSET),
OffsetResetPolicy = Cfg(offset_reset_policy, ?DEFAULT_OFFSET_RESET_POLICY),
IsolationLevel = Cfg(isolation_level, ?DEFAULT_ISOLATION_LEVEL),

%% If bootstrap is a client pid, register self to the client
case is_shared_conn(Bootstrap) of
IsShareConn = Cfg(share_leader_conn, ?DEFAULT_SHARE_LEADER_CONN),

%% resolve connection bootstrap args
{ClientPid, Bootstrap} =
case is_pid(Bootstrap0) of
true when IsShareConn ->
%% share leader connection with other producers/consumers
%% the connection is to be managed by brod_client
{Bootstrap0, ?IGNORE};
true ->
%% not sharing leader connection with other producers/consumers
%% the bootstrap args will be resolved later when it's
%% time to establish a connection to partition leader
{Bootstrap0, ?GET_FROM_CLIENT};
false ->
%% this consumer process is not started from `brod' APIs
%% maybe managed by other supervisors.
{?IGNORE, Bootstrap0}
end,
case is_pid(ClientPid) of
true ->
ok = brod_client:register_consumer(Bootstrap, Topic, Partition);
false ->
ok
end,
{ok, #state{ bootstrap = Bootstrap
{ok, #state{ client_pid = ClientPid
, bootstrap = Bootstrap
, topic = Topic
, partition = Partition
, begin_offset = BeginOffset
Expand Down Expand Up @@ -418,20 +451,26 @@ handle_cast(Cast, State) ->
{noreply, State}.

%% @private
terminate(Reason, #state{ bootstrap = Bootstrap
terminate(Reason, #state{ client_pid = ClientPid
, topic = Topic
, partition = Partition
, connection = Connection
, connection_mref = Mref
}) ->
IsShared = is_shared_conn(Bootstrap),
IsNormal = brod_utils:is_normal_reason(Reason),
%% deregister consumer if it's shared connection and normal shutdown
IsShared andalso IsNormal andalso
brod_client:deregister_consumer(Bootstrap, Topic, Partition),
%% close connection if it's working standalone
case not IsShared andalso is_pid(Connection) of
true -> kpro:close_connection(Connection);
false -> ok
case is_pid(ClientPid) andalso IsNormal of
true ->
brod_client:deregister_consumer(ClientPid, Topic, Partition);
false ->
ok
end,
%% close connection if it's owned by this consumer
case Mref =:= ?undef andalso is_pid(Connection) andalso is_process_alive(Connection) of
true ->
kpro:close_connection(Connection);
false ->
ok
end,
%% write a log if it's not a normal reason
IsNormal orelse ?BROD_LOG_ERROR("Consumer ~s-~w terminate reason: ~p",
Expand Down Expand Up @@ -858,17 +897,19 @@ safe_gen_call(Server, Call, Timeout) ->
-spec maybe_init_connection(state()) ->
{ok, state()} | {{error, any()}, state()}.
maybe_init_connection(
#state{ bootstrap = Bootstrap
#state{ client_pid = ClientPid
, bootstrap = Bootstrap
, topic = Topic
, partition = Partition
, connection = ?undef
} = State0) ->
%% Lookup, or maybe (re-)establish a connection to partition leader
case connect_leader(Bootstrap, Topic, Partition) of
{MonitorOrLink, Result} = connect_leader(ClientPid, Bootstrap, Topic, Partition),
case Result of
{ok, Connection} ->
Mref = case is_shared_conn(Bootstrap) of
true -> erlang:monitor(process, Connection);
false -> ?undef %% linked
Mref = case MonitorOrLink of
monitor -> erlang:monitor(process, Connection);
linked -> ?undef
end,
%% Switching to a new connection
%% the response for last_req_ref will be lost forever
Expand All @@ -883,13 +924,23 @@ maybe_init_connection(
maybe_init_connection(State) ->
{ok, State}.

connect_leader(ClientPid, Topic, Partition) when is_pid(ClientPid) ->
brod_client:get_leader_connection(ClientPid, Topic, Partition);
connect_leader(Endpoints, Topic, Partition) when is_list(Endpoints) ->
connect_leader({Endpoints, []}, Topic, Partition);
connect_leader({Endpoints, ConnCfg}, Topic, Partition) ->
connect_leader(ClientPid, ?IGNORE, Topic, Partition) when is_pid(ClientPid) ->
{monitor, brod_client:get_leader_connection(ClientPid, Topic, Partition)};
connect_leader(ClientPid, ?GET_FROM_CLIENT, Topic, Partition) when is_pid(ClientPid) ->
case brod_client:get_bootstrap(ClientPid) of
{ok, Bootstrap} ->
link_connect_leader(Bootstrap, Topic, Partition);
{error, Reason} ->
{linked, {error, Reason}}
end;
connect_leader(?IGNORE, Bootstrap, Topic, Partition) ->
link_connect_leader(Bootstrap, Topic, Partition).

link_connect_leader(Endpoints, Topic, Partition) when is_list(Endpoints) ->
link_connect_leader({Endpoints, []}, Topic, Partition);
link_connect_leader({Endpoints, ConnCfg}, Topic, Partition) ->
%% connection pid is linked to self()
kpro:connect_partition_leader(Endpoints, ConnCfg, Topic, Partition).
{linked, kpro:connect_partition_leader(Endpoints, ConnCfg, Topic, Partition)}.

%% Send a ?INIT_CONNECTION delayed loopback message to re-init.
-spec maybe_send_init_connection(state()) -> ok.
Expand All @@ -900,9 +951,6 @@ maybe_send_init_connection(#state{subscriber = Subscriber}) ->
erlang:send_after(Timeout, self(), ?INIT_CONNECTION),
ok.

%% In case 'bootstrap' is a client pid, connection is shared with other workers.
is_shared_conn(Bootstrap) -> is_pid(Bootstrap).

%%%_* Tests ====================================================================

-ifdef(TEST).
Expand Down

0 comments on commit c186c86

Please sign in to comment.