Skip to content

Commit

Permalink
Merge pull request #174 from HSF/dev
Browse files (browse the repository at this point in the history)
fix eventbus
  • Loading branch information
wguanicedew authored Jun 7, 2023
2 parents 8aecf02 + 37604c8 commit 01620bd
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 5 deletions.
13 changes: 10 additions & 3 deletions main/lib/idds/agents/common/eventbus/eventbus.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(self, logger=None):
self.attrs = attrs
self._backend = None
self._orig_backend = None
self._backup_backend = BaseEventBusBackendOpt(logger=self.logger, **attrs)
if 'backend' in attrs:
if attrs['backend'] == 'message':
self.backend = MsgEventBusBackend(logger=self.logger, **attrs)
Expand All @@ -57,18 +58,20 @@ def __init__(self, logger=None):
if self.backend is None:
self.backend = BaseEventBusBackendOpt(logger=self.logger, **attrs)
self.logger.info("EventBus backend : %s" % self.backend)
self._orig_backend = self.backend
self.backend.start()

@property
def backend(self):
    """Return the event-bus backend to use right now, failing over if needed.

    While the active backend is a MsgEventBusBackend whose ``is_ok()``
    reports failure, the pre-built ``self._backup_backend`` is made active.
    Once the backend remembered in ``self._orig_backend`` is a
    MsgEventBusBackend that reports ok again, it is made active again.

    Returns:
        The backend object currently stored in ``self._backend``.
    """
    active = self._backend
    if active and isinstance(active, MsgEventBusBackend) and not active.is_ok():
        # Reuse the single backup instance rather than constructing a new
        # BaseEventBusBackendOpt on every failover.
        self._backend = self._backup_backend
        self.logger.critical("MsgEventBusBackend failed, switch to use BaseEventBusBackendOpt")
    elif (self._orig_backend is not self._backend
            and self._orig_backend
            and isinstance(self._orig_backend, MsgEventBusBackend)
            and self._orig_backend.is_ok()):
        # _orig_backend is kept (not reset to None) so that repeated
        # failovers stay possible.  The identity check above prevents this
        # branch from re-running, and logging at CRITICAL level, on every
        # access while the message backend is already the active one.
        self.logger.critical("MsgEventBusBackend is ok, switch back to use it")
        self._backend = self._orig_backend
    return self._backend

@backend.setter
Expand Down Expand Up @@ -125,8 +128,12 @@ def fail_event(self, event):

def set_manager(self, manager):
    """Hand ``manager`` to the active backend and to the remembered original.

    The active backend (``self.backend``) always receives the manager; when
    ``self._orig_backend`` is set, it receives the same manager as well.
    """
    self.backend.set_manager(manager)
    original = self._orig_backend
    if not original:
        return
    original.set_manager(manager)

def get_manager(self):
    """Return the manager reported by the original backend, else the active one.

    ``self._orig_backend`` is consulted first when it is set; only when it
    is not set does the call fall through to ``self.backend``.
    """
    source = self._orig_backend or self.backend
    return source.get_manager()

def get_coordinator(self):
Expand Down
17 changes: 15 additions & 2 deletions main/lib/idds/agents/common/eventbus/msgeventbusbackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ class MsgEventBusBackend(BaseEventBusBackend):
"""

def __init__(self, logger=None, coordinator_port=5556, socket_timeout=10, debug=False,
timeout_threshold=5, failure_threshold=5, **kwargs):
timeout_threshold=5, failure_threshold=5, failure_timeout=180,
num_of_set_failed_at_threshold=10, **kwargs):
super(MsgEventBusBackend, self).__init__()
self._id = str(uuid.uuid4())[:8]
self._state_claim_wait = 60
Expand All @@ -222,6 +223,10 @@ def __init__(self, logger=None, coordinator_port=5556, socket_timeout=10, debug=
self._password = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(20))

self._is_ok = True
self._failed_at = None
self._failure_timeout = int(failure_timeout)
self._num_of_set_failed_at = 0
self._num_of_set_failed_at_threshold = int(num_of_set_failed_at_threshold)
self.num_success = 0
self.num_failures = 0
self.num_timeout = 0
Expand Down Expand Up @@ -456,8 +461,16 @@ def fail_event(self, event):
pass

def is_ok(self):
    """Re-evaluate and return the health flag of this backend.

    Three cases are checked in order:

    1. A failure was recorded, the cool-down ``self._failure_timeout`` has
       elapsed since ``self._failed_at``, and fewer than
       ``self._num_of_set_failed_at_threshold`` failure episodes have been
       recorded: the failure/timeout counters are cleared and the backend
       is reported ok again.
    2. ``num_failures`` or ``num_timeout`` exceeds its threshold: the
       backend is reported not ok, and the start of the failure episode is
       recorded once.
    3. Otherwise the backend is ok.

    Returns:
        bool: the updated value of ``self._is_ok``.
    """
    retries_left = self._num_of_set_failed_at < self._num_of_set_failed_at_threshold
    if retries_left and self._failed_at and self._failed_at + self._failure_timeout < time.time():
        # Cool-down elapsed: clear the counters to give the backend a new try.
        self._is_ok = True
        self._failed_at = None
        self.num_failures = 0
        self.num_timeout = 0
    elif self.num_failures > self.failure_threshold or self.num_timeout > self.timeout_threshold:
        self._is_ok = False
        if not self._failed_at:
            # Stamp only the start of a failure episode so that repeated
            # calls neither push the cool-down back nor recount the episode.
            self._failed_at = time.time()
            self._num_of_set_failed_at += 1
    else:
        self._is_ok = True
    return self._is_ok
Expand Down
1 change: 1 addition & 0 deletions requirements.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ dependencies:
- deepdiff
- pyzmq
- anytree
- networkx
- oic
1 change: 1 addition & 0 deletions workflow/tools/env/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dependencies:
- pep8 # checks for PEP8 code style compliance
- flake8 # Wrapper around PyFlakes&pep8
- anytree
- networkx
- pytest # python testing tool
- nose # nose test tools
- idds-common==0.11.5

0 comments on commit 01620bd

Please sign in to comment.