Skip to content

Commit

Permalink
Merge pull request #353 from HSF/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
wguanicedew authored Oct 14, 2024
2 parents b27a0c9 + 1953616 commit 5d2e353
Show file tree
Hide file tree
Showing 21 changed files with 668 additions and 268 deletions.
14 changes: 14 additions & 0 deletions common/lib/idds/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,20 @@ class ProcessingStatus(IDDSEnum):
Synchronizing = 30


Terminated_processing_status = [ProcessingStatus.Finished,
ProcessingStatus.Failed,
ProcessingStatus.Lost,
ProcessingStatus.FinishedOnStep,
ProcessingStatus.FinishedOnExec,
ProcessingStatus.FinishedTerm,
ProcessingStatus.SubFinished,
ProcessingStatus.Cancelled,
ProcessingStatus.Suspended,
ProcessingStatus.Expired,
ProcessingStatus.Broken
]


class ProcessingLocking(IDDSEnum):
Idle = 0
Locking = 1
Expand Down
75 changes: 73 additions & 2 deletions common/lib/idds/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
# - Lino Oscar Gerlach, <[email protected]>, 2024

import base64
import concurrent.futures
import contextlib
import errno
import datetime
import functools
import importlib
import hashlib
import logging
Expand Down Expand Up @@ -883,15 +885,19 @@ def import_attribute(name: str) -> Callable[..., Any]:
return getattr(attribute_owner, attribute_name)


def decode_base64(sb):
def decode_base64(sb, remove_quotes=False):
try:
if isinstance(sb, str):
sb_bytes = bytes(sb, 'ascii')
elif isinstance(sb, bytes):
sb_bytes = sb
else:
return sb
return base64.b64decode(sb_bytes).decode("utf-8")
decode_str = base64.b64decode(sb_bytes).decode("utf-8")
# remove the single quotes afeter decoding
if remove_quotes:
return decode_str[1:-1]
return decode_str
except Exception as ex:
logging.error("decode_base64 %s: %s" % (sb, ex))
return sb
Expand Down Expand Up @@ -953,6 +959,15 @@ def __str__(self):
return '****'


def is_panda_client_verbose():
verbose = os.environ.get("PANDA_CLIENT_VERBOSE", None)
if verbose:
verbose = verbose.lower()
if verbose == 'true':
return True
return False


def get_unique_id_for_dict(dict_):
ret = hashlib.sha1(json.dumps(dict_, sort_keys=True).encode()).hexdigest()
# logging.debug("get_unique_id_for_dict, type: %s: %s, ret: %s" % (type(dict_), dict_, ret))
Expand Down Expand Up @@ -996,3 +1011,59 @@ def modified_environ(*remove, **update):
finally:
env.update(update_after)
[env.pop(k) for k in remove_after]


def run_with_timeout(func, args=(), kwargs={}, timeout=None, retries=1):
"""
Run a function with a timeout.
Parameters:
func (callable): The function to run.
args (tuple): The arguments to pass to the function.
kwargs (dict): The keyword arguments to pass to the function.
timeout (float or int): The time limit in seconds.
Returns:
The function's result if it finishes within the timeout.
Raises TimeoutError if the function takes longer than the specified timeout.
"""
for i in range(retries):
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(func, *args, **kwargs)
try:
if i > 0:
logging.info(f"retry {i} to execute function.")
return future.result(timeout=timeout)
except concurrent.futures.TimeoutError:
# raise TimeoutError(f"Function '{func.__name__}' timed out after {timeout} seconds.")
logging.error(f"Function '{func.__name__}' timed out after {timeout} seconds in retry {i}.")
return TimeoutError(f"Function '{func.__name__}' timed out after {timeout} seconds.")


def timeout_wrapper(timeout, retries=1):
"""
Decorator to timeout a function after a given number of seconds.
Parameters:
seconds (int or float): The time limit in seconds.
Raises:
TimeoutError: If the function execution exceeds the time limit.
"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
for i in range(retries):
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(func, *args, **kwargs)
try:
if i > 0:
logging.info(f"retry {i} to execute function.")

return future.result(timeout=timeout)
except concurrent.futures.TimeoutError:
# raise TimeoutError(f"Function '{func.__name__}' timed out after {seconds} seconds.")
logging.error(f"Function '{func.__name__}' timed out after {timeout} seconds in retry {i}.")
return TimeoutError(f"Function '{func.__name__}' timed out after {timeout} seconds.")
return wrapper
return decorator
2 changes: 1 addition & 1 deletion doma/lib/idds/doma/workflowv2/domapandawork.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
except ImportError:
import configparser as ConfigParser

import concurrent
import concurrent.futures
import datetime
import os
import time
Expand Down
25 changes: 22 additions & 3 deletions main/lib/idds/agents/carrier/finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@

from idds.common import exceptions
from idds.common.constants import (Sections, ReturnCode, ProcessingType,
ProcessingStatus, ProcessingLocking)
ProcessingStatus, ProcessingLocking,
Terminated_processing_status)
from idds.common.utils import setup_logging, truncate_string
from idds.core import processings as core_processings
from idds.agents.common.baseagent import BaseAgent
Expand Down Expand Up @@ -185,7 +186,16 @@ def process_sync_processing(self, event):
pr = self.get_processing(processing_id=event._processing_id, locking=True)
if not pr:
self.logger.error("Cannot find processing for event: %s" % str(event))
pro_ret = ReturnCode.Locked.value
# pro_ret = ReturnCode.Locked.value
pro_ret = ReturnCode.Ok.value
elif pr['status'] in Terminated_processing_status:
parameters = {'locking': ProcessingLocking.Idle}
update_processing = {'processing_id': pr['processing_id'],
'parameters': parameters}
ret = {'update_processing': update_processing,
'update_contents': []}
self.update_processing(ret, pr)
pro_ret = ReturnCode.Ok.value
else:
log_pre = self.get_log_prefix(pr)

Expand Down Expand Up @@ -281,7 +291,16 @@ def process_terminated_processing(self, event):
pr = self.get_processing(processing_id=event._processing_id, locking=True)
if not pr:
self.logger.error("Cannot find processing for event: %s" % str(event))
pro_ret = ReturnCode.Locked.value
# pro_ret = ReturnCode.Locked.value
pro_ret = ReturnCode.Ok.value
elif pr['status'] in Terminated_processing_status:
parameters = {'locking': ProcessingLocking.Idle}
update_processing = {'processing_id': pr['processing_id'],
'parameters': parameters}
ret = {'update_processing': update_processing,
'update_contents': []}
self.update_processing(ret, pr)
pro_ret = ReturnCode.Ok.value
else:
log_pre = self.get_log_prefix(pr)

Expand Down
53 changes: 35 additions & 18 deletions main/lib/idds/agents/carrier/poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@

from idds.common import exceptions
from idds.common.constants import (Sections, ReturnCode, ProcessingType,
ProcessingStatus, ProcessingLocking)
ProcessingStatus, ProcessingLocking,
Terminated_processing_status)
from idds.common.utils import setup_logging, truncate_string, json_dumps
from idds.core import processings as core_processings
from idds.agents.common.baseagent import BaseAgent
Expand All @@ -37,7 +38,7 @@ class Poller(BaseAgent):
"""

def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries=3, retrieve_bulk_size=2,
max_updates_per_round=2000, name='Poller', message_bulk_size=1000, **kwargs):
max_updates_per_round=2000, name='Poller', message_bulk_size=1000, locking_period=1800, **kwargs):
self.max_number_workers = max_number_workers
if int(num_threads) < int(self.max_number_workers):
num_threads = int(self.max_number_workers)
Expand All @@ -47,6 +48,7 @@ def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries=
super(Poller, self).__init__(num_threads=num_threads, name=name, **kwargs)
self.config_section = Sections.Carrier
self.poll_period = int(poll_period)
self.locking_period = int(locking_period)
self.retries = int(retries)
self.retrieve_bulk_size = int(retrieve_bulk_size)
self.message_bulk_size = int(message_bulk_size)
Expand Down Expand Up @@ -101,6 +103,8 @@ def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries=

