
Ignore pre-initial deps for --start-tasks. #4259

Merged: hjoliver merged 10 commits into cylc:master from start-task-pre-initial-ignore on Jul 5, 2021

Conversation

hjoliver (Member) commented Jun 11, 2021:

These changes:

Requirements check-list

  • I have read CONTRIBUTING.md and added my name as a Code Contributor.
  • Contains logically grouped changes (else tidy your branch by rebase).
  • Does not contain off-topic changes (use other PRs for other changes).
  • Applied any dependency changes to both setup.py and
    conda-environment.yml. (None)
  • Appropriate tests are included (extended unit tests).
  • Appropriate change log entry included.
  • No documentation update required. (Command help updated).

hjoliver (Member, Author) commented Jun 11, 2021:

A good test case for this:

[scheduling]
   cycling mode = integer
   initial cycle point = 1
   final cycle point = 3
   [[graph]]
      P1 = """
         a => b => c
         b[-P1] => b
      """
[runtime]
   [[a, b, c]]
      script = "sleep 5"

Start up with:

cylc play --start-task=a.2 <wf-name>

... the workflow should not stall and shut down at b.2 (left waiting on the pre-initial dependency b.1).

@hjoliver hjoliver force-pushed the start-task-pre-initial-ignore branch from 8237125 to fe66b43 Compare June 17, 2021 03:53
@hjoliver hjoliver self-assigned this Jun 17, 2021
@hjoliver hjoliver marked this pull request as ready for review June 17, 2021 04:02
Comment on lines -308 to +310 (cylc/flow/task_pool.py):

-            for point_id_map in self.main_pool.values()
-            for itask in point_id_map.values()
+            for itask in self.get_tasks()
hjoliver (Member, Author):
This was actually a bug, because the task pool can change size during iteration (as described in the comment above).
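
For illustration, here is a minimal standalone sketch (toy data, not cylc code) of that failure mode: spawning a task into the pool while iterating over the live dict view changes the dict's size mid-iteration.

pool = {1: {"a.1": "waiting"}, 2: {"a.2": "waiting"}}

try:
    for point_id_map in pool.values():        # .values() is a live view
        for itask in point_id_map.values():
            pool[3] = {"a.3": "waiting"}      # e.g. a runahead release spawns a.3
except RuntimeError as exc:
    print(exc)  # dictionary changed size during iteration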

hjoliver (Member, Author):
(In fact the above example triggers this bug, on master)

MetRonnie (Member):
I don't understand the difference with how it was previously, considering the definition of

def get_tasks(self) -> List[TaskProxy]:
    """Return a list of task proxies in the main pool."""
    if self.main_pool_changed:
        self.main_pool_changed = False
        self.main_pool_list = []
        for _, itask_id_map in self.main_pool.items():
            for __, itask in itask_id_map.items():
                self.main_pool_list.append(itask)
    return self.main_pool_list

(apart from the fact that self.get_tasks() can mutate self.main_pool_list and self.main_pool_changed)

hjoliver (Member, Author) commented Jun 22, 2021:
If I run the example above on master, with --start-task=a.2:

2021-06-23T10:17:47+12:00 INFO - [a.2] -released from runahead
2021-06-23T10:17:47+12:00 INFO - (a.2) spawned a.3 flow(i)
2021-06-23T10:17:47+12:00 ERROR - dictionary changed size during iteration
        Traceback (most recent call last):
          File "/home/oliverh/cylc/cylc-flow/cylc/flow/scheduler.py", line 572, in start_scheduler
            await self.main_loop()
          File "/home/oliverh/cylc/cylc-flow/cylc/flow/scheduler.py", line 1409, in main_loop
            if not self.is_paused and self.pool.release_runahead_tasks():
          File "/home/oliverh/cylc/cylc-flow/cylc/flow/task_pool.py", line 306, in release_runahead_tasks
            for itask in (
          File "/home/oliverh/cylc/cylc-flow/cylc/flow/task_pool.py", line 306, in <genexpr>
            for itask in (
        RuntimeError: dictionary changed size during iteration
2021-06-23T10:17:47+12:00 CRITICAL - Workflow shutting down - dictionary changed size during iteration

hjoliver (Member, Author) commented Jun 22, 2021:
On this branch, we iterate over the list of tasks (returned by self.get_tasks()) that reflects the state of the pool before iteration begins.

On master, we iterate over main_pool.values(). In Python 3, dict.values() does not return a list of values; it returns a live "view" of the dictionary, so modifying the dict while iterating over the view is an error: https://docs.python.org/3/library/stdtypes.html#dict-views
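
As a minimal standalone sketch of the fix pattern (same toy pool as above, not the actual cylc code): snapshot the pool into a plain list before iterating, which is effectively what get_tasks() returns, so the pool can grow safely during the loop.

pool = {1: {"a.1": "waiting"}, 2: {"a.2": "waiting"}}

# Build a snapshot list first (roughly what self.get_tasks() does).
snapshot = [
    itask
    for point_id_map in pool.values()
    for itask in point_id_map.values()
]
for itask in snapshot:                         # iterate the frozen snapshot
    pool.setdefault(3, {})["a.3"] = "waiting"  # the pool can now grow safely

print(sorted(pool))  # [1, 2, 3]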

dwsutherland (Member):
I found this bug too:

2021-06-30T12:27:08+12:00 INFO - [a.2] -released from runahead
2021-06-30T12:27:08+12:00 INFO - (a.2) spawned a.3 flow(r)
2021-06-30T12:27:08+12:00 ERROR - dictionary changed size during iteration
        Traceback (most recent call last):
          File "/home/sutherlander/cylc/cylc-flow/cylc/flow/scheduler.py", line 572, in start_scheduler
            await self.main_loop()
          File "/home/sutherlander/cylc/cylc-flow/cylc/flow/scheduler.py", line 1405, in main_loop
            if not self.is_paused and self.pool.release_runahead_tasks():
          File "/home/sutherlander/cylc/cylc-flow/cylc/flow/task_pool.py", line 309, in release_runahead_tasks
            for point_id_map in self.main_pool.values()
          File "/home/sutherlander/cylc/cylc-flow/cylc/flow/task_pool.py", line 308, in <genexpr>
            itask
        RuntimeError: dictionary changed size during iteration
2021-06-30T12:27:08+12:00 CRITICAL - Workflow shutting down - dictionary changed size during iteration

Changed it to:

         for itask in (
             itask
-            for point_id_map in self.main_pool.values()
+            for point_id_map in list(self.main_pool.values())
             for itask in point_id_map.values()
             if itask.state.is_runahead
             if itask.state(

For the purposes of reviewing this, here is the result on master with that temporary fix applied:

2021-06-30T12:33:26+12:00 INFO - (a.2) spawned b.2 flow(w)
2021-06-30T12:33:26+12:00 INFO - [a.3] status=running: (received)succeeded at 2021-06-30T12:33:25+12:00  for job(01) flow(w)
2021-06-30T12:33:26+12:00 INFO - (a.3) spawned b.3 flow(w)
2021-06-30T12:33:26+12:00 INFO - Workflow shutting down - AUTOMATIC
2021-06-30T12:33:26+12:00 WARNING - Partially satisfied prerequisites left over:
        b.2 is waiting on:
        * b.1 succeeded
2021-06-30T12:33:26+12:00 INFO - DONE

hjoliver (Member, Author):
Ah, I'm slightly confused @dwsutherland - isn't that exactly the same bug that I fixed on this branch?? (although you've fixed it in a slightly different way, same result).

dwsutherland (Member) commented Jul 5, 2021:
My bad, I should have phrased it:

"While trying to reproduce the problem on master I found that same bug too."

Yes, it's fixed on this branch. But fixing it wasn't the purpose of this branch (though it is clearly related); I simply ran into the same bug while trying to test master.

It was important to fix master in order to test the actual purpose of this PR, i.e. to see how master handles pre-initial dependencies (we can't even get that far in the presence of the bug). So I made the temporary fix above.

hjoliver (Member, Author):
OK, no worries - thanks for clarifying!

hjoliver (Member, Author):
One fast test failed. It passes locally and is unrelated to this change.
FAILED tests/integration/test_publisher.py::test_publisher - asyncio.exceptio...

@hjoliver hjoliver requested a review from dwsutherland June 18, 2021 00:07
TaskID.split(taskid)[1]
).standardise()
for taskid in self.options.starttask
)
hjoliver (Member, Author):
The only thing I had to do for this PR was define the start cycle point, for the start task case. Then pre-initial ignore gets done automatically.
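
Roughly, as a standalone sketch (hypothetical helper names, integer cycling assumed, not the actual PR diff): pull the cycle points out of the name.point --start-task arguments and use the earliest one as the workflow start point, so anything depending on an earlier point is treated as pre-initial and ignored.

def split_task_id(taskid):
    # Stand-in for TaskID.split(): "name.point" -> (name, point).
    name, _, point = taskid.partition(".")
    return name, point

def start_point_from_start_tasks(start_tasks):
    # Earliest cycle point among the start tasks; the real code also
    # standardises each point before using it.
    return min(int(split_task_id(taskid)[1]) for taskid in start_tasks)

print(start_point_from_start_tasks(["a.2", "b.3"]))  # 2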

dwsutherland (Member):
Hmm.. Is this the proper way of doing it?

What if the workflow has some R1 / start-up tasks for a few cycles, and then runs its normal tasks (containing the start task) at some offset from the ICP?

hjoliver (Member, Author):
Sorry, I don't follow here either 🤔 If we start up the scheduler with --start-task all that matters (for the purpose of this PR) is that other tasks don't get stuck waiting on tasks at cycle points earlier than the start-task(s). How do R1 tasks and offsets from ICP affect that?

dwsutherland (Member):
The start task (and perhaps dependents) may not exist at the ICP. They could be defined as starting from some offset of the ICP...

So if we force the ICP to match a non-ICP start task, what is the result?
(might test this tomorrow)

hjoliver (Member, Author):
You have to give the name.point of start-tasks, and they are checked for validity at the specified point.

[scheduling]
    cycling mode = integer
    initial cycle point = 1
    [[graph]]
        R1 = prep => foo
        P1 = foo[-P1] => foo
[runtime]
    [[prep, foo]]

[screenshot]

hjoliver (Member, Author):
Hmm, maybe my statement above wasn't clear:

The only thing I had to do for this PR was define the start cycle point, for the start task case. Then pre-initial ignore gets done automatically.

Rather than "define the start cycle point for start tasks" I should have said: extract the cycle point from the start tasks and set the workflow start point to that.

dwsutherland (Member):
What I'm asking is: how do you square making the workflow start point the same as the task's start point, when the task is defined in the workflow graph at some offset from the initial cycle point?

[scheduling]
    cycling mode = integer
    initial cycle point = 1
    [[graph]]
        R1 = prep => foo
        +P5/P1 = foo[-P1] => foo => bar
[runtime]
    [[prep, foo, bar]]

And then run with:
$ cylc play --start-task=bar.10 demo2/run1

dwsutherland (Member):
By "start point" I assume you mean the initial cycle point at run time(?)

hjoliver (Member, Author):
Right, understood now. No, ICP defines the extent of the graph (and where things happen relative to ICP) but start point (or start task now too) is where you start up within that graph. So we don’t mess with ICP here.

dwsutherland (Member):
Thanks. Well, apart from the minor conflict, we're ready to go with this one.

@hjoliver hjoliver requested a review from MetRonnie June 21, 2021 23:54
MetRonnie (Member) left a comment:
Several minor comments and a request for an additional test case

cylc/flow/scheduler_cli.py — 4 review comments (outdated, resolved)
tests/unit/test_config.py — 1 review comment (resolved)
cylc/flow/config.py — 1 review comment (outdated, resolved)
@MetRonnie MetRonnie added this to the cylc-8.0b2 milestone Jun 24, 2021
MetRonnie (Member) left a comment:
👍

dwsutherland (Member) left a comment:
Just a comment about forcing the ICP to match the start task (what if the start task is defined at some offset from the ICP?).

Otherwise it fixes a bug, and it works for the above example workflow.

dwsutherland (Member):
Bah, rebase has conflicts even after manually fixing the Changes.md

@hjoliver hjoliver force-pushed the start-task-pre-initial-ignore branch from 6a02775 to d2c94c3 Compare July 5, 2021 20:52
hjoliver (Member, Author) commented Jul 5, 2021:

Rebased. The conflict was only in the change log so no need to wait for the tests.

@hjoliver hjoliver merged commit 4f59757 into cylc:master Jul 5, 2021
@hjoliver hjoliver deleted the start-task-pre-initial-ignore branch July 5, 2021 20:53