From 40841a88936034e1685bae207c35be7c09ea49cb Mon Sep 17 00:00:00 2001 From: dachengx Date: Sat, 13 Jan 2024 03:33:19 -0600 Subject: [PATCH 1/3] Combine several jobs into one to save computation resources --- alea/submitter.py | 26 ++++++++++++++++++++++++++ alea/submitters/local.py | 3 ++- alea/submitters/slurm.py | 7 ++++--- 3 files changed, 32 insertions(+), 4 deletions(-) diff --git a/alea/submitter.py b/alea/submitter.py index 07d006d5..a24e7a4f 100644 --- a/alea/submitter.py +++ b/alea/submitter.py @@ -61,6 +61,7 @@ class Submitter: config_file_path: str template_path: str + combine_n_job: int = 1 allowed_special_args: List[str] = [] logging = logging.getLogger("submitter_logger") @@ -372,6 +373,31 @@ def computation_tickets_generator(self): else: yield script, output_filename + def combined_tickets_generator(self): + """Get the combined submission script for the current configuration. ``self.vcombine_n_job`` + jobs will be combined into one submission script. + + Yields: + (str, str): the combined submission script and name output_filename + + """ + + _script = "" + n_combined = 0 + for script, last_output_filename in self.computation_tickets_generator(): + if n_combined == 0: + _script += script + else: + _script += " && " + script + n_combined += 1 + if n_combined == self.combine_n_job: + yield _script, last_output_filename + n_combined = 0 + _script = "" + else: + if n_combined > 0: + yield _script, last_output_filename + @staticmethod def update_n_batch(runner_args): """Update n_mc if n_batch is provided. 
diff --git a/alea/submitters/local.py b/alea/submitters/local.py index e1849bc5..1dc22dbe 100644 --- a/alea/submitters/local.py +++ b/alea/submitters/local.py @@ -32,6 +32,7 @@ def __init__(self, *args, **kwargs): """Initialize the SubmitterLocal class.""" self.local_configurations = kwargs.get("local_configurations", {}) self.template_path = self.local_configurations.pop("template_path", None) + self.combine_n_job = self.local_configurations.pop("combine_n_job", 1) super().__init__(*args, **kwargs) @staticmethod @@ -57,7 +58,7 @@ def submit(self): If debug is True, only return the first instance of Runner. """ - for _, (script, _) in enumerate(self.computation_tickets_generator()): + for _, (script, _) in enumerate(self.combined_tickets_generator()): if self.debug: print(script) return self.initialized_runner(script) diff --git a/alea/submitters/slurm.py b/alea/submitters/slurm.py index 1d561018..8c26feb2 100644 --- a/alea/submitters/slurm.py +++ b/alea/submitters/slurm.py @@ -38,6 +38,7 @@ def __init__(self, *args, **kwargs): self.name = self.__class__.__name__ self.slurm_configurations = kwargs.get("slurm_configurations", {}) self.template_path = self.slurm_configurations.pop("template_path", None) + self.combine_n_job = self.slurm_configurations.pop("combine_n_job", 1) self.batchq_arguments = {**BATCHQ_DEFAULT_ARGUMENTS, **self.slurm_configurations} self._check_batchq_arguments() super().__init__(*args, **kwargs) @@ -92,7 +93,7 @@ def submit(self, **kwargs): """ _jobname = kwargs.pop("jobname", self.name.lower()) batchq_kwargs = {} - for job, (script, output_filename) in enumerate(self.computation_tickets_generator()): + for job, (script, last_output_filename) in enumerate(self.combined_tickets_generator()): if self.debug: print(script) if job > 0: @@ -101,7 +102,7 @@ def submit(self, **kwargs): self.logging.info("Too many jobs. 
Sleeping for 30s.") time.sleep(30) batchq_kwargs["jobname"] = f"{_jobname}_{job:03d}" - if output_filename is not None: - batchq_kwargs["log"] = os.path.join(self.log_dir, f"{output_filename}.log") + if last_output_filename is not None: + batchq_kwargs["log"] = os.path.join(self.log_dir, f"{last_output_filename}.log") self.logging.debug(f"Call '_submit' with job: {job} and kwargs: {batchq_kwargs}.") self._submit(script, **batchq_kwargs) From 1e2ae053f92b8c03ce7186d3f5aef67244ad697c Mon Sep 17 00:00:00 2001 From: dachengx Date: Fri, 19 Jan 2024 05:28:06 -0600 Subject: [PATCH 2/3] Add example of potential usage of `combine_n_jobs` --- alea/submitter.py | 11 ++++++++--- alea/submitters/local.py | 2 +- alea/submitters/slurm.py | 2 +- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/alea/submitter.py b/alea/submitter.py index e6ae89a8..8a917c99 100644 --- a/alea/submitter.py +++ b/alea/submitter.py @@ -61,7 +61,7 @@ class Submitter: config_file_path: str template_path: str - combine_n_job: int = 1 + combine_n_jobs: int = 1 allowed_special_args: List[str] = [] logging = logging.getLogger("submitter_logger") @@ -389,12 +389,17 @@ def already_done(self, i_args: dict) -> bool: return is_done def combined_tickets_generator(self): - """Get the combined submission script for the current configuration. ``self.vcombine_n_job`` + """Get the combined submission script for the current configuration. ``self.combine_n_jobs`` jobs will be combined into one submission script. Yields: (str, str): the combined submission script and name output_filename + Example: + Use can add ``combine_n_jobs: 10`` in ``local_configurations``, ``slurm_configurations`` + or ``htcondor_configurations`` to combine 10 jobs into one submission script. User + will need this feature when the number of jobs pending for submission is too large. 
+ """ _script = "" @@ -405,7 +410,7 @@ def combined_tickets_generator(self): else: _script += " && " + script n_combined += 1 - if n_combined == self.combine_n_job: + if n_combined == self.combine_n_jobs: yield _script, last_output_filename n_combined = 0 _script = "" diff --git a/alea/submitters/local.py b/alea/submitters/local.py index 1dc22dbe..7e38f0e2 100644 --- a/alea/submitters/local.py +++ b/alea/submitters/local.py @@ -32,7 +32,7 @@ def __init__(self, *args, **kwargs): """Initialize the SubmitterLocal class.""" self.local_configurations = kwargs.get("local_configurations", {}) self.template_path = self.local_configurations.pop("template_path", None) - self.combine_n_job = self.local_configurations.pop("combine_n_job", 1) + self.combine_n_jobs = self.local_configurations.pop("combine_n_jobs", 1) super().__init__(*args, **kwargs) @staticmethod diff --git a/alea/submitters/slurm.py b/alea/submitters/slurm.py index 8c26feb2..5646e77f 100644 --- a/alea/submitters/slurm.py +++ b/alea/submitters/slurm.py @@ -38,7 +38,7 @@ def __init__(self, *args, **kwargs): self.name = self.__class__.__name__ self.slurm_configurations = kwargs.get("slurm_configurations", {}) self.template_path = self.slurm_configurations.pop("template_path", None) - self.combine_n_job = self.slurm_configurations.pop("combine_n_job", 1) + self.combine_n_jobs = self.slurm_configurations.pop("combine_n_jobs", 1) self.batchq_arguments = {**BATCHQ_DEFAULT_ARGUMENTS, **self.slurm_configurations} self._check_batchq_arguments() super().__init__(*args, **kwargs) From 834dc61ccf655db5752938d19d18ac160bd4c3ae Mon Sep 17 00:00:00 2001 From: dachengx Date: Fri, 19 Jan 2024 06:08:24 -0600 Subject: [PATCH 3/3] Minor update of docs --- alea/model.py | 6 +++--- alea/submitter.py | 11 ++++++----- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/alea/model.py b/alea/model.py index 6296b3cd..ecc48c0e 100644 --- a/alea/model.py +++ b/alea/model.py @@ -204,10 +204,10 @@ def store_data( Store a list of 
datasets. (each on the form of a list of one or more structured arrays or dicts) Using inference_interface, but included here to allow over-writing. - The structure would be: [[datasets1], [datasets2], ..., [datasetsn]], + The structure would be: ``[[datasets1], [datasets2], ..., [datasetsn]]``, where each of datasets is a list of structured arrays. - If you specify, it is set, if not it will read from self.get_likelihood_term_names. - If not defined, it will be ["0", "1", ..., "n-1"]. The metadata is optional. + If you specify, it is set, if not it will read from ``self.get_likelihood_term_names``. + If not defined, it will be ``["0", "1", ..., "n-1"]``. The metadata is optional. Args: file_name (str): name of the file to store the data in diff --git a/alea/submitter.py b/alea/submitter.py index 8a917c99..c0e19d39 100644 --- a/alea/submitter.py +++ b/alea/submitter.py @@ -317,7 +317,7 @@ def computation_tickets_generator(self): """Get the submission script for the current configuration. It generates the submission script for each combination of the computation options. - for Runner from to_zip, to_vary and in_common. + For Runner from to_zip, to_vary and in_common: - First, generate the combined computational options directly. - Second, update the input and output folder of the options. - Thrid, collect the non-fittable(settable) parameters into nominal_values. @@ -395,10 +395,11 @@ def combined_tickets_generator(self): Yields: (str, str): the combined submission script and name output_filename - Example: - Use can add ``combine_n_jobs: 10`` in ``local_configurations``, ``slurm_configurations`` - or ``htcondor_configurations`` to combine 10 jobs into one submission script. User - will need this feature when the number of jobs pending for submission is too large. + Note: + User can add ``combine_n_jobs: 10`` in ``local_configurations``, + ``slurm_configurations`` or ``htcondor_configurations`` to combine 10 jobs into + one submission script. 
This feature is needed when the number of jobs pending + for submission is too large. """