Skip to content

Commit

Permalink
Update standalone_serialbattery.py (#152)
Browse files Browse the repository at this point in the history
  • Loading branch information
MacGH23 authored Jan 6, 2025
1 parent 300f2f5 commit 4e6e6e6
Showing 1 changed file with 127 additions and 100 deletions.
227 changes: 127 additions & 100 deletions dbus-serialbattery/standalone_serialbattery.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@
from time import sleep

from standalone_helper import DbusHelper
import utils
from battery import Battery

from utils import (
BMS_TYPE,
bytearray_to_string,
logger,
BATTERY_ADDRESSES,
)
Expand Down Expand Up @@ -171,6 +171,88 @@ def __init__(self, devpath, driverOption, devadr, loglevel):
self.helper = {}
self.BatIds = []

def get_battery(self, _port: str, _bus_address: hex = None, can_transport_interface: callable = None) -> Union[Battery, None]:
"""
Attempts to establish a connection to the battery and returns the battery object if successful.
:param _port: The port to connect to.
:param _bus_address: The Modbus/CAN address to connect to (optional).
:return: The battery object if a connection is established, otherwise None.
"""
# Try to establish communications with the battery 3 times, else exit
retry = 1
retries = 3
while retry <= retries:
logging.info("-- Testing BMS: " + str(retry) + " of " + str(retries) + " rounds")
# create a new battery object that can read the battery and run connection test
for test in self.expected_bms_types:
# noinspection PyBroadException
try:
if _bus_address is not None:
# Convert hex string to bytes
_bms_address = bytes.fromhex(_bus_address.replace("0x", ""))
elif "address" in test:
_bms_address = test["address"]
else:
_bms_address = None

logging.info(
"Testing " + test["bms"].__name__ + (' at address "' + bytearray_to_string(_bms_address) + '"' if _bms_address is not None else "")
)
batteryClass = test["bms"]
baud = test["baud"] if "baud" in test else None
battery: Battery = batteryClass(port=_port, baud=baud, address=_bms_address)
if battery.test_connection() and battery.validate_data():
logging.info("-- Connection established to " + battery.__class__.__name__)
return battery
except KeyboardInterrupt:
return None
except Exception:
(
exception_type,
exception_object,
exception_traceback,
) = sys.exc_info()
file = exception_traceback.tb_frame.f_code.co_filename
line = exception_traceback.tb_lineno
logging.error("Non blocking exception occurred: " + f"{repr(exception_object)} of type {exception_type} in {file} line #{line}")
# Ignore any malfunction test_function()
pass
retry += 1
sleep(0.5)

return None

def check_bms_types(self, supported_bms_types, type) -> None:
"""
Checks if BMS_TYPE is not empty and all specified BMS types are supported.
:param supported_bms_types: List of supported BMS types.
:param type: The type of BMS connection (ble, can, or serial).
:return: None
"""
# Get only BMS_TYPE that end with "_Ble"
if type == "ble":
bms_types = [type for type in BMS_TYPE if type.endswith("_Ble")]

# Get only BMS_TYPE that end with "_Can"
if type == "can":
bms_types = [type for type in BMS_TYPE if type.endswith("_Can")]

# Get only BMS_TYPE that do not end with "_Ble" or "_Can"
if type == "serial":
bms_types = [type for type in BMS_TYPE if not type.endswith("_Ble") and not type.endswith("_Can")]

if len(bms_types) > 0:
for bms_type in bms_types:
if bms_type not in [bms["bms"].__name__ for bms in supported_bms_types]:
logger.error(
f'ERROR >>> BMS type "{bms_type}" is not supported. Supported BMS types are: '
+ f"{', '.join([bms['bms'].__name__ for bms in supported_bms_types])}"
+ "; Disabled by default: ANT, MNB, Sinowealth"
)
raise (None, None, 1)

def bms_open(self):
logging.info("open serial interface")
# check if BMS_TYPE is not empty and all BMS types in the list are supported
Expand Down Expand Up @@ -212,8 +294,12 @@ def bms_open(self):

if self.driveroption == 10: # can interface
"""
Import CAN classes only, if it's a can port, else the driver won't start due to missing python modules
This prevent problems when using the driver only with a serial connection
Import CAN classes only if it's a CAN port; otherwise, the driver won't start due to missing Python modules.
This prevents issues when using the driver exclusively with a serial connection
can: Older GX devices and Raspberry Pi with CAN hat
vecan: Newer Venus GX devices
vcan: Virtual CAN interface for testing
"""
from bms.daly_can import Daly_Can
from bms.jkbms_can import Jkbms_Can
Expand All @@ -228,32 +314,51 @@ def bms_open(self):
battery_type for battery_type in self.supported_bms_types if battery_type["bms"].__name__ in BMS_TYPE or len(BMS_TYPE) == 0
]

# If no BMS type is supported, use all supported BMS types

if len(self.expected_bms_types) == 0:
logging.warning(f"No supported CAN BMS type found in BMS_TYPE: {', '.join(BMS_TYPE)}. Using all supported BMS types.")
self.expected_bms_types = supported_bms_types

# start the corresponding CanReceiverThread if BMS for this type found
from utils_can import CanReceiverThread
from utils_can import CanReceiverThread, CanTransportInterface

try:
can_thread = CanReceiverThread.get_instance(bustype="socketcan", channel=self.devpath)
except Exception as e:
print(f"Error: {e}")

# wait until thread has initialized
if not can_thread.can_initialised.wait(2):
logger.error("Timeout while accessing CAN interface")
sleep(60)

