Skip to content

Commit

Permalink
QQ: fix bug when subscribing using an already existing consumer tag
Browse files Browse the repository at this point in the history
When subscribing using a consumer tag that is already in the quorum
queues state (but perhaps with a cancelled status) and that has
pending messages the next_msg_id which is used to initialise the
queue type consumer state did not take the in flight message ids into
account. This resulted in some messages occasionally not being delivered
to the clint and thus would appear stuck as awaiting acknowledgement
for the consumer.

NB: this change changes the quorum queue state machine without incrementing
the machine version. This is ok as the change only affects the meta
data returned from the checkout command, not the actual state of the
state machine itself.

NB: really the ideal solution would be to make checkout operations
async so that any inflight messages are delivered before the checkout
result. That is a much bigger change for another day.
  • Loading branch information
kjnilsson committed Aug 23, 2023
1 parent eeaca1f commit 457395e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 1 deletion.
9 changes: 8 additions & 1 deletion deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,14 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
#consumer{checked_out = Checked,
credit = Credit,
delivery_count = DeliveryCount,
next_msg_id = NextMsgId} = Consumer,
next_msg_id = NextMsgId0} = Consumer,
NextMsgId = case map_size(Checked) of
0 ->
NextMsgId0;
_ ->
lists:min(maps:keys(Checked))
end,

%% reply with a consumer summary
Reply = {ok, #{next_msg_id => NextMsgId,
credit => Credit,
Expand Down
22 changes: 22 additions & 0 deletions deps/rabbit/test/rabbit_fifo_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1990,6 +1990,28 @@ chunk_disk_msgs_test(_Config) ->
rabbit_fifo:chunk_disk_msgs(TwoBigMsgs, 0, [[]])),
ok.

checkout_metadata_test(Config) ->
Cid = {<<"cid">>, self()},
{State00, _} = enq(Config, 1, 1, first, test_init(test)),
{State0, _} = enq(Config, 2, 2, second, State00),
%% NB: the consumer meta data is taken _before_ it runs a checkout
%% so in this case num_checked_out will be 0
{State1, {ok, #{next_msg_id := 0,
num_checked_out := 0}}, _} =
apply(meta(Config, ?LINE),
rabbit_fifo:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
State0),
{State2, _, _} = apply(meta(Config, ?LINE),
rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
%% check out again with pending messages should set the next message id
%% to the lowest in-flight message id
{_State3, {ok, #{next_msg_id := 0,
num_checked_out := 1}}, _} =
apply(meta(Config, ?LINE),
rabbit_fifo:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
State2),
ok.

%% Utility

init(Conf) -> rabbit_fifo:init(Conf).
Expand Down

0 comments on commit 457395e

Please sign in to comment.