diff --git a/tests/test_client.py b/tests/test_client.py index d6457fe7..145f911c 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,19 +1,21 @@ +import threading import time import unicodedata import paho.mqtt.client as client import pytest +from paho.mqtt.enums import MQTTErrorCode, MQTTProtocolVersion from paho.mqtt.reasoncodes import ReasonCodes import tests.paho_test as paho_test # Import test fixture -from tests.testsupport.broker import fake_broker # noqa: F401 +from tests.testsupport.broker import FakeBroker, fake_broker # noqa: F401 @pytest.mark.parametrize("proto_ver", [ - (client.MQTTv31), - (client.MQTTv311), + (MQTTProtocolVersion.MQTTv31), + (MQTTProtocolVersion.MQTTv311), ]) class Test_connect: """ @@ -100,7 +102,7 @@ class Test_connect_v5: def test_01_broker_no_support(self, fake_broker): mqttc = client.Client( - "01-broker-no-support", protocol=client.MQTTv5) + "01-broker-no-support", protocol=MQTTProtocolVersion.MQTTv5) def on_connect(mqttc, obj, flags, reason, properties): assert reason == 132 @@ -137,6 +139,116 @@ def on_connect(mqttc, obj, flags, reason, properties): mqttc.loop_stop() +class TestConnectionLost: + def test_with_loop_start(self, fake_broker: FakeBroker): + mqttc = client.Client( + "test_with_loop_start", + protocol=MQTTProtocolVersion.MQTTv311, + reconnect_on_failure=False, + ) + + on_connect_reached = threading.Event() + on_disconnect_reached = threading.Event() + + + def on_connect(mqttc, obj, flags, rc): + assert rc == 0 + on_connect_reached.set() + + def on_disconnect(*args): + on_disconnect_reached.set() + + mqttc.on_connect = on_connect + mqttc.on_disconnect = on_disconnect + + mqttc.connect_async("localhost", fake_broker.port) + mqttc.loop_start() + + try: + fake_broker.start() + + connect_packet = paho_test.gen_connect( + "test_with_loop_start", keepalive=60, + proto_ver=MQTTProtocolVersion.MQTTv311) + packet_in = fake_broker.receive_packet(1000) + assert packet_in # Check connection was not closed + assert packet_in == connect_packet + + connack_packet = paho_test.gen_connack(rc=0) + count = fake_broker.send_packet(connack_packet) + assert count # Check connection was not closed + assert count == len(connack_packet) + + assert on_connect_reached.wait(1) + assert mqttc.is_connected() + + fake_broker.finish() + + assert on_disconnect_reached.wait(1) + assert not mqttc.is_connected() + + finally: + mqttc.loop_stop() + + def test_with_loop(self, fake_broker: FakeBroker): + mqttc = client.Client( + "test_with_loop", + clean_session=True, + ) + + on_connect_reached = threading.Event() + on_disconnect_reached = threading.Event() + + + def on_connect(mqttc, obj, flags, rc): + assert rc == 0 + on_connect_reached.set() + + def on_disconnect(*args): + on_disconnect_reached.set() + + mqttc.on_connect = on_connect + mqttc.on_disconnect = on_disconnect + + + try: + mqttc.connect("localhost", fake_broker.port) + + fake_broker.start() + + # not yet connected, packet are not yet processed by loop() + assert not mqttc.is_connected() + + # connect packet is sent during connect() call + connect_packet = paho_test.gen_connect( + "test_with_loop", keepalive=60, + proto_ver=MQTTProtocolVersion.MQTTv311) + packet_in = fake_broker.receive_packet(1000) + assert packet_in # Check connection was not closed + assert packet_in == connect_packet + + connack_packet = paho_test.gen_connack(rc=0) + count = fake_broker.send_packet(connack_packet) + assert count # Check connection was not closed + assert count == len(connack_packet) + + # call loop() to process the connack packet + assert mqttc.loop(timeout=1) == MQTTErrorCode.MQTT_ERR_SUCCESS + + assert on_connect_reached.wait(1) + assert mqttc.is_connected() + + fake_broker.finish() + + # call loop() to detect the connection lost + assert mqttc.loop(timeout=1) == MQTTErrorCode.MQTT_ERR_CONN_LOST + + assert on_disconnect_reached.wait(1) + assert not mqttc.is_connected() + + finally: + mqttc.loop_stop() + class TestPublishBroker2Client: def test_invalid_utf8_topic(self, fake_broker):