diff --git a/CHANGES.md b/CHANGES.md
index 9f3698432c2..00539f6490f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -24,6 +24,11 @@ Cylc to crash when preparing the job script.
-------------------------------------------------------------------------------
## __cylc-8.2.0 (Coming Soon)__
+### Enhancements
+
+[#5090](https://github.com/cylc/cylc-flow/pull/5090) - Implement initial and
+final graphs, distinct from the main cycling graph.
+
### Fixes
[#5328](https://github.com/cylc/cylc-flow/pull/5328) -
Efficiency improvements to reduce task management overheads on the Scheduler.
@@ -73,19 +78,20 @@ workflows with many-to-many dependencies (e.g. ` => `).
### Enhancements
+[#5184](https://github.com/cylc/cylc-flow/pull/5184) - Scan for active
+runs of the same workflow at install time.
+
+[#5032](https://github.com/cylc/cylc-flow/pull/5032) - Set a default limit of
+100 for the "default" queue.
+
[#5229](https://github.com/cylc/cylc-flow/pull/5229) -
- Added a single command to validate a previously run workflow against changes
to its source and reinstall a workflow.
- Allows Cylc commands (including validate, list, view, config, and graph) to load template variables
configured by `cylc install` and `cylc play`.
-[#5184](https://github.com/cylc/cylc-flow/pull/5184) - scan for active
-runs of the same workflow at install time.
-
[#5121](https://github.com/cylc/cylc-flow/pull/5121) - Added a single
command to validate, install and play a workflow.
-[#5032](https://github.com/cylc/cylc-flow/pull/5032) - set a default limit of
-100 for the "default" queue.
[#5055](https://github.com/cylc/cylc-flow/pull/5055) and
diff --git a/cylc/flow/config.py b/cylc/flow/config.py
index a549c9ed771..fa1c773264d 100644
--- a/cylc/flow/config.py
+++ b/cylc/flow/config.py
@@ -54,6 +54,11 @@
get_sequence, get_sequence_cls, init_cyclers, get_dump_format,
INTEGER_CYCLING_TYPE, ISO8601_CYCLING_TYPE
)
+from cylc.flow.cycling.nocycle import (
+ NocycleSequence,
+ NOCYCLE_SEQ_ALPHA,
+ NOCYCLE_SEQ_OMEGA
+)
from cylc.flow.id import Tokens
from cylc.flow.cycling.integer import IntegerInterval
from cylc.flow.cycling.iso8601 import ingest_time, ISO8601Interval
@@ -270,6 +275,7 @@ def __init__(
self.start_point: 'PointBase'
self.stop_point: Optional['PointBase'] = None
self.final_point: Optional['PointBase'] = None
+ self.nocycle_sequences: Set['NocycleSequence'] = set()
self.sequences: List['SequenceBase'] = []
self.actual_first_point: Optional['PointBase'] = None
self._start_point_for_actual_first_point: Optional['PointBase'] = None
@@ -618,9 +624,16 @@ def prelim_process_graph(self) -> None:
if (
'cycling mode' not in self.cfg['scheduling'] and
self.cfg['scheduling'].get('initial cycle point', '1') == '1' and
- all(item in ['graph', '1', 'R1'] for item in graphdict)
+ all(
+ item in [
+ 'graph', '1', 'R1',
+ str(NOCYCLE_SEQ_ALPHA),
+ str(NOCYCLE_SEQ_OMEGA)
+ ]
+ for item in graphdict
+ )
):
- # Pure acyclic graph, assume integer cycling mode with '1' cycle
+ # Non-cycling graph, assume integer cycling mode with '1' cycle
self.cfg['scheduling']['cycling mode'] = INTEGER_CYCLING_TYPE
for key in ('initial cycle point', 'final cycle point'):
if key not in self.cfg['scheduling']:
@@ -2087,15 +2100,24 @@ def load_graph(self):
try:
seq = get_sequence(section, icp, fcp)
except (AttributeError, TypeError, ValueError, CylcError) as exc:
- if cylc.flow.flags.verbosity > 1:
- traceback.print_exc()
- msg = 'Cannot process recurrence %s' % section
- msg += ' (initial cycle point=%s)' % icp
- msg += ' (final cycle point=%s)' % fcp
- if isinstance(exc, CylcError):
- msg += ' %s' % exc.args[0]
- raise WorkflowConfigError(msg)
- self.sequences.append(seq)
+ try:
+ seq = NocycleSequence(section)
+ except ValueError:
+ if cylc.flow.flags.verbosity > 1:
+ traceback.print_exc()
+ msg = (
+ f"Cannot process recurrence {section}"
+ f" (initial cycle point={icp})"
+ f" (final cycle point={fcp})"
+ )
+ if isinstance(exc, CylcError):
+ msg += ' %s' % exc.args[0]
+ raise WorkflowConfigError(msg)
+ else:
+ self.nocycle_sequences.add(seq)
+ else:
+ self.sequences.append(seq)
+
parser = GraphParser(
family_map,
self.parameters,
diff --git a/cylc/flow/cycling/__init__.py b/cylc/flow/cycling/__init__.py
index 1bb60f916dc..8191fba036b 100644
--- a/cylc/flow/cycling/__init__.py
+++ b/cylc/flow/cycling/__init__.py
@@ -345,7 +345,11 @@ def TYPE_SORT_KEY(self) -> int:
@classmethod
@abstractmethod # Note: stacked decorator not strictly enforced in Py2.x
def get_async_expr(cls, start_point=0):
- """Express a one-off sequence at the initial cycle point."""
+ """Express a one-off sequence at the initial cycle point.
+
+ Note "async" has nothing to do with asyncio. It was a (bad)
+ name for one-off (non-cycling) graphs in early Cylc versions.
+ """
pass
@abstractmethod
diff --git a/cylc/flow/cycling/integer.py b/cylc/flow/cycling/integer.py
index 749c651fc08..b6bc0e2555c 100644
--- a/cylc/flow/cycling/integer.py
+++ b/cylc/flow/cycling/integer.py
@@ -21,7 +21,12 @@
import re
from cylc.flow.cycling import (
- PointBase, IntervalBase, SequenceBase, ExclusionBase, parse_exclusion, cmp
+ PointBase,
+ IntervalBase,
+ SequenceBase,
+ ExclusionBase,
+ parse_exclusion,
+ cmp
)
from cylc.flow.exceptions import (
CylcMissingContextPointError,
diff --git a/cylc/flow/cycling/nocycle.py b/cylc/flow/cycling/nocycle.py
new file mode 100644
index 00000000000..67fa23f0552
--- /dev/null
+++ b/cylc/flow/cycling/nocycle.py
@@ -0,0 +1,159 @@
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+"""
+Cycling logic for isolated non-cycling startup and shutdown graphs.
+"""
+
+from cylc.flow.cycling import PointBase, SequenceBase, cmp
+
+# TODO: scheduler check DB to be sure alpha and omega sections have run or not.
+
+# cycle point values
+# These are the only values NocyclePoint accepts; they are also the string
+# form of the corresponding NocycleSequence.
+NOCYCLE_PT_ALPHA = "alpha"
+NOCYCLE_PT_OMEGA = "omega"
+
+# All legal nocycle point values.
+NOCYCLE_POINTS = (
+ NOCYCLE_PT_ALPHA,
+ NOCYCLE_PT_OMEGA
+)
+
+# Type identifiers assigned to NocyclePoint.TYPE and
+# NocyclePoint.TYPE_SORT_KEY.
+# NOTE(review): how the sort key value 1 ranks against other cycling types
+# is not visible in this module - confirm.
+CYCLER_TYPE_NOCYCLE = "nocycle"
+CYCLER_TYPE_SORT_KEY_NOCYCLE = 1
+
+
+class NocyclePoint(PointBase):
+ """A string-valued point."""
+
+ TYPE = CYCLER_TYPE_NOCYCLE
+ TYPE_SORT_KEY = CYCLER_TYPE_SORT_KEY_NOCYCLE
+
+ __slots__ = ('value')
+
+ def __init__(self, value: str) -> None:
+ if value not in [NOCYCLE_PT_ALPHA, NOCYCLE_PT_OMEGA]:
+ raise ValueError(f"Illegal Nocycle value {value}")
+ self.value = value
+
+ def __hash__(self):
+ return hash(self.value)
+
+ def __eq__(self, other):
+ return str(other) == self.value
+
+ def __le__(self, other):
+ """less than or equal only if equal."""
+ return str(other) == self.value
+
+ def __lt__(self, other):
+ """never less than."""
+ return False
+
+ def __gt__(self, other):
+ """never greater than."""
+ return False
+
+ def __str__(self):
+ return self.value
+
+ def _cmp(self, other):
+ return cmp(int(self), int(other))
+
+ def add(self, other):
+ # NOT USED
+ return None
+
+ def sub(self, other):
+ # NOT USED
+ return None
+
+
+class NocycleSequence(SequenceBase):
+ """A single point sequence."""
+
+ def __init__(self, dep_section, p_context_start=None, p_context_stop=None):
+ """Workflow cycling context is ignored."""
+ self.point = NocyclePoint(dep_section)
+
+ def __hash__(self):
+ return hash(str(self.point))
+
+ def is_valid(self, point):
+ """Is point on-sequence and in-bounds?"""
+ return str(point) == self.point
+
+ def get_first_point(self, point):
+ """First point is the only point"""
+ return self.point
+
+ def get_next_point(self, point):
+ """There is no next point"""
+ return None
+
+ def get_next_point_on_sequence(self, point):
+ """There is no next point"""
+ return None
+
+ def __eq__(self, other):
+ try:
+ return other.point == self.point
+ except AttributeError:
+ # (other is not a nocycle sequence)
+ return False
+
+ def __str__(self):
+ return str(self.point)
+
+ def TYPE(self) -> str:
+ raise NotImplementedError
+
+ def TYPE_SORT_KEY(self) -> int:
+ raise NotImplementedError
+
+ def get_async_expr(cls, start_point=0):
+ raise NotImplementedError
+
+ def get_interval(self):
+ """Return the cycling interval of this sequence."""
+ raise NotImplementedError
+
+ def get_offset(self):
+ """Deprecated: return the offset used for this sequence."""
+ raise NotImplementedError
+
+ def set_offset(self, i_offset):
+ """Deprecated: alter state to offset the entire sequence."""
+ raise NotImplementedError
+
+ def is_on_sequence(self, point):
+ """Is point on-sequence, disregarding bounds?"""
+ raise NotImplementedError
+
+ def get_prev_point(self, point):
+ """Return the previous point < point, or None if out of bounds."""
+ raise NotImplementedError
+
+ def get_nearest_prev_point(self, point):
+ """Return the largest point < some arbitrary point."""
+ raise NotImplementedError
+
+ def get_stop_point(self):
+ """Return the last point in this sequence, or None if unbounded."""
+ raise NotImplementedError
+
+
+# Shared instances of the two legal nocycle sequences. NocycleSequence
+# compares and hashes by its point, so a separately constructed sequence for
+# the same section is equal to the matching instance here.
+NOCYCLE_SEQ_ALPHA = NocycleSequence(NOCYCLE_PT_ALPHA)
+NOCYCLE_SEQ_OMEGA = NocycleSequence(NOCYCLE_PT_OMEGA)
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index 4fa19c81870..5a2f620db1a 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -19,6 +19,7 @@
from contextlib import suppress
from collections import deque
from dataclasses import dataclass
+from functools import partial
from optparse import Values
import os
from pathlib import Path
@@ -54,6 +55,13 @@
from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.config import WorkflowConfig
+from cylc.flow.cycling.nocycle import (
+ NOCYCLE_POINTS,
+ NOCYCLE_PT_ALPHA,
+ NOCYCLE_PT_OMEGA,
+ NOCYCLE_SEQ_ALPHA,
+ NOCYCLE_SEQ_OMEGA
+)
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.id import Tokens
from cylc.flow.flow_mgr import FLOW_NONE, FlowMgr, FLOW_NEW
@@ -306,6 +314,9 @@ def __init__(self, reg: str, options: Values) -> None:
pub_d=os.path.join(self.workflow_run_dir, 'log')
)
self.is_restart = Path(self.workflow_db_mgr.pri_path).is_file()
+
+ self.graph_loaders: List[Callable] = []
+
# Map used to track incomplete remote inits for restart
# {install_target: platform}
self.incomplete_ri_map: Dict[str, Dict] = {}
@@ -473,17 +484,6 @@ async def configure(self):
self.data_store_mgr.initiate_data_model()
- self.profiler.log_memory("scheduler.py: before load_tasks")
- if self.is_restart:
- self._load_pool_from_db()
- if self.restored_stop_task_id is not None:
- self.pool.set_stop_task(self.restored_stop_task_id)
- elif self.options.starttask:
- self._load_pool_from_tasks()
- else:
- self._load_pool_from_point()
- self.profiler.log_memory("scheduler.py: after load_tasks")
-
self.workflow_db_mgr.put_workflow_params(self)
self.workflow_db_mgr.put_workflow_template_vars(self.template_vars)
self.workflow_db_mgr.put_runtime_inheritance(self.config)
@@ -610,16 +610,86 @@ def log_start(self) -> None:
extra=RotatingLogFileHandler.header_extra
)
+ def _get_graph_loaders(self) -> None:
+ """Get next graphs base on current pool content."""
+ # Check pool points in case this is a restart.
+ # TODO REALLY NEED TO CHECK DB FOR SECTIONS THAT RAN ALREADY.
+
+ points = [p.value for p in self.pool.get_points()]
+ if self.is_restart and not points:
+ # Restart with empty pool: only unfinished event handlers.
+ # No graph to load.
+ return
+
+ if (
+ NOCYCLE_SEQ_OMEGA in self.config.nocycle_sequences
+ and (not points or NOCYCLE_PT_OMEGA not in points)
+ ):
+ # Omega section exists and hasn't started yet.
+ self.graph_loaders.append(
+ partial(self.pool.load_nocycle_graph, NOCYCLE_SEQ_OMEGA)
+ )
+ if (
+ self.config.sequences
+ and (
+ not points
+ or (
+ not any(p not in NOCYCLE_POINTS for p in points)
+ and NOCYCLE_PT_OMEGA not in points
+ )
+ )
+ ):
+ # Normal graph exists and hasn't started yet.
+ if self.options.starttask:
+ # Cold start from specified tasks.
+ self.graph_loaders.append(self._load_pool_from_tasks)
+ else:
+ # Cold start from cycle point.
+ self.graph_loaders.append(self._load_pool_from_point)
+
+ if (
+ NOCYCLE_SEQ_ALPHA in self.config.nocycle_sequences
+ and not self.is_restart
+ ):
+ # Alpha section exists and hasn't started yet.
+ # (Never in a restart).
+ self.graph_loaders.append(
+ partial(self.pool.load_nocycle_graph, NOCYCLE_SEQ_ALPHA)
+ )
+
+ async def run_graphs(self):
+ self.graph_loaders = []
+ if self.is_restart:
+ # Restart from DB.
+ self.task_job_mgr.task_remote_mgr.is_restart = True
+ self.task_job_mgr.task_remote_mgr.rsync_includes = (
+ self.config.get_validated_rsync_includes())
+ self._load_pool_from_db()
+ self.restart_remote_init()
+ # Poll all pollable tasks
+ self.command_poll_tasks(['*/*'])
+ # TODO - WHY DOESN'T '*/*' MATCH THE FOLLOWING?
+ self.command_poll_tasks([f"{NOCYCLE_PT_ALPHA}/*"])
+ self.command_poll_tasks([f"{NOCYCLE_PT_OMEGA}/*"])
+
+ self._get_graph_loaders()
+ await self.main_loop()
+ # next graphs depends on content of restart pool
+ while self.graph_loaders:
+ (self.graph_loaders.pop())()
+ await self.main_loop()
+ elif self.pool.main_pool:
+ # pool loaded for integration test!
+ await self.main_loop()
+ else:
+ self._get_graph_loaders()
+ while self.graph_loaders:
+ (self.graph_loaders.pop())()
+ await self.main_loop()
+
async def run_scheduler(self) -> None:
"""Start the scheduler main loop."""
try:
- if self.is_restart:
- self.task_job_mgr.task_remote_mgr.is_restart = True
- self.task_job_mgr.task_remote_mgr.rsync_includes = (
- self.config.get_validated_rsync_includes())
- self.restart_remote_init()
- self.command_poll_tasks(['*/*'])
-
self.run_event_handlers(self.EVENT_STARTUP, 'workflow starting')
await asyncio.gather(
*main_loop.get_runners(
@@ -628,12 +698,14 @@ async def run_scheduler(self) -> None:
self
)
)
- self.server.publish_queue.put(
- self.data_store_mgr.publish_deltas)
+ self.server.publish_queue.put(self.data_store_mgr.publish_deltas)
+
# Non-async sleep - yield to other threads rather than event loop
sleep(0)
self.profiler.start()
- await self.main_loop()
+
+ await self.run_graphs()
+ LOG.critical("DONE")
except SchedulerStop as exc:
# deliberate stop
@@ -660,7 +732,6 @@ async def run_scheduler(self) -> None:
await self.handle_exception(exc)
else:
- # main loop ends (not used?)
await self.shutdown(SchedulerStop(StopMode.AUTO.value))
finally:
@@ -703,7 +774,7 @@ async def run(self):
def _load_pool_from_tasks(self):
"""Load task pool with specified tasks, for a new run."""
- LOG.info(f"Start task: {self.options.starttask}")
+ LOG.info(f"LOADING START TASKS: {self.options.starttask}")
# flow number set in this call:
self.pool.force_trigger_tasks(
self.options.starttask,
@@ -722,15 +793,16 @@ def _load_pool_from_point(self):
released from runhead.)
"""
- start_type = (
- "Warm" if self.config.start_point > self.config.initial_point
- else "Cold"
- )
- LOG.info(f"{start_type} start from {self.config.start_point}")
+ LOG.info("LOADING MAIN GRAPH")
+ msg = f"start from {self.config.start_point}"
+ if self.config.start_point == self.config.initial_point:
+ msg = "Cold " + msg
+ LOG.info(msg)
self.pool.load_from_point()
def _load_pool_from_db(self):
"""Load task pool from DB, for a restart."""
+ LOG.info("LOADING DB FOR RESTART")
self.workflow_db_mgr.pri_dao.select_broadcast_states(
self.broadcast_mgr.load_db_broadcast_states)
self.broadcast_mgr.post_load_db_coerce()
@@ -749,6 +821,9 @@ def _load_pool_from_db(self):
self.pool.load_db_tasks_to_hold()
self.pool.update_flow_mgr()
+ if self.restored_stop_task_id is not None:
+ self.pool.set_stop_task(self.restored_stop_task_id)
+
def restart_remote_init(self):
"""Remote init for all submitted/running tasks in the pool."""
self.task_job_mgr.task_remote_mgr.is_restart = True
@@ -796,11 +871,11 @@ def manage_remote_init(self):
install_target]
if status == REMOTE_INIT_DONE:
self.task_job_mgr.task_remote_mgr.file_install(platform)
- if status in [REMOTE_FILE_INSTALL_DONE,
- REMOTE_INIT_255,
- REMOTE_FILE_INSTALL_255,
- REMOTE_INIT_FAILED,
- REMOTE_FILE_INSTALL_FAILED]:
+ elif status in [REMOTE_FILE_INSTALL_DONE,
+ REMOTE_INIT_255,
+ REMOTE_FILE_INSTALL_255,
+ REMOTE_INIT_FAILED,
+ REMOTE_FILE_INSTALL_FAILED]:
# Remove install target
self.incomplete_ri_map.pop(install_target)
@@ -1645,6 +1720,10 @@ async def main_loop(self) -> None:
# Shutdown workflow if timeouts have occurred
self.timeout_check()
+ if self.graph_finished() and self.graph_loaders:
+ # Return control to load the next graph.
+ break
+
# Does the workflow need to shutdown on task failure?
await self.workflow_shutdown()
@@ -1876,6 +1955,21 @@ def set_stop_clock(self, unix_time):
self.workflow_db_mgr.put_workflow_stop_clock_time(self.stop_clock_time)
self.update_data_store()
+ def graph_finished(self):
+ """Nothing left to run."""
+ return not any(
+ itask for itask in self.pool.get_tasks()
+ if itask.state(
+ TASK_STATUS_PREPARING,
+ TASK_STATUS_SUBMITTED,
+ TASK_STATUS_RUNNING
+ )
+ or (
+ itask.state(TASK_STATUS_WAITING)
+ and not itask.state.is_runahead
+ )
+ )
+
def stop_clock_done(self):
"""Return True if wall clock stop time reached."""
if self.stop_clock_time is None:
@@ -1893,24 +1987,10 @@ def stop_clock_done(self):
def check_auto_shutdown(self):
"""Check if we should shut down now."""
- if self.is_paused:
- # Don't if paused.
- return False
-
- if self.check_workflow_stalled():
- return False
-
- if any(
- itask for itask in self.pool.get_tasks()
- if itask.state(
- TASK_STATUS_PREPARING,
- TASK_STATUS_SUBMITTED,
- TASK_STATUS_RUNNING
- )
- or (
- itask.state(TASK_STATUS_WAITING)
- and not itask.state.is_runahead
- )
+ if (
+ self.is_paused or
+ self.check_workflow_stalled() or
+ not self.graph_finished()
):
# Don't if there are more tasks to run (if waiting and not
# runahead, then held, queued, or xtriggered).
diff --git a/cylc/flow/scripts/graph.py b/cylc/flow/scripts/graph.py
index ce22cfdc2ed..258f5389644 100644
--- a/cylc/flow/scripts/graph.py
+++ b/cylc/flow/scripts/graph.py
@@ -44,6 +44,7 @@
from typing import Dict, List, Optional, TYPE_CHECKING, Tuple, Callable
from cylc.flow.config import WorkflowConfig
+from cylc.flow.cycling.nocycle import NOCYCLE_PT_ALPHA
from cylc.flow.exceptions import InputError, CylcError
from cylc.flow.id import Tokens
from cylc.flow.id_cli import parse_id_async
@@ -66,10 +67,20 @@ def sort_integer_node(id_):
Example:
>>> sort_integer_node('11/foo')
('foo', 11)
-
+ >>> sort_integer_node('alpha/foo')
+ ('foo', 0)
+ >>> sort_integer_node('omega/foo')
+ ('foo', 1)
"""
tokens = Tokens(id_, relative=True)
- return (tokens['task'], int(tokens['cycle']))
+ try:
+ return (tokens['task'], int(tokens['cycle']))
+ except ValueError:
+ # nocycle point
+ if tokens['cycle'] == NOCYCLE_PT_ALPHA:
+ return (tokens['task'], 0)
+ else:
+ return (tokens['task'], 1)
def sort_integer_edge(id_):
@@ -153,7 +164,7 @@ def _get_graph_nodes_edges(
edge_sort = sort_integer_edge
else:
# datetime sorting
- node_sort = None # lexicographically sortable
+ node_sort = None
edge_sort = sort_datetime_edge
# get nodes
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index 1dd0d5e6c19..b1e69e9b4d5 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -29,6 +29,7 @@
import cylc.flow.flags
from cylc.flow import LOG
from cylc.flow.cycling.loader import get_point, standardise_point_string
+from cylc.flow.cycling.nocycle import NocyclePoint, NOCYCLE_POINTS
from cylc.flow.exceptions import WorkflowConfigError, PointParsingError
from cylc.flow.id import Tokens, detokenise
from cylc.flow.id_cli import contains_fnmatch
@@ -167,6 +168,23 @@ def _swap_out(self, itask):
self.main_pool[itask.point][itask.identity] = itask
self.main_pool_changed = True
+ def load_nocycle_graph(self, seq):
+ """blah """
+ LOG.info(f"LOADING {seq} GRAPH")
+ flow_num = self.flow_mgr.get_new_flow(f"original {seq} flow")
+ self.runahead_limit_point = None
+ for name in self.config.get_task_name_list():
+ tdef = self.config.get_taskdef(name)
+ if str(seq) not in [str(s) for s in tdef.sequences]:
+ continue
+ if tdef.is_parentless(seq.point, seq):
+ ntask = self._get_spawned_or_merged_task(
+ seq.point, tdef.name, {flow_num}
+ )
+ if ntask is not None:
+ self.add_to_pool(ntask)
+ self.rh_release_and_queue(ntask)
+
def load_from_point(self):
"""Load the task pool for the workflow start point.
@@ -174,6 +192,8 @@ def load_from_point(self):
"""
flow_num = self.flow_mgr.get_new_flow(
f"original flow from {self.config.start_point}")
+
+ # self.runahead_limit_point = None # reset from nocycle
self.compute_runahead()
for name in self.config.get_task_name_list():
tdef = self.config.get_taskdef(name)
@@ -258,7 +278,7 @@ def release_runahead_tasks(self):
Return True if any tasks are released, else False.
Call when RH limit changes.
"""
- if not self.main_pool or not self.runahead_limit_point:
+ if not self.main_pool:
# (At start-up main pool might not exist yet)
return False
@@ -270,7 +290,12 @@ def release_runahead_tasks(self):
itask
for point, itask_id_map in self.main_pool.items()
for itask in itask_id_map.values()
- if point <= self.runahead_limit_point
+ if (
+ self.runahead_limit_point and
+ point <= self.runahead_limit_point
+ or
+ str(point) in NOCYCLE_POINTS
+ )
if itask.state.is_runahead
]
@@ -334,6 +359,10 @@ def compute_runahead(self, force=False) -> bool:
)
):
points.append(point)
+ points = [
+ p for p in points
+ if type(p) is not NocyclePoint # type: ignore
+ ]
if not points:
return False
base_point = min(points)
@@ -436,11 +465,17 @@ def load_db_task_pool_for_restart(self, row_idx, row):
(cycle, name, flow_nums, flow_wait, is_manual_submit, is_late, status,
is_held, submit_num, _, platform_name, time_submit, time_run, timeout,
outputs_str) = row
+
+ if cycle in NOCYCLE_POINTS:
+ point = NocyclePoint(cycle)
+ else:
+ point = get_point(cycle)
+
try:
itask = TaskProxy(
self.tokens,
self.config.get_taskdef(name),
- get_point(cycle),
+ point,
deserialise(flow_nums),
status=status,
is_held=is_held,
@@ -831,6 +866,10 @@ def release_queued_tasks(self):
# Note: released and pre_prep_tasks can overlap
return list(set(released + pre_prep_tasks))
+ def get_points(self):
+ """Return current list of cycle points in the pool."""
+ return list(self.main_pool)
+
def get_min_point(self):
"""Return the minimum cycle point currently in the pool."""
cycles = list(self.main_pool)
@@ -1269,7 +1308,10 @@ def spawn_on_output(self, itask, output, forced=False):
# Add it to the hidden pool or move it to the main pool.
self.add_to_pool(t)
- if t.point <= self.runahead_limit_point:
+ if (
+ t.point <= self.runahead_limit_point
+ or str(t.point) in NOCYCLE_POINTS
+ ):
self.rh_release_and_queue(t)
# Event-driven suicide.
@@ -1383,6 +1425,7 @@ def spawn_on_all_outputs(
self.data_store_mgr.delta_task_prerequisite(c_task)
self.add_to_pool(c_task)
if (
+ # TODO NOCYCLE
self.runahead_limit_point is not None
and c_task.point <= self.runahead_limit_point
):
@@ -1875,11 +1918,15 @@ def match_future_tasks(
try:
point_str = standardise_point_string(point_str)
except PointParsingError as exc:
- LOG.warning(
- f"{id_} - invalid cycle point: {point_str} ({exc})")
- unmatched_tasks.append(id_)
- continue
- point = get_point(point_str)
+ if point_str in NOCYCLE_POINTS:
+ point = NocyclePoint(point_str)
+ else:
+ LOG.warning(
+ f"{id_} - invalid cycle point: {point_str} ({exc})")
+ unmatched_tasks.append(id_)
+ continue
+ else:
+ point = get_point(point_str)
taskdef = self.config.taskdefs[name_str]
if taskdef.is_valid_point(point):
matched_tasks.add((taskdef.name, point))
diff --git a/cylc/flow/taskdef.py b/cylc/flow/taskdef.py
index 29e0f7d0de0..61f8062c43c 100644
--- a/cylc/flow/taskdef.py
+++ b/cylc/flow/taskdef.py
@@ -133,6 +133,7 @@ def __init__(self, name, rtcfg, run_mode, start_point, initial_point):
self.initial_point = initial_point
self.sequences = []
+
self.used_in_offset_trigger = False
# some defaults
@@ -264,15 +265,19 @@ def check_for_explicit_cycling(self):
raise TaskDefError(
"No cycling sequences defined for %s" % self.name)
- def get_parent_points(self, point):
+ def get_parent_points(self, point, seq=None):
"""Return the cycle points of my parents, at point."""
parent_points = set()
- for seq in self.sequences:
- if not seq.is_valid(point):
+ if seq:
+ sequences = [seq]
+ else:
+ sequences = self.sequences
+ for sequence in sequences:
+ if not sequence.is_valid(point):
continue
- if seq in self.dependencies:
+ if sequence in self.dependencies:
# task has prereqs in this sequence
- for dep in self.dependencies[seq]:
+ for dep in self.dependencies[sequence]:
if dep.suicide:
continue
for trig in dep.task_triggers:
@@ -309,9 +314,12 @@ def is_valid_point(self, point: 'PointBase') -> bool:
def first_point(self, icp):
"""Return the first point for this task."""
+ from cylc.flow.cycling.nocycle import NocycleSequence
point = None
adjusted = []
for seq in self.sequences:
+ if type(seq) is NocycleSequence:
+ continue
pt = seq.get_first_point(icp)
if pt:
# may be None if beyond the sequence bounds
@@ -333,7 +341,7 @@ def next_point(self, point):
p_next = min(adjusted)
return p_next
- def is_parentless(self, point):
+ def is_parentless(self, point, seq=None):
"""Return True if task has no parents at point.
Tasks are considered parentless if they have:
@@ -352,7 +360,8 @@ def is_parentless(self, point):
if self.sequential:
# Implicit parents
return False
- parent_points = self.get_parent_points(point)
+
+ parent_points = self.get_parent_points(point, seq)
return (
not parent_points
or all(x < self.start_point for x in parent_points)
diff --git a/tests/functional/alpha-omega/00-basic.t b/tests/functional/alpha-omega/00-basic.t
new file mode 100644
index 00000000000..c753503bf82
--- /dev/null
+++ b/tests/functional/alpha-omega/00-basic.t
@@ -0,0 +1,30 @@
+#!/usr/bin/env bash
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#-------------------------------------------------------------------------------
+
+# Check basic separate triggering of alpha, omega, and main graphs.
+. "$(dirname "$0")/test_header"
+set_test_number 3
+
+install_and_validate
+
+reftest_run
+
+graph_workflow "${WORKFLOW_NAME}" "${WORKFLOW_NAME}.graph"
+cmp_ok "${WORKFLOW_NAME}.graph" "$TEST_SOURCE_DIR/${TEST_NAME_BASE}/reference.graph"
+
+purge
diff --git a/tests/functional/alpha-omega/00-basic/.cylcignore b/tests/functional/alpha-omega/00-basic/.cylcignore
new file mode 100644
index 00000000000..6ded72ca000
--- /dev/null
+++ b/tests/functional/alpha-omega/00-basic/.cylcignore
@@ -0,0 +1 @@
+twat
diff --git a/tests/functional/alpha-omega/00-basic/flow.cylc b/tests/functional/alpha-omega/00-basic/flow.cylc
new file mode 100644
index 00000000000..6e3b45ff681
--- /dev/null
+++ b/tests/functional/alpha-omega/00-basic/flow.cylc
@@ -0,0 +1,12 @@
+
+[scheduling]
+ cycling mode = integer
+ final cycle point = 2
+ [[graph]]
+ alpha = "a => b"
+ omega = "x => y"
+ R1 = "foo => bar"
+ P1 = "bar => baz"
+[runtime]
+ [[a, b, x, y]]
+ [[foo, bar, baz]]
diff --git a/tests/functional/alpha-omega/00-basic/reference.graph b/tests/functional/alpha-omega/00-basic/reference.graph
new file mode 100644
index 00000000000..8bfc40edead
--- /dev/null
+++ b/tests/functional/alpha-omega/00-basic/reference.graph
@@ -0,0 +1,13 @@
+edge "alpha/a" "alpha/b"
+edge "1/bar" "1/baz"
+edge "2/bar" "2/baz"
+edge "1/foo" "1/bar"
+graph
+node "alpha/a" "a\nalpha"
+node "alpha/b" "b\nalpha"
+node "1/bar" "bar\n1"
+node "2/bar" "bar\n2"
+node "1/baz" "baz\n1"
+node "2/baz" "baz\n2"
+node "1/foo" "foo\n1"
+stop
diff --git a/tests/functional/alpha-omega/00-basic/reference.log b/tests/functional/alpha-omega/00-basic/reference.log
new file mode 100644
index 00000000000..5d44fb3261f
--- /dev/null
+++ b/tests/functional/alpha-omega/00-basic/reference.log
@@ -0,0 +1,9 @@
+alpha/a -triggered off [] in flow 1
+alpha/b -triggered off ['alpha/a'] in flow 1
+1/foo -triggered off [] in flow 2
+2/bar -triggered off [] in flow 2
+1/bar -triggered off ['1/foo'] in flow 2
+2/baz -triggered off ['2/bar'] in flow 2
+1/baz -triggered off ['1/bar'] in flow 2
+omega/x -triggered off [] in flow 3
+omega/y -triggered off ['omega/x'] in flow 3
diff --git a/tests/functional/alpha-omega/01-restart.t b/tests/functional/alpha-omega/01-restart.t
new file mode 100644
index 00000000000..5a40023cb4d
--- /dev/null
+++ b/tests/functional/alpha-omega/01-restart.t
@@ -0,0 +1,34 @@
+#!/usr/bin/env bash
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#-------------------------------------------------------------------------------
+
+# Check restart in alpha, main, and omega graphs.
+
+# Source the shared test helpers from the directory containing this script.
+. "$(dirname "$0")/test_header"
+set_test_number 5
+
+install_and_validate
+
+SRC_DIR="$TEST_SOURCE_DIR/${TEST_NAME_BASE}"
+RUN_DIR="$WORKFLOW_RUN_DIR"
+
+# Four successive runs: before each one, copy that run's expected log into
+# the run directory as reference.log.
+# NOTE(review): presumably runs 2-4 are restarts, each earlier run having
+# been stopped by a task calling "cylc stop" - confirm against flow.cylc.
+for RUN in 1 2 3 4; do
+ cp "${SRC_DIR}/reference.log.$RUN" "${RUN_DIR}/reference.log"
+ reftest_run "${TEST_NAME_BASE}-${RUN}"
+done
+
+purge
diff --git a/tests/functional/alpha-omega/01-restart/flow.cylc b/tests/functional/alpha-omega/01-restart/flow.cylc
new file mode 100644
index 00000000000..2140de2eb37
--- /dev/null
+++ b/tests/functional/alpha-omega/01-restart/flow.cylc
@@ -0,0 +1,23 @@
+[scheduling]
+ cycling mode = integer
+ final cycle point = 2
+ [[graph]]
+ alpha = "a => b"
+ omega = "x => y"
+ R1 = "foo => bar"
+ P1 = "bar[-P1] => bar => baz"
+[runtime]
+ [[a]]
+ script = cylc stop $CYLC_WORKFLOW_ID
+ [[b]]
+ [[x]]
+ script = cylc stop $CYLC_WORKFLOW_ID
+ [[y]]
+ [[foo]]
+ [[bar]]
+ script = """
+ if ((CYLC_TASK_CYCLE_POINT == 1)); then
+ cylc stop $CYLC_WORKFLOW_ID
+ fi
+ """
+ [[baz]]
diff --git a/tests/functional/alpha-omega/01-restart/reference.log.1 b/tests/functional/alpha-omega/01-restart/reference.log.1
new file mode 100644
index 00000000000..7554e7516e0
--- /dev/null
+++ b/tests/functional/alpha-omega/01-restart/reference.log.1
@@ -0,0 +1 @@
+alpha/a -triggered off [] in flow 1
diff --git a/tests/functional/alpha-omega/01-restart/reference.log.2 b/tests/functional/alpha-omega/01-restart/reference.log.2
new file mode 100644
index 00000000000..ae6c028f2fc
--- /dev/null
+++ b/tests/functional/alpha-omega/01-restart/reference.log.2
@@ -0,0 +1,3 @@
+alpha/b -triggered off ['alpha/a'] in flow 1
+1/foo -triggered off [] in flow 2
+1/bar -triggered off ['0/bar', '1/foo'] in flow 2
diff --git a/tests/functional/alpha-omega/01-restart/reference.log.3 b/tests/functional/alpha-omega/01-restart/reference.log.3
new file mode 100644
index 00000000000..cc9e663fd41
--- /dev/null
+++ b/tests/functional/alpha-omega/01-restart/reference.log.3
@@ -0,0 +1,4 @@
+1/baz -triggered off ['1/bar'] in flow 2
+2/bar -triggered off ['1/bar'] in flow 2
+2/baz -triggered off ['2/bar'] in flow 2
+omega/x -triggered off [] in flow 3
diff --git a/tests/functional/alpha-omega/01-restart/reference.log.4 b/tests/functional/alpha-omega/01-restart/reference.log.4
new file mode 100644
index 00000000000..c47f0b3c77f
--- /dev/null
+++ b/tests/functional/alpha-omega/01-restart/reference.log.4
@@ -0,0 +1 @@
+omega/y -triggered off ['omega/x'] in flow 3
diff --git a/tests/functional/alpha-omega/02-retrigger.t b/tests/functional/alpha-omega/02-retrigger.t
new file mode 100644
index 00000000000..1eecf2c5cbd
--- /dev/null
+++ b/tests/functional/alpha-omega/02-retrigger.t
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#-------------------------------------------------------------------------------
+
+# Check manual triggering of alpha and omega graph tasks.
+
+. "$(dirname "$0")/test_header"
+set_test_number 2
+reftest
+exit
diff --git a/tests/functional/alpha-omega/02-retrigger/flow.cylc b/tests/functional/alpha-omega/02-retrigger/flow.cylc
new file mode 100644
index 00000000000..7f1889a9f19
--- /dev/null
+++ b/tests/functional/alpha-omega/02-retrigger/flow.cylc
@@ -0,0 +1,24 @@
+
+# 1. alpha graph completes
+# 2. then main graph starts, and triggers alpha/a and omega/x with --flow=none
+# 3. then when all tasks finish, omega graph runs
+
+# TODO alpha and omega graphs need special flow number treatment?
+
+[scheduling]
+ [[graph]]
+ alpha = "a => b"
+ omega = "x => y"
+ R1 = "foo => bar => baz"
+[runtime]
+ [[a]]
+ [[b]]
+ [[x]]
+ [[y]]
+ [[foo]]
+ [[bar]]
+ script = """
+ cylc trigger --flow=none $CYLC_WORKFLOW_ID//alpha/a
+ cylc trigger --flow=none $CYLC_WORKFLOW_ID//omega/x
+ """
+ [[baz]]
diff --git a/tests/functional/alpha-omega/02-retrigger/reference.log b/tests/functional/alpha-omega/02-retrigger/reference.log
new file mode 100644
index 00000000000..807771322ac
--- /dev/null
+++ b/tests/functional/alpha-omega/02-retrigger/reference.log
@@ -0,0 +1,9 @@
+alpha/a -triggered off [] in flow 1
+alpha/b -triggered off ['alpha/a'] in flow 1
+1/foo -triggered off [] in flow 2
+1/bar -triggered off ['1/foo'] in flow 2
+alpha/a -triggered off [] in flow none
+omega/x -triggered off [] in flow none
+1/baz -triggered off ['1/bar'] in flow 2
+omega/x -triggered off [] in flow 3
+omega/y -triggered off ['omega/x'] in flow 3
diff --git a/tests/functional/alpha-omega/test_header b/tests/functional/alpha-omega/test_header
new file mode 120000
index 00000000000..90bd5a36f92
--- /dev/null
+++ b/tests/functional/alpha-omega/test_header
@@ -0,0 +1 @@
+../lib/bash/test_header
\ No newline at end of file
diff --git a/tests/integration/utils/flow_tools.py b/tests/integration/utils/flow_tools.py
index 2a172a09140..2d86beb48de 100644
--- a/tests/integration/utils/flow_tools.py
+++ b/tests/integration/utils/flow_tools.py
@@ -69,6 +69,14 @@ def _make_flow(
return reg
+def _load_graph(sched):
+ """Get scheduler to load the main graph (from the DB if restarting)."""
+ if sched.is_restart:
+ sched._load_pool_from_db()
+ else:
+ sched._load_pool_from_point()
+
+
@contextmanager
def _make_scheduler():
"""Return a scheduler object for a flow registration."""
@@ -106,6 +114,7 @@ async def _start_flow(
# exception occurs in Scheduler
try:
await schd.start()
+ _load_graph(schd)
finally:
# After this `yield`, the `with` block of the context manager
# is executed:
@@ -137,6 +146,7 @@ async def _run_flow(
# exception occurs in Scheduler
try:
await schd.start()
+ _load_graph(schd)
# Do not await as we need to yield control to the main loop:
task = asyncio.create_task(schd.run_scheduler())
finally: