Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Prometheus metric for messages dropped by MQTT QoS 0 Queue #9080

Merged
merged 2 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion deps/rabbit/include/rabbit_global_counters.hrl
Original file line number Diff line number Diff line change
@@ -1,2 +1,41 @@
-define(NUM_PROTOCOL_COUNTERS, 8).
-define(NUM_PROTOCOL_QUEUE_TYPE, 8).
-define(NUM_PROTOCOL_QUEUE_TYPE_COUNTERS, 8).

%% Dead Letter counters:
%%
%% The following two counters are mutually exclusive because
%% quorum queue dead-letter-strategy at-least-once is incompatible with overflow drop-head.
-define(MESSAGES_DEAD_LETTERED_MAXLEN, 1).
-define(MESSAGES_DEAD_LETTERED_CONFIRMED, 1).
-define(MESSAGES_DEAD_LETTERED_EXPIRED, 2).
-define(MESSAGES_DEAD_LETTERED_REJECTED, 3).
-define(MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT, 4).

-define(MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER,
{messages_dead_lettered_maxlen_total, ?MESSAGES_DEAD_LETTERED_MAXLEN, counter,
"Total number of messages dead-lettered due to overflow drop-head or reject-publish-dlx"
}).

-define(MESSAGES_DEAD_LETTERED_CONFIRMED_COUNTER,
{
messages_dead_lettered_confirmed_total, ?MESSAGES_DEAD_LETTERED_CONFIRMED, counter,
"Total number of messages dead-lettered and confirmed by target queues"
}).

-define(MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER,
{
messages_dead_lettered_expired_total, ?MESSAGES_DEAD_LETTERED_EXPIRED, counter,
"Total number of messages dead-lettered due to message TTL exceeded"
}).

-define(MESSAGES_DEAD_LETTERED_REJECTED_COUNTER,
{
messages_dead_lettered_rejected_total, ?MESSAGES_DEAD_LETTERED_REJECTED, counter,
"Total number of messages dead-lettered due to basic.reject or basic.nack"
}).

-define(MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER,
{
messages_dead_lettered_delivery_limit_total, ?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT, counter,
"Total number of messages dead-lettered due to delivery-limit exceeded"
}).
75 changes: 34 additions & 41 deletions deps/rabbit/src/rabbit_global_counters.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

-module(rabbit_global_counters).

-include("rabbit_global_counters.hrl").

-export([
boot_step/0,
init/1,
Expand Down Expand Up @@ -128,57 +130,48 @@
}
]).

-define(MESSAGES_DEAD_LETTERED_EXPIRED, 1).
-define(MESSAGES_DEAD_LETTERED_REJECTED, 2).
%% The following two counters are mutually exclusive because
%% quorum queue dead-letter-strategy at-least-once is incompatible with overflow drop-head.
-define(MESSAGES_DEAD_LETTERED_MAXLEN, 3).
-define(MESSAGES_DEAD_LETTERED_CONFIRMED, 3).
-define(MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT, 4).
-define(MESSAGES_DEAD_LETTERED_COUNTERS,
[
{
messages_dead_lettered_expired_total, ?MESSAGES_DEAD_LETTERED_EXPIRED, counter,
"Total number of messages dead-lettered due to message TTL exceeded"
},
{
messages_dead_lettered_rejected_total, ?MESSAGES_DEAD_LETTERED_REJECTED, counter,
"Total number of messages dead-lettered due to basic.reject or basic.nack"
}
]).
-define(MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER,
{
messages_dead_lettered_maxlen_total, ?MESSAGES_DEAD_LETTERED_MAXLEN, counter,
"Total number of messages dead-lettered due to overflow drop-head or reject-publish-dlx"
}).
-define(MESSAGES_DEAD_LETTERED_CONFIRMED_COUNTER,
{
messages_dead_lettered_confirmed_total, ?MESSAGES_DEAD_LETTERED_CONFIRMED, counter,
"Total number of messages dead-lettered and confirmed by target queues"
}).
-define(MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER,
{
messages_dead_lettered_delivery_limit_total, ?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT, counter,
"Total number of messages dead-lettered due to delivery-limit exceeded"
}).

boot_step() ->
%% Protocol counters
init([{protocol, amqp091}]),

%% Protocol & Queue Type counters
init([{protocol, amqp091}, {queue_type, rabbit_classic_queue}]),
init([{protocol, amqp091}, {queue_type, rabbit_quorum_queue}]),
init([{protocol, amqp091}, {queue_type, rabbit_stream_queue}]),

