diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 91875474499f..2243d80453ae 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -293,6 +293,20 @@ process_request(?PUBACK, {ok, State} end; +%% MQTT 5 spec 3.3.1.2 QoS +%% If the Server included a Maximum QoS in its CONNACK response +%% to a Client and it receives a PUBLISH packet with a QoS greater than this +%% then it uses DISCONNECT with Reason Code 0x9B (QoS not supported). +process_request(?PUBLISH, + #mqtt_packet{fixed = #mqtt_packet_fixed{qos = ?QOS_2}}, + State = #state{cfg = #cfg{ + proto_ver = ?MQTT_PROTO_V5, + client_id = ClientID}}) -> + ?LOG_DEBUG("Terminating MQTT connection. QoS not supported, client ID: ~s ," + "protocol version: ~p, qos: ~p", + [ClientID, ?MQTT_PROTO_V5, ?QOS_2]), + send_disconnect(?RC_QOS_NOT_SUPPORTED, State), + {stop, server_initiated_disconnect, State}; process_request(?PUBLISH, #mqtt_packet{ fixed = #mqtt_packet_fixed{qos = Qos, diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index 7078a7caeaa6..2b7d71e54e7a 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -39,7 +39,8 @@ cluster_size_1_tests() -> client_set_max_packet_size_connack, client_set_max_packet_size_invalid, message_expiry_interval, - message_expiry_interval_will_message + message_expiry_interval_will_message, + client_publish_qos2 ]. cluster_size_3_tests() -> @@ -205,6 +206,14 @@ message_expiry_interval_will_message(Config) -> assert_nothing_received(), ok = emqtt:disconnect(Sub2). +client_publish_qos2(Config) -> + Topic = ClientId = atom_to_binary(?FUNCTION_NAME), + {C, Connect} = start_client(ClientId, Config, 0, []), + ok, {ok, Props} = Connect(C), + ?assertEqual(1, maps:get('Maximum-QoS', Props)), + error, {error, Response} = emqtt:publish(C, Topic, #{}, <<"msg">>, [{qos, 2}]), + ?assertEqual({disconnected, 155, #{}}, Response). + satisfy_bazel(_Config) -> ok. diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl index 596c8b72a6ac..2c587c3dd4df 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl @@ -285,6 +285,9 @@ handle_data1(Data, State = #state{socket = Socket, proc_state = ProcState1}); {error, Reason, _} -> stop_mqtt_protocol_error(State, Reason, ConnName); + {stop, server_initiated_disconnect, _} -> + self() ! {stop, ?CLOSE_PROTOCOL_ERROR, server_initiated_disconnect}, + {[], State}; {stop, disconnect, ProcState1} -> stop({_SendWill = false, State#state{proc_state = ProcState1}}) end