Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support MQTT 5.0 #7263

Merged
merged 66 commits into from
Jun 21, 2023
Merged

Support MQTT 5.0 #7263

merged 66 commits into from
Jun 21, 2023

Conversation

ansd
Copy link
Member

@ansd ansd commented Feb 12, 2023

Add support for MQTT 5.0.

Closes #2554

Summary of new features in MQTT v5.0 as (not) implemented in this PR and how they map to RabbitMQ internals:

  • Session expiry Queue TTL
  • Message expiry per-message message TTL
  • Reason code on all ACKs
  • Reason string on all ACKs is optional by the spec. Can easily be added in a future release if needed.
  • Server disconnect e.g. when RabbitMQ node enters maintenance mode
  • Payload format and content type forwarded unaltered by RabbitMQ
  • Request / Response forwarded unaltered by RabbitMQ. Works nicely across protocols, e.g. request-response style RPCs between MQTT 5.0 and AMQP 0.9.1 clients
  • Shared Subscriptions not yet implemented. See Limitations below.
  • Subscription ID requires the receiver to know which MQTT subscription (= AMQP 0.9.1 binding key) caused the message to be routed
  • Topic Alias
  • Flow control Consumer prefetch
  • User properties forwarded unaltered by RabbitMQ
  • Maximum Packet Size
  • Optional Server feature availability e.g. Shared Subscription Available: 0
  • Enhanced authentication not required for now as RabbitMQ already supports authenticating clients via username + password, OAuth tokens, and certificates. We could make use of RabbitMQ SASL mechanisms in the future, if needed. For now the Authentication Method set by the client is ignored and AUTH packet unsupported. Authentication in RabbitMQ MQTT 5.0 works the same as in RabbitMQ MQTT 3.1.1 and 3.1.0.
  • Subscription options similar to Subscription ID, subscription options Retain As Published and No Local require the receiver to know which MQTT subscription (= AMQP 0.9.1 binding key) caused the message to be routed
  • Will delay Although a will message payload is usually small, the protocol spec allows it to be large. Therefore, instead of using Khepri, we store the single Will Delay Message in a separate queue. (Such a queue also gets created for publish-only clients that do not create any subscriptions.) This message has a message TTL set and will be dead lettered to the Topic Exchange unless the client reconnects on time.
  • Server Keep Alive is optional by the spec. Can easily be added in a future release if needed.
  • Assigned ClientID
  • Server reference is optional by the spec. Can be added in a future release if needed. May require a separate Service object per RabbitMQ node on Kubernetes deployed by the rabbitmq/cluster-operator if the Service Type is a Load Balancer.

Summary of MQTT 5.0 limitations in this PR:

  1. Shared Subscriptions is planned to be added in a future RabbitMQ release. Although this feature maps nicely to a queue in RabbitMQ we’ll need a way store shared subscriptions as part of the session state. Efficiently querying shared subscriptions for a client that reconnects requires a new Mnesia table. Given we want to migrate Mnesia to Khepri and given we might need a new mqtt_session_state table anyway in the future (thinking about a Stream based solution which provides support for efficient QoS 1 cloud-to-devices / fanouts and support for replication of QoS 1 messages), we might want to hold that feature off for a bit to avoid further future database migrations.
  2. A Will Message that's both delayed and retained won't be retained. This is because the retainer doesn't consume from a queue currently. This limitation can be resolved in the future with a potentially Stream based replicated retained message store.

TODOs until releasing MQTT 5.0 in RabbitMQ 3.13

@ansd ansd self-assigned this Feb 12, 2023
@mergify mergify bot added the bazel label Feb 12, 2023
@ansd ansd linked an issue Feb 12, 2023 that may be closed by this pull request
@ansd ansd added this to the 3.13.0 milestone Feb 12, 2023
@ansd ansd force-pushed the mqtt5 branch 4 times, most recently from 2c608f3 to aade503 Compare February 21, 2023 11:37
@ansd ansd force-pushed the mqtt5 branch 7 times, most recently from 5c185bb to e23fcc1 Compare February 24, 2023 18:44
@mergify mergify bot added the make label Feb 24, 2023
@ansd ansd force-pushed the mqtt5 branch 8 times, most recently from 87119ea to 290974d Compare March 2, 2023 10:44
deps/rabbitmq_mqtt/test/v5_SUITE.erl Outdated Show resolved Hide resolved
deps/rabbitmq_mqtt/test/v5_SUITE.erl Outdated Show resolved Hide resolved
deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl Outdated Show resolved Hide resolved
deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl Outdated Show resolved Hide resolved
@ChunyiLyu ChunyiLyu force-pushed the mqtt5 branch 2 times, most recently from 3f7f5cf to c5a993e Compare March 3, 2023 13:21
ansd and others added 19 commits June 21, 2023 13:58
There is a minor performance hit.

