Merge branch 'master' into allow_pyproject.toml_with_cylc_lint_settings
* master:
  tui: add poll mutation (cylc#5075)
  bump dev version
  Add workflow field to ClientError, ClientTimeout
  Bump dev version
  Lint.hardcode style index numbers (cylc#5055)
  Wait for preparing tasks to submit before auto restart (cylc#5062)
  update changelog
  Prepare release 8.0.1
  scan: ignore FileNotFoundError (cylc#5065)
wxtim committed Aug 18, 2022
2 parents 48223c0 + 312a9de commit 362fe27
Showing 11 changed files with 175 additions and 57 deletions.
8 changes: 7 additions & 1 deletion CHANGES.md
@@ -43,12 +43,15 @@ using `--ignore <Issue Code>`.


-------------------------------------------------------------------------------
-## __cylc-8.0.1 (<span actions:bind='release-date'>Upcoming</span>)__
+## __cylc-8.0.1 (<span actions:bind='release-date'>Released 2022-08-16</span>)__

Maintenance release.

### Fixes

+[#5045](https://github.com/cylc/cylc-flow/pull/5045) -
+Fix issue where unsatisfied xtriggers could be wiped on reload.

[#5031](https://github.com/cylc/cylc-flow/pull/5031) - Fix bug where
specifying multiple datetime offsets (e.g. `final cycle point = +P1M-P1D`)
would not obey the given order.
@@ -66,6 +69,9 @@ workflow restart number would get wiped on reload.
[#5049](https://github.com/cylc/cylc-flow/pull/5049) - Fix several small
bugs related to auto restart.

+[#5062](https://github.com/cylc/cylc-flow/pull/5062) - Fix bug where preparing
+tasks could sometimes get orphaned when an auto restart occurred.

-------------------------------------------------------------------------------
## __cylc-8.0.0 (<span actions:bind='release-date'>Released 2022-07-28</span>)__

2 changes: 1 addition & 1 deletion cylc/flow/__init__.py
@@ -46,7 +46,7 @@ def environ_init():

environ_init()

-__version__ = '8.0.1.dev'
+__version__ = '8.1.0.dev'


def iter_entry_points(entry_point_name):
18 changes: 16 additions & 2 deletions cylc/flow/exceptions.py
@@ -254,9 +254,17 @@ def __str__(self):

class ClientError(CylcError):

-    def __init__(self, message: str, traceback: Optional[str] = None):
+    def __init__(
+        self,
+        message: str,
+        traceback: Optional[str] = None,
+        workflow: Optional[str] = None
+    ):
        self.message = message
        self.traceback = traceback
+        # Workflow not included in string representation but useful bit of
+        # info to attach to the exception object
+        self.workflow = workflow

    def __str__(self) -> str:
        ret = self.message
@@ -277,7 +285,13 @@ def __str__(self):


class ClientTimeout(CylcError):
-    pass
+
+    def __init__(self, message: str, workflow: Optional[str] = None):
+        self.message = message
+        self.workflow = workflow
+
+    def __str__(self) -> str:
+        return self.message


class CyclingError(CylcError):
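For context (not part of this commit): a minimal sketch of how a caller might attach and read the new `workflow` attribute on these exceptions. The handler function and the workflow name here are hypothetical; only the constructor signatures shown in the diff are assumed.

```python
from cylc.flow.exceptions import ClientError, ClientTimeout


def report_client_failure(exc):
    """Log a client failure, including which workflow it came from."""
    # `workflow` may be None if the caller did not pass it in.
    target = exc.workflow or '<unknown workflow>'
    print(f"[{target}] client error: {exc}")


try:
    # Hypothetical failure while talking to the scheduler for 'my_flow'.
    raise ClientTimeout("Timed out waiting for a reply", workflow="my_flow")
except (ClientError, ClientTimeout) as exc:
    report_client_failure(exc)
```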
9 changes: 5 additions & 4 deletions cylc/flow/network/__init__.py
@@ -18,6 +18,7 @@
import asyncio
import getpass
import json
+from typing import Tuple

import zmq
import zmq.asyncio
@@ -59,17 +60,17 @@ def decode_(message):
    return msg


-def get_location(workflow: str):
+def get_location(workflow: str) -> Tuple[str, int, int]:
    """Extract host and port from a workflow's contact file.
    NB: if it fails to load the workflow contact file, it will exit.
    Args:
-        workflow (str): workflow name
+        workflow: workflow name
    Returns:
-        Tuple[str, int, int]: tuple with the host name and port numbers.
+        tuple with the host name and port numbers.
    Raises:
-        ClientError: if the workflow is not running.
+        WorkflowStopped: if the workflow is not running.
        CylcVersionError: if target is a Cylc 7 (or earlier) workflow.
    """
    try:
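For context (not part of this commit): a minimal usage sketch of `get_location`, assuming a running workflow named `my_flow`. Per the updated docstring it raises `WorkflowStopped` when the workflow is not running, and the two integers are the port numbers read from the contact file.

```python
from cylc.flow.exceptions import WorkflowStopped
from cylc.flow.network import get_location

try:
    host, port, pub_port = get_location('my_flow')
except WorkflowStopped:
    print("my_flow is not running")
else:
    print(f"my_flow scheduler found at {host}:{port} (pub port {pub_port})")
```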
2 changes: 1 addition & 1 deletion cylc/flow/network/client.py
@@ -101,7 +101,7 @@ class WorkflowRuntimeClient(ZMQSocketBase):
"args": {...}}
Raises:
ClientError: if the workflow is not running.
WorkflowStopped: if the workflow is not running.
Call server "endpoints" using:
``__call__``, ``serial_request``
7 changes: 6 additions & 1 deletion cylc/flow/network/scan.py
@@ -253,7 +253,12 @@ def _scan_subdirs(listing: List[Path], depth: int) -> None:
            return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
-            path, depth, contents = task.result()
+            try:
+                path, depth, contents = task.result()
+            except FileNotFoundError:
+                # directory has been removed since the scan was scheduled
+                running.remove(task)
+                continue
            running.remove(task)
            is_flow = dir_is_flow(contents)
            if is_flow:
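For context (not part of this commit): a generic, self-contained illustration of the pattern used above — discard an `asyncio` task whose `result()` raises `FileNotFoundError` because the directory vanished between scheduling and completion. This is not Cylc's scan code; the helper names are made up.

```python
import asyncio
from pathlib import Path


async def list_dir(path: Path):
    # May raise FileNotFoundError if the directory vanished in the meantime.
    return path, list(path.iterdir())


async def scan(paths):
    running = {asyncio.create_task(list_dir(path)) for path in paths}
    results = {}
    while running:
        done, running = await asyncio.wait(
            running, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            try:
                path, contents = task.result()
            except FileNotFoundError:
                # Directory removed since the scan was scheduled - skip it.
                continue
            results[path] = contents
    return results


print(asyncio.run(scan([Path('.')])))
```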
105 changes: 65 additions & 40 deletions cylc/flow/scheduler.py
@@ -1242,7 +1242,7 @@ def run_event_handlers(self, event, reason=""):
            return
        self.workflow_event_handler.handle(self, event, str(reason))

-    def release_queued_tasks(self):
+    def release_queued_tasks(self) -> None:
        """Release queued tasks, and submit jobs.
        The task queue manages references to task proxies in the task pool.
@@ -1267,36 +1267,48 @@ def release_queued_tasks(self):
            and self.auto_restart_time is None
        ):
            pre_prep_tasks = self.pool.release_queued_tasks()
-            if not pre_prep_tasks:
-                # No tasks to submit.
-                return

-            # Start the job submission process.
-            self.is_updated = True
-            self.reset_inactivity_timer()
+        elif (
+            self.should_auto_restart_now()
+            and self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL
+        ):
+            # Need to get preparing tasks to submit before auto restart
+            pre_prep_tasks = [
+                itask for itask in self.pool.get_tasks()
+                if itask.state(TASK_STATUS_PREPARING)
+            ]

-            self.task_job_mgr.task_remote_mgr.rsync_includes = (
-                self.config.get_validated_rsync_includes())
-        # Return, if no tasks to submit.
        else:
            return
+        if not pre_prep_tasks:
+            return

-        meth = LOG.debug
-        if self.options.reftest or self.options.genref:
-            meth = LOG.info
-        for itask in self.task_job_mgr.submit_task_jobs(
-            self.workflow,
-            pre_prep_tasks,
-            self.server.curve_auth,
-            self.server.client_pub_key_dir,
-            self.config.run_mode('simulation')
-        ):
-            if itask.flow_nums:
-                flow = ','.join(str(i) for i in itask.flow_nums)
-            else:
-                flow = FLOW_NONE
-            meth(
-                f"{itask.identity} -triggered off "
-                f"{itask.state.get_resolved_dependencies()}"
-                f" in flow {flow}"
-            )
+        # Start the job submission process.
+        self.is_updated = True
+        self.reset_inactivity_timer()

+        self.task_job_mgr.task_remote_mgr.rsync_includes = (
+            self.config.get_validated_rsync_includes())

+        log = LOG.debug
+        if self.options.reftest or self.options.genref:
+            log = LOG.info
+        for itask in self.task_job_mgr.submit_task_jobs(
+            self.workflow,
+            pre_prep_tasks,
+            self.server.curve_auth,
+            self.server.client_pub_key_dir,
+            is_simulation=self.config.run_mode('simulation')
+        ):
+            if itask.flow_nums:
+                flow = ','.join(str(i) for i in itask.flow_nums)
+            else:
+                flow = FLOW_NONE
+            log(
+                f"{itask.identity} -triggered off "
+                f"{itask.state.get_resolved_dependencies()} in flow {flow}"
+            )

    def process_workflow_db_queue(self):
        """Update workflow DB."""
@@ -1383,27 +1395,33 @@ async def workflow_shutdown(self):
            self.time_next_kill = time() + self.INTERVAL_STOP_KILL

        # Is the workflow set to auto stop [+restart] now ...
-        if self.auto_restart_time is None or time() < self.auto_restart_time:
+        if not self.should_auto_restart_now():
            # ... no
            pass
        elif self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL:
-            # ... yes - wait for local jobs to complete before restarting
-            #   * Avoid polling issues see #2843
-            #   * Ensure the host can be safely taken down once the
-            #     workflow has stopped running.
+            # ... yes - wait for preparing jobs to see if they're local and
+            #   wait for local jobs to complete before restarting
+            #   * Avoid polling issues - see #2843
+            #   * Ensure the host can be safely taken down once the
+            #     workflow has stopped running.
            for itask in self.pool.get_tasks():
+                if itask.state(TASK_STATUS_PREPARING):
+                    LOG.info(
+                        "Waiting for preparing jobs to submit before "
+                        "attempting restart"
+                    )
+                    break
                if (
-                        itask.state(*TASK_STATUSES_ACTIVE)
-                        and itask.summary['job_runner_name']
-                        and not is_remote_platform(itask.platform)
-                        and self.task_job_mgr.job_runner_mgr
-                        .is_job_local_to_host(
-                            itask.summary['job_runner_name'])
+                    itask.state(*TASK_STATUSES_ACTIVE)
+                    and itask.summary['job_runner_name']
+                    and not is_remote_platform(itask.platform)
+                    and self.task_job_mgr.job_runner_mgr.is_job_local_to_host(
+                        itask.summary['job_runner_name'])
                ):
                    LOG.info('Waiting for jobs running on localhost to '
                             'complete before attempting restart')
                    break
-            else:
+            else:  # no break
                self._set_stop(StopMode.REQUEST_NOW_NOW)
        elif (  # noqa: SIM106
            self.auto_restart_mode == AutoRestartMode.FORCE_STOP
@@ -1415,6 +1433,13 @@ async def workflow_shutdown(self):
            raise SchedulerError(
                'Invalid auto_restart_mode=%s' % self.auto_restart_mode)

+    def should_auto_restart_now(self) -> bool:
+        """Is it time for the scheduler to auto stop + restart?"""
+        return (
+            self.auto_restart_time is not None and
+            time() >= self.auto_restart_time
+        )

    def workflow_auto_restart(self, max_retries: int = 3) -> bool:
        """Attempt to restart the workflow assuming it has already stopped."""
        cmd = [
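For context (not part of this commit): the `else:  # no break` above uses Python's `for`/`else` construct — the `else` body runs only if the loop finished without hitting `break`. A tiny standalone illustration of the same shape, not the real Scheduler API:

```python
def restart_decision(jobs):
    """Stop immediately unless a local job is still running (illustrative)."""
    for job in jobs:
        if job['is_local'] and job['running']:
            print(f"waiting for local job {job['id']} to finish")
            break
    else:  # no break: nothing blocking, safe to stop for the restart
        print("stopping now for auto restart")


restart_decision([{'id': 'a1', 'is_local': True, 'running': False}])
restart_decision([{'id': 'b2', 'is_local': True, 'running': True}])
```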
1 change: 1 addition & 0 deletions cylc/flow/scripts/lint.py
@@ -574,6 +574,7 @@ def get_option_parser() -> COP:
        action='append',
        default=[],
        dest='ignores',
+        metavar="CODE",
        choices=tuple([f'S{i["index"]:03d}' for i in STYLE_CHECKS.values()])
    )

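For context (not part of this commit): `metavar` only changes how the option's argument is named in `--help` output. A minimal `optparse` sketch using a plain `OptionParser` rather than Cylc's `COP` subclass:

```python
from optparse import OptionParser

parser = OptionParser()
parser.add_option(
    '--ignore',
    action='append',
    default=[],
    dest='ignores',
    metavar='CODE',  # help now shows "--ignore=CODE" instead of "--ignore=IGNORES"
    help='Ignore this issue code (may be used multiple times).',
)
parser.print_help()
```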
6 changes: 4 additions & 2 deletions cylc/flow/tui/data.py
@@ -92,13 +92,15 @@
        'hold',
        'release',
        'kill',
-        'trigger'
+        'trigger',
+        'poll',
    ],
    'task': [
        'hold',
        'release',
        'kill',
-        'trigger'
+        'trigger',
+        'poll',
    ],
    'job': [
        'kill',
17 changes: 17 additions & 0 deletions tests/unit/scripts/test_lint.py
@@ -240,6 +240,23 @@ def test_check_exclusions(create_testable_file, exclusion):
        assert item not in result.out


+@pytest.mark.parametrize(
+    'exclusion',
+    [
+        comb for i in range(len(STYLE_CHECKS.values()))
+        for comb in combinations(
+            [f'S{i["index"]:03d}' for i in STYLE_CHECKS.values()], i + 1
+        )
+    ]
+)
+def test_check_exclusions(create_testable_file, exclusion):
+    """It does not report any items excluded."""
+    result, _ = create_testable_file(
+        LINT_TEST_FILE, 'style', list(exclusion))
+    for item in exclusion:
+        assert item not in result.out


@pytest.fixture
def create_testable_dir(tmp_path):
    test_file = (tmp_path / 'suite.rc')
57 changes: 52 additions & 5 deletions tests/unit/test_scheduler.py
@@ -15,16 +15,19 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Tests for Cylc scheduler server."""

-import pytest
+from time import time
from types import SimpleNamespace
-from typing import Any, List
-from unittest.mock import Mock
+from typing import List
+from unittest.mock import MagicMock, Mock

+import pytest

from cylc.flow.exceptions import InputError
from cylc.flow.scheduler import Scheduler
from cylc.flow.scheduler_cli import RunOptions

-Fixture = Any
+from cylc.flow.task_pool import TaskPool
+from cylc.flow.task_proxy import TaskProxy
+from cylc.flow.workflow_status import AutoRestartMode


@pytest.mark.parametrize(
@@ -62,3 +65,47 @@ def test_check_startup_opts(
    with pytest.raises(InputError) as excinfo:
        Scheduler._check_startup_opts(mocked_scheduler)
    assert(err_msg.format(opt) in str(excinfo))


+@pytest.mark.parametrize(
+    'auto_restart_time, expected',
+    [
+        (-1, True),
+        (0, True),
+        (1, False),
+        (None, False),
+    ]
+)
+def test_should_auto_restart_now(
+    auto_restart_time, expected, monkeypatch: pytest.MonkeyPatch
+):
+    """Test Scheduler.should_auto_restart_now()."""
+    time_now = time()
+    monkeypatch.setattr('cylc.flow.scheduler.time', lambda: time_now)
+    if auto_restart_time is not None:
+        auto_restart_time += time_now
+    mock_schd = Mock(spec=Scheduler, auto_restart_time=auto_restart_time)
+    assert Scheduler.should_auto_restart_now(mock_schd) == expected


+def test_release_queued_tasks__auto_restart():
+    """Test that Scheduler.release_queued_tasks() works as expected
+    during auto restart."""
+    mock_schd = Mock(
+        auto_restart_time=(time() - 100),
+        auto_restart_mode=AutoRestartMode.RESTART_NORMAL,
+        is_paused=False,
+        stop_mode=None,
+        pool=Mock(
+            spec=TaskPool,
+            get_tasks=lambda: [Mock(spec=TaskProxy)]
+        ),
+        workflow='parachutes',
+        options=RunOptions(),
+        task_job_mgr=MagicMock()
+    )
+    Scheduler.release_queued_tasks(mock_schd)
+    # Should not actually release any more tasks, just submit the
+    # preparing ones
+    mock_schd.pool.release_queued_tasks.assert_not_called()
+    mock_schd.task_job_mgr.submit_task_jobs.assert_called()
