From 5f06bfaae094710b757b27b4cb15b5011dd33b49 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Fri, 4 Feb 2022 09:14:25 +0100 Subject: [PATCH 01/43] Add pin_nodes attribute to Job --- reframe/core/schedulers/__init__.py | 8 ++++++++ reframe/core/schedulers/slurm.py | 3 +++ 2 files changed, 11 insertions(+) diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py index c8a7530c52..d6cc662797 100644 --- a/reframe/core/schedulers/__init__.py +++ b/reframe/core/schedulers/__init__.py @@ -159,6 +159,12 @@ class Job(jsonext.JSONSerializable): #: :type: :class:`reframe.core.launchers.JobLauncher` launcher = fields.TypedField(JobLauncher) + #: Pin nodes for the job + #: + #: :type: :class:`str` or :class:`None` + #: :default: :class:`None` + pin_nodes = fields.TypedField(str, type(None)) + # The sched_* arguments are exposed also to the frontend def __init__(self, name, @@ -182,6 +188,7 @@ def __init__(self, self.time_limit = None self.cli_options = list(sched_options) if sched_options else [] self.options = [] + self.pin_nodes = None self._name = name self._workdir = workdir @@ -422,6 +429,7 @@ def guess_num_tasks(self): return len(available_nodes) * num_tasks_per_node def submit(self): + print(self.scheduler.allnodes()) return self.scheduler.submit(self) def wait(self): diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index 12ae08a457..b9c21299af 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -192,6 +192,9 @@ def emit_preamble(self, job): else: hint = 'multithread' if job.use_smt else 'nomultithread' + if job.pin_nodes: + preamble.append(self._format_option(job.pin_nodes, '--nodelist={0}')) + for opt in job.sched_access: if not opt.strip().startswith(('-C', '--constraint')): preamble.append('%s %s' % (self._prefix, opt)) From 06cd3159085ffd6351cd6f90e55291917584a671 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 17 Feb 2022 08:35:23 +0100 Subject: [PATCH 02/43] Remove forgotten print --- reframe/core/schedulers/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py index d6cc662797..3f146b9305 100644 --- a/reframe/core/schedulers/__init__.py +++ b/reframe/core/schedulers/__init__.py @@ -429,7 +429,6 @@ def guess_num_tasks(self): return len(available_nodes) * num_tasks_per_node def submit(self): - print(self.scheduler.allnodes()) return self.scheduler.submit(self) def wait(self): From ea2891b5775a784e1a96e029ca4061d6121b5d02 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Wed, 2 Mar 2022 10:56:09 +0100 Subject: [PATCH 03/43] Add special parameter with all the nodes --- reframe/core/meta.py | 6 ++++-- reframe/core/parameters.py | 18 ++++++++++++++++++ reframe/core/schedulers/local.py | 4 ++++ reframe/frontend/autodetect.py | 10 ++++++++++ reframe/frontend/executors/__init__.py | 16 +++++++++++++++- 5 files changed, 51 insertions(+), 3 deletions(-) diff --git a/reframe/core/meta.py b/reframe/core/meta.py index e03405579e..bf5ac1c733 100644 --- a/reframe/core/meta.py +++ b/reframe/core/meta.py @@ -222,6 +222,8 @@ def __prepare__(metacls, name, bases, **kwargs): # class body as: `P0 = parameter([0,1,2,3])`. namespace['parameter'] = parameters.TestParam + namespace['nodeparameter'] = parameters.NodeTestParam + # Regression test var space defined at the class level namespace['_rfm_local_var_space'] = namespaces.LocalNamespace() @@ -363,7 +365,7 @@ class was created or even at the instance level (e.g. doing directives = [ 'parameter', 'variable', 'bind', 'run_before', 'run_after', 'require_deps', 'required', 'deferrable', 'sanity_function', - 'final', 'performance_function', 'fixture' + 'final', 'performance_function', 'fixture', 'nodeparameter' ] for b in directives: namespace.pop(b) @@ -768,7 +770,7 @@ class MyTest(rfm.RegressionTest): ... # Get the raw info for variant 0 - MyTest.get_variant_info(0, recursive=True) + MyTest.get_variant_info(0, recurse=True) # { # 'params': {'p1': 'a'}, # 'fixtures': { diff --git a/reframe/core/parameters.py b/reframe/core/parameters.py index edcabdf6b2..966d2a7c7e 100644 --- a/reframe/core/parameters.py +++ b/reframe/core/parameters.py @@ -13,6 +13,7 @@ import reframe.core.namespaces as namespaces import reframe.utility as utils from reframe.core.exceptions import ReframeSyntaxError +from reframe.frontend.autodetect import getallnodes class TestParam: @@ -92,6 +93,23 @@ def is_abstract(self): return len(self.values) == 0 +class NodeTestParam(TestParam): + def __init__(self, values=None, + inherit_params=False, filter_params=None, fmt=None): + # TODO choose which nodes to get + self._node_map = getallnodes(['*']) + node_values = list(itertools.chain(*self._node_map.values())) + # Remove duplicates of nodes from different partitions + node_values = list(set(node_values)) + + super().__init__(values=node_values, inherit_params=inherit_params, + filter_params=filter_params, fmt=fmt) + + @property + def node_map(self): + return self._node_map + + class ParamSpace(namespaces.Namespace): '''Regression test parameter space diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py index d7bda85605..ac4ddde0c3 100644 --- a/reframe/core/schedulers/local.py +++ b/reframe/core/schedulers/local.py @@ -202,5 +202,9 @@ class _LocalNode(sched.Node): def __init__(self, name): self._name = name + @property + def name(self): + return self._name + def in_state(self, state): return state.casefold() == 'idle' diff --git a/reframe/frontend/autodetect.py b/reframe/frontend/autodetect.py index 21d50d9d2e..1096b5e741 100644 --- a/reframe/frontend/autodetect.py +++ b/reframe/frontend/autodetect.py @@ -250,3 +250,13 @@ def detect_topology(): if not found_devinfo: getlogger().debug(f'> device auto-detection is not supported') + + +def getallnodes(parts): + rt = runtime.runtime() + nodes = {} + for part in rt.system.partitions: + # TODO keep only partitions that the user asks for + nodes[part.fullname] = [n.name for n in part.scheduler.allnodes()] + + return nodes diff --git a/reframe/frontend/executors/__init__.py b/reframe/frontend/executors/__init__.py index f98ec9c0b6..e6a77de245 100644 --- a/reframe/frontend/executors/__init__.py +++ b/reframe/frontend/executors/__init__.py @@ -21,6 +21,7 @@ ForceExitError, SkipTestError, TaskExit) +from reframe.core.parameters import NodeTestParam from reframe.core.schedulers.local import LocalJobScheduler from reframe.frontend.printer import PrettyPrinter from reframe.frontend.statistics import TestStats @@ -108,11 +109,24 @@ def supports_partition(c, p): def supports_environ(c, e): return skip_environ_check or c.supports_environ(e.name) + def support_node_parameter(c, part): + cls = type(c) + variant_info = cls.get_variant_info(c.variant_num) + for param, v in variant_info['params'].items(): + param_object = cls.raw_params[param] + if isinstance(param_object, NodeTestParam): + return v in param_object.node_map[part.fullname] + + return True + rt = runtime.runtime() cases = [] for c in checks: for p in rt.system.partitions: - if not supports_partition(c, p): + if ( + not supports_partition(c, p) or + not support_node_parameter(c, p) + ): continue for e in p.environs: From 82b99632b5d7b689500d8c29f245f0f51dde153f Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Wed, 2 Mar 2022 14:56:53 +0100 Subject: [PATCH 04/43] Fix small typo --- reframe/core/meta.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reframe/core/meta.py b/reframe/core/meta.py index e3e3ca0311..6ae0e49f25 100644 --- a/reframe/core/meta.py +++ b/reframe/core/meta.py @@ -940,7 +940,7 @@ def make_test(name, bases, body, **kwargs): class HelloTest(rfm.RunOnlyRegressionTest): valid_systems = ['*'] valid_prog_environs = ['*'] - executable = 'echo', + executable = 'echo' sanity_patterns: sn.assert_true(1) hello_cls = HelloTest From a377f5706237f7f7471126dc26330a2f43aecc27 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 3 Mar 2022 10:18:58 +0100 Subject: [PATCH 05/43] Filter nodes by partition --- reframe/frontend/autodetect.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/reframe/frontend/autodetect.py b/reframe/frontend/autodetect.py index 1096b5e741..ae7f4d9c07 100644 --- a/reframe/frontend/autodetect.py +++ b/reframe/frontend/autodetect.py @@ -257,6 +257,14 @@ def getallnodes(parts): nodes = {} for part in rt.system.partitions: # TODO keep only partitions that the user asks for - nodes[part.fullname] = [n.name for n in part.scheduler.allnodes()] + + # This job will not be submitted, it's used only to filter + # the nodes based on the partition configuration + job = Job.create(part.scheduler, + part.launcher_type(), + name='placeholder-job', + sched_access=part.access) + filtered = part.scheduler.filternodes(job, part.scheduler.allnodes()) + nodes[part.fullname] = [n.name for n in filtered] return nodes From eac9d189abd86f462ea927abbdefc0f8767f1363 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 3 Mar 2022 10:44:31 +0100 Subject: [PATCH 06/43] Add --flex-alloc-singlenode --- reframe/core/meta.py | 12 ++++++++++-- reframe/core/parameters.py | 7 ++++--- reframe/core/pipeline.py | 7 +++++++ reframe/core/runtime.py | 1 + reframe/frontend/cli.py | 9 +++++++++ 5 files changed, 31 insertions(+), 5 deletions(-) diff --git a/reframe/core/meta.py b/reframe/core/meta.py index 6ae0e49f25..b93c8b489f 100644 --- a/reframe/core/meta.py +++ b/reframe/core/meta.py @@ -18,8 +18,8 @@ import reframe.core.hooks as hooks import reframe.utility as utils -from reframe.core.exceptions import ReframeSyntaxError from reframe.core.deferrable import deferrable, _DeferredPerformanceExpression +from reframe.core.exceptions import ReframeSyntaxError, ReframeFatalError from reframe.core.runtime import runtime @@ -216,7 +216,15 @@ def __prepare__(metacls, name, bases, **kwargs): ] # Regression test parameter space defined at the class level - namespace['_rfm_local_param_space'] = namespaces.LocalNamespace() + localns = namespaces.LocalNamespace() + try: + rt = runtime() + if rt.flex_alloc_singlenode: + localns['_rfm_node'] = parameters.NodeTestParam() + except ReframeFatalError: + pass + + namespace['_rfm_local_param_space'] = localns # Directive to insert a regression test parameter directly in the # class body as: `P0 = parameter([0,1,2,3])`. diff --git a/reframe/core/parameters.py b/reframe/core/parameters.py index 060b9b9112..25354d1acd 100644 --- a/reframe/core/parameters.py +++ b/reframe/core/parameters.py @@ -98,8 +98,9 @@ def is_loggable(self): class NodeTestParam(TestParam): - def __init__(self, values=None, - inherit_params=False, filter_params=None, fmt=None): + def __init__(self, values=None, inherit_params=False, + filter_params=None, fmt=None, loggable=False): + # TODO check which arguments makes sence to keep # TODO choose which nodes to get self._node_map = getallnodes(['*']) node_values = list(itertools.chain(*self._node_map.values())) @@ -107,7 +108,7 @@ def __init__(self, values=None, node_values = list(set(node_values)) super().__init__(values=node_values, inherit_params=inherit_params, - filter_params=filter_params, fmt=fmt) + filter_params=filter_params, fmt=fmt, loggable=loggable) @property def node_map(self): diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 094fe86ca5..871be943ea 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -1666,6 +1666,10 @@ def compile(self): f'systems/0/partitions/@{self.current_partition.name}' f'/time_limit') ) + if (not self.local and not self.build_locally + and rt.runtime().flex_alloc_singlenode): + self._build_job.pin_nodes = self._rfm_node + with osext.change_dir(self._stagedir): # Prepare build job build_commands = [ @@ -1768,6 +1772,9 @@ def run(self): self.job.time_limit = (self.time_limit or rt.runtime().get_option( f'systems/0/partitions/@{self.current_partition.name}/time_limit') ) + if not self.local and rt.runtime().flex_alloc_singlenode: + self.job.pin_nodes = self._rfm_node + exec_cmd = [self.job.launcher.run_command(self.job), self.executable, *self.executable_opts] diff --git a/reframe/core/runtime.py b/reframe/core/runtime.py index 698f82409e..a0c72afeff 100644 --- a/reframe/core/runtime.py +++ b/reframe/core/runtime.py @@ -32,6 +32,7 @@ def __init__(self, site_config): self._system = System.create(site_config) self._current_run = 0 self._timestamp = datetime.now() + self.flex_alloc_singlenode = False def _makedir(self, *dirs, wipeout=False): ret = os.path.join(*dirs) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 026473278a..4c5314a1ad 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -379,6 +379,13 @@ def main(): dest='flex_alloc_nodes', metavar='{all|STATE|NUM}', default=None, help='Set strategy for the flexible node allocation (default: "idle").' ) + # TODO Decide the functionality of the option + # Now ReFrame parametirizes the test in all nodes of the partition + run_options.add_argument( + '--flex-alloc-singlenode', action='store_true', + dest='flex_alloc_singlenode', + help='Submit single node job automatically on every node of a partition' + ) run_options.add_argument( '--force-local', action='store_true', help='Force local execution of checks' @@ -885,6 +892,8 @@ def print_infoline(param, value): 'workdir': os.getcwd(), } + rt.flex_alloc_singlenode = options.flex_alloc_singlenode or False + # Print command line printer.info(f"[ReFrame Setup]") print_infoline('version', session_info['version']) From a121d5a8e1ffad3e2c584522bce8c118dba5b8b5 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 3 Mar 2022 13:18:25 +0100 Subject: [PATCH 07/43] Check all node parameters before making testcase --- reframe/frontend/executors/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/reframe/frontend/executors/__init__.py b/reframe/frontend/executors/__init__.py index 407ab8fbe6..0d4f643808 100644 --- a/reframe/frontend/executors/__init__.py +++ b/reframe/frontend/executors/__init__.py @@ -115,7 +115,8 @@ def support_node_parameter(c, part): for param, v in variant_info['params'].items(): param_object = cls.raw_params[param] if isinstance(param_object, NodeTestParam): - return v in param_object.node_map[part.fullname] + if v not in param_object.node_map[part.fullname]: + return False return True From b4651ada05d7bdac93859802991489d9976aeba7 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Fri, 4 Mar 2022 12:55:49 +0100 Subject: [PATCH 08/43] Update --flex-alloc-singlenode behaviour --- reframe/core/meta.py | 12 ++++++++++-- reframe/core/parameters.py | 4 ++-- reframe/core/pipeline.py | 6 ++++-- reframe/core/runtime.py | 6 +++++- reframe/frontend/autodetect.py | 22 +++++++++++++++++----- reframe/frontend/cli.py | 21 +++++++++++++++++---- 6 files changed, 55 insertions(+), 16 deletions(-) diff --git a/reframe/core/meta.py b/reframe/core/meta.py index b93c8b489f..78c6e5c7c2 100644 --- a/reframe/core/meta.py +++ b/reframe/core/meta.py @@ -219,9 +219,17 @@ def __prepare__(metacls, name, bases, **kwargs): localns = namespaces.LocalNamespace() try: rt = runtime() - if rt.flex_alloc_singlenode: - localns['_rfm_node'] = parameters.NodeTestParam() + if rt.flex_alloc_singlenode_state: + if (rt.flex_alloc_singlenode_tests is None + or name in rt.flex_alloc_singlenode_tests): + localns['_rfm_node'] = ( + parameters.NodeTestParam( + state=rt.flex_alloc_singlenode_state + ) + ) except ReframeFatalError: + # ReFrame context might not be configured for classes like + # BuildSystem etc pass namespace['_rfm_local_param_space'] = localns diff --git a/reframe/core/parameters.py b/reframe/core/parameters.py index 25354d1acd..98546adc29 100644 --- a/reframe/core/parameters.py +++ b/reframe/core/parameters.py @@ -98,11 +98,11 @@ def is_loggable(self): class NodeTestParam(TestParam): - def __init__(self, values=None, inherit_params=False, + def __init__(self, state='all', inherit_params=False, filter_params=None, fmt=None, loggable=False): # TODO check which arguments makes sence to keep # TODO choose which nodes to get - self._node_map = getallnodes(['*']) + self._node_map = getallnodes(state) node_values = list(itertools.chain(*self._node_map.values())) # Remove duplicates of nodes from different partitions node_values = list(set(node_values)) diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 871be943ea..94db0ab145 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -1667,7 +1667,8 @@ def compile(self): f'/time_limit') ) if (not self.local and not self.build_locally - and rt.runtime().flex_alloc_singlenode): + and rt.runtime().flex_alloc_singlenode_state + and hasattr(self, '_rfm_node')): self._build_job.pin_nodes = self._rfm_node with osext.change_dir(self._stagedir): @@ -1772,7 +1773,8 @@ def run(self): self.job.time_limit = (self.time_limit or rt.runtime().get_option( f'systems/0/partitions/@{self.current_partition.name}/time_limit') ) - if not self.local and rt.runtime().flex_alloc_singlenode: + if (not self.local and rt.runtime().flex_alloc_singlenode_state + and hasattr(self, '_rfm_node')): self.job.pin_nodes = self._rfm_node exec_cmd = [self.job.launcher.run_command(self.job), diff --git a/reframe/core/runtime.py b/reframe/core/runtime.py index a0c72afeff..b317dbbac1 100644 --- a/reframe/core/runtime.py +++ b/reframe/core/runtime.py @@ -32,7 +32,11 @@ def __init__(self, site_config): self._system = System.create(site_config) self._current_run = 0 self._timestamp = datetime.now() - self.flex_alloc_singlenode = False + + # Necessary information for flex-alloc-singlenode + self.flex_alloc_singlenode_state = None + self.flex_alloc_singlenode_tests = None + self.jobs_cli_options = [] def _makedir(self, *dirs, wipeout=False): ret = os.path.join(*dirs) diff --git a/reframe/frontend/autodetect.py b/reframe/frontend/autodetect.py index ae7f4d9c07..31a8368bf9 100644 --- a/reframe/frontend/autodetect.py +++ b/reframe/frontend/autodetect.py @@ -252,19 +252,31 @@ def detect_topology(): getlogger().debug(f'> device auto-detection is not supported') -def getallnodes(parts): +def getallnodes(state='all'): rt = runtime.runtime() nodes = {} for part in rt.system.partitions: - # TODO keep only partitions that the user asks for - # This job will not be submitted, it's used only to filter # the nodes based on the partition configuration job = Job.create(part.scheduler, part.launcher_type(), name='placeholder-job', sched_access=part.access) - filtered = part.scheduler.filternodes(job, part.scheduler.allnodes()) - nodes[part.fullname] = [n.name for n in filtered] + + available_nodes = part.scheduler.allnodes() + available_nodes = part.scheduler.filternodes(job, available_nodes) + getlogger().debug( + f'Total available nodes for {part.name}: {len(available_nodes)}' + ) + + if state.casefold() != 'all': + available_nodes = {n for n in available_nodes + if n.in_state(state)} + getlogger().debug( + f'[F] Selecting nodes in state {state!r}: ' + f'available nodes now: {len(available_nodes)}' + ) + + nodes[part.fullname] = [n.name for n in available_nodes] return nodes diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 4c5314a1ad..726846080d 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -382,9 +382,10 @@ def main(): # TODO Decide the functionality of the option # Now ReFrame parametirizes the test in all nodes of the partition run_options.add_argument( - '--flex-alloc-singlenode', action='store_true', - dest='flex_alloc_singlenode', - help='Submit single node job automatically on every node of a partition' + '--flex-alloc-singlenode', action='store', default=None, + dest='flex_alloc_singlenode', metavar='{all|STATE}[:TEST1,TEST2]', + help=('Submit single node jobs automatically on every node of a ' + 'partition in STATE') ) run_options.add_argument( '--force-local', action='store_true', @@ -892,7 +893,19 @@ def print_infoline(param, value): 'workdir': os.getcwd(), } - rt.flex_alloc_singlenode = options.flex_alloc_singlenode or False + if options.flex_alloc_singlenode: + state, *tests = options.flex_alloc_singlenode.split(':') + if len(tests) == 0: + rt.flex_alloc_singlenode_state = state + rt.flex_alloc_singlenode_tests = None + elif len(tests) == 1: + rt.flex_alloc_singlenode_state = state + rt.flex_alloc_singlenode_tests = tests[0].split(',') + else: + printer.error(f'argument {options.flex_alloc_singlenode} for ' + f'--flex-alloc-singlenode is not in format ' + f'{{all|STATE}}[:TEST1,TEST2]') + sys.exit(1) # Print command line printer.info(f"[ReFrame Setup]") From 6bdd243ffef4d58dbfb0b96ffd45b460bd72a4cc Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Fri, 4 Mar 2022 14:33:56 +0100 Subject: [PATCH 09/43] Fix formatting issues --- reframe/core/meta.py | 6 +++--- reframe/core/parameters.py | 3 ++- reframe/core/pipeline.py | 10 +++++----- reframe/core/schedulers/slurm.py | 4 +++- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/reframe/core/meta.py b/reframe/core/meta.py index 78c6e5c7c2..9768542192 100644 --- a/reframe/core/meta.py +++ b/reframe/core/meta.py @@ -216,12 +216,12 @@ def __prepare__(metacls, name, bases, **kwargs): ] # Regression test parameter space defined at the class level - localns = namespaces.LocalNamespace() + localns = namespaces.LocalNamespace() try: rt = runtime() if rt.flex_alloc_singlenode_state: - if (rt.flex_alloc_singlenode_tests is None - or name in rt.flex_alloc_singlenode_tests): + if (rt.flex_alloc_singlenode_tests is None or + name in rt.flex_alloc_singlenode_tests): localns['_rfm_node'] = ( parameters.NodeTestParam( state=rt.flex_alloc_singlenode_state diff --git a/reframe/core/parameters.py b/reframe/core/parameters.py index 98546adc29..388e5f3bee 100644 --- a/reframe/core/parameters.py +++ b/reframe/core/parameters.py @@ -108,7 +108,8 @@ def __init__(self, state='all', inherit_params=False, node_values = list(set(node_values)) super().__init__(values=node_values, inherit_params=inherit_params, - filter_params=filter_params, fmt=fmt, loggable=loggable) + filter_params=filter_params, fmt=fmt, + loggable=loggable) @property def node_map(self): diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 94db0ab145..a3f1eb4fce 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -1666,9 +1666,9 @@ def compile(self): f'systems/0/partitions/@{self.current_partition.name}' f'/time_limit') ) - if (not self.local and not self.build_locally - and rt.runtime().flex_alloc_singlenode_state - and hasattr(self, '_rfm_node')): + if (not self.local and not self.build_locally and + rt.runtime().flex_alloc_singlenode_state and + hasattr(self, '_rfm_node')): self._build_job.pin_nodes = self._rfm_node with osext.change_dir(self._stagedir): @@ -1773,8 +1773,8 @@ def run(self): self.job.time_limit = (self.time_limit or rt.runtime().get_option( f'systems/0/partitions/@{self.current_partition.name}/time_limit') ) - if (not self.local and rt.runtime().flex_alloc_singlenode_state - and hasattr(self, '_rfm_node')): + if (not self.local and rt.runtime().flex_alloc_singlenode_state and + hasattr(self, '_rfm_node')): self.job.pin_nodes = self._rfm_node exec_cmd = [self.job.launcher.run_command(self.job), diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index 1421c0e031..fe570a8281 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -193,7 +193,9 @@ def emit_preamble(self, job): hint = 'multithread' if job.use_smt else 'nomultithread' if job.pin_nodes: - preamble.append(self._format_option(job.pin_nodes, '--nodelist={0}')) + preamble.append( + self._format_option(job.pin_nodes, '--nodelist={0}') + ) for opt in job.sched_access: if not opt.strip().startswith(('-C', '--constraint')): From 0900c185e324cce79504c297c4d319ec0ea8e587 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Fri, 4 Mar 2022 15:54:00 +0100 Subject: [PATCH 10/43] Take into account cli job options in NodeTestParam --- reframe/core/parameters.py | 1 - reframe/frontend/autodetect.py | 1 + reframe/frontend/cli.py | 27 +++++++++++++++------------ 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/reframe/core/parameters.py b/reframe/core/parameters.py index 388e5f3bee..cefca5dc7a 100644 --- a/reframe/core/parameters.py +++ b/reframe/core/parameters.py @@ -101,7 +101,6 @@ class NodeTestParam(TestParam): def __init__(self, state='all', inherit_params=False, filter_params=None, fmt=None, loggable=False): # TODO check which arguments makes sence to keep - # TODO choose which nodes to get self._node_map = getallnodes(state) node_values = list(itertools.chain(*self._node_map.values())) # Remove duplicates of nodes from different partitions diff --git a/reframe/frontend/autodetect.py b/reframe/frontend/autodetect.py index 31a8368bf9..003f8632d5 100644 --- a/reframe/frontend/autodetect.py +++ b/reframe/frontend/autodetect.py @@ -263,6 +263,7 @@ def getallnodes(state='all'): name='placeholder-job', sched_access=part.access) + job.cli_options = rt.jobs_cli_options available_nodes = part.scheduler.allnodes() available_nodes = part.scheduler.filternodes(job, available_nodes) getlogger().debug( diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 726846080d..6af4142bc0 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -924,6 +924,21 @@ def print_infoline(param, value): print_infoline('output directory', repr(session_info['prefix_output'])) printer.info('') try: + # Need to parse the cli options before loading the tests + parsed_job_options = [] + for opt in options.job_options: + opt_split = opt.split('=', maxsplit=1) + optstr = opt_split[0] + valstr = opt_split[1] if len(opt_split) > 1 else '' + if opt.startswith('-') or opt.startswith('#'): + parsed_job_options.append(opt) + elif len(optstr) == 1: + parsed_job_options.append(f'-{optstr} {valstr}') + else: + parsed_job_options.append(f'--{optstr} {valstr}') + + rt.jobs_cli_options = parsed_job_options + # Locate and load checks checks_found = loader.load_all() printer.verbose(f'Loaded {len(checks_found)} test(s)') @@ -1187,18 +1202,6 @@ def module_unuse(*paths): sched_flex_alloc_nodes = options.flex_alloc_nodes exec_policy.sched_flex_alloc_nodes = sched_flex_alloc_nodes - parsed_job_options = [] - for opt in options.job_options: - opt_split = opt.split('=', maxsplit=1) - optstr = opt_split[0] - valstr = opt_split[1] if len(opt_split) > 1 else '' - if opt.startswith('-') or opt.startswith('#'): - parsed_job_options.append(opt) - elif len(optstr) == 1: - parsed_job_options.append(f'-{optstr} {valstr}') - else: - parsed_job_options.append(f'--{optstr} {valstr}') - exec_policy.sched_options = parsed_job_options if options.maxfail < 0: raise errors.ConfigError( From 9e2647783bdc35d14f68a84e915d6074a471849b Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Sun, 20 Mar 2022 14:39:42 +0100 Subject: [PATCH 11/43] Update flex-alloc-singlenode implementation --- reframe/core/meta.py | 21 +------ reframe/core/parameters.py | 19 ------- reframe/core/pipeline.py | 5 -- reframe/core/runtime.py | 4 -- reframe/core/schedulers/__init__.py | 10 ++-- reframe/frontend/autodetect.py | 12 +++- reframe/frontend/cli.py | 76 +++++++++++++++++++++++++- reframe/frontend/executors/__init__.py | 17 +----- 8 files changed, 90 insertions(+), 74 deletions(-) diff --git a/reframe/core/meta.py b/reframe/core/meta.py index 25a3953c95..2ff4d16eea 100644 --- a/reframe/core/meta.py +++ b/reframe/core/meta.py @@ -19,8 +19,7 @@ import reframe.core.hooks as hooks import reframe.utility as utils -from reframe.core.deferrable import deferrable, _DeferredPerformanceExpression -from reframe.core.exceptions import ReframeSyntaxError, ReframeFatalError +from reframe.core.exceptions import ReframeSyntaxError from reframe.core.runtime import runtime @@ -220,23 +219,7 @@ def __prepare__(metacls, name, bases, **kwargs): ] # Regression test parameter space defined at the class level - localns = namespaces.LocalNamespace() - try: - rt = runtime() - if rt.flex_alloc_singlenode_state: - if (rt.flex_alloc_singlenode_tests is None or - name in rt.flex_alloc_singlenode_tests): - localns['_rfm_node'] = ( - parameters.NodeTestParam( - state=rt.flex_alloc_singlenode_state - ) - ) - except ReframeFatalError: - # ReFrame context might not be configured for classes like - # BuildSystem etc - pass - - namespace['_rfm_local_param_space'] = localns + namespace['_rfm_local_param_space'] = namespaces.LocalNamespace() # Regression test var space defined at the class level namespace['_rfm_local_var_space'] = namespaces.LocalNamespace() diff --git a/reframe/core/parameters.py b/reframe/core/parameters.py index 13bcde2322..6047e513b1 100644 --- a/reframe/core/parameters.py +++ b/reframe/core/parameters.py @@ -13,7 +13,6 @@ import reframe.core.namespaces as namespaces import reframe.utility as utils from reframe.core.exceptions import ReframeSyntaxError -from reframe.frontend.autodetect import getallnodes class TestParam: @@ -204,24 +203,6 @@ def is_loggable(self): return self.__loggable -class NodeTestParam(TestParam): - def __init__(self, state='all', inherit_params=False, - filter_params=None, fmt=None, loggable=False): - # TODO check which arguments makes sence to keep - self._node_map = getallnodes(state) - node_values = list(itertools.chain(*self._node_map.values())) - # Remove duplicates of nodes from different partitions - node_values = list(set(node_values)) - - super().__init__(values=node_values, inherit_params=inherit_params, - filter_params=filter_params, fmt=fmt, - loggable=loggable) - - @property - def node_map(self): - return self._node_map - - class ParamSpace(namespaces.Namespace): '''Regression test parameter space diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 8a243d0a8e..828248dbc8 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -1664,11 +1664,6 @@ def compile(self): f'systems/0/partitions/@{self.current_partition.name}' f'/time_limit') ) - if (not self.local and not self.build_locally and - rt.runtime().flex_alloc_singlenode_state and - hasattr(self, '_rfm_node')): - self._build_job.pin_nodes = self._rfm_node - with osext.change_dir(self._stagedir): # Prepare build job build_commands = [ diff --git a/reframe/core/runtime.py b/reframe/core/runtime.py index b317dbbac1..4d8e8ada81 100644 --- a/reframe/core/runtime.py +++ b/reframe/core/runtime.py @@ -32,10 +32,6 @@ def __init__(self, site_config): self._system = System.create(site_config) self._current_run = 0 self._timestamp = datetime.now() - - # Necessary information for flex-alloc-singlenode - self.flex_alloc_singlenode_state = None - self.flex_alloc_singlenode_tests = None self.jobs_cli_options = [] def _makedir(self, *dirs, wipeout=False): diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py index be78312237..7bed278e8b 100644 --- a/reframe/core/schedulers/__init__.py +++ b/reframe/core/schedulers/__init__.py @@ -265,11 +265,11 @@ class Job(jsonext.JSONSerializable, metaclass=JobMeta): #: :type: :class:`reframe.core.launchers.JobLauncher` launcher = variable(JobLauncher) - #: Pin nodes for the job - #: - #: :type: :class:`str` or :class:`None` - #: :default: :class:`None` - pin_nodes = fields.TypedField(str, type(None)) + # #: Pin nodes for the job + # #: + # #: :type: :class:`str` or :class:`None` + # #: :default: :class:`None` + pin_nodes = variable(type(None), str, value=None) # The sched_* arguments are exposed also to the frontend def __init__(self, diff --git a/reframe/frontend/autodetect.py b/reframe/frontend/autodetect.py index 003f8632d5..2903360c07 100644 --- a/reframe/frontend/autodetect.py +++ b/reframe/frontend/autodetect.py @@ -252,18 +252,24 @@ def detect_topology(): getlogger().debug(f'> device auto-detection is not supported') -def getallnodes(state='all'): +def getallnodes(state='all', partitions=None): rt = runtime.runtime() nodes = {} + if partitions is None: + partitions = rt.system.partitions + for part in rt.system.partitions: + if part not in partitions: + continue + # This job will not be submitted, it's used only to filter # the nodes based on the partition configuration job = Job.create(part.scheduler, part.launcher_type(), name='placeholder-job', - sched_access=part.access) + sched_access=part.access, + sched_options=rt.jobs_cli_options) - job.cli_options = rt.jobs_cli_options available_nodes = part.scheduler.allnodes() available_nodes = part.scheduler.filternodes(job, available_nodes) getlogger().debug( diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 6af4142bc0..664eeeff99 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -3,6 +3,7 @@ # # SPDX-License-Identifier: BSD-3-Clause +import copy import inspect import itertools import json @@ -14,6 +15,7 @@ import traceback import reframe +import reframe.core.builtins as builtins import reframe.core.config as config import reframe.core.exceptions as errors import reframe.core.logging as logging @@ -29,6 +31,9 @@ import reframe.utility.osext as osext + +from reframe.core.decorators import TestRegistry +from reframe.core.meta import make_test from reframe.frontend.printer import PrettyPrinter from reframe.frontend.loader import RegressionCheckLoader from reframe.frontend.executors.policies import (SerialExecutionPolicy, @@ -183,6 +188,65 @@ def calc_verbosity(site_config, quiesce): return curr_verbosity - quiesce +def distribute_tests(state, testcases, skip_system_check, skip_prgenv_check): + node_map = autodetect.getallnodes(state) + + # Create extra tests + temporary_registry = None + new_checks = [] + for t in testcases: + if not t.check.is_fixture(): + cls = type(t.check) + basename = cls.__name__ + original_var_info = cls.get_variant_info( + t.check.variant_num, recurse=True + ) + + def _rfm_distributed_set_run_nodes(obj): + if not obj.local: + obj.job.pin_nodes = obj._rfm_nodelist + + def _rfm_distributed_set_build_nodes(obj): + if not obj.local and not obj.build_locally: + obj.build_job.pin_nodes = obj._rfm_nodelist + + # We re-set the valid system and environment in a hook to + # make sure that it will not be overwriten by a parent + # post-init hook + def _rfm_distributed_set_valid_sys_env(obj): + obj.valid_systems = [t._partition.fullname] + obj.valid_prog_environs = [t._environ.name] + class BaseTest(t.check.__class__): + _rfm_nodelist = builtins.parameter(node_map[t._partition.fullname]) + + nc = make_test( + f'__D_{t._partition.name}_{t._environ.name}_{basename}', + (BaseTest, ), + {}, + methods=[ + builtins.run_before('run')(_rfm_distributed_set_run_nodes), + builtins.run_before('compile')(_rfm_distributed_set_build_nodes), + builtins.run_after('init')(_rfm_distributed_set_valid_sys_env), + ] + ) + + for i in range(nc.num_variants): + # Check if this variant should be instantiated + var_info = copy.deepcopy(nc.get_variant_info(i, recurse=True)) + var_info['params'].pop('_rfm_nodelist') + if var_info == original_var_info: + if temporary_registry is None: + temporary_registry = TestRegistry.create(nc, variant_num=i) + else: + temporary_registry.add(nc, variant_num=i) + + if temporary_registry: + new_checks = temporary_registry.instantiate_all() + return generate_testcases(new_checks, skip_system_check, + skip_prgenv_check) + else: + return [] + def main(): # Setup command line options argparser = argparse.ArgumentParser() @@ -379,11 +443,10 @@ def main(): dest='flex_alloc_nodes', metavar='{all|STATE|NUM}', default=None, help='Set strategy for the flexible node allocation (default: "idle").' ) - # TODO Decide the functionality of the option - # Now ReFrame parametirizes the test in all nodes of the partition + # TODO Decide the exact functionality of the option run_options.add_argument( '--flex-alloc-singlenode', action='store', default=None, - dest='flex_alloc_singlenode', metavar='{all|STATE}[:TEST1,TEST2]', + dest='flex_alloc_singlenode', metavar='{all|STATE}', help=('Submit single node jobs automatically on every node of a ' 'partition in STATE') ) @@ -1022,6 +1085,13 @@ def _case_failed(t): f'{len(testcases)} remaining' ) + if options.flex_alloc_singlenode: + testcases = distribute_tests( + options.flex_alloc_singlenode, testcases, + options.skip_system_check, options.skip_prgenv_check, + ) + testcases_all += testcases + # Prepare for running printer.debug('Building and validating the full test DAG') testgraph, skipped_cases = dependencies.build_deps(testcases_all) diff --git a/reframe/frontend/executors/__init__.py b/reframe/frontend/executors/__init__.py index 0d4f643808..61707cf7d7 100644 --- a/reframe/frontend/executors/__init__.py +++ b/reframe/frontend/executors/__init__.py @@ -21,7 +21,6 @@ ForceExitError, SkipTestError, TaskExit) -from reframe.core.parameters import NodeTestParam from reframe.core.schedulers.local import LocalJobScheduler from reframe.frontend.printer import PrettyPrinter from reframe.frontend.statistics import TestStats @@ -109,25 +108,11 @@ def supports_partition(c, p): def supports_environ(c, e): return skip_environ_check or c.supports_environ(e.name) - def support_node_parameter(c, part): - cls = type(c) - variant_info = cls.get_variant_info(c.variant_num) - for param, v in variant_info['params'].items(): - param_object = cls.raw_params[param] - if isinstance(param_object, NodeTestParam): - if v not in param_object.node_map[part.fullname]: - return False - - return True - rt = runtime.runtime() cases = [] for c in checks: for p in rt.system.partitions: - if ( - not supports_partition(c, p) or - not support_node_parameter(c, p) - ): + if not supports_partition(c, p): continue for e in p.environs: From 4e3f1adf3ab50b5ad32d455f6b685286491e3b1a Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Sun, 20 Mar 2022 15:35:48 +0100 Subject: [PATCH 12/43] Set the prefix in dynamically created tests --- reframe/core/pipeline.py | 9 ++++++--- reframe/frontend/cli.py | 2 ++ 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 828248dbc8..4de0da5aa6 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -861,9 +861,12 @@ def __new__(cls, *args, **kwargs): try: prefix = cls._rfm_pinned_prefix except AttributeError: - prefix = os.path.abspath( - os.path.dirname(inspect.getfile(cls)) - ) + try: + prefix = cls._rfm_dynamic_test_prefix + except AttributeError: + prefix = os.path.abspath( + os.path.dirname(inspect.getfile(cls)) + ) # Prepare initialization of test defaults (variables and parameters are # injected after __new__ has returned, so we schedule this function diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 664eeeff99..0899b3e8c0 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -229,6 +229,8 @@ class BaseTest(t.check.__class__): builtins.run_after('init')(_rfm_distributed_set_valid_sys_env), ] ) + # We have to set the prefix manually + nc._rfm_dynamic_test_prefix = t.check.prefix for i in range(nc.num_variants): # Check if this variant should be instantiated From d95b5acffce796073050fe7680447246d62a766f Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 29 Mar 2022 11:19:09 +0200 Subject: [PATCH 13/43] Replace testcases_all in flex_alloc_singlenode --- reframe/frontend/cli.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 0899b3e8c0..39e737b9d3 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -216,8 +216,11 @@ def _rfm_distributed_set_build_nodes(obj): def _rfm_distributed_set_valid_sys_env(obj): obj.valid_systems = [t._partition.fullname] obj.valid_prog_environs = [t._environ.name] + class BaseTest(t.check.__class__): _rfm_nodelist = builtins.parameter(node_map[t._partition.fullname]) + valid_systems = [t._partition.fullname] + valid_prog_environs = [t._environ.name] nc = make_test( f'__D_{t._partition.name}_{t._environ.name}_{basename}', @@ -226,7 +229,8 @@ class BaseTest(t.check.__class__): methods=[ builtins.run_before('run')(_rfm_distributed_set_run_nodes), builtins.run_before('compile')(_rfm_distributed_set_build_nodes), - builtins.run_after('init')(_rfm_distributed_set_valid_sys_env), + # TODO this hook is not working properly + # builtins.run_after('init')(_rfm_distributed_set_valid_sys_env), ] ) # We have to set the prefix manually @@ -1092,7 +1096,7 @@ def _case_failed(t): options.flex_alloc_singlenode, testcases, options.skip_system_check, options.skip_prgenv_check, ) - testcases_all += testcases + testcases_all = testcases # Prepare for running printer.debug('Building and validating the full test DAG') From cc5705bc4378ed46c28fa84d74c77a9fad7ed011 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 29 Mar 2022 19:08:46 +0200 Subject: [PATCH 14/43] Add basic unittests --- reframe/frontend/cli.py | 11 ++-- .../resources/checks_unlisted/alloc_check.py | 40 ++++++++++++++ unittests/test_cli.py | 13 +++++ unittests/test_dynamic_tests.py | 52 +++++++++++++++++++ 4 files changed, 110 insertions(+), 6 deletions(-) create mode 100644 unittests/resources/checks_unlisted/alloc_check.py create mode 100644 unittests/test_dynamic_tests.py diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 39e737b9d3..a92997a6d9 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -188,10 +188,8 @@ def calc_verbosity(site_config, quiesce): return curr_verbosity - quiesce -def distribute_tests(state, testcases, skip_system_check, skip_prgenv_check): - node_map = autodetect.getallnodes(state) - - # Create extra tests +def distribute_tests(testcases, skip_system_check, skip_prgenv_check, + node_map): temporary_registry = None new_checks = [] for t in testcases: @@ -1092,9 +1090,10 @@ def _case_failed(t): ) if options.flex_alloc_singlenode: + node_map = autodetect.getallnodes(options.flex_alloc_singlenode) testcases = distribute_tests( - options.flex_alloc_singlenode, testcases, - options.skip_system_check, options.skip_prgenv_check, + testcases, options.skip_system_check, + options.skip_prgenv_check, node_map ) testcases_all = testcases diff --git a/unittests/resources/checks_unlisted/alloc_check.py b/unittests/resources/checks_unlisted/alloc_check.py new file mode 100644 index 0000000000..ed40035109 --- /dev/null +++ b/unittests/resources/checks_unlisted/alloc_check.py @@ -0,0 +1,40 @@ +# Copyright 2016-2022 Swiss National Supercomputing Centre (CSCS/ETH Zurich) +# ReFrame Project Developers. See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: BSD-3-Clause + +import reframe as rfm +import reframe.utility.sanity as sn + +import os + + +@rfm.simple_test +class Simple(rfm.RunOnlyRegressionTest): + valid_systems = ['*'] + valid_prog_environs = ['*'] + executable = '/bin/true' + + +class MyFixture(rfm.RunOnlyRegressionTest): + # x = parameter([1, 2]) + executable = 'echo hello from fixture' + + @sanity_function + def assert_True(self): + return True + + +@rfm.simple_test +class Complex(rfm.RunOnlyRegressionTest): + valid_systems = ['*'] + valid_prog_environs = ['*'] + f = fixture(MyFixture, scope='session') + executable = '/bin/true' + + @sanity_function + def inspect_fixture(self): + return sn.assert_found( + r'hello from fixture', + os.path.join(self.f.stagedir, self.f.stdout.evaluate()) + ) diff --git a/unittests/test_cli.py b/unittests/test_cli.py index efa02468ac..06c229a902 100644 --- a/unittests/test_cli.py +++ b/unittests/test_cli.py @@ -917,3 +917,16 @@ def test_fixture_resolution(run_reframe): action='run' ) assert returncode == 0 + + +def test_dynamic_tests(run_reframe, tmp_path): + returncode, stdout, _ = run_reframe( + system='sys0', + environs=[], + checkpath=['unittests/resources/checks_unlisted/alloc_check.py'], + action='run', + more_options=['-n', 'Complex', '--flex-alloc-singlenode=idle'] + ) + assert returncode == 0 + assert 'Ran 5/5 test case(s) from 5 check(s)' in stdout + assert 'FAILED' not in stdout diff --git a/unittests/test_dynamic_tests.py b/unittests/test_dynamic_tests.py new file mode 100644 index 0000000000..9dac592d89 --- /dev/null +++ b/unittests/test_dynamic_tests.py @@ -0,0 +1,52 @@ +# Copyright 2016-2022 Swiss National Supercomputing Centre (CSCS/ETH Zurich) +# ReFrame Project Developers. See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: BSD-3-Clause + +import pytest + +import reframe.frontend.executors as executors +import reframe.frontend.filters as filters +from reframe.frontend.cli import distribute_tests +from reframe.frontend.loader import RegressionCheckLoader + + +@pytest.fixture +def default_exec_ctx(make_exec_ctx_g): + yield from make_exec_ctx_g(system='sys0') + +@pytest.fixture +def loader(): + return RegressionCheckLoader([ + 'unittests/resources/checks_unlisted/alloc_check.py' + ]) + + +def test_dynamic_testcases(loader, default_exec_ctx): + testcases = executors.generate_testcases(loader.load_all()) + testcases = filter( + filters.have_name('Simple'), testcases + ) + + testcases = list(testcases) + assert len(testcases) == 4 + count = sum(map(lambda x : x._partition.fullname == 'sys0:p0', testcases)) + assert count == 2 + count = sum(map(lambda x : x._partition.fullname == 'sys0:p1', testcases)) + assert count == 2 + + node_map = { + 'sys0:p0': ['n1', 'n2'], + 'sys0:p1': ['n3'] + } + new_cases = distribute_tests(testcases, False, False, node_map) + assert len(new_cases) == 6 + count = sum(map(lambda x : x._partition.fullname == 'sys0:p0', new_cases)) + assert count == 4 + count = sum(map(lambda x : x._partition.fullname == 'sys0:p1', new_cases)) + assert count == 2 + for c in new_cases: + if c._partition.fullname == 'sys0:p0': + assert c.check._rfm_nodelist in ('n1', 'n2') + else: + assert c.check._rfm_nodelist == 'n3' From 7fd49a585147c396b58ba1d65d6c59eb25262618 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 7 Apr 2022 16:10:09 +0200 Subject: [PATCH 15/43] Add more unittest with fixtures --- .../resources/checks_unlisted/alloc_check.py | 2 +- unittests/test_cli.py | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/unittests/resources/checks_unlisted/alloc_check.py b/unittests/resources/checks_unlisted/alloc_check.py index ed40035109..bbc83f0c94 100644 --- a/unittests/resources/checks_unlisted/alloc_check.py +++ b/unittests/resources/checks_unlisted/alloc_check.py @@ -17,7 +17,7 @@ class Simple(rfm.RunOnlyRegressionTest): class MyFixture(rfm.RunOnlyRegressionTest): - # x = parameter([1, 2]) + x = parameter([1, 2]) executable = 'echo hello from fixture' @sanity_function diff --git a/unittests/test_cli.py b/unittests/test_cli.py index 06c229a902..fc0d6926da 100644 --- a/unittests/test_cli.py +++ b/unittests/test_cli.py @@ -928,5 +928,19 @@ def test_dynamic_tests(run_reframe, tmp_path): more_options=['-n', 'Complex', '--flex-alloc-singlenode=idle'] ) assert returncode == 0 - assert 'Ran 5/5 test case(s) from 5 check(s)' in stdout + assert 'Ran 10/10 test case(s) from 10 check(s)' in stdout + assert 'FAILED' not in stdout + + +def test_dynamic_tests_filtering(run_reframe, tmp_path): + # Target sys1 that has compact_test_names==True + returncode, stdout, _ = run_reframe( + system='sys1', + environs=[], + checkpath=['unittests/resources/checks_unlisted/alloc_check.py'], + action='run', + more_options=['-n', 'Complex@1', '--flex-alloc-singlenode=idle'] + ) + assert returncode == 0 + assert 'Ran 7/7 test case(s) from 7 check(s)' in stdout assert 'FAILED' not in stdout From 46c209618ec20e77b882186d367469acc6b3ec71 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 7 Apr 2022 17:51:51 +0200 Subject: [PATCH 16/43] Address PR comments --- reframe/core/decorators.py | 6 +- reframe/core/runtime.py | 1 - reframe/frontend/autodetect.py | 10 +-- reframe/frontend/cli.py | 136 +++++++++++++++------------------ unittests/test_cli.py | 4 +- 5 files changed, 70 insertions(+), 87 deletions(-) diff --git a/reframe/core/decorators.py b/reframe/core/decorators.py index ed33b410ea..a7862bd367 100644 --- a/reframe/core/decorators.py +++ b/reframe/core/decorators.py @@ -50,9 +50,11 @@ def __init__(self): self._skip_tests = set() @classmethod - def create(cls, test, *args, **kwargs): + def create(cls, test=None, *args, **kwargs): obj = cls() - obj.add(test, *args, **kwargs) + if test: + obj.add(test, *args, **kwargs) + return obj def add(self, test, *args, **kwargs): diff --git a/reframe/core/runtime.py b/reframe/core/runtime.py index 4d8e8ada81..698f82409e 100644 --- a/reframe/core/runtime.py +++ b/reframe/core/runtime.py @@ -32,7 +32,6 @@ def __init__(self, site_config): self._system = System.create(site_config) self._current_run = 0 self._timestamp = datetime.now() - self.jobs_cli_options = [] def _makedir(self, *dirs, wipeout=False): ret = os.path.join(*dirs) diff --git a/reframe/frontend/autodetect.py b/reframe/frontend/autodetect.py index 2903360c07..b18a60ae00 100644 --- a/reframe/frontend/autodetect.py +++ b/reframe/frontend/autodetect.py @@ -252,23 +252,17 @@ def detect_topology(): getlogger().debug(f'> device auto-detection is not supported') -def getallnodes(state='all', partitions=None): +def getallnodes(state='all', jobs_cli_options=[]): rt = runtime.runtime() nodes = {} - if partitions is None: - partitions = rt.system.partitions - for part in rt.system.partitions: - if part not in partitions: - continue - # This job will not be submitted, it's used only to filter # the nodes based on the partition configuration job = Job.create(part.scheduler, part.launcher_type(), name='placeholder-job', sched_access=part.access, - sched_options=rt.jobs_cli_options) + sched_options=jobs_cli_options) available_nodes = part.scheduler.allnodes() available_nodes = part.scheduler.filternodes(job, available_nodes) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index a92997a6d9..ff27e1fd1b 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -190,62 +190,65 @@ def calc_verbosity(site_config, quiesce): def distribute_tests(testcases, skip_system_check, skip_prgenv_check, node_map): - temporary_registry = None + tmp_registry = TestRegistry.create() new_checks = [] - for t in testcases: - if not t.check.is_fixture(): - cls = type(t.check) - basename = cls.__name__ - original_var_info = cls.get_variant_info( - t.check.variant_num, recurse=True - ) + for tc in testcases: + check, partition, environ = tc + if check.is_fixture(): + continue - def _rfm_distributed_set_run_nodes(obj): - if not obj.local: - obj.job.pin_nodes = obj._rfm_nodelist - - def _rfm_distributed_set_build_nodes(obj): - if not obj.local and not obj.build_locally: - obj.build_job.pin_nodes = obj._rfm_nodelist - - # We re-set the valid system and environment in a hook to - # make sure that it will not be overwriten by a parent - # post-init hook - def _rfm_distributed_set_valid_sys_env(obj): - obj.valid_systems = [t._partition.fullname] - obj.valid_prog_environs = [t._environ.name] - - class BaseTest(t.check.__class__): - _rfm_nodelist = builtins.parameter(node_map[t._partition.fullname]) - valid_systems = [t._partition.fullname] - valid_prog_environs = [t._environ.name] - - nc = make_test( - f'__D_{t._partition.name}_{t._environ.name}_{basename}', - (BaseTest, ), - {}, - methods=[ - builtins.run_before('run')(_rfm_distributed_set_run_nodes), - builtins.run_before('compile')(_rfm_distributed_set_build_nodes), - # TODO this hook is not working properly - # builtins.run_after('init')(_rfm_distributed_set_valid_sys_env), - ] - ) - # We have to set the prefix manually - nc._rfm_dynamic_test_prefix = t.check.prefix - - for i in range(nc.num_variants): - # Check if this variant should be instantiated - var_info = copy.deepcopy(nc.get_variant_info(i, recurse=True)) - var_info['params'].pop('_rfm_nodelist') - if var_info == original_var_info: - if temporary_registry is None: - temporary_registry = TestRegistry.create(nc, variant_num=i) - else: - temporary_registry.add(nc, variant_num=i) - - if temporary_registry: - new_checks = temporary_registry.instantiate_all() + cls = type(check) + basename = cls.__name__ + original_var_info = cls.get_variant_info( + check.variant_num, recurse=True + ) + + def _rfm_distributed_set_run_nodes(obj): + if not obj.local: + obj.job.pin_nodes = obj._rfm_nodelist + + def _rfm_distributed_set_build_nodes(obj): + if not obj.local and not obj.build_locally: + obj.build_job.pin_nodes = obj._rfm_nodelist + + # We re-set the valid system and environment in a hook to + # make sure that it will not be overwriten by a parent + # post-init hook + def _rfm_distributed_set_valid_sys_env(obj): + obj.valid_systems = [partition.fullname] + obj.valid_prog_environs = [environ.name] + + class BaseTest(cls): + _rfm_nodelist = builtins.parameter(node_map[partition.fullname]) + valid_systems = [partition.fullname] + valid_prog_environs = [environ.name] + + nc = make_test( + f'_D_{partition.name}_{environ.name}_{basename}', + (BaseTest, ), + {}, + methods=[ + builtins.run_before('run')(_rfm_distributed_set_run_nodes), + builtins.run_before('compile')(_rfm_distributed_set_build_nodes), + # TODO this hook is not working properly + # builtins.run_after('init')(_rfm_distributed_set_valid_sys_env), + ] + ) + # We have to set the prefix manually + nc._rfm_custom_prefix = check.prefix + + for i in range(nc.num_variants): + # Check if this variant should be instantiated + var_info = copy.deepcopy(nc.get_variant_info(i, recurse=True)) + var_info['params'].pop('_rfm_nodelist') + if var_info == original_var_info: + if tmp_registry is None: + tmp_registry = TestRegistry.create(nc, variant_num=i) + else: + tmp_registry.add(nc, variant_num=i) + + if tmp_registry: + new_checks = tmp_registry.instantiate_all() return generate_testcases(new_checks, skip_system_check, skip_prgenv_check) else: @@ -449,8 +452,8 @@ def main(): ) # TODO Decide the exact functionality of the option run_options.add_argument( - '--flex-alloc-singlenode', action='store', default=None, - dest='flex_alloc_singlenode', metavar='{all|STATE}', + '--distribute', action='store', default=None, + dest='distribute', metavar='{all|STATE}', help=('Submit single node jobs automatically on every node of a ' 'partition in STATE') ) @@ -960,20 +963,6 @@ def print_infoline(param, value): 'workdir': os.getcwd(), } - if options.flex_alloc_singlenode: - state, *tests = options.flex_alloc_singlenode.split(':') - if len(tests) == 0: - rt.flex_alloc_singlenode_state = state - rt.flex_alloc_singlenode_tests = None - elif len(tests) == 1: - rt.flex_alloc_singlenode_state = state - rt.flex_alloc_singlenode_tests = tests[0].split(',') - else: - printer.error(f'argument {options.flex_alloc_singlenode} for ' - f'--flex-alloc-singlenode is not in format ' - f'{{all|STATE}}[:TEST1,TEST2]') - sys.exit(1) - # Print command line printer.info(f"[ReFrame Setup]") print_infoline('version', session_info['version']) @@ -1004,8 +993,6 @@ def print_infoline(param, value): else: parsed_job_options.append(f'--{optstr} {valstr}') - rt.jobs_cli_options = parsed_job_options - # Locate and load checks checks_found = loader.load_all() printer.verbose(f'Loaded {len(checks_found)} test(s)') @@ -1089,8 +1076,9 @@ def _case_failed(t): f'{len(testcases)} remaining' ) - if options.flex_alloc_singlenode: - node_map = autodetect.getallnodes(options.flex_alloc_singlenode) + if options.distribute: + node_map = autodetect.getallnodes(options.distribute, + parsed_job_options) testcases = distribute_tests( testcases, options.skip_system_check, options.skip_prgenv_check, node_map diff --git a/unittests/test_cli.py b/unittests/test_cli.py index fc0d6926da..5b1af8a93f 100644 --- a/unittests/test_cli.py +++ b/unittests/test_cli.py @@ -925,7 +925,7 @@ def test_dynamic_tests(run_reframe, tmp_path): environs=[], checkpath=['unittests/resources/checks_unlisted/alloc_check.py'], action='run', - more_options=['-n', 'Complex', '--flex-alloc-singlenode=idle'] + more_options=['-n', 'Complex', '--distribute=idle'] ) assert returncode == 0 assert 'Ran 10/10 test case(s) from 10 check(s)' in stdout @@ -939,7 +939,7 @@ def test_dynamic_tests_filtering(run_reframe, tmp_path): environs=[], checkpath=['unittests/resources/checks_unlisted/alloc_check.py'], action='run', - more_options=['-n', 'Complex@1', '--flex-alloc-singlenode=idle'] + more_options=['-n', 'Complex@1', '--distribute=idle'] ) assert returncode == 0 assert 'Ran 7/7 test case(s) from 7 check(s)' in stdout From 78b13e95ac7bd5231952287ea5d05403d9bc63f0 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 7 Apr 2022 17:53:01 +0200 Subject: [PATCH 17/43] Remove _rfm_dynamic_test_prefix attribute --- reframe/core/pipeline.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 4de0da5aa6..828248dbc8 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -861,12 +861,9 @@ def __new__(cls, *args, **kwargs): try: prefix = cls._rfm_pinned_prefix except AttributeError: - try: - prefix = cls._rfm_dynamic_test_prefix - except AttributeError: - prefix = os.path.abspath( - os.path.dirname(inspect.getfile(cls)) - ) + prefix = os.path.abspath( + os.path.dirname(inspect.getfile(cls)) + ) # Prepare initialization of test defaults (variables and parameters are # injected after __new__ has returned, so we schedule this function From 9cd364875803ce5c02a942a7a95400da64ec2860 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 7 Apr 2022 17:56:56 +0200 Subject: [PATCH 18/43] Rename to dummy_job --- reframe/frontend/autodetect.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/reframe/frontend/autodetect.py b/reframe/frontend/autodetect.py index b18a60ae00..512bd68242 100644 --- a/reframe/frontend/autodetect.py +++ b/reframe/frontend/autodetect.py @@ -258,14 +258,15 @@ def getallnodes(state='all', jobs_cli_options=[]): for part in rt.system.partitions: # This job will not be submitted, it's used only to filter # the nodes based on the partition configuration - job = Job.create(part.scheduler, - part.launcher_type(), - name='placeholder-job', - sched_access=part.access, - sched_options=jobs_cli_options) + dummy_job = Job.create(part.scheduler, + part.launcher_type(), + name='placeholder-job', + sched_access=part.access, + sched_options=jobs_cli_options) available_nodes = part.scheduler.allnodes() - available_nodes = part.scheduler.filternodes(job, available_nodes) + available_nodes = part.scheduler.filternodes(dummy_job, + available_nodes) getlogger().debug( f'Total available nodes for {part.name}: {len(available_nodes)}' ) From 5e2c4f5419cc411e328784afc4849632dd9079f8 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 7 Apr 2022 18:29:59 +0200 Subject: [PATCH 19/43] Address PR comments --- reframe/frontend/cli.py | 44 +++++++++++++++++++++++++---------------- unittests/test_cli.py | 4 ++-- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index ff27e1fd1b..359041e155 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -192,11 +192,16 @@ def distribute_tests(testcases, skip_system_check, skip_prgenv_check, node_map): tmp_registry = TestRegistry.create() new_checks = [] + # We don't want to register the same check for every environment + # per partition + registered_checks = set() for tc in testcases: check, partition, environ = tc - if check.is_fixture(): + candidate_check = (check.unique_name, partition) + if check.is_fixture() or candidate_check in registered_checks: continue + registered_checks.add(candidate_check) cls = type(check) basename = cls.__name__ original_var_info = cls.get_variant_info( @@ -214,24 +219,31 @@ def _rfm_distributed_set_build_nodes(obj): # We re-set the valid system and environment in a hook to # make sure that it will not be overwriten by a parent # post-init hook - def _rfm_distributed_set_valid_sys_env(obj): - obj.valid_systems = [partition.fullname] - obj.valid_prog_environs = [environ.name] + def _rfm_distributed_set_valid_sys(systems): + def _fn(obj): + obj.valid_systems = systems + + return _fn class BaseTest(cls): _rfm_nodelist = builtins.parameter(node_map[partition.fullname]) - valid_systems = [partition.fullname] - valid_prog_environs = [environ.name] nc = make_test( - f'_D_{partition.name}_{environ.name}_{basename}', + f'_D_{basename}_{partition.fullname.replace(":", "_")}', (BaseTest, ), - {}, + { + 'valid_systems' : [partition.fullname] + }, methods=[ builtins.run_before('run')(_rfm_distributed_set_run_nodes), - builtins.run_before('compile')(_rfm_distributed_set_build_nodes), - # TODO this hook is not working properly - # builtins.run_after('init')(_rfm_distributed_set_valid_sys_env), + builtins.run_before('compile')( + _rfm_distributed_set_build_nodes + ), + builtins.run_after('init')( + _rfm_distributed_set_valid_sys( + [partition.fullname] + ) + ), ] ) # We have to set the prefix manually @@ -247,12 +259,10 @@ class BaseTest(cls): else: tmp_registry.add(nc, variant_num=i) - if tmp_registry: - new_checks = tmp_registry.instantiate_all() - return generate_testcases(new_checks, skip_system_check, - skip_prgenv_check) - else: - return [] + new_checks = tmp_registry.instantiate_all() + return generate_testcases(new_checks, skip_system_check, + skip_prgenv_check) + def main(): # Setup command line options diff --git a/unittests/test_cli.py b/unittests/test_cli.py index 5b1af8a93f..3954271e69 100644 --- a/unittests/test_cli.py +++ b/unittests/test_cli.py @@ -928,7 +928,7 @@ def test_dynamic_tests(run_reframe, tmp_path): more_options=['-n', 'Complex', '--distribute=idle'] ) assert returncode == 0 - assert 'Ran 10/10 test case(s) from 10 check(s)' in stdout + assert 'Ran 10/10 test case(s)' in stdout assert 'FAILED' not in stdout @@ -942,5 +942,5 @@ def test_dynamic_tests_filtering(run_reframe, tmp_path): more_options=['-n', 'Complex@1', '--distribute=idle'] ) assert returncode == 0 - assert 'Ran 7/7 test case(s) from 7 check(s)' in stdout + assert 'Ran 7/7 test case(s)' in stdout assert 'FAILED' not in stdout From a5cfbb92c309562b9c0532e7c485ef1988141856 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 7 Apr 2022 18:41:09 +0200 Subject: [PATCH 20/43] Remove unnecessary check --- reframe/frontend/cli.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 359041e155..29f606d2bf 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -254,10 +254,7 @@ class BaseTest(cls): var_info = copy.deepcopy(nc.get_variant_info(i, recurse=True)) var_info['params'].pop('_rfm_nodelist') if var_info == original_var_info: - if tmp_registry is None: - tmp_registry = TestRegistry.create(nc, variant_num=i) - else: - tmp_registry.add(nc, variant_num=i) + tmp_registry.add(nc, variant_num=i) new_checks = tmp_registry.instantiate_all() return generate_testcases(new_checks, skip_system_check, From f51df33f39863001cedfbbaebc99fe228972b751 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 7 Apr 2022 19:10:39 +0200 Subject: [PATCH 21/43] Address PR comments --- reframe/frontend/cli.py | 16 ++++++++-------- ...dynamic_tests.py => test_distribute_tests.py} | 7 ++++--- 2 files changed, 12 insertions(+), 11 deletions(-) rename unittests/{test_dynamic_tests.py => test_distribute_tests.py} (89%) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 29f606d2bf..fac4996f9c 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -209,12 +209,14 @@ def distribute_tests(testcases, skip_system_check, skip_prgenv_check, ) def _rfm_distributed_set_run_nodes(obj): + pin_nodes = getattr(obj, '$nid') if not obj.local: - obj.job.pin_nodes = obj._rfm_nodelist + obj.job.pin_nodes = pin_nodes def _rfm_distributed_set_build_nodes(obj): + pin_nodes = getattr(obj, '$nid') if not obj.local and not obj.build_locally: - obj.build_job.pin_nodes = obj._rfm_nodelist + obj.build_job.pin_nodes = pin_nodes # We re-set the valid system and environment in a hook to # make sure that it will not be overwriten by a parent @@ -225,14 +227,12 @@ def _fn(obj): return _fn - class BaseTest(cls): - _rfm_nodelist = builtins.parameter(node_map[partition.fullname]) - nc = make_test( f'_D_{basename}_{partition.fullname.replace(":", "_")}', - (BaseTest, ), + (cls, ), { - 'valid_systems' : [partition.fullname] + 'valid_systems' : [partition.fullname], + '$nid' : builtins.parameter(node_map[partition.fullname]) }, methods=[ builtins.run_before('run')(_rfm_distributed_set_run_nodes), @@ -252,7 +252,7 @@ class BaseTest(cls): for i in range(nc.num_variants): # Check if this variant should be instantiated var_info = copy.deepcopy(nc.get_variant_info(i, recurse=True)) - var_info['params'].pop('_rfm_nodelist') + var_info['params'].pop('$nid') if var_info == original_var_info: tmp_registry.add(nc, variant_num=i) diff --git a/unittests/test_dynamic_tests.py b/unittests/test_distribute_tests.py similarity index 89% rename from unittests/test_dynamic_tests.py rename to unittests/test_distribute_tests.py index 9dac592d89..b6f302052e 100644 --- a/unittests/test_dynamic_tests.py +++ b/unittests/test_distribute_tests.py @@ -22,7 +22,7 @@ def loader(): ]) -def test_dynamic_testcases(loader, default_exec_ctx): +def test_distribute_testcases(loader, default_exec_ctx): testcases = executors.generate_testcases(loader.load_all()) testcases = filter( filters.have_name('Simple'), testcases @@ -46,7 +46,8 @@ def test_dynamic_testcases(loader, default_exec_ctx): count = sum(map(lambda x : x._partition.fullname == 'sys0:p1', new_cases)) assert count == 2 for c in new_cases: + nodes = getattr(c.check, '$nid') if c._partition.fullname == 'sys0:p0': - assert c.check._rfm_nodelist in ('n1', 'n2') + assert nodes in ('n1', 'n2') else: - assert c.check._rfm_nodelist == 'n3' + assert nodes == 'n3' From f0c38e296ec5d5a39835ca7b089f64030e656fcb Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Thu, 7 Apr 2022 19:23:03 +0200 Subject: [PATCH 22/43] Fix unittest after master merging --- unittests/test_distribute_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unittests/test_distribute_tests.py b/unittests/test_distribute_tests.py index b6f302052e..fecee1d687 100644 --- a/unittests/test_distribute_tests.py +++ b/unittests/test_distribute_tests.py @@ -25,7 +25,7 @@ def loader(): def test_distribute_testcases(loader, default_exec_ctx): testcases = executors.generate_testcases(loader.load_all()) testcases = filter( - filters.have_name('Simple'), testcases + filters.have_any_name('Simple'), testcases ) testcases = list(testcases) From 5861e68d79a8076539d81b6f1319e2f684a3127c Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Fri, 8 Apr 2022 09:48:47 +0200 Subject: [PATCH 23/43] Update documentation for the distribute cli option --- docs/manpage.rst | 16 ++++++++++++++++ reframe/frontend/cli.py | 13 ++++++------- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/docs/manpage.rst b/docs/manpage.rst index e653954607..45c68d91f5 100644 --- a/docs/manpage.rst +++ b/docs/manpage.rst @@ -379,6 +379,22 @@ Options controlling ReFrame execution .. versionadded:: 3.2 +.. option:: --distribute=STATE + + Parameterize and run tests on all the nodes of their partitions. + The attribute ``pin_nodes`` will be set in these tests in order to be submitted on the selected nodes. + + Currently this will work correctly only for one-node tests in local or Slurm partitions, and it will take into account the cli jobs options that are passed by the user. + + You can decide the state of the nodes that will be considered: + + - ``all``: Tests will be parameterized over all the nodes of their partitions. + - ``STATE``: Tests will run on all the nodes in state ``STATE``, for example ``idle``. + The states of the nodes will be determined once, before beginning the + execution of the tests so it might be different in the time of the submission of the tests. + + .. versionadded:: 3.11.0 + .. option:: --exec-policy=POLICY The execution policy to be used for running tests. diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index c7bd34eb5b..f775fc99c3 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -448,6 +448,12 @@ def main(): '--disable-hook', action='append', metavar='NAME', dest='hooks', default=[], help='Disable a pipeline hook for this run' ) + run_options.add_argument( + '--distribute', action='store', default=None, + dest='distribute', metavar='{all|STATE}', + help=('Submit single node jobs automatically on every node of a ' + 'partition in STATE') + ) run_options.add_argument( '--exec-policy', metavar='POLICY', action='store', choices=['async', 'serial'], default='async', @@ -458,13 +464,6 @@ def main(): dest='flex_alloc_nodes', metavar='{all|STATE|NUM}', default=None, help='Set strategy for the flexible node allocation (default: "idle").' ) - # TODO Decide the exact functionality of the option - run_options.add_argument( - '--distribute', action='store', default=None, - dest='distribute', metavar='{all|STATE}', - help=('Submit single node jobs automatically on every node of a ' - 'partition in STATE') - ) run_options.add_argument( '--force-local', action='store_true', help='Force local execution of checks' From 7401baecf16465f9b0915ad60b07eb60b29ee038 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Fri, 8 Apr 2022 09:50:53 +0200 Subject: [PATCH 24/43] Set versionadded for pin_nodes attribute --- reframe/core/schedulers/__init__.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py index 7bed278e8b..8a4965bb16 100644 --- a/reframe/core/schedulers/__init__.py +++ b/reframe/core/schedulers/__init__.py @@ -265,10 +265,12 @@ class Job(jsonext.JSONSerializable, metaclass=JobMeta): #: :type: :class:`reframe.core.launchers.JobLauncher` launcher = variable(JobLauncher) - # #: Pin nodes for the job - # #: - # #: :type: :class:`str` or :class:`None` - # #: :default: :class:`None` + #: Pin nodes for the job + #: + #: :type: :class:`str` or :class:`None` + #: :default: :class:`None` + #: + #: .. versionadded:: 3.11.0 pin_nodes = variable(type(None), str, value=None) # The sched_* arguments are exposed also to the frontend From 62c65c5d18cc468e4095a7ee7f8c428684ea7ae3 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Fri, 8 Apr 2022 10:06:31 +0200 Subject: [PATCH 25/43] Remove skip_system_check and skip_prgenv_check from generate_testcases --- reframe/frontend/cli.py | 11 +++-------- unittests/test_distribute_tests.py | 2 +- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 8cf7855e49..6a63fec4df 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -189,8 +189,7 @@ def calc_verbosity(site_config, quiesce): return curr_verbosity - quiesce -def distribute_tests(testcases, skip_system_check, skip_prgenv_check, - node_map): +def distribute_tests(testcases, node_map): tmp_registry = TestRegistry.create() new_checks = [] # We don't want to register the same check for every environment @@ -258,8 +257,7 @@ def _fn(obj): tmp_registry.add(nc, variant_num=i) new_checks = tmp_registry.instantiate_all() - return generate_testcases(new_checks, skip_system_check, - skip_prgenv_check) + return generate_testcases(new_checks) def main(): @@ -1115,10 +1113,7 @@ def _case_failed(t): if options.distribute: node_map = autodetect.getallnodes(options.distribute, parsed_job_options) - testcases = distribute_tests( - testcases, options.skip_system_check, - options.skip_prgenv_check, node_map - ) + testcases = distribute_tests(testcases, node_map) testcases_all = testcases # Prepare for running diff --git a/unittests/test_distribute_tests.py b/unittests/test_distribute_tests.py index fecee1d687..5d7e19ebce 100644 --- a/unittests/test_distribute_tests.py +++ b/unittests/test_distribute_tests.py @@ -39,7 +39,7 @@ def test_distribute_testcases(loader, default_exec_ctx): 'sys0:p0': ['n1', 'n2'], 'sys0:p1': ['n3'] } - new_cases = distribute_tests(testcases, False, False, node_map) + new_cases = distribute_tests(testcases, node_map) assert len(new_cases) == 6 count = sum(map(lambda x : x._partition.fullname == 'sys0:p0', new_cases)) assert count == 4 From ceb6e1bf0c8760511f5b7c335e7f3d0c599f7050 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Fri, 8 Apr 2022 10:10:42 +0200 Subject: [PATCH 26/43] Fix pep8 issues --- reframe/frontend/cli.py | 5 ++--- unittests/test_distribute_tests.py | 9 +++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 6a63fec4df..7dfdf5f6d2 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -32,7 +32,6 @@ import reframe.utility.typecheck as typ - from reframe.core.decorators import TestRegistry from reframe.core.meta import make_test from reframe.frontend.printer import PrettyPrinter @@ -231,8 +230,8 @@ def _fn(obj): f'_D_{basename}_{partition.fullname.replace(":", "_")}', (cls, ), { - 'valid_systems' : [partition.fullname], - '$nid' : builtins.parameter(node_map[partition.fullname]) + 'valid_systems': [partition.fullname], + '$nid': builtins.parameter(node_map[partition.fullname]) }, methods=[ builtins.run_before('run')(_rfm_distributed_set_run_nodes), diff --git a/unittests/test_distribute_tests.py b/unittests/test_distribute_tests.py index 5d7e19ebce..ba5678e87f 100644 --- a/unittests/test_distribute_tests.py +++ b/unittests/test_distribute_tests.py @@ -15,6 +15,7 @@ def default_exec_ctx(make_exec_ctx_g): yield from make_exec_ctx_g(system='sys0') + @pytest.fixture def loader(): return RegressionCheckLoader([ @@ -30,9 +31,9 @@ def test_distribute_testcases(loader, default_exec_ctx): testcases = list(testcases) assert len(testcases) == 4 - count = sum(map(lambda x : x._partition.fullname == 'sys0:p0', testcases)) + count = sum(map(lambda x: x._partition.fullname == 'sys0:p0', testcases)) assert count == 2 - count = sum(map(lambda x : x._partition.fullname == 'sys0:p1', testcases)) + count = sum(map(lambda x: x._partition.fullname == 'sys0:p1', testcases)) assert count == 2 node_map = { @@ -41,9 +42,9 @@ def test_distribute_testcases(loader, default_exec_ctx): } new_cases = distribute_tests(testcases, node_map) assert len(new_cases) == 6 - count = sum(map(lambda x : x._partition.fullname == 'sys0:p0', new_cases)) + count = sum(map(lambda x: x._partition.fullname == 'sys0:p0', new_cases)) assert count == 4 - count = sum(map(lambda x : x._partition.fullname == 'sys0:p1', new_cases)) + count = sum(map(lambda x: x._partition.fullname == 'sys0:p1', new_cases)) assert count == 2 for c in new_cases: nodes = getattr(c.check, '$nid') From 4ac94b5c561d7a8af4fd6ff9ef59abd2824b2163 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 12 Apr 2022 09:25:21 +0200 Subject: [PATCH 27/43] Address PR comments --- docs/manpage.rst | 12 ++-- reframe/core/decorators.py | 5 +- reframe/core/schedulers/__init__.py | 12 ++-- reframe/core/schedulers/slurm.py | 7 +- reframe/frontend/cli.py | 71 ++++++++++--------- .../{alloc_check.py => distribute.py} | 0 unittests/test_cli.py | 4 +- unittests/test_distribute_tests.py | 6 +- 8 files changed, 64 insertions(+), 53 deletions(-) rename unittests/resources/checks_unlisted/{alloc_check.py => distribute.py} (100%) diff --git a/docs/manpage.rst b/docs/manpage.rst index 45c68d91f5..73b03ef967 100644 --- a/docs/manpage.rst +++ b/docs/manpage.rst @@ -379,17 +379,21 @@ Options controlling ReFrame execution .. versionadded:: 3.2 -.. option:: --distribute=STATE +.. option:: --distribute=NODESTATE - Parameterize and run tests on all the nodes of their partitions. - The attribute ``pin_nodes`` will be set in these tests in order to be submitted on the selected nodes. + Distribute the tests on all the nodes in state ``NODESTATE`` in their respective valid partitions. + + ReFrame will parameterize and run the tests on the selected nodes. + In order to do that, it will dynamically create new tests that will inherit all the attributes of the original tests and contain one more parameter, ``$nid``, with the node that it will run on. + The new ReFrame classes are named ``_D_{basetest}_{partition}``. Currently this will work correctly only for one-node tests in local or Slurm partitions, and it will take into account the cli jobs options that are passed by the user. + Dependendecies will also be affected, since the names of the tests will be changed. You can decide the state of the nodes that will be considered: - ``all``: Tests will be parameterized over all the nodes of their partitions. - - ``STATE``: Tests will run on all the nodes in state ``STATE``, for example ``idle``. + - ``NODESTATE``: Tests will run on all the nodes in state ``NODESTATE``, for example ``idle``. The states of the nodes will be determined once, before beginning the execution of the tests so it might be different in the time of the submission of the tests. diff --git a/reframe/core/decorators.py b/reframe/core/decorators.py index da196c0869..694ae0c1e8 100644 --- a/reframe/core/decorators.py +++ b/reframe/core/decorators.py @@ -50,10 +50,9 @@ def __init__(self): self._skip_tests = set() @classmethod - def create(cls, test=None, *args, **kwargs): + def create(cls, test, *args, **kwargs): obj = cls() - if test: - obj.add(test, *args, **kwargs) + obj.add(test, *args, **kwargs) return obj diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py index 8a4965bb16..ccc629fdc6 100644 --- a/reframe/core/schedulers/__init__.py +++ b/reframe/core/schedulers/__init__.py @@ -265,13 +265,17 @@ class Job(jsonext.JSONSerializable, metaclass=JobMeta): #: :type: :class:`reframe.core.launchers.JobLauncher` launcher = variable(JobLauncher) - #: Pin nodes for the job + #: Pin the jobs on the given nodes. #: - #: :type: :class:`str` or :class:`None` - #: :default: :class:`None` + #: The list of nodes will be transformed to a suitable string and be + #: passed to the scheduler's options. Currently it will have an effect + #: only for the Slurm scheduler. + #: + #: :type: :class:`List[str]` + #: :default: ``[]`` #: #: .. versionadded:: 3.11.0 - pin_nodes = variable(type(None), str, value=None) + pin_nodes = variable(typ.List[str], value=[]) # The sched_* arguments are exposed also to the frontend def __init__(self, diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index e505d685d9..fcc328859f 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -19,7 +19,7 @@ JobBlockedError, JobError, JobSchedulerError) -from reframe.utility import seconds_to_hms +from reframe.utility import nodelist_abbrev, seconds_to_hms def slurm_state_completed(state): @@ -194,7 +194,10 @@ def emit_preamble(self, job): if job.pin_nodes: preamble.append( - self._format_option(job.pin_nodes, '--nodelist={0}') + self._format_option( + nodelist_abbrev(job.pin_nodes), + '--nodelist={0}' + ) ) for opt in job.sched_access: diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 7dfdf5f6d2..4e806a50e9 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -189,59 +189,61 @@ def calc_verbosity(site_config, quiesce): def distribute_tests(testcases, node_map): - tmp_registry = TestRegistry.create() + tmp_registry = TestRegistry() new_checks = [] # We don't want to register the same check for every environment # per partition - registered_checks = set() + check_part_combs = set() for tc in testcases: - check, partition, environ = tc - candidate_check = (check.unique_name, partition) - if check.is_fixture() or candidate_check in registered_checks: + check, partition, _ = tc + candidate_check = (check.unique_name, partition.fullname) + if check.is_fixture() or candidate_check in check_part_combs: continue - registered_checks.add(candidate_check) + check_part_combs.add(candidate_check) cls = type(check) - basename = cls.__name__ - original_var_info = cls.get_variant_info( + variant_info = cls.get_variant_info( check.variant_num, recurse=True ) - def _rfm_distributed_set_run_nodes(obj): - pin_nodes = getattr(obj, '$nid') + def _rfm_pin_run_nodes(obj): + nodelist = getattr(obj, '$nid') if not obj.local: - obj.job.pin_nodes = pin_nodes + obj.job.pin_nodes = nodelist - def _rfm_distributed_set_build_nodes(obj): + def _rfm_pin_build_nodes(obj): pin_nodes = getattr(obj, '$nid') if not obj.local and not obj.build_locally: obj.build_job.pin_nodes = pin_nodes - # We re-set the valid system and environment in a hook to - # make sure that it will not be overwriten by a parent - # post-init hook - def _rfm_distributed_set_valid_sys(systems): - def _fn(obj): + def make_valid_systems_hook(systems): + '''Returns a function to be used as a hook that sets the + valid systems. + + Since valid_systems change for each generated test, we need to + generate different post-init hooks for each one of them. + ''' + def _rfm_set_valid_systems(obj): obj.valid_systems = systems - return _fn + return _rfm_set_valid_systems nc = make_test( - f'_D_{basename}_{partition.fullname.replace(":", "_")}', - (cls, ), + f'_D_{cls.__name__}_{partition.fullname.replace(":", "_")}', + (cls,), { 'valid_systems': [partition.fullname], - '$nid': builtins.parameter(node_map[partition.fullname]) + '$nid': builtins.parameter( + [[n] for n in node_map[partition.fullname]] + ) }, methods=[ - builtins.run_before('run')(_rfm_distributed_set_run_nodes), - builtins.run_before('compile')( - _rfm_distributed_set_build_nodes - ), + builtins.run_before('run')(_rfm_pin_run_nodes), + builtins.run_before('compile')(_rfm_pin_build_nodes), + # We re-set the valid system in a hook to make sure that it + # will not be overwriten by a parent post-init hook builtins.run_after('init')( - _rfm_distributed_set_valid_sys( - [partition.fullname] - ) + make_valid_systems_hook([partition.fullname]) ), ] ) @@ -250,9 +252,9 @@ def _fn(obj): for i in range(nc.num_variants): # Check if this variant should be instantiated - var_info = copy.deepcopy(nc.get_variant_info(i, recurse=True)) - var_info['params'].pop('$nid') - if var_info == original_var_info: + vinfo = nc.get_variant_info(i, recurse=True) + vinfo['params'].pop('$nid') + if vinfo == variant_info: tmp_registry.add(nc, variant_num=i) new_checks = tmp_registry.instantiate_all() @@ -446,10 +448,9 @@ def main(): default=[], help='Disable a pipeline hook for this run' ) run_options.add_argument( - '--distribute', action='store', default=None, - dest='distribute', metavar='{all|STATE}', - help=('Submit single node jobs automatically on every node of a ' - 'partition in STATE') + '--distribute', action='store', default=None, metavar='{all|STATE}', + help=('Distribute the selected single-node jobs on every node that' + 'is in STATE') ) run_options.add_argument( '--exec-policy', metavar='POLICY', action='store', diff --git a/unittests/resources/checks_unlisted/alloc_check.py b/unittests/resources/checks_unlisted/distribute.py similarity index 100% rename from unittests/resources/checks_unlisted/alloc_check.py rename to unittests/resources/checks_unlisted/distribute.py diff --git a/unittests/test_cli.py b/unittests/test_cli.py index 3954271e69..ce762d2a6d 100644 --- a/unittests/test_cli.py +++ b/unittests/test_cli.py @@ -923,7 +923,7 @@ def test_dynamic_tests(run_reframe, tmp_path): returncode, stdout, _ = run_reframe( system='sys0', environs=[], - checkpath=['unittests/resources/checks_unlisted/alloc_check.py'], + checkpath=['unittests/resources/checks_unlisted/distribute.py'], action='run', more_options=['-n', 'Complex', '--distribute=idle'] ) @@ -937,7 +937,7 @@ def test_dynamic_tests_filtering(run_reframe, tmp_path): returncode, stdout, _ = run_reframe( system='sys1', environs=[], - checkpath=['unittests/resources/checks_unlisted/alloc_check.py'], + checkpath=['unittests/resources/checks_unlisted/distribute.py'], action='run', more_options=['-n', 'Complex@1', '--distribute=idle'] ) diff --git a/unittests/test_distribute_tests.py b/unittests/test_distribute_tests.py index ba5678e87f..d4626b0870 100644 --- a/unittests/test_distribute_tests.py +++ b/unittests/test_distribute_tests.py @@ -19,7 +19,7 @@ def default_exec_ctx(make_exec_ctx_g): @pytest.fixture def loader(): return RegressionCheckLoader([ - 'unittests/resources/checks_unlisted/alloc_check.py' + 'unittests/resources/checks_unlisted/distribute.py' ]) @@ -49,6 +49,6 @@ def test_distribute_testcases(loader, default_exec_ctx): for c in new_cases: nodes = getattr(c.check, '$nid') if c._partition.fullname == 'sys0:p0': - assert nodes in ('n1', 'n2') + assert nodes in (['n1'], ['n2']) else: - assert nodes == 'n3' + assert nodes == ['n3'] From 4aeba8e3219537f3e555e50c6cfc2b8734b88694 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 12 Apr 2022 09:48:05 +0200 Subject: [PATCH 28/43] Move distribute_tests and getallnodes to a separate module --- reframe/frontend/autodetect.py | 32 ------- reframe/frontend/cli.py | 80 +---------------- reframe/frontend/distribute_tests.py | 125 +++++++++++++++++++++++++++ unittests/test_distribute_tests.py | 2 +- 4 files changed, 128 insertions(+), 111 deletions(-) create mode 100644 reframe/frontend/distribute_tests.py diff --git a/reframe/frontend/autodetect.py b/reframe/frontend/autodetect.py index 512bd68242..21d50d9d2e 100644 --- a/reframe/frontend/autodetect.py +++ b/reframe/frontend/autodetect.py @@ -250,35 +250,3 @@ def detect_topology(): if not found_devinfo: getlogger().debug(f'> device auto-detection is not supported') - - -def getallnodes(state='all', jobs_cli_options=[]): - rt = runtime.runtime() - nodes = {} - for part in rt.system.partitions: - # This job will not be submitted, it's used only to filter - # the nodes based on the partition configuration - dummy_job = Job.create(part.scheduler, - part.launcher_type(), - name='placeholder-job', - sched_access=part.access, - sched_options=jobs_cli_options) - - available_nodes = part.scheduler.allnodes() - available_nodes = part.scheduler.filternodes(dummy_job, - available_nodes) - getlogger().debug( - f'Total available nodes for {part.name}: {len(available_nodes)}' - ) - - if state.casefold() != 'all': - available_nodes = {n for n in available_nodes - if n.in_state(state)} - getlogger().debug( - f'[F] Selecting nodes in state {state!r}: ' - f'available nodes now: {len(available_nodes)}' - ) - - nodes[part.fullname] = [n.name for n in available_nodes] - - return nodes diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 4e806a50e9..d8153fc633 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -15,7 +15,6 @@ import traceback import reframe -import reframe.core.builtins as builtins import reframe.core.config as config import reframe.core.exceptions as errors import reframe.core.logging as logging @@ -32,8 +31,7 @@ import reframe.utility.typecheck as typ -from reframe.core.decorators import TestRegistry -from reframe.core.meta import make_test +from reframe.frontend.distribute_tests import distribute_tests, getallnodes from reframe.frontend.printer import PrettyPrinter from reframe.frontend.loader import RegressionCheckLoader from reframe.frontend.executors.policies import (SerialExecutionPolicy, @@ -188,79 +186,6 @@ def calc_verbosity(site_config, quiesce): return curr_verbosity - quiesce -def distribute_tests(testcases, node_map): - tmp_registry = TestRegistry() - new_checks = [] - # We don't want to register the same check for every environment - # per partition - check_part_combs = set() - for tc in testcases: - check, partition, _ = tc - candidate_check = (check.unique_name, partition.fullname) - if check.is_fixture() or candidate_check in check_part_combs: - continue - - check_part_combs.add(candidate_check) - cls = type(check) - variant_info = cls.get_variant_info( - check.variant_num, recurse=True - ) - - def _rfm_pin_run_nodes(obj): - nodelist = getattr(obj, '$nid') - if not obj.local: - obj.job.pin_nodes = nodelist - - def _rfm_pin_build_nodes(obj): - pin_nodes = getattr(obj, '$nid') - if not obj.local and not obj.build_locally: - obj.build_job.pin_nodes = pin_nodes - - def make_valid_systems_hook(systems): - '''Returns a function to be used as a hook that sets the - valid systems. - - Since valid_systems change for each generated test, we need to - generate different post-init hooks for each one of them. - ''' - def _rfm_set_valid_systems(obj): - obj.valid_systems = systems - - return _rfm_set_valid_systems - - nc = make_test( - f'_D_{cls.__name__}_{partition.fullname.replace(":", "_")}', - (cls,), - { - 'valid_systems': [partition.fullname], - '$nid': builtins.parameter( - [[n] for n in node_map[partition.fullname]] - ) - }, - methods=[ - builtins.run_before('run')(_rfm_pin_run_nodes), - builtins.run_before('compile')(_rfm_pin_build_nodes), - # We re-set the valid system in a hook to make sure that it - # will not be overwriten by a parent post-init hook - builtins.run_after('init')( - make_valid_systems_hook([partition.fullname]) - ), - ] - ) - # We have to set the prefix manually - nc._rfm_custom_prefix = check.prefix - - for i in range(nc.num_variants): - # Check if this variant should be instantiated - vinfo = nc.get_variant_info(i, recurse=True) - vinfo['params'].pop('$nid') - if vinfo == variant_info: - tmp_registry.add(nc, variant_num=i) - - new_checks = tmp_registry.instantiate_all() - return generate_testcases(new_checks) - - def main(): # Setup command line options argparser = argparse.ArgumentParser() @@ -1111,8 +1036,7 @@ def _case_failed(t): ) if options.distribute: - node_map = autodetect.getallnodes(options.distribute, - parsed_job_options) + node_map = getallnodes(options.distribute, parsed_job_options) testcases = distribute_tests(testcases, node_map) testcases_all = testcases diff --git a/reframe/frontend/distribute_tests.py b/reframe/frontend/distribute_tests.py new file mode 100644 index 0000000000..f742278631 --- /dev/null +++ b/reframe/frontend/distribute_tests.py @@ -0,0 +1,125 @@ +# Copyright 2016-2022 Swiss National Supercomputing Centre (CSCS/ETH Zurich) +# ReFrame Project Developers. See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: BSD-3-Clause + + +import reframe.core.builtins as builtins +import reframe.core.runtime as runtime + +from reframe.core.decorators import TestRegistry +from reframe.core.logging import getlogger +from reframe.core.meta import make_test +from reframe.core.schedulers import Job +from reframe.frontend.executors import generate_testcases + + +def getallnodes(state='all', jobs_cli_options=[]): + rt = runtime.runtime() + nodes = {} + for part in rt.system.partitions: + # This job will not be submitted, it's used only to filter + # the nodes based on the partition configuration + dummy_job = Job.create(part.scheduler, + part.launcher_type(), + name='placeholder-job', + sched_access=part.access, + sched_options=jobs_cli_options) + + available_nodes = part.scheduler.allnodes() + available_nodes = part.scheduler.filternodes(dummy_job, + available_nodes) + getlogger().debug( + f'Total available nodes for {part.name}: {len(available_nodes)}' + ) + + if state.casefold() != 'all': + available_nodes = {n for n in available_nodes + if n.in_state(state)} + getlogger().debug( + f'[F] Selecting nodes in state {state!r}: ' + f'available nodes now: {len(available_nodes)}' + ) + + nodes[part.fullname] = [n.name for n in available_nodes] + + return nodes + + +def _rfm_pin_run_nodes(obj): + nodelist = getattr(obj, '$nid') + if not obj.local: + obj.job.pin_nodes = nodelist + + +def _rfm_pin_build_nodes(obj): + pin_nodes = getattr(obj, '$nid') + if not obj.local and not obj.build_locally: + obj.build_job.pin_nodes = pin_nodes + + +def make_valid_systems_hook(systems): + '''Returns a function to be used as a hook that sets the + valid systems. + + Since valid_systems change for each generated test, we need to + generate different post-init hooks for each one of them. + ''' + def _rfm_set_valid_systems(obj): + obj.valid_systems = systems + + return _rfm_set_valid_systems + + +def distribute_tests(testcases, node_map): + '''Returns new testcases that will be parameterized to run in node of + their partitions based on the nodemap + ''' + tmp_registry = TestRegistry() + new_checks = [] + # We don't want to register the same check for every environment + # per partition + check_part_combs = set() + for tc in testcases: + check, partition, _ = tc + candidate_check = (check.unique_name, partition.fullname) + if check.is_fixture() or candidate_check in check_part_combs: + continue + + check_part_combs.add(candidate_check) + cls = type(check) + variant_info = cls.get_variant_info( + check.variant_num, recurse=True + ) + + nc = make_test( + f'_D_{cls.__name__}_{partition.fullname.replace(":", "_")}', + (cls,), + { + 'valid_systems': [partition.fullname], + '$nid': builtins.parameter( + [[n] for n in node_map[partition.fullname]] + ) + }, + methods=[ + builtins.run_before('run')(_rfm_pin_run_nodes), + builtins.run_before('compile')(_rfm_pin_build_nodes), + # We re-set the valid system in a hook to make sure that it + # will not be overwriten by a parent post-init hook + builtins.run_after('init')( + make_valid_systems_hook([partition.fullname]) + ), + ] + ) + # We have to set the prefix manually + nc._rfm_custom_prefix = check.prefix + + for i in range(nc.num_variants): + # Check if this variant should be instantiated + vinfo = nc.get_variant_info(i, recurse=True) + vinfo['params'].pop('$nid') + if vinfo == variant_info: + tmp_registry.add(nc, variant_num=i) + + new_checks = tmp_registry.instantiate_all() + return generate_testcases(new_checks) diff --git a/unittests/test_distribute_tests.py b/unittests/test_distribute_tests.py index d4626b0870..704c1edc84 100644 --- a/unittests/test_distribute_tests.py +++ b/unittests/test_distribute_tests.py @@ -7,7 +7,7 @@ import reframe.frontend.executors as executors import reframe.frontend.filters as filters -from reframe.frontend.cli import distribute_tests +from reframe.frontend.distribute_tests import distribute_tests from reframe.frontend.loader import RegressionCheckLoader From 91f699a2c97d6f4de0b70d1933d3812fb4d25ef4 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 12 Apr 2022 09:55:00 +0200 Subject: [PATCH 29/43] Remove unused imports --- reframe/frontend/cli.py | 1 - 1 file changed, 1 deletion(-) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index d8153fc633..9e8684acd1 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -3,7 +3,6 @@ # # SPDX-License-Identifier: BSD-3-Clause -import copy import inspect import itertools import json From 4810a5e56dd5d5141eaf0726e604ab1d7420115f Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 12 Apr 2022 12:17:56 +0200 Subject: [PATCH 30/43] Remove empty line --- reframe/core/decorators.py | 1 - 1 file changed, 1 deletion(-) diff --git a/reframe/core/decorators.py b/reframe/core/decorators.py index 694ae0c1e8..997cd25b67 100644 --- a/reframe/core/decorators.py +++ b/reframe/core/decorators.py @@ -53,7 +53,6 @@ def __init__(self): def create(cls, test, *args, **kwargs): obj = cls() obj.add(test, *args, **kwargs) - return obj def add(self, test, *args, **kwargs): From 994354d71741eaf3dd7ba8a1abbe2d33e220c119 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 12 Apr 2022 16:30:11 +0200 Subject: [PATCH 31/43] Remove incorrect rerun message --- reframe/frontend/cli.py | 2 +- reframe/frontend/statistics.py | 23 +++++++++++++---------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 9e8684acd1..fb171d8459 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -1248,7 +1248,7 @@ def module_unuse(*paths): success = True if runner.stats.failed(): success = False - runner.stats.print_failure_report(printer) + runner.stats.print_failure_report(printer, options.distribute) if options.failure_stats: runner.stats.print_failure_stats(printer) diff --git a/reframe/frontend/statistics.py b/reframe/frontend/statistics.py index 5ac29ec0c6..152cdbb1a0 100644 --- a/reframe/frontend/statistics.py +++ b/reframe/frontend/statistics.py @@ -203,7 +203,7 @@ def json(self, force=False): return self._run_data - def print_failure_report(self, printer): + def print_failure_report(self, printer, distribute_run=None): line_width = 78 printer.info(line_width * '=') printer.info('SUMMARY OF FAILURES') @@ -227,7 +227,6 @@ def print_failure_report(self, printer): f" * Node list: {util.nodelist_abbrev(r['nodelist'])}" ) job_type = 'local' if r['scheduler'] == 'local' else 'batch job' - jobid = r['jobid'] printer.info(f" * Job type: {job_type} (id={r['jobid']})") printer.info(f" * Dependencies (conceptual): " f"{r['dependencies_conceptual']}") @@ -235,15 +234,19 @@ def print_failure_report(self, printer): f"{r['dependencies_actual']}") printer.info(f" * Maintainers: {r['maintainers']}") printer.info(f" * Failing phase: {r['fail_phase']}") - if rt.runtime().get_option('general/0/compact_test_names'): - cls = r['display_name'].split(' ')[0] - variant = r['unique_name'].replace(cls, '').replace('_', '@') - nameoptarg = cls + variant - else: - nameoptarg = r['unique_name'] + if not distribute_run: + if rt.runtime().get_option('general/0/compact_test_names'): + cls = r['display_name'].split(' ')[0] + variant = r['unique_name'].replace(cls, '') + variant = variant.replace('_', '@') + nameoptarg = cls + variant + else: + nameoptarg = r['unique_name'] + + printer.info(f" * Rerun with '-n {nameoptarg}" + f" -p {r['environment']} --system " + f"{r['system']} -r'") - printer.info(f" * Rerun with '-n {nameoptarg}" - f" -p {r['environment']} --system {r['system']} -r'") printer.info(f" * Reason: {r['fail_reason']}") tb = ''.join(traceback.format_exception(*r['fail_info'].values())) From a15a0b01050ceeb6520806756ec7aaa769ebf410 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 12 Apr 2022 16:55:07 +0200 Subject: [PATCH 32/43] Address PR comments --- docs/manpage.rst | 4 ++-- reframe/frontend/distribute_tests.py | 7 +++---- unittests/test_distribute_tests.py | 14 +++++++------- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/docs/manpage.rst b/docs/manpage.rst index 73b03ef967..528344c325 100644 --- a/docs/manpage.rst +++ b/docs/manpage.rst @@ -381,14 +381,14 @@ Options controlling ReFrame execution .. option:: --distribute=NODESTATE - Distribute the tests on all the nodes in state ``NODESTATE`` in their respective valid partitions. + Distribute the selected tests on all the nodes in state ``NODESTATE`` in their respective valid partitions. ReFrame will parameterize and run the tests on the selected nodes. In order to do that, it will dynamically create new tests that will inherit all the attributes of the original tests and contain one more parameter, ``$nid``, with the node that it will run on. The new ReFrame classes are named ``_D_{basetest}_{partition}``. Currently this will work correctly only for one-node tests in local or Slurm partitions, and it will take into account the cli jobs options that are passed by the user. - Dependendecies will also be affected, since the names of the tests will be changed. + This feature will not work with dependencies, since the names of the tests will be changed, but it will work with fixtures. You can decide the state of the nodes that will be considered: diff --git a/reframe/frontend/distribute_tests.py b/reframe/frontend/distribute_tests.py index f742278631..6c363f439f 100644 --- a/reframe/frontend/distribute_tests.py +++ b/reframe/frontend/distribute_tests.py @@ -82,16 +82,15 @@ def distribute_tests(testcases, node_map): check_part_combs = set() for tc in testcases: check, partition, _ = tc - candidate_check = (check.unique_name, partition.fullname) - if check.is_fixture() or candidate_check in check_part_combs: + candidate_comb = (check.unique_name, partition.fullname) + if check.is_fixture() or candidate_comb in check_part_combs: continue - check_part_combs.add(candidate_check) + check_part_combs.add(candidate_comb) cls = type(check) variant_info = cls.get_variant_info( check.variant_num, recurse=True ) - nc = make_test( f'_D_{cls.__name__}_{partition.fullname.replace(":", "_")}', (cls,), diff --git a/unittests/test_distribute_tests.py b/unittests/test_distribute_tests.py index 704c1edc84..c5b01398f4 100644 --- a/unittests/test_distribute_tests.py +++ b/unittests/test_distribute_tests.py @@ -31,9 +31,9 @@ def test_distribute_testcases(loader, default_exec_ctx): testcases = list(testcases) assert len(testcases) == 4 - count = sum(map(lambda x: x._partition.fullname == 'sys0:p0', testcases)) + count = sum(map(lambda x: x.partition.fullname == 'sys0:p0', testcases)) assert count == 2 - count = sum(map(lambda x: x._partition.fullname == 'sys0:p1', testcases)) + count = sum(map(lambda x: x.partition.fullname == 'sys0:p1', testcases)) assert count == 2 node_map = { @@ -42,13 +42,13 @@ def test_distribute_testcases(loader, default_exec_ctx): } new_cases = distribute_tests(testcases, node_map) assert len(new_cases) == 6 - count = sum(map(lambda x: x._partition.fullname == 'sys0:p0', new_cases)) + count = sum(map(lambda x: x.partition.fullname == 'sys0:p0', new_cases)) assert count == 4 - count = sum(map(lambda x: x._partition.fullname == 'sys0:p1', new_cases)) + count = sum(map(lambda x: x.partition.fullname == 'sys0:p1', new_cases)) assert count == 2 - for c in new_cases: - nodes = getattr(c.check, '$nid') - if c._partition.fullname == 'sys0:p0': + for tc in new_cases: + nodes = getattr(tc.check, '$nid') + if tc.partition.fullname == 'sys0:p0': assert nodes in (['n1'], ['n2']) else: assert nodes == ['n3'] From a0ed9b347a81c07131b194f78049211ff550ed67 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 12 Apr 2022 17:16:37 +0200 Subject: [PATCH 33/43] Add formatting function for nodelist --- reframe/frontend/distribute_tests.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/reframe/frontend/distribute_tests.py b/reframe/frontend/distribute_tests.py index 6c363f439f..c4cb572f5c 100644 --- a/reframe/frontend/distribute_tests.py +++ b/reframe/frontend/distribute_tests.py @@ -6,6 +6,7 @@ import reframe.core.builtins as builtins import reframe.core.runtime as runtime +import reframe.utility as util from reframe.core.decorators import TestRegistry from reframe.core.logging import getlogger @@ -97,7 +98,8 @@ def distribute_tests(testcases, node_map): { 'valid_systems': [partition.fullname], '$nid': builtins.parameter( - [[n] for n in node_map[partition.fullname]] + [[n] for n in node_map[partition.fullname]], + fmt=util.nodelist_abbrev ) }, methods=[ From 9e69b34be33783b89c705c6503832aa77e9c3bab Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Tue, 12 Apr 2022 17:32:06 +0200 Subject: [PATCH 34/43] Add default to distribute option --- docs/manpage.rst | 4 +++- reframe/frontend/cli.py | 5 +++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/docs/manpage.rst b/docs/manpage.rst index 528344c325..98b012c81a 100644 --- a/docs/manpage.rst +++ b/docs/manpage.rst @@ -379,7 +379,7 @@ Options controlling ReFrame execution .. versionadded:: 3.2 -.. option:: --distribute=NODESTATE +.. option:: --distribute[=NODESTATE] Distribute the selected tests on all the nodes in state ``NODESTATE`` in their respective valid partitions. @@ -397,6 +397,8 @@ Options controlling ReFrame execution The states of the nodes will be determined once, before beginning the execution of the tests so it might be different in the time of the submission of the tests. + If ``NODESTATE`` is not passed it will take ``idle`` as default. + .. versionadded:: 3.11.0 .. option:: --exec-policy=POLICY diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index fb171d8459..394234dbae 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -372,9 +372,10 @@ def main(): default=[], help='Disable a pipeline hook for this run' ) run_options.add_argument( - '--distribute', action='store', default=None, metavar='{all|STATE}', + '--distribute', action='store', metavar='{all|STATE}', + nargs='?', const='idle', help=('Distribute the selected single-node jobs on every node that' - 'is in STATE') + 'is in STATE (default: "idle"') ) run_options.add_argument( '--exec-policy', metavar='POLICY', action='store', From 7ef2a45a9905c23069f0e80cecccd37ee194c459 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Wed, 13 Apr 2022 09:14:00 +0200 Subject: [PATCH 35/43] Make test_distribute_testcases stricter --- unittests/test_distribute_tests.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/unittests/test_distribute_tests.py b/unittests/test_distribute_tests.py index c5b01398f4..7705129b85 100644 --- a/unittests/test_distribute_tests.py +++ b/unittests/test_distribute_tests.py @@ -46,9 +46,19 @@ def test_distribute_testcases(loader, default_exec_ctx): assert count == 4 count = sum(map(lambda x: x.partition.fullname == 'sys0:p1', new_cases)) assert count == 2 + + def sys0p0_nodes(): + for nodelist in (['n2'], ['n2'], ['n1'], ['n1']): + yield nodelist + + nodelist_iter = sys0p0_nodes() for tc in new_cases: nodes = getattr(tc.check, '$nid') if tc.partition.fullname == 'sys0:p0': - assert nodes in (['n1'], ['n2']) + assert nodes == next(nodelist_iter) else: assert nodes == ['n3'] + + # Make sure we have consumed all the elements from nodelist_iter + with pytest.raises(StopIteration): + next(nodelist_iter) From 7c9e5faa5381f38336538b41cad72ec2fe7b35be Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Wed, 13 Apr 2022 10:50:32 +0200 Subject: [PATCH 36/43] Remove _D_ from class name --- docs/manpage.rst | 2 +- reframe/frontend/distribute_tests.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/manpage.rst b/docs/manpage.rst index 98b012c81a..2e3d13d078 100644 --- a/docs/manpage.rst +++ b/docs/manpage.rst @@ -385,7 +385,7 @@ Options controlling ReFrame execution ReFrame will parameterize and run the tests on the selected nodes. In order to do that, it will dynamically create new tests that will inherit all the attributes of the original tests and contain one more parameter, ``$nid``, with the node that it will run on. - The new ReFrame classes are named ``_D_{basetest}_{partition}``. + The new ReFrame classes are named ``{basetest}_{partition}``. Currently this will work correctly only for one-node tests in local or Slurm partitions, and it will take into account the cli jobs options that are passed by the user. This feature will not work with dependencies, since the names of the tests will be changed, but it will work with fixtures. diff --git a/reframe/frontend/distribute_tests.py b/reframe/frontend/distribute_tests.py index c4cb572f5c..aa3766e856 100644 --- a/reframe/frontend/distribute_tests.py +++ b/reframe/frontend/distribute_tests.py @@ -93,7 +93,7 @@ def distribute_tests(testcases, node_map): check.variant_num, recurse=True ) nc = make_test( - f'_D_{cls.__name__}_{partition.fullname.replace(":", "_")}', + f'{cls.__name__}_{partition.fullname.replace(":", "_")}', (cls,), { 'valid_systems': [partition.fullname], From 212cd8b379177222ef976f34619de5c017b735b0 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Wed, 13 Apr 2022 13:25:02 +0200 Subject: [PATCH 37/43] Address comments --- reframe/frontend/cli.py | 2 +- reframe/frontend/statistics.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 394234dbae..3e04286428 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -1249,7 +1249,7 @@ def module_unuse(*paths): success = True if runner.stats.failed(): success = False - runner.stats.print_failure_report(printer, options.distribute) + runner.stats.print_failure_report(printer, not options.distribute) if options.failure_stats: runner.stats.print_failure_stats(printer) diff --git a/reframe/frontend/statistics.py b/reframe/frontend/statistics.py index 152cdbb1a0..8457958eb5 100644 --- a/reframe/frontend/statistics.py +++ b/reframe/frontend/statistics.py @@ -203,7 +203,7 @@ def json(self, force=False): return self._run_data - def print_failure_report(self, printer, distribute_run=None): + def print_failure_report(self, printer, rerun_info=True): line_width = 78 printer.info(line_width * '=') printer.info('SUMMARY OF FAILURES') @@ -234,7 +234,7 @@ def print_failure_report(self, printer, distribute_run=None): f"{r['dependencies_actual']}") printer.info(f" * Maintainers: {r['maintainers']}") printer.info(f" * Failing phase: {r['fail_phase']}") - if not distribute_run: + if rerun_info: if rt.runtime().get_option('general/0/compact_test_names'): cls = r['display_name'].split(' ')[0] variant = r['unique_name'].replace(cls, '') From 19b6aac8d931c36cbde4095b658a0532659aeaec Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Wed, 13 Apr 2022 13:47:03 +0200 Subject: [PATCH 38/43] Split long line --- reframe/frontend/cli.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 3e04286428..8c1489f4c9 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -1249,7 +1249,9 @@ def module_unuse(*paths): success = True if runner.stats.failed(): success = False - runner.stats.print_failure_report(printer, not options.distribute) + runner.stats.print_failure_report( + printer, not options.distribute + ) if options.failure_stats: runner.stats.print_failure_stats(printer) From 8f7c3997a829554221c3ecdd20464d1fbe23f004 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Wed, 13 Apr 2022 17:00:57 +0200 Subject: [PATCH 39/43] Fix bug in cli options --- reframe/core/schedulers/slurm.py | 3 +++ reframe/frontend/cli.py | 8 ++++++++ 2 files changed, 11 insertions(+) diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index fcc328859f..a4ad91fede 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -7,6 +7,7 @@ import glob import itertools import re +import shlex import time from argparse import ArgumentParser from contextlib import suppress @@ -305,6 +306,8 @@ def filternodes(self, job, nodes): # create a mutable list out of the immutable SequenceView that # sched_access is options = job.sched_access + job.options + job.cli_options + options = list(itertools.chain.from_iterable(shlex.split(m) + for m in options)) option_parser = ArgumentParser() option_parser.add_argument('--reservation') option_parser.add_argument('-p', '--partition') diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 8c1489f4c9..a8d6d74f76 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -1037,6 +1037,14 @@ def _case_failed(t): if options.distribute: node_map = getallnodes(options.distribute, parsed_job_options) + # In the distribute option we need to remove the cli options that + # begin with '--nodelist' and '-w', so that we don't include them + # in the job scripts + parsed_job_options = list( + filter(lambda x: (not x.startswith('-w') and + not x.startswith('--nodelist')), + parsed_job_options) + ) testcases = distribute_tests(testcases, node_map) testcases_all = testcases From 6e56af7e065bc694ede5fb95e43a44f8195c2a50 Mon Sep 17 00:00:00 2001 From: Eirini Koutsaniti Date: Wed, 13 Apr 2022 17:05:07 +0200 Subject: [PATCH 40/43] Small fix --- reframe/core/schedulers/slurm.py | 4 ++-- reframe/frontend/distribute_tests.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index a4ad91fede..e12da92d40 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -306,8 +306,8 @@ def filternodes(self, job, nodes): # create a mutable list out of the immutable SequenceView that # sched_access is options = job.sched_access + job.options + job.cli_options - options = list(itertools.chain.from_iterable(shlex.split(m) - for m in options)) + options = list(itertools.chain.from_iterable(shlex.split(opt) + for opt in options)) option_parser = ArgumentParser() option_parser.add_argument('--reservation') option_parser.add_argument('-p', '--partition') diff --git a/reframe/frontend/distribute_tests.py b/reframe/frontend/distribute_tests.py index aa3766e856..7ef555b108 100644 --- a/reframe/frontend/distribute_tests.py +++ b/reframe/frontend/distribute_tests.py @@ -15,7 +15,7 @@ from reframe.frontend.executors import generate_testcases -def getallnodes(state='all', jobs_cli_options=[]): +def getallnodes(state='all', jobs_cli_options=None): rt = runtime.runtime() nodes = {} for part in rt.system.partitions: From 1eef8b5d65b87a5d5a10923a1036e03b987b206a Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Wed, 13 Apr 2022 17:31:28 +0200 Subject: [PATCH 41/43] Update documentation --- docs/manpage.rst | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/docs/manpage.rst b/docs/manpage.rst index 2e3d13d078..531b73995c 100644 --- a/docs/manpage.rst +++ b/docs/manpage.rst @@ -384,20 +384,28 @@ Options controlling ReFrame execution Distribute the selected tests on all the nodes in state ``NODESTATE`` in their respective valid partitions. ReFrame will parameterize and run the tests on the selected nodes. - In order to do that, it will dynamically create new tests that will inherit all the attributes of the original tests and contain one more parameter, ``$nid``, with the node that it will run on. - The new ReFrame classes are named ``{basetest}_{partition}``. + Effectively, it will dynamically create new tests that inherit from the original tests and add a new parameter named ``$nid`` which contains the list of nodes that the test must run on. + The new tests are named with the following pattern ``{orig_test_basename}_{partition_fullname}``. - Currently this will work correctly only for one-node tests in local or Slurm partitions, and it will take into account the cli jobs options that are passed by the user. - This feature will not work with dependencies, since the names of the tests will be changed, but it will work with fixtures. + When determining the list of nodes to distribute the selected tests, ReFrame will take into account any job options passed through the :option:`-J` option. - You can decide the state of the nodes that will be considered: + You can optionally specify the state of the nodes to consider when distributing the test through the ``NODESTATE`` argument: - - ``all``: Tests will be parameterized over all the nodes of their partitions. - - ``NODESTATE``: Tests will run on all the nodes in state ``NODESTATE``, for example ``idle``. - The states of the nodes will be determined once, before beginning the - execution of the tests so it might be different in the time of the submission of the tests. + - ``all``: Tests will run on all the nodes of their respective valid partitions regardless of the nodes' state. + - ``idle``: Tests will run on all *idle* nodes of their respective valid partitions. + - ``NODESTATE``: Tests will run on all the nodes in state ``NODESTATE`` of their respective valid partitions. + If ``NODESTATE`` is not specified, ``idle`` will be assumed. + + The state of the nodes will be determined once, before beginning the + execution of the tests, so it might be different at the time the tests are actually submitted. + + .. note:: + Currently, only single-node jobs can be distributed and only local or the Slurm-based backends support this feature. + + .. note:: + Distributing tests with dependencies is not supported. + However, you can distribute tests that use fixtures. - If ``NODESTATE`` is not passed it will take ``idle`` as default. .. versionadded:: 3.11.0 From cf562310ba2f8bdfe5c7270f7e64826b1213e10d Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Wed, 13 Apr 2022 17:45:37 +0200 Subject: [PATCH 42/43] Code style fixes --- reframe/core/schedulers/slurm.py | 3 +++ reframe/frontend/cli.py | 19 +++++++++++-------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py index e12da92d40..a4b5186199 100644 --- a/reframe/core/schedulers/slurm.py +++ b/reframe/core/schedulers/slurm.py @@ -306,6 +306,9 @@ def filternodes(self, job, nodes): # create a mutable list out of the immutable SequenceView that # sched_access is options = job.sched_access + job.options + job.cli_options + + # Properly split lexically all the arguments in the options list so as + # to treat correctly entries such as '--option foo'. options = list(itertools.chain.from_iterable(shlex.split(opt) for opt in options)) option_parser = ArgumentParser() diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index a8d6d74f76..6664d21fa8 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -1037,14 +1037,17 @@ def _case_failed(t): if options.distribute: node_map = getallnodes(options.distribute, parsed_job_options) - # In the distribute option we need to remove the cli options that - # begin with '--nodelist' and '-w', so that we don't include them - # in the job scripts - parsed_job_options = list( - filter(lambda x: (not x.startswith('-w') and - not x.startswith('--nodelist')), - parsed_job_options) - ) + + # Remove the job options that begin with '--nodelist' and '-w', so + # that they do not override those set from the distribute feature. + # + # NOTE: This is Slurm-specific. When support of distributing tests + # is added to other scheduler backends, this needs to be updated, + # too. + parsed_job_options = [ + for x in parsed_job_options + if (not x.startswith('-w') and not x.startswith('--nodelist')) + ] testcases = distribute_tests(testcases, node_map) testcases_all = testcases From 631be2e5c86da732bf53d9f415799ea9ea3340fe Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Wed, 13 Apr 2022 17:57:12 +0200 Subject: [PATCH 43/43] Rename distribute_tests.py to distribute.py --- reframe/frontend/cli.py | 8 ++++---- reframe/frontend/{distribute_tests.py => distribute.py} | 0 unittests/test_distribute_tests.py | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) rename reframe/frontend/{distribute_tests.py => distribute.py} (100%) diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py index 6664d21fa8..1a721b885d 100644 --- a/reframe/frontend/cli.py +++ b/reframe/frontend/cli.py @@ -30,12 +30,12 @@ import reframe.utility.typecheck as typ -from reframe.frontend.distribute_tests import distribute_tests, getallnodes -from reframe.frontend.printer import PrettyPrinter -from reframe.frontend.loader import RegressionCheckLoader +from reframe.frontend.distribute import distribute_tests, getallnodes from reframe.frontend.executors.policies import (SerialExecutionPolicy, AsynchronousExecutionPolicy) from reframe.frontend.executors import Runner, generate_testcases +from reframe.frontend.loader import RegressionCheckLoader +from reframe.frontend.printer import PrettyPrinter def format_env(envvars): @@ -1045,7 +1045,7 @@ def _case_failed(t): # is added to other scheduler backends, this needs to be updated, # too. parsed_job_options = [ - for x in parsed_job_options + x for x in parsed_job_options if (not x.startswith('-w') and not x.startswith('--nodelist')) ] testcases = distribute_tests(testcases, node_map) diff --git a/reframe/frontend/distribute_tests.py b/reframe/frontend/distribute.py similarity index 100% rename from reframe/frontend/distribute_tests.py rename to reframe/frontend/distribute.py diff --git a/unittests/test_distribute_tests.py b/unittests/test_distribute_tests.py index 7705129b85..7da0e2dd1e 100644 --- a/unittests/test_distribute_tests.py +++ b/unittests/test_distribute_tests.py @@ -7,7 +7,7 @@ import reframe.frontend.executors as executors import reframe.frontend.filters as filters -from reframe.frontend.distribute_tests import distribute_tests +from reframe.frontend.distribute import distribute_tests from reframe.frontend.loader import RegressionCheckLoader