From 155352289583957edbd52155f84902c1ddcbdd2f Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 14 Jan 2025 14:11:56 +0100 Subject: [PATCH] Support AMQP over WebSocket (OSS part) --- README.md | 7 +- deps/rabbit/include/rabbit_amqp.hrl | 8 - deps/rabbit/include/rabbit_amqp_metrics.hrl | 11 + deps/rabbit/include/rabbit_amqp_reader.hrl | 63 ++++ deps/rabbit/src/rabbit_amqp_reader.erl | 229 ++++++-------- deps/rabbit/src/rabbit_amqp_reader.hrl | 17 -- deps/rabbit/src/rabbit_amqp_writer.erl | 35 +-- deps/rabbit/src/rabbit_core_ff.erl | 7 + deps/rabbit/src/rabbit_reader.erl | 39 ++- deps/rabbit/test/amqp_address_SUITE.erl | 11 +- deps/rabbit/test/amqp_auth_SUITE.erl | 29 +- deps/rabbit/test/amqp_client_SUITE.erl | 91 ++++-- deps/rabbit/test/amqp_filtex_SUITE.erl | 8 +- .../rabbit/test/amqp_proxy_protocol_SUITE.erl | 4 +- deps/rabbit/test/amqp_utils.erl | 21 +- deps/rabbit/test/classic_queue_SUITE.erl | 8 +- deps/rabbit_common/mk/rabbitmq-run.mk | 3 + deps/rabbit_common/src/credit_flow.erl | 33 +- deps/rabbit_common/src/rabbit_net.erl | 2 +- .../lib/rabbitmq/cli/core/listeners.ex | 10 +- .../rabbitmq_cli/test/core/listeners_test.exs | 4 +- deps/rabbitmq_ct_client_helpers/Makefile | 2 +- .../src/rfc6455_client.erl | 15 +- .../src/rabbit_ct_broker_helpers.erl | 9 + .../priv/www/js/dispatcher.js | 3 +- .../priv/www/js/tmpl/connection.ejs | 3 +- .../rabbit_mgmt_wm_connection_sessions.erl | 11 +- .../test/rabbit_mgmt_http_SUITE.erl | 4 +- .../priv/bunny.html | 2 +- .../rabbitmq_web_mqtt_examples/priv/echo.html | 2 +- .../test/src/rfc6455_client.erl | 287 ------------------ .../priv/bunny.html | 2 +- .../priv/echo.html | 2 +- .../priv/temp-queue.html | 2 +- 34 files changed, 388 insertions(+), 596 deletions(-) create mode 100644 deps/rabbit/include/rabbit_amqp_metrics.hrl create mode 100644 deps/rabbit/include/rabbit_amqp_reader.hrl delete mode 100644 deps/rabbit/src/rabbit_amqp_reader.hrl rename deps/{rabbitmq_web_mqtt/test => rabbitmq_ct_client_helpers}/src/rfc6455_client.erl (97%) delete mode 100644 deps/rabbitmq_web_stomp/test/src/rfc6455_client.erl diff --git a/README.md b/README.md index 32dfe0a3ab48..1ae2d01dddd6 100644 --- a/README.md +++ b/README.md @@ -5,13 +5,14 @@ [RabbitMQ](https://rabbitmq.com) is a [feature rich](https://www.rabbitmq.com/docs), multi-protocol messaging and streaming broker. It supports: - * AMQP 0-9-1 * AMQP 1.0 + * AMQP 0-9-1 * [RabbitMQ Stream Protocol](https://www.rabbitmq.com/docs/streams) * MQTT 3.1, 3.1.1, and 5.0 * STOMP 1.0 through 1.2 - * [MQTT over WebSockets](https://www.rabbitmq.com/docs/web-mqtt) - * [STOMP over WebSockets](https://www.rabbitmq.com/docs/web-stomp) + * [MQTT over WebSocket](https://www.rabbitmq.com/docs/web-mqtt) + * [STOMP over WebSocket](https://www.rabbitmq.com/docs/web-stomp) + * AMQP 1.0 over WebSocket (supported in [VMware Tanzu RabbitMQ](https://www.vmware.com/products/app-platform/tanzu-rabbitmq)) ## Installation diff --git a/deps/rabbit/include/rabbit_amqp.hrl b/deps/rabbit/include/rabbit_amqp.hrl index 185e80fe0c64..44e7d1522b57 100644 --- a/deps/rabbit/include/rabbit_amqp.hrl +++ b/deps/rabbit/include/rabbit_amqp.hrl @@ -43,14 +43,6 @@ node ] ++ ?AUTH_EVENT_KEYS). --define(INFO_ITEMS, - [connection_state, - recv_oct, - recv_cnt, - send_oct, - send_cnt - ] ++ ?ITEMS). - %% for rabbit_event connection_created -define(CONNECTION_EVENT_KEYS, [type, diff --git a/deps/rabbit/include/rabbit_amqp_metrics.hrl b/deps/rabbit/include/rabbit_amqp_metrics.hrl new file mode 100644 index 000000000000..c7e18453c8c3 --- /dev/null +++ b/deps/rabbit/include/rabbit_amqp_metrics.hrl @@ -0,0 +1,11 @@ +-define(SIMPLE_METRICS, [pid, + recv_oct, + send_oct, + reductions]). + +-define(OTHER_METRICS, [recv_cnt, + send_cnt, + send_pend, + state, + channels, + garbage_collection]). diff --git a/deps/rabbit/include/rabbit_amqp_reader.hrl b/deps/rabbit/include/rabbit_amqp_reader.hrl new file mode 100644 index 000000000000..0077a9c9c2be --- /dev/null +++ b/deps/rabbit/include/rabbit_amqp_reader.hrl @@ -0,0 +1,63 @@ +%% same values as in rabbit_reader +-define(NORMAL_TIMEOUT, 3_000). +-define(CLOSING_TIMEOUT, 30_000). +-define(SILENT_CLOSE_DELAY, 3_000). + +%% Allow for potentially large sets of tokens during the SASL exchange. +%% https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html#_Toc67999915 +-define(INITIAL_MAX_FRAME_SIZE, 8192). + +-type protocol() :: amqp | sasl. +-type channel_number() :: non_neg_integer(). +-type callback() :: handshake | + {frame_header, protocol()} | + {frame_body, protocol(), DataOffset :: pos_integer(), channel_number()}. + +-record(v1_connection, + {name :: binary(), + container_id = none :: none | binary(), + vhost = none :: none | rabbit_types:vhost(), + %% server host + host :: inet:ip_address() | inet:hostname(), + %% client host + peer_host :: inet:ip_address() | inet:hostname(), + %% server port + port :: inet:port_number(), + %% client port + peer_port :: inet:port_number(), + connected_at :: integer(), + user = unauthenticated :: unauthenticated | rabbit_types:user(), + timeout = ?NORMAL_TIMEOUT :: non_neg_integer(), + incoming_max_frame_size = ?INITIAL_MAX_FRAME_SIZE :: pos_integer(), + outgoing_max_frame_size = ?INITIAL_MAX_FRAME_SIZE :: unlimited | pos_integer(), + %% "Prior to any explicit negotiation, [...] the maximum channel number is 0." [2.4.1] + channel_max = 0 :: non_neg_integer(), + auth_mechanism = sasl_init_unprocessed :: sasl_init_unprocessed | {binary(), module()}, + auth_state = unauthenticated :: term(), + credential_timer :: undefined | reference(), + properties :: undefined | {map, list(tuple())} + }). + +-record(v1, + {parent :: pid(), + helper_sup :: pid(), + writer = none :: none | pid(), + heartbeater = none :: none | rabbit_heartbeat:heartbeaters(), + session_sup = none :: none | pid(), + websocket :: boolean(), + sock :: none | rabbit_net:socket(), + proxy_socket :: undefined | {rabbit_proxy_socket, any(), any()}, + connection :: none | #v1_connection{}, + connection_state :: waiting_amqp3100 | received_amqp3100 | waiting_sasl_init | + securing | waiting_amqp0100 | waiting_open | running | + closing | closed, + callback :: callback(), + recv_len = 8 :: non_neg_integer(), + pending_recv :: boolean(), + buf :: list(), + buf_len :: non_neg_integer(), + tracked_channels = maps:new() :: #{channel_number() => Session :: pid()}, + stats_timer :: rabbit_event:state() + }). + +-type state() :: #v1{}. diff --git a/deps/rabbit/src/rabbit_amqp_reader.erl b/deps/rabbit/src/rabbit_amqp_reader.erl index 965543488c12..423aa84ed829 100644 --- a/deps/rabbit/src/rabbit_amqp_reader.erl +++ b/deps/rabbit/src/rabbit_amqp_reader.erl @@ -10,8 +10,9 @@ -include_lib("kernel/include/logger.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("amqp10_common/include/amqp10_types.hrl"). --include("rabbit_amqp_reader.hrl"). -include("rabbit_amqp.hrl"). +-include("rabbit_amqp_metrics.hrl"). +-include("rabbit_amqp_reader.hrl"). -export([init/1, info/2, @@ -22,110 +23,38 @@ system_terminate/4, system_code_change/4]). --import(rabbit_amqp_util, [protocol_error/3]). +-export([advertise_sasl_mechanism/1, + handle_input/2, + handle_other/2, + ensure_stats_timer/1]). -%% same values as in rabbit_reader --define(NORMAL_TIMEOUT, 3_000). --define(CLOSING_TIMEOUT, 30_000). --define(SILENT_CLOSE_DELAY, 3_000). - -%% Allow for potentially large sets of tokens during the SASL exchange. -%% https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html#_Toc67999915 --define(INITIAL_MAX_FRAME_SIZE, 8192). - --type protocol() :: amqp | sasl. --type channel_number() :: non_neg_integer(). - --record(v1_connection, - {name :: binary(), - container_id :: none | binary(), - vhost :: none | rabbit_types:vhost(), - %% server host - host :: inet:ip_address() | inet:hostname(), - %% client host - peer_host :: inet:ip_address() | inet:hostname(), - %% server port - port :: inet:port_number(), - %% client port - peer_port :: inet:port_number(), - connected_at :: integer(), - user :: unauthenticated | rabbit_types:user(), - timeout :: non_neg_integer(), - incoming_max_frame_size :: pos_integer(), - outgoing_max_frame_size :: unlimited | pos_integer(), - channel_max :: non_neg_integer(), - auth_mechanism :: sasl_init_unprocessed | {binary(), module()}, - auth_state :: term(), - credential_timer :: undefined | reference(), - properties :: undefined | {map, list(tuple())} - }). - --record(v1, - { - parent :: pid(), - helper_sup :: pid(), - writer :: none | pid(), - heartbeater :: none | rabbit_heartbeat:heartbeaters(), - session_sup :: rabbit_types:option(pid()), - sock :: rabbit_net:socket(), - proxy_socket :: undefined | {rabbit_proxy_socket, any(), any()}, - connection :: #v1_connection{}, - connection_state :: received_amqp3100 | waiting_sasl_init | securing | - waiting_amqp0100 | waiting_open | running | - closing | closed, - callback :: handshake | - {frame_header, protocol()} | - {frame_body, protocol(), DataOffset :: pos_integer(), channel_number()}, - recv_len :: non_neg_integer(), - pending_recv :: boolean(), - buf :: list(), - buf_len :: non_neg_integer(), - tracked_channels :: #{channel_number() => Session :: pid()}, - stats_timer :: rabbit_event:state() - }). - --type state() :: #v1{}. +-import(rabbit_amqp_util, [protocol_error/3]). -define(IS_RUNNING(State), State#v1.connection_state =:= running). -%%-------------------------------------------------------------------------- - unpack_from_0_9_1( {Sock, PendingRecv, SupPid, Buf, BufLen, ProxySocket, ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer}, Parent) -> logger:update_process_metadata(#{connection => ConnectionName}), #v1{parent = Parent, + websocket = false, sock = Sock, callback = {frame_header, sasl}, - recv_len = 8, pending_recv = PendingRecv, - heartbeater = none, helper_sup = SupPid, buf = Buf, buf_len = BufLen, proxy_socket = ProxySocket, - tracked_channels = maps:new(), - writer = none, connection_state = received_amqp3100, stats_timer = StatsTimer, connection = #v1_connection{ name = ConnectionName, - container_id = none, - vhost = none, host = Host, peer_host = PeerHost, port = Port, peer_port = PeerPort, - connected_at = ConnectedAt, - user = unauthenticated, - timeout = ?NORMAL_TIMEOUT, - incoming_max_frame_size = ?INITIAL_MAX_FRAME_SIZE, - outgoing_max_frame_size = ?INITIAL_MAX_FRAME_SIZE, - %% "Prior to any explicit negotiation, [...] the maximum channel number is 0." [2.4.1] - channel_max = 0, - auth_mechanism = sasl_init_unprocessed, - auth_state = unauthenticated}}. + connected_at = ConnectedAt}}. -spec system_continue(pid(), [sys:dbg_opt()], state()) -> no_return() | ok. system_continue(Parent, Deb, State) -> @@ -152,8 +81,6 @@ set_credential(Pid, Credential) -> %%-------------------------------------------------------------------------- -inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). - recvloop(Deb, State = #v1{pending_recv = true}) -> mainloop(Deb, State); recvloop(Deb, State = #v1{sock = Sock, @@ -166,8 +93,7 @@ recvloop(Deb, State = #v1{sock = Sock, {error, Reason} -> throw({inet_error, Reason}) end; -recvloop(Deb, State0 = #v1{callback = Callback, - recv_len = RecvLen, +recvloop(Deb, State0 = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) -> Bin = case Buf of @@ -177,7 +103,7 @@ recvloop(Deb, State0 = #v1{callback = Callback, {Data, Rest} = split_binary(Bin, RecvLen), State1 = State0#v1{buf = [Rest], buf_len = BufLen - RecvLen}, - State = handle_input(Callback, Data, State1), + State = handle_input(Data, State1), recvloop(Deb, State). -spec mainloop([sys:dbg_opt()], state()) -> @@ -204,6 +130,7 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> end end. +-spec handle_other(any(), state()) -> state() | stop. handle_other(emit_stats, State) -> emit_stats(State); handle_other(ensure_stats_timer, State) -> @@ -236,7 +163,7 @@ handle_other(heartbeat_timeout, State) -> Error = error_frame(?V_1_0_AMQP_ERROR_RESOURCE_LIMIT_EXCEEDED, "no frame received from client within idle timeout threshold", []), handle_exception(State, 0, Error); -handle_other({'$gen_call', From, {shutdown, Explanation}}, +handle_other({rabbit_call, From, {shutdown, Explanation}}, State = #v1{connection = #v1_connection{properties = Properties}}) -> Ret = case Explanation =:= "Node was put into maintenance mode" andalso ignore_maintenance(Properties) of @@ -245,7 +172,7 @@ handle_other({'$gen_call', From, {shutdown, Explanation}}, end, gen_server:reply(From, ok), Ret; -handle_other({'$gen_call', From, {info, Items}}, State) -> +handle_other({rabbit_call, From, {info, Items}}, State) -> Reply = try infos(Items, State) of Infos -> {ok, Infos} @@ -254,6 +181,9 @@ handle_other({'$gen_call', From, {info, Items}}, State) -> end, gen_server:reply(From, Reply), State; +handle_other({'$gen_call', From, Req}, State) -> + %% Delete this function clause when feature flag 'rabbitmq_4.1.0' becomes required. + handle_other({rabbit_call, From, Req}, State); handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) -> case ?IS_RUNNING(State) of true -> @@ -290,8 +220,7 @@ terminate(_, _) -> %%-------------------------------------------------------------------------- %% error handling / termination -close(Error, State = #v1{sock = Sock, - connection = #v1_connection{timeout = Timeout}}) -> +close(Error, State = #v1{connection = #v1_connection{timeout = Timeout}}) -> %% Client properties will be emitted in the connection_closed event by rabbit_reader. ClientProperties = i(client_properties, State), put(client_properties, ClientProperties), @@ -301,7 +230,7 @@ close(Error, State = #v1{sock = Sock, false -> ?CLOSING_TIMEOUT end, _TRef = erlang:send_after(Time, self(), terminate_connection), - ok = send_on_channel0(Sock, #'v1_0.close'{error = Error}), + ok = send_on_channel0(State, #'v1_0.close'{error = Error}, amqp10_framing), State#v1{connection_state = closed}. handle_session_exit(ChannelNum, SessionPid, Reason, State0) -> @@ -491,12 +420,9 @@ handle_connection_frame( end, {ok, ReceiveTimeoutSec} = application:get_env(rabbit, heartbeat), ReceiveTimeoutMillis = ReceiveTimeoutSec * 1000, - SendFun = fun() -> - Frame = amqp10_binary_generator:build_heartbeat_frame(), - catch rabbit_net:send(Sock, Frame) - end, - Parent = self(), - ReceiveFun = fun() -> Parent ! heartbeat_timeout end, + Reader = self(), + ReceiveFun = fun() -> Reader ! heartbeat_timeout end, + SendFun = heartbeat_send_fun(Reader, State0), %% TODO: only start heartbeat receive timer at next next frame Heartbeater = rabbit_heartbeat:start( HelperSupPid, Sock, ConnectionName, @@ -556,16 +482,21 @@ handle_connection_frame( container_id = {utf8, rabbit_nodes:cluster_name()}, offered_capabilities = rabbit_amqp_util:capabilities(Caps), properties = server_properties()}, - ok = send_on_channel0(Sock, Open), + ok = send_on_channel0(State, Open, amqp10_framing), State; handle_connection_frame(#'v1_0.close'{}, State0) -> State = State0#v1{connection_state = closing}, close(undefined, State). start_writer(#v1{helper_sup = SupPid, + websocket = WebSocket, sock = Sock} = State) -> + Socket = case WebSocket of + true -> websocket; + false -> Sock + end, ChildSpec = #{id => writer, - start => {rabbit_amqp_writer, start_link, [Sock, self()]}, + start => {rabbit_amqp_writer, start_link, [Socket, self()]}, restart => transient, significant => true, shutdown => ?WORKER_WAIT, @@ -620,15 +551,15 @@ handle_sasl_frame(#'v1_0.sasl_response'{response = {binary, Response}}, handle_sasl_frame(Performative, State) -> throw({unexpected_1_0_sasl_frame, Performative, State}). -handle_input(handshake, - <<"AMQP",0,1,0,0>>, - #v1{connection_state = waiting_amqp0100, - sock = Sock, +-spec handle_input(binary(), state()) -> state(). +handle_input(Handshake = <<"AMQP",0,1,0,0>>, + #v1{callback = handshake, + connection_state = waiting_amqp0100, connection = #v1_connection{user = #user{}}, helper_sup = HelperSup } = State0) -> %% At this point, client already got successfully authenticated by SASL. - send_handshake(Sock, <<"AMQP",0,1,0,0>>), + send(State0, Handshake), ChildSpec = #{id => session_sup, start => {rabbit_amqp_session_sup, start_link, [self()]}, restart => transient, @@ -643,9 +574,9 @@ handle_input(handshake, %% sending any other frames." [2.4.1] connection_state = waiting_open}, switch_callback(State, {frame_header, amqp}, 8); -handle_input({frame_header, Mode}, - Header = <>, - State0) when DOff >= 2 -> +handle_input(Header = <>, + State0 = #v1{callback = {frame_header, Mode}}) + when DOff >= 2 -> case {Mode, Type} of {amqp, 0} -> ok; {sasl, 1} -> ok; @@ -665,19 +596,16 @@ handle_input({frame_header, Mode}, switch_callback(State0, {frame_body, Mode, DOff, Channel}, Size - 8) end, ensure_stats_timer(State); -handle_input({frame_header, _Mode}, Malformed, _State) -> +handle_input(Malformed, #v1{callback = {frame_header, _Mode}}) -> throw({bad_1_0_header, Malformed}); -handle_input({frame_body, Mode, DOff, Channel}, - FrameBin, - State) -> +handle_input(FrameBin, State0 = #v1{callback = {frame_body, Mode, DOff, Channel}}) -> %% Figure 2.16 %% DOff = 4-byte words minus 8 bytes we've already read ExtendedHeaderSize = (DOff * 32 - 64), <<_IgnoreExtendedHeader:ExtendedHeaderSize, FrameBody/binary>> = FrameBin, - handle_frame(Mode, Channel, FrameBody, - switch_callback(State, {frame_header, Mode}, 8)); - -handle_input(Callback, Data, _State) -> + State = switch_callback(State0, {frame_header, Mode}, 8), + handle_frame(Mode, Channel, FrameBody, State); +handle_input(Data, #v1{callback = Callback}) -> throw({bad_input, Callback, Data}). -spec init(tuple()) -> no_return(). @@ -689,26 +617,42 @@ init(PackedState) -> %% By invoking recvloop here we become 1.0. recvloop(sys:debug_options([]), State). +-spec advertise_sasl_mechanism(state()) -> state(). advertise_sasl_mechanism(State0 = #v1{connection_state = received_amqp3100, sock = Sock}) -> - send_handshake(Sock, <<"AMQP",3,1,0,0>>), + send(State0, <<"AMQP",3,1,0,0>>), Ms0 = [{symbol, atom_to_binary(M)} || M <- auth_mechanisms(Sock)], Ms1 = {array, symbol, Ms0}, Ms = #'v1_0.sasl_mechanisms'{sasl_server_mechanisms = Ms1}, - ok = send_on_channel0(Sock, Ms, rabbit_amqp_sasl), + ok = send_on_channel0(State0, Ms, rabbit_amqp_sasl), State = State0#v1{connection_state = waiting_sasl_init}, switch_callback(State, {frame_header, sasl}, 8). -send_handshake(Sock, Handshake) -> - ok = inet_op(fun () -> rabbit_net:send(Sock, Handshake) end). - -send_on_channel0(Sock, Method) -> - send_on_channel0(Sock, Method, amqp10_framing). - -send_on_channel0(Sock, Method, Framing) -> - ok = rabbit_amqp_writer:internal_send_command(Sock, Method, Framing). - -%% End 1-0 +send_on_channel0(State, Performative, Framing) -> + Data = rabbit_amqp_writer:assemble_frame(0, Performative, Framing), + send(State, Data). + +send(#v1{websocket = true}, Data) -> + self() ! {send_ws, self(), Data}, + ok; +send(#v1{websocket = false, + sock = Sock}, Data) -> + rabbit_misc:throw_on_error( + inet_error, + fun() -> rabbit_net:send(Sock, Data) end). + +heartbeat_send_fun(Reader, #v1{websocket = true}) -> + fun() -> + Frame = amqp10_binary_generator:build_heartbeat_frame(), + Reader ! {send_ws, self(), Frame}, + ok + end; +heartbeat_send_fun(_, #v1{websocket = false, + sock = Sock}) -> + fun() -> + Frame = amqp10_binary_generator:build_heartbeat_frame(), + catch rabbit_net:send(Sock, Frame) + end. auth_mechanism_to_module(TypeBin, Sock) -> case rabbit_registry:binary_to_type(TypeBin) of @@ -742,8 +686,7 @@ auth_mechanisms(Sock) -> auth_phase( Response, - State = #v1{sock = Sock, - connection = Conn = #v1_connection{auth_mechanism = {Name, AuthMechanism}, + State = #v1{connection = Conn = #v1_connection{auth_mechanism = {Name, AuthMechanism}, auth_state = AuthState}}) -> case AuthMechanism:handle_response(Response, AuthState) of {refused, Username, Msg, Args} -> @@ -753,7 +696,7 @@ auth_phase( auth_fail(Username, State), silent_close_delay(), Outcome = #'v1_0.sasl_outcome'{code = ?V_1_0_SASL_CODE_AUTH}, - ok = send_on_channel0(Sock, Outcome, rabbit_amqp_sasl), + ok = send_on_channel0(State, Outcome, rabbit_amqp_sasl), protocol_error( ?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, "~ts login refused: ~ts", [Name, io_lib:format(Msg, Args)]); @@ -762,12 +705,12 @@ auth_phase( protocol_error(?V_1_0_AMQP_ERROR_DECODE_ERROR, Msg, Args); {challenge, Challenge, AuthState1} -> Challenge = #'v1_0.sasl_challenge'{challenge = {binary, Challenge}}, - ok = send_on_channel0(Sock, Challenge, rabbit_amqp_sasl), + ok = send_on_channel0(State, Challenge, rabbit_amqp_sasl), State1 = State#v1{connection = Conn#v1_connection{auth_state = AuthState1}}, switch_callback(State1, {frame_header, sasl}, 8); {ok, User} -> Outcome = #'v1_0.sasl_outcome'{code = ?V_1_0_SASL_CODE_OK}, - ok = send_on_channel0(Sock, Outcome, rabbit_amqp_sasl), + ok = send_on_channel0(State, Outcome, rabbit_amqp_sasl), State1 = State#v1{connection_state = waiting_amqp0100, connection = Conn#v1_connection{user = User, auth_state = authenticated}}, @@ -967,17 +910,11 @@ silent_close_delay() -> -spec info(rabbit_types:connection(), rabbit_types:info_keys()) -> rabbit_types:infos(). info(Pid, InfoItems) -> - KnownItems = [session_pids | ?INFO_ITEMS], - case InfoItems -- KnownItems of - [] -> - case gen_server:call(Pid, {info, InfoItems}, infinity) of - {ok, InfoList} -> - InfoList; - {error, Error} -> - throw(Error) - end; - UnknownItems -> - throw({bad_argument, UnknownItems}) + case gen_server:call(Pid, {info, InfoItems}, infinity) of + {ok, InfoList} -> + InfoList; + {error, Reason} -> + throw(Reason) end. infos(Items, State) -> @@ -987,8 +924,12 @@ i(pid, #v1{}) -> self(); i(type, #v1{}) -> network; -i(protocol, #v1{}) -> - {1, 0}; +i(protocol, #v1{websocket = WebSocket}) -> + Vsn = {1, 0}, + case WebSocket of + true -> {'Web AMQP', Vsn}; + false -> Vsn + end; i(connection, #v1{connection = Val}) -> Val; i(node, #v1{}) -> diff --git a/deps/rabbit/src/rabbit_amqp_reader.hrl b/deps/rabbit/src/rabbit_amqp_reader.hrl deleted file mode 100644 index 4f29639c23d2..000000000000 --- a/deps/rabbit/src/rabbit_amqp_reader.hrl +++ /dev/null @@ -1,17 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% License, v. 2.0. If a copy of the MPL was not distributed with this -%% file, You can obtain one at https://mozilla.org/MPL/2.0/. -%% -%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. - --define(SIMPLE_METRICS, [pid, - recv_oct, - send_oct, - reductions]). - --define(OTHER_METRICS, [recv_cnt, - send_cnt, - send_pend, - state, - channels, - garbage_collection]). diff --git a/deps/rabbit/src/rabbit_amqp_writer.erl b/deps/rabbit/src/rabbit_amqp_writer.erl index 80e5859f7441..45e7a1919034 100644 --- a/deps/rabbit/src/rabbit_amqp_writer.erl +++ b/deps/rabbit/src/rabbit_amqp_writer.erl @@ -16,7 +16,7 @@ send_command/4, send_command_sync/3, send_command_and_notify/5, - internal_send_command/3]). + assemble_frame/3]). %% gen_server callbacks -export([init/1, @@ -26,7 +26,7 @@ format_status/1]). -record(state, { - sock :: rabbit_net:socket(), + sock :: rabbit_net:socket() | websocket, reader :: rabbit_types:connection(), pending :: iolist(), %% This field is just an optimisation to minimize the cost of erlang:iolist_size/1 @@ -85,13 +85,6 @@ send_command_and_notify(Writer, QueuePid, ChannelNum, Performative, Payload) -> Request = {send_command_and_notify, QueuePid, self(), ChannelNum, Performative, Payload}, maybe_send(Writer, Request). --spec internal_send_command(rabbit_net:socket(), - performative(), - amqp10_framing | rabbit_amqp_sasl) -> ok. -internal_send_command(Sock, Performative, Protocol) -> - Data = assemble_frame(0, Performative, Protocol), - ok = tcp_send(Sock, Data). - %%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% gen_server callbacks %%% %%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -125,13 +118,16 @@ handle_call({send_command, ChannelNum, Performative}, _From, State0) -> State = flush(State1), {reply, ok, State}. +handle_info(timeout, State0) -> + State = flush(State0), + {noreply, State}; +handle_info({bump_credit, Msg}, State) -> + credit_flow:handle_bump_msg(Msg), + no_reply(State); handle_info(emit_stats, State0 = #state{reader = ReaderPid}) -> ReaderPid ! ensure_stats_timer, State = rabbit_event:reset_stats_timer(State0, #state.stats_timer), no_reply(State); -handle_info(timeout, State0) -> - State = flush(State0), - {noreply, State}; handle_info({{'DOWN', session}, _MRef, process, SessionPid, _Reason}, State0 = #state{monitored_sessions = Sessions}) -> credit_flow:peer_down(SessionPid), @@ -200,6 +196,9 @@ internal_send_command_async(Channel, Performative, Payload, assemble_frame(Channel, Performative) -> assemble_frame(Channel, Performative, amqp10_framing). +-spec assemble_frame(rabbit_types:channel_number(), + performative(), + amqp10_framing | rabbit_amqp_sasl) -> iolist(). assemble_frame(Channel, Performative, amqp10_framing) -> ?TRACE("channel ~b <-~n ~tp", [Channel, amqp10_framing:pprint(Performative)]), @@ -217,11 +216,6 @@ assemble_frame_with_payload(Channel, Performative, Payload) -> PerfIoData = amqp10_framing:encode_bin(Performative), amqp10_binary_generator:build_frame(Channel, [PerfIoData, Payload]). -tcp_send(Sock, Data) -> - rabbit_misc:throw_on_error( - inet_error, - fun() -> rabbit_net:send(Sock, Data) end). - %% Flush when more than 2.5 * 1460 bytes (TCP over Ethernet MSS) = 3650 bytes of data %% has accumulated. The idea is to get the TCP data sections full (i.e. fill 1460 bytes) %% as often as possible to reduce the overhead of TCP/IP headers. @@ -235,6 +229,13 @@ maybe_flush(State = #state{pending_size = PendingSize}) -> flush(State = #state{pending = []}) -> State; +flush(State = #state{sock = websocket, + reader = Reader, + pending = Pending}) -> + credit_flow:send(Reader), + Reader ! {send_ws, self(), lists:reverse(Pending)}, + State#state{pending = [], + pending_size = 0}; flush(State0 = #state{sock = Sock, pending = Pending}) -> case rabbit_net:send(Sock, lists:reverse(Pending)) of diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index eed253e8c9e9..e8817e1751ac 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -205,3 +205,10 @@ stability => stable, depends_on => [message_containers] }}). + +-rabbit_feature_flag( + {'rabbitmq_4.1.0', + #{desc => "Allows rolling upgrades to 4.1.x", + stability => stable, + depends_on => ['rabbitmq_4.0.0'] + }}). diff --git a/deps/rabbit/src/rabbit_reader.erl b/deps/rabbit/src/rabbit_reader.erl index 6e4ba7f95017..498e333bc8c0 100644 --- a/deps/rabbit/src/rabbit_reader.erl +++ b/deps/rabbit/src/rabbit_reader.erl @@ -42,7 +42,7 @@ -include_lib("rabbit_common/include/rabbit_framing.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). --include("rabbit_amqp_reader.hrl"). +-include("rabbit_amqp_metrics.hrl"). -export([start_link/2, info/2, force_event_refresh/2, shutdown/2]). @@ -146,10 +146,9 @@ start_link(HelperSups, Ref) -> Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSups, Ref]), {ok, Pid}. --spec shutdown(pid(), string()) -> 'ok'. - +-spec shutdown(pid(), string()) -> ok. shutdown(Pid, Explanation) -> - gen_server:call(Pid, {shutdown, Explanation}, infinity). + gen_call(Pid, {shutdown, Explanation}, infinity). -spec init(pid(), {pid(), pid()}, ranch:ref()) -> no_return(). @@ -176,11 +175,10 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) -> {ok, Misc}. -spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos(). - info(Pid, Items) -> - case gen_server:call(Pid, {info, Items}, infinity) of - {ok, Res} -> Res; - {error, Error} -> throw(Error) + case gen_call(Pid, {info, Items}, infinity) of + {ok, InfoList} -> InfoList; + {error, Reason} -> throw(Reason) end. -spec force_event_refresh(pid(), reference()) -> 'ok'. @@ -296,7 +294,7 @@ start_connection(Parent, HelperSups, RanchRef, Deb, Sock) -> {PeerHost, PeerPort, Host, Port} = socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), ?store_proc_name(Name), - ConnectedAt = os:system_time(milli_seconds), + ConnectedAt = os:system_time(millisecond), State = #v1{parent = Parent, ranch_ref = RanchRef, sock = RealSocket, @@ -604,18 +602,21 @@ handle_other(heartbeat_timeout, State = #v1{connection = #connection{timeout_sec = T}}) -> maybe_emit_stats(State), throw({heartbeat_timeout, T}); -handle_other({'$gen_call', From, {shutdown, Explanation}}, State) -> +handle_other({rabbit_call, From, {shutdown, Explanation}}, State) -> {ForceTermination, NewState} = terminate(Explanation, State), gen_server:reply(From, ok), case ForceTermination of force -> stop; normal -> NewState end; -handle_other({'$gen_call', From, {info, Items}}, State) -> +handle_other({rabbit_call, From, {info, Items}}, State) -> gen_server:reply(From, try {ok, infos(Items, State)} catch Error -> {error, Error} end), State; +handle_other({'$gen_call', From, Req}, State) -> + %% Delete this function clause when feature flag 'rabbitmq_4.1.0' becomes required. + handle_other({rabbit_call, From, Req}, State); handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) when ?IS_RUNNING(State) -> rabbit_event:notify( @@ -1842,3 +1843,19 @@ connection_duration(ConnectedAt) -> true -> io_lib:format("~Bms", [DurationMillis]) end. + +gen_call(Pid, Req, Timeout) -> + case rabbit_feature_flags:is_enabled('rabbitmq_4.1.0') of + true -> + %% We use gen:call/4 with label rabbit_call instead of gen_server:call/3 with label '$gen_call' + %% because cowboy_websocket does not let rabbit_web_amqp_handler handle '$gen_call' messages: + %% https://github.com/ninenines/cowboy/blob/2.12.0/src/cowboy_websocket.erl#L427-L430 + case catch gen:call(Pid, rabbit_call, Req, Timeout) of + {ok, Res} -> + Res; + {'EXIT', Reason} -> + exit({Reason, {?MODULE, ?FUNCTION_NAME, [Pid, Req, Timeout]}}) + end; + false -> + gen_server:call(Pid, Req, Timeout) + end. diff --git a/deps/rabbit/test/amqp_address_SUITE.erl b/deps/rabbit/test/amqp_address_SUITE.erl index a914442d9730..a974675bb17a 100644 --- a/deps/rabbit/test/amqp_address_SUITE.erl +++ b/deps/rabbit/test/amqp_address_SUITE.erl @@ -19,7 +19,8 @@ -import(rabbit_ct_helpers, [eventually/1]). -import(amqp_utils, - [flush/1, + [connection_config/1, + flush/1, wait_for_credit/1]). -define(TIMEOUT, 30_000). @@ -647,14 +648,6 @@ cleanup({Connection, LinkPair = #link_pair{session = Session}}) -> ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection). -connection_config(Config) -> - Host = ?config(rmq_hostname, Config), - Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - #{address => Host, - port => Port, - container_id => <<"my container">>, - sasl => {plain, <<"guest">>, <<"guest">>}}. - wait_for_settled(State, Tag) -> receive {amqp10_disposition, {State, Tag}} -> diff --git a/deps/rabbit/test/amqp_auth_SUITE.erl b/deps/rabbit/test/amqp_auth_SUITE.erl index f9328aab969d..581351c462ed 100644 --- a/deps/rabbit/test/amqp_auth_SUITE.erl +++ b/deps/rabbit/test/amqp_auth_SUITE.erl @@ -24,7 +24,8 @@ [assert_event_type/2, assert_event_prop/2]). -import(amqp_utils, - [flush/1, + [web_amqp/1, + flush/1, wait_for_credit/1, close_connection_sync/1]). @@ -584,15 +585,9 @@ target_per_message_topic(Config) -> authn_failure_event(Config) -> ok = event_recorder:start(Config), - Host = ?config(rmq_hostname, Config), - Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), - Vhost = ?config(test_vhost, Config), User = ?config(test_user, Config), - OpnConf = #{address => Host, - port => Port, - container_id => <<"my container">>, - sasl => {plain, User, <<"wrong password">>}, - hostname => <<"vhost:", Vhost/binary>>}, + OpnConf0 = connection_config(Config), + OpnConf = maps:update(sasl, {plain, User, <<"wrong password">>}, OpnConf0), {ok, Connection} = amqp10_client:open_connection(OpnConf), receive {amqp10_event, {connection, Connection, {closed, sasl_auth_failure}}} -> ok @@ -603,11 +598,15 @@ authn_failure_event(Config) -> [E | _] = event_recorder:get_events(Config), ok = event_recorder:stop(Config), + Proto = case web_amqp(Config) of + true -> {'Web AMQP', {1, 0}}; + false -> {1, 0} + end, assert_event_type(user_authentication_failure, E), assert_event_prop([{name, <<"test user">>}, {auth_mechanism, <<"PLAIN">>}, {ssl, false}, - {protocol, {1, 0}}], + {protocol, Proto}], E). sasl_anonymous_success(Config) -> @@ -1037,14 +1036,10 @@ connection_config(Config) -> connection_config(Config, Vhost). connection_config(Config, Vhost) -> - Host = ?config(rmq_hostname, Config), - Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + Cfg = amqp_utils:connection_config(Config), User = Password = ?config(test_user, Config), - #{address => Host, - port => Port, - container_id => <<"my container">>, - sasl => {plain, User, Password}, - hostname => <<"vhost:", Vhost/binary>>}. + Cfg#{hostname => <<"vhost:", Vhost/binary>>, + sasl := {plain, User, Password}}. set_permissions(Config, ConfigurePerm, WritePerm, ReadPerm) -> ok = rabbit_ct_broker_helpers:set_permissions(Config, diff --git a/deps/rabbit/test/amqp_client_SUITE.erl b/deps/rabbit/test/amqp_client_SUITE.erl index c49e93cb39fa..531c0df22e35 100644 --- a/deps/rabbit/test/amqp_client_SUITE.erl +++ b/deps/rabbit/test/amqp_client_SUITE.erl @@ -30,6 +30,7 @@ -import(amqp_utils, [init/1, init/2, connection_config/1, connection_config/2, + web_amqp/1, flush/1, wait_for_credit/1, wait_for_accepts/1, @@ -1890,17 +1891,20 @@ events(Config) -> ok = event_recorder:stop(Config), ct:pal("Recorded events: ~p", [Events]), - Protocol = {protocol, {1, 0}}, + Proto = case web_amqp(Config) of + true -> {'Web AMQP', {1, 0}}; + false -> {1, 0} + end, AuthProps = [{name, <<"guest">>}, {auth_mechanism, <<"PLAIN">>}, {ssl, false}, - Protocol], + {protocol, Proto}], ?assertMatch( {value, _}, find_event(user_authentication_success, AuthProps, Events)), Node = get_node_config(Config, 0, nodename), - ConnectionCreatedProps = [Protocol, + ConnectionCreatedProps = [{protocol, Proto}, {node, Node}, {vhost, <<"/">>}, {user, <<"guest">>}, @@ -3954,7 +3958,7 @@ leader_transfer_send(QName, QType, Config) -> ok = amqp10_client:close_connection(Connection0). %% rabbitmqctl list_connections -%% should list both AMQP 1.0 and AMQP 0.9.1 connections. +%% should list both (Web) AMQP 1.0 and AMQP 0.9.1 connections. list_connections(Config) -> %% Close any open AMQP 0.9.1 connections from previous test cases. [ok = rabbit_ct_client_helpers:close_channels_and_connection(Config, Node) || Node <- [0, 1, 2]], @@ -3978,10 +3982,13 @@ list_connections(Config) -> %% Remove any whitespaces. Protocols1 = [binary:replace(Subject, <<" ">>, <<>>, [global]) || Subject <- Protocols0], Protocols = lists:sort(Protocols1), - ?assertEqual([<<"{0,9,1}">>, - <<"{1,0}">>, - <<"{1,0}">>], - Protocols), + Expected = case web_amqp(Config) of + true -> + [<<"{'WebAMQP',{1,0}}">>, <<"{'WebAMQP',{1,0}}">>, <<"{0,9,1}">>]; + false -> + [<<"{0,9,1}">>, <<"{1,0}">>, <<"{1,0}">>] + end, + ?assertEqual(Expected, Protocols), %% CLI should list AMQP 1.0 container-id {ok, StdOut1} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, ["list_connections", "--silent", "container_id"]), @@ -4625,7 +4632,12 @@ idle_time_out_on_server(Config) -> rabbit_ct_broker_helpers:setup_meck(Config), Mod = rabbit_net, ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]), - ok = rpc(Config, meck, expect, [Mod, getstat, 2, {ok, [{recv_oct, 999}]}]), + ok = rpc(Config, meck, expect, [Mod, getstat, fun(_Sock, [recv_oct]) -> + {ok, [{recv_oct, 999}]}; + (Sock, Opts) -> + meck:passthrough([Sock, Opts]) + end]), + %% The server "SHOULD try to gracefully close the connection using a close %% frame with an error explaining why" [2.4.5]. %% Since we chose a heartbeat value of 1 second, the server should easily @@ -4662,13 +4674,15 @@ idle_time_out_on_client(Config) -> %% All good, the server sent us frames every second. %% Mock the server to not send anything. + %% Mocking gen_tcp:send/2 allows this test to work for + %% * AMQP: https://github.com/rabbitmq/rabbitmq-server/blob/v4.1.0-beta.3/deps/rabbit_common/src/rabbit_net.erl#L174 + %% * AMQP over WebSocket: https://github.com/ninenines/ranch/blob/2.1.0/src/ranch_tcp.erl#L191 rabbit_ct_broker_helpers:setup_meck(Config), - Mod = rabbit_net, - ok = rpc(Config, meck, new, [Mod, [no_link, passthrough]]), + Mod = gen_tcp, + ok = rpc(Config, meck, new, [Mod, [unstick, no_link, passthrough]]), ok = rpc(Config, meck, expect, [Mod, send, 2, ok]), - %% Our client should time out within less than 5 seconds given that the - %% idle-time-out is 1 second. + %% Our client should time out soon given that the idle-time-out is 1 second. receive {amqp10_event, {connection, Connection, @@ -4694,9 +4708,19 @@ handshake_timeout(Config) -> Par = ?FUNCTION_NAME, {ok, DefaultVal} = rpc(Config, application, get_env, [App, Par]), ok = rpc(Config, application, set_env, [App, Par, 200]), - Port = get_node_config(Config, 0, tcp_port_amqp), - {ok, Socket} = gen_tcp:connect("localhost", Port, [{active, false}]), - ?assertEqual({error, closed}, gen_tcp:recv(Socket, 0, 400)), + case web_amqp(Config) of + true -> + Port = get_node_config(Config, 0, tcp_port_web_amqp), + Uri = "ws://127.0.0.1:" ++ integer_to_list(Port) ++ "/ws", + Ws = rfc6455_client:new(Uri, self(), undefined, ["amqp"]), + {ok, [{http_response, Resp}]} = rfc6455_client:open(Ws), + ?assertNotEqual(nomatch, string:prefix(Resp, "HTTP/1.1 101 Switching Protocols")), + ?assertMatch({close, _}, rfc6455_client:recv(Ws, 1000)); + false -> + Port = get_node_config(Config, 0, tcp_port_amqp), + {ok, Socket} = gen_tcp:connect("localhost", Port, [{active, false}]), + ?assertEqual({error, closed}, gen_tcp:recv(Socket, 0, 1000)) + end, ok = rpc(Config, application, set_env, [App, Par, DefaultVal]). credential_expires(Config) -> @@ -5890,20 +5914,35 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) -> end, flush(receiver_attached), - {_GenStatemState, - #{reader := ReaderPid, - socket := {tcp, Socket}}} = formatted_state(Session), + {_GenStatemStateSession, StateSession} = formatted_state(Session), + Socket = case web_amqp(Config) of + true -> + #{socket := {ws, GunPid, _GunStreamRef}} = StateSession, + {_GenStatemStateGun, StateGun} = formatted_state(GunPid), + %% https://github.com/ninenines/gun/blob/2.1.0/src/gun.erl#L315 + element(12, StateGun); + false -> + #{socket := {tcp, Sock}} = StateSession, + Sock + end, + ?assert(is_port(Socket)), - %% Provoke TCP back-pressure from client to server by using very small buffers. + %% Provoke TCP back-pressure from client to server by: + %% 1. using very small buffers ok = inet:setopts(Socket, [{recbuf, 256}, {buffer, 256}]), - %% Suspend the receiving client such that it stops reading from its socket - %% causing TCP back-pressure to the server being applied. - true = erlang:suspend_process(ReaderPid), + %% 2. stopping reading from the socket + Mod = inet, + ok = meck:new(Mod, [unstick, no_link, passthrough]), + ok = meck:expect(Mod, setopts, fun(_Sock, [{active, once}]) -> + ok; + (Sock, Opts) -> + meck:passthrough([Sock, Opts]) + end), ok = amqp10_client:flow_link_credit(Receiver, Num, never), %% We give the queue time to send messages to the session proc and writer proc. - timer:sleep(1000), + timer:sleep(2000), %% Here, we do a bit of white box testing: We assert that RabbitMQ has some form of internal %% flow control by checking that the queue sent some but, more importantly, not all its @@ -5917,7 +5956,9 @@ tcp_back_pressure_rabbitmq_internal_flow(QType, Config) -> ok = inet:setopts(Socket, [{recbuf, 65536}, {buffer, 65536}]), %% When we resume the receiving client, we expect to receive all messages. - true = erlang:resume_process(ReaderPid), + ?assert(meck:validate(Mod)), + ok = meck:unload(Mod), + ok = Mod:setopts(Socket, [{active, once}]), receive_messages(Receiver, Num), ok = detach_link_sync(Receiver), diff --git a/deps/rabbit/test/amqp_filtex_SUITE.erl b/deps/rabbit/test/amqp_filtex_SUITE.erl index 9655d007378a..75f8528da9ca 100644 --- a/deps/rabbit/test/amqp_filtex_SUITE.erl +++ b/deps/rabbit/test/amqp_filtex_SUITE.erl @@ -271,10 +271,7 @@ application_properties_section(Config) -> {ok, Receiver0} = amqp10_client:attach_receiver_link( Session, <<"receiver 0">>, Address, unsettled, configuration, Filter0), - %% Wait for the attach so the detach command won't fail - receive {amqp10_event, - {link, Receiver0, {attached, #'v1_0.attach'{}}}} -> - ok + receive {amqp10_event, {link, Receiver0, {attached, #'v1_0.attach'{}}}} -> ok after 30000 -> ct:fail({missing_event, ?LINE}) end, ok = amqp10_client:flow_link_credit(Receiver0, 10, never), @@ -597,6 +594,9 @@ string_modifier(Config) -> {ok, Receiver2} = amqp10_client:attach_receiver_link( Session, <<"receiver 2">>, Address, settled, configuration, Filter2), + receive {amqp10_event, {link, Receiver2, attached}} -> ok + after 30000 -> ct:fail({missing_event, ?LINE}) + end, ok = amqp10_client:flow_link_credit(Receiver2, 10, never), ok = assert_no_msg_received(?LINE), ok = detach_link_sync(Receiver2), diff --git a/deps/rabbit/test/amqp_proxy_protocol_SUITE.erl b/deps/rabbit/test/amqp_proxy_protocol_SUITE.erl index 85a3fdbf6f39..523bccb93598 100644 --- a/deps/rabbit/test/amqp_proxy_protocol_SUITE.erl +++ b/deps/rabbit/test/amqp_proxy_protocol_SUITE.erl @@ -101,10 +101,10 @@ v2_local(Config) -> %% use wireshark with one of the Java tests to record those amqp_1_0_frame(header_sasl) -> hex_frame_to_binary("414d515003010000"); -amqp_1_0_frame(header_amqp) -> - hex_frame_to_binary("414d515000010000"); amqp_1_0_frame(sasl_init) -> hex_frame_to_binary("0000001902010000005341c00c01a309414e4f4e594d4f5553"); +amqp_1_0_frame(header_amqp) -> + hex_frame_to_binary("414d515000010000"); amqp_1_0_frame(open) -> hex_frame_to_binary("0000003f02000000005310c03202a12438306335323662332d653530662d343835352d613564302d336466643738623537633730a1096c6f63616c686f7374"). diff --git a/deps/rabbit/test/amqp_utils.erl b/deps/rabbit/test/amqp_utils.erl index 0c8b80040bd8..9de9a1bbfa06 100644 --- a/deps/rabbit/test/amqp_utils.erl +++ b/deps/rabbit/test/amqp_utils.erl @@ -11,6 +11,7 @@ -export([init/1, init/2, connection_config/1, connection_config/2, + web_amqp/1, flush/1, wait_for_credit/1, wait_for_accepts/1, @@ -35,11 +36,21 @@ connection_config(Config) -> connection_config(Node, Config) -> Host = proplists:get_value(rmq_hostname, Config), - Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_amqp), - #{address => Host, - port => Port, - container_id => <<"my container">>, - sasl => {plain, <<"guest">>, <<"guest">>}}. + Cfg = #{address => Host, + container_id => <<"my container">>, + sasl => {plain, <<"guest">>, <<"guest">>}}, + case web_amqp(Config) of + true -> + Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_web_amqp), + Cfg#{port => Port, + ws_path => "/ws"}; + false -> + Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_amqp), + Cfg#{port => Port} + end. + +web_amqp(Config) -> + proplists:get_value(web_amqp, Config, false). flush(Prefix) -> receive diff --git a/deps/rabbit/test/classic_queue_SUITE.erl b/deps/rabbit/test/classic_queue_SUITE.erl index 2242fe599cfd..d45d5ff1cf52 100644 --- a/deps/rabbit/test/classic_queue_SUITE.erl +++ b/deps/rabbit/test/classic_queue_SUITE.erl @@ -93,7 +93,9 @@ classic_queue_flow_control_enabled(Config) -> ?assertMatch({0, _}, gen_server2_queue(QPid)), %% The connection gets into flow state - ?assertEqual([{state, flow}], rabbit_reader:info(ConnPid, [state])), + ?assertEqual( + [{state, flow}], + rabbit_ct_broker_helpers:rpc(Config, rabbit_reader, info, [ConnPid, [state]])), Dict = proc_info(ConnPid, dictionary), ?assertMatch([_|_], proplists:get_value(credit_blocked, Dict)), @@ -111,7 +113,9 @@ classic_queue_flow_control_disabled(Config) -> ?assertMatch({0, _}, gen_server2_queue(QPid)), %% The connection dos not get into flow state - ?assertEqual([{state, running}], rabbit_reader:info(ConnPid, [state])), + ?assertEqual( + [{state, running}], + rabbit_ct_broker_helpers:rpc(Config, rabbit_reader, info, [ConnPid, [state]])), Dict = proc_info(ConnPid, dictionary), ?assertMatch([], proplists:get_value(credit_blocked, Dict, [])) diff --git a/deps/rabbit_common/mk/rabbitmq-run.mk b/deps/rabbit_common/mk/rabbitmq-run.mk index f7720de345fe..7017435a85fe 100644 --- a/deps/rabbit_common/mk/rabbitmq-run.mk +++ b/deps/rabbit_common/mk/rabbitmq-run.mk @@ -164,6 +164,9 @@ $(if $(RABBITMQ_NODE_PORT), {tcp_listeners$(comma) [$(RABBITMQ_NODE_PORT)]} {rabbitmq_management, [ $(if $(RABBITMQ_NODE_PORT), {listener$(comma) [{port$(comma) $(shell echo "$$(($(RABBITMQ_NODE_PORT) + 10000))")}]},) ]}, + {rabbitmq_web_amqp, [ +$(if $(RABBITMQ_NODE_PORT), {tcp_config$(comma) [{port$(comma) $(shell echo "$$((15678 + $(RABBITMQ_NODE_PORT) - 5672))")}]},) + ]}, {rabbitmq_mqtt, [ $(if $(RABBITMQ_NODE_PORT), {tcp_listeners$(comma) [$(shell echo "$$((1883 + $(RABBITMQ_NODE_PORT) - 5672))")]},) ]}, diff --git a/deps/rabbit_common/src/credit_flow.erl b/deps/rabbit_common/src/credit_flow.erl index 20e644675eb8..433b8104998a 100644 --- a/deps/rabbit_common/src/credit_flow.erl +++ b/deps/rabbit_common/src/credit_flow.erl @@ -33,9 +33,7 @@ %% %% Grepping the project files for `credit_flow` will reveal the places %% where this module is currently used, with extra comments on what's -%% going on at each instance. Note that credit flow between mirrors -%% synchronization has not been documented, since this doesn't affect -%% client publishes. +%% going on at each instance. -define(DEFAULT_CREDIT, persistent_term:get(credit_flow_default_credit)). @@ -116,18 +114,18 @@ send(From) -> send(From, ?DEFAULT_CREDIT). send(From, {InitialCredit, _MoreCreditAfter}) -> ?UPDATE({credit_from, From}, InitialCredit, C, - if C == 1 -> block(From), - 0; - true -> C - 1 + if C =:= 1 -> block(From), + 0; + true -> C - 1 end). ack(To) -> ack(To, ?DEFAULT_CREDIT). ack(To, {_InitialCredit, MoreCreditAfter}) -> ?UPDATE({credit_to, To}, MoreCreditAfter, C, - if C == 1 -> grant(To, MoreCreditAfter), - MoreCreditAfter; - true -> C - 1 + if C =:= 1 -> grant(To, MoreCreditAfter), + MoreCreditAfter; + true -> C - 1 end). handle_bump_msg({From, MoreCredit}) -> @@ -193,10 +191,15 @@ unblock(From) -> ?TRACE_UNBLOCKED(self(), From), ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]), case blocked() of - false -> case erase(credit_deferred) of - undefined -> ok; - Credits -> _ = [To ! Msg || {To, Msg} <- Credits], - ok - end; - true -> ok + false -> + case erase(credit_deferred) of + undefined -> + ok; + Credits -> + lists:foreach(fun({To, Msg}) -> + To ! Msg + end, Credits) + end; + true -> + ok end. diff --git a/deps/rabbit_common/src/rabbit_net.erl b/deps/rabbit_common/src/rabbit_net.erl index 5f05b8a81ca1..949f483eeac2 100644 --- a/deps/rabbit_common/src/rabbit_net.erl +++ b/deps/rabbit_common/src/rabbit_net.erl @@ -223,7 +223,7 @@ connection_string(Sock, Direction) -> end. socket_ends(Sock, Direction) when ?IS_SSL(Sock); - is_port(Sock) -> + is_port(Sock) -> {From, To} = sock_funs(Direction), case {From(Sock), To(Sock)} of {{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} -> diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/core/listeners.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/core/listeners.ex index ae526d1b17e2..3474faafc4af 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/core/listeners.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/core/listeners.ex @@ -261,10 +261,12 @@ defmodule RabbitMQ.CLI.Core.Listeners do def protocol_label(:"stomp/ssl"), do: "STOMP over TLS" def protocol_label(:http), do: "HTTP API" def protocol_label(:https), do: "HTTP API over TLS (HTTPS)" - def protocol_label(:"http/web-mqtt"), do: "MQTT over WebSockets" - def protocol_label(:"https/web-mqtt"), do: "MQTT over WebSockets and TLS (HTTPS)" - def protocol_label(:"http/web-stomp"), do: "STOMP over WebSockets" - def protocol_label(:"https/web-stomp"), do: "STOMP over WebSockets and TLS (HTTPS)" + def protocol_label(:"http/web-amqp"), do: "AMQP over WebSocket" + def protocol_label(:"https/web-amqp"), do: "AMQP over WebSocket and TLS (HTTPS)" + def protocol_label(:"http/web-mqtt"), do: "MQTT over WebSocket" + def protocol_label(:"https/web-mqtt"), do: "MQTT over WebSocket and TLS (HTTPS)" + def protocol_label(:"http/web-stomp"), do: "STOMP over WebSocket" + def protocol_label(:"https/web-stomp"), do: "STOMP over WebSocket and TLS (HTTPS)" def protocol_label(:"http/prometheus"), do: "Prometheus exporter API over HTTP" def protocol_label(:"https/prometheus"), do: "Prometheus exporter API over TLS (HTTPS)" def protocol_label(:clustering), do: "inter-node and CLI tool communication" diff --git a/deps/rabbitmq_cli/test/core/listeners_test.exs b/deps/rabbitmq_cli/test/core/listeners_test.exs index 92eabc491800..2f921976b94b 100644 --- a/deps/rabbitmq_cli/test/core/listeners_test.exs +++ b/deps/rabbitmq_cli/test/core/listeners_test.exs @@ -37,8 +37,8 @@ defmodule CoreListenersTest do assert protocol_label(:"stomp/ssl") == "STOMP over TLS" assert protocol_label(:http) == "HTTP API" assert protocol_label(:https) == "HTTP API over TLS (HTTPS)" - assert protocol_label(:"https/web-stomp") == "STOMP over WebSockets and TLS (HTTPS)" - assert protocol_label(:"https/web-mqtt") == "MQTT over WebSockets and TLS (HTTPS)" + assert protocol_label(:"https/web-stomp") == "STOMP over WebSocket and TLS (HTTPS)" + assert protocol_label(:"https/web-mqtt") == "MQTT over WebSocket and TLS (HTTPS)" assert protocol_label(:"http/prometheus") == "Prometheus exporter API over HTTP" assert protocol_label(:"https/prometheus") == "Prometheus exporter API over TLS (HTTPS)" diff --git a/deps/rabbitmq_ct_client_helpers/Makefile b/deps/rabbitmq_ct_client_helpers/Makefile index 84b5238fb08e..ac964056746c 100644 --- a/deps/rabbitmq_ct_client_helpers/Makefile +++ b/deps/rabbitmq_ct_client_helpers/Makefile @@ -5,7 +5,7 @@ DEPS = rabbit_common rabbitmq_ct_helpers amqp_client DEP_PLUGINS = rabbit_common/mk/rabbitmq-build.mk -PLT_APPS = common_test +PLT_APPS += common_test crypto include ../../rabbitmq-components.mk include ../../erlang.mk diff --git a/deps/rabbitmq_web_mqtt/test/src/rfc6455_client.erl b/deps/rabbitmq_ct_client_helpers/src/rfc6455_client.erl similarity index 97% rename from deps/rabbitmq_web_mqtt/test/src/rfc6455_client.erl rename to deps/rabbitmq_ct_client_helpers/src/rfc6455_client.erl index 32b50b419afe..57caf90c05c0 100644 --- a/deps/rabbitmq_web_mqtt/test/src/rfc6455_client.erl +++ b/deps/rabbitmq_ct_client_helpers/src/rfc6455_client.erl @@ -23,8 +23,8 @@ new(WsUrl, PPid, AuthInfo, Protocols) -> new(WsUrl, PPid, AuthInfo, Protocols, <<>>). new(WsUrl, PPid, AuthInfo, Protocols, TcpPreface) -> - crypto:start(), - application:ensure_all_started(ssl), + _ = crypto:start(), + _ = application:ensure_all_started(ssl), {Transport, Url} = case WsUrl of "ws://" ++ Rest -> {gen_tcp, Rest}; "wss://" ++ SslRest -> {ssl, SslRest} @@ -113,7 +113,7 @@ start_conn(State = #state{transport = Transport}, AuthInfo, Protocols, TcpPrefac {ok, Socket0} = gen_tcp:connect(State#state.host, State#state.port, [binary, {packet, 0}]), - gen_tcp:send(Socket0, TcpPreface), + ok = gen_tcp:send(Socket0, TcpPreface), case Transport of gen_tcp -> {ok, Socket0}; ssl -> Transport:connect(Socket0, [{verify, verify_none}]) @@ -173,7 +173,7 @@ do_recv(State = #state{phase = Phase, data = Data, socket = Socket, transport = <> -> {F, O, Payload, Rest}; - <<_:1, _:3, _:4, 1:1, _/binary>> -> + <<_:1, _:3, _:4, 1:1, _/bitstring>> -> %% According o rfc6455 5.1 the server must not mask any frames. die(Socket, Transport, PPid, {1006, "Protocol error"}, normal); _ -> @@ -200,7 +200,7 @@ do_recv2(State = #state{phase = Phase, socket = Socket, ppid = PPid, transport = end, case Phase of open -> %% echo - do_close(State, WsReason), + _ = do_close(State, WsReason), Transport:close(Socket); closing -> ok @@ -260,7 +260,7 @@ loop(State = #state{socket = Socket, transport = Transport, ppid = PPid, data = error({unknown_message, Other, Socket}) end. - +-spec die(any(), any(), pid(), any(), any()) -> no_return(). die(Socket, Transport, PPid, WsReason, Reason) -> Transport:shutdown(Socket, read_write), PPid ! {rfc6455, close, self(), WsReason}, @@ -285,9 +285,6 @@ split(SubStr, Str, Limit, Acc, Default) -> split(SubStr, R, Limit-1, [L | Acc], Default). -apply_mask(Mask, Data) when is_number(Mask) -> - apply_mask(<>, Data); - apply_mask(<<0:32>>, Data) -> Data; apply_mask(Mask, Data) -> diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl index f686db6bc4d1..85a55ba5839e 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_broker_helpers.erl @@ -194,6 +194,8 @@ tcp_port_erlang_dist_proxy, tcp_port_mqtt, tcp_port_mqtt_tls, + tcp_port_web_amqp, + tcp_port_web_amqp_tls, tcp_port_web_mqtt, tcp_port_web_mqtt_tls, tcp_port_stomp, @@ -547,6 +549,13 @@ update_tcp_ports_in_rmq_config(NodeConfig, [tcp_port_mqtt_tls = Key | Rest]) -> NodeConfig1 = rabbit_ct_helpers:merge_app_env(NodeConfig, {rabbitmq_mqtt, [{ssl_listeners, [?config(Key, NodeConfig)]}]}), update_tcp_ports_in_rmq_config(NodeConfig1, Rest); +update_tcp_ports_in_rmq_config(NodeConfig, [tcp_port_web_amqp_tls | Rest]) -> + %% Skip this one, because we need more than just a port to configure + update_tcp_ports_in_rmq_config(NodeConfig, Rest); +update_tcp_ports_in_rmq_config(NodeConfig, [tcp_port_web_amqp = Key | Rest]) -> + NodeConfig1 = rabbit_ct_helpers:merge_app_env(NodeConfig, + {rabbitmq_web_amqp, [{tcp_config, [{port, ?config(Key, NodeConfig)}]}]}), + update_tcp_ports_in_rmq_config(NodeConfig1, Rest); update_tcp_ports_in_rmq_config(NodeConfig, [tcp_port_web_mqtt_tls | Rest]) -> %% Skip this one, because we need more than just a port to configure update_tcp_ports_in_rmq_config(NodeConfig, Rest); diff --git a/deps/rabbitmq_management/priv/www/js/dispatcher.js b/deps/rabbitmq_management/priv/www/js/dispatcher.js index 65a7872d72ca..8413eb7b6f97 100644 --- a/deps/rabbitmq_management/priv/www/js/dispatcher.js +++ b/deps/rabbitmq_management/priv/www/js/dispatcher.js @@ -55,7 +55,8 @@ dispatcher_add(function(sammy) { }; // First, get the connection details to check the protocol var connectionDetails = JSON.parse(sync_get(connectionPath)); - if (connectionDetails.protocol === 'AMQP 1-0') { + if (connectionDetails.protocol === 'AMQP 1-0' || + connectionDetails.protocol === 'Web AMQP 1-0') { reqs['sessions'] = connectionPath + '/sessions'; } else { reqs['channels'] = connectionPath + '/channels'; diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs index 1e7433822689..49e604c5277f 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs @@ -84,7 +84,8 @@ -<% if (connection.protocol === 'AMQP 1-0') { %> +<% if (connection.protocol === 'AMQP 1-0' || + connection.protocol === 'Web AMQP 1-0') { %>

Sessions (<%=(sessions.length)%>)

diff --git a/deps/rabbitmq_management/src/rabbit_mgmt_wm_connection_sessions.erl b/deps/rabbitmq_management/src/rabbit_mgmt_wm_connection_sessions.erl index 0baf3639fca5..aea1c7ddcec5 100644 --- a/deps/rabbitmq_management/src/rabbit_mgmt_wm_connection_sessions.erl +++ b/deps/rabbitmq_management/src/rabbit_mgmt_wm_connection_sessions.erl @@ -35,10 +35,13 @@ resource_exists(ReqData, Context) -> to_json(ReqData, Context) -> Conn = conn(ReqData), - case proplists:get_value(protocol, Conn) of - {1, 0} -> + Vsn = {1, 0}, + Protocol = proplists:get_value(protocol, Conn), + case Protocol =:= Vsn orelse + Protocol =:= {'Web AMQP', Vsn} of + true -> ConnPid = proplists:get_value(pid, Conn), - try rabbit_amqp_reader:info(ConnPid, [session_pids]) of + try rabbit_reader:info(ConnPid, [session_pids]) of [{session_pids, Pids}] -> rabbit_mgmt_util:reply_list(session_infos(Pids), ["channel_number"], @@ -52,7 +55,7 @@ to_json(ReqData, Context) -> [ConnPid, Type, Reason0]))), rabbit_mgmt_util:internal_server_error(Reason, ReqData, Context) end; - _ -> + false -> rabbit_mgmt_util:bad_request(<<"connection does not use AMQP 1.0">>, ReqData, Context) diff --git a/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl b/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl index 97a9e3df4e23..7cae1e5c484e 100644 --- a/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl +++ b/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl @@ -17,8 +17,8 @@ open_unmanaged_connection/1]). -import(rabbit_ct_broker_helpers, [rpc/4]). -import(rabbit_ct_helpers, - [eventually/3, - eventually/1]). + [eventually/1, + eventually/3]). -import(rabbit_mgmt_test_util, [assert_list/2, assert_item/2, test_item/2, assert_keys/2, assert_no_keys/2, decode_body/1, diff --git a/deps/rabbitmq_web_mqtt_examples/priv/bunny.html b/deps/rabbitmq_web_mqtt_examples/priv/bunny.html index 3ff155b9d89a..2db3154706f2 100644 --- a/deps/rabbitmq_web_mqtt_examples/priv/bunny.html +++ b/deps/rabbitmq_web_mqtt_examples/priv/bunny.html @@ -1,6 +1,6 @@ - +