add throttler #163

Merged
merged 25 commits into from Jun 5, 2023

Commits (25)
1d911dd  add lsst test example (wguanicedew, May 12, 2023)
d4eec33  add panda rule to increase memory (wguanicedew, May 12, 2023)
e25f608  update contents for aborted tasks (wguanicedew, May 17, 2023)
5e38791  add content_ids to messages (wguanicedew, May 17, 2023)
24b9053  to check messages before replay (wguanicedew, May 17, 2023)
9b0226e  to check messages before replay (wguanicedew, May 17, 2023)
e9fb4bc  add throttler constant (wguanicedew, May 21, 2023)
65c992e  set utc logging (wguanicedew, May 21, 2023)
c065e44  monitor activated panda jobs (wguanicedew, May 21, 2023)
c52a266  add statistics func (wguanicedew, May 21, 2023)
f2124e3  add throttler table (wguanicedew, May 21, 2023)
6f7d803  add throttler alembic (wguanicedew, May 21, 2023)
2ddd2de  add throttler core orm func (wguanicedew, May 21, 2023)
1a482b8  conductor check whether message has been processed before resending m… (wguanicedew, May 21, 2023)
25f3185  receiver add suspend/resume function (wguanicedew, May 21, 2023)
a54c0fc  clerk to throttle requests (wguanicedew, May 21, 2023)
7db0cf7  add anytree lib (wguanicedew, May 21, 2023)
e02de62  add set throttler func (wguanicedew, May 21, 2023)
7f8ac83  add message poll_period and delay resend messages using it (wguanicedew, May 21, 2023)
a4a34fe  add message poll_period and delay resend messages using it (wguanicedew, May 21, 2023)
8df6548  improve throttler and conductor (wguanicedew, May 22, 2023)
89a50a1  fix message core (wguanicedew, May 22, 2023)
0840cac  fix replay messages for failed jobs (wguanicedew, May 23, 2023)
a76b080  disable subscriber heartbeat (wguanicedew, Jun 1, 2023)
26f3bab  disable syslog (wguanicedew, Jun 5, 2023)
8 changes: 8 additions & 0 deletions common/lib/idds/common/constants.py
@@ -146,6 +146,7 @@ class RequestStatus(IDDSEnum):
Terminating = 20
Building = 21
Built = 22
Throttling = 23


class RequestLocking(IDDSEnum):
@@ -291,6 +292,7 @@ class ContentStatus(IDDSEnum):
FakeAvailable = 8
Missing = 9
Cancelled = 10
Activated = 11


class ContentLocking(IDDSEnum):
@@ -429,6 +431,7 @@ class MessageStatus(IDDSEnum):
Fetched = 1
Delivered = 2
Failed = 3
ConfirmDelivered = 4


class MessageLocking(IDDSEnum):
@@ -484,6 +487,11 @@ class CommandLocation(IDDSEnum):
Other = 6


class ThrottlerStatus(IDDSEnum):
InActive = 0
Active = 1


class ReturnCode(IDDSEnum):
Ok = 0
Failed = 255
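Because `IDDSEnum` members are plain integers, the new states can be exercised standalone. A minimal sketch of the additions, using the stdlib `enum.IntEnum` as a stand-in for `IDDSEnum` (the base-class substitution is an assumption; member names and values come from the diff above):

```python
from enum import IntEnum


class RequestStatus(IntEnum):
    # Tail of the real enum only; Throttling is the member added here.
    Terminating = 20
    Building = 21
    Built = 22
    Throttling = 23


class ThrottlerStatus(IntEnum):
    # New enum introduced by this PR.
    InActive = 0
    Active = 1


# A clerk-side check can now distinguish throttled requests by status alone.
status = RequestStatus.Throttling
print(status.name, status.value)                     # Throttling 23
print(ThrottlerStatus.Active is ThrottlerStatus(1))  # True
```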
4 changes: 3 additions & 1 deletion common/lib/idds/common/utils.py
@@ -19,6 +19,7 @@
import subprocess
import sys
import tarfile
import time
# import traceback

from enum import Enum
@@ -67,6 +68,7 @@ def setup_logging(name, stream=None, loglevel=None):
else:
logging.basicConfig(stream=stream, level=loglevel,
format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s')
logging.Formatter.converter = time.gmtime


def get_logger(name, filename=None, loglevel=None):
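The added `logging.Formatter.converter = time.gmtime` line switches timestamp rendering to UTC process-wide, since `converter` is a class attribute consulted by every formatter. A self-contained illustration using only the stdlib:

```python
import logging
import time

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s\t%(threadName)s\t%(name)s\t%(levelname)s\t%(message)s')
# Class-level assignment: all Formatter instances now convert record
# timestamps with time.gmtime (UTC) instead of the default time.localtime.
logging.Formatter.converter = time.gmtime

logging.getLogger("idds.demo").info("this timestamp is in UTC")
```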
@@ -538,7 +540,7 @@ def merge_dict(dict1, dict2):
else:
if dict2[key] is None:
continue
- elif isinstance(dict1[key], type(dict2[key])):
+ elif not isinstance(dict1[key], type(dict2[key])):
raise Exception("type of %s is different from %s, cannot merge" % (type(dict1[key]), type(dict2[key])))
elif dict1[key] == dict2[key]:
continue
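The `merge_dict` change is a one-word bug fix: the old guard raised when the two values had the *same* type, so any same-type merge failed. A reduced, runnable sketch of the corrected logic (the overwrite-on-conflict behavior in the last branch is a simplification; the real function also handles nested structures):

```python
def merge_dict(dict1, dict2):
    # Reduced sketch of the fixed guard in common/lib/idds/common/utils.py.
    for key in dict2:
        if key in dict1:
            if dict2[key] is None:
                continue
            # Fixed: raise only when the types actually differ.
            elif not isinstance(dict1[key], type(dict2[key])):
                raise Exception("type of %s is different from %s, cannot merge"
                                % (type(dict1[key]), type(dict2[key])))
            elif dict1[key] == dict2[key]:
                continue
            else:
                dict1[key] = dict2[key]
        else:
            dict1[key] = dict2[key]
    return dict1


print(merge_dict({'a': 1, 'b': 2}, {'b': 3, 'c': 4}))  # {'a': 1, 'b': 3, 'c': 4}
```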
37 changes: 35 additions & 2 deletions doma/lib/idds/doma/workflowv2/domapandawork.py
@@ -544,6 +544,10 @@ def create_processing(self, input_output_maps=[]):
task_param_map['maxWalltime'] = self.maxWalltime
task_param_map['maxFailure'] = self.maxAttempt if self.maxAttempt else 5
task_param_map['maxAttempt'] = self.maxAttempt if self.maxAttempt else 5
if task_param_map['maxAttempt'] < self.num_retries:
task_param_map['maxAttempt'] = self.num_retries
if task_param_map['maxFailure'] < self.num_retries:
task_param_map['maxFailure'] = self.num_retries
task_param_map['log'] = self.task_log
task_param_map['jobParameters'] = [
{'type': 'constant',
@@ -673,9 +677,12 @@ def get_processing_status_from_panda_status(self, task_status):
elif task_status in ['finished', 'paused']:
# finished, finishing, waiting it to be done
processing_status = ProcessingStatus.SubFinished
- elif task_status in ['failed', 'aborted', 'exhausted']:
+ elif task_status in ['failed', 'exhausted']:
# aborting, tobroken
processing_status = ProcessingStatus.Failed
elif task_status in ['aborted']:
# aborting, tobroken
processing_status = ProcessingStatus.Cancelled
elif task_status in ['broken']:
processing_status = ProcessingStatus.Broken
else:
@@ -786,6 +793,8 @@ def get_content_status_from_panda_status(self, job_info):
return ContentStatus.FinalFailed
else:
return ContentStatus.Failed
elif jobstatus in ['activated']:
return ContentStatus.Activated
else:
return ContentStatus.Processing

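Taken together, the two mapping changes above separate user aborts from real failures at the task level and surface queued jobs at the content level. A condensed sketch of the resulting lookups (string stand-ins for the ProcessingStatus/ContentStatus enums; fallbacks are simplified):

```python
# Task level: 'aborted' now maps to Cancelled rather than Failed.
def processing_status_from_panda(task_status):
    if task_status in ['failed', 'exhausted']:
        return 'Failed'
    elif task_status in ['aborted']:
        return 'Cancelled'        # changed by this PR
    elif task_status in ['broken']:
        return 'Broken'
    return 'Running'              # simplified fallback

# Job level: 'activated' (queued in PanDA, waiting for a slot) becomes
# a first-class content state instead of generic Processing.
def content_status_from_panda(job_status):
    if job_status in ['activated']:
        return 'Activated'        # new in this PR
    return 'Processing'           # simplified fallback

print(processing_status_from_panda('aborted'))   # Cancelled
print(content_status_from_panda('activated'))    # Activated
```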
@@ -1117,6 +1126,28 @@ def get_contents_ext(self, input_output_maps, contents_ext, contents_ext_full, j
self.logger.debug("get_contents_ext, left_contents[:1]: %s" % (str(left_contents[:3])))
return new_contents_ext_d, update_contents_ext_d, left_contents

def abort_contents(self, input_output_maps, updated_contents, contents_ext):
contents_ext_dict = {content['content_id']: content for content in contents_ext}
new_contents_ext = []

for map_id in input_output_maps:
outputs = input_output_maps[map_id]['outputs']
for content in outputs:
update_content = {'content_id': content['content_id'],
'substatus': ContentStatus.Missing}
updated_contents.append(update_content)
if content['content_id'] not in contents_ext_dict:
new_content_ext = {'content_id': content['content_id'],
'request_id': content['request_id'],
'transform_id': content['transform_id'],
'workload_id': content['workload_id'],
'coll_id': content['coll_id'],
'map_id': content['map_id'],
'status': ContentStatus.Missing}
new_contents_ext.append(new_content_ext)

return updated_contents, new_contents_ext

def poll_panda_task(self, processing=None, input_output_maps=None, contents_ext=None, job_info_maps={}, log_prefix=''):
task_id = None
try:
@@ -1161,7 +1192,9 @@ def poll_panda_task(self, processing=None, input_output_maps=None, contents_ext=
new_contents_ext, update_contents_ext, left_contents = self.get_contents_ext(input_output_maps, contents_ext,
contents_ext_full, job_info_maps)
# if left_jobs:
# processing_status = ProcessingStatus.Running
if processing_status in [ProcessingStatus.Cancelled]:
updated_contents, new_contents_ext1 = self.abort_contents(input_output_maps, updated_contents, contents_ext)
new_contents_ext = new_contents_ext + new_contents_ext1

return processing_status, updated_contents, update_contents_full, new_contents_ext, update_contents_ext
else:
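When `poll_panda_task` now sees a Cancelled task, the new `abort_contents` marks every output content Missing and backfills `contents_ext` rows that were never created. A condensed, runnable sketch of that flow (dict stand-ins for the ORM rows; only the fields needed for the logic are kept):

```python
def abort_contents(input_output_maps, updated_contents, contents_ext):
    # Mirrors the new method: all outputs become Missing; outputs without
    # an ext record get a minimal one so accounting stays complete.
    known_ext = {c['content_id'] for c in contents_ext}
    new_contents_ext = []
    for map_id, io_map in input_output_maps.items():
        for content in io_map['outputs']:
            updated_contents.append({'content_id': content['content_id'],
                                     'substatus': 'Missing'})
            if content['content_id'] not in known_ext:
                new_contents_ext.append({'content_id': content['content_id'],
                                         'map_id': map_id,
                                         'status': 'Missing'})
    return updated_contents, new_contents_ext


maps = {1: {'outputs': [{'content_id': 101}, {'content_id': 102}]}}
updated, new_ext = abort_contents(maps, [], [{'content_id': 101}])
print(updated)   # both outputs flagged Missing
print(new_ext)   # only 102 lacked an ext record
```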
34 changes: 34 additions & 0 deletions main/etc/sql/oracle_update.sql
@@ -399,3 +399,37 @@ alter table health modify payload VARCHAR2(2048);

--- 2023.03.29
alter table contents_update add fetch_status NUMBER(2) DEFAULT 0;


-- 2023.05.18
alter table requests add site VARCHAR2(50);
CREATE INDEX REQUESTS_STATUS_SITE ON requests (status, site, request_id) COMPRESS 3 LOCAL;

alter table transforms add site VARCHAR2(50);
CREATE INDEX TRANSFORMS_STATUS_SITE ON transforms (status, site, request_id, transform_id) COMPRESS 3 LOCAL;

alter table processings add site VARCHAR2(50);
CREATE INDEX PROCESSINGS_STATUS_SITE ON processings (status, site, request_id, transform_id, processing_id) COMPRESS 3 LOCAL;

alter table messages add fetching_id NUMBER(12);


CREATE SEQUENCE THROTTLER_ID_SEQ MINVALUE 1 INCREMENT BY 1 START WITH 1 NOCACHE ORDER NOCYCLE GLOBAL;
CREATE TABLE Throttlers
(
throttler_id NUMBER(12) DEFAULT ON NULL THROTTLER_ID_SEQ.NEXTVAL constraint THROTTLER_ID_NN NOT NULL,
site VARCHAR2(50),
status NUMBER(2),
num_requests NUMBER(12),
num_transforms NUMBER(12),
num_processings NUMBER(12),
new_contents NUMBER(12),
queue_contents NUMBER(12),
created_at DATE DEFAULT SYS_EXTRACT_UTC(systimestamp(0)),
updated_at DATE DEFAULT SYS_EXTRACT_UTC(systimestamp(0)),
others CLOB,
CONSTRAINT THROTTLER_PK PRIMARY KEY (throttler_id), -- USING INDEX LOCAL,
CONSTRAINT THROTTLER_SITE_UQ UNIQUE (site)
);

alter table Messages add (poll_period INTERVAL DAY TO SECOND DEFAULT '00 00:05:00');
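The Throttlers table holds one row of per-site load counters, and the new `Messages.poll_period` column (default `'00 00:05:00'`, five minutes) spaces out message resends. A hedged sketch of how a throttler row might gate new work; the threshold values and the `should_throttle` helper are illustrative assumptions, not part of the PR:

```python
from datetime import timedelta

# Dict stand-in for one Throttlers row (column names from the DDL above).
throttler_row = {'site': 'site_a', 'status': 'Active',
                 'num_requests': 120, 'num_transforms': 340,
                 'num_processings': 310,
                 'new_contents': 50000, 'queue_contents': 250000}

# Hypothetical limits; the PR does not define where limits are stored
# (the 'others' CLOB is one plausible place).
limits = {'num_requests': 100, 'queue_contents': 200000}

def should_throttle(row, limits):
    """Assumed policy: throttle when any counter reaches its limit."""
    if row['status'] != 'Active':
        return False
    return any(row[key] >= limit for key, limit in limits.items())

print(should_throttle(throttler_row, limits))  # True, so hold new requests

# Messages.poll_period default, expressed as a Python timedelta.
poll_period = timedelta(days=0, hours=0, minutes=5)
print(poll_period.total_seconds())             # 300.0
```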
27 changes: 23 additions & 4 deletions main/lib/idds/agents/carrier/receiver.py
@@ -54,6 +54,7 @@ def __init__(self, receiver_num_threads=8, num_threads=1, bulk_message_delay=30,
self.update_processing_interval = 300

self.mode = mode
self.selected = None
self.selected_receiver = None

self.log_prefix = ''
@@ -79,6 +80,16 @@ def stop_receiver(self):
self.receiver.stop()
self.receiver = None

def suspend_receiver(self):
if hasattr(self, 'receiver') and self.receiver:
self.logger.info("Stopping receiver: %s" % self.receiver)
self.receiver.suspend()

def resume_receiver(self):
if hasattr(self, 'receiver') and self.receiver:
self.logger.info("Stopping receiver: %s" % self.receiver)
self.receiver.resume()

def is_receiver_started(self):
if hasattr(self, 'receiver') and self.receiver:
return True
@@ -109,9 +120,15 @@ def get_output_messages(self):
return msgs

def is_selected(self):
selected = None
if not self.selected_receiver:
- return True
- return self.is_self(self.selected_receiver)
+ selected = True
+ else:
+ selected = self.is_self(self.selected_receiver)
if self.selected is None or self.selected != selected:
self.logger.info("is_selected changed from %s to %s" % (self.selected, selected))
self.selected = selected
return self.selected

def monitor_receiver(self):
if self.mode == "single":
@@ -242,6 +259,8 @@ def run(self):
# [self.executors.submit(self.worker, log_prefix) for i in range(self.executors.get_max_workers())]
self.init_event_function_map()

self.start_receiver()

while not self.graceful_stop.is_set():
try:
self.execute_schedules()

if self.is_selected():
if not self.is_receiver_started():
- self.start_receiver()
+ self.resume_receiver()

if not self.is_selected():
if self.is_receiver_started():
- self.stop_receiver()
+ self.suspend_receiver()

msg = self.get_output_messages()
if msg:
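With these changes the receiver is created once at startup and only suspended or resumed as the selection result flips, instead of being torn down and rebuilt. A minimal sketch of that state machine (the `Receiver` stub and `on_schedule` driver are assumptions; the selection logic mirrors the diff):

```python
class Receiver:
    """Stub for the real message receiver; keeps its broker connection."""
    def __init__(self):
        self.consuming = True

    def suspend(self):
        self.consuming = False   # stop consuming, keep the connection

    def resume(self):
        self.consuming = True


receiver = Receiver()            # started once, before the main loop
last_selected = None

def on_schedule(selected):
    """One iteration of the run loop's selection check."""
    global last_selected
    if last_selected is None or last_selected != selected:
        # Matches the new logging in is_selected().
        print("is_selected changed from %s to %s" % (last_selected, selected))
        last_selected = selected
    if selected and not receiver.consuming:
        receiver.resume()        # this instance is now the active receiver
    elif not selected and receiver.consuming:
        receiver.suspend()       # another instance took over


for selected in [True, True, False, True]:
    on_schedule(selected)
```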
11 changes: 10 additions & 1 deletion main/lib/idds/agents/carrier/utils.py
@@ -315,10 +315,13 @@ def generate_file_messages(request_id, transform_id, workload_id, work, files, r
file_message = {'scope': file['scope'],
'name': file['name'],
'path': file['path'],
'map_id': file['map_id'],
'content_id': file['content_id'],
'status': file_status}
files_message.append(file_message)
msg_content = {'msg_type': i_msg_type_str.value,
'request_id': request_id,
'transform_id': transform_id,
'workload_id': workload_id,
'relation_type': relation_type,
'files': files_message}
Expand All @@ -341,6 +344,7 @@ def generate_content_ext_messages(request_id, transform_id, workload_id, work, f
msg_content = {'msg_type': i_msg_type_str.value,
'request_id': request_id,
'workload_id': workload_id,
'transform_id': transform_id,
'relation_type': relation_type,
'files': files_message}
num_msg_content = len(files_message)
Expand All @@ -356,6 +360,7 @@ def generate_collection_messages(request_id, transform_id, workload_id, work, co
msg_content = {'msg_type': i_msg_type_str.value,
'request_id': request_id,
'workload_id': workload_id,
'transform_id': transform_id,
'relation_type': relation_type,
'collections': [{'scope': collection.scope,
'name': coll_name,
Expand All @@ -371,6 +376,7 @@ def generate_work_messages(request_id, transform_id, workload_id, work, relation
msg_content = {'msg_type': i_msg_type_str.value,
'request_id': request_id,
'workload_id': workload_id,
'transform_id': transform_id,
'relation_type': relation_type,
'status': work.get_status().name,
'output': work.get_output_data(),
Expand Down Expand Up @@ -1085,6 +1091,7 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=

def get_content_status_from_panda_msg_status(status):
status_map = {'starting': ContentStatus.New,
'activated': ContentStatus.Activated,
'running': ContentStatus.Processing,
'finished': ContentStatus.Available,
'failed': ContentStatus.Failed}
Expand Down Expand Up @@ -1370,7 +1377,9 @@ def handle_messages_processing(messages, logger=None, log_prefix='', update_proc
job_id = msg['jobid']
status = msg['status']
inputs = msg['inputs']
- if inputs and status in ['finished']:
+ # if inputs and status in ['finished']:
+ # add activated
+ if inputs and status in ['finished', 'activated']:
logger.debug(log_prefix + "Received message: %s" % str(ori_msg))

ret_req_tf_pr_id = get_workload_id_transform_id_map(workload_id, logger=logger, log_prefix=log_prefix)
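On the consuming side, `handle_messages_processing` now also accepts `'activated'` job updates, and the `transform_id`/`map_id`/`content_id` fields embedded above let downstream consumers route file updates without a per-message reverse lookup. A hypothetical, condensed handler illustrating both points (field names follow the diff; the function itself is not from the PR):

```python
def handle_panda_job_message(msg, workload_to_transform):
    """Condensed stand-in for the widened message-processing check."""
    status = msg['status']
    inputs = msg['inputs']
    # Widened from ['finished'] to also pass 'activated' updates through,
    # which map to ContentStatus.Activated.
    if not (inputs and status in ['finished', 'activated']):
        return None
    transform_id = workload_to_transform.get(msg['workload_id'])
    return (transform_id, msg['jobid'], status)


lookup = {4242: 7}   # workload_id -> transform_id (illustrative values)
msg = {'workload_id': 4242, 'jobid': 'job.0001',
       'status': 'activated', 'inputs': ['file_a.dat']}
print(handle_panda_job_message(msg, lookup))  # (7, 'job.0001', 'activated')
```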