diff --git a/.github/workflows/bash.yml b/.github/workflows/bash.yml index 9f9dd123c20..aff9b7bfb4b 100644 --- a/.github/workflows/bash.yml +++ b/.github/workflows/bash.yml @@ -44,9 +44,9 @@ jobs: tests/functional/cylc-poll/15-job-st-file-no-batch.t \ tests/functional/events/28-inactivity.t \ tests/functional/events/34-task-abort.t \ - tests/functional/hold-release/12-hold-then-retry.t \ tests/functional/job-file-trap/00-sigusr1.t \ tests/functional/job-file-trap/02-pipefail.t \ + tests/functional/pause-resume/12-pause-then-retry.t \ tests/functional/shutdown/09-now2.t \ tests/functional/shutdown/13-no-port-file-check.t \ tests/functional/shutdown/14-no-dir-check.t diff --git a/CHANGES.md b/CHANGES.md index 1d817a739ac..9b31cf28672 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -66,10 +66,18 @@ queueing logic centralized. `cylc register` has been replaced by `cylc install` ([#4000](https://github.com/cylc/cylc-flow/pull/4000)). +Added a new command: `cylc clean`, for removing stopped workflows on the local +and any remote filesystems ([#3961](https://github.com/cylc/cylc-flow/pull/3961), +[#4017](https://github.com/cylc/cylc-flow/pull/4017)). + `cylc run` and `cylc restart` have been replaced by `cylc play`, simplifying how workflows are restarted ([#4040](https://github.com/cylc/cylc-flow/pull/4040)). +`cylc pause` and `cylc play` are now used to pause and resume workflows, +respectively. `cylc hold` and `cylc release` now only hold and release tasks, +not the whole workflow. ([#4076](https://github.com/cylc/cylc-flow/pull/4076)) + "Implicit"/"naked" tasks (tasks that do not have an explicit definition in `flow.cylc[runtime]`) are now disallowed by default ([#4109](https://github.com/cylc/cylc-flow/pull/4109)). You can allow them by @@ -167,11 +175,6 @@ hierarchy and ability to set site config directory. [#3883](https://github.com/cylc/cylc-flow/pull/3883) - Added a new workflow config option `[scheduling]stop after cycle point`. -[#3961](https://github.com/cylc/cylc-flow/pull/3961), -[#4017](https://github.com/cylc/cylc-flow/pull/4017) - Added a new command: -`cylc clean`, for removing stopped workflows on the local and any remote -filesystems. - [#3913](https://github.com/cylc/cylc-flow/pull/3913) - Added the ability to use plugins to parse suite templating variables and additional files to install. Only one such plugin exists at the time of writing, designed to diff --git a/cylc/flow/cycling/loader.py b/cylc/flow/cycling/loader.py index e07a2db954c..df781a8388f 100644 --- a/cylc/flow/cycling/loader.py +++ b/cylc/flow/cycling/loader.py @@ -19,7 +19,9 @@ Each task may have multiple sequences, e.g. 12-hourly and 6-hourly. """ -from . import integer +from typing import Optional, Type + +from . import PointBase, integer from . import iso8601 from metomi.isodatetime.data import Calendar @@ -58,18 +60,19 @@ class DefaultCycler: """Store the default TYPE for Cyclers.""" - TYPE = None + TYPE: str -def get_point(*args, **kwargs): +def get_point( + value: str, cycling_type: Optional[str] = None +) -> Optional[PointBase]: """Return a cylc.flow.cycling.PointBase-derived object from a string.""" - if args[0] is None: + if value is None: return None - cycling_type = kwargs.pop("cycling_type", DefaultCycler.TYPE) - return get_point_cls(cycling_type=cycling_type)(*args, **kwargs) + return get_point_cls(cycling_type=cycling_type)(value) -def get_point_cls(cycling_type=None): +def get_point_cls(cycling_type: Optional[str] = None) -> Type[PointBase]: """Return the cylc.flow.cycling.PointBase-derived class we're using.""" if cycling_type is None: cycling_type = DefaultCycler.TYPE diff --git a/cylc/flow/etc/cylc-bash-completion b/cylc/flow/etc/cylc-bash-completion index d3bbaf1bc07..d7bd7ddf1f2 100644 --- a/cylc/flow/etc/cylc-bash-completion +++ b/cylc/flow/etc/cylc-bash-completion @@ -38,7 +38,7 @@ _cylc() { cur="${COMP_WORDS[COMP_CWORD]}" sec="${COMP_WORDS[1]}" opts="$(cylc scan -t name 2>/dev/null)" - suite_cmds="broadcast|bcast|cat-log|check-versions|clean|compare|diff|dump|edit|ext-trigger|external-trigger|get-suite-config|get-config|get-suite-version|get-cylc-version|graph|hold|insert|install|kill|list|log|ls|tui|ping|play|poll|print|reinstall|release|unhold|reload|remove|report-timings|reset|scan|search|grep|set-verbosity|show|set-outputs|stop|shutdown|single|suite-state|test-battery|trigger|validate|view|warranty" + suite_cmds="broadcast|bcast|cat-log|check-versions|clean|compare|diff|dump|edit|ext-trigger|external-trigger|get-suite-config|get-config|get-suite-version|get-cylc-version|graph|hold|insert|install|kill|list|log|ls|tui|pause|ping|play|poll|print|reinstall|release|unhold|reload|remove|report-timings|reset|scan|search|grep|set-verbosity|show|set-outputs|stop|shutdown|single|suite-state|test-battery|trigger|validate|view|warranty" if [[ ${COMP_CWORD} -eq 1 ]]; then diff --git a/cylc/flow/network/resolvers.py b/cylc/flow/network/resolvers.py index 5930c9a5bb2..994a0146653 100644 --- a/cylc/flow/network/resolvers.py +++ b/cylc/flow/network/resolvers.py @@ -21,6 +21,7 @@ import logging import queue from time import time +from typing import Iterable, Tuple, TYPE_CHECKING from uuid import uuid4 from graphene.utils.str_converters import to_snake_case @@ -34,6 +35,11 @@ NodesEdges, PROXY_NODES, SUB_RESOLVERS, parse_node_id, sort_elements ) +if TYPE_CHECKING: + from cylc.flow.data_store_mgr import DataStoreMgr + from cylc.flow.scheduler import Scheduler + + logger = logging.getLogger(__name__) DELTA_SLEEP_INTERVAL = 0.5 @@ -438,15 +444,11 @@ async def subscribe_delta(self, root, info, args): class Resolvers(BaseResolvers): """Workflow Service context GraphQL query and mutation resolvers.""" - schd = None + schd: 'Scheduler' - def __init__(self, data, **kwargs): + def __init__(self, data: 'DataStoreMgr', schd: 'Scheduler') -> None: super().__init__(data) - - # Set extra attributes - for key, value in kwargs.items(): - if hasattr(self, key): - setattr(self, key, value) + self.schd = schd # Mutations async def mutator(self, *m_args): @@ -520,16 +522,19 @@ def broadcast( cutoff) raise ValueError('Unsupported broadcast mode') - def hold(self, tasks=None, time=None): - """Hold the workflow.""" - self.schd.command_queue.put(( - 'hold', - tuple(), - filter_none({ - 'tasks': tasks or None, - 'time': time - }) - )) + def hold(self, tasks: Iterable[str]) -> Tuple[bool, str]: + """Hold tasks.""" + self.schd.command_queue.put(('hold', (tasks,), {})) + return (True, 'Command queued') + + def set_hold_point(self, point: str) -> Tuple[bool, str]: + """Set workflow hold after cycle point.""" + self.schd.command_queue.put(('set_hold_point', (point,), {})) + return (True, 'Command queued') + + def pause(self) -> Tuple[bool, str]: + """Pause the workflow.""" + self.schd.command_queue.put(('pause', tuple(), {})) return (True, 'Command queued') def kill_tasks(self, tasks): @@ -649,15 +654,19 @@ def reload_suite(self): self.schd.command_queue.put(("reload_suite", (), {})) return (True, 'Command queued') - def release(self, tasks=None): - """Release (un-hold) the workflow.""" - self.schd.command_queue.put(( - "release", - (), - filter_none({ - 'ids': tasks - }) - )) + def release(self, tasks: Iterable[str]) -> Tuple[bool, str]: + """Release held tasks.""" + self.schd.command_queue.put(('release', (tasks,), {})) + return (True, 'Command queued') + + def release_hold_point(self) -> Tuple[bool, str]: + """Release all tasks and unset workflow hold point.""" + self.schd.command_queue.put(('release_hold_point', tuple(), {})) + return (True, 'Command queued') + + def resume(self) -> Tuple[bool, str]: + """Resume the workflow.""" + self.schd.command_queue.put(('resume', tuple(), {})) return (True, 'Command queued') def set_verbosity(self, level): diff --git a/cylc/flow/network/scan.py b/cylc/flow/network/scan.py index f653103f07d..8217cf32e1d 100644 --- a/cylc/flow/network/scan.py +++ b/cylc/flow/network/scan.py @@ -343,8 +343,8 @@ async def graphql_query(flow, fields, filters=None): # state must be running [('state',), 'running'] - # state must be running or held - [('state',), ('running', 'held')] + # state must be running or paused + [('state',), ('running', 'paused')] """ query = f'query {{ workflows(ids: ["{flow["name"]}"]) {{ {fields} }} }}' diff --git a/cylc/flow/network/schema.py b/cylc/flow/network/schema.py index cfb3b5cad77..ad23a495757 100644 --- a/cylc/flow/network/schema.py +++ b/cylc/flow/network/schema.py @@ -1532,23 +1532,33 @@ class Arguments: result = GenericScalar() -class Hold(Mutation): +class SetHoldPoint(Mutation): class Meta: description = sstrip(''' - Hold a workflow or tasks within it. + Set workflow hold after cycle point. All tasks after this point + will be held. ''') - resolver = partial(mutator, command='hold') + resolver = partial(mutator, command='set_hold_point') class Arguments: workflows = List(WorkflowID, required=True) - tasks = List( - NamespaceIDGlob, - description='Hold the specified tasks rather than the workflow.' + point = CyclePoint( + description='Hold all tasks after the specified cycle point.', + required=True ) - time = TimePoint(description=sstrip(''' - Get the workflow to hold after the specified wallclock time - has passed. - ''')) + + result = GenericScalar() + + +class Pause(Mutation): + class Meta: + description = sstrip(''' + Pause a workflow. + ''') + resolver = partial(mutator, command='pause') + + class Arguments: + workflows = List(WorkflowID, required=True) result = GenericScalar() @@ -1596,23 +1606,30 @@ class Arguments: result = GenericScalar() -class Release(Mutation): +class ReleaseHoldPoint(Mutation): class Meta: description = sstrip(''' - Release a held workflow or tasks within it. + Release all tasks and unset the workflow hold point, if set. + ''') + resolver = partial(mutator, command='release_hold_point') - See also the opposite command `hold`. + class Arguments: + workflows = List(WorkflowID, required=True) + + result = GenericScalar() + + +class Resume(Mutation): + class Meta: + description = sstrip(''' + Resume a paused workflow. + + See also the opposite command `pause`. ''') - resolver = partial(mutator, command='release') + resolver = partial(mutator, command='resume') class Arguments: workflows = List(WorkflowID, required=True) - tasks = List( - NamespaceIDGlob, - description=sstrip(''' - Release matching tasks rather than the workflow as whole. - ''') - ) result = GenericScalar() @@ -1763,6 +1780,24 @@ class Arguments: result = GenericScalar() +class Hold(Mutation, TaskMutation): + class Meta: + description = sstrip(''' + Hold tasks within a workflow. + ''') + resolver = partial(mutator, command='hold') + + +class Release(Mutation, TaskMutation): + class Meta: + description = sstrip(''' + Release held tasks within a workflow. + + See also the opposite command `hold`. + ''') + resolver = partial(mutator, command='release') + + class Kill(Mutation, TaskMutation): # TODO: This should be a job mutation? class Meta: @@ -1851,18 +1886,22 @@ class Mutations(ObjectType): # workflow actions broadcast = _mut_field(Broadcast) ext_trigger = _mut_field(ExtTrigger) - hold = _mut_field(Hold) message = _mut_field(Message) + pause = _mut_field(Pause) ping = _mut_field(Ping) - release = _mut_field(Release) reload = _mut_field(Reload) + resume = _mut_field(Resume) set_verbosity = _mut_field(SetVerbosity) set_graph_window_extent = _mut_field(SetGraphWindowExtent) stop = _mut_field(Stop) + set_hold_point = _mut_field(SetHoldPoint) + release_hold_point = _mut_field(ReleaseHoldPoint) # task actions + hold = _mut_field(Hold) kill = _mut_field(Kill) poll = _mut_field(Poll) + release = _mut_field(Release) remove = _mut_field(Remove) set_outputs = _mut_field(SetOutputs) trigger = _mut_field(Trigger) diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index b30ad35fd6a..99c4048f8b4 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -29,7 +29,7 @@ from threading import Barrier from time import sleep, time import traceback -from typing import Optional, List +from typing import Iterable, Optional, List from uuid import uuid4 import zmq from zmq.auth.thread import ThreadAuthenticator @@ -43,7 +43,7 @@ from cylc.flow.cycling.loader import get_point from cylc.flow.data_store_mgr import DataStoreMgr, parse_job_item from cylc.flow.exceptions import ( - CylcError, SuiteConfigError, PlatformLookupError + CyclingError, CylcError, SuiteConfigError, PlatformLookupError ) import cylc.flow.flags from cylc.flow.host_select import select_suite_host @@ -164,6 +164,29 @@ class Scheduler: 'reload_suite' ) + # managers + profiler: Profiler + pool: TaskPool + proc_pool: SubProcPool + task_job_mgr: TaskJobManager + task_events_mgr: TaskEventsManager + suite_event_handler: SuiteEventHandler + data_store_mgr: DataStoreMgr + suite_db_mgr: SuiteDatabaseManager + broadcast_mgr: BroadcastMgr + xtrigger_mgr: XtriggerManager + + # queues + command_queue: Queue + message_queue: Queue + ext_trigger_queue: Queue + + # configuration + config: SuiteConfig # flow config + cylc_config: DictTree # [scheduler] config + flow_file: Optional[str] = None + flow_file_update_time: Optional[float] = None + # flow information suite: Optional[str] = None owner: Optional[str] = None @@ -182,12 +205,6 @@ class Scheduler: stop_task: Optional[str] = None stop_clock_time: Optional[int] = None - # configuration - config: Optional[SuiteConfig] = None # flow config - cylc_config: Optional[DictTree] = None # [scheduler] config - flow_file: Optional[str] = None - flow_file_update_time: Optional[float] = None - # directories suite_dir: Optional[str] = None suite_log_dir: Optional[str] = None @@ -196,6 +213,7 @@ class Scheduler: suite_work_dir: Optional[str] = None # task event loop + is_paused: Optional[bool] = None is_updated: Optional[bool] = None is_stalled: Optional[bool] = None @@ -215,23 +233,6 @@ class Scheduler: curve_auth: ThreadAuthenticator = None client_pub_key_dir: Optional[str] = None - # managers - profiler: Optional[Profiler] = None - pool: Optional[TaskPool] = None - proc_pool: Optional[SubProcPool] = None - task_job_mgr: Optional[TaskJobManager] = None - task_events_mgr: Optional[TaskEventsManager] = None - suite_event_handler: Optional[SuiteEventHandler] = None - data_store_mgr: Optional[DataStoreMgr] = None - suite_db_mgr: Optional[SuiteDatabaseManager] = None - broadcast_mgr: Optional[BroadcastMgr] = None - xtrigger_mgr: Optional[XtriggerManager] = None - - # queues - command_queue: Optional[Queue] = None - message_queue: Optional[Queue] = None - ext_trigger_queue: Optional[Queue] = None - # queue-released tasks still in prep pre_submit_tasks: Optional[List[TaskProxy]] = None @@ -281,7 +282,7 @@ async def install(self): """ # Install - source = suite_files.get_workflow_source_dir(Path.cwd()) + source, _ = suite_files.get_workflow_source_dir(Path.cwd()) if source is None: # register workflow rund = get_workflow_run_dir(self.suite) @@ -491,18 +492,18 @@ async def configure(self): self.options.main_loop ) - # Determine whether suite is held or should be held - # Determine whether suite can be auto shutdown holdcp = None if self.options.holdcp: holdcp = self.options.holdcp elif self.config.cfg['scheduling']['hold after cycle point']: holdcp = self.config.cfg['scheduling']['hold after cycle point'] if holdcp is not None: - self.hold_suite(get_point(holdcp)) - if self.options.hold_start: - LOG.info("Held on start-up (no tasks will be submitted)") - self.hold_suite() + self.command_set_hold_point(holdcp) + + if self.options.paused_start: + LOG.info("Paused on start up") + self.pause_workflow() + self.profiler.log_memory("scheduler.py: begin run while loop") self.is_updated = True if self.options.profile_mode: @@ -882,10 +883,19 @@ def _set_stop(self, stop_mode=None): self.proc_pool.set_stopping() self.stop_mode = stop_mode - def command_release(self, ids=None): - if ids: - return self.pool.release_tasks(ids) - self.release_suite() + def command_release(self, task_globs: Iterable[str]) -> int: + """Release held tasks.""" + return self.pool.release_tasks(task_globs) + + def command_release_hold_point(self) -> None: + """Release all held tasks and unset workflow hold after cycle point, + if set.""" + LOG.info("Releasing all tasks and removing hold cycle point.") + self.pool.release_hold_point() + + def command_resume(self) -> None: + """Resume paused workflow.""" + self.resume_workflow() def command_poll_tasks(self, items=None): """Poll pollable tasks or a task/family if options are provided.""" @@ -907,16 +917,23 @@ def command_kill_tasks(self, items=None): self.task_job_mgr.kill_task_jobs(self.suite, itasks) return len(bad_items) - def command_hold(self, tasks=None, time=None): - if tasks: - self.pool.hold_tasks(tasks) - if time: - point = TaskID.get_standardised_point(time) - self.hold_suite(point) - LOG.info( - 'The suite will pause when all tasks have passed %s', point) - if not (tasks or time): - self.hold_suite() + def command_hold(self, task_globs: Iterable[str]) -> int: + """Hold specified tasks.""" + return self.pool.hold_tasks(task_globs) + + def command_set_hold_point(self, point: str) -> None: + """Hold all tasks after the specified cycle point.""" + cycle_point = TaskID.get_standardised_point(point) + if cycle_point is None: + raise CyclingError("Cannot set hold point to None") + LOG.info( + f"Setting hold cycle point: {cycle_point}\n" + "All tasks after this point will be held.") + self.pool.set_hold_point(cycle_point) + + def command_pause(self) -> None: + """Pause the workflow.""" + self.pause_workflow() @staticmethod def command_set_verbosity(lvl): @@ -1100,7 +1117,7 @@ def _load_suite_params(self, row_idx, row): * Start/Stop Cycle points. * Stop task. * Suite UUID. - * A flag to indicate if the suite should be held or not. + * A flag to indicate if the suite should be paused or not. * Original suite run time zone. """ if row_idx == 0: @@ -1137,10 +1154,10 @@ def _load_suite_params(self, row_idx, row): elif key == self.suite_db_mgr.KEY_UUID_STR: self.uuid_str.value = value LOG.info('+ suite UUID = %s', value) - elif key == self.suite_db_mgr.KEY_HOLD: - if self.options.hold_start is None: - self.options.hold_start = bool(value) - LOG.info('+ hold suite = %s', bool(value)) + elif key == self.suite_db_mgr.KEY_PAUSED: + if self.options.paused_start is None: + self.options.paused_start = bool(value) + LOG.info(f'+ paused = {bool(value)}') elif key == self.suite_db_mgr.KEY_HOLD_CYCLE_POINT: if self.options.holdcp is None: self.options.holdcp = value @@ -1211,7 +1228,8 @@ def process_task_pool(self): itask.waiting_on_job_prep ] - if self.stop_mode is None and self.auto_restart_time is None: + if (not self.is_paused and + self.stop_mode is None and self.auto_restart_time is None): # Add newly released tasks to those still preparing. self.pre_submit_tasks += self.pool.queue_and_release() if self.pre_submit_tasks: @@ -1389,7 +1407,7 @@ def update_profiler_logs(self, tinit): self.count, get_current_time_string())) self.count += 1 - def release_tasks(self): + def release_runahead_tasks(self) -> None: if self.pool.release_runahead_tasks(): self.is_updated = True self.task_events_mgr.pflag = True @@ -1408,7 +1426,8 @@ async def main_loop(self): self.data_store_mgr.publish_deltas) self.process_command_queue() - self.release_tasks() + if not self.is_paused: + self.release_runahead_tasks() self.proc_pool.process() if self.should_process_tasks(): @@ -1543,7 +1562,7 @@ def check_suite_stalled(self): if self._get_events_conf(self.EVENT_TIMEOUT): self.set_suite_timer() - def should_process_tasks(self): + def should_process_tasks(self) -> bool: """Return True if waiting tasks are ready.""" # do we need to do a pass through the main task processing loop? process = False @@ -1600,6 +1619,9 @@ async def shutdown(self, reason): """ if isinstance(reason, SchedulerStop): LOG.info(f'Suite shutting down - {reason.args[0]}') + # Unset the "paused" status of the workflow if not auto-restarting + if self.auto_restart_mode != AutoRestartMode.RESTART_NORMAL: + self.resume_workflow(quiet=True) elif isinstance(reason, SchedulerError): LOG.error(f'Suite shutting down - {reason}') elif isinstance(reason, SuiteConfigError): @@ -1613,14 +1635,14 @@ async def shutdown(self, reason): else: LOG.critical('Suite shutting down') - if self.proc_pool: + if hasattr(self, 'proc_pool'): self.proc_pool.close() if self.proc_pool.is_not_done(): # e.g. KeyboardInterrupt self.proc_pool.terminate() self.proc_pool.process() - if self.pool is not None: + if hasattr(self, 'pool'): if not self.is_stalled: # (else already reported) self.pool.report_unmet_deps() @@ -1704,6 +1726,8 @@ def stop_clock_done(self): def check_auto_shutdown(self): """Check if we should do an automatic shutdown: main pool empty.""" + if self.is_paused: + return False self.pool.release_runahead_tasks() if self.pool.get_tasks(): return False @@ -1714,33 +1738,29 @@ def check_auto_shutdown(self): self.suite_db_mgr.delete_suite_stop_cycle_point() return True - def hold_suite(self, point=None): - """Hold all tasks in suite.""" - if point is None: - self.pool.hold_all_tasks() - self.task_events_mgr.pflag = True - self.suite_db_mgr.put_suite_hold() - LOG.info('Suite held.') - else: - LOG.info( - 'Setting suite hold cycle point: %s.' - '\nThe suite will hold once all tasks have passed this point.', - point - ) - self.pool.set_hold_point(point) - self.suite_db_mgr.put_suite_hold_cycle_point(point) - - def release_suite(self): - """Release (un-hold) all tasks in suite.""" - if self.pool.is_held: - LOG.info("RELEASE: new tasks will be queued when ready") - self.pool.set_hold_point(None) - self.pool.release_all_tasks() - self.suite_db_mgr.delete_suite_hold() - - def paused(self): - """Is the suite paused?""" - return self.pool.is_held + def pause_workflow(self) -> None: + """Pause the workflow.""" + if self.is_paused: + LOG.info("Workflow is already paused") + return + LOG.info("PAUSING the workflow now") + self.is_paused = True + self.suite_db_mgr.put_suite_paused() + + def resume_workflow(self, quiet: bool = False) -> None: + """Resume the workflow. + + Args: + quiet: whether to log anything. + """ + if not self.is_paused: + if not quiet: + LOG.warning("Cannot resume - workflow is not paused") + return + if not quiet: + LOG.info("RESUMING the workflow now") + self.is_paused = False + self.suite_db_mgr.delete_suite_paused() def command_force_trigger_tasks(self, items, reflow=False): """Trigger tasks.""" diff --git a/cylc/flow/scheduler_cli.py b/cylc/flow/scheduler_cli.py index 8f4fe082d95..398738f470b 100644 --- a/cylc/flow/scheduler_cli.py +++ b/cylc/flow/scheduler_cli.py @@ -21,19 +21,18 @@ import sys from ansimarkup import parse as cparse -from pathlib import Path from cylc.flow import LOG, RSYNC_LOG from cylc.flow.exceptions import SuiteServiceFileError from cylc.flow.host_select import select_suite_host from cylc.flow.hostuserutil import is_remote_host from cylc.flow.loggingutil import TimestampRotatingFileHandler +from cylc.flow.network.client import SuiteRuntimeClient from cylc.flow.option_parsers import ( CylcOptionParser as COP, Options ) from cylc.flow.pathutil import ( - get_workflow_run_dir, get_suite_run_log_name, get_suite_file_install_log_name) from cylc.flow.remote import _remote_cylc_cmd @@ -42,10 +41,10 @@ from cylc.flow import suite_files from cylc.flow.terminal import cli_function -PLAY_DOC = r"""cylc [control] play [OPTIONS] [ARGS] +PLAY_DOC = r"""cylc play [OPTIONS] ARGS Start running a workflow, or restart a stopped workflow from its previous -state/cycle point, or resume a paused workflow by releasing all tasks. +state, or resume a paused workflow. The scheduler will run as a daemon unless you specify --no-detach. @@ -53,10 +52,10 @@ it will be installed on the fly before start up. Examples: - # Start/restart the workflow with name REG. + # Start, restart or resume the workflow with name REG. $ cylc play REG -A "cold start" (the default for a freshly-installed workflow) starts from the +A "(cold) start" (the default for a freshly-installed workflow) starts from the initial cycle point (specified in flow.cylc or on the command line). Any dependence on tasks prior to the initial cycle point is ignored. It is also possible to start from a point that is later than the initial cycle @@ -66,11 +65,26 @@ A "restart" continues on from the most recent recorded state of the workflow. Tasks recorded as submitted or running are polled at restart to determine what -happened to them while the workflow was shut down.""" +happened to them while the workflow was shut down. + +A "resume" of a paused (but not stopped) workflow allows task jobs to be +submitted once again.""" FLOW_NAME_ARG_DOC = ("REG", "Workflow name") +RESUME_MUTATION = ''' +mutation ( + $wFlows: [WorkflowID]! +) { + resume ( + workflows: $wFlows + ) { + result + } +} +''' + @lru_cache() def get_option_parser(add_std_opts=False): @@ -79,6 +93,7 @@ def get_option_parser(add_std_opts=False): PLAY_DOC, icp=True, jset=True, + comms=True, argdoc=[FLOW_NAME_ARG_DOC]) parser.add_option( @@ -122,16 +137,13 @@ def get_option_parser(add_std_opts=False): metavar="CYCLE_POINT", action="store", dest="stopcp") parser.add_option( - "--hold", - help="Hold suite immediately on starting.", - action="store_true", dest="hold_start") + "--pause", + help="Pause the workflow immediately on start up.", + action="store_true", dest="paused_start") parser.add_option( - "--hold-point", "--hold-after", - help=( - "Set hold cycle point. " - "Hold suite AFTER all tasks have PASSED this cycle point." - ), + "--hold-after", "--hold-cycle-point", "--holdcp", + help="Hold all tasks after this cycle point.", metavar="CYCLE_POINT", action="store", dest="holdcp") parser.add_option( @@ -207,19 +219,6 @@ def get_option_parser(add_std_opts=False): get_option_parser(add_std_opts=True), DEFAULT_OPTS) -def _auto_install(): - """Register workflow installed in the cylc-run directory""" - try: - reg = suite_files.register() - except SuiteServiceFileError as exc: - sys.exit(exc) - # Replace this process with "cylc play REG ..." for 'ps -f'. - os.execv( - sys.argv[0], - [sys.argv[0]] + sys.argv[1:] + [reg] - ) - - def _open_logs(reg, no_detach): """Open Cylc log handlers for a flow run.""" if not no_detach: @@ -260,16 +259,22 @@ def scheduler_cli(parser, options, reg): functionality. """ + suite_files.validate_flow_name(reg) reg = os.path.normpath(reg) try: suite_files.detect_old_contact_file(reg) except SuiteServiceFileError as exc: - # TODO: unpause - print(f"Workflow is already running\n\n{exc}") + print(f"Resuming already-running workflow\n\n{exc}") + pclient = SuiteRuntimeClient(reg, timeout=options.comms_timeout) + mutation_kwargs = { + 'request_string': RESUME_MUTATION, + 'variables': { + 'wFlows': [reg] + } + } + pclient('graphql', mutation_kwargs) sys.exit(0) - _check_srvd(reg) - # re-execute on another host if required _distribute(options.host) @@ -313,16 +318,6 @@ def scheduler_cli(parser, options, reg): sys.exit(ret) -def _check_srvd(reg): - """Check the run dir contains .service dir""" - workflow_run_dir = get_workflow_run_dir(reg) - if not Path(workflow_run_dir, - suite_files.SuiteFiles.Service.DIRNAME).exists: - sys.stderr.write(f'suite service directory not found ' - f'at: {workflow_run_dir}\n') - sys.exit(1) - - def _distribute(host): """Re-invoke this command on a different host if requested.""" # Check whether a run host is explicitly specified, else select one. diff --git a/cylc/flow/scripts/cylc.py b/cylc/flow/scripts/cylc.py index 006e79d957e..f8ed0b086c6 100644 --- a/cylc/flow/scripts/cylc.py +++ b/cylc/flow/scripts/cylc.py @@ -313,6 +313,7 @@ def cli_help(): commands=[ 'hold', 'kill', + 'pause', 'play', 'release', 'scan', diff --git a/cylc/flow/scripts/hold.py b/cylc/flow/scripts/hold.py index 838f57dd388..c6fc9c47eff 100755 --- a/cylc/flow/scripts/hold.py +++ b/cylc/flow/scripts/hold.py @@ -18,33 +18,65 @@ """cylc hold [OPTIONS] ARGS -Hold a workflow or tasks. +Hold one or more tasks in a workflow. + +Held tasks do not submit their jobs even if ready to run. Examples: - $ cylc hold REG # hold a workflow - $ cylc hold REG TASK_GLOB ... # hold one or more tasks in a workflow + # Hold mytask at cycle point 1234 in my_flow + $ cylc hold my_flow mytask.1234 -Held tasks do not submit their jobs even if ready to run. + # Hold all active tasks at cycle 1234 in my_flow (note: tasks before/after + # this cycle point will not be held) + $ cylc hold my_flow '*.1234' + + # Hold all active instances of mytask in my_flow (note: this will not hold + # any unspawned tasks that might spawn in the future) + $ cylc hold my_flow 'mytask.*' + + # Hold all tasks after cycle point 1234 in my_flow + $ cylc hold my_flow --after=1234 + +Note: To pause a workflow (immediately preventing all job submission), use +'cylc pause' instead. See also 'cylc release'. """ import os.path +from typing import TYPE_CHECKING +from cylc.flow.exceptions import UserInputError from cylc.flow.option_parsers import CylcOptionParser as COP from cylc.flow.network.client import SuiteRuntimeClient from cylc.flow.terminal import cli_function -MUTATION = ''' +if TYPE_CHECKING: + from cylc.flow.option_parsers import Options + + +HOLD_MUTATION = ''' mutation ( $wFlows: [WorkflowID]!, - $tasks: [NamespaceIDGlob], - $time: TimePoint + $tasks: [NamespaceIDGlob]! ) { hold ( workflows: $wFlows, - tasks: $tasks, - time: $time + tasks: $tasks + ) { + result + } +} +''' + +SET_HOLD_POINT_MUTATION = ''' +mutation ( + $wFlows: [WorkflowID]!, + $point: CyclePoint! +) { + setHoldPoint ( + workflows: $wFlows, + point: $point ) { result } @@ -52,32 +84,56 @@ ''' -def get_option_parser(): +def get_option_parser() -> COP: parser = COP( __doc__, comms=True, multitask=True, argdoc=[ - ("REG", "Suite name"), - ('[TASK_GLOB ...]', 'Task matching patterns')]) + ('REG', "Workflow name"), + ('[TASK_GLOB ...]', "Task matching patterns")] + ) parser.add_option( "--after", - help="Hold whole suite AFTER this cycle point.", + help="Hold all tasks after this cycle point.", metavar="CYCLE_POINT", action="store", dest="hold_point_string") return parser +def _validate(options: 'Options', *task_globs: str) -> None: + """Check combination of options and task globs is valid.""" + if options.hold_point_string: + if task_globs: + raise UserInputError( + "Cannot combine --after with TASK_GLOB(s).\n" + "`cylc hold --after` holds all tasks after the given " + "cycle point.") + else: + if not task_globs: + raise UserInputError( + "Missing arguments: TASK_GLOB [...]. See `cylc hold --help`.") + + @cli_function(get_option_parser) -def main(parser, options, suite, *task_globs): - suite = os.path.normpath(suite) - pclient = SuiteRuntimeClient(suite, timeout=options.comms_timeout) +def main(parser: COP, options: 'Options', workflow: str, *task_globs: str): + + _validate(options, *task_globs) + + workflow = os.path.normpath(workflow) + pclient = SuiteRuntimeClient(workflow, timeout=options.comms_timeout) + + if options.hold_point_string: + mutation = SET_HOLD_POINT_MUTATION + args = {'point': options.hold_point_string} + else: + mutation = HOLD_MUTATION + args = {'tasks': list(task_globs)} mutation_kwargs = { - 'request_string': MUTATION, + 'request_string': mutation, 'variables': { - 'wFlows': [suite], - 'tasks': list(task_globs), - 'time': options.hold_point_string, + 'wFlows': [workflow], + **args } } diff --git a/cylc/flow/scripts/pause.py b/cylc/flow/scripts/pause.py new file mode 100644 index 00000000000..c5b697e3968 --- /dev/null +++ b/cylc/flow/scripts/pause.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 + +# THIS FILE IS PART OF THE CYLC SUITE ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""cylc pause [OPTIONS] ARGS + +Pause a workflow. + +This prevents submission of any task jobs. + +Examples: + $ cylc pause my_flow + +To resume a paused workflow, use 'cylc play'. + +Not to be confused with `cylc hold`. +""" + +import os.path + +from cylc.flow.option_parsers import CylcOptionParser as COP +from cylc.flow.network.client import SuiteRuntimeClient +from cylc.flow.terminal import cli_function + +MUTATION = ''' +mutation ( + $wFlows: [WorkflowID]! +) { + pause ( + workflows: $wFlows + ) { + result + } +} +''' + + +def get_option_parser(): + parser = COP( + __doc__, comms=True, multitask=True, + argdoc=[('REG', "Workflow name")] + ) + + return parser + + +@cli_function(get_option_parser) +def main(parser, options, workflow): + workflow = os.path.normpath(workflow) + pclient = SuiteRuntimeClient(workflow, timeout=options.comms_timeout) + + mutation_kwargs = { + 'request_string': MUTATION, + 'variables': { + 'wFlows': [workflow], + } + } + + pclient('graphql', mutation_kwargs) + + +if __name__ == '__main__': + main() diff --git a/cylc/flow/scripts/release.py b/cylc/flow/scripts/release.py index 563a2d78771..adbb8d5c939 100755 --- a/cylc/flow/scripts/release.py +++ b/cylc/flow/scripts/release.py @@ -18,11 +18,20 @@ """cylc release [OPTIONS] ARGS -Release a held workflow or tasks. +Release held tasks. Examples: - $ cylc release REG # release the workflow - $ cylc release REG TASK_GLOB ... # release one or more tasks + # Release mytask at cycle 1234 in my_flow + $ cylc release my_flow mytask.1234 + + # Release all active tasks at cycle 1234 in my_flow + $ cylc release my_flow '*.1234' + + # Release all active instances of mytask in my_flow + $ cylc release my_flow 'mytask.*' + + # Release all held tasks and remove the hold point + $ cylc release my_flow --all Held tasks do not submit their jobs even if ready to run. @@ -30,15 +39,21 @@ """ import os.path +from typing import TYPE_CHECKING +from cylc.flow.exceptions import UserInputError from cylc.flow.option_parsers import CylcOptionParser as COP from cylc.flow.network.client import SuiteRuntimeClient from cylc.flow.terminal import cli_function -MUTATION = ''' +if TYPE_CHECKING: + from cylc.flow.option_parsers import Options + + +RELEASE_MUTATION = ''' mutation ( $wFlows: [WorkflowID]!, - $tasks: [NamespaceIDGlob], + $tasks: [NamespaceIDGlob]! ) { release ( workflows: $wFlows, @@ -49,27 +64,69 @@ } ''' +RELEASE_HOLD_POINT_MUTATION = ''' +mutation ( + $wFlows: [WorkflowID]! +) { + releaseHoldPoint ( + workflows: $wFlows + ) { + result + } +} +''' + -def get_option_parser(): +def get_option_parser() -> COP: parser = COP( __doc__, comms=True, multitask=True, argdoc=[ - ("REG", 'Suite name'), - ('[TASK_GLOB ...]', 'Task matching patterns')]) + ('REG', "Workflow name"), + ('[TASK_GLOB ...]', "Task matching patterns")] + ) + + parser.add_option( + "--all", + help=( + "Release all held tasks and remove the 'hold after cycle point', " + "if set."), + action="store_true", dest="release_all") return parser +def _validate(options: 'Options', *task_globs: str) -> None: + """Check combination of options and task globs is valid.""" + if options.release_all: + if task_globs: + raise UserInputError("Cannot combine --all with TASK_GLOB(s).") + else: + if not task_globs: + raise UserInputError( + "Missing arguments: TASK_GLOB [...]. " + "See `cylc release --help`.") + + @cli_function(get_option_parser) -def main(parser, options, suite, *task_globs): - suite = os.path.normpath(suite) - pclient = SuiteRuntimeClient(suite, timeout=options.comms_timeout) +def main(parser: COP, options: 'Options', workflow: str, *task_globs: str): + + _validate(options, *task_globs) + + workflow = os.path.normpath(workflow) + pclient = SuiteRuntimeClient(workflow, timeout=options.comms_timeout) + + if options.release_all: + mutation = RELEASE_HOLD_POINT_MUTATION + args = {} + else: + mutation = RELEASE_MUTATION + args = {'tasks': list(task_globs)} mutation_kwargs = { - 'request_string': MUTATION, + 'request_string': mutation, 'variables': { - 'wFlows': [suite], - 'tasks': list(task_globs), + 'wFlows': [workflow], + **args } } diff --git a/cylc/flow/scripts/scan.py b/cylc/flow/scripts/scan.py index 9f89e0ed60e..2cc3d3d4fcd 100644 --- a/cylc/flow/scripts/scan.py +++ b/cylc/flow/scripts/scan.py @@ -19,10 +19,10 @@ List Cylc workflows. -By default this shows only running or held workflows. +By default this shows only running or paused workflows. Examples: - # list all "active" workflows (i.e. running or held) + # list all "active" workflows (i.e. running or paused) $ cylc scan # show more information about these workflows @@ -35,7 +35,7 @@ $ cylc scan --state stopped # list all workflows (active or inactive) - $ cylc scan --state=running,held,stopped + $ cylc scan --state=running,paused,stopped $ cylc scan --state=all # or using the shorthand # filter workflows by name @@ -75,7 +75,7 @@ # all supported suite states FLOW_STATES = { 'running', - 'held', + 'paused', 'stopping', 'stopped' } @@ -85,7 +85,7 @@ FLOW_STATE_CMAP = { # suite state: term colour 'running': 'green', - 'held': 'fg 172', + 'paused': 'fg 172', 'stopping': 'fg 201', 'stopped': 'red' } @@ -97,7 +97,7 @@ # can appear wildly different font-depending and may not # even be monospace 'running': '▶', - 'held': '‖', + 'paused': '‖', 'stopping': '◧', 'stopped': '■' } @@ -179,7 +179,7 @@ def get_option_parser(): ' or "all" to show everything. See the full `cylc scan` help' ' for a list of supported states.' ), - default='running,held,stopping', + default='running,paused,stopping', action='store' ) @@ -334,7 +334,7 @@ def _format_rich(flow, opts): def sort_function(flow): if flow.get('status') == 'running': state = 0 - elif flow.get('status') == 'held': + elif flow.get('status') == 'paused': state = 0 elif flow.get('contact'): state = 2 @@ -401,9 +401,9 @@ def get_pipe(opts, formatter, scan_dir=None): pipe = scan show_running = 'running' in opts.states - show_held = 'held' in opts.states - show_active = show_running or show_held or 'stopping' in opts.states - # show_active = bool({'running', 'held'} & opts.states) + show_paused = 'paused' in opts.states + show_active = show_running or show_paused or 'stopping' in opts.states + # show_active = bool({'running', 'paused'} & opts.states) show_inactive = bool({'stopped'} & opts.states) # filter by flow name @@ -423,8 +423,8 @@ def get_pipe(opts, formatter, scan_dir=None): graphql_fields = {} graphql_filters = set() - # filter held/running flows - if show_active and not (show_running and show_held): + # filter paused/running flows + if show_active and not (show_running and show_paused): graphql_fields['status'] = None graphql_filters.add((('status',), tuple(opts.states))) diff --git a/cylc/flow/scripts/stop.py b/cylc/flow/scripts/stop.py index 71afa7b9d85..14681bba21f 100755 --- a/cylc/flow/scripts/stop.py +++ b/cylc/flow/scripts/stop.py @@ -105,10 +105,9 @@ def check(self): def get_option_parser(): parser = COP( __doc__, comms=True, - argdoc=[("REG", "Suite name"), - ("[STOP]", """a/ task POINT (cycle point), or - b/ ISO 8601 date-time (clock time), or - c/ TASK (task ID).""")]) + argdoc=[("REG", "Workflow name"), + ("[STOP]", "task POINT (cycle point), or TASK (task ID).")] + ) parser.add_option( "-k", "--kill", diff --git a/cylc/flow/suite_db_mgr.py b/cylc/flow/suite_db_mgr.py index 4f6e728c170..a101ceeb5c1 100644 --- a/cylc/flow/suite_db_mgr.py +++ b/cylc/flow/suite_db_mgr.py @@ -53,7 +53,7 @@ class SuiteDatabaseManager: KEY_UUID_STR = 'uuid_str' KEY_CYLC_VERSION = 'cylc_version' KEY_UTC_MODE = 'UTC_mode' - KEY_HOLD = 'is_held' + KEY_PAUSED = 'is_paused' KEY_HOLD_CYCLE_POINT = 'holdcp' KEY_RUN_MODE = 'run_mode' KEY_STOP_CLOCK_TIME = 'stop_clock_time' @@ -138,9 +138,13 @@ def delete_suite_params(self, *keys): for key in keys: self.db_deletes_map[self.TABLE_SUITE_PARAMS].append({'key': key}) - def delete_suite_hold(self): - """Delete suite hold flag and hold cycle point.""" - self.delete_suite_params(self.KEY_HOLD, self.KEY_HOLD_CYCLE_POINT) + def delete_suite_paused(self): + """Delete paused status.""" + self.delete_suite_params(self.KEY_PAUSED) + + def delete_suite_hold_cycle_point(self): + """Delete suite hold cycle point.""" + self.delete_suite_params(self.KEY_HOLD_CYCLE_POINT) def delete_suite_stop_clock_time(self): """Delete suite stop clock time from suite_params table.""" @@ -300,9 +304,9 @@ def put_suite_params(self, schd): self.db_inserts_map[self.TABLE_SUITE_PARAMS].append({ "key": self.KEY_CYCLE_POINT_FORMAT, "value": schd.config.cycle_point_dump_format}) - if schd.pool.is_held: + if schd.is_paused: self.db_inserts_map[self.TABLE_SUITE_PARAMS].append({ - "key": self.KEY_HOLD, "value": 1}) + "key": self.KEY_PAUSED, "value": 1}) for key in ( self.KEY_INITIAL_CYCLE_POINT, self.KEY_FINAL_CYCLE_POINT, @@ -326,9 +330,9 @@ def put_suite_params_1(self, key, value): self.db_inserts_map[self.TABLE_SUITE_PARAMS].append( {"key": key, "value": value}) - def put_suite_hold(self): - """Put suite hold flag to suite_params table.""" - self.put_suite_params_1(self.KEY_HOLD, 1) + def put_suite_paused(self): + """Put suite paused flag to suite_params table.""" + self.put_suite_params_1(self.KEY_PAUSED, 1) def put_suite_hold_cycle_point(self, value): """Put suite hold cycle point to suite_params table.""" diff --git a/cylc/flow/suite_files.py b/cylc/flow/suite_files.py index 8dda5e0376f..c99020defcf 100644 --- a/cylc/flow/suite_files.py +++ b/cylc/flow/suite_files.py @@ -16,6 +16,7 @@ """Suite service files management.""" +from typing import Optional, Tuple, Union import aiofiles from enum import Enum import logging @@ -370,29 +371,32 @@ def get_flow_file(reg): return flow_file -def get_workflow_source_dir(dir_): - """Return the source directory path of the workflow in directory provided. - Args: - dir_ (path): - directory to check for an installed flow - """ +def get_workflow_source_dir( + run_dir: Union[Path, str] +) -> Union[Tuple[str, Path], Tuple[None, None]]: + """Get the source directory path of the workflow in directory provided. + + Args: + run_dir: directory to check for an installed flow inside. + Returns (source_dir, symlink) where the latter is the symlink to the source + dir that exists in the run dir. + """ source_path = Path( - dir_, - SuiteFiles.Install.DIRNAME, - SuiteFiles.Install.SOURCE) - alt_source_path = Path( - dir_.parent, + run_dir, SuiteFiles.Install.DIRNAME, SuiteFiles.Install.SOURCE) try: source = os.readlink(source_path) return source, source_path except OSError: + alt_source_path = Path( + Path(run_dir).parent, + SuiteFiles.Install.DIRNAME, + SuiteFiles.Install.SOURCE) try: source = os.readlink(alt_source_path) return source, alt_source_path - except OSError: return None, None @@ -478,7 +482,9 @@ def parse_suite_arg(options, arg): return name, path -def register(flow_name=None, source=None): +def register( + flow_name: Optional[str] = None, source: Optional[str] = None +) -> str: """Set up workflow. This completes some of the set up completed by cylc install. Called only if running workflow that has not been installed. @@ -489,11 +495,11 @@ def register(flow_name=None, source=None): Creates the .service directory. Args: - flow_name (str): workflow name, default basename($PWD). - source (str): directory location of flow.cylc file, default $PWD. + flow_name: workflow name, default basename($PWD). + source: directory location of flow.cylc file, default $PWD. Return: - str: The installed suite name (which may be computed here). + The installed suite name (which may be computed here). Raise: WorkflowFilesError: @@ -502,13 +508,8 @@ def register(flow_name=None, source=None): - Nested workflow run directories. """ if flow_name is None: - flow_name = (Path.cwd().stem) - is_valid, message = SuiteNameValidator.validate(flow_name) - if not is_valid: - raise WorkflowFilesError(f'Invalid workflow name - {message}') - if Path.is_absolute(Path(flow_name)): - raise WorkflowFilesError( - f'Workflow name cannot be an absolute path: {flow_name}') + flow_name = Path.cwd().stem + validate_flow_name(flow_name) if source is not None: if os.path.basename(source) == SuiteFiles.FLOW_FILE: source = os.path.dirname(source) @@ -531,8 +532,8 @@ def register(flow_name=None, source=None): def is_installed(path): """Check to see if the path sent contains installed flow. - Checks for valid _cylc-install directory in current folder and checks - source link exists. + Checks for valid _cylc-install directory in current folder and checks + source link exists. """ cylc_install_folder = Path(path, SuiteFiles.Install.DIRNAME) source = Path(cylc_install_folder, SuiteFiles.Install.SOURCE) @@ -548,7 +549,7 @@ def _clean_check(reg, run_dir): reg (str): Workflow name. run_dir (str): Path to the workflow run dir on the filesystem. """ - _validate_reg(reg) + validate_flow_name(reg) reg = os.path.normpath(reg) if reg.startswith('.'): raise WorkflowFilesError( @@ -839,22 +840,18 @@ def get_platforms_from_db(run_dir): pri_dao.close() -def _validate_reg(reg): - """Check suite name is valid. - - Args: - reg (str): Suite name +def validate_flow_name(flow_name: str) -> None: + """Check workflow name is valid and not an absolute path. - Raise: - SuiteServiceFileError: - - reg has form of absolute path or is otherwise not valid + Raise WorkflowFilesError if not valid. """ - is_valid, message = SuiteNameValidator.validate(reg) + is_valid, message = SuiteNameValidator.validate(flow_name) if not is_valid: - raise SuiteServiceFileError(f'invalid suite name "{reg}" - {message}') - if os.path.isabs(reg): - raise SuiteServiceFileError( - f'suite name cannot be an absolute path: {reg}') + raise WorkflowFilesError( + f"invalid workflow name '{flow_name}' - {message}") + if os.path.isabs(flow_name): + raise WorkflowFilesError( + f"workflow name cannot be an absolute path: {flow_name}") def check_nested_run_dirs(run_dir, flow_name): @@ -1251,15 +1248,6 @@ def create_workflow_srv_dir(rundir=None, source=None): workflow_srv_d.mkdir(exist_ok=True, parents=True) -def validate_flow_name(flow_name): - is_valid, message = SuiteNameValidator.validate(flow_name) - if not is_valid: - raise WorkflowFilesError(f'Invalid workflow name - {message}') - if Path.is_absolute(Path(flow_name)): - raise WorkflowFilesError( - f'Workflow name cannot be an absolute path: {flow_name}') - - def validate_source_dir(source, flow_name): """Ensure the source directory is valid. diff --git a/cylc/flow/suite_status.py b/cylc/flow/suite_status.py index 50a142fa0ed..f82027261e8 100644 --- a/cylc/flow/suite_status.py +++ b/cylc/flow/suite_status.py @@ -40,7 +40,7 @@ class SuiteStatus(Enum): """The possible statuses of a suite.""" - HELD = "held" + PAUSED = "paused" """Suite will not submit any new jobs.""" RUNNING = "running" @@ -135,8 +135,8 @@ def get_suite_status(schd): status = SuiteStatus.RUNNING status_msg = '' - if schd.pool.is_held: - status = SuiteStatus.HELD + if schd.is_paused: + status = SuiteStatus.PAUSED elif schd.stop_mode is not None: status = SuiteStatus.STOPPING status_msg = f'Stopping: {schd.stop_mode.describe()}' diff --git a/cylc/flow/task_events_mgr.py b/cylc/flow/task_events_mgr.py index 097ca76b787..3e2fe669d44 100644 --- a/cylc/flow/task_events_mgr.py +++ b/cylc/flow/task_events_mgr.py @@ -199,7 +199,8 @@ def __init__(self, suite, proc_pool, suite_db_mgr, broadcast_mgr, self.event_timers_updated = True # To be set by the task pool: self.spawn_func = None - # pflag was set to True to stimulate dependency negotiation in SoS. + # pflag was set to True to stimulate dependency negotiation in SoS + # (flag is turned on by commands that change task state) self.pflag = False self.timestamp = timestamp diff --git a/cylc/flow/task_id.py b/cylc/flow/task_id.py index bc75c712509..ede75587760 100644 --- a/cylc/flow/task_id.py +++ b/cylc/flow/task_id.py @@ -17,7 +17,9 @@ import re +from typing import Optional +from cylc.flow.cycling import PointBase from cylc.flow.cycling.loader import get_point, standardise_point_string from cylc.flow.exceptions import PointParsingError @@ -87,7 +89,7 @@ def get_standardised_point_string(cls, point_string): return point_string @classmethod - def get_standardised_point(cls, point_string): + def get_standardised_point(cls, point_string: str) -> Optional[PointBase]: """Return a standardised point.""" return get_point(cls.get_standardised_point_string(point_string)) diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 2f1d6618d8a..07acfbf9a2f 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -18,11 +18,12 @@ """ +from collections import Counter from fnmatch import fnmatchcase from string import ascii_letters import json from time import time -from collections import Counter +from typing import Iterable, TYPE_CHECKING from cylc.flow.parsec.OrderedDict import OrderedDict @@ -57,6 +58,9 @@ from cylc.flow.platforms import get_platform from cylc.flow.task_queues.independent import IndepQueueManager +if TYPE_CHECKING: + from cylc.flow.cycling import PointBase + class FlowLabelMgr: """ @@ -171,7 +175,6 @@ def __init__(self, config, suite_db_mgr, task_events_mgr, data_store_mgr): self.pool_changed = False self.rhpool_changed = False - self.is_held = False self.hold_point = None self.abs_outputs_done = set() @@ -289,11 +292,11 @@ def release_runahead_tasks(self): # Get the earliest point with unfinished tasks. runahead_base_point = min(points) + runahead_number_limit = None + runahead_time_limit = None if isinstance(self.custom_runahead_limit, IntegerInterval): - number_limit = int(self.custom_runahead_limit) - runahead_time_limit = None + runahead_number_limit = int(self.custom_runahead_limit) elif isinstance(self.custom_runahead_limit, ISO8601Interval): - number_limit = None runahead_time_limit = self.custom_runahead_limit # Get all cycling points possible after the runahead base point. @@ -302,31 +305,30 @@ def release_runahead_tasks(self): # Cache for speed. sequence_points = self._prev_runahead_sequence_points else: - sequence_points = [] + sequence_points = set() for sequence in self.config.sequences: seq_point = sequence.get_next_point(runahead_base_point) count = 1 while seq_point is not None: - if number_limit is None: - if (seq_point > runahead_base_point + - runahead_time_limit): + if runahead_time_limit is not None: + if seq_point > (runahead_base_point + + runahead_time_limit): break else: - if count > number_limit: + if count > runahead_number_limit: break count += 1 - sequence_points.append(seq_point) + sequence_points.add(seq_point) seq_point = sequence.get_next_point(seq_point) - sequence_points = set(sequence_points) self._prev_runahead_sequence_points = sequence_points self._prev_runahead_base_point = runahead_base_point points = set(points).union(sequence_points) - if number_limit is not None: + if runahead_number_limit is not None: # Calculate which tasks to release based on a maximum number of # active cycle points (active meaning non-finished tasks). - latest_allowed_point = sorted(points)[:number_limit][-1] + latest_allowed_point = sorted(points)[:runahead_number_limit][-1] if self.max_future_offset is not None: # For the first N points, release their future trigger tasks. latest_allowed_point += self.max_future_offset @@ -912,13 +914,11 @@ def warn_stop_orphans(self): point, name, submit_num, key1)) def is_stalled(self): - """Return True if the suite is stalled. + """Return True if the workflow is stalled. - A suite is stalled if it is not held and the active pool contains only - unhandled failed tasks. + A workflow is stalled if the active pool contains only unhandled + failed tasks. """ - if self.is_held: - return False unhandled_failed = [] for itask in self.get_tasks(): if itask.state(*TASK_STATUSES_FAILURE): @@ -977,43 +977,38 @@ def report_unmet_deps(self): ) ) - def set_hold_point(self, point): - """Set the point after which tasks must be held.""" + def set_hold_point(self, point: 'PointBase') -> None: + """Set the point after which all tasks must be held.""" self.hold_point = point - if point is not None: - for itask in self.get_all_tasks(): - if itask.point > point: - if itask.state.reset(is_held=True): - self.data_store_mgr.delta_task_held(itask) - - def hold_tasks(self, items): - """Hold tasks with IDs matching any item in "ids".""" + for itask in self.get_all_tasks(): + if itask.point > point: + if itask.state.reset(is_held=True): + self.data_store_mgr.delta_task_held(itask) + self.suite_db_mgr.put_suite_hold_cycle_point(point) + + def hold_tasks(self, items: Iterable[str]) -> int: + """Hold tasks with IDs matching the specified items.""" itasks, bad_items = self.filter_task_proxies(items) for itask in itasks: if itask.state.reset(is_held=True): self.data_store_mgr.delta_task_held(itask) return len(bad_items) - def release_tasks(self, items): - """Release held tasks with IDs matching any item in "ids".""" + def release_tasks(self, items: Iterable[str]) -> int: + """Release held tasks with IDs matching any specified items.""" itasks, bad_items = self.filter_task_proxies(items) for itask in itasks: if itask.state.reset(is_held=False): self.data_store_mgr.delta_task_held(itask) return len(bad_items) - def hold_all_tasks(self): - """Hold all tasks.""" - LOG.info("Holding all waiting tasks now") - self.is_held = True + def release_hold_point(self) -> None: + """Release all tasks and unset the workflow hold point.""" + self.hold_point = None for itask in self.get_all_tasks(): - if itask.state.reset(is_held=True): + if itask.state.reset(is_held=False): self.data_store_mgr.delta_task_held(itask) - - def release_all_tasks(self): - """Release all held tasks.""" - self.is_held = False - self.release_tasks(None) + self.suite_db_mgr.delete_suite_hold_cycle_point() def check_abort_on_task_fails(self): """Check whether suite should abort on task failure. @@ -1197,12 +1192,6 @@ def spawn_task(self, name, point, flow_label=None, reflow=True, if future_trigger_overrun: LOG.warning("[%s] -won't run: depends on a " "task beyond the stop point", itask) - if (self.is_held - and itask.state(TASK_STATUS_WAITING, is_held=False)): - # Hold newly-spawned tasks in a held suite (e.g. due to manual - # triggering of a held task). - if itask.state.reset(is_held=True): - self.data_store_mgr.delta_task_held(itask) # Attempt to satisfy any absolute triggers now. # TODO: consider doing this only for tasks with absolute prerequisites. @@ -1422,9 +1411,9 @@ def prune_flow_labels(self): # Nothing to do. return # Gather all current labels. - labels = [] - for itask in self.get_all_tasks(): - labels.append(itask.flow_label) + labels = [itask.flow_label for itask in self.get_all_tasks()] + if not labels: + return # Find any labels common to all tasks. common = self.flow_label_mgr.get_common_labels(labels) # And prune them back to just one. diff --git a/cylc/flow/tui/__init__.py b/cylc/flow/tui/__init__.py index 741ba0cb27f..76abcadb072 100644 --- a/cylc/flow/tui/__init__.py +++ b/cylc/flow/tui/__init__.py @@ -63,7 +63,7 @@ # suite state colour SUITE_COLOURS = { 'running': ('light blue', BACK), - 'held': ('brown', BACK), + 'paused': ('brown', BACK), 'stopping': ('light magenta', BACK), 'stopped': ('light red', BACK), 'error': ('light red', BACK, 'bold') diff --git a/cylc/flow/tui/data.py b/cylc/flow/tui/data.py index 9816f76b306..635ce8a9aa8 100644 --- a/cylc/flow/tui/data.py +++ b/cylc/flow/tui/data.py @@ -76,10 +76,10 @@ MUTATIONS = { 'workflow': [ - 'hold', - 'release', + 'pause', + 'resume', 'reload', - 'stop' + 'stop', ], 'cycle_point': [ 'hold', @@ -106,7 +106,7 @@ MUTATION_TEMPLATES = { 'workflow': ''' mutation($workflow: [WorkflowID]!) { - hold (workflows: $workflow) { + pause (workflows: $workflow) { result } } diff --git a/setup.cfg b/setup.cfg index 7d617e76ca8..364d4d9a240 100644 --- a/setup.cfg +++ b/setup.cfg @@ -92,6 +92,7 @@ cylc.command = kill = cylc.flow.scripts.kill:main list = cylc.flow.scripts.list:main message = cylc.flow.scripts.message:main + pause = cylc.flow.scripts.pause:main ping = cylc.flow.scripts.ping:main play = cylc.flow.scripts.play:main poll = cylc.flow.scripts.poll:main diff --git a/tests/flakyfunctional/hold-release/14-hold-kill/flow.cylc b/tests/flakyfunctional/hold-release/14-hold-kill/flow.cylc index e78708cc808..fb392e0a42a 100644 --- a/tests/flakyfunctional/hold-release/14-hold-kill/flow.cylc +++ b/tests/flakyfunctional/hold-release/14-hold-kill/flow.cylc @@ -23,9 +23,8 @@ """ [[sleeper]] script = """ - if ((CYLC_TASK_TRY_NUMBER == 1)); then - sleep 120 - fi + if ((CYLC_TASK_TRY_NUMBER == 1)); then + sleep 120 + fi """ - [[[job]]] - execution retry delays = PT1S + execution retry delays = PT1S diff --git a/tests/flakyfunctional/restart/19-checkpoint/flow.cylc b/tests/flakyfunctional/restart/19-checkpoint/flow.cylc deleted file mode 100644 index 26c25b38fb7..00000000000 --- a/tests/flakyfunctional/restart/19-checkpoint/flow.cylc +++ /dev/null @@ -1,42 +0,0 @@ -#!jinja2 -[scheduler] - UTC mode=True - cycle point format = %Y - [[events]] - abort on stalled = True - abort on inactivity = True - inactivity = P1M - startup handler = cylc release '%(suite)s' -[scheduling] - initial cycle point = 2016 - final cycle point = 2020 - [[graph]] - P1Y=t1[-P1Y] => t1 -[runtime] - [[t1]] - script = """ -wait "${CYLC_TASK_MESSAGE_STARTED_PID}" 2>/dev/null || true -if [[ "${CYLC_TASK_CYCLE_POINT}" == '2017' ]]; then - cylc__job__poll_grep_suite_log -F \ - '[t1.2017] status=submitted: (received)started' - sleep 2 # make sure status change recorded in DB - cylc broadcast "${CYLC_SUITE_NAME}" -p '2017' -n 't1' --set='script="true"' - cylc hold "${CYLC_SUITE_NAME}" - cylc__job__poll_grep_suite_log -F \ - 'INFO - Command succeeded: hold()' - sleep 2 - (cd "${CYLC_SUITE_DEF_PATH}"; cp -p 'flow2.cylc' 'flow.cylc') - cylc reload "${CYLC_SUITE_NAME}" - cylc__job__poll_grep_suite_log 'Reload completed' - cylc set-outputs "${CYLC_SUITE_NAME}" 't1.2017' - while ! cylc show "${CYLC_SUITE_NAME}" 't2.2017' 1>'/dev/null' 2>&1; do - sleep 1 # make sure insert completes - done - sleep 2 - cylc release "${CYLC_SUITE_NAME}" -fi -""" - [[[job]]] - execution time limit = PT50S - [[[events]]] - failed handler = cylc release '%(suite)s' diff --git a/tests/flakyfunctional/restart/19-checkpoint/flow2.cylc b/tests/flakyfunctional/restart/19-checkpoint/flow2.cylc deleted file mode 100644 index f75434bc0eb..00000000000 --- a/tests/flakyfunctional/restart/19-checkpoint/flow2.cylc +++ /dev/null @@ -1,18 +0,0 @@ -#!jinja2 -[scheduler] - UTC mode=True - cycle point format = %Y - [[events]] - abort on stalled = True - abort on inactivity = True - inactivity = P1M -[scheduling] - initial cycle point = 2016 - final cycle point = 2020 - [[graph]] - P1Y=t2[-P1Y] => t1 => t2 -[runtime] - [[t1]] - script = true - [[t2]] - script = false diff --git a/tests/flakyfunctional/restart/21-task-elapsed.t b/tests/flakyfunctional/restart/21-task-elapsed.t index a118d63c9cf..0771063859a 100755 --- a/tests/flakyfunctional/restart/21-task-elapsed.t +++ b/tests/flakyfunctional/restart/21-task-elapsed.t @@ -62,7 +62,7 @@ LOADING task run times + t1: %d,%d,%d,%d,%d,%d,%d,%d,%d,%d __OUT__ suite_run_ok "${TEST_NAME_BASE}-restart-3" \ - cylc play "${SUITE_NAME}" --hold + cylc play "${SUITE_NAME}" --hold-after=1900 # allow the task pool to settle before requesting a dump cylc suite-state "${SUITE_NAME}" \ --task=t1 \ diff --git a/tests/flakyfunctional/restart/21-task-elapsed/flow.cylc b/tests/flakyfunctional/restart/21-task-elapsed/flow.cylc index c937fa1804e..bb4068357f3 100644 --- a/tests/flakyfunctional/restart/21-task-elapsed/flow.cylc +++ b/tests/flakyfunctional/restart/21-task-elapsed/flow.cylc @@ -6,7 +6,6 @@ abort on stalled = True abort on inactivity = True inactivity = PT3M - startup handler = cylc release '%(suite)s' [scheduling] initial cycle point = 2016 final cycle point = 2031 diff --git a/tests/flakyfunctional/restart/40-auto-restart-force-stop.t b/tests/flakyfunctional/restart/40-auto-restart-force-stop.t index a002178b56c..c601d059dd5 100644 --- a/tests/flakyfunctional/restart/40-auto-restart-force-stop.t +++ b/tests/flakyfunctional/restart/40-auto-restart-force-stop.t @@ -47,7 +47,7 @@ ${BASE_GLOBAL_CONFIG} available = localhost " -cylc play "${SUITE_NAME}" --hold +cylc play "${SUITE_NAME}" --pause poll_suite_running create_test_global_config '' " diff --git a/tests/functional/cylc-get-host-metrics/test_header b/tests/functional/cylc-get-host-metrics/test_header deleted file mode 120000 index 90bd5a36f92..00000000000 --- a/tests/functional/cylc-get-host-metrics/test_header +++ /dev/null @@ -1 +0,0 @@ -../lib/bash/test_header \ No newline at end of file diff --git a/tests/functional/cylc-get-suite-contact/00-basic.t b/tests/functional/cylc-get-suite-contact/00-basic.t index 9e6e46e1893..05074c458c0 100644 --- a/tests/functional/cylc-get-suite-contact/00-basic.t +++ b/tests/functional/cylc-get-suite-contact/00-basic.t @@ -36,7 +36,7 @@ run_fail "${TEST_NAME_BASE}-get-suite-contact-1" \ cmp_ok "${TEST_NAME_BASE}-get-suite-contact-1.stderr" <<__ERR__ CylcError: ${SUITE_NAME}: cannot get contact info, suite not running? __ERR__ -run_ok "${TEST_NAME_BASE}-run-hold" cylc play --hold "${SUITE_NAME}" +run_ok "${TEST_NAME_BASE}-run-pause" cylc play --pause "${SUITE_NAME}" run_ok "${TEST_NAME_BASE}-get-suite-contact-2" \ cylc get-suite-contact "${SUITE_NAME}" contains_ok "${TEST_NAME_BASE}-get-suite-contact-2.stdout" \ diff --git a/tests/functional/cylc-install/02-failures.t b/tests/functional/cylc-install/02-failures.t index 09388a62672..05923deff94 100644 --- a/tests/functional/cylc-install/02-failures.t +++ b/tests/functional/cylc-install/02-failures.t @@ -51,7 +51,7 @@ popd || exit TEST_NAME="${TEST_NAME_BASE}-nodir" make_rnd_suite rm -rf "${RND_SUITE_SOURCE}" -run_fail "${TEST_NAME}" cylc install --flow-name="${RND_SUITE_NAME}" --no-run-name -C "${RND_SUITE_SOURCE}" +run_fail "${TEST_NAME}" cylc install --flow-name="${RND_SUITE_NAME}" --no-run-name -C "${RND_SUITE_SOURCE}" contains_ok "${TEST_NAME}.stderr" <<__ERR__ WorkflowFilesError: no flow.cylc or suite.rc in ${RND_SUITE_SOURCE} __ERR__ @@ -75,7 +75,7 @@ TEST_NAME="${TEST_NAME_BASE}-no-abs-path-flow-name" make_rnd_suite run_fail "${TEST_NAME}" cylc install --flow-name="${RND_SUITE_SOURCE}" -C "${RND_SUITE_SOURCE}" contains_ok "${TEST_NAME}.stderr" <<__ERR__ -WorkflowFilesError: Workflow name cannot be an absolute path: ${RND_SUITE_SOURCE} +WorkflowFilesError: workflow name cannot be an absolute path: ${RND_SUITE_SOURCE} __ERR__ purge_rnd_suite @@ -95,7 +95,7 @@ TEST_NAME="${TEST_NAME_BASE}--invalid-flow-name-cylc-install" make_rnd_suite run_fail "${TEST_NAME}" cylc install --flow-name=\.invalid -C "${RND_SUITE_SOURCE}" contains_ok "${TEST_NAME}.stderr" <<__ERR__ -WorkflowFilesError: Invalid workflow name - cannot start with: \`\`.\`\`, \`\`-\`\` +WorkflowFilesError: invalid workflow name '.invalid' - cannot start with: \`\`.\`\`, \`\`-\`\` __ERR__ purge_rnd_suite @@ -120,7 +120,7 @@ done TEST_NAME="${TEST_NAME_BASE}--no-run-name-and--run-name-forbidden" make_rnd_suite pushd "${RND_SUITE_SOURCE}" || exit 1 -run_fail "${TEST_NAME}" cylc install --run-name="${RND_SUITE_NAME}" --no-run-name +run_fail "${TEST_NAME}" cylc install --run-name="${RND_SUITE_NAME}" --no-run-name contains_ok "${TEST_NAME}.stderr" <<__ERR__ cylc: error: options --no-run-name and --run-name are mutually exclusive. __ERR__ diff --git a/tests/functional/cylc-ping/03-check-keys-local.t b/tests/functional/cylc-ping/03-check-keys-local.t index 0e78f96f791..4f4b1b8c757 100644 --- a/tests/functional/cylc-ping/03-check-keys-local.t +++ b/tests/functional/cylc-ping/03-check-keys-local.t @@ -32,7 +32,7 @@ run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}" SRVD="${SUITE_RUN_DIR}/.service" -suite_run_ok "${TEST_NAME_BASE}-run-hold" cylc play --hold "${SUITE_NAME}" +suite_run_ok "${TEST_NAME_BASE}-run-pause" cylc play --pause "${SUITE_NAME}" exists_ok "${SRVD}/client.key_secret" exists_ok "${SRVD}/server.key_secret" diff --git a/tests/functional/cylc-scan/04-outputs/flow.cylc b/tests/functional/cylc-scan/04-outputs/flow.cylc deleted file mode 100644 index 7f9e9a13e51..00000000000 --- a/tests/functional/cylc-scan/04-outputs/flow.cylc +++ /dev/null @@ -1,27 +0,0 @@ -[meta] - title = Cylc Scan test suite. - description = """ - Stalls when the first task fails. - Here we test out a multi-line description! - """ - custom_metadata = something_custom - another_metadata = 1 -[scheduler] - [[events]] - timeout = PT1M - abort on timeout = True -[scheduling] - cycling mode = integer - initial cycle point = 1 - final cycle point = 2 - [[graph]] - # oops makes bar spawn as waiting - R/1 = foo & oops => bar - R/2 = bar[-P1] => pub -[runtime] - [[foo]] - script = false - [[bar]] - script = true - [[pub]] - script = true diff --git a/tests/functional/cylc-scan/test_header b/tests/functional/cylc-scan/test_header deleted file mode 120000 index 90bd5a36f92..00000000000 --- a/tests/functional/cylc-scan/test_header +++ /dev/null @@ -1 +0,0 @@ -../lib/bash/test_header \ No newline at end of file diff --git a/tests/functional/cylc-submit/test_header b/tests/functional/cylc-submit/test_header deleted file mode 120000 index 90bd5a36f92..00000000000 --- a/tests/functional/cylc-submit/test_header +++ /dev/null @@ -1 +0,0 @@ -../lib/bash/test_header \ No newline at end of file diff --git a/tests/functional/cylc.wallclock/test_header b/tests/functional/cylc.wallclock/test_header deleted file mode 120000 index 90bd5a36f92..00000000000 --- a/tests/functional/cylc.wallclock/test_header +++ /dev/null @@ -1 +0,0 @@ -../lib/bash/test_header \ No newline at end of file diff --git a/tests/functional/events/25-held-not-stalled.t b/tests/functional/events/25-held-not-stalled.t index a3059245a3c..ec1487fdce3 100755 --- a/tests/functional/events/25-held-not-stalled.t +++ b/tests/functional/events/25-held-not-stalled.t @@ -15,14 +15,16 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . #------------------------------------------------------------------------------- -# Test abort on stalled with hold, ensure abort on stalled does not applied to -# a suite started with --hold. +# Test abort on stalled does not apply to a workflow started with --hold-after + +# See also tests/functional/pause-resume/02-paused-not-stalled.t + . "$(dirname "$0")/test_header" set_test_number 2 install_suite "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}" -suite_run_ok "${TEST_NAME_BASE}-run" cylc play --hold --no-detach "${SUITE_NAME}" +suite_run_ok "${TEST_NAME_BASE}-run" cylc play --hold-after=0 --no-detach "${SUITE_NAME}" purge exit diff --git a/tests/functional/events/25-held-not-stalled/flow.cylc b/tests/functional/events/25-held-not-stalled/flow.cylc index fefd0217aa8..8ef473d9a68 100644 --- a/tests/functional/events/25-held-not-stalled/flow.cylc +++ b/tests/functional/events/25-held-not-stalled/flow.cylc @@ -2,7 +2,7 @@ [[events]] abort on inactivity = False abort on stalled = True - inactivity handler = cylc release '%(suite)s' + inactivity handler = cylc release --all '%(suite)s' inactivity = PT5S [scheduling] [[graph]] diff --git a/tests/functional/events/48-suite-aborted/flow.cylc b/tests/functional/events/48-suite-aborted/flow.cylc index a4884b46775..c3b5df61420 100644 --- a/tests/functional/events/48-suite-aborted/flow.cylc +++ b/tests/functional/events/48-suite-aborted/flow.cylc @@ -15,10 +15,10 @@ [runtime] [[modify]] script = """ -# Hold the suite, so it does not shutdown -cylc hold "${CYLC_SUITE_NAME}" -# Extra content in suite contact file should cause health check to fail -echo 'TIME=MONEY' >>"${CYLC_SUITE_RUN_DIR}/.service/contact" -""" + # Pause the suite, so it does not shutdown + cylc pause "${CYLC_SUITE_NAME}" + # Extra content in suite contact file should cause health check to fail + echo 'TIME=MONEY' >>"${CYLC_SUITE_RUN_DIR}/.service/contact" + """ [[t2]] script = true diff --git a/tests/functional/graph-equivalence/03-multiline_and1.t b/tests/functional/graph-equivalence/03-multiline_and1.t index 7e974bf5749..64210ecce48 100644 --- a/tests/functional/graph-equivalence/03-multiline_and1.t +++ b/tests/functional/graph-equivalence/03-multiline_and1.t @@ -34,8 +34,8 @@ suite_run_ok "${TEST_NAME}" \ #------------------------------------------------------------------------------- delete_db TEST_NAME="${TEST_NAME_BASE}-check-c" -cylc play "${SUITE_NAME}" --hold 1>'out' 2>&1 -poll_grep_suite_log 'Holding all waiting tasks now' +cylc play "${SUITE_NAME}" --hold-after=1 1>'out' 2>&1 +poll_grep_suite_log 'Setting hold cycle point' cylc show "${SUITE_NAME}" 'c.1' | sed -n "/prerequisites/,/outputs/p" > 'c-prereqs' contains_ok "${TEST_SOURCE_DIR}/multiline_and_refs/c-ref" 'c-prereqs' cylc shutdown "${SUITE_NAME}" --now diff --git a/tests/functional/graph-equivalence/04-multiline_and2.t b/tests/functional/graph-equivalence/04-multiline_and2.t index 92be4094a8d..e3ffcc87819 100644 --- a/tests/functional/graph-equivalence/04-multiline_and2.t +++ b/tests/functional/graph-equivalence/04-multiline_and2.t @@ -34,9 +34,9 @@ suite_run_ok "${TEST_NAME}" \ cylc play --reference-test --debug --no-detach "${SUITE_NAME}" #------------------------------------------------------------------------------- delete_db -TEST_NAME=${TEST_NAME_BASE}-check-c -cylc play "${SUITE_NAME}" --hold 1>'out' 2>&1 -poll_grep_suite_log 'Holding all waiting tasks now' +TEST_NAME="${TEST_NAME_BASE}-check-c" +cylc play "${SUITE_NAME}" --hold-after=1 1>'out' 2>&1 +poll_grep_suite_log 'Setting hold cycle point' cylc show "${SUITE_NAME}" 'c.1' | sed -n "/prerequisites/,/outputs/p" > 'c-prereqs' contains_ok "${TEST_SOURCE_DIR}/multiline_and_refs/c-ref-2" 'c-prereqs' cylc shutdown --max-polls=20 --interval=2 --now "${SUITE_NAME}" diff --git a/tests/functional/graphql/01-workflow.t b/tests/functional/graphql/01-workflow.t index a99d7db0ca5..0735f80119d 100755 --- a/tests/functional/graphql/01-workflow.t +++ b/tests/functional/graphql/01-workflow.t @@ -23,7 +23,7 @@ set_test_number 4 install_suite "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" # run suite -run_ok "${TEST_NAME_BASE}-run" cylc play --hold "${SUITE_NAME}" +run_ok "${TEST_NAME_BASE}-run" cylc play --pause "${SUITE_NAME}" # query suite TEST_NAME="${TEST_NAME_BASE}-workflows" @@ -76,14 +76,13 @@ SUITE_LOG_DIR="$( cylc cat-log -m p "${SUITE_NAME}" \ cylc stop --max-polls=10 --interval=2 --kill "${SUITE_NAME}" # compare to expectation -# Note: Runahead pool has no members on start-up, which means, -# newestRunaheadCyclePoint is expected to be blank. +# Note: Zero active cycle points when starting paused cmp_json "${TEST_NAME}-out" "${TEST_NAME_BASE}-workflows.stdout" << __HERE__ { "workflows": [ { "name": "${SUITE_NAME}", - "status": "held", + "status": "paused", "statusMsg": "", "host": "${HOST}", "port": ${PORT}, @@ -93,13 +92,13 @@ cmp_json "${TEST_NAME}-out" "${TEST_NAME_BASE}-workflows.stdout" << __HERE__ "title": "foo", "description": "bar" }, - "newestRunaheadCyclePoint": "", - "newestActiveCyclePoint": "1", - "oldestActiveCyclePoint": "1", + "newestRunaheadCyclePoint": "1", + "newestActiveCyclePoint": "", + "oldestActiveCyclePoint": "", "reloaded": false, "runMode": "live", "stateTotals": { - "waiting": 1, + "waiting": 0, "expired": 0, "preparing": 0, "submit-failed": 0, @@ -117,12 +116,8 @@ cmp_json "${TEST_NAME}-out" "${TEST_NAME_BASE}-workflows.stdout" << __HERE__ "foo", "root" ], - "states": [ - "waiting" - ], - "latestStateTasks": { - "waiting": ["foo.1"] - } + "states": [], + "latestStateTasks": {} } ] } diff --git a/tests/functional/graphql/03-is-held-arg.t b/tests/functional/graphql/03-is-held-arg.t index 0294fdad45b..f7665601236 100755 --- a/tests/functional/graphql/03-is-held-arg.t +++ b/tests/functional/graphql/03-is-held-arg.t @@ -20,11 +20,23 @@ #------------------------------------------------------------------------------- set_test_number 4 #------------------------------------------------------------------------------- -install_suite "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" +init_suite "${TEST_NAME_BASE}" << __FLOW__ +[meta] + title = foo + description = bar +[scheduling] + [[graph]] + R1 = foo +[runtime] + [[BAZ]] + [[foo]] + inherit = BAZ + script = sleep 20 +__FLOW__ # run suite run_ok "${TEST_NAME_BASE}-run" cylc play "${SUITE_NAME}" -cylc hold "${SUITE_NAME}" +cylc hold --after=0 "${SUITE_NAME}" sleep 1 # query suite diff --git a/tests/functional/graphql/03-is-held-arg/flow.cylc b/tests/functional/graphql/03-is-held-arg/flow.cylc deleted file mode 100644 index 855d87a5861..00000000000 --- a/tests/functional/graphql/03-is-held-arg/flow.cylc +++ /dev/null @@ -1,16 +0,0 @@ -[meta] - title = foo - description = bar - -[scheduler] - UTC mode = True - -[scheduling] - [[graph]] - R1 = foo - -[runtime] - [[BAZ]] - [[foo]] - inherit = BAZ - script = sleep 20 diff --git a/tests/functional/hold-release/00-suite.t b/tests/functional/hold-release/00-suite.t index 3dd03bb6219..663a82b2fcc 100755 --- a/tests/functional/hold-release/00-suite.t +++ b/tests/functional/hold-release/00-suite.t @@ -1,7 +1,7 @@ #!/usr/bin/env bash # THIS FILE IS PART OF THE CYLC SUITE ENGINE. # Copyright (C) NIWA & British Crown (Met Office) & Contributors. -# +# # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or @@ -15,7 +15,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . #------------------------------------------------------------------------------- -# Test suite hold and release, with cycling and async tasks present. +# Test hold point and release, with cycling and async tasks present. . "$(dirname "$0")/test_header" set_test_number 2 reftest diff --git a/tests/functional/hold-release/00-suite/flow.cylc b/tests/functional/hold-release/00-suite/flow.cylc index bd13f6a564a..559bb5baf6b 100644 --- a/tests/functional/hold-release/00-suite/flow.cylc +++ b/tests/functional/hold-release/00-suite/flow.cylc @@ -1,12 +1,13 @@ [meta] - title = "hold/release test suite" + title = "hold/release test" description = """ - One task that holds then releases the suite, with - short sleeps to make the effect on the downstream task obvious. + One task that sets hold point then releases all tasks. """ -# ref: bug-fix GitHub Pull Request #843 (5412d01) +# https://github.com/cylc/cylc-flow/pull/843 + +# See also tests/f/pause-resume/00-suite [scheduler] cycle point format = %Y%m%dT%H @@ -20,10 +21,10 @@ [runtime] [[holdrelease]] script = """ -wait -cylc hold "${CYLC_SUITE_NAME}" -cylc__job__poll_grep_suite_log -F 'INFO - Command succeeded: hold()' -cylc release "${CYLC_SUITE_NAME}" -""" + wait + cylc hold --after=1900 "${CYLC_SUITE_NAME}" + cylc__job__poll_grep_suite_log -F 'INFO - Command succeeded: set_hold_point' + cylc release --all "${CYLC_SUITE_NAME}" + """ [[foo,bar]] script = true diff --git a/tests/functional/hold-release/01-beyond-stop.t b/tests/functional/hold-release/01-beyond-stop.t index 781a5db6ae4..06051290009 100755 --- a/tests/functional/hold-release/01-beyond-stop.t +++ b/tests/functional/hold-release/01-beyond-stop.t @@ -1,7 +1,7 @@ #!/usr/bin/env bash # THIS FILE IS PART OF THE CYLC SUITE ENGINE. # Copyright (C) NIWA & British Crown (Met Office) & Contributors. -# +# # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or @@ -15,7 +15,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . #------------------------------------------------------------------------------- -# Test suite hold and release, with tasks present held beyond stop point. +# Test hold point and release, with tasks present beyond stop point. . "$(dirname "$0")/test_header" set_test_number 2 reftest diff --git a/tests/functional/hold-release/01-beyond-stop/flow.cylc b/tests/functional/hold-release/01-beyond-stop/flow.cylc index d14b2c4538a..c39dc9e6c92 100644 --- a/tests/functional/hold-release/01-beyond-stop/flow.cylc +++ b/tests/functional/hold-release/01-beyond-stop/flow.cylc @@ -1,27 +1,31 @@ [meta] - title = "hold/release a suite with tasks held beyond suite stop point" + title = "test hold point and release, with tasks beyond suite stop point" description = """ -Releasing a held suite should not release tasks that are held because they're -beyond the suite stop point.""" + Releasing all tasks should not release tasks that are held because they're + beyond the suite stop point. + """ -# ref: GitHub Pull Request #1144 +# https://github.com/cylc/cylc-flow/pull/1144 + +# See also tests/f/pause-resume/01-beyond-stop [scheduler] cycle point format = %Y%m%dT%H [scheduling] initial cycle point = 20140101T00 - final cycle point = 20140101T00 + stop after cycle point = 20140101T00 [[graph]] R1 = foo => holdrelease T00 = "foo => bar" [runtime] [[holdrelease]] - # When this task runs foo will be held beyond the suite stop point. + # When this task runs foo will be held beyond the stop point. script = """ -cylc hold $CYLC_SUITE_NAME -cylc release $CYLC_SUITE_NAME""" + cylc hold --after=1900 $CYLC_SUITE_NAME + cylc release --all $CYLC_SUITE_NAME + """ [[foo]] script = true [[bar]] diff --git a/tests/functional/hold-release/01-beyond-stop/reference.log b/tests/functional/hold-release/01-beyond-stop/reference.log index d27bce4248e..33c86a4fdff 100644 --- a/tests/functional/hold-release/01-beyond-stop/reference.log +++ b/tests/functional/hold-release/01-beyond-stop/reference.log @@ -1,5 +1,3 @@ -2014-09-08T09:42:52+12 INFO - Initial point: 20140101T00 -2014-09-08T09:42:52+12 INFO - Final point: 20140101T00 2014-09-08T09:42:52+12 INFO - [foo.20140101T00] -triggered off [] 2014-09-08T09:42:54+12 INFO - [holdrelease.20140101T00] -triggered off ['foo.20140101T00'] 2014-09-08T09:42:54+12 INFO - [bar.20140101T00] -triggered off ['foo.20140101T00'] diff --git a/tests/functional/hold-release/02-hold-on-spawn.t b/tests/functional/hold-release/02-hold-on-spawn.t index 6a7d2a12d49..4116143e34b 100755 --- a/tests/functional/hold-release/02-hold-on-spawn.t +++ b/tests/functional/hold-release/02-hold-on-spawn.t @@ -15,19 +15,20 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . #------------------------------------------------------------------------------- -# Test that spawned children of tasks released in a held suite, are held. +# Test that spawned children of released tasks are held when the workflow has +# a hold point. . "$(dirname "$0")/test_header" set_test_number 2 init_suite "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__' [scheduling] - [[dependencies]] + [[dependencies]] R1 = "foo => bar" [runtime] - [[foo, bar]] + [[foo, bar]] script = true __FLOW_CONFIG__ -suite_run_ok "${TEST_NAME_BASE}-run" cylc play --hold "${SUITE_NAME}" +suite_run_ok "${TEST_NAME_BASE}-run" cylc play --hold-after=0 "${SUITE_NAME}" cylc release "${SUITE_NAME}" foo.1 # foo.1 should run and spawn bar.1 as waiting and held @@ -41,6 +42,6 @@ cmp_ok task-pool.out <<__OUT__ 1|bar|waiting|1 __OUT__ -cylc stop --max-polls=10 --interval=2 "${SUITE_NAME}" +cylc stop --now --now "${SUITE_NAME}" purge diff --git a/tests/functional/hold-release/05-release.t b/tests/functional/hold-release/05-release.t index 266988776d3..3f911b87c29 100755 --- a/tests/functional/hold-release/05-release.t +++ b/tests/functional/hold-release/05-release.t @@ -32,20 +32,20 @@ init_suite "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__' [runtime] [[holdrelease]] script = """ -wait -cylc hold $CYLC_SUITE_NAME -cylc__job__poll_grep_suite_log 'Suite held.' -cylc release ${CYLC_SUITE_NAME} '*FF.1' # inexact fam -cylc release ${CYLC_SUITE_NAME} 'TOAST.1' # exact fam -cylc release ${CYLC_SUITE_NAME} 'cat*.1' # inexact tasks -cylc release ${CYLC_SUITE_NAME} 'dog1.1' # exact tasks -cylc release ${CYLC_SUITE_NAME} 'stop.1' # exact tasks + wait + cylc hold --after=0 ${CYLC_SUITE_NAME} + cylc__job__poll_grep_suite_log 'Command succeeded: set_hold_point' + cylc release ${CYLC_SUITE_NAME} '*FF.1' # inexact fam + cylc release ${CYLC_SUITE_NAME} 'TOAST.1' # exact fam + cylc release ${CYLC_SUITE_NAME} 'cat*.1' # inexact tasks + cylc release ${CYLC_SUITE_NAME} 'dog1.1' # exact tasks + cylc release ${CYLC_SUITE_NAME} 'stop.1' # exact tasks -# TODO: finished tasks are not removed if held: should this be the case? -# (is this related to killed tasks being held to prevent retries?) -cylc release ${CYLC_SUITE_NAME} 'spawner.1' -cylc release ${CYLC_SUITE_NAME} 'holdrelease.1' -""" + # TODO: finished tasks are not removed if held: should this be the case? + # (is this related to killed tasks being held to prevent retries?) + cylc release ${CYLC_SUITE_NAME} 'spawner.1' + cylc release ${CYLC_SUITE_NAME} 'holdrelease.1' + """ [[STUFF]] [[TOAST]] [[STOP]] @@ -63,8 +63,8 @@ cylc release ${CYLC_SUITE_NAME} 'holdrelease.1' [[stop]] inherit = STOP script = """ - cylc__job__poll_grep_suite_log '\[dog1\.1\] -task proxy removed (finished)' - cylc stop "${CYLC_SUITE_NAME}" + cylc__job__poll_grep_suite_log '\[dog1\.1\] -task proxy removed (finished)' + cylc stop "${CYLC_SUITE_NAME}" """ __FLOW_CONFIG__ diff --git a/tests/functional/hold-release/17-hold-after-point/flow.cylc b/tests/functional/hold-release/17-hold-after-point/flow.cylc index 1f56c5dea47..8dea0625ce3 100644 --- a/tests/functional/hold-release/17-hold-after-point/flow.cylc +++ b/tests/functional/hold-release/17-hold-after-point/flow.cylc @@ -5,19 +5,20 @@ [scheduler] UTC mode = True + cycle point format = CCYYMMDDThhmmZ [scheduling] - initial cycle point = 20100101T00Z - final cycle point = 20100110T00Z - hold after cycle point = 20100102T00Z + initial cycle point = 2010-01-01 + final cycle point = 2010-01-10 + hold after cycle point = 2010-01-02 [[graph]] R1 = stopper T00 = foo[-P1D] => foo [runtime] [[stopper]] script = """ -cylc__job__poll_grep_suite_log -F 'holding (beyond suite hold point) 20100102T00Z' -cylc stop "${CYLC_SUITE_NAME}" -""" + cylc__job__poll_grep_suite_log -F 'holding (beyond suite hold point) 20100102T0000Z' + cylc stop "${CYLC_SUITE_NAME}" + """ [[foo]] script = true diff --git a/tests/functional/pause-resume/00-suite.t b/tests/functional/pause-resume/00-suite.t new file mode 100644 index 00000000000..bbc01e99b9d --- /dev/null +++ b/tests/functional/pause-resume/00-suite.t @@ -0,0 +1,22 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC SUITE ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- +# Test workflow pause and resume, with cycling and async tasks present. +. "$(dirname "$0")/test_header" +set_test_number 2 +reftest +exit diff --git a/tests/functional/pause-resume/00-suite/flow.cylc b/tests/functional/pause-resume/00-suite/flow.cylc new file mode 100644 index 00000000000..09098a7e827 --- /dev/null +++ b/tests/functional/pause-resume/00-suite/flow.cylc @@ -0,0 +1,26 @@ +[meta] + title = "pause/resume test workflow" + +# https://github.com/cylc/cylc-flow/pull/843 + +# See also tests/f/hold-release/00-suite + +[scheduler] + cycle point format = %Y%m%dT%H + +[scheduling] + initial cycle point = 20140101T00 + final cycle point = 20140101T00 + [[graph]] + R1 = "pause_resume => foo & bar" + T00, T06 = "bar" +[runtime] + [[pause_resume]] + script = """ + wait + cylc pause "${CYLC_SUITE_NAME}" + cylc__job__poll_grep_suite_log -F 'INFO - Command succeeded: pause()' + cylc play "${CYLC_SUITE_NAME}" + """ + [[foo,bar]] + script = true diff --git a/tests/functional/pause-resume/00-suite/reference.log b/tests/functional/pause-resume/00-suite/reference.log new file mode 100644 index 00000000000..fd1912a6312 --- /dev/null +++ b/tests/functional/pause-resume/00-suite/reference.log @@ -0,0 +1,3 @@ +INFO - [pause_resume.20140101T00] -triggered off [] +INFO - [foo.20140101T00] -triggered off ['pause_resume.20140101T00'] +INFO - [bar.20140101T00] -triggered off ['pause_resume.20140101T00'] diff --git a/tests/functional/hold-release/12-hold-then-retry.t b/tests/functional/pause-resume/01-beyond-stop.t old mode 100755 new mode 100644 similarity index 93% rename from tests/functional/hold-release/12-hold-then-retry.t rename to tests/functional/pause-resume/01-beyond-stop.t index 4bc17945401..b773766afb8 --- a/tests/functional/hold-release/12-hold-then-retry.t +++ b/tests/functional/pause-resume/01-beyond-stop.t @@ -15,7 +15,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . #------------------------------------------------------------------------------- -# Test suite hold => retry and submit-retry => suite release +# Test pause and resume, with tasks present beyond stop point. . "$(dirname "$0")/test_header" set_test_number 2 reftest diff --git a/tests/functional/pause-resume/01-beyond-stop/flow.cylc b/tests/functional/pause-resume/01-beyond-stop/flow.cylc new file mode 100644 index 00000000000..95d75e7f442 --- /dev/null +++ b/tests/functional/pause-resume/01-beyond-stop/flow.cylc @@ -0,0 +1,32 @@ +[meta] + title = "pause/resume a suite with tasks held beyond suite stop point" + + description = """ + Resuming a paused workflow should not release tasks + beyond the workflow stop point. + """ + +# https://github.com/cylc/cylc-flow/pull/1144 + +# See also tests/f/hold-release/01-beyond-stop + +[scheduler] + cycle point format = %Y%m%dT%H + +[scheduling] + initial cycle point = 20140101T00 + stop after cycle point = 20140101T00 + [[graph]] + R1 = foo => pause_resume + T00 = foo => bar +[runtime] + [[pause_resume]] + # When this task runs foo will be held beyond the workflow stop point. + script = """ + cylc pause $CYLC_SUITE_NAME + cylc play $CYLC_SUITE_NAME + """ + [[foo]] + script = true + [[bar]] + script = true diff --git a/tests/functional/pause-resume/01-beyond-stop/reference.log b/tests/functional/pause-resume/01-beyond-stop/reference.log new file mode 100644 index 00000000000..5e1a993ef72 --- /dev/null +++ b/tests/functional/pause-resume/01-beyond-stop/reference.log @@ -0,0 +1,3 @@ +INFO - [foo.20140101T00] -triggered off [] +INFO - [pause_resume.20140101T00] -triggered off ['foo.20140101T00'] +INFO - [bar.20140101T00] -triggered off ['foo.20140101T00'] diff --git a/tests/functional/pause-resume/02-paused-not-stalled.t b/tests/functional/pause-resume/02-paused-not-stalled.t new file mode 100644 index 00000000000..aaa369e2a33 --- /dev/null +++ b/tests/functional/pause-resume/02-paused-not-stalled.t @@ -0,0 +1,44 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC SUITE ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- +# Test that abort on stalled does not apply to a paused workflow + +# See also tests/functional/events/25-held-not-stalled.t + +. "$(dirname "$0")/test_header" +set_test_number 2 + + +init_suite "${TEST_NAME_BASE}" << __FLOW__ +[scheduler] + [[events]] + abort on inactivity = False + abort on stalled = True + inactivity handler = cylc play '%(suite)s' + inactivity = PT5S +[scheduling] + [[graph]] + R1 = t1 +[runtime] + [[t1]] + script = true +__FLOW__ + +run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}" +suite_run_ok "${TEST_NAME_BASE}-run" cylc play --pause --no-detach "${SUITE_NAME}" + +purge diff --git a/tests/functional/pause-resume/12-pause-then-retry.t b/tests/functional/pause-resume/12-pause-then-retry.t new file mode 100755 index 00000000000..b47061aff8a --- /dev/null +++ b/tests/functional/pause-resume/12-pause-then-retry.t @@ -0,0 +1,22 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC SUITE ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- +# Test workflow pause => retry and submit-retry => workflow resume +. "$(dirname "$0")/test_header" +set_test_number 2 +reftest +exit diff --git a/tests/functional/hold-release/12-hold-then-retry/flow.cylc b/tests/functional/pause-resume/12-pause-then-retry/flow.cylc similarity index 62% rename from tests/functional/hold-release/12-hold-then-retry/flow.cylc rename to tests/functional/pause-resume/12-pause-then-retry/flow.cylc index 7e527500b25..34b59cb8d2c 100644 --- a/tests/functional/hold-release/12-hold-then-retry/flow.cylc +++ b/tests/functional/pause-resume/12-pause-then-retry/flow.cylc @@ -1,5 +1,5 @@ [meta] - title = Test: run task - hold suite - task job retry - release suite + title = Test: run task - pause workflow - task job retry - resume workflow [scheduler] [[events]] @@ -10,30 +10,29 @@ [scheduling] [[graph]] R1 = """ -t-retry-able:start => t-hold -t-submit-retry-able:submit => t-hold -""" + t-retry-able:start => t-pause + t-submit-retry-able:submit => t-pause + """ [runtime] - [[t-hold]] + [[t-pause]] script = """ - # Hold the suite - cylc hold "${CYLC_SUITE_NAME}" - cylc__job__poll_grep_suite_log -F 'Command succeeded: hold' + cylc pause "${CYLC_SUITE_NAME}" + cylc__job__poll_grep_suite_log -F 'Command succeeded: pause' # Poll t-submit-retry-able, should return submit-fail cylc poll "${CYLC_SUITE_NAME}" 't-submit-retry-able' # Allow t-retry-able to continue rm -f "${CYLC_SUITE_RUN_DIR}/file" cylc__job__poll_grep_suite_log -F \ - '[t-retry-able.1] -running (held) => waiting (held)' + '[t-retry-able.1] -running => waiting' cylc__job__poll_grep_suite_log -F \ - '[t-submit-retry-able.1] -submitted (held) => waiting (held)' - # Release the suite - cylc release "${CYLC_SUITE_NAME}" + '[t-submit-retry-able.1] -submitted => waiting' + # Resume the suite + cylc play "${CYLC_SUITE_NAME}" cylc__job__poll_grep_suite_log -F \ - '[t-retry-able.1] -waiting (held) => waiting' + '[t-retry-able.1] -waiting => waiting (queued)' cylc__job__poll_grep_suite_log -F \ - '[t-submit-retry-able.1] -waiting (held) => waiting' + '[t-submit-retry-able.1] -waiting => waiting (queued)' """ [[t-retry-able]] script = """ @@ -45,8 +44,7 @@ t-submit-retry-able:submit => t-hold false fi """ - [[[job]]] - execution retry delays = PT5S + execution retry delays = PT5S [[t-submit-retry-able]] env-script = """ if ((CYLC_TASK_SUBMIT_NUMBER == 1)); then @@ -55,5 +53,4 @@ t-submit-retry-able:submit => t-hold fi """ script = true - [[[job]]] - submission retry delays = PT5S + submission retry delays = PT5S diff --git a/tests/functional/hold-release/12-hold-then-retry/reference.log b/tests/functional/pause-resume/12-pause-then-retry/reference.log similarity index 78% rename from tests/functional/hold-release/12-hold-then-retry/reference.log rename to tests/functional/pause-resume/12-pause-then-retry/reference.log index 723a5cf96d8..28d1fdb208e 100644 --- a/tests/functional/hold-release/12-hold-then-retry/reference.log +++ b/tests/functional/pause-resume/12-pause-then-retry/reference.log @@ -2,6 +2,6 @@ 2014-12-18T16:14:12Z INFO - Final point: 1 2014-12-18T16:14:12Z INFO - [t-submit-retry-able.1] -triggered off [] 2014-12-18T16:14:12Z INFO - [t-retry-able.1] -triggered off [] -2014-12-18T16:14:15Z INFO - [t-hold.1] -triggered off ['t-retry-able.1', 't-submit-retry-able.1'] +2014-12-18T16:14:15Z INFO - [t-pause.1] -triggered off ['t-retry-able.1', 't-submit-retry-able.1'] 2014-12-18T16:14:24Z INFO - [t-retry-able.1] -triggered off [] 2014-12-18T16:14:25Z INFO - [t-submit-retry-able.1] -triggered off [] diff --git a/tests/functional/hold-release/21-client.t b/tests/functional/pause-resume/21-client.t similarity index 82% rename from tests/functional/hold-release/21-client.t rename to tests/functional/pause-resume/21-client.t index 0f38d327677..90231b9d50c 100755 --- a/tests/functional/hold-release/21-client.t +++ b/tests/functional/pause-resume/21-client.t @@ -15,21 +15,21 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . #------------------------------------------------------------------------------- -# Test release held suite using the "cylc client" command. +# Test resume paused workflow using the "cylc client" command. . "$(dirname "$0")/test_header" set_test_number 3 install_suite "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}" -cylc play --reference-test --hold --debug --no-detach "${SUITE_NAME}" \ +cylc play --reference-test --pause --debug --no-detach "${SUITE_NAME}" \ 1>"${TEST_NAME_BASE}.out" 2>&1 & CYLC_RUN_PID=$! poll_suite_running -read -r -d '' release <<_args_ +read -r -d '' resume <<_args_ {"request_string": " mutation { - release(workflows: [\"${SUITE_NAME}\"]){ + resume(workflows: [\"${SUITE_NAME}\"]){ result } } @@ -39,7 +39,7 @@ _args_ # shellcheck disable=SC2086 run_ok "${TEST_NAME_BASE}-client" \ - cylc client "${SUITE_NAME}" 'graphql' < <(echo ${release}) + cylc client "${SUITE_NAME}" 'graphql' < <(echo ${resume}) run_ok "${TEST_NAME_BASE}-run" wait "${CYLC_RUN_PID}" purge exit diff --git a/tests/functional/hold-release/21-client/flow.cylc b/tests/functional/pause-resume/21-client/flow.cylc similarity index 100% rename from tests/functional/hold-release/21-client/flow.cylc rename to tests/functional/pause-resume/21-client/flow.cylc diff --git a/tests/functional/hold-release/21-client/reference.log b/tests/functional/pause-resume/21-client/reference.log similarity index 100% rename from tests/functional/hold-release/21-client/reference.log rename to tests/functional/pause-resume/21-client/reference.log diff --git a/tests/flakyfunctional/cylc-kill/test_header b/tests/functional/pause-resume/test_header similarity index 100% rename from tests/flakyfunctional/cylc-kill/test_header rename to tests/functional/pause-resume/test_header diff --git a/tests/functional/pre-initial/12-warm-restart.t b/tests/functional/pre-initial/12-warm-restart.t index dfd812dc31d..7744fe25529 100644 --- a/tests/functional/pre-initial/12-warm-restart.t +++ b/tests/functional/pre-initial/12-warm-restart.t @@ -25,12 +25,12 @@ install_suite "${TEST_NAME_BASE}" warm-start TEST_NAME="${TEST_NAME_BASE}-validate" run_ok "${TEST_NAME}" cylc validate "${SUITE_NAME}" #------------------------------------------------------------------------------- -TEST_NAME=${TEST_NAME_BASE}-run-hold -suite_run_ok "${TEST_NAME}" cylc play "${SUITE_NAME}" --startcp=20130101T12 --hold +TEST_NAME=${TEST_NAME_BASE}-run-paused +suite_run_ok "${TEST_NAME}" cylc play "${SUITE_NAME}" --startcp=20130101T12 --pause #------------------------------------------------------------------------------- cylc stop --max-polls=10 --interval=2 "${SUITE_NAME}" #------------------------------------------------------------------------------- -TEST_NAME=${TEST_NAME_BASE}-run-hold-restart +TEST_NAME=${TEST_NAME_BASE}-restart suite_run_ok "${TEST_NAME}" cylc play "${SUITE_NAME}" # Ensure suite has started poll_suite_running diff --git a/tests/functional/reload/15-state-summary.t b/tests/functional/reload/15-state-summary.t index 49ec67e8954..912dd83e212 100644 --- a/tests/functional/reload/15-state-summary.t +++ b/tests/functional/reload/15-state-summary.t @@ -18,7 +18,7 @@ # Test that the state summary updates immediately when a reload finishes. # (SoD: the original test contrived to get a succeeded and a failed task in the # pool, and no active tasks. That's not possible under SoD, and it seems to me -# a trivial held suite should do to test that the state summary updates after a +# a trivial paused suite should do to test that the state summary updates after a # reload when nothing else is happening). # See https://github.com/cylc/cylc-flow/pull/1756 . "$(dirname "$0")/test_header" @@ -37,8 +37,7 @@ __FLOW_CONFIG__ TEST_NAME="${TEST_NAME_BASE}-validate" run_ok "${TEST_NAME}" cylc validate "${SUITE_NAME}" #------------------------------------------------------------------------------- -# Suite runs and shuts down with a failed task. -cylc play --hold "${SUITE_NAME}" > /dev/null 2>&1 +cylc play --pause "${SUITE_NAME}" > /dev/null 2>&1 sleep 5 cylc reload "${SUITE_NAME}" sleep 5 @@ -47,5 +46,5 @@ TEST_NAME=${TEST_NAME_BASE}-grep # State summary should not say "reloaded = True" grep_ok "reloaded=False" dump.out #------------------------------------------------------------------------------- -cylc stop --max-polls=10 --interval=2 "${SUITE_NAME}" +cylc stop --now --now "${SUITE_NAME}" purge diff --git a/tests/functional/reload/17-graphing-change.t b/tests/functional/reload/17-graphing-change.t index e44eaf77389..7440bb979a3 100755 --- a/tests/functional/reload/17-graphing-change.t +++ b/tests/functional/reload/17-graphing-change.t @@ -32,8 +32,8 @@ grep_suite_log_n_times() { install_suite "${TEST_NAME_BASE}" 'graphing-change' LOG_FILE="${SUITE_RUN_DIR}/log/suite/log" -# start suite in held mode -run_ok "${TEST_NAME_BASE}-add-run" cylc play --debug --hold "${SUITE_NAME}" +# start suite in paused mode +run_ok "${TEST_NAME_BASE}-add-run" cylc play --debug --pause "${SUITE_NAME}" # change the flow.cylc file cp "${TEST_SOURCE_DIR}/graphing-change/flow-1.cylc" \ diff --git a/tests/functional/reload/23-cycle-point-time-zone.t b/tests/functional/reload/23-cycle-point-time-zone.t index 08daf694090..30897782e56 100644 --- a/tests/functional/reload/23-cycle-point-time-zone.t +++ b/tests/functional/reload/23-cycle-point-time-zone.t @@ -26,15 +26,11 @@ set_test_number 5 init_suite "${TEST_NAME_BASE}" << '__FLOW__' [scheduler] UTC mode = False + allow implicit tasks = True [scheduling] initial cycle point = now - [[special tasks]] - clock-trigger = foo(PT0M) [[graph]] - T23 = foo -[runtime] - [[foo]] - script = true + R1 = foo __FLOW__ run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}" @@ -42,7 +38,7 @@ run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}" # Set time zone to +01:00 export TZ=BST-1 -suite_run_ok "${TEST_NAME_BASE}-run" cylc play "${SUITE_NAME}" --hold +suite_run_ok "${TEST_NAME_BASE}-run" cylc play "${SUITE_NAME}" --pause poll_suite_running # Simulate DST change @@ -50,11 +46,11 @@ export TZ=UTC run_ok "${TEST_NAME_BASE}-reload" cylc reload "${SUITE_NAME}" poll_suite_running -cylc stop "${SUITE_NAME}" -poll_suite_stopped + +cylc stop --now --now "${SUITE_NAME}" log_scan "${TEST_NAME_BASE}-log-scan" "${SUITE_RUN_DIR}/log/suite/log" 1 0 \ - 'LOADING suite parameters' '+ cycle point time zone = +0100' + 'LOADING suite parameters' \ + '+ cycle point time zone = +0100' purge -exit diff --git a/tests/functional/remote/01-file-install/flow.cylc b/tests/functional/remote/01-file-install/flow.cylc deleted file mode 100644 index 0561bf29152..00000000000 --- a/tests/functional/remote/01-file-install/flow.cylc +++ /dev/null @@ -1,33 +0,0 @@ -#!jinja2 -[scheduler] - -{% if SECOND_RUN is defined %} - -[scheduler] - install = {{ SECOND_RUN }} - -{% endif %} - -[scheduling] - [[graph]] - R1 = startup => holder => held -[runtime] - [[startup]] - script = """ - for DIR in "bin" "app" "etc" "lib" "dir1" "dir2" - do - mkdir -p "${CYLC_SUITE_RUN_DIR}/${DIR}" - touch "${CYLC_SUITE_RUN_DIR}/${DIR}/moo" - done - - for FILE in "file1" "file2" - do - touch "${CYLC_SUITE_RUN_DIR}/${FILE}" - done - """ - platform = localhost - [[holder]] - script = """cylc hold "${CYLC_SUITE_NAME}" """ - platform = {{CYLC_TEST_PLATFORM}} - [[held]] - script = true diff --git a/tests/functional/remote/02-install-target.t b/tests/functional/remote/02-install-target.t index 9f5d5bc050e..2de331d84a1 100644 --- a/tests/functional/remote/02-install-target.t +++ b/tests/functional/remote/02-install-target.t @@ -27,9 +27,9 @@ init_suite "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__' graph = remote [runtime] [[remote]] - # thie should not require remote-init because the platform + # this should not require remote-init because the platform # has a shared filesystem (same install target) - script = """cylc hold "${CYLC_SUITE_NAME}" """ + script = true platform = {{CYLC_TEST_PLATFORM}} __FLOW_CONFIG__ diff --git a/tests/functional/restart/16-template-vars/flow.cylc b/tests/functional/restart/16-template-vars/flow.cylc index 1e0f99830ba..9359f6c2229 100644 --- a/tests/functional/restart/16-template-vars/flow.cylc +++ b/tests/functional/restart/16-template-vars/flow.cylc @@ -1,8 +1,6 @@ #!Jinja2 [scheduler] - UTC mode = True - [[events]] - startup handler = cylc release '%(suite)s' + UTC mode = True [scheduling] initial cycle point = 2016 final cycle point = {{FINAL_CYCLE_POINT}} diff --git a/tests/functional/restart/18-template-vars-override/flow.cylc b/tests/functional/restart/18-template-vars-override/flow.cylc index 21db7ab6f57..620faecc065 100644 --- a/tests/functional/restart/18-template-vars-override/flow.cylc +++ b/tests/functional/restart/18-template-vars-override/flow.cylc @@ -1,8 +1,6 @@ #!Jinja2 [scheduler] UTC mode = True - [[events]] - startup handler = cylc release '%(suite)s' [scheduling] initial cycle point = 2016 final cycle point = {{FINAL_CYCLE_POINT}} diff --git a/tests/functional/restart/23-hold-retry.t b/tests/functional/restart/23-hold-retry.t index b031ab6525e..e45a16f843d 100755 --- a/tests/functional/restart/23-hold-retry.t +++ b/tests/functional/restart/23-hold-retry.t @@ -21,19 +21,26 @@ set_test_number 5 install_suite "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}" + suite_run_ok "${TEST_NAME_BASE}-run" \ cylc play --debug --no-detach "${SUITE_NAME}" + sqlite3 "${SUITE_RUN_DIR}/log/db" \ 'SELECT cycle, name, status FROM task_pool ORDER BY cycle, name' >'task-pool.out' cmp_ok 'task-pool.out' <<__OUT__ 1|t1|waiting __OUT__ + # restart cylc play "${SUITE_NAME}" --debug --no-detach 1>'out' 2>&1 & SUITE_PID=$! + poll_grep_suite_log -F 'INFO - + t1.1 waiting (held)' -run_ok "${TEST_NAME_BASE}-release" cylc release "${SUITE_NAME}" + +run_ok "${TEST_NAME_BASE}-release" cylc release "${SUITE_NAME}" t1.1 + poll_grep_suite_log -F 'INFO - DONE' + if ! wait "${SUITE_PID}"; then cat 'out' >&2 fi diff --git a/tests/functional/restart/25-hold-suite.t b/tests/functional/restart/25-hold-suite.t index f70176b7453..86640818a6a 100755 --- a/tests/functional/restart/25-hold-suite.t +++ b/tests/functional/restart/25-hold-suite.t @@ -15,18 +15,15 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . #------------------------------------------------------------------------------- -# Test restart with held suite +# Test restart with held tasks . "$(dirname "$0")/test_header" -set_test_number 7 +set_test_number 5 install_suite "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}" suite_run_ok "${TEST_NAME_BASE}-run" \ cylc play "${SUITE_NAME}" --debug --no-detach --abort-if-any-task-fails -sqlite3 "${SUITE_RUN_DIR}/log/db" \ - 'SELECT value FROM suite_params WHERE key=="is_held"' >'suite-is-held.out' -cmp_ok 'suite-is-held.out' <<<'1' T1_2016_PID="$(sed -n 's/CYLC_JOB_PID=//p' "${SUITE_RUN_DIR}/log/job/2016/t1/01/job.status")" poll_pid_done "${T1_2016_PID}" @@ -37,25 +34,23 @@ cmp_ok 'suite-stopped.out' <<'__OUT__' 2017|t1|waiting __OUT__ +# Restart cylc play "${SUITE_NAME}" --debug --no-detach 1>"${TEST_NAME_BASE}-restart.out" 2>&1 & CYLC_RESTART_PID=$! -# Ensure suite has started poll_suite_running + cylc release "${SUITE_NAME}" 't2.2016' poll_grep 'CYLC_JOB_EXIT' "${SUITE_RUN_DIR}/log/job/2016/t2/01/job.status" -cylc release "${SUITE_NAME}" +cylc release --all "${SUITE_NAME}" cylc poll "${SUITE_NAME}" -# Ensure suite has completed + +# Ensure workflow has completed run_ok "${TEST_NAME_BASE}-restart" wait "${CYLC_RESTART_PID}" -sqlite3 "${SUITE_RUN_DIR}/log/db" \ - 'SELECT value FROM suite_params WHERE key=="is_held"' >'suite-is-held.out' -cmp_ok 'suite-is-held.out' <'/dev/null' -# Should shut down with emtpy task pool. sqlite3 "${SUITE_RUN_DIR}/log/db" \ 'SELECT cycle, name, status FROM task_pool ORDER BY cycle, name' >'task-pool.out' -cmp_ok 'task-pool.out' <<'__OUT__' -__OUT__ +cmp_ok 'task-pool.out' < /dev/null + purge exit diff --git a/tests/functional/restart/25-hold-suite/flow.cylc b/tests/functional/restart/25-hold-suite/flow.cylc index 6c551d07b58..3a656b4d7dc 100644 --- a/tests/functional/restart/25-hold-suite/flow.cylc +++ b/tests/functional/restart/25-hold-suite/flow.cylc @@ -13,7 +13,7 @@ [[t1]] script = """ if [[ "${CYLC_TASK_CYCLE_POINT}" == '2016' ]]; then - cylc hold "${CYLC_SUITE_NAME}" + cylc hold --after=1900 "${CYLC_SUITE_NAME}" cylc stop "${CYLC_SUITE_NAME}" fi """ diff --git a/tests/functional/restart/35-auto-restart-recovery.t b/tests/functional/restart/35-auto-restart-recovery.t index bd812c96110..e235d170384 100644 --- a/tests/functional/restart/35-auto-restart-recovery.t +++ b/tests/functional/restart/35-auto-restart-recovery.t @@ -45,7 +45,7 @@ init_suite "${TEST_NAME}" <<< ' ' create_test_global_config '' "${BASE_GLOBAL_CONFIG}" run_ok "${TEST_NAME}-suite-start" \ - cylc play "${SUITE_NAME}" --host=localhost --hold + cylc play "${SUITE_NAME}" --host=localhost --pause poll_suite_running # corrupt suite @@ -72,7 +72,5 @@ log_scan "${TEST_NAME}-shutdown" "${FILE}" 20 1 \ 'Suite unable to automatically restart after 3 tries' # stop suite - suite should already by stopped but just to be safe -cylc stop --max-polls=10 --interval=2 -kill "${SUITE_NAME}" 2>'/dev/null' +cylc stop --max-polls=10 --interval=2 --kill "${SUITE_NAME}" 2>'/dev/null' purge - -exit diff --git a/tests/functional/restart/37-auto-restart-delay.t b/tests/functional/restart/37-auto-restart-delay.t index a9a24553036..019e37343e1 100644 --- a/tests/functional/restart/37-auto-restart-delay.t +++ b/tests/functional/restart/37-auto-restart-delay.t @@ -62,7 +62,7 @@ ${BASE_GLOBAL_CONFIG} " # Run suite. -cylc play "${SUITE_NAME}" --hold +cylc play "${SUITE_NAME}" --pause poll_suite_running # Condemn host - trigger stop-restart. @@ -111,7 +111,5 @@ else fi cylc stop "${SUITE_NAME}" --now --now 2>/dev/null -sleep 1 -purge -exit +purge diff --git a/tests/functional/restart/52-cycle-point-time-zone.t b/tests/functional/restart/52-cycle-point-time-zone.t index 0a796906636..c0d1e27ee01 100644 --- a/tests/functional/restart/52-cycle-point-time-zone.t +++ b/tests/functional/restart/52-cycle-point-time-zone.t @@ -26,15 +26,11 @@ set_test_number 6 init_suite "${TEST_NAME_BASE}" << '__FLOW__' [scheduler] UTC mode = False + allow implicit tasks = True [scheduling] initial cycle point = now - [[special tasks]] - clock-trigger = foo(PT0M) [[graph]] - T23 = foo -[runtime] - [[foo]] - script = true + R1 = foo __FLOW__ run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}" @@ -42,7 +38,7 @@ run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}" # Set time zone to +01:00 export TZ=BST-1 -suite_run_ok "${TEST_NAME_BASE}-run" cylc play "${SUITE_NAME}" --hold +suite_run_ok "${TEST_NAME_BASE}-run" cylc play "${SUITE_NAME}" --pause poll_suite_running cylc stop "${SUITE_NAME}" poll_suite_stopped @@ -54,13 +50,13 @@ cmp_ok 'dump.out' <<< 'cycle_point_tz|+0100' # Simulate DST change export TZ=UTC -suite_run_ok "${TEST_NAME_BASE}-restart" cylc play "${SUITE_NAME}" --hold +suite_run_ok "${TEST_NAME_BASE}-restart" cylc play "${SUITE_NAME}" --pause poll_suite_running + cylc stop "${SUITE_NAME}" -poll_suite_stopped log_scan "${TEST_NAME_BASE}-log-scan" "${SUITE_RUN_DIR}/log/suite/log" 1 0 \ - 'LOADING suite parameters' '+ cycle point time zone = +0100' + 'LOADING suite parameters' \ + '+ cycle point time zone = +0100' purge -exit diff --git a/tests/functional/shutdown/18-client-on-dead-suite.t b/tests/functional/shutdown/18-client-on-dead-suite.t index f4185167e28..1a0a64943e3 100755 --- a/tests/functional/shutdown/18-client-on-dead-suite.t +++ b/tests/functional/shutdown/18-client-on-dead-suite.t @@ -34,7 +34,7 @@ init_suite "${TEST_NAME_BASE}" <<'__FLOW_CONFIG__' __FLOW_CONFIG__ run_ok "${TEST_NAME_BASE}-validate" cylc validate "${SUITE_NAME}" -cylc play --hold --no-detach "${SUITE_NAME}" 1>'cylc-run.out' 2>&1 & +cylc play --pause --no-detach "${SUITE_NAME}" 1>'cylc-run.out' 2>&1 & MYPID=$! poll_suite_running kill "${MYPID}" # Should leave behind the contact file diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 9bfe99414f3..3a37161a05c 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -20,12 +20,14 @@ from pathlib import Path import re from shutil import rmtree +from typing import List, TYPE_CHECKING import pytest from cylc.flow.cfgspec.glbl_cfg import glbl_cfg from cylc.flow.wallclock import get_current_time_string from cylc.flow.platforms import platform_from_name +from cylc.flow.rundb import CylcSuiteDAO from .utils import ( _expanduser, @@ -37,6 +39,9 @@ _run_flow ) +if TYPE_CHECKING: + from cylc.flow.scheduler import Scheduler + @pytest.hookimpl(tryfirst=True, hookwrapper=True) def pytest_runtest_makereport(item, call): @@ -246,3 +251,54 @@ def event_loop(): for task in asyncio.all_tasks(loop): task.cancel() loop.close() + + +@pytest.fixture +def db_select(): + """Select columns from workflow database. + + Args: + schd: The Scheduler object for the workflow. + table: The name of the database table to query. + *columns (optional): The columns to select from the table. To select + all columns, omit or use '*'. + **where (optional): Kwargs specifying ='' for use in + WHERE clauses. If more than one specified, they will be chained + together using an AND operator. + """ + + def _check_columns(table: str, *columns: str) -> None: + all_columns = [x[0] for x in CylcSuiteDAO.TABLES_ATTRS[table]] + if not all(col in all_columns for col in columns): + raise ValueError( + f"One or more unrecognised column names for table {table} " + f"in: {columns}") + + def _inner( + schd: 'Scheduler', table: str, *columns: str, **where: str + ) -> List[str]: + + if table not in CylcSuiteDAO.TABLES_ATTRS: + raise ValueError(f"Table name '{table}' not recognised") + if not columns: + columns = ('*',) + elif columns != ('*',): + _check_columns(table, *columns) + + stmt = f'SELECT {",".join(columns)} FROM {table}' + stmt_args = [] + if where: + _check_columns(table, *where.keys()) + where_stmt = ' AND '.join([ + f'{col}=?' for col in where.keys() + ]) + stmt += f' WHERE {where_stmt}' + stmt_args = list(where.values()) + + dao = schd.suite_db_mgr.get_pri_dao() + try: + return [i for i in dao.connect().execute(stmt, stmt_args)] + finally: + dao.close() + + return _inner diff --git a/tests/integration/test_data_store_mgr.py b/tests/integration/test_data_store_mgr.py index a28dd648a86..0fd9d3e51ff 100644 --- a/tests/integration/test_data_store_mgr.py +++ b/tests/integration/test_data_store_mgr.py @@ -15,6 +15,7 @@ # along with this program. If not, see . import pytest +from typing import TYPE_CHECKING from cylc.flow import ID_DELIM from cylc.flow.data_store_mgr import ( @@ -22,7 +23,7 @@ JOBS, TASKS, TASK_PROXIES, - WORKFLOW, + WORKFLOW ) from cylc.flow.task_state import ( TASK_STATUS_FAILED, @@ -30,6 +31,13 @@ ) from cylc.flow.wallclock import get_current_time_string +if TYPE_CHECKING: + from cylc.flow.scheduler import Scheduler + + +# NOTE: These tests mutate the data store, so running them in isolation may +# see failures when they actually pass if you run the whole file + def job_config(schd): return { @@ -95,9 +103,11 @@ async def harness(mod_flow, mod_scheduler, mod_run): } } } - reg = mod_flow(flow_def) - schd = mod_scheduler(reg) + reg: str = mod_flow(flow_def) + schd: 'Scheduler' = mod_scheduler(reg) async with mod_run(schd): + schd.pool.hold_tasks('*') + schd.resume_workflow() # Think this is needed to save the data state at first start (?) # Fails without it.. and a test needs to overwrite schd data with this. data = schd.data_store_mgr.data[schd.data_store_mgr.workflow_id] @@ -262,7 +272,7 @@ def test_update_data_structure(harness): assert TASK_STATUS_FAILED in set(collect_states(data, FAMILY_PROXIES)) # state totals changed assert TASK_STATUS_FAILED in data[WORKFLOW].state_totals - # Shows prunning worked + # Shows pruning worked assert len({t.is_held for t in data[TASK_PROXIES].values()}) == 1 diff --git a/tests/integration/test_examples.py b/tests/integration/test_examples.py index 6f493be03c1..b7977ad069b 100644 --- a/tests/integration/test_examples.py +++ b/tests/integration/test_examples.py @@ -80,15 +80,15 @@ async def test_scheduler_arguments(flow, scheduler, run, one_conf): Use the `dest` value specified in the option parser. """ - # Ensure the hold_start option is obeyed by the scheduler. + # Ensure the paused_start option is obeyed by the scheduler. reg = flow(one_conf) - schd = scheduler(reg, hold_start=True) + schd = scheduler(reg, paused_start=True) async with run(schd): - assert schd.paused() + assert schd.is_paused reg = flow(one_conf) - schd = scheduler(reg, hold_start=False) + schd = scheduler(reg, paused_start=False) async with run(schd): - assert not schd.paused() + assert not schd.is_paused @pytest.mark.asyncio @@ -140,7 +140,7 @@ async def test_task_pool(flow, scheduler, one_conf): await schd.configure() # pump the scheduler's heart manually - schd.release_tasks() + schd.release_runahead_tasks() assert len(schd.pool.pool) == 1 @@ -214,3 +214,22 @@ def test_module_one(myflow): def test_module_two(myflow): # Ensure the uuid is set on __init__ assert myflow.uuid_str + + +@pytest.mark.asyncio +async def test_db_select(one, run, db_select): + """Demonstrate and test querying the workflow database.""" + schd = one + async with run(schd): + # Note: can't query database here unfortunately + pass + # Now we can query the DB + # Select all from suite_params table: + assert ('UTC_mode', '0') in db_select(schd, 'suite_params') + # Select name & status columns from task_states table: + results = db_select(schd, 'task_states', 'name', 'status') + assert results[0] == ('one', 'waiting') + # Select all columns where name==one & status==waiting from + # task_states table: + results = db_select(schd, 'task_states', name='one', status='waiting') + assert len(results) == 1 diff --git a/tests/integration/test_publisher.py b/tests/integration/test_publisher.py index a23aa1d8386..c5aaeebdd77 100644 --- a/tests/integration/test_publisher.py +++ b/tests/integration/test_publisher.py @@ -26,7 +26,7 @@ async def test_publisher(flow, scheduler, run, one_conf, port_range): """It should publish deltas when the flow starts.""" reg = flow(one_conf) - schd = scheduler(reg, hold_start=False) + schd = scheduler(reg, paused_start=False) async with run(schd): # create a subscriber subscriber = WorkflowSubscriber( diff --git a/tests/integration/test_resolvers.py b/tests/integration/test_resolvers.py index 2155de1d881..54f2b5d97c8 100644 --- a/tests/integration/test_resolvers.py +++ b/tests/integration/test_resolvers.py @@ -61,11 +61,11 @@ async def flow(mod_flow, mod_scheduler, mod_run): } }) - ret.schd = mod_scheduler(ret.reg, hold_start=True) + ret.schd = mod_scheduler(ret.reg, paused_start=True) await ret.schd.install() await ret.schd.initialise() await ret.schd.configure() - ret.schd.release_tasks() + ret.schd.release_runahead_tasks() ret.schd.data_store_mgr.initiate_data_model() ret.owner = ret.schd.owner @@ -181,7 +181,7 @@ async def test_mutator(flow, flow_args): args = {} response = await flow.resolvers.mutator( None, - 'hold', + 'pause', flow_args, args ) @@ -202,5 +202,5 @@ async def test_nodes_mutator(flow, flow_args): @pytest.mark.asyncio async def test_mutation_mapper(flow): """Test the mapping of mutations to internal command methods.""" - response = await flow.resolvers._mutation_mapper('hold', {}) + response = await flow.resolvers._mutation_mapper('pause', {}) assert response is not None diff --git a/tests/integration/test_scan_api.py b/tests/integration/test_scan_api.py index a86066fb779..5cc075cf187 100644 --- a/tests/integration/test_scan_api.py +++ b/tests/integration/test_scan_api.py @@ -40,7 +40,7 @@ async def flows(mod_flow, mod_scheduler, mod_run, mod_one_conf): """Three workflows in different states. - One stopped, one held and one that thinks its running. + One stopped, one paused and one that thinks its running. TODO: Start one of the workflows with tasks in funny states @@ -55,9 +55,9 @@ async def flows(mod_flow, mod_scheduler, mod_run, mod_one_conf): # a simply hierarchically registered workflow we will leave stopped mod_flow(mod_one_conf, name='a/b/c') - # a simple workflow we will leave held - reg1 = mod_flow(mod_one_conf, name='-held-') - schd1 = mod_scheduler(reg1, hold_start=True) + # a simple workflow we will leave paused + reg1 = mod_flow(mod_one_conf, name='-paused-') + schd1 = mod_scheduler(reg1, paused_start=True) # a workflow with some metadata we will make look like it's running reg2 = mod_flow( @@ -87,7 +87,7 @@ async def flows(mod_flow, mod_scheduler, mod_run, mod_one_conf): }, name='-running-' ) - schd2 = mod_scheduler(reg2, run_mode='simulation', hold_start=False) + schd2 = mod_scheduler(reg2, run_mode='simulation', paused_start=False) # run cylc run async with mod_run(schd1): @@ -106,12 +106,12 @@ async def test_state_filter(flows, mod_test_dir): assert '-stopped-' in lines[0] assert 'a/b/c' in lines[1] - # one held flow - opts = ScanOptions(states='held') + # one paused flow + opts = ScanOptions(states='paused') lines = [] await main(opts, write=lines.append, scan_dir=mod_test_dir) assert len(lines) == 1 - assert '-held-' in lines[0] + assert '-paused-' in lines[0] # one running flow opts = ScanOptions(states='running') @@ -121,13 +121,13 @@ async def test_state_filter(flows, mod_test_dir): assert '-running-' in lines[0] # two active flows - opts = ScanOptions(states='held,running') + opts = ScanOptions(states='paused,running') lines = [] await main(opts, write=lines.append, scan_dir=mod_test_dir) assert len(lines) == 2 # three registered flows - opts = ScanOptions(states='held,running,stopped') + opts = ScanOptions(states='paused,running,stopped') lines = [] await main(opts, write=lines.append, scan_dir=mod_test_dir) assert len(lines) == 4 @@ -137,11 +137,11 @@ async def test_state_filter(flows, mod_test_dir): async def test_name_filter(flows, mod_test_dir): """It should filter flows by name regex.""" # one stopped flow - opts = ScanOptions(states='all', name=['.*held.*']) + opts = ScanOptions(states='all', name=['.*paused.*']) lines = [] await main(opts, write=lines.append, scan_dir=mod_test_dir) assert len(lines) == 1 - assert '-held-' in lines[0] + assert '-paused-' in lines[0] @pytest.mark.asyncio @@ -152,7 +152,7 @@ async def test_name_sort(flows, mod_test_dir): lines = [] await main(opts, write=lines.append, scan_dir=mod_test_dir) assert len(lines) == 4 - assert '-held-' in lines[0] + assert '-paused-' in lines[0] assert '-running-' in lines[1] assert '-stopped-' in lines[2] assert 'a/b/c' in lines[3] @@ -279,7 +279,7 @@ async def test_scan_cleans_stuck_contact_files( dump_contact_file(reg, contact_info) # make sure this flow shows for a regular filesystem-only scan - opts = ScanOptions(states='running,held', format='name') + opts = ScanOptions(states='running,paused', format='name') flows = [] await main(opts, write=flows.append, scan_dir=test_dir) assert len(flows) == 1 @@ -289,7 +289,7 @@ async def test_scan_cleans_stuck_contact_files( assert cont.exists() # make sure this flow shows for a regular filesystem-only scan - opts = ScanOptions(states='running,held', format='name', ping=True) + opts = ScanOptions(states='running,paused', format='name', ping=True) flows = [] await main(opts, write=flows.append, scan_dir=test_dir) assert len(flows) == 0 diff --git a/tests/integration/test_scheduler.py b/tests/integration/test_scheduler.py new file mode 100644 index 00000000000..83b3ce9c4cb --- /dev/null +++ b/tests/integration/test_scheduler.py @@ -0,0 +1,92 @@ +# THIS FILE IS PART OF THE CYLC SUITE ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import asyncio +import pytest +from typing import Any, TYPE_CHECKING + +if TYPE_CHECKING: + from cylc.flow.scheduler import Scheduler + +Fixture = Any + + +@pytest.mark.asyncio +async def test_is_paused_after_stop( + one_conf: Fixture, flow: Fixture, scheduler: Fixture, run: Fixture, + db_select: Fixture): + """Test the paused status is unset on normal shutdown.""" + reg: str = flow(one_conf) + schd: 'Scheduler' = scheduler(reg, paused_start=True) + # Run + async with run(schd): + assert not schd.is_restart + assert schd.is_paused + # Stopped + assert ('is_paused', '1') not in db_select(schd, 'suite_params') + # Restart + schd = scheduler(reg, paused_start=None) + async with run(schd): + assert schd.is_restart + assert not schd.is_paused + + +@pytest.mark.asyncio +async def test_is_paused_after_crash( + one_conf: Fixture, flow: Fixture, scheduler: Fixture, run: Fixture, + db_select: Fixture): + """Test the paused status is not unset for an interrupted workflow.""" + reg: str = flow(one_conf) + schd: 'Scheduler' = scheduler(reg, paused_start=True) + + def ctrl_c(): + raise asyncio.CancelledError("Mock keyboard interrupt") + # Patch this part of the main loop + _schd_suite_shutdown = schd.suite_shutdown + schd.suite_shutdown = ctrl_c + + # Run + with pytest.raises(asyncio.CancelledError) as exc: + async with run(schd): + assert not schd.is_restart + assert schd.is_paused + assert "Mock keyboard interrupt" in str(exc.value) + # Stopped + assert ('is_paused', '1') in db_select(schd, 'suite_params') + # Reset patched method + schd.suite_shutdown = _schd_suite_shutdown + # Restart + schd = scheduler(reg, paused_start=None) + async with run(schd): + assert schd.is_restart + assert schd.is_paused + + +@pytest.mark.asyncio +async def test_resume_does_not_release_tasks(one: Fixture, run: Fixture): + """Test that resuming a workflow does not release any held tasks.""" + schd: 'Scheduler' = one + async with run(schd): + assert schd.is_paused + itasks = schd.pool.get_all_tasks() + assert len(itasks) == 1 + itask = itasks[0] + assert not itask.state.is_held + + schd.command_hold('*') + schd.resume_workflow() + assert not schd.is_paused + assert itask.state.is_held diff --git a/tests/integration/utils/flow_tools.py b/tests/integration/utils/flow_tools.py index 92b294462e7..af68361e70f 100644 --- a/tests/integration/utils/flow_tools.py +++ b/tests/integration/utils/flow_tools.py @@ -53,8 +53,8 @@ def _make_flow(run_dir, test_dir, conf, name=None): def _make_scheduler(reg, **opts): """Return a scheduler object for a flow registration.""" - # This allows hold_start to be overriden: - opts = {'hold_start': True, **opts} + # This allows paused_start to be overriden: + opts = {'paused_start': True, **opts} options = RunOptions(**opts) # create workflow return Scheduler(reg, options) diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index b0ed413a69d..4134e09eb51 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -34,8 +34,11 @@ def cycling_mode(monkeypatch): """Set the Cylc cycling mode and return its value.""" def _cycling_mode(integer=True): mode = INTEGER_CYCLING_TYPE if integer else ISO8601_CYCLING_TYPE + + class _DefaultCycler: + TYPE = mode monkeypatch.setattr( - 'cylc.flow.cycling.loader.DefaultCycler.TYPE', mode) + 'cylc.flow.cycling.loader.DefaultCycler', _DefaultCycler) return mode return _cycling_mode diff --git a/tests/unit/scripts/test_hold.py b/tests/unit/scripts/test_hold.py new file mode 100644 index 00000000000..8e8d6dc4244 --- /dev/null +++ b/tests/unit/scripts/test_hold.py @@ -0,0 +1,52 @@ +# THIS FILE IS PART OF THE CYLC SUITE ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Test logic in cylc-hold script.""" + +import pytest +from typing import Any, Iterable, Optional, Tuple, Type + +from cylc.flow.exceptions import UserInputError +from cylc.flow.option_parsers import Options +from cylc.flow.scripts.hold import get_option_parser, _validate + + +Opts = Options(get_option_parser()) + + +@pytest.mark.parametrize( + 'opts, task_globs, expected_err', + [ + (Opts(), ['*'], None), + (Opts(hold_point_string='2'), [], None), + (Opts(hold_point_string='2'), ['*'], + (UserInputError, "Cannot combine --after with TASK_GLOB")), + (Opts(), [], + (UserInputError, "Missing arguments: TASK_GLOB")), + ] +) +def test_validate( + opts: Options, + task_globs: Iterable[str], + expected_err: Optional[Tuple[Type[Exception], str]]): + + if expected_err: + err, msg = expected_err + with pytest.raises(err) as exc: + _validate(opts, *task_globs) + assert msg in str(exc.value) + else: + _validate(opts, *task_globs) diff --git a/tests/unit/scripts/test_release.py b/tests/unit/scripts/test_release.py new file mode 100644 index 00000000000..fe841b04b8b --- /dev/null +++ b/tests/unit/scripts/test_release.py @@ -0,0 +1,52 @@ +# THIS FILE IS PART OF THE CYLC SUITE ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Test logic in cylc-release script.""" + +import pytest +from typing import Any, Iterable, Optional, Tuple, Type + +from cylc.flow.exceptions import UserInputError +from cylc.flow.option_parsers import Options +from cylc.flow.scripts.release import get_option_parser, _validate + + +Opts = Options(get_option_parser()) + + +@pytest.mark.parametrize( + 'opts, task_globs, expected_err', + [ + (Opts(), ['*'], None), + (Opts(release_all=True), [], None), + (Opts(release_all=True), ['*'], + (UserInputError, "Cannot combine --all with TASK_GLOB")), + (Opts(), [], + (UserInputError, "Missing arguments: TASK_GLOB")), + ] +) +def test_validate( + opts: Options, + task_globs: Iterable[str], + expected_err: Optional[Tuple[Type[Exception], str]]): + + if expected_err: + err, msg = expected_err + with pytest.raises(err) as exc: + _validate(opts, *task_globs) + assert msg in str(exc.value) + else: + _validate(opts, *task_globs) diff --git a/tests/unit/test_suite_files.py b/tests/unit/test_suite_files.py index d1b1b1ab695..9b5c91faf8e 100644 --- a/tests/unit/test_suite_files.py +++ b/tests/unit/test_suite_files.py @@ -177,17 +177,18 @@ def test_rundir_children_that_contain_workflows_raise_error( @pytest.mark.parametrize( 'reg, expected_err, expected_msg', [('foo/bar/', None, None), - ('/foo/bar', SuiteServiceFileError, "cannot be an absolute path"), - ('$HOME/alone', SuiteServiceFileError, "invalid suite name")] + ('/foo/bar', WorkflowFilesError, "cannot be an absolute path"), + ('$HOME/alone', WorkflowFilesError, "invalid workflow name"), + ('./foo', WorkflowFilesError, "invalid workflow name")] ) -def test_validate_reg(reg, expected_err, expected_msg): +def test_validate_flow_name(reg, expected_err, expected_msg): if expected_err: with pytest.raises(expected_err) as exc: - suite_files._validate_reg(reg) + suite_files.validate_flow_name(reg) if expected_msg: assert expected_msg in str(exc.value) else: - suite_files._validate_reg(reg) + suite_files.validate_flow_name(reg) @pytest.mark.parametrize(