From 128b14b25160a1444f93c22c73923c71dac58cc1 Mon Sep 17 00:00:00 2001
From: dachengx
Date: Sat, 14 Sep 2024 23:21:13 -0500
Subject: [PATCH] Configuration of disk should be in MB

---
 README.md              | 28 +++++++++++++---------------
 outsource/config.py    |  5 ++---
 outsource/outsource.py | 13 ++++++-------
 3 files changed, 21 insertions(+), 25 deletions(-)

diff --git a/README.md b/README.md
index 6ecde76..3af2d1e 100644
--- a/README.md
+++ b/README.md
@@ -84,7 +84,7 @@ Particularly it uses information in the field of the config with header 'Outsour
 ```
 [basic]
-# usually helpful for debugging but it's a lot of msg
+; usually helpful for debugging, but it's a lot of messages
 logging_level=DEBUG
 
 [RunDB]
@@ -101,30 +101,28 @@ xe1t_database = run
 
 [Outsource]
 work_dir = /scratch/$USER/workflows
-pegasus_path = /usr
-# sites to exclude (GLIDEIN_Site), comma seprated list
+; sites to exclude (GLIDEIN_Site), comma-separated list
 exclude_sites = SU-ITS, NotreDame, UConn-HPC, Purdue Geddes, Chameleon, WSU-GRID, SIUE-CC-production, Lancium
-# data type to process
+; data types to process
 dtypes = peaklets, hitlets_nv, events_nv, events_mv, event_info_double, afterpulses, led_calibration
-# below are specific dtype options
+; below are dtype-specific options
+us_only = False
+hs06_test_run = False
+this_site_only = False
 raw_records_rse = UC_OSG_USERDISK
 records_rse = UC_MIDWAY_USERDISK
 peaklets_rse = UC_OSG_USERDISK
 events_rse = UC_MIDWAY_USERDISK
 exclude_modes = tpc_noise, tpc_rn_8pmts, tpc_commissioning_pmtgain, tpc_rn_6pmts, tpc_rn_12_pmts, nVeto_LED_calibration,tpc_rn_12pmts, nVeto_LED_calibration_2
-notification_email =
 min_run_number = 666
 max_daily = 2000
-hs06_test_run = False
-this_site_only =
 chunks_per_job = 10
-combine_memory = 60000 # MB
-combine_disk = 120000000 # KB
-peaklets_memory = 14500 # MB
-peaklets_disk = 50000000 # KB
-events_memory = 60000 # MB
-events_disk = 120000000 # KB
-us_only = False
+combine_memory = 60000
+combine_disk = 120000
+peaklets_memory = 14500
+peaklets_disk = 50000
+events_memory = 60000
+events_disk = 120000
 ```
 
 ## Add a setup script
diff --git a/outsource/config.py b/outsource/config.py
index fb792f5..6deb350 100644
--- a/outsource/config.py
+++ b/outsource/config.py
@@ -115,8 +115,7 @@
 class RunConfig:
     """The configuration of how a run will be processed.
 
-    The class will focus on the RSE and instruction to the outsource
-    submitter.
+    The class will focus on the RSE and instructions to the submitter.
     """
 
     # Data availability to site selection map.
@@ -210,7 +209,7 @@ def get_requirements(self, rses):
         requirements = self.requirements_base if len(rses) > 0 else self.requirements_base_us
         if sites_expression:
             requirements += f" && ({sites_expression})"
-        # us nodes
+        # US nodes
         requirements_us = self.requirements_base_us
         # Add excluded nodes
         if self._exclude_sites:
diff --git a/outsource/outsource.py b/outsource/outsource.py
index a08e72d..d8337d3 100644
--- a/outsource/outsource.py
+++ b/outsource/outsource.py
@@ -62,7 +62,6 @@ class Outsource:
     }
 
     # Jobs details for a given datatype
-    # disk is in KB, memory in MB
     job_kwargs = {
         "combine": dict(name="combine", memory=COMBINE_MEMORY, disk=COMBINE_DISK),
         "download": dict(name="download", memory=PEAKLETS_MEMORY, disk=PEAKLETS_DISK),
@@ -118,7 +117,7 @@ def __init__(
         )
 
         if not isinstance(runlist, list):
-            raise RuntimeError("Outsource expects a list of DBConfigs to run")
+            raise RuntimeError("Outsource expects a list of run_ids")
         self._runlist = runlist
 
         # Setup context
@@ -151,10 +150,10 @@ def workflow(self):
     def runlist(self):
         return os.path.join(self.generated_dir, "runlist.txt")
 
-    def _job(self, name, run_on_submit_node=False, cores=1, memory=1_700, disk=1_000_000):
+    def _job(self, name, run_on_submit_node=False, cores=1, memory=1_700, disk=1_000):
         """Wrapper for a Pegasus job, also sets resource requirement profiles.
 
-        Memory in unit of MB, and disk in unit of MB.
+        Memory and disk are in units of MB.
         """
 
         job = Job(name)
@@ -172,7 +171,7 @@ def _job(self, name, run_on_submit_node=False, cores=1, memory=1_700, disk=1_000):
         )
         disk_str = (
             "ifthenelse(isundefined(DAGNodeRetry) || "
-            f"DAGNodeRetry == 0, {disk}, (DAGNodeRetry + 1)*{disk})"
+            f"DAGNodeRetry == 0, {disk * 1_000}, (DAGNodeRetry + 1)*{disk * 1_000})"
         )
         job.add_profiles(Namespace.CONDOR, "request_disk", disk_str)
         job.add_profiles(Namespace.CONDOR, "request_memory", memory)
@@ -406,11 +405,11 @@ def _generate_workflow(self):
         else:
             self.logger.warning(
                 f"No data found as the dependency of {dbcfg.key_for(dtype)}. "
-                f"Hopefully those will be created by the workflow"
+                f"Hopefully they will be created by the workflow."
             )
 
         rses_specified = uconfig.get("Outsource", "raw_records_rse").split(",")
-        # For standalone downloads, only target us
+        # For standalone downloads, only target US
         if dbcfg.standalone_download:
             rses = rses_specified
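Note on the unit handling above (a reviewer sketch, not part of the patch; the helper name `request_disk_expression` is hypothetical): after this change the config values such as `peaklets_disk` are read in MB, while a bare `request_disk` number is interpreted by HTCondor in KB, hence the `disk * 1_000` conversion inside `_job`. A minimal self-contained sketch of that conversion and the retry-scaling ClassAd expression:

```python
def request_disk_expression(disk_mb: int) -> str:
    """Build the HTCondor ClassAd expression for request_disk (in KB),
    scaling the request linearly with the DAGMan retry count."""
    # Config is in MB; HTCondor's request_disk expects KB.
    disk_kb = disk_mb * 1_000
    # First attempt requests disk_kb; retry N requests (N + 1) * disk_kb.
    return (
        "ifthenelse(isundefined(DAGNodeRetry) || "
        f"DAGNodeRetry == 0, {disk_kb}, (DAGNodeRetry + 1)*{disk_kb})"
    )

# e.g. peaklets_disk = 50000 (MB) -> 50000000 KB on the first attempt,
# 100000000 KB on the first retry (DAGNodeRetry == 1)
print(request_disk_expression(50_000))
```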