Skip to content

Commit

Permalink
Expose AMQP connection metrics
Browse files Browse the repository at this point in the history
Expose the same metrics for AMQP 1.0 connections as for AMQP 0.9.1 connections.

Display the following AMQP 1.0 metrics on the Management UI:
* Network bytes per second from/to client on connections page
* Number of sessions/channels on connections page
* Network bytes per second from/to client graph on connection page
* Reductions graph on connection page
* Garbage colletion info on connection page

Expose the following AMQP 1.0 per-object Prometheus metrics:
* rabbitmq_connection_incoming_bytes_total
* rabbitmq_connection_outgoing_bytes_total
* rabbitmq_connection_process_reductions_total
* rabbitmq_connection_incoming_packets_total
* rabbitmq_connection_outgoing_packets_total
* rabbitmq_connection_pending_packets
* rabbitmq_connection_channels

The rabbit_amqp_writer proc:
* notifies the rabbit_amqp_reader proc if it sent frames
* hibernates eventually if it doesn't send any frames

The rabbit_amqp_reader proc:
* does not emit stats (update ETS tables) if no frames are received
or sent to save resources when there are many idle connections.
  • Loading branch information
ansd committed Nov 2, 2024
1 parent ef2c8df commit 3db4a97
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 77 deletions.
108 changes: 76 additions & 32 deletions deps/rabbit/src/rabbit_amqp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").
-include("rabbit_amqp_reader.hrl").
-include("rabbit_amqp.hrl").

-export([init/1,
Expand Down Expand Up @@ -79,7 +80,8 @@
pending_recv :: boolean(),
buf :: list(),
buf_len :: non_neg_integer(),
tracked_channels :: #{channel_number() => Session :: pid()}
tracked_channels :: #{channel_number() => Session :: pid()},
stats_timer :: rabbit_event:state()
}).

-type state() :: #v1{}.
Expand All @@ -90,7 +92,7 @@

