diff --git a/qui/decorators.py b/qui/decorators.py
index d34590d0..5e8c01bf 100644
--- a/qui/decorators.py
+++ b/qui/decorators.py
@@ -50,7 +50,7 @@ def memory(self, memory=0) -> Gtk.Label:
def icon(self) -> Gtk.Image:
''' Returns a `Gtk.Image` containing the colored lock icon '''
icon_vm = Gtk.IconTheme.get_default().load_icon(
- self.vm.label.icon, 16, 0)
+ self.vm.icon, 16, 0)
icon_img = Gtk.Image.new_from_pixbuf(icon_vm)
return icon_img
@@ -65,7 +65,7 @@ def netvm(self) -> Gtk.Label:
return label
-def device_hbox(device, frontend_domains=None) -> Gtk.Box:
+def device_hbox(device) -> Gtk.Box:
''' Returns a :class:`Gtk.Box` containing the device name & icon.. '''
if device.devclass == 'block':
icon = 'drive-removable-media'
@@ -78,11 +78,11 @@ def device_hbox(device, frontend_domains=None) -> Gtk.Box:
dev_icon = create_icon(icon)
name_label = Gtk.Label(xalign=0)
- name = "{}:{} - {}".format(device.backend_domain.name, device.ident,
+ name = "{}:{} - {}".format(device.backend_domain, device.ident,
device.description)
- if frontend_domains:
+ if device.attachments:
name_label.set_markup('{} ({})'.format(
- name, ", ".join([vm.name for vm in frontend_domains])))
+ name, ", ".join([vm for vm in device.attachments])))
else:
name_label.set_text(name)
name_label.set_max_width_chars(64)
@@ -106,7 +106,7 @@ def device_domain_hbox(vm, attached: bool) -> Gtk.Box:
add_icon = create_icon('list-add')
hbox.pack_start(add_icon, False, False, 5)
- name = Gtk.Label(vm.name, xalign=0)
+ name = Gtk.Label(vm, xalign=0)
hbox.pack_start(name, True, True, 5)
return hbox
diff --git a/qui/tray/devices.py b/qui/tray/devices.py
index b1b791db..0e7c6c8b 100644
--- a/qui/tray/devices.py
+++ b/qui/tray/devices.py
@@ -11,7 +11,8 @@
import qubesadmin
import qubesadmin.events
-from qubesadmin import exc
+import qubesadmin.devices
+import qubesadmin.exc
import qui.decorators
import gbulb
@@ -22,103 +23,36 @@
class DomainMenuItem(Gtk.ImageMenuItem):
- ''' A submenu item for the device menu. Allows attaching and
- detaching the device to a domain. '''
+ """ A submenu item for the device menu. Displays attachment status.
+ Allows attaching/detaching the device."""
- def __init__(self, device, vm, attached, *args, **kwargs):
+ def __init__(self, device, vm, *args, **kwargs):
super().__init__(*args, **kwargs)
self.vm = vm
self.device = device
- self.attached = attached
+ self.attached = vm in device.attachments
- icon = self.vm.label.icon
+ icon = self.vm.icon
self.set_image(qui.decorators.create_icon(icon))
- self._hbox = qui.decorators.device_domain_hbox(self.vm,
- self.attached)
- self.devclass = str(self.device.devclass)
-
+ self._hbox = qui.decorators.device_domain_hbox(self.vm, self.attached)
self.add(self._hbox)
- def attach(self):
- assert not self.attached
- self.attached = True
-
- self.remove(self._hbox)
- self._hbox = qui.decorators.device_domain_hbox(self.vm,
- self.attached)
- self.add(self._hbox)
- self.show_all()
-
- def detach(self):
- assert self.attached
- self.attached = False
- self.remove(self._hbox)
- self._hbox = qui.decorators.device_domain_hbox(self.vm,
- self.attached)
- self.add(self._hbox)
- self.show_all()
-
class DomainMenu(Gtk.Menu):
- def __init__(self, device, frontend_domains, qapp,
- dispatcher, gtk_app, *args, **kwargs):
- super(DomainMenu, self).__init__(*args, **kwargs)
+ def __init__(self, device, domains, qapp, gtk_app, **kwargs):
+ super(DomainMenu, self).__init__(**kwargs)
self.device = device
- self.menu_items = {}
+ self.domains = domains
self.qapp = qapp
- self.attached_items = []
- self.frontend_domains = frontend_domains
- self.dispatcher = dispatcher
self.gtk_app = gtk_app
- for vm in self.qapp.domains:
- if vm != device.backend_domain\
- and vm.is_running() and vm.name != 'dom0':
- self.add_vm(vm)
-
- self.dispatcher.add_handler('domain-start', self.add_vm)
- self.dispatcher.add_handler('domain-shutdown', self.remove_vm)
-
- def add_vm(self, vm, _event=None, **_kwargs):
- menu_item = DomainMenuItem(self.device, vm, vm in self.frontend_domains)
- menu_item.connect('activate', self.toggle)
-
- self.menu_items[vm] = menu_item
- if vm in self.frontend_domains:
- self.attached_items.append(menu_item)
-
- # sort function
- position = 0
- for i in self.menu_items:
- if str(self.menu_items[i].vm) < str(vm.name):
- position += 1
-
- self.insert(menu_item, position)
- self.show_all()
- self.queue_draw()
-
- def remove_vm(self, vm, _event=None, **_kwargs):
- if vm not in self.menu_items:
- return
- menu_item = self.menu_items[vm]
- if menu_item in self.attached_items:
- self.attached_items.remove(menu_item)
- self.remove(menu_item)
- self.show_all()
- self.queue_draw()
-
- def dev_attached(self, vm):
- menu_item = self.menu_items[vm]
- menu_item.attach()
- self.attached_items.append(menu_item)
-
- def dev_detached(self, vm):
- menu_item = self.menu_items[vm]
- menu_item.detach()
- if menu_item in self.attached_items:
- self.attached_items.remove(menu_item)
+ for vm in self.domains:
+ if vm != device.backend_domain:
+ menu_item = DomainMenuItem(self.device, vm)
+ menu_item.connect('activate', self.toggle)
+ self.append(menu_item)
def toggle(self, menu_item):
if menu_item.attached:
@@ -127,130 +61,123 @@ def toggle(self, menu_item):
self.attach_item(menu_item)
def attach_item(self, menu_item):
- self.detach_item()
+ detach_successful = self.detach_item()
+
+ if not detach_successful:
+ return
try:
assignment = qubesadmin.devices.DeviceAssignment(
self.device.backend_domain, self.device.ident, persistent=False)
- menu_item.vm.devices[menu_item.devclass].attach(assignment)
- emit_notification(self.gtk_app, "Attaching device",
- "Attaching {} to {}".format(
- self.device.description, menu_item.vm),
- Gio.NotificationPriority.NORMAL)
- except exc.QubesException as ex:
- emit_notification(self.gtk_app,
- "Error",
- "Attaching device {0} to {1} failed. "
- "Error: {2}".format(self.device.description,
- menu_item.vm, ex),
- Gio.NotificationPriority.HIGH,
- error=True)
+
+ vm_to_attach = self.qapp.domains[menu_item.vm]
+ vm_to_attach.devices[menu_item.device.devclass].attach(assignment)
+
+ self.gtk_app.emit_notification(
+ "Attaching device",
+ "Attaching {} to {}".format(self.device.description,
+ menu_item.vm),
+ Gio.NotificationPriority.NORMAL)
+ except qubesadmin.exc.QubesException as ex:
+ self.gtk_app.emit_notification(
+ "Error",
+ "Attaching device {0} to {1} failed. Error: {2}".format(
+ self.device.description, menu_item.vm, ex),
+ Gio.NotificationPriority.HIGH,
+ error=True)
except Exception: # pylint: disable=broad-except
traceback.print_exc(file=sys.stderr)
def detach_item(self):
- for menu_item in self.attached_items:
- emit_notification(self.gtk_app,
- "Detaching device",
- "Detaching {} from {}".format(
- self.device.description,
- menu_item.vm.name),
- Gio.NotificationPriority.NORMAL)
- for assignment\
- in menu_item.vm.devices[menu_item.devclass].assignments():
- try:
- if assignment.device == self.device:
- menu_item.vm.devices[menu_item.devclass].detach(
- assignment)
- except exc.QubesException as ex:
- emit_notification(self.gtk_app,
- "Error",
- "Detaching device {0} from {1} failed. "
- "Error: {2}".format(
- self.device.description,
- menu_item.vm, ex),
- Gio.NotificationPriority.HIGH,
- error=True)
+ for vm in self.device.attachments:
+ self.gtk_app.emit_notification(
+ "Detaching device",
+ "Detaching {} from {}".format(self.device.description, vm),
+ Gio.NotificationPriority.NORMAL)
+ try:
+ assignment = qubesadmin.devices.DeviceAssignment(
+ self.device.backend_domain, self.device.ident,
+ persistent=False)
+ self.qapp.domains[vm].devices[self.device.devclass].detach(
+ assignment)
+ except qubesadmin.exc.QubesException as ex:
+ self.gtk_app.emit_notification(
+ "Error",
+ "Detaching device {0} from {1} failed. Error: {2}".format(
+ self.device.description, vm, ex),
+ Gio.NotificationPriority.HIGH,
+ error=True)
+ return False
+ return True
class DeviceItem(Gtk.ImageMenuItem):
- ''' MenuItem showing the device data and a :class:`DomainMenu`. '''
+ """ MenuItem showing the device data and a :class:`DomainMenu`. """
- def __init__(self, device, frontend_domains, qapp,
- dispatcher, gtk_app, *args, **kwargs):
- "docstring"
+ def __init__(self, device, *args, **kwargs):
super().__init__(*args, **kwargs)
- self.gtk_app = gtk_app
- self.qapp = qapp
self.device = device
- self.devclass = self.device.devclass
- self.frontend_domains = frontend_domains
- self.dispatcher = dispatcher
- vm_icon = self.device.backend_domain.label.icon
- self.hbox = qui.decorators.device_hbox(
- self.device,
- frontend_domains=self.frontend_domains) # type: Gtk.Box
- self.set_image(qui.decorators.create_icon(vm_icon))
+ self.hbox = qui.decorators.device_hbox(self.device) # type: Gtk.Box
+
+ self.set_image(qui.decorators.create_icon(self.device.vm_icon))
+
self.add(self.hbox)
- submenu = DomainMenu(
- self.device, self.frontend_domains, qapp, self.dispatcher,
- self.gtk_app)
- self.set_submenu(submenu)
- self.dispatcher.add_handler('domain-shutdown',
- self.vm_shutdown)
- self.dispatcher.add_handler('domain-start-failed',
- self.vm_shutdown)
- self.dispatcher.add_handler('domain-start', self.vm_start)
- def vm_start(self, vm, _event, **_kwargs):
- if self.device in vm.devices[self.devclass].attached():
- self.device_attached(vm)
+class Device:
+ def __init__(self, dev):
+ self.dev_name = str(dev)
+ self.ident = dev.ident
+ self.description = dev.description
+ self.devclass = dev.devclass
+ self.attachments = set()
+ self.backend_domain = dev.backend_domain.name
+ self.vm_icon = dev.backend_domain.label.icon
- def vm_shutdown(self, vm, _event, **_kwargs):
- if vm in self.frontend_domains:
- self.device_detached(vm)
-
- def device_attached(self, vm):
- self.frontend_domains.append(vm)
- self.remove(self.hbox)
- self.hbox = qui.decorators.device_hbox(
- self.device, frontend_domains=self.frontend_domains)
- self.add(self.hbox)
- self.get_submenu().dev_attached(vm)
- self.show_all()
-
- def device_detached(self, vm):
- if vm:
- self.frontend_domains.remove(vm)
- self.remove(self.hbox)
- self.hbox = qui.decorators.device_hbox(
- self.device, frontend_domains=self.frontend_domains)
- self.add(self.hbox)
- self.get_submenu().dev_detached(vm)
- self.show_all()
+ def __str__(self):
+ return self.dev_name
+ def __eq__(self, other):
+ return str(self) == str(other)
+
+
+class VM:
+ def __init__(self, vm):
+ self.hash = hash(vm)
+ self.vm_name = vm.name
+ self.icon = vm.label.icon
+
+ def __str__(self):
+ return self.vm_name
+
+ def __eq__(self, other):
+ return str(self) == str(other)
+
+ def __lt__(self, other):
+ return str(self) < str(other)
+
+ def __hash__(self):
+ return self.hash
+
+
+class DevicesTray(Gtk.Application):
+ def __init__(self, app_name, qapp, dispatcher):
+ super(DevicesTray, self).__init__()
+ self.name = app_name
+
+ self.devices = {}
+ self.vms = set()
-class DeviceGroups():
- def __init__(self, menu: Gtk.Menu, dispatcher, qapp, gtk_app):
- self.positions = {}
- self.separators = {}
- self.counters = {}
- self.menu = menu
- self.menu_items = {}
- self.qapp = qapp
self.dispatcher = dispatcher
- self.gtk_app = gtk_app
+ self.qapp = qapp
- for pos, dev_type in enumerate(DEV_TYPES):
- self.counters[dev_type] = 0
- separator = Gtk.SeparatorMenuItem()
- self.menu.add(separator)
+ self.set_application_id(self.name)
+ self.register() # register Gtk Application
- self.positions[dev_type] = pos
- self.separators[dev_type] = separator
+ self.initialize_vm_data()
+ self.initialize_dev_data()
for devclass in DEV_TYPES:
self.dispatcher.add_handler('device-attach:' + devclass,
@@ -258,187 +185,146 @@ def __init__(self, menu: Gtk.Menu, dispatcher, qapp, gtk_app):
self.dispatcher.add_handler('device-detach:' + devclass,
self.device_detached)
self.dispatcher.add_handler('device-list-change:' + devclass,
- self.device_change)
+ self.device_list_update)
+
+ self.dispatcher.add_handler('domain-shutdown',
+ self.vm_shutdown)
+ self.dispatcher.add_handler('domain-start-failed',
+ self.vm_shutdown)
+ self.dispatcher.add_handler('domain-start', self.vm_start)
+ self.dispatcher.add_handler('property-set:label', self.on_label_changed)
+
+ self.widget_icon = Gtk.StatusIcon()
+ self.widget_icon.set_from_icon_name('media-removable')
+ self.widget_icon.connect('button-press-event', self.show_menu)
+ self.widget_icon.set_tooltip_markup(
+ 'Qubes Devices\nView and manage devices.')
+
+ def device_list_update(self, vm, _event, **_kwargs):
+
+ changed_devices = []
+
+ # create list of all current devices from the changed VM
+ try:
+ for devclass in DEV_TYPES:
+ for device in vm.devices[devclass]:
+ changed_devices.append(Device(device))
+ except qubesadmin.exc.QubesException:
+ changed_devices = [] # VM was removed
+
+ for dev in changed_devices:
+ if str(dev) not in self.devices:
+ self.devices[str(dev)] = dev
+ self.emit_notification(
+ "Device available",
+ "Device {} is available".format(dev.description),
+ Gio.NotificationPriority.NORMAL)
- def update_device_list(self, vm=None):
- devices = {}
+ dev_to_remove = [name for name, dev in self.devices.items()
+ if dev.backend_domain == vm
+ and name not in changed_devices]
+ for dev_name in dev_to_remove:
+ self.emit_notification(
+ "Device removed",
+ "Device {} is removed".format(
+ self.devices[dev_name].description),
+ Gio.NotificationPriority.NORMAL)
+ del self.devices[dev_name]
+
+ def initialize_vm_data(self):
+ for vm in self.qapp.domains:
+ if vm.klass != 'AdminVM' and vm.is_running():
+ self.vms.add(VM(vm))
- for domain in self.qapp.domains if not vm else [vm]:
+ def initialize_dev_data(self):
+
+ # list all devices
+ for domain in self.qapp.domains:
for devclass in DEV_TYPES:
for device in domain.devices[devclass]:
- devices[device] = []
+ self.devices[str(device)] = Device(device)
+ # list existing device attachments
for domain in self.qapp.domains:
for devclass in DEV_TYPES:
for device in domain.devices[devclass].attached():
- if device in devices:
+ dev = str(device)
+ if dev in self.devices:
# occassionally ghost UnknownDevices appear when a
# device was removed but not detached from a VM
- devices[device].append(domain)
-
- for device in [dev for dev in devices
- if dev not in self.menu_items]:
- self.add(device, devices[device])
-
- for device in [dev for dev in self.menu_items
- if dev not in devices and
- (dev.backend_domain == vm or vm is None)]:
- self.remove(device)
-
- def device_change(self, vm, _event, **_kwargs):
- self.update_device_list(vm)
-
- def add(self, device, frontend_domains):
- if device.devclass not in DEV_TYPES:
- return
-
- position = self._position(device.devclass)
-
- position += len([dev for dev in self.menu_items
- if dev.devclass == device.devclass
- and str(dev) < str(device)])
-
- self._insert(device, frontend_domains, position)
-
- if device.devclass != DEV_TYPES[0]:
- self.separators[device.devclass].show()
-
- if self.gtk_app.startup_in_process:
- return
- emit_notification(self.gtk_app,
- "Device available",
- "Device {} is available".format(device.description),
- Gio.NotificationPriority.NORMAL)
-
- def _position(self, dev_type):
- if dev_type == DEV_TYPES[0]:
- return 0
- return self.positions[dev_type] - self.counters[dev_type] + 1
-
- def _insert(self, device, frontend_domains, position: int) -> None:
- menu_item = DeviceItem(device, frontend_domains, self.qapp,
- self.dispatcher, self.gtk_app)
- self.menu.insert(menu_item, position)
- self.counters[device.devclass] += 1
- self.menu_items[device] = menu_item
- self._shift_positions(device.devclass)
- self._recalc_separators()
- menu_item.show_all()
-
- def remove(self, device):
- for item in self.menu.get_children():
- if getattr(item, 'device', None) == device:
- self.menu.remove(item)
- self.menu_items.pop(device)
- self.counters[item.devclass] -= 1
- self._unshift_positions(item.devclass)
- self._recalc_separators()
- emit_notification(
- self.gtk_app,
- "Device removed",
- "Device {} is removed".format(item.device.description),
- Gio.NotificationPriority.NORMAL)
- return
-
- def _recalc_separators(self):
- for dev_type, size in self.counters.items():
- separator = self.separators[dev_type]
- if separator is not None:
- if size > 0:
- separator.show()
- else:
- separator.hide()
-
- def _shift_positions(self, dev_type):
- if dev_type == DEV_TYPES[-1]:
- return
-
- start_index = DEV_TYPES.index(dev_type)
- index_to_update = DEV_TYPES[start_index:]
-
- for index in index_to_update:
- self.positions[index] += 1
+ self.devices[dev].attachments.add(domain.name)
- def _unshift_positions(self, dev_type):
- if dev_type in [DEV_TYPES[0], DEV_TYPES[-1]]:
+ def device_attached(self, vm, _event, device, **_kwargs):
+ if not vm.is_running() or device.devclass not in DEV_TYPES:
return
- for index in DEV_TYPES[1:]:
- assert self.positions[index] > 0
- self.positions[index] -= 1
+ if str(device) not in self.devices:
+ self.devices[str(device)] = Device(device)
- def device_attached(self, vm, _event, device, **_kwargs):
- if not vm.is_running():
- return
- for item in self.menu.get_children():
- if getattr(item, 'device', None) == device \
- or str(getattr(item, 'device', None)) == str(device):
- item.device_attached(vm)
+ self.devices[str(device)].attachments.add(str(vm))
def device_detached(self, vm, _event, device, **_kwargs):
if not vm.is_running():
return
- for item in self.menu.get_children():
- if getattr(item, 'device', None) == device \
- or str(getattr(item, 'device', None)) == str(device):
- item.device_detached(vm)
+ device = str(device)
-class DevicesTray(Gtk.Application):
- def __init__(self, app_name, qapp, dispatcher):
- super(DevicesTray, self).__init__()
- self.name = app_name
- self.tray_menu = Gtk.Menu()
+ if device in self.devices:
+ self.devices[device].attachments.discard(str(vm))
- self.startup_in_process = True
+ def vm_start(self, vm, _event, **_kwargs):
+ self.vms.add(VM(vm))
- self.dispatcher = dispatcher
- self.qapp = qapp
+ def vm_shutdown(self, vm, _event, **_kwargs):
+ self.vms.discard(vm)
- self.set_application_id(self.name)
- self.register() # register Gtk Application
- self.devices = DeviceGroups(self.tray_menu, self.dispatcher, self.qapp,
- self)
+ def on_label_changed(self, vm, _event, **_kwargs):
+ if not vm: # global properties changed
+ return
+ try:
+ name = vm.name
+ except qubesadmin.exc.QubesPropertyAccessError:
+ return # the VM was deleted before its status could be updated
- self.widget_icon = Gtk.StatusIcon()
- self.widget_icon.set_from_icon_name('media-removable')
- self.widget_icon.connect('button-press-event', self.show_menu)
- self.widget_icon.set_tooltip_markup(
- 'Qubes Devices\nView and manage devices.')
+ for domain in self.vms:
+ if domain.name == name:
+ domain.icon = vm.icon
def show_menu(self, _, event):
- # this awkward hack is used to workaround a Gtk bug
- # that seems to manifest when the widget icon in placed
- # on bottom of the screen
- menu = Gtk.Menu()
- for item in self.tray_menu.get_children():
- self.tray_menu.remove(item)
- menu.add(item)
- menu.show_all()
- self.tray_menu = menu
-
- self.tray_menu.popup(None, # parent_menu_shell
- None, # parent_menu_item
- None, # func
- None, # data
- event.button, # button
- Gtk.get_current_event_time()) # activate_time
-
- def run(self): # pylint: disable=arguments-differ
- self.devices.update_device_list()
-
- self.tray_menu.show_all()
-
- self.startup_in_process = False
-
-
-def emit_notification(gtk_app, title, message, priority, error=False):
- notification = Gio.Notification.new(title)
- notification.set_body(message)
- notification.set_priority(priority)
- if error:
- notification.set_icon(Gio.ThemedIcon.new('dialog-error'))
- gtk_app.send_notification(None, notification)
+ tray_menu = Gtk.Menu()
+
+ # create menu items
+ menu_items = []
+ sorted_vms = sorted(self.vms)
+ for dev in self.devices.values():
+ domain_menu = DomainMenu(dev, sorted_vms, self.qapp, self)
+ device_menu = DeviceItem(dev)
+ device_menu.set_submenu(domain_menu)
+ menu_items.append(device_menu)
+
+ menu_items.sort(key=(lambda x: x.device.devclass + str(x.device)))
+
+ for i, item in enumerate(menu_items):
+ if i > 0 and item.device.devclass != \
+ menu_items[i-1].device.devclass:
+ tray_menu.add(Gtk.SeparatorMenuItem())
+ tray_menu.add(item)
+
+ tray_menu.show_all()
+ tray_menu.popup(None, # parent_menu_shell
+ None, # parent_menu_item
+ None, # func
+ None, # data
+ event.button, # button
+ Gtk.get_current_event_time()) # activate_time
+
+ def emit_notification(self, title, message, priority, error=False):
+ notification = Gio.Notification.new(title)
+ notification.set_body(message)
+ notification.set_priority(priority)
+ if error:
+ notification.set_icon(Gio.ThemedIcon.new('dialog-error'))
+ self.send_notification(None, notification)
def main():
@@ -446,7 +332,6 @@ def main():
dispatcher = qubesadmin.events.EventsDispatcher(qapp)
app = DevicesTray(
'org.qubes.qui.tray.Devices', qapp, dispatcher)
- app.run()
loop = asyncio.get_event_loop()
@@ -472,7 +357,7 @@ def main():
))
dialog.run()
exit_code = 1
-
+ del app
return exit_code