Merge pull request #166 from HSF/dev
dev
wguanicedew authored Jun 5, 2023
2 parents 8a22608 + ae9c408 commit 2d057ff
Showing 39 changed files with 1,226 additions and 137 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
@@ -84,7 +84,7 @@ RUN source /etc/profile.d/conda.sh; conda activate /opt/idds; python3 -m pip ins

RUN source /etc/profile.d/conda.sh; conda activate /opt/idds; python3 -m pip install --no-cache-dir --upgrade requests SQLAlchemy urllib3 retrying mod_wsgi flask futures stomp.py cx-Oracle unittest2 pep8 flake8 pytest nose sphinx recommonmark sphinx-rtd-theme nevergrad
RUN source /etc/profile.d/conda.sh; conda activate /opt/idds; python3 -m pip install --no-cache-dir --upgrade psycopg2-binary
RUN source /etc/profile.d/conda.sh; conda activate /opt/idds; python3 -m pip install --no-cache-dir --upgrade rucio-clients-atlas rucio-clients panda-client
RUN source /etc/profile.d/conda.sh; conda activate /opt/idds; python3 -m pip install --no-cache-dir --upgrade rucio-clients-atlas rucio-clients panda-client-light


WORKDIR /tmp/src
@@ -137,7 +137,7 @@ RUN sed -i "s/WSGISocketPrefix\ \/var\/log\/idds\/wsgisocks\/wsgi/WSGISocketPref
RUN ln -fs /opt/idds/config/idds/supervisord_idds.ini /etc/supervisord.d/idds.ini
RUN ln -fs /opt/idds/config/idds/supervisord_iddsfake.ini /etc/supervisord.d/iddsfake.ini
RUN ln -fs /opt/idds/config/idds/supervisord_httpd.ini /etc/supervisord.d/httpd.ini
RUN ln -fs /opt/idds/config/idds/supervisord_syslog-ng.ini /etc/supervisord.d/syslog-ng.ini
# RUN ln -fs /opt/idds/config/idds/supervisord_syslog-ng.ini /etc/supervisord.d/syslog-ng.ini

# for syslog-ng
RUN mv /etc/syslog-ng/syslog-ng.conf /etc/syslog-ng/syslog-ng.conf.back
2 changes: 1 addition & 1 deletion atlas/tools/env/environment.yml
@@ -10,7 +10,7 @@ dependencies:
- pytest # python testing tool
- nose # nose test tools
- stomp.py
- panda-client # panda client
- panda-client-light # panda client
- rucio-clients
- rucio-clients-atlas
- idds-common==0.11.5
8 changes: 8 additions & 0 deletions common/lib/idds/common/constants.py
@@ -146,6 +146,7 @@ class RequestStatus(IDDSEnum):
Terminating = 20
Building = 21
Built = 22
Throttling = 23


class RequestLocking(IDDSEnum):
@@ -291,6 +292,7 @@ class ContentStatus(IDDSEnum):
FakeAvailable = 8
Missing = 9
Cancelled = 10
Activated = 11


class ContentLocking(IDDSEnum):
@@ -429,6 +431,7 @@ class MessageStatus(IDDSEnum):
Fetched = 1
Delivered = 2
Failed = 3
ConfirmDelivered = 4


class MessageLocking(IDDSEnum):
@@ -484,6 +487,11 @@ class CommandLocation(IDDSEnum):
Other = 6


class ThrottlerStatus(IDDSEnum):
InActive = 0
Active = 1


class ReturnCode(IDDSEnum):
Ok = 0
Failed = 255
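
The new enum members extend the iDDS state machine: a request can now sit in RequestStatus.Throttling, a content in ContentStatus.Activated, a message in MessageStatus.ConfirmDelivered, and the new ThrottlerStatus enum carries the Active/InActive flag for per-site throttlers. A minimal sketch of how the throttler flag might gate a request (the gate_request helper is hypothetical, not part of this commit):

from idds.common.constants import RequestStatus, ThrottlerStatus

def gate_request(current_status, throttler_status):
    # Hypothetical helper: hold a request in Throttling while its
    # site throttler is marked Active; otherwise leave it untouched.
    if throttler_status == ThrottlerStatus.Active:
        return RequestStatus.Throttling
    return current_status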
4 changes: 3 additions & 1 deletion common/lib/idds/common/utils.py
@@ -19,6 +19,7 @@
import subprocess
import sys
import tarfile
import time
# import traceback

from enum import Enum
@@ -67,6 +68,7 @@ def setup_logging(name, stream=None, loglevel=None):
else:
logging.basicConfig(stream=stream, level=loglevel,
format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s')
logging.Formatter.converter = time.gmtime


def get_logger(name, filename=None, loglevel=None):
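
The added logging.Formatter.converter = time.gmtime line switches every formatted asctime to UTC, which keeps logs comparable across hosts in different time zones. A standalone illustration of the mechanism:

import logging
import time

logging.basicConfig(format='%(asctime)s\t%(levelname)s\t%(message)s')
logging.Formatter.converter = time.gmtime  # asctime is now rendered in UTC
logging.warning('this timestamp is UTC, not local time')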
@@ -538,7 +540,7 @@ def merge_dict(dict1, dict2):
else:
if dict2[key] is None:
continue
elif isinstance(dict1[key], type(dict2[key])):
elif not isinstance(dict1[key], type(dict2[key])):
raise Exception("type of %s is different from %s, cannot merge" % (type(dict1[key]), type(dict2[key])))
elif dict1[key] == dict2[key]:
continue
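
The fix inverts the isinstance check: before it, merge_dict raised precisely when the two values had the same type and silently fell through when they differed. A condensed sketch of the corrected rule (simplified; the surrounding merge_dict handles further cases such as equal values and nested merges):

def merge_values(v1, v2):
    # None never overrides an existing value; mismatched types are an error.
    if v2 is None:
        return v1
    if not isinstance(v1, type(v2)):
        raise Exception("type of %s is different from %s, cannot merge" % (type(v1), type(v2)))
    return v2

merge_values(1, 2)     # -> 2
merge_values(1, None)  # -> 1
# merge_values(1, 'x') would raise: int and str cannot merge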
2 changes: 1 addition & 1 deletion docs/source/users/admin_guides.rst
@@ -26,7 +26,7 @@ Environment setup on CENTOS 7
conda activate /opt/idds
pip install idds-server idds-doma idds-atlas idds-monitor idds-website
pip install rucio-clients-atlas rucio-clients panda-client
pip install rucio-clients-atlas rucio-clients panda-client-light
# # add "auth_type = x509_proxy" to /opt/idds/etc/rucio.cfg
2. Set up the environment after installation.
37 changes: 35 additions & 2 deletions doma/lib/idds/doma/workflowv2/domapandawork.py
@@ -544,6 +544,10 @@ def create_processing(self, input_output_maps=[]):
task_param_map['maxWalltime'] = self.maxWalltime
task_param_map['maxFailure'] = self.maxAttempt if self.maxAttempt else 5
task_param_map['maxAttempt'] = self.maxAttempt if self.maxAttempt else 5
if task_param_map['maxAttempt'] < self.num_retries:
task_param_map['maxAttempt'] = self.num_retries
if task_param_map['maxFailure'] < self.num_retries:
task_param_map['maxFailure'] = self.num_retries
task_param_map['log'] = self.task_log
task_param_map['jobParameters'] = [
{'type': 'constant',
@@ -673,9 +677,12 @@ def get_processing_status_from_panda_status(self, task_status):
elif task_status in ['finished', 'paused']:
# finished, finishing, waiting it to be done
processing_status = ProcessingStatus.SubFinished
elif task_status in ['failed', 'aborted', 'exhausted']:
elif task_status in ['failed', 'exhausted']:
# aborting, tobroken
processing_status = ProcessingStatus.Failed
elif task_status in ['aborted']:
# aborted tasks now map to Cancelled rather than Failed
processing_status = ProcessingStatus.Cancelled
elif task_status in ['broken']:
processing_status = ProcessingStatus.Broken
else:
@@ -786,6 +793,8 @@ def get_content_status_from_panda_status(self, job_info):
return ContentStatus.FinalFailed
else:
return ContentStatus.Failed
elif jobstatus in ['activated']:
return ContentStatus.Activated
else:
return ContentStatus.Processing

@@ -1117,6 +1126,28 @@ def get_contents_ext(self, input_output_maps, contents_ext, contents_ext_full, j
self.logger.debug("get_contents_ext, left_contents[:1]: %s" % (str(left_contents[:3])))
return new_contents_ext_d, update_contents_ext_d, left_contents

def abort_contents(self, input_output_maps, updated_contents, contents_ext):
contents_ext_dict = {content['content_id']: content for content in contents_ext}
new_contents_ext = []

for map_id in input_output_maps:
outputs = input_output_maps[map_id]['outputs']
for content in outputs:
update_content = {'content_id': content['content_id'],
'substatus': ContentStatus.Missing}
updated_contents.append(update_content)
if content['content_id'] not in contents_ext_dict:
new_content_ext = {'content_id': content['content_id'],
'request_id': content['request_id'],
'transform_id': content['transform_id'],
'workload_id': content['workload_id'],
'coll_id': content['coll_id'],
'map_id': content['map_id'],
'status': ContentStatus.Missing}
new_contents_ext.append(new_content_ext)

return updated_contents, new_contents_ext
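
The new abort_contents walks every output content of a cancelled task, marks its substatus as Missing, and, for contents with no contents_ext record yet (an O(1) check via the dict keyed by content_id), creates a minimal ext record so the final accounting is complete. A hedged usage sketch (the single map below is hypothetical; real maps come from the processing layer, and work stands in for a DomaPanDAWork instance):

input_output_maps = {1: {'outputs': [{'content_id': 101, 'request_id': 1,
                                      'transform_id': 2, 'workload_id': 3,
                                      'coll_id': 4, 'map_id': 1}]}}
updated, new_ext = work.abort_contents(input_output_maps,
                                       updated_contents=[], contents_ext=[])
# updated -> [{'content_id': 101, 'substatus': ContentStatus.Missing}]
# new_ext -> [{'content_id': 101, ..., 'status': ContentStatus.Missing}]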

def poll_panda_task(self, processing=None, input_output_maps=None, contents_ext=None, job_info_maps={}, log_prefix=''):
task_id = None
try:
@@ -1161,7 +1192,9 @@ def poll_panda_task(self, processing=None, input_output_maps=None, contents_ext=
new_contents_ext, update_contents_ext, left_contents = self.get_contents_ext(input_output_maps, contents_ext,
contents_ext_full, job_info_maps)
# if left_jobs:
# processing_status = ProcessingStatus.Running
if processing_status in [ProcessingStatus.Cancelled]:
updated_contents, new_contents_ext1 = self.abort_contents(input_output_maps, updated_contents, contents_ext)
new_contents_ext = new_contents_ext + new_contents_ext1

return processing_status, updated_contents, update_contents_full, new_contents_ext, update_contents_ext
else:
2 changes: 1 addition & 1 deletion doma/tools/env/environment.yml
@@ -9,6 +9,6 @@ dependencies:
- flake8 # Wrapper around PyFlakes&pep8
- pytest # python testing tool
- nose # nose test tools
- panda-client # panda client
- panda-client-light # panda client
- idds-common==0.11.5
- idds-workflow==0.11.5
34 changes: 34 additions & 0 deletions main/etc/sql/oracle_update.sql
@@ -399,3 +399,37 @@ alter table health modify payload VARCHAR2(2048);

--- 2023.03.29
alter table contents_update add fetch_status NUMBER(2) DEFAULT 0;


-- 2023.05.18
alter table requests add site VARCHAR2(50);
CREATE INDEX REQUESTS_STATUS_SITE ON requests (status, site, request_id) COMPRESS 3 LOCAL;

alter table transforms add site VARCHAR2(50);
CREATE INDEX TRANSFORMS_STATUS_SITE ON transforms (status, site, request_id, transform_id) COMPRESS 3 LOCAL;

alter table processings add site VARCHAR2(50);
CREATE INDEX PROCESSINGS_STATUS_SITE ON processings (status, site, request_id, transform_id, processing_id) COMPRESS 3 LOCAL;

alter table messages add fetching_id NUMBER(12);


CREATE SEQUENCE THROTTLER_ID_SEQ MINVALUE 1 INCREMENT BY 1 START WITH 1 NOCACHE ORDER NOCYCLE GLOBAL;
CREATE TABLE Throttlers
(
throttler_id NUMBER(12) DEFAULT ON NULL THROTTLER_ID_SEQ.NEXTVAL constraint THROTTLER_ID_NN NOT NULL,
site VARCHAR2(50),
status NUMBER(2),
num_requests NUMBER(12),
num_transforms NUMBER(12),
num_processings NUMBER(12),
new_contents NUMBER(12),
queue_contents NUMBER(12),
created_at DATE DEFAULT SYS_EXTRACT_UTC(systimestamp(0)),
updated_at DATE DEFAULT SYS_EXTRACT_UTC(systimestamp(0)),
others CLOB,
CONSTRAINT THROTTLER_PK PRIMARY KEY (throttler_id), -- USING INDEX LOCAL,
CONSTRAINT THROTTLER_SITE_UQ UNIQUE (site)
);

alter table Messages add (poll_period INTERVAL DAY TO SECOND DEFAULT '00 00:05:00');
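
For readers who prefer the ORM view, a hedged SQLAlchemy Core sketch mirroring the Throttlers DDL above (the actual iDDS model is defined elsewhere in this PR and may differ in detail):

from sqlalchemy import (MetaData, Table, Column, Sequence,
                        BigInteger, Integer, String, DateTime, Text)

metadata = MetaData()
throttlers = Table(
    'throttlers', metadata,
    Column('throttler_id', BigInteger, Sequence('THROTTLER_ID_SEQ'), primary_key=True),
    Column('site', String(50), unique=True),   # one throttler per site
    Column('status', Integer),                 # ThrottlerStatus: InActive=0, Active=1
    Column('num_requests', BigInteger),
    Column('num_transforms', BigInteger),
    Column('num_processings', BigInteger),
    Column('new_contents', BigInteger),
    Column('queue_contents', BigInteger),
    Column('created_at', DateTime),            # the DDL defaults these to UTC
    Column('updated_at', DateTime),
    Column('others', Text),
)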
27 changes: 23 additions & 4 deletions main/lib/idds/agents/carrier/receiver.py
@@ -54,6 +54,7 @@ def __init__(self, receiver_num_threads=8, num_threads=1, bulk_message_delay=30,
self.update_processing_interval = 300

self.mode = mode
self.selected = None
self.selected_receiver = None

self.log_prefix = ''
@@ -79,6 +80,16 @@ def stop_receiver(self):
self.receiver.stop()
self.receiver = None

def suspend_receiver(self):
if hasattr(self, 'receiver') and self.receiver:
self.logger.info("Stopping receiver: %s" % self.receiver)
self.receiver.suspend()

def resume_receiver(self):
if hasattr(self, 'receiver') and self.receiver:
self.logger.info("Stopping receiver: %s" % self.receiver)
self.receiver.resume()
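
suspend_receiver and resume_receiver, together with the run() changes further down (start once, then toggle), let the agent keep its broker connection alive across leadership changes instead of tearing the receiver down and rebuilding it. The pattern in miniature (a toy sketch, not the iDDS receiver itself):

import threading

class SuspendableWorker:
    # Toy model of suspend/resume: the thread and connection survive;
    # a flag simply gates whether incoming work is consumed.
    def __init__(self):
        self._active = threading.Event()
        self._active.set()

    def suspend(self):
        self._active.clear()   # stop consuming, keep resources alive

    def resume(self):
        self._active.set()     # consume again without reconnecting

    def handle(self, msg):
        if self._active.is_set():
            print('handling', msg)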

def is_receiver_started(self):
if hasattr(self, 'receiver') and self.receiver:
return True
@@ -109,9 +120,15 @@ def get_output_messages(self):
return msgs

def is_selected(self):
selected = None
if not self.selected_receiver:
return True
return self.is_self(self.selected_receiver)
selected = True
else:
selected = self.is_self(self.selected_receiver)
if self.selected is None or self.selected != selected:
self.logger.info("is_selected changed from %s to %s" % (self.selected, selected))
self.selected = selected
return self.selected

def monitor_receiver(self):
if self.mode == "single":
@@ -242,6 +259,8 @@ def run(self):
# [self.executors.submit(self.worker, log_prefix) for i in range(self.executors.get_max_workers())]
self.init_event_function_map()

self.start_receiver()

while not self.graceful_stop.is_set():
try:
self.execute_schedules()
@@ -250,11 +269,11 @@

if self.is_selected():
if not self.is_receiver_started():
self.start_receiver()
self.resume_receiver()

if not self.is_selected():
if self.is_receiver_started():
self.stop_receiver()
self.suspend_receiver()

msg = self.get_output_messages()
if msg:
11 changes: 10 additions & 1 deletion main/lib/idds/agents/carrier/utils.py
@@ -315,10 +315,13 @@ def generate_file_messages(request_id, transform_id, workload_id, work, files, r
file_message = {'scope': file['scope'],
'name': file['name'],
'path': file['path'],
'map_id': file['map_id'],
'content_id': file['content_id'],
'status': file_status}
files_message.append(file_message)
msg_content = {'msg_type': i_msg_type_str.value,
'request_id': request_id,
'transform_id': transform_id,
'workload_id': workload_id,
'relation_type': relation_type,
'files': files_message}
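
This hunk and the three that follow add transform_id (and, for per-file messages, map_id and content_id) to the outgoing payloads, so a consumer can route a message back to its transform and content without a database lookup. An illustrative payload after the change (all field values hypothetical):

msg_content = {'msg_type': 'file_stagein',   # example type, not from this diff
               'request_id': 123,
               'transform_id': 456,          # newly included
               'workload_id': 789,
               'relation_type': 'input',
               'files': [{'scope': 'user.someuser', 'name': 'file_001',
                          'path': None,
                          'map_id': 1,       # newly included
                          'content_id': 101, # newly included
                          'status': 'Available'}]}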
@@ -341,6 +344,7 @@ def generate_content_ext_messages(request_id, transform_id, workload_id, work, f
msg_content = {'msg_type': i_msg_type_str.value,
'request_id': request_id,
'workload_id': workload_id,
'transform_id': transform_id,
'relation_type': relation_type,
'files': files_message}
num_msg_content = len(files_message)
@@ -356,6 +360,7 @@ def generate_collection_messages(request_id, transform_id, workload_id, work, co
msg_content = {'msg_type': i_msg_type_str.value,
'request_id': request_id,
'workload_id': workload_id,
'transform_id': transform_id,
'relation_type': relation_type,
'collections': [{'scope': collection.scope,
'name': coll_name,
@@ -371,6 +376,7 @@ def generate_work_messages(request_id, transform_id, workload_id, work, relation
msg_content = {'msg_type': i_msg_type_str.value,
'request_id': request_id,
'workload_id': workload_id,
'transform_id': transform_id,
'relation_type': relation_type,
'status': work.get_status().name,
'output': work.get_output_data(),
@@ -1085,6 +1091,7 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=

def get_content_status_from_panda_msg_status(status):
status_map = {'starting': ContentStatus.New,
'activated': ContentStatus.Activated,
'running': ContentStatus.Processing,
'finished': ContentStatus.Available,
'failed': ContentStatus.Failed}
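
With the 'activated' entry, PanDA job-status messages for jobs that are queued but not yet running map to the new ContentStatus.Activated instead of falling through to the default. A self-contained sketch of the lookup (Activated = 11 comes from this commit; the other numeric values here are placeholders):

from enum import Enum

class ContentStatus(Enum):
    New = 0          # placeholder value
    Processing = 1   # placeholder value
    Available = 2    # placeholder value
    Failed = 3       # placeholder value
    Activated = 11   # value added in this commit

status_map = {'starting': ContentStatus.New,
              'activated': ContentStatus.Activated,  # newly recognized
              'running': ContentStatus.Processing,
              'finished': ContentStatus.Available,
              'failed': ContentStatus.Failed}

assert status_map['activated'] is ContentStatus.Activated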
@@ -1370,7 +1377,9 @@ def handle_messages_processing(messages, logger=None, log_prefix='', update_proc
job_id = msg['jobid']
status = msg['status']
inputs = msg['inputs']
if inputs and status in ['finished']:
# if inputs and status in ['finished']:
# add activated
if inputs and status in ['finished', 'activated']:
logger.debug(log_prefix + "Received message: %s" % str(ori_msg))

ret_req_tf_pr_id = get_workload_id_transform_id_map(workload_id, logger=logger, log_prefix=log_prefix)
(Diff truncated: the remaining changed files in this commit are not shown.)
