Skip to content

Commit

Permalink
Support AMQP over WebSocket in Erlang client
Browse files Browse the repository at this point in the history
 ## What?
Implement the AMQP over WebSocket Binding Committee Specification 01 in
the AMQP 1.0 Erlang client:
https://docs.oasis-open.org/amqp-bindmap/amqp-wsb/v1.0/cs01/amqp-wsb-v1.0-cs01.html

 ## Why?
1. This allows writing integration tests for the server implementation
   of AMQP over WebSocket.
2. Erlang and Elixir clients can use AMQP over WebSocket in environments
   where firewalls prohibit access to the AMQP port.

 ## How?
Use gun as WebSocket client.

The new module `amqp10_client_socket` handles socket operations (open, close, send) for:
* TCP sockets
* SSL sockets
* WebSockets

Prior to this commit, the amqp10_client_connection process closed only the
write end of the socket after it sent the AMQP close performative.
This commit removed premature socket closure because:
1. There is no equivalent feature provided in Gun since sending a
   WebSocket close frame causes Gun to cleanly close the connection for
   both writing and reading.
2. It's unnecessary and can result in unexpected and confusing behaviour on the server.
3. It's better practive to keep the TCP connection fully open until
   the AMQP closing handshake completes.
4. When amqp10_client_frame_reader terminates, it will cleanly close
   the socket for both writing and reading.
  • Loading branch information
ansd committed Jan 15, 2025
1 parent 5bc7803 commit 87bb1a1
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 148 deletions.
2 changes: 1 addition & 1 deletion deps/amqp10_client/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ endef
PACKAGES_DIR ?= $(abspath PACKAGES)

BUILD_DEPS = rabbit_common elvis_mk
DEPS = amqp10_common credentials_obfuscation
DEPS = amqp10_common credentials_obfuscation gun
TEST_DEPS = rabbit rabbitmq_ct_helpers
LOCAL_DEPS = ssl inets crypto public_key

Expand Down
107 changes: 51 additions & 56 deletions deps/amqp10_client/src/amqp10_client_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@

-export([format_status/1]).

-type amqp10_socket() :: {tcp, gen_tcp:socket()} | {ssl, ssl:sslsocket()}.

-type milliseconds() :: non_neg_integer().

-type address() :: inet:socket_address() | inet:hostname().
Expand All @@ -60,6 +58,8 @@
address => address(),
port => inet:port_number(),
tls_opts => {secure_port, [ssl:tls_option()]},
ws_path => string(),
ws_opts => gun:opts(),
notify => pid() | none, % the pid to send connection events to
notify_when_opened => pid() | none,
notify_when_closed => pid() | none,
Expand All @@ -83,14 +83,13 @@
sessions_sup :: pid() | undefined,
pending_session_reqs = [] :: [term()],
reader :: pid() | undefined,
socket :: amqp10_socket() | undefined,
socket :: amqp10_client_socket:socket() | undefined,
idle_time_out :: non_neg_integer() | undefined,
heartbeat_timer :: timer:tref() | undefined,
config :: connection_config()
}).

-export_type([connection_config/0,
amqp10_socket/0]).
-export_type([connection_config/0]).

%% -------------------------------------------------------------------
%% Public API.
Expand Down Expand Up @@ -152,7 +151,7 @@ start_link(Sup, Config) ->
set_other_procs(Pid, OtherProcs) ->
gen_statem:cast(Pid, {set_other_procs, OtherProcs}).

-spec socket_ready(pid(), amqp10_socket()) -> ok.
-spec socket_ready(pid(), amqp10_client_socket:socket()) -> ok.
socket_ready(Pid, Socket) ->
gen_statem:cast(Pid, {socket_ready, Socket}).

