From 9e18c6c817fee5a3f2b584bedd3bac1d4fb305c8 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Thu, 31 Aug 2023 17:11:44 +0100 Subject: [PATCH] Use ra_leaderboard when initialising quorum queue client. If available failure the leader in ra_leaderboard over the one in the queue record as with mnesia the queue record can be very stale if a node has a network issue that hasn't yet been detected by the runtime. Also change some legacy naming in rabbit_fifo_client --- deps/rabbit/src/rabbit_fifo_client.erl | 36 ++++++++++++------------- deps/rabbit/src/rabbit_quorum_queue.erl | 14 +++++++--- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index 280da92991db..8e19e6a29303 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -147,10 +147,10 @@ enqueue(QName, Correlation, Msg, next_seq = Seq, next_enqueue_seq = EnqueueSeq, cfg = #cfg{soft_limit = SftLmt}} = State0) -> - Server = pick_server(State0), + ServerId = pick_server(State0), % by default there is no correlation id Cmd = rabbit_fifo:make_enqueue(self(), EnqueueSeq, Msg), - ok = ra:pipeline_command(Server, Cmd, Seq, low), + ok = ra:pipeline_command(ServerId, Cmd, Seq, low), Tag = case map_size(Pending) >= SftLmt of true -> slow; false -> ok @@ -199,9 +199,9 @@ enqueue(QName, Msg, State) -> | {empty, state()} | {error | timeout, term()}. dequeue(QueueName, ConsumerTag, Settlement, #state{cfg = #cfg{timeout = Timeout}} = State0) -> - Node = pick_server(State0), + ServerId = pick_server(State0), ConsumerId = consumer_id(ConsumerTag), - case ra:process_command(Node, + case ra:process_command(ServerId, rabbit_fifo:make_checkout(ConsumerId, {dequeue, Settlement}, #{}), @@ -242,9 +242,9 @@ add_delivery_count_header(Msg, Count) -> -spec settle(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) -> {state(), list()}. settle(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> - Node = pick_server(State0), + ServerId = pick_server(State0), Cmd = rabbit_fifo:make_settle(consumer_id(ConsumerTag), MsgIds), - {send_command(Node, undefined, Cmd, normal, State0), []}; + {send_command(ServerId, undefined, Cmd, normal, State0), []}; settle(ConsumerTag, [_|_] = MsgIds, #state{unsent_commands = Unsent0} = State0) -> ConsumerId = consumer_id(ConsumerTag), @@ -270,10 +270,10 @@ settle(ConsumerTag, [_|_] = MsgIds, -spec return(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) -> {state(), list()}. return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> - Node = pick_server(State0), + ServerId = pick_server(State0), % TODO: make rabbit_fifo return support lists of message ids Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds), - {send_command(Node, undefined, Cmd, normal, State0), []}; + {send_command(ServerId, undefined, Cmd, normal, State0), []}; return(ConsumerTag, [_|_] = MsgIds, #state{unsent_commands = Unsent0} = State0) -> ConsumerId = consumer_id(ConsumerTag), @@ -295,9 +295,9 @@ return(ConsumerTag, [_|_] = MsgIds, -spec discard(rabbit_fifo:consumer_tag(), [rabbit_fifo:msg_id()], state()) -> {state(), list()}. discard(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> - Node = pick_server(State0), + ServerId = pick_server(State0), Cmd = rabbit_fifo:make_discard(consumer_id(ConsumerTag), MsgIds), - {send_command(Node, undefined, Cmd, normal, State0), []}; + {send_command(ServerId, undefined, Cmd, normal, State0), []}; discard(ConsumerTag, [_|_] = MsgIds, #state{unsent_commands = Unsent0} = State0) -> ConsumerId = consumer_id(ConsumerTag), @@ -404,10 +404,10 @@ credit(ConsumerTag, Credit, Drain, %% the last received msgid provides us with the delivery count if we %% add one as it is 0 indexed C = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}), - Node = pick_server(State0), + ServerId = pick_server(State0), Cmd = rabbit_fifo:make_credit(ConsumerId, Credit, C#consumer.last_msg_id + 1, Drain), - {send_command(Node, undefined, Cmd, normal, State0), []}. + {send_command(ServerId, undefined, Cmd, normal, State0), []}. %% @doc Cancels a checkout with the rabbit_fifo queue for the consumer tag %% @@ -560,10 +560,10 @@ handle_ra_event(QName, From, {applied, Seqs}, add_command(Cid, discard, Discards, Acc))) end, [], State1#state.unsent_commands), - Node = pick_server(State2), + ServerId = pick_server(State2), %% send all the settlements and returns State = lists:foldl(fun (C, S0) -> - send_command(Node, undefined, C, + send_command(ServerId, undefined, C, normal, S0) end, State2, Commands), {ok, State, [{unblock, cluster_name(State)} | Actions]}; @@ -624,9 +624,9 @@ handle_ra_event(_QName, _Leader, {machine, eol}, State) -> %% @returns `ok' -spec untracked_enqueue([ra:server_id()], term()) -> ok. -untracked_enqueue([Node | _], Msg) -> +untracked_enqueue([ServerId | _], Msg) -> Cmd = rabbit_fifo:make_enqueue(undefined, undefined, Msg), - ok = ra:pipeline_command(Node, Cmd), + ok = ra:pipeline_command(ServerId, Cmd), ok. %% Internal @@ -856,10 +856,10 @@ send_command(Server, Correlation, Command, Priority, next_seq = Seq + 1, slow = Tag == slow}. -resend_command(Node, Correlation, Command, +resend_command(ServerId, Correlation, Command, #state{pending = Pending, next_seq = Seq} = State) -> - ok = ra:pipeline_command(Node, Command, Seq), + ok = ra:pipeline_command(ServerId, Command, Seq), State#state{pending = Pending#{Seq => {Correlation, Command}}, next_seq = Seq + 1}. diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index 87c2ca75c47b..6ebd3ec6cc55 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -161,11 +161,17 @@ is_compatible(_, _, _) -> -spec init(amqqueue:amqqueue()) -> {ok, rabbit_fifo_client:state()}. init(Q) when ?is_amqqueue(Q) -> {ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit), - %% This lookup could potentially return an {error, not_found}, but we do not - %% know what to do if the queue has `disappeared`. Let it crash. - {Name, _LeaderNode} = Leader = amqqueue:get_pid(Q), + {Name, _} = MaybeLeader = amqqueue:get_pid(Q), + Leader = case ra_leaderboard:lookup_leader(Name) of + undefined -> + %% leader from queue record will have to suffice + MaybeLeader; + LikelyLeader -> + LikelyLeader + end, Nodes = get_nodes(Q), - %% Ensure the leader is listed first + %% Ensure the leader is listed first to increase likelihood of first + %% server tried is the one we want Servers0 = [{Name, N} || N <- Nodes], Servers = [Leader | lists:delete(Leader, Servers0)], {ok, rabbit_fifo_client:init(Servers, SoftLimit)}.