Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integer flow labels with flow metadata. #4300

Merged
merged 14 commits into from
Oct 20, 2021
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ Third beta release of Cylc 8.
[#4286](https://github.com/cylc/cylc-flow/pull/4286) -
Add an option for displaying source workflows in `cylc scan`.

[#4300](https://github.com/cylc/cylc-flow/pull/4300) - Integer flow labels with
flow metadata, and improved task logging.

[#4291](https://github.com/cylc/cylc-flow/pull/4291) -
Remove obsolete `cylc edit` and `cylc search` commands.

Expand Down
15 changes: 4 additions & 11 deletions cylc/flow/cfgspec/globalcfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@
Configuration of the Cylc Scheduler's main loop.
'''):
Conf('plugins', VDR.V_STRING_LIST,
['health check', 'prune flow labels', 'reset bad hosts'],
['health check', 'reset bad hosts'],
desc='''
Configure the default main loop plugins to use when
starting new workflows.
Expand All @@ -491,13 +491,6 @@
The interval with which this plugin is run.
''')

with Conf('prune flow labels', meta=MainLoopPlugin, desc='''
Prune redundant flow labels.
'''):
Conf('interval', VDR.V_INTERVAL, DurationFloat(600), desc='''
The interval with which this plugin is run.
''')

with Conf('reset bad hosts', meta=MainLoopPlugin, desc='''
Periodically clear the scheduler list of unreachable (bad)
hosts.
Expand Down Expand Up @@ -527,9 +520,9 @@
.. versionadded:: 8.0.0
'''):
Conf('source dirs', VDR.V_STRING_LIST, default=['~/cylc-src'], desc='''
A list of paths where ``cylc install <flow_name>`` will look for
a workflow of that name. All workflow source directories in these
locations will also show up in the GUI, ready for installation.
A list of paths for ``cylc install <name>`` to search for workflow
<name>. All workflow source directories in these locations will
also show up in the GUI, ready for installation.

.. note::
If workflow source directories of the same name exist in more
Expand Down
3 changes: 1 addition & 2 deletions cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ message PbTaskProxy {
optional bool is_held = 17;
repeated string edges = 18;
repeated string ancestors = 19;
optional string flow_label = 20;
optional bool reflow = 21;
optional string flow_nums = 20;
optional PbClockTrigger clock_trigger = 22;
map<string, PbTrigger> external_triggers = 23;
map<string, PbTrigger> xtriggers = 24;
Expand Down
109 changes: 47 additions & 62 deletions cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

23 changes: 11 additions & 12 deletions cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,6 @@ def increment_graph_window(
Task name.
point (cylc.flow.cycling.PointBase):
PointBase derived object.
flow_label (str):
Flow label used to distinguish multiple runs.
edge_distance (int):
Graph distance from active/origin node.
active_id (str):
Expand Down Expand Up @@ -690,13 +688,15 @@ def increment_graph_window(
if edge_distance == 1:
descendant = True
self._expand_graph_window(
s_id, s_node, items, active_id, itask.flow_label,
itask.reflow, edge_distance, descendant, False)
s_id, s_node, items, active_id, itask.flow_nums,
edge_distance, descendant, False)

for items in generate_graph_parents(
itask.tdef, itask.point).values():
itask.tdef, itask.point
).values():
self._expand_graph_window(
s_id, s_node, items, active_id, itask.flow_label,
itask.reflow, edge_distance, False, True)
s_id, s_node, items, active_id, itask.flow_nums,
edge_distance, False, True)

if edge_distance == 1:
levels = self.n_window_boundary_nodes[active_id].keys()
Expand All @@ -714,7 +714,7 @@ def increment_graph_window(
self.n_window_edges[active_id])

def _expand_graph_window(
self, s_id, s_node, items, active_id, flow_label, reflow,
self, s_id, s_node, items, active_id, flow_nums,
edge_distance, descendant=False, is_parent=False):
"""Construct nodes/edges for children/parents of source node."""
final_point = self.schd.config.final_point
Expand Down Expand Up @@ -756,8 +756,8 @@ def _expand_graph_window(
self.increment_graph_window(
TaskProxy(
self.schd.config.get_taskdef(t_name),
t_point, flow_label,
submit_num=0, reflow=reflow),
t_point, flow_nums, submit_num=0
),
edge_distance, active_id, descendant, is_parent)

def remove_pool_node(self, name, point):
Expand Down Expand Up @@ -828,14 +828,13 @@ def generate_ghost_task(self, tp_id, itask, is_parent=False):
depth=task_def.depth,
name=name,
state=TASK_STATUS_WAITING,
flow_label=itask.flow_label
flow_nums=json.dumps(list(itask.flow_nums))
)
if is_parent and tp_id not in self.n_window_nodes:
# TODO: Load task info from DB, including itask prerequisites
tproxy.state = TASK_STATUS_EXPIRED
else:
tproxy.state = TASK_STATUS_WAITING
tproxy.reflow = itask.reflow

tproxy.namespace[:] = task_def.namespace
if is_orphan:
Expand Down
79 changes: 79 additions & 0 deletions cylc/flow/flow_mgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Manage flow counter and flow metadata."""

from typing import Dict, Set
import datetime

from cylc.flow import LOG
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager


class FlowMgr:
"""Logic to manage flow counter and flow metadata."""

def __init__(self, db_mgr: "WorkflowDatabaseManager") -> None:
"""Initialise the flow manager."""
self.db_mgr = db_mgr
self.flows: Dict[int, Dict[str, str]] = {}
self.counter: int = 0

def get_new_flow(self, description: str) -> int:
"""Increment flow counter, record flow metadata."""
self.counter += 1
# record start time to nearest second
now = datetime.datetime.now()
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
now_sec: str = str(
now - datetime.timedelta(microseconds=now.microsecond))
self.flows[self.counter] = {
"description": description or "no description",
"start_time": now_sec
}
LOG.info(
f"New flow: {self.counter} "
f"({description}) "
f"{now_sec}"
)
self.db_mgr.put_insert_workflow_flows(
self.counter,
self.flows[self.counter]
)
return self.counter

def load_from_db(self, flow_nums: Set[int]) -> None:
"""Load flow data for scheduler restart.

Sets the flow counter to the max flow number in the DB.
Loads metadata for selected flows (those in the task pool at startup).

"""
self.counter = self.db_mgr.pri_dao.select_workflow_flows_max_flow_num()
self.flows = self.db_mgr.pri_dao.select_workflow_flows(flow_nums)
self._log()

def _log(self) -> None:
"""Write current flow info to log."""
LOG.info(
"Flows:\n" + "\n".join(
(
f"flow: {f} "
f"({self.flows[f]['description']}) "
f"{self.flows[f]['start_time']}"
)
for f in self.flows
)
)
4 changes: 3 additions & 1 deletion cylc/flow/job_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ def _write_task_environment(self, handle, job_conf):
handle.write(
'\n export CYLC_TASK_TRY_NUMBER=%s' % job_conf['try_num'])
handle.write(
'\n export CYLC_TASK_FLOW_LABEL=%s' % job_conf['flow_label'])
"\n export CYLC_TASK_FLOWS="
f"{','.join(str(f) for f in job_conf['flow_nums'])}"
)
# Standard parameter environment variables
for var, val in job_conf['param_var'].items():
handle.write('\n export CYLC_TASK_PARAM_%s="%s"' % (var, val))
Expand Down
38 changes: 27 additions & 11 deletions cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ def set_graph_window_extent(self, n_edge_distance):
else:
return (False, 'Edge distance cannot be negative')

def force_spawn_children(self, tasks, outputs):
def force_spawn_children(self, tasks, outputs, flow_num):
"""Spawn children of given task outputs.

Args:
hjoliver marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -729,8 +729,15 @@ def force_spawn_children(self, tasks, outputs):

"""
self.schd.command_queue.put(
("force_spawn_children", (tasks,),
{'outputs': outputs}))
(
"force_spawn_children",
(tasks,),
{
"outputs": outputs,
"flow_num": flow_num
}
)
)
return (True, 'Command queued')

def stop(
Expand All @@ -739,7 +746,7 @@ def stop(
cycle_point: Optional[str] = None,
clock_time: Optional[str] = None,
task: Optional[str] = None,
flow_label: Optional[str] = None
flow_num: Optional[int] = None
) -> Tuple[bool, str]:
"""Stop the workflow or specific flow from spawning any further.

Expand All @@ -748,7 +755,8 @@ def stop(
cycle_point: Cycle point after which to stop.
clock_time: Wallclock time after which to stop.
task: Stop after this task succeeds.
flow_label: The flow to sterilise.
flow_num: The flow to stop.
):

Returns:
outcome: True if command successfully queued.
Expand All @@ -763,19 +771,21 @@ def stop(
'cycle_point': cycle_point,
'clock_time': clock_time,
'task': task,
'flow_label': flow_label,
'flow_num': flow_num,
})
))
return (True, 'Command queued')