self.extra_executors = None

self._running_processing_status = None

def get_extra_executors(self):
if self.enable_executors:
if self.extra_executors is None:
Expand Down Expand Up @@ -150,6 +154,8 @@ def get_running_processings(self):
ProcessingStatus.ToResume, ProcessingStatus.Resuming,
ProcessingStatus.ToExpire, ProcessingStatus.Expiring,
ProcessingStatus.ToFinish, ProcessingStatus.ToForceFinish]
self._running_processing_status = processing_status

# next_poll_at = datetime.datetime.utcnow() + datetime.timedelta(seconds=self.poll_period)
processings = core_processings.get_processings_by_status(status=processing_status,
locking=True, update_poll=True,
Expand Down Expand Up @@ -181,7 +187,10 @@ def get_running_processings(self):

def get_processing(self, processing_id, status=None, locking=False):
try:
return core_processings.get_processing_by_id_status(processing_id=processing_id, status=status, locking=locking)
return core_processings.get_processing_by_id_status(processing_id=processing_id,
status=status,
locking=locking,
lock_period=self.locking_period)
except exceptions.DatabaseException as ex:
if 'ORA-00060' in str(ex):
self.logger.warn("(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
Expand Down Expand Up @@ -268,21 +277,20 @@ def update_processing(self, processing, processing_model, use_bulk_update_mappin
use_bulk_update_mappings=use_bulk_update_mappings)
except exceptions.DatabaseException as ex:
if 'ORA-00060' in str(ex):
self.logger.warn(log_prefix + "(cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
if retry_num < 5:
retry = True
if retry_num <= 1:
random_sleep = random.randint(1, 10)
elif retry_num <= 2:
random_sleep = random.randint(1, 60)
else:
random_sleep = random.randint(1, 120)
time.sleep(random_sleep)
self.logger.warn(log_prefix + "update_processing (cx_Oracle.DatabaseError) ORA-00060: deadlock detected while waiting for resource")
else:
self.logger.error(ex)
self.logger.error(traceback.format_exc())
if retry_num < 5:
retry = True
if retry_num <= 1:
random_sleep = random.randint(1, 10)
elif retry_num <= 2:
random_sleep = random.randint(1, 60)
else:
raise ex
random_sleep = random.randint(1, 120)
time.sleep(random_sleep)
else:
# self.logger.error(ex)
# self.logger.error(traceback.format_exc())
raise ex
except Exception as ex:
self.logger.error(ex)
Expand Down Expand Up @@ -539,7 +547,16 @@ def process_update_processing(self, event):
pr = self.get_processing(processing_id=event._processing_id, status=None, locking=True)
if not pr:
self.logger.warn("Cannot find processing for event: %s" % str(event))
pro_ret = ReturnCode.Locked.value
# pro_ret = ReturnCode.Locked.value
pro_ret = ReturnCode.Ok.value
elif pr['status'] in Terminated_processing_status:
parameters = {'locking': ProcessingLocking.Idle}
update_processing = {'processing_id': pr['processing_id'],
'parameters': parameters}
ret = {'update_processing': update_processing,
'update_contents': []}
self.update_processing(ret, pr)
pro_ret = ReturnCode.Ok.value
else:
log_pre = self.get_log_prefix(pr)

Expand Down Expand Up @@ -585,7 +602,7 @@ def process_update_processing(self, event):

def clean_locks(self):
self.logger.info("clean locking")
core_processings.clean_locking()
core_processings.clean_locking(time_period=self.locking_period)

def init_event_function_map(self):
self.event_func_map = {
Expand Down
12 changes: 12 additions & 0 deletions main/lib/idds/agents/carrier/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ def __init__(self, num_threads=1, max_number_workers=3, poll_period=10, retries=
name=name, retrieve_bulk_size=retrieve_bulk_size, **kwargs)
self.site_to_cloud = None

self._new_processing_status = None

def get_new_processings(self):
"""
Get new processing
Expand All @@ -56,6 +58,7 @@ def get_new_processings(self):
return []

processing_status = [ProcessingStatus.New]
self._new_processing_status = processing_status
processings = core_processings.get_processings_by_status(status=processing_status, locking=True,
not_lock=True,
new_poll=True, only_return_id=True,
Expand Down Expand Up @@ -168,6 +171,7 @@ def handle_new_processing(self, processing):

error = {'submit_err': {'msg': truncate_string('%s' % str(ex), length=200)}}
parameters = {'status': pr_status,
'locking': ProcessingLocking.Idle,
'new_poll_period': new_poll_period,
'errors': processing['errors'] if processing['errors'] else {},
'new_retries': retries}
Expand Down Expand Up @@ -242,6 +246,7 @@ def handle_new_iprocessing(self, processing):

error = {'submit_err': {'msg': truncate_string('%s' % str(ex), length=200)}}
parameters = {'status': pr_status,
'locking': ProcessingLocking.Idle,
'new_poll_period': new_poll_period,
'errors': processing['errors'] if processing['errors'] else {},
'new_retries': retries}
Expand All @@ -262,6 +267,13 @@ def process_new_processing(self, event):
pr = self.get_processing(processing_id=event._processing_id, status=None, locking=True)
if not pr:
self.logger.warn("Cannot find processing for event: %s" % str(event))
elif self._new_processing_status and pr['status'] not in self._new_processing_status:
parameters = {'locking': ProcessingLocking.Idle}
update_processing = {'processing_id': pr['processing_id'],
'parameters': parameters}
ret = {'update_processing': update_processing,
'update_contents': []}
self.update_processing(ret, pr)
else:
log_pre = self.get_log_prefix(pr)
self.logger.info(log_pre + "process_new_processing")
Expand Down
14 changes: 12 additions & 2 deletions main/lib/idds/agents/carrier/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
import traceback

from idds.common import exceptions
from idds.common.constants import ProcessingStatus, ProcessingLocking, ReturnCode
from idds.common.constants import (ProcessingStatus, ProcessingLocking, ReturnCode,
Terminated_processing_status)
from idds.common.utils import setup_logging, truncate_string
from idds.core import processings as core_processings
from idds.agents.common.baseagent import BaseAgent
Expand Down Expand Up @@ -197,7 +198,16 @@ def process_trigger_processing_real(self, event):
pr = self.get_processing(processing_id=event._processing_id, status=None, locking=True)
if not pr:
self.logger.warn("Cannot find processing for event: %s" % str(event))
pro_ret = ReturnCode.Locked.value
# pro_ret = ReturnCode.Locked.value
pro_ret = ReturnCode.Ok.value
elif pr['status'] in Terminated_processing_status:
parameters = {'locking': ProcessingLocking.Idle}
update_processing = {'processing_id': pr['processing_id'],
'parameters': parameters}
ret = {'update_processing': update_processing,
'update_contents': []}
self.update_processing(ret, pr)
pro_ret = ReturnCode.Ok.value
else:
log_pre = self.get_log_prefix(pr)
self.logger.info(log_pre + "process_trigger_processing")
Expand Down
2 changes: 1 addition & 1 deletion main/lib/idds/agents/carrier/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# Authors:
# - Wen Guan, <[email protected]>, 2022 - 2023

import concurrent
import concurrent.futures
import json
import logging
import time
Expand Down
Loading

0 comments on commit 5d2e353

Please sign in to comment.