Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine several jobs into one to save computation resources #131

Merged
merged 6 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions alea/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class Submitter:

config_file_path: str
template_path: str
combine_n_job: int = 1
allowed_special_args: List[str] = []
logging = logging.getLogger("submitter_logger")

Expand Down Expand Up @@ -387,6 +388,31 @@ def already_done(self, i_args: dict) -> bool:
is_done = False
return is_done

def combined_tickets_generator(self):
"""Get the combined submission script for the current configuration. ``self.vcombine_n_job``
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"""Get the combined submission script for the current configuration. ``self.vcombine_n_job``
"""Get the combined submission script for the current configuration. ``self.combine_n_job``

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you maybe add a small example to make clear how/why one should use the combine_n_jobs? 😊

jobs will be combined into one submission script.

Yields:
(str, str): the combined submission script and name output_filename

"""

_script = ""
n_combined = 0
for script, last_output_filename in self.computation_tickets_generator():
if n_combined == 0:
_script += script
else:
_script += " && " + script
n_combined += 1
if n_combined == self.combine_n_job:
yield _script, last_output_filename
n_combined = 0
_script = ""
else:
if n_combined > 0:
yield _script, last_output_filename

@staticmethod
def update_n_batch(runner_args):
"""Update n_mc if n_batch is provided.
Expand Down
3 changes: 2 additions & 1 deletion alea/submitters/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(self, *args, **kwargs):
"""Initialize the SubmitterLocal class."""
self.local_configurations = kwargs.get("local_configurations", {})
self.template_path = self.local_configurations.pop("template_path", None)
self.combine_n_job = self.local_configurations.pop("combine_n_job", 1)
super().__init__(*args, **kwargs)

@staticmethod
Expand All @@ -57,7 +58,7 @@ def submit(self):
If debug is True, only return the first instance of Runner.

"""
for _, (script, _) in enumerate(self.computation_tickets_generator()):
for _, (script, _) in enumerate(self.combined_tickets_generator()):
if self.debug:
print(script)
return self.initialized_runner(script)
Expand Down
7 changes: 4 additions & 3 deletions alea/submitters/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(self, *args, **kwargs):
self.name = self.__class__.__name__
self.slurm_configurations = kwargs.get("slurm_configurations", {})
self.template_path = self.slurm_configurations.pop("template_path", None)
self.combine_n_job = self.slurm_configurations.pop("combine_n_job", 1)
self.batchq_arguments = {**BATCHQ_DEFAULT_ARGUMENTS, **self.slurm_configurations}
self._check_batchq_arguments()
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -92,7 +93,7 @@ def submit(self, **kwargs):
"""
_jobname = kwargs.pop("jobname", self.name.lower())
batchq_kwargs = {}
for job, (script, output_filename) in enumerate(self.computation_tickets_generator()):
for job, (script, last_output_filename) in enumerate(self.combined_tickets_generator()):
if self.debug:
print(script)
if job > 0:
Expand All @@ -101,7 +102,7 @@ def submit(self, **kwargs):
self.logging.info("Too many jobs. Sleeping for 30s.")
time.sleep(30)
batchq_kwargs["jobname"] = f"{_jobname}_{job:03d}"
if output_filename is not None:
batchq_kwargs["log"] = os.path.join(self.log_dir, f"{output_filename}.log")
if last_output_filename is not None:
batchq_kwargs["log"] = os.path.join(self.log_dir, f"{last_output_filename}.log")
self.logging.debug(f"Call '_submit' with job: {job} and kwargs: {batchq_kwargs}.")
self._submit(script, **batchq_kwargs)
Loading