From fc33e8332ef1f06c6040d53bcb82c0c077ba9565 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Tue, 12 Sep 2023 05:19:57 -0700 Subject: [PATCH] Use pg_local to track AMQP 1.0 connections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #9371 Since each AMQP 1.0 connection opens several direct AMQP connections, we must assign each direct connection a unique name to prevent multiple entries in the `connection_created_stats` table. Also, use `pg_local` to track AMQP 1.0 connections instead of walking the supervisor trees. Nuke authz_backends from connection created event 💥 Fix regex for connection name because UniqueId is part of it now (channel number) (cherry picked from commit c94d22acebe9d9e3ea9f56b9609427df2406673f) --- deps/amqp_client/src/amqp_connection.erl | 10 +++++-- .../src/amqp_direct_connection.erl | 28 +++++++++++------ .../test/rabbit_core_metrics_gc_SUITE.erl | 7 +++-- .../rabbit_common/src/rabbit_core_metrics.erl | 16 ++++++++-- deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl | 30 +++++++++---------- .../src/rabbit_amqp1_0_reader.erl | 5 +++- .../src/rabbit_amqp1_0_session_sup.erl | 6 ++-- .../test/proxy_protocol_SUITE.erl | 10 ++++--- 8 files changed, 73 insertions(+), 39 deletions(-) diff --git a/deps/amqp_client/src/amqp_connection.erl b/deps/amqp_client/src/amqp_connection.erl index 9f8c6ddb92ed..4681147db15c 100644 --- a/deps/amqp_client/src/amqp_connection.erl +++ b/deps/amqp_client/src/amqp_connection.erl @@ -65,7 +65,8 @@ -export([error_atom/1]). -export([info/2, info_keys/1, info_keys/0]). -export([connection_name/1, update_secret/3]). --export([socket_adapter_info/2]). +-export([socket_adapter_info/2, + socket_adapter_info/3]). -define(DEFAULT_CONSUMER, {amqp_selective_consumer, []}). @@ -379,7 +380,12 @@ info_keys() -> %% @doc Takes a socket and a protocol, returns an #amqp_adapter_info{} %% based on the socket for the protocol given. socket_adapter_info(Sock, Protocol) -> - amqp_direct_connection:socket_adapter_info(Sock, Protocol). + socket_adapter_info(Sock, Protocol, undefined). + +%% @doc Takes a socket and a protocol, returns an #amqp_adapter_info{} +%% based on the socket for the protocol given. +socket_adapter_info(Sock, Protocol, UniqueId) -> + amqp_direct_connection:socket_adapter_info(Sock, Protocol, UniqueId). %% @spec (ConnectionPid) -> ConnectionName %% where diff --git a/deps/amqp_client/src/amqp_direct_connection.erl b/deps/amqp_client/src/amqp_direct_connection.erl index 2b4b637b5e13..269dcd5168ff 100644 --- a/deps/amqp_client/src/amqp_direct_connection.erl +++ b/deps/amqp_client/src/amqp_direct_connection.erl @@ -17,7 +17,8 @@ -export([init/0, terminate/2, connect/4, do/2, open_channel_args/1, i/2, info_keys/0, handle_message/2, closing/3, channels_terminated/1]). --export([socket_adapter_info/2]). +-export([socket_adapter_info/2, + socket_adapter_info/3]). -record(state, {node, user, @@ -176,17 +177,26 @@ ensure_adapter_info(A = #amqp_adapter_info{name = unknown}) -> ensure_adapter_info(Info) -> Info. socket_adapter_info(Sock, Protocol) -> + socket_adapter_info(Sock, Protocol, undefined). + +socket_adapter_info(Sock, Protocol, UniqueId) -> {PeerHost, PeerPort, Host, Port} = - case rabbit_net:socket_ends(Sock, inbound) of - {ok, Res} -> Res; - _ -> {unknown, unknown, unknown, unknown} - end, - Name = case rabbit_net:connection_string(Sock, inbound) of - {ok, Res1} -> Res1; - _Error -> "(unknown)" + case rabbit_net:socket_ends(Sock, inbound) of + {ok, Res} -> Res; + _ -> {unknown, unknown, unknown, unknown} + end, + ConnectionString = case rabbit_net:connection_string(Sock, inbound) of + {ok, Res1} -> Res1; + _Error -> "(unknown)" + end, + Name = case UniqueId of + undefined -> + rabbit_data_coercion:to_binary(ConnectionString); + _ -> + rabbit_data_coercion:to_binary(rabbit_misc:format("~s (~tp)", [ConnectionString, UniqueId])) end, #amqp_adapter_info{protocol = Protocol, - name = list_to_binary(Name), + name = Name, host = Host, port = Port, peer_host = PeerHost, diff --git a/deps/rabbit/test/rabbit_core_metrics_gc_SUITE.erl b/deps/rabbit/test/rabbit_core_metrics_gc_SUITE.erl index 34f42e6465be..586936669e82 100644 --- a/deps/rabbit/test/rabbit_core_metrics_gc_SUITE.erl +++ b/deps/rabbit/test/rabbit_core_metrics_gc_SUITE.erl @@ -127,10 +127,13 @@ connection_metrics(Config) -> DeadPid = rabbit_ct_broker_helpers:rpc(Config, A, ?MODULE, dead_pid, []), + Infos = [{info0, foo}, {info1, bar}, {info2, baz}, + {authz_backends, [rabbit_auth_backend_oauth2,rabbit_auth_backend_http]}], + rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, - connection_created, [DeadPid, infos]), + connection_created, [DeadPid, Infos]), rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, - connection_stats, [DeadPid, infos]), + connection_stats, [DeadPid, Infos]), rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics, connection_stats, [DeadPid, 1, 1, 1]), diff --git a/deps/rabbit_common/src/rabbit_core_metrics.erl b/deps/rabbit_common/src/rabbit_core_metrics.erl index 8ce229954bab..0a146c2ac910 100644 --- a/deps/rabbit_common/src/rabbit_core_metrics.erl +++ b/deps/rabbit_common/src/rabbit_core_metrics.erl @@ -120,8 +120,9 @@ terminate() -> || {Table, _Type} <- ?CORE_TABLES ++ ?CORE_EXTRA_TABLES], ok. -connection_created(Pid, Infos) -> - ets:insert(connection_created, {Pid, Infos}), +connection_created(Pid, Infos0) -> + Infos1 = maybe_cleanup_infos(Infos0), + ets:insert(connection_created, {Pid, Infos1}), ets:update_counter(connection_churn_metrics, node(), {2, 1}, ?CONNECTION_CHURN_METRICS), ok. @@ -446,3 +447,14 @@ format_auth_attempt({{RemoteAddress, Username, Protocol}, Total, Succeeded, Fail format_auth_attempt({Protocol, Total, Succeeded, Failed}) -> [{protocol, atom_to_binary(Protocol, utf8)}, {auth_attempts, Total}, {auth_attempts_failed, Failed}, {auth_attempts_succeeded, Succeeded}]. + +maybe_cleanup_infos(Infos0) when is_list(Infos0) -> + %% Note: authz_backends is added in rabbit_amqp1_0_session_sup:adapter_info/3 + %% We delete it here, if present, because it should not be stored in the + %% connection_created table. + %% + %% TODO @ansd this will no longer be necessary once this PR is merged: + %% https://github.com/rabbitmq/rabbitmq-server/pull/9022 + proplists:delete(authz_backends, Infos0); +maybe_cleanup_infos(Infos) -> + Infos. diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl index dc8e955c5aaa..72c1e9ac432f 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0.erl @@ -6,10 +6,11 @@ %% -module(rabbit_amqp1_0). --export([connection_info_local/1, - emit_connection_info_local/3, +-export([emit_connection_info_local/3, emit_connection_info_all/4, - list/0]). + list/0, + register_connection/1, + unregister_connection/1]). emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> Pids = [spawn_link(Node, rabbit_amqp1_0, emit_connection_info_local, @@ -26,17 +27,14 @@ emit_connection_info_local(Items, Ref, AggregatorPid) -> end, list()). -connection_info_local(Items) -> - Connections = list(), - [rabbit_amqp1_0_reader:info(Pid, Items) || Pid <- Connections]. - +-spec list() -> [pid()]. list() -> - [ReaderPid - || {_, TcpPid, _, [tcp_listener_sup]} <- supervisor:which_children(rabbit_sup), - {_, RanchEPid, _, [ranch_embedded_sup]} <- supervisor:which_children(TcpPid), - {_, RanchLPid, _, [ranch_listener_sup]} <- supervisor:which_children(RanchEPid), - {_, RanchCSPid, _, [ranch_conns_sup_sup]} <- supervisor:which_children(RanchLPid), - {_, RanchCPid, _, [ranch_conns_sup]} <- supervisor:which_children(RanchCSPid), - {rabbit_connection_sup, ConnPid, _, _} <- supervisor:which_children(RanchCPid), - {reader, ReaderPid, _, _} <- supervisor:which_children(ConnPid) - ]. + pg_local:get_members(rabbit_amqp10_connections). + +-spec register_connection(pid()) -> ok. +register_connection(Pid) -> + pg_local:join(rabbit_amqp10_connections, Pid). + +-spec unregister_connection(pid()) -> ok. +unregister_connection(Pid) -> + pg_local:leave(rabbit_amqp10_connections, Pid). diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl index 917f1619e215..721111b0e74f 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl @@ -238,10 +238,12 @@ update_last_blocked_by(Throttle) -> close_connection(State = #v1{connection = #v1_connection{ timeout_sec = TimeoutSec}}) -> + Pid = self(), erlang:send_after((if TimeoutSec > 0 andalso TimeoutSec < ?CLOSING_TIMEOUT -> TimeoutSec; true -> ?CLOSING_TIMEOUT - end) * 1000, self(), terminate_connection), + end) * 1000, Pid, terminate_connection), + rabbit_amqp1_0:unregister_connection(Pid), State#v1{connection_state = closed}. handle_dependent_exit(ChPid, Reason, State) -> @@ -433,6 +435,7 @@ handle_1_0_connection_frame(#'v1_0.open'{ max_frame_size = ClientFrameMax, container_id = {utf8, rabbit_nodes:cluster_name()}, properties = server_properties()}), Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + rabbit_amqp1_0:register_connection(self()), control_throttle( State1#v1{throttle = Throttle#throttle{alarmed_by = Conserve}}); diff --git a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_sup.erl b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_sup.erl index c5fc1f035fcd..8ef40a0f7d79 100644 --- a/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_sup.erl +++ b/deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_sup.erl @@ -62,7 +62,7 @@ start_link({amqp10_framing, Sock, Channel, FrameMax, ReaderPid, start => {rabbit_amqp1_0_session_process, start_link, [ {Channel, ReaderPid, WriterPid, User, VHost, FrameMax, - adapter_info(User, SocketForAdapterInfo), Collector} + adapter_info(User, SocketForAdapterInfo, Channel), Collector} ]}, restart => transient, significant => true, @@ -98,7 +98,7 @@ init([]) -> %% See rabbit_direct.erl to see how `authz_bakends` is propagated from % amqp_adapter_info.additional_info to the rabbit_access_control module -adapter_info(User, Sock) -> - AdapterInfo = amqp_connection:socket_adapter_info(Sock, {'AMQP', "1.0"}), +adapter_info(User, Sock, UniqueId) -> + AdapterInfo = amqp_connection:socket_adapter_info(Sock, {'AMQP', "1.0"}, UniqueId), AdapterInfo#amqp_adapter_info{additional_info = AdapterInfo#amqp_adapter_info.additional_info ++ [{authz_backends, User#user.authz_backends}]}. diff --git a/deps/rabbitmq_amqp1_0/test/proxy_protocol_SUITE.erl b/deps/rabbitmq_amqp1_0/test/proxy_protocol_SUITE.erl index 489a8f3b02f5..985bc6eb09e9 100644 --- a/deps/rabbitmq_amqp1_0/test/proxy_protocol_SUITE.erl +++ b/deps/rabbitmq_amqp1_0/test/proxy_protocol_SUITE.erl @@ -65,7 +65,7 @@ proxy_protocol_v1(Config) -> {ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT), ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, connection_name, []), - match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81$">>, [{capture, none}]), + match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81 \\(\\d\\)">>, [{capture, none}]), gen_tcp:close(Socket), ok. @@ -82,7 +82,7 @@ proxy_protocol_v1_tls(Config) -> timer:sleep(1000), ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, connection_name, []), - match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81$">>, [{capture, none}]), + match = re:run(ConnectionName, <<"^192.168.1.1:80 -> 192.168.1.2:81 \\(\\d\\)$">>, [{capture, none}]), gen_tcp:close(Socket), ok. @@ -100,7 +100,7 @@ proxy_protocol_v2_local(Config) -> {ok, _Packet} = gen_tcp:recv(Socket, 0, ?TIMEOUT), ConnectionName = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, connection_name, []), - match = re:run(ConnectionName, <<"^127.0.0.1:\\d+ -> 127.0.0.1:\\d+$">>, [{capture, none}]), + match = re:run(ConnectionName, <<"^127.0.0.1:\\d+ -> 127.0.0.1:\\d+ \\(\\d\\)$">>, [{capture, none}]), gen_tcp:close(Socket), ok. @@ -144,7 +144,9 @@ connection_name() -> end. connection_registered() -> - length(ets:tab2list(connection_created)) > 0. + I = ets:info(connection_created), + Size = proplists:get_value(size, I), + Size > 0. retry(_Function, 0) -> false;