diff --git a/raiden/network/transport/matrix/transport.py b/raiden/network/transport/matrix/transport.py index 4671b95dc8..12d0051b51 100644 --- a/raiden/network/transport/matrix/transport.py +++ b/raiden/network/transport/matrix/transport.py @@ -507,6 +507,33 @@ def send_async( self._send_with_retry(queue_identifier, message) + def send_global(self, room: str, message: Message) -> None: + """Sends a message to one of the global rooms + + These rooms aren't being listened on and therefore no reply could be heard, so these + messages are sent in a send-and-forget async way. + The actual room name is composed from the suffix given as parameter and chain name or id + e.g.: raiden_ropsten_discovery + Params: + room: name suffix as passed in config['global_rooms'] list + message: Message instance to be serialized and sent + """ + room_name = self._make_room_alias(room) + assert self._global_rooms.get(room_name), f'Unknown global room: {room_name!r}' + + def _send_global(): + text = JSONSerializer.serialize(message) + room = self._global_rooms[room_name] + self.log.debug( + 'Send global', + room_name=room_name, + room=room, + data=text.replace('\n', '\\n'), + ) + room.send_text(text) + + self._spawn(_send_global) + @property def _queueids_to_queues(self) -> QueueIdsToQueues: chain_state = views.state_from_raiden(self._raiden_service) diff --git a/raiden/network/transport/udp/udp_transport.py b/raiden/network/transport/udp/udp_transport.py index de63b19113..c23a1843ef 100644 --- a/raiden/network/transport/udp/udp_transport.py +++ b/raiden/network/transport/udp/udp_transport.py @@ -436,6 +436,14 @@ def send_async( message=message, ) + def send_global( + self, + room: str, + message: Message, + ) -> None: + """ This method exists only for interface compatibility with MatrixTransport """ + self.log.warning('UDP is unable to send global messages. Ignoring') + def maybe_send(self, recipient: typing.Address, message: Message): """ Send message to recipient if the transport is running. """ diff --git a/raiden/tests/integration/test_matrix_transport.py b/raiden/tests/integration/test_matrix_transport.py index 676c3bd4be..15f24e01bb 100644 --- a/raiden/tests/integration/test_matrix_transport.py +++ b/raiden/tests/integration/test_matrix_transport.py @@ -526,3 +526,50 @@ def test_matrix_discovery_room_offline_server( transport.stop() transport.get() + + +def test_matrix_send_global( + local_matrix_servers, + retries_before_backoff, + retry_interval, + private_rooms, +): + transport = MatrixTransport({ + 'global_rooms': ['discovery', 'monitoring'], + 'retries_before_backoff': retries_before_backoff, + 'retry_interval': retry_interval, + 'server': local_matrix_servers[0], + 'server_name': local_matrix_servers[0].netloc, + 'available_servers': [local_matrix_servers[0]], + 'private_rooms': private_rooms, + }) + transport.start(MockRaidenService(None), MessageHandler(set()), '') + gevent.idle() + + ms_room_name = transport._make_room_alias('monitoring') + ms_room = transport._global_rooms.get(ms_room_name) + assert isinstance(ms_room, Room) + + ms_room.send_text = MagicMock(spec=ms_room.send_text) + + for i in range(5): + message = Processed(i) + transport._raiden_service.sign(message) + transport.send_global( + 'monitoring', + message, + ) + + gevent.idle() + + assert ms_room.send_text.call_count == 5 + + # unknown room suffix is an error + with pytest.raises(AssertionError): + transport.send_global( + 'unknown_suffix', + Processed(10), + ) + + transport.stop() + transport.get()