Skip to content

Commit

Permalink
Merge pull request #58 from HSF/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
wguanicedew authored Jul 12, 2021
2 parents d62e959 + ac7afe1 commit 9149082
Show file tree
Hide file tree
Showing 529 changed files with 150,113 additions and 2,099 deletions.
2 changes: 1 addition & 1 deletion atlas/lib/idds/atlas/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
# - Wen Guan, <[email protected]>, 2019 - 2021


# Package release version (post-commit value from PR #58; the diff overlay
# showed both the old "0.2.1" and new "0.5.0" assignments — keep the new one).
release_version = "0.5.0"
59 changes: 30 additions & 29 deletions atlas/lib/idds/atlas/workflow/atlasactuatorwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import json
import os
import traceback
import uuid

from rucio.client.client import Client as RucioClient
from rucio.common.exception import (CannotAuthenticate as RucioCannotAuthenticate)
Expand All @@ -23,6 +22,7 @@
ProcessingStatus, WorkStatus)
from idds.common.utils import run_command
# from idds.workflow.work import Work
from idds.workflow.work import Processing
from idds.atlas.workflow.atlascondorwork import ATLASCondorWork


Expand Down Expand Up @@ -95,7 +95,7 @@ def set_agent_attributes(self, attrs, req_attributes=None):

########################################## # noqa E266
def generate_new_task(self):
self.logger.info("Work %s parameters for next task: %s" % (self.get_internal_id(), str(self.get_parameters_for_next_task())))
self.logger.info("Work %s parameters for next task: %s" % (self.internal_id, str(self.get_parameters_for_next_task())))
if self.get_parameters_for_next_task():
return True
else:
Expand All @@ -120,39 +120,37 @@ def get_rucio_client(self):

def poll_external_collection(self, coll):
try:
if 'status' in coll and coll['status'] in [CollectionStatus.Closed]:
if coll.status in [CollectionStatus.Closed]:
return coll
else:
client = self.get_rucio_client()
did_meta = client.get_metadata(scope=coll['scope'], name=coll['name'])
if 'coll_metadata' not in coll:
coll['coll_metadata'] = {}
coll['coll_metadata']['bytes'] = did_meta['bytes']
coll['coll_metadata']['total_files'] = did_meta['length']
coll['coll_metadata']['availability'] = did_meta['availability']
coll['coll_metadata']['events'] = did_meta['events']
coll['coll_metadata']['is_open'] = did_meta['is_open']
coll['coll_metadata']['run_number'] = did_meta['run_number']
coll['coll_metadata']['did_type'] = did_meta['did_type']
coll['coll_metadata']['list_all_files'] = False

if (('is_open' in coll['coll_metadata'] and not coll['coll_metadata']['is_open'])
or ('force_close' in coll['coll_metadata'] and coll['coll_metadata']['force_close'])): # noqa: W503
did_meta = client.get_metadata(scope=coll.scope, name=coll.name)
coll.coll_metadata['bytes'] = did_meta['bytes']
coll.coll_metadata['total_files'] = did_meta['length']
coll.coll_metadata['availability'] = did_meta['availability']
coll.coll_metadata['events'] = did_meta['events']
coll.coll_metadata['is_open'] = did_meta['is_open']
coll.coll_metadata['run_number'] = did_meta['run_number']
coll.coll_metadata['did_type'] = did_meta['did_type']
coll.coll_metadata['list_all_files'] = False

if (('is_open' in coll.coll_metadata and not coll.coll_metadata['is_open'])
or ('force_close' in coll.coll_metadata and coll.coll_metadata['force_close'])): # noqa: W503
coll_status = CollectionStatus.Closed
else:
coll_status = CollectionStatus.Open
coll['status'] = coll_status
coll.status = coll_status

if 'did_type' in coll['coll_metadata']:
if coll['coll_metadata']['did_type'] == 'DATASET':
if 'did_type' in coll.coll_metadata:
if coll.coll_metadata['did_type'] == 'DATASET':
coll_type = CollectionType.Dataset
elif coll['coll_metadata']['did_type'] == 'CONTAINER':
elif coll.coll_metadata['did_type'] == 'CONTAINER':
coll_type = CollectionType.Container
else:
coll_type = CollectionType.File
else:
coll_type = CollectionType.Dataset
coll['coll_type'] = coll_type
coll.coll_metadata['coll_type'] = coll_type

return coll
except Exception as ex:
Expand All @@ -179,10 +177,10 @@ def get_input_contents(self):
ret_file = {'coll_id': coll['coll_id'],
'scope': coll['scope'],
'name': coll['name'],
'bytes': coll['coll_metadata']['bytes'],
'bytes': coll.coll_metadata['bytes'],
'adler32': None,
'min_id': 0,
'max_id': coll['coll_metadata']['total_files'],
'max_id': coll.coll_metadata['total_files'],
'content_type': ContentType.File,
'content_metadata': {'total_files': coll['coll_metadata']['total_files']}
}
Expand Down Expand Up @@ -244,16 +242,19 @@ def get_new_input_output_maps(self, mapped_input_output_maps={}):

return new_input_output_maps

def get_processing(self, input_output_maps):
def get_processing(self, input_output_maps, without_creating=False):
if self.active_processings:
return self.processings[self.active_processings[0]]
else:
return self.create_processing(input_output_maps)
if not without_creating:
return self.create_processing(input_output_maps)
return None

def create_processing(self, input_output_maps):
proc = {'processing_metadata': {'internal_id': str(uuid.uuid1())}}
processing_metadata = {}
proc = Processing(processing_metadata=processing_metadata)
self.add_processing_to_processings(proc)
self.active_processings.append(proc['processing_metadata']['internal_id'])
self.active_processings.append(proc.internal_id)
return proc

def get_status_statistics(self, registered_input_output_maps):
Expand Down Expand Up @@ -296,7 +297,7 @@ def syn_work_status(self, registered_input_output_maps):

self.syn_collection_status()

if self.is_processings_terminated() and not self.has_new_inputs():
if self.is_processings_terminated() and not self.has_new_inputs:
if self.is_processings_finished():
self.status = WorkStatus.Finished
elif self.is_processings_failed():
Expand Down
Loading

0 comments on commit 9149082

Please sign in to comment.