Skip to content

Commit

Permalink
Allow setting consumer timeout via queue policy/arg and as consumer a…
Browse files Browse the repository at this point in the history
…rg. Close #5437
  • Loading branch information
deadtrickster committed Apr 25, 2023
1 parent a7365d0 commit 1f8f868
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 50 deletions.
78 changes: 56 additions & 22 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2727,7 +2727,7 @@ handle_deliver(CTag, Ack, Msg, State) ->
handle_deliver0(CTag, Ack, Msg, State).

handle_deliver0(ConsumerTag, AckRequired,
Msg = {QName, QPid, _MsgId, Redelivered,
Msg = {QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}},
Expand Down Expand Up @@ -2804,34 +2804,68 @@ get_operation_timeout_and_deadline() ->
Deadline = now_millis() + Timeout,
{Timeout, Deadline}.

evaluate_consumer_timeout(State0 = #ch{cfg = #conf{channel = Channel,
consumer_timeout = Timeout},
unacked_message_q = UAMQ}) ->
Now = os:system_time(millisecond),
get_queue_consumer_timeout(_PA = #pending_ack{queue = QName},
_State = #ch{cfg = #conf{consumer_timeout = GCT}}) ->
case rabbit_amqqueue:lookup(QName) of
{ok, Q} -> %% should we account for different queue states here?
case rabbit_queue_type_util:args_policy_lookup(<<"consumer-timeout">>,
fun (X, Y) -> erlang:min(X, Y) end, Q) of
undefined -> GCT;
Val -> Val
end;
_ ->
GCT
end.

get_consumer_timeout(PA = #pending_ack{tag = CTag},
State = #ch{consumer_mapping = CMap,
cfg = #conf{consumer_timeout = GCT}}) ->
case maps:find(CTag, CMap) of
{ok, {_, {_, _, _, Args}}} ->
case rabbit_misc:table_lookup(Args, <<"x-consumer-timeout">>) of
{long, Timeout} -> Timeout;
_ -> get_queue_consumer_timeout(PA, State)
end;
_ ->
get_queue_consumer_timeout(PA, State)
end.

evaluate_consumer_timeout(State = #ch{unacked_message_q = UAMQ}) ->
case ?QUEUE:get(UAMQ, empty) of
#pending_ack{delivery_tag = ConsumerTag,
delivered_at = Time}
when is_integer(Timeout)
andalso Time < Now - Timeout ->
rabbit_log_channel:warning("Consumer ~ts on channel ~w has timed out "
"waiting for delivery acknowledgement. Timeout used: ~tp ms. "
"This timeout value can be configured, see consumers doc guide to learn more",
[rabbit_data_coercion:to_binary(ConsumerTag),
Channel, Timeout]),
Ex = rabbit_misc:amqp_error(precondition_failed,
"delivery acknowledgement on channel ~w timed out. "
"Timeout value used: ~tp ms. "
"This timeout value can be configured, see consumers doc guide to learn more",
[Channel, Timeout], none),
handle_exception(Ex, State0);
empty ->
{noreply, State};
PA -> evaluate_consumer_timeout1(PA, State)
end.

evaluate_consumer_timeout1(PA = #pending_ack{delivered_at = Time},
State) ->
Now = os:system_time(millisecond),
case get_consumer_timeout(PA, State) of
Timeout when is_integer(Timeout)
andalso Time < Now - Timeout ->
handle_consumer_timed_out(Timeout, PA, State);
_ ->
{noreply, State0}
{noreply, State}
end.

handle_consumer_timed_out(Timeout,#pending_ack{delivery_tag = DeliveryTag},
State = #ch{cfg = #conf{channel = Channel}}) ->
rabbit_log_channel:warning("Consumer ~ts on channel ~w has timed out "
"waiting for delivery acknowledgement. Timeout used: ~tp ms. "
"This timeout value can be configured, see consumers doc guide to learn more",
[rabbit_data_coercion:to_binary(DeliveryTag),
Channel, Timeout]),
Ex = rabbit_misc:amqp_error(precondition_failed,
"delivery acknowledgement on channel ~w timed out. "
"Timeout value used: ~tp ms. "
"This timeout value can be configured, see consumers doc guide to learn more",
[Channel, Timeout], none),
handle_exception(Ex, State).

handle_queue_actions(Actions, #ch{} = State0) ->
WriterPid = State0#ch.cfg#conf.writer_pid,
lists:foldl(
fun
fun
({settled, QRef, MsgSeqNos}, S0) ->
confirm(MsgSeqNos, QRef, S0);
({rejected, _QRef, MsgSeqNos}, S0) ->
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_networking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
emit_connection_info_local(Items, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map_with_exit_handler(
AggregatorPid, Ref, fun(Q) -> connection_info(Q, Items) end,
connections_local()).
connections_local() ++ rabbit_networking:local_non_amqp_connections()).

-spec close_connection(pid(), string()) -> 'ok'.

Expand Down
7 changes: 7 additions & 0 deletions deps/rabbit/src/rabbit_policies.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ register() ->
%% such as rabbit_mirror_queue_misc
[rabbit_registry:register(Class, Name, ?MODULE) ||
{Class, Name} <- [{policy_validator, <<"alternate-exchange">>},
{policy_validator, <<"consumer-timeout">>},
{policy_validator, <<"dead-letter-exchange">>},
{policy_validator, <<"dead-letter-routing-key">>},
{policy_validator, <<"dead-letter-strategy">>},
Expand Down Expand Up @@ -74,6 +75,12 @@ validate_policy0(<<"alternate-exchange">>, Value)
validate_policy0(<<"alternate-exchange">>, Value) ->
{error, "~tp is not a valid alternate exchange name", [Value]};

validate_policy0(<<"consumer-timeout">>, Value)
when is_integer(Value), Value >= 0 ->
ok;
validate_policy0(<<"consumer-timeout">>, Value) ->
{error, "~tp is not a valid consumer timeout", [Value]};

validate_policy0(<<"dead-letter-exchange">>, Value)
when is_binary(Value) ->
ok;
Expand Down
101 changes: 74 additions & 27 deletions deps/rabbit/test/consumer_timeout_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,58 @@

-compile(export_all).

-define(TIMEOUT, 30000).
-define(CONSUMER_TIMEOUT, 3000).
-define(RECEIVE_TIMEOUT, 5000).

-define(GROUP_CONFIG,
#{global_consumer_timeout => [{rabbit, [{consumer_timeout, ?CONSUMER_TIMEOUT}]},
{queue_policy, []},
{queue_arguments, []},
{consumer_arguments, []}],
queue_policy_consumer_timeout => [{rabbit, []},
{queue_policy, [{<<"consumer-timeout">>, ?CONSUMER_TIMEOUT}]},
{queue_arguments, []},
{consumer_arguments, []}],
queue_argument_consumer_timeout => [{rabbit, []},
{queue_policy, []},
{queue_arguments, [{<<"x-consumer-timeout">>, long, ?CONSUMER_TIMEOUT}]},
{consumer_arguments, []}],
consumer_argument_consumer_timeout => [{rabbit, []},
{queue_policy, []},
{queue_arguments, []},
{consumer_arguments, [{<<"x-consumer-timeout">>, long, ?CONSUMER_TIMEOUT}]}]}).

-import(quorum_queue_utils, [wait_for_messages/2]).

all() ->
[
{group, parallel_tests}
{group, global_consumer_timeout},
{group, queue_policy_consumer_timeout},
{group, queue_argument_consumer_timeout},
{group, consumer_argument_consumer_timeout}
].

groups() ->
AllTests = [consumer_timeout,
consumer_timeout_basic_get,
consumer_timeout_no_basic_cancel_capability
],
[
{parallel_tests, [],
[
ConsumerTests = [consumer_timeout,
consumer_timeout_no_basic_cancel_capability],
AllTests = ConsumerTests ++ [consumer_timeout_basic_get],

ConsumerTestsParallel = [
{classic_queue, [parallel], ConsumerTests},
{mirrored_queue, [parallel], ConsumerTests},
{quorum_queue, [parallel], ConsumerTests}
],

AllTestsParallel = [
{classic_queue, [parallel], AllTests},
{mirrored_queue, [parallel], AllTests},
{quorum_queue, [parallel], AllTests}
]}
],
[
{global_consumer_timeout, [], AllTestsParallel},
{queue_policy_consumer_timeout, [], AllTestsParallel},
{queue_argument_consumer_timeout, [], AllTestsParallel},
{consumer_argument_consumer_timeout, [], ConsumerTestsParallel}
].

suite() ->
Expand All @@ -55,33 +86,36 @@ end_per_suite(Config) ->
init_per_group(classic_queue, Config) ->
rabbit_ct_helpers:set_config(
Config,
[{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
[{policy_type, <<"classic_queues">>},
{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, true}]);
init_per_group(quorum_queue, Config) ->
rabbit_ct_helpers:set_config(
Config,
[{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
[{policy_type, <<"quorum_queues">>},
{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
{queue_durable, true}]);
init_per_group(mirrored_queue, Config) ->
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
<<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
Config1 = rabbit_ct_helpers:set_config(
Config, [{is_mirrored, true},
Config, [{policy_type, <<"classic_queues">>},
{is_mirrored, true},
{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, true}]),
rabbit_ct_helpers:run_steps(Config1, []);
init_per_group(Group, Config0) ->
case lists:member({group, Group}, all()) of
true ->
GroupConfig = maps:get(Group, ?GROUP_CONFIG),
ClusterSize = 3,
Config = rabbit_ct_helpers:merge_app_env(
Config0, {rabbit, [{channel_tick_interval, 1000},
{quorum_tick_interval, 1000},
{consumer_timeout, 5000}]}),
{quorum_tick_interval, 1000}] ++ ?config(rabbit, GroupConfig)}),
Config1 = rabbit_ct_helpers:set_config(
Config, [ {rmq_nodename_suffix, Group},
{rmq_nodes_count, ClusterSize}
]),
] ++ GroupConfig),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps());
Expand All @@ -92,6 +126,11 @@ init_per_group(Group, Config0) ->
end_per_group(Group, Config) ->
case lists:member({group, Group}, all()) of
true ->
case ?config(queue_policy, Config) of
[] -> ok;
_Policy ->
rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"consumer_timeout_queue_test_policy">>)
end,
rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps());
Expand Down Expand Up @@ -119,12 +158,12 @@ consumer_timeout(Config) ->
declare_queue(Ch, Config, QName),
publish(Ch, QName, [<<"msg1">>]),
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
subscribe(Ch, QName, false),
subscribe(Ch, QName, false, ?config(consumer_arguments, Config)),
erlang:monitor(process, Conn),
erlang:monitor(process, Ch),
receive
{'DOWN', _, process, Ch, _} -> ok
after 30000 ->
after ?RECEIVE_TIMEOUT ->
flush(1),
exit(channel_exit_expected)
end,
Expand All @@ -149,7 +188,7 @@ consumer_timeout_basic_get(Config) ->
erlang:monitor(process, Ch),
receive
{'DOWN', _, process, Ch, _} -> ok
after 30000 ->
after ?RECEIVE_TIMEOUT ->
flush(1),
exit(channel_exit_expected)
end,
Expand Down Expand Up @@ -187,18 +226,18 @@ consumer_timeout_no_basic_cancel_capability(Config) ->
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
erlang:monitor(process, Conn),
erlang:monitor(process, Ch),
subscribe(Ch, QName, false),
subscribe(Ch, QName, false, ?config(consumer_arguments, Config)),
receive
{#'basic.deliver'{delivery_tag = _,
redelivered = false}, _} ->
%% do nothing with the delivery should trigger timeout
ok
after 5000 ->
after ?RECEIVE_TIMEOUT ->
exit(deliver_timeout)
end,
receive
{'DOWN', _, process, Ch, _} -> ok
after 30000 ->
after ?RECEIVE_TIMEOUT ->
flush(1),
exit(channel_exit_expected)
end,
Expand All @@ -217,8 +256,14 @@ consumer_timeout_no_basic_cancel_capability(Config) ->
declare_queue(Ch, Config, QName) ->
Args = ?config(queue_args, Config),
Durable = ?config(queue_durable, Config),
case ?config(queue_policy, Config) of
[] -> ok;
Policy ->
rabbit_ct_broker_helpers:set_policy(Config, 0, <<"consumer_timeout_queue_test_policy">>,
<<".*">>, ?config(policy_type, Config), Policy)
end,
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName,
arguments = Args,
arguments = Args ++ ?config(queue_arguments, Config),
durable = Durable}).
publish(Ch, QName, Payloads) ->
[amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload})
Expand All @@ -235,13 +280,15 @@ consume(Ch, QName, NoAck, Payloads) ->
DTag
end || Payload <- Payloads].

subscribe(Ch, Queue, NoAck) ->
subscribe(Ch, Queue, NoAck, <<"ctag">>).
subscribe(Ch, Queue, NoAck, Args) ->
subscribe(Ch, Queue, NoAck, <<"ctag">>, Args).

subscribe(Ch, Queue, NoAck, Ctag) ->
subscribe(Ch, Queue, NoAck, Ctag, Args) ->
amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
no_ack = NoAck,
consumer_tag = Ctag},
consumer_tag = Ctag,
arguments = Args
},
self()),
receive
#'basic.consume_ok'{consumer_tag = Ctag} ->
Expand Down

0 comments on commit 1f8f868

Please sign in to comment.