Task proxy spawn on demand. #3515

Merged
merged 29 commits into from
Jul 29, 2020
Changes from 13 commits
Commits
edf61d3
Spawn on demand
hjoliver Apr 24, 2020
492cd22
Fix tests
hjoliver Jul 1, 2020
b132fff
Add functional tests for reflow.
hjoliver Jul 2, 2020
b800703
Update change log.
hjoliver Jul 2, 2020
af686fb
Shellcheck tweak.
hjoliver Jul 2, 2020
082d565
Merge branch 'master' into spawn-on-demand
hjoliver Jul 2, 2020
cae790f
Auto run main loop flow label pruning.
hjoliver Jul 2, 2020
1513b4e
Reformulate flow stop as `cylc stop` option.
hjoliver Jul 3, 2020
373475e
Add flow stop functional tests.
hjoliver Jul 3, 2020
9627b14
De-flake a test.
hjoliver Jul 3, 2020
816ebe2
Address OS review feedback.
hjoliver Jul 16, 2020
66bfeb8
Merge branch 'master' into spawn-on-demand-merge
hjoliver Jul 16, 2020
8bdeb25
Tweak scan test.
hjoliver Jul 16, 2020
1cc7bac
Fix sched options for integration tests.
hjoliver Jul 19, 2020
68c59cd
Remove some comments.
hjoliver Jul 19, 2020
39af527
Merge branch 'master' into spawn-on-demand
hjoliver Jul 20, 2020
61ac4b2
Address MH review feedback.
hjoliver Jul 21, 2020
f7df57f
Address more MH review feedback.
hjoliver Jul 22, 2020
2748f3c
Address DS review feedback.
hjoliver Jul 22, 2020
157b24b
No need to set stuck future tasks to waiting.
hjoliver Jul 22, 2020
e078f53
Fix validation of offset triggers.
hjoliver Jul 23, 2020
aa49848
Event-driven suicide triggering.
hjoliver Jul 24, 2020
c7ffec1
No implicit task creation by suicide triggers.
hjoliver Jul 24, 2020
727588a
Fix handling of abs triggers.
hjoliver Jul 26, 2020
66df7fb
Style fixes.
hjoliver Jul 28, 2020
4c42a3c
Update tests/functional/api-suite-info/00-get-graph-raw-1/suite.rc
hjoliver Jul 28, 2020
e24780f
Update DB schema test.
hjoliver Jul 28, 2020
de855b0
Remove debug print.
hjoliver Jul 28, 2020
48441ab
New SoD test for abs triggers.
hjoliver Jul 28, 2020
7 changes: 7 additions & 0 deletions CHANGES.md
@@ -33,6 +33,13 @@ namespace, under `cylc.jinja.filters`.

 Cylc Review was also removed in this version.

+-------------------------------------------------------------------------------
+<<<<<<< HEAD
+## __cylc-8.0a3 (2020-Q3?)__
+
+[#3515](https://github.com/cylc/cylc-flow/pull/3515) - spawn-on-demand: a more
+efficient way of evolving the workflow via the graph.
+
 -------------------------------------------------------------------------------
 ## __cylc-8.0a2 (2020-07-03)__

10 changes: 9 additions & 1 deletion cylc/flow/cfgspec/globalcfg.py
@@ -126,7 +126,8 @@
     with Conf('main loop', desc='''
         Configuration of the Cylc Scheduler's main loop.
     '''):
-        Conf('plugins', VDR.V_STRING_LIST, ['health check'], desc='''
+        Conf('plugins', VDR.V_STRING_LIST,
+             ['health check', 'prune flow labels'], desc='''
             Configure the default main loop plugins to use when
             starting up new suites.
         ''')
@@ -146,6 +147,13 @@
                 The interval with which this plugin is run.
             ''')

+        with Conf('prune flow labels', meta=MainLoopPlugin, desc='''
+            Prune redundant flow labels.
+        '''):
+            Conf('interval', VDR.V_INTERVAL, DurationFloat(600), desc='''
+                The interval with which this plugin is run.
+            ''')
+
     # suite
     with Conf('suite logging', desc='''
         The suite event log, held under the suite run directory, is maintained
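
The new `prune flow labels` plugin is enabled by default and runs on a 600-second interval. As a rough illustration of interval gating only, and not Cylc's actual main-loop code, a periodic plugin runner could be sketched as below. `PeriodicPlugin` and `run_due` are hypothetical names invented for this sketch.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class PeriodicPlugin:
    """Hypothetical stand-in for a main loop plugin with an interval."""
    name: str
    interval: float  # seconds between runs, e.g. 600 for 'prune flow labels'
    func: Callable[[], None]
    last_run: float = float('-inf')  # has not run yet

    def is_due(self, now: float) -> bool:
        """True if at least `interval` seconds have passed since the last run."""
        return now - self.last_run >= self.interval


def run_due(plugins: List[PeriodicPlugin], now: float) -> List[str]:
    """Run every plugin whose interval has elapsed; return the names run."""
    ran = []
    for plugin in plugins:
        if plugin.is_due(now):
            plugin.func()
            plugin.last_run = now
            ran.append(plugin.name)
    return ran
```

Calling `run_due` once per main-loop iteration with the current time would run each plugin at most once per interval, however fast the loop itself spins.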
18 changes: 0 additions & 18 deletions cylc/flow/cfgspec/suite.py
@@ -269,24 +269,6 @@
             consecutive cycle points to be active at any time, adjusted up if
             necessary for any future triggering.
         ''')
-        Conf('spawn to max active cycle points', VDR.V_BOOLEAN, desc='''
-            Allows tasks to spawn out to
-            :cylc:conf:`[..]max active cycle points`,
-            removing restriction that a task
-            has to have submitted before its successor can be spawned.
-
-            .. warning::
-               This should be used with care given the potential impact of
-               additional task proxies in terms of memory and cpu for the
-               cylc server program. Also, use of the setting may highlight
-               any issues with suite design relying on the default behaviour
-               where downstream tasks would otherwise be waiting on ones
-               upstream submitting and the suite would have stalled e.g. a
-               housekeeping task at a later cycle deleting an earlier cycle's
-               data before that cycle has had chance to run where previously
-               the task would not have been spawned until its predecessor had
-               been submitted.
-        ''')

         with Conf('queues', desc='''
             Configuration of internal queues, by which the number of
43 changes: 14 additions & 29 deletions cylc/flow/config.py
@@ -1553,7 +1553,7 @@ def generate_taskdefs(self, orig_expr, left_nodes, right, seq):
                 # if right is None, lefts are lone nodes
                 # for which we still define the taskdefs
                 continue
-            name, offset_is_from_icp, _, offset, _ = (
+            name, offset, _, offset_is_from_icp, _, _ = (
                 GraphNodeParser.get_inst().parse(node))

             if name not in self.cfg['runtime']:
@@ -1620,30 +1620,9 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
                 xtrig_labels.add(left[1:])
                 continue
             # (GraphParseError checked above)
-            name, offset_is_from_icp, offset_is_irregular, offset, output = (
+            (name, offset, output, offset_is_from_icp,
+             offset_is_irregular, offset_is_absolute) = (
                 GraphNodeParser.get_inst().parse(left))
-            ltaskdef = self.taskdefs[name]
-
-            # Determine intercycle offsets.
Review comment (Member, Author): Intercycle offsets were used for cycle point housekeeping in SoS.
-            abs_cycle_point = None
-            cycle_point_offset = None
-            if offset_is_from_icp:
-                first_point = get_point_relative(offset, self.initial_point)
-                last_point = seq.get_stop_point()
-                abs_cycle_point = first_point
-                if last_point is None:
-                    # This dependency persists for the whole suite run.
-                    ltaskdef.intercycle_offsets.add((None, seq))
-                else:
-                    ltaskdef.intercycle_offsets.add(
-                        (str(-(last_point - first_point)), seq))
-            elif offset:
-                if offset_is_irregular:
-                    offset_tuple = (offset, seq)
-                else:
-                    offset_tuple = (offset, None)
-                ltaskdef.intercycle_offsets.add(offset_tuple)
-                cycle_point_offset = offset

             # Qualifier.
             outputs = self.cfg['runtime'][name]['outputs']
@@ -1658,7 +1637,9 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
                 qualifier = TASK_OUTPUT_SUCCEEDED

             # Generate TaskTrigger if not already done.
-            key = (name, abs_cycle_point, cycle_point_offset, qualifier)
+            key = (name, offset, qualifier,
+                   offset_is_irregular, offset_is_absolute,
+                   offset_is_from_icp, self.initial_point)
             try:
                 task_trigger = task_triggers[key]
             except KeyError:
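
The hunk above looks triggers up in a dict keyed by a tuple and only builds a new one on `KeyError`. The new key carries the raw offset and its interpretation flags instead of a precomputed cycle point. A minimal sketch of that cache-by-tuple pattern follows. `Trigger` and `get_trigger` are hypothetical names, and the reduced four-field key is an assumption made for brevity; the real key has seven fields.

```python
from typing import Dict, NamedTuple, Optional, Tuple


class Trigger(NamedTuple):
    """Hypothetical stand-in for a TaskTrigger."""
    name: str
    offset: Optional[str]
    qualifier: str
    offset_is_absolute: bool


def get_trigger(
    cache: Dict[Tuple, Trigger],
    name: str,
    offset: Optional[str],
    qualifier: str,
    offset_is_absolute: bool,
) -> Trigger:
    """Return the cached trigger for this key, creating it on first use."""
    key = (name, offset, qualifier, offset_is_absolute)
    try:
        return cache[key]
    except KeyError:
        trigger = Trigger(*key)
        cache[key] = trigger
        return trigger
```

Because every flag is part of the key, two triggers that share a name and offset string but differ in how the offset is interpreted get separate cache entries.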
@@ -1667,6 +1648,11 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,

             triggers[left] = task_trigger

+            # (name is left name)
+            self.taskdefs[name].add_graph_child(task_trigger, right, seq)
+            # graph_parents not currently used but might be needed soon:
+            # self.taskdefs[right].add_graph_parent(task_trigger, name, seq)
+
         # Walk down "expr_list" depth first, and replace any items matching a
         # key in "triggers" ("left" values) with the trigger.
         stack = [expr_list]
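
`add_graph_child` records, on the upstream task definition, which downstream task each trigger feeds. As the PR title suggests, this lets child task proxies be created only once a parent output they depend on completes. A much-simplified sketch of that idea follows. Every name except `add_graph_child` is hypothetical, the signature here is reduced to an output name and a child name, and cycle offsets and sequences are ignored.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple


class MiniTaskDef:
    """Hypothetical, much-simplified task definition."""

    def __init__(self, name: str) -> None:
        self.name = name
        # output name -> names of child tasks triggered by that output
        self.graph_children: Dict[str, List[str]] = defaultdict(list)

    def add_graph_child(self, output: str, child_name: str) -> None:
        """Record that `child_name` depends on `output` of this task."""
        self.graph_children[output].append(child_name)


class MiniPool:
    """Hypothetical task pool that spawns children on demand."""

    def __init__(self, taskdefs: Dict[str, MiniTaskDef]) -> None:
        self.taskdefs = taskdefs
        self.proxies: Set[Tuple[str, int]] = set()

    def spawn(self, name: str, point: int) -> None:
        """Create a task proxy, represented here as a (name, point) pair."""
        self.proxies.add((name, point))

    def output_completed(
        self, name: str, point: int, output: str
    ) -> List[Tuple[str, int]]:
        """Spawn only the same-cycle children that depend on this output."""
        spawned = []
        for child in self.taskdefs[name].graph_children.get(output, []):
            if (child, point) not in self.proxies:
                self.spawn(child, point)
                spawned.append((child, point))
        return spawned
```

In this sketch a task that hangs off the `failed` output never gets a proxy unless the parent actually fails, which is the saving in pool size that spawning on demand is after.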
@@ -1849,7 +1835,7 @@ def get_graph_raw(self, start_point_string, stop_point_string,
                     offset_is_from_icp = False
                     offset = None
                 else:
-                    name, offset_is_from_icp, _, offset, _ = (
+                    name, offset, _, offset_is_from_icp, _, _ = (
                         GraphNodeParser.get_inst().parse(left))
                     if offset:
                         if offset_is_from_icp:
@@ -1963,7 +1949,7 @@ def get_graph_edges(self, start_point, stop_point):
                     offset_is_from_icp = False
                     offset = None
                 else:
-                    name, offset_is_from_icp, _, offset, _ = (
+                    name, offset, _, offset_is_from_icp, _, _ = (
                         GraphNodeParser.get_inst().parse(left))
                     if offset:
                         if offset_is_from_icp:
@@ -2203,8 +2189,7 @@ def _get_taskdef(self, name):

         # Get the taskdef object for generating the task proxy class
         taskd = TaskDef(
-            name, rtcfg, self.run_mode(), self.start_point,
-            self.cfg['scheduling']['spawn to max active cycle points'])
+            name, rtcfg, self.run_mode(), self.start_point)

         # TODO - put all taskd.foo items in a single config dict

29 changes: 14 additions & 15 deletions cylc/flow/data_messages.proto
@@ -162,21 +162,20 @@ message PbTaskProxy {
     optional string task = 3;
     optional string state = 4;
     optional string cycle_point = 5;
-    optional bool spawned = 6;
-    optional int32 depth = 7;
-    optional int32 job_submits = 8;
-    optional string latest_message = 9;
-    repeated string outputs = 10;
-    repeated string broadcasts = 11;
-    repeated string namespace = 12;
-    repeated PbPrerequisite prerequisites = 13;
-    repeated string jobs = 14;
-    repeated string parents = 15;
-    optional string first_parent = 16;
-    optional string name = 17; /* filter item */
-    optional bool is_held = 18;
-    repeated string edges = 19;
-    repeated string ancestors = 20;
+    optional int32 depth = 6;
dwsutherland marked this conversation as resolved.
+    optional int32 job_submits = 7;
+    optional string latest_message = 8;
+    repeated string outputs = 9;
+    repeated string broadcasts = 10;
+    repeated string namespace = 11;
+    repeated PbPrerequisite prerequisites = 12;
+    repeated string jobs = 13;
+    repeated string parents = 14;
+    optional string first_parent = 15;
+    optional string name = 16; /* filter item */
+    optional bool is_held = 17;
+    repeated string edges = 18;
+    repeated string ancestors = 19;
 }

 message PbFamily {
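
Removing `spawned` shifts every later `PbTaskProxy` field number down by one. Protocol Buffers identifies fields on the wire by number, not name, so a message written with one numbering and read with the other would have its fields misassigned. Whether that matters here depends on whether both ends are always regenerated together, which the diff does not say. The sketch below computes the wire tag for the affected fields using the standard `(field_number << 3) | wire_type` rule. The helper names and the two small lookup tables are hypothetical and cover only fields 6 and 7.

```python
WIRE_VARINT = 0  # wire type used by bool and int32 fields


def field_tag(field_number: int, wire_type: int = WIRE_VARINT) -> int:
    """Return the protobuf tag for a field: (number << 3) | wire type."""
    return (field_number << 3) | wire_type


def decode_field_number(tag: int) -> int:
    """Recover the field number from a tag."""
    return tag >> 3


# Field numbers 6 and 7 before and after this change (from the diff above).
OLD_FIELDS = {6: 'spawned', 7: 'depth'}
NEW_FIELDS = {6: 'depth', 7: 'job_submits'}

# A new writer emits `depth` under field number 6 ...
tag_written = field_tag(6)
# ... which a reader built from the old schema resolves to `spawned`.
read_by_old = OLD_FIELDS[decode_field_number(tag_written)]
read_by_new = NEW_FIELDS[decode_field_number(tag_written)]
```

Here `depth` and the old `spawned` happen to share the varint wire type, so an old reader would not reject the field; it would silently decode it under the wrong name.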