Skip to content

Commit

Permalink
Cleaned up handling of triggers and offsets.
Browse files Browse the repository at this point in the history
  • Loading branch information
hjoliver committed Apr 23, 2020
1 parent 873b537 commit 7535e12
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 103 deletions.
27 changes: 9 additions & 18 deletions cylc/flow/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from cylc.flow.cycling.loader import (
get_point, get_point_relative, get_interval, get_interval_cls,
get_sequence, get_sequence_cls, init_cyclers, INTEGER_CYCLING_TYPE,
ISO8601_CYCLING_TYPE, is_offset_absolute)
ISO8601_CYCLING_TYPE)
from cylc.flow.cycling.iso8601 import ingest_time
import cylc.flow.flags
from cylc.flow.graphnode import GraphNodeParser
Expand Down Expand Up @@ -1522,7 +1522,7 @@ def generate_taskdefs(self, orig_expr, left_nodes, right, seq):
# if right is None, lefts are lone nodes
# for which we still define the taskdefs
continue
name, offset_is_from_icp, _, offset, _ = (
name, offset, _, offset_is_from_icp, _, _ = (
GraphNodeParser.get_inst().parse(node))

if name not in self.cfg['runtime']:
Expand Down Expand Up @@ -1589,21 +1589,11 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
xtrig_labels.add(left[1:])
continue
# (GraphParseError checked above)
name, offset_is_from_icp, offset_is_irregular, offset, output = (
(name, offset, output, offset_is_from_icp,
offset_is_irregular, offset_is_absolute) = (
GraphNodeParser.get_inst().parse(left))
ltaskdef = self.taskdefs[name]

# TODO why is offset_is_from_icp not treated directly as an
# absolute cycle point? (It works as is but may be unnecessary
# double handling?)
abs_cycle_point = None
if offset is not None and (
not offset_is_irregular and is_offset_absolute(offset)):
abs_cycle_point = get_point(offset).standardise()
elif offset_is_from_icp:
abs_cycle_point = get_point_relative(
offset, self.initial_point)

# Qualifier.
outputs = self.cfg['runtime'][name]['outputs']
if outputs and output in outputs:
Expand All @@ -1617,8 +1607,9 @@ def generate_triggers(self, lexpression, left_nodes, right, seq,
qualifier = TASK_OUTPUT_SUCCEEDED

# Generate TaskTrigger if not already done.
key = (name, abs_cycle_point, offset_is_irregular, offset,
qualifier)
key = (name, offset, qualifier,
offset_is_irregular, offset_is_absolute,
offset_is_from_icp, self.initial_point)
try:
task_trigger = task_triggers[key]
except KeyError:
Expand Down Expand Up @@ -1813,7 +1804,7 @@ def get_graph_raw(self, start_point_string, stop_point_string,
offset_is_from_icp = False
offset = None
else:
name, offset_is_from_icp, _, offset, _ = (
name, offset, _, offset_is_from_icp, _, _ = (
GraphNodeParser.get_inst().parse(left))
if offset:
if offset_is_from_icp:
Expand Down Expand Up @@ -1927,7 +1918,7 @@ def get_graph_edges(self, start_point, stop_point):
offset_is_from_icp = False
offset = None
else:
name, offset_is_from_icp, _, offset, _ = (
name, offset, _, offset_is_from_icp, _, _ = (
GraphNodeParser.get_inst().parse(left))
if offset:
if offset_is_from_icp:
Expand Down
14 changes: 10 additions & 4 deletions cylc/flow/graphnode.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

import re

from cylc.flow.cycling.loader import get_interval, get_interval_cls
from cylc.flow.cycling.loader import (
get_interval, get_interval_cls, is_offset_absolute)
from cylc.flow.exceptions import GraphParseError
from cylc.flow.task_id import TaskID

Expand All @@ -36,7 +37,7 @@ class GraphNodeParser(object):
([^\]]*) # Continue until next ']'
\] # Stop at next ']'
)? # End optional [offset] syntax]
(?::([\w-]+))? # Optional type (e.g. :succeed)
(?::([\w-]+))? # Optional output (e.g. :succeed)
$
""", re.X)

Expand Down Expand Up @@ -89,7 +90,8 @@ def parse(self, node):
Return:
tuple:
(name, offset_is_from_icp, offset_is_irregular, offset, output)
(name, offset, output,
offset_is_from_icp, offset_is_irregular, offset_is_absolute)
Raise:
GraphParseError: on illegal syntax.
Expand All @@ -102,11 +104,15 @@ def parse(self, node):
if offset_is_from_icp and not offset:
offset = self._get_offset()
offset_is_irregular = False
offset_is_absolute = False
if offset:
if is_offset_absolute(offset):
offset_is_absolute = True
if self.REC_IRREGULAR_OFFSET.search(offset):
offset_is_irregular = True
else:
offset = self._get_offset(offset)
self._nodes[node] = (
name, offset_is_from_icp, offset_is_irregular, offset, output)
name, offset, output,
offset_is_from_icp, offset_is_irregular, offset_is_absolute)
return self._nodes[node]
7 changes: 4 additions & 3 deletions cylc/flow/task_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,6 @@ def release_runahead_task(self, itask):
self.rhpool_changed = True
if itask.tdef.max_future_prereq_offset is not None:
self.set_max_future_offset()
# Auto-spawn next instance of tasks with no parents at the next point.
# (No parents, or all parents prior to the suite start point).
if itask.tdef.sequential:
# implicit prev-instance parent
return
Expand All @@ -449,12 +447,15 @@ def release_runahead_task(self, itask):
parent_points = itask.tdef.get_parent_points(next_point)
if (not parent_points or
all(x < self.config.start_point for x in parent_points)):
# Auto-spawn next instance of tasks with no parents at the next
# point (or with all parents before the suite start point).
self.spawn(itask.tdef.name, itask.point,
itask.tdef.name, next_point)
else:
# Autos-spawn next instance of tasks with absolute triggers.
abs_triggers = itask.tdef.get_abs_triggers(next_point)
for trig in abs_triggers:
self.spawn(trig.task_name, trig.abs_cycle_point,
self.spawn(trig.task_name, trig.get_point(next_point),
itask.tdef.name, next_point, trig.output,
finished=True)

Expand Down
67 changes: 23 additions & 44 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
TaskState, TASK_STATUS_WAITING, TASK_STATUS_RETRYING,
TASK_OUTPUT_FAILED, TASK_OUTPUT_SUCCEEDED)
from cylc.flow.wallclock import get_unix_time_from_time_string as str2time
from cylc.flow.cycling.loader import (
get_point, get_point_relative, get_interval, is_offset_absolute)


class TaskProxy(object):
Expand Down Expand Up @@ -250,55 +248,36 @@ def __init__(

self.state = TaskState(tdef, self.point, status, is_held)

# Store children by task output, for spawn-on-demand.
# Only spawn if on-sequence. Example:
# T06 = "waz[-PT6H] => foo"
# PT6H = "waz"
# waz should trigger foo only if foo is on the T06 sequence

# TODO sort out the confused handling of absolute vs irregular offsets
# below: is_offset_absolute(irregular_offset) erroneously returns True.
# And why is both abs_cycle_point and is_offset_absolute() needed?
# Determine children of this task (for spawning).
self.children = {}
for seq, dout in tdef.downstreams.items():
for trig, downs in dout.items():
if trig.output not in self.children:
self.children[trig.output] = []
for name, offset in downs:
if offset is None:
point = self.point
elif trig.offset_is_irregular:
# change offset sign, e.g. -P1D+PT18H to +P1D-PT18H
point = get_point_relative(
offset.translate(offset.maketrans('-+', '+-')),
self.point)
elif is_offset_absolute(offset) or trig.abs_cycle_point:
# If absolute, first child start of sequence.
# E.g. for "R/1/P1 = foo[2] => bar"
# foo.2 should spawn bar.1
# Then we auto-spawn bar.2 etc.
point = seq.get_start_point()
else:
point = self.point - get_interval(offset)
if seq.is_on_sequence(point):
self.children[trig.output].append((name, point))
for output, downs in dout.items():
if output not in self.children:
self.children[output] = []
for name, trigger in downs:
child_point = trigger.get_child_point(self.point, seq)
if (trigger.offset_is_absolute or
trigger.offset_is_from_icp):
# (TODO: self.point is irrelevant, find a better way?)
# (parent_point is the absolute offset)
if trigger.get_parent_point(self.point) != self.point:
# If 'foo[^] => bar' only spawn off of '^'!
# (After which we auto-spawn bar instances).
continue
if seq.is_on_sequence(child_point):
# Only spawn if on-sequence. Example:
# T06 = "waz[-PT6H] => foo"
# PT6H = "waz"
# foo should trigger only if it is on the T06 sequence
self.children[output].append((name, child_point))

# Determine parents of this task (to record if finished or not).
self.parents_finished = {}
for seq, ups in tdef.upstreams.items():
if not seq.is_on_sequence(self.point):
continue
for name, trigger in ups:
if trigger.cycle_point_offset is None:
point = self.point
elif trigger.offset_is_irregular:
point = get_point_relative(trigger.cycle_point_offset,
self.point)
elif trigger.abs_cycle_point:
point = trigger.abs_cycle_point
elif is_offset_absolute(trigger.cycle_point_offset):
point = get_point(trigger.cycle_point_offset).standardise()
else:
point = get_point_relative(trigger.cycle_point_offset,
self.point)
point = trigger.get_parent_point(self.point)
if point < initial_point:
# pre-initial dependence
continue
Expand Down
85 changes: 71 additions & 14 deletions cylc/flow/task_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from cylc.flow.cycling.loader import get_point_relative
from cylc.flow.cycling.loader import (
get_point, get_point_relative, get_interval)
from cylc.flow.exceptions import TriggerExpressionError
from cylc.flow.prerequisite import Prerequisite
from cylc.flow.task_outputs import (
Expand Down Expand Up @@ -48,16 +49,69 @@ class TaskTrigger(object):
"""

__slots__ = ['task_name', 'abs_cycle_point', 'cycle_point_offset',
'offset_is_irregular', 'output']
__slots__ = ['task_name', 'cycle_point_offset', 'output',
'offset_is_irregular', 'offset_is_absolute',
'offset_is_from_icp', 'initial_point']

def __init__(self, task_name, abs_cycle_point, offset_is_irregular,
cycle_point_offset, output):
def __init__(self, task_name, cycle_point_offset, output,
offset_is_irregular, offset_is_absolute,
offset_is_from_icp, initial_point):
self.task_name = task_name
self.abs_cycle_point = abs_cycle_point
self.cycle_point_offset = cycle_point_offset
self.offset_is_irregular = offset_is_irregular
self.output = output
self.offset_is_irregular = offset_is_irregular
self.offset_is_from_icp = offset_is_from_icp
self.offset_is_absolute = offset_is_absolute
self.initial_point = initial_point
# NEED TO DISTINGUISH BETWEEN ABSOLUTE OFFSETS
# 2000, 20000101T0600Z, 2000-01-01T06:00+00:00, ...
# AND NON-ABSOLUTE IRREGULAR:
# -PT6H+P1D, T00, ...
if (self.offset_is_irregular and any(
self.cycle_point_offset.startswith(c)
for c in ['P', '+', '-', 'T'])):
self.offset_is_absolute = False

def get_parent_point(self, from_point):
""" blah """
if self.cycle_point_offset is None:
point = from_point
elif self.offset_is_absolute:
point = get_point(self.cycle_point_offset).standardise()
else:
if self.offset_is_from_icp:
from_point = self.initial_point
# works with offset_is_irregular or not:
point = get_point_relative(self.cycle_point_offset, from_point)
return point

def get_child_point(self, from_point, seq):
"""Return the point of the output to which this TaskTrigger pertains.
Args:
point (cylc.flow.cycling.PointBase): The cycle point of the
dependent task.
Returns:
cylc.flow.cycling.PointBase: The cycle point of the dependency.
"""
if self.cycle_point_offset is None:
point = from_point
elif self.offset_is_absolute or self.offset_is_from_icp:
# First child is at start of sequence.
# E.g. for "R/1/P1 = foo[2] => bar"
# foo.2 should spawn bar.1; then we auto-spawn bar.2,3,...
point = seq.get_start_point() # .standardise()?
elif self.offset_is_irregular:
# Change offset sign to find children
# e.g. -P1D+PT18H to +P1D-PT18H
point = get_point_relative(
self.cycle_point_offset.translate(
self.cycle_point_offset.maketrans('-+', '+-')), from_point)
else:
point = from_point - get_interval(self.cycle_point_offset)
return point

def get_point(self, point):
"""Return the point of the output to which this TaskTrigger pertains.
Expand All @@ -70,17 +124,20 @@ def get_point(self, point):
cylc.flow.cycling.PointBase: The cycle point of the dependency.
"""
if self.abs_cycle_point:
point = self.abs_cycle_point
elif self.cycle_point_offset:
# TODO CAN THIS BE REPLACED WITH get_parent_point() above?
if self.offset_is_absolute:
point = get_point(self.cycle_point_offset).standardise()
elif self.offset_is_from_icp:
point = get_point_relative(
self.cycle_point_offset, point)
self.cycle_point_offset, self.initial_point)
elif self.cycle_point_offset:
point = get_point_relative(self.cycle_point_offset, point)
return point

def __str__(self):
if self.abs_cycle_point:
return '%s[%s]:%s' % (self.task_name, self.abs_cycle_point,
self.output)
if not self.offset_is_irregular and self.offset_is_absolute:
point = get_point(self.cycle_point_offset).standardise()
return '%s[%s]:%s' % (self.task_name, point, self.output)
elif self.cycle_point_offset:
return '%s[%s]:%s' % (self.task_name, self.cycle_point_offset,
self.output)
Expand Down
21 changes: 7 additions & 14 deletions cylc/flow/taskdef.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,24 +69,24 @@ def __init__(self, name, rtcfg, run_mode, start_point):
self.name = name
self.elapsed_times = deque(maxlen=self.MAX_LEN_ELAPSED_TIMES)

def add_downstreams(self, trigger, downstream, sequence):
def add_downstreams(self, trigger, taskname, sequence):
"""Map task outputs to downstream tasks that depend on them.
{sequence:
{
output: [(a,o1), (b,o2), ...] # (task-name, offset)
output: [(a,t1), (b,t2), ...] # (task-name, trigger)
}
}
"""
offset = trigger.cycle_point_offset
self.downstreams.setdefault(
sequence, {}).setdefault(trigger, []).append((downstream, offset))
sequence, {}).setdefault(
trigger.output, []).append((taskname, trigger))

def add_upstreams(self, trigger, upstream, sequence):
"""Record tasks that I depend on.
{
sequence: set([(a,o1), (b,o2), ...]) # (task-name, offset)
sequence: set([(a,t1), (b,t2), ...]) # (task-name, trigger)
}
"""
if sequence not in self.upstreams:
Expand Down Expand Up @@ -148,14 +148,7 @@ def get_parent_points(self, point):
if dep.suicide:
continue
for trig in dep.task_triggers:
if trig.abs_cycle_point is not None:
parent_points.add(trig.abs_cycle_point)
elif trig.cycle_point_offset is None:
parent_points.add(point)
else:
parent_points.add(
get_point_relative(
trig.cycle_point_offset, point))
parent_points.add(trig.get_parent_point(point))
return parent_points

def get_abs_triggers(self, point):
Expand All @@ -170,6 +163,6 @@ def get_abs_triggers(self, point):
# task has prereqs in this sequence
for dep in self.dependencies[seq]:
for trig in dep.task_triggers:
if trig.abs_cycle_point:
if trig.offset_is_absolute or trig.offset_is_from_icp:
abs_triggers.add(trig)
return abs_triggers
Loading

0 comments on commit 7535e12

Please sign in to comment.