Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configuration of disk should be in MB #174

Merged
merged 1 commit into from
Sep 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Particularly it uses information in the field of the config with header 'Outsour

```
[basic]
# usually helpful for debugging but it produces a lot of messages
; usually helpful for debugging but it produces a lot of messages
logging_level=DEBUG

[RunDB]
Expand All @@ -101,30 +101,28 @@ xe1t_database = run

[Outsource]
work_dir = /scratch/$USER/workflows
pegasus_path = /usr
# sites to exclude (GLIDEIN_Site), comma separated list
; sites to exclude (GLIDEIN_Site), comma separated list
exclude_sites = SU-ITS, NotreDame, UConn-HPC, Purdue Geddes, Chameleon, WSU-GRID, SIUE-CC-production, Lancium
# data type to process
; data type to process
dtypes = peaklets, hitlets_nv, events_nv, events_mv, event_info_double, afterpulses, led_calibration
# below are specific dtype options
; below are specific dtype options
us_only = False
hs06_test_run = False
this_site_only = False
raw_records_rse = UC_OSG_USERDISK
records_rse = UC_MIDWAY_USERDISK
peaklets_rse = UC_OSG_USERDISK
events_rse = UC_MIDWAY_USERDISK
exclude_modes = tpc_noise, tpc_rn_8pmts, tpc_commissioning_pmtgain, tpc_rn_6pmts, tpc_rn_12_pmts, nVeto_LED_calibration,tpc_rn_12pmts, nVeto_LED_calibration_2
notification_email =
min_run_number = 666
max_daily = 2000
hs06_test_run = False
this_site_only =
chunks_per_job = 10
combine_memory = 60000 # MB
combine_disk = 120000000 # KB
peaklets_memory = 14500 # MB
peaklets_disk = 50000000 # KB
events_memory = 60000 # MB
events_disk = 120000000 # KB
us_only = False
combine_memory = 60000
combine_disk = 120000
peaklets_memory = 14500
peaklets_disk = 50000
events_memory = 60000
events_disk = 120000
```

## Add a setup script
Expand Down
5 changes: 2 additions & 3 deletions outsource/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@
class RunConfig:
"""The configuration of how a run will be processed.

The class will focus on the RSE and instruction to the outsource
submitter.
The class will focus on the RSE and instruction to the submitter.
"""

# Data availability to site selection map.
Expand Down Expand Up @@ -210,7 +209,7 @@ def get_requirements(self, rses):
requirements = self.requirements_base if len(rses) > 0 else self.requirements_base_us
if sites_expression:
requirements += f" && ({sites_expression})"
# us nodes
# US nodes
requirements_us = self.requirements_base_us
# Add excluded nodes
if self._exclude_sites:
Expand Down
13 changes: 6 additions & 7 deletions outsource/outsource.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ class Outsource:
}

# Jobs details for a given datatype
# disk is in KB, memory in MB
job_kwargs = {
"combine": dict(name="combine", memory=COMBINE_MEMORY, disk=COMBINE_DISK),
"download": dict(name="download", memory=PEAKLETS_MEMORY, disk=PEAKLETS_DISK),
Expand Down Expand Up @@ -118,7 +117,7 @@ def __init__(
)

if not isinstance(runlist, list):
raise RuntimeError("Outsource expects a list of DBConfigs to run")
raise RuntimeError("Outsource expects a list of run_id")
self._runlist = runlist

# Setup context
Expand Down Expand Up @@ -151,10 +150,10 @@ def workflow(self):
def runlist(self):
return os.path.join(self.generated_dir, "runlist.txt")

def _job(self, name, run_on_submit_node=False, cores=1, memory=1_700, disk=1_000_000):
def _job(self, name, run_on_submit_node=False, cores=1, memory=1_700, disk=1_000):
"""Wrapper for a Pegasus job, also sets resource requirement profiles.

Memory in unit of MB, and disk in unit of MB.
Memory and disk in unit of MB.
"""
job = Job(name)

Expand All @@ -172,7 +171,7 @@ def _job(self, name, run_on_submit_node=False, cores=1, memory=1_700, disk=1_000
)
disk_str = (
"ifthenelse(isundefined(DAGNodeRetry) || "
f"DAGNodeRetry == 0, {disk}, (DAGNodeRetry + 1)*{disk})"
f"DAGNodeRetry == 0, {disk * 1_000}, (DAGNodeRetry + 1)*{disk * 1_000})"
)
job.add_profiles(Namespace.CONDOR, "request_disk", disk_str)
job.add_profiles(Namespace.CONDOR, "request_memory", memory)
Expand Down Expand Up @@ -406,11 +405,11 @@ def _generate_workflow(self):
else:
self.logger.warning(
f"No data found as the dependency of {dbcfg.key_for(dtype)}. "
f"Hopefully those will be created by the workflow"
f"Hopefully those will be created by the workflow."
)

rses_specified = uconfig.get("Outsource", "raw_records_rse").split(",")
# For standalone downloads, only target us
# For standalone downloads, only target US
if dbcfg.standalone_download:
rses = rses_specified

Expand Down