Commit

Use MB all the time in SubmitterHTCondor, no more kB (#201)
* Use MB all the time in `SubmitterHTCondor`, no more kB

* No `_get_file_name`, use `os.path.basename`
dachengx authored Sep 5, 2024
1 parent 3760d53 commit 127c882
Showing 2 changed files with 27 additions and 30 deletions.
8 changes: 4 additions & 4 deletions alea/submitters/README.md
@@ -34,8 +34,8 @@ htcondor_configurations:
cluster_size: 1
request_cpus: 4
request_memory: 2000
-request_disk: 2000000
-combine_disk: 20000000
+request_disk: 2000
+combine_disk: 20000
dagman_maxidle: 100000
dagman_retry: 2
dagman_maxjobs: 100000
@@ -48,8 +48,8 @@ htcondor_configurations:
- `cluster_size`: clustering multiple `alea_run_toymc` jobs into a single job. For example, if you expect to run 100 individual `alea_run_toymc` jobs and you specify `cluster_size: 10`, there will be only 10 clustered `alea_run_toymc` jobs in the end, each running 10 jobs in sequence. Unless you have a huge number of jobs (>200), I don't recommend changing it from 1.
- `request_cpus`: number of CPUs for each job. It should be larger than the alea maximum multi-threading number; otherwise OSG will complain.
- `request_memory`: requested memory for each job in unit of MB. Please don't put a number larger than what you need, because it will significantly reduce our available slots.
-- `request_disk`: requested disk for each job in unit of KB. Please don't put a number larger than what you need, because it will significantly reduce our available slots.
-- `combine_disk`: requested disk for combine job in unit of KB. In most cases 20GB is enough.
+- `request_disk`: requested disk for each job in unit of MB. Please don't put a number larger than what you need, because it will significantly reduce our available slots.
+- `combine_disk`: requested disk for the combine job in unit of MB. In most cases 20GB is enough.
- `dagman_maxidle`: maximum number of jobs allowed to be idle. The default 100000 is good for most cases.
- `dagman_retry`: number of automatic retries for each job when a failure happens for whatever reason. Note that every time it retries, we will have a new resource requirement of `n_retry * request_memory` and `n_retry * request_disk`, to get rid of failures due to resource shortage.
- `dagman_maxjobs`: maximum number of jobs allowed to be running. The default 100000 is good for most cases.
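
Not part of the diff: a minimal sketch, assuming the MB conventions above, of how these values end up as HTCondor requests (memory stays in MB, disk is converted to kB, matching the `disk * 1_000` conversion in `htcondor.py` below). The `to_condor_requests` helper is hypothetical.

```python
# Minimal sketch (not from the repository): MB-valued settings -> HTCondor request units.
def to_condor_requests(request_memory_mb: int, request_disk_mb: int) -> dict:
    """Hypothetical helper; mirrors the unit handling described above."""
    return {
        "request_memory": request_memory_mb,      # HTCondor takes memory in MB
        "request_disk": request_disk_mb * 1_000,  # HTCondor takes disk in kB
    }

print(to_condor_requests(2000, 2000))
# {'request_memory': 2000, 'request_disk': 2000000}
```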
49 changes: 23 additions & 26 deletions alea/submitters/htcondor.py
@@ -62,13 +62,13 @@ def __init__(self, *args, **kwargs):
# Resources configurations
self.request_cpus = self.htcondor_configurations.pop("request_cpus", 1)
self.request_memory = self.htcondor_configurations.pop("request_memory", 2000)
-self.request_disk = self.htcondor_configurations.pop("request_disk", 2000000)
-self.combine_disk = self.htcondor_configurations.pop("combine_disk", 20000000)
+self.request_disk = self.htcondor_configurations.pop("request_disk", 2_000)
+self.combine_disk = self.htcondor_configurations.pop("combine_disk", 20_000)

# Dagman configurations
-self.dagman_maxidle = self.htcondor_configurations.pop("dagman_maxidle", 100000)
+self.dagman_maxidle = self.htcondor_configurations.pop("dagman_maxidle", 100_000)
self.dagman_retry = self.htcondor_configurations.pop("dagman_retry", 2)
-self.dagman_maxjobs = self.htcondor_configurations.pop("dagman_maxjobs", 100000)
+self.dagman_maxjobs = self.htcondor_configurations.pop("dagman_maxjobs", 100_000)

super().__init__(*args, **kwargs)

@@ -125,10 +125,6 @@ def requirements(self):

return _requirements

-def _get_file_name(self, file_path):
-"""Get the filename from the file path."""
-return os.path.basename(file_path)

def _validate_x509_proxy(self, min_valid_hours=20):
"""Ensure $X509_USER_PROXY exists and has enough time left.
@@ -193,7 +189,7 @@ def _modify_yaml(self):
"""
# Output file will have the same name as input file but with '_modified' appended
-_output_file = self._get_file_name(self.statistical_model_config).replace(
+_output_file = os.path.basename(self.statistical_model_config).replace(
".yaml", "_modified.yaml"
)
self.modified_statistical_model_config = os.path.join(self.generated_dir, _output_file)
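
For illustration (not from the repository), the filename derivation above applied to a hypothetical config path:

```python
# Illustration of the "_modified" filename derivation; the input path is hypothetical.
import os

statistical_model_config = "/home/user/configs/statistical_model.yaml"
_output_file = os.path.basename(statistical_model_config).replace(".yaml", "_modified.yaml")
print(_output_file)  # statistical_model_modified.yaml
```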
@@ -378,25 +374,25 @@ def _generate_rc(self):
rc = ReplicaCatalog()

# Add the templates
-self.f_template_tarball = File(self._get_file_name(self.template_tarball))
+self.f_template_tarball = File(os.path.basename(self.template_tarball))
rc.add_replica(
"local",
-self._get_file_name(self.template_tarball),
+os.path.basename(self.template_tarball),
f"file://{self.template_tarball}",
)
# Add the yaml files
-self.f_running_configuration = File(self._get_file_name(self.config_file_path))
+self.f_running_configuration = File(os.path.basename(self.config_file_path))
rc.add_replica(
"local",
-self._get_file_name(self.config_file_path),
+os.path.basename(self.config_file_path),
f"file://{self.config_file_path}",
)
self.f_statistical_model_config = File(
-self._get_file_name(self.modified_statistical_model_config)
+os.path.basename(self.modified_statistical_model_config)
)
rc.add_replica(
"local",
-self._get_file_name(self.modified_statistical_model_config),
+os.path.basename(self.modified_statistical_model_config),
f"file://{self.modified_statistical_model_config}",
)
# Add run_toymc_wrapper
@@ -428,12 +424,12 @@ def _initialize_job(
name="run_toymc_wrapper",
cores=1,
memory=1_700,
-disk=1_000_000,
+disk=1_000,
run_on_submit_node=False,
):
"""Initilize a Pegasus job, also sets resource profiles.
-Memory in unit of MB, and disk in unit of KB.
+Memory and disk in unit of MB.
"""
job = Job(name)
@@ -446,13 +442,14 @@

# Set memory and disk requirements
# If the job fails, retry with more memory and disk
+# Somehow we need to write memory in MB and disk in kB
memory_str = (
"ifthenelse(isundefined(DAGNodeRetry) || "
f"DAGNodeRetry == 0, {memory}, (DAGNodeRetry + 1) * {memory})"
)
disk_str = (
"ifthenelse(isundefined(DAGNodeRetry) || "
-f"DAGNodeRetry == 0, {disk}, (DAGNodeRetry + 1) * {disk})"
+f"DAGNodeRetry == 0, {disk * 1_000}, (DAGNodeRetry + 1) * {disk * 1_000})"
)
job.add_profiles(Namespace.CONDOR, "request_disk", disk_str)
job.add_profiles(Namespace.CONDOR, "request_memory", memory_str)
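
For reference (not part of the diff), substituting the defaults `memory=1_700` and `disk=1_000` (both MB) shows the ClassAd expressions this builds, with disk converted to kB:

```python
# Illustration of the retry-scaled ClassAd expressions built above, using the defaults.
memory, disk = 1_700, 1_000  # both in MB, as passed to _initialize_job

memory_str = (
    "ifthenelse(isundefined(DAGNodeRetry) || "
    f"DAGNodeRetry == 0, {memory}, (DAGNodeRetry + 1) * {memory})"
)
disk_str = (
    "ifthenelse(isundefined(DAGNodeRetry) || "
    f"DAGNodeRetry == 0, {disk * 1_000}, (DAGNodeRetry + 1) * {disk * 1_000})"
)
print(memory_str)  # ... 1700, (DAGNodeRetry + 1) * 1700)        -> MB
print(disk_str)    # ... 1000000, (DAGNodeRetry + 1) * 1000000)  -> kB
```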
@@ -482,10 +479,10 @@ def _add_combine_job(self, combine_i):

def _add_limit_threshold(self):
"""Add the Neyman thresholds limit_threshold to the replica catalog."""
-self.f_limit_threshold = File(self._get_file_name(self.limit_threshold))
+self.f_limit_threshold = File(os.path.basename(self.limit_threshold))
self.rc.add_replica(
"local",
-self._get_file_name(self.limit_threshold),
+os.path.basename(self.limit_threshold),
"file://{}".format(self.limit_threshold),
)
self.added_limit_threshold = True
@@ -495,14 +492,14 @@ def _correct_paths_args_dict(self, args_dict):
args_dict["statistical_model_args"]["template_path"] = "templates/"

if "limit_threshold" in args_dict["statistical_model_args"].keys():
-limit_threshold = self._get_file_name(
+limit_threshold = os.path.basename(
args_dict["statistical_model_args"]["limit_threshold"]
)
args_dict["statistical_model_args"]["limit_threshold"] = limit_threshold

-args_dict["toydata_filename"] = self._get_file_name(args_dict["toydata_filename"])
-args_dict["output_filename"] = self._get_file_name(args_dict["output_filename"])
-args_dict["statistical_model_config"] = self._get_file_name(
+args_dict["toydata_filename"] = os.path.basename(args_dict["toydata_filename"])
+args_dict["output_filename"] = os.path.basename(args_dict["output_filename"])
+args_dict["statistical_model_config"] = os.path.basename(
self.modified_statistical_model_config
)
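
A standalone illustration (not part of the diff) of this path correction, applied to a hypothetical `args_dict`:

```python
# Hypothetical args_dict; shows how absolute paths are reduced to basenames
# so the job only refers to files staged into its working directory.
import os

args_dict = {
    "statistical_model_args": {
        "template_path": "/project/templates",
        "limit_threshold": "/project/thresholds/limit_threshold.json",
    },
    "toydata_filename": "/scratch/toydata_0.h5",
    "output_filename": "/scratch/output_0.h5",
}

args_dict["statistical_model_args"]["template_path"] = "templates/"
args_dict["statistical_model_args"]["limit_threshold"] = os.path.basename(
    args_dict["statistical_model_args"]["limit_threshold"]
)
args_dict["toydata_filename"] = os.path.basename(args_dict["toydata_filename"])
args_dict["output_filename"] = os.path.basename(args_dict["output_filename"])
print(args_dict["statistical_model_args"]["limit_threshold"])  # limit_threshold.json
```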

@@ -514,7 +511,7 @@ def _reorganize_script(self, script):
Correct the paths on the fly.
"""
-executable = self._get_file_name(script.split()[1])
+executable = os.path.basename(script.split()[1])
args_dict = Submitter.runner_kwargs_from_script(shlex.split(script)[2:])

# Add the limit_threshold to the replica catalog if not added
@@ -651,7 +648,7 @@ def _plan_and_submit(self):
staging_sites={"condorpool": "staging-davs"},
output_sites=["local"],
dir=os.path.dirname(self.runs_dir),
-relative_dir=self.workflow_id,
+relative_dir=os.path.basename(self.runs_dir),
**self.pegasus_config,
)
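
A small illustration (not part of the diff) of the `dir`/`relative_dir` arguments above, using a hypothetical `runs_dir`:

```python
# Hypothetical runs_dir; shows the values passed as dir= and relative_dir= above.
import os

runs_dir = "/scratch/user/workflows/lq_b8_cevns_30/runs"
print(os.path.dirname(runs_dir))   # /scratch/user/workflows/lq_b8_cevns_30  -> dir=
print(os.path.basename(runs_dir))  # runs                                    -> relative_dir=
```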

