New _fetch_status code. #417

Merged · 57 commits · Jan 19, 2021
(File changes below are shown from 44 of the 57 commits.)

Commits
081f367
Initial rewrite of _fetch_status.
bdice Jan 6, 2021
e048af6
Update _fetch_scheduler_status to require aggregates.
bdice Jan 6, 2021
4e32661
Clarify docstring for status_parallelization.
bdice Jan 6, 2021
7855249
Experiment with callback approach (only iterate over aggregates/group…
bdice Jan 6, 2021
34b6cef
Skip evaluation of post-conditions during eligibility if known to be …
bdice Jan 6, 2021
6138326
Change operations to groups.
bdice Jan 6, 2021
de73489
Use context manager for updating the cached status in bulk.
bdice Jan 7, 2021
e06b74f
Separate scheduler query from cached status update.
bdice Jan 7, 2021
1fccff8
Fix docstring.
bdice Jan 7, 2021
b88f62e
Use context manager for _fetch_scheduler_status cache update.
bdice Jan 7, 2021
ef2d168
Use cached status context manager for _fetch_status.
bdice Jan 7, 2021
28916a1
Refactor status updates so the scheduler query happens on-the-fly and…
bdice Jan 8, 2021
1b2ab67
Intermediate commit with notes.
bdice Jan 8, 2021
09a8b0b
Simplify eligibility check.
bdice Jan 12, 2021
03fbcf5
Update docstring.
bdice Jan 12, 2021
bd6a6a3
Update tests to account for removed method.
bdice Jan 12, 2021
cc52517
Rename cid to cluster_id.
bdice Jan 12, 2021
ff63832
Define parallel executor.
bdice Jan 12, 2021
36ef3f9
Only update status if the update dictionary has data.
bdice Jan 12, 2021
3f5e04d
Add parallel execution.
bdice Jan 12, 2021
fd3c273
Remove status callback.
bdice Jan 12, 2021
af03674
Clean up status fetching.
bdice Jan 12, 2021
8b9b53c
Remove unnecessary internal methods.
bdice Jan 12, 2021
eafa6cc
Fix test (scheduler is queried internally).
bdice Jan 12, 2021
c196dfe
Rename jobs -> aggregate.
bdice Jan 13, 2021
80da4d2
Fix bug in tests (the dynamically altered jobs should NOT be resubmit…
bdice Jan 13, 2021
e5cf670
Show aggregate id in str of _JobOperation.
bdice Jan 13, 2021
0f287d8
Fix script output.
bdice Jan 13, 2021
82dba08
Remove keyword argument.
bdice Jan 13, 2021
fa6dad6
Reset MockScheduler in setUp for all tests.
bdice Jan 13, 2021
d367611
Mock root directory (can't override Project.root_directory() because …
bdice Jan 13, 2021
e268d43
Update template reference data.
bdice Jan 13, 2021
aa8b327
Refactor mocked root.
bdice Jan 13, 2021
65fd807
Pickle function to be executed in parallel using cloudpickle so that …
vyasr Jan 14, 2021
5a15f5b
Fix pre-commit hooks.
vyasr Jan 14, 2021
234b879
Fix root directory mocking.
bdice Jan 16, 2021
a89137d
Improve progress iterators.
bdice Jan 16, 2021
3fc8397
Merge branch 'next' into feature/new-fetch-status
bdice Jan 16, 2021
5288a25
Use chunked job label fetching.
bdice Jan 16, 2021
c2086a4
Refactor parallel executor to accept one iterable and use Pool for pr…
bdice Jan 16, 2021
f695fa6
Use integers in the cached status update.
bdice Jan 16, 2021
95dc985
Fix mocking of system executable. Resolves #413.
bdice Jan 16, 2021
4e62a3d
Update changelog.
bdice Jan 16, 2021
bf58677
Mock environment directly rather than using **kwargs for compatibilit…
bdice Jan 16, 2021
0256b57
Merge remote-tracking branch 'origin/next' into feature/new-fetch-status
bdice Jan 18, 2021
d13e1e7
Buffer during project status update.
bdice Jan 18, 2021
5774fc5
Use ordered results.
bdice Jan 18, 2021
581a591
Don't buffer during status update (no performance difference).
bdice Jan 18, 2021
957c9ce
Refactor job labels so that the list of individual jobs is generated …
bdice Jan 18, 2021
eb4d693
Update flow/util/misc.py
bdice Jan 18, 2021
86824fb
Fix function parameters in test, use kwargs for _fetch_status.
bdice Jan 19, 2021
f9ba946
Merge branch 'feature/new-fetch-status' of github.com:glotzerlab/sign…
bdice Jan 19, 2021
9de6be8
Use process_map.
bdice Jan 19, 2021
d673995
Use MOCK_EXECUTABLE.
bdice Jan 19, 2021
85b25a7
Add comments explaining use of FakeScheduler.
bdice Jan 19, 2021
d73a6b9
Collect errors that occur during status evaluation.
bdice Jan 19, 2021
a66cb77
Mock the id generation method instead of injecting mocked attributes.
bdice Jan 19, 2021
1 change: 1 addition & 0 deletions changelog.txt
@@ -28,6 +28,7 @@ Changed
- Default environment for the University of Minnesota Mangi cluster changed from SLURM to Torque (#393).
- Improved internal caching of scheduler status (#410).
- Deprecated method ``export_job_statuses`` (#402).
- Refactored status fetching code (#368, #417).

Fixed
+++++
733 changes: 314 additions & 419 deletions flow/project.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion flow/templates/base_status.jinja
@@ -45,7 +45,7 @@
{% if parameters %}
{% set para_output = ns.field_parameters | format(*job['parameters'].values()) %}
{% endif %}
{% for key, value in job['operations'].items() if value | job_filter(scheduler_status_code, all_ops) %}
{% for key, value in job['groups'].items() if value | job_filter(scheduler_status_code, all_ops) %}
{% if loop.first %}
| {{job['job_id']}} | {{ field_operation | highlight(value['eligible'], pretty) | format(key, '['+scheduler_status_code[value['scheduler_status']]+']') }} | {{ para_output }}{{ job['labels'] | join(', ') }} |
{% else %}
80 changes: 80 additions & 0 deletions flow/util/misc.py
@@ -4,12 +4,18 @@
"""Miscellaneous utility functions."""
import argparse
import logging
import multiprocessing
import os
from collections.abc import MutableMapping
from contextlib import contextmanager
from functools import lru_cache, partial
from itertools import cycle, islice

import cloudpickle
from tqdm.auto import tqdm
from tqdm.contrib import tmap
from tqdm.contrib.concurrent import thread_map


def _positive_int(value):
"""Parse a command line argument as a positive integer.
@@ -303,3 +309,77 @@ def __iter__(self):

def __len__(self):
return len(self._data)


def _run_cloudpickled_func(func, *args):
"""Execute a cloudpickled function.

The set of functions that can be pickled by the built-in pickle module is
very limited, which rules out useful cases such as
locally-defined functions or functions that internally call class methods.
This function circumvents that difficulty by allowing the user to pickle
the function object a priori and bind it as the first argument to a partial
application of this function. All subsequent arguments are transparently
passed through.
"""
unpickled_func = cloudpickle.loads(func)
args = list(map(cloudpickle.loads, args))
return unpickled_func(*args)
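
As a minimal usage sketch (illustrative only, not part of this diff): the caller cloudpickles the function and its arguments, binds the pickled function as the first argument with functools.partial, and _run_cloudpickled_func reconstructs both before calling. The _local_square helper below is hypothetical.

import cloudpickle
from functools import partial

def _local_square(x):
    # cloudpickle serializes the function by value, so even lambdas or
    # functions defined inside another function would work here.
    return x * x

# Bind the pickled function, then pass pickled arguments through.
runner = partial(_run_cloudpickled_func, cloudpickle.dumps(_local_square))
print(runner(cloudpickle.dumps(3)))  # -> 9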


def _get_parallel_executor(parallelization="thread"):
"""Get an executor for the desired parallelization strategy.

This executor shows a progress bar while executing a function over an
iterable in parallel. The returned callable has signature ``func,
iterable, **kwargs``. The iterable must have a length (generators are not
supported). The keyword argument ``chunksize`` is used for chunking the
iterable in supported parallelization modes. All other ``**kwargs`` are
passed to the tqdm progress bar.

Parameters
----------
parallelization : str
Parallelization mode. Allowed values are "thread", "process", or
"none". (Default value = "thread")

Returns
-------
callable
A callable with signature ``func, iterable, **kwargs``.

"""
if parallelization == "thread":
parallel_executor = thread_map
elif parallelization == "process":

def parallel_executor(func, iterable, chunksize=1, **kwargs):
# tqdm progress bar requires a total
if "total" not in kwargs:
kwargs["total"] = len(iterable)

with multiprocessing.Pool() as pool:
return list(
tqdm(
pool.imap_unordered(
# Creating a partial here allows us to use the
# local function unpickled_func. The top-level
# function called on each process cannot be a local
# function, it must be a module-level function.
partial(_run_cloudpickled_func, cloudpickle.dumps(func)),
map(cloudpickle.dumps, iterable),
chunksize=chunksize,
),
**kwargs,
)
)

else:

def parallel_executor(func, iterable, **kwargs):
if "chunksize" in kwargs:
# Chunk size only applies to thread/process parallel executors
del kwargs["chunksize"]
return list(tmap(func, iterable, **kwargs))

return parallel_executor
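
A rough usage sketch of the returned executor (hypothetical, not part of this diff): the callable takes a function and a sized iterable, chunksize is used for chunking in the supported modes (the process mode above), and remaining keyword arguments such as desc are forwarded to the tqdm progress bar. The process mode uses imap_unordered, so result order is not guaranteed.

from flow.util.misc import _get_parallel_executor  # assumed import path

def _double(x):
    return 2 * x

executor = _get_parallel_executor("process")
# chunksize controls work batching; desc labels the tqdm progress bar.
results = executor(_double, list(range(100)), chunksize=10, desc="doubling")
print(sorted(results))  # sorted because imap_unordered may reorder results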
2 changes: 1 addition & 1 deletion tests/define_test_project.py
@@ -79,7 +79,7 @@ class _DynamicTestProject(_TestProject):
@_DynamicTestProject.operation
@group3
@_DynamicTestProject.pre.after(op1)
@_DynamicTestProject.post.true("dynamic")
@_DynamicTestProject.post(lambda job: job.sp.get("dynamic", False))
def op4(job):
job.sp.dynamic = True # migration during execution

137 changes: 76 additions & 61 deletions tests/generate_template_reference_data.py
@@ -3,6 +3,7 @@
# All rights reserved.
# This software is licensed under the BSD 3-Clause License.
import argparse
import contextlib
import io
import itertools
import operator
@@ -17,6 +18,7 @@

import flow
import flow.environments
from flow.scheduling.fakescheduler import FakeScheduler

# Define a consistent submission name so that we can test that job names are
# being correctly generated.
@@ -143,16 +145,24 @@ def _store_bundled(self, operations):
return bid


def get_masked_flowproject(p, **kwargs):
@contextlib.contextmanager
def get_masked_flowproject(p, environment=None):
"""Mock environment-dependent attributes and functions. Need to mock
sys.executable before the FlowProject is instantiated, and then modify the
root_directory and project_dir elements after creation."""
sys.executable = "/usr/local/bin/python"
fp = TestProject.get_project(root=p.root_directory(), **kwargs)
fp._entrypoint.setdefault("path", "generate_template_reference_data.py")
fp.root_directory = lambda: PROJECT_DIRECTORY
fp.config.project_dir = PROJECT_DIRECTORY
return fp
try:
old_executable = sys.executable
sys.executable = "/usr/local/bin/python"
fp = TestProject.get_project(root=p.root_directory())
if environment is not None:
fp._environment = environment
fp._entrypoint.setdefault("path", "generate_template_reference_data.py")
fp._mock = True
fp._mock_root_directory = lambda: PROJECT_DIRECTORY
fp.config.project_dir = PROJECT_DIRECTORY
Contributor:

These appear superfluous to me.

Suggested change:

-    fp._mock = True
-    fp._mock_root_directory = lambda: PROJECT_DIRECTORY
-    fp.config.project_dir = PROJECT_DIRECTORY
+    fp.config.project_dir = PROJECT_DIRECTORY

Contributor:

Never mind, I see where these are used now. I would rather set fp.root_directory = lambda: PROJECT_DIRECTORY here than include code in the main code base that exists just to facilitate this test. Is there a reason that wouldn't work?

Member Author (@bdice, Jan 19, 2021):

Yes, that won't work because we have to mock the root directory used for operation id generation separately from the real root directory that contains the jobs. We need consistently generated ids (using the mocked value), but we still need the root directory to be correct (pointing to some temporary directory during tests) or else we break signac (maybe instantiating jobs fails? Or the config is inaccessible? Or it can't pickle/unpickle? Something like that, but I forgot the exact reason).

Contributor:

Got it, you're right that this won't work in that case. How about the following context manager instead:

@contextmanager
def wrap_id_generation(fp):
    """Create a context where id generation will use a mocked root directory."""
    orig_generate_id = fp._generate_id
    orig_root_directory = fp.root_directory
    mocked_root_directory = lambda: PROJECT_DIRECTORY

    def wrapped_generate_id(aggregate, operation_name=None):
        """Mock the root directory used for id generation to ensure consistent ids."""
        # This mocking has to happen within this method to avoid affecting
        # other methods called during the test.
        fp.root_directory = mocked_root_directory
        try:
            return orig_generate_id(aggregate, operation_name)
        finally:
            fp.root_directory = orig_root_directory

    fp._generate_id = wrapped_generate_id
    yield
    fp._generate_id = orig_generate_id

We don't execute anything in parallel here, so I think this should be safe. It's a few more lines of code than the current solution, but I would really like to avoid injecting this code into flow/project.py if at all possible.

Member Author (@bdice):

You're right, this would be much better. I'll adapt this.

yield fp
finally:
sys.executable = old_executable


def main(args):
@@ -170,79 +180,84 @@

with signac.TemporaryProject(name=PROJECT_NAME) as p:
init(p)
fp = get_masked_flowproject(p)
# Here we set the appropriate executable for all the operations. This
# is necessary as otherwise the default executable between submitting
# and running could look different depending on the environment.
executable = "/usr/local/bin/python"
for group in fp.groups.values():
for op_key in group.operations:
if op_key in group.operation_directives:
group.operation_directives[op_key]["executable"] = executable
for job in fp:
with job:
kwargs = job.statepoint()
env = get_nested_attr(flow, kwargs["environment"])
fp._environment = env
parameters = kwargs["parameters"]
if "bundle" in parameters:
bundle = parameters.pop("bundle")
fn = "script_{}.sh".format("_".join(bundle))
tmp_out = io.TextIOWrapper(io.BytesIO(), sys.stdout.encoding)
with redirect_stdout(tmp_out):
try:
fp.submit(
jobs=[job],
names=bundle,
pretend=True,
force=True,
bundle_size=len(bundle),
**parameters,
)
except jinja2.TemplateError as e:
print("ERROR:", e) # Shows template error in output script

# Filter out non-header lines
tmp_out.seek(0)
with open(fn, "w") as f:
with redirect_stdout(f):
print(tmp_out.read(), end="")
else:
for op in {**fp.operations, **fp.groups}:
if "partition" in parameters:
# Don't try to submit GPU operations to CPU partitions
# and vice versa. We should be able to relax this
# requirement if we make our error checking more
# consistent.
if operator.xor(
"gpu" in parameters["partition"].lower(),
"gpu" in op.lower(),
):
continue
fn = f"script_{op}.sh"
with get_masked_flowproject(p) as fp:
# Here we set the appropriate executable for all the operations. This
# is necessary as otherwise the default executable between submitting
# and running could look different depending on the environment.
executable = "/usr/local/bin/python"
for group in fp.groups.values():
for op_key in group.operations:
if op_key in group.operation_directives:
group.operation_directives[op_key]["executable"] = executable
for job in fp:
with job:
kwargs = job.statepoint()
env = get_nested_attr(flow, kwargs["environment"])
env.scheduler_type = FakeScheduler
fp._environment = env
parameters = kwargs["parameters"]
if "bundle" in parameters:
bundle = parameters.pop("bundle")
fn = "script_{}.sh".format("_".join(bundle))
tmp_out = io.TextIOWrapper(io.BytesIO(), sys.stdout.encoding)
with redirect_stdout(tmp_out):
try:
fp.submit(
jobs=[job],
names=[op],
names=bundle,
pretend=True,
force=True,
bundle_size=len(bundle),
**parameters,
)
except jinja2.TemplateError as e:
print(
"ERROR:", e
) # Shows template error in output script

# Filter out non-header lines and the job-name line
# Filter out non-header lines
tmp_out.seek(0)
with open(fn, "w") as f:
with redirect_stdout(f):
print(tmp_out.read(), end="")
else:
for op in {**fp.operations, **fp.groups}:
if "partition" in parameters:
# Don't try to submit GPU operations to CPU partitions
# and vice versa. We should be able to relax this
# requirement if we make our error checking more
# consistent.
if operator.xor(
"gpu" in parameters["partition"].lower(),
"gpu" in op.lower(),
):
continue
fn = f"script_{op}.sh"
tmp_out = io.TextIOWrapper(
io.BytesIO(), sys.stdout.encoding
)
with redirect_stdout(tmp_out):
try:
fp.submit(
jobs=[job],
names=[op],
pretend=True,
force=True,
**parameters,
)
except jinja2.TemplateError as e:
print(
"ERROR:", e
) # Shows template error in output script

# Filter out non-header lines and the job-name line
tmp_out.seek(0)
with open(fn, "w") as f:
with redirect_stdout(f):
print(tmp_out.read(), end="")

# For compactness, we move the output into an ARCHIVE_DIR then delete the original data.
fp.export_to(target=ARCHIVE_DIR)
# For compactness, we move the output into an ARCHIVE_DIR then delete the original data.
fp.export_to(target=ARCHIVE_DIR)


if __name__ == "__main__":
Binary file modified tests/template_reference_data.tar.gz
Binary file not shown.