diff --git a/Dockerfile b/Dockerfile index 203c53d7..ac83d58f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -84,7 +84,7 @@ RUN source /etc/profile.d/conda.sh; conda activate /opt/idds; python3 -m pip ins RUN source /etc/profile.d/conda.sh; conda activate /opt/idds; python3 -m pip install --no-cache-dir --upgrade requests SQLAlchemy urllib3 retrying mod_wsgi flask futures stomp.py cx-Oracle unittest2 pep8 flake8 pytest nose sphinx recommonmark sphinx-rtd-theme nevergrad RUN source /etc/profile.d/conda.sh; conda activate /opt/idds; python3 -m pip install --no-cache-dir --upgrade psycopg2-binary -RUN source /etc/profile.d/conda.sh; conda activate /opt/idds; python3 -m pip install --no-cache-dir --upgrade rucio-clients-atlas rucio-clients panda-client +RUN source /etc/profile.d/conda.sh; conda activate /opt/idds; python3 -m pip install --no-cache-dir --upgrade rucio-clients-atlas rucio-clients panda-client-light WORKDIR /tmp/src @@ -137,7 +137,7 @@ RUN sed -i "s/WSGISocketPrefix\ \/var\/log\/idds\/wsgisocks\/wsgi/WSGISocketPref RUN ln -fs /opt/idds/config/idds/supervisord_idds.ini /etc/supervisord.d/idds.ini RUN ln -fs /opt/idds/config/idds/supervisord_iddsfake.ini /etc/supervisord.d/iddsfake.ini RUN ln -fs /opt/idds/config/idds/supervisord_httpd.ini /etc/supervisord.d/httpd.ini -RUN ln -fs /opt/idds/config/idds/supervisord_syslog-ng.ini /etc/supervisord.d/syslog-ng.ini +# RUN ln -fs /opt/idds/config/idds/supervisord_syslog-ng.ini /etc/supervisord.d/syslog-ng.ini # for syslog-ng RUN mv /etc/syslog-ng/syslog-ng.conf /etc/syslog-ng/syslog-ng.conf.back diff --git a/atlas/tools/env/environment.yml b/atlas/tools/env/environment.yml index fdfef10d..42f15752 100644 --- a/atlas/tools/env/environment.yml +++ b/atlas/tools/env/environment.yml @@ -10,7 +10,7 @@ dependencies: - pytest # python testing tool - nose # nose test tools - stomp.py - - panda-client # panda client + - panda-client-light # panda client - rucio-clients - rucio-clients-atlas - idds-common==0.11.5 diff --git a/common/lib/idds/common/constants.py b/common/lib/idds/common/constants.py index 1793cd02..ad52a979 100644 --- a/common/lib/idds/common/constants.py +++ b/common/lib/idds/common/constants.py @@ -146,6 +146,7 @@ class RequestStatus(IDDSEnum): Terminating = 20 Building = 21 Built = 22 + Throttling = 23 class RequestLocking(IDDSEnum): @@ -291,6 +292,7 @@ class ContentStatus(IDDSEnum): FakeAvailable = 8 Missing = 9 Cancelled = 10 + Activated = 11 class ContentLocking(IDDSEnum): @@ -429,6 +431,7 @@ class MessageStatus(IDDSEnum): Fetched = 1 Delivered = 2 Failed = 3 + ConfirmDelivered = 4 class MessageLocking(IDDSEnum): @@ -484,6 +487,11 @@ class CommandLocation(IDDSEnum): Other = 6 +class ThrottlerStatus(IDDSEnum): + InActive = 0 + Active = 1 + + class ReturnCode(IDDSEnum): Ok = 0 Failed = 255 diff --git a/common/lib/idds/common/utils.py b/common/lib/idds/common/utils.py index c9294623..cb85e9ad 100644 --- a/common/lib/idds/common/utils.py +++ b/common/lib/idds/common/utils.py @@ -19,6 +19,7 @@ import subprocess import sys import tarfile +import time # import traceback from enum import Enum @@ -67,6 +68,7 @@ def setup_logging(name, stream=None, loglevel=None): else: logging.basicConfig(stream=stream, level=loglevel, format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s') + logging.Formatter.converter = time.gmtime def get_logger(name, filename=None, loglevel=None): @@ -538,7 +540,7 @@ def merge_dict(dict1, dict2): else: if dict2[key] is None: continue - elif isinstance(dict1[key], type(dict2[key])): + elif not 
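The `setup_logging` hunk above also pins log timestamps to UTC by overriding the formatter's time converter. A minimal standalone illustration of that idiom (an assumed example, not part of the patch):

    import logging
    import time

    logging.basicConfig(format='%(asctime)s\t%(levelname)s\t%(message)s')
    # logging.Formatter converts record times with time.localtime by default;
    # overriding the class-level converter renders every %(asctime)s in UTC.
    logging.Formatter.converter = time.gmtime
    logging.warning("timestamped in UTC")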
isinstance(dict1[key], type(dict2[key])): raise Exception("type of %s is different from %s, cannot merge" % (type(dict1[key]), type(dict2[key]))) elif dict1[key] == dict2[key]: continue diff --git a/docs/source/users/admin_guides.rst b/docs/source/users/admin_guides.rst index 00b27bfa..02e484e9 100644 --- a/docs/source/users/admin_guides.rst +++ b/docs/source/users/admin_guides.rst @@ -26,7 +26,7 @@ Environment setup on CENTOS 7 conda activate /opt/idds pip install idds-server idds-doma idds-atlas idds-monitor idds-website - pip install rucio-clients-atlas rucio-clients panda-client + pip install rucio-clients-atlas rucio-clients panda-client-light # # add "auth_type = x509_proxy" to /opt/idds/etc/rucio.cfg 2. setup environment after installed. diff --git a/doma/lib/idds/doma/workflowv2/domapandawork.py b/doma/lib/idds/doma/workflowv2/domapandawork.py index 03a8dd41..42a6a005 100644 --- a/doma/lib/idds/doma/workflowv2/domapandawork.py +++ b/doma/lib/idds/doma/workflowv2/domapandawork.py @@ -544,6 +544,10 @@ def create_processing(self, input_output_maps=[]): task_param_map['maxWalltime'] = self.maxWalltime task_param_map['maxFailure'] = self.maxAttempt if self.maxAttempt else 5 task_param_map['maxAttempt'] = self.maxAttempt if self.maxAttempt else 5 + if task_param_map['maxAttempt'] < self.num_retries: + task_param_map['maxAttempt'] = self.num_retries + if task_param_map['maxFailure'] < self.num_retries: + task_param_map['maxFailure'] = self.num_retries task_param_map['log'] = self.task_log task_param_map['jobParameters'] = [ {'type': 'constant', @@ -673,9 +677,12 @@ def get_processing_status_from_panda_status(self, task_status): elif task_status in ['finished', 'paused']: # finished, finishing, waiting it to be done processing_status = ProcessingStatus.SubFinished - elif task_status in ['failed', 'aborted', 'exhausted']: + elif task_status in ['failed', 'exhausted']: # aborting, tobroken processing_status = ProcessingStatus.Failed + elif task_status in ['aborted']: + # aborting, tobroken + processing_status = ProcessingStatus.Cancelled elif task_status in ['broken']: processing_status = ProcessingStatus.Broken else: @@ -786,6 +793,8 @@ def get_content_status_from_panda_status(self, job_info): return ContentStatus.FinalFailed else: return ContentStatus.Failed + elif jobstatus in ['activated']: + return ContentStatus.Activated else: return ContentStatus.Processing @@ -1117,6 +1126,28 @@ def get_contents_ext(self, input_output_maps, contents_ext, contents_ext_full, j self.logger.debug("get_contents_ext, left_contents[:1]: %s" % (str(left_contents[:3]))) return new_contents_ext_d, update_contents_ext_d, left_contents + def abort_contents(self, input_output_maps, updated_contents, contents_ext): + contents_ext_dict = {content['content_id']: content for content in contents_ext} + new_contents_ext = [] + + for map_id in input_output_maps: + outputs = input_output_maps[map_id]['outputs'] + for content in outputs: + update_content = {'content_id': content['content_id'], + 'substatus': ContentStatus.Missing} + updated_contents.append(update_content) + if content['content_id'] not in contents_ext_dict: + new_content_ext = {'content_id': content['content_id'], + 'request_id': content['request_id'], + 'transform_id': content['transform_id'], + 'workload_id': content['workload_id'], + 'coll_id': content['coll_id'], + 'map_id': content['map_id'], + 'status': ContentStatus.Missing} + new_contents_ext.append(new_content_ext) + + return updated_contents, new_contents_ext + def poll_panda_task(self, 
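The `merge_dict` change just above is a genuine bug fix rather than a refactor: the old guard raised exactly when the two values shared a type, so compatible dictionaries could never be merged. A reduced sketch of the corrected control flow (a hypothetical standalone helper; the real function also recurses into nested dicts):

    def merge_value(dict1, dict2, key):
        if dict2[key] is None:
            return dict1[key]            # nothing new to merge in
        if not isinstance(dict1[key], type(dict2[key])):
            # only a genuine type mismatch is an error after the fix
            raise Exception("type of %s is different from %s, cannot merge"
                            % (type(dict1[key]), type(dict2[key])))
        if dict1[key] == dict2[key]:
            return dict1[key]            # identical values: keep as-is
        return dict2[key]                # same type, different value: assumed resolution for this sketch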
processing=None, input_output_maps=None, contents_ext=None, job_info_maps={}, log_prefix=''): task_id = None try: @@ -1161,7 +1192,9 @@ def poll_panda_task(self, processing=None, input_output_maps=None, contents_ext= new_contents_ext, update_contents_ext, left_contents = self.get_contents_ext(input_output_maps, contents_ext, contents_ext_full, job_info_maps) # if left_jobs: - # processing_status = ProcessingStatus.Running + if processing_status in [ProcessingStatus.Cancelled]: + updated_contents, new_contents_ext1 = self.abort_contents(input_output_maps, updated_contents, contents_ext) + new_contents_ext = new_contents_ext + new_contents_ext1 return processing_status, updated_contents, update_contents_full, new_contents_ext, update_contents_ext else: diff --git a/doma/tools/env/environment.yml b/doma/tools/env/environment.yml index 4847bdb4..284ee515 100644 --- a/doma/tools/env/environment.yml +++ b/doma/tools/env/environment.yml @@ -9,6 +9,6 @@ dependencies: - flake8 # Wrapper around PyFlakes&pep8 - pytest # python testing tool - nose # nose test tools - - panda-client # panda client + - panda-client-light # panda client - idds-common==0.11.5 - idds-workflow==0.11.5 \ No newline at end of file diff --git a/main/etc/sql/oracle_update.sql b/main/etc/sql/oracle_update.sql index 542b6c9b..1290c58a 100644 --- a/main/etc/sql/oracle_update.sql +++ b/main/etc/sql/oracle_update.sql @@ -399,3 +399,37 @@ alter table health modify payload VARCHAR2(2048); --- 2023.03.29 alter table contents_update add fetch_status NUMBER(2) DEFAULT 0; + + +-- 2023.05.18 +alter table requests add site VARCHAR2(50); +CREATE INDEX REQUESTS_STATUS_SITE ON requests (status, site, request_id) COMPRESS 3 LOCAL; + +alter table transforms add site VARCHAR2(50); +CREATE INDEX TRANSFORMS_STATUS_SITE ON transforms (status, site, request_id, transform_id) COMPRESS 3 LOCAL; + +alter table processings add site VARCHAR2(50); +CREATE INDEX PROCESSINGS_STATUS_SITE ON processings (status, site, request_id, transform_id, processing_id) COMPRESS 3 LOCAL; + +alter table messages add fetching_id NUMBER(12); + + +CREATE SEQUENCE THROTTLER_ID_SEQ MINVALUE 1 INCREMENT BY 1 START WITH 1 NOCACHE ORDER NOCYCLE GLOBAL; +CREATE TABLE Throttlers +( + throttler_id NUMBER(12) DEFAULT ON NULL THROTTLER_ID_SEQ.NEXTVAL constraint THROTTLER_ID_NN NOT NULL, + site VARCHAR2(50), + status NUMBER(2), + num_requests NUMBER(12), + num_transforms NUMBER(12), + num_processings NUMBER(12), + new_contents NUMBER(12), + queue_contents NUMBER(12), + created_at DATE DEFAULT SYS_EXTRACT_UTC(systimestamp(0)), + updated_at DATE DEFAULT SYS_EXTRACT_UTC(systimestamp(0)), + others CLOB, + CONSTRAINT THROTTLER_PK PRIMARY KEY (throttler_id), -- USING INDEX LOCAL, + CONSTRAINT THROTTLER_SITE_UQ UNIQUE (site) +); + +alter table Messages add (poll_period INTERVAL DAY TO SECOND DEFAULT '00 00:05:00'); diff --git a/main/lib/idds/agents/carrier/receiver.py b/main/lib/idds/agents/carrier/receiver.py index abcd8478..069471fc 100644 --- a/main/lib/idds/agents/carrier/receiver.py +++ b/main/lib/idds/agents/carrier/receiver.py @@ -54,6 +54,7 @@ def __init__(self, receiver_num_threads=8, num_threads=1, bulk_message_delay=30, self.update_processing_interval = 300 self.mode = mode + self.selected = None self.selected_receiver = None self.log_prefix = '' @@ -79,6 +80,16 @@ def stop_receiver(self): self.receiver.stop() self.receiver = None + def suspend_receiver(self): + if hasattr(self, 'receiver') and self.receiver: + self.logger.info("Stopping receiver: %s" % self.receiver) + 
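The composite `(status, site, ...)` indexes added in the SQL above are shaped for the throttler's counting queries, which aggregate active rows per status and site; the Clerk further down unpacks `(status, site, count)` from each row. A hedged SQLAlchemy sketch of the query shape they serve (the actual `orm_requests.get_num_active_requests` body is not part of this diff):

    from sqlalchemy import func

    def get_num_active_requests(session, models, active_status):
        query = (session.query(models.Request.status, models.Request.site,
                               func.count(models.Request.request_id))
                 .filter(models.Request.status.in_(active_status))
                 .group_by(models.Request.status, models.Request.site))
        return query.all()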
self.receiver.suspend() + + def resume_receiver(self): + if hasattr(self, 'receiver') and self.receiver: + self.logger.info("Stopping receiver: %s" % self.receiver) + self.receiver.resume() + def is_receiver_started(self): if hasattr(self, 'receiver') and self.receiver: return True @@ -109,9 +120,15 @@ def get_output_messages(self): return msgs def is_selected(self): + selected = None if not self.selected_receiver: - return True - return self.is_self(self.selected_receiver) + selected = True + else: + selected = self.is_self(self.selected_receiver) + if self.selected is None or self.selected != selected: + self.logger.info("is_selected changed from %s to %s" % (self.selected, selected)) + self.selected = selected + return self.selected def monitor_receiver(self): if self.mode == "single": @@ -242,6 +259,8 @@ def run(self): # [self.executors.submit(self.worker, log_prefix) for i in range(self.executors.get_max_workers())] self.init_event_function_map() + self.start_receiver() + while not self.graceful_stop.is_set(): try: self.execute_schedules() @@ -250,11 +269,11 @@ def run(self): if self.is_selected(): if not self.is_receiver_started(): - self.start_receiver() + self.resume_receiver() if not self.is_selected(): if self.is_receiver_started(): - self.stop_receiver() + self.suspend_receiver() msg = self.get_output_messages() if msg: diff --git a/main/lib/idds/agents/carrier/utils.py b/main/lib/idds/agents/carrier/utils.py index 8f6e6836..42bf35c6 100644 --- a/main/lib/idds/agents/carrier/utils.py +++ b/main/lib/idds/agents/carrier/utils.py @@ -315,10 +315,13 @@ def generate_file_messages(request_id, transform_id, workload_id, work, files, r file_message = {'scope': file['scope'], 'name': file['name'], 'path': file['path'], + 'map_id': file['map_id'], + 'content_id': file['content_id'], 'status': file_status} files_message.append(file_message) msg_content = {'msg_type': i_msg_type_str.value, 'request_id': request_id, + 'transform_id': transform_id, 'workload_id': workload_id, 'relation_type': relation_type, 'files': files_message} @@ -341,6 +344,7 @@ def generate_content_ext_messages(request_id, transform_id, workload_id, work, f msg_content = {'msg_type': i_msg_type_str.value, 'request_id': request_id, 'workload_id': workload_id, + 'transform_id': transform_id, 'relation_type': relation_type, 'files': files_message} num_msg_content = len(files_message) @@ -356,6 +360,7 @@ def generate_collection_messages(request_id, transform_id, workload_id, work, co msg_content = {'msg_type': i_msg_type_str.value, 'request_id': request_id, 'workload_id': workload_id, + 'transform_id': transform_id, 'relation_type': relation_type, 'collections': [{'scope': collection.scope, 'name': coll_name, @@ -371,6 +376,7 @@ def generate_work_messages(request_id, transform_id, workload_id, work, relation msg_content = {'msg_type': i_msg_type_str.value, 'request_id': request_id, 'workload_id': workload_id, + 'transform_id': transform_id, 'relation_type': relation_type, 'status': work.get_status().name, 'output': work.get_output_data(), @@ -1085,6 +1091,7 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates= def get_content_status_from_panda_msg_status(status): status_map = {'starting': ContentStatus.New, + 'activated': ContentStatus.Activated, 'running': ContentStatus.Processing, 'finished': ContentStatus.Available, 'failed': ContentStatus.Failed} @@ -1370,7 +1377,9 @@ def handle_messages_processing(messages, logger=None, log_prefix='', update_proc job_id = msg['jobid'] status = 
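`is_selected` previously recomputed its answer on every call and returned it directly; it now caches the last decision so a selection flip is logged exactly once. The pattern, condensed from the hunk above:

    def is_selected(self):
        # elected if nobody has been selected yet, or if the health table picked us
        selected = True if not self.selected_receiver else self.is_self(self.selected_receiver)
        if self.selected is None or self.selected != selected:
            self.logger.info("is_selected changed from %s to %s" % (self.selected, selected))
        self.selected = selected
        return self.selected

Together with the `start_receiver()` now issued once before the loop, a deselected instance keeps its receiver thread alive and is merely suspended or resumed as the election result changes, instead of being torn down and rebuilt. (Both new log lines say "Stopping receiver"; the one in `resume_receiver` presumably means "Resuming".)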
msg['status'] inputs = msg['inputs'] - if inputs and status in ['finished']: + # if inputs and status in ['finished']: + # add activated + if inputs and status in ['finished', 'activated']: logger.debug(log_prefix + "Received message: %s" % str(ori_msg)) ret_req_tf_pr_id = get_workload_id_transform_id_map(workload_id, logger=logger, log_prefix=log_prefix) diff --git a/main/lib/idds/agents/clerk/clerk.py b/main/lib/idds/agents/clerk/clerk.py index c6465e6a..6296b2c3 100644 --- a/main/lib/idds/agents/clerk/clerk.py +++ b/main/lib/idds/agents/clerk/clerk.py @@ -16,11 +16,15 @@ from idds.common import exceptions from idds.common.constants import (Sections, ReturnCode, RequestStatus, RequestLocking, - TransformStatus, CommandType, - CommandStatus, CommandLocking) + TransformStatus, ProcessingStatus, + ContentStatus, ContentRelationType, + CommandType, CommandStatus, CommandLocking) from idds.common.utils import setup_logging, truncate_string from idds.core import (requests as core_requests, transforms as core_transforms, + processings as core_processings, + catalog as core_catalog, + throttlers as core_throttlers, commands as core_commands) from idds.agents.common.baseagent import BaseAgent from idds.agents.common.eventbus.event import (EventType, @@ -34,6 +38,9 @@ ResumeTransformEvent, ExpireRequestEvent) +from idds.agents.common.cache.redis import get_redis_cache + + setup_logging(__name__) @@ -42,7 +49,7 @@ class Clerk(BaseAgent): Clerk works to process requests and converts requests to transforms. """ - def __init__(self, num_threads=1, poll_period=10, retrieve_bulk_size=10, pending_time=None, **kwargs): + def __init__(self, num_threads=1, poll_period=10, retrieve_bulk_size=10, cache_expire_seconds=300, pending_time=None, **kwargs): self.set_max_workers() num_threads = self.max_number_workers super(Clerk, self).__init__(num_threads=num_threads, name='Clerk', **kwargs) @@ -54,6 +61,8 @@ def __init__(self, num_threads=1, poll_period=10, retrieve_bulk_size=10, pending else: self.pending_time = None + self.cache_expire_seconds = int(cache_expire_seconds) + if not hasattr(self, 'release_helper') or not self.release_helper: self.release_helper = False elif str(self.release_helper).lower() == 'true': @@ -69,6 +78,10 @@ def __init__(self, num_threads=1, poll_period=10, retrieve_bulk_size=10, pending self.update_poll_period = self.poll_period else: self.update_poll_period = int(self.update_poll_period) + if not hasattr(self, 'throttle_poll_period') or not self.throttle_poll_period: + self.throttle_poll_period = self.poll_period + else: + self.throttle_poll_period = int(self.new_poll_period) if hasattr(self, 'poll_period_increase_rate'): self.poll_period_increase_rate = float(self.poll_period_increase_rate) @@ -133,7 +146,7 @@ def get_new_requests(self): self.show_queue_size() - req_status = [RequestStatus.New, RequestStatus.Extend, RequestStatus.Built] + req_status = [RequestStatus.New, RequestStatus.Extend, RequestStatus.Built, RequestStatus.Throttling] reqs_new = core_requests.get_requests_by_status_type(status=req_status, locking=True, not_lock=True, new_poll=True, only_return_id=True, @@ -271,9 +284,11 @@ def get_request(self, request_id, status=None, locking=False): self.logger.error(traceback.format_exc()) return None - def load_poll_period(self, req, parameters): + def load_poll_period(self, req, parameters, throttling=False): if self.new_poll_period and req['new_poll_period'] != self.new_poll_period: parameters['new_poll_period'] = self.new_poll_period + if throttling: + 
parameters['new_poll_period'] = self.throttle_poll_period if self.update_poll_period and req['update_poll_period'] != self.update_poll_period: parameters['update_poll_period'] = self.update_poll_period parameters['max_new_retries'] = req['max_new_retries'] if req['max_new_retries'] is not None else self.max_new_retries @@ -342,6 +357,192 @@ def generate_transform(self, req, work, build=False): return new_transform + def get_num_active_requests(self, site_name): + cache = get_redis_cache() + num_requests = cache.get("num_requests", default=None) + if num_requests is None: + num_requests = {} + active_status = [RequestStatus.New, RequestStatus.Ready, RequestStatus.Throttling] + active_status1 = [RequestStatus.Transforming, RequestStatus.Terminating] + rets = core_requests.get_num_active_requests(active_status + active_status1) + for ret in rets: + status, site, count = ret + if site is None: + site = 'Default' + if site not in num_requests: + num_requests[site] = {'new': 0, 'processing': 0} + if status in active_status: + num_requests[site]['new'] += count + elif status in active_status1: + num_requests[site]['processing'] += count + cache.set("num_requests", num_requests, expire_seconds=self.cache_expire_seconds) + default_value = {'new': 0, 'processing': 0} + return num_requests.get(site_name, default_value) + + def get_num_active_transforms(self, site_name): + cache = get_redis_cache() + num_transforms = cache.get("num_transforms", default=None) + if num_transforms is None: + num_transforms = {} + active_status = [TransformStatus.New, TransformStatus.Ready] + active_status1 = [TransformStatus.Transforming, TransformStatus.Terminating] + rets = core_transforms.get_num_active_transforms(active_status + active_status1) + for ret in rets: + status, site, count = ret + if site is None: + site = 'Default' + if site not in num_transforms: + num_transforms[site] = {'new': 0, 'processing': 0} + if status in active_status: + num_transforms[site]['new'] += count + elif status in active_status1: + num_transforms[site]['processing'] += count + cache.set("num_transforms", num_transforms, expire_seconds=self.cache_expire_seconds) + default_value = {'new': 0, 'processing': 0} + return num_transforms.get(site_name, default_value) + + def get_num_active_processings(self, site_name): + cache = get_redis_cache() + num_processings = cache.get("num_processings", default=None) + active_transforms = cache.get("active_transforms", default={}) + if num_processings is None: + num_processings = {} + active_status = [ProcessingStatus.New] + active_status1 = [ProcessingStatus.Submitting, ProcessingStatus.Submitted, + ProcessingStatus.Running, ProcessingStatus.Terminating, ProcessingStatus.ToTrigger, + ProcessingStatus.Triggering] + rets = core_processings.get_active_processings(active_status + active_status1) + for ret in rets: + req_id, trf_id, pr_id, site, status = ret + if site is None: + site = 'Default' + if site not in num_processings: + num_processings[site] = {'new': 0, 'processing': 0} + active_transforms[site] = [] + if status in active_status: + num_processings[site]['new'] += 1 + elif status in active_status1: + num_processings[site]['processing'] += 1 + active_transforms[site].append(trf_id) + cache.set("num_processings", num_processings, expire_seconds=self.cache_expire_seconds) + cache.set("active_transforms", active_transforms, expire_seconds=self.cache_expire_seconds) + default_value = {'new': 0, 'processing': 0} + return num_processings.get(site_name, default_value), active_transforms + + def 
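The `get_num_active_requests` and `get_num_active_transforms` helpers above share one cache-aside shape: read a site-keyed counter dict from Redis, rebuild it from the database on a miss, and let the entry expire after `cache_expire_seconds` (the processings variant follows the same pattern but additionally records active transform ids per site). The shared skeleton, factored out here purely for illustration (`get_redis_cache` as imported in this file; the helper name is hypothetical):

    from idds.agents.common.cache.redis import get_redis_cache

    def cached_site_counts(cache_key, fetch_rows, new_statuses, expire_seconds=300):
        """fetch_rows() yields (status, site, count) rows, as the core queries do."""
        cache = get_redis_cache()
        counts = cache.get(cache_key, default=None)
        if counts is None:
            counts = {}
            for status, site, count in fetch_rows():
                site = site or 'Default'
                bucket = counts.setdefault(site, {'new': 0, 'processing': 0})
                bucket['new' if status in new_statuses else 'processing'] += count
            cache.set(cache_key, counts, expire_seconds=expire_seconds)
        return counts

A miss rebuilds the counters for every site at once, so per-site lookups stay cheap however many sites are active. (One nit from the hunk above: `throttle_poll_period` is defaulted with `int(self.new_poll_period)`; `int(self.throttle_poll_period)` looks like the intended expression.)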
get_num_active_contents(self, site_name, active_transform_ids): + cache = get_redis_cache() + # 1. input contents not terminated + # 2. output contents not terminated + tf_id_site_map = {} + all_tf_ids = [] + for site in active_transform_ids: + all_tf_ids += active_transform_ids[site] + for tf_id in active_transform_ids[site]: + tf_id_site_map[tf_id] = site + + num_input_contents = cache.get("num_input_contents", default=None) + num_output_contents = cache.get("num_output_contents", default=None) + if num_input_contents is None or num_output_contents is None: + num_input_contents, num_output_contents = {}, {} + ret = core_catalog.get_content_status_statistics_by_relation_type(all_tf_ids) + for item in ret: + status, relation_type, transform_id, count = item + site = tf_id_site_map[transform_id] + if site not in num_input_contents: + num_input_contents[site] = {'new': 0, 'activated': 0, 'processed': 0} + num_output_contents[site] = {'new': 0, 'activated': 0, 'processed': 0} + if status in [ContentStatus.New]: + if relation_type == ContentRelationType.Input: + num_input_contents[site]['new'] += count + elif relation_type == ContentRelationType.Output: + num_output_contents[site]['new'] += count + if status in [ContentStatus.Activated]: + if relation_type == ContentRelationType.Input: + num_input_contents[site]['activated'] += count + elif relation_type == ContentRelationType.Output: + num_output_contents[site]['activated'] += count + else: + if relation_type == ContentRelationType.Input: + num_input_contents[site]['processed'] += count + elif relation_type == ContentRelationType.Output: + num_output_contents[site]['processed'] += count + + cache.set("num_input_contents", num_input_contents, expire_seconds=self.cache_expire_seconds) + cache.set("num_output_contents", num_output_contents, expire_seconds=self.cache_expire_seconds) + default_value = {'new': 0, 'activated': 0, 'processed': 0} + return num_input_contents.get(site_name, default_value), num_output_contents.get(site_name, default_value) + + def get_throttlers(self): + cache = get_redis_cache() + throttlers = cache.get("throttlers", default=None) + if throttlers is None: + throttler_items = core_throttlers.get_throttlers() + throttlers = {} + for item in throttler_items: + throttlers[item['site']] = {'num_requests': item['num_requests'], + 'num_transforms': item['num_transforms'], + 'num_processings': item['num_processings'], + 'new_contents': item['new_contents'], + 'queue_contents': item['queue_contents'], + 'others': item['others'], + 'status': item['status']} + cache.set("throttlers", throttlers, expire_seconds=self.cache_expire_seconds) + return throttlers + + def whether_to_throttle(self, request): + try: + site = request['site'] + if site is None: + site = 'Default' + throttlers = self.get_throttlers() + num_requests = self.get_num_active_requests(site) + num_transforms = self.get_num_active_transforms(site) + num_processings, active_transforms = self.get_num_active_processings(site) + num_input_contents, num_output_contents = self.get_num_active_contents(site, active_transforms) + self.logger.info("throttler(site: %s): active requests(%s), transforms(%s), processings(%s)" % (site, num_requests, num_transforms, num_processings)) + self.logger.info("throttler(site: %s): active input contents(%s), output contents(%s)" % (site, num_input_contents, num_output_contents)) + + throttle_requests = throttlers.get(site, {}).get('num_requests', None) + throttle_transforms = throttlers.get(site, {}).get('num_transforms', None) + 
throttle_processings = throttlers.get(site, {}).get('num_processings', None) + throttle_new_jobs = throttlers.get(site, {}).get('new_contents', None) + throttle_queue_jobs = throttlers.get(site, {}).get('queue_contents', None) + self.logger.info("throttler(site: %s): throttle_requests %s, throttle_transforms: %s, throttle_processings: %s" % (site, throttle_requests, throttle_transforms, throttle_processings)) + if throttle_requests: + if num_requests['processing'] >= throttle_requests: + self.logger.info("throttler(site: %s): num of processing requests (%s) is bigger than throttle_requests (%s), set throttling" % (site, num_requests['processing'], throttle_requests)) + return True + if throttle_transforms: + if num_transforms['processing'] >= throttle_transforms: + self.logger.info("throttler(site: %s): num of processing transforms (%s) is bigger than throttle_transforms (%s), set throttling" % (site, num_transforms['processing'], throttle_transforms)) + return True + if throttle_processings: + if num_processings['processing'] >= throttle_processings: + self.logger.info("throttler(site: %s): num of processing processings (%s) is bigger than throttle_processings (%s), set throttling" % (site, num_processings['processing'], throttle_processings)) + return True + + new_jobs = num_input_contents['new'] + released_jobs = num_input_contents['processed'] + terminated_jobs = num_output_contents['processed'] + queue_jobs = released_jobs - terminated_jobs + + self.logger.info("throttler(site: %s): throttle_new_jobs: %s, throttle_queue_jobs: %s" % (site, throttle_new_jobs, throttle_queue_jobs)) + self.logger.info("throttler(site: %s): new_jobs: %s, queue_jobs: %s" % (site, new_jobs, queue_jobs)) + if throttle_new_jobs: + if new_jobs >= throttle_new_jobs: + self.logger.info("throttler(site: %s): num of new jobs(not released) (%s) is bigger than throttle_new_jobs (%s), set throttling" % (site, new_jobs, throttle_new_jobs)) + return True + if throttle_queue_jobs: + if queue_jobs >= throttle_queue_jobs: + self.logger.info("throttler(site: %s): num of queue jobs(released but not terminated) (%s) is bigger than throttle_queue_jobs (%s), set throttling" % (site, queue_jobs, throttle_queue_jobs)) + return True + + return False + except Exception as ex: + self.logger.error("whether_to_throttle: %s" % str(ex)) + self.logger.error(traceback.format_exc()) + return False + def get_log_prefix(self, req): return "" % req['request_id'] @@ -349,34 +550,42 @@ def handle_new_request(self, req): try: log_pre = self.get_log_prefix(req) self.logger.info(log_pre + "Handle new request") - workflow = req['request_metadata']['workflow'] - - # wf = workflow.copy() - wf = workflow - works = wf.get_new_works() - transforms = [] - for work in works: - # new_work = work.copy() - new_work = work - new_work.add_proxy(wf.get_proxy()) - # new_work.set_request_id(req['request_id']) - # new_work.create_processing() - - transform = self.generate_transform(req, work) - transforms.append(transform) - self.logger.debug(log_pre + "Processing request(%s): new transforms: %s" % (req['request_id'], - str(transforms))) - # processing_metadata = req['processing_metadata'] - # processing_metadata = {'workflow_data': wf.get_running_data()} - - ret_req = {'request_id': req['request_id'], - 'parameters': {'status': RequestStatus.Transforming, - 'locking': RequestLocking.Idle, - # 'processing_metadata': processing_metadata, - 'request_metadata': req['request_metadata']}, - 'new_transforms': transforms} - ret_req['parameters'] = 
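The job-level limits derive queue depth from the content counters: a job counts as queued once its input has been released ('processed' on the input side) but its output has not yet terminated. Worked numbers (illustrative only, not from the patch):

    new_jobs        = 5000    # num_input_contents['new']: inputs not yet released
    released_jobs   = 20000   # num_input_contents['processed']: inputs released to PanDA
    terminated_jobs = 18000   # num_output_contents['processed']: outputs already terminated
    queue_jobs = released_jobs - terminated_jobs   # 2000 jobs in flight

    # with throttle_queue_jobs = 1500 this site throttles on queue depth,
    # even though new_jobs may still be below throttle_new_jobs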
self.load_poll_period(req, ret_req['parameters']) - self.logger.info(log_pre + "Handle new request result: %s" % str(ret_req)) + to_throttle = self.whether_to_throttle(req) + if to_throttle: + ret_req = {'request_id': req['request_id'], + 'parameters': {'status': RequestStatus.Throttling, + 'locking': RequestLocking.Idle}} + ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters'], throttling=True) + self.logger.info(log_pre + "Throttle new request result: %s" % str(ret_req)) + else: + workflow = req['request_metadata']['workflow'] + + # wf = workflow.copy() + wf = workflow + works = wf.get_new_works() + transforms = [] + for work in works: + # new_work = work.copy() + new_work = work + new_work.add_proxy(wf.get_proxy()) + # new_work.set_request_id(req['request_id']) + # new_work.create_processing() + + transform = self.generate_transform(req, work) + transforms.append(transform) + self.logger.debug(log_pre + "Processing request(%s): new transforms: %s" % (req['request_id'], + str(transforms))) + # processing_metadata = req['processing_metadata'] + # processing_metadata = {'workflow_data': wf.get_running_data()} + + ret_req = {'request_id': req['request_id'], + 'parameters': {'status': RequestStatus.Transforming, + 'locking': RequestLocking.Idle, + # 'processing_metadata': processing_metadata, + 'request_metadata': req['request_metadata']}, + 'new_transforms': transforms} + ret_req['parameters'] = self.load_poll_period(req, ret_req['parameters']) + self.logger.info(log_pre + "Handle new request result: %s" % str(ret_req)) except Exception as ex: self.logger.error(ex) self.logger.error(traceback.format_exc()) @@ -537,7 +746,8 @@ def process_new_request(self, event): self.number_workers += 1 try: if event: - req_status = [RequestStatus.New, RequestStatus.Extend, RequestStatus.Built] + # req_status = [RequestStatus.New, RequestStatus.Extend, RequestStatus.Built] + req_status = [RequestStatus.New, RequestStatus.Extend, RequestStatus.Built, RequestStatus.Throttling] req = self.get_request(request_id=event._request_id, status=req_status, locking=True) if not req: self.logger.error("Cannot find request for event: %s" % str(event)) diff --git a/main/lib/idds/agents/common/plugins/messaging.py b/main/lib/idds/agents/common/plugins/messaging.py index a398779a..6b181b92 100644 --- a/main/lib/idds/agents/common/plugins/messaging.py +++ b/main/lib/idds/agents/common/plugins/messaging.py @@ -62,6 +62,8 @@ def __init__(self, name="MessagingSender", logger=None, **kwargs): if logger: self.logger = logger self.graceful_stop = threading.Event() + self.graceful_suspend = threading.Event() + self.request_queue = None self.output_queue = None self.response_queue = None @@ -94,6 +96,12 @@ def get_logger(self): def stop(self): self.graceful_stop.set() + def suspend(self): + self.graceful_suspend.set() + + def resume(self): + self.graceful_suspend.clear() + def set_request_queue(self, request_queue): self.request_queue = request_queue @@ -138,7 +146,7 @@ def connect_to_messaging_brokers(self, sender=True): for broker, port in broker_addresses: conn = stomp.Connection12(host_and_ports=[(broker, port)], keepalive=True, - heartbeats=(60000, 60000), # one minute + heartbeats=(60000, 0), # one minute timeout=timeout) conns.append(conn) channel_conns[name] = conns @@ -151,7 +159,8 @@ def disconnect(self, conns): if conns[name]: for conn in conns[name]: try: - conn.disconnect() + if conn.is_connected(): + conn.disconnect() except Exception: pass @@ -261,27 +270,34 @@ def execute_subscribe(self): 
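messaging.py now carries a second event beside `graceful_stop`: setting `graceful_suspend` makes the subscriber loop in the following hunk drop its broker connections and idle, and clearing it lets the usual reconnect/resubscribe path take over again. The gating idiom in isolation (a hedged sketch with stand-in connect/disconnect callables):

    import threading
    import time

    graceful_stop = threading.Event()
    graceful_suspend = threading.Event()

    def subscriber_loop(connect_all, disconnect_all):
        while not graceful_stop.is_set():
            if graceful_suspend.is_set():
                disconnect_all()   # stay offline while another instance is selected
                time.sleep(1)
            else:
                connect_all()      # (re)connect and subscribe as before
                time.sleep(0.1)

On the heartbeat change: stomp.py interprets `heartbeats` as `(outgoing_ms, incoming_ms)`, so `(60000, 0)` keeps sending a heartbeat every minute but no longer expects one back from the broker; the inherited "# one minute" comment now only describes the outgoing half.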
self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc())) while not self.graceful_stop.is_set(): - has_failed_connection = False - try: - for name in self.receiver_conns: - for conn in self.receiver_conns[name]: - if not conn.is_connected(): - conn.set_listener('message-receiver', self.get_listener(conn.transport._Transport__host_and_ports[0])) - # conn.start() - conn.connect(self.channels[name]['username'], self.channels[name]['password'], wait=True) - conn.subscribe(destination=self.channels[name]['destination'], id='atlas-idds-messaging', ack='auto') - time.sleep(0.1) - except Exception as error: - self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc())) - has_failed_connection = True - - if has_failed_connection or len(self.receiver_conns) == 0: + if self.graceful_suspend.is_set(): try: - # re-subscribe self.disconnect(self.receiver_conns) - self.subscribe() except Exception as error: self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc())) + time.sleep(1) + else: + has_failed_connection = False + try: + for name in self.receiver_conns: + for conn in self.receiver_conns[name]: + if not conn.is_connected(): + conn.set_listener('message-receiver', self.get_listener(conn.transport._Transport__host_and_ports[0])) + # conn.start() + conn.connect(self.channels[name]['username'], self.channels[name]['password'], wait=True) + conn.subscribe(destination=self.channels[name]['destination'], id='atlas-idds-messaging', ack='auto') + time.sleep(0.1) + except Exception as error: + self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc())) + has_failed_connection = True + + if has_failed_connection or len(self.receiver_conns) == 0: + try: + # re-subscribe + self.disconnect(self.receiver_conns) + self.subscribe() + except Exception as error: + self.logger.error("Messaging receiver throws an exception: %s, %s" % (error, traceback.format_exc())) self.logger.info('receiver graceful stop requested') diff --git a/main/lib/idds/agents/conductor/conductor.py b/main/lib/idds/agents/conductor/conductor.py index e51fa8ca..0f0b360a 100644 --- a/main/lib/idds/agents/conductor/conductor.py +++ b/main/lib/idds/agents/conductor/conductor.py @@ -8,6 +8,8 @@ # Authors: # - Wen Guan, , 2019 - 2023 +import datetime +import random import time import traceback try: @@ -17,10 +19,14 @@ # Python 2 from Queue import Queue -from idds.common.constants import (Sections, MessageStatus, MessageDestination, MessageType) +from idds.common.constants import (Sections, MessageStatus, MessageDestination, MessageType, + ProcessingStatus, ContentStatus, ContentRelationType) from idds.common.exceptions import AgentPluginError, IDDSException from idds.common.utils import setup_logging, get_logger -from idds.core import messages as core_messages +from idds.core import (messages as core_messages, + catalog as core_catalog, + processings as core_processings, + health as core_health) from idds.agents.common.baseagent import BaseAgent @@ -32,8 +38,9 @@ class Conductor(BaseAgent): Conductor works to notify workload management that the data is available. 
""" - def __init__(self, num_threads=1, retrieve_bulk_size=1000, threshold_to_release_messages=None, - random_delay=None, delay=60, interval_delay=10, replay_times=3, **kwargs): + def __init__(self, num_threads=1, retrieve_bulk_size=200, threshold_to_release_messages=None, + random_delay=None, delay=300, interval_delay=10, max_retry_delay=3600, + max_normal_retries=10, max_retries=30, replay_times=2, mode='single', **kwargs): super(Conductor, self).__init__(num_threads=num_threads, name='Conductor', **kwargs) self.config_section = Sections.Conductor self.retrieve_bulk_size = int(retrieve_bulk_size) @@ -52,17 +59,51 @@ def __init__(self, num_threads=1, retrieve_bulk_size=1000, threshold_to_release_ if delay is None: delay = 60 self.delay = int(delay) + if not max_retry_delay: + max_retry_delay = 3600 + self.max_retry_delay = int(max_retry_delay) + + self.max_normal_retries = int(max_normal_retries) + self.max_retries = int(max_retries) + if replay_times is None: - replay_times = 3 + replay_times = 2 self.replay_times = int(replay_times) if not interval_delay: interval_delay = 10 self.interval_delay = int(interval_delay) self.logger = get_logger(self.__class__.__name__) + self.mode = mode + self.selected = None + self.selected_conductor = None + def __del__(self): self.stop_notifier() + def is_selected(self): + selected = None + if not self.selected_conductor: + selected = True + else: + selected = self.is_self(self.selected_conductor) + if self.selected is None or self.selected != selected: + self.logger.info("is_selected changed from %s to %s" % (self.selected, selected)) + self.selected = selected + return self.selected + + def monitor_conductor(self): + if self.mode == "single": + self.logger.info("Conductor single mode") + self.selected_conductor = core_health.select_agent(name='Conductor', newer_than=self.heartbeat_delay * 2) + self.logger.debug("Selected conductor: %s" % self.selected_conductor) + + def add_conductor_monitor_task(self): + task = self.create_task(task_func=self.monitor_conductor, task_output_queue=None, + task_args=tuple(), task_kwargs={}, delay_time=self.heartbeat_delay, + priority=1) + self.add_task(task) + def get_messages(self): """ Get messages @@ -76,33 +117,41 @@ def get_messages(self): if messages: self.logger.info("Main thread get %s new messages" % len(messages)) - msg_type = [MessageType.StageInCollection, MessageType.StageInWork, - MessageType.ActiveLearningCollection, MessageType.ActiveLearningWork, - MessageType.HyperParameterOptCollection, MessageType.HyperParameterOptWork, - MessageType.ProcessingCollection, MessageType.ProcessingWork, - MessageType.UnknownCollection, MessageType.UnknownWork] + # msg_type = [MessageType.StageInCollection, MessageType.StageInWork, + # MessageType.ActiveLearningCollection, MessageType.ActiveLearningWork, + # MessageType.HyperParameterOptCollection, MessageType.HyperParameterOptWork, + # MessageType.ProcessingCollection, MessageType.ProcessingWork, + # MessageType.UnknownCollection, MessageType.UnknownWork] + retry_messages = [] - for retry in range(1, self.replay_times + 1): - delay = int(self.delay) * (retry ** 3) - - messages_d = core_messages.retrieve_messages(status=MessageStatus.Delivered, - retries=retry, delay=delay, - bulk_size=self.retrieve_bulk_size, - destination=destination, - msg_type=msg_type) - if messages_d: - self.logger.info("Main thread get %s retries messages" % len(messages_d)) - retry_messages += messages_d + messages_d = core_messages.retrieve_messages(status=MessageStatus.Delivered, + 
use_poll_period=True, + bulk_size=self.retrieve_bulk_size, + destination=destination) # msg_type=msg_type) + if messages_d: + self.logger.info("Main thread get %s retries messages" % len(messages_d)) + retry_messages += messages_d return messages + retry_messages - def clean_messages(self, msgs): + def clean_messages(self, msgs, confirm=False): # core_messages.delete_messages(msgs) + msg_status = MessageStatus.Delivered + if confirm: + msg_status = MessageStatus.ConfirmDelivered to_updates = [] for msg in msgs: + retries = msg['retries'] + if retries < self.max_normal_retries: + rand_num = random.randint(1, retries + 1) + delay = int(self.delay) * rand_num + delay = min(delay, self.max_retry_delay) + else: + delay = self.max_retry_delay to_updates.append({'msg_id': msg['msg_id'], 'retries': msg['retries'] + 1, - 'status': MessageStatus.Delivered}) + 'poll_period': datetime.timedelta(seconds=delay), + 'status': msg_status}) core_messages.update_messages(to_updates) def start_notifier(self): @@ -132,6 +181,61 @@ def get_output_messages(self): self.logger.error("Failed to get output messages: %s, %s" % (error, traceback.format_exc())) return msgs + def is_message_processed(self, message): + retries = message['retries'] + try: + if retries >= self.max_retries: + self.logger.info("message %s has reached max retries %s" % (message['msg_id'], self.max_retries)) + return True + msg_type = message['msg_type'] + if msg_type not in [MessageType.ProcessingFile]: + if retries < self.replay_times: + return False + else: + return True + else: + msg_content = message['msg_content'] + request_id = message['request_id'] + transform_id = message['transform_id'] + if 'files' not in msg_content or not msg_content['files']: + return True + if 'relation_type' not in msg_content or msg_content['relation_type'] != 'input': + return True + + files = msg_content['files'] + one_file = files[0] + # only check one file in a message + map_id = one_file['map_id'] + contents = core_catalog.get_contents_by_request_transform(request_id=request_id, + transform_id=transform_id, + map_id=map_id) + for content in contents: + if content['content_relation_type'] == ContentRelationType.Output: + if (content['status'] == ContentStatus.Missing): + workload_id = msg_content['workload_id'] + processings = core_processings.get_processings_by_transform_id(transform_id=transform_id) + find_processing = None + if processings: + for processing in processings: + if processing['workload_id'] == workload_id: + find_processing = processing + if find_processing and find_processing['status'] in [ProcessingStatus.Finished, ProcessingStatus.Failed, + ProcessingStatus.Lost, ProcessingStatus.SubFinished, + ProcessingStatus.Cancelled, ProcessingStatus.Expired, + ProcessingStatus.Suspended, ProcessingStatus.Broken]: + return True + else: + return False + if (content['status'] != ContentStatus.New): + return True + except Exception as ex: + self.logger.error(ex) + self.logger.error(traceback.format_exc()) + + if retries < self.replay_times: + return False + return False + def run(self): """ Main run function. 
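`clean_messages` swaps the old retrieval-time back-off (delay × retry³, recomputed per retry bucket) for a randomized, capped delay persisted per message in the new `poll_period` column. The schedule in isolation (same arithmetic as the hunk above):

    import datetime
    import random

    def next_poll_period(retries, base_delay=300, max_retry_delay=3600,
                         max_normal_retries=10):
        if retries < max_normal_retries:
            delay = base_delay * random.randint(1, retries + 1)  # jittered growth
            delay = min(delay, max_retry_delay)
        else:
            delay = max_retry_delay                              # flat ceiling
        return datetime.timedelta(seconds=delay)

Persisting the period on the row is what lets `retrieve_messages(..., use_poll_period=True)` pick up every due message in a single query instead of looping over per-retry buckets.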
@@ -143,6 +247,10 @@ def run(self): self.add_default_tasks() + if self.mode == "single": + self.logger.debug("single mode") + self.add_conductor_monitor_task() + self.start_notifier() # self.add_health_message_task() @@ -153,14 +261,26 @@ def run(self): try: num_contents = 0 - messages = self.get_messages() - if not messages: - time.sleep(self.interval_delay) + if self.is_selected(): + messages = self.get_messages() + if not messages: + time.sleep(self.interval_delay) + else: + message = [] + + to_discard_messages = [] for message in messages: message['destination'] = message['destination'].name num_contents += message['num_contents'] - self.message_queue.put(message) + if self.is_message_processed(message): + self.logger.debug("message (msg_id: %s) is already processed, not resend it again" % message['msg_id']) + to_discard_messages.append(message) + else: + self.message_queue.put(message) + if to_discard_messages: + self.clean_messages(to_discard_messages, confirm=True) + while not self.message_queue.empty(): time.sleep(1) output_messages = self.get_output_messages() diff --git a/main/lib/idds/core/catalog.py b/main/lib/idds/core/catalog.py index 85a14f7f..9b90ef95 100644 --- a/main/lib/idds/core/catalog.py +++ b/main/lib/idds/core/catalog.py @@ -331,7 +331,7 @@ def get_contents(coll_scope=None, coll_name=None, request_id=None, workload_id=N @read_session -def get_contents_by_request_transform(request_id=None, workload_id=None, transform_id=None, status=None, status_updated=False, session=None): +def get_contents_by_request_transform(request_id=None, workload_id=None, transform_id=None, status=None, map_id=None, status_updated=False, session=None): """ Get contents with request id, workload id and transform id. @@ -343,7 +343,7 @@ def get_contents_by_request_transform(request_id=None, workload_id=None, transfo :returns: list of contents """ ret = orm_contents.get_contents_by_request_transform(request_id=request_id, transform_id=transform_id, - workload_id=workload_id, status=status, + workload_id=workload_id, status=status, map_id=map_id, status_updated=status_updated, session=session) return ret @@ -499,7 +499,7 @@ def get_match_contents(coll_scope, coll_name, scope, name, min_id=None, max_id=N @read_session -def get_content_status_statistics(coll_id=None, session=None): +def get_content_status_statistics(coll_id=None, transform_ids=None, session=None): """ Get statistics group by status @@ -508,7 +508,20 @@ def get_content_status_statistics(coll_id=None, session=None): :returns: statistics group by status, as a dict. """ - return orm_contents.get_content_status_statistics(coll_id=coll_id, session=session) + return orm_contents.get_content_status_statistics(coll_id=coll_id, transform_ids=transform_ids, session=session) + + +@read_session +def get_content_status_statistics_by_relation_type(transform_ids, session=None): + """ + Get statistics group by status + + :param coll_id: Collection id. + :param session: The database session in use. + + :returns: statistics group by status, as a dict. + """ + return orm_contents.get_content_status_statistics_by_relation_type(transform_ids, session=session) @transactional_session diff --git a/main/lib/idds/core/messages.py b/main/lib/idds/core/messages.py index 3eb38370..2cb1767c 100644 --- a/main/lib/idds/core/messages.py +++ b/main/lib/idds/core/messages.py @@ -13,6 +13,8 @@ operations related to Messages. 
""" +import threading + from idds.common.constants import MessageDestination, MessageType, MessageStatus from idds.orm.base.session import read_session, transactional_session from idds.orm import messages as orm_messages @@ -46,7 +48,8 @@ def add_messages(messages, bulk_size=1000, session=None): @read_session def retrieve_messages(bulk_size=None, msg_type=None, status=None, destination=None, source=None, request_id=None, workload_id=None, transform_id=None, - processing_id=None, retries=None, delay=None, session=None): + processing_id=None, use_poll_period=False, retries=None, delay=None, + fetching_id=None, session=None): """ Retrieve up to $bulk messages. @@ -58,12 +61,16 @@ def retrieve_messages(bulk_size=None, msg_type=None, status=None, destination=No :returns messages: List of dictionaries """ + if fetching_id is None: + hb_thread = threading.current_thread() + fetching_id = hb_thread.ident + return orm_messages.retrieve_messages(bulk_size=bulk_size, msg_type=msg_type, status=status, source=source, destination=destination, request_id=request_id, workload_id=workload_id, transform_id=transform_id, processing_id=processing_id, - retries=retries, delay=delay, - session=session) + retries=retries, delay=delay, fetching_id=fetching_id, + use_poll_period=use_poll_period, session=session) @read_session diff --git a/main/lib/idds/core/processings.py b/main/lib/idds/core/processings.py index e16c0896..c3e8df9d 100644 --- a/main/lib/idds/core/processings.py +++ b/main/lib/idds/core/processings.py @@ -369,3 +369,13 @@ def clean_next_poll_at(status, session=None): :param status: status of the processing """ orm_processings.clean_next_poll_at(status=status, session=session) + + +@read_session +def get_num_active_processings(active_status=None, session=None): + return orm_processings.get_num_active_processings(active_status=active_status, session=session) + + +@read_session +def get_active_processings(active_status=None, session=None): + return orm_processings.get_active_processings(active_status=active_status, session=session) diff --git a/main/lib/idds/core/requests.py b/main/lib/idds/core/requests.py index 6821922c..b3990f7c 100644 --- a/main/lib/idds/core/requests.py +++ b/main/lib/idds/core/requests.py @@ -467,3 +467,13 @@ def get_last_request_id(status, older_than=None, session=None): :returns request_id """ return orm_requests.get_last_request_id(status=status, older_than=older_than, session=session) + + +@read_session +def get_num_active_requests(active_status=None, session=None): + return orm_requests.get_num_active_requests(active_status=active_status, session=session) + + +@read_session +def get_active_requests(active_status=None, session=None): + return orm_requests.get_active_requests(active_status=active_status, session=session) diff --git a/main/lib/idds/core/throttlers.py b/main/lib/idds/core/throttlers.py new file mode 100644 index 00000000..ba163217 --- /dev/null +++ b/main/lib/idds/core/throttlers.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2023 + + +""" +operations related to throttler. 
+""" + +from idds.common.constants import ThrottlerStatus +from idds.orm.base.session import read_session, transactional_session +from idds.orm import throttlers as orm_throttlers + + +@transactional_session +def add_throttler(site, status=ThrottlerStatus.Active, num_requests=None, num_transforms=None, num_processings=None, new_contents=None, + queue_contents=None, others=None, session=None): + """ + Add a throttler item + + :param site: The site name. + :param session: The database session. + """ + return orm_throttlers.add_throttler(site=site, status=status, num_requests=num_requests, num_transforms=num_transforms, + num_processings=num_processings, new_contents=new_contents, queue_contents=queue_contents, + others=others, session=session) + + +@read_session +def get_throttlers(site=None, status=None, session=None): + """ + Get throttler + + :param site: site name. + :param status: throttler status. + """ + return orm_throttlers.get_throttlers(site=site, status=status, session=session) + + +@transactional_session +def update_throttler(throttler_id=None, site=None, parameters=None, session=None): + """ + Update throttler + + :param throttler_id: throttler id. + :param parameters: parameters in dict. + """ + return orm_throttlers.update_throttler(throttler_id=throttler_id, site=site, parameters=parameters, session=session) + + +@transactional_session +def delete_throttler(throttler_id, session=None): + """ + Delete throttler with the given id. + + :param throttler_id: The throttler id. + """ + return orm_throttlers.delete_throttler(throttler_id=throttler_id, session=session) diff --git a/main/lib/idds/core/transforms.py b/main/lib/idds/core/transforms.py index 638551f8..581103ee 100644 --- a/main/lib/idds/core/transforms.py +++ b/main/lib/idds/core/transforms.py @@ -627,3 +627,13 @@ def get_work_name_to_coll_map(request_id): 'workload_id': coll['workload_id'], 'scope': coll['scope'], 'name': coll['name']}) return work_name_to_coll_map + + +@read_session +def get_num_active_transforms(active_status=None, session=None): + return orm_transforms.get_num_active_transforms(active_status=active_status, session=session) + + +@read_session +def get_active_transforms(active_status=None, session=None): + return orm_transforms.get_active_transforms(active_status=active_status, session=session) diff --git a/main/lib/idds/orm/base/alembic/versions/0204f391c32d_add_poll_period_in_message.py b/main/lib/idds/orm/base/alembic/versions/0204f391c32d_add_poll_period_in_message.py new file mode 100644 index 00000000..44006783 --- /dev/null +++ b/main/lib/idds/orm/base/alembic/versions/0204f391c32d_add_poll_period_in_message.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2023 + +"""add poll period in message + +Revision ID: 0204f391c32d +Revises: 53d0af715dab +Create Date: 2023-05-21 19:20:00.448768+00:00 + +""" + +import datetime + +from alembic import op +from alembic import context +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
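With `core.throttlers` in place, a per-site limit can be seeded programmatically; an illustrative call (the limit values are examples, not shipped defaults):

    from idds.common.constants import ThrottlerStatus
    from idds.core import throttlers as core_throttlers

    core_throttlers.add_throttler(site='Default',
                                  status=ThrottlerStatus.Active,
                                  num_requests=1000,      # max requests in flight
                                  num_transforms=2000,
                                  num_processings=1000,
                                  new_contents=100000,    # inputs not yet released
                                  queue_contents=200000)  # released but unfinished

Since the Clerk buckets requests with a NULL `site` under 'Default', a 'Default' throttler row acts as the catch-all limit.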
+revision = '0204f391c32d' +down_revision = '53d0af715dab' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']: + schema = context.get_context().version_table_schema if context.get_context().version_table_schema else '' + op.add_column('messages', sa.Column('poll_period', sa.Interval, default=datetime.timedelta(seconds=300), nullable=False), schema=schema) + + +def downgrade() -> None: + if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']: + schema = context.get_context().version_table_schema if context.get_context().version_table_schema else '' + op.drop_column('messages', 'poll_period', schema=schema) diff --git a/main/lib/idds/orm/base/alembic/versions/53d0af715dab_add_site_throttler.py b/main/lib/idds/orm/base/alembic/versions/53d0af715dab_add_site_throttler.py new file mode 100644 index 00000000..d59c42e8 --- /dev/null +++ b/main/lib/idds/orm/base/alembic/versions/53d0af715dab_add_site_throttler.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0OA +# +# Authors: +# - Wen Guan, , 2023 + +"""add site throttler + +Revision ID: 53d0af715dab +Revises: 6ca0e5e466eb +Create Date: 2023-05-18 10:02:46.858647+00:00 + +""" + +import datetime + +from alembic import op +from alembic import context +import sqlalchemy as sa + + +from idds.common.constants import ThrottlerStatus +from idds.orm.base.types import EnumWithValue +from idds.orm.base.types import JSON + +# revision identifiers, used by Alembic. +revision = '53d0af715dab' +down_revision = '6ca0e5e466eb' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']: + schema = context.get_context().version_table_schema if context.get_context().version_table_schema else '' + + op.add_column('requests', sa.Column('site', sa.String(50)), schema=schema) + op.create_index("REQUESTS_STATUS_SITE", "requests", ['status', 'site', 'request_id']) + + op.add_column('transforms', sa.Column('site', sa.String(50)), schema=schema) + op.create_index("TRANSFORMS_STATUS_SITE", "transforms", ['status', 'site', 'request_id', 'transform_id']) + + op.add_column('processings', sa.Column('site', sa.String(50)), schema=schema) + op.create_index("PROCESSINGS_STATUS_SITE", "processings", ['status', 'site', 'request_id', 'transform_id', 'processing_id']) + + op.add_column('messages', sa.Column('fetching_id', sa.Integer()), schema=schema) + + op.create_table('throttlers', + sa.Column('throttler_id', sa.BigInteger(), sa.Sequence('THROTTLER_ID_SEQ', schema=schema)), + sa.Column('site', sa.String(50), nullable=False), + sa.Column('status', EnumWithValue(ThrottlerStatus), nullable=False), + sa.Column('num_requests', sa.Integer()), + sa.Column('num_transforms', sa.Integer()), + sa.Column('num_processings', sa.Integer()), + sa.Column('new_contents', sa.Integer()), + sa.Column('queue_contents', sa.Integer()), + sa.Column("created_at", sa.DateTime, default=datetime.datetime.utcnow, nullable=False), + sa.Column("updated_at", sa.DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False), + sa.Column('others', JSON())) + op.create_primary_key('THROTTLER_PK', 'throttlers', ['throttler_id']) + 
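The two revisions chain onto the existing head (6ca0e5e466eb → 53d0af715dab → 0204f391c32d), so an Alembic-managed schema picks up the site columns, the throttlers table, and the message `fetching_id` and `poll_period` in one `alembic upgrade head` pass; the raw statements in main/etc/sql/oracle_update.sql above cover installations maintained by hand.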
op.create_unique_constraint('THROTTLER_SITE_UQ', 'throttlers', ['site']) + + +def downgrade() -> None: + if context.get_context().dialect.name in ['oracle', 'mysql', 'postgresql']: + schema = context.get_context().version_table_schema if context.get_context().version_table_schema else '' + + op.drop_column('requests', 'site', schema=schema) + op.drop_index("REQUESTS_STATUS_SITE", "requests") + + op.drop_column('transforms', 'site', schema=schema) + op.drop_index("TRANSFORMS_STATUS_SITE", "requests") + + op.drop_column('processings', 'site', schema=schema) + op.drop_index("PROCESSINGS_STATUS_SITE", "requests") + + op.drop_column('messages', 'fetching_id', schema=schema) + + op.drop_constraint('THROTTLER_SITE_UQ') + op.drop_constraint('THROTTLER_PK') + op.drop_table('throttlers') diff --git a/main/lib/idds/orm/base/models.py b/main/lib/idds/orm/base/models.py index ed0c2539..0e8d01e0 100644 --- a/main/lib/idds/orm/base/models.py +++ b/main/lib/idds/orm/base/models.py @@ -30,7 +30,7 @@ CollectionRelationType, ContentType, ContentRelationType, ContentStatus, ContentFetchStatus, ContentLocking, GranularityType, MessageType, MessageStatus, MessageLocking, - MessageSource, MessageDestination, + MessageSource, MessageDestination, ThrottlerStatus, CommandType, CommandStatus, CommandLocking, CommandLocation, HealthStatus) from idds.common.event import (EventType, EventStatus) @@ -158,6 +158,7 @@ class Request(BASE, ModelBase): max_update_retries = Column(Integer(), default=0) new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1)) update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10)) + site = Column(String(50)) errors = Column(JSONString(1024)) _request_metadata = Column('request_metadata', JSON()) _processing_metadata = Column('processing_metadata', JSON()) @@ -242,6 +243,7 @@ def update(self, values, flush=True, session=None): CheckConstraint('status IS NOT NULL', name='REQUESTS_STATUS_ID_NN'), # UniqueConstraint('name', 'scope', 'requester', 'request_type', 'transform_tag', 'workload_id', name='REQUESTS_NAME_SCOPE_UQ '), Index('REQUESTS_SCOPE_NAME_IDX', 'name', 'scope', 'workload_id'), + Index('REQUESTS_STATUS_SITE', 'status', 'site', 'request_id'), Index('REQUESTS_STATUS_PRIO_IDX', 'status', 'priority', 'request_id', 'locking', 'updated_at', 'next_poll_at', 'created_at'), Index('REQUESTS_STATUS_POLL_IDX', 'status', 'priority', 'locking', 'updated_at', 'new_poll_period', 'update_poll_period', 'created_at', 'request_id')) @@ -306,6 +308,7 @@ class Transform(BASE, ModelBase): max_update_retries = Column(Integer(), default=0) new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1)) update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10)) + site = Column(String(50)) name = Column(String(NAME_LENGTH)) errors = Column(JSONString(1024)) _transform_metadata = Column('transform_metadata', JSON()) @@ -369,6 +372,7 @@ def update(self, values, flush=True, session=None): Index('TRANSFORMS_TYPE_TAG_IDX', 'transform_type', 'transform_tag', 'transform_id'), Index('TRANSFORMS_STATUS_UPDATED_AT_IDX', 'status', 'locking', 'updated_at', 'next_poll_at', 'created_at'), Index('TRANSFORMS_REQ_IDX', 'request_id', 'transform_id'), + Index('TRANSFORMS_STATUS_SITE', 'status', 'site', 'request_id', 'transform_id'), Index('TRANSFORMS_STATUS_POLL_IDX', 'status', 'locking', 'updated_at', 'new_poll_period', 'update_poll_period', 'created_at', 'transform_id')) @@ -411,6 +415,7 @@ class Processing(BASE, ModelBase): max_update_retries = 
diff --git a/main/lib/idds/orm/base/models.py b/main/lib/idds/orm/base/models.py
index ed0c2539..0e8d01e0 100644
--- a/main/lib/idds/orm/base/models.py
+++ b/main/lib/idds/orm/base/models.py
@@ -30,7 +30,7 @@
                                    CollectionRelationType, ContentType, ContentRelationType,
                                    ContentStatus, ContentFetchStatus, ContentLocking, GranularityType,
                                    MessageType, MessageStatus, MessageLocking,
-                                   MessageSource, MessageDestination,
+                                   MessageSource, MessageDestination, ThrottlerStatus,
                                    CommandType, CommandStatus, CommandLocking,
                                    CommandLocation, HealthStatus)
 from idds.common.event import (EventType, EventStatus)
@@ -158,6 +158,7 @@ class Request(BASE, ModelBase):
     max_update_retries = Column(Integer(), default=0)
     new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1))
     update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10))
+    site = Column(String(50))
     errors = Column(JSONString(1024))
     _request_metadata = Column('request_metadata', JSON())
     _processing_metadata = Column('processing_metadata', JSON())
@@ -242,6 +243,7 @@ def update(self, values, flush=True, session=None):
                       CheckConstraint('status IS NOT NULL', name='REQUESTS_STATUS_ID_NN'),
                       # UniqueConstraint('name', 'scope', 'requester', 'request_type', 'transform_tag', 'workload_id', name='REQUESTS_NAME_SCOPE_UQ '),
                       Index('REQUESTS_SCOPE_NAME_IDX', 'name', 'scope', 'workload_id'),
+                      Index('REQUESTS_STATUS_SITE', 'status', 'site', 'request_id'),
                       Index('REQUESTS_STATUS_PRIO_IDX', 'status', 'priority', 'request_id', 'locking', 'updated_at', 'next_poll_at', 'created_at'),
                       Index('REQUESTS_STATUS_POLL_IDX', 'status', 'priority', 'locking', 'updated_at', 'new_poll_period', 'update_poll_period', 'created_at', 'request_id'))
@@ -306,6 +308,7 @@ class Transform(BASE, ModelBase):
     max_update_retries = Column(Integer(), default=0)
     new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1))
     update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10))
+    site = Column(String(50))
     name = Column(String(NAME_LENGTH))
     errors = Column(JSONString(1024))
     _transform_metadata = Column('transform_metadata', JSON())
@@ -369,6 +372,7 @@ def update(self, values, flush=True, session=None):
                       Index('TRANSFORMS_TYPE_TAG_IDX', 'transform_type', 'transform_tag', 'transform_id'),
                       Index('TRANSFORMS_STATUS_UPDATED_AT_IDX', 'status', 'locking', 'updated_at', 'next_poll_at', 'created_at'),
                       Index('TRANSFORMS_REQ_IDX', 'request_id', 'transform_id'),
+                      Index('TRANSFORMS_STATUS_SITE', 'status', 'site', 'request_id', 'transform_id'),
                       Index('TRANSFORMS_STATUS_POLL_IDX', 'status', 'locking', 'updated_at', 'new_poll_period', 'update_poll_period', 'created_at', 'transform_id'))
@@ -411,6 +415,7 @@ class Processing(BASE, ModelBase):
     max_update_retries = Column(Integer(), default=0)
     new_poll_period = Column(Interval(), default=datetime.timedelta(seconds=1))
     update_poll_period = Column(Interval(), default=datetime.timedelta(seconds=10))
+    site = Column(String(50))
     errors = Column(JSONString(1024))
     _processing_metadata = Column('processing_metadata', JSON())
     _running_metadata = Column('running_metadata', JSON())
@@ -473,6 +478,7 @@ def update(self, values, flush=True, session=None):
                       ForeignKeyConstraint(['transform_id'], ['transforms.transform_id'], name='PROCESSINGS_TRANSFORM_ID_FK'),
                       CheckConstraint('status IS NOT NULL', name='PROCESSINGS_STATUS_ID_NN'),
                       CheckConstraint('transform_id IS NOT NULL', name='PROCESSINGS_TRANSFORM_ID_NN'),
+                      Index('PROCESSINGS_STATUS_SITE', 'status', 'site', 'request_id', 'transform_id', 'processing_id'),
                       Index('PROCESSINGS_STATUS_UPDATED_IDX', 'status', 'locking', 'updated_at', 'next_poll_at', 'created_at'),
                       Index('PROCESSINGS_STATUS_POLL_IDX', 'status', 'processing_id', 'locking', 'updated_at', 'new_poll_period', 'update_poll_period', 'created_at'))
@@ -711,8 +717,10 @@ class Message(BASE, ModelBase):
     processing_id = Column(Integer())
     num_contents = Column(Integer())
     retries = Column(Integer(), default=0)
+    fetching_id = Column(Integer())
     created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
     updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
+    poll_period = Column(Interval(), default=datetime.timedelta(seconds=300), nullable=False)
     msg_content = Column(JSON())
 
     __table_args__ = (PrimaryKeyConstraint('msg_id', name='MESSAGES_PK'),
@@ -883,6 +891,25 @@ class EventArchive(BASE, ModelBase):
     __table_args__ = (PrimaryKeyConstraint('event_id', name='EVENTS_AR_PK'),)
 
 
+class Throttler(BASE, ModelBase):
+    """Represents a per-site throttler"""
+    __tablename__ = 'throttlers'
+    throttler_id = Column(BigInteger(), primary_key=True)
+    site = Column(String(50), nullable=False)
+    status = Column(EnumWithValue(ThrottlerStatus), nullable=False)
+    num_requests = Column(Integer())
+    num_transforms = Column(Integer())
+    num_processings = Column(Integer())
+    new_contents = Column(Integer())
+    queue_contents = Column(Integer())
+    created_at = Column("created_at", DateTime, default=datetime.datetime.utcnow, nullable=False)
+    updated_at = Column("updated_at", DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
+    others = Column(JSON())
+
+    __table_args__ = (PrimaryKeyConstraint('throttler_id', name='THROTTLER_PK'),
+                      UniqueConstraint('site', name='THROTTLER_SITE_UQ'))
+
+
 def create_trigger():
     func = DDL("""
     SET search_path TO %s;
@@ -1019,7 +1046,7 @@ def register_models(engine):
     """
 
     # models = (Request, Workprogress, Transform, Workprogress2transform, Processing, Collection, Content, Health, Message)
-    models = (Request, Transform, Processing, Collection, Content, Content_update, Content_ext, Health, Message, Command)
+    models = (Request, Transform, Processing, Collection, Content, Content_update, Content_ext, Health, Message, Command, Throttler)
 
     create_proc_to_update_contents()
@@ -1034,7 +1061,7 @@ def unregister_models(engine):
     """
 
     # models = (Request, Workprogress, Transform, Workprogress2transform, Processing, Collection, Content, Health, Message)
-    models = (Request, Transform, Processing, Collection, Content, Content_update, Content_ext, Health, Message, Command)
+    models = (Request, Transform, Processing, Collection, Content, Content_update, Content_ext, Health, Message, Command, Throttler)
 
     drop_proc_to_update_contents()
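[Editor's note: for orientation, the new Throttler model is instantiated like any other iDDS model. The site name and limit values below are made up, and real callers should prefer the idds.orm.throttlers helpers added later in this patch.]

# editor's sketch, not part of the patch
from idds.common.constants import ThrottlerStatus
from idds.orm.base import models

throttler = models.Throttler(site='Default',               # one throttler row per site
                             status=ThrottlerStatus.Active,
                             new_contents=100000,           # cap on new contents for the site
                             queue_contents=50000)          # cap on queued contents for the site
# throttler.save(session=session)  # persists the row inside a transactional session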
diff --git a/main/lib/idds/orm/contents.py b/main/lib/idds/orm/contents.py
index 81ddd3f7..e8244df4 100644
--- a/main/lib/idds/orm/contents.py
+++ b/main/lib/idds/orm/contents.py
@@ -335,7 +335,7 @@ def get_contents(scope=None, name=None, transform_id=None, coll_id=None, status=
 
 
 @read_session
-def get_contents_by_request_transform(request_id=None, transform_id=None, workload_id=None, status=None, status_updated=False, session=None):
+def get_contents_by_request_transform(request_id=None, transform_id=None, workload_id=None, status=None, map_id=None, status_updated=False, session=None):
     """
     Get content or raise a NoObject exception.
 
@@ -364,6 +364,8 @@ def get_contents_by_request_transform(request_id=None, transform_id=None, worklo
             query = query.filter(models.Content.workload_id == workload_id)
         if status is not None:
             query = query.filter(models.Content.substatus.in_(status))
+        if map_id:
+            query = query.filter(models.Content.map_id == map_id)
         if status_updated:
             query = query.filter(models.Content.status != models.Content.substatus)
         query = query.order_by(asc(models.Content.request_id), asc(models.Content.transform_id), asc(models.Content.map_id))
@@ -382,7 +384,7 @@ def get_contents_by_request_transform(request_id=None, transform_id=None, worklo
 
 
 @read_session
-def get_content_status_statistics(coll_id=None, session=None):
+def get_content_status_statistics(coll_id=None, transform_ids=None, session=None):
     """
     Get statistics group by status
 
@@ -392,9 +394,17 @@ def get_content_status_statistics(coll_id=None, session=None):
     :returns: statistics group by status, as a dict.
     """
     try:
+        if transform_ids and not isinstance(transform_ids, (list, tuple)):
+            transform_ids = [transform_ids]
+        if transform_ids and len(transform_ids) == 1:
+            transform_ids = [transform_ids[0], transform_ids[0]]
+
         query = session.query(models.Content.status, func.count(models.Content.content_id))
         if coll_id:
             query = query.filter(models.Content.coll_id == coll_id)
+        if transform_ids:
+            query = query.filter(models.Content.transform_id.in_(transform_ids))
+
         query = query.group_by(models.Content.status)
         tmp = query.all()
         rets = {}
@@ -406,6 +416,33 @@ def get_content_status_statistics(coll_id=None, session=None):
         raise error
 
 
+@read_session
+def get_content_status_statistics_by_relation_type(transform_ids=None, session=None):
+    """
+    Get statistics grouped by status and content relation type
+
+    :param transform_ids: list of transform ids.
+    :param session: The database session in use.
+
+    :returns: statistics grouped by status, relation type and transform id, as a list of tuples.
+    """
+    try:
+        if transform_ids and not isinstance(transform_ids, (list, tuple)):
+            transform_ids = [transform_ids]
+        if transform_ids and len(transform_ids) == 1:
+            transform_ids = [transform_ids[0], transform_ids[0]]
+
+        query = session.query(models.Content.status, models.Content.content_relation_type, models.Content.transform_id, func.count(models.Content.content_id))
+        if transform_ids:
+            query = query.filter(models.Content.transform_id.in_(transform_ids))
+
+        query = query.group_by(models.Content.status, models.Content.content_relation_type, models.Content.transform_id)
+        tmp = query.all()
+        return tmp
+    except Exception as error:
+        raise error
+
+
 @transactional_session
 def update_content(content_id, parameters, session=None):
     """
diff --git a/main/lib/idds/orm/messages.py b/main/lib/idds/orm/messages.py
index f2860cb1..3296afd0 100644
--- a/main/lib/idds/orm/messages.py
+++ b/main/lib/idds/orm/messages.py
@@ -17,7 +17,7 @@
 import re
 import copy
 
-from sqlalchemy import or_
+from sqlalchemy import or_, asc
 from sqlalchemy.exc import DatabaseError, IntegrityError
 
 from idds.common import exceptions
@@ -117,8 +117,8 @@ def update_messages(messages, bulk_size=1000, session=None):
 @read_session
 def retrieve_messages(bulk_size=1000, msg_type=None, status=None, source=None,
                       destination=None, request_id=None, workload_id=None,
-                      transform_id=None, processing_id=None,
-                      retries=None, delay=None, session=None):
+                      transform_id=None, processing_id=None, fetching_id=None,
+                      use_poll_period=False, retries=None, delay=None, session=None):
     """
     Retrieve up to $bulk messages.
 
@@ -165,6 +165,10 @@ def retrieve_messages(bulk_size=1000, msg_type=None, status=None, source=None,
             query = query.filter_by(retries=retries)
         if delay:
             query = query.filter(models.Message.updated_at < datetime.datetime.utcnow() - datetime.timedelta(seconds=delay))
+        elif use_poll_period:
+            query = query.filter(models.Message.updated_at + models.Message.poll_period <= datetime.datetime.utcnow())
+
+        query = query.order_by(asc(models.Message.updated_at))
 
         if bulk_size:
             query = query.order_by(models.Message.created_at).limit(bulk_size)
@@ -173,7 +177,8 @@ def retrieve_messages(bulk_size=1000, msg_type=None, status=None, source=None,
         tmp = query.all()
         if tmp:
             for t in tmp:
-                messages.append(t.to_dict())
+                message = t.to_dict()
+                messages.append(message)
         return messages
     except IntegrityError as e:
         raise exceptions.DatabaseException(e.args)
diff --git a/main/lib/idds/orm/processings.py b/main/lib/idds/orm/processings.py
index 9175e461..4223027f 100644
--- a/main/lib/idds/orm/processings.py
+++ b/main/lib/idds/orm/processings.py
@@ -16,6 +16,7 @@
 import datetime
 
 import sqlalchemy
+from sqlalchemy import func
 from sqlalchemy.exc import DatabaseError, IntegrityError
 from sqlalchemy.sql.expression import asc
 
@@ -424,3 +425,42 @@ def clean_next_poll_at(status, session=None):
     params = {'next_poll_at': datetime.datetime.utcnow()}
     session.query(models.Processing).filter(models.Processing.status.in_(status))\
            .update(params, synchronize_session=False)
+
+
+@read_session
+def get_num_active_processings(active_status=None, session=None):
+    if active_status and not isinstance(active_status, (list, tuple)):
+        active_status = [active_status]
+    if active_status and len(active_status) == 1:
+        active_status = [active_status[0], active_status[0]]
+
+    try:
+        query = session.query(models.Processing.status, models.Processing.site, func.count(models.Processing.processing_id))
+        if active_status:
+            query = query.filter(models.Processing.status.in_(active_status))
+        query = query.group_by(models.Processing.status, models.Processing.site)
+        tmp = query.all()
+        return tmp
+    except Exception as error:
+        raise error
+
+
+@read_session
+def get_active_processings(active_status=None, session=None):
+    if active_status and not isinstance(active_status, (list, tuple)):
+        active_status = [active_status]
+    if active_status and len(active_status) == 1:
+        active_status = [active_status[0], active_status[0]]
+
+    try:
+        query = session.query(models.Processing.request_id,
+                              models.Processing.transform_id,
+                              models.Processing.processing_id,
+                              models.Processing.site,
+                              models.Processing.status)
+        if active_status:
+            query = query.filter(models.Processing.status.in_(active_status))
+        tmp = query.all()
+        return tmp
+    except Exception as error:
+        raise error
diff --git a/main/lib/idds/orm/requests.py b/main/lib/idds/orm/requests.py
index 3c6f7ee2..5f97a9dd 100644
--- a/main/lib/idds/orm/requests.py
+++ b/main/lib/idds/orm/requests.py
@@ -17,7 +17,7 @@
 import random
 
 import sqlalchemy
-from sqlalchemy import and_
+from sqlalchemy import and_, func
 from sqlalchemy.exc import DatabaseError, IntegrityError
 from sqlalchemy.sql.expression import asc, desc
 
@@ -829,8 +829,10 @@ def get_requests_by_status_type(status, request_type=None, time_period=None, req
         if locking_for_update:
             query = query.with_for_update(skip_locked=True)
         else:
-            query = query.order_by(asc(models.Request.updated_at))\
-                         .order_by(desc(models.Request.priority))
+            # query = query.order_by(asc(models.Request.updated_at))\
+            #              .order_by(desc(models.Request.priority))
+            query = query.order_by(desc(models.Request.priority))\
+                         .order_by(asc(models.Request.updated_at))
 
         if bulk_size:
             query = query.limit(bulk_size)
@@ -981,3 +983,38 @@ def get_last_request_id(status, older_than=None, session=None):
     if ret:
         return ret[0]
     return ret
+
+
+@read_session
+def get_num_active_requests(active_status=None, session=None):
+    if active_status and not isinstance(active_status, (list, tuple)):
+        active_status = [active_status]
+    if active_status and len(active_status) == 1:
+        active_status = [active_status[0], active_status[0]]
+
+    try:
+        query = session.query(models.Request.status, models.Request.site, func.count(models.Request.request_id))
+        if active_status:
+            query = query.filter(models.Request.status.in_(active_status))
+        query = query.group_by(models.Request.status, models.Request.site)
+        tmp = query.all()
+        return tmp
+    except Exception as error:
+        raise error
+
+
+@read_session
+def get_active_requests(active_status=None, session=None):
+    if active_status and not isinstance(active_status, (list, tuple)):
+        active_status = [active_status]
+    if active_status and len(active_status) == 1:
+        active_status = [active_status[0], active_status[0]]
+
+    try:
+        query = session.query(models.Request.request_id, models.Request.status, models.Request.site)
+        if active_status:
+            query = query.filter(models.Request.status.in_(active_status))
+        tmp = query.all()
+        return tmp
+    except Exception as error:
+        raise error
diff --git a/main/lib/idds/orm/throttlers.py b/main/lib/idds/orm/throttlers.py
new file mode 100644
index 00000000..212c3461
--- /dev/null
+++ b/main/lib/idds/orm/throttlers.py
@@ -0,0 +1,151 @@
+#!/usr/bin/env python
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# You may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Authors:
+# - Wen Guan, , 2023
+
+
+"""
+operations related to throttlers.
+"""
+
+import re
+
+from sqlalchemy.exc import DatabaseError, IntegrityError
+
+from idds.common import exceptions
+from idds.common.constants import ThrottlerStatus
+from idds.orm.base import models
+from idds.orm.base.session import read_session, transactional_session
+
+
+@transactional_session
+def add_throttler(site, status=ThrottlerStatus.Active, num_requests=None, num_transforms=None, num_processings=None, new_contents=None,
+                  queue_contents=None, others=None, session=None):
+    """
+    Add a throttler item; if a throttler already exists for the site, update it instead.
+
+    :param site: The site name.
+    :param session: The database session.
+
+    :returns: throttler id.
+    """
+    try:
+        old_throttlers = get_throttlers(site=site, session=session)
+
+        if old_throttlers:
+            old_throttler = old_throttlers[0]
+            parameters = {}
+            if status is not None:
+                parameters['status'] = status
+            if num_requests is not None:
+                parameters['num_requests'] = num_requests
+            if num_transforms is not None:
+                parameters['num_transforms'] = num_transforms
+            if num_processings is not None:
+                parameters['num_processings'] = num_processings
+            if new_contents is not None:
+                parameters['new_contents'] = new_contents
+            if queue_contents is not None:
+                parameters['queue_contents'] = queue_contents
+            if others is not None:
+                parameters['others'] = others
+            update_throttler(throttler_id=old_throttler['throttler_id'], parameters=parameters, session=session)
+            return old_throttler['throttler_id']
+        else:
+            throttler = models.Throttler(site=site,
+                                         status=status,
+                                         num_requests=num_requests,
+                                         num_transforms=num_transforms,
+                                         num_processings=num_processings,
+                                         new_contents=new_contents,
+                                         queue_contents=queue_contents,
+                                         others=others)
+            throttler.save(session=session)
+            return throttler.throttler_id
+    except TypeError as e:
+        raise exceptions.DatabaseException('Invalid JSON for content: %s' % str(e))
+    except DatabaseError as e:
+        if re.match('.*ORA-12899.*', e.args[0]) \
+           or re.match('.*1406.*', e.args[0]):
+            raise exceptions.DatabaseException('Could not persist throttler, content too large: %s' % str(e))
+        else:
+            raise exceptions.DatabaseException('Could not persist throttler: %s' % str(e))
+
+
+@read_session
+def get_throttlers(site=None, status=None, session=None):
+    """
+    Get throttlers.
+
+    :param site: site name.
+    :param status: throttler status.
+
+    :returns: list of throttlers, as dicts.
+    """
+    try:
+        if status and not isinstance(status, (list, tuple)):
+            status = [status]
+        if status and len(status) == 1:
+            status = [status[0], status[0]]
+
+        query = session.query(models.Throttler)
+        if site:
+            query = query.filter_by(site=site)
+        if status:
+            query = query.filter(models.Throttler.status.in_(status))
+
+        tmp = query.all()
+        throttlers = []
+        if tmp:
+            for t in tmp:
+                throttlers.append(t.to_dict())
+        return throttlers
+    except DatabaseError as e:
+        raise exceptions.DatabaseException('Could not retrieve throttlers: %s' % str(e))
+
+
+@transactional_session
+def update_throttler(throttler_id=None, site=None, parameters=None, session=None):
+    """
+    Update throttler.
+
+    :param throttler_id: throttler id.
+    :param site: site name.
+    :param parameters: parameters in dict.
+    """
+    try:
+        if throttler_id is None and site is None:
+            raise exceptions.DatabaseException("Cannot update throttler: either throttler_id or site must be given")
+        query = session.query(models.Throttler)
+        if throttler_id:
+            query = query.filter_by(throttler_id=throttler_id)
+        if site:
+            query = query.filter_by(site=site)
+        query.update(parameters, synchronize_session=False)
+    except DatabaseError as e:
+        if re.match('.*ORA-12899.*', e.args[0]) \
+           or re.match('.*1406.*', e.args[0]):
+            raise exceptions.DatabaseException('Could not persist throttler, content too large: %s' % str(e))
+        else:
+            raise exceptions.DatabaseException('Could not persist throttler: %s' % str(e))
+
+
+@transactional_session
+def delete_throttler(throttler_id, session=None):
+    """
+    Delete throttler with the given id.
+
+    :param throttler_id: The throttler id.
+    """
+    try:
+        session.query(models.Throttler).filter_by(throttler_id=throttler_id).delete()
+    except IntegrityError as e:
+        raise exceptions.DatabaseException(e.args)
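[Editor's note: a usage sketch for the new ORM helpers above. The @read_session/@transactional_session decorators supply the session argument, and the 'Default' site with its limit values is illustrative.]

# editor's sketch, not part of the patch
from idds.common.constants import ThrottlerStatus
from idds.orm import throttlers

# add_throttler() updates in place when a throttler for the site already exists
throttler_id = throttlers.add_throttler(site='Default',
                                        status=ThrottlerStatus.Active,
                                        new_contents=100000,
                                        queue_contents=50000)
print(throttlers.get_throttlers(site='Default'))  # -> list of dicts
throttlers.update_throttler(site='Default', parameters={'status': ThrottlerStatus.InActive})
throttlers.delete_throttler(throttler_id)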
+ """ + try: + query = session.query(models.Throttler) + if throttler_id is None and site is None: + raise exceptions.DatabaseException("Could not update database with both throttler_id and site None") + + if throttler_id: + query = query.filter_by(throttler_id=throttler_id) + if site: + query = query.filter_by(site=site) + query.update(parameters, synchronize_session=False) + except DatabaseError as e: + if re.match('.*ORA-12899.*', e.args[0]) \ + or re.match('.*1406.*', e.args[0]): + raise exceptions.DatabaseException('Could not persist throttler, content too large: %s' % str(e)) + else: + raise exceptions.DatabaseException('Could not persist throttler: %s' % str(e)) + + +@transactional_session +def delete_throttler(throttler_id, session=None): + """ + Delete throttler with the given id. + + :param throttler_id: The throttler id. + """ + try: + session.query(models.Throttler).filter_by(throttler_id=throttler_id).delete() + except IntegrityError as e: + raise exceptions.DatabaseException(e.args) diff --git a/main/lib/idds/orm/transforms.py b/main/lib/idds/orm/transforms.py index c011c21f..e3ba3639 100644 --- a/main/lib/idds/orm/transforms.py +++ b/main/lib/idds/orm/transforms.py @@ -16,7 +16,7 @@ import datetime import sqlalchemy -from sqlalchemy import and_ +from sqlalchemy import and_, func from sqlalchemy.exc import DatabaseError, IntegrityError from sqlalchemy.sql.expression import asc, desc @@ -509,3 +509,41 @@ def clean_next_poll_at(status, session=None): params = {'next_poll_at': datetime.datetime.utcnow()} session.query(models.Transform).filter(models.Transform.status.in_(status))\ .update(params, synchronize_session=False) + + +@read_session +def get_num_active_transforms(active_status=None, session=None): + if active_status and not isinstance(active_status, (list, tuple)): + active_status = [active_status] + if active_status and len(active_status) == 1: + active_status = [active_status[0], active_status[0]] + + try: + query = session.query(models.Transform.status, models.Transform.site, func.count(models.Transform.transform_id)) + if active_status: + query = query.filter(models.Transform.status.in_(active_status)) + query = query.group_by(models.Transform.status, models.Transform.site) + tmp = query.all() + return tmp + except Exception as error: + raise error + + +@read_session +def get_active_transforms(active_status=None, session=None): + if active_status and not isinstance(active_status, (list, tuple)): + active_status = [active_status] + if active_status and len(active_status) == 1: + active_status = [active_status[0], active_status[0]] + + try: + query = session.query(models.Transform.request_id, + models.Transform.transform_id, + models.Transform.site, + models.Transform.status) + if active_status: + query = query.filter(models.Transform.status.in_(active_status)) + tmp = query.all() + return tmp + except Exception as error: + raise error diff --git a/main/lib/idds/tests/panda_test.py b/main/lib/idds/tests/panda_test.py index 5fc4e6c5..2e8a5414 100644 --- a/main/lib/idds/tests/panda_test.py +++ b/main/lib/idds/tests/panda_test.py @@ -6,8 +6,8 @@ os.environ['PANDA_URL'] = 'http://pandaserver-doma.cern.ch:25080/server/panda' os.environ['PANDA_URL_SSL'] = 'https://pandaserver-doma.cern.ch:25443/server/panda' -os.environ['PANDA_URL'] = 'http://rubin-panda-server-dev.slac.stanford.edu:80/server/panda' -os.environ['PANDA_URL_SSL'] = 'https://rubin-panda-server-dev.slac.stanford.edu:8443/server/panda' +# os.environ['PANDA_URL'] = 
'http://rubin-panda-server-dev.slac.stanford.edu:80/server/panda' +# os.environ['PANDA_URL_SSL'] = 'https://rubin-panda-server-dev.slac.stanford.edu:8443/server/panda' from pandaclient import Client # noqa E402 @@ -67,22 +67,29 @@ # sys.exit(0) -""" -jediTaskID = 998 + +jediTaskID = 152096 +# jediTaskID = 154357 +print(jediTaskID) + +ret = Client.getTaskStatus(jediTaskID) +print(ret) + ret = Client.getPandaIDsWithTaskID(jediTaskID, verbose=False) -# print(ret) +print(ret) jobids = ret[1] -# print(jobids) +print(jobids) ret = Client.getJobStatus(ids=jobids, verbose=False) print(ret) ret = Client.getFullJobStatus(ids=jobids, verbose=False) -# print(ret) +print(ret) ret = Client.getJediTaskDetails({'jediTaskID': jediTaskID}, True, True, verbose=False) print(ret) -""" + +sys.exit(0) task_ids = [] # task_ids = [1565, 1566, 1567, 1568, 1570, 1572, 1575, 1576, 1579, 1580, 1581, 1582, 1584, 1585, 1586, 1587, 1588, 1589, 1590, 1591, 1592, 1593, 1597, 1598, 1599, 1601, 1602, 1603, 1604, 1607, 1608, 1609, 1610, 1611, 1612, 1613, 1617] diff --git a/main/lib/idds/tests/set_throttlers.py b/main/lib/idds/tests/set_throttlers.py new file mode 100644 index 00000000..0f260056 --- /dev/null +++ b/main/lib/idds/tests/set_throttlers.py @@ -0,0 +1,8 @@ +from idds.common.constants import ThrottlerStatus +from idds.core import throttlers as core_throttlers + +throttler = {'site': 'Default', + 'status': ThrottlerStatus.Active, + 'new_contents': 100000, + 'queue_contents': 50000} +core_throttlers.add_throttler(**throttler) diff --git a/main/lib/idds/tests/test_get_request_info.py b/main/lib/idds/tests/test_get_request_info.py new file mode 100644 index 00000000..e2124f69 --- /dev/null +++ b/main/lib/idds/tests/test_get_request_info.py @@ -0,0 +1,36 @@ +from idds.common.constants import RequestStatus # noqa F401 +from idds.common.utils import json_loads # noqa F401 + +from lsst.ctrl.bps import BPS_DEFAULTS, BPS_SEARCH_ORDER, DEFAULT_MEM_FMT, DEFAULT_MEM_UNIT, BpsConfig # noqa F401 +from lsst.ctrl.bps.panda.utils import ( # noqa F401 + add_final_idds_work, # noqa F401 + add_idds_work, # noqa F401 + copy_files_for_distribution, # noqa F401 + get_idds_client, # noqa F401 + get_idds_result, # noqa F401 +) + +default_config = BpsConfig(BPS_DEFAULTS) + +idds_client = get_idds_client(default_config) + +wms_workflow_id = 4112 +# only check the request status +ret = idds_client.get_requests(request_id=wms_workflow_id) +print(ret) +# note: good to check the ret at first to make sure it's successful (see ctrl_bps_panda) +print(ret[1][1][0]['status']) + +# to show the status of different tasks +ret = idds_client.get_requests(request_id=wms_workflow_id, with_detail=True) +print(ret) + +workloads = [] +for workload in ret[1][1]: + workloads.append(workload['transform_workload_id']) +print(workloads) + +# show one workload file information +workload_1 = workloads[0] +ret = idds_client.get_contents_output_ext(request_id=wms_workflow_id, workload_id=workload_1) +print(ret) diff --git a/main/lib/idds/tests/test_migrate_requests.py b/main/lib/idds/tests/test_migrate_requests.py index a8c70ad2..3f153670 100644 --- a/main/lib/idds/tests/test_migrate_requests.py +++ b/main/lib/idds/tests/test_migrate_requests.py @@ -40,8 +40,8 @@ def migrate(): cern_k8s_dev_host = 'https://panda-idds-dev.cern.ch/idds' # noqa F841 # cm1 = ClientManager(host=atlas_host) - # cm1 = ClientManager(host=doma_host) - cm1 = ClientManager(host=slac_k8s_dev_host) + cm1 = ClientManager(host=doma_host) + # cm1 = ClientManager(host=slac_k8s_dev_host) # reqs = 
cm1.get_requests(request_id=290) # old_request_id = 298163 # old_request_id = 350723 @@ -60,7 +60,7 @@ def migrate(): old_request_ids = [3628] - old_request_ids = [21] + # old_request_ids = [21] # old_request_id = 1 # for old_request_id in [152]: @@ -69,10 +69,10 @@ def migrate(): for old_request_id in old_request_ids: # noqa E115 # doma 183 reqs = cm1.get_requests(request_id=old_request_id, with_metadata=True) - # cm2 = ClientManager(host=dev_host) + cm2 = ClientManager(host=dev_host) # cm2 = ClientManager(host=doma_host) # cm2 = ClientManager(host=atlas_host) - cm2 = ClientManager(host=slac_k8s_dev_host) + # cm2 = ClientManager(host=slac_k8s_dev_host) # cm2 = ClientManager(host=cern_k8s_dev_host) # print(reqs) diff --git a/main/tools/alembic/new_version.sh b/main/tools/alembic/new_version.sh new file mode 100644 index 00000000..d5139510 --- /dev/null +++ b/main/tools/alembic/new_version.sh @@ -0,0 +1 @@ +alembic revision -m "add fetch status in contents_update" diff --git a/main/tools/env/install_idds_full.sh b/main/tools/env/install_idds_full.sh index 05b38c64..cec5ea3f 100644 --- a/main/tools/env/install_idds_full.sh +++ b/main/tools/env/install_idds_full.sh @@ -26,7 +26,7 @@ conda env create --prefix=/opt/idds -f main/tools/env/environment.yml conda activate /opt/idds conda install -c conda-forge python-gfal2 -pip install rucio-clients-atlas rucio-clients panda-client +pip install rucio-clients-atlas rucio-clients panda-client-light # root ca.crt to /opt/idds/etc/ca.crt pip install requests SQLAlchemy urllib3 retrying mod_wsgi flask futures stomp.py cx-Oracle unittest2 pep8 flake8 pytest nose sphinx recommonmark sphinx-rtd-theme nevergrad diff --git a/main/tools/panda/increase_memory b/main/tools/panda/increase_memory new file mode 100644 index 00000000..fd09c225 --- /dev/null +++ b/main/tools/panda/increase_memory @@ -0,0 +1,8 @@ + +# add rule to increase memory +insert into retryactions(retryaction_id, retry_action, active, retry_description) values (2, 'increase_memory', 'Y', 'Job ran out of memory. 
Increase memory setting for next retry.'); +insert into retryerrors(retryerror_id, errorsource, errorcode, active, retryaction, description) values(1, 'transExitCode', 137, 'Y', 2, 'increase memory'); +insert into retryerrors(retryerror_id, errorsource, errorcode, active, retryaction, description) values(1, 'transExitCode', 139, 'Y', 2, 'increase memory'); +insert into retryerrors(retryerror_id, errorsource, errorcode, errordiag, active, retryaction, description) values(3, 'pilotErrorCode', 1305, '.*Unable to allocate.*', 'Y', 2, 'increase memory'); + + diff --git a/monitor/data/conf.js b/monitor/data/conf.js index 89f5608d..99b0cb99 100644 --- a/monitor/data/conf.js +++ b/monitor/data/conf.js @@ -1,9 +1,9 @@ var appConfig = { - 'iddsAPI_request': "https://lxplus810.cern.ch:443/idds/monitor_request/null/null", - 'iddsAPI_transform': "https://lxplus810.cern.ch:443/idds/monitor_transform/null/null", - 'iddsAPI_processing': "https://lxplus810.cern.ch:443/idds/monitor_processing/null/null", - 'iddsAPI_request_detail': "https://lxplus810.cern.ch:443/idds/monitor/null/null/true/false/false", - 'iddsAPI_transform_detail': "https://lxplus810.cern.ch:443/idds/monitor/null/null/false/true/false", - 'iddsAPI_processing_detail': "https://lxplus810.cern.ch:443/idds/monitor/null/null/false/false/true" + 'iddsAPI_request': "https://lxplus803.cern.ch:443/idds/monitor_request/null/null", + 'iddsAPI_transform': "https://lxplus803.cern.ch:443/idds/monitor_transform/null/null", + 'iddsAPI_processing': "https://lxplus803.cern.ch:443/idds/monitor_processing/null/null", + 'iddsAPI_request_detail': "https://lxplus803.cern.ch:443/idds/monitor/null/null/true/false/false", + 'iddsAPI_transform_detail': "https://lxplus803.cern.ch:443/idds/monitor/null/null/false/true/false", + 'iddsAPI_processing_detail': "https://lxplus803.cern.ch:443/idds/monitor/null/null/false/false/true" } diff --git a/requirements.yaml b/requirements.yaml index e15db948..f73545ba 100644 --- a/requirements.yaml +++ b/requirements.yaml @@ -30,3 +30,4 @@ dependencies: - alembic - deepdiff - pyzmq + - anytree diff --git a/start-daemon.sh b/start-daemon.sh index 00beb18d..1837722d 100755 --- a/start-daemon.sh +++ b/start-daemon.sh @@ -132,7 +132,7 @@ else cp /opt/idds/config_default/supervisord_idds.ini /opt/idds/config/idds/supervisord_idds.ini cp /opt/idds/config_default/supervisord_iddsfake.ini /opt/idds/config/idds/supervisord_iddsfake.ini cp /opt/idds/config_default/supervisord_httpd.ini /opt/idds/config/idds/supervisord_httpd.ini - cp /opt/idds/config_default/supervisord_syslog-ng.ini /opt/idds/config/idds/supervisord_syslog-ng.ini + # cp /opt/idds/config_default/supervisord_syslog-ng.ini /opt/idds/config/idds/supervisord_syslog-ng.ini fi if [ -f /etc/grid-security/hostkey.pem ]; then @@ -223,7 +223,7 @@ fi # echo "start syslog-ng" # /usr/sbin/syslog-ng -F --no-caps --persist-file=/var/log/idds/syslog-ng.persist -p /var/log/idds/syslog-ng.pid -tail -f -F /var/log/idds/syslog-ng-stdout.log & -tail -f -F /var/log/idds/syslog-ng-stderr.log & +# tail -f -F /var/log/idds/syslog-ng-stdout.log & +# tail -f -F /var/log/idds/syslog-ng-stderr.log & trap : TERM INT; sleep infinity & wait diff --git a/workflow/tools/env/environment.yml b/workflow/tools/env/environment.yml index a3eb0093..13c48f03 100644 --- a/workflow/tools/env/environment.yml +++ b/workflow/tools/env/environment.yml @@ -6,6 +6,7 @@ dependencies: - unittest2 # unit test tool - pep8 # checks for PEP8 code style compliance - flake8 # Wrapper around PyFlakes&pep8 + - anytree - pytest # 
python testing tool - nose # nose test tools - - idds-common==0.11.5 \ No newline at end of file + - idds-common==0.11.5
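[Editor's note: taken together, these pieces suggest how a coordinator could decide whether a site is saturated: read the per-site activity counters and compare them against the site's throttler limits. The wiring below is an editor's assumption rather than code from this patch; only get_throttlers() and get_num_active_requests() are functions the patch actually adds.]

# editor's sketch: a hypothetical per-site throttle check
from idds.common.constants import ThrottlerStatus
from idds.orm import requests as orm_requests
from idds.orm import throttlers as orm_throttlers


def site_is_throttled(site):
    active = orm_throttlers.get_throttlers(site=site, status=ThrottlerStatus.Active)
    if not active:
        return False
    limit = active[0]['num_requests']
    if limit is None:
        return False
    # get_num_active_requests() yields (status, site, count) rows grouped by status and site
    rows = orm_requests.get_num_active_requests()
    num_active = sum(count for status, req_site, count in rows if req_site == site)
    return num_active >= limit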