def force_trigger_tasks(self, tasks, reflow=False):
def force_trigger_tasks(self, tasks, reflow, flow_descr):
"""Trigger submission of task jobs where possible.

Args:
tasks (list):
List of identifiers, see `task globs`_
reflow (bool, optional):
Start new flow(s) from triggered tasks.
reflow (bool):
Start new flow from triggered tasks.
flow_descr (str):
Description of new flow.

Returns:
tuple: (outcome, message)
Expand All @@ -787,6 +797,12 @@ def force_trigger_tasks(self, tasks, reflow=False):

"""
self.schd.command_queue.put(
("force_trigger_tasks", (tasks,),
{"reflow": reflow}))
(
"force_trigger_tasks", (tasks,),
{
"reflow": reflow,
"flow_descr": flow_descr
}
)
)
return (True, 'Command queued')
8 changes: 5 additions & 3 deletions cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ class Meta:
is_held = Boolean()
is_queued = Boolean()
is_runahead = Boolean()
flow_label = String()
flow_nums = String()
depth = Int()
job_submits = Int()
outputs = List(
Expand Down Expand Up @@ -1735,8 +1735,8 @@ class Arguments:
task = TaskID(
description='Stop after this task succeeds.'
)
flow_label = String(
description='Label of flow to sterilise.'
flow_num = Int(
description='Number of flow to stop.'
)

result = GenericScalar()
Expand Down Expand Up @@ -1860,6 +1860,7 @@ class Arguments(TaskMutation.Arguments):
default_value=[TASK_OUTPUT_SUCCEEDED],
description='List of task outputs to satisfy.'
)
flow_num = Int()


class Trigger(Mutation, TaskMutation):
Expand All @@ -1877,6 +1878,7 @@ class Meta:

class Arguments(TaskMutation.Arguments):
reflow = Boolean()
flow_descr = String()


def _mut_field(cls):
Expand Down
Loading