The following catch is expensive due to building the stack trace:
```
ExchangeDests = try XMod:route(X, Delivery, Opts)
                catch error:undef -> XMod:route(X, Delivery)
                end,
```

This commit places the routing function and its arity into
persistent_term.
The persistent_term key is only an atom (not a tuple) since hashing only
an atom is cheaper.

We have to erase the key whenever the exchange plugin gets disabled or
enabled. Otherwise, the wrong routing function would be cached in
persistent_term.
1. Starting rabbitmq-server with `+S 1`

2. Create topology:
```
rabbitmqadmin declare queue name=mqtt-subscription-sq1 durable=true queue_type=stream
rabbitmqadmin declare binding source=amq.topic destination=mqtt-subscription-sq1 routing_key=k
```

3.
```
java -jar ./target/perf-test.jar -x 2 -y 0 -ad false -qa x-queue-type=stream -e amq.topic -k k -u mqtt-subscription-sq1 -p -z 30
```

Above benchmark gives the following results (2 runs each):

with binary key <<"x-binding-keys">> as done in ths commit:
sending rate avg: 84672 msg/s
sending rate avg: 87246 msg/s
1.0% of overall CPU time matches rabbit_queue_type:add_binding_keys

with atom key 'x_binding_keys':
sending rate avg: 87508 msg/s
sending rate avg: 86275 msg/s
1.0% of overall CPU time matches rabbit_queue_type:add_binding_keys

Do not set the header at all (i.e. do not call rabbit_basic:add_header/4):
sending rate avg: 89389 msg/s
sending rate avg: 88185 msg/s

This shows that there is no real penalty for using binary keys vs atom
keys for the AMQP 0.9.1 header.

However there is a penalty for adding the AMQP 0.9.1 header.
The penalty will be higher when many different destinations match
with different bindings keys.
However, to implement MQTT 5.0 features Subscription Identifiers and Retain As
Published, we have to add that header and pay the penalty.
Note that we only add the header if there are destination MQTT queues.
Name variables clearly. There is a difference in routing key and binding
key:
The BindingKey can contain wildcards, the RoutingKey cannot.
For MQTT 5.0 destination queues, the topic exchange does not only have
to return the destination queue names, but also the matched binding
keys.
This is needed to implement MQTT 5.0 subscription options No Local,
Retain As Published and Subscription Identifiers.

Prior to this commit, as the trie was walked down, we remembered the
edges being walked and assembled the final binding key with
list_to_binary/1.

list_to_binary/1 is very expensive with long lists (long topic names),
even in OTP 26.
The CPU flame graph showed ~3% of CPU usage was spent only in
list_to_binary/1.

Unfortunately and unnecessarily, the current topic exchange
implementation stores topic levels as lists.

It would be better to store topic levels as binaries:
split_topic_key/1 should ideally use binary:split/3 similar as follows:
```
1> P = binary:compile_pattern(<<".">>).
{bm,#Ref<0.1273071188.1488322568.63736>}
2> Bin = <<"aaa.bbb..ccc">>.
<<"aaa.bbb..ccc">>
3> binary:split(Bin, P, [global]).
[<<"aaa">>,<<"bbb">>,<<>>,<<"ccc">>]
```
The compiled pattern could be placed into persistent term.

This commit decided to avoid migrating Mnesia tables to use binaries
instead of lists. Mnesia migrations are non-trivial, especially with the
current feature flag subsystem.
Furthermore the Mnesia topic tables are already getting migrated to
their Khepri counterparts in 3.13.
Adding additional migration only for Mnesia does not make sense.

So, instead of assembling the binding key as we walk down the trie and
then calling list_to_binary/1 in the leaf, it
would be better to just fetch the binding key from the database in the leaf.

As we reach the leaf of the trie, we know both source and destination.
Unfortunately, we cannot fetch the binding key efficiently with the
current rabbit_route (sorted by source exchange) and
rabbit_reverse_route (sorted by destination) tables as the key is in
the middle between source and destination.
If there are a huge number of bindings for a given sourc exchange (very
realistic in MQTT use cases) or a large number of bindings for a given
destination (also realistic), it would require scanning these large
number of bindings.

Therefore this commit takes the simplest possible solution:
The solution leverages the fact that binding arguments are already part of
table rabbit_topic_trie_binding.
So, if we simply include the binding key into the binding arguments, we
can fetch and return it efficiently in the topic exchange
implementation.

The following patch omitting fetching the empty list binding argument
(the default) makes routing slower because function
`analyze_pattern.constprop.0` requires significantly more (~2.5%) CPU time
```
@@ -273,7 +273,11 @@ trie_bindings(X, Node) ->
                                    node_id       = Node,
                                    destination   = '$1',
                                    arguments     = '$2'}},
-    mnesia:select(?MNESIA_BINDING_TABLE, [{MatchHead, [], [{{'$1', '$2'}}]}]).
+    mnesia:select(
+      ?MNESIA_BINDING_TABLE,
+      [{MatchHead, [{'andalso', {'is_list', '$2'}, {'=/=', '$2', []}}], [{{'$1', '$2'}}]},
+       {MatchHead, [], ['$1']}
+      ]).
```
Hence, this commit always fetches the binding arguments.

