Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cylc pause #4076

Merged
merged 34 commits into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
30b9320
Implement cylc pause
MetRonnie Feb 12, 2021
eb44ae5
Tidy
MetRonnie Feb 12, 2021
0f9b0f3
Fix incorrect reference to wallclock time in GraphQL schema
MetRonnie Jan 26, 2021
46fa795
Fix integration tests after introducing cylc pause
MetRonnie Feb 19, 2021
d2107a6
Fix functional tests after introducing cylc pause
MetRonnie Feb 22, 2021
05d47a6
Update TUI for cylc pause
MetRonnie Feb 9, 2021
94cd1a9
Slightly hacky fix for functional tests
MetRonnie Feb 19, 2021
a04806b
Fix func test after introducing cylc pause
MetRonnie Feb 16, 2021
480f4b1
Fix error pruning flow labels when restarting a paused workflow
MetRonnie Feb 17, 2021
df37b52
Fix integration test after introducing cylc pause
MetRonnie Feb 18, 2021
cd77f43
Tidy func tests
MetRonnie Feb 18, 2021
fec10fd
Fix tricky-to-debug func test after introducing cylc pause
MetRonnie Feb 18, 2021
faca543
Address code review RE: cylc pause
MetRonnie Feb 19, 2021
0f88e12
Fix cycle point format in test
MetRonnie Feb 19, 2021
62a712a
Fix hold-release func test and add pause-resume equivalent
MetRonnie Feb 22, 2021
6cde074
Fix func test after introducing cylc pause and add equivalent
MetRonnie Feb 22, 2021
462c6b6
Rename tests as appropriate
MetRonnie Feb 22, 2021
240b8f6
Create function for querying DB in integration tests
MetRonnie Feb 23, 2021
450cc79
Write integration tests for cylc pause
MetRonnie Feb 23, 2021
d913cd3
Update changelog
MetRonnie Feb 24, 2021
1080829
Update cylc play cli help
MetRonnie Feb 24, 2021
05a64f6
Unit test the cli arg/opt validation in hold & release
MetRonnie Feb 25, 2021
7578b44
Split hold & release mutations into two each
MetRonnie Feb 26, 2021
1b2d35e
Merge branch 'master' into cylc-pause
MetRonnie Mar 1, 2021
57207d6
Address code review
MetRonnie Mar 1, 2021
b5c0d7d
Fix cylc play & stop docstrings
MetRonnie Mar 10, 2021
6778da0
Fix mistake
MetRonnie Mar 10, 2021
a8d9dbe
Prevent use of invalid reg or abs path in cylc play
MetRonnie Mar 10, 2021
6b90ddf
Fix mistake in scheduler install logic, plus tidy
MetRonnie Mar 11, 2021
16505e5
Merge branch 'master' into cylc-pause
MetRonnie Mar 12, 2021
637e7b5
Fix tests
MetRonnie Mar 12, 2021
4faa06c
Apply code review suggestion
MetRonnie Mar 17, 2021
a1db082
Merge branch 'master' into cylc-pause
MetRonnie Mar 18, 2021
bf55479
Improve type safety (i.e. fix mypy errors)
MetRonnie Mar 18, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/bash.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ jobs:
tests/functional/cylc-poll/15-job-st-file-no-batch.t \
tests/functional/events/28-inactivity.t \
tests/functional/events/34-task-abort.t \
tests/functional/hold-release/12-hold-then-retry.t \
tests/functional/job-file-trap/00-sigusr1.t \
tests/functional/job-file-trap/02-pipefail.t \
tests/functional/pause-resume/12-pause-then-retry.t \
tests/functional/shutdown/09-now2.t \
tests/functional/shutdown/13-no-port-file-check.t \
tests/functional/shutdown/14-no-dir-check.t
Expand Down
13 changes: 8 additions & 5 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,18 @@ queueing logic centralized.
`cylc register` has been replaced by `cylc install`
([#4000](https://github.com/cylc/cylc-flow/pull/4000)).

Added a new command: `cylc clean`, for removing stopped workflows on the local
and any remote filesystems ([#3961](https://github.com/cylc/cylc-flow/pull/3961),
[#4017](https://github.com/cylc/cylc-flow/pull/4017)).

`cylc run` and `cylc restart` have been replaced by `cylc play`, simplifying
how workflows are restarted
([#4040](https://github.com/cylc/cylc-flow/pull/4040)).

`cylc pause` and `cylc play` are now used to pause and resume workflows,
respectively. `cylc hold` and `cylc release` now only hold and release tasks,
not the whole workflow. ([#4076](https://github.com/cylc/cylc-flow/pull/4076))

"Implicit"/"naked" tasks (tasks that do not have an explicit definition in
`flow.cylc[runtime]`) are now disallowed by default
([#4109](https://github.com/cylc/cylc-flow/pull/4109)). You can allow them by
Expand Down Expand Up @@ -167,11 +175,6 @@ hierarchy and ability to set site config directory.
[#3883](https://github.com/cylc/cylc-flow/pull/3883) - Added a new workflow
config option `[scheduling]stop after cycle point`.

[#3961](https://github.com/cylc/cylc-flow/pull/3961),
[#4017](https://github.com/cylc/cylc-flow/pull/4017) - Added a new command:
`cylc clean`, for removing stopped workflows on the local and any remote
filesystems.

[#3913](https://github.com/cylc/cylc-flow/pull/3913) - Added the ability to
use plugins to parse suite templating variables and additional files to
install. Only one such plugin exists at the time of writing, designed to
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/etc/cylc-bash-completion
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ _cylc() {
cur="${COMP_WORDS[COMP_CWORD]}"
sec="${COMP_WORDS[1]}"
opts="$(cylc scan -t name 2>/dev/null)"
suite_cmds="broadcast|bcast|cat-log|check-versions|clean|compare|diff|dump|edit|ext-trigger|external-trigger|get-suite-config|get-config|get-suite-version|get-cylc-version|graph|hold|insert|install|kill|list|log|ls|tui|ping|play|poll|print|reinstall|release|unhold|reload|remove|report-timings|reset|scan|search|grep|set-verbosity|show|set-outputs|stop|shutdown|single|suite-state|test-battery|trigger|validate|view|warranty"
suite_cmds="broadcast|bcast|cat-log|check-versions|clean|compare|diff|dump|edit|ext-trigger|external-trigger|get-suite-config|get-config|get-suite-version|get-cylc-version|graph|hold|insert|install|kill|list|log|ls|tui|pause|ping|play|poll|print|reinstall|release|unhold|reload|remove|report-timings|reset|scan|search|grep|set-verbosity|show|set-outputs|stop|shutdown|single|suite-state|test-battery|trigger|validate|view|warranty"


if [[ ${COMP_CWORD} -eq 1 ]]; then
Expand Down
52 changes: 32 additions & 20 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import logging
import queue
from time import time
from typing import Iterable, Tuple, TYPE_CHECKING
from uuid import uuid4

from graphene.utils.str_converters import to_snake_case
Expand All @@ -34,6 +35,10 @@
NodesEdges, PROXY_NODES, SUB_RESOLVERS, parse_node_id, sort_elements
)

if TYPE_CHECKING:
from cylc.flow.scheduler import Scheduler
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ouch, what's going on here, is this because of import problems or is it to avoid unnecessary imports?

Copy link
Member Author

@MetRonnie MetRonnie Feb 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid unnecessary imports mainly, this will only import the Scheduler when mypy is run (needed if using Scheduler in a type annotation). Also I guess it is likely to lead to circular imports too

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing we can't do:

self.schd: 'cylc.flow.scheduler.Scheduler' = schd

In order for a Scheduler object to have been passed to the Resolvers instance cylc.flow.scheduler must already be loaded in memory, even so it would be annoying if we had to write imports like this everywhere to keep mypy happy.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imho writing out the full path like that would be more annoying than doing if TYPE_CHECKING... import.... Anyway, hopefully the usefulness of mypy outweighs its annoyances.



logger = logging.getLogger(__name__)

DELTA_SLEEP_INTERVAL = 0.5
Expand Down Expand Up @@ -438,7 +443,7 @@ async def subscribe_delta(self, root, info, args):
class Resolvers(BaseResolvers):
"""Workflow Service context GraphQL query and mutation resolvers."""

schd = None
schd: 'Scheduler' = None

def __init__(self, data, **kwargs):
super().__init__(data)
Expand Down Expand Up @@ -520,16 +525,19 @@ def broadcast(
cutoff)
raise ValueError('Unsupported broadcast mode')

def hold(self, tasks=None, time=None):
"""Hold the workflow."""
self.schd.command_queue.put((
'hold',
tuple(),
filter_none({
'tasks': tasks or None,
'time': time
})
))
def hold(self, tasks: Iterable[str]) -> Tuple[bool, str]:
"""Hold tasks."""
self.schd.command_queue.put(('hold', (tasks,), {}))
return (True, 'Command queued')

def set_hold_point(self, point: str) -> Tuple[bool, str]:
"""Set workflow hold after cycle point."""
self.schd.command_queue.put(('set_hold_point', (point,), {}))
return (True, 'Command queued')

def pause(self) -> Tuple[bool, str]:
"""Pause the workflow."""
self.schd.command_queue.put(('pause', tuple(), {}))
return (True, 'Command queued')

def kill_tasks(self, tasks):
Expand Down Expand Up @@ -649,15 +657,19 @@ def reload_suite(self):
self.schd.command_queue.put(("reload_suite", (), {}))
return (True, 'Command queued')

def release(self, tasks=None):
"""Release (un-hold) the workflow."""
self.schd.command_queue.put((
"release",
(),
filter_none({
'ids': tasks
})
))
def release(self, tasks: Iterable[str]) -> Tuple[bool, str]:
"""Release held tasks."""
self.schd.command_queue.put(('release', (tasks,), {}))
return (True, 'Command queued')

def release_hold_point(self) -> Tuple[bool, str]:
"""Release all tasks and unset workflow hold point."""
self.schd.command_queue.put(('release_hold_point', tuple(), {}))
return (True, 'Command queued')

def resume(self) -> Tuple[bool, str]:
"""Resume the workflow."""
self.schd.command_queue.put(('resume', tuple(), {}))
return (True, 'Command queued')

def set_verbosity(self, level):
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/network/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@ async def graphql_query(flow, fields, filters=None):
# state must be running
[('state',), 'running']

# state must be running or held
[('state',), ('running', 'held')]
# state must be running or paused
[('state',), ('running', 'paused')]

"""
query = f'query {{ workflows(ids: ["{flow["name"]}"]) {{ {fields} }} }}'
Expand Down
83 changes: 61 additions & 22 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1518,23 +1518,33 @@ class Arguments:
result = GenericScalar()


class Hold(Mutation):
class SetHoldPoint(Mutation):
class Meta:
description = sstrip('''
Hold a workflow or tasks within it.
Set workflow hold after cycle point. All tasks after this point
will be held.
''')
resolver = partial(mutator, command='hold')
resolver = partial(mutator, command='set_hold_point')

class Arguments:
workflows = List(WorkflowID, required=True)
tasks = List(
NamespaceIDGlob,
description='Hold the specified tasks rather than the workflow.'
point = CyclePoint(
description='Hold all tasks after the specified cycle point.',
required=True
)
time = TimePoint(description=sstrip('''
Get the workflow to hold after the specified wallclock time
has passed.
'''))

result = GenericScalar()


class Pause(Mutation):
class Meta:
description = sstrip('''
Pause a workflow.
''')
resolver = partial(mutator, command='pause')

class Arguments:
workflows = List(WorkflowID, required=True)

result = GenericScalar()

Expand Down Expand Up @@ -1582,23 +1592,30 @@ class Arguments:
result = GenericScalar()


class Release(Mutation):
class ReleaseHoldPoint(Mutation):
class Meta:
description = sstrip('''
Release a held workflow or tasks within it.
Release all tasks and unset the workflow hold point, if set.
''')
resolver = partial(mutator, command='release_hold_point')

See also the opposite command `hold`.
class Arguments:
workflows = List(WorkflowID, required=True)

result = GenericScalar()


class Resume(Mutation):
class Meta:
description = sstrip('''
Resume a paused workflow.

See also the opposite command `pause`.
''')
resolver = partial(mutator, command='release')
resolver = partial(mutator, command='resume')

class Arguments:
workflows = List(WorkflowID, required=True)
tasks = List(
NamespaceIDGlob,
description=sstrip('''
Release matching tasks rather than the workflow as whole.
''')
)

result = GenericScalar()

Expand Down Expand Up @@ -1749,6 +1766,24 @@ class Arguments:
result = GenericScalar()


class Hold(Mutation, TaskMutation):
class Meta:
description = sstrip('''
Hold tasks within a workflow.
''')
resolver = partial(mutator, command='hold')


class Release(Mutation, TaskMutation):
class Meta:
description = sstrip('''
Release held tasks within a workflow.

See also the opposite command `hold`.
''')
resolver = partial(mutator, command='release')


class Kill(Mutation, TaskMutation):
# TODO: This should be a job mutation?
class Meta:
Expand Down Expand Up @@ -1837,18 +1872,22 @@ class Mutations(ObjectType):
# workflow actions
broadcast = _mut_field(Broadcast)
ext_trigger = _mut_field(ExtTrigger)
hold = _mut_field(Hold)
message = _mut_field(Message)
pause = _mut_field(Pause)
ping = _mut_field(Ping)
release = _mut_field(Release)
reload = _mut_field(Reload)
resume = _mut_field(Resume)
set_verbosity = _mut_field(SetVerbosity)
set_graph_window_extent = _mut_field(SetGraphWindowExtent)
stop = _mut_field(Stop)
set_hold_point = _mut_field(SetHoldPoint)
release_hold_point = _mut_field(ReleaseHoldPoint)

# task actions
hold = _mut_field(Hold)
kill = _mut_field(Kill)
poll = _mut_field(Poll)
release = _mut_field(Release)
remove = _mut_field(Remove)
set_outputs = _mut_field(SetOutputs)
trigger = _mut_field(Trigger)
Expand Down
Loading