Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Intelligent Host Selection from Platform Group #4329

Merged
merged 27 commits into from
Oct 4, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
312309a
created a test workflow for platform from groups
wxtim Jul 23, 2021
547a172
flake 8 fixes
wxtim Jul 29, 2021
c4bbed2
re-arrange failure of get_platforms to use exception rather than retu…
wxtim Jul 29, 2021
f5e2e31
fix existing unit test
wxtim Jul 29, 2021
6ac240d
unit tests for get_platform_from_group
wxtim Jul 29, 2021
b00f135
fixed test broken by logging changes.
wxtim Jul 30, 2021
9b2aeca
fix bug introduced to get_platform usage
wxtim Jul 30, 2021
849fe76
Merge branch 'master' into ihs.from-platform-group
hjoliver Aug 31, 2021
52a2640
Update cylc/flow/exceptions.py
wxtim Aug 31, 2021
c6df734
Update cylc/flow/task_job_mgr.py
wxtim Aug 31, 2021
499d390
reformat comments to avoid codecov thinking that they are untested code
wxtim Sep 1, 2021
5e7ce38
Merge branch 'ihs.from-platform-group' of github.com:wxtim/cylc into …
wxtim Sep 1, 2021
444cc27
Merge branch 'master' into ihs.from-platform-group
wxtim Sep 1, 2021
e21bcd0
fix type
wxtim Sep 1, 2021
e4494a4
Merge branch 'ihs.from-platform-group' of github.com:wxtim/cylc into …
wxtim Sep 1, 2021
46a2cd2
refactored exceptions in response to review; - NoPlatformsError; - No…
wxtim Sep 9, 2021
9adb720
fixed group name issue
wxtim Sep 9, 2021
5ef6254
fixed tests accidentally broken
wxtim Sep 9, 2021
e749c26
Update cylc/flow/platforms.py
wxtim Sep 22, 2021
678e382
Merge branch 'master' of https://github.com/cylc/cylc into ihs.from-p…
wxtim Sep 22, 2021
c3d71e2
added changelog entry
wxtim Sep 22, 2021
65472a8
small test fix
wxtim Sep 22, 2021
cdc7a86
fix test broken by timeout change
wxtim Sep 23, 2021
a64248b
Merge branch 'master' into ihs.from-platform-group
wxtim Sep 24, 2021
9b24c48
Fixed platform group documentation which had been copied from platfor…
wxtim Sep 27, 2021
2d67390
Fixed platform group documentation which had been copied from platfor…
wxtim Sep 27, 2021
a720456
Merge branch 'master' into ihs.from-platform-group
wxtim Sep 27, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,22 @@
with Conf('platform groups'): # noqa: SIM117 (keep same format)
with Conf('<group>'):
Conf('platforms', VDR.V_STRING_LIST)
with Conf('selection'):
Conf(
'method', VDR.V_STRING, default='random',
options=['random', 'definition order'],
wxtim marked this conversation as resolved.
Show resolved Hide resolved
desc='''
Host selection method for the platform. Available
options:
wxtim marked this conversation as resolved.
Show resolved Hide resolved

- random: Suitable for an identical pool of hosts.
wxtim marked this conversation as resolved.
Show resolved Hide resolved
- definition order: Take the first host in the list
unless that host has been unreachable. In many cases
this is likely to cause load imbalances, but might
be appropriate if your hosts were
``main, backup, failsafe``.
wxtim marked this conversation as resolved.
Show resolved Hide resolved
'''
)
wxtim marked this conversation as resolved.
Show resolved Hide resolved
# task
with Conf('task events', desc='''
Global site/user defaults for
Expand Down
7 changes: 7 additions & 0 deletions cylc/flow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,13 @@ def __init__(self, platform):
super().__init__(f'Unable to find valid host for {self.platform_n}')


class NoPlatformsError(CylcError):
    """Raised when no platform in a platform group could be reached.

    The exception takes no arguments; its message is fixed.
    """

    def __init__(self):
        message = 'Unable to find a reachable platform.'
        super().__init__(message)
wxtim marked this conversation as resolved.
Show resolved Hide resolved


class CylcVersionError(CylcError):
"""Contact file is for a Cylc Version not supported by this script."""
def __init__(self, version=None):
Expand Down
68 changes: 59 additions & 9 deletions cylc/flow/platforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
Any, Dict, Iterable, List, Optional, Tuple, Union, Set, overload
)

from cylc.flow.exceptions import PlatformLookupError, CylcError, NoHostsError
from cylc.flow.exceptions import (
PlatformLookupError, CylcError, NoHostsError, NoPlatformsError)
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.hostuserutil import is_remote_host

Expand Down Expand Up @@ -70,7 +71,8 @@ def get_platform(
# Cylc9
def get_platform(
task_conf: Union[str, Dict[str, Any], None] = None,
task_id: str = UNKNOWN_TASK
task_id: str = UNKNOWN_TASK,
bad_hosts: Optional[Set[str]] = None
) -> Optional[Dict[str, Any]]:
"""Get a platform.

Expand All @@ -90,7 +92,7 @@ def get_platform(
"""
if task_conf is None or isinstance(task_conf, str):
# task_conf is a platform name, or get localhost if None
return platform_from_name(task_conf)
return platform_from_name(task_conf, bad_hosts=bad_hosts)

elif 'platform' in task_conf and task_conf['platform']:
# Check whether task has conflicting Cylc7 items.
Expand All @@ -103,7 +105,7 @@ def get_platform(
return None

# If platform name exists and doesn't clash with Cylc7 Config items.
return platform_from_name(task_conf['platform'])
return platform_from_name(task_conf['platform'], bad_hosts=bad_hosts)

else:
if get_platform_deprecated_settings(task_conf) == []:
Expand All @@ -121,13 +123,15 @@ def get_platform(
glbl_cfg(cached=False).get(['platforms']),
task_job_section,
task_remote_section
)
),
bad_hosts=bad_hosts
)


def platform_from_name(
platform_name: Optional[str] = None,
platforms: Optional[Dict[str, Dict[str, Any]]] = None
platforms: Optional[Dict[str, Dict[str, Any]]] = None,
bad_hosts: Optional[Set[str]] = None
) -> Dict[str, Any]:
"""
Find out which job platform to use given a list of possible platforms and
Expand All @@ -154,10 +158,10 @@ def platform_from_name(

platform_group = None
for platform_name_re in reversed(list(platform_groups)):
# Platform is member of a group.
if re.fullmatch(platform_name_re, platform_name):
platform_group = deepcopy(platform_name)
platform_name = random.choice(
platform_groups[platform_name_re]['platforms']
platform_name = get_platform_from_group(
platform_groups[platform_name_re], bad_hosts=bad_hosts
)

# The list is reversed to allow user-set platforms (which are loaded
Expand Down Expand Up @@ -200,6 +204,52 @@ def platform_from_name(
f"No matching platform \"{platform_name}\" found")


def get_platform_from_group(
    group: Dict[str, Any], bad_hosts: Optional[Set[str]] = None
) -> str:
    """Get platform name from group, according to the selection method.

    Args:
        group: A platform group dictionary. Its 'platforms' entry lists
            the member platform names and its 'selection' -> 'method'
            entry names the selection method to use.
        bad_hosts: The set of hosts found to be unreachable. If empty or
            None, every platform in the group is a candidate.

    Returns:
        Name of the platform selected.

    Raises:
        NoPlatformsError: If there are no platforms with any usable
            hosts in the platform group.
        CylcError: If the group's selection method is not one of
            HOST_SELECTION_METHODS.

    TODO:
        Currently uses host_selection methods, which is fine, but should
        also have the ability to use custom selection methods.
    """
    if bad_hosts:
        # Keep only platforms with at least one host not known to be bad.
        # A list (not a set) is built on purpose: it preserves the order
        # in which the group lists its platforms, so an order-dependent
        # selection method still sees them in definition order.
        platform_names = [
            platform
            for platform in group['platforms']
            if set(platform_from_name(platform)['hosts']) - bad_hosts
        ]
    else:
        platform_names = group['platforms']

    # Nothing left to choose from: every platform has only bad hosts
    # (or the group has no platforms at all).
    if not platform_names:
        raise NoPlatformsError()

    # Get the selection method
    method = group['selection']['method']
    if method not in HOST_SELECTION_METHODS:
        raise CylcError(
            f'method \"{method}\" is not a supported platform '
            'selection method.'
        )
    return HOST_SELECTION_METHODS[method](platform_names)


def platform_from_job_info(
platforms: Dict[str, Any],
job: Dict[str, Any],
Expand Down
73 changes: 56 additions & 17 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
from cylc.flow.exceptions import (
PlatformLookupError,
WorkflowConfigError,
TaskRemoteMgmtError
TaskRemoteMgmtError,
NoPlatformsError,
NoHostsError
)
from cylc.flow.hostuserutil import (
get_host,
Expand All @@ -61,7 +63,6 @@
get_install_target_from_platform,
get_localhost_install_target,
get_platform,
NoHostsError
)
from cylc.flow.remote import construct_ssh_cmd
from cylc.flow.subprocctx import SubProcContext
Expand Down Expand Up @@ -148,6 +149,7 @@ def __init__(self, workflow, proc_pool, workflow_db_mgr,
self.job_file_writer = JobFileWriter()
self.job_runner_mgr = self.job_file_writer.job_runner_mgr
self.bad_hosts = bad_hosts
self.bad_hosts_to_clear = set()
self.task_remote_mgr = TaskRemoteMgr(
workflow, proc_pool, self.bad_hosts)

Expand Down Expand Up @@ -243,11 +245,9 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
"""
if is_simulation:
return self._simulation_submit_task_jobs(itasks)

# Prepare tasks for job submission
prepared_tasks, bad_tasks = self.prep_submit_task_jobs(
workflow, itasks)

# Reset consumed host selection results
self.task_remote_mgr.subshell_eval_reset()

Expand Down Expand Up @@ -277,21 +277,59 @@ def submit_task_jobs(self, workflow, itasks, curve_auth,
break
else:
# If there are no hosts left for this platform.
# Set the task state to submit-failed.
itask.waiting_on_job_prep = False
itask.local_job_file_path = None
self._prep_submit_task_job_error(
workflow, itask, '(remote init)', ''
)
self.bad_hosts.difference_update(itask.platform['hosts'])
LOG.critical(TaskRemoteMgmtError(
# See if you can get another platform from the group...
# (if [task config]platform is a group)
# ...else set task to submit failed.
LOG.warning(TaskRemoteMgmtError(
(
'Initialisation on platform did not complete:'
'no hosts were reachable.'
'Tried all the hosts on platform.'
), itask.platform['name'], [], 1, '', '',
))
out_of_hosts = True
done_tasks.append(itask)
# Get another platform, if task config platform is a group
use_next_platform_in_group = False
try:
platform = get_platform(
itask.tdef.rtconfig['platform'],
bad_hosts=self.bad_hosts
)
# If we were able to select a new platform:
if platform and platform != itask.platform:
use_next_platform_in_group = True
except NoPlatformsError:
use_next_platform_in_group = False
wxtim marked this conversation as resolved.
Show resolved Hide resolved

if use_next_platform_in_group:
# store the previous platform's hosts so that when
# we record a submit fail we can clear all hosts
# from all platforms from bad_hosts.
for host_ in itask.platform['hosts']:
self.bad_hosts_to_clear.add(host_)
itask.platform = platform
out_of_hosts = False
break
else:
itask.waiting_on_job_prep = False
itask.local_job_file_path = None
self._prep_submit_task_job_error(
wxtim marked this conversation as resolved.
Show resolved Hide resolved
workflow, itask, '(remote init)', ''
)
# Now that all hosts on all platforms in the platform
# group selected in the task config are exhausted, we
# clear from bad_hosts all the hosts we have tried for
# this platform or group.
self.bad_hosts = (
wxtim marked this conversation as resolved.
Show resolved Hide resolved
self.bad_hosts - set(itask.platform['hosts']))
self.bad_hosts = (
wxtim marked this conversation as resolved.
Show resolved Hide resolved
self.bad_hosts - self.bad_hosts_to_clear)
self.bad_hosts_to_clear.clear()
LOG.critical(TaskRemoteMgmtError(
wxtim marked this conversation as resolved.
Show resolved Hide resolved
(
'Initialisation on platform did not complete:'
'no hosts were reachable.'
), itask.tdef.rtconfig['platform'], [], 1, '', '',
))
out_of_hosts = True
done_tasks.append(itask)
wxtim marked this conversation as resolved.
Show resolved Hide resolved

if out_of_hosts is True:
continue
Expand Down Expand Up @@ -1119,7 +1157,8 @@ def _prep_submit_task_job(self, workflow, itask, check_syntax=True):
rtconfig['remote']['host'] = host_n

try:
platform = get_platform(rtconfig)
platform = get_platform(rtconfig, self.bad_hosts)

except PlatformLookupError as exc:
# Submit number not yet incremented
itask.waiting_on_job_prep = False
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ def load_db_task_pool_for_restart(self, row_idx, row):
TASK_STATUS_SUBMITTED,
TASK_STATUS_RUNNING
):
# update the task proxy with user@host
# update the task proxy with platform
itask.platform = get_platform(platform_name)

if time_submit:
Expand Down
3 changes: 3 additions & 0 deletions tests/functional/intelligent-host-selection/02-badhosts.t
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ create_test_global_config "" "
method = 'definition order'
"
#-------------------------------------------------------------------------------
# Uncomment to print config for manual testing of workflow.
# cylc config -i '[platforms]' >&2

install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"

run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#-------------------------------------------------------------------------------
# Test that Cylc can select a host from a platform group:
# - Failing if there is no good host on _any_ platform
# - Succeeding if there is no bad host on any platform in the group
export REQUIRE_PLATFORM='loc:remote fs:indep comms:tcp'

. "$(dirname "$0")/test_header"

#-------------------------------------------------------------------------------
# Tests below: validate + run + five log greps.
set_test_number 7

# Two platforms share one install target: one with only bad hosts, and one
# whose first host (in definition order) is bad and whose second is the real
# test host. "mixedplatformgroup" lists the all-bad platform first.
create_test_global_config "" "
[platforms]
[[mixedhostplatform]]
hosts = unreachable_host, ${CYLC_TEST_HOST}
install target = ${CYLC_TEST_INSTALL_TARGET}
retrieve job logs = True
[[[selection]]]
method = 'definition order'
[[badhostplatform]]
hosts = bad_host1, bad_host2
install target = ${CYLC_TEST_INSTALL_TARGET}
retrieve job logs = True

[platform groups]
[[mixedplatformgroup]]
platforms = badhostplatform, mixedhostplatform
[[[selection]]]
method = definition order
[[goodplatformgroup]]
platforms = mixedhostplatform
[[[selection]]]
method = definition order
"
#-------------------------------------------------------------------------------
# Uncomment to print config for manual testing of workflow.
# cylc config -i '[platforms]' >&2
# cylc config -i '[platform groups]' >&2

install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"

run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"

workflow_run_ok "${TEST_NAME_BASE}-run" \
    cylc play --debug --no-detach "${WORKFLOW_NAME}"

# Task where platform = mixedplatformgroup fails totally on badhostplatform,
# fails on the first host of mixedhostplatform, then, finally succeeds.
named_grep_ok "job submit fails for bad_host1" "\"jobs-submit\" failed.*\"bad_host1\"" \
    "${WORKFLOW_RUN_DIR}/log/workflow/log"
named_grep_ok "job submit fails for bad_host2" "\"jobs-submit\" failed.*\"bad_host2\"" \
    "${WORKFLOW_RUN_DIR}/log/workflow/log"
named_grep_ok "job submit fails for badhostplatform" "badhostplatform: Tried all the hosts" \
    "${WORKFLOW_RUN_DIR}/log/workflow/log"
# Check the failure on the first host of mixedhostplatform itself, not
# bad_host1 again.
named_grep_ok "job submit fails for unreachable_host" "\"jobs-submit\" failed.*\"unreachable_host\"" \
    "${WORKFLOW_RUN_DIR}/log/workflow/log"
# Brackets and dot are escaped so the pattern matches the literal task ID
# "[ugly.1]" rather than acting as a one-character bracket expression.
named_grep_ok "job submit _finally_ works" "\[ugly\.1\].*preparing => submitted" \
    "${WORKFLOW_RUN_DIR}/log/workflow/log"

purge
exit 0
Loading