All MQTT 5.0 destination queues will create a binding that
contains the binding key in the binding arguments.

Not only does this solution avoid expensive list_to_binay/1 calls, but
it also means that Erlang app rabbit (specifically the topic exchange
implementation) does not need to be aware of MQTT anymore:
It just returns the binding key when the binding args tell to do so.

In future, once the Khepri migration completed, we should be able to
relatively simply remove the binding key from the binding arguments
again to free up some storage space.

Note that one of the advantages of a trie data structue is its space
efficiency that you don't have to store the same prefixes multiple
times.
However, for RabbitMQ the binding key is already stored at least N times
in various routing tables, so storing it a few times more via the
binding arguments should be acceptable.
The speed improvements are favoured over a few more MBs ETS usage.
- default to 20, configurable through cuttlefish config
- add test to v5 suite for invalid topic alias in publish
Once the server's Topic Alias cache for messages from server to client
is full, this commit does not replace any existing aliases.
So, the first topics "win" and stay in the cache forever.
This matches the behaviour of VerneMQ and EMQX.
For now that's good enough.
In the future, we can easily change that behaviour to some smarter strategy,
for example
1. Hash the TopicName to an Topic Alias and replace the old
   alias, or
2. For the Topic Alias Cache from server to client, keep 2 Maps:
   #{TopicName => TopicAlias} and #{TopicAlias => TopicName} and a
   counter that wraps to 1 once the Topic Alias Maximum is reached and
   just replace an existing Alias if the TopicName is not cached.

Also, refactor Topic Alias Maximum:
* Remove code duplication
* Allow an operator to prohibit Topic Aliases by allowing value 0 to be
  configured
* Change config name to topic_alias_maximum to that it matches exactly
  the MQTT feture name
* Fix wrong code formatting
* Add the invalid or unkown Topic Alias to log message for easier
  troubleshooting
RabbitMQ MQTT already supports authenticating clients via
username + password, OAuth tokens, and certificates.
We could make use of RabbitMQ SASL mechanisms in the future,
if needed. For now, if the client wants to perform extended
authentication, we return Bad Authentication Method in the CONNACK
packet.
- remove topic alias from message props when storing retained msgs
- set topic alias for outbound before sending retained msgs
Matching against #{} does not validate that the map is empty.
Hashing the #resource{} record is expensive.
Routing to 40k queues via the topic exchanges takes:
~150ms prior to this commit
~100ms after this commit

As rabbit_exchange already deduplicates destination queues and binding
keys, there's no need to use maps in rabbit_db_topic_exchange or
rabbit_exchange_type_topic.
Allow rabbitmq_exchange:route_return() to return infos in addition to
the matched binding keys.
For example, some exchange types read the full #amqqueue{} record from the database
and can in future return it from the route/3 function to avoid reading the full
record again in the channel.
This commit fixes the following crash:
```
2388-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>     exception error: bad argument
2389-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>       in function  lists:keymember/3
2390-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>          called as lists:keymember(<<"x-mqtt-publish-qos">>,1,undefined)
2391-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>          *** argument 3: not a list
2392-2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>       in call from rabbit_mqtt_processor:amqp_props_to_mqtt_props/2 (rabbit_mqtt_processor.erl, line 2219)
```

This crash occurs when a message without AMQP 0.9.1 #P_basic.headers
is sent to the MQTT v4 connection process, but consumed by an MQTT v5
connection process.
This is the case when an AMQP 0.9.1 client sends a message to a v4 MQTT
client, and this same client subsequently upgrades to v5 consuming the
message.

When sending from AMQP 0.9.1 client directly to a v5 MQTT connection,
the AMQP 0.9.1 header <<"x-binding-keys">> is set.

When sending from AMQP 0.9.1 client directly to a v4 MQTT connection,
no header is set, but this code branch was not evaluated.
- same tests as v3 test suite, with pathos mqtt V5 client.
- two test cases are removed:
1. sessionRedelivery callback behaviors in v3 and v5 client seems
to be different that throwing exception does not shutdown a v5 client
immediately. This test case tests that unacked qos1 msgs are redelivered
by RMQ broker, which is covered in other test suite
2. lastWillDowngradesQoS2 is removed because for mqtt 5, will msgs with
unsupported Qos is an error and not accepted
The correct order is
```
assertArrayEquals(Expecteds, Actuals)
```

Do not mark the tests as flaky.
Instead, we want to understand and fix the flakes.
- to avoid test poluting since both v3 and v5 java test cases use
the same resource names
ChunyiLyu
ChunyiLyu previously approved these changes Jun 21, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

MQTT v5.0 support
3 participants