Skip to content

Commit

Permalink
Support AMQP over WebSocket (OSS part)
Browse files Browse the repository at this point in the history
  • Loading branch information
ansd committed Jan 15, 2025
1 parent 87bb1a1 commit 1553522
Show file tree
Hide file tree
Showing 34 changed files with 388 additions and 596 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
[RabbitMQ](https://rabbitmq.com) is a [feature rich](https://www.rabbitmq.com/docs),
multi-protocol messaging and streaming broker. It supports:

* AMQP 0-9-1
* AMQP 1.0
* AMQP 0-9-1
* [RabbitMQ Stream Protocol](https://www.rabbitmq.com/docs/streams)
* MQTT 3.1, 3.1.1, and 5.0
* STOMP 1.0 through 1.2
* [MQTT over WebSockets](https://www.rabbitmq.com/docs/web-mqtt)
* [STOMP over WebSockets](https://www.rabbitmq.com/docs/web-stomp)
* [MQTT over WebSocket](https://www.rabbitmq.com/docs/web-mqtt)
* [STOMP over WebSocket](https://www.rabbitmq.com/docs/web-stomp)
* AMQP 1.0 over WebSocket (supported in [VMware Tanzu RabbitMQ](https://www.vmware.com/products/app-platform/tanzu-rabbitmq))


## Installation
Expand Down
8 changes: 0 additions & 8 deletions deps/rabbit/include/rabbit_amqp.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,6 @@
node
] ++ ?AUTH_EVENT_KEYS).

-define(INFO_ITEMS,
[connection_state,
recv_oct,
recv_cnt,
send_oct,
send_cnt
] ++ ?ITEMS).

%% for rabbit_event connection_created
-define(CONNECTION_EVENT_KEYS,
[type,
Expand Down
11 changes: 11 additions & 0 deletions deps/rabbit/include/rabbit_amqp_metrics.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-define(SIMPLE_METRICS, [pid,
recv_oct,
send_oct,
reductions]).

-define(OTHER_METRICS, [recv_cnt,
send_cnt,
send_pend,
state,
channels,
garbage_collection]).
63 changes: 63 additions & 0 deletions deps/rabbit/include/rabbit_amqp_reader.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
%% same values as in rabbit_reader
-define(NORMAL_TIMEOUT, 3_000).
-define(CLOSING_TIMEOUT, 30_000).
-define(SILENT_CLOSE_DELAY, 3_000).

%% Allow for potentially large sets of tokens during the SASL exchange.
%% https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html#_Toc67999915
-define(INITIAL_MAX_FRAME_SIZE, 8192).

-type protocol() :: amqp | sasl.
-type channel_number() :: non_neg_integer().
-type callback() :: handshake |
{frame_header, protocol()} |
{frame_body, protocol(), DataOffset :: pos_integer(), channel_number()}.

-record(v1_connection,
{name :: binary(),
container_id = none :: none | binary(),
vhost = none :: none | rabbit_types:vhost(),
%% server host
host :: inet:ip_address() | inet:hostname(),
%% client host
peer_host :: inet:ip_address() | inet:hostname(),
%% server port
port :: inet:port_number(),
%% client port
peer_port :: inet:port_number(),
connected_at :: integer(),
user = unauthenticated :: unauthenticated | rabbit_types:user(),
timeout = ?NORMAL_TIMEOUT :: non_neg_integer(),
incoming_max_frame_size = ?INITIAL_MAX_FRAME_SIZE :: pos_integer(),
outgoing_max_frame_size = ?INITIAL_MAX_FRAME_SIZE :: unlimited | pos_integer(),
%% "Prior to any explicit negotiation, [...] the maximum channel number is 0." [2.4.1]
channel_max = 0 :: non_neg_integer(),
auth_mechanism = sasl_init_unprocessed :: sasl_init_unprocessed | {binary(), module()},
auth_state = unauthenticated :: term(),
credential_timer :: undefined | reference(),
properties :: undefined | {map, list(tuple())}
}).

-record(v1,
{parent :: pid(),
helper_sup :: pid(),
writer = none :: none | pid(),
heartbeater = none :: none | rabbit_heartbeat:heartbeaters(),
session_sup = none :: none | pid(),
websocket :: boolean(),
sock :: none | rabbit_net:socket(),
proxy_socket :: undefined | {rabbit_proxy_socket, any(), any()},
connection :: none | #v1_connection{},
connection_state :: waiting_amqp3100 | received_amqp3100 | waiting_sasl_init |
securing | waiting_amqp0100 | waiting_open | running |
closing | closed,
callback :: callback(),
recv_len = 8 :: non_neg_integer(),
pending_recv :: boolean(),
buf :: list(),
buf_len :: non_neg_integer(),
tracked_channels = maps:new() :: #{channel_number() => Session :: pid()},
stats_timer :: rabbit_event:state()
}).

-type state() :: #v1{}.
Loading

0 comments on commit 1553522

Please sign in to comment.