From 7e90e27ff366eda76e3c955fdb2c11c8d56eb478 Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 7 Jun 2023 11:19:00 +0200 Subject: [PATCH 1/2] set msg eventbus to be able to come back after failures --- .../common/eventbus/msgeventbusbackend.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/main/lib/idds/agents/common/eventbus/msgeventbusbackend.py b/main/lib/idds/agents/common/eventbus/msgeventbusbackend.py index 205af0f1..d1a7ddd8 100644 --- a/main/lib/idds/agents/common/eventbus/msgeventbusbackend.py +++ b/main/lib/idds/agents/common/eventbus/msgeventbusbackend.py @@ -206,7 +206,8 @@ class MsgEventBusBackend(BaseEventBusBackend): """ def __init__(self, logger=None, coordinator_port=5556, socket_timeout=10, debug=False, - timeout_threshold=5, failure_threshold=5, **kwargs): + timeout_threshold=5, failure_threshold=5, failure_timeout=180, + num_of_set_failed_at_threshold=10, **kwargs): super(MsgEventBusBackend, self).__init__() self._id = str(uuid.uuid4())[:8] self._state_claim_wait = 60 @@ -222,6 +223,10 @@ def __init__(self, logger=None, coordinator_port=5556, socket_timeout=10, debug= self._password = ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(20)) self._is_ok = True + self._failed_at = None + self._failure_timeout = int(failure_timeout) + self._num_of_set_failed_at = 0 + self._num_of_set_failed_at_threshold = int(num_of_set_failed_at_threshold) self.num_success = 0 self.num_failures = 0 self.num_timeout = 0 @@ -456,8 +461,16 @@ def fail_event(self, event): pass def is_ok(self): - if self.num_failures > self.failure_threshold or self.num_timeout > self.timeout_threshold: + if self._num_of_set_failed_at < self._num_of_set_failed_at_threshold and self._failed_at and self._failed_at + self._failure_timeout < time.time(): + self._is_ok = True + self._failed_at = None + self.num_failures = 0 + self.num_timeout = 0 + elif self.num_failures > self.failure_threshold or self.num_timeout > self.timeout_threshold: self._is_ok = False + if not self._failed_at: + self._failed_at = time.time() + self._num_of_set_failed_at += 1 else: self._is_ok = True return self._is_ok From c92e0c491e7e1f26e34b48ea07b986c6900c767f Mon Sep 17 00:00:00 2001 From: Wen Guan Date: Wed, 7 Jun 2023 11:20:23 +0200 Subject: [PATCH 2/2] set eventbus manager correctly when it fails --- main/lib/idds/agents/common/eventbus/eventbus.py | 13 ++++++++++--- requirements.yaml | 1 + workflow/tools/env/environment.yml | 1 + 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/main/lib/idds/agents/common/eventbus/eventbus.py b/main/lib/idds/agents/common/eventbus/eventbus.py index 59055d15..79502f3f 100644 --- a/main/lib/idds/agents/common/eventbus/eventbus.py +++ b/main/lib/idds/agents/common/eventbus/eventbus.py @@ -47,6 +47,7 @@ def __init__(self, logger=None): self.attrs = attrs self._backend = None self._orig_backend = None + self._backup_backend = BaseEventBusBackendOpt(logger=self.logger, **attrs) if 'backend' in attrs: if attrs['backend'] == 'message': self.backend = MsgEventBusBackend(logger=self.logger, **attrs) @@ -57,18 +58,20 @@ def __init__(self, logger=None): if self.backend is None: self.backend = BaseEventBusBackendOpt(logger=self.logger, **attrs) self.logger.info("EventBus backend : %s" % self.backend) + self._orig_backend = self.backend self.backend.start() @property def backend(self): if self._backend and isinstance(self._backend, MsgEventBusBackend) and not self._backend.is_ok(): - self._orig_backend = self._backend - self._backend = BaseEventBusBackendOpt(logger=self.logger, **self.attrs) + # self._orig_backend = self._backend + # self._backend = BaseEventBusBackendOpt(logger=self.logger, **self.attrs) + self._backend = self._backup_backend self.logger.critical("MsgEventBusBackend failed, switch to use BaseEventBusBackendOpt") elif self._orig_backend and isinstance(self._orig_backend, MsgEventBusBackend) and self._orig_backend.is_ok(): self.logger.critical("MsgEventBusBackend is ok, switch back to use it") self._backend = self._orig_backend - self._orig_backend = None + # self._orig_backend = None return self._backend @backend.setter @@ -125,8 +128,12 @@ def fail_event(self, event): def set_manager(self, manager): self.backend.set_manager(manager) + if self._orig_backend: + self._orig_backend.set_manager(manager) def get_manager(self): + if self._orig_backend: + return self._orig_backend.get_manager() return self.backend.get_manager() def get_coordinator(self): diff --git a/requirements.yaml b/requirements.yaml index 07445ab5..0677ef71 100644 --- a/requirements.yaml +++ b/requirements.yaml @@ -31,4 +31,5 @@ dependencies: - deepdiff - pyzmq - anytree + - networkx - oic diff --git a/workflow/tools/env/environment.yml b/workflow/tools/env/environment.yml index 13c48f03..e9ee95f7 100644 --- a/workflow/tools/env/environment.yml +++ b/workflow/tools/env/environment.yml @@ -7,6 +7,7 @@ dependencies: - pep8 # checks for PEP8 code style compliance - flake8 # Wrapper around PyFlakes&pep8 - anytree + - networkx - pytest # python testing tool - nose # nose test tools - idds-common==0.11.5