Expand Down Expand Up @@ -186,10 +185,10 @@ expecting_socket(_EvtType, {socket_ready, Socket},
Sasl = credentials_obfuscation:decrypt(maps:get(sasl, Cfg)),
case Sasl of
none ->
ok = socket_send(Socket, ?AMQP_PROTOCOL_HEADER),
ok = amqp10_client_socket:send(Socket, ?AMQP_PROTOCOL_HEADER),
{next_state, hdr_sent, State1};
_ ->
ok = socket_send(Socket, ?SASL_PROTOCOL_HEADER),
ok = amqp10_client_socket:send(Socket, ?SASL_PROTOCOL_HEADER),
{next_state, sasl_hdr_sent, State1}
end;
expecting_socket(_EvtType, {set_other_procs, OtherProcs}, State) ->
Expand All @@ -205,9 +204,12 @@ expecting_socket({call, From}, begin_session,
sasl_hdr_sent(_EvtType, {protocol_header_received, 3, 1, 0, 0}, State) ->
{next_state, sasl_hdr_rcvds, State};
sasl_hdr_sent({call, From}, begin_session,
#state{pending_session_reqs = PendingSessionReqs} = State) ->
#state{pending_session_reqs = PendingSessionReqs} = State) ->
State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
{keep_state, State1}.
{keep_state, State1};
sasl_hdr_sent(info, {'DOWN', MRef, process, _Pid, _},
#state{reader_m_ref = MRef}) ->
{stop, {shutdown, reader_down}}.

sasl_hdr_rcvds(_EvtType, #'v1_0.sasl_mechanisms'{
sasl_server_mechanisms = {array, symbol, AvailableMechs}},
Expand All @@ -228,7 +230,7 @@ sasl_hdr_rcvds({call, From}, begin_session,

sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, 0}},
#state{socket = Socket} = State) ->
ok = socket_send(Socket, ?AMQP_PROTOCOL_HEADER),
ok = amqp10_client_socket:send(Socket, ?AMQP_PROTOCOL_HEADER),
{next_state, hdr_sent, State};
sasl_init_sent(_EvtType, #'v1_0.sasl_outcome'{code = {ubyte, C}},
#state{} = State) when C==1;C==2;C==3;C==4 ->
Expand Down Expand Up @@ -285,7 +287,7 @@ open_sent({call, From}, begin_session,
#state{pending_session_reqs = PendingSessionReqs} = State) ->
State1 = State#state{pending_session_reqs = [From | PendingSessionReqs]},
{keep_state, State1};
open_sent(info, {'DOWN', MRef, _, _, _},
open_sent(info, {'DOWN', MRef, process, _, _},
#state{reader_m_ref = MRef}) ->
{stop, {shutdown, reader_down}}.

Expand All @@ -294,46 +296,56 @@ opened(_EvtType, heartbeat, State = #state{idle_time_out = T}) ->
{ok, Tmr} = start_heartbeat_timer(T),
{keep_state, State#state{heartbeat_timer = Tmr}};
opened(_EvtType, {close, Reason}, State) ->
%% We send the first close frame and wait for the reply.
%% TODO: stop all sessions writing
%% We could still accept incoming frames (See: 2.4.6)
case send_close(State, Reason) of
ok -> {next_state, close_sent, State};
{error, closed} -> {stop, normal, State};
Error -> {stop, Error, State}
ok ->
%% "After writing this frame the peer SHOULD continue to read from the connection
%% until it receives the partner's close frame (in order to guard against
%% erroneously or maliciously implemented partners, a peer SHOULD implement a
%% timeout to give its partner a reasonable time to receive and process the close
%% before giving up and simply closing the underlying transport mechanism)." [§2.4.3]
{next_state, close_sent, State, {state_timeout, ?TIMEOUT, received_no_close_frame}};
{error, closed} ->
{stop, normal, State};
Error ->
{stop, Error, State}
end;
opened(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) ->
%% We receive the first close frame, reply and terminate.
ok = notify_closed(Config, Close),
_ = send_close(State, none),
{stop, normal, State};
case send_close(State, none) of
ok -> {stop, normal, State};
{error, closed} -> {stop, normal, State};
Error -> {stop, Error, State}
end;
opened({call, From}, begin_session, State) ->
{Ret, State1} = handle_begin_session(From, State),
{keep_state, State1, [{reply, From, Ret}]};
opened(info, {'DOWN', MRef, _, _, _Info},
State = #state{reader_m_ref = MRef, config = Config}) ->
opened(info, {'DOWN', MRef, process, _, _Info},
#state{reader_m_ref = MRef, config = Config}) ->
%% reader has gone down and we are not already shutting down
ok = notify_closed(Config, shutdown),
{stop, normal, State};
{stop, normal};
opened(_EvtType, Frame, State) ->
logger:warning("Unexpected connection frame ~tp when in state ~tp ",
[Frame, State]),
{keep_state, State}.
[Frame, State]),
keep_state_and_data.

close_sent(_EvtType, heartbeat, State) ->
{next_state, close_sent, State};
close_sent(_EvtType, {'EXIT', _Pid, shutdown}, State) ->
close_sent(_EvtType, heartbeat, _Data) ->
keep_state_and_data;
close_sent(_EvtType, {'EXIT', _Pid, shutdown}, _Data) ->
%% monitored processes may exit during closure
{next_state, close_sent, State};
close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _},
#state{reader = ReaderPid} = State) ->
%% if the reader exits we probably wont receive a close frame
{stop, normal, State};
close_sent(_EvtType, #'v1_0.close'{} = Close, State = #state{config = Config}) ->
keep_state_and_data;
close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _Reason},
#state{reader = ReaderPid}) ->
%% if the reader exits we probably won't receive a close frame
{stop, normal};
close_sent(_EvtType, #'v1_0.close'{} = Close, #state{config = Config}) ->
ok = notify_closed(Config, Close),
%% TODO: we should probably set up a timer before this to ensure
%% we close down event if no reply is received
{stop, normal, State}.
{stop, normal};
close_sent(state_timeout, received_no_close_frame, _Data) ->
{stop, normal}.

set_other_procs0(OtherProcs, State) ->
#{sessions_sup := SessionsSup,
Expand Down Expand Up @@ -435,22 +447,15 @@ send_open(#state{socket = Socket, config = Config0}) ->
Encoded = amqp10_framing:encode_bin(Open),
Frame = amqp10_binary_generator:build_frame(0, Encoded),
?DBG("CONN <- ~tp", [Open]),
socket_send(Socket, Frame).
amqp10_client_socket:send(Socket, Frame).


send_close(#state{socket = Socket}, _Reason) ->
Close = #'v1_0.close'{},
Encoded = amqp10_framing:encode_bin(Close),
Frame = amqp10_binary_generator:build_frame(0, Encoded),
?DBG("CONN <- ~tp", [Close]),
Ret = socket_send(Socket, Frame),
case Ret of
ok -> _ =
socket_shutdown(Socket, write),
ok;
_ -> ok
end,
Ret.
amqp10_client_socket:send(Socket, Frame).

send_sasl_init(State, anon) ->
Frame = #'v1_0.sasl_init'{mechanism = {symbol, <<"ANONYMOUS">>}},
Expand All @@ -474,21 +479,11 @@ send(Record, FrameType, #state{socket = Socket}) ->
Encoded = amqp10_framing:encode_bin(Record),
Frame = amqp10_binary_generator:build_frame(0, FrameType, Encoded),
?DBG("CONN <- ~tp", [Record]),
socket_send(Socket, Frame).
amqp10_client_socket:send(Socket, Frame).

send_heartbeat(#state{socket = Socket}) ->
Frame = amqp10_binary_generator:build_heartbeat_frame(),
socket_send(Socket, Frame).

socket_send({tcp, Socket}, Data) ->
gen_tcp:send(Socket, Data);
socket_send({ssl, Socket}, Data) ->
ssl:send(Socket, Data).

socket_shutdown({tcp, Socket}, How) ->
gen_tcp:shutdown(Socket, How);
socket_shutdown({ssl, Socket}, How) ->
ssl:shutdown(Socket, How).
amqp10_client_socket:send(Socket, Frame).

notify_opened(#{notify_when_opened := none}, _) ->
ok;
Expand Down
Loading

0 comments on commit 87bb1a1

Please sign in to comment.