From 3acae03af786451fd9d14b9cf40dc90d34a1579e Mon Sep 17 00:00:00 2001 From: Piotr Bartman-Szwarc Date: Mon, 12 Aug 2024 18:17:18 +0200 Subject: [PATCH] q-dev: refactor device_protocol.py --- qubesusbproxy/core3ext.py | 55 +++++++++++++++++++++------------------ qubesusbproxy/utils.py | 12 ++++----- 2 files changed, 36 insertions(+), 31 deletions(-) diff --git a/qubesusbproxy/core3ext.py b/qubesusbproxy/core3ext.py index bf216a5..10ca5d5 100644 --- a/qubesusbproxy/core3ext.py +++ b/qubesusbproxy/core3ext.py @@ -57,7 +57,7 @@ class DeviceInfo(DescriptionOverrider, LegacyDeviceInfo): def __init__(self, port): # not supported options in legacy code self.safe_chars = self.safe_chars.replace(' ', '') - super().__init__(port.backend_domain, port.ident) + super().__init__(port.backend_domain, port.port_id) # needed but not in legacy DeviceInfo self._vendor = None @@ -96,15 +96,15 @@ def get_assigned_devices(devices): class USBDevice(DeviceInfo): # pylint: disable=too-few-public-methods - def __init__(self, backend_domain, ident): + def __init__(self, backend_domain, port_id): # the superclass can restrict the allowed characters self.safe_chars = (string.ascii_letters + string.digits + string.punctuation + ' ') port = qubes.device_protocol.Port( - backend_domain=backend_domain, ident=ident, devclass="usb") + backend_domain=backend_domain, port_id=port_id, devclass="usb") super(USBDevice, self).__init__(port) - self._qdb_ident = ident.replace('.', '_') + self._qdb_ident = port_id.replace('.', '_') self._qdb_path = '/qubes-usb-devices/' + self._qdb_ident self._vendor_id = None self._product_id = None @@ -326,7 +326,7 @@ def attachment(self): if not usb_connected_to_re.match(untrusted_connected_to): self.backend_domain.log.warning( 'Device {} has invalid chars in connected-to ' - 'property'.format(self.ident)) + 'property'.format(self.port_id)) return None untrusted_connected_to = untrusted_connected_to.decode( 'ascii', errors='strict') @@ -335,13 +335,13 @@ def attachment(self): untrusted_connected_to] except KeyError: self.backend_domain.log.warning( - f'Device {self.ident} has invalid VM name in connected-to ' + f'Device {self.port_id} has invalid VM name in connected-to ' f'property: {untrusted_connected_to}') return None return connected_to @property - def self_identity(self) -> str: + def device_id(self) -> str: """ Get identification of a device not related to port. """ @@ -495,7 +495,7 @@ def on_domain_init_load(self, vm, event): # avoid building a cache on domain-init, as it isn't fully set yet, # and definitely isn't running yet current_devices = { - dev.ident: dev.attachment + dev.port_id: dev.attachment for dev in self.on_device_list_usb(vm, None) } self.devices_cache[vm.name] = current_devices @@ -507,14 +507,20 @@ async def attach_and_notify(self, vm, assignment): try: identity = assignment.device_identity device = assignment.device - if identity not in ('any', device.self_identity): + if identity not in ('any', device.device_id): print("Unrecognized identity, skipping attachment of device" f" in port {assignment}", file=sys.stderr) raise UnrecognizedDevice( "Device presented identity " - f"{device.self_identity} " + f"{device.device_id} " f"does not match expected {identity}" ) + + if assignment.mode.value == "ask-to-attach": + if vm.name != utils.confirm_device_attachment( + device, {vm: assignment}): + return + await self.on_device_attach_usb( vm, 'device-pre-attach:usb', device, assignment.options) except UnrecognizedDevice: @@ -526,7 +532,7 @@ async def attach_and_notify(self, vm, assignment): def on_qdb_change(self, vm, event, path): """A change in QubesDB means a change in a device list.""" # pylint: disable=unused-argument,no-self-use - current_devices = dict((dev.ident, dev.attachment) + current_devices = dict((dev.port_id, dev.attachment) for dev in self.on_device_list_usb(vm, None)) utils.device_list_change(self, current_devices, vm, path, USBDevice) @@ -551,17 +557,17 @@ def on_device_list_usb(self, vm, event): if not usb_device_re.match(untrusted_qdb_ident): vm.log.warning('Invalid USB device name detected') continue - ident = untrusted_qdb_ident.replace('_', '.') - yield USBDevice(vm, ident) + port_id = untrusted_qdb_ident.replace('_', '.') + yield USBDevice(vm, port_id) @qubes.ext.handler('device-get:usb') - def on_device_get_usb(self, vm, event, ident): + def on_device_get_usb(self, vm, event, port_id): # pylint: disable=unused-argument,no-self-use if not vm.is_running(): return if vm.untrusted_qdb.list( - '/qubes-usb-devices/' + ident.replace('.', '_')): - yield USBDevice(vm, ident) + '/qubes-usb-devices/' + port_id.replace('.', '_')): + yield USBDevice(vm, port_id) @staticmethod def get_all_devices(app): @@ -582,7 +588,7 @@ def on_device_list_attached(self, vm, event, **kwargs): for dev in self.get_all_devices(vm.app): if dev.attachment == vm: - yield (dev, {'identity': dev.self_identity}) + yield (dev, {'identity': dev.device_id}) @qubes.ext.handler('device-pre-attach:usb') async def on_device_attach_usb(self, vm, event, device, options): @@ -621,12 +627,12 @@ async def on_device_attach_usb(self, vm, event, device, options): # update the cache before the call, to avoid sending duplicated events # (one on qubesdb watch and the other by the caller of this method) - self.devices_cache[device.backend_domain.name][device.ident] = vm + self.devices_cache[device.backend_domain.name][device.port_id] = vm # set qrexec policy to allow this device policy_line = '{} {} allow,user=root\n'.format(name, device.backend_domain.name) - modify_qrexec_policy('qubes.USB+{}'.format(device.ident), + modify_qrexec_policy('qubes.USB+{}'.format(device.port_id), policy_line, True) try: # and actual attach @@ -634,7 +640,7 @@ async def on_device_attach_usb(self, vm, event, device, options): await vm.run_service_for_stdio('qubes.USBAttach', user='root', input='{} {}\n'.format(device.backend_domain.name, - device.ident).encode(), **extra_kwargs) + device.port_id).encode(), **extra_kwargs) except subprocess.CalledProcessError as e: if e.returncode == 127: raise USBProxyNotInstalled( @@ -649,7 +655,7 @@ async def on_device_attach_usb(self, vm, event, device, options): raise QubesUSBException( 'Device attach failed: {}'.format(sanitized_stderr)) finally: - modify_qrexec_policy('qubes.USB+{}'.format(device.ident), + modify_qrexec_policy('qubes.USB+{}'.format(device.port_id), policy_line, False) @qubes.ext.handler('device-pre-detach:usb') @@ -670,13 +676,13 @@ async def on_device_detach_usb(self, vm, event, device): # update the cache before the call, to avoid sending duplicated events # (one on qubesdb watch and the other by the caller of this method) - self.devices_cache[device.backend_domain.name][device.ident] = None + self.devices_cache[device.backend_domain.name][device.port_id] = None try: await device.backend_domain.run_service_for_stdio( 'qubes.USBDetach', user='root', - input='{}\n'.format(device.ident).encode()) + input='{}\n'.format(device.port_id).encode()) except subprocess.CalledProcessError as e: # TODO: sanitize and include stdout raise QubesUSBException('Device detach failed') @@ -685,8 +691,7 @@ async def on_device_detach_usb(self, vm, event, device): async def on_domain_start(self, vm, _event, **_kwargs): # pylint: disable=unused-argument for assignment in get_assigned_devices(vm.devices['usb']): - await self.attach_and_notify( - vm, assignment.device, assignment.options) + await self.attach_and_notify(vm, assignment) @qubes.ext.handler('domain-shutdown') async def on_domain_shutdown(self, vm, _event, **_kwargs): diff --git a/qubesusbproxy/utils.py b/qubesusbproxy/utils.py index da49b4f..460fed4 100644 --- a/qubesusbproxy/utils.py +++ b/qubesusbproxy/utils.py @@ -60,8 +60,8 @@ def device_list_change( for assignment in front_vm.devices[devclass].assignments( persistent=True): if (assignment.backend_domain == vm - and assignment.ident in added - and assignment.ident not in attached + and assignment.port_id in added + and assignment.port_id not in attached ): asyncio.ensure_future(ext.attach_and_notify( front_vm, assignment.device, assignment.options)) @@ -69,10 +69,10 @@ def device_list_change( def compare_device_cache(vm, devices_cache, current_devices): # compare cached devices and current devices, collect: - # - newly appeared devices (ident) - # - devices attached from a vm to frontend vm (ident: frontend_vm) - # - devices detached from frontend vm (ident: frontend_vm) - # - disappeared devices, e.g., plugged out (ident) + # - newly appeared devices (port_id) + # - devices attached from a vm to frontend vm (port_id: frontend_vm) + # - devices detached from frontend vm (port_id: frontend_vm) + # - disappeared devices, e.g., plugged out (port_id) added = set() attached = {} detached = {}