Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine several jobs into one to save computation resources #131

Merged
merged 6 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions alea/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,10 @@ def store_data(
Store a list of datasets.
(each on the form of a list of one or more structured arrays or dicts)
Using inference_interface, but included here to allow over-writing.
The structure would be: [[datasets1], [datasets2], ..., [datasetsn]],
The structure would be: ``[[datasets1], [datasets2], ..., [datasetsn]]``,
where each of datasets is a list of structured arrays.
If you specify, it is set, if not it will read from self.get_likelihood_term_names.
If not defined, it will be ["0", "1", ..., "n-1"]. The metadata is optional.
If you specify, it is set, if not it will read from ``self.get_likelihood_term_names``.
If not defined, it will be ``["0", "1", ..., "n-1"]``. The metadata is optional.

Args:
file_name (str): name of the file to store the data in
Expand Down
34 changes: 33 additions & 1 deletion alea/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class Submitter:

config_file_path: str
template_path: str
combine_n_jobs: int = 1
allowed_special_args: List[str] = []
logging = logging.getLogger("submitter_logger")

Expand Down Expand Up @@ -316,7 +317,7 @@ def computation_tickets_generator(self):
"""Get the submission script for the current configuration. It generates the submission
script for each combination of the computation options.

for Runner from to_zip, to_vary and in_common.
For Runner from to_zip, to_vary and in_common:
- First, generate the combined computational options directly.
- Second, update the input and output folder of the options.
- Third, collect the non-fittable (settable) parameters into nominal_values.
Expand Down Expand Up @@ -387,6 +388,37 @@ def already_done(self, i_args: dict) -> bool:
is_done = False
return is_done

def combined_tickets_generator(self):
"""Get the combined submission script for the current configuration. ``self.combine_n_jobs``
jobs will be combined into one submission script.

Yields:
(str, str): the combined submission script and name output_filename

Note:
User can add ``combine_n_jobs: 10`` in ``local_configurations``,
``slurm_configurations`` or ``htcondor_configurations`` to combine 10 jobs into
one submission script. User will need this feature when the number of jobs pending
for submission is too large.

"""

_script = ""
n_combined = 0
for script, last_output_filename in self.computation_tickets_generator():
if n_combined == 0:
_script += script
else:
_script += " && " + script
n_combined += 1
if n_combined == self.combine_n_jobs:
yield _script, last_output_filename
n_combined = 0
_script = ""
else:
if n_combined > 0:
yield _script, last_output_filename

@staticmethod
def update_n_batch(runner_args):
"""Update n_mc if n_batch is provided.
Expand Down
3 changes: 2 additions & 1 deletion alea/submitters/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(self, *args, **kwargs):
"""Initialize the SubmitterLocal class."""
self.local_configurations = kwargs.get("local_configurations", {})
self.template_path = self.local_configurations.pop("template_path", None)
self.combine_n_jobs = self.local_configurations.pop("combine_n_jobs", 1)
super().__init__(*args, **kwargs)

@staticmethod
Expand All @@ -57,7 +58,7 @@ def submit(self):
If debug is True, only return the first instance of Runner.

"""
for _, (script, _) in enumerate(self.computation_tickets_generator()):
for _, (script, _) in enumerate(self.combined_tickets_generator()):
if self.debug:
print(script)
return self.initialized_runner(script)
Expand Down
7 changes: 4 additions & 3 deletions alea/submitters/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(self, *args, **kwargs):
self.name = self.__class__.__name__
self.slurm_configurations = kwargs.get("slurm_configurations", {})
self.template_path = self.slurm_configurations.pop("template_path", None)
self.combine_n_jobs = self.slurm_configurations.pop("combine_n_jobs", 1)
self.batchq_arguments = {**BATCHQ_DEFAULT_ARGUMENTS, **self.slurm_configurations}
self._check_batchq_arguments()
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -92,7 +93,7 @@ def submit(self, **kwargs):
"""
_jobname = kwargs.pop("jobname", self.name.lower())
batchq_kwargs = {}
for job, (script, output_filename) in enumerate(self.computation_tickets_generator()):
for job, (script, last_output_filename) in enumerate(self.combined_tickets_generator()):
if self.debug:
print(script)
if job > 0:
Expand All @@ -101,7 +102,7 @@ def submit(self, **kwargs):
self.logging.info("Too many jobs. Sleeping for 30s.")
time.sleep(30)
batchq_kwargs["jobname"] = f"{_jobname}_{job:03d}"
if output_filename is not None:
batchq_kwargs["log"] = os.path.join(self.log_dir, f"{output_filename}.log")
if last_output_filename is not None:
batchq_kwargs["log"] = os.path.join(self.log_dir, f"{last_output_filename}.log")
self.logging.debug(f"Call '_submit' with job: {job} and kwargs: {batchq_kwargs}.")
self._submit(script, **batchq_kwargs)
Loading