From cfb6ce681ac6984d4cc9569c03d9597fe89a82e5 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 23 Aug 2023 10:56:30 +0100 Subject: [PATCH 1/2] QQ: fix bug when subscribing using an already existing consumer tag When subscribing using a consumer tag that is already in the quorum queues state (but perhaps with a cancelled status) and that has pending messages the next_msg_id which is used to initialise the queue type consumer state did not take the in flight message ids into account. This resulted in some messages occasionally not being delivered to the clint and thus would appear stuck as awaiting acknowledgement for the consumer. When a new checkout operation detects there are in-flight messages we set the last_msg_id to `undefined` and just accept the next message that arrives, irrespective of their message id. This isn't 100% fool proof as there may be cases where messages are lost between queue and channel where we'd miss to trigger the fallback query for missing messages. It is however much better than what we have atm. NB: really the ideal solution would be to make checkout operations async so that any inflight messages are delivered before the checkout result. That is a much bigger change for another day. (cherry picked from commit 49108a69cd1357751fd60f83482fd8d590333ded) (cherry picked from commit 039088630526231d468dbedac77adbb8118fcee3) # Conflicts: # deps/rabbit/src/rabbit_fifo_client.erl # deps/rabbit/test/rabbit_fifo_SUITE.erl --- deps/rabbit/src/rabbit_fifo.erl | 1 + deps/rabbit/src/rabbit_fifo_client.erl | 26 +++++++++++++-- deps/rabbit/test/quorum_queue_SUITE.erl | 12 ++++--- deps/rabbit/test/rabbit_fifo_SUITE.erl | 44 +++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 8 deletions(-) diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 634ba0a69b15..94df7f18b944 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -407,6 +407,7 @@ apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, credit = Credit, delivery_count = DeliveryCount, next_msg_id = NextMsgId} = Consumer, + %% reply with a consumer summary Reply = {ok, #{next_msg_id => NextMsgId, credit => Credit, diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index 5827a07893cd..eee60a4834bd 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -47,9 +47,13 @@ {rabbit_fifo:consumer_tag(), non_neg_integer()}}. -type actions() :: [action()]. +<<<<<<< HEAD -type cluster_name() :: rabbit_types:r(queue). -record(consumer, {last_msg_id :: seq() | -1, +======= +-record(consumer, {last_msg_id :: seq() | -1 | undefined, +>>>>>>> 0390886305 (QQ: fix bug when subscribing using an already existing consumer tag) ack = false :: boolean(), delivery_count = 0 :: non_neg_integer()}). @@ -393,8 +397,20 @@ checkout(ConsumerTag, NumUnsettled, CreditMode, Meta, %% this is the pre 3.11.1 / 3.10.9 %% reply format -1; - {ok, #{next_msg_id := NextMsgId}} -> - NextMsgId - 1 + {ok, #{num_checked_out := NumChecked, + next_msg_id := NextMsgId}} -> + case NumChecked > 0 of + true -> + %% we cannot know if the pending messages + %% have been delivered to the client or they + %% are on their way to the current process. + %% We set `undefined' to signal this uncertainty + %% and will just accept the next arriving message + %% irrespective of message id + undefined; + false -> + NextMsgId - 1 + end end, SDels = maps:update_with( ConsumerTag, fun (C) -> C#consumer{ack = Ack} end, @@ -749,7 +765,11 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs}, %% TODO: remove potential default allocation case Consumer of #consumer{last_msg_id = Prev} = C - when FstId =:= Prev+1 -> + when Prev =:= undefined orelse FstId =:= Prev+1 -> + %% Prev =:= undefined is a special case where a checkout was done + %% for a previously cancelled consumer that still had pending messages + %% In this case we can't reliably know what the next expected message + %% id should be so have to accept whatever message comes next maybe_auto_ack(Ack, Del, State0#state{consumer_deliveries = update_consumer(Tag, LastId, diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index f952239d6a3a..f44fb58a03ab 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1695,7 +1695,7 @@ confirm_availability_on_leader_change(Config) -> flush(T) -> receive X -> - ct:pal("flushed ~w", [X]), + ct:pal("flushed ~p", [X]), flush(T) after T -> ok @@ -2987,12 +2987,13 @@ cancel_and_consume_with_same_tag(Config) -> DeclareRslt0 = declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), ?assertMatch(ExpectedDeclareRslt0, DeclareRslt0), - ok = publish(Ch, QQ), + ok = publish(Ch, QQ, <<"msg1">>), ok = subscribe(Ch, QQ, false), DeliveryTag = receive - {#'basic.deliver'{delivery_tag = D}, _} -> + {#'basic.deliver'{delivery_tag = D}, + #amqp_msg{payload = <<"msg1">>}} -> D after 5000 -> flush(100), @@ -3003,10 +3004,11 @@ cancel_and_consume_with_same_tag(Config) -> ok = subscribe(Ch, QQ, false), - ok = publish(Ch, QQ), + ok = publish(Ch, QQ, <<"msg2">>), receive - {#'basic.deliver'{delivery_tag = _}, _} -> + {#'basic.deliver'{delivery_tag = _}, + #amqp_msg{payload = <<"msg2">>}} -> ok after 5000 -> flush(100), diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 0baa5c9cb6ea..252b16753d37 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -1969,6 +1969,50 @@ header_test(_) -> ?assertEqual(undefined, rabbit_fifo:get_header(blah, H5)), ok. +<<<<<<< HEAD +======= +chunk_disk_msgs_test(_Config) -> + %% NB: this does test an internal function + %% input to this function is a reversed list of MSGs + Input = [{I, ?MSG(I, 1000)} || I <- lists:seq(200, 1, -1)], + Chunks = rabbit_fifo:chunk_disk_msgs(Input, 0, [[]]), + ?assertMatch([_, _], Chunks), + [Chunk1, Chunk2] = Chunks, + ?assertMatch([{1, ?MSG(1, 1000)} | _], Chunk1), + %% the chunks are worked out in backwards order, hence the first chunk + %% will be a "remainder" chunk + ?assertMatch([{73, ?MSG(73, 1000)} | _], Chunk2), + ?assertEqual(128, length(Chunk2)), + ?assertEqual(72, length(Chunk1)), + + TwoBigMsgs = [{124, ?MSG(124, 200_000)}, + {123, ?MSG(123, 200_000)}], + ?assertMatch([[{123, ?MSG(123, 200_000)}], + [{124, ?MSG(124, 200_000)}]], + rabbit_fifo:chunk_disk_msgs(TwoBigMsgs, 0, [[]])), + ok. + +checkout_metadata_test(Config) -> + Cid = {<<"cid">>, self()}, + {State00, _} = enq(Config, 1, 1, first, test_init(test)), + {State0, _} = enq(Config, 2, 2, second, State00), + %% NB: the consumer meta data is taken _before_ it runs a checkout + %% so in this case num_checked_out will be 0 + {State1, {ok, #{next_msg_id := 0, + num_checked_out := 0}}, _} = + apply(meta(Config, ?LINE), + rabbit_fifo:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), + State0), + {State2, _, _} = apply(meta(Config, ?LINE), + rabbit_fifo:make_checkout(Cid, cancel, #{}), State1), + {_State3, {ok, #{next_msg_id := 1, + num_checked_out := 1}}, _} = + apply(meta(Config, ?LINE), + rabbit_fifo:make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), + State2), + ok. + +>>>>>>> 0390886305 (QQ: fix bug when subscribing using an already existing consumer tag) %% Utility init(Conf) -> rabbit_fifo:init(Conf). From c3c24e652e0174e9b306097268a93ab9a37dcc46 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 25 Aug 2023 15:33:44 +0100 Subject: [PATCH 2/2] fix conflicts --- deps/rabbit/src/rabbit_fifo_client.erl | 4 ---- deps/rabbit/test/rabbit_fifo_SUITE.erl | 24 ------------------------ 2 files changed, 28 deletions(-) diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index eee60a4834bd..ebd5265f74cf 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -47,13 +47,9 @@ {rabbit_fifo:consumer_tag(), non_neg_integer()}}. -type actions() :: [action()]. -<<<<<<< HEAD -type cluster_name() :: rabbit_types:r(queue). --record(consumer, {last_msg_id :: seq() | -1, -======= -record(consumer, {last_msg_id :: seq() | -1 | undefined, ->>>>>>> 0390886305 (QQ: fix bug when subscribing using an already existing consumer tag) ack = false :: boolean(), delivery_count = 0 :: non_neg_integer()}). diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index 252b16753d37..86152d2695bf 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -1969,29 +1969,6 @@ header_test(_) -> ?assertEqual(undefined, rabbit_fifo:get_header(blah, H5)), ok. -<<<<<<< HEAD -======= -chunk_disk_msgs_test(_Config) -> - %% NB: this does test an internal function - %% input to this function is a reversed list of MSGs - Input = [{I, ?MSG(I, 1000)} || I <- lists:seq(200, 1, -1)], - Chunks = rabbit_fifo:chunk_disk_msgs(Input, 0, [[]]), - ?assertMatch([_, _], Chunks), - [Chunk1, Chunk2] = Chunks, - ?assertMatch([{1, ?MSG(1, 1000)} | _], Chunk1), - %% the chunks are worked out in backwards order, hence the first chunk - %% will be a "remainder" chunk - ?assertMatch([{73, ?MSG(73, 1000)} | _], Chunk2), - ?assertEqual(128, length(Chunk2)), - ?assertEqual(72, length(Chunk1)), - - TwoBigMsgs = [{124, ?MSG(124, 200_000)}, - {123, ?MSG(123, 200_000)}], - ?assertMatch([[{123, ?MSG(123, 200_000)}], - [{124, ?MSG(124, 200_000)}]], - rabbit_fifo:chunk_disk_msgs(TwoBigMsgs, 0, [[]])), - ok. - checkout_metadata_test(Config) -> Cid = {<<"cid">>, self()}, {State00, _} = enq(Config, 1, 1, first, test_init(test)), @@ -2012,7 +1989,6 @@ checkout_metadata_test(Config) -> State2), ok. ->>>>>>> 0390886305 (QQ: fix bug when subscribing using an already existing consumer tag) %% Utility init(Conf) -> rabbit_fifo:init(Conf).