%% Dead Letter counters
%%
%% Streams never dead letter.
%%
%% Source classic queue dead letters.
init([{queue_type, rabbit_classic_queue}, {dead_letter_strategy, disabled}],
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER]),
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER,
?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER,
?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER]),
init([{queue_type, rabbit_classic_queue}, {dead_letter_strategy, at_most_once}],
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER]),
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER,
?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER,
?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER]),
%%
%% Source quorum queue dead letters.
%% Only quorum queues can dead letter due to delivery-limit exceeded.
%% Only quorum queues support dead letter strategy at-least-once.
init([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, disabled}],
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER,
?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER]),
?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER,
?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER,
?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER
]),
init([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, at_most_once}],
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER,
?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER]),
?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER,
?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER,
?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER
]),
init([{queue_type, rabbit_quorum_queue}, {dead_letter_strategy, at_least_once}],
[?MESSAGES_DEAD_LETTERED_CONFIRMED_COUNTER,
?MESSAGES_DEAD_LETTERED_EXPIRED_COUNTER,
?MESSAGES_DEAD_LETTERED_REJECTED_COUNTER,
?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT_COUNTER
]).

Expand All @@ -193,9 +186,9 @@ init(Labels = [{protocol, Protocol}], Extra) ->
_ = seshat:new_group(?MODULE),
Counters = seshat:new(?MODULE, Labels, ?PROTOCOL_COUNTERS ++ Extra),
persistent_term:put({?MODULE, Protocol}, Counters);
init(Labels = [{queue_type, QueueType}, {dead_letter_strategy, DLS}], Extra) ->
init(Labels = [{queue_type, QueueType}, {dead_letter_strategy, DLS}], DeadLetterCounters) ->
_ = seshat:new_group(?MODULE),
Counters = seshat:new(?MODULE, Labels, ?MESSAGES_DEAD_LETTERED_COUNTERS ++ Extra),
Counters = seshat:new(?MODULE, Labels, DeadLetterCounters),
persistent_term:put({?MODULE, QueueType, DLS}, Counters).

overview() ->
Expand Down Expand Up @@ -263,9 +256,9 @@ consumer_deleted(Protocol) ->

messages_dead_lettered(Reason, QueueType, DeadLetterStrategy, Num) ->
Index = case Reason of
maxlen -> ?MESSAGES_DEAD_LETTERED_MAXLEN;
expired -> ?MESSAGES_DEAD_LETTERED_EXPIRED;
rejected -> ?MESSAGES_DEAD_LETTERED_REJECTED;
maxlen -> ?MESSAGES_DEAD_LETTERED_MAXLEN;
delivery_limit -> ?MESSAGES_DEAD_LETTERED_DELIVERY_LIMIT
end,
counters:add(fetch(QueueType, DeadLetterStrategy), Index, Num).
Expand Down
13 changes: 8 additions & 5 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

-include("rabbit_mqtt.hrl").
-include("rabbit_mqtt_packet.hrl").
-include_lib("rabbit/include/rabbit_global_counters.hrl").
-include_lib("stdlib/include/assert.hrl").

-export([start/2, stop/1]).
Expand Down Expand Up @@ -93,16 +94,18 @@ local_connection_pids() ->
end.

init_global_counters() ->
init_global_counters(?MQTT_PROTO_V3),
init_global_counters(?MQTT_PROTO_V4),
init_global_counters(?MQTT_PROTO_V5).
lists:foreach(fun init_global_counters/1, [?MQTT_PROTO_V3,
?MQTT_PROTO_V4,
?MQTT_PROTO_V5]).

init_global_counters(ProtoVer) ->
Proto = {protocol, ProtoVer},
rabbit_global_counters:init([Proto]),
rabbit_global_counters:init([Proto, {queue_type, ?QUEUE_TYPE_QOS_0}]),
rabbit_global_counters:init([Proto, {queue_type, rabbit_classic_queue}]),
rabbit_global_counters:init([Proto, {queue_type, rabbit_quorum_queue}]).
rabbit_global_counters:init([Proto, {queue_type, rabbit_quorum_queue}]),
rabbit_global_counters:init([Proto, {queue_type, ?QUEUE_TYPE_QOS_0}]),
rabbit_global_counters:init([{queue_type, ?QUEUE_TYPE_QOS_0}, {dead_letter_strategy, disabled}],
[?MESSAGES_DEAD_LETTERED_MAXLEN_COUNTER]).

