diff --git a/docs/manpage.rst b/docs/manpage.rst
index e653954607..531b73995c 100644
--- a/docs/manpage.rst
+++ b/docs/manpage.rst
@@ -379,6 +379,36 @@ Options controlling ReFrame execution
 
    .. versionadded:: 3.2
 
+.. option:: --distribute[=NODESTATE]
+
+   Distribute the selected tests on all the nodes of their respective valid partitions that are in state ``NODESTATE``.
+
+   ReFrame will parameterize and run the tests on the selected nodes.
+   Effectively, it will dynamically create new tests that inherit from the original tests and add a new parameter named ``$nid``, which contains the list of nodes that the test must run on.
+   The new tests are named with the following pattern: ``{orig_test_basename}_{partition_fullname}``.
+
+   When determining the list of nodes on which to distribute the selected tests, ReFrame will take into account any job options passed through the :option:`-J` option.
+
+   You can optionally specify the state of the nodes to consider when distributing the tests through the ``NODESTATE`` argument:
+
+   - ``all``: Tests will run on all the nodes of their respective valid partitions regardless of the nodes' state.
+   - ``idle``: Tests will run on all *idle* nodes of their respective valid partitions.
+   - ``NODESTATE``: Tests will run on all the nodes in state ``NODESTATE`` of their respective valid partitions.
+     If ``NODESTATE`` is not specified, ``idle`` will be assumed.
+
+   The state of the nodes will be determined once, before beginning the
+   execution of the tests, so it might have changed by the time the tests are actually submitted.
+
+   .. note::
+      Currently, only single-node jobs can be distributed and only the local and Slurm-based backends support this feature.
+
+   .. note::
+      Distributing tests with dependencies is not supported.
+      However, you can distribute tests that use fixtures.
+
+
+   .. versionadded:: 3.11.0
+
 .. option:: --exec-policy=POLICY
 
    The execution policy to be used for running tests.
diff --git a/reframe/core/meta.py b/reframe/core/meta.py
index cb23d60d97..a28162a0c8 100644
--- a/reframe/core/meta.py
+++ b/reframe/core/meta.py
@@ -705,7 +705,7 @@ class MyTest(rfm.RegressionTest):
             ...
 
         # Get the raw info for variant 0
-        MyTest.get_variant_info(0, recursive=True)
+        MyTest.get_variant_info(0, recurse=True)
         # {
         #     'params': {'p1': 'a'},
         #     'fixtures': {
@@ -846,7 +846,7 @@ def make_test(name, bases, body, methods=None, **kwargs):
         class HelloTest(rfm.RunOnlyRegressionTest):
             valid_systems = ['*']
             valid_prog_environs = ['*']
-            executable = 'echo',
+            executable = 'echo'
             sanity_patterns: sn.assert_true(1)
 
         hello_cls = HelloTest
diff --git a/reframe/core/schedulers/__init__.py b/reframe/core/schedulers/__init__.py
index 3b58cc384d..ccc629fdc6 100644
--- a/reframe/core/schedulers/__init__.py
+++ b/reframe/core/schedulers/__init__.py
@@ -265,6 +265,18 @@ class Job(jsonext.JSONSerializable, metaclass=JobMeta):
     #: :type: :class:`reframe.core.launchers.JobLauncher`
     launcher = variable(JobLauncher)
 
+    #: Pin the job to the given nodes.
+    #:
+    #: The list of nodes will be transformed to a suitable string and
+    #: passed to the scheduler's options. Currently, it has an effect
+    #: only with the Slurm scheduler.
+    #:
+    #: :type: :class:`List[str]`
+    #: :default: ``[]``
+    #:
+    #: .. versionadded:: 3.11.0
+    pin_nodes = variable(typ.List[str], value=[])
+
     # The sched_* arguments are exposed also to the frontend
     def __init__(self,
                  name,
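The new ``pin_nodes`` variable above is what the dynamically generated tests use to pin their jobs. As a rough sketch of how an ordinary test could set it directly in a pre-run hook (the test, node names and pattern are made up for illustration; only the Slurm backend currently honours the setting):

    import reframe as rfm
    import reframe.utility.sanity as sn


    @rfm.simple_test
    class PinnedHostname(rfm.RunOnlyRegressionTest):
        valid_systems = ['*']
        valid_prog_environs = ['*']
        executable = 'hostname'

        @rfm.run_before('run')
        def pin_job(self):
            # With the Slurm backend this ends up as an
            # `#SBATCH --nodelist=...` directive in the generated job
            # script (the node names are abbreviated first).
            self.job.pin_nodes = ['nid00001', 'nid00002']

        @sanity_function
        def validate(self):
            return sn.assert_found(r'nid', self.stdout)
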
diff --git a/reframe/core/schedulers/local.py b/reframe/core/schedulers/local.py
index d7bda85605..ac4ddde0c3 100644
--- a/reframe/core/schedulers/local.py
+++ b/reframe/core/schedulers/local.py
@@ -202,5 +202,9 @@ class _LocalNode(sched.Node):
     def __init__(self, name):
         self._name = name
 
+    @property
+    def name(self):
+        return self._name
+
     def in_state(self, state):
         return state.casefold() == 'idle'
diff --git a/reframe/core/schedulers/slurm.py b/reframe/core/schedulers/slurm.py
index 3ac3deac7f..a4b5186199 100644
--- a/reframe/core/schedulers/slurm.py
+++ b/reframe/core/schedulers/slurm.py
@@ -7,6 +7,7 @@
 import glob
 import itertools
 import re
+import shlex
 import time
 from argparse import ArgumentParser
 from contextlib import suppress
@@ -19,7 +20,7 @@
                                      JobBlockedError,
                                      JobError,
                                      JobSchedulerError)
-from reframe.utility import seconds_to_hms
+from reframe.utility import nodelist_abbrev, seconds_to_hms
 
 
 def slurm_state_completed(state):
@@ -192,6 +193,14 @@ def emit_preamble(self, job):
         else:
             hint = 'multithread' if job.use_smt else 'nomultithread'
 
+        if job.pin_nodes:
+            preamble.append(
+                self._format_option(
+                    nodelist_abbrev(job.pin_nodes),
+                    '--nodelist={0}'
+                )
+            )
+
         for opt in job.sched_access:
             if not opt.strip().startswith(('-C', '--constraint')):
                 preamble.append('%s %s' % (self._prefix, opt))
@@ -297,6 +306,11 @@ def filternodes(self, job, nodes):
         # create a mutable list out of the immutable SequenceView that
         # sched_access is
         options = job.sched_access + job.options + job.cli_options
+
+        # Split all the arguments in the options list lexically, so that
+        # entries such as '--option foo' are treated correctly.
+        options = list(itertools.chain.from_iterable(shlex.split(opt)
+                                                     for opt in options))
         option_parser = ArgumentParser()
         option_parser.add_argument('--reservation')
         option_parser.add_argument('-p', '--partition')
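The lexical split in ``filternodes`` above is needed because job options may arrive as single strings such as ``'-p debug'``, which ``ArgumentParser`` cannot digest directly. A self-contained sketch of that flattening, using invented option values:

    import itertools
    import shlex
    from argparse import ArgumentParser

    options = ['--constraint=gpu', '-p debug', '--reservation maint']

    # Split each entry lexically and flatten, as in the hunk above
    tokens = list(itertools.chain.from_iterable(shlex.split(opt)
                                                for opt in options))
    # tokens == ['--constraint=gpu', '-p', 'debug', '--reservation', 'maint']

    parser = ArgumentParser()
    parser.add_argument('--reservation')
    parser.add_argument('-p', '--partition')
    parser.add_argument('-C', '--constraint')
    parsed, _ = parser.parse_known_args(tokens)
    print(parsed.partition, parsed.reservation, parsed.constraint)
    # debug maint gpu
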
diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py
index 1cc245edee..1a721b885d 100644
--- a/reframe/frontend/cli.py
+++ b/reframe/frontend/cli.py
@@ -30,11 +30,12 @@
 import reframe.utility.typecheck as typ
 
 
-from reframe.frontend.printer import PrettyPrinter
-from reframe.frontend.loader import RegressionCheckLoader
+from reframe.frontend.distribute import distribute_tests, getallnodes
 from reframe.frontend.executors.policies import (SerialExecutionPolicy,
                                                  AsynchronousExecutionPolicy)
 from reframe.frontend.executors import Runner, generate_testcases
+from reframe.frontend.loader import RegressionCheckLoader
+from reframe.frontend.printer import PrettyPrinter
 
 
 def format_env(envvars):
@@ -370,6 +371,12 @@ def main():
         '--disable-hook', action='append', metavar='NAME', dest='hooks',
         default=[], help='Disable a pipeline hook for this run'
     )
+    run_options.add_argument(
+        '--distribute', action='store', metavar='{all|STATE}',
+        nargs='?', const='idle',
+        help=('Distribute the selected single-node jobs on every node that '
+              'is in STATE (default: "idle")')
+    )
     run_options.add_argument(
         '--exec-policy', metavar='POLICY', action='store',
         choices=['async', 'serial'], default='async',
@@ -933,6 +940,19 @@ def print_infoline(param, value):
     print_infoline('output directory', repr(session_info['prefix_output']))
     printer.info('')
     try:
+        # Need to parse the CLI job options before loading the tests
+        parsed_job_options = []
+        for opt in options.job_options:
+            opt_split = opt.split('=', maxsplit=1)
+            optstr = opt_split[0]
+            valstr = opt_split[1] if len(opt_split) > 1 else ''
+            if opt.startswith('-') or opt.startswith('#'):
+                parsed_job_options.append(opt)
+            elif len(optstr) == 1:
+                parsed_job_options.append(f'-{optstr} {valstr}')
+            else:
+                parsed_job_options.append(f'--{optstr} {valstr}')
+
         # Locate and load checks; `force=True` is not needed for normal
         # invocations from the command line and has practically no effect, but
         # it is needed to better emulate the behavior of running reframe's CLI
@@ -1015,6 +1035,22 @@ def _case_failed(t):
                 f'{len(testcases)} remaining'
             )
 
+        if options.distribute:
+            node_map = getallnodes(options.distribute, parsed_job_options)
+
+            # Remove the job options that begin with '--nodelist' and '-w', so
+            # that they do not override those set by the distribute feature.
+            #
+            # NOTE: This is Slurm-specific. When support for distributing
+            # tests is added to other scheduler backends, this needs to be
+            # updated, too.
+            parsed_job_options = [
+                x for x in parsed_job_options
+                if (not x.startswith('-w') and not x.startswith('--nodelist'))
+            ]
+            testcases = distribute_tests(testcases, node_map)
+            testcases_all = testcases
+
         # Prepare for running
         printer.debug('Building and validating the full test DAG')
         testgraph, skipped_cases = dependencies.build_deps(testcases_all)
@@ -1194,18 +1230,6 @@ def module_unuse(*paths):
             sched_flex_alloc_nodes = options.flex_alloc_nodes
 
         exec_policy.sched_flex_alloc_nodes = sched_flex_alloc_nodes
-        parsed_job_options = []
-        for opt in options.job_options:
-            opt_split = opt.split('=', maxsplit=1)
-            optstr = opt_split[0]
-            valstr = opt_split[1] if len(opt_split) > 1 else ''
-            if opt.startswith('-') or opt.startswith('#'):
-                parsed_job_options.append(opt)
-            elif len(optstr) == 1:
-                parsed_job_options.append(f'-{optstr} {valstr}')
-            else:
-                parsed_job_options.append(f'--{optstr} {valstr}')
-
         exec_policy.sched_options = parsed_job_options
         if options.maxfail < 0:
             raise errors.ConfigError(
@@ -1236,7 +1260,9 @@ def module_unuse(*paths):
             success = True
             if runner.stats.failed():
                 success = False
-                runner.stats.print_failure_report(printer)
+                runner.stats.print_failure_report(
+                    printer, not options.distribute
+                )
             if options.failure_stats:
                 runner.stats.print_failure_stats(printer)
 
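The job-option parsing that moved to the top of ``main()`` normalises the values given to ``-J``/``--job-option`` before the schedulers (and now ``getallnodes``) see them. A standalone sketch of that normalisation, with the helper name and sample values being mine:

    def parse_job_options(job_options):
        # '-'/'#' entries pass through untouched; single-letter names
        # become '-X val', longer names become '--opt val'.
        parsed = []
        for opt in job_options:
            opt_split = opt.split('=', maxsplit=1)
            optstr = opt_split[0]
            valstr = opt_split[1] if len(opt_split) > 1 else ''
            if opt.startswith('-') or opt.startswith('#'):
                parsed.append(opt)
            elif len(optstr) == 1:
                parsed.append(f'-{optstr} {valstr}')
            else:
                parsed.append(f'--{optstr} {valstr}')
        return parsed


    print(parse_job_options(['mem=120G', 'w=nid00001', '--exclusive']))
    # ['--mem 120G', '-w nid00001', '--exclusive']

The ``'-w nid00001'`` form is exactly what the ``--distribute`` branch later filters out, so that user-supplied node lists cannot override the per-test pinning.
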
diff --git a/reframe/frontend/distribute.py b/reframe/frontend/distribute.py
new file mode 100644
index 0000000000..7ef555b108
--- /dev/null
+++ b/reframe/frontend/distribute.py
@@ -0,0 +1,126 @@
+# Copyright 2016-2022 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
+# ReFrame Project Developers. See the top-level LICENSE file for details.
+#
+# SPDX-License-Identifier: BSD-3-Clause
+
+
+import reframe.core.builtins as builtins
+import reframe.core.runtime as runtime
+import reframe.utility as util
+
+from reframe.core.decorators import TestRegistry
+from reframe.core.logging import getlogger
+from reframe.core.meta import make_test
+from reframe.core.schedulers import Job
+from reframe.frontend.executors import generate_testcases
+
+
+def getallnodes(state='all', jobs_cli_options=None):
+    rt = runtime.runtime()
+    nodes = {}
+    for part in rt.system.partitions:
+        # This job will not be submitted; it is only used to filter
+        # the nodes based on the partition configuration
+        dummy_job = Job.create(part.scheduler,
+                               part.launcher_type(),
+                               name='placeholder-job',
+                               sched_access=part.access,
+                               sched_options=jobs_cli_options)
+
+        available_nodes = part.scheduler.allnodes()
+        available_nodes = part.scheduler.filternodes(dummy_job,
+                                                     available_nodes)
+        getlogger().debug(
+            f'Total available nodes for {part.name}: {len(available_nodes)}'
+        )
+
+        if state.casefold() != 'all':
+            available_nodes = {n for n in available_nodes
+                               if n.in_state(state)}
+            getlogger().debug(
+                f'[F] Selecting nodes in state {state!r}: '
+                f'available nodes now: {len(available_nodes)}'
+            )
+
+        nodes[part.fullname] = [n.name for n in available_nodes]
+
+    return nodes
+
+
+def _rfm_pin_run_nodes(obj):
+    nodelist = getattr(obj, '$nid')
+    if not obj.local:
+        obj.job.pin_nodes = nodelist
+
+
+def _rfm_pin_build_nodes(obj):
+    pin_nodes = getattr(obj, '$nid')
+    if not obj.local and not obj.build_locally:
+        obj.build_job.pin_nodes = pin_nodes
+
+
+def make_valid_systems_hook(systems):
+    '''Returns a function to be used as a hook that sets the
+    valid systems.
+
+    Since valid_systems changes for each generated test, we need to
+    generate a different post-init hook for each of them.
+    '''
+    def _rfm_set_valid_systems(obj):
+        obj.valid_systems = systems
+
+    return _rfm_set_valid_systems
+
+
+def distribute_tests(testcases, node_map):
+    '''Returns new testcases that will be parameterized to run on the nodes
+    of their valid partitions based on the node map.
+    '''
+    tmp_registry = TestRegistry()
+    new_checks = []
+    # We don't want to register the same check for every environment
+    # per partition
+    check_part_combs = set()
+    for tc in testcases:
+        check, partition, _ = tc
+        candidate_comb = (check.unique_name, partition.fullname)
+        if check.is_fixture() or candidate_comb in check_part_combs:
+            continue
+
+        check_part_combs.add(candidate_comb)
+        cls = type(check)
+        variant_info = cls.get_variant_info(
+            check.variant_num, recurse=True
+        )
+        nc = make_test(
+            f'{cls.__name__}_{partition.fullname.replace(":", "_")}',
+            (cls,),
+            {
+                'valid_systems': [partition.fullname],
+                '$nid': builtins.parameter(
+                    [[n] for n in node_map[partition.fullname]],
+                    fmt=util.nodelist_abbrev
+                )
+            },
+            methods=[
+                builtins.run_before('run')(_rfm_pin_run_nodes),
+                builtins.run_before('compile')(_rfm_pin_build_nodes),
+                # We re-set the valid systems in a hook to make sure that
+                # they will not be overwritten by a parent post-init hook
+                builtins.run_after('init')(
+                    make_valid_systems_hook([partition.fullname])
+                ),
+            ]
+        )
+        # We have to set the prefix manually
+        nc._rfm_custom_prefix = check.prefix
+
+        for i in range(nc.num_variants):
+            # Check if this variant should be instantiated
+            vinfo = nc.get_variant_info(i, recurse=True)
+            vinfo['params'].pop('$nid')
+            if vinfo == variant_info:
+                tmp_registry.add(nc, variant_num=i)
+
+    new_checks = tmp_registry.instantiate_all()
+    return generate_testcases(new_checks)
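For orientation, this is roughly how the new module is meant to be driven (modelled on the unit test at the end of this diff; the check path and node names are placeholders, and a ReFrame runtime must already be initialised, e.g. by the CLI or the test fixtures):

    import reframe.frontend.executors as executors
    from reframe.frontend.distribute import distribute_tests, getallnodes
    from reframe.frontend.loader import RegressionCheckLoader

    loader = RegressionCheckLoader(['/path/to/checks.py'])  # hypothetical path
    testcases = executors.generate_testcases(loader.load_all())

    # Either ask the schedulers for the nodes in a given state...
    node_map = getallnodes(state='idle')
    # ...or hand-craft the mapping: partition fullname -> node names
    node_map = {'sys0:p0': ['n1', 'n2'], 'sys0:p1': ['n3']}

    # One generated test case per node and programming environment
    new_cases = distribute_tests(testcases, node_map)
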
diff --git a/reframe/frontend/statistics.py b/reframe/frontend/statistics.py
index 5ac29ec0c6..8457958eb5 100644
--- a/reframe/frontend/statistics.py
+++ b/reframe/frontend/statistics.py
@@ -203,7 +203,7 @@ def json(self, force=False):
 
         return self._run_data
 
-    def print_failure_report(self, printer):
+    def print_failure_report(self, printer, rerun_info=True):
         line_width = 78
         printer.info(line_width * '=')
         printer.info('SUMMARY OF FAILURES')
@@ -227,7 +227,6 @@ def print_failure_report(self, printer):
                 f" * Node list: {util.nodelist_abbrev(r['nodelist'])}"
             )
             job_type = 'local' if r['scheduler'] == 'local' else 'batch job'
-            jobid = r['jobid']
             printer.info(f" * Job type: {job_type} (id={r['jobid']})")
             printer.info(f" * Dependencies (conceptual): "
                          f"{r['dependencies_conceptual']}")
@@ -235,15 +234,19 @@ def print_failure_report(self, printer):
                          f"{r['dependencies_actual']}")
             printer.info(f" * Maintainers: {r['maintainers']}")
             printer.info(f" * Failing phase: {r['fail_phase']}")
-            if rt.runtime().get_option('general/0/compact_test_names'):
-                cls = r['display_name'].split(' ')[0]
-                variant = r['unique_name'].replace(cls, '').replace('_', '@')
-                nameoptarg = cls + variant
-            else:
-                nameoptarg = r['unique_name']
+            if rerun_info:
+                if rt.runtime().get_option('general/0/compact_test_names'):
+                    cls = r['display_name'].split(' ')[0]
+                    variant = r['unique_name'].replace(cls, '')
+                    variant = variant.replace('_', '@')
+                    nameoptarg = cls + variant
+                else:
+                    nameoptarg = r['unique_name']
+
+                printer.info(f" * Rerun with '-n {nameoptarg}"
+                             f" -p {r['environment']} --system "
+                             f"{r['system']} -r'")
 
-            printer.info(f" * Rerun with '-n {nameoptarg}"
-                         f" -p {r['environment']} --system {r['system']} -r'")
             printer.info(f" * Reason: {r['fail_reason']}")
 
             tb = ''.join(traceback.format_exception(*r['fail_info'].values()))
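When ``rerun_info`` is true and compact test names are enabled, the rerun hint is reassembled from the display and unique names. A small sketch of that string manipulation, with made-up names (it yields the ``-n Complex@1`` form used by the CLI test below):

    display_name = 'Complex %x=1'    # made-up display name
    unique_name = 'Complex_1'        # corresponding unique name

    cls = display_name.split(' ')[0]                           # 'Complex'
    variant = unique_name.replace(cls, '').replace('_', '@')   # '@1'
    print(f"Rerun with '-n {cls + variant} -r'")               # -n Complex@1
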
diff --git a/unittests/resources/checks_unlisted/distribute.py b/unittests/resources/checks_unlisted/distribute.py
new file mode 100644
index 0000000000..bbc83f0c94
--- /dev/null
+++ b/unittests/resources/checks_unlisted/distribute.py
@@ -0,0 +1,40 @@
+# Copyright 2016-2022 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
+# ReFrame Project Developers. See the top-level LICENSE file for details.
+#
+# SPDX-License-Identifier: BSD-3-Clause
+
+import reframe as rfm
+import reframe.utility.sanity as sn
+
+import os
+
+
+@rfm.simple_test
+class Simple(rfm.RunOnlyRegressionTest):
+    valid_systems = ['*']
+    valid_prog_environs = ['*']
+    executable = '/bin/true'
+
+
+class MyFixture(rfm.RunOnlyRegressionTest):
+    x = parameter([1, 2])
+    executable = 'echo hello from fixture'
+
+    @sanity_function
+    def assert_True(self):
+        return True
+
+
+@rfm.simple_test
+class Complex(rfm.RunOnlyRegressionTest):
+    valid_systems = ['*']
+    valid_prog_environs = ['*']
+    f = fixture(MyFixture, scope='session')
+    executable = '/bin/true'
+
+    @sanity_function
+    def inspect_fixture(self):
+        return sn.assert_found(
+            r'hello from fixture',
+            os.path.join(self.f.stagedir, self.f.stdout.evaluate())
+        )
diff --git a/unittests/test_cli.py b/unittests/test_cli.py
index efa02468ac..ce762d2a6d 100644
--- a/unittests/test_cli.py
+++ b/unittests/test_cli.py
@@ -917,3 +917,30 @@ def test_fixture_resolution(run_reframe):
         action='run'
     )
     assert returncode == 0
+
+
+def test_dynamic_tests(run_reframe, tmp_path):
+    returncode, stdout, _ = run_reframe(
+        system='sys0',
+        environs=[],
+        checkpath=['unittests/resources/checks_unlisted/distribute.py'],
+        action='run',
+        more_options=['-n', 'Complex', '--distribute=idle']
+    )
+    assert returncode == 0
+    assert 'Ran 10/10 test case(s)' in stdout
+    assert 'FAILED' not in stdout
+
+
+def test_dynamic_tests_filtering(run_reframe, tmp_path):
+    # Target sys1 that has compact_test_names==True
+    returncode, stdout, _ = run_reframe(
+        system='sys1',
+        environs=[],
+        checkpath=['unittests/resources/checks_unlisted/distribute.py'],
+        action='run',
+        more_options=['-n', 'Complex@1', '--distribute=idle']
+    )
+    assert returncode == 0
+    assert 'Ran 7/7 test case(s)' in stdout
+    assert 'FAILED' not in stdout
diff --git a/unittests/test_distribute_tests.py b/unittests/test_distribute_tests.py
new file mode 100644
index 0000000000..7da0e2dd1e
--- /dev/null
+++ b/unittests/test_distribute_tests.py
@@ -0,0 +1,64 @@
+# Copyright 2016-2022 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
+# ReFrame Project Developers. See the top-level LICENSE file for details.
+#
+# SPDX-License-Identifier: BSD-3-Clause
+
+import pytest
+
+import reframe.frontend.executors as executors
+import reframe.frontend.filters as filters
+from reframe.frontend.distribute import distribute_tests
+from reframe.frontend.loader import RegressionCheckLoader
+
+
+@pytest.fixture
+def default_exec_ctx(make_exec_ctx_g):
+    yield from make_exec_ctx_g(system='sys0')
+
+
+@pytest.fixture
+def loader():
+    return RegressionCheckLoader([
+        'unittests/resources/checks_unlisted/distribute.py'
+    ])
+
+
+def test_distribute_testcases(loader, default_exec_ctx):
+    testcases = executors.generate_testcases(loader.load_all())
+    testcases = filter(
+        filters.have_any_name('Simple'), testcases
+    )
+
+    testcases = list(testcases)
+    assert len(testcases) == 4
+    count = sum(map(lambda x: x.partition.fullname == 'sys0:p0', testcases))
+    assert count == 2
+    count = sum(map(lambda x: x.partition.fullname == 'sys0:p1', testcases))
+    assert count == 2
+
+    node_map = {
+        'sys0:p0': ['n1', 'n2'],
+        'sys0:p1': ['n3']
+    }
+    new_cases = distribute_tests(testcases, node_map)
+    assert len(new_cases) == 6
+    count = sum(map(lambda x: x.partition.fullname == 'sys0:p0', new_cases))
+    assert count == 4
+    count = sum(map(lambda x: x.partition.fullname == 'sys0:p1', new_cases))
+    assert count == 2
+
+    def sys0p0_nodes():
+        for nodelist in (['n2'], ['n2'], ['n1'], ['n1']):
+            yield nodelist
+
+    nodelist_iter = sys0p0_nodes()
+    for tc in new_cases:
+        nodes = getattr(tc.check, '$nid')
+        if tc.partition.fullname == 'sys0:p0':
+            assert nodes == next(nodelist_iter)
+        else:
+            assert nodes == ['n3']
+
+    # Make sure we have consumed all the elements from nodelist_iter
+    with pytest.raises(StopIteration):
+        next(nodelist_iter)
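A quick check of the arithmetic behind the assertions above: distribution yields one variant per node of each valid partition, and every variant still runs for each programming environment of that partition (two per partition, as implied by the four original ``Simple`` cases):

    node_map = {'sys0:p0': ['n1', 'n2'], 'sys0:p1': ['n3']}
    environs = {'sys0:p0': 2, 'sys0:p1': 2}   # from the 4 original cases

    per_part = {p: len(nodes) * environs[p] for p, nodes in node_map.items()}
    print(per_part)                # {'sys0:p0': 4, 'sys0:p1': 2}
    print(sum(per_part.values()))  # 6 == len(new_cases)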