Skip to content

Commit

Permalink
Address more review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Jun 13, 2024
1 parent c3dfe6a commit b32f223
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 38 deletions.
4 changes: 2 additions & 2 deletions cylc/flow/cfgspec/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1742,8 +1742,8 @@ def get_script_common_text(this: str, example: Optional[str] = None):
.. deprecated:: 8.3.0
Please use the workflow_state xtrigger instead:
:py:mod:`cylc.flow.xtriggers.workflow_state`.
Please use the :ref:`workflow_state xtrigger
<Built-in Workflow State Triggers>` instead.
'''):
Conf('interval', VDR.V_INTERVAL, desc='''
Polling interval.
Expand Down
22 changes: 11 additions & 11 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1548,26 +1548,26 @@ def configure_workflow_state_polling_tasks(self):
"$CYLC_TASK_CYCLE_POINT/"
f"{tdef.workflow_polling_cfg['task']}"
)
graph_trigger = tdef.workflow_polling_cfg['status']
config_trigger = rtc['workflow state polling']['message']
graph_selector = tdef.workflow_polling_cfg['status']
config_message = rtc['workflow state polling']['message']
if (
graph_trigger is not None and
graph_selector is not None and
(
config_trigger is not None
config_message is not None
) and (
graph_trigger != config_trigger
graph_selector != config_message
)
):
raise WorkflowConfigError(
f'Polling task "{name}" must configure a target status or'
f' output message in the graph (:{graph_trigger}) or task'
f' definition (message = "{config_trigger}") but not both.'
f' output message in the graph (:{graph_selector}) or task'
f' definition (message = "{config_message}") but not both.'
)
if graph_trigger is not None:
comstr += f":{graph_trigger}"
elif config_trigger is not None:
if graph_selector is not None:
comstr += f":{graph_selector}"
elif config_message is not None:
# quote: may contain spaces
comstr += f':"{config_trigger}" --messages'
comstr += f':"{config_message}" --messages'
else:
# default to :succeeded
comstr += f":{TASK_OUTPUT_SUCCEEDED}"
Expand Down
7 changes: 7 additions & 0 deletions cylc/flow/scripts/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,13 @@ def get_option_parser() -> COP:
@cli_function(get_option_parser, remove_opts=["--db"])
def main(parser: COP, options: 'Values', *ids: str) -> None:

# Note it would be cleaner to use 'id_cli.parse_ids()' here to get the
# workflow ID and tokens, but that function infers run number and fails
# if the workflow is not installed yet. We want to be able to start polling
# before the workflow is installed, which makes it easier to get set of
# interdependent workflows up and running, so runN inference is done inside
# the poller. TODO: consider using id_cli.parse_ids inside the poller.

if len(ids) != 1:
raise InputError("Please give a single ID")

Expand Down
31 changes: 15 additions & 16 deletions cylc/flow/xtriggers/workflow_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
def workflow_state(
workflow_task_id: str,
offset: Optional[str] = None,
flow_num: Optional[int] = 1,
flow_num: Optional[int] = None,
is_output: bool = False,
is_message: bool = False,
alt_cylc_run_dir: Optional[str] = None,
Expand Down Expand Up @@ -70,20 +70,16 @@ def workflow_state(
args=[]
)
if asyncio.run(poller.poll()):
return (
True,
{
"workflow_id": poller.workflow_id,
"task_id": f"{poller.cycle}/{poller.task}",
"task_selector": poller.selector,
"flow_num": poller.flow_num
}
)
result = {
"workflow_id": poller.workflow_id,
"task_id": f"{poller.cycle}/{poller.task}",
"task_selector": poller.selector,
}
if flow_num is not None:
result["flow_num"] = poller.flow_num

Check warning on line 79 in cylc/flow/xtriggers/workflow_state.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/xtriggers/workflow_state.py#L79

Added line #L79 was not covered by tests
return (True, result)
else:
return (
False,
{}
)
return (False, {})


def validate(args: Dict[str, Any]):
Expand All @@ -109,8 +105,11 @@ def validate(args: Dict[str, Any]):
raise WorkflowConfigError(
"Full ID needed: workflow//cycle/task[:selector].")

if not isinstance(args["flow_num"], int):
raise WorkflowConfigError("flow_num must be an integer.")
if (
args["flow_num"] is not None and
not isinstance(args["flow_num"], int)
):
raise WorkflowConfigError("flow_num must be an integer if given.")


# BACK COMPAT: workflow_state_backcompat
Expand Down
5 changes: 0 additions & 5 deletions tests/flakyfunctional/xtriggers/01-workflow_state.t
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ contains_ok "${JOB_LOG}" << __END__
upstream_workflow_id="${WORKFLOW_NAME_UPSTREAM}"
upstream_task_id="2015/foo"
upstream_task_selector="data_ready"
upstream_flow_num="1"
__END__

# Check broadcast of xtrigger outputs is recorded: 1) in the workflow log...
Expand All @@ -71,11 +70,9 @@ contains_ok "${WORKFLOW_LOG}" << __LOG_BROADCASTS__
${LOG_INDENT}+ [2015/f1] [environment]upstream_workflow_id=${WORKFLOW_NAME_UPSTREAM}
${LOG_INDENT}+ [2015/f1] [environment]upstream_task_id=2015/foo
${LOG_INDENT}+ [2015/f1] [environment]upstream_task_selector=data_ready
${LOG_INDENT}+ [2015/f1] [environment]upstream_flow_num=1
${LOG_INDENT}- [2015/f1] [environment]upstream_workflow_id=${WORKFLOW_NAME_UPSTREAM}
${LOG_INDENT}- [2015/f1] [environment]upstream_task_id=2015/foo
${LOG_INDENT}- [2015/f1] [environment]upstream_task_selector=data_ready
${LOG_INDENT}- [2015/f1] [environment]upstream_flow_num=1
__LOG_BROADCASTS__
# ... and 2) in the DB.
TEST_NAME="${TEST_NAME_BASE}-check-broadcast-in-db"
Expand All @@ -91,11 +88,9 @@ contains_ok "${NAME}" << __DB_BROADCASTS__
+|2015|f1|[environment]upstream_workflow_id|${WORKFLOW_NAME_UPSTREAM}
+|2015|f1|[environment]upstream_task_id|2015/foo
+|2015|f1|[environment]upstream_task_selector|data_ready
+|2015|f1|[environment]upstream_flow_num|1
-|2015|f1|[environment]upstream_workflow_id|${WORKFLOW_NAME_UPSTREAM}
-|2015|f1|[environment]upstream_task_id|2015/foo
-|2015|f1|[environment]upstream_task_selector|data_ready
-|2015|f1|[environment]upstream_flow_num|1
__DB_BROADCASTS__

purge
Expand Down
8 changes: 4 additions & 4 deletions tests/flakyfunctional/xtriggers/01-workflow_state/flow.cylc
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#!Jinja2
[scheduler]
cycle point format = %Y
cycle point format = %Y
[[events]]
inactivity timeout = PT20S
abort on inactivity timeout = True
inactivity timeout = PT20S
abort on inactivity timeout = True
[scheduling]
initial cycle point = 2011
final cycle point = 2016
[[xtriggers]]
upstream = workflow_state(workflow_task_id={{UPSTREAM}}//%(point)s/foo:data_ready):PT1S
upstream = workflow_state("{{UPSTREAM}}//%(point)s/foo:data_ready"):PT1S
[[graph]]
P1Y = """
foo
Expand Down

0 comments on commit b32f223

Please sign in to comment.