persist_static_configuration() ->
rabbit_mqtt_util:init_sparkplug(),
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1982,6 +1982,8 @@ handle_queue_event({queue_event, ?QUEUE_TYPE_QOS_0, Msg},
false ->
deliver_one_to_client(Msg, false, State0);
true ->
rabbit_global_counters:messages_dead_lettered(
maxlen, ?QUEUE_TYPE_QOS_0, disabled, 1),
State0#state{qos0_messages_dropped = N + 1}
end,
{ok, State};
Expand Down
33 changes: 31 additions & 2 deletions deps/rabbitmq_mqtt/test/reader_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,22 @@ event_authentication_failure(Config) ->
%% Test that queue type rabbit_mqtt_qos0_queue drops QoS 0 messages when its
%% max length is reached.
rabbit_mqtt_qos0_queue_overflow(Config) ->
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, rabbit_mqtt_qos0_queue),
ProtoVer = case ?config(mqtt_version, Config) of
v4 -> mqtt311;
v5 -> mqtt50
end,
QType = rabbit_mqtt_qos0_queue,

#{
[{protocol, ProtoVer}, {queue_type, QType}] :=
#{messages_delivered_total := 0,
messages_delivered_consume_auto_ack_total := 0},

[{queue_type, QType}, {dead_letter_strategy, disabled}] :=
#{messages_dead_lettered_maxlen_total := NumDeadLettered}
} = rabbit_ct_broker_helpers:rpc(Config, rabbit_global_counters, overview, []),

ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, QType),

Topic = atom_to_binary(?FUNCTION_NAME),
Msg = binary:copy(<<"x">>, 4000),
Expand Down Expand Up @@ -289,7 +304,8 @@ rabbit_mqtt_qos0_queue_overflow(Config) ->
{status, _, _, [_, _, _, _, Misc]} = sys:get_status(ServerConnectionPid),
[State] = [S || {data, [{"State", S}]} <- Misc],
#{proc_state := #{qos0_messages_dropped := NumDropped}} = State,
ct:pal("NumReceived=~b~nNumDropped=~b", [NumReceived, NumDropped]),

ct:pal("NumReceived=~b NumDropped=~b", [NumReceived, NumDropped]),

%% We expect that
%% 1. all sent messages were either received or dropped
Expand All @@ -302,6 +318,19 @@ rabbit_mqtt_qos0_queue_overflow(Config) ->
%% of mailbox_soft_limit=200 should not be dropped
?assert(NumReceived >= 200),

%% Assert that Prometheus metrics counted correctly.
ExpectedNumDeadLettered = NumDeadLettered + NumDropped,
?assertMatch(
#{
[{protocol, ProtoVer}, {queue_type, QType}] :=
#{messages_delivered_total := NumReceived,
messages_delivered_consume_auto_ack_total := NumReceived},

[{queue_type, QType}, {dead_letter_strategy, disabled}] :=
#{messages_dead_lettered_maxlen_total := ExpectedNumDeadLettered}
},
rabbit_ct_broker_helpers:rpc(Config, rabbit_global_counters, overview, [])),

ok = emqtt:disconnect(Sub),
ok = emqtt:disconnect(Pub).

Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_mqtt/test/v5_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@ at_most_once_dead_letter_detect_cycle(Config) ->
ok = emqtt:disconnect(Pub),
%% Given our subscribing client is disconnected, the message should be dead lettered after 1 ms.
%% However, due to the dead letter cycle, we expect the message to be dropped.
timer:sleep(5),
timer:sleep(20),
Sub2 = connect(SubClientId, Config, [{clean_start, false}]),
assert_nothing_received(),
%% Double check that the message was indeed (exactly once) dead lettered.
Expand Down
5 changes: 3 additions & 2 deletions deps/rabbitmq_prometheus/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ To generate these:
Metrics `rabbitmq_global_messages_dead_lettered_*` have labels `queue_type` and `dead_letter_strategy`.

Label `queue_type` denotes the type of queue messages were discarded from. It can have value
* `rabbit_classic_queue`, or
* `rabbit_quorum_queue`
* `rabbit_classic_queue`,
* `rabbit_quorum_queue`, or
* `rabbit_mqtt_qos0_queue`

(Queue type `rabbit_stream_queue` does not dead letter messages.)

Expand Down