unpack_from_0_9_1(
{Sock, PendingRecv, SupPid, Buf, BufLen, ProxySocket,
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt},
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer},
Parent) ->
logger:update_process_metadata(#{connection => ConnectionName}),
#v1{parent = Parent,
Expand All @@ -106,6 +108,7 @@ unpack_from_0_9_1(
tracked_channels = maps:new(),
writer = none,
connection_state = received_amqp3100,
stats_timer = StatsTimer,
connection = #v1_connection{
name = ConnectionName,
container_id = none,
Expand Down Expand Up @@ -201,6 +204,10 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
end
end.

handle_other(emit_stats, State) ->
emit_stats(State);
handle_other(ensure_stats_timer, State) ->
ensure_stats_timer(State);
handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
ReasonString = rabbit_misc:format("broker forced connection closure with reason '~w'",
[Reason]),
Expand Down Expand Up @@ -247,8 +254,16 @@ handle_other({'$gen_call', From, {info, Items}}, State) ->
end,
gen_server:reply(From, Reply),
State;
handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
State;
handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) ->
case ?IS_RUNNING(State) of
true ->
Infos = infos(?CONNECTION_EVENT_KEYS, State),
rabbit_event:notify(connection_created, Infos, Ref),
rabbit_event:init_stats_timer(State, #v1.stats_timer);
false ->
%% Ignore, we will emit a connection_created event once we start running.
State
end;
handle_other(terminate_connection, _State) ->
stop;
handle_other({set_credential, Cred}, State) ->
Expand Down Expand Up @@ -527,6 +542,7 @@ handle_connection_frame(
proplists:get_value(pid, Infos),
Infos),
ok = rabbit_event:notify(connection_created, Infos),
ok = maybe_emit_stats(State),
ok = rabbit_amqp1_0:register_connection(self()),
Caps = [%% https://docs.oasis-open.org/amqp/linkpair/v1.0/cs01/linkpair-v1.0-cs01.html#_Toc51331306
<<"LINK_PAIR_V1_0">>,
Expand Down Expand Up @@ -629,25 +645,26 @@ handle_input(handshake,
switch_callback(State, {frame_header, amqp}, 8);
handle_input({frame_header, Mode},
Header = <<Size:32, DOff:8, Type:8, Channel:16>>,
State) when DOff >= 2 ->
State0) when DOff >= 2 ->
case {Mode, Type} of
{amqp, 0} -> ok;
{sasl, 1} -> ok;
_ -> throw({bad_1_0_header_type, Header, Mode})
_ -> throw({bad_1_0_header_type, Header, Mode})
end,
MaxFrameSize = State#v1.connection#v1_connection.incoming_max_frame_size,
if Size =:= 8 ->
%% heartbeat
State;
Size > MaxFrameSize ->
handle_exception(
State, Channel, error_frame(
?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
"frame size (~b bytes) > maximum frame size (~b bytes)",
[Size, MaxFrameSize]));
true ->
switch_callback(State, {frame_body, Mode, DOff, Channel}, Size - 8)
end;
MaxFrameSize = State0#v1.connection#v1_connection.incoming_max_frame_size,
State = if Size =:= 8 ->
%% heartbeat
State0;
Size > MaxFrameSize ->
Err = error_frame(
?V_1_0_CONNECTION_ERROR_FRAMING_ERROR,
"frame size (~b bytes) > maximum frame size (~b bytes)",
[Size, MaxFrameSize]),
handle_exception(State0, Channel, Err);
true ->
switch_callback(State0, {frame_body, Mode, DOff, Channel}, Size - 8)
end,
ensure_stats_timer(State);
handle_input({frame_header, _Mode}, Malformed, _State) ->
throw({bad_1_0_header, Malformed});
handle_input({frame_body, Mode, DOff, Channel},
Expand Down Expand Up @@ -1013,13 +1030,18 @@ i(peer_host, #v1{connection = #v1_connection{peer_host = Val}}) ->
Val;
i(peer_port, #v1{connection = #v1_connection{peer_port = Val}}) ->
Val;
i(SockStat, S) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
SockStat =:= send_cnt;
SockStat =:= send_pend ->
socket_info(fun (Sock) -> rabbit_net:getstat(Sock, [SockStat]) end,
fun ([{_, I}]) -> I end, S);
i(SockStat, #v1{sock = Sock})
when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
SockStat =:= send_cnt;
SockStat =:= send_pend ->
case rabbit_net:getstat(Sock, [SockStat]) of
{ok, [{SockStat, Val}]} ->
Val;
{error, _} ->
''
end;
i(ssl, #v1{sock = Sock}) -> rabbit_net:is_ssl(Sock);
i(SSL, #v1{sock = Sock, proxy_socket = ProxySock})
when SSL =:= ssl_protocol;
Expand All @@ -1045,15 +1067,37 @@ i(channels, #v1{tracked_channels = Channels}) ->
maps:size(Channels);
i(channel_max, #v1{connection = #v1_connection{channel_max = Max}}) ->
Max;
i(reductions = Item, _State) ->
{Item, Reductions} = erlang:process_info(self(), Item),
Reductions;
i(garbage_collection, _State) ->
rabbit_misc:get_gc_info(self());
i(Item, #v1{}) ->
throw({bad_argument, Item}).

%% From rabbit_reader
socket_info(Get, Select, #v1{sock = Sock}) ->
case Get(Sock) of
{ok, T} -> Select(T);
{error, _} -> ''
end.
maybe_emit_stats(State) ->
ok = rabbit_event:if_enabled(
State,
#v1.stats_timer,
fun() -> emit_stats(State) end).

emit_stats(State) ->
[{_, Pid},
{_, RecvOct},
{_, SendOct},
{_, Reductions}] = infos(?SIMPLE_METRICS, State),
Infos = infos(?OTHER_METRICS, State),
rabbit_core_metrics:connection_stats(Pid, Infos),
rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions),
%% NB: Don't call ensure_stats_timer because it becomes expensive
%% if all idle non-hibernating connections emit stats.
rabbit_event:reset_stats_timer(State, #v1.stats_timer).

ensure_stats_timer(State)
when ?IS_RUNNING(State) ->
rabbit_event:ensure_stats_timer(State, #v1.stats_timer, emit_stats);
ensure_stats_timer(State) ->
State.

ignore_maintenance({map, Properties}) ->
lists:member(
Expand Down
17 changes: 17 additions & 0 deletions deps/rabbit/src/rabbit_amqp_reader.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.

-define(SIMPLE_METRICS, [pid,
recv_oct,
send_oct,
reductions]).

-define(OTHER_METRICS, [recv_cnt,
send_cnt,
send_pend,
state,
channels,
garbage_collection]).
21 changes: 14 additions & 7 deletions deps/rabbit/src/rabbit_amqp_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
pending :: iolist(),
%% This field is just an optimisation to minimize the cost of erlang:iolist_size/1
pending_size :: non_neg_integer(),
monitored_sessions :: #{pid() => true}
monitored_sessions :: #{pid() => true},
stats_timer :: rabbit_event:state()
}).

-define(HIBERNATE_AFTER, 6_000).
Expand Down Expand Up @@ -100,7 +101,8 @@ init({Sock, ReaderPid}) ->
reader = ReaderPid,
pending = [],
pending_size = 0,
monitored_sessions = #{}},
monitored_sessions = #{},
stats_timer = rabbit_event:init_stats_timer()},
process_flag(message_queue_data, off_heap),
{ok, State}.

Expand All @@ -123,6 +125,10 @@ handle_call({send_command, ChannelNum, Performative}, _From, State0) ->
State = flush(State1),
{reply, ok, State}.

handle_info(emit_stats, State0 = #state{reader = ReaderPid}) ->
ReaderPid ! ensure_stats_timer,
State = rabbit_event:reset_stats_timer(State0, #state.stats_timer),
no_reply(State);
handle_info(timeout, State0) ->
State = flush(State0),
{noreply, State};
Expand Down Expand Up @@ -223,18 +229,19 @@ tcp_send(Sock, Data) ->

maybe_flush(State = #state{pending_size = PendingSize}) ->
case PendingSize > ?FLUSH_THRESHOLD of
true -> flush(State);
true -> flush(State);
false -> State
end.

flush(State = #state{pending = []}) ->
State;
flush(State = #state{sock = Sock,
pending = Pending}) ->
flush(State0 = #state{sock = Sock,
pending = Pending}) ->
case rabbit_net:send(Sock, lists:reverse(Pending)) of
ok ->
State#state{pending = [],
pending_size = 0};
State = State0#state{pending = [],
pending_size = 0},
rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats);
{error, Reason} ->
exit({writer, send_failed, Reason})
end.
18 changes: 8 additions & 10 deletions deps/rabbit/src/rabbit_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include("rabbit_amqp_reader.hrl").

-export([start_link/2, info/2, force_event_refresh/2,
shutdown/2]).
Expand Down Expand Up @@ -116,10 +117,6 @@
connection_blocked_message_sent
}).

-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels,
garbage_collection]).

-define(CREATION_EVENT_KEYS,
[pid, name, port, peer_port, host,
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
Expand Down Expand Up @@ -1582,8 +1579,8 @@ i(state, #v1{connection_state = ConnectionState,
end;
i(garbage_collection, _State) ->
rabbit_misc:get_gc_info(self());
i(reductions, _State) ->
{reductions, Reductions} = erlang:process_info(self(), reductions),
i(reductions = Item, _State) ->
{Item, Reductions} = erlang:process_info(self(), Item),
Reductions;
i(Item, #v1{connection = Conn}) -> ic(Item, Conn).

Expand Down Expand Up @@ -1623,12 +1620,12 @@ maybe_emit_stats(State) ->

emit_stats(State) ->
[{_, Pid},
{_, Recv_oct},
{_, Send_oct},
{_, RecvOct},
{_, SendOct},
{_, Reductions}] = infos(?SIMPLE_METRICS, State),
Infos = infos(?OTHER_METRICS, State),
rabbit_core_metrics:connection_stats(Pid, Infos),
rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions),
rabbit_core_metrics:connection_stats(Pid, RecvOct, SendOct, Reductions),
State1 = rabbit_event:reset_stats_timer(State, #v1.stats_timer),
ensure_stats_timer(State1).

Expand All @@ -1643,6 +1640,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
pending_recv = PendingRecv,
helper_sup = {_HelperSup091, HelperSup10},
proxy_socket = ProxySocket,
stats_timer = StatsTimer,
connection = #connection{
name = Name,
host = Host,
Expand All @@ -1651,7 +1649,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
peer_port = PeerPort,
connected_at = ConnectedAt}}) ->
{Sock, PendingRecv, HelperSup10, Buf, BufLen, ProxySocket,
Name, Host, PeerHost, Port, PeerPort, ConnectedAt}.
Name, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer}.

respond_and_close(State, Channel, Protocol, Reason, LogErr) ->
log_hard_error(State, Channel, LogErr),
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit_common/src/rabbit_core_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ connection_stats(Pid, Infos) ->
ets:insert(connection_metrics, {Pid, Infos}),
ok.

connection_stats(Pid, Recv_oct, Send_oct, Reductions) ->
connection_stats(Pid, RecvOct, SendOct, Reductions) ->
%% Includes delete marker
ets:insert(connection_coarse_metrics, {Pid, Recv_oct, Send_oct, Reductions, 0}),
ets:insert(connection_coarse_metrics, {Pid, RecvOct, SendOct, Reductions, 0}),
ok.

channel_created(Pid, Infos) ->
Expand Down
29 changes: 20 additions & 9 deletions deps/rabbit_common/src/rabbit_event.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
-include("rabbit.hrl").

-export([start_link/0]).
-export([init_stats_timer/2, init_disabled_stats_timer/2,
-export([init_stats_timer/0, init_stats_timer/2, init_disabled_stats_timer/2,
ensure_stats_timer/3, stop_stats_timer/2, reset_stats_timer/2]).
-export([stats_level/2, if_enabled/3]).
-export([notify/2, notify/3, notify_if/3]).
Expand Down Expand Up @@ -89,23 +89,34 @@ start_link() ->
%% Nowadays, instead of sending a message to rabbit_event via notify(stats),
%% some stat-emitting objects update ETS tables directly via module rabbit_core_metrics.

init_stats_timer(C, P) ->
-spec init_stats_timer() -> state().
init_stats_timer() ->
%% If the rabbit app is not loaded - use default none:5000
StatsLevel = application:get_env(rabbit, collect_statistics, none),
Interval = application:get_env(rabbit, collect_statistics_interval, 5000),
setelement(P, C, #state{level = StatsLevel, interval = Interval,
timer = undefined}).
Interval = application:get_env(rabbit, collect_statistics_interval, 5000),
#state{level = StatsLevel,
interval = Interval,
timer = undefined}.

init_stats_timer(C, P) ->
State = init_stats_timer(),
setelement(P, C, State).

init_disabled_stats_timer(C, P) ->
setelement(P, C, #state{level = none, interval = 0, timer = undefined}).
State = #state{level = none,
interval = 0,
timer = undefined},
setelement(P, C, State).

ensure_stats_timer(C, P, Msg) ->
case element(P, C) of
#state{level = Level, interval = Interval, timer = undefined} = State
#state{level = Level,
interval = Interval,
timer = undefined} = State
when Level =/= none ->
TRef = erlang:send_after(Interval, self(), Msg),
setelement(P, C, State#state{timer = TRef});
#state{} ->
_State ->
C
end.

Expand Down Expand Up @@ -156,5 +167,5 @@ event_cons(Type, Props, Ref) ->
#event{type = Type,
props = Props,
reference = Ref,
timestamp = os:system_time(milli_seconds)}.
timestamp = os:system_time(millisecond)}.

Loading

0 comments on commit 3db4a97

Please sign in to comment.