Skip to content

Commit

Permalink
Merge pull request #9164 from rabbitmq/mergify/bp/v3.11.x/pr-9161
Browse files Browse the repository at this point in the history
QQ: fix bug when subscribing using an already existing consumer tag (backport #9158) (backport #9161)
  • Loading branch information
michaelklishin authored Aug 25, 2023
2 parents e21deeb + c3c24e6 commit e57661c
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 9 deletions.
1 change: 1 addition & 0 deletions deps/rabbit/src/rabbit_fifo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
credit = Credit,
delivery_count = DeliveryCount,
next_msg_id = NextMsgId} = Consumer,

%% reply with a consumer summary
Reply = {ok, #{next_msg_id => NextMsgId,
credit => Credit,
Expand Down
24 changes: 20 additions & 4 deletions deps/rabbit/src/rabbit_fifo_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@

-type cluster_name() :: rabbit_types:r(queue).

-record(consumer, {last_msg_id :: seq() | -1,
-record(consumer, {last_msg_id :: seq() | -1 | undefined,
ack = false :: boolean(),
delivery_count = 0 :: non_neg_integer()}).

Expand Down Expand Up @@ -393,8 +393,20 @@ checkout(ConsumerTag, NumUnsettled, CreditMode, Meta,
%% this is the pre 3.11.1 / 3.10.9
%% reply format
-1;
{ok, #{next_msg_id := NextMsgId}} ->
NextMsgId - 1
{ok, #{num_checked_out := NumChecked,
next_msg_id := NextMsgId}} ->
case NumChecked > 0 of
true ->
%% we cannot know if the pending messages
%% have been delivered to the client or they
%% are on their way to the current process.
%% We set `undefined' to signal this uncertainty
%% and will just accept the next arriving message
%% irrespective of message id
undefined;
false ->
NextMsgId - 1
end
end,
SDels = maps:update_with(
ConsumerTag, fun (C) -> C#consumer{ack = Ack} end,
Expand Down Expand Up @@ -749,7 +761,11 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs},
%% TODO: remove potential default allocation
case Consumer of
#consumer{last_msg_id = Prev} = C
when FstId =:= Prev+1 ->
when Prev =:= undefined orelse FstId =:= Prev+1 ->
%% Prev =:= undefined is a special case where a checkout was done
%% for a previously cancelled consumer that still had pending messages
%% In this case we can't reliably know what the next expected message
%% id should be so have to accept whatever message comes next
maybe_auto_ack(Ack, Del,
State0#state{consumer_deliveries =
update_consumer(Tag, LastId,
Expand Down
12 changes: 7 additions & 5 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1695,7 +1695,7 @@ confirm_availability_on_leader_change(Config) ->

flush(T) ->
receive X ->
ct:pal("flushed ~w", [X]),
ct:pal("flushed ~p", [X]),
flush(T)
after T ->
ok
Expand Down Expand Up @@ -2987,12 +2987,13 @@ cancel_and_consume_with_same_tag(Config) ->
DeclareRslt0 = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
?assertMatch(ExpectedDeclareRslt0, DeclareRslt0),

ok = publish(Ch, QQ),
ok = publish(Ch, QQ, <<"msg1">>),

ok = subscribe(Ch, QQ, false),

DeliveryTag = receive
{#'basic.deliver'{delivery_tag = D}, _} ->
{#'basic.deliver'{delivery_tag = D},
#amqp_msg{payload = <<"msg1">>}} ->
D
after 5000 ->
flush(100),
Expand All @@ -3003,10 +3004,11 @@ cancel_and_consume_with_same_tag(Config) ->

ok = subscribe(Ch, QQ, false),

ok = publish(Ch, QQ),
ok = publish(Ch, QQ, <<"msg2">>),

receive
{#'basic.deliver'{delivery_tag = _}, _} ->
{#'basic.deliver'{delivery_tag = _},
#amqp_msg{payload = <<"msg2">>}} ->
ok
after 5000 ->
flush(100),
Expand Down
20 changes: 20 additions & 0 deletions deps/rabbit/test/rabbit_fifo_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1969,6 +1969,26 @@ header_test(_) ->
?assertEqual(undefined, rabbit_fifo:get_header(blah, H5)),
ok.

checkout_metadata_test(Config) ->
Cid = {<<"cid">>, self()},
{State00, _} = enq(Config, 1, 1, first, test_init(test)),
{State0, _} = enq(Config, 2, 2, second, State00),
%% NB: the consumer meta data is taken _before_ it runs a checkout
%% so in this case num_checked_out will be 0
{State1, {ok, #{next_msg_id := 0,
num_checked_out := 0}}, _} =
apply(meta(Config, ?LINE),
rabbit_fifo:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
State0),
{State2, _, _} = apply(meta(Config, ?LINE),
rabbit_fifo:make_checkout(Cid, cancel, #{}), State1),
{_State3, {ok, #{next_msg_id := 1,
num_checked_out := 1}}, _} =
apply(meta(Config, ?LINE),
rabbit_fifo:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
State2),
ok.

%% Utility

init(Conf) -> rabbit_fifo:init(Conf).
Expand Down

0 comments on commit e57661c

Please sign in to comment.