Skip to content

Commit

Permalink
RLink fixes for registrations
Browse files Browse the repository at this point in the history
  • Loading branch information
DZabavchik committed Aug 23, 2023
1 parent 3882b2a commit b5300e6
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 27 deletions.
1 change: 1 addition & 0 deletions crossbar/node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ def _configure_native_worker_router(self, worker_logname, worker_id, worker):
)

# start rlinks for realms
# TODO: Defer rlink setups until controller starts all workers
dl = []
for realm in worker.get('realms', []):
realm_id = realm['id']
Expand Down
5 changes: 4 additions & 1 deletion crossbar/worker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,10 @@ def make_session():
from autobahn.twisted.websocket import WampWebSocketServerFactory
transport_factory = WampWebSocketServerFactory(make_session, 'ws://localhost')
transport_factory.protocol = WorkerServerProtocol
transport_factory.setProtocolOptions(failByDrop=False)

# we need to increase the opening handshake timeout,
# because when running multiple router workers controller may not be able to connect get to this worker in 5 seconds
transport_factory.setProtocolOptions(failByDrop=False, openHandshakeTimeout=45)

# create a protocol instance and wire up to stdio
#
Expand Down
99 changes: 73 additions & 26 deletions crossbar/worker/rlink.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(self, config):

self._exclude_authid = None
self._exclude_authrole = None
self._exclude_uri = None

def onMessage(self, msg):
if msg._router_internal is not None:
Expand All @@ -56,13 +57,26 @@ def onMessage(self, msg):
msg.caller, msg.caller_authid, msg.caller_authrole = msg._router_internal
return super(BridgeSession, self).onMessage(msg)

def is_uri_excluded(self, uri):
if uri.startswith("local."):
return True

if self._exclude_uri is None:
return False

# TODO: look at excluded URIs (exact, prefix, wildcard)

return False


@inlineCallbacks
def _setup_event_forwarding(self, other):

self.log.debug(
"setup event forwarding between {me} and {other} (exclude_authid={exclude_authid}, exclude_authrole={exclude_authrole})",
"setup event forwarding between {me} and {other} (exclude_authid={exclude_authid}, exclude_authrole={exclude_authrole}, exclude_uri={exclude_uri})",
exclude_authid=self._exclude_authid,
exclude_authrole=self._exclude_authrole,
exclude_uri=self._exclude_uri,
me=self._session_id,
other=other)

Expand All @@ -79,11 +93,16 @@ def on_subscription_create(sub_session, sub_details, details=None):
:param details:
:return:
"""
if sub_details["uri"].startswith("wamp."):

uri = sub_details['uri']
if uri.startswith("wamp."):
return

sub_id = sub_details["id"]

if self.is_uri_excluded(uri):
return

if sub_id in self._subs and self._subs[sub_id]["sub"]:
# This will happen if, partway through the subscription process, the RLink disconnects
self.log.error('on_subscription_create: sub ID {sub_id} already in map {method}',
Expand All @@ -95,7 +114,7 @@ def on_subscription_create(sub_session, sub_details, details=None):
self._subs[sub_id] = sub_details
self._subs[sub_id]["sub"] = None

uri = sub_details['uri']

ERR_MSG = [None]

@inlineCallbacks
Expand Down Expand Up @@ -225,6 +244,7 @@ def forward_current_subs():
def on_remote_join(_session, _details):
yield forward_current_subs()

@inlineCallbacks
def on_remote_leave(_session, _details):
# The remote session has ended, clear subscription records.
# Clearing this dictionary helps avoid the case where
Expand Down Expand Up @@ -276,7 +296,13 @@ def on_registration_create(reg_session, reg_details, details=None):
:param details:
:return:
"""
if reg_details['uri'].startswith("wamp."):

uri = reg_details['uri']

if uri.startswith("wamp."):
return

if self.is_uri_excluded(uri):
return

reg_id = reg_details["id"]
Expand All @@ -288,11 +314,14 @@ def on_registration_create(reg_session, reg_details, details=None):
method=hltype(BridgeSession._setup_invocation_forwarding))
return

# When more than 1 rlinks are present, `reg_details` that was passed here is the same for observers.
# Any manipulation like `_regs[reg_id]['reg'] = None` or `self._regs[reg_id]['reg'] = reg`
# will change value in the same shared object referenced by multiple _regs[reg_id] in multiple
reg_details_local = copy.deepcopy(reg_details)
if reg_id not in self._regs:
self._regs[reg_id] = reg_details
self._regs[reg_id]['reg'] = None
reg_details_local['reg'] = None
self._regs[reg_id] = reg_details_local

uri = reg_details['uri']
ERR_MSG = [None]

@inlineCallbacks
Expand Down Expand Up @@ -355,12 +384,19 @@ def on_call(*args, **kwargs):
)
return result

reg = None
try:
registration_forward_details = {
'session': self.session_id,
'authid': self.authid,
'authrole': self.authrole,
}
reg = yield other.register(on_call,
uri,
options=RegisterOptions(
details_arg='details',
invoke=reg_details.get('invoke', None),
forward_for=[registration_forward_details]
))
except TransportLost:
self.log.debug(
Expand All @@ -371,8 +407,8 @@ def on_call(*args, **kwargs):
# however we need to make sure this situation never happens.
if isinstance(e, ApplicationError) and e.error == 'wamp.error.procedure_already_exists':
other_leg = 'local' if self.IS_REMOTE_LEG else 'remote'
self.log.debug(f"on_registration_create: tried to register procedure {uri} on {other_leg} "
f"session but it's already registered.")
self.log.debug("on_registration_create: tried to register procedure {uri} on {other_leg} session but it's already registered.",
uri=uri, other_leg=other_leg)
return
raise Exception("fatal: could not forward-register '{}'".format(uri))

Expand All @@ -384,16 +420,16 @@ def on_call(*args, **kwargs):
self.log.info("registration already gone: {uri}", uri=reg_details['uri'])
yield reg.unregister()
else:
self._regs[reg_id]['reg'] = reg
reg_details_local['reg'] = reg

self.log.debug(
"created forwarding registration: me={me} other={other} reg_id={reg_id} reg_details={reg_details} details={details} reg_session={reg_session}",
me=self._session_id,
other=other._session_id,
reg_id=reg_id,
reg_details=reg_details,
reg_details=reg_details_local,
details=details,
reg_session=reg_session,
reg_session=reg_session
)

# called when a registration is removed from the local router
Expand Down Expand Up @@ -424,7 +460,7 @@ def on_registration_delete(session_id, reg_id, details=None):

del self._regs[reg_id]

self.log.debug("{other} unsubscribed from {uri}".format(other=other, uri=uri))
self.log.debug("{other} deleted registration {uri}".format(other=other, uri=uri))

@inlineCallbacks
def register_current():
Expand All @@ -437,6 +473,18 @@ def register_current():

@inlineCallbacks
def on_remote_join(_session, _details):
# listen to when new registrations are created on the local router
yield self.subscribe(on_registration_create,
"wamp.registration.on_create",
options=SubscribeOptions(details_arg="details"))

# listen to when a registration is removed from the local router
yield self.subscribe(on_registration_delete,
"wamp.registration.on_delete",
options=SubscribeOptions(details_arg="details"))

self.log.info("{me}: call forwarding setup done", me=self._session_id)

yield register_current()

def on_remote_leave(_session, _details):
Expand All @@ -460,18 +508,6 @@ def on_remote_leave(_session, _details):
other.on('join', on_remote_join)
other.on('leave', on_remote_leave)

# listen to when new registrations are created on the local router
yield self.subscribe(on_registration_create,
"wamp.registration.on_create",
options=SubscribeOptions(details_arg="details"))

# listen to when a registration is removed from the local router
yield self.subscribe(on_registration_delete,
"wamp.registration.on_delete",
options=SubscribeOptions(details_arg="details"))

self.log.info("{me}: call forwarding setup done", me=self._session_id)


class RLinkLocalSession(BridgeSession):
"""
Expand Down Expand Up @@ -502,6 +538,7 @@ def onJoin(self, details):

self._exclude_authid = self.config.extra.get('exclude_authid', None)
self._exclude_authrole = self.config.extra.get('exclude_authrole', None)
self._exclude_uri = self.config.extra.get('exclude_uri', None)

# setup local->remote event forwarding
forward_events = self.config.extra.get('forward_events', False)
Expand Down Expand Up @@ -659,6 +696,7 @@ def onJoin(self, details):

self._exclude_authid = self.config.extra.get('exclude_authid', None)
self._exclude_authrole = self.config.extra.get('exclude_authrole', None)
self._exclude_uri = self.config.extra.get('exclude_uri', None)

# setup remote->local event forwarding
forward_events = self.config.extra.get('forward_events', False)
Expand Down Expand Up @@ -757,7 +795,7 @@ def marshal(self):


class RLinkConfig(object):
def __init__(self, realm, transport, authid, exclude_authid, forward_local_events, forward_remote_events,
def __init__(self, realm, transport, authid, exclude_authid, exclude_authrole, forward_local_events, forward_remote_events,
forward_local_invocations, forward_remote_invocations):
"""
Expand All @@ -771,6 +809,8 @@ def __init__(self, realm, transport, authid, exclude_authid, forward_local_event
self.transport = transport
self.authid = authid
self.exclude_authid = exclude_authid
# Components are assigned UUID for authid, we need to rely on authroles
self.exclude_authrole = exclude_authrole
self.forward_local_events = forward_local_events
self.forward_remote_events = forward_remote_events
self.forward_local_invocations = forward_local_invocations
Expand All @@ -785,6 +825,7 @@ def marshal(self):
'transport': self.transport,
'authid': self.authid,
'exclude_authid': self.exclude_authid,
'exclude_authrole': self.exclude_authrole,
'forward_local_events': self.forward_local_events,
'forward_remote_events': self.forward_remote_events,
'forward_local_invocations': self.forward_local_invocations,
Expand Down Expand Up @@ -818,6 +859,7 @@ def parse(personality, obj, id=None):
'transport': (True, [Mapping]),
'authid': (False, [str]),
'exclude_authid': (False, [Sequence]),
'exclude_authrole': (False, [Sequence]),
'forward_local_events': (False, [bool]),
'forward_remote_events': (False, [bool]),
'forward_local_invocations': (False, [bool]),
Expand All @@ -829,6 +871,9 @@ def parse(personality, obj, id=None):
exclude_authid = obj.get('exclude_authid', [])
for aid in exclude_authid:
assert type(aid) == str
exclude_authrole = obj.get('exclude_authrole', [])
for rid in exclude_authrole:
assert type(rid) == str
forward_local_events = obj.get('forward_local_events', True)
forward_remote_events = obj.get('forward_remote_events', True)
forward_local_invocations = obj.get('forward_local_invocations', True)
Expand All @@ -843,6 +888,7 @@ def parse(personality, obj, id=None):
transport=transport,
authid=authid,
exclude_authid=exclude_authid,
exclude_authrole=exclude_authrole,
forward_local_events=forward_local_events,
forward_remote_events=forward_remote_events,
forward_local_invocations=forward_local_invocations,
Expand Down Expand Up @@ -936,6 +982,7 @@ def start_link(self, link_id, link_config, caller):
'on_ready': Deferred(),
'authid': link_config.authid,
'exclude_authid': link_config.exclude_authid,
'exclude_authrole': link_config.exclude_authrole,
'forward_events': link_config.forward_remote_events,
'forward_invocations': link_config.forward_remote_invocations,
}
Expand Down

0 comments on commit b5300e6

Please sign in to comment.