Skip to content

Commit

Permalink
Implement cylc pause
Browse files Browse the repository at this point in the history
Now:
- cylc pause/play pauses/resumes the workflow
- cylc hold/release holds/releases tasks

pause/play is decoupled from hold/release, i.e. they don't affect task 
states
  • Loading branch information
MetRonnie committed Feb 16, 2021
1 parent a1e4bc3 commit 5a71c4d
Show file tree
Hide file tree
Showing 18 changed files with 385 additions and 161 deletions.
2 changes: 1 addition & 1 deletion cylc/flow/etc/cylc-bash-completion
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ _cylc() {
cur="${COMP_WORDS[COMP_CWORD]}"
sec="${COMP_WORDS[1]}"
opts="$(cylc scan -t name 2>/dev/null)"
suite_cmds="broadcast|bcast|cat-log|check-versions|clean|compare|diff|dump|edit|ext-trigger|external-trigger|get-suite-config|get-config|get-suite-version|get-cylc-version|graph|graph-diff|hold|insert|install|kill|list|log|ls|tui|ping|play|poll|print|release|unhold|reload|remove|report-timings|reset|scan|search|grep|set-verbosity|show|set-outputs|stop|shutdown|single|suite-state|test-battery|trigger|validate|view|warranty"
suite_cmds="broadcast|bcast|cat-log|check-versions|clean|compare|diff|dump|edit|ext-trigger|external-trigger|get-suite-config|get-config|get-suite-version|get-cylc-version|graph|graph-diff|hold|insert|install|kill|list|log|ls|tui|pause|ping|play|poll|print|release|unhold|reload|remove|report-timings|reset|scan|search|grep|set-verbosity|show|set-outputs|stop|shutdown|single|suite-state|test-battery|trigger|validate|view|warranty"


if [[ ${COMP_CWORD} -eq 1 ]]; then
Expand Down
35 changes: 28 additions & 7 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import logging
import queue
from time import time
from typing import Iterable, Optional, Tuple, TYPE_CHECKING
from uuid import uuid4

from graphene.utils.str_converters import to_snake_case
Expand All @@ -34,6 +35,10 @@
NodesEdges, PROXY_NODES, SUB_RESOLVERS, parse_node_id, sort_elements
)

if TYPE_CHECKING:
from cylc.flow.scheduler import Scheduler


logger = logging.getLogger(__name__)

DELTA_SLEEP_INTERVAL = 0.5
Expand Down Expand Up @@ -436,7 +441,7 @@ async def subscribe_delta(self, root, info, args):
class Resolvers(BaseResolvers):
"""Workflow Service context GraphQL query and mutation resolvers."""

schd = None
schd: 'Scheduler' = None

def __init__(self, data, **kwargs):
super().__init__(data)
Expand Down Expand Up @@ -518,8 +523,13 @@ def broadcast(
cutoff)
raise ValueError('Unsupported broadcast mode')

def hold(self, tasks=None, time=None):
"""Hold the workflow."""
def hold(
self, tasks: Optional[Iterable[str]] = None,
time: Optional[str] = None
) -> Tuple[bool, str]:
"""Hold tasks."""
if (tasks and time) or not (tasks or time):
return (False, 'Argument must be either tasks or time (not both)')
self.schd.command_queue.put((
'hold',
tuple(),
Expand All @@ -530,6 +540,11 @@ def hold(self, tasks=None, time=None):
))
return (True, 'Command queued')

def pause(self) -> Tuple[bool, str]:
"""Pause the workflow."""
self.schd.command_queue.put(('pause', tuple(), {}))
return (True, 'Command queued')

def kill_tasks(self, tasks):
"""Kill task jobs.
Expand Down Expand Up @@ -647,17 +662,23 @@ def reload_suite(self):
self.schd.command_queue.put(("reload_suite", (), {}))
return (True, 'Command queued')

def release(self, tasks=None):
"""Release (un-hold) the workflow."""
def release(
self, tasks: Optional[Iterable[str]] = None) -> Tuple[bool, str]:
"""Release held tasks."""
self.schd.command_queue.put((
"release",
(),
'release',
tuple(),
filter_none({
'ids': tasks
})
))
return (True, 'Command queued')

def resume(self) -> Tuple[bool, str]:
"""Resume the workflow."""
self.schd.command_queue.put(('resume', tuple(), {}))
return (True, 'Command queued')

def set_verbosity(self, level):
"""Set suite verbosity to new level (for suite logs).
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/network/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@ async def graphql_query(flow, fields, filters=None):
# state must be running
[('state',), 'running']
# state must be running or held
[('state',), ('running', 'held')]
# state must be running or paused
[('state',), ('running', 'paused')]
"""
query = f'query {{ workflows(ids: ["{flow["name"]}"]) {{ {fields} }} }}'
Expand Down
45 changes: 38 additions & 7 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1508,24 +1508,37 @@ class Arguments:
class Hold(Mutation):
class Meta:
description = sstrip('''
Hold a workflow or tasks within it.
Hold tasks within a workflow.
''')
resolver = partial(mutator, command='hold')

class Arguments:
workflows = List(WorkflowID, required=True)
tasks = List(
NamespaceIDGlob,
description='Hold the specified tasks rather than the workflow.'
description='Hold matching tasks.'
)
time = TimePoint(description=sstrip('''
Get the workflow to hold after the specified wallclock time
Hold all tasks only after the specified wallclock time
has passed.
'''))

result = GenericScalar()


class Pause(Mutation):
class Meta:
description = sstrip('''
Pause a workflow.
''')
resolver = partial(mutator, command='pause')

class Arguments:
workflows = List(WorkflowID, required=True)

result = GenericScalar()


class Ping(Mutation):
class Meta:
description = sstrip('''
Expand Down Expand Up @@ -1572,7 +1585,7 @@ class Arguments:
class Release(Mutation):
class Meta:
description = sstrip('''
Release a held workflow or tasks within it.
Release held tasks within a workflow.
See also the opposite command `hold`.
''')
Expand All @@ -1583,13 +1596,29 @@ class Arguments:
tasks = List(
NamespaceIDGlob,
description=sstrip('''
Release matching tasks rather than the workflow as whole.
Release matching tasks or, if not specified, release all tasks
and remove the hold point if set.
''')
)

result = GenericScalar()


class Resume(Mutation):
class Meta:
description = sstrip('''
Resume a paused workflow.
See also the opposite command `pause`.
''')
resolver = partial(mutator, command='resume')

class Arguments:
workflows = List(WorkflowID, required=True)

result = GenericScalar()


class Reload(Mutation):
class Meta:
description = sstrip('''
Expand Down Expand Up @@ -1824,18 +1853,20 @@ class Mutations(ObjectType):
# workflow actions
broadcast = _mut_field(Broadcast)
ext_trigger = _mut_field(ExtTrigger)
hold = _mut_field(Hold)
message = _mut_field(Message)
pause = _mut_field(Pause)
ping = _mut_field(Ping)
release = _mut_field(Release)
reload = _mut_field(Reload)
resume = _mut_field(Resume)
set_verbosity = _mut_field(SetVerbosity)
set_graph_window_extent = _mut_field(SetGraphWindowExtent)
stop = _mut_field(Stop)

# task actions
hold = _mut_field(Hold)
kill = _mut_field(Kill)
poll = _mut_field(Poll)
release = _mut_field(Release)
remove = _mut_field(Remove)
set_outputs = _mut_field(SetOutputs)
trigger = _mut_field(Trigger)
Expand Down
Loading

0 comments on commit 5a71c4d

Please sign in to comment.