Skip to content

Commit

Permalink
Use ra_leaderboard when initialising quorum queue client.
Browse files Browse the repository at this point in the history
If available failure the leader in ra_leaderboard over the one
in the queue record as with mnesia the queue record can be
very stale if a node has a network issue that hasn't yet been
detected by the runtime.

Also change some legacy naming in rabbit_fifo_client
  • Loading branch information
kjnilsson committed Aug 31, 2023
1 parent 0b92354 commit 9e18c6c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 22 deletions.
36 changes: 18 additions & 18 deletions deps/rabbit/src/rabbit_fifo_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,10 @@ enqueue(QName, Correlation, Msg,
next_seq = Seq,
next_enqueue_seq = EnqueueSeq,
cfg = #cfg{soft_limit = SftLmt}} = State0) ->
Server = pick_server(State0),
ServerId = pick_server(State0),
% by default there is no correlation id
Cmd = rabbit_fifo:make_enqueue(self(), EnqueueSeq, Msg),
ok = ra:pipeline_command(Server, Cmd, Seq, low),
ok = ra:pipeline_command(ServerId, Cmd, Seq, low),
Tag = case map_size(Pending) >= SftLmt of
true -> slow;
false -> ok
Expand Down Expand Up @@ -199,9 +199,9 @@ enqueue(QName, Msg, State) ->
| {empty, state()} | {error | timeout, term()}.
dequeue(QueueName, ConsumerTag, Settlement,
#state{cfg = #cfg{timeout = Timeout}} = State0) ->
Node = pick_server(State0),
ServerId = pick_server(State0),
ConsumerId = consumer_id(ConsumerTag),
case ra:process_command(Node,
case ra:process_command(ServerId,
rabbit_fifo:make_checkout(ConsumerId,
{dequeue, Settlement},
#{}),
Expand Down Expand Up @@ -242,9 +242,9 @@ add_delivery_count_header(Msg, Count) ->
-spec settle(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
{state(), list()}.
settle(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
Node = pick_server(State0),
ServerId = pick_server(State0),
Cmd = rabbit_fifo:make_settle(consumer_id(ConsumerTag), MsgIds),
{send_command(Node, undefined, Cmd, normal, State0), []};
{send_command(ServerId, undefined, Cmd, normal, State0), []};
settle(ConsumerTag, [_|_] = MsgIds,
#state{unsent_commands = Unsent0} = State0) ->
ConsumerId = consumer_id(ConsumerTag),
Expand All @@ -270,10 +270,10 @@ settle(ConsumerTag, [_|_] = MsgIds,
-spec return(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
{state(), list()}.
return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
Node = pick_server(State0),
ServerId = pick_server(State0),
% TODO: make rabbit_fifo return support lists of message ids
Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds),
{send_command(Node, undefined, Cmd, normal, State0), []};
{send_command(ServerId, undefined, Cmd, normal, State0), []};
return(ConsumerTag, [_|_] = MsgIds,
#state{unsent_commands = Unsent0} = State0) ->
ConsumerId = consumer_id(ConsumerTag),
Expand All @@ -295,9 +295,9 @@ return(ConsumerTag, [_|_] = MsgIds,
-spec discard(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) ->
{state(), list()}.
discard(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
Node = pick_server(State0),
ServerId = pick_server(State0),
Cmd = rabbit_fifo:make_discard(consumer_id(ConsumerTag), MsgIds),
{send_command(Node, undefined, Cmd, normal, State0), []};
{send_command(ServerId, undefined, Cmd, normal, State0), []};
discard(ConsumerTag, [_|_] = MsgIds,
#state{unsent_commands = Unsent0} = State0) ->
ConsumerId = consumer_id(ConsumerTag),
Expand Down Expand Up @@ -404,10 +404,10 @@ credit(ConsumerTag, Credit, Drain,
%% the last received msgid provides us with the delivery count if we
%% add one as it is 0 indexed
C = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}),
Node = pick_server(State0),
ServerId = pick_server(State0),
Cmd = rabbit_fifo:make_credit(ConsumerId, Credit,
C#consumer.last_msg_id + 1, Drain),
{send_command(Node, undefined, Cmd, normal, State0), []}.
{send_command(ServerId, undefined, Cmd, normal, State0), []}.

%% @doc Cancels a checkout with the rabbit_fifo queue for the consumer tag
%%
Expand Down Expand Up @@ -560,10 +560,10 @@ handle_ra_event(QName, From, {applied, Seqs},
add_command(Cid, discard,
Discards, Acc)))
end, [], State1#state.unsent_commands),
Node = pick_server(State2),
ServerId = pick_server(State2),
%% send all the settlements and returns
State = lists:foldl(fun (C, S0) ->
send_command(Node, undefined, C,
send_command(ServerId, undefined, C,
normal, S0)
end, State2, Commands),
{ok, State, [{unblock, cluster_name(State)} | Actions]};
Expand Down Expand Up @@ -624,9 +624,9 @@ handle_ra_event(_QName, _Leader, {machine, eol}, State) ->
%% @returns `ok'
-spec untracked_enqueue([ra:server_id()], term()) ->
ok.
untracked_enqueue([Node | _], Msg) ->
untracked_enqueue([ServerId | _], Msg) ->
Cmd = rabbit_fifo:make_enqueue(undefined, undefined, Msg),
ok = ra:pipeline_command(Node, Cmd),
ok = ra:pipeline_command(ServerId, Cmd),
ok.

%% Internal
Expand Down Expand Up @@ -856,10 +856,10 @@ send_command(Server, Correlation, Command, Priority,
next_seq = Seq + 1,
slow = Tag == slow}.

resend_command(Node, Correlation, Command,
resend_command(ServerId, Correlation, Command,
#state{pending = Pending,
next_seq = Seq} = State) ->
ok = ra:pipeline_command(Node, Command, Seq),
ok = ra:pipeline_command(ServerId, Command, Seq),
State#state{pending = Pending#{Seq => {Correlation, Command}},
next_seq = Seq + 1}.

Expand Down
14 changes: 10 additions & 4 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,17 @@ is_compatible(_, _, _) ->
-spec init(amqqueue:amqqueue()) -> {ok, rabbit_fifo_client:state()}.
init(Q) when ?is_amqqueue(Q) ->
{ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit),
%% This lookup could potentially return an {error, not_found}, but we do not
%% know what to do if the queue has `disappeared`. Let it crash.
{Name, _LeaderNode} = Leader = amqqueue:get_pid(Q),
{Name, _} = MaybeLeader = amqqueue:get_pid(Q),
Leader = case ra_leaderboard:lookup_leader(Name) of
undefined ->
%% leader from queue record will have to suffice
MaybeLeader;
LikelyLeader ->
LikelyLeader
end,
Nodes = get_nodes(Q),
%% Ensure the leader is listed first
%% Ensure the leader is listed first to increase likelihood of first
%% server tried is the one we want
Servers0 = [{Name, N} || N <- Nodes],
Servers = [Leader | lists:delete(Leader, Servers0)],
{ok, rabbit_fifo_client:init(Servers, SoftLimit)}.
Expand Down

0 comments on commit 9e18c6c

Please sign in to comment.