Dev #316
Merged (4 commits, Jun 7, 2024)
9 changes: 9 additions & 0 deletions common/lib/idds/common/event.py
@@ -88,6 +88,7 @@ def event_type(self):
return self._event_type.name

def able_to_merge(self, event):
"""
if self._event_type == event._event_type and self.get_event_id() == event.get_event_id():
return True
if self._event_type == event._event_type and self.get_event_id() == event.get_event_id() and self._counter == event._counter:
@@ -98,6 +99,14 @@ def able_to_merge(self, event):
# ddiff = DeepDiff(self._content, event._content, ignore_order=True)
# if not ddiff:
# return True
"""
if self._event_type == event._event_type:
if (self._content is None and event._content is None):
return True
elif (self._content is not None and event._content is not None):
ddiff = DeepDiff(self._content, event._content, ignore_order=True)
if not ddiff:
return True
return False

def changed(self):
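For context, the revised able_to_merge treats two events as mergeable when they share an event type and their payloads are either both empty or deep-equal regardless of ordering. A minimal standalone sketch of that comparison (the FakeEvent class and the sample payloads are illustrative, not part of this patch):

from deepdiff import DeepDiff


class FakeEvent:
    """Illustrative stand-in for an idds event carrying a type and a content payload."""

    def __init__(self, event_type, content=None):
        self._event_type = event_type
        self._content = content

    def able_to_merge(self, event):
        if self._event_type == event._event_type:
            if self._content is None and event._content is None:
                return True
            if self._content is not None and event._content is not None:
                # DeepDiff returns an empty result when the two payloads are
                # identical, ignoring ordering inside lists and sets.
                if not DeepDiff(self._content, event._content, ignore_order=True):
                    return True
        return False


a = FakeEvent("UpdateProcessing", {"processing_ids": [1, 2, 3]})
b = FakeEvent("UpdateProcessing", {"processing_ids": [3, 2, 1]})
print(a.able_to_merge(b))  # True: same type, same payload up to ordering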
6 changes: 3 additions & 3 deletions main/etc/sql/postgresql.sql
@@ -548,15 +548,15 @@ CREATE INDEX "CONTENTS_ID_NAME_IDX" ON doma_idds.contents (coll_id, scope, md5('
$$ LANGUAGE PLPGSQL


CREATE SEQUENCE doma_idds."METAINFO_ID_SEQ" START WITH 1
CREATE SEQUENCE doma_idds."METAINFO_ID_SEQ" START WITH 1;
CREATE TABLE meta_info
(
meta_id BIGINT NOT NULL,
name VARCHAR2(50),
name VARCHAR(50),
status INTEGER,
created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
description VARCHAR2(1000),
description VARCHAR(1000),
meta_info JSONB,
CONSTRAINT "METAINFO_PK" PRIMARY KEY (meta_id), -- USING INDEX LOCAL,
CONSTRAINT "METAINFO_NAME_UQ" UNIQUE (name)
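The SQL hunk above swaps Oracle's VARCHAR2 type for PostgreSQL's VARCHAR and terminates the CREATE SEQUENCE statement with a semicolon. One quick way to smoke-test the corrected DDL is to run it inside a rolled-back transaction; a sketch using psycopg2 (the DSN and the scratch schema name are assumptions for illustration only):

import psycopg2

DDL = """
CREATE SCHEMA scratch_idds;
CREATE SEQUENCE scratch_idds."METAINFO_ID_SEQ" START WITH 1;
CREATE TABLE scratch_idds.meta_info
(
    meta_id BIGINT NOT NULL,
    name VARCHAR(50),
    status INTEGER,
    created_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
    updated_at TIMESTAMP WITHOUT TIME ZONE NOT NULL,
    description VARCHAR(1000),
    meta_info JSONB,
    CONSTRAINT "METAINFO_PK" PRIMARY KEY (meta_id),
    CONSTRAINT "METAINFO_NAME_UQ" UNIQUE (name)
);
"""

conn = psycopg2.connect("dbname=idds_test user=idds")  # assumed test database
try:
    with conn.cursor() as cur:
        cur.execute(DDL)  # raises if VARCHAR2 or a missing ';' slips back in
finally:
    conn.rollback()  # PostgreSQL DDL is transactional, so nothing persists
    conn.close()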
127 changes: 110 additions & 17 deletions main/lib/idds/agents/carrier/finisher.py
@@ -28,7 +28,7 @@
handle_resume_processing,
# is_process_terminated,
sync_processing)
from .iutils import sync_iprocessing
from .iutils import sync_iprocessing, handle_abort_iprocessing, handle_resume_iprocessing
from .poller import Poller

setup_logging(__name__)
@@ -346,6 +346,43 @@ def handle_abort_processing(self, processing, log_prefix=""):
return ret
return None

def handle_abort_iprocessing(self, processing, log_prefix=""):
"""
process abort processing
"""
try:
plugin = None
if processing['processing_type']:
plugin_name = processing['processing_type'].name.lower() + '_poller'
plugin = self.get_plugin(plugin_name)
else:
raise exceptions.ProcessSubmitFailed('No corresponding poller plugins for %s' % processing['processing_type'])

processing_status, update_collections, update_contents, messages = handle_abort_iprocessing(processing, self.agent_attributes, plugin=plugin, logger=self.logger, log_prefix=log_prefix)

update_processing = {'processing_id': processing['processing_id'],
'parameters': {'status': processing_status,
'substatus': ProcessingStatus.ToCancel,
'locking': ProcessingLocking.Idle}}
ret = {'update_processing': update_processing,
'update_collections': update_collections,
'update_contents': update_contents,
'messages': messages
}
return ret
except Exception as ex:
self.logger.error(ex)
self.logger.error(traceback.format_exc())
error = {'abort_err': {'msg': truncate_string('%s' % (ex), length=200)}}
update_processing = {'processing_id': processing['processing_id'],
'parameters': {'status': ProcessingStatus.ToCancel,
'locking': ProcessingLocking.Idle,
'errors': processing['errors'] if processing['errors'] else {}}}
update_processing['parameters']['errors'].update(error)
ret = {'update_processing': update_processing}
return ret
return None
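
The plugin lookup in the handler above reuses the existing poller naming convention: the plugin name is derived from the processing type and resolved through the agent's plugin registry. A toy illustration of the naming rule (the enum below is a simplified stand-in, not the real idds.common.constants.ProcessingType):

from enum import Enum


class ProcessingType(Enum):
    # Simplified stand-in; the real enum lives in idds.common.constants.
    iWorkflow = 30
    iWork = 31


def poller_plugin_name(processing_type):
    # Mirrors plugin_name = processing['processing_type'].name.lower() + '_poller'
    return processing_type.name.lower() + '_poller'


print(poller_plugin_name(ProcessingType.iWorkflow))  # iworkflow_poller
print(poller_plugin_name(ProcessingType.iWork))      # iwork_poller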

def process_abort_processing(self, event):
self.number_workers += 1
pro_ret = ReturnCode.Ok.value
@@ -373,17 +410,27 @@ def process_abort_processing(self, event):
self.logger.info(log_pre + "process_abort_processing result: %s" % str(ret))
self.update_processing(ret, pr)
elif pr:
ret = self.handle_abort_processing(pr, log_prefix=log_pre)
ret_copy = {}
for ret_key in ret:
if ret_key != 'messages':
ret_copy[ret_key] = ret[ret_key]
self.logger.info(log_pre + "process_abort_processing result: %s" % str(ret_copy))
if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]:
ret = self.handle_abort_iprocessing(pr, log_prefix=log_pre)
self.logger.info(log_pre + "process_abort_processing result: %s" % str(ret))

self.update_processing(ret, pr)
self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content)
self.event_bus.send(event)
self.update_processing(ret, pr, use_bulk_update_mappings=False)

self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content)
self.event_bus.send(event)
else:
ret = self.handle_abort_processing(pr, log_prefix=log_pre)
ret_copy = {}
for ret_key in ret:
if ret_key != 'messages':
ret_copy[ret_key] = ret[ret_key]
self.logger.info(log_pre + "process_abort_processing result: %s" % str(ret_copy))

self.update_processing(ret, pr)
self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content)
self.event_bus.send(event)
except Exception as ex:
self.logger.error(ex)
self.logger.error(traceback.format_exc())
@@ -419,6 +466,42 @@ def handle_resume_processing(self, processing, log_prefix=""):
return ret
return None

def handle_resume_iprocessing(self, processing, log_prefix=""):
"""
process resume processing
"""
try:
plugin = None
if processing['processing_type']:
plugin_name = processing['processing_type'].name.lower() + '_poller'
plugin = self.get_plugin(plugin_name)
else:
raise exceptions.ProcessSubmitFailed('No corresponding poller plugins for %s' % processing['processing_type'])

processing_status, update_collections, update_contents = handle_resume_iprocessing(processing, self.agent_attributes, plugin=plugin, logger=self.logger, log_prefix=log_prefix)

update_processing = {'processing_id': processing['processing_id'],
'parameters': {'status': processing_status,
'substatus': ProcessingStatus.ToResume,
'locking': ProcessingLocking.Idle}}
ret = {'update_processing': update_processing,
'update_collections': update_collections,
'update_contents': update_contents,
}
return ret
except Exception as ex:
self.logger.error(ex)
self.logger.error(traceback.format_exc())
error = {'resume_err': {'msg': truncate_string('%s' % (ex), length=200)}}
update_processing = {'processing_id': processing['processing_id'],
'parameters': {'status': ProcessingStatus.ToResume,
'locking': ProcessingLocking.Idle,
'errors': processing['errors'] if processing['errors'] else {}}}
update_processing['parameters']['errors'].update(error)
ret = {'update_processing': update_processing}
return ret
return None

def process_resume_processing(self, event):
self.number_workers += 1
pro_ret = ReturnCode.Ok.value
@@ -445,14 +528,24 @@ def process_resume_processing(self, event):

self.update_processing(ret, pr)
elif pr:
ret = self.handle_resume_processing(pr, log_prefix=log_pre)
self.logger.info(log_pre + "process_resume_processing result: %s" % str(ret))
if pr['processing_type'] and pr['processing_type'] in [ProcessingType.iWorkflow, ProcessingType.iWork]:
ret = self.handle_resume_iprocessing(pr, log_prefix=log_pre)
self.logger.info(log_pre + "process_resume_processing result: %s" % str(ret))

self.update_processing(ret, pr, use_bulk_update_mappings=False)
self.update_processing(ret, pr, use_bulk_update_mappings=False)

self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content)
self.event_bus.send(event)
self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content)
self.event_bus.send(event)
else:
ret = self.handle_resume_processing(pr, log_prefix=log_pre)
self.logger.info(log_pre + "process_resume_processing result: %s" % str(ret))

self.update_processing(ret, pr, use_bulk_update_mappings=False)

self.logger.info(log_pre + "UpdateTransformEvent(transform_id: %s)" % pr['transform_id'])
event = UpdateTransformEvent(publisher_id=self.id, transform_id=pr['transform_id'], content=event._content)
self.event_bus.send(event)
except Exception as ex:
self.logger.error(ex)
self.logger.error(traceback.format_exc())
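Both new finisher handlers share the same error bookkeeping on failure: the exception message is truncated, stored under a dedicated key, and merged into any errors already recorded on the processing. A condensed sketch of that pattern (truncate_string below is a simplified stand-in for the idds utility of the same name, and the status/locking strings stand in for the ProcessingStatus and ProcessingLocking enum members):

def truncate_string(text, length=200):
    # Simplified stand-in for idds.common.utils.truncate_string.
    return text if len(text) <= length else text[:length]


def build_error_parameters(processing, err_key, exc, status):
    error = {err_key: {'msg': truncate_string('%s' % exc, length=200)}}
    errors = processing['errors'] if processing['errors'] else {}
    errors.update(error)
    return {'processing_id': processing['processing_id'],
            'parameters': {'status': status, 'locking': 'Idle', 'errors': errors}}


params = build_error_parameters({'processing_id': 42, 'errors': None},
                                'abort_err', RuntimeError('poller plugin missing'),
                                status='ToCancel')
print(params['parameters']['errors'])  # {'abort_err': {'msg': 'poller plugin missing'}}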
30 changes: 30 additions & 0 deletions main/lib/idds/agents/carrier/iutils.py
@@ -76,6 +76,36 @@ def handle_update_iprocessing(processing, agent_attributes, plugin=None, max_upd
return status, [], [], [], [], [], [], []


def handle_abort_iprocessing(processing, agent_attributes, plugin=None, logger=None, log_prefix=''):
logger = get_logger(logger)

workload_id = processing['workload_id']

try:
status = plugin.abort(workload_id, logger=logger, log_prefix=log_prefix)
logger.info(log_prefix + "abort work (status: %s, workload_id: %s)" % (status, workload_id))
except Exception as ex:
err_msg = "abort work failed with exception: %s" % (ex)
logger.error(log_prefix + err_msg)
raise Exception(err_msg)
return status, [], [], []


def handle_resume_iprocessing(processing, agent_attributes, plugin=None, logger=None, log_prefix=''):
logger = get_logger(logger)

workload_id = processing['workload_id']

try:
status = plugin.resume(workload_id, logger=logger, log_prefix=log_prefix)
logger.info(log_prefix + "resume work (status: %s, workload_id: %s)" % (status, workload_id))
except Exception as ex:
err_msg = "resume work failed with exception: %s" % (ex)
logger.error(log_prefix + err_msg)
raise Exception(err_msg)
return status, [], []


def sync_iprocessing(processing, agent_attributes, terminate=False, abort=False, logger=None, log_prefix=""):
# logger = get_logger()

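Note that the two new helpers in this file return tuples of different arity: handle_abort_iprocessing yields (status, update_collections, update_contents, messages) while handle_resume_iprocessing omits messages, and the finisher unpacks them accordingly. A minimal sketch of that contract with a dummy poller plugin (DummyPoller and its status strings are illustrative only):

class DummyPoller:
    # Illustrative plugin exposing the abort/resume hooks the helpers expect.
    def abort(self, workload_id, logger=None, log_prefix=''):
        return 'Cancelling'

    def resume(self, workload_id, logger=None, log_prefix=''):
        return 'Running'


def fake_abort(processing, plugin):
    # Same contract as handle_abort_iprocessing: 4-tuple including messages.
    return plugin.abort(processing['workload_id']), [], [], []


def fake_resume(processing, plugin):
    # Same contract as handle_resume_iprocessing: 3-tuple, no messages.
    return plugin.resume(processing['workload_id']), [], []


processing = {'workload_id': 12345}
status, colls, contents, messages = fake_abort(processing, DummyPoller())
status2, colls2, contents2 = fake_resume(processing, DummyPoller())
print(status, status2)  # Cancelling Running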
33 changes: 33 additions & 0 deletions main/lib/idds/agents/carrier/plugins/panda.py
@@ -159,3 +159,36 @@ def poll(self, workload_id, logger=None, log_prefix=''):
logger.error(log_prefix + str(ex))
logger.error(traceback.format_exc())
raise ex

def abort(self, workload_id, logger=None, log_prefix=''):
from pandaclient import Client

try:
if logger:
logger.info(log_prefix + f"aborting task {workload_id}")
Client.killTask(workload_id, soft=True)
status, task_status = Client.getTaskStatus(workload_id)
if status == 0:
return self.get_processing_status(task_status)
else:
msg = "Failed to abort task %s: status: %s, task_status: %s" % (workload_id, status, task_status)
raise Exception(msg)
except Exception as ex:
if logger:
logger.error(log_prefix + str(ex))
logger.error(traceback.format_exc())
raise ex

def resume(self, workload_id, logger=None, log_prefix=''):
from pandaclient import Client

try:
if logger:
logger.info(log_prefix + f"resuming task {workload_id}")
status, out = Client.retryTask(workload_id, newParams={})
return ProcessingStatus.Running
except Exception as ex:
if logger:
logger.error(log_prefix + str(ex))
logger.error(traceback.format_exc())
raise ex
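
To exercise the new abort()/resume() hooks without a live PanDA server, the pandaclient import can be stubbed before the plugin code runs. A sketch under that assumption (the stubbed return values and the MiniPoller wrapper are illustrative, not the plugin's real class):

import sys
import types

# Install a fake pandaclient module so `from pandaclient import Client` succeeds offline.
stub = types.ModuleType('pandaclient')
stub.Client = types.SimpleNamespace(
    killTask=lambda task_id, soft=True: (0, 'OK'),
    getTaskStatus=lambda task_id: (0, 'done'),
    retryTask=lambda task_id, newParams=None: (0, 'OK'),
)
sys.modules['pandaclient'] = stub


class MiniPoller:
    # Stand-in for the plugin above; maps a PanDA task status string to an iDDS status.
    def get_processing_status(self, task_status):
        return {'done': 'Finished', 'aborting': 'Cancelling'}.get(task_status, 'Running')

    def abort(self, workload_id, logger=None, log_prefix=''):
        from pandaclient import Client
        Client.killTask(workload_id, soft=True)
        status, task_status = Client.getTaskStatus(workload_id)
        if status == 0:
            return self.get_processing_status(task_status)
        raise Exception("Failed to abort task %s: %s" % (workload_id, task_status))


print(MiniPoller().abort(12345))  # Finished (from the stubbed 'done' status)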
2 changes: 1 addition & 1 deletion main/lib/idds/agents/carrier/poller.py
@@ -430,7 +430,7 @@ def handle_update_iprocessing(self, processing):
plugin_name = processing['processing_type'].name.lower() + '_poller'
plugin = self.get_plugin(plugin_name)
else:
raise exceptions.ProcessSubmitFailed('No corresponding submitter plugins for %s' % processing['processing_type'])
raise exceptions.ProcessSubmitFailed('No corresponding poller plugins for %s' % processing['processing_type'])

ret_handle_update_processing = handle_update_iprocessing(processing,
self.agent_attributes,