Skip to content

Commit

Permalink
Merge pull request #4076 from MetRonnie/cylc-pause
Browse files Browse the repository at this point in the history
cylc pause
  • Loading branch information
oliver-sanders authored Mar 19, 2021
2 parents df75d65 + bf55479 commit 4947568
Show file tree
Hide file tree
Showing 95 changed files with 1,252 additions and 717 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/bash.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ jobs:
tests/functional/cylc-poll/15-job-st-file-no-batch.t \
tests/functional/events/28-inactivity.t \
tests/functional/events/34-task-abort.t \
tests/functional/hold-release/12-hold-then-retry.t \
tests/functional/job-file-trap/00-sigusr1.t \
tests/functional/job-file-trap/02-pipefail.t \
tests/functional/pause-resume/12-pause-then-retry.t \
tests/functional/shutdown/09-now2.t \
tests/functional/shutdown/13-no-port-file-check.t \
tests/functional/shutdown/14-no-dir-check.t
Expand Down
13 changes: 8 additions & 5 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,18 @@ queueing logic centralized.
`cylc register` has been replaced by `cylc install`
([#4000](https://github.com/cylc/cylc-flow/pull/4000)).

Added a new command: `cylc clean`, for removing stopped workflows on the local
and any remote filesystems ([#3961](https://github.com/cylc/cylc-flow/pull/3961),
[#4017](https://github.com/cylc/cylc-flow/pull/4017)).

`cylc run` and `cylc restart` have been replaced by `cylc play`, simplifying
how workflows are restarted
([#4040](https://github.com/cylc/cylc-flow/pull/4040)).

`cylc pause` and `cylc play` are now used to pause and resume workflows,
respectively. `cylc hold` and `cylc release` now only hold and release tasks,
not the whole workflow. ([#4076](https://github.com/cylc/cylc-flow/pull/4076))

"Implicit"/"naked" tasks (tasks that do not have an explicit definition in
`flow.cylc[runtime]`) are now disallowed by default
([#4109](https://github.com/cylc/cylc-flow/pull/4109)). You can allow them by
Expand Down Expand Up @@ -167,11 +175,6 @@ hierarchy and ability to set site config directory.
[#3883](https://github.com/cylc/cylc-flow/pull/3883) - Added a new workflow
config option `[scheduling]stop after cycle point`.

[#3961](https://github.com/cylc/cylc-flow/pull/3961),
[#4017](https://github.com/cylc/cylc-flow/pull/4017) - Added a new command:
`cylc clean`, for removing stopped workflows on the local and any remote
filesystems.

[#3913](https://github.com/cylc/cylc-flow/pull/3913) - Added the ability to
use plugins to parse suite templating variables and additional files to
install. Only one such plugin exists at the time of writing, designed to
Expand Down
17 changes: 10 additions & 7 deletions cylc/flow/cycling/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
Each task may have multiple sequences, e.g. 12-hourly and 6-hourly.
"""

from . import integer
from typing import Optional, Type

from . import PointBase, integer
from . import iso8601
from metomi.isodatetime.data import Calendar

Expand Down Expand Up @@ -58,18 +60,19 @@ class DefaultCycler:

"""Store the default TYPE for Cyclers."""

TYPE = None
TYPE: str


def get_point(*args, **kwargs):
def get_point(
value: str, cycling_type: Optional[str] = None
) -> Optional[PointBase]:
"""Return a cylc.flow.cycling.PointBase-derived object from a string."""
if args[0] is None:
if value is None:
return None
cycling_type = kwargs.pop("cycling_type", DefaultCycler.TYPE)
return get_point_cls(cycling_type=cycling_type)(*args, **kwargs)
return get_point_cls(cycling_type=cycling_type)(value)


def get_point_cls(cycling_type=None):
def get_point_cls(cycling_type: Optional[str] = None) -> Type[PointBase]:
"""Return the cylc.flow.cycling.PointBase-derived class we're using."""
if cycling_type is None:
cycling_type = DefaultCycler.TYPE
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/etc/cylc-bash-completion
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ _cylc() {
cur="${COMP_WORDS[COMP_CWORD]}"
sec="${COMP_WORDS[1]}"
opts="$(cylc scan -t name 2>/dev/null)"
suite_cmds="broadcast|bcast|cat-log|check-versions|clean|compare|diff|dump|edit|ext-trigger|external-trigger|get-suite-config|get-config|get-suite-version|get-cylc-version|graph|hold|insert|install|kill|list|log|ls|tui|ping|play|poll|print|reinstall|release|unhold|reload|remove|report-timings|reset|scan|search|grep|set-verbosity|show|set-outputs|stop|shutdown|single|suite-state|test-battery|trigger|validate|view|warranty"
suite_cmds="broadcast|bcast|cat-log|check-versions|clean|compare|diff|dump|edit|ext-trigger|external-trigger|get-suite-config|get-config|get-suite-version|get-cylc-version|graph|hold|insert|install|kill|list|log|ls|tui|pause|ping|play|poll|print|reinstall|release|unhold|reload|remove|report-timings|reset|scan|search|grep|set-verbosity|show|set-outputs|stop|shutdown|single|suite-state|test-battery|trigger|validate|view|warranty"


if [[ ${COMP_CWORD} -eq 1 ]]; then
Expand Down
61 changes: 35 additions & 26 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import logging
import queue
from time import time
from typing import Iterable, Tuple, TYPE_CHECKING
from uuid import uuid4

from graphene.utils.str_converters import to_snake_case
Expand All @@ -34,6 +35,11 @@
NodesEdges, PROXY_NODES, SUB_RESOLVERS, parse_node_id, sort_elements
)

if TYPE_CHECKING:
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.scheduler import Scheduler


logger = logging.getLogger(__name__)

DELTA_SLEEP_INTERVAL = 0.5
Expand Down Expand Up @@ -438,15 +444,11 @@ async def subscribe_delta(self, root, info, args):
class Resolvers(BaseResolvers):
"""Workflow Service context GraphQL query and mutation resolvers."""

schd = None
schd: 'Scheduler'

def __init__(self, data, **kwargs):
def __init__(self, data: 'DataStoreMgr', schd: 'Scheduler') -> None:
super().__init__(data)

# Set extra attributes
for key, value in kwargs.items():
if hasattr(self, key):
setattr(self, key, value)
self.schd = schd

# Mutations
async def mutator(self, *m_args):
Expand Down Expand Up @@ -520,16 +522,19 @@ def broadcast(
cutoff)
raise ValueError('Unsupported broadcast mode')

def hold(self, tasks=None, time=None):
"""Hold the workflow."""
self.schd.command_queue.put((
'hold',
tuple(),
filter_none({
'tasks': tasks or None,
'time': time
})
))
def hold(self, tasks: Iterable[str]) -> Tuple[bool, str]:
"""Hold tasks."""
self.schd.command_queue.put(('hold', (tasks,), {}))
return (True, 'Command queued')

def set_hold_point(self, point: str) -> Tuple[bool, str]:
"""Set workflow hold after cycle point."""
self.schd.command_queue.put(('set_hold_point', (point,), {}))
return (True, 'Command queued')

def pause(self) -> Tuple[bool, str]:
"""Pause the workflow."""
self.schd.command_queue.put(('pause', tuple(), {}))
return (True, 'Command queued')

def kill_tasks(self, tasks):
Expand Down Expand Up @@ -649,15 +654,19 @@ def reload_suite(self):
self.schd.command_queue.put(("reload_suite", (), {}))
return (True, 'Command queued')

def release(self, tasks=None):
"""Release (un-hold) the workflow."""
self.schd.command_queue.put((
"release",
(),
filter_none({
'ids': tasks
})
))
def release(self, tasks: Iterable[str]) -> Tuple[bool, str]:
"""Release held tasks."""
self.schd.command_queue.put(('release', (tasks,), {}))
return (True, 'Command queued')

def release_hold_point(self) -> Tuple[bool, str]:
"""Release all tasks and unset workflow hold point."""
self.schd.command_queue.put(('release_hold_point', tuple(), {}))
return (True, 'Command queued')

def resume(self) -> Tuple[bool, str]:
"""Resume the workflow."""
self.schd.command_queue.put(('resume', tuple(), {}))
return (True, 'Command queued')

def set_verbosity(self, level):
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/network/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@ async def graphql_query(flow, fields, filters=None):
# state must be running
[('state',), 'running']
# state must be running or held
[('state',), ('running', 'held')]
# state must be running or paused
[('state',), ('running', 'paused')]
"""
query = f'query {{ workflows(ids: ["{flow["name"]}"]) {{ {fields} }} }}'
Expand Down
83 changes: 61 additions & 22 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1532,23 +1532,33 @@ class Arguments:
result = GenericScalar()


class Hold(Mutation):
class SetHoldPoint(Mutation):
class Meta:
description = sstrip('''
Hold a workflow or tasks within it.
Set workflow hold after cycle point. All tasks after this point
will be held.
''')
resolver = partial(mutator, command='hold')
resolver = partial(mutator, command='set_hold_point')

class Arguments:
workflows = List(WorkflowID, required=True)
tasks = List(
NamespaceIDGlob,
description='Hold the specified tasks rather than the workflow.'
point = CyclePoint(
description='Hold all tasks after the specified cycle point.',
required=True
)
time = TimePoint(description=sstrip('''
Get the workflow to hold after the specified wallclock time
has passed.
'''))

result = GenericScalar()


class Pause(Mutation):
class Meta:
description = sstrip('''
Pause a workflow.
''')
resolver = partial(mutator, command='pause')

class Arguments:
workflows = List(WorkflowID, required=True)

result = GenericScalar()

Expand Down Expand Up @@ -1596,23 +1606,30 @@ class Arguments:
result = GenericScalar()


class Release(Mutation):
class ReleaseHoldPoint(Mutation):
class Meta:
description = sstrip('''
Release a held workflow or tasks within it.
Release all tasks and unset the workflow hold point, if set.
''')
resolver = partial(mutator, command='release_hold_point')

See also the opposite command `hold`.
class Arguments:
workflows = List(WorkflowID, required=True)

result = GenericScalar()


class Resume(Mutation):
class Meta:
description = sstrip('''
Resume a paused workflow.
See also the opposite command `pause`.
''')
resolver = partial(mutator, command='release')
resolver = partial(mutator, command='resume')

class Arguments:
workflows = List(WorkflowID, required=True)
tasks = List(
NamespaceIDGlob,
description=sstrip('''
Release matching tasks rather than the workflow as whole.
''')
)

result = GenericScalar()

Expand Down Expand Up @@ -1763,6 +1780,24 @@ class Arguments:
result = GenericScalar()


class Hold(Mutation, TaskMutation):
class Meta:
description = sstrip('''
Hold tasks within a workflow.
''')
resolver = partial(mutator, command='hold')


class Release(Mutation, TaskMutation):
class Meta:
description = sstrip('''
Release held tasks within a workflow.
See also the opposite command `hold`.
''')
resolver = partial(mutator, command='release')


class Kill(Mutation, TaskMutation):
# TODO: This should be a job mutation?
class Meta:
Expand Down Expand Up @@ -1851,18 +1886,22 @@ class Mutations(ObjectType):
# workflow actions
broadcast = _mut_field(Broadcast)
ext_trigger = _mut_field(ExtTrigger)
hold = _mut_field(Hold)
message = _mut_field(Message)
pause = _mut_field(Pause)
ping = _mut_field(Ping)
release = _mut_field(Release)
reload = _mut_field(Reload)
resume = _mut_field(Resume)
set_verbosity = _mut_field(SetVerbosity)
set_graph_window_extent = _mut_field(SetGraphWindowExtent)
stop = _mut_field(Stop)
set_hold_point = _mut_field(SetHoldPoint)
release_hold_point = _mut_field(ReleaseHoldPoint)

# task actions
hold = _mut_field(Hold)
kill = _mut_field(Kill)
poll = _mut_field(Poll)
release = _mut_field(Release)
remove = _mut_field(Remove)
set_outputs = _mut_field(SetOutputs)
trigger = _mut_field(Trigger)
Expand Down
Loading

0 comments on commit 4947568

Please sign in to comment.