Skip to content

Commit

Permalink
Extract user/address mapping and presence handling from Matrix transport
Browse files Browse the repository at this point in the history
- Extract the user/address mapping and presence handling into a separate
  utility class (`UserAddressManager`)
- This allows the services to use this functionality as well (Fixes raiden-network#3720)
- Complete unit test coverage of the new `UserAddressManager`

Refs: raiden-network#3124, raiden-network#3252
  • Loading branch information
ulope committed Apr 18, 2019
1 parent 5b4375d commit 7a44293
Show file tree
Hide file tree
Showing 5 changed files with 538 additions and 138 deletions.
8 changes: 3 additions & 5 deletions raiden/network/transport/matrix/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from raiden.network.transport.matrix.transport import ( # noqa
MatrixTransport,
UserPresence,
_RetryQueue,
)
from raiden.network.transport.matrix.transport import MatrixTransport, _RetryQueue # noqa
from raiden.network.transport.matrix.utils import ( # noqa
AddressReachability,
UserPresence,
join_global_room,
login_or_register,
make_client,
Expand Down
157 changes: 45 additions & 112 deletions raiden/network/transport/matrix/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
import time
from binascii import Error as DecodeError
from collections import defaultdict
from enum import Enum
from urllib.parse import urlparse

import gevent
import structlog
from eth_utils import decode_hex, is_binary_address, to_checksum_address, to_normalized_address
from gevent.event import Event
from gevent.lock import Semaphore
from gevent.queue import JoinableQueue
from matrix_client.errors import MatrixRequestError
Expand Down Expand Up @@ -36,6 +36,9 @@
from raiden.network.transport.matrix.client import GMatrixClient, Room, User
from raiden.network.transport.matrix.utils import (
JOIN_RETRIES,
AddressReachability,
UserAddressManager,
UserPresence,
join_global_room,
login_or_register,
make_client,
Expand Down Expand Up @@ -83,16 +86,6 @@
_RoomID = NewType('_RoomID', str)


class UserPresence(Enum):
ONLINE = 'online'
UNAVAILABLE = 'unavailable'
OFFLINE = 'offline'
UNKNOWN = 'unknown'


_PRESENCE_REACHABLE_STATES = {UserPresence.ONLINE, UserPresence.UNAVAILABLE}


class _RetryQueue(Runnable):
""" A helper Runnable to send batched messages to receiver through transport """

Expand Down Expand Up @@ -199,8 +192,8 @@ def _check_and_send(self):
self.transport._global_send_queue.join()

self.log.debug('Retrying message', receiver=to_normalized_address(self.receiver))
status = self.transport._address_to_presence.get(self.receiver)
if status not in _PRESENCE_REACHABLE_STATES:
status = self.transport._address_mgr.get_address_reachability(self.receiver)
if status is not AddressReachability.REACHABLE:
# if partner is not reachable, return
self.log.debug(
'Partner not reachable. Skipping.',
Expand Down Expand Up @@ -319,24 +312,26 @@ def _http_retry_delay() -> Iterable[float]:

self.greenlets: List[gevent.Greenlet] = list()

# partner need to be in this dict to be listened on
self._address_to_userids: Dict[Address, Set[str]] = defaultdict(set)
self._address_to_presence: Dict[Address, UserPresence] = dict()
self._userid_to_presence: Dict[str, UserPresence] = dict()
self._address_to_retrier: Dict[Address, _RetryQueue] = dict()

self._global_rooms: Dict[str, Optional[Room]] = dict()
self._global_send_queue: JoinableQueue[Tuple[str, Message]] = JoinableQueue()

self._stop_event = gevent.event.Event()
self._stop_event = Event()
self._stop_event.set()

self._global_send_event = gevent.event.Event()

self._global_send_event = Event()
self._prioritize_global_messages = True

self._address_mgr: UserAddressManager = UserAddressManager(
client=self._client,
get_user_callable=self._get_user,
address_reachability_changed_callback=self._address_reachability_changed,
user_presence_changed_callback=self._user_presence_changed,
stop_event=self._stop_event,
)

self._client.add_invite_listener(self._handle_invite)
self._client.add_presence_listener(self._handle_presence_change)

self._health_lock = Semaphore()
self._getroom_lock = Semaphore()
Expand Down Expand Up @@ -485,7 +480,7 @@ def whitelist(self, address: Address):
start are handled properly.
"""
self.log.debug('Whitelist', address=to_normalized_address(address))
self._address_to_userids.setdefault(address, set())
self._address_mgr.add_address(address)

def start_health_check(self, node_address):
"""Start healthcheck (status monitoring) for a peer
Expand All @@ -496,7 +491,7 @@ def start_health_check(self, node_address):
return

with self._health_lock:
if node_address in self._address_to_userids:
if self._address_mgr.is_address_known(node_address):
return # already healthchecked

node_address_hex = to_normalized_address(node_address)
Expand All @@ -512,11 +507,11 @@ def start_health_check(self, node_address):
if validate_userid_signature(user) == node_address
}
self.whitelist(node_address)
self._address_to_userids[node_address].update(user_ids)
self._address_mgr.add_userids_for_address(node_address, user_ids)

# Ensure network state is updated in case we already know about the user presences
# representing the target node
self._update_address_presence(node_address)
self._address_mgr.refresh_address_presence(node_address)

def send_async(
self,
Expand Down Expand Up @@ -688,7 +683,7 @@ def _handle_invite(self, room_id: _RoomID, state: dict):
)
return

if peer_address not in self._address_to_userids:
if not self._address_mgr.is_address_known(peer_address):
self.log.debug(
'Got invited by a non-whitelisted user - ignoring',
room_id=room_id,
Expand All @@ -711,6 +706,7 @@ def _handle_invite(self, room_id: _RoomID, state: dict):
# we join room and _set_room_id_for_address despite room privacy and requirements,
# _get_room_ids_for_address will take care of returning only matching rooms and
# _leave_unused_rooms will clear it in the future, if and when needed
room: Room = None
last_ex: Optional[Exception] = None
retry_interval = 0.1
for _ in range(JOIN_RETRIES):
Expand Down Expand Up @@ -769,7 +765,7 @@ def _handle_message(self, room, event) -> bool:
return False

# don't proceed if user isn't whitelisted (yet)
if peer_address not in self._address_to_userids:
if not self._address_mgr.is_address_known(peer_address):
# user not start_health_check'ed
self.log.debug(
'Message from non-whitelisted peer - ignoring',
Expand Down Expand Up @@ -811,13 +807,13 @@ def _handle_message(self, room, event) -> bool:
)
self._set_room_id_for_address(peer_address, room.room_id)

is_peer_reachable = (
self._userid_to_presence.get(sender_id) in _PRESENCE_REACHABLE_STATES and
self._address_to_presence.get(peer_address) in _PRESENCE_REACHABLE_STATES
is_peer_reachable = self._address_mgr.get_address_reachability(peer_address) is (
AddressReachability.REACHABLE
)
if not is_peer_reachable:
self.log.debug('Forcing presence update', peer_address=peer_address, user_id=sender_id)
self._update_address_presence(peer_address)
self._address_mgr.force_user_presence(user, UserPresence.ONLINE)
self._address_mgr.refresh_address_presence(peer_address)

data = event['content']['body']
if not isinstance(data, str):
Expand Down Expand Up @@ -997,7 +993,7 @@ def _get_room_for_address(
if self._stop_event.ready():
return None
address_hex = to_normalized_address(address)
assert address and address in self._address_to_userids,\
assert address and self._address_mgr.is_address_known(address),\
f'address not health checked: me: {self._user_id}, peer: {address_hex}'

# filter_private is done in _get_room_ids_for_address
Expand Down Expand Up @@ -1033,7 +1029,7 @@ def _get_room_for_address(
else:
room = self._get_public_room(room_name, invitees=peers)

peer_ids = self._address_to_userids[address]
peer_ids = self._address_mgr.get_userids_for_address(address)
member_ids = {member.user_id for member in room.get_joined_members(force_resync=True)}
room_is_empty = not bool(peer_ids & member_ids)
if room_is_empty:
Expand Down Expand Up @@ -1066,7 +1062,7 @@ def _get_room_for_address(
peer_address=address_hex,
)

self._address_to_userids[address].update({user.user_id for user in peers})
self._address_mgr.add_userids_for_address(address, {user.user_id for user in peers})
self._set_room_id_for_address(address, room.room_id)

if not room.listeners:
Expand Down Expand Up @@ -1152,99 +1148,36 @@ def _get_public_room(self, room_name, invitees: List[User]):
invitees=invitees_uids,
is_public=True,
)
log.warning(
self.log.warning(
'Could not create nor join a named room. Successfuly created an unnamed one',
room=room,
invitees=invitees,
)

return room

def _handle_presence_change(self, event):
"""
Update node network reachability from presence events.
Due to the possibility of nodes using accounts on multiple homeservers a composite
address state is synthesised from the cached individual user presence state.
"""
if self._stop_event.ready():
return
user_id = event['sender']
if event['type'] != 'm.presence' or user_id == self._user_id:
return

user = self._get_user(user_id)
user.displayname = event['content'].get('displayname') or user.displayname
address = validate_userid_signature(user)
if not address:
# Malformed address - skip
return

# not a user we've whitelisted, skip
if address not in self._address_to_userids:
return
self._address_to_userids[address].add(user_id)

new_state = UserPresence(event['content']['presence'])
if new_state == self._userid_to_presence.get(user_id):
return

self._userid_to_presence[user_id] = new_state
self._update_address_presence(address)
def _user_presence_changed(self, user: User, _presence: UserPresence):
# maybe inviting user used to also possibly invite user's from presence changes
assert self._raiden_service is not None # make mypy happy
greenlet = self._spawn(self._maybe_invite_user, user)
greenlet.name = f'invite node:{pex(self._raiden_service.address)} user_id:{user_id}'

def _get_user_presence(self, user_id: str) -> UserPresence:
if user_id not in self._userid_to_presence:
try:
presence = UserPresence(
self._client.get_user_presence(user_id),
)
except MatrixRequestError:
presence = UserPresence.UNKNOWN
self._userid_to_presence[user_id] = presence
return self._userid_to_presence[user_id]

def _update_address_presence(self, address):
""" Update synthesized address presence state from user presence state """
composite_presence = {
self._get_user_presence(uid)
for uid
in self._address_to_userids.get(address, set())
}

# Iterate over UserPresence in definition order and pick first matching state
new_state = UserPresence.UNKNOWN
for presence in UserPresence.__members__.values():
if presence in composite_presence:
new_state = presence
break

if new_state == self._address_to_presence.get(address):
return
self.log.debug(
'Changing address presence state',
address=to_normalized_address(address),
prev_state=self._address_to_presence.get(address),
state=new_state,
)
self._address_to_presence[address] = new_state
greenlet.name = f'invite node:{pex(self._raiden_service.address)} user:{user}'

# The Matrix presence status 'unavailable' just means that the user has been inactive
# for a while. So a user with UserPresence.UNAVAILABLE is still 'reachable' to us.
if new_state in _PRESENCE_REACHABLE_STATES:
reachability = NODE_NETWORK_REACHABLE
def _address_reachability_changed(self, address: Address, reachability: AddressReachability):
if reachability is AddressReachability.REACHABLE:
node_reachability = NODE_NETWORK_REACHABLE
# _QueueRetry.notify when partner comes online
retrier = self._address_to_retrier.get(address)
if retrier:
retrier.notify()
elif new_state is UserPresence.UNKNOWN:
reachability = NODE_NETWORK_UNKNOWN
elif reachability is AddressReachability.UNKNOWN:
node_reachability = NODE_NETWORK_UNKNOWN
elif reachability is AddressReachability.UNREACHABLE:
node_reachability = NODE_NETWORK_UNREACHABLE
else:
reachability = NODE_NETWORK_UNREACHABLE
raise TypeError(f'Unexpected reachability state "{reachability}".')

state_change = ActionChangeNodeNetworkState(address, reachability)
assert self._raiden_service is not None # make mypy happy
state_change = ActionChangeNodeNetworkState(address, node_reachability)
self._raiden_service.handle_and_track_state_change(state_change)

def _maybe_invite_user(self, user: User):
Expand Down Expand Up @@ -1389,7 +1322,7 @@ def _leave_unused_rooms(self, _address_to_room_ids: Dict[AddressHex, List[_RoomI
# cache in a set all whitelisted addresses
whitelisted_hex_addresses: Set[AddressHex] = {
to_checksum_address(address)
for address in self._address_to_userids
for address in self._user_address_mgr.known_addresses
}

keep_rooms: Set[_RoomID] = set()
Expand Down
Loading

0 comments on commit 7a44293

Please sign in to comment.