From 5405c86dd68af99f3db32fa52d9a1c4948ef8548 Mon Sep 17 00:00:00 2001 From: Fabian Peter Hammerle Date: Sun, 29 Dec 2024 07:47:17 +0100 Subject: [PATCH] migrate from dbus-python to pure-python jeepney fixes https://github.com/fphammerle/systemctl-mqtt/issues/39 --- .github/workflows/python.yml | 9 - CHANGELOG.md | 3 +- systemctl_mqtt/__init__.py | 16 +- systemctl_mqtt/_dbus.py | 3 + .../message-generators/test_login_manager.py | 98 +++++++++++ tests/test_action.py | 13 +- tests/test_dbus.py | 154 ++++++++++++------ tests/test_mqtt.py | 118 ++++++++++---- tests/test_state_dbus.py | 106 +++++------- 9 files changed, 347 insertions(+), 173 deletions(-) create mode 100644 tests/dbus/message-generators/test_login_manager.py diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index e60bdd1..001c27c 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -23,11 +23,6 @@ jobs: with: python-version: ${{ matrix.python-version }} - run: pip install --upgrade pipenv==2024.1.0 - - run: sudo apt-get update - # TODO exclude dbus-python & PyGObject from pipenv install - - run: sudo apt-get install --yes --no-install-recommends - libdbus-1-dev - libgirepository1.0-dev - run: pipenv install --python "$PYTHON_VERSION" --deploy --dev env: PYTHON_VERSION: ${{ matrix.python-version }} @@ -57,10 +52,6 @@ jobs: # > [...]/coverage/inorout.py:507: CoverageWarning: # . Module was never imported. (module-not-imported) - run: pip install --upgrade pipenv==2023.6.18 - - run: sudo apt-get update - - run: sudo apt-get install --yes --no-install-recommends - libdbus-1-dev - libgirepository1.0-dev # by default pipenv picks the latest version in PATH - run: pipenv install --python "$PYTHON_VERSION" --deploy --dev env: diff --git a/CHANGELOG.md b/CHANGELOG.md index 3967b0c..5b344b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,7 +16,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 to pure-python [jeepney](https://gitlab.com/takluyver/jeepney) (removes indirect dependency on libdbus, glib, [PyGObject](https://gitlab.gnome.org/GNOME/pygobject) and - [pycairo](https://github.com/pygobject/pycairo)) + [pycairo](https://github.com/pygobject/pycairo), + fixes https://github.com/fphammerle/systemctl-mqtt/issues/39) - automatic discovery in home assistant: - replace component-based (topic: `/binary_sensor//preparing-for-shutdown/config`) diff --git a/systemctl_mqtt/__init__.py b/systemctl_mqtt/__init__.py index a142d2b..0030c33 100644 --- a/systemctl_mqtt/__init__.py +++ b/systemctl_mqtt/__init__.py @@ -78,14 +78,15 @@ def acquire_shutdown_lock(self) -> None: why="Report shutdown via MQTT", mode="delay", ) + assert isinstance( + self._shutdown_lock, jeepney.fds.FileDescriptor + ), self._shutdown_lock _LOGGER.debug("acquired shutdown inhibitor lock") def release_shutdown_lock(self) -> None: with self._shutdown_lock_mutex: if self._shutdown_lock: - # TODO - # https://dbus.freedesktop.org/doc/dbus-python/dbus.types.html#dbus.types.UnixFd.take - os.close(self._shutdown_lock.take()) + self._shutdown_lock.close() _LOGGER.debug("released shutdown inhibitor lock") self._shutdown_lock = None @@ -276,7 +277,7 @@ def _mqtt_on_connect( ) -def _run( +def _run( # pylint: disable=too-many-arguments *, mqtt_host: str, mqtt_port: int, @@ -288,13 +289,14 @@ def _run( poweroff_delay: datetime.timedelta, mqtt_disable_tls: bool = False, ) -> None: + # pylint: disable=too-many-locals; will be split up when switching to async mqtt dbus_connection = jeepney.io.blocking.open_dbus_connection(bus="SYSTEM") bus_proxy = jeepney.io.blocking.Proxy( msggen=jeepney.bus_messages.message_bus, connection=dbus_connection ) preparing_for_shutdown_match_rule = ( # pylint: disable=protected-access - systemctl_mqtt._dbus.get_login_manager_signal_match_rule("SessionNew") + systemctl_mqtt._dbus.get_login_manager_signal_match_rule("PrepareForShutdown") ) assert bus_proxy.AddMatch(preparing_for_shutdown_match_rule) == () state = _State( @@ -327,9 +329,9 @@ def _run( try: with dbus_connection.filter(preparing_for_shutdown_match_rule) as queue: while True: - print(dbus_connection.recv_until_filtered(queue).body) + (preparing_for_sleep,) = dbus_connection.recv_until_filtered(queue).body state.preparing_for_shutdown_handler( - active=True, mqtt_client=mqtt_client # TODO + active=preparing_for_sleep, mqtt_client=mqtt_client ) finally: # blocks until loop_forever stops diff --git a/systemctl_mqtt/_dbus.py b/systemctl_mqtt/_dbus.py index c70d234..9de7032 100644 --- a/systemctl_mqtt/_dbus.py +++ b/systemctl_mqtt/_dbus.py @@ -60,6 +60,9 @@ def ListInhibitors(self) -> jeepney.low_level.Message: def LockSessions(self) -> jeepney.low_level.Message: return jeepney.new_method_call(remote_obj=self, method="LockSessions") + def CanPowerOff(self) -> jeepney.low_level.Message: + return jeepney.new_method_call(remote_obj=self, method="CanPowerOff") + def ScheduleShutdown( self, *, action: str, time: datetime.datetime ) -> jeepney.low_level.Message: diff --git a/tests/dbus/message-generators/test_login_manager.py b/tests/dbus/message-generators/test_login_manager.py new file mode 100644 index 0000000..2cfad98 --- /dev/null +++ b/tests/dbus/message-generators/test_login_manager.py @@ -0,0 +1,98 @@ +# systemctl-mqtt - MQTT client triggering & reporting shutdown on systemd-based systems +# +# Copyright (C) 2024 Fabian Peter Hammerle +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import contextlib +import datetime +import typing +import unittest.mock + +import pytest +from jeepney.low_level import HeaderFields, Message + +import systemctl_mqtt._dbus + +# pylint: disable=protected-access + + +@contextlib.contextmanager +def mock_open_dbus_connection() -> typing.Iterator[unittest.mock.MagicMock]: + with unittest.mock.patch("jeepney.io.blocking.open_dbus_connection") as mock: + yield mock.return_value + + +@pytest.mark.parametrize( + ("member", "signature", "kwargs", "body"), + [ + ("ListInhibitors", None, {}, ()), + ("LockSessions", None, {}, ()), + ("CanPowerOff", None, {}, ()), + ( + "ScheduleShutdown", + "st", + { + "action": "poweroff", + "time": datetime.datetime( + 1970, 1, 1, 0, 0, tzinfo=datetime.timezone.utc + ), + }, + ("poweroff", 0), + ), + ( + "Inhibit", + "ssss", + {"what": "poweroff", "who": "me", "why": "fixing bugs", "mode": "block"}, + ("poweroff", "me", "fixing bugs", "block"), + ), + ], +) +def test_method( + member: str, + signature: typing.Optional[str], + kwargs: typing.Dict[str, typing.Any], + body: typing.Tuple[typing.Any], +) -> None: + with mock_open_dbus_connection() as dbus_connection_mock: + proxy = systemctl_mqtt._dbus.get_login_manager_proxy() + getattr(proxy, member)(**kwargs) + dbus_connection_mock.send_and_get_reply.assert_called_once() + message: Message = dbus_connection_mock.send_and_get_reply.call_args[0][0] + if signature: + assert message.header.fields.pop(HeaderFields.signature) == signature + assert message.header.fields == { + HeaderFields.path: "/org/freedesktop/login1", + HeaderFields.destination: "org.freedesktop.login1", + HeaderFields.interface: "org.freedesktop.login1.Manager", + HeaderFields.member: member, + } + assert message.body == body + + +@pytest.mark.parametrize("property_name", ["HandlePowerKey", "Docked"]) +def test_get(property_name: str) -> None: + with mock_open_dbus_connection() as dbus_connection_mock: + proxy = systemctl_mqtt._dbus.get_login_manager_proxy() + proxy.Get(property_name=property_name) + dbus_connection_mock.send_and_get_reply.assert_called_once() + message: Message = dbus_connection_mock.send_and_get_reply.call_args[0][0] + assert message.header.fields == { + HeaderFields.path: "/org/freedesktop/login1", + HeaderFields.destination: "org.freedesktop.login1", + HeaderFields.interface: "org.freedesktop.DBus.Properties", + HeaderFields.member: "Get", + HeaderFields.signature: "ss", + } + assert message.body == ("org.freedesktop.login1.Manager", property_name) diff --git a/tests/test_action.py b/tests/test_action.py index d3f3948..55577ce 100644 --- a/tests/test_action.py +++ b/tests/test_action.py @@ -14,6 +14,8 @@ def test_poweroff_trigger(delay): action = systemctl_mqtt._MQTTActionSchedulePoweroff() with unittest.mock.patch( + "systemctl_mqtt._dbus.get_login_manager_proxy" + ), unittest.mock.patch( "systemctl_mqtt._dbus.schedule_shutdown" ) as schedule_shutdown_mock: action.trigger( @@ -34,7 +36,7 @@ def test_mqtt_topic_suffix_action_mapping_poweroff(topic_suffix, expected_action mqtt_action = systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[topic_suffix] login_manager_mock = unittest.mock.MagicMock() with unittest.mock.patch( - "systemctl_mqtt._dbus.get_login_manager", return_value=login_manager_mock + "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock ): mqtt_action.trigger( state=systemctl_mqtt._State( @@ -46,8 +48,11 @@ def test_mqtt_topic_suffix_action_mapping_poweroff(topic_suffix, expected_action ) login_manager_mock.ScheduleShutdown.assert_called_once() schedule_args, schedule_kwargs = login_manager_mock.ScheduleShutdown.call_args - assert len(schedule_args) == 2 - assert schedule_args[0] == expected_action_arg + assert not schedule_args + assert schedule_kwargs.pop("action") == expected_action_arg + assert abs( + datetime.datetime.now() - schedule_kwargs.pop("time") + ) < datetime.timedelta(seconds=2) assert not schedule_kwargs @@ -55,7 +60,7 @@ def test_mqtt_topic_suffix_action_mapping_lock(): mqtt_action = systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING["lock-all-sessions"] login_manager_mock = unittest.mock.MagicMock() with unittest.mock.patch( - "systemctl_mqtt._dbus.get_login_manager", return_value=login_manager_mock + "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock ): mqtt_action.trigger(state="dummy") login_manager_mock.LockSessions.assert_called_once_with() diff --git a/tests/test_dbus.py b/tests/test_dbus.py index f6bff8c..f5988c4 100644 --- a/tests/test_dbus.py +++ b/tests/test_dbus.py @@ -17,54 +17,41 @@ import datetime import logging +import typing import unittest.mock -import dbus +import jeepney +import jeepney.low_level +import jeepney.wrappers import pytest import systemctl_mqtt._dbus -_UTC = datetime.timezone(offset=datetime.timedelta(seconds=0)) - # pylint: disable=protected-access -def test_get_login_manager(): - login_manager = systemctl_mqtt._dbus.get_login_manager() - assert isinstance(login_manager, dbus.proxies.Interface) - assert login_manager.dbus_interface == "org.freedesktop.login1.Manager" +def test_get_login_manager_proxy(): + login_manager = systemctl_mqtt._dbus.get_login_manager_proxy() + assert isinstance(login_manager, jeepney.io.blocking.Proxy) + assert login_manager._msggen.interface == "org.freedesktop.login1.Manager" # https://freedesktop.org/wiki/Software/systemd/logind/ - assert isinstance(login_manager.CanPowerOff(), dbus.String) + assert login_manager.CanPowerOff() in {("yes",), ("challenge",)} def test__log_shutdown_inhibitors_some(caplog): login_manager = unittest.mock.MagicMock() - login_manager.ListInhibitors.return_value = dbus.Array( + login_manager.ListInhibitors.return_value = ( [ - dbus.Struct( - ( - dbus.String("shutdown:sleep"), - dbus.String("Developer"), - dbus.String("Haven't pushed my commits yet"), - dbus.String("delay"), - dbus.UInt32(1000), - dbus.UInt32(1234), - ), - signature=None, - ), - dbus.Struct( - ( - dbus.String("shutdown"), - dbus.String("Editor"), - dbus.String(""), - dbus.String("Unsafed files open"), - dbus.UInt32(0), - dbus.UInt32(42), - ), - signature=None, + ( + "shutdown:sleep", + "Developer", + "Haven't pushed my commits yet", + "delay", + 1000, + 1234, ), + ("shutdown", "Editor", "", "Unsafed files open", 0, 42), ], - signature=dbus.Signature("(ssssuu)"), ) with caplog.at_level(logging.DEBUG): systemctl_mqtt._dbus._log_shutdown_inhibitors(login_manager) @@ -79,7 +66,7 @@ def test__log_shutdown_inhibitors_some(caplog): def test__log_shutdown_inhibitors_none(caplog): login_manager = unittest.mock.MagicMock() - login_manager.ListInhibitors.return_value = dbus.Array([]) + login_manager.ListInhibitors.return_value = ([],) with caplog.at_level(logging.DEBUG): systemctl_mqtt._dbus._log_shutdown_inhibitors(login_manager) assert len(caplog.records) == 1 @@ -89,12 +76,15 @@ def test__log_shutdown_inhibitors_none(caplog): def test__log_shutdown_inhibitors_fail(caplog): login_manager = unittest.mock.MagicMock() - login_manager.ListInhibitors.side_effect = dbus.DBusException("mocked") + login_manager.ListInhibitors.side_effect = DBusErrorResponseMock("error", "mocked") with caplog.at_level(logging.DEBUG): systemctl_mqtt._dbus._log_shutdown_inhibitors(login_manager) assert len(caplog.records) == 1 assert caplog.records[0].levelno == logging.WARNING - assert caplog.records[0].message == "failed to fetch shutdown inhibitors: mocked" + assert ( + caplog.records[0].message + == "failed to fetch shutdown inhibitors: [error] mocked" + ) @pytest.mark.parametrize("action", ["poweroff", "reboot"]) @@ -102,40 +92,52 @@ def test__log_shutdown_inhibitors_fail(caplog): def test__schedule_shutdown(action, delay): login_manager_mock = unittest.mock.MagicMock() with unittest.mock.patch( - "systemctl_mqtt._dbus.get_login_manager", return_value=login_manager_mock + "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock ): systemctl_mqtt._dbus.schedule_shutdown(action=action, delay=delay) login_manager_mock.ScheduleShutdown.assert_called_once() schedule_args, schedule_kwargs = login_manager_mock.ScheduleShutdown.call_args - assert len(schedule_args) == 2 - assert schedule_args[0] == action - assert isinstance(schedule_args[1], dbus.UInt64) - shutdown_datetime = datetime.datetime.fromtimestamp( - schedule_args[1] / 10**6, tz=_UTC - ) - actual_delay = shutdown_datetime - datetime.datetime.now(tz=_UTC) + assert not schedule_args + assert schedule_kwargs.pop("action") == action + actual_delay = schedule_kwargs.pop("time") - datetime.datetime.now() assert actual_delay.total_seconds() == pytest.approx(delay.total_seconds(), abs=0.1) assert not schedule_kwargs +class DBusErrorResponseMock(jeepney.wrappers.DBusErrorResponse): + # pylint: disable=missing-class-docstring,super-init-not-called + def __init__(self, name: str, data: typing.Any): + self.name = name + self.data = data + + @pytest.mark.parametrize("action", ["poweroff"]) @pytest.mark.parametrize( - ("exception_message", "log_message"), + ("error_name", "error_message", "log_message"), [ - ("test message", "test message"), ( + "test error", + "test message", + "[test error] ('test message',)", + ), + ( + "org.freedesktop.DBus.Error.InteractiveAuthorizationRequired", "Interactive authentication required.", "unauthorized; missing polkit authorization rules?", ), ], ) -def test__schedule_shutdown_fail(caplog, action, exception_message, log_message): +def test__schedule_shutdown_fail( + caplog, action, error_name, error_message, log_message +): login_manager_mock = unittest.mock.MagicMock() - login_manager_mock.ScheduleShutdown.side_effect = dbus.DBusException( - exception_message + login_manager_mock.ScheduleShutdown.side_effect = DBusErrorResponseMock( + name=error_name, + data=(error_message,), ) + login_manager_mock.ListInhibitors.return_value = ([],) with unittest.mock.patch( - "systemctl_mqtt._dbus.get_login_manager", return_value=login_manager_mock + "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock ), caplog.at_level(logging.DEBUG): systemctl_mqtt._dbus.schedule_shutdown( action=action, delay=datetime.timedelta(seconds=21) @@ -152,10 +154,64 @@ def test__schedule_shutdown_fail(caplog, action, exception_message, log_message) def test_lock_all_sessions(caplog): login_manager_mock = unittest.mock.MagicMock() with unittest.mock.patch( - "systemctl_mqtt._dbus.get_login_manager", return_value=login_manager_mock + "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock ), caplog.at_level(logging.INFO): systemctl_mqtt._dbus.lock_all_sessions() login_manager_mock.LockSessions.assert_called_once_with() assert len(caplog.records) == 1 assert caplog.records[0].levelno == logging.INFO assert caplog.records[0].message == "instruct all sessions to activate screen locks" + + +def test__run_signal_loop(): + # pylint: disable=too-many-locals,too-many-arguments + login_manager_mock = unittest.mock.MagicMock() + dbus_connection_mock = unittest.mock.MagicMock() + with unittest.mock.patch( + "paho.mqtt.client.Client" + ) as mqtt_client_mock, unittest.mock.patch( + "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock + ), unittest.mock.patch( + "jeepney.io.blocking.open_dbus_connection", return_value=dbus_connection_mock + ) as open_dbus_connection_mock: + add_match_reply = unittest.mock.Mock() + add_match_reply.body = () + dbus_connection_mock.send_and_get_reply.return_value = add_match_reply + dbus_connection_mock.recv_until_filtered.side_effect = [ + jeepney.low_level.Message(header=None, body=(False,)), + jeepney.low_level.Message(header=None, body=(True,)), + jeepney.low_level.Message(header=None, body=(False,)), + ] + login_manager_mock.Inhibit.return_value = (jeepney.fds.FileDescriptor(-1),) + with pytest.raises(StopIteration): + systemctl_mqtt._run( + mqtt_host="localhost", + mqtt_port=1833, + mqtt_username=None, + mqtt_password=None, + mqtt_topic_prefix="systemctl/host", + homeassistant_discovery_prefix="homeassistant", + homeassistant_discovery_object_id="test", + poweroff_delay=datetime.timedelta(), + ) + open_dbus_connection_mock.assert_called_once_with(bus="SYSTEM") + dbus_connection_mock.send_and_get_reply.assert_called_once() + add_match_msg = dbus_connection_mock.send_and_get_reply.call_args[0][0] + assert ( + add_match_msg.header.fields[jeepney.low_level.HeaderFields.member] == "AddMatch" + ) + assert add_match_msg.body == ( + "interface='org.freedesktop.login1.Manager',member='PrepareForShutdown'" + ",path='/org/freedesktop/login1',type='signal'", + ) + assert mqtt_client_mock().publish.call_args_list == [ + unittest.mock.call( + topic="systemctl/host/preparing-for-shutdown", payload="false", retain=True + ), + unittest.mock.call( + topic="systemctl/host/preparing-for-shutdown", payload="true", retain=True + ), + unittest.mock.call( + topic="systemctl/host/preparing-for-shutdown", payload="false", retain=True + ), + ] diff --git a/tests/test_mqtt.py b/tests/test_mqtt.py index 7a49e56..b092fb0 100644 --- a/tests/test_mqtt.py +++ b/tests/test_mqtt.py @@ -15,13 +15,16 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import contextlib import datetime import logging import threading import time +import typing import unittest.mock -import dbus +import jeepney.fds +import jeepney.low_level import paho.mqtt.client import pytest from paho.mqtt.client import MQTTMessage @@ -31,6 +34,16 @@ # pylint: disable=protected-access,too-many-positional-arguments +@contextlib.contextmanager +def mock_open_dbus_connection() -> typing.Iterator[unittest.mock.MagicMock]: + with unittest.mock.patch("jeepney.io.blocking.open_dbus_connection") as mock: + add_match_reply = unittest.mock.Mock() + add_match_reply.body = () + mock.return_value.send_and_get_reply.return_value = add_match_reply + mock.return_value.recv_until_filtered.side_effect = [] + yield mock + + @pytest.mark.parametrize("mqtt_host", ["mqtt-broker.local"]) @pytest.mark.parametrize("mqtt_port", [1833]) @pytest.mark.parametrize("mqtt_topic_prefix", ["systemctl/host", "system/command"]) @@ -46,6 +59,7 @@ def test__run( ): # pylint: disable=too-many-locals,too-many-arguments caplog.set_level(logging.DEBUG) + login_manager_mock = unittest.mock.MagicMock() with unittest.mock.patch( "socket.create_connection" ) as create_socket_mock, unittest.mock.patch( @@ -53,22 +67,22 @@ def test__run( ) as ssl_wrap_socket_mock, unittest.mock.patch( "paho.mqtt.client.Client.loop_forever", autospec=True ) as mqtt_loop_forever_mock, unittest.mock.patch( - "gi.repository.GLib.MainLoop.run" - ) as glib_loop_mock, unittest.mock.patch( - "systemctl_mqtt._dbus.get_login_manager" - ) as get_login_manager_mock: + "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock + ), mock_open_dbus_connection() as open_dbus_connection_mock: ssl_wrap_socket_mock.return_value.send = len - get_login_manager_mock.return_value.Get.return_value = dbus.Boolean(False) - systemctl_mqtt._run( - mqtt_host=mqtt_host, - mqtt_port=mqtt_port, - mqtt_username=None, - mqtt_password=None, - mqtt_topic_prefix=mqtt_topic_prefix, - homeassistant_discovery_prefix=homeassistant_discovery_prefix, - homeassistant_discovery_object_id=homeassistant_discovery_object_id, - poweroff_delay=datetime.timedelta(), - ) + login_manager_mock.Inhibit.return_value = (jeepney.fds.FileDescriptor(-1),) + login_manager_mock.Get.return_value = (("b", False),) + with pytest.raises(StopIteration): + systemctl_mqtt._run( + mqtt_host=mqtt_host, + mqtt_port=mqtt_port, + mqtt_username=None, + mqtt_password=None, + mqtt_topic_prefix=mqtt_topic_prefix, + homeassistant_discovery_prefix=homeassistant_discovery_prefix, + homeassistant_discovery_object_id=homeassistant_discovery_object_id, + poweroff_delay=datetime.timedelta(), + ) assert caplog.records[0].levelno == logging.INFO assert caplog.records[0].message == ( f"connecting to MQTT broker {mqtt_host}:{mqtt_port} (TLS enabled)" @@ -98,11 +112,15 @@ def test__run( "paho.mqtt.client.Client.subscribe" ) as mqtt_subscribe_mock: mqtt_client.on_connect(mqtt_client, mqtt_client._userdata, {}, 0) - state = mqtt_client._userdata - assert ( - state._login_manager.connect_to_signal.call_args[1]["signal_name"] - == "PrepareForShutdown" + open_dbus_connection_mock.assert_called_once_with(bus="SYSTEM") + login_manager_mock.Inhibit.assert_called_once_with( + what="shutdown", + who="systemctl-mqtt", + why="Report shutdown via MQTT", + mode="delay", ) + login_manager_mock.Get.assert_called_once_with("PreparingForShutdown") + open_dbus_connection_mock.return_value.send_and_get_reply.assert_called_once() assert sorted(mqtt_subscribe_mock.call_args_list) == [ unittest.mock.call(mqtt_topic_prefix + "/lock-all-sessions"), unittest.mock.call(mqtt_topic_prefix + "/poweroff"), @@ -146,8 +164,7 @@ def test__run( f" triggering {systemctl_mqtt._MQTT_TOPIC_SUFFIX_ACTION_MAPPING[s]}" for s in ("poweroff", "lock-all-sessions") } - # dbus loop started? - glib_loop_mock.assert_called_once_with() + open_dbus_connection_mock.return_value.filter.assert_called_once() # waited for mqtt loop to stop? assert mqtt_client._thread_terminate assert mqtt_client._thread is None @@ -160,7 +177,7 @@ def test__run_tls(caplog, mqtt_host, mqtt_port, mqtt_disable_tls): caplog.set_level(logging.INFO) with unittest.mock.patch( "paho.mqtt.client.Client" - ) as mqtt_client_class, unittest.mock.patch("gi.repository.GLib.MainLoop.run"): + ) as mqtt_client_class, mock_open_dbus_connection(), pytest.raises(StopIteration): systemctl_mqtt._run( mqtt_host=mqtt_host, mqtt_port=mqtt_port, @@ -186,7 +203,7 @@ def test__run_tls(caplog, mqtt_host, mqtt_port, mqtt_disable_tls): def test__run_tls_default(): with unittest.mock.patch( "paho.mqtt.client.Client" - ) as mqtt_client_class, unittest.mock.patch("gi.repository.GLib.MainLoop.run"): + ) as mqtt_client_class, mock_open_dbus_connection(), pytest.raises(StopIteration): systemctl_mqtt._run( mqtt_host="mqtt-broker.local", mqtt_port=1833, @@ -215,9 +232,9 @@ def test__run_authentication( ) as ssl_wrap_socket_mock, unittest.mock.patch( "paho.mqtt.client.Client.loop_forever", autospec=True ) as mqtt_loop_forever_mock, unittest.mock.patch( - "gi.repository.GLib.MainLoop.run" - ), unittest.mock.patch( - "systemctl_mqtt._dbus.get_login_manager" + "systemctl_mqtt._dbus.get_login_manager_proxy" + ), mock_open_dbus_connection(), pytest.raises( + StopIteration ): ssl_wrap_socket_mock.return_value.send = len systemctl_mqtt._run( @@ -247,12 +264,15 @@ def _initialize_mqtt_client( ) as ssl_wrap_socket_mock, unittest.mock.patch( "paho.mqtt.client.Client.loop_forever", autospec=True ) as mqtt_loop_forever_mock, unittest.mock.patch( - "gi.repository.GLib.MainLoop.run" - ), unittest.mock.patch( - "systemctl_mqtt._dbus.get_login_manager" - ) as get_login_manager_mock: + "systemctl_mqtt._dbus.get_login_manager_proxy" + ) as get_login_manager_mock, mock_open_dbus_connection(), pytest.raises( + StopIteration + ): ssl_wrap_socket_mock.return_value.send = len - get_login_manager_mock.return_value.Get.return_value = dbus.Boolean(False) + get_login_manager_mock.return_value.Inhibit.return_value = ( + jeepney.fds.FileDescriptor(-1), + ) + get_login_manager_mock.return_value.Get.return_value = (("b", True),) systemctl_mqtt._run( mqtt_host=mqtt_host, mqtt_port=mqtt_port, @@ -301,8 +321,8 @@ def test__client_handle_message(caplog, mqtt_host, mqtt_port, mqtt_topic_prefix) @pytest.mark.parametrize("mqtt_password", ["secret"]) def test__run_authentication_missing_username(mqtt_host, mqtt_port, mqtt_password): with unittest.mock.patch("paho.mqtt.client.Client"), unittest.mock.patch( - "systemctl_mqtt._dbus.get_login_manager" - ): + "systemctl_mqtt._dbus.get_login_manager_proxy" + ), mock_open_dbus_connection(): with pytest.raises(ValueError, match=r"^Missing MQTT username$"): systemctl_mqtt._run( mqtt_host=mqtt_host, @@ -365,3 +385,33 @@ def test_mqtt_message_callback_poweroff_retained( ) assert caplog.records[1].levelno == logging.INFO assert caplog.records[1].message == "ignoring retained message" + + +@pytest.mark.parametrize("active", [True, False]) +@pytest.mark.parametrize("block", [True, False]) +def test__publish_preparing_for_shutdown_blocking(active: bool, block: bool) -> None: + login_manager_mock = unittest.mock.MagicMock() + login_manager_mock.Get.return_value = (("b", active),) + with unittest.mock.patch( + "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock + ): + state = systemctl_mqtt._State( + mqtt_topic_prefix="prefix", + homeassistant_discovery_prefix="prefix", + homeassistant_discovery_object_id="object-id", + poweroff_delay=datetime.timedelta(), + ) + mqtt_client_mock = unittest.mock.MagicMock() + state._publish_preparing_for_shutdown( + mqtt_client=mqtt_client_mock, active=active, block=block + ) + mqtt_client_mock.publish.assert_called_once_with( + topic="prefix/preparing-for-shutdown", + payload="true" if active else "false", + retain=True, + ) + msg_info = mqtt_client_mock.publish.return_value + if block: + msg_info.wait_for_publish.assert_called_once() + else: + msg_info.wait_for_publish.assert_not_called() diff --git a/tests/test_state_dbus.py b/tests/test_state_dbus.py index ddf7f0c..78ccbb6 100644 --- a/tests/test_state_dbus.py +++ b/tests/test_state_dbus.py @@ -19,9 +19,10 @@ import json import logging import re +import typing import unittest.mock -import dbus.types +import jeepney.wrappers import pytest import systemctl_mqtt @@ -30,88 +31,47 @@ def test_shutdown_lock(): - lock_fd = unittest.mock.MagicMock() - with unittest.mock.patch("systemctl_mqtt._dbus.get_login_manager"): + lock_fd = unittest.mock.MagicMock(spec=jeepney.fds.FileDescriptor) + with unittest.mock.patch( + "systemctl_mqtt._dbus.get_login_manager_proxy" + ) as get_login_manager_mock: state = systemctl_mqtt._State( mqtt_topic_prefix="any", homeassistant_discovery_prefix=None, homeassistant_discovery_object_id=None, poweroff_delay=datetime.timedelta(), ) - state._login_manager.Inhibit.return_value = lock_fd + get_login_manager_mock.return_value.Inhibit.return_value = (lock_fd,) state.acquire_shutdown_lock() state._login_manager.Inhibit.assert_called_once_with( - "shutdown", "systemctl-mqtt", "Report shutdown via MQTT", "delay" + what="shutdown", + who="systemctl-mqtt", + why="Report shutdown via MQTT", + mode="delay", ) assert state._shutdown_lock == lock_fd - # https://dbus.freedesktop.org/doc/dbus-python/dbus.types.html#dbus.types.UnixFd.take - lock_fd.take.return_value = "fdnum" - with unittest.mock.patch("os.close") as close_mock: - state.release_shutdown_lock() - close_mock.assert_called_once_with("fdnum") - - -@pytest.mark.parametrize("active", [True, False]) -def test_prepare_for_shutdown_handler(caplog, active): - with unittest.mock.patch("systemctl_mqtt._dbus.get_login_manager"): - state = systemctl_mqtt._State( - mqtt_topic_prefix="any", - homeassistant_discovery_prefix=None, - homeassistant_discovery_object_id=None, - poweroff_delay=datetime.timedelta(), - ) - mqtt_client_mock = unittest.mock.MagicMock() - state.register_prepare_for_shutdown_handler(mqtt_client=mqtt_client_mock) - # pylint: disable=no-member,comparison-with-callable - connect_to_signal_kwargs = state._login_manager.connect_to_signal.call_args[1] - assert connect_to_signal_kwargs["signal_name"] == "PrepareForShutdown" - handler_function = connect_to_signal_kwargs["handler_function"] - assert handler_function.func == state._prepare_for_shutdown_handler - with unittest.mock.patch.object( - state, "acquire_shutdown_lock" - ) as acquire_lock_mock, unittest.mock.patch.object( - state, "release_shutdown_lock" - ) as release_lock_mock: - handler_function(dbus.types.Boolean(active)) - if active: - acquire_lock_mock.assert_not_called() - release_lock_mock.assert_called_once_with() - else: - acquire_lock_mock.assert_called_once_with() - release_lock_mock.assert_not_called() - mqtt_client_mock.publish.assert_called_once_with( - topic="any/preparing-for-shutdown", - payload="true" if active else "false", - retain=True, - ) - assert len(caplog.records) == 1 - assert caplog.records[0].levelno == logging.ERROR - assert caplog.records[0].message.startswith( - "failed to publish on any/preparing-for-shutdown" - ) + lock_fd.close.assert_not_called() + state.release_shutdown_lock() + lock_fd.close.assert_called_once_with() @pytest.mark.parametrize("active", [True, False]) -def test_publish_preparing_for_shutdown(active): +def test_publish_preparing_for_shutdown(active: bool) -> None: login_manager_mock = unittest.mock.MagicMock() - login_manager_mock.Get.return_value = dbus.Boolean(active) + login_manager_mock.Get.return_value = (("b", active),)[:] with unittest.mock.patch( - "systemctl_mqtt._dbus.get_login_manager", return_value=login_manager_mock + "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock ): state = systemctl_mqtt._State( mqtt_topic_prefix="any", - homeassistant_discovery_prefix=None, - homeassistant_discovery_object_id=None, + homeassistant_discovery_prefix="pre/fix", + homeassistant_discovery_object_id="obj", poweroff_delay=datetime.timedelta(), ) assert state._login_manager == login_manager_mock mqtt_client_mock = unittest.mock.MagicMock() state.publish_preparing_for_shutdown(mqtt_client=mqtt_client_mock) - login_manager_mock.Get.assert_called_once_with( - "org.freedesktop.login1.Manager", - "PreparingForShutdown", - dbus_interface="org.freedesktop.DBus.Properties", - ) + login_manager_mock.Get.assert_called_once_with("PreparingForShutdown") mqtt_client_mock.publish.assert_called_once_with( topic="any/preparing-for-shutdown", payload="true" if active else "false", @@ -119,11 +79,18 @@ def test_publish_preparing_for_shutdown(active): ) +class DBusErrorResponseMock(jeepney.wrappers.DBusErrorResponse): + # pylint: disable=missing-class-docstring,super-init-not-called + def __init__(self, name: str, data: typing.Any): + self.name = name + self.data = data + + def test_publish_preparing_for_shutdown_get_fail(caplog): login_manager_mock = unittest.mock.MagicMock() - login_manager_mock.Get.side_effect = dbus.DBusException("mocked") + login_manager_mock.Get.side_effect = DBusErrorResponseMock("error", ("mocked",)) with unittest.mock.patch( - "systemctl_mqtt._dbus.get_login_manager", return_value=login_manager_mock + "systemctl_mqtt._dbus.get_login_manager_proxy", return_value=login_manager_mock ): state = systemctl_mqtt._State( mqtt_topic_prefix="any", @@ -138,7 +105,7 @@ def test_publish_preparing_for_shutdown_get_fail(caplog): assert caplog.records[0].levelno == logging.ERROR assert ( caplog.records[0].message - == "failed to read logind's PreparingForShutdown property: mocked" + == "failed to read logind's PreparingForShutdown property: [error] ('mocked',)" ) @@ -149,12 +116,13 @@ def test_publish_preparing_for_shutdown_get_fail(caplog): def test_publish_homeassistant_device_config( topic_prefix, discovery_prefix, object_id, hostname ): - state = systemctl_mqtt._State( - mqtt_topic_prefix=topic_prefix, - homeassistant_discovery_prefix=discovery_prefix, - homeassistant_discovery_object_id=object_id, - poweroff_delay=datetime.timedelta(), - ) + with unittest.mock.patch("jeepney.io.blocking.open_dbus_connection"): + state = systemctl_mqtt._State( + mqtt_topic_prefix=topic_prefix, + homeassistant_discovery_prefix=discovery_prefix, + homeassistant_discovery_object_id=object_id, + poweroff_delay=datetime.timedelta(), + ) mqtt_client = unittest.mock.MagicMock() with unittest.mock.patch( "systemctl_mqtt._utils.get_hostname", return_value=hostname