diff --git a/for_build/etc/thingsboard-gateway/extensions/modbus_async/__init__.py b/for_build/etc/thingsboard-gateway/extensions/modbus_async/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/setup.py b/setup.py index c5ca11d5a..a7cf8ad3c 100644 --- a/setup.py +++ b/setup.py @@ -39,7 +39,7 @@ 'thingsboard_gateway.storage', 'thingsboard_gateway.storage.memory', 'thingsboard_gateway.gateway.shell', 'thingsboard_gateway.storage.file', 'thingsboard_gateway.storage.sqlite', 'thingsboard_gateway.gateway.entities', 'thingsboard_gateway.connectors', 'thingsboard_gateway.connectors.ble', 'thingsboard_gateway.connectors.socket', - 'thingsboard_gateway.connectors.mqtt', 'thingsboard_gateway.connectors.xmpp', + 'thingsboard_gateway.connectors.mqtt', 'thingsboard_gateway.connectors.xmpp', 'thingsboard_gateway.connectors.modbus_async', 'thingsboard_gateway.connectors.opcua', 'thingsboard_gateway.connectors.request', 'thingsboard_gateway.connectors.ocpp', 'thingsboard_gateway.connectors.modbus', 'thingsboard_gateway.connectors.can', 'thingsboard_gateway.connectors.bacnet', 'thingsboard_gateway.connectors.bacnet.bacnet_utilities', 'thingsboard_gateway.connectors.odbc', diff --git a/thingsboard_gateway/__init__.py b/thingsboard_gateway/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/thingsboard_gateway/connectors/modbus_async/__init__.py b/thingsboard_gateway/connectors/modbus_async/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/thingsboard_gateway/connectors/modbus_async/bytes_modbus_downlink_converter.py b/thingsboard_gateway/connectors/modbus_async/bytes_modbus_downlink_converter.py new file mode 100644 index 000000000..b2208e7d2 --- /dev/null +++ b/thingsboard_gateway/connectors/modbus_async/bytes_modbus_downlink_converter.py @@ -0,0 +1,120 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pymodbus.payload import BinaryPayloadBuilder + +from thingsboard_gateway.connectors.modbus_async.modbus_converter import ModbusConverter +from thingsboard_gateway.connectors.modbus_async.entities.bytes_downlink_converter_config import \ + BytesDownlinkConverterConfig +from thingsboard_gateway.gateway.statistics.decorators import CollectStatistics + + +class BytesModbusDownlinkConverter(ModbusConverter): + + def __init__(self, _, logger): + self._log = logger + + @CollectStatistics(start_stat_type='allReceivedBytesFromTB', + end_stat_type='allBytesSentToDevices') + def convert(self, config: BytesDownlinkConverterConfig, data): + builder = BinaryPayloadBuilder(byteorder=config.byte_order, wordorder=config.word_order, repack=config.repack) + builder_functions = {"string": builder.add_string, + "bits": builder.add_bits, + "8int": builder.add_8bit_int, + "16int": builder.add_16bit_int, + "32int": builder.add_32bit_int, + "64int": builder.add_64bit_int, + "8uint": builder.add_8bit_uint, + "16uint": builder.add_16bit_uint, + "32uint": builder.add_32bit_uint, + "64uint": builder.add_64bit_uint, + "16float": builder.add_16bit_float, + "32float": builder.add_32bit_float, + "64float": builder.add_64bit_float} + + value = data["data"]["params"] + + if config.lower_type == "error": + self._log.error('"type" and "tag" - not found in configuration.') + + lower_type = config.lower_type + variable_size = config.objects_count * 16 if lower_type not in ["coils", "bits", "coil", + "bit"] else config.objects_count + + if lower_type in ["integer", "dword", "dword/integer", "word", "int"]: + lower_type = str(variable_size) + "int" + assert builder_functions.get(lower_type) is not None + builder_functions[lower_type](int(value)) + elif lower_type in ["uint", "unsigned", "unsigned integer", "unsigned int"]: + lower_type = str(variable_size) + "uint" + assert builder_functions.get(lower_type) is not None + builder_functions[lower_type](int(value)) + elif lower_type in ["float", "double"]: + lower_type = str(variable_size) + "float" + assert builder_functions.get(lower_type) is not None + builder_functions[lower_type](float(value)) + elif lower_type in ["coil", "bits", "coils", "bit"]: + assert builder_functions.get("bits") is not None + if variable_size > 1: + if isinstance(value, str): + builder_functions["bits"](bytes(value, encoding='UTF-8')) + elif isinstance(value, list): + builder_functions["bits"]([int(x) for x in value]) + else: + builder_functions["bits"]([int(x) for x in bin(value)[2:]]) + else: + return int(value).to_bytes(1, byteorder='big') + elif lower_type in ["string"]: + assert builder_functions.get("string") is not None + builder_functions[lower_type](value) + elif lower_type in builder_functions and 'int' in lower_type: + builder_functions[lower_type](int(value)) + elif lower_type in builder_functions and 'float' in lower_type: + builder_functions[lower_type](float(value)) + elif lower_type in builder_functions: + builder_functions[lower_type](value) + else: + self._log.error("Unknown variable type") + return None + + builder_converting_functions = {5: builder.to_coils, + 15: builder.to_coils, + 6: builder.to_registers, + 16: builder.to_registers} + + function_code = config.function_code + + if function_code in builder_converting_functions: + builder = builder_converting_functions[function_code]() + self._log.debug("Created builder %r.", builder) + if "Exception" in str(builder): + self._log.exception(builder) + builder = str(builder) + # if function_code is 5 , is using first coils value + if function_code == 5: + if isinstance(builder, list): + builder = builder[0] + else: + if variable_size <= 16: + if isinstance(builder, list) and len(builder) not in (8, 16, 32, 64): + builder = builder[0] + else: + if isinstance(builder, list) and len(builder) not in (2, 4): + self._log.warning("There is a problem with the value builder. " + "Only the first register is written.") + builder = builder[0] + return builder + self._log.warning("Unsupported function code, for the device %s in the Modbus Downlink converter", + config.device_name) + return None diff --git a/thingsboard_gateway/connectors/modbus_async/bytes_modbus_uplink_converter.py b/thingsboard_gateway/connectors/modbus_async/bytes_modbus_uplink_converter.py new file mode 100644 index 000000000..ddfa94bd4 --- /dev/null +++ b/thingsboard_gateway/connectors/modbus_async/bytes_modbus_uplink_converter.py @@ -0,0 +1,195 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List, Union + +from pymodbus.constants import Endian +from pymodbus.exceptions import ModbusIOException +from pymodbus.payload import BinaryPayloadDecoder +from pymodbus.pdu import ExceptionResponse + +from thingsboard_gateway.connectors.modbus_async.entities.bytes_uplink_converter_config import BytesUplinkConverterConfig +from thingsboard_gateway.connectors.modbus_async.modbus_converter import ModbusConverter +from thingsboard_gateway.gateway.entities.converted_data import ConvertedData +from thingsboard_gateway.gateway.statistics.decorators import CollectStatistics +from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService + + +class BytesModbusUplinkConverter(ModbusConverter): + def __init__(self, config: BytesUplinkConverterConfig, logger): + self._log = logger + self.__config = config + + @CollectStatistics(start_stat_type='receivedBytesFromDevices', + end_stat_type='convertedBytesFromDevice') + def convert(self, _, data: List[dict]) -> Union[ConvertedData, None]: + result = ConvertedData(self.__config.device_name, self.__config.device_type) + converted_data_append_methods = { + 'attributes': result.add_to_attributes, + 'telemetry': result.add_to_telemetry + } + for device_data in data: + StatisticsService.count_connector_message(self._log.name, 'convertersMsgProcessed') + + for config_section in converted_data_append_methods: + for config in getattr(self.__config, config_section): + encoded_data = device_data[config_section].get(config['tag']) + + if encoded_data: + decoded_data = self.decode_data(encoded_data, config, + self.__config.byte_order, + self.__config.word_order) + + if decoded_data is not None: + converted_data_append_methods[config_section]({config['tag']: decoded_data}) + + self._log.trace("Decoded data: %s", result) + StatisticsService.count_connector_message(self._log.name, 'convertersAttrProduced', + count=result.attributes_datapoints_count) + StatisticsService.count_connector_message(self._log.name, 'convertersTsProduced', + count=result.telemetry_datapoints_count) + + return result + + def decode_data(self, encoded_data, config, endian_order, word_endian_order): + decoded_data = None + + if not isinstance(encoded_data, ModbusIOException) and not isinstance(encoded_data, ExceptionResponse): + if config['functionCode'] in (1, 2): + try: + decoder = self.from_coils(encoded_data.bits, endian_order=endian_order, + word_endian_order=word_endian_order) + except TypeError: + decoder = self.from_coils(encoded_data.bits, word_endian_order=word_endian_order) + + decoded_data = self.decode_from_registers(decoder, config) + elif config['functionCode'] in (3, 4): + decoder = BinaryPayloadDecoder.fromRegisters(encoded_data.registers, byteorder=endian_order, + wordorder=word_endian_order) + decoded_data = self.decode_from_registers(decoder, config) + + if config.get('divider'): + decoded_data = float(decoded_data) / float(config['divider']) + elif config.get('multiplier'): + decoded_data = decoded_data * config['multiplier'] + else: + self._log.exception("Error while decoding data: %s, with config: %s", encoded_data, config) + decoded_data = None + + return decoded_data + + @staticmethod + def from_coils(coils, endian_order=Endian.Little, word_endian_order=Endian.Big): + _is_wordorder = '_wordorder' in BinaryPayloadDecoder.fromCoils.__code__.co_varnames + if _is_wordorder: + try: + decoder = BinaryPayloadDecoder.fromCoils(coils, byteorder=endian_order, + wordorder=word_endian_order) + except TypeError: + decoder = BinaryPayloadDecoder.fromCoils(coils, wordorder=word_endian_order) + else: + try: + decoder = BinaryPayloadDecoder.fromCoils(coils, byteorder=endian_order, + wordorder=word_endian_order) + except TypeError: + decoder = BinaryPayloadDecoder.fromCoils(coils, wordorder=word_endian_order) + + return decoder + + def decode_from_registers(self, decoder, configuration): + objects_count = configuration.get("objectsCount", + configuration.get("registersCount", configuration.get("registerCount", 1))) + lower_type = configuration["type"].lower() + + decoder_functions = { + 'string': decoder.decode_string, + 'bytes': decoder.decode_string, + 'bit': decoder.decode_bits, + 'bits': decoder.decode_bits, + '8int': decoder.decode_8bit_int, + '8uint': decoder.decode_8bit_uint, + '16int': decoder.decode_16bit_int, + '16uint': decoder.decode_16bit_uint, + '16float': decoder.decode_16bit_float, + '32int': decoder.decode_32bit_int, + '32uint': decoder.decode_32bit_uint, + '32float': decoder.decode_32bit_float, + '64int': decoder.decode_64bit_int, + '64uint': decoder.decode_64bit_uint, + '64float': decoder.decode_64bit_float, + } + + decoded = None + + if lower_type in ['bit', 'bits']: + decoded = decoder_functions[lower_type]() + decoded_lastbyte = decoder_functions[lower_type]() + decoded += decoded_lastbyte + decoded = decoded[len(decoded)-objects_count:] + + elif lower_type == "string": + decoded = decoder_functions[lower_type](objects_count * 2) + + elif lower_type == "bytes": + decoded = decoder_functions[lower_type](size=objects_count * 2) + + elif decoder_functions.get(lower_type) is not None: + decoded = decoder_functions[lower_type]() + + elif lower_type in ['int', 'long', 'integer']: + type_ = str(objects_count * 16) + "int" + assert decoder_functions.get(type_) is not None + decoded = decoder_functions[type_]() + + elif lower_type in ["double", "float"]: + type_ = str(objects_count * 16) + "float" + assert decoder_functions.get(type_) is not None + decoded = decoder_functions[type_]() + + elif lower_type == 'uint': + type_ = str(objects_count * 16) + "uint" + assert decoder_functions.get(type_) is not None + decoded = decoder_functions[type_]() + + else: + self._log.error("Unknown type: %s", lower_type) + + if isinstance(decoded, int): + result_data = decoded + elif isinstance(decoded, bytes) and lower_type == "string": + try: + result_data = decoded.decode('UTF-8') + except UnicodeDecodeError as e: + self._log.error("Error decoding string from bytes, will be saved as hex: %s", decoded, exc_info=e) + result_data = decoded.hex() + elif isinstance(decoded, bytes) and lower_type == "bytes": + result_data = decoded.hex() + elif isinstance(decoded, list): + if configuration.get('bit') is not None: + result_data = int(decoded[configuration['bit'] if + configuration['bit'] < len(decoded) else len(decoded) - 1]) + else: + bitAsBoolean = configuration.get('bitTargetType', 'bool') == 'bool' + if objects_count == 1: + result_data = bool(decoded[-1]) if bitAsBoolean else int(decoded[-1]) + else: + result_data = [bool(bit) if bitAsBoolean else int(bit) for bit in decoded] + elif isinstance(decoded, float): + result_data = decoded + elif decoded is not None: + result_data = int(decoded, 16) + else: + result_data = decoded + + return result_data diff --git a/thingsboard_gateway/connectors/modbus_async/constants.py b/thingsboard_gateway/connectors/modbus_async/constants.py new file mode 100644 index 000000000..cf9f982c2 --- /dev/null +++ b/thingsboard_gateway/connectors/modbus_async/constants.py @@ -0,0 +1,98 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from thingsboard_gateway.gateway.constants import * + +# Connector constants + +LAST_PREFIX = "last_" +NEXT_PREFIX = "next_" + +CHECK_POSTFIX = "_check" +POLL_PERIOD_POSTFIX = "PollPeriod" + +TIMESERIES_PARAMETER = "timeseries" + +MASTER_PARAMETER = "master" +AVAILABLE_FUNCTIONS_PARAMETER = "available_functions" + +CONNECTION_ATTEMPT_PARAMETER = "connection_attempt" +LAST_CONNECTION_ATTEMPT_TIME_PARAMETER = "last_connection_attempt_time" + +# Configuration parameters + +RPC_SECTION = "rpc" + +BYTE_ORDER_PARAMETER = "byteOrder" +WORD_ORDER_PARAMETER = "wordOrder" +SEND_DATA_ONLY_ON_CHANGE_PARAMETER = "sendDataOnlyOnChange" +CONNECT_ATTEMPT_COUNT_PARAMETER = "connectAttemptCount" +CONNECT_ATTEMPT_TIME_MS_PARAMETER = "connectAttemptTimeMs" +WAIT_AFTER_FAILED_ATTEMPTS_MS_PARAMETER = "waitAfterFailedAttemptsMs" + +FUNCTION_CODE_PARAMETER = "functionCode" + +ADDRESS_PARAMETER = "address" +OBJECTS_COUNT_PARAMETER = "objectsCount" + +UNIT_ID_PARAMETER = "unitId" + +HOST_PARAMETER = "host" +PORT_PARAMETER = "port" +BAUDRATE_PARAMETER = "baudrate" +TIMEOUT_PARAMETER = "timeout" +METHOD_PARAMETER = "method" +STOPBITS_PARAMETER = "stopbits" +BYTESIZE_PARAMETER = "bytesize" +PARITY_PARAMETER = "parity" +STRICT_PARAMETER = "strict" +TYPE_PARAMETER = "type" +SERIAL_CONNECTION_TYPE_PARAMETER = "serial" + +RETRIES_PARAMETER = "retries" +RETRY_ON_EMPTY_PARAMETER = "retryOnEmpty" +RETRY_ON_INVALID_PARAMETER = "retryOnInvalid" + +PAYLOAD_PARAMETER = "payload" +TAG_PARAMETER = "tag" + +COILS_INITIALIZER = "coils_initializer" +HOLDING_REGISTERS = "holding_registers" +INPUT_REGISTERS = "input_registers" +DISCRETE_INPUTS = "discrete_inputs" + +FUNCTION_TYPE = { + COILS_INITIALIZER: 'co', + HOLDING_REGISTERS: 'hr', + INPUT_REGISTERS: 'ir', + DISCRETE_INPUTS: 'di' +} + +FUNCTION_CODE_SLAVE_INITIALIZATION = { + HOLDING_REGISTERS: (6, 16), + COILS_INITIALIZER: (5, 15), + INPUT_REGISTERS: (6, 16), + DISCRETE_INPUTS: (5, 15) +} + +FUNCTION_CODE_READ = { + HOLDING_REGISTERS: 3, + COILS_INITIALIZER: 1, + INPUT_REGISTERS: 4, + DISCRETE_INPUTS: 2 +} + +# Default values + +TIMEOUT = 30 diff --git a/thingsboard_gateway/connectors/modbus_async/entities/__init__.py b/thingsboard_gateway/connectors/modbus_async/entities/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/thingsboard_gateway/connectors/modbus_async/entities/bytes_downlink_converter_config.py b/thingsboard_gateway/connectors/modbus_async/entities/bytes_downlink_converter_config.py new file mode 100644 index 000000000..b57034e21 --- /dev/null +++ b/thingsboard_gateway/connectors/modbus_async/entities/bytes_downlink_converter_config.py @@ -0,0 +1,13 @@ +from pymodbus.constants import Endian + + +class BytesDownlinkConverterConfig: + def __init__(self, device_name, byte_order, word_order, repack, objects_count, function_code, lower_type, address): + self.device_name = device_name + self.byte_order = Endian.Big if byte_order.upper() == "BIG" else Endian.Little + self.word_order = Endian.Big if word_order.upper() == "BIG" else Endian.Little + self.repack = repack + self.objects_count = objects_count + self.function_code = function_code + self.lower_type = lower_type.lower() + self.address = address diff --git a/thingsboard_gateway/connectors/modbus_async/entities/bytes_uplink_converter_config.py b/thingsboard_gateway/connectors/modbus_async/entities/bytes_uplink_converter_config.py new file mode 100644 index 000000000..147e3e5cb --- /dev/null +++ b/thingsboard_gateway/connectors/modbus_async/entities/bytes_uplink_converter_config.py @@ -0,0 +1,11 @@ +from pymodbus.constants import Endian + + +class BytesUplinkConverterConfig: + def __init__(self, **kwargs): + self.device_name = kwargs['deviceName'] + self.device_type = kwargs.get('deviceType', 'default') + self.byte_order = Endian.Big if kwargs.get('byteOrder', 'LITTLE').upper() == "BIG" else Endian.Little + self.word_order = Endian.Big if kwargs.get('wordOrder', 'LITTLE').upper() == "BIG" else Endian.Little + self.telemetry = kwargs.get('timeseries', []) + self.attributes = kwargs.get('attributes', []) diff --git a/thingsboard_gateway/connectors/modbus_async/entities/master.py b/thingsboard_gateway/connectors/modbus_async/entities/master.py new file mode 100644 index 000000000..ff3ccc964 --- /dev/null +++ b/thingsboard_gateway/connectors/modbus_async/entities/master.py @@ -0,0 +1,150 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from asyncio import Lock + +from pymodbus.client import AsyncModbusTlsClient, AsyncModbusTcpClient, AsyncModbusUdpClient, AsyncModbusSerialClient +from pymodbus.framer.ascii_framer import ModbusAsciiFramer +from pymodbus.framer.rtu_framer import ModbusRtuFramer +from pymodbus.framer.socket_framer import ModbusSocketFramer + +from thingsboard_gateway.connectors.modbus_async.constants import SERIAL_CONNECTION_TYPE_PARAMETER + +FRAMER_TYPE = { + 'rtu': ModbusRtuFramer, + 'socket': ModbusSocketFramer, + 'ascii': ModbusAsciiFramer +} + +def with_lock_for_serial(func): + async def wrapper(master, *args, **kwargs): + if master.client_type == SERIAL_CONNECTION_TYPE_PARAMETER: + await master.lock.acquire() + + resp = await func(master, *args, **kwargs) + + if master.client_type == SERIAL_CONNECTION_TYPE_PARAMETER: + master.lock.release() + + return resp + + return wrapper + + +class Master: + def __init__(self, client_type, client): + self.lock = Lock() + self.client_type = client_type.lower() + self.__client = client + + def connected(self): + return self.__client.connected + + @with_lock_for_serial + async def connect(self): + await self.__client.connect() + + @with_lock_for_serial + async def close(self): + await self.__client.close() + + @with_lock_for_serial + async def read_coils(self, address, count, unit_id): + return await self.__client.read_coils(address=address, count=count, slave=unit_id) + + @with_lock_for_serial + async def read_discrete_inputs(self, address, count, unit_id): + return await self.__client.read_discrete_inputs(address=address, count=count, slave=unit_id) + + @with_lock_for_serial + async def read_holding_registers(self, address, count, unit_id): + return await self.__client.read_holding_registers(address=address, count=count, slave=unit_id) + + @with_lock_for_serial + async def read_input_registers(self, address, count, unit_id): + return await self.__client.read_input_registers(address=address, count=count, slave=unit_id) + + @with_lock_for_serial + async def write_coil(self, address, value, unit_id): + return await self.__client.write_coil(address=address, value=value, slave=unit_id) + + @with_lock_for_serial + async def write_register(self, address, value, unit_id): + return await self.__client.write_register(address=address, value=value, slave=unit_id) + + @with_lock_for_serial + async def write_coils(self, address, values, unit_id): + return await self.__client.write_coils(address=address, value=values, slave=unit_id) + + @with_lock_for_serial + async def write_registers(self, address, values, unit_id): + return await self.__client.write_registers(address=address, value=values, slave=unit_id) + + def get_available_functions(self): + return { + 1: self.read_coils, + 2: self.read_discrete_inputs, + 3: self.read_holding_registers, + 4: self.read_input_registers, + 5: self.write_coil, + 6: self.write_register, + 15: self.write_coils, + 16: self.write_registers, + } + + @staticmethod + def configure_master(config): + framer = FRAMER_TYPE[config.method] + + if config.type == 'tcp' and config.tls: + master = AsyncModbusTlsClient(config.host, + config.port, + framer, + timeout=config.timeout, + retry_on_empty=config.retry_on_empty, + retry_on_invalid=config.retry_on_invalid, + retries=config.retries, + **config.tls) + elif config.type == 'tcp': + master = AsyncModbusTcpClient(config.host, + config.port, + framer, + timeout=config.timeout, + retry_on_empty=config.retry_on_empty, + retry_on_invalid=config.retry_on_invalid, + retries=config.retries) + elif config.type == 'udp': + master = AsyncModbusUdpClient(config.host, + config.port, + framer, + timeout=config.timeout, + retry_on_empty=config.retry_on_empty, + retry_on_invalid=config.retry_on_invalid, + retries=config.retries) + elif config.type == 'serial': + master = AsyncModbusSerialClient(method=config.method, + port=config.port, + timeout=config.timeout, + retry_on_empty=config.retry_on_empty, + retry_on_invalid=config.retry_on_invalid, + retries=config.retries, + baudrate=config.baudrate, + stopbits=config.stopbits, + bytesize=config.bytesize, + parity=config.parity, + strict=config.strict) + else: + raise Exception("Invalid Modbus transport type.") + + return master diff --git a/thingsboard_gateway/connectors/modbus_async/modbus_connector.py b/thingsboard_gateway/connectors/modbus_async/modbus_connector.py new file mode 100644 index 000000000..b5470d594 --- /dev/null +++ b/thingsboard_gateway/connectors/modbus_async/modbus_connector.py @@ -0,0 +1,646 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +from asyncio import CancelledError +from queue import Queue, Empty +from threading import Thread +from random import choice +from string import ascii_lowercase +from time import monotonic, sleep + +from packaging import version + +from thingsboard_gateway.connectors.modbus_async.constants import ADDRESS_PARAMETER, TAG_PARAMETER, \ + FUNCTION_CODE_PARAMETER +from thingsboard_gateway.gateway.entities.converted_data import ConvertedData +from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService +from thingsboard_gateway.tb_utility.tb_utility import TBUtility +from thingsboard_gateway.connectors.connector import Connector +from thingsboard_gateway.gateway.constants import STATISTIC_MESSAGE_RECEIVED_PARAMETER, \ + STATISTIC_MESSAGE_SENT_PARAMETER, CONNECTOR_PARAMETER, DEVICE_SECTION_PARAMETER, DATA_PARAMETER, \ + RPC_METHOD_PARAMETER, RPC_PARAMS_PARAMETER, RPC_ID_PARAMETER +from thingsboard_gateway.tb_utility.tb_logger import init_logger + +# Try import Pymodbus library or install it and import +installation_required = False +required_version = '3.0.0' +force_install = False + +try: + from pymodbus import __version__ as pymodbus_version + + if version.parse(pymodbus_version) < version.parse(required_version): + installation_required = True + + if version.parse( + pymodbus_version) > version.parse(required_version): + installation_required = True + force_install = True + +except ImportError: + installation_required = True + +if installation_required: + print("Modbus library not found - installing...") + TBUtility.install_package("pymodbus", required_version, force_install=force_install) + TBUtility.install_package('pyserial') + TBUtility.install_package('pyserial-asyncio') + +try: + from twisted.internet import reactor +except ImportError: + TBUtility.install_package('twisted') + from twisted.internet import reactor + +from thingsboard_gateway.connectors.modbus_async.entities.master import Master +from thingsboard_gateway.connectors.modbus_async.server import Server +from thingsboard_gateway.connectors.modbus_async.slave import Slave +from thingsboard_gateway.connectors.modbus_async.entities.bytes_downlink_converter_config import \ + BytesDownlinkConverterConfig + +from pymodbus.exceptions import ConnectionException +from pymodbus.bit_read_message import ReadBitsResponseBase +from pymodbus.bit_write_message import WriteMultipleCoilsResponse, WriteSingleCoilResponse +from pymodbus.constants import Endian +from pymodbus.pdu import ExceptionResponse +from pymodbus.register_read_message import ReadRegistersResponseBase +from pymodbus.register_write_message import WriteMultipleRegistersResponse, WriteSingleRegisterResponse + + +class AsyncModbusConnector(Connector, Thread): + PROCESS_REQUESTS = Queue(-1) + + def __init__(self, gateway, config, connector_type): + self.statistics = {STATISTIC_MESSAGE_RECEIVED_PARAMETER: 0, + STATISTIC_MESSAGE_SENT_PARAMETER: 0} + super().__init__() + self.__gateway = gateway + self._connector_type = connector_type + self.__config = config + self.__log = init_logger(self.__gateway, config.get('name', self.name), + config.get('logLevel', 'INFO'), + enable_remote_logging=config.get('enableRemoteLogging', False)) + self.__log.info('Starting Modbus Connector...') + self.__id = self.__config.get('id') + self.name = self.__config.get("name", 'Modbus Connector ' + ''.join(choice(ascii_lowercase) for _ in range(5))) + self.__connected = False + self.__stopped = False + self.daemon = True + + try: + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + except RuntimeError: + self.loop = asyncio.get_event_loop() + + self.__data_to_convert = Queue(-1) + self.__data_to_save = Queue(-1) + + self.__server = None + + self._master_connections = {} + + self.__slaves = [] + self.__add_slaves(self.__config.get('master', {'slaves': []}).get('slaves', [])) + + def close(self): + self.__stopped = True + self.__connected = False + + self.__log.debug('Stopping %s...', self.get_name()) + + if self.__server: + self.__server.stop() + + for slave in self.__slaves: + slave.close() + + asyncio.run_coroutine_threadsafe(self.__cancel_all_tasks(), self.loop) + + self.__check_is_alive() + + self.__log.info('%s has been stopped', self.get_name()) + self.__log.stop() + + def __check_is_alive(self): + start_time = monotonic() + + while self.is_alive(): + if monotonic() - start_time > 10: + self.__log.error("Failed to stop connector %s", self.get_name()) + break + sleep(.1) + + async def __cancel_all_tasks(self): + await asyncio.sleep(5) + for task in asyncio.all_tasks(self.loop): + task.cancel() + + def open(self): + self.start() + + def run(self): + self.__connected = True + + # check if Gateway as a server need to be started + if Server.is_runnable(self.__config): + try: + self.__log.info("Starting Modbus server") + self.__run_server() + self.__add_slave(self.__server.get_slave_config_format()) + self.__log.info("Modbus server started") + except Exception as e: + self.__log.exception('Failed to start Modbus server: %s', e) + self.__connected = False + + Thread(target=self.__convert_data, daemon=True, name="Modbus connector data converter thread").start() + Thread(target=self.__save_data, daemon=True, name="Modbus connector data saver thread").start() + + try: + self.loop.run_until_complete(self.__process_requests()) + except CancelledError as e: + self.__log.debug('Task was cancelled due to connector stop: %s', e.__str__()) + except Exception as e: + self.__log.exception(e) + + def __run_server(self): + self.__server = Server(self.__config['slave'], self.__log) + self.__server.start() + + def __get_master(self, slave: Slave): + """ + Method check if connection to master already exists and return it + or create new connection and return it + + Newly created master connection structure will look like: + self._master_connections = { + '127.0.0.1:5021': AsyncClient object + } + """ + + socket_str = slave.host + ':' + str(slave.port) + if socket_str not in self._master_connections: + master_connection = Master.configure_master(slave) + master = Master(slave.type, master_connection) + self._master_connections[socket_str] = master + + return self._master_connections[socket_str] + + def __add_slave(self, slave_config): + slave = Slave(self, self.__log, slave_config) + master = self.__get_master(slave) + slave.master = master + + self.__slaves.append(slave) + + def __add_slaves(self, slaves_config): + for slave_config in slaves_config: + try: + self.__add_slave(slave_config) + except Exception as e: + self.__log.exception('Failed to add slave: %s', e) + + self.__log.debug('Added %d slaves', len(self.__slaves)) + + @classmethod + def callback(cls, slave: Slave): + cls.PROCESS_REQUESTS.put_nowait(slave) + + async def __process_requests(self): + while not self.__stopped: + try: + slave = self.PROCESS_REQUESTS.get_nowait() + await self.__poll_device(slave) + except Empty: + await asyncio.sleep(.01) + except Exception as e: + self.__log.exception('Failed to poll device: %s', e) + + async def __poll_device(self, slave): + self.__log.debug("Polling %s slave", slave) + + # check if device have attributes or telemetry to poll + if slave.uplink_converter_config.attributes or slave.uplink_converter_config.telemetry: + try: + connected_to_master = await slave.connect() + + if connected_to_master: + slave_data = await self.__read_slave_data(slave) + self.__data_to_convert.put_nowait((slave, slave_data)) + + self.__manage_device_connectivity_to_platform(slave) + else: + self.__log.error('Socket is closed, connection is lost, for device %s', slave) + self.__delete_device_from_platform(slave) + except (ConnectionException, asyncio.exceptions.TimeoutError): + self.__delete_device_from_platform(slave) + await asyncio.sleep(5) + self.__log.error('Failed to connect to device %s', slave) + except Exception as e: + self.__delete_device_from_platform(slave) + self.__log.exception('Failed to poll %s device: %s', slave, e) + else: + self.__log.error('Config is empty. Nothing to read, for device %s', slave) + + async def __read_slave_data(self, slave: Slave): + result = { + 'telemetry': {}, + 'attributes': {} + } + + for config_section in ('attributes', 'telemetry'): + for config in getattr(slave.uplink_converter_config, config_section): + response = await slave.read(config['functionCode'], config['address'], config['objectsCount']) + + if 'Exception' in str(response) or 'Error' in str(response): + self.__log.error("Reading failed for device %s function code %s address %s unit id %s", + slave.device_name, config['functionCode'], config[ADDRESS_PARAMETER], + slave.unit_id) + self.__log.error("Reading failed with exception:", exc_info=result) + self.__log.info("Trying to reconnect to device %s", slave.device_name) + if slave.master.connected(): + await slave.master.close() + await slave.connect() + if slave.master.connected(): + self.__log.info("Reconnected to device %s", slave.device_name) + response = await slave.read(config['functionCode'], config['address'], config['objectsCount']) + if "Exception" in str(result) or "Error" in str(result): + self.__log.error("Reading failed for device %s function code %s address %s unit id %s", + slave.device_name, config['functionCode'], config[ADDRESS_PARAMETER], + slave.unit_id) + self.__log.error("Reading failed with exception:", exc_info=result) + + if slave.master.connected(): + await slave.master.close() + + self.__log.info("Will try to connect to device %s later", slave.device_name) + + result[config_section][config['tag']] = response + + return result + + def __manage_device_connectivity_to_platform(self, slave: Slave): + if slave.master.connected() and not slave.is_connected_to_platform(): + self.__add_device_to_platform(slave) + + def __delete_device_from_platform(self, slave: Slave): + if slave.is_connected_to_platform(): + self.__gateway.del_device(slave.device_name) + slave.last_connect_time = 0 + + def __add_device_to_platform(self, slave: Slave): + """ + Add device to platform + """ + + device_connected = slave.is_connected_to_platform() + if not device_connected and slave.master.connected(): + device_connected = self.__gateway.add_device(slave.device_name, + {CONNECTOR_PARAMETER: self}, + device_type=slave.device_type) + + slave.last_connect_time = monotonic() if device_connected else 0 + + def __convert_data(self): + while not self.__stopped: + try: + batch_to_convert = {} + if not self.__data_to_convert.empty(): + batch_forming_start = monotonic() + while not self.__stopped and monotonic() - batch_forming_start < 0.1: + try: + slave, data = self.__data_to_convert.get_nowait() + except Empty: + break + + batch_key = (slave.device_name, slave.uplink_converter) + + if batch_key not in batch_to_convert: + batch_to_convert[batch_key] = [] + + batch_to_convert[batch_key].append(data) + + for (device_name, uplink_converter), data in batch_to_convert.items(): + converted_data: ConvertedData = uplink_converter.convert({}, data) + if len(converted_data['attributes']) or len(converted_data['telemetry']): + self.__data_to_save.put_nowait(converted_data) + else: + sleep(.001) + except Exception as e: + self.__log.error('Exception in convertion data loop: %s', e) + + def __save_data(self): + while not self.__stopped: + if not self.__data_to_save.empty(): + try: + converted_data = self.__data_to_save.get_nowait() + StatisticsService.count_connector_message(self.get_name(), stat_parameter_name='storageMsgPushed') + self.__gateway.send_to_storage(self.get_name(), self.get_id(), converted_data) + self.statistics[STATISTIC_MESSAGE_SENT_PARAMETER] += 1 + except Empty: + sleep(.001) + else: + sleep(.001) + + def on_attributes_update(self, content): + self.__log.debug('Got attributes update: %s', content) + + try: + device = self.__get_device_by_name(content[DEVICE_SECTION_PARAMETER]) + + for attribute_updates_command_config in device.attributes_updates_config: + for attribute_updated in content[DATA_PARAMETER]: + if attribute_updates_command_config[TAG_PARAMETER] == attribute_updated: + to_process = { + DEVICE_SECTION_PARAMETER: content[DEVICE_SECTION_PARAMETER], + DATA_PARAMETER: { + RPC_METHOD_PARAMETER: attribute_updated, + RPC_PARAMS_PARAMETER: content[DATA_PARAMETER][attribute_updated] + } + } + + self.__create_task(self.__process_attribute_request, + (device, to_process, attribute_updates_command_config), + {}) + except Exception as e: + self.__log.exception('Failed to update attributes: %s', e) + + async def __process_attribute_request(self, device: Slave, data, config): + if config is not None: + converted_data = None + + config = BytesDownlinkConverterConfig( + device_name=device.device_name, + byte_order=device.byte_order, + word_order=device.word_order, + repack=device.repack, + objects_count=config.get("objectsCount", config.get("registersCount", config.get("registerCount", 1))), + function_code=config.get(FUNCTION_CODE_PARAMETER), + lower_type=config.get("type", config.get("tag", "error")), + address=config.get(ADDRESS_PARAMETER) + ) + + if config.function_code in (5, 6): + converted_data = device.downlink_converter.convert(config, data) + + try: + converted_data = converted_data[0] + except IndexError and TypeError: + pass + elif config.function_code in (15, 16): + converted_data = device.downlink_converter.convert(config, data) + + if converted_data is not None: + try: + connected = await device.connect() + if connected: + await device.write(config.function_code, config.address, converted_data) + else: + self.__log.error( + 'Failed to process attribute update: Socket is closed, connection is lost, for device %s', + device) + except Exception as e: + self.__log.exception('Failed to process attribute update: ', e) + + def server_side_rpc_handler(self, content): + self.__log.debug('Received server side rpc request: %r', content) + + try: + if self.__is_old_format_rpc_content(content): + self.__convert_old_format_rpc_content(content) + + self.__check_is_connector_rpc(content) + + if content.get(DEVICE_SECTION_PARAMETER) is not None: + self.__log.debug("Modbus connector received rpc request for %s with server_rpc_request: %s", + content[DEVICE_SECTION_PARAMETER], + content) + device = self.__get_device_by_name(content[DEVICE_SECTION_PARAMETER]) + + config = self.__get_rpc_config(device, content) + if config is None: + self.__log.error("Received rpc request, but method %s not found in config for %s.", + content['data']['method'], + self.get_name()) + self.__gateway.send_rpc_reply(content[DEVICE_SECTION_PARAMETER], + content[DATA_PARAMETER][RPC_ID_PARAMETER], + {content['data']['method']: "METHOD NOT FOUND!"}) + return + + result = {} + self.__create_task(self.__process_rpc_request, (device, config, content), {'result': result}) + + return result['response'] + else: + self.__log.debug("Received RPC to connector: %r", content) + results = [] + for device in self.__slaves: + content[DEVICE_SECTION_PARAMETER] = device.device_name + result = {} + self.__create_task(self.__process_rpc_request, + (device, content, content), + {'with_response': True, 'result': result}) + results.append(result['response']) + + return results + except Exception as e: + self.__log.exception('Failed to process server side rpc request: %s', e) + + def __create_task(self, func, args, kwargs): + task = self.loop.create_task(func(*args, **kwargs)) + + while not task.done(): + sleep(.02) + + @staticmethod + def __is_old_format_rpc_content(content): + return content.get('data') is None + + @staticmethod + def __convert_old_format_rpc_content(content): + content['data'] = {'params': content['params'], + 'method': content['method']} + + def __check_is_connector_rpc(self, content): + """ + Check if RPC type is connector RPC (can be only 'set') + """ + + try: + (connector_type, rpc_method_name) = content['data']['method'].split('_') + if connector_type == self._connector_type: + content['data']['method'] = rpc_method_name + content['device'] = content['params'].split(' ')[0].split('=')[-1] + except (IndexError, ValueError, AttributeError): + pass + + @staticmethod + def __get_rpc_config(device: Slave, content): + rpc_method = content['data']['method'] + if rpc_method == 'get' or rpc_method == 'set': + params = {} + for param in content['data']['params'].split(';'): + try: + (key, value) = param.split('=') + except ValueError: + continue + + if key and value: + params[key] = value if key not in ('functionCode', 'objectsCount', 'address') else int( + value) + + return params + elif isinstance(device.rpc_requests_config, dict): + return device.rpc_requests_config.get(rpc_method) + elif isinstance(device.rpc_requests_config, list): + for rpc_command_config in device.rpc_requests_config: + if rpc_command_config[TAG_PARAMETER] == rpc_method: + return rpc_command_config + else: + return None + + async def __process_rpc_request(self, device: Slave, config, data, with_response=False, result={}): + try: + if config is not None: + if config['functionCode'] in (5, 6, 15, 16): + response = await self.__write_rpc_data(device, config, data) + elif config['functionCode'] in (1, 2, 3, 4): + response = await self.__read_rpc_data(device, config) + else: + response = 'Unsupported function code in RPC request.' + + if self.__can_rpc_return_response(data): + result['response'] = self.__send_rpc_response(data, response, with_response) + except Exception as e: + self.__log.error('Failed to process rpc request: %s', e) + result['response'] = e.__str__() + + async def __read_rpc_data(self, device, config): + response = None + + try: + connected = await device.connect() + if connected: + response = await device.read(config['functionCode'], config['address'], config['objectsCount']) + except Exception as e: + self.__log.error('Failed to process rpc request: %s', e) + response = e + + if isinstance(response, (ReadRegistersResponseBase, ReadBitsResponseBase)): + endian_order = Endian.Big if device.byte_order.upper() == "BIG" else Endian.Little + word_endian_order = Endian.Big if device.word_order.upper() == "BIG" else Endian.Little + response = device.uplink_converter.decode_data(response, config, endian_order, word_endian_order) + + return response + + async def __write_rpc_data(self, device, config, data): + response = None + + config = BytesDownlinkConverterConfig( + device_name=device.device_name, + byte_order=device.byte_order, + word_order=device.word_order, + repack=device.repack, + objects_count=config.get("objectsCount", config.get("registersCount", config.get("registerCount", 1))), + function_code=config.get(FUNCTION_CODE_PARAMETER), + lower_type=config.get("type", config.get("tag", "error")), + address=config.get(ADDRESS_PARAMETER) + ) + + converted_data = device.downlink_converter.convert(config, data) + + if config.function_code in (5, 6): + try: + converted_data = converted_data[0] + except IndexError and TypeError: + pass + + if converted_data is not None: + try: + connected = await device.connect() + if connected: + response = await device.write(config.function_code, config.address, converted_data) + except Exception as e: + self.__log.error('Failed to process rpc request: %s', e) + response = e + + if isinstance(response, (WriteMultipleRegistersResponse, + WriteMultipleCoilsResponse, + WriteSingleCoilResponse, + WriteSingleRegisterResponse)): + self.__log.debug("Write %r", str(response)) + response = {"success": True} + + return response + + @staticmethod + def __can_rpc_return_response(content): + return content.get(RPC_ID_PARAMETER) or (content.get(DATA_PARAMETER) is not None + and content[DATA_PARAMETER].get(RPC_ID_PARAMETER) is not None) + + def __send_rpc_response(self, content, response, with_response=False): + if isinstance(response, Exception) or isinstance(response, ExceptionResponse): + if not with_response: + self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER], + req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER), + content={ + content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: str(response) + }, + success_sent=False) + else: + return { + 'device': content[DEVICE_SECTION_PARAMETER], + 'req_id': content[DATA_PARAMETER].get(RPC_ID_PARAMETER), + 'content': { + content[DATA_PARAMETER][RPC_METHOD_PARAMETER]: str(response) + }, + 'success_sent': False + } + else: + if not with_response: + self.__gateway.send_rpc_reply(device=content[DEVICE_SECTION_PARAMETER], + req_id=content[DATA_PARAMETER].get(RPC_ID_PARAMETER), + content=response) + else: + return { + 'device': content[DEVICE_SECTION_PARAMETER], + 'req_id': content[DATA_PARAMETER].get(RPC_ID_PARAMETER), + 'content': response + } + + def __get_device_by_name(self, device_name) -> Slave: + return tuple(filter(lambda slave: slave.device_name == device_name, self.__slaves))[0] + + @property + def connector_type(self): + return self._connector_type + + def get_id(self): + return self.__id + + def get_name(self): + return self.name + + def get_type(self): + return self._connector_type + + def get_config(self): + return self.__config + + def is_connected(self): + return self.__connected + + def is_stopped(self): + return self.__stopped diff --git a/thingsboard_gateway/connectors/modbus_async/modbus_converter.py b/thingsboard_gateway/connectors/modbus_async/modbus_converter.py new file mode 100644 index 000000000..8026512d4 --- /dev/null +++ b/thingsboard_gateway/connectors/modbus_async/modbus_converter.py @@ -0,0 +1,21 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from thingsboard_gateway.connectors.converter import Converter, abstractmethod + + +class ModbusConverter(Converter): + @abstractmethod + def convert(self, config, data): + pass diff --git a/thingsboard_gateway/connectors/modbus_async/server.py b/thingsboard_gateway/connectors/modbus_async/server.py new file mode 100644 index 000000000..ffd42d86e --- /dev/null +++ b/thingsboard_gateway/connectors/modbus_async/server.py @@ -0,0 +1,216 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +from asyncio import CancelledError +from threading import Thread +from time import monotonic, sleep + +from pymodbus.datastore import ModbusSparseDataBlock, ModbusServerContext, ModbusSlaveContext +from pymodbus.device import ModbusDeviceIdentification +from pymodbus.framer.ascii_framer import ModbusAsciiFramer +from pymodbus.framer.rtu_framer import ModbusRtuFramer +from pymodbus.framer.socket_framer import ModbusSocketFramer +from pymodbus.server import StartAsyncTcpServer, StartAsyncTlsServer, StartAsyncUdpServer, StartAsyncSerialServer, \ + ServerAsyncStop +from pymodbus.version import version + +from thingsboard_gateway.connectors.modbus_async.bytes_modbus_downlink_converter import BytesModbusDownlinkConverter +from thingsboard_gateway.connectors.modbus_async.entities.bytes_downlink_converter_config import \ + BytesDownlinkConverterConfig +from thingsboard_gateway.connectors.modbus_async.constants import FUNCTION_CODE_SLAVE_INITIALIZATION, FUNCTION_TYPE, \ + FUNCTION_CODE_READ + +SLAVE_TYPE = { + 'tcp': StartAsyncTcpServer, + 'tls': StartAsyncTlsServer, + 'udp': StartAsyncUdpServer, + 'serial': StartAsyncSerialServer +} + +FRAMER_TYPE = { + 'rtu': ModbusRtuFramer, + 'socket': ModbusSocketFramer, + 'ascii': ModbusAsciiFramer +} + + +class Server(Thread): + def __init__(self, config, logger): + super().__init__() + self.__stopped = False + self.daemon = True + self.name = 'Gateway Modbus Server (Slave)' + + self.__log = logger + + self.__config = config + + self.unit_id = config['unitId'] + self.host = config['host'] + self.port = config['port'] + self.device_name = config.get('deviceName', 'Modbus Slave') + self.device_type = config.get('deviceType', 'default') + self.poll_period = config.get('pollPeriod', 5000) + self.method = config.get('method', 'socket').lower() + + self.__type = config.get('type', 'tcp').lower() + self.__identity = self.__get_identity(self.__config) + self.__server_context = self.__get_server_context(self.__config) + self.__connection_config = self.__get_connection_config(self.__config) + + try: + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + except RuntimeError: + self.loop = asyncio.get_event_loop() + + def __str__(self): + return self.name + + def run(self): + try: + self.loop.run_until_complete(self.start_server()) + except CancelledError: + self.__log.debug("Server %s has been stopped", self.name) + except Exception as e: + self.__log.error("Server has been stopped with error: %s", e) + + def stop(self): + self.__stopped = True + + asyncio.run_coroutine_threadsafe(self.__cancel_all_tasks(), self.loop) + + self.__check_is_alive() + + def __check_is_alive(self): + start_time = monotonic() + + while self.is_alive(): + if monotonic() - start_time > 10: + self.__log.error("Failed to stop connector %s", self.name) + break + sleep(.1) + + async def __cancel_all_tasks(self): + await asyncio.sleep(5) + + for task in asyncio.all_tasks(self.loop): + task.cancel() + + await ServerAsyncStop() + + async def start_server(self): + try: + await SLAVE_TYPE[self.__type](identity=self.__identity, context=self.__server_context, + **self.__connection_config) + except Exception as e: + self.__stopped = True + self.__log.error('Failed to start Gateway Modbus Server (Slave): %s', e) + + def get_slave_config_format(self): + """ + Function return configuration in slave format for adding it to gateway slaves list + """ + + config = { + 'unitId': self.unit_id, + 'deviceName': self.device_name, + 'deviceType': self.device_type, + 'pollPeriod': self.poll_period, + 'host': self.host, + 'port': self.port, + 'method': self.method, + } + + for (register, register_values) in self.__config.get('values', {}).items(): + for (section_name, section_values) in register_values.items(): + if not config.get(section_name): + config[section_name] = [] + + for item in section_values: + item_config = { + **item, + 'functionCode': FUNCTION_CODE_READ[register] + if section_name not in ('attributeUpdates', 'rpc') else item['functionCode'], + } + config[section_name].append(item_config) + + return config + + @staticmethod + def is_runnable(config): + return config.get('slave') and config.get('slave', {}).get('sendDataToThingsBoard', False) + + @staticmethod + def __get_identity(config): + identity = None + + if config.get('identity'): + identity = ModbusDeviceIdentification() + identity.VendorName = config['identity'].get('vendorName', '') + identity.ProductCode = config['identity'].get('productCode', '') + identity.VendorUrl = config['identity'].get('vendorUrl', '') + identity.ProductName = config['identity'].get('productName', '') + identity.ModelName = config['identity'].get('ModelName', '') + identity.MajorMinorRevision = version.short() + + return identity + + @staticmethod + def __get_connection_config(config): + return { + 'type': config['type'], + 'address': (config.get('host'), config.get('port')) if (config['type'] == 'tcp' or 'udp') else None, + 'port': config.get('port') if config['type'] == 'serial' else None, + 'framer': FRAMER_TYPE[config['method']], + 'security': config.get('security', {}) + } + + def __get_server_context(self, config): + blocks = {} + if (config.get('values') is None) or (not len(config.get('values'))): + self.__log.error("No values to read from device %s", config.get('deviceName', 'Modbus Slave')) + return + + for (key, value) in config.get('values').items(): + values = {} + converter = BytesModbusDownlinkConverter({}, self.__log) + for section in ('attributes', 'timeseries', 'attributeUpdates', 'rpc'): + for item in value.get(section, []): + function_code = FUNCTION_CODE_SLAVE_INITIALIZATION[key][0] if item['objectsCount'] <= 1 else \ + FUNCTION_CODE_SLAVE_INITIALIZATION[key][1] + converter_config = BytesDownlinkConverterConfig( + device_name=config.get('deviceName', 'Gateway'), + byte_order=config['byteOrder'], + word_order=config.get('wordOrder', 'LITTLE'), + repack=config.get('repack', False), + objects_count=item['objectsCount'], + function_code=function_code, + lower_type=item.get('type', item.get('tag', 'error')), + address=item.get('address', 0) + ) + converted_value = converter.convert(converter_config, {'data': {'params': item['value']}}) + if converted_value is not None: + values[item['address'] + 1] = converted_value + else: + self.__log.error("Failed to convert value %s with type %s, skipping...", item['value'], + item['type']) + if len(values): + blocks[FUNCTION_TYPE[key]] = ModbusSparseDataBlock(values) + + if not len(blocks): + self.__log.info("%s - will be initialized without values", config.get('deviceName', 'Modbus Slave')) + + return ModbusServerContext(slaves=ModbusSlaveContext(**blocks), single=True) diff --git a/thingsboard_gateway/connectors/modbus_async/slave.py b/thingsboard_gateway/connectors/modbus_async/slave.py new file mode 100644 index 000000000..2363c81ff --- /dev/null +++ b/thingsboard_gateway/connectors/modbus_async/slave.py @@ -0,0 +1,228 @@ +# Copyright 2024. ThingsBoard +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from threading import Thread +from time import sleep, monotonic + +from pymodbus.constants import Defaults + +from thingsboard_gateway.connectors.modbus_async.bytes_modbus_downlink_converter import BytesModbusDownlinkConverter +from thingsboard_gateway.connectors.modbus_async.bytes_modbus_uplink_converter import BytesModbusUplinkConverter +from thingsboard_gateway.connectors.modbus_async.entities.bytes_uplink_converter_config import BytesUplinkConverterConfig +from thingsboard_gateway.gateway.constants import UPLINK_PREFIX, CONVERTER_PARAMETER, DOWNLINK_PREFIX +from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService +from thingsboard_gateway.tb_utility.tb_loader import TBModuleLoader + + +class Slave(Thread): + def __init__(self, connector, logger, config): + super().__init__() + self.daemon = True + self.stopped = False + self._log = logger + self.connector = connector + self.name = "Modbus slave processor for unit " + str(config['unitId']) + " on host " + str( + config['host']) + ":" + str(config['port']) + + self.callback = connector.callback + + self.unit_id = config['unitId'] + self.host = config.get('host') + self.port = config['port'] + self.type = config.get('type', 'tcp').lower() + self.method = config['method'] + self.tls = config.get('tls', {}) + self.timeout = config.get('timeout') + self.retry_on_empty = config.get('retryOnEmpty', False) + self.retry_on_invalid = config.get('retryOnInvalid', False) + self.retries = config.get('retries', 3) + self.baudrate = config.get('baudrate', 19200) + self.stopbits = config.get('stopbits', Defaults.Stopbits) + self.bytesize = config.get('bytesize', Defaults.Bytesize) + self.parity = config.get('parity', Defaults.Parity) + self.strict = config.get('strict', Defaults.Strict) + self.repack = config.get('repack', False) + self.word_order = config.get('wordOrder', 'LITTLE').upper() + self.byte_order = config.get('byteOrder', 'LITTLE').upper() + + self.attributes_updates_config = config.get('attributeUpdates', []) + self.rpc_requests_config = config.get('rpc', []) + + self.connect_attempt_time_ms = config.get('connectAttemptTimeMs', 500) \ + if config.get('connectAttemptTimeMs', 500) >= 500 else 500 + self.wait_after_failed_attempts_ms = config.get('waitAfterFailedAttemptsMs', 300000) \ + if config.get('waitAfterFailedAttemptsMs', 300000) >= 300000 else 300000 + self.connection_attempt = config.get('connectionAttempt', 5) if config.get('connectionAttempt', 5) >= 5 else 5 + + self.device_name = config['deviceName'] + self.device_type = config.get('deviceType', 'default') + + self.poll_period = config['pollPeriod'] / 1000 + + self.last_connect_time = 0 + self.last_polled_time = 0 + self.last_connection_attempt_time = 0 + self.connection_attempt_count = 0 + + self.downlink_converter = self.__load_downlink_converter(config) + + self.uplink_converter_config = BytesUplinkConverterConfig(**config) + self.uplink_converter = self.__load_uplink_converter(config) + + self.__master = None + self.available_functions = None + + self.start() + + def __timer(self): + self.__send_callback() + + while not self.stopped and not self.connector.is_stopped(): + if monotonic() - self.last_polled_time >= self.poll_period: + self.__send_callback() + + sleep(.001) + + def __send_callback(self): + self.last_polled_time = monotonic() + + try: + self.callback(self) + except Exception as e: + self._log.exception('Error sending slave callback: %s', e) + + def run(self): + self.__timer() + + def close(self): + self.stopped = True + + def get_name(self): + return self.device_name + + def __load_downlink_converter(self, config): + try: + if config.get(DOWNLINK_PREFIX + CONVERTER_PARAMETER) is not None: + converter = TBModuleLoader.import_module(self.connector.connector_type, + config[DOWNLINK_PREFIX + CONVERTER_PARAMETER])({}, self._log) + else: + converter = BytesModbusDownlinkConverter({}, self._log) + + return converter + except Exception as e: + self._log.exception('Failed to load downlink converter for % slave: %s', self.name, e) + + def __load_uplink_converter(self, config): + try: + if config.get(UPLINK_PREFIX + CONVERTER_PARAMETER) is not None: + converter = TBModuleLoader.import_module(self.connector.connector_type, + config[UPLINK_PREFIX + CONVERTER_PARAMETER])( + self.uplink_converter_config, self._log) + else: + converter = BytesModbusUplinkConverter(self.uplink_converter_config, self._log) + + return converter + except Exception as e: + self._log.exception('Failed to load uplink converter for % slave: %s', self.name, e) + + async def connect(self) -> bool: + cur_time = monotonic() * 1000 + if not self.master.connected(): + if (self.connection_attempt_count >= self.connection_attempt + and cur_time - self.last_connection_attempt_time >= self.wait_after_failed_attempts_ms): + self.connection_attempt_count = 0 + + while not self.master.connected() \ + and self.connection_attempt_count < self.connection_attempt \ + and cur_time - self.last_connection_attempt_time >= self.connect_attempt_time_ms: + if self.stopped: + return False + self.connection_attempt_count += 1 + self.last_connection_attempt_time = cur_time + self._log.debug("Trying connect to %s", self) + await self.master.connect() + + if self.connection_attempt_count == self.connection_attempt: + self._log.warn("Maximum attempt count (%i) for device \"%s\" - encountered.", + self.connection_attempt, + self) + return False + + if self.connection_attempt_count >= 0 and self.master.connected(): + self.connection_attempt_count = 0 + self.last_connection_attempt_time = cur_time + return True + else: + return False + + @property + def master(self): + return self.__master + + @master.setter + def master(self, master): + self.__master = master + self.available_functions = self.__master.get_available_functions() + + async def read(self, function_code, address, objects_count): + self._log.debug('Read %s registers from address %s with function code %s', objects_count, address, + function_code) + + result = await self.__read(function_code, address, objects_count) + + StatisticsService.count_connector_message(self.name, stat_parameter_name='connectorMsgsReceived') + StatisticsService.count_connector_bytes(self.name, result, stat_parameter_name='connectorBytesReceived') + + return result + + async def __read(self, function_code, address, objects_count): + result = None + + try: + result = await self.available_functions[function_code](address=address, + count=objects_count, + unit_id=self.unit_id) + except KeyError: + self._log.error('Unknown Modbus function with code: %s', function_code) + + self._log.debug("Read with result: %s", str(result)) + return result + + async def write(self, function_code, address, value): + self._log.debug('Write %s value to address %s with function code %s', value, address, function_code) + result = await self.__write(function_code, address, value) + + StatisticsService.count_connector_message(self.name, stat_parameter_name='connectorMsgsReceived') + StatisticsService.count_connector_bytes(self.name, result, stat_parameter_name='connectorBytesReceived') + + return result + + async def __write(self, function_code, address, value): + result = None + + if function_code in (5, 6): + result = await self.available_functions[function_code](address=address, value=value, unit_id=self.unit_id) + elif function_code in (15, 16): + result = await self.available_functions[function_code](address=address, values=value, unit_id=self.unit_id) + else: + self._log.error("Unknown Modbus function with code: %s", function_code) + + self._log.debug("Write with result: %s", str(result)) + return result + + def is_connected_to_platform(self): + return self.last_connect_time != 0 and monotonic() - self.last_connect_time < 10 + + def __str__(self): + return f'{self.device_name}' diff --git a/thingsboard_gateway/extensions/modbus_async/__init__.py b/thingsboard_gateway/extensions/modbus_async/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/thingsboard_gateway/gateway/tb_gateway_service.py b/thingsboard_gateway/gateway/tb_gateway_service.py index b31b8ed30..4d1cd02b9 100644 --- a/thingsboard_gateway/gateway/tb_gateway_service.py +++ b/thingsboard_gateway/gateway/tb_gateway_service.py @@ -77,6 +77,7 @@ class TBGRPCServerManager: DEFAULT_CONNECTORS = { "mqtt": "MqttConnector", "modbus": "ModbusConnector", + "modbus_async": "AsyncModbusConnector", "opcua": "OpcUaConnector", "ble": "BLEConnector", "request": "RequestConnector",