[feat] Add new command line option to distribute single node jobs on multiple cluster nodes #2458

Merged
merged 53 commits into from
Apr 13, 2022
Changes from 49 commits
5f06bfa
Add pin_nodes attribute to Job
ekouts Feb 4, 2022
f943d19
Merge branch 'master' of https://github.com/eth-cscs/reframe into fea…
ekouts Feb 16, 2022
06cd315
Remove forgotten print
ekouts Feb 17, 2022
ea2891b
Add special parameter with all the nodes
ekouts Mar 2, 2022
ba05735
Merge branch 'master' of https://github.com/eth-cscs/reframe into fea…
ekouts Mar 2, 2022
82b9963
Fix small typo
ekouts Mar 2, 2022
a377f57
Filter nodes by partition
ekouts Mar 3, 2022
eac9d18
Add --flex-alloc-singlenode
ekouts Mar 3, 2022
a121d5a
Check all node parameters before making testcase
ekouts Mar 3, 2022
b4651ad
Update --flex-alloc-singlenode behaviour
ekouts Mar 4, 2022
6bdd243
Fix formatting issues
ekouts Mar 4, 2022
0900c18
Take into account cli job options in NodeTestParam
ekouts Mar 4, 2022
ab7749f
Merge branch 'master' of https://github.com/eth-cscs/reframe into fea…
ekouts Mar 18, 2022
405fc3c
Merge branch 'refactor/fixture-instantiation' of https://github.com/v…
ekouts Mar 18, 2022
bacd9ca
Merge branch 'master' of https://github.com/eth-cscs/reframe into fea…
ekouts Mar 20, 2022
9e26477
Update flex-alloc-singlenode implementation
ekouts Mar 20, 2022
4e3f1ad
Set the prefix in dynamically created tests
ekouts Mar 20, 2022
d95b5ac
Replace testcases_all in flex_alloc_singlenode
ekouts Mar 29, 2022
cc5705b
Add basic unittests
ekouts Mar 29, 2022
7fd49a5
Add more unittest with fixtures
ekouts Apr 7, 2022
46c2096
Address PR comments
ekouts Apr 7, 2022
78b13e9
Remove _rfm_dynamic_test_prefix attribute
ekouts Apr 7, 2022
9cd3648
Rename to dummy_job
ekouts Apr 7, 2022
5e2c4f5
Address PR comments
ekouts Apr 7, 2022
a5cfbb9
Remove unnecessary check
ekouts Apr 7, 2022
f51df33
Address PR comments
ekouts Apr 7, 2022
5cbff99
Merge branch 'master' of https://github.com/eth-cscs/reframe into fea…
ekouts Apr 7, 2022
f0c38e2
Fix unittest after master merging
ekouts Apr 7, 2022
5861e68
Update documentation for the distribute cli option
ekouts Apr 8, 2022
7401bae
Set versionadded for pin_nodes attribute
ekouts Apr 8, 2022
b99ab6d
Merge branch 'feat/extend-validx-syntax' of https://github.com/vkarak…
ekouts Apr 8, 2022
62c65c5
Remove skip_system_check and skip_prgenv_check from generate_testcases
ekouts Apr 8, 2022
ceb6e1b
Fix pep8 issues
ekouts Apr 8, 2022
f188029
Merge branch 'master' of https://github.com/eth-cscs/reframe into fea…
ekouts Apr 8, 2022
4ac94b5
Address PR comments
ekouts Apr 12, 2022
4aeba8e
Move distribute_tests and getallnodes to a separate module
ekouts Apr 12, 2022
91f699a
Remove unused imports
ekouts Apr 12, 2022
4810a5e
Remove empty line
ekouts Apr 12, 2022
994354d
Remove incorrect rerun message
ekouts Apr 12, 2022
a15a0b0
Address PR comments
ekouts Apr 12, 2022
a0ed9b3
Add formatting function for nodelist
ekouts Apr 12, 2022
9e69b34
Add default to distribute option
ekouts Apr 12, 2022
7ef2a45
Make test_distribute_testcases stricter
ekouts Apr 13, 2022
7c9e5fa
Remove _D_ from class name
ekouts Apr 13, 2022
212cd8b
Address comments
ekouts Apr 13, 2022
19b6aac
Split long line
ekouts Apr 13, 2022
9f6d4ed
Merge branch 'master' of https://github.com/eth-cscs/reframe into fea…
ekouts Apr 13, 2022
8f7c399
Fix bug in cli options
ekouts Apr 13, 2022
6e56af7
Small fix
ekouts Apr 13, 2022
1eef8b5
Update documentation
Apr 13, 2022
dacd357
Merge branch 'feat/alt_flex_alloc' of github.com:ekouts/reframe into …
Apr 13, 2022
cf56231
Code style fixes
Apr 13, 2022
631be2e
Rename distribute_tests.py to distribute.py
Apr 13, 2022
22 changes: 22 additions & 0 deletions docs/manpage.rst
@@ -379,6 +379,28 @@ Options controlling ReFrame execution

.. versionadded:: 3.2

.. option:: --distribute[=NODESTATE]

Distribute the selected tests on all the nodes in state ``NODESTATE`` in their respective valid partitions.

ReFrame will parameterize and run the tests on the selected nodes.
To do so, it dynamically creates new tests that inherit all the attributes of the original tests and add one more parameter, ``$nid``, which holds the node the test will run on.
The new ReFrame classes are named ``{basetest}_{partition}``.

Currently, this works correctly only for single-node tests in local or Slurm partitions, and it takes into account the job options that the user passes on the command line.
This feature does not work with dependencies, since the names of the tests are changed, but it does work with fixtures.

You can decide the state of the nodes that will be considered:

- ``all``: Tests will be parameterized over all the nodes of their partitions.
- ``NODESTATE``: Tests will run on all the nodes in state ``NODESTATE``, for example ``idle``.
The states of the nodes are determined once, before test execution begins,
so they may differ by the time the tests are submitted.

If ``NODESTATE`` is not specified, ``idle`` is used as the default.

.. versionadded:: 3.11.0

.. option:: --exec-policy=POLICY

The execution policy to be used for running tests.
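As a rough illustration of the naming and parameterization described for `--distribute`, the sketch below derives the generated class name and the per-node values of `$nid` from a partition name and a node list. The helper `distributed_variants`, the system name `daint:gpu` and the node names are hypothetical; replacing `:` with `_` in the partition name mirrors what `distribute_tests()` does in this PR.

```python
def distributed_variants(basetest, partition, nodes):
    """Return the generated class name and one single-node list per node."""
    # Partition full names look like 'system:partition'; ':' is not valid
    # in a Python identifier, so it is replaced by '_'.
    clsname = f"{basetest}_{partition.replace(':', '_')}"
    # Each variant receives a one-element node list as its `$nid` value.
    return clsname, [[n] for n in nodes]


name, nid_values = distributed_variants('HelloTest', 'daint:gpu',
                                        ['nid001', 'nid002'])
print(name)        # HelloTest_daint_gpu
print(nid_values)  # [['nid001'], ['nid002']]
```

With two nodes in the partition, a non-parameterized test therefore yields two variants of a single generated class.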
4 changes: 2 additions & 2 deletions reframe/core/meta.py
@@ -705,7 +705,7 @@ class MyTest(rfm.RegressionTest):
...

# Get the raw info for variant 0
MyTest.get_variant_info(0, recursive=True)
MyTest.get_variant_info(0, recurse=True)
# {
# 'params': {'p1': 'a'},
# 'fixtures': {
@@ -846,7 +846,7 @@ def make_test(name, bases, body, methods=None, **kwargs):
class HelloTest(rfm.RunOnlyRegressionTest):
valid_systems = ['*']
valid_prog_environs = ['*']
executable = 'echo',
executable = 'echo'
sanity_patterns: sn.assert_true(1)

hello_cls = HelloTest
12 changes: 12 additions & 0 deletions reframe/core/schedulers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,18 @@ class Job(jsonext.JSONSerializable, metaclass=JobMeta):
#: :type: :class:`reframe.core.launchers.JobLauncher`
launcher = variable(JobLauncher)

#: Pin the jobs on the given nodes.
#:
#: The list of nodes will be transformed to a suitable string and be
#: passed to the scheduler's options. Currently it will have an effect
#: only for the Slurm scheduler.
#:
#: :type: :class:`List[str]`
#: :default: ``[]``
#:
#: .. versionadded:: 3.11.0
pin_nodes = variable(typ.List[str], value=[])

# The sched_* arguments are exposed also to the frontend
def __init__(self,
name,
4 changes: 4 additions & 0 deletions reframe/core/schedulers/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,5 +202,9 @@ class _LocalNode(sched.Node):
def __init__(self, name):
self._name = name

@property
def name(self):
return self._name

def in_state(self, state):
return state.casefold() == 'idle'
13 changes: 12 additions & 1 deletion reframe/core/schedulers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import glob
import itertools
import re
import shlex
import time
from argparse import ArgumentParser
from contextlib import suppress
@@ -19,7 +20,7 @@
JobBlockedError,
JobError,
JobSchedulerError)
from reframe.utility import seconds_to_hms
from reframe.utility import nodelist_abbrev, seconds_to_hms


def slurm_state_completed(state):
@@ -192,6 +193,14 @@ def emit_preamble(self, job):
else:
hint = 'multithread' if job.use_smt else 'nomultithread'

if job.pin_nodes:
preamble.append(
self._format_option(
nodelist_abbrev(job.pin_nodes),
'--nodelist={0}'
)
)

for opt in job.sched_access:
if not opt.strip().startswith(('-C', '--constraint')):
preamble.append('%s %s' % (self._prefix, opt))
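To make the effect of `pin_nodes` on the job script concrete, here is a minimal sketch of the directive it produces. This is not ReFrame's implementation: `nodelist_directive` is a hypothetical helper, and it joins the node names with commas instead of calling `nodelist_abbrev`, whose exact output format is not shown in this diff.

```python
def nodelist_directive(pin_nodes, prefix='#SBATCH'):
    """Return a Slurm directive pinning the job to `pin_nodes`, or None."""
    # An empty list means "no pinning", so no directive is emitted.
    if not pin_nodes:
        return None

    # Simplification: a plain comma-separated list instead of an
    # abbreviated node list.
    return f"{prefix} --nodelist={','.join(pin_nodes)}"


print(nodelist_directive(['nid001', 'nid002']))
# #SBATCH --nodelist=nid001,nid002
```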
@@ -297,6 +306,8 @@ def filternodes(self, job, nodes):
# create a mutable list out of the immutable SequenceView that
# sched_access is
options = job.sched_access + job.options + job.cli_options
options = list(itertools.chain.from_iterable(shlex.split(opt)
for opt in options))
option_parser = ArgumentParser()
option_parser.add_argument('--reservation')
option_parser.add_argument('-p', '--partition')
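The `shlex.split` step added here matters because a single entry such as `'--partition gpu'` holds both the option and its value. The sketch below shows the same flattening in isolation, using only the standard library. `parse_node_filters` is a hypothetical name, and the use of `parse_known_args` to skip unrelated options is an assumption about the surrounding code, which this hunk does not show.

```python
import itertools
import shlex
from argparse import ArgumentParser


def parse_node_filters(options):
    # Split every entry into shell-like tokens and flatten the result,
    # so that argparse sees options and values as separate arguments.
    tokens = list(itertools.chain.from_iterable(shlex.split(opt)
                                                for opt in options))
    parser = ArgumentParser()
    parser.add_argument('--reservation')
    parser.add_argument('-p', '--partition')
    # Options the parser does not know about are returned separately
    # instead of raising an error.
    parsed, _ = parser.parse_known_args(tokens)
    return parsed


args = parse_node_filters(['--partition gpu', '--reservation=maint',
                           '-J foo'])
print(args.partition, args.reservation)  # gpu maint
```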
49 changes: 36 additions & 13 deletions reframe/frontend/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import reframe.utility.typecheck as typ


from reframe.frontend.distribute_tests import distribute_tests, getallnodes
from reframe.frontend.printer import PrettyPrinter
from reframe.frontend.loader import RegressionCheckLoader
from reframe.frontend.executors.policies import (SerialExecutionPolicy,
@@ -370,6 +371,12 @@ def main():
'--disable-hook', action='append', metavar='NAME', dest='hooks',
default=[], help='Disable a pipeline hook for this run'
)
run_options.add_argument(
'--distribute', action='store', metavar='{all|STATE}',
nargs='?', const='idle',
help=('Distribute the selected single-node jobs on every node that '
'is in STATE (default: "idle")')
)
run_options.add_argument(
'--exec-policy', metavar='POLICY', action='store',
choices=['async', 'serial'], default='async',
@@ -933,6 +940,19 @@ def print_infoline(param, value):
print_infoline('output directory', repr(session_info['prefix_output']))
printer.info('')
try:
# Need to parse the cli options before loading the tests
parsed_job_options = []
for opt in options.job_options:
opt_split = opt.split('=', maxsplit=1)
optstr = opt_split[0]
valstr = opt_split[1] if len(opt_split) > 1 else ''
if opt.startswith('-') or opt.startswith('#'):
parsed_job_options.append(opt)
elif len(optstr) == 1:
parsed_job_options.append(f'-{optstr} {valstr}')
else:
parsed_job_options.append(f'--{optstr} {valstr}')
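The normalization loop above can be read as a small pure function. The sketch below restates it as one, under the hypothetical name `normalize_job_options`, to show what each branch produces; the sample options are made up for illustration.

```python
def normalize_job_options(job_options):
    parsed = []
    for opt in job_options:
        # Same effect as opt.split('=', maxsplit=1) with '' as the
        # default value when no '=' is present.
        optstr, _, valstr = opt.partition('=')
        if opt.startswith('-') or opt.startswith('#'):
            # Already a full option or a raw directive: keep it verbatim
            parsed.append(opt)
        elif len(optstr) == 1:
            # Single letter: treat it as a short option
            parsed.append(f'-{optstr} {valstr}')
        else:
            # Anything else: treat it as a long option
            parsed.append(f'--{optstr} {valstr}')

    return parsed


print(normalize_job_options(['--mem=4G', 'p=gpu', 'nodelist=nid001']))
# ['--mem=4G', '-p gpu', '--nodelist nid001']
```

Note that an option without a value, such as `exclusive`, comes out with a trailing space (`'--exclusive '`), exactly as in the original loop.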

# Locate and load checks; `force=True` is not needed for normal
# invocations from the command line and has practically no effect, but
# it is needed to better emulate the behavior of running reframe's CLI
@@ -1015,6 +1035,19 @@ def _case_failed(t):
f'{len(testcases)} remaining'
)

if options.distribute:
node_map = getallnodes(options.distribute, parsed_job_options)
# In the distribute option we need to remove the cli options that
# begin with '--nodelist' and '-w', so that we don't include them
# in the job scripts
parsed_job_options = list(
filter(lambda x: (not x.startswith('-w') and
not x.startswith('--nodelist')),
parsed_job_options)
)
testcases = distribute_tests(testcases, node_map)
testcases_all = testcases
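The filtering step in this block can be sketched on its own. The function name `drop_nodelist_options` is hypothetical; the prefix test is the same as in the `filter` call above, written with the tuple form of `str.startswith`.

```python
def drop_nodelist_options(job_options):
    # Node selection is now driven by the generated `$nid` parameter, so
    # any user-supplied node list is removed from the job options.
    return [opt for opt in job_options
            if not opt.startswith(('-w', '--nodelist'))]


opts = ['-p gpu', '-w nid001', '--nodelist nid002', '--mem=4G']
print(drop_nodelist_options(opts))  # ['-p gpu', '--mem=4G']
```

The node list options are still passed to `getallnodes()` before being dropped, so they continue to restrict which nodes the tests are distributed over.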

# Prepare for running
printer.debug('Building and validating the full test DAG')
testgraph, skipped_cases = dependencies.build_deps(testcases_all)
@@ -1194,18 +1227,6 @@ def module_unuse(*paths):
sched_flex_alloc_nodes = options.flex_alloc_nodes

exec_policy.sched_flex_alloc_nodes = sched_flex_alloc_nodes
parsed_job_options = []
for opt in options.job_options:
opt_split = opt.split('=', maxsplit=1)
optstr = opt_split[0]
valstr = opt_split[1] if len(opt_split) > 1 else ''
if opt.startswith('-') or opt.startswith('#'):
parsed_job_options.append(opt)
elif len(optstr) == 1:
parsed_job_options.append(f'-{optstr} {valstr}')
else:
parsed_job_options.append(f'--{optstr} {valstr}')

exec_policy.sched_options = parsed_job_options
if options.maxfail < 0:
raise errors.ConfigError(
@@ -1236,7 +1257,9 @@ def module_unuse(*paths):
success = True
if runner.stats.failed():
success = False
runner.stats.print_failure_report(printer)
runner.stats.print_failure_report(
printer, not options.distribute
)
if options.failure_stats:
runner.stats.print_failure_stats(printer)

126 changes: 126 additions & 0 deletions reframe/frontend/distribute_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright 2016-2022 Swiss National Supercomputing Centre (CSCS/ETH Zurich)
# ReFrame Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: BSD-3-Clause


import reframe.core.builtins as builtins
import reframe.core.runtime as runtime
import reframe.utility as util

from reframe.core.decorators import TestRegistry
from reframe.core.logging import getlogger
from reframe.core.meta import make_test
from reframe.core.schedulers import Job
from reframe.frontend.executors import generate_testcases


def getallnodes(state='all', jobs_cli_options=None):
rt = runtime.runtime()
nodes = {}
for part in rt.system.partitions:
# This job will not be submitted; it is used only to filter
# the nodes based on the partition configuration
dummy_job = Job.create(part.scheduler,
part.launcher_type(),
name='placeholder-job',
sched_access=part.access,
sched_options=jobs_cli_options)

available_nodes = part.scheduler.allnodes()
available_nodes = part.scheduler.filternodes(dummy_job,
available_nodes)
getlogger().debug(
f'Total available nodes for {part.name}: {len(available_nodes)}'
)

if state.casefold() != 'all':
available_nodes = {n for n in available_nodes
if n.in_state(state)}
getlogger().debug(
f'[F] Selecting nodes in state {state!r}: '
f'available nodes now: {len(available_nodes)}'
)

nodes[part.fullname] = [n.name for n in available_nodes]

return nodes
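The state filtering in `getallnodes()` can be tried out without a scheduler. The sketch below is a simplified stand-in: `FakeNode` and `select_nodes` are hypothetical, the partition filtering done by `filternodes()` is left out, and a list is used instead of a set so that the output order is deterministic.

```python
class FakeNode:
    '''Hypothetical node object exposing the two members used above.'''

    def __init__(self, name, state):
        self.name = name
        self._state = state

    def in_state(self, state):
        return self._state.casefold() == state.casefold()


def select_nodes(nodes_by_partition, state='all'):
    selected = {}
    for partname, nodes in nodes_by_partition.items():
        # 'all' (in any letter case) disables the state filter
        if state.casefold() != 'all':
            nodes = [n for n in nodes if n.in_state(state)]

        selected[partname] = [n.name for n in nodes]

    return selected


cluster = {
    'sys:gpu': [FakeNode('nid001', 'idle'), FakeNode('nid002', 'down')],
    'sys:cpu': [FakeNode('nid101', 'IDLE')],
}
print(select_nodes(cluster, 'idle'))
# {'sys:gpu': ['nid001'], 'sys:cpu': ['nid101']}
```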


def _rfm_pin_run_nodes(obj):
nodelist = getattr(obj, '$nid')
if not obj.local:
obj.job.pin_nodes = nodelist


def _rfm_pin_build_nodes(obj):
pin_nodes = getattr(obj, '$nid')
if not obj.local and not obj.build_locally:
obj.build_job.pin_nodes = pin_nodes


def make_valid_systems_hook(systems):
'''Returns a function to be used as a hook that sets the
valid systems.

Since valid_systems change for each generated test, we need to
generate different post-init hooks for each one of them.
'''
def _rfm_set_valid_systems(obj):
obj.valid_systems = systems

return _rfm_set_valid_systems
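The factory pattern used by `make_valid_systems_hook` binds a separate `systems` value to each returned function. The short sketch below shows the same idea outside ReFrame; `make_hook` and `Dummy` are hypothetical stand-ins for the hook factory and a test object.

```python
def make_hook(systems):
    # `systems` is captured per call, so every hook keeps its own value
    def hook(obj):
        obj.valid_systems = systems

    return hook


class Dummy:
    valid_systems = ['*']


hooks = [make_hook([part]) for part in ('sys:cpu', 'sys:gpu')]
a, b = Dummy(), Dummy()
hooks[0](a)
hooks[1](b)
print(a.valid_systems, b.valid_systems)  # ['sys:cpu'] ['sys:gpu']
```

Defining the hook directly inside a loop would make every hook see the loop variable's final value, which is why a factory function is the usual choice here.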


def distribute_tests(testcases, node_map):
'''Returns new test cases, parameterized to run on each node of
their partitions, based on the node map.
'''
tmp_registry = TestRegistry()
new_checks = []
# We don't want to register the same check for every environment
# per partition
check_part_combs = set()
for tc in testcases:
check, partition, _ = tc
candidate_comb = (check.unique_name, partition.fullname)
if check.is_fixture() or candidate_comb in check_part_combs:
continue

check_part_combs.add(candidate_comb)
cls = type(check)
variant_info = cls.get_variant_info(
check.variant_num, recurse=True
)
nc = make_test(
f'{cls.__name__}_{partition.fullname.replace(":", "_")}',
(cls,),
{
'valid_systems': [partition.fullname],
'$nid': builtins.parameter(
[[n] for n in node_map[partition.fullname]],
fmt=util.nodelist_abbrev
)
},
methods=[
builtins.run_before('run')(_rfm_pin_run_nodes),
builtins.run_before('compile')(_rfm_pin_build_nodes),
# We re-set the valid system in a hook to make sure that it
# will not be overwritten by a parent post-init hook
builtins.run_after('init')(
make_valid_systems_hook([partition.fullname])
),
]
)
# We have to set the prefix manually
nc._rfm_custom_prefix = check.prefix

for i in range(nc.num_variants):
# Check if this variant should be instantiated
vinfo = nc.get_variant_info(i, recurse=True)
vinfo['params'].pop('$nid')
if vinfo == variant_info:
tmp_registry.add(nc, variant_num=i)

new_checks = tmp_registry.instantiate_all()
return generate_testcases(new_checks)
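The variant selection at the end of `distribute_tests()` keeps only those variants of the generated class that match the original test once `$nid` is ignored. The sketch below reproduces that comparison on plain dictionaries; `matching_variants` is a hypothetical helper and the variant data is invented, shaped like the `get_variant_info()` output shown in `meta.py`.

```python
import copy


def matching_variants(all_variant_infos, original_info, param='$nid'):
    '''Return the indices of variants equal to `original_info`
    once `param` is removed from their parameters.'''
    matches = []
    for i, vinfo in enumerate(all_variant_infos):
        # Work on a copy so that the caller's data is left untouched
        vinfo = copy.deepcopy(vinfo)
        vinfo['params'].pop(param)
        if vinfo == original_info:
            matches.append(i)

    return matches


original = {'params': {'p1': 'a'}, 'fixtures': {}}
variants = [
    {'params': {'p1': 'a', '$nid': ['nid001']}, 'fixtures': {}},
    {'params': {'p1': 'a', '$nid': ['nid002']}, 'fixtures': {}},
    {'params': {'p1': 'b', '$nid': ['nid001']}, 'fixtures': {}},
    {'params': {'p1': 'b', '$nid': ['nid002']}, 'fixtures': {}},
]
print(matching_variants(variants, original))  # [0, 1]
```

Here the original test is the `p1='a'` variant, so only the two generated variants with `p1='a'` are kept, one per node.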
23 changes: 13 additions & 10 deletions reframe/frontend/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def json(self, force=False):

return self._run_data

def print_failure_report(self, printer):
def print_failure_report(self, printer, rerun_info=True):
line_width = 78
printer.info(line_width * '=')
printer.info('SUMMARY OF FAILURES')
@@ -227,23 +227,26 @@ def print_failure_report(self, printer):
f" * Node list: {util.nodelist_abbrev(r['nodelist'])}"
)
job_type = 'local' if r['scheduler'] == 'local' else 'batch job'
jobid = r['jobid']
printer.info(f" * Job type: {job_type} (id={r['jobid']})")
printer.info(f" * Dependencies (conceptual): "
f"{r['dependencies_conceptual']}")
printer.info(f" * Dependencies (actual): "
f"{r['dependencies_actual']}")
printer.info(f" * Maintainers: {r['maintainers']}")
printer.info(f" * Failing phase: {r['fail_phase']}")
if rt.runtime().get_option('general/0/compact_test_names'):
cls = r['display_name'].split(' ')[0]
variant = r['unique_name'].replace(cls, '').replace('_', '@')
nameoptarg = cls + variant
else:
nameoptarg = r['unique_name']
if rerun_info:
if rt.runtime().get_option('general/0/compact_test_names'):
cls = r['display_name'].split(' ')[0]
variant = r['unique_name'].replace(cls, '')
variant = variant.replace('_', '@')
nameoptarg = cls + variant
else:
nameoptarg = r['unique_name']

printer.info(f" * Rerun with '-n {nameoptarg}"
f" -p {r['environment']} --system "
f"{r['system']} -r'")

printer.info(f" * Rerun with '-n {nameoptarg}"
f" -p {r['environment']} --system {r['system']} -r'")
printer.info(f" * Reason: {r['fail_reason']}")

tb = ''.join(traceback.format_exception(*r['fail_info'].values()))
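For reference, the name used in the rerun hint is built as in the sketch below. `rerun_name` is a hypothetical helper that isolates the string handling from the hunk above, and the sample `display_name` and `unique_name` values are assumptions about what a report record might contain. With this PR the whole hint is skipped when `--distribute` is used, since `rerun_info` is passed as `not options.distribute`.

```python
def rerun_name(display_name, unique_name, compact=True):
    if not compact:
        return unique_name

    # The class name is the first word of the display name
    cls = display_name.split(' ')[0]
    # What remains of the unique name is the variant suffix; its
    # underscores are turned into '@'
    variant = unique_name.replace(cls, '').replace('_', '@')
    return cls + variant


print(rerun_name('MyTest %p1=a', 'MyTest_0'))         # MyTest@0
print(rerun_name('MyTest %p1=a', 'MyTest_0', False))  # MyTest_0
```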