From 4e6e6e6ec9e101a7f5262e8542443789381d4c8f Mon Sep 17 00:00:00 2001 From: MacGH23 Date: Mon, 6 Jan 2025 20:12:50 +0100 Subject: [PATCH] Update standalone_serialbattery.py (#152) --- .../standalone_serialbattery.py | 227 ++++++++++-------- 1 file changed, 127 insertions(+), 100 deletions(-) diff --git a/dbus-serialbattery/standalone_serialbattery.py b/dbus-serialbattery/standalone_serialbattery.py index ffe1482..57d9664 100644 --- a/dbus-serialbattery/standalone_serialbattery.py +++ b/dbus-serialbattery/standalone_serialbattery.py @@ -79,11 +79,11 @@ from time import sleep from standalone_helper import DbusHelper -import utils from battery import Battery from utils import ( BMS_TYPE, + bytearray_to_string, logger, BATTERY_ADDRESSES, ) @@ -171,6 +171,88 @@ def __init__(self, devpath, driverOption, devadr, loglevel): self.helper = {} self.BatIds = [] + def get_battery(self, _port: str, _bus_address: hex = None, can_transport_interface: callable = None) -> Union[Battery, None]: + """ + Attempts to establish a connection to the battery and returns the battery object if successful. + + :param _port: The port to connect to. + :param _bus_address: The Modbus/CAN address to connect to (optional). + :return: The battery object if a connection is established, otherwise None. + """ + # Try to establish communications with the battery 3 times, else exit + retry = 1 + retries = 3 + while retry <= retries: + logging.info("-- Testing BMS: " + str(retry) + " of " + str(retries) + " rounds") + # create a new battery object that can read the battery and run connection test + for test in self.expected_bms_types: + # noinspection PyBroadException + try: + if _bus_address is not None: + # Convert hex string to bytes + _bms_address = bytes.fromhex(_bus_address.replace("0x", "")) + elif "address" in test: + _bms_address = test["address"] + else: + _bms_address = None + + logging.info( + "Testing " + test["bms"].__name__ + (' at address "' + bytearray_to_string(_bms_address) + '"' if _bms_address is not None else "") + ) + batteryClass = test["bms"] + baud = test["baud"] if "baud" in test else None + battery: Battery = batteryClass(port=_port, baud=baud, address=_bms_address) + if battery.test_connection() and battery.validate_data(): + logging.info("-- Connection established to " + battery.__class__.__name__) + return battery + except KeyboardInterrupt: + return None + except Exception: + ( + exception_type, + exception_object, + exception_traceback, + ) = sys.exc_info() + file = exception_traceback.tb_frame.f_code.co_filename + line = exception_traceback.tb_lineno + logging.error("Non blocking exception occurred: " + f"{repr(exception_object)} of type {exception_type} in {file} line #{line}") + # Ignore any malfunction test_function() + pass + retry += 1 + sleep(0.5) + + return None + + def check_bms_types(self, supported_bms_types, type) -> None: + """ + Checks if BMS_TYPE is not empty and all specified BMS types are supported. + + :param supported_bms_types: List of supported BMS types. + :param type: The type of BMS connection (ble, can, or serial). + :return: None + """ + # Get only BMS_TYPE that end with "_Ble" + if type == "ble": + bms_types = [type for type in BMS_TYPE if type.endswith("_Ble")] + + # Get only BMS_TYPE that end with "_Can" + if type == "can": + bms_types = [type for type in BMS_TYPE if type.endswith("_Can")] + + # Get only BMS_TYPE that do not end with "_Ble" or "_Can" + if type == "serial": + bms_types = [type for type in BMS_TYPE if not type.endswith("_Ble") and not type.endswith("_Can")] + + if len(bms_types) > 0: + for bms_type in bms_types: + if bms_type not in [bms["bms"].__name__ for bms in supported_bms_types]: + logger.error( + f'ERROR >>> BMS type "{bms_type}" is not supported. Supported BMS types are: ' + + f"{', '.join([bms['bms'].__name__ for bms in supported_bms_types])}" + + "; Disabled by default: ANT, MNB, Sinowealth" + ) + raise (None, None, 1) + def bms_open(self): logging.info("open serial interface") # check if BMS_TYPE is not empty and all BMS types in the list are supported @@ -212,8 +294,12 @@ def bms_open(self): if self.driveroption == 10: # can interface """ - Import CAN classes only, if it's a can port, else the driver won't start due to missing python modules - This prevent problems when using the driver only with a serial connection + Import CAN classes only if it's a CAN port; otherwise, the driver won't start due to missing Python modules. + This prevents issues when using the driver exclusively with a serial connection + + can: Older GX devices and Raspberry Pi with CAN hat + vecan: Newer Venus GX devices + vcan: Virtual CAN interface for testing """ from bms.daly_can import Daly_Can from bms.jkbms_can import Jkbms_Can @@ -228,32 +314,51 @@ def bms_open(self): battery_type for battery_type in self.supported_bms_types if battery_type["bms"].__name__ in BMS_TYPE or len(BMS_TYPE) == 0 ] + # If no BMS type is supported, use all supported BMS types + + if len(self.expected_bms_types) == 0: + logging.warning(f"No supported CAN BMS type found in BMS_TYPE: {', '.join(BMS_TYPE)}. Using all supported BMS types.") + self.expected_bms_types = supported_bms_types + # start the corresponding CanReceiverThread if BMS for this type found - from utils_can import CanReceiverThread + from utils_can import CanReceiverThread, CanTransportInterface try: can_thread = CanReceiverThread.get_instance(bustype="socketcan", channel=self.devpath) except Exception as e: print(f"Error: {e}") + # wait until thread has initialized + if not can_thread.can_initialised.wait(2): + logger.error("Timeout while accessing CAN interface") + sleep(60) + + can_transport_interface = CanTransportInterface() + can_transport_interface.can_message_cache_callback = can_thread.get_message_cache + can_transport_interface.can_bus = can_thread.can_bus logging.debug("Wait shortly to make sure that all needed data is in the cache") # Slowest message cycle trasmission is every 1 second, wait a bit more for the fist time to fetch all needed data sleep(2) - - # check if BATTERY_ADDRESSES is not empty - if BATTERY_ADDRESSES: - logging.info(">>> CAN multi device mode") - for address in BATTERY_ADDRESSES: - checkbatt = self.get_battery(self.devpath, address, can_thread.get_message_cache) - if checkbatt is not None: - self.battery[address] = checkbatt - logging.info("Successful battery connection at " + self.devpath + " and this device address " + str(address)) + addresses = [None] if len(BATTERY_ADDRESSES) == 0 else BATTERY_ADDRESSES # use default address, if not configured + + for busspeed in [250, 500]: + for address in addresses: + bat = self.get_battery(self.devpath, address, can_transport_interface) + if bat: + self.battery[address] = bat + logger.info(f"Successful battery connection at {self.devpath} and this address {str(address)}") else: - logging.warning("No battery connection at " + self.devpath + " and this device address " + str(address)) - # use default address - else: - self.battery[0] = self.get_battery(self.devpath, None, can_thread.get_message_cache) + logger.warning(f"No battery connection at {self.devpath} and this address {str(address)}") + + # if we've found at least 1 battery, stop the search here. otherwise retry with other bus speeds + if len(self.battery) > 0: + break + + logger.info(f"Found no devices on can bus, retrying with {busspeed} kbps") + can_thread.setup_can(channel=self.devpath, bitrate=busspeed, force=True) + sleep(2) + # SERIAL else: # Serial, modbus, ... # check if BMS_TYPE is not empty and all BMS types in the list are supported # self.check_bms_types(supported_bms_types, "serial") @@ -265,12 +370,12 @@ def bms_open(self): # check if BATTERY_ADDRESSES is not empty if BATTERY_ADDRESSES: for address in BATTERY_ADDRESSES: - checkbatt = self.get_battery(self.devpath, address) - if checkbatt is not None: - self.battery[address] = checkbatt - logger.info("Successful battery connection at " + self.devpath + " and this Modbus address " + str(address)) + found_battery = self.get_battery(self.devpath, address) + if found_battery: + self.battery[address] = found_battery + logger.info(f"Successful battery connection at {self.devpath} and this address {address}") else: - logger.warning("No battery connection at " + self.devpath + " and this Modbus address " + str(address)) + logger.warning(f"No battery connection at {self.devpath} and this address {address}") # use default address else: self.battery[0] = self.get_battery(self.devpath) @@ -298,84 +403,6 @@ def bms_open(self): # self.battery.log_settings() return self.BatIds - def check_bms_types(self, supported_bms_types, type) -> None: - """ - Checks if BMS_TYPE is not empty and all specified BMS types are supported. - - :param supported_bms_types: List of supported BMS types. - :param type: The type of BMS connection (ble, can, or serial). - :return: None - """ - # Get only BMS_TYPE that end with "_Ble" - if type == "ble": - bms_types = [type for type in BMS_TYPE if type.endswith("_Ble")] - - # Get only BMS_TYPE that end with "_Can" - if type == "can": - bms_types = [type for type in BMS_TYPE if type.endswith("_Can")] - - # Get only BMS_TYPE that do not end with "_Ble" or "_Can" - if type == "serial": - bms_types = [type for type in BMS_TYPE if not type.endswith("_Ble") and not type.endswith("_Can")] - - if len(bms_types) > 0: - for bms_type in bms_types: - if bms_type not in [bms["bms"].__name__ for bms in supported_bms_types]: - logger.error( - f'ERROR >>> BMS type "{bms_type}" is not supported. Supported BMS types are: ' - + f"{', '.join([bms['bms'].__name__ for bms in supported_bms_types])}" - + "; Disabled by default: ANT, MNB, Sinowealth" - ) - raise (None, None, 1) - - def get_battery(self, _port: str, _modbus_address: hex = None, _can_message_cache_callback: callable = None) -> Union[Battery, None]: - # all the different batteries the driver support and need to test for - # try to establish communications with the battery 3 times, else exit - retry = 1 - retries = 2 - while retry <= retries: - logging.info("-- Testing BMS: " + str(retry) + " of " + str(retries) + " rounds") - # create a new battery object that can read the battery and run connection test - for test in self.expected_bms_types: - # noinspection PyBroadException - try: - if _modbus_address is not None: - # convert hex string to bytes - _bms_address = bytes.fromhex(_modbus_address.replace("0x", "")) - elif "address" in test: - _bms_address = test["address"] - else: - _bms_address = None - - logging.info( - "Testing " - + test["bms"].__name__ - + (' at address "' + utils.bytearray_to_string(_bms_address) + '"' if _bms_address is not None else "") - ) - batteryClass = test["bms"] - baud = test["baud"] if "baud" in test else None - battery: Battery = batteryClass(port=_port, baud=baud, address=_bms_address) - if battery.test_connection() and battery.validate_data(): - logging.info("-- Connection established to " + battery.__class__.__name__) - return battery - except KeyboardInterrupt: - return None - except Exception: - ( - exception_type, - exception_object, - exception_traceback, - ) = sys.exc_info() - file = exception_traceback.tb_frame.f_code.co_filename - line = exception_traceback.tb_lineno - logging.error("Non blocking exception occurred: " + f"{repr(exception_object)} of type {exception_type} in {file} line #{line}") - # Ignore any malfunction test_function() - pass - retry += 1 - sleep(0.5) - - return None - def bms_close(self): logging.debug("close serial interface")