Skip to content

Commit

Permalink
Merge pull request #1034 from samson0v/master
Browse files Browse the repository at this point in the history
Fixed on_attributes_update for OPC-UA AsyncIO Connector
  • Loading branch information
imbeacon authored Dec 28, 2022
2 parents 69787bf + 73e5bb5 commit 6d9650a
Showing 1 changed file with 28 additions and 7 deletions.
35 changes: 28 additions & 7 deletions thingsboard_gateway/connectors/opcua_asyncio/opcua_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,8 @@ async def __poll_nodes(self):
node['valid'] = True
except ConnectionError:
raise
except (BadNodeIdUnknown, BadConnectionClosed, BadInvalidState, BadAttributeIdInvalid, BadCommunicationError, BadOutOfService):
except (BadNodeIdUnknown, BadConnectionClosed, BadInvalidState, BadAttributeIdInvalid,
BadCommunicationError, BadOutOfService):
if node.get('valid', True):
self.__log.warning('Node not found (2); device: %s, key: %s, path: %s', device.name, node['key'], node['path'])
await self.__reset_node(node)
Expand Down Expand Up @@ -390,6 +391,14 @@ def __send_data(self):
else:
sleep(.2)

async def get_shared_attr_node_id(self, path, result={}):
try:
q_path = await self.find_node_name_space_index(path)
var = await self.__client.nodes.root.get_child(q_path[0])
result['result'] = var
except Exception as e:
result['error'] = e.__str__()

def on_attributes_update(self, content):
self.__log.debug(content)
try:
Expand All @@ -398,12 +407,21 @@ def on_attributes_update(self, content):
for (key, value) in content['data'].items():
for attr_update in device.config['attributes_updates']:
if attr_update['attributeOnThingsBoard'] == key:
for section in ('attributes', 'timeseries'):
for node in device.values[section]:
if re.fullmatch(attr_update['attributeOnDevice'],
'.'.join(device.path) + f'.{node["path"]}'):
self.__loop.create_task(self.__write_value(node['id'], value))
result = {}
task = self.__loop.create_task(
self.get_shared_attr_node_id(
device.path + attr_update['attributeOnDevice'].replace('\\', '').split('.'), result))

while not task.done():
sleep(.1)

if result.get('error'):
self.__log.error('Node not found! (%s)', result['error'])
return

node_id = result['result']
self.__loop.create_task(self.__write_value(node_id, value))
return
except Exception as e:
self.__log.exception(e)

Expand Down Expand Up @@ -481,7 +499,10 @@ def server_side_rpc_handler(self, content):

async def __write_value(self, path, value, result={}):
try:
var = self.__client.get_node(path.replace('\\.', '.'))
var = path
if isinstance(path, str):
var = self.__client.get_node(path.replace('\\.', '.'))

await var.write_value(value)
except Exception as e:
result['error'] = e.__str__()
Expand Down

0 comments on commit 6d9650a

Please sign in to comment.