From 457395e814ebcf0cd07ca451487bb232204ba55f Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 23 Aug 2023 10:56:30 +0100 Subject: [PATCH] QQ: fix bug when subscribing using an already existing consumer tag When subscribing using a consumer tag that is already in the quorum queues state (but perhaps with a cancelled status) and that has pending messages the next_msg_id which is used to initialise the queue type consumer state did not take the in flight message ids into account. This resulted in some messages occasionally not being delivered to the clint and thus would appear stuck as awaiting acknowledgement for the consumer. NB: this change changes the quorum queue state machine without incrementing the machine version. This is ok as the change only affects the meta data returned from the checkout command, not the actual state of the state machine itself. NB: really the ideal solution would be to make checkout operations async so that any inflight messages are delivered before the checkout result. That is a much bigger change for another day. --- deps/rabbit/src/rabbit_fifo.erl | 9 ++++++++- deps/rabbit/test/rabbit_fifo_SUITE.erl | 22 ++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index f9a7f842d689..941fab92afc9 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -407,7 +407,14 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, #consumer{checked_out = Checked, credit = Credit, delivery_count = DeliveryCount, - next_msg_id = NextMsgId} = Consumer, + next_msg_id = NextMsgId0} = Consumer, + NextMsgId = case map_size(Checked) of + 0 -> + NextMsgId0; + _ -> + lists:min(maps:keys(Checked)) + end, + %% reply with a consumer summary Reply = {ok, #{next_msg_id => NextMsgId, credit => Credit, diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index aaf82be3de79..d3493e9e2ce2 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -1990,6 +1990,28 @@ chunk_disk_msgs_test(_Config) -> rabbit_fifo:chunk_disk_msgs(TwoBigMsgs, 0, [[]])), ok. +checkout_metadata_test(Config) -> + Cid = {<<"cid">>, self()}, + {State00, _} = enq(Config, 1, 1, first, test_init(test)), + {State0, _} = enq(Config, 2, 2, second, State00), + %% NB: the consumer meta data is taken _before_ it runs a checkout + %% so in this case num_checked_out will be 0 + {State1, {ok, #{next_msg_id := 0, + num_checked_out := 0}}, _} = + apply(meta(Config, ?LINE), + rabbit_fifo:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), + State0), + {State2, _, _} = apply(meta(Config, ?LINE), + rabbit_fifo:make_checkout(Cid, cancel, #{}), State1), + %% check out again with pending messages should set the next message id + %% to the lowest in-flight message id + {_State3, {ok, #{next_msg_id := 0, + num_checked_out := 1}}, _} = + apply(meta(Config, ?LINE), + rabbit_fifo:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), + State2), + ok. + %% Utility init(Conf) -> rabbit_fifo:init(Conf).