Support MQTT 5.0 #7263 (Merged)

Conversation
ansd force-pushed the mqtt5 branch 4 times, most recently from 2c608f3 to aade503 on February 21, 2023
ansd force-pushed the mqtt5 branch 7 times, most recently from 5c185bb to e23fcc1 on February 24, 2023
ansd force-pushed the mqtt5 branch 8 times, most recently from 87119ea to 290974d on March 2, 2023
ansd commented on Mar 3, 2023

ChunyiLyu force-pushed the mqtt5 branch 2 times, most recently from 3f7f5cf to c5a993e on March 3, 2023
There is a minor performance hit. The following `catch` is expensive because it builds the stack trace:

```
ExchangeDests = try XMod:route(X, Delivery, Opts)
                catch error:undef -> XMod:route(X, Delivery)
                end,
```

This commit places the routing function and its arity into persistent_term. The persistent_term key is only an atom (not a tuple) since hashing only an atom is cheaper. We have to erase the key whenever the exchange plugin gets disabled or enabled; otherwise, the wrong routing function would be cached in persistent_term.
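The caching idea can be illustrated with a rough Python analog (a hypothetical sketch, not the actual Erlang code; the names `ROUTE_ARITY`, `route`, and `plugin_toggled` are invented for illustration): probe once which callback variant an exchange module supports, cache the result, and invalidate the cache entry when the plugin is toggled.

```python
import inspect

# Plays the role of persistent_term: exchange class -> route callback arity.
ROUTE_ARITY = {}

def route(exchange, delivery, opts):
    cls = type(exchange)
    arity = ROUTE_ARITY.get(cls)
    if arity is None:
        # Probe once and cache, instead of paying for a caught
        # exception on every routed message.
        arity = len(inspect.signature(cls.route).parameters) - 1  # drop self
        ROUTE_ARITY[cls] = arity
    return exchange.route(delivery, opts) if arity == 2 else exchange.route(delivery)

def plugin_toggled(cls):
    # Erase the cached entry when an exchange plugin is enabled or
    # disabled, otherwise a stale entry would dispatch to the wrong arity.
    ROUTE_ARITY.pop(cls, None)

class LegacyExchange:
    def route(self, delivery):          # old route/2-style callback
        return ["q-legacy"]

class ModernExchange:
    def route(self, delivery, opts):    # new route/3-style callback
        return ["q1", "q2"]
```

The per-class dictionary lookup here stands in for the single-atom persistent_term key the commit describes.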
Benchmark:

1. Start rabbitmq-server with `+S 1`.
2. Create the topology:
```
rabbitmqadmin declare queue name=mqtt-subscription-sq1 durable=true queue_type=stream
rabbitmqadmin declare binding source=amq.topic destination=mqtt-subscription-sq1 routing_key=k
```
3. Run:
```
java -jar ./target/perf-test.jar -x 2 -y 0 -ad false -qa x-queue-type=stream -e amq.topic -k k -u mqtt-subscription-sq1 -p -z 30
```

The benchmark above gives the following results (2 runs each).

With binary key `<<"x-binding-keys">>` as done in this commit:
sending rate avg: 84672 msg/s
sending rate avg: 87246 msg/s
1.0% of overall CPU time matches rabbit_queue_type:add_binding_keys

With atom key `'x_binding_keys'`:
sending rate avg: 87508 msg/s
sending rate avg: 86275 msg/s
1.0% of overall CPU time matches rabbit_queue_type:add_binding_keys

Without setting the header at all (i.e. not calling rabbit_basic:add_header/4):
sending rate avg: 89389 msg/s
sending rate avg: 88185 msg/s

This shows that there is no real penalty for using binary keys vs atom keys for the AMQP 0.9.1 header. However, there is a penalty for adding the AMQP 0.9.1 header at all. The penalty will be higher when many different destinations match with different binding keys. However, to implement the MQTT 5.0 features Subscription Identifiers and Retain As Published, we have to add that header and pay the penalty. Note that we only add the header if there are destination MQTT queues.
Name variables clearly. There is a difference between routing key and binding key: the BindingKey can contain wildcards, the RoutingKey cannot.
For MQTT 5.0 destination queues, the topic exchange does not only have to return the destination queue names, but also the matched binding keys. This is needed to implement the MQTT 5.0 subscription options No Local, Retain As Published and Subscription Identifiers.

Prior to this commit, as the trie was walked down, we remembered the edges being walked and assembled the final binding key with list_to_binary/1. list_to_binary/1 is very expensive with long lists (long topic names), even in OTP 26. The CPU flame graph showed that ~3% of CPU usage was spent in list_to_binary/1 alone.

Unfortunately and unnecessarily, the current topic exchange implementation stores topic levels as lists. It would be better to store topic levels as binaries: split_topic_key/1 should ideally use binary:split/3, similar to the following:

```
1> P = binary:compile_pattern(<<".">>).
{bm,#Ref<0.1273071188.1488322568.63736>}
2> Bin = <<"aaa.bbb..ccc">>.
<<"aaa.bbb..ccc">>
3> binary:split(Bin, P, [global]).
[<<"aaa">>,<<"bbb">>,<<>>,<<"ccc">>]
```

The compiled pattern could be placed into a persistent term. This commit decided against migrating Mnesia tables to use binaries instead of lists. Mnesia migrations are non-trivial, especially with the current feature flag subsystem. Furthermore, the Mnesia topic tables are already getting migrated to their Khepri counterparts in 3.13; adding an additional migration only for Mnesia does not make sense.

So, instead of assembling the binding key as we walk down the trie and then calling list_to_binary/1 in the leaf, it would be better to just fetch the binding key from the database in the leaf. When we reach the leaf of the trie, we know both source and destination. Unfortunately, we cannot fetch the binding key efficiently with the current rabbit_route (sorted by source exchange) and rabbit_reverse_route (sorted by destination) tables, as the key sits in the middle between source and destination. If there is a huge number of bindings for a given source exchange (very realistic in MQTT use cases) or a large number of bindings for a given destination (also realistic), it would require scanning this large number of bindings.

Therefore, this commit takes the simplest possible solution: it leverages the fact that binding arguments are already part of table rabbit_topic_trie_binding. So, if we simply include the binding key in the binding arguments, we can fetch and return it efficiently in the topic exchange implementation.

The following patch, which omits fetching the empty-list binding argument (the default), makes routing slower because function `analyze_pattern.constprop.0` requires significantly more (~2.5%) CPU time:

```
@@ -273,7 +273,11 @@ trie_bindings(X, Node) ->
                    node_id     = Node,
                    destination = '$1',
                    arguments   = '$2'}},
-    mnesia:select(?MNESIA_BINDING_TABLE, [{MatchHead, [], [{{'$1', '$2'}}]}]).
+    mnesia:select(
+      ?MNESIA_BINDING_TABLE,
+      [{MatchHead, [{'andalso', {'is_list', '$2'}, {'=/=', '$2', []}}], [{{'$1', '$2'}}]},
+       {MatchHead, [], ['$1']}
+      ]).
```

Hence, this commit always fetches the binding arguments. All MQTT 5.0 destination queues will create a binding that contains the binding key in the binding arguments. Not only does this solution avoid expensive list_to_binary/1 calls, but it also means that the Erlang app rabbit (specifically the topic exchange implementation) does not need to be aware of MQTT anymore: it just returns the binding key when the binding args tell it to do so. In the future, once the Khepri migration has completed, we should be able to relatively simply remove the binding key from the binding arguments again to free up some storage space.

Note that one advantage of a trie data structure is its space efficiency: the same prefixes need not be stored multiple times. However, for RabbitMQ the binding key is already stored at least N times in various routing tables, so storing it a few more times via the binding arguments should be acceptable. The speed improvements are favoured over a few more MBs of ETS usage.
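For illustration, the empty-level-preserving behaviour of `binary:split/3` with `[global]` shown in the shell session above can be mimicked in Python (a hedged analog, not the actual Erlang code; `split_topic` is an invented name):

```python
def split_topic(key: str) -> list:
    # Splitting on an explicit separator keeps empty segments, matching
    # binary:split(<<"aaa.bbb..ccc">>, P, [global]) which returns
    # [<<"aaa">>, <<"bbb">>, <<>>, <<"ccc">>].
    return key.split(".")

# split_topic("aaa.bbb..ccc") -> ["aaa", "bbb", "", "ccc"]
```

Preserving the empty level matters because `a..b` and `a.b` are distinct MQTT topics.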
- default to 20, configurable through cuttlefish config - add test to v5 suite for invalid topic alias in publish
Once the server's Topic Alias cache for messages from server to client is full, this commit does not replace any existing aliases. So, the first topics "win" and stay in the cache forever. This matches the behaviour of VerneMQ and EMQX and is good enough for now. In the future, we can easily change that behaviour to a smarter strategy, for example:

1. Hash the TopicName to a Topic Alias and replace the old alias, or
2. For the Topic Alias cache from server to client, keep two maps, #{TopicName => TopicAlias} and #{TopicAlias => TopicName}, and a counter that wraps to 1 once the Topic Alias Maximum is reached, and just replace an existing alias if the TopicName is not cached.

Also, refactor Topic Alias Maximum:
* Remove code duplication
* Allow an operator to prohibit Topic Aliases by allowing value 0 to be configured
* Change the config name to topic_alias_maximum so that it matches the MQTT feature name exactly
* Fix wrong code formatting
* Add the invalid or unknown Topic Alias to the log message for easier troubleshooting
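The second replacement strategy described above (two maps plus a counter that wraps to 1 at the Topic Alias Maximum) could look roughly like this Python sketch. It is hypothetical, not code from the PR; the class and method names are invented:

```python
class OutboundAliasCache:
    """Sketch of strategy 2: assign aliases round-robin, evicting
    whatever topic previously held the chosen alias."""

    def __init__(self, maximum: int):
        self.maximum = maximum          # MQTT Topic Alias Maximum
        self.topic_to_alias = {}        # TopicName -> TopicAlias
        self.alias_to_topic = {}        # TopicAlias -> TopicName
        self.next_alias = 0             # wraps to 1 once maximum is reached

    def alias_for(self, topic: str) -> int:
        alias = self.topic_to_alias.get(topic)
        if alias is not None:
            return alias                # topic already cached
        # Counter wraps to 1 once the Topic Alias Maximum is reached
        # (alias 0 is not a valid MQTT Topic Alias).
        self.next_alias = self.next_alias % self.maximum + 1
        alias = self.next_alias
        evicted = self.alias_to_topic.get(alias)
        if evicted is not None:
            del self.topic_to_alias[evicted]  # replace the existing alias
        self.topic_to_alias[topic] = alias
        self.alias_to_topic[alias] = topic
        return alias
```

With a maximum of 2, topics "a" and "b" get aliases 1 and 2; a third topic wraps around and evicts the holder of alias 1.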
RabbitMQ MQTT already supports authenticating clients via username + password, OAuth tokens, and certificates. We could make use of RabbitMQ SASL mechanisms in the future, if needed. For now, if the client wants to perform extended authentication, we return Bad Authentication Method in the CONNACK packet.
- remove topic alias from message props when storing retained msgs - set topic alias for outbound before sending retained msgs
Matching against #{} does not validate that the map is empty: in Erlang, the pattern #{} matches any map.
Hashing the #resource{} record is expensive. Routing to 40k queues via the topic exchange takes:
~150ms prior to this commit
~100ms after this commit

As rabbit_exchange already deduplicates destination queues and binding keys, there's no need to use maps in rabbit_db_topic_exchange or rabbit_exchange_type_topic.
Allow rabbit_exchange:route_return() to return infos in addition to the matched binding keys. For example, some exchange types read the full #amqqueue{} record from the database and could in the future return it from the route/3 function to avoid reading the full record again in the channel.
This commit fixes the following crash:

```
2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0> exception error: bad argument
2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>   in function  lists:keymember/3
2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>      called as lists:keymember(<<"x-mqtt-publish-qos">>,1,undefined)
2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>      *** argument 3: not a list
2023-06-21 08:53:28.189519+00:00 [error] <0.2191.0>   in call from rabbit_mqtt_processor:amqp_props_to_mqtt_props/2 (rabbit_mqtt_processor.erl, line 2219)
```

This crash occurs when a message without AMQP 0.9.1 #P_basic.headers is sent to an MQTT v4 connection process, but consumed by an MQTT v5 connection process. This is the case when an AMQP 0.9.1 client sends a message to a v4 MQTT client, and this same client subsequently upgrades to v5 and consumes the message. When sending from an AMQP 0.9.1 client directly to a v5 MQTT connection, the AMQP 0.9.1 header <<"x-binding-keys">> is set. When sending from an AMQP 0.9.1 client directly to a v4 MQTT connection, no header is set, but this code branch was not evaluated.
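The shape of the fix can be sketched in Python (hypothetical; the real code operates on AMQP 0.9.1 `#P_basic.headers` in Erlang, where absent headers are the atom `undefined`): default absent headers to an empty list before the keymember-style lookup.

```python
def lookup_header(headers, key):
    # headers is None when the sender set no AMQP 0.9.1 headers
    # (the Erlang analog of `undefined`). Defaulting to [] avoids
    # the badarg that lists:keymember/3 raised on `undefined`.
    for name, value in (headers or []):
        if name == key:
            return value
    return None
```

With this guard, a header-less message simply yields no match instead of crashing the connection process.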
- same tests as the v3 test suite, with the Paho MQTT v5 client
- two test cases are removed:
1. sessionRedelivery: the redelivery callback behaviours of the v3 and v5 clients seem to differ, in that throwing an exception does not shut down a v5 client immediately. This test case checks that unacked QoS 1 msgs are redelivered by the RMQ broker, which is covered in other test suites.
2. lastWillDowngradesQoS2 is removed because for MQTT 5, will msgs with an unsupported QoS are an error and not accepted.
The correct argument order is

```
assertArrayEquals(Expecteds, Actuals)
```

Do not mark the tests as flaky. Instead, we want to understand and fix the flakes.
- to avoid test pollution, since both the v3 and v5 Java test cases use the same resource names
- one RMQ cluster per test group
ChunyiLyu previously approved these changes on Jun 21, 2023.
ChunyiLyu approved these changes on Jun 21, 2023.
Add support for MQTT 5.0.
Closes #2554
Summary of new features in MQTT v5.0 as (not) implemented in this PR and how they map to RabbitMQ internals:
- **Session expiry** → Queue TTL
- **Message expiry** → per-message message TTL
- **Reason code on all ACKs**
- **Reason string on all ACKs** → optional by the spec. Can easily be added in a future release if needed.
- **Server disconnect** → e.g. when a RabbitMQ node enters maintenance mode
- **Payload format and content type** → forwarded unaltered by RabbitMQ
- **Request / Response** → forwarded unaltered by RabbitMQ. Works nicely across protocols, e.g. request-response style RPCs between MQTT 5.0 and AMQP 0.9.1 clients
- **Shared Subscriptions** → not yet implemented. See Limitations below.
- **Subscription ID** → requires the receiver to know which MQTT subscription (= AMQP 0.9.1 binding key) caused the message to be routed
- **Topic Alias**
- **Flow control** → consumer prefetch
- **User properties** → forwarded unaltered by RabbitMQ
- **Maximum Packet Size**
- **Optional Server feature availability** → e.g. `Shared Subscription Available: 0`
- **Enhanced authentication** → not required for now, as RabbitMQ already supports authenticating clients via username + password, OAuth tokens, and certificates. We could make use of RabbitMQ SASL mechanisms in the future, if needed. For now, the Authentication Method set by the client is ignored and the AUTH packet is unsupported. Authentication in RabbitMQ MQTT 5.0 works the same as in RabbitMQ MQTT 3.1.1 and 3.1.0.
- **Subscription options** → similar to Subscription ID, the subscription options Retain As Published and No Local require the receiver to know which MQTT subscription (= AMQP 0.9.1 binding key) caused the message to be routed
- **Will delay** → Although a will message payload is usually small, the protocol spec allows it to be large. Therefore, instead of using Khepri, we store the single Will Delay Message in a separate queue. (Such a queue also gets created for publish-only clients that do not create any subscriptions.) This message has a message TTL set and will be dead lettered to the Topic Exchange unless the client reconnects in time.
- **Server Keep Alive** → optional by the spec. Can easily be added in a future release if needed.
- **Assigned ClientID**
- **Server reference** → optional by the spec. Can be added in a future release if needed. May require a separate `Service` object per RabbitMQ node on Kubernetes deployed by the rabbitmq/cluster-operator if the Service Type is a Load Balancer.

Summary of MQTT 5.0 limitations in this PR:

- **Shared Subscriptions** is planned to be added in a future RabbitMQ release. Although this feature maps nicely to a queue in RabbitMQ, we'll need a way to store shared subscriptions as part of the session state. Efficiently querying shared subscriptions for a client that reconnects requires a new Mnesia table. Given we want to migrate Mnesia to Khepri, and given we might need a new `mqtt_session_state` table anyway in the future (thinking about a Stream based solution which provides support for efficient QoS 1 cloud-to-device fanouts and support for replication of QoS 1 messages), we might want to hold that feature off for a bit to avoid further future database migrations.

TODOs until releasing MQTT 5.0 in RabbitMQ 3.13