diff --git a/dmx/__init__.py b/dmx/__init__.py index 41d6ef8..9040aa1 100644 --- a/dmx/__init__.py +++ b/dmx/__init__.py @@ -8,9 +8,9 @@ __author__ = 'Rune Monzel' __email__ = 'runemonzel@googlemail.com' -__version__ = '0.1.0' +__version__ = '0.2.0' __all__ = ['DMX', 'logger', 'Device', 'DEVICE_LIST', - 'sleep_us',] \ No newline at end of file + 'sleep_us'] \ No newline at end of file diff --git a/dmx/dmx.py b/dmx/dmx.py index 3dbd702..b7e165b 100644 --- a/dmx/dmx.py +++ b/dmx/dmx.py @@ -7,6 +7,7 @@ # internal python modules import logging import time +from typing import Union # external python modules import serial @@ -90,7 +91,7 @@ def __init__(self, num_of_channels: int = 512, serial_number: str = "") -> None: :param num_of_channels: integer between 1 and 512 :param serial_number: serial number of the RS-485 chip as string. If you want to know the current serial number - of your device, call my_dmx.use_device.serial_number + of your device, call my_dmx.device.serial_number :return None: """ @@ -102,7 +103,7 @@ def __init__(self, num_of_channels: int = 512, serial_number: str = "") -> None: # Search for RS-485 devices, for this look into DEVICE_LIST self.ser = None - self.use_device = None + self.device = None for device in serial.tools.list_ports.comports(): for known_device in DEVICE_LIST: if device.vid == known_device.vid and device.pid == known_device.pid and serial_number == "": @@ -112,7 +113,7 @@ def __init__(self, num_of_channels: int = 512, serial_number: str = "") -> None: except (OSError, serial.SerialException): pass else: - self.use_device = device + self.device = device break elif device.vid == known_device.vid and device.pid == known_device.pid and \ @@ -124,22 +125,22 @@ def __init__(self, num_of_channels: int = 512, serial_number: str = "") -> None: except (OSError, serial.SerialException) as error: raise error else: - self.use_device = device + self.device = device logger.info("Found device with serial number: " + serial_number) break - if self.use_device: - logger.info("Found RS-485 interface: " + self.use_device.description) + if self.device: + logger.info("Found RS-485 interface: " + self.device.description) break - if self.use_device is None: + if self.device is None: raise ConnectionError("Could not find the RS-485 interface.") - if self.use_device.vid == EUROLITE_USB_DMX512_PRO_CABLE_INTERFACE.vid and \ - self.use_device.pid == EUROLITE_USB_DMX512_PRO_CABLE_INTERFACE.pid: + if self.device.vid == EUROLITE_USB_DMX512_PRO_CABLE_INTERFACE.vid and \ + self.device.pid == EUROLITE_USB_DMX512_PRO_CABLE_INTERFACE.pid: self.start_byte = np.array([0x7E, 0x06, 0x01, 0x02, 0x00], np.uint8) self.end_byte = np.array([0xE7], np.uint8) self.num_of_channels = num_of_channels - self.ser = serial.Serial(self.use_device.name, + self.ser = serial.Serial(self.device.name, baudrate=250000, parity=serial.PARITY_NONE, bytesize=serial.EIGHTBITS, @@ -149,7 +150,7 @@ def __init__(self, num_of_channels: int = 512, serial_number: str = "") -> None: self.start_byte = np.array([0x00], np.uint8) self.end_byte = np.array([], np.uint8) self.num_of_channels = num_of_channels - self.ser = serial.Serial(self.use_device.name, + self.ser = serial.Serial(self.device.name, baudrate=250000, parity=serial.PARITY_NONE, bytesize=serial.EIGHTBITS, @@ -175,8 +176,8 @@ def num_of_channels(self, num_of_channels: int) -> None: if num_of_channels > 512: raise ValueError("Number of channels are maximal 512! Only channels 1 to 512 can be accessed. " + "Channel 0 is reserved as start channel.") - if self.use_device.vid == EUROLITE_USB_DMX512_PRO_CABLE_INTERFACE.vid and \ - self.use_device.pid == EUROLITE_USB_DMX512_PRO_CABLE_INTERFACE.pid: + if self.device.vid == EUROLITE_USB_DMX512_PRO_CABLE_INTERFACE.vid and \ + self.device.pid == EUROLITE_USB_DMX512_PRO_CABLE_INTERFACE.pid: self.start_byte[2] = (num_of_channels+1) & 0xFF self.start_byte[3] = ((num_of_channels+1) >> 8) & 0xFF self.__num_of_channels = num_of_channels @@ -186,6 +187,19 @@ def num_of_channels(self, num_of_channels: int) -> None: for channel_id in range(min([len(old_data), len(self.data)])): self.data[channel_id] = old_data[channel_id] + def is_connected(self) -> bool: + """ + checks if the DMX class has a connection to the device + + :return: + """ + connected = False + devices = serial.tools.list_ports.comports() + if self.device is not None: + if self.device in devices: + connected = True + return connected + def set_data(self, channel_id: int, data: int, auto_send: bool = True) -> None: """ @@ -224,11 +238,12 @@ def __del__(self) -> None: """ if isinstance(self.ser, serial.Serial): if self.ser.is_open: + if self.is_connected(): + self.num_of_channels = 512 + self.data = np.zeros([self.num_of_channels], np.uint8) + self.send() + self.send() # make sure it has been send print("close serial port") - self.num_of_channels = 512 - self.data = np.zeros([self.num_of_channels], np.uint8) - self.send() - self.send() # make sure it is send self.ser.close() @@ -289,7 +304,7 @@ def sleep_us(sleep_in_us: int) -> None: dmx.set_data(4, i) time.sleep(0.01) - my_device_sn = dmx.use_device.serial_number + my_device_sn = dmx.device.serial_number del dmx dmx2 = DMX(serial_number=my_device_sn)