can_transport_interface = CanTransportInterface()
can_transport_interface.can_message_cache_callback = can_thread.get_message_cache
can_transport_interface.can_bus = can_thread.can_bus
logging.debug("Wait shortly to make sure that all needed data is in the cache")
# Slowest message cycle trasmission is every 1 second, wait a bit more for the fist time to fetch all needed data
sleep(2)

# check if BATTERY_ADDRESSES is not empty
if BATTERY_ADDRESSES:
logging.info(">>> CAN multi device mode")
for address in BATTERY_ADDRESSES:
checkbatt = self.get_battery(self.devpath, address, can_thread.get_message_cache)
if checkbatt is not None:
self.battery[address] = checkbatt
logging.info("Successful battery connection at " + self.devpath + " and this device address " + str(address))
addresses = [None] if len(BATTERY_ADDRESSES) == 0 else BATTERY_ADDRESSES # use default address, if not configured

for busspeed in [250, 500]:
for address in addresses:
bat = self.get_battery(self.devpath, address, can_transport_interface)
if bat:
self.battery[address] = bat
logger.info(f"Successful battery connection at {self.devpath} and this address {str(address)}")
else:
logging.warning("No battery connection at " + self.devpath + " and this device address " + str(address))
# use default address
else:
self.battery[0] = self.get_battery(self.devpath, None, can_thread.get_message_cache)
logger.warning(f"No battery connection at {self.devpath} and this address {str(address)}")

# if we've found at least 1 battery, stop the search here. otherwise retry with other bus speeds
if len(self.battery) > 0:
break

logger.info(f"Found no devices on can bus, retrying with {busspeed} kbps")
can_thread.setup_can(channel=self.devpath, bitrate=busspeed, force=True)
sleep(2)

# SERIAL
else: # Serial, modbus, ...
# check if BMS_TYPE is not empty and all BMS types in the list are supported
# self.check_bms_types(supported_bms_types, "serial")
Expand All @@ -265,12 +370,12 @@ def bms_open(self):
# check if BATTERY_ADDRESSES is not empty
if BATTERY_ADDRESSES:
for address in BATTERY_ADDRESSES:
checkbatt = self.get_battery(self.devpath, address)
if checkbatt is not None:
self.battery[address] = checkbatt
logger.info("Successful battery connection at " + self.devpath + " and this Modbus address " + str(address))
found_battery = self.get_battery(self.devpath, address)
if found_battery:
self.battery[address] = found_battery
logger.info(f"Successful battery connection at {self.devpath} and this address {address}")
else:
logger.warning("No battery connection at " + self.devpath + " and this Modbus address " + str(address))
logger.warning(f"No battery connection at {self.devpath} and this address {address}")
# use default address
else:
self.battery[0] = self.get_battery(self.devpath)
Expand Down Expand Up @@ -298,84 +403,6 @@ def bms_open(self):
# self.battery.log_settings()
return self.BatIds

def check_bms_types(self, supported_bms_types, type) -> None:
"""
Checks if BMS_TYPE is not empty and all specified BMS types are supported.
:param supported_bms_types: List of supported BMS types.
:param type: The type of BMS connection (ble, can, or serial).
:return: None
"""
# Get only BMS_TYPE that end with "_Ble"
if type == "ble":
bms_types = [type for type in BMS_TYPE if type.endswith("_Ble")]

# Get only BMS_TYPE that end with "_Can"
if type == "can":
bms_types = [type for type in BMS_TYPE if type.endswith("_Can")]

# Get only BMS_TYPE that do not end with "_Ble" or "_Can"
if type == "serial":
bms_types = [type for type in BMS_TYPE if not type.endswith("_Ble") and not type.endswith("_Can")]

if len(bms_types) > 0:
for bms_type in bms_types:
if bms_type not in [bms["bms"].__name__ for bms in supported_bms_types]:
logger.error(
f'ERROR >>> BMS type "{bms_type}" is not supported. Supported BMS types are: '
+ f"{', '.join([bms['bms'].__name__ for bms in supported_bms_types])}"
+ "; Disabled by default: ANT, MNB, Sinowealth"
)
raise (None, None, 1)

def get_battery(self, _port: str, _modbus_address: hex = None, _can_message_cache_callback: callable = None) -> Union[Battery, None]:
# all the different batteries the driver support and need to test for
# try to establish communications with the battery 3 times, else exit
retry = 1
retries = 2
while retry <= retries:
logging.info("-- Testing BMS: " + str(retry) + " of " + str(retries) + " rounds")
# create a new battery object that can read the battery and run connection test
for test in self.expected_bms_types:
# noinspection PyBroadException
try:
if _modbus_address is not None:
# convert hex string to bytes
_bms_address = bytes.fromhex(_modbus_address.replace("0x", ""))
elif "address" in test:
_bms_address = test["address"]
else:
_bms_address = None

logging.info(
"Testing "
+ test["bms"].__name__
+ (' at address "' + utils.bytearray_to_string(_bms_address) + '"' if _bms_address is not None else "")
)
batteryClass = test["bms"]
baud = test["baud"] if "baud" in test else None
battery: Battery = batteryClass(port=_port, baud=baud, address=_bms_address)
if battery.test_connection() and battery.validate_data():
logging.info("-- Connection established to " + battery.__class__.__name__)
return battery
except KeyboardInterrupt:
return None
except Exception:
(
exception_type,
exception_object,
exception_traceback,
) = sys.exc_info()
file = exception_traceback.tb_frame.f_code.co_filename
line = exception_traceback.tb_lineno
logging.error("Non blocking exception occurred: " + f"{repr(exception_object)} of type {exception_type} in {file} line #{line}")
# Ignore any malfunction test_function()
pass
retry += 1
sleep(0.5)

return None

def bms_close(self):
logging.debug("close serial interface")

Expand Down

0 comments on commit 4e6e6e6

Please sign in to comment.