From b5300e6216878ac239e9c16b1a07ac0ed6d8be77 Mon Sep 17 00:00:00 2001 From: Denis Zabavchik Date: Wed, 23 Aug 2023 12:06:57 -0400 Subject: [PATCH] RLink fixes for registrations --- crossbar/node/node.py | 1 + crossbar/worker/main.py | 5 +- crossbar/worker/rlink.py | 99 +++++++++++++++++++++++++++++----------- 3 files changed, 78 insertions(+), 27 deletions(-) diff --git a/crossbar/node/node.py b/crossbar/node/node.py index 384d4ebde..111b37d44 100644 --- a/crossbar/node/node.py +++ b/crossbar/node/node.py @@ -796,6 +796,7 @@ def _configure_native_worker_router(self, worker_logname, worker_id, worker): ) # start rlinks for realms + # TODO: Defer rlink setups until controller starts all workers dl = [] for realm in worker.get('realms', []): realm_id = realm['id'] diff --git a/crossbar/worker/main.py b/crossbar/worker/main.py index 83427b37b..5a33a5ff6 100644 --- a/crossbar/worker/main.py +++ b/crossbar/worker/main.py @@ -333,7 +333,10 @@ def make_session(): from autobahn.twisted.websocket import WampWebSocketServerFactory transport_factory = WampWebSocketServerFactory(make_session, 'ws://localhost') transport_factory.protocol = WorkerServerProtocol - transport_factory.setProtocolOptions(failByDrop=False) + + # we need to increase the opening handshake timeout, + # because when running multiple router workers controller may not be able to connect get to this worker in 5 seconds + transport_factory.setProtocolOptions(failByDrop=False, openHandshakeTimeout=45) # create a protocol instance and wire up to stdio # diff --git a/crossbar/worker/rlink.py b/crossbar/worker/rlink.py index a83849dee..206570d52 100644 --- a/crossbar/worker/rlink.py +++ b/crossbar/worker/rlink.py @@ -47,6 +47,7 @@ def __init__(self, config): self._exclude_authid = None self._exclude_authrole = None + self._exclude_uri = None def onMessage(self, msg): if msg._router_internal is not None: @@ -56,13 +57,26 @@ def onMessage(self, msg): msg.caller, msg.caller_authid, msg.caller_authrole = msg._router_internal return super(BridgeSession, self).onMessage(msg) + def is_uri_excluded(self, uri): + if uri.startswith("local."): + return True + + if self._exclude_uri is None: + return False + + # TODO: look at excluded URIs (exact, prefix, wildcard) + + return False + + @inlineCallbacks def _setup_event_forwarding(self, other): self.log.debug( - "setup event forwarding between {me} and {other} (exclude_authid={exclude_authid}, exclude_authrole={exclude_authrole})", + "setup event forwarding between {me} and {other} (exclude_authid={exclude_authid}, exclude_authrole={exclude_authrole}, exclude_uri={exclude_uri})", exclude_authid=self._exclude_authid, exclude_authrole=self._exclude_authrole, + exclude_uri=self._exclude_uri, me=self._session_id, other=other) @@ -79,11 +93,16 @@ def on_subscription_create(sub_session, sub_details, details=None): :param details: :return: """ - if sub_details["uri"].startswith("wamp."): + + uri = sub_details['uri'] + if uri.startswith("wamp."): return sub_id = sub_details["id"] + if self.is_uri_excluded(uri): + return + if sub_id in self._subs and self._subs[sub_id]["sub"]: # This will happen if, partway through the subscription process, the RLink disconnects self.log.error('on_subscription_create: sub ID {sub_id} already in map {method}', @@ -95,7 +114,7 @@ def on_subscription_create(sub_session, sub_details, details=None): self._subs[sub_id] = sub_details self._subs[sub_id]["sub"] = None - uri = sub_details['uri'] + ERR_MSG = [None] @inlineCallbacks @@ -225,6 +244,7 @@ def forward_current_subs(): def on_remote_join(_session, _details): yield forward_current_subs() + @inlineCallbacks def on_remote_leave(_session, _details): # The remote session has ended, clear subscription records. # Clearing this dictionary helps avoid the case where @@ -276,7 +296,13 @@ def on_registration_create(reg_session, reg_details, details=None): :param details: :return: """ - if reg_details['uri'].startswith("wamp."): + + uri = reg_details['uri'] + + if uri.startswith("wamp."): + return + + if self.is_uri_excluded(uri): return reg_id = reg_details["id"] @@ -288,11 +314,14 @@ def on_registration_create(reg_session, reg_details, details=None): method=hltype(BridgeSession._setup_invocation_forwarding)) return + # When more than 1 rlinks are present, `reg_details` that was passed here is the same for observers. + # Any manipulation like `_regs[reg_id]['reg'] = None` or `self._regs[reg_id]['reg'] = reg` + # will change value in the same shared object referenced by multiple _regs[reg_id] in multiple + reg_details_local = copy.deepcopy(reg_details) if reg_id not in self._regs: - self._regs[reg_id] = reg_details - self._regs[reg_id]['reg'] = None + reg_details_local['reg'] = None + self._regs[reg_id] = reg_details_local - uri = reg_details['uri'] ERR_MSG = [None] @inlineCallbacks @@ -355,12 +384,19 @@ def on_call(*args, **kwargs): ) return result + reg = None try: + registration_forward_details = { + 'session': self.session_id, + 'authid': self.authid, + 'authrole': self.authrole, + } reg = yield other.register(on_call, uri, options=RegisterOptions( details_arg='details', invoke=reg_details.get('invoke', None), + forward_for=[registration_forward_details] )) except TransportLost: self.log.debug( @@ -371,8 +407,8 @@ def on_call(*args, **kwargs): # however we need to make sure this situation never happens. if isinstance(e, ApplicationError) and e.error == 'wamp.error.procedure_already_exists': other_leg = 'local' if self.IS_REMOTE_LEG else 'remote' - self.log.debug(f"on_registration_create: tried to register procedure {uri} on {other_leg} " - f"session but it's already registered.") + self.log.debug("on_registration_create: tried to register procedure {uri} on {other_leg} session but it's already registered.", + uri=uri, other_leg=other_leg) return raise Exception("fatal: could not forward-register '{}'".format(uri)) @@ -384,16 +420,16 @@ def on_call(*args, **kwargs): self.log.info("registration already gone: {uri}", uri=reg_details['uri']) yield reg.unregister() else: - self._regs[reg_id]['reg'] = reg + reg_details_local['reg'] = reg self.log.debug( "created forwarding registration: me={me} other={other} reg_id={reg_id} reg_details={reg_details} details={details} reg_session={reg_session}", me=self._session_id, other=other._session_id, reg_id=reg_id, - reg_details=reg_details, + reg_details=reg_details_local, details=details, - reg_session=reg_session, + reg_session=reg_session ) # called when a registration is removed from the local router @@ -424,7 +460,7 @@ def on_registration_delete(session_id, reg_id, details=None): del self._regs[reg_id] - self.log.debug("{other} unsubscribed from {uri}".format(other=other, uri=uri)) + self.log.debug("{other} deleted registration {uri}".format(other=other, uri=uri)) @inlineCallbacks def register_current(): @@ -437,6 +473,18 @@ def register_current(): @inlineCallbacks def on_remote_join(_session, _details): + # listen to when new registrations are created on the local router + yield self.subscribe(on_registration_create, + "wamp.registration.on_create", + options=SubscribeOptions(details_arg="details")) + + # listen to when a registration is removed from the local router + yield self.subscribe(on_registration_delete, + "wamp.registration.on_delete", + options=SubscribeOptions(details_arg="details")) + + self.log.info("{me}: call forwarding setup done", me=self._session_id) + yield register_current() def on_remote_leave(_session, _details): @@ -460,18 +508,6 @@ def on_remote_leave(_session, _details): other.on('join', on_remote_join) other.on('leave', on_remote_leave) - # listen to when new registrations are created on the local router - yield self.subscribe(on_registration_create, - "wamp.registration.on_create", - options=SubscribeOptions(details_arg="details")) - - # listen to when a registration is removed from the local router - yield self.subscribe(on_registration_delete, - "wamp.registration.on_delete", - options=SubscribeOptions(details_arg="details")) - - self.log.info("{me}: call forwarding setup done", me=self._session_id) - class RLinkLocalSession(BridgeSession): """ @@ -502,6 +538,7 @@ def onJoin(self, details): self._exclude_authid = self.config.extra.get('exclude_authid', None) self._exclude_authrole = self.config.extra.get('exclude_authrole', None) + self._exclude_uri = self.config.extra.get('exclude_uri', None) # setup local->remote event forwarding forward_events = self.config.extra.get('forward_events', False) @@ -659,6 +696,7 @@ def onJoin(self, details): self._exclude_authid = self.config.extra.get('exclude_authid', None) self._exclude_authrole = self.config.extra.get('exclude_authrole', None) + self._exclude_uri = self.config.extra.get('exclude_uri', None) # setup remote->local event forwarding forward_events = self.config.extra.get('forward_events', False) @@ -757,7 +795,7 @@ def marshal(self): class RLinkConfig(object): - def __init__(self, realm, transport, authid, exclude_authid, forward_local_events, forward_remote_events, + def __init__(self, realm, transport, authid, exclude_authid, exclude_authrole, forward_local_events, forward_remote_events, forward_local_invocations, forward_remote_invocations): """ @@ -771,6 +809,8 @@ def __init__(self, realm, transport, authid, exclude_authid, forward_local_event self.transport = transport self.authid = authid self.exclude_authid = exclude_authid + # Components are assigned UUID for authid, we need to rely on authroles + self.exclude_authrole = exclude_authrole self.forward_local_events = forward_local_events self.forward_remote_events = forward_remote_events self.forward_local_invocations = forward_local_invocations @@ -785,6 +825,7 @@ def marshal(self): 'transport': self.transport, 'authid': self.authid, 'exclude_authid': self.exclude_authid, + 'exclude_authrole': self.exclude_authrole, 'forward_local_events': self.forward_local_events, 'forward_remote_events': self.forward_remote_events, 'forward_local_invocations': self.forward_local_invocations, @@ -818,6 +859,7 @@ def parse(personality, obj, id=None): 'transport': (True, [Mapping]), 'authid': (False, [str]), 'exclude_authid': (False, [Sequence]), + 'exclude_authrole': (False, [Sequence]), 'forward_local_events': (False, [bool]), 'forward_remote_events': (False, [bool]), 'forward_local_invocations': (False, [bool]), @@ -829,6 +871,9 @@ def parse(personality, obj, id=None): exclude_authid = obj.get('exclude_authid', []) for aid in exclude_authid: assert type(aid) == str + exclude_authrole = obj.get('exclude_authrole', []) + for rid in exclude_authrole: + assert type(rid) == str forward_local_events = obj.get('forward_local_events', True) forward_remote_events = obj.get('forward_remote_events', True) forward_local_invocations = obj.get('forward_local_invocations', True) @@ -843,6 +888,7 @@ def parse(personality, obj, id=None): transport=transport, authid=authid, exclude_authid=exclude_authid, + exclude_authrole=exclude_authrole, forward_local_events=forward_local_events, forward_remote_events=forward_remote_events, forward_local_invocations=forward_local_invocations, @@ -936,6 +982,7 @@ def start_link(self, link_id, link_config, caller): 'on_ready': Deferred(), 'authid': link_config.authid, 'exclude_authid': link_config.exclude_authid, + 'exclude_authrole': link_config.exclude_authrole, 'forward_events': link_config.forward_remote_events, 'forward_invocations': link_config.forward_remote_invocations, }