
Merge pull request #32 from wguanicedew/dev
Dev
tmaeno authored Jun 8, 2020
2 parents 1fa14b2 + 2a630d0 commit dd49794
Showing 13 changed files with 345 additions and 52 deletions.
@@ -85,8 +85,18 @@ def get_executable_arguments_for_sandbox(self, transform_metadata, input_json, u
sandbox = None
if 'sandbox' in transform_metadata:
sandbox = transform_metadata['sandbox']
executable = transform_metadata['executable']
arguments = transform_metadata['arguments']
executable = transform_metadata['executable'].strip()
arguments = transform_metadata['arguments'].strip()

if executable == 'docker' and sandbox:
if 'workdir' in transform_metadata:
docker_workdir = transform_metadata['workdir']
else:
docker_workdir = None

arg_pre = 'run --rm -v $(pwd):%s %s ' % (docker_workdir, sandbox)
arguments = arg_pre + arguments
sandbox = None

output_json = None
if 'output_json' in transform_metadata:
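For context, this hunk makes a docker transform launch its sandbox image through "docker run" with the current directory mounted at the configured workdir. A minimal sketch of the resulting rewrite, using hypothetical metadata values (the image name and workdir below are illustrative only):

    transform_metadata = {'executable': 'docker',
                          'arguments': 'python train.py --epochs 10',
                          'sandbox': 'myrepo/myimage:latest',
                          'workdir': '/workdir'}
    executable = transform_metadata['executable'].strip()
    arguments = transform_metadata['arguments'].strip()
    sandbox = transform_metadata.get('sandbox')
    if executable == 'docker' and sandbox:
        docker_workdir = transform_metadata.get('workdir')
        arguments = 'run --rm -v $(pwd):%s %s ' % (docker_workdir, sandbox) + arguments
        sandbox = None
    # Resulting command line:
    #   docker run --rm -v $(pwd):/workdir myrepo/myimage:latest python train.py --epochs 10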
171 changes: 133 additions & 38 deletions atlas/lib/idds/atlas/processing/stagein_poller.py
@@ -6,13 +6,15 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Wen Guan, <[email protected]>, 2019
# - Wen Guan, <[email protected]>, 2019-2020


"""
Class of collection lister plugin
"""

import datetime
import time
import traceback


@@ -24,55 +26,148 @@
class StageInPoller(ProcessingPluginBase):
def __init__(self, **kwargs):
super(StageInPoller, self).__init__(**kwargs)
if hasattr(self, 'default_max_waiting_time'):
self.default_max_waiting_time = int(self.default_max_waiting_time)
else:
self.default_max_waiting_time = 9999999999999999
if hasattr(self, 'check_all_rules_for_new_rule'):
if type(self.check_all_rules_for_new_rule) in [bool]:
pass
elif type(self.check_all_rules_for_new_rule) in [str]:
if self.check_all_rules_for_new_rule.lower() == 'true':
self.check_all_rules_for_new_rule = True
else:
self.check_all_rules_for_new_rule = False
else:
self.check_all_rules_for_new_rule = False
else:
self.check_all_rules_for_new_rule = False
if hasattr(self, 'new_rule_lifetime'):
self.new_rule_lifetime = int(self.new_rule_lifetime)
else:
self.new_rule_lifetime = 3600 * 24 * 7
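# The three options above usually arrive from the agent configuration file as
# strings, hence the int() casts and the case-insensitive boolean parse. A
# standalone sketch with hypothetical raw values:
raw = {'default_max_waiting_time': '86400',
       'check_all_rules_for_new_rule': 'True',
       'new_rule_lifetime': '604800'}
default_max_waiting_time = int(raw['default_max_waiting_time'])                       # 86400 s
check_all_rules_for_new_rule = raw['check_all_rules_for_new_rule'].lower() == 'true'  # True
new_rule_lifetime = int(raw['new_rule_lifetime'])                                     # 7 days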

def get_rule_lifetime(self, rule):
current_time = datetime.datetime.utcnow()
life_diff = current_time - rule['created_at']
life_time = life_diff.total_seconds()
return life_time

def get_max_waiting_time(self, transform):
transform_metadata = transform['transform_metadata']
if 'max_waiting_time' in transform_metadata and transform_metadata['max_waiting_time']:
return transform_metadata['max_waiting_time']
else:
return self.default_max_waiting_time

def should_create_new_rule(self, basic_rule, new_rules, transform):
if self.check_all_rules_for_new_rule:
rules = [basic_rule] + new_rules
else:
rules = [basic_rule]

for rule in rules:
if self.get_rule_lifetime(rule) >= self.get_max_waiting_time(transform):
return True
return False
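# In words: a follow-up rule is created once the basic rule (and, when
# check_all_rules_for_new_rule is set, every follow-up rule too) has been
# waiting longer than the transform's max_waiting_time. A standalone sketch
# of the same check with hypothetical timestamps:
import datetime

def rule_lifetime(rule):
    return (datetime.datetime.utcnow() - rule['created_at']).total_seconds()

basic_rule = {'created_at': datetime.datetime.utcnow() - datetime.timedelta(days=8)}
max_waiting_time = 3600 * 24 * 7  # 7 days
print(rule_lifetime(basic_rule) >= max_waiting_time)  # True -> create a new rule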

def create_new_rule(self, rule, dids, dest_rse, src_rse=None):
try:
if 'rule_creator' not in self.plugins:
# raise exceptions.AgentPluginError('Plugin rule_creator is required')
err_msg = "Plugin rule_creator is required"
self.logger.error(err_msg)
return None

dataset_scope = rule['scope']
dataset_name = rule['name'] + ".idds.sub_%s" % int(time.time())
rule_id = self.plugins['rule_creator'](dataset_scope=dataset_scope,
dataset_name=dataset_name,
dids=dids,
dest_rse=dest_rse,
src_rse=src_rse,
lifetime=self.new_rule_lifetime)
return rule_id
except Exception as ex:
self.logger.error(ex)
self.logger.error(traceback.format_exc())
return None

def get_replica_status(self, file_key, replicases_status, new_replicases_statuses):
for repli_status in [replicases_status] + new_replicases_statuses:
if file_key in repli_status and repli_status[file_key] in [ContentStatus.Available, ContentStatus.Available.value]:
return ContentStatus.Available
return ContentStatus.New
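# The helper above marks a file Available as soon as any polled rule (the
# basic rule or any follow-up rule) has replicated it; otherwise it stays New.
# A small illustration with made-up status maps; plain strings stand in for
# the ContentStatus enum:
basic_status = {'scope:file1': 'Available'}
new_statuses = [{'scope:file2': 'Available'}, {}]

def replica_status(file_key):
    for status_map in [basic_status] + new_statuses:
        if status_map.get(file_key) == 'Available':
            return 'Available'
    return 'New'

print(replica_status('scope:file2'))  # Available, via a follow-up rule
print(replica_status('scope:file3'))  # New, not replicated anywhere yet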

def __call__(self, processing, transform, input_collection, output_collection, output_contents):
try:
processing_metadata = processing['processing_metadata']
rule_id = processing_metadata['rule_id']

if 'rule_poller' not in self.plugins:
raise exceptions.AgentPluginError('Plugin rule_poller is required')
rule, replicases_status = self.plugins['rule_poller'](rule_id)

rule_id = processing_metadata['rule_id']
basic_rule, basic_replicases_status = self.plugins['rule_poller'](rule_id)

if 'new_rule_ids' in processing_metadata:
new_rule_ids = processing_metadata['new_rule_ids']
else:
new_rule_ids = []
new_rules, new_replicases_statuses = [], []
for rule_id in new_rule_ids:
new_rule, new_replicases_status = self.plugins['rule_poller'](rule_id)
new_rules.append(new_rule)
new_replicases_statuses.append(new_replicases_status)

remain_files = []
updated_files = []
processing_updates = {}
if replicases_status:
file_status_statistics = {}
for file in output_contents:
file_key = '%s:%s' % (file['scope'], file['name'])
if file_key in replicases_status:
new_file_status = replicases_status[file_key]
if not new_file_status == file['status']:
file['status'] = new_file_status

updated_file = {'content_id': file['content_id'],
'status': new_file_status,
'scope': file['scope'],
'name': file['name'],
'path': None}
updated_files.append(updated_file)

if file['status'] not in file_status_statistics:
file_status_statistics[file['status']] = 0
file_status_statistics[file['status']] += 1

file_status_keys = list(file_status_statistics.keys())
if len(file_status_keys) == 1:
if file_status_keys == [ContentStatus.Available] and rule['state'] == 'OK':
processing_status = ProcessingStatus.Finished
elif file_status_keys == [ContentStatus.Failed]:
processing_status = ProcessingStatus.Failed
file_status_statistics = {}
for file in output_contents:
file_key = '%s:%s' % (file['scope'], file['name'])
new_file_status = self.get_replica_status(file_key, basic_replicases_status, new_replicases_statuses)
if not new_file_status == file['status']:
file['status'] = new_file_status

updated_file = {'content_id': file['content_id'],
'status': new_file_status,
'scope': file['scope'],
'name': file['name'],
'path': None}
updated_files.append(updated_file)

if file['status'] in [ContentStatus.New]:
remain_file = {'scope': file['scope'], 'name': file['name']}
remain_files.append(remain_file)

if file['status'] not in file_status_statistics:
file_status_statistics[file['status']] = 0
file_status_statistics[file['status']] += 1

file_status_keys = list(file_status_statistics.keys())
if len(file_status_keys) == 1:
if file_status_keys == [ContentStatus.Available]:
processing_status = ProcessingStatus.Finished
elif file_status_keys == [ContentStatus.Failed]:
processing_status = ProcessingStatus.Failed
else:
processing_status = ProcessingStatus.Running

file_statusvalue_statistics = {}
for key in file_status_statistics:
file_statusvalue_statistics[key.name] = file_status_statistics[key]
processing_metadata['content_status_statistics'] = file_statusvalue_statistics

if remain_files and self.should_create_new_rule(basic_rule, new_rules, transform):
new_rule_id = self.create_new_rule(rule=basic_rule, dids=remain_files, dest_rse=basic_rule['rse_expression'])
if new_rule_id is not None:
if ('new_rule_ids' not in processing_metadata):
processing_metadata['new_rule_ids'] = [new_rule_id]
else:
processing_status = ProcessingStatus.Running

file_statusvalue_statistics = {}
for key in file_status_statistics:
file_statusvalue_statistics[key.name] = file_status_statistics[key]

processing_metadata['content_status_statistics'] = file_statusvalue_statistics
processing_metadata['new_rule_ids'].append(new_rule_id)

processing_updates = {'status': processing_status,
'processing_metadata': processing_metadata}
processing_updates = {'status': processing_status,
'processing_metadata': processing_metadata}

return {'updated_files': updated_files, 'processing_updates': processing_updates}
except Exception as ex:
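For orientation, the poller's return value above feeds the core updater. Its shape is roughly the following sketch (all field values are illustrative, not taken from a real run):

    {'updated_files': [{'content_id': 1001,
                        'status': ContentStatus.Available,
                        'scope': 'mc16_13TeV',
                        'name': 'file_001',
                        'path': None}],
     'processing_updates': {'status': ProcessingStatus.Running,
                            'processing_metadata': {'rule_id': '<rule id>',
                                                    'new_rule_ids': ['<new rule id>'],
                                                    'content_status_statistics': {'Available': 1}}}}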
58 changes: 58 additions & 0 deletions atlas/lib/idds/atlas/rucio/rule_creator.py
@@ -0,0 +1,58 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License");
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Authors:
# - Wen Guan, <[email protected]>, 2019


"""
Class of rule creator plugin
"""

import traceback

from rucio.common.exception import DuplicateRule

from idds.atlas.rucio.base_plugin import RucioPluginBase


class RuleCreator(RucioPluginBase):
def __init__(self, lifetime=3600 * 24 * 7, **kwargs):
super(RuleCreator, self).__init__(**kwargs)
self.lifetime = int(lifetime)

def __call__(self, dataset_scope, dataset_name, dids, dest_rse, src_rse=None, lifetime=None):
try:
# dids = [{'scope': <scope>, 'name': <name>}]
if not lifetime:
lifetime = self.lifetime

self.client.add_dataset(scope=dataset_scope, name=dataset_name, lifetime=lifetime)
self.client.attach_dids(scope=dataset_scope, name=dataset_name, dids=dids)
self.client.set_status(scope=dataset_scope, name=dataset_name, open=False)

ds_did = {'scope': dataset_scope, 'name': dataset_name}
rule_id = self.client.add_replication_rule(dids=[ds_did],
copies=1,
rse_expression=dest_rse,
source_replica_expression=src_rse,
lifetime=lifetime,
locked=False,
grouping='DATASET',
ask_approval=False)
return rule_id
except DuplicateRule as ex:
self.logger.warn(ex)
rules = self.client.list_did_rules(scope=dataset_scope, name=dataset_name)
for rule in rules:
if rule['account'] == self.client.account and rule['rse_expression'] == dest_rse:
return rule['id']
except Exception as ex:
self.logger.error(ex)
self.logger.error(traceback.format_exc())
# raise exceptions.AgentPluginError('%s: %s' % (str(ex), traceback.format_exc()))
return None
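A hedged usage sketch of this plugin, invoked through the plugin registry the way stagein_poller.py does above (scope, file names, and RSE are hypothetical):

    dids = [{'scope': 'mc16_13TeV', 'name': 'file_001'},
            {'scope': 'mc16_13TeV', 'name': 'file_002'}]
    rule_id = plugins['rule_creator'](dataset_scope='mc16_13TeV',
                                      dataset_name='user.dataset.idds.sub_1591600000',
                                      dids=dids,
                                      dest_rse='CERN-PROD_DATADISK',
                                      src_rse=None,
                                      lifetime=3600 * 24 * 7)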
2 changes: 1 addition & 1 deletion atlas/lib/idds/atlas/rucio/rule_submitter.py
@@ -48,7 +48,7 @@ def __call__(self, processing, transform, input_collection):
self.logger.warn(ex)
rules = self.client.list_did_rules(scope=input_collection['scope'], name=input_collection['name'])
for rule in rules:
if rule['account'] == self.client.account:
if rule['account'] == self.client.account and rule['rse_expression'] == transform_metadata['dest_rse']:
return rule['id']
except Exception as ex:
self.logger.error(ex)
13 changes: 9 additions & 4 deletions docs/source/usecases/hyperparemeter_optimization.rst
@@ -47,7 +47,12 @@ RESTful Service

1. To retrieve hyperparameters.

client.get_hyperparameters(request_id, status=None, limit=None)
client.get_hyperparameters(workload_id, request_id, id=None, status=None, limit=None)

Examples:
client.get_hyperparameters(workload_id=123, request_id=None)
client.get_hyperparameters(workload_id=None, request_id=456)
client.get_hyperparameters(workload_id=None, request_id=456, id=0)

2. To register the loss of a group of hyperparameters.

@@ -82,8 +87,8 @@ through command-line arguments.
The name of the output file that the container creates in the current directory. The file contains a JSON-formatted list of new hyperparameter points.

When iDDS runs Steering containers, it replaces %XYZ placeholders with actual parameters.
For example, if an execution string is something like ``python opt.py --n=%NUM_POINT=%IN ...``,
``python opt.py --n=%NUM_POINT=10 ...`` will be executed in the container.
For example, if an execution string is something like ``python opt.py --n=%NUM_POINT ...``,
``python opt.py --n=10 ...`` will be executed in the container.
Input and output are exchanged through JSON files in the current directory ($PWD), so the directory needs to be mounted.
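A minimal sketch of this substitution (parameter names and values are illustrative; %IN and %OUT follow the same convention):

    execution_string = 'python opt.py --n=%NUM_POINT --infile=%IN --outfile=%OUT'
    params = {'%NUM_POINT': '10', '%IN': 'input.json', '%OUT': 'output.json'}
    for placeholder, value in params.items():
        execution_string = execution_string.replace(placeholder, value)
    # -> python opt.py --n=10 --infile=input.json --outfile=output.json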

@@ -110,7 +115,7 @@ Basically what the Steering container needs to do is as follows:

How to test the Steering container
************************************
Here is one example (main/lib/idds/tests/hyperparameteropt_docker_test.py). Users can update the request part and test their docker locally.
Here is one example (`Steering_local_test <https://github.com/HSF/iDDS/blob/master/main/lib/idds/tests/hyperparameteropt_docker_local_test.py>`_). Users can update the request part and test their Docker image locally.



