diff --git a/CHANGELOG.md b/CHANGELOG.md index 30e52815..448d015b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,15 @@ # Changelog +## Notes + +* The Bluetooth and CAN connections are still not stable on some systems. If you want to have a stable connection use the serial connection. + ## Breaking changes +* Driver version greater or equal to `v1.0.20231126beta` + + The custom name is not saved to the config file anymore, but to the dbus service com.victronenergy.settings. You have to re-enter it once. + * Driver version greater or equal to `v1.0.20230629beta` and smaller or equal to `v1.0.20230926beta`: With `v1.0.20230927beta` the following values changed names: @@ -26,6 +34,7 @@ * Added: Load to SOC reset voltage every x days to reset the SoC to 100% for some BMS by @mr-manuel * Added: Save custom name and make it restart persistant by @mr-manuel * Added: Temperature names to dbus and mqtt by @mr-manuel +* Added: The device instance does not change anymore when you plug the BMS into another USB port. Fixed https://github.com/Louisvdw/dbus-serialbattery/issues/718 by @mr-manuel * Added: Use current average of the last 300 cycles for time to go and time to SoC calculation by @mr-manuel * Added: Validate current, voltage, capacity and SoC for all BMS. This prevents that a device, which is no BMS, is detected as BMS. Fixes also https://github.com/Louisvdw/dbus-serialbattery/issues/479 by @mr-manuel * Changed: `VOLTAGE_DROP` now behaves differently. Before it reduced the voltage for the check, now the voltage for the charger is increased in order to get the target voltage on the BMS by @mr-manuel diff --git a/etc/dbus-serialbattery/battery.py b/etc/dbus-serialbattery/battery.py index 6a229e34..393a0d20 100644 --- a/etc/dbus-serialbattery/battery.py +++ b/etc/dbus-serialbattery/battery.py @@ -7,8 +7,6 @@ import math from time import time from abc import ABC, abstractmethod -import re -import sys class Protection(object): @@ -1253,146 +1251,6 @@ def log_settings(self) -> None: return - # save custom name to config file - def custom_name_callback(self, path, value): - try: - if path == "/CustomName": - file = open( - "/data/etc/dbus-serialbattery/" + utils.PATH_CONFIG_USER, "r" - ) - lines = file.readlines() - last = len(lines) - - # remove not allowed characters - value = value.replace(":", "").replace("=", "").replace(",", "").strip() - - # empty string to save new config file - config_file_new = "" - - # make sure we are in the [DEFAULT] section - current_line_in_default_section = False - default_section_checked = False - - # check if already exists - exists = False - - # count lines - i = 0 - # looping through the file - for line in lines: - # increment by one - i += 1 - - # stripping line break - line = line.strip() - - # check, if current line is after the [DEFAULT] section - if line == "[DEFAULT]": - current_line_in_default_section = True - - # check, if current line starts a new section - if line != "[DEFAULT]" and re.match(r"^\[.*\]", line): - # set default_section_checked to true, if it was already checked and a new section comes on - if current_line_in_default_section and not exists: - default_section_checked = True - current_line_in_default_section = False - - # check, if the current line is the last line - if i == last: - default_section_checked = True - - # insert or replace only in [DEFAULT] section - if current_line_in_default_section and re.match( - r"^CUSTOM_BATTERY_NAMES.*", line - ): - # set that the setting was found, else a new one is created - exists = True - - # remove setting name - line = re.sub( - "^CUSTOM_BATTERY_NAMES\s*=\s*", "", line # noqa: W605 - ) - - # change only the name of the current BMS - result = [] - bms_name_list = line.split(",") - for bms_name_pair in bms_name_list: - tmp = bms_name_pair.split(":") - if tmp[0] == self.port: - result.append(tmp[0] + ":" + value) - else: - result.append(bms_name_pair) - - new_line = "CUSTOM_BATTERY_NAMES = " + ",".join(result) - - else: - if default_section_checked and not exists: - exists = True - - # add before current line - if i != last: - new_line = ( - "CUSTOM_BATTERY_NAMES = " - + self.port - + ":" - + value - + "\n\n" - + line - ) - - # add at the end if last line - else: - new_line = ( - line - + "\n\n" - + "CUSTOM_BATTERY_NAMES = " - + self.port - + ":" - + value - ) - else: - new_line = line - # concatenate the new string and add an end-line break - config_file_new = config_file_new + new_line + "\n" - - # close the file - file.close() - # Open file in write mode - write_file = open( - "/data/etc/dbus-serialbattery/" + utils.PATH_CONFIG_USER, "w" - ) - # overwriting the old file contents with the new/replaced content - write_file.write(config_file_new) - # close the file - write_file.close() - - # logger.error("value (saved): " + str(value)) - - """ - # this removes all comments and tranfsorm the values to lowercase - utils.config.set( - "DEFAULT", - "CUSTOM_BATTERY_NAMES", - self.port + ":" + value, - ) - - # Writing our configuration file to 'example.ini' - with open( - "/data/etc/dbus-serialbattery/" + utils.PATH_CONFIG_USER, "w" - ) as configfile: - type(utils.config.write(configfile)) - """ - - except Exception: - exception_type, exception_object, exception_traceback = sys.exc_info() - file = exception_traceback.tb_frame.f_code.co_filename - line = exception_traceback.tb_lineno - logger.error( - f"Exception occurred: {repr(exception_object)} of type {exception_type} in {file} line #{line}" - ) - - return value - def reset_soc_callback(self, path, value): # callback for handling reset soc request return diff --git a/etc/dbus-serialbattery/dbushelper.py b/etc/dbus-serialbattery/dbushelper.py index 5e47fef4..5e398bd9 100644 --- a/etc/dbus-serialbattery/dbushelper.py +++ b/etc/dbus-serialbattery/dbushelper.py @@ -5,6 +5,9 @@ import dbus # pyright: ignore[reportMissingImports] import traceback from time import time +from utils import logger, publish_config_variables +import utils +from xml.etree import ElementTree # Victron packages sys.path.insert( @@ -14,12 +17,10 @@ "/opt/victronenergy/dbus-systemcalc-py/ext/velib_python", ), ) -from vedbus import VeDbusService # noqa: E402 # pyright: ignore[reportMissingImports] -from settingsdevice import ( # noqa: E402 # pyright: ignore[reportMissingImports] +from vedbus import VeDbusService # noqa: E402 +from settingsdevice import ( # noqa: E402 SettingsDevice, ) -from utils import logger, publish_config_variables # noqa: E402 -import utils # noqa: E402 def get_bus(): @@ -31,6 +32,8 @@ def get_bus(): class DbusHelper: + EMPTY_DICT = {} + def __init__(self, battery): self.battery = battery self.instance = 1 @@ -42,36 +45,205 @@ def __init__(self, battery): + self.battery.port[self.battery.port.rfind("/") + 1 :], get_bus(), ) + self.bms_id = "".join( + # remove all non alphanumeric characters from the identifier + c if c.isalnum() else "_" + for c in self.battery.unique_identifier() + ) + self.path_battery = None def setup_instance(self): + # checks if the battery was already connected once + # if so, get the instance from the dbus settings and update last seen with current time + # if not, create the settings and set the instance to the next available one + # bms_id = self.battery.production if self.battery.production is not None else \ # self.battery.port[self.battery.port.rfind('/') + 1:] - bms_id = self.battery.port[self.battery.port.rfind("/") + 1 :] - path = "/Settings/Devices/serialbattery" - default_instance = "battery:1" + # bms_id = self.battery.port[self.battery.port.rfind("/") + 1 :] + + custom_name = self.battery.custom_name() + device_instance = "1" + device_instances_used = [] + found_bms = False + self.path_battery = "/Settings/Devices/serialbattery" + "_" + str(self.bms_id) + + # prepare settings class + self.settings = SettingsDevice( + get_bus(), self.EMPTY_DICT, self.handle_changed_setting + ) + + # get all the settings from the dbus + settings_from_dbus = self.getSettingsWithValues( + get_bus(), "com.victronenergy.settings", "/Settings/Devices" + ) + # output: + # { + # "Settings": { + # "Devices": { + # "serialbattery_JK_B2A20S20P": { + # "UniqueIdentifier": "JK_B2A20S20P", + # "ClassAndVrmInstance": "battery:1", + # "CustomName": "My Battery 1", + # "ChargeMode": "0", + # "LastSeen": "1700926114", + # }, + # "serialbattery_JK_B2A20S25P": { + # "UniqueIdentifier": "JK_B2A20S25P", + # "ClassAndVrmInstance": "battery:2", + # "CustomName": "My Battery 2", + # "ChargeMode": "0", + # "LastSeen": "1700926114", + # }, + # "serialbattery_ttyUSB0": { + # "ClassAndVrmInstance": "battery:1", + # }, + # "serialbattery_ttyUSB1": { + # "ClassAndVrmInstance": "battery:2", + # }, + # "vegps_ttyUSB0": { + # "ClassAndVrmInstance": "gps:0" + # }, + # } + # } + # } + + # loop through devices in dbus settings + for key, value in settings_from_dbus["Settings"]["Devices"].items(): + # check if it's a serialbattery + if "serialbattery" in key: + # check used device instances + if "ClassAndVrmInstance" in value: + device_instances_used.append( + value["ClassAndVrmInstance"][ + value["ClassAndVrmInstance"].rfind(":") + 1 : + ] + ) + + # check if the battery has a custom name + if "CustomName" in value and value["CustomName"] != "": + custom_name = value["CustomName"] + + # check the last seen time and remove the battery it it was not seen for 30 days + if "LastSeen" in value and int(value["LastSeen"]) < int(time()) - ( + 60 * 60 * 24 * 30 + ): + # remove entry + del_return = self.removeSetting( + get_bus(), + "com.victronenergy.settings", + "/Settings/Devices/" + key, + ["ClassAndVrmInstance", "ChargeMode", "CustomName", "LastSeen"], + ) + logger.info( + f"Remove /Settings/Devices/{key}/LastSeen from dbus. Delete result: {del_return}" + ) + + # check if the battery has a last seen time, if not then it's an old entry + if "LastSeen" not in value: + del_return = self.removeSetting( + get_bus(), + "com.victronenergy.settings", + "/Settings/Devices/" + key, + ["ClassAndVrmInstance"], + ) + logger.info( + f"Remove /Settings/Devices/{key}/ClassAndVrmInstance from dbus." + + f"Old entry. Delete result: {del_return}" + ) + + # check the device instance, if the battery was already connected once + if ( + "UniqueIdentifier" in value + and value["UniqueIdentifier"] == self.bms_id + ): + # set found_bms to true + found_bms = True + + # get the instance from the object name + device_instance = int( + value["ClassAndVrmInstance"][ + value["ClassAndVrmInstance"].rfind(":") + 1 : + ] + ) + logger.info( + f"Found existing battery with DeviceInstance = {device_instance}" + ) + + # create class and crm instance + class_and_vrm_instance = "battery:" + str(device_instance) + + # preare settings and write them to com.victronenergy.settings settings = { - "instance": [ - path + "_" + str(bms_id).replace(" ", "_") + "/ClassAndVrmInstance", - default_instance, + "ClassAndVrmInstance": [ + self.path_battery + "/ClassAndVrmInstance", + class_and_vrm_instance, + 0, + 0, + ], + "ChargeMode": [ + self.path_battery + "/ChargeMode", + 0, # 0 = Bulk, 1 = Absorption, 2 = Float + 0, + 0, + ], + "CustomName": [ + self.path_battery + "/CustomName", + custom_name, + 0, + 0, + ], + "LastSeen": [ + self.path_battery + "/LastSeen", + int(time()), + 0, + 0, + ], + "UniqueIdentifier": [ + self.path_battery + "/UniqueIdentifier", + self.bms_id, 0, 0, ], } - self.settings = SettingsDevice(get_bus(), settings, self.handle_changed_setting) + if found_bms: + self.setSetting( + get_bus(), + "com.victronenergy.settings", + self.path_battery, + "LastSeen", + int(time()), + ) + + self.settings.addSettings(settings) self.battery.role, self.instance = self.get_role_instance() + logger.info(f"Used device instances: {device_instances_used}") + + def update_last_seen(self): + # update the last seen time + self.settings.addSetting( + "/Settings/Devices/serialbattery" + "_" + str(self.bms_id) + "/LastSeen", + int(time()), + 0, + 0, + ) + def get_role_instance(self): - val = self.settings["instance"].split(":") + val = self.settings["ClassAndVrmInstance"].split(":") logger.info("DeviceInstance = %d", int(val[1])) return val[0], int(val[1]) def handle_changed_setting(self, setting, oldvalue, newvalue): - if setting == "instance": + if setting == "ClassAndVrmInstance": self.battery.role, self.instance = self.get_role_instance() - logger.info("Changed DeviceInstance = %d", self.instance) + logger.info(f"Changed DeviceInstance = {self.instance}") + return + if setting == "CustomName": + logger.info(f"Changed CustomName = {newvalue}") return + # this function is called when the battery is initiated def setup_vedbus(self): # Set up dbus service and device instance # and notify of all the attributes we intend to update @@ -100,13 +272,11 @@ def setup_vedbus(self): self._dbusservice.add_path("/Connected", 1) self._dbusservice.add_path( "/CustomName", - self.battery.custom_name(), + self.settings["CustomName"], writeable=True, - onchangecallback=self.battery.custom_name_callback, - ) - self._dbusservice.add_path( - "/Serial", self.battery.unique_identifier(), writeable=True + onchangecallback=self.custom_name_callback, ) + self._dbusservice.add_path("/Serial", self.bms_id, writeable=True) self._dbusservice.add_path( "/DeviceName", self.battery.custom_field, writeable=True ) @@ -680,3 +850,104 @@ def publish_dbus(self): if self.battery.has_settings: self._dbusservice["/Settings/ResetSoc"] = self.battery.reset_soc + + def getSettingsWithValues( + self, bus, service: str, object_path: str, recursive: bool = True + ) -> dict: + # print(object_path) + obj = bus.get_object(service, object_path) + iface = dbus.Interface(obj, "org.freedesktop.DBus.Introspectable") + xml_string = iface.Introspect() + result = {} + for child in ElementTree.fromstring(xml_string): + if child.tag == "node" and recursive: + if object_path == "/": + object_path = "" + new_path = "/".join((object_path, child.attrib["name"])) + # result.update(getSettingsWithValues(bus, service, new_path)) + result_sub = self.getSettingsWithValues(bus, service, new_path) + self.merge_dicts(result, result_sub) + elif child.tag == "interface": + if child.attrib["name"] == "com.victronenergy.Settings": + settings_iface = dbus.Interface(obj, "com.victronenergy.BusItem") + method = settings_iface.get_dbus_method("GetValue") + try: + value = method() + if type(value) is not dbus.Dictionary: + # result[object_path] = str(value) + self.merge_dicts( + result, self.create_nested_dict(object_path, str(value)) + ) + # print(f"{object_path}: {value}") + if not recursive: + return value + except dbus.exceptions.DBusException as e: + logger.error( + f"getSettingsWithValues(): Failed to get value: {e}" + ) + + return result + + def setSetting( + self, bus, service: str, object_path: str, setting_name: str, value + ) -> bool: + obj = bus.get_object(service, object_path + "/" + setting_name) + # iface = dbus.Interface(obj, "org.freedesktop.DBus.Introspectable") + # xml_string = iface.Introspect() + # print(xml_string) + settings_iface = dbus.Interface(obj, "com.victronenergy.BusItem") + method = settings_iface.get_dbus_method("SetValue") + try: + print(f"Setted setting {object_path}/{setting_name} to {value}") + return True if method(value) == 0 else False + except dbus.exceptions.DBusException as e: + print(f"Failed to remove setting: {e}") + + def removeSetting( + self, bus, service: str, object_path: str, setting_name: list + ) -> bool: + obj = bus.get_object(service, object_path) + # iface = dbus.Interface(obj, "org.freedesktop.DBus.Introspectable") + # xml_string = iface.Introspect() + # print(xml_string) + settings_iface = dbus.Interface(obj, "com.victronenergy.Settings") + method = settings_iface.get_dbus_method("RemoveSettings") + try: + print(f"Removed setting at {object_path}") + return True if method(setting_name) == 0 else False + except dbus.exceptions.DBusException as e: + print(f"Failed to remove setting: {e}") + + def create_nested_dict(self, path, value) -> dict: + keys = path.strip("/").split("/") + result = current = {} + for key in keys[:-1]: + current[key] = {} + current = current[key] + current[keys[-1]] = value + return result + + def merge_dicts(self, dict1, dict2) -> None: + for key in dict2: + if ( + key in dict1 + and isinstance(dict1[key], dict) + and isinstance(dict2[key], dict) + ): + self.merge_dicts(dict1[key], dict2[key]) + else: + dict1[key] = dict2[key] + + # save custom name to config file + def custom_name_callback(self, path, value) -> str: + result = self.setSetting( + get_bus(), + "com.victronenergy.settings", + self.path_battery, + "CustomName", + value, + ) + logger.debug( + f'CustomName changed to "{value}" for {self.path_battery}: {result}' + ) + return value if result else None diff --git a/etc/dbus-serialbattery/utils.py b/etc/dbus-serialbattery/utils.py index df4ce2ab..8ed1c48a 100644 --- a/etc/dbus-serialbattery/utils.py +++ b/etc/dbus-serialbattery/utils.py @@ -37,7 +37,7 @@ def _get_list_from_config( # Constants -DRIVER_VERSION = "1.0.20231117dev" +DRIVER_VERSION = "1.0.20231126dev" zero_char = chr(48) degree_sign = "\N{DEGREE SIGN}" @@ -434,9 +434,6 @@ def read_serialport_data( return False -locals_copy = locals().copy() - - # Publish config variables to dbus def publish_config_variables(dbusservice): for variable, value in locals_copy.items(): @@ -449,3 +446,6 @@ def publish_config_variables(dbusservice): or isinstance(value, List) ): dbusservice.add_path(f"/Info/Config/{variable}", value) + + +locals_copy = locals().copy()