From 1651418cab93a01d751c78f3928b984b7b85a23a Mon Sep 17 00:00:00 2001 From: samson0v Date: Tue, 3 Dec 2024 12:26:13 +0200 Subject: [PATCH 1/3] Added Async BACnet Connector --- .../extensions/bacnet_async/__init__.py | 0 setup.py | 6 +- thingsboard_gateway/config/bacnet.json | 43 +- .../connectors/bacnet/application.py | 40 ++ .../bacnet/backward_compatibility_adapter.py | 73 +++ .../connectors/bacnet/bacnet_connector.py | 562 +++++++++--------- .../connectors/bacnet/bacnet_converter.py | 10 +- .../bacnet/bacnet_uplink_converter.py | 101 +--- .../connectors/bacnet/device.py | 100 ++++ .../connectors/bacnet/entities/__init__.py | 0 .../bacnet/entities/bacnet_device_details.py | 33 + .../connectors/bacnet/entities/device_info.py | 46 ++ .../bacnet/entities/device_object_config.py | 44 ++ .../entities/uplink_converter_config.py | 33 + .../__init__.py | 0 .../connectors/bacnet_old/bacnet_connector.py | 345 +++++++++++ .../connectors/bacnet_old/bacnet_converter.py | 22 + .../bacnet_downlink_converter.py | 2 +- .../bacnet_old/bacnet_uplink_converter.py | 95 +++ .../bacnet_old/bacnet_utilities/__init__.py | 13 + .../tb_gateway_bacnet_application.py | 2 +- .../tb_gateway_bacnet_device.py | 0 .../extensions/bacnet_async/__init__.py | 0 .../gateway/tb_gateway_service.py | 3 +- .../tb_gateway_remote_configurator.py | 1 + 25 files changed, 1199 insertions(+), 375 deletions(-) create mode 100644 for_build/etc/thingsboard-gateway/extensions/bacnet_async/__init__.py create mode 100644 thingsboard_gateway/connectors/bacnet/application.py create mode 100644 thingsboard_gateway/connectors/bacnet/backward_compatibility_adapter.py create mode 100644 thingsboard_gateway/connectors/bacnet/device.py create mode 100644 thingsboard_gateway/connectors/bacnet/entities/__init__.py create mode 100644 thingsboard_gateway/connectors/bacnet/entities/bacnet_device_details.py create mode 100644 thingsboard_gateway/connectors/bacnet/entities/device_info.py create mode 100644 thingsboard_gateway/connectors/bacnet/entities/device_object_config.py create mode 100644 thingsboard_gateway/connectors/bacnet/entities/uplink_converter_config.py rename thingsboard_gateway/connectors/{bacnet/bacnet_utilities => bacnet_old}/__init__.py (100%) create mode 100644 thingsboard_gateway/connectors/bacnet_old/bacnet_connector.py create mode 100644 thingsboard_gateway/connectors/bacnet_old/bacnet_converter.py rename thingsboard_gateway/connectors/{bacnet => bacnet_old}/bacnet_downlink_converter.py (91%) create mode 100644 thingsboard_gateway/connectors/bacnet_old/bacnet_uplink_converter.py create mode 100644 thingsboard_gateway/connectors/bacnet_old/bacnet_utilities/__init__.py rename thingsboard_gateway/connectors/{bacnet => bacnet_old}/bacnet_utilities/tb_gateway_bacnet_application.py (98%) rename thingsboard_gateway/connectors/{bacnet => bacnet_old}/bacnet_utilities/tb_gateway_bacnet_device.py (100%) create mode 100644 thingsboard_gateway/extensions/bacnet_async/__init__.py diff --git a/for_build/etc/thingsboard-gateway/extensions/bacnet_async/__init__.py b/for_build/etc/thingsboard-gateway/extensions/bacnet_async/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/setup.py b/setup.py index 731b1da17..849738069 100644 --- a/setup.py +++ b/setup.py @@ -41,14 +41,14 @@ 'thingsboard_gateway.connectors', 'thingsboard_gateway.connectors.ble', 'thingsboard_gateway.connectors.socket', 'thingsboard_gateway.connectors.mqtt', 'thingsboard_gateway.connectors.xmpp', 'thingsboard_gateway.connectors.modbus_async', 'thingsboard_gateway.connectors.opcua', 'thingsboard_gateway.connectors.request', 'thingsboard_gateway.connectors.ocpp', - 'thingsboard_gateway.connectors.modbus', 'thingsboard_gateway.connectors.can', 'thingsboard_gateway.connectors.bacnet', - 'thingsboard_gateway.connectors.bacnet.bacnet_utilities', 'thingsboard_gateway.connectors.odbc', + 'thingsboard_gateway.connectors.modbus', 'thingsboard_gateway.connectors.can', 'thingsboard_gateway.connectors.bacnet_old', + 'thingsboard_gateway.connectors.bacnet_old.bacnet_utilities', 'thingsboard_gateway.connectors.odbc', 'thingsboard_gateway.connectors.bacnet', 'thingsboard_gateway.connectors.rest', 'thingsboard_gateway.connectors.snmp', 'thingsboard_gateway.connectors.ftp', 'thingsboard_gateway.tb_utility', 'thingsboard_gateway.extensions', 'thingsboard_gateway.extensions.mqtt', 'thingsboard_gateway.extensions.modbus', 'thingsboard_gateway.extensions.opcua', 'thingsboard_gateway.extensions.ocpp', 'thingsboard_gateway.extensions.ble', 'thingsboard_gateway.extensions.serial', 'thingsboard_gateway.extensions.request', - 'thingsboard_gateway.extensions.can', 'thingsboard_gateway.extensions.bacnet', 'thingsboard_gateway.extensions.odbc', + 'thingsboard_gateway.extensions.can', 'thingsboard_gateway.extensions.bacnet_old', 'thingsboard_gateway.extensions.bacnet', 'thingsboard_gateway.extensions.odbc', 'thingsboard_gateway.extensions.rest', 'thingsboard_gateway.extensions.snmp', 'thingsboard_gateway.extensions.ftp', 'thingsboard_gateway.extensions.socket', 'thingsboard_gateway.extensions.xmpp', 'thingsboard_gateway.gateway.statistics' ], diff --git a/thingsboard_gateway/config/bacnet.json b/thingsboard_gateway/config/bacnet.json index a03bbdb97..341bf80e8 100644 --- a/thingsboard_gateway/config/bacnet.json +++ b/thingsboard_gateway/config/bacnet.json @@ -1,39 +1,50 @@ { - "general": { + "application": { "objectName": "TB_gateway", - "address": "0.0.0.0:47808", + "address": "0.0.0.0", "objectIdentifier": 599, "maxApduLengthAccepted": 1476, "segmentationSupported": "segmentedBoth", - "vendorIdentifier": 15 + "vendorIdentifier": 15, + "deviceDiscoveryTimeoutInSec": 5 }, "devices": [ { - "deviceName": "BACnet Device ${objectName}", - "deviceType": "default", - "address": "192.168.2.110:47808", + "deviceInfo": { + "deviceNameExpressionSource": "expression", + "deviceNameExpression": "BACnet Device ${objectName}", + "deviceProfileExpressionSource": "constant", + "deviceProfileExpression": "default" + }, + "address": "192.168.1.160:49644", "pollPeriod": 10000, "attributes": [ { "key": "temperature", - "type": "string", - "objectId": "analogOutput:1", + "objectType": "analogInput", + "objectId": "0", "propertyId": "presentValue" } ], "timeseries": [ { - "key": "state", - "type": "bool", - "objectId": "binaryValue:1", + "key": "water_temp", + "objectType": "analogInput", + "objectId": "1", + "propertyId": "presentValue" + }, + { + "key": "temp_outdoor", + "objectType": "analogInput", + "objectId": "2", "propertyId": "presentValue" } ], "attributeUpdates": [ { "key": "brightness", - "requestType": "writeProperty", - "objectId": "analogOutput:1", + "objectType": "analogInput", + "objectId": "1", "propertyId": "presentValue" } ], @@ -42,14 +53,16 @@ "method": "set_state", "requestType": "writeProperty", "requestTimeout": 10000, - "objectId": "binaryOutput:1", + "objectType": "binaryOutput", + "objectId": "1", "propertyId": "presentValue" }, { "method": "get_state", "requestType": "readProperty", "requestTimeout": 10000, - "objectId": "binaryOutput:1", + "objectType": "analogInput", + "objectId": "0", "propertyId": "presentValue" } ] diff --git a/thingsboard_gateway/connectors/bacnet/application.py b/thingsboard_gateway/connectors/bacnet/application.py new file mode 100644 index 000000000..a5fed83b6 --- /dev/null +++ b/thingsboard_gateway/connectors/bacnet/application.py @@ -0,0 +1,40 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from bacpypes3.ipv4.app import NormalApplication as App +from bacpypes3.local.device import DeviceObject +from bacpypes3.pdu import Address + +from thingsboard_gateway.connectors.bacnet.entities.device_object_config import DeviceObjectConfig + + +class Application(App): + def __init__(self, device_object_config: DeviceObjectConfig, indication_callback, logger): + self.__device_object_config = device_object_config + self.__device_object = DeviceObject(**self.__device_object_config.device_object_config) + super().__init__(self.__device_object, Address(self.__device_object_config.address)) + + self.__log = logger + self.__indication_callback = indication_callback + + async def indication(self, apdu) -> None: + self.__log.debug(f"(indication) Received APDU: {apdu}") + await super().indication(apdu) + self.__indication_callback(apdu) + + async def do_who_is(self, device_address): + devices = await self.who_is(address=Address(device_address), + timeout=self.__device_object_config.device_discovery_timeout) + if len(devices): + return devices[0] diff --git a/thingsboard_gateway/connectors/bacnet/backward_compatibility_adapter.py b/thingsboard_gateway/connectors/bacnet/backward_compatibility_adapter.py new file mode 100644 index 000000000..4ab5cffaa --- /dev/null +++ b/thingsboard_gateway/connectors/bacnet/backward_compatibility_adapter.py @@ -0,0 +1,73 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from copy import deepcopy + + +class BackwardCompatibilityAdapter: + def __init__(self, config, logger): + self.__log = logger + self.__config = deepcopy(config) + + def convert(self): + try: + self.__convert_application_section() + self.__convert_device_section() + except Exception as e: + self.__log.error('Error converting old config: %s', e) + + return self.__config + + + def __convert_application_section(self): + general_section = self.__config.pop('general', {}) + self.__config['application'] = general_section + + def __convert_device_section(self): + for device in self.__config.get('devices', []): + self.__convert_device_info_section(device) + + for section in ('attributes', 'timeseries'): + for item_config in device.get(section, []): + self.__convert_object_id(item_config) + + for attr_update_config in device.get('attributeUpdates', []): + self.__convert_object_id(attr_update_config) + + for rpc_config in device.get('rpc', []): + self.__convert_object_id(rpc_config) + + @staticmethod + def __convert_device_info_section(old_device_config): + device_name = old_device_config.pop('deviceName') + device_type = old_device_config.pop('deviceType', 'default') + + old_device_config['deviceInfo'] = { + 'deviceNameExpressionSource': 'expression', + 'deviceProfileExpressionSource': 'expression', + 'deviceNameExpression': device_name, + 'deviceProfileExpression': device_type + } + + @staticmethod + def __convert_object_id(old_item_config): + old_object_id = old_item_config.pop('objectId') + (object_type, object_id) = old_object_id.split(':') + old_item_config['objectType'] = object_type + old_item_config['objectId'] = object_id + + @staticmethod + def is_old_config(config): + if config.get('general'): + return True diff --git a/thingsboard_gateway/connectors/bacnet/bacnet_connector.py b/thingsboard_gateway/connectors/bacnet/bacnet_connector.py index a6cac29aa..004049db7 100644 --- a/thingsboard_gateway/connectors/bacnet/bacnet_connector.py +++ b/thingsboard_gateway/connectors/bacnet/bacnet_connector.py @@ -12,334 +12,338 @@ # See the License for the specific language governing permissions and # limitations under the License. -from queue import Queue -from random import choice -from string import ascii_lowercase +import asyncio +from asyncio import CancelledError +from queue import Queue, Empty from threading import Thread -from time import sleep, time +from string import ascii_lowercase +from random import choice +from time import monotonic, sleep -from thingsboard_gateway.gateway.statistics.decorators import CollectAllReceivedBytesStatistics +from thingsboard_gateway.connectors.connector import Connector +from thingsboard_gateway.gateway.constants import STATISTIC_MESSAGE_RECEIVED_PARAMETER, STATISTIC_MESSAGE_SENT_PARAMETER from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService -from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader -from thingsboard_gateway.tb_utility.tb_utility import TBUtility from thingsboard_gateway.tb_utility.tb_logger import init_logger +from thingsboard_gateway.tb_utility.tb_utility import TBUtility try: - from bacpypes.core import run, stop + from bacpypes3.apdu import ErrorRejectAbortNack + from bacpypes3.pdu import Address except ImportError: - print("BACnet library not found - installing...") - TBUtility.install_package("bacpypes", ">=0.18.0") - from bacpypes.core import run, stop + print("bacpypes3 library not found") + TBUtility.install_package("bacpypes3") + from bacpypes3.apdu import ErrorRejectAbortNack + from bacpypes3.pdu import Address -from bacpypes.pdu import Address, GlobalBroadcast, LocalBroadcast, LocalStation, RemoteStation +from thingsboard_gateway.connectors.bacnet.device import Device +from thingsboard_gateway.connectors.bacnet.entities.device_object_config import DeviceObjectConfig +from thingsboard_gateway.connectors.bacnet.application import Application +from thingsboard_gateway.connectors.bacnet.backward_compatibility_adapter import BackwardCompatibilityAdapter -from thingsboard_gateway.connectors.connector import Connector -from thingsboard_gateway.connectors.bacnet.bacnet_utilities.tb_gateway_bacnet_application import TBBACnetApplication +class AsyncBACnetConnector(Thread, Connector): + PROCESS_DEVICE_QUEUE = Queue(-1) -class BACnetConnector(Thread, Connector): def __init__(self, gateway, config, connector_type): - self._connector_type = connector_type - self.statistics = {'MessagesReceived': 0, - 'MessagesSent': 0} + self.statistics = {STATISTIC_MESSAGE_RECEIVED_PARAMETER: 0, + STATISTIC_MESSAGE_SENT_PARAMETER: 0} + self.__connector_type = connector_type super().__init__() + self.__gateway = gateway self.__config = config + + self.__log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', "INFO"), + enable_remote_logging=self.__config.get('enableRemoteLogging', False)) + self.__log.info('Starting BACnet connector...') + + if BackwardCompatibilityAdapter.is_old_config(config): + backward_compatibility_adapter = BackwardCompatibilityAdapter(config, self.__log) + self.__config = backward_compatibility_adapter.convert() + self.__id = self.__config.get('id') self.name = config.get('name', 'BACnet ' + ''.join(choice(ascii_lowercase) for _ in range(5))) - self.__devices = [] - self.__device_indexes = {} - self.__devices_address_name = {} - self.__gateway = gateway - self._log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'), - enable_remote_logging=self.__config.get('enableRemoteLogging', False)) - self._application = TBBACnetApplication(self, self.__config, self._log) - self.__bacnet_core_thread = Thread(target=run, name="BACnet core thread", daemon=True, - kwargs={"sigterm": None, "sigusr1": None}) - self.__bacnet_core_thread.start() + self.daemon = True self.__stopped = False - self.__config_devices = self.__config["devices"] - self.default_converters = { - "uplink_converter": TBModuleLoader.import_module(self._connector_type, "BACnetUplinkConverter"), - "downlink_converter": TBModuleLoader.import_module(self._connector_type, "BACnetDownlinkConverter")} - self.__request_functions = {"writeProperty": self._application.do_write_property, - "readProperty": self._application.do_read_property, - "risingEdge": self._application.do_binary_rising_edge} - self.__available_object_resources = {} - self.rpc_requests_in_progress = {} self.__connected = False - self.daemon = True - self.__convert_and_save_data_queue = Queue() + + self.__application = None + + self.__data_to_convert_queue = Queue(-1) + self.__data_to_save_queue = Queue(-1) + + try: + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + except RuntimeError: + self.loop = asyncio.get_event_loop() + + self.__devices = [] + + self.__function_to_execute = { + 'readProperty': self.__read_property, + 'writeProperty': self.__write_property + } def open(self): - self.__stopped = False self.start() def run(self): self.__connected = True - self.scan_network() - self._application.do_whois() - self._log.debug("WhoIsRequest has been sent.") - self.scan_network() - while not self.__stopped: - sleep(.2) - for device in self.__devices: - try: - if device.get("previous_check") is None or time() * 1000 - device["previous_check"] >= device[ - "poll_period"]: - for mapping_type in ["attributes", "telemetry"]: - for config in device[mapping_type]: - if config.get("uplink_converter") is None or config.get("downlink_converter") is None: - self.__load_converters(device) - data_to_application = { - "device": device, - "mapping_type": mapping_type, - "config": config, - "callback": self.__bacnet_device_mapping_response_cb - } - self._application.do_read_property(**data_to_application) - device["previous_check"] = time() * 1000 - else: - sleep(.2) - except Exception as e: - self._log.exception(e) - if not self.__convert_and_save_data_queue.empty(): - for _ in range(self.__convert_and_save_data_queue.qsize()): - thread = Thread(target=self.__convert_and_save_data, args=(self.__convert_and_save_data_queue,), - daemon=True) - thread.start() + try: + self.loop.run_until_complete(self.__start()) + except CancelledError as e: + self.__log.debug('Task was cancelled due to connector stop: %s', e.__str__()) + except Exception as e: + self.__log.exception(e) - def close(self): - self.__stopped = True - self.__connected = False - self._log.stop() + async def __start(self): + self.__application = Application(DeviceObjectConfig(self.__config['application']), self.indication_callback, self.__log) - self._application.mux.directPort.connected = False - self._application.mux.directPort.accepting = False - self._application.mux.directPort.connecting = False - self._application.mux.directPort.del_channel() - if self._application.mux.directPort.socket is not None: - try: - sleep(1) - self._application.mux.directPort.socket.close() - except OSError: - pass + await self.__discover_devices() + await asyncio.gather(self.__process_device_requests(), self.__convert_data(), self.__save_data()) - stop() - self._log.info('BACnet connector has been stopped.') + def indication_callback(self, apdu): + """ + First check if device is already added + If added, set active to True + If not added, check if device is in config + """ - def get_name(self): - return self.name + try: + device_address = apdu.pduSource.exploded + self.__log.debug('Received APDU, from %s, tyring to find device...', device_address) + added_device = self.__find_device_by_address(device_address) + if added_device is None: + device_config = Device.find_self_in_config(self.__config['devices'], apdu) + if device_config: + device = Device(self.connector_type, device_config, apdu, self.callback, self.__log) + self.__devices.append(device) + self.__gateway.add_device(device.device_info.device_name, + {"connector": self}, + device_type=device.device_info.device_type) + self.__log.debug('Device %s found', device) + else: + self.__log.debug('Device %s not found in config', device_address) + else: + added_device.active = True + self.__log.debug('Device %s already added', added_device) + except Exception as e: + self.__log.error('Error processing indication callback: %s', e) - def get_id(self): - return self.__id + def __find_device_by_address(self, address): + for device in self.__devices: + if device.details.address == address: + return device + + def __find_device_by_name(self, name): + device_filter = list(filter(lambda x: x.device_info.device_name == name, self.__devices)) + if len(device_filter): + return device_filter[0] - def get_type(self): - return self._connector_type + async def __discover_devices(self): + self.__log.info('Discovering devices...') + for device_config in self.__config.get('devices', []): + try: + await self.__application.do_who_is(device_address=device_config['address']) + self.__log.debug('WhoIs request sent to device %s', device_config['address']) + except Exception as e: + self.__log.error('Error discovering device %s: %s', device_config['address'], e) - def is_connected(self): - return self.__connected + async def __process_device_requests(self): + while not self.__stopped: + try: + device = self.PROCESS_DEVICE_QUEUE.get_nowait() - def is_stopped(self): - return self.__stopped + results = [] + for object_to_read in device.uplink_converter_config.objects_to_read: + result = await self.__read_property(Address(device.details.address), + Device.get_object_id(object_to_read), + object_to_read['propertyId']) + results.append(result) - @CollectAllReceivedBytesStatistics(start_stat_type='allReceivedBytesFromTB') - def on_attributes_update(self, content): + self.__log.trace('%s reading results: %s', device, results) + self.__data_to_convert_queue.put_nowait((device, results)) + except Empty: + await asyncio.sleep(.01) + except Exception as e: + self.__log.error('Error processing device requests: %s', e) + + async def __read_property(self, address, object_id, property_id): try: - self._log.debug('Recieved Attribute Update Request: %r', str(content)) - for device in self.__devices: - if device["deviceName"] == content["device"]: - for request in device["attribute_updates"]: - if request["config"].get("requestType") is not None: - for attribute in content["data"]: - if attribute == request["key"]: - request["iocb"][1]["config"].update({"propertyValue": content["data"][attribute]}) - kwargs = request["iocb"][1] - iocb = request["iocb"][0](device, **kwargs) - self.__request_functions[request["config"]["requestType"]](iocb) - return - else: - self._log.error("\"requestType\" not found in request configuration for key %s device: %s", - request.get("key", "[KEY IS EMPTY]"), - device["deviceName"]) + result = await self.__application.read_property(address, object_id, property_id) + return result + except ErrorRejectAbortNack as err: + return err except Exception as e: - self._log.exception(e) - - @CollectAllReceivedBytesStatistics(start_stat_type='allReceivedBytesFromTB') - def server_side_rpc_handler(self, content): + self.__log.error('Error reading property %s:%s from device %s: %s', object_id, property_id, address, e) + + async def __write_property(self, address, object_id, property_id, value): try: - self._log.debug('Recieved RPC Request: %r', str(content)) - for device in self.__devices: - if device["deviceName"] == content["device"]: - method_found = False - for request in device["server_side_rpc"]: - if request["config"].get("requestType") is not None: - if content["data"]["method"] == request["method"]: - method_found = True - kwargs = request["iocb"][1] - timeout = time() * 1000 + request["config"].get("requestTimeout", 200) - if content["data"].get("params") is not None: - kwargs["config"].update({"propertyValue": content["data"]["params"]}) - iocb = request["iocb"][0](device, **kwargs) - self.__request_functions[request["config"]["requestType"]](device=iocb, - callback=self.__rpc_response_cb) - self.rpc_requests_in_progress[iocb] = {"content": content, - "uplink_converter": request["uplink_converter"]} - # self.__gateway.register_rpc_request_timeout(content, - # timeout, - # iocb, - # self.__rpc_cancel_processing) - else: - self._log.error("\"requestType\" not found in request configuration for key %s device: %s", - request.get("key", "[KEY IS EMPTY]"), - device["deviceName"]) - if not method_found: - self._log.error("RPC method %s not found in configuration", content["data"]["method"]) - self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], success_sent=False) + result = await self.__application.write_property(address, object_id, property_id, value) + return result + except ErrorRejectAbortNack as err: + return err except Exception as e: - self._log.exception(e) - - def __rpc_response_cb(self, iocb, callback_params=None): - device = self.rpc_requests_in_progress[iocb] - converter = device["uplink_converter"] - content = device["content"] - if iocb.ioResponse: - apdu = iocb.ioResponse - self._log.debug("Received callback with Response: %r", apdu) - converted_data = converter.convert(None, apdu) - if converted_data is None: - converted_data = {"success": True} - self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], converted_data) - # self.__gateway.rpc_with_reply_processing(iocb, converted_data or {"success": True}) - elif iocb.ioError: - self._log.exception("Received callback with Error: %r", iocb.ioError) - data = {"error": str(iocb.ioError)} - self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], data) - self._log.debug(iocb.ioError) - else: - self._log.error("Received unknown RPC response callback from device: %r", iocb) + self.__log.error('Error writing property %s:%s to device %s: %s', object_id, property_id, address, e) - def __rpc_cancel_processing(self, iocb): - self._log.info("RPC with iocb %r - cancelled.", iocb) + async def __convert_data(self): + while not self.__stopped: + try: + device, values = self.__data_to_convert_queue.get_nowait() + self.__log.trace('%s data to convert: %s', device, values) + converted_data = device.uplink_converter.convert(values) + self.__data_to_save_queue.put_nowait((device, converted_data)) + except Empty: + await asyncio.sleep(.01) + except Exception as e: + self.__log.error('Error converting data: %s', e) - def scan_network(self): - self._application.do_whois() - self._log.debug("WhoIsRequest has been sent.") - for device in self.__config_devices: + async def __save_data(self): + while not self.__stopped: try: - if self._application.check_or_add(device): - for mapping_type in ["attributes", "timeseries"]: - for config in device[mapping_type]: - if config.get("uplink_converter") is None or config.get("downlink_converter") is None: - self.__load_converters(device) - data_to_application = { - "device": device, - "mapping_type": mapping_type, - "config": config, - "callback": self.__bacnet_device_mapping_response_cb - } - self._application.do_read_property(**data_to_application) + device, data_to_save = self.__data_to_save_queue.get_nowait() + self.__log.trace('%s data to save: %s', device, data_to_save) + StatisticsService.count_connector_message(self.get_name(), stat_parameter_name='storageMsgPushed') + self.__gateway.send_to_storage(self.get_name(), self.get_id(), data_to_save) + self.statistics[STATISTIC_MESSAGE_SENT_PARAMETER] += 1 + except Empty: + await asyncio.sleep(.01) except Exception as e: - self._log.exception(e) + self.__log.error('Error saving data: %s', e) - def __convert_and_save_data(self, queue): - converter, mapping_type, config, iocb = queue.get() - StatisticsService.count_connector_message(self.name, stat_parameter_name='connectorMsgsReceived') - StatisticsService.count_connector_bytes(self.name, iocb.ioResponse if iocb.ioResponse else iocb.ioError, - stat_parameter_name='connectorBytesReceived') - try: - converted_data = converter.convert((mapping_type, config), - iocb.ioResponse if iocb.ioResponse else iocb.ioError) - self.__gateway.send_to_storage(self.get_name(), self.__id, converted_data) - except Exception as e: - self._log.exception("Error while converting data: %s", e) - - def __bacnet_device_mapping_response_cb(self, iocb, callback_params): - mapping_type = callback_params["mapping_type"] - config = callback_params["config"] - converter = callback_params["config"].get("uplink_converter") - if converter is None: - for device in self.__devices: - self.__load_converters(device) - else: - converter = callback_params["config"].get("uplink_converter") + @classmethod + def callback(cls, device: Device): + cls.PROCESS_DEVICE_QUEUE.put_nowait(device) + + def close(self): + self.__log.info('Stopping BACnet connector...') + self.__connected = False + self.__stopped = True + + self.__stop_devices() + + if self.__application: + self.__application.close() + + asyncio.run_coroutine_threadsafe(self.__cancel_all_tasks(), self.loop) + + self.__check_is_alive() + + self.__log.info('BACnet connector stopped') + self.__log.stop() + + def __stop_devices(self): + for device in self.__devices: + device.stop() + + if self.__application: + self.__application.close() + + def __check_is_alive(self): + start_time = monotonic() + + while self.is_alive(): + if monotonic() - start_time > 10: + self.__log.error("Failed to stop connector %s", self.get_name()) + break + sleep(.1) + + async def __cancel_all_tasks(self): + await asyncio.sleep(5) + for task in asyncio.all_tasks(self.loop): + task.cancel() + + def __create_task(self, func, args, kwargs): + task = self.loop.create_task(func(*args, **kwargs)) + + while not task.done(): + sleep(.02) + + def on_attributes_update(self, content): try: - converted_data = converter.convert((mapping_type, config), - iocb.ioResponse if iocb.ioResponse else iocb.ioError) - self.__gateway.send_to_storage(self.get_name(), self.get_id(), converted_data) + self.__log.debug('Received Attribute Update request: %r', content) + + device = self.__find_device_by_name(content['device']) + if device is None: + self.__log.error('Device %s not found', content['device']) + return + + for attribute_name, value in content.get('data', {}).items(): + attribute_update_config_filter = list(filter(lambda x: x['key'] == attribute_name, device.attributes_updates)) + for attribute_update_config in attribute_update_config_filter: + try: + object_id = Device.get_object_id(attribute_update_config) + result = {} + self.__create_task(self.__process_attribute_update, + (Address(device.details.address), object_id, attribute_update_config['propertyId'], value), + {'result': result}) + self.__log.info('Processed attribute update with result: %r', result) + except Exception as e: + self.__log.error('Error updating attribute %s: %s', attribute_name, e) except Exception as e: - self._log.exception("Error while converting data in callback: %s", e) + self.__log.error('Error processing attribute update%s with error: %s', content, e) + + async def __process_attribute_update(self, address, object_id, property_id, value, result={}): + result['response'] = await self.__write_property(address, object_id, property_id, value) + + def server_side_rpc_handler(self, content): + self.__log.debug('Received RPC request: %r', content) + + device = self.__find_device_by_name(content['device']) + if device is None: + self.__log.error('Device %s not found', content['device']) + return - def __load_converters(self, device): - datatypes = ["attributes", "telemetry", "attribute_updates", "server_side_rpc"] - for datatype in datatypes: - for datatype_config in device.get(datatype, []): + rpc_method_name = content.get('data', {}).get('method') + if rpc_method_name is None: + self.__log.error('Method name not found in RPC request: %r', content) + return + + for rpc_config in device.server_side_rpc: + if rpc_config['method'] == rpc_method_name: try: - for converter_type in self.default_converters: - converter_object = self.default_converters[converter_type] if datatype_config.get( - "class") is None else TBModuleLoader.import_module(self._connector_type, - device.get("class")) - datatype_config[converter_type] = converter_object(device, self._log) + object_id = Device.get_object_id(rpc_config) + result = {} + value = content.get('data', {}).get('params') + self.__create_task(self.__process_rpc_request, + (Address(device.details.address), object_id, rpc_config['propertyId']), + {'value': value, 'result': result}) + self.__log.info('Processed RPC request with result: %r', result) + self.__gateway.send_rpc_reply(device=device.device_info.device_name, + req_id=content['data'].get('id'), + content=str(result)) except Exception as e: - self._log.exception(e) + self.__log.error('Error processing RPC request %s: %s', rpc_method_name, e) + self.__gateway.send_rpc_reply(device=device.device_info.device_name, + req_id=content['data'].get('id'), + content={rpc_method_name: str(e)}, + success_sent=False) - def add_device(self, data): - if self.__devices_address_name.get(data["address"]) is None: - for device in self.__config_devices: - if device["address"] == data["address"]: - try: - config_address = Address(device["address"]) - device_name_tag = TBUtility.get_value(device["deviceName"], get_tag=True) - device_name = device["deviceName"].replace("${" + device_name_tag + "}", data["name"]) - device_information = { - **data, - **self.__get_requests_configs(device), - "type": device["deviceType"], - "config": device, - "attributes": device.get("attributes", []), - "telemetry": device.get("timeseries", []), - "poll_period": device.get("pollPeriod", 5000), - "deviceName": device_name, - } - if config_address == data["address"] or \ - (config_address, GlobalBroadcast) or \ - (isinstance(config_address, LocalBroadcast) and isinstance(device["address"], - LocalStation)) or \ - (isinstance(config_address, (LocalStation, RemoteStation)) and isinstance( - data["address"], ( - LocalStation, RemoteStation))): - self.__devices_address_name[data["address"]] = device_information["deviceName"] - self.__devices.append(device_information) - - self._log.debug(data["address"].addrType) - except Exception as e: - self._log.exception(e) - - def __get_requests_configs(self, device): - result = {"attribute_updates": [], "server_side_rpc": []} - for request in device.get("attributeUpdates", []): - kwarg_dict = { - "config": request, - "request_type": request["requestType"] - } - request_config = { - "key": request["key"], - "iocb": (self._application.form_iocb, kwarg_dict), - "config": request - } - result["attribute_updates"].append(request_config) - for request in device.get("serverSideRpc", []): - kwarg_dict = { - "config": request, - "request_type": request["requestType"] - } - request_config = { - "method": request["method"], - "iocb": (self._application.form_iocb, kwarg_dict), - "config": request - } - result["server_side_rpc"].append(request_config) - return result + async def __process_rpc_request(self, address, object_id, property_id, value=None, result={}): + if value is None: + result['response'] = await self.__read_property(address, object_id, property_id) + else: + result['response'] = await self.__write_property(address, object_id, property_id, value) + + def get_id(self): + return self.__id + + def get_name(self): + return self.name + + def get_type(self): + return self.__connector_type + + @property + def connector_type(self): + return self.__connector_type def get_config(self): return self.__config + + def is_connected(self): + return self.__connected + + def is_stopped(self): + return self.__stopped diff --git a/thingsboard_gateway/connectors/bacnet/bacnet_converter.py b/thingsboard_gateway/connectors/bacnet/bacnet_converter.py index 49e229a83..741f79de6 100644 --- a/thingsboard_gateway/connectors/bacnet/bacnet_converter.py +++ b/thingsboard_gateway/connectors/bacnet/bacnet_converter.py @@ -11,12 +11,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from thingsboard_gateway.connectors.converter import Converter +from abc import abstractmethod +from thingsboard_gateway.connectors.converter import Converter -class BACnetConverter(Converter): - def __init__(self, config): - pass - def convert(self, config, data): +class AsyncBACnetConverter(Converter): + @abstractmethod + def convert(self, data): pass diff --git a/thingsboard_gateway/connectors/bacnet/bacnet_uplink_converter.py b/thingsboard_gateway/connectors/bacnet/bacnet_uplink_converter.py index 3c8d027a0..7f7da5a49 100644 --- a/thingsboard_gateway/connectors/bacnet/bacnet_uplink_converter.py +++ b/thingsboard_gateway/connectors/bacnet/bacnet_uplink_converter.py @@ -11,85 +11,46 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from time import time -from bacpypes.apdu import APDU, ReadPropertyACK -from bacpypes.constructeddata import ArrayOf -from bacpypes.primitivedata import Tag - -from thingsboard_gateway.connectors.bacnet.bacnet_converter import BACnetConverter -from thingsboard_gateway.gateway.constants import REPORT_STRATEGY_PARAMETER +from thingsboard_gateway.connectors.bacnet.bacnet_converter import AsyncBACnetConverter +from thingsboard_gateway.connectors.bacnet.entities.uplink_converter_config import UplinkConverterConfig from thingsboard_gateway.gateway.entities.converted_data import ConvertedData from thingsboard_gateway.gateway.entities.report_strategy_config import ReportStrategyConfig -from thingsboard_gateway.gateway.entities.telemetry_entry import TelemetryEntry -from thingsboard_gateway.gateway.statistics.decorators import CollectStatistics from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService from thingsboard_gateway.tb_utility.tb_utility import TBUtility -class BACnetUplinkConverter(BACnetConverter): - DATATYPES = {"attributes": "attributes", - "timeseries": "telemetry", - "telemetry": "telemetry"} - def __init__(self, config, logger): - self._log = logger +class AsyncBACnetUplinkConverter(AsyncBACnetConverter): + def __init__(self, config: UplinkConverterConfig, logger): + self.__log = logger self.__config = config - self.__device_report_strategy = None - try: - self.__device_report_strategy = ReportStrategyConfig(self.__config.get(REPORT_STRATEGY_PARAMETER)) - except ValueError as e: - self._log.trace("Report strategy is not set for device %s. Using strategy from upper level.", self.__config.get("deviceName")) - @CollectStatistics(start_stat_type='receivedBytesFromDevices', - end_stat_type='convertedBytesFromDevice') - def convert(self, config, data): - converted_data = ConvertedData( - device_name=self.__config.get("deviceName", config[1].get("name", "BACnet device") if config is not None else "BACnet device"), - device_type=self.__config.get("deviceType", "default") - ) + def convert(self, data): + if len(data): + StatisticsService.count_connector_message(self.__log.name, 'convertersMsgProcessed') + converted_data = ConvertedData(device_name=self.__config.device_name, device_type=self.__config.device_type) + converted_data_append_methods = { + 'attributes': converted_data.add_to_attributes, + 'telemetry': converted_data.add_to_telemetry + } - try: - value = None - if isinstance(data, ReadPropertyACK): - value = self.__property_value_from_apdu(data) - if config is not None: - datapoint_key = ( - TBUtility.convert_key_to_datapoint_key(config[1]["key"], - config[1].get(REPORT_STRATEGY_PARAMETER), - self.__device_report_strategy, - self._log)) - if BACnetUplinkConverter.DATATYPES[config[0]] == "attributes": - converted_data.add_to_attributes(datapoint_key, value) - else: - converted_data.add_to_telemetry(TelemetryEntry({datapoint_key: value}, time() * 1000)) - else: - converted_data = value - self._log.debug("Converted data: %s", converted_data) - except Exception as e: - StatisticsService.count_connector_message(self._log.name, 'convertersMsgDropped') - self._log.exception(e) + device_report_strategy = self._get_device_report_strategy(self.__config.report_strategy, self.__config.device_name) - if isinstance(converted_data, ConvertedData): - StatisticsService.count_connector_message(self._log.name, 'convertersAttrProduced', - count=converted_data.attributes_datapoints_count) - StatisticsService.count_connector_message(self._log.name, 'convertersTsProduced', - count=converted_data.telemetry_datapoints_count) - return converted_data + for config, value in zip(self.__config.objects_to_read, data): + try: + datapoint_key = TBUtility.convert_key_to_datapoint_key(config['key'], device_report_strategy, config, self.__log) + converted_data_append_methods[config['type']]({datapoint_key: round(value, 2) if isinstance(value, float) else value}) + except Exception as e: + self.__log.error("Error converting datapoint: %s", e) - @staticmethod - def __property_value_from_apdu(apdu: APDU): - tag_list = apdu.propertyValue.tagList - non_app_tags = [tag for tag in tag_list if tag.tagClass != Tag.applicationTagClass] - if non_app_tags: - raise RuntimeError("Value has some non-application tags") - first_tag = tag_list[0] - other_type_tags = [tag for tag in tag_list[1:] if tag.tagNumber != first_tag.tagNumber] - if other_type_tags: - raise RuntimeError("All tags must be the same type") - datatype = Tag._app_tag_class[first_tag.tagNumber] - if not datatype: - raise RuntimeError("unknown datatype") - if len(tag_list) > 1: - datatype = ArrayOf(datatype) - value = apdu.propertyValue.cast_out(datatype) - return value + StatisticsService.count_connector_message(self.__log.name, 'convertersAttrProduced', count=converted_data.attributes_datapoints_count) + StatisticsService.count_connector_message(self.__log.name, 'convertersTsProduced', count=converted_data.telemetry_datapoints_count) + + self.__log.debug("Converted data: %s", converted_data) + return converted_data + + def _get_device_report_strategy(self, report_strategy, device_name): + try: + return ReportStrategyConfig(report_strategy) + except ValueError as e: + self.__log.trace("Report strategy config is not specified for device %s: %s", device_name, e) diff --git a/thingsboard_gateway/connectors/bacnet/device.py b/thingsboard_gateway/connectors/bacnet/device.py new file mode 100644 index 000000000..a5b29cdc1 --- /dev/null +++ b/thingsboard_gateway/connectors/bacnet/device.py @@ -0,0 +1,100 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +from threading import Thread + +from bacpypes3.primitivedata import ObjectIdentifier + +from thingsboard_gateway.connectors.bacnet.bacnet_uplink_converter import AsyncBACnetUplinkConverter +from thingsboard_gateway.connectors.bacnet.entities.bacnet_device_details import BACnetDeviceDetails +from thingsboard_gateway.connectors.bacnet.entities.device_info import DeviceInfo +from thingsboard_gateway.connectors.bacnet.entities.uplink_converter_config import UplinkConverterConfig +from thingsboard_gateway.gateway.constants import UPLINK_PREFIX, CONVERTER_PARAMETER +from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader + + +class Device(Thread): + def __init__(self, connector_type, config, i_am_request, callback, logger): + super().__init__() + + self.__connector_type = connector_type + self.__config = config + + self.__log = logger + + self.__stopped = False + self.active = True + self.callback = callback + self.daemon = True + + self.details = BACnetDeviceDetails(i_am_request) + self.device_info = DeviceInfo(self.__config.get('deviceInfo', {}), self.details) + self.uplink_converter_config = UplinkConverterConfig(self.__config, self.device_info, self.details) + + self.name = self.device_info.device_name + + self.__poll_period = self.__config.get('pollPeriod', 10000) / 1000 + self.attributes_updates = self.__config.get('attributeUpdates', []) + self.server_side_rpc = self.__config.get('serverSideRpc', []) + + self.uplink_converter = self.__load_uplink_converter() + + self.__last_poll_time = 0 + + self.start() + + def __str__(self): + return f"Device(name={self.name}, address={self.details.address})" + + def __load_uplink_converter(self): + try: + if self.__config.get(UPLINK_PREFIX + CONVERTER_PARAMETER) is not None: + converter_class = TBModuleLoader.import_module(self.__connector_type, + self.__config[UPLINK_PREFIX + CONVERTER_PARAMETER]) + converter = converter_class(self.uplink_converter_config, self.__log) + else: + converter = AsyncBACnetUplinkConverter(self.uplink_converter_config, self.__log) + + return converter + except Exception as e: + self.__log.exception('Failed to load uplink converter for % slave: %s', self.name, e) + + def stop(self): + self.active = False + self.__stopped = True + + def run(self): + while not self.__stopped and self.active: + if time.monotonic() - self.__last_poll_time >= self.__poll_period: + self.__send_callback() + self.__last_poll_time = time.monotonic() + + time.sleep(.01) + + def __send_callback(self): + try: + self.callback(self) + except Exception as e: + self.__log.error('Error sending callback from device %s: %s', self, e) + + @staticmethod + def find_self_in_config(devices_config, apdu): + device_config = list(filter(lambda x: x['address'] == apdu.pduSource.exploded, devices_config)) + if len(device_config): + return device_config[0] + + @staticmethod + def get_object_id(config): + return ObjectIdentifier("%s:%s" % (config['objectType'], config['objectId'])) diff --git a/thingsboard_gateway/connectors/bacnet/entities/__init__.py b/thingsboard_gateway/connectors/bacnet/entities/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/thingsboard_gateway/connectors/bacnet/entities/bacnet_device_details.py b/thingsboard_gateway/connectors/bacnet/entities/bacnet_device_details.py new file mode 100644 index 000000000..78324cb31 --- /dev/null +++ b/thingsboard_gateway/connectors/bacnet/entities/bacnet_device_details.py @@ -0,0 +1,33 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +class BACnetDeviceDetails: + def __init__(self, i_am_request): + self.address = i_am_request.pduSource.exploded + self.__object_identifier = i_am_request.iAmDeviceIdentifier[1] + self.__vendor_id = i_am_request.vendorID + self.__object_name = i_am_request.deviceName if hasattr(i_am_request, 'deviceName') else "Unknown" + + def __str__(self): + return (f"DeviceDetails(address={self.address}, objectIdentifier={self.__object_identifier}, " + f"vendorId={self.__vendor_id}, objectName={self.__object_name}") + + @property + def as_dict(self): + return { + "address": self.address, + "objectIdentifier": self.__object_identifier, + "vendorId": self.__vendor_id, + "objectName": self.__object_name + } diff --git a/thingsboard_gateway/connectors/bacnet/entities/device_info.py b/thingsboard_gateway/connectors/bacnet/entities/device_info.py new file mode 100644 index 000000000..6cef5bfea --- /dev/null +++ b/thingsboard_gateway/connectors/bacnet/entities/device_info.py @@ -0,0 +1,46 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from thingsboard_gateway.tb_utility.tb_utility import TBUtility + + +class DeviceInfo: + def __init__(self, device_info_config, device_details): + self.device_name = self.__parse_device_name(device_info_config, device_details) + self.device_type = self.__parse_device_type(device_info_config, device_details) + + def __parse_device_name(self, config, data): + if config.get('deviceNameExpressionSource', 'expression') == 'expression': + return self.__parse_device_info(config['deviceNameExpression'], data) + + return config['deviceNameExpression'] + + def __parse_device_type(self, config, data): + if config.get('deviceProfileExpressionSource', 'expression') == 'expression': + return self.__parse_device_info(config.get('deviceProfileExpression', 'default'), data) + + return config.get('deviceProfileExpression', 'default') + + @staticmethod + def __parse_device_info(expression, data): + result_tags = TBUtility.get_values(expression, data.as_dict, get_tag=True) + result_values = TBUtility.get_values(expression, data.as_dict, expression_instead_none=True) + + result = expression + for (result_tag, result_value) in zip(result_tags, result_values): + is_valid_key = "${" in expression and "}" in expression + result = result.replace('${' + str(result_tag) + '}', + str(result_value)) if is_valid_key else result_tag + + return result diff --git a/thingsboard_gateway/connectors/bacnet/entities/device_object_config.py b/thingsboard_gateway/connectors/bacnet/entities/device_object_config.py new file mode 100644 index 000000000..9ed7d8243 --- /dev/null +++ b/thingsboard_gateway/connectors/bacnet/entities/device_object_config.py @@ -0,0 +1,44 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +class DeviceObjectConfig: + def __init__(self, config): + self.address = config['address'] + self.device_discovery_timeout = int(config.get('deviceDiscoveryTimeoutInSec', 5)) + self.__object_identifier = int(config['objectIdentifier']) + self.__object_name = config.get('objectName', 'Gateway') + self.__network_number = int(config.get('networkNumber', 3)) + self.__network_number_quality = config.get('networkNumberQuality', 'configured') + self.__max_apdu_length_accepted = int(config.get('maxApduLengthAccepted', 1024)) + self.__segmentation_supported = config.get('segmentationSupported', 'segmentedBoth') + self.__vendor_identifier = int(config.get('vendorIdentifier', 15)) + + @property + def device_object_config(self): + return { + 'objectIdentifier': ('device', self.__object_identifier), + 'objectName': self.__object_name, + 'maxApduLengthAccepted': self.__max_apdu_length_accepted, + 'segmentationSupported': self.__segmentation_supported, + 'vendorIdentifier': self.__vendor_identifier + } + + @property + def network_port_object_config(self): + return { + 'objectIdentifier': ('network-port', self.__object_identifier + 1), + 'objectName': 'NetworkPort-1', + 'networkNumber': self.__network_number, + 'networkNumberQuality': self.__network_number_quality + } diff --git a/thingsboard_gateway/connectors/bacnet/entities/uplink_converter_config.py b/thingsboard_gateway/connectors/bacnet/entities/uplink_converter_config.py new file mode 100644 index 000000000..aa525df74 --- /dev/null +++ b/thingsboard_gateway/connectors/bacnet/entities/uplink_converter_config.py @@ -0,0 +1,33 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +class UplinkConverterConfig: + def __init__(self, config, device_info, device_details): + self.__config = config + + self.device_details = device_details + self.device_name = device_info.device_name + self.device_type = device_info.device_type + self.__objects_to_read = self.__get_objects_to_read() + self.report_strategy = self.__config.get('reportStrategy', {}) + + def __get_objects_to_read(self): + attributes = [{**item, 'type': 'attributes'} for item in self.__config.get('attributes', [])] + timeseries = [{**item, 'type': 'telemetry'} for item in self.__config.get('timeseries', [])] + return attributes + timeseries + + @property + def objects_to_read(self): + return self.__objects_to_read + diff --git a/thingsboard_gateway/connectors/bacnet/bacnet_utilities/__init__.py b/thingsboard_gateway/connectors/bacnet_old/__init__.py similarity index 100% rename from thingsboard_gateway/connectors/bacnet/bacnet_utilities/__init__.py rename to thingsboard_gateway/connectors/bacnet_old/__init__.py diff --git a/thingsboard_gateway/connectors/bacnet_old/bacnet_connector.py b/thingsboard_gateway/connectors/bacnet_old/bacnet_connector.py new file mode 100644 index 000000000..a7e3f7437 --- /dev/null +++ b/thingsboard_gateway/connectors/bacnet_old/bacnet_connector.py @@ -0,0 +1,345 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from queue import Queue +from random import choice +from string import ascii_lowercase +from threading import Thread +from time import sleep, time + +from thingsboard_gateway.gateway.statistics.decorators import CollectAllReceivedBytesStatistics +from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService +from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader +from thingsboard_gateway.tb_utility.tb_utility import TBUtility +from thingsboard_gateway.tb_utility.tb_logger import init_logger + +try: + from bacpypes.core import run, stop +except ImportError: + print("BACnet library not found - installing...") + TBUtility.install_package("bacpypes", ">=0.18.0") + from bacpypes.core import run, stop + +from bacpypes.pdu import Address, GlobalBroadcast, LocalBroadcast, LocalStation, RemoteStation + +from thingsboard_gateway.connectors.connector import Connector +from thingsboard_gateway.connectors.bacnet_old.bacnet_utilities.tb_gateway_bacnet_application import TBBACnetApplication + + +class BACnetConnector(Thread, Connector): + def __init__(self, gateway, config, connector_type): + self._connector_type = connector_type + self.statistics = {'MessagesReceived': 0, + 'MessagesSent': 0} + super().__init__() + self.__config = config + self.__id = self.__config.get('id') + self.name = config.get('name', 'BACnet ' + ''.join(choice(ascii_lowercase) for _ in range(5))) + self.__devices = [] + self.__device_indexes = {} + self.__devices_address_name = {} + self.__gateway = gateway + self._log = init_logger(self.__gateway, self.name, self.__config.get('logLevel', 'INFO'), + enable_remote_logging=self.__config.get('enableRemoteLogging', False)) + self._application = TBBACnetApplication(self, self.__config, self._log) + self.__bacnet_core_thread = Thread(target=run, name="BACnet core thread", daemon=True, + kwargs={"sigterm": None, "sigusr1": None}) + self.__bacnet_core_thread.start() + self.__stopped = False + self.__config_devices = self.__config["devices"] + self.default_converters = { + "uplink_converter": TBModuleLoader.import_module(self._connector_type, "BACnetUplinkConverter"), + "downlink_converter": TBModuleLoader.import_module(self._connector_type, "BACnetDownlinkConverter")} + self.__request_functions = {"writeProperty": self._application.do_write_property, + "readProperty": self._application.do_read_property, + "risingEdge": self._application.do_binary_rising_edge} + self.__available_object_resources = {} + self.rpc_requests_in_progress = {} + self.__connected = False + self.daemon = True + self.__convert_and_save_data_queue = Queue() + + def open(self): + self.__stopped = False + self.start() + + def run(self): + self.__connected = True + self.scan_network() + self._application.do_whois() + self._log.debug("WhoIsRequest has been sent.") + self.scan_network() + while not self.__stopped: + sleep(.2) + for device in self.__devices: + try: + if device.get("previous_check") is None or time() * 1000 - device["previous_check"] >= device[ + "poll_period"]: + for mapping_type in ["attributes", "telemetry"]: + for config in device[mapping_type]: + if config.get("uplink_converter") is None or config.get("downlink_converter") is None: + self.__load_converters(device) + data_to_application = { + "device": device, + "mapping_type": mapping_type, + "config": config, + "callback": self.__bacnet_device_mapping_response_cb + } + self._application.do_read_property(**data_to_application) + device["previous_check"] = time() * 1000 + else: + sleep(.2) + except Exception as e: + self._log.exception(e) + + if not self.__convert_and_save_data_queue.empty(): + for _ in range(self.__convert_and_save_data_queue.qsize()): + thread = Thread(target=self.__convert_and_save_data, args=(self.__convert_and_save_data_queue,), + daemon=True) + thread.start() + + def close(self): + self.__stopped = True + self.__connected = False + self._log.stop() + + self._application.mux.directPort.connected = False + self._application.mux.directPort.accepting = False + self._application.mux.directPort.connecting = False + self._application.mux.directPort.del_channel() + if self._application.mux.directPort.socket is not None: + try: + sleep(1) + self._application.mux.directPort.socket.close() + except OSError: + pass + + stop() + self._log.info('BACnet connector has been stopped.') + + def get_name(self): + return self.name + + def get_id(self): + return self.__id + + def get_type(self): + return self._connector_type + + def is_connected(self): + return self.__connected + + def is_stopped(self): + return self.__stopped + + @CollectAllReceivedBytesStatistics(start_stat_type='allReceivedBytesFromTB') + def on_attributes_update(self, content): + try: + self._log.debug('Recieved Attribute Update Request: %r', str(content)) + for device in self.__devices: + if device["deviceName"] == content["device"]: + for request in device["attribute_updates"]: + if request["config"].get("requestType") is not None: + for attribute in content["data"]: + if attribute == request["key"]: + request["iocb"][1]["config"].update({"propertyValue": content["data"][attribute]}) + kwargs = request["iocb"][1] + iocb = request["iocb"][0](device, **kwargs) + self.__request_functions[request["config"]["requestType"]](iocb) + return + else: + self._log.error("\"requestType\" not found in request configuration for key %s device: %s", + request.get("key", "[KEY IS EMPTY]"), + device["deviceName"]) + except Exception as e: + self._log.exception(e) + + @CollectAllReceivedBytesStatistics(start_stat_type='allReceivedBytesFromTB') + def server_side_rpc_handler(self, content): + try: + self._log.debug('Recieved RPC Request: %r', str(content)) + for device in self.__devices: + if device["deviceName"] == content["device"]: + method_found = False + for request in device["server_side_rpc"]: + if request["config"].get("requestType") is not None: + if content["data"]["method"] == request["method"]: + method_found = True + kwargs = request["iocb"][1] + timeout = time() * 1000 + request["config"].get("requestTimeout", 200) + if content["data"].get("params") is not None: + kwargs["config"].update({"propertyValue": content["data"]["params"]}) + iocb = request["iocb"][0](device, **kwargs) + self.__request_functions[request["config"]["requestType"]](device=iocb, + callback=self.__rpc_response_cb) + self.rpc_requests_in_progress[iocb] = {"content": content, + "uplink_converter": request["uplink_converter"]} + # self.__gateway.register_rpc_request_timeout(content, + # timeout, + # iocb, + # self.__rpc_cancel_processing) + else: + self._log.error("\"requestType\" not found in request configuration for key %s device: %s", + request.get("key", "[KEY IS EMPTY]"), + device["deviceName"]) + if not method_found: + self._log.error("RPC method %s not found in configuration", content["data"]["method"]) + self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], success_sent=False) + except Exception as e: + self._log.exception(e) + + def __rpc_response_cb(self, iocb, callback_params=None): + device = self.rpc_requests_in_progress[iocb] + converter = device["uplink_converter"] + content = device["content"] + if iocb.ioResponse: + apdu = iocb.ioResponse + self._log.debug("Received callback with Response: %r", apdu) + converted_data = converter.convert(None, apdu) + if converted_data is None: + converted_data = {"success": True} + self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], converted_data) + # self.__gateway.rpc_with_reply_processing(iocb, converted_data or {"success": True}) + elif iocb.ioError: + self._log.exception("Received callback with Error: %r", iocb.ioError) + data = {"error": str(iocb.ioError)} + self.__gateway.send_rpc_reply(content["device"], content["data"]["id"], data) + self._log.debug(iocb.ioError) + else: + self._log.error("Received unknown RPC response callback from device: %r", iocb) + + def __rpc_cancel_processing(self, iocb): + self._log.info("RPC with iocb %r - cancelled.", iocb) + + def scan_network(self): + self._application.do_whois() + self._log.debug("WhoIsRequest has been sent.") + for device in self.__config_devices: + try: + if self._application.check_or_add(device): + for mapping_type in ["attributes", "timeseries"]: + for config in device[mapping_type]: + if config.get("uplink_converter") is None or config.get("downlink_converter") is None: + self.__load_converters(device) + data_to_application = { + "device": device, + "mapping_type": mapping_type, + "config": config, + "callback": self.__bacnet_device_mapping_response_cb + } + self._application.do_read_property(**data_to_application) + except Exception as e: + self._log.exception(e) + + def __convert_and_save_data(self, queue): + converter, mapping_type, config, iocb = queue.get() + StatisticsService.count_connector_message(self.name, stat_parameter_name='connectorMsgsReceived') + StatisticsService.count_connector_bytes(self.name, iocb.ioResponse if iocb.ioResponse else iocb.ioError, + stat_parameter_name='connectorBytesReceived') + try: + converted_data = converter.convert((mapping_type, config), + iocb.ioResponse if iocb.ioResponse else iocb.ioError) + self.__gateway.send_to_storage(self.get_name(), self.__id, converted_data) + except Exception as e: + self._log.exception("Error while converting data: %s", e) + + def __bacnet_device_mapping_response_cb(self, iocb, callback_params): + mapping_type = callback_params["mapping_type"] + config = callback_params["config"] + converter = callback_params["config"].get("uplink_converter") + if converter is None: + for device in self.__devices: + self.__load_converters(device) + else: + converter = callback_params["config"].get("uplink_converter") + try: + converted_data = converter.convert((mapping_type, config), + iocb.ioResponse if iocb.ioResponse else iocb.ioError) + self.__gateway.send_to_storage(self.get_name(), self.get_id(), converted_data) + except Exception as e: + self._log.exception("Error while converting data in callback: %s", e) + + def __load_converters(self, device): + datatypes = ["attributes", "telemetry", "attribute_updates", "server_side_rpc"] + for datatype in datatypes: + for datatype_config in device.get(datatype, []): + try: + for converter_type in self.default_converters: + converter_object = self.default_converters[converter_type] if datatype_config.get( + "class") is None else TBModuleLoader.import_module(self._connector_type, + device.get("class")) + datatype_config[converter_type] = converter_object(device, self._log) + except Exception as e: + self._log.exception(e) + + def add_device(self, data): + if self.__devices_address_name.get(data["address"]) is None: + for device in self.__config_devices: + if device["address"] == data["address"]: + try: + config_address = Address(device["address"]) + device_name_tag = TBUtility.get_value(device["deviceName"], get_tag=True) + device_name = device["deviceName"].replace("${" + device_name_tag + "}", data["name"]) + device_information = { + **data, + **self.__get_requests_configs(device), + "type": device["deviceType"], + "config": device, + "attributes": device.get("attributes", []), + "telemetry": device.get("timeseries", []), + "poll_period": device.get("pollPeriod", 5000), + "deviceName": device_name, + } + if config_address == data["address"] or \ + (config_address, GlobalBroadcast) or \ + (isinstance(config_address, LocalBroadcast) and isinstance(device["address"], + LocalStation)) or \ + (isinstance(config_address, (LocalStation, RemoteStation)) and isinstance( + data["address"], ( + LocalStation, RemoteStation))): + self.__devices_address_name[data["address"]] = device_information["deviceName"] + self.__devices.append(device_information) + + self._log.debug(data["address"].addrType) + except Exception as e: + self._log.exception(e) + + def __get_requests_configs(self, device): + result = {"attribute_updates": [], "server_side_rpc": []} + for request in device.get("attributeUpdates", []): + kwarg_dict = { + "config": request, + "request_type": request["requestType"] + } + request_config = { + "key": request["key"], + "iocb": (self._application.form_iocb, kwarg_dict), + "config": request + } + result["attribute_updates"].append(request_config) + for request in device.get("serverSideRpc", []): + kwarg_dict = { + "config": request, + "request_type": request["requestType"] + } + request_config = { + "method": request["method"], + "iocb": (self._application.form_iocb, kwarg_dict), + "config": request + } + result["server_side_rpc"].append(request_config) + return result + + def get_config(self): + return self.__config diff --git a/thingsboard_gateway/connectors/bacnet_old/bacnet_converter.py b/thingsboard_gateway/connectors/bacnet_old/bacnet_converter.py new file mode 100644 index 000000000..49e229a83 --- /dev/null +++ b/thingsboard_gateway/connectors/bacnet_old/bacnet_converter.py @@ -0,0 +1,22 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from thingsboard_gateway.connectors.converter import Converter + + +class BACnetConverter(Converter): + def __init__(self, config): + pass + + def convert(self, config, data): + pass diff --git a/thingsboard_gateway/connectors/bacnet/bacnet_downlink_converter.py b/thingsboard_gateway/connectors/bacnet_old/bacnet_downlink_converter.py similarity index 91% rename from thingsboard_gateway/connectors/bacnet/bacnet_downlink_converter.py rename to thingsboard_gateway/connectors/bacnet_old/bacnet_downlink_converter.py index addc02617..446fd75af 100644 --- a/thingsboard_gateway/connectors/bacnet/bacnet_downlink_converter.py +++ b/thingsboard_gateway/connectors/bacnet_old/bacnet_downlink_converter.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from thingsboard_gateway.connectors.bacnet.bacnet_converter import Converter +from thingsboard_gateway.connectors.bacnet_old.bacnet_converter import Converter class BACnetDownlinkConverter(Converter): diff --git a/thingsboard_gateway/connectors/bacnet_old/bacnet_uplink_converter.py b/thingsboard_gateway/connectors/bacnet_old/bacnet_uplink_converter.py new file mode 100644 index 000000000..3b26c7779 --- /dev/null +++ b/thingsboard_gateway/connectors/bacnet_old/bacnet_uplink_converter.py @@ -0,0 +1,95 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from time import time + +from bacpypes.apdu import APDU, ReadPropertyACK +from bacpypes.constructeddata import ArrayOf +from bacpypes.primitivedata import Tag + +from thingsboard_gateway.connectors.bacnet_old.bacnet_converter import BACnetConverter +from thingsboard_gateway.gateway.constants import REPORT_STRATEGY_PARAMETER +from thingsboard_gateway.gateway.entities.converted_data import ConvertedData +from thingsboard_gateway.gateway.entities.report_strategy_config import ReportStrategyConfig +from thingsboard_gateway.gateway.entities.telemetry_entry import TelemetryEntry +from thingsboard_gateway.gateway.statistics.decorators import CollectStatistics +from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService +from thingsboard_gateway.tb_utility.tb_utility import TBUtility + + +class BACnetUplinkConverter(BACnetConverter): + DATATYPES = {"attributes": "attributes", + "timeseries": "telemetry", + "telemetry": "telemetry"} + def __init__(self, config, logger): + self._log = logger + self.__config = config + self.__device_report_strategy = None + try: + self.__device_report_strategy = ReportStrategyConfig(self.__config.get(REPORT_STRATEGY_PARAMETER)) + except ValueError as e: + self._log.trace("Report strategy is not set for device %s. Using strategy from upper level.", self.__config.get("deviceName")) + + @CollectStatistics(start_stat_type='receivedBytesFromDevices', + end_stat_type='convertedBytesFromDevice') + def convert(self, config, data): + converted_data = ConvertedData( + device_name=self.__config.get("deviceName", config[1].get("name", "BACnet device") if config is not None else "BACnet device"), + device_type=self.__config.get("deviceType", "default") + ) + + try: + value = None + if isinstance(data, ReadPropertyACK): + value = self.__property_value_from_apdu(data) + if config is not None: + datapoint_key = ( + TBUtility.convert_key_to_datapoint_key(config[1]["key"], + config[1].get(REPORT_STRATEGY_PARAMETER), + self.__device_report_strategy, + self._log)) + if BACnetUplinkConverter.DATATYPES[config[0]] == "attributes": + converted_data.add_to_attributes(datapoint_key, value) + else: + converted_data.add_to_telemetry(TelemetryEntry({datapoint_key: value}, time() * 1000)) + else: + converted_data = value + self._log.debug("Converted data: %s", converted_data) + except Exception as e: + StatisticsService.count_connector_message(self._log.name, 'convertersMsgDropped') + self._log.exception(e) + + if isinstance(converted_data, ConvertedData): + StatisticsService.count_connector_message(self._log.name, 'convertersAttrProduced', + count=converted_data.attributes_datapoints_count) + StatisticsService.count_connector_message(self._log.name, 'convertersTsProduced', + count=converted_data.telemetry_datapoints_count) + return converted_data + + @staticmethod + def __property_value_from_apdu(apdu: APDU): + tag_list = apdu.propertyValue.tagList + non_app_tags = [tag for tag in tag_list if tag.tagClass != Tag.applicationTagClass] + if non_app_tags: + raise RuntimeError("Value has some non-application tags") + first_tag = tag_list[0] + other_type_tags = [tag for tag in tag_list[1:] if tag.tagNumber != first_tag.tagNumber] + if other_type_tags: + raise RuntimeError("All tags must be the same type") + datatype = Tag._app_tag_class[first_tag.tagNumber] + if not datatype: + raise RuntimeError("unknown datatype") + if len(tag_list) > 1: + datatype = ArrayOf(datatype) + value = apdu.propertyValue.cast_out(datatype) + return value diff --git a/thingsboard_gateway/connectors/bacnet_old/bacnet_utilities/__init__.py b/thingsboard_gateway/connectors/bacnet_old/bacnet_utilities/__init__.py new file mode 100644 index 000000000..620ae8378 --- /dev/null +++ b/thingsboard_gateway/connectors/bacnet_old/bacnet_utilities/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/thingsboard_gateway/connectors/bacnet/bacnet_utilities/tb_gateway_bacnet_application.py b/thingsboard_gateway/connectors/bacnet_old/bacnet_utilities/tb_gateway_bacnet_application.py similarity index 98% rename from thingsboard_gateway/connectors/bacnet/bacnet_utilities/tb_gateway_bacnet_application.py rename to thingsboard_gateway/connectors/bacnet_old/bacnet_utilities/tb_gateway_bacnet_application.py index fd032ab1f..cd4925bc0 100644 --- a/thingsboard_gateway/connectors/bacnet/bacnet_utilities/tb_gateway_bacnet_application.py +++ b/thingsboard_gateway/connectors/bacnet_old/bacnet_utilities/tb_gateway_bacnet_application.py @@ -21,7 +21,7 @@ from bacpypes.pdu import Address, GlobalBroadcast from bacpypes.primitivedata import Null, ObjectIdentifier, Atomic, Integer, Real, Unsigned -from thingsboard_gateway.connectors.bacnet.bacnet_utilities.tb_gateway_bacnet_device import TBBACnetDevice +from thingsboard_gateway.connectors.bacnet_old.bacnet_utilities.tb_gateway_bacnet_device import TBBACnetDevice LOG = None diff --git a/thingsboard_gateway/connectors/bacnet/bacnet_utilities/tb_gateway_bacnet_device.py b/thingsboard_gateway/connectors/bacnet_old/bacnet_utilities/tb_gateway_bacnet_device.py similarity index 100% rename from thingsboard_gateway/connectors/bacnet/bacnet_utilities/tb_gateway_bacnet_device.py rename to thingsboard_gateway/connectors/bacnet_old/bacnet_utilities/tb_gateway_bacnet_device.py diff --git a/thingsboard_gateway/extensions/bacnet_async/__init__.py b/thingsboard_gateway/extensions/bacnet_async/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index f92b1bc73..928bdbf34 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -88,7 +88,8 @@ class TBGRPCServerManager: "ble": "BLEConnector", "request": "RequestConnector", "can": "CanConnector", - "bacnet": "BACnetConnector", + "bacnet": "AsyncBACnetConnector", + "bacnet_old": "BACnetConnector", "odbc": "OdbcConnector", "rest": "RESTConnector", "snmp": "SNMPConnector", diff --git a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py index c51d58395..00eab0c2d 100644 --- a/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py +++ b/thingsboard_gateway/tb_utility/tb_gateway_remote_configurator.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import inspect import os.path from copy import deepcopy From 3bb29d824cb251be2eaaeadacc2e4ed5b8d285b4 Mon Sep 17 00:00:00 2001 From: samson0v Date: Tue, 3 Dec 2024 12:55:33 +0200 Subject: [PATCH 2/3] Reformated setup.py and bacnet, modbus extension folders moved --- setup.py | 36 +++++++++++-------- .../extensions/bacnet/__init__.py | 13 ------- .../extensions/bacnet_old/__init__.py | 13 +++++++ .../extensions/modbus_async/__init__.py | 0 .../{bacnet_async => modbus_old}/__init__.py | 0 5 files changed, 34 insertions(+), 28 deletions(-) create mode 100644 thingsboard_gateway/extensions/bacnet_old/__init__.py delete mode 100644 thingsboard_gateway/extensions/modbus_async/__init__.py rename thingsboard_gateway/extensions/{bacnet_async => modbus_old}/__init__.py (100%) diff --git a/setup.py b/setup.py index 849738069..55805077b 100644 --- a/setup.py +++ b/setup.py @@ -35,22 +35,28 @@ long_description_content_type="text/markdown", include_package_data=True, python_requires=">=3.7", - packages=['thingsboard_gateway', 'thingsboard_gateway.gateway', 'thingsboard_gateway.gateway.proto', 'thingsboard_gateway.gateway.grpc_service', - 'thingsboard_gateway.storage', 'thingsboard_gateway.storage.memory', 'thingsboard_gateway.gateway.shell', 'thingsboard_gateway.gateway.report_strategy', - 'thingsboard_gateway.storage.file', 'thingsboard_gateway.storage.sqlite', 'thingsboard_gateway.gateway.entities', - 'thingsboard_gateway.connectors', 'thingsboard_gateway.connectors.ble', 'thingsboard_gateway.connectors.socket', - 'thingsboard_gateway.connectors.mqtt', 'thingsboard_gateway.connectors.xmpp', 'thingsboard_gateway.connectors.modbus_async', - 'thingsboard_gateway.connectors.opcua', 'thingsboard_gateway.connectors.request', 'thingsboard_gateway.connectors.ocpp', - 'thingsboard_gateway.connectors.modbus', 'thingsboard_gateway.connectors.can', 'thingsboard_gateway.connectors.bacnet_old', - 'thingsboard_gateway.connectors.bacnet_old.bacnet_utilities', 'thingsboard_gateway.connectors.odbc', 'thingsboard_gateway.connectors.bacnet', - 'thingsboard_gateway.connectors.rest', 'thingsboard_gateway.connectors.snmp', 'thingsboard_gateway.connectors.ftp', + packages=['thingsboard_gateway', 'thingsboard_gateway.gateway', 'thingsboard_gateway.gateway.proto', + 'thingsboard_gateway.gateway.grpc_service', 'thingsboard_gateway.storage', 'thingsboard_gateway.storage.memory', + 'thingsboard_gateway.gateway.shell', 'thingsboard_gateway.gateway.report_strategy', 'thingsboard_gateway.storage.file', + 'thingsboard_gateway.storage.sqlite', 'thingsboard_gateway.gateway.entities', 'thingsboard_gateway.connectors', + 'thingsboard_gateway.connectors.ble', 'thingsboard_gateway.extensions.ble', + 'thingsboard_gateway.connectors.socket', 'thingsboard_gateway.extensions.socket', + 'thingsboard_gateway.connectors.mqtt', 'thingsboard_gateway.extensions.mqtt', + 'thingsboard_gateway.connectors.xmpp', 'thingsboard_gateway.extensions.xmpp', + 'thingsboard_gateway.connectors.modbus', 'thingsboard_gateway.extensions.modbus', + 'thingsboard_gateway.connectors.modbus_old', 'thingsboard_gateway.connectors.modbus_old', + 'thingsboard_gateway.connectors.opcua', 'thingsboard_gateway.extensions.opcua', + 'thingsboard_gateway.connectors.request', 'thingsboard_gateway.extensions.request', + 'thingsboard_gateway.connectors.ocpp', 'thingsboard_gateway.extensions.ocpp', + 'thingsboard_gateway.connectors.can', 'thingsboard_gateway.extensions.can', + 'thingsboard_gateway.connectors.bacnet_old', 'thingsboard_gateway.connectors.bacnet_old.bacnet_utilities', 'thingsboard_gateway.extensions.bacnet_old', + 'thingsboard_gateway.connectors.odbc', 'thingsboard_gateway.extensions.odbc', + 'thingsboard_gateway.connectors.bacnet', 'thingsboard_gateway.extensions.bacnet', + 'thingsboard_gateway.connectors.rest', 'thingsboard_gateway.extensions.rest', + 'thingsboard_gateway.connectors.snmp', 'thingsboard_gateway.extensions.snmp', + 'thingsboard_gateway.connectors.ftp', 'thingsboard_gateway.extensions.ftp', 'thingsboard_gateway.tb_utility', 'thingsboard_gateway.extensions', - 'thingsboard_gateway.extensions.mqtt', 'thingsboard_gateway.extensions.modbus', 'thingsboard_gateway.extensions.opcua', - 'thingsboard_gateway.extensions.ocpp', 'thingsboard_gateway.extensions.ble', - 'thingsboard_gateway.extensions.serial', 'thingsboard_gateway.extensions.request', - 'thingsboard_gateway.extensions.can', 'thingsboard_gateway.extensions.bacnet_old', 'thingsboard_gateway.extensions.bacnet', 'thingsboard_gateway.extensions.odbc', - 'thingsboard_gateway.extensions.rest', 'thingsboard_gateway.extensions.snmp', 'thingsboard_gateway.extensions.ftp', - 'thingsboard_gateway.extensions.socket', 'thingsboard_gateway.extensions.xmpp', 'thingsboard_gateway.gateway.statistics' + 'thingsboard_gateway.extensions.serial', 'thingsboard_gateway.gateway.statistics' ], install_requires=[ 'setuptools', diff --git a/thingsboard_gateway/extensions/bacnet/__init__.py b/thingsboard_gateway/extensions/bacnet/__init__.py index 620ae8378..e69de29bb 100644 --- a/thingsboard_gateway/extensions/bacnet/__init__.py +++ b/thingsboard_gateway/extensions/bacnet/__init__.py @@ -1,13 +0,0 @@ -# Copyright 2024. ThingsBoard -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/thingsboard_gateway/extensions/bacnet_old/__init__.py b/thingsboard_gateway/extensions/bacnet_old/__init__.py new file mode 100644 index 000000000..620ae8378 --- /dev/null +++ b/thingsboard_gateway/extensions/bacnet_old/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/thingsboard_gateway/extensions/modbus_async/__init__.py b/thingsboard_gateway/extensions/modbus_async/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/thingsboard_gateway/extensions/bacnet_async/__init__.py b/thingsboard_gateway/extensions/modbus_old/__init__.py similarity index 100% rename from thingsboard_gateway/extensions/bacnet_async/__init__.py rename to thingsboard_gateway/extensions/modbus_old/__init__.py From 0be4a7853055666aa9ec0d5dd9cb4940382d799f Mon Sep 17 00:00:00 2001 From: samson0v Date: Tue, 3 Dec 2024 13:21:33 +0200 Subject: [PATCH 3/3] Fixes --- .../extensions/bacnet_async/__init__.py | 0 .../extensions/bacnet_old/__init__.py | 13 +++++++++++++ .../extensions/modbus_async/__init__.py | 0 .../extensions/modbus_old/__init__.py | 13 +++++++++++++ .../connectors/bacnet/bacnet_connector.py | 5 ----- thingsboard_gateway/connectors/bacnet/device.py | 4 ++-- 6 files changed, 28 insertions(+), 7 deletions(-) delete mode 100644 for_build/etc/thingsboard-gateway/extensions/bacnet_async/__init__.py create mode 100644 for_build/etc/thingsboard-gateway/extensions/bacnet_old/__init__.py delete mode 100644 for_build/etc/thingsboard-gateway/extensions/modbus_async/__init__.py create mode 100644 for_build/etc/thingsboard-gateway/extensions/modbus_old/__init__.py diff --git a/for_build/etc/thingsboard-gateway/extensions/bacnet_async/__init__.py b/for_build/etc/thingsboard-gateway/extensions/bacnet_async/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/for_build/etc/thingsboard-gateway/extensions/bacnet_old/__init__.py b/for_build/etc/thingsboard-gateway/extensions/bacnet_old/__init__.py new file mode 100644 index 000000000..52b93288c --- /dev/null +++ b/for_build/etc/thingsboard-gateway/extensions/bacnet_old/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/for_build/etc/thingsboard-gateway/extensions/modbus_async/__init__.py b/for_build/etc/thingsboard-gateway/extensions/modbus_async/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/for_build/etc/thingsboard-gateway/extensions/modbus_old/__init__.py b/for_build/etc/thingsboard-gateway/extensions/modbus_old/__init__.py new file mode 100644 index 000000000..52b93288c --- /dev/null +++ b/for_build/etc/thingsboard-gateway/extensions/modbus_old/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2022. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/thingsboard_gateway/connectors/bacnet/bacnet_connector.py b/thingsboard_gateway/connectors/bacnet/bacnet_connector.py index 004049db7..117f8c0e4 100644 --- a/thingsboard_gateway/connectors/bacnet/bacnet_connector.py +++ b/thingsboard_gateway/connectors/bacnet/bacnet_connector.py @@ -79,11 +79,6 @@ def __init__(self, gateway, config, connector_type): self.__devices = [] - self.__function_to_execute = { - 'readProperty': self.__read_property, - 'writeProperty': self.__write_property - } - def open(self): self.start() diff --git a/thingsboard_gateway/connectors/bacnet/device.py b/thingsboard_gateway/connectors/bacnet/device.py index a5b29cdc1..1525e7e51 100644 --- a/thingsboard_gateway/connectors/bacnet/device.py +++ b/thingsboard_gateway/connectors/bacnet/device.py @@ -76,8 +76,8 @@ def stop(self): self.__stopped = True def run(self): - while not self.__stopped and self.active: - if time.monotonic() - self.__last_poll_time >= self.__poll_period: + while not self.__stopped: + if time.monotonic() - self.__last_poll_time >= self.__poll_period and self.active: self.__send_callback() self.__last_poll_time = time.monotonic()