New _fetch_status code. #417

Merged
merged 57 commits into next from feature/new-fetch-status on Jan 19, 2021
Commits
081f367
Initial rewrite of _fetch_status.
bdice Jan 6, 2021
e048af6
Update _fetch_scheduler_status to require aggregates.
bdice Jan 6, 2021
4e32661
Clarify docstring for status_parallelization.
bdice Jan 6, 2021
7855249
Experiment with callback approach (only iterate over aggregates/group…
bdice Jan 6, 2021
34b6cef
Skip evaluation of post-conditions during eligibility if known to be …
bdice Jan 6, 2021
6138326
Change operations to groups.
bdice Jan 6, 2021
de73489
Use context manager for updating the cached status in bulk.
bdice Jan 7, 2021
e06b74f
Separate scheduler query from cached status update.
bdice Jan 7, 2021
1fccff8
Fix docstring.
bdice Jan 7, 2021
b88f62e
Use context manager for _fetch_scheduler_status cache update.
bdice Jan 7, 2021
ef2d168
Use cached status context manager for _fetch_status.
bdice Jan 7, 2021
28916a1
Refactor status updates so the scheduler query happens on-the-fly and…
bdice Jan 8, 2021
1b2ab67
Intermediate commit with notes.
bdice Jan 8, 2021
09a8b0b
Simplify eligibility check.
bdice Jan 12, 2021
03fbcf5
Update docstring.
bdice Jan 12, 2021
bd6a6a3
Update tests to account for removed method.
bdice Jan 12, 2021
cc52517
Rename cid to cluster_id.
bdice Jan 12, 2021
ff63832
Define parallel executor.
bdice Jan 12, 2021
36ef3f9
Only update status if the update dictionary has data.
bdice Jan 12, 2021
3f5e04d
Add parallel execution.
bdice Jan 12, 2021
fd3c273
Remove status callback.
bdice Jan 12, 2021
af03674
Clean up status fetching.
bdice Jan 12, 2021
8b9b53c
Remove unnecessary internal methods.
bdice Jan 12, 2021
eafa6cc
Fix test (scheduler is queried internally).
bdice Jan 12, 2021
c196dfe
Rename jobs -> aggregate.
bdice Jan 13, 2021
80da4d2
Fix bug in tests (the dynamically altered jobs should NOT be resubmit…
bdice Jan 13, 2021
e5cf670
Show aggregate id in str of _JobOperation.
bdice Jan 13, 2021
0f287d8
Fix script output.
bdice Jan 13, 2021
82dba08
Remove keyword argument.
bdice Jan 13, 2021
fa6dad6
Reset MockScheduler in setUp for all tests.
bdice Jan 13, 2021
d367611
Mock root directory (can't override Project.root_directory() because …
bdice Jan 13, 2021
e268d43
Update template reference data.
bdice Jan 13, 2021
aa8b327
Refactor mocked root.
bdice Jan 13, 2021
65fd807
Pickle function to be executed in parallel using cloudpickle so that …
vyasr Jan 14, 2021
5a15f5b
Fix pre-commit hooks.
vyasr Jan 14, 2021
234b879
Fix root directory mocking.
bdice Jan 16, 2021
a89137d
Improve progress iterators.
bdice Jan 16, 2021
3fc8397
Merge branch 'next' into feature/new-fetch-status
bdice Jan 16, 2021
5288a25
Use chunked job label fetching.
bdice Jan 16, 2021
c2086a4
Refactor parallel executor to accept one iterable and use Pool for pr…
bdice Jan 16, 2021
f695fa6
Use integers in the cached status update.
bdice Jan 16, 2021
95dc985
Fix mocking of system executable. Resolves #413.
bdice Jan 16, 2021
4e62a3d
Update changelog.
bdice Jan 16, 2021
bf58677
Mock environment directly rather than using **kwargs for compatibilit…
bdice Jan 16, 2021
0256b57
Merge remote-tracking branch 'origin/next' into feature/new-fetch-status
bdice Jan 18, 2021
d13e1e7
Buffer during project status update.
bdice Jan 18, 2021
5774fc5
Use ordered results.
bdice Jan 18, 2021
581a591
Don't buffer during status update (no performance difference).
bdice Jan 18, 2021
957c9ce
Refactor job labels so that the list of individual jobs is generated …
bdice Jan 18, 2021
eb4d693
Update flow/util/misc.py
bdice Jan 18, 2021
86824fb
Fix function parameters in test, use kwargs for _fetch_status.
bdice Jan 19, 2021
f9ba946
Merge branch 'feature/new-fetch-status' of github.com:glotzerlab/sign…
bdice Jan 19, 2021
9de6be8
Use process_map.
bdice Jan 19, 2021
d673995
Use MOCK_EXECUTABLE.
bdice Jan 19, 2021
85b25a7
Add comments explaining use of FakeScheduler.
bdice Jan 19, 2021
d73a6b9
Collect errors that occur during status evaluation.
bdice Jan 19, 2021
a66cb77
Mock the id generation method instead of injecting mocked attributes.
bdice Jan 19, 2021
1 change: 1 addition & 0 deletions changelog.txt
@@ -28,6 +28,7 @@ Changed
- Default environment for the University of Minnesota Mangi cluster changed from SLURM to Torque (#393).
- Deprecated method ``export_job_statuses`` (#402).
- Improved internal caching of scheduler status (#410).
- Refactored status fetching code (#368, #417).
- The ``use_buffered_mode`` config option is deprecated. Buffering is always internally enabled (#425).

Fixed
773 changes: 336 additions & 437 deletions flow/project.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion flow/templates/base_status.jinja
@@ -45,7 +45,7 @@
{% if parameters %}
{% set para_output = ns.field_parameters | format(*job['parameters'].values()) %}
{% endif %}
-{% for key, value in job['operations'].items() if value | job_filter(scheduler_status_code, all_ops) %}
+{% for key, value in job['groups'].items() if value | job_filter(scheduler_status_code, all_ops) %}
{% if loop.first %}
| {{job['job_id']}} | {{ field_operation | highlight(value['eligible'], pretty) | format(key, '['+scheduler_status_code[value['scheduler_status']]+']') }} | {{ para_output }}{{ job['labels'] | join(', ') }} |
{% else %}
75 changes: 75 additions & 0 deletions flow/util/misc.py
@@ -10,6 +10,10 @@
from functools import lru_cache, partial
from itertools import cycle, islice

import cloudpickle
from tqdm.contrib import tmap
from tqdm.contrib.concurrent import process_map, thread_map


def _positive_int(value):
"""Parse a command line argument as a positive integer.
@@ -303,3 +307,74 @@ def __iter__(self):

def __len__(self):
return len(self._data)


def _run_cloudpickled_func(func, *args):
"""Execute a cloudpickled function.

The set of functions that can be pickled by the built-in pickle module is
very limited, which rules out many useful cases such as locally defined
functions or functions that internally call class methods.
This function circumvents that difficulty by allowing the user to pickle
the function object a priori and bind it as the first argument to a partial
application of this function. All subsequent arguments are transparently
passed through.
"""
unpickled_func = cloudpickle.loads(func)
args = list(map(cloudpickle.loads, args))
return unpickled_func(*args)


def _get_parallel_executor(parallelization="thread"):
"""Get an executor for the desired parallelization strategy.

This executor shows a progress bar while executing a function over an
iterable in parallel. The returned callable has signature ``func,
iterable, **kwargs``. The iterable must have a length (generators are not
supported). The keyword argument ``chunksize`` is used for chunking the
iterable in supported parallelization modes
(see :meth:`concurrent.futures.Executor.map`). All other ``**kwargs`` are
passed to the tqdm progress bar.

Parameters
----------
parallelization : str
Parallelization mode. Allowed values are "thread", "process", or
"none". (Default value = "thread")

Returns
-------
callable
A callable with signature ``func, iterable, **kwargs``.

"""
if parallelization == "thread":
parallel_executor = thread_map
elif parallelization == "process":

def parallel_executor(func, iterable, **kwargs):
# The tqdm progress bar requires a total. We compute the total in
# advance because a map iterable (which has no total) is passed to
# process_map.
if "total" not in kwargs:
kwargs["total"] = len(iterable)

return process_map(
# The top-level function called on each process cannot be a
# local function, it must be a module-level function. Creating
# a partial here allows us to use the passed function "func"
# regardless of whether it is a local function.
partial(_run_cloudpickled_func, cloudpickle.dumps(func)),
map(cloudpickle.dumps, iterable),
**kwargs,
)

else:

def parallel_executor(func, iterable, **kwargs):
if "chunksize" in kwargs:
# Chunk size only applies to thread/process parallel executors
del kwargs["chunksize"]
return list(tmap(func, iterable, **kwargs))

return parallel_executor
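
A minimal usage sketch of the new helper (not part of the diff; the _square function and its inputs are illustrative only):

    from flow.util.misc import _get_parallel_executor

    def _square(x):
        # Toy function mapped over the iterable.
        return x * x

    # "thread" returns tqdm's thread_map; "process" round-trips the function
    # through cloudpickle so that even locally defined callables can be sent
    # to worker processes; "none" falls back to a serial tmap.
    executor = _get_parallel_executor(parallelization="thread")
    results = executor(_square, list(range(100)), desc="status")
    # results is an ordered list: [0, 1, 4, 9, ...]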
2 changes: 1 addition & 1 deletion tests/define_test_project.py
@@ -79,7 +79,7 @@ class _DynamicTestProject(_TestProject):
@_DynamicTestProject.operation
@group3
@_DynamicTestProject.pre.after(op1)
@_DynamicTestProject.post.true("dynamic")
@_DynamicTestProject.post(lambda job: job.sp.get("dynamic", False))
def op4(job):
job.sp.dynamic = True # migration during execution

163 changes: 102 additions & 61 deletions tests/generate_template_reference_data.py
@@ -3,6 +3,7 @@
# All rights reserved.
# This software is licensed under the BSD 3-Clause License.
import argparse
import contextlib
import io
import itertools
import operator
@@ -17,6 +18,7 @@

import flow
import flow.environments
from flow.scheduling.fakescheduler import FakeScheduler

# Define a consistent submission name so that we can test that job names are
# being correctly generated.
@@ -25,6 +27,7 @@
os.path.join(os.path.dirname(__file__), "./template_reference_data.tar.gz")
)
PROJECT_DIRECTORY = "/home/user/project/"
MOCK_EXECUTABLE = "/usr/local/bin/python"


def cartesian(**kwargs):
@@ -143,16 +146,40 @@ def _store_bundled(self, operations):
return bid


def get_masked_flowproject(p, **kwargs):
@contextlib.contextmanager
def get_masked_flowproject(p, environment=None):
"""Mock environment-dependent attributes and functions. Need to mock
sys.executable before the FlowProject is instantiated, and then modify the
root_directory and project_dir elements after creation."""
sys.executable = "/usr/local/bin/python"
fp = TestProject.get_project(root=p.root_directory(), **kwargs)
fp._entrypoint.setdefault("path", "generate_template_reference_data.py")
fp.root_directory = lambda: PROJECT_DIRECTORY
fp.config.project_dir = PROJECT_DIRECTORY
return fp
try:
old_executable = sys.executable
sys.executable = MOCK_EXECUTABLE
fp = TestProject.get_project(root=p.root_directory())
if environment is not None:
fp._environment = environment
fp._entrypoint.setdefault("path", "generate_template_reference_data.py")
fp.config.project_dir = PROJECT_DIRECTORY
old_generate_id = flow.project.FlowGroup._generate_id

def wrapped_generate_id(self, aggregate, *args, **kwargs):
"""Mock the root directory used for id generation.

We need to generate consistent ids for all operations. This
mocking has to happen within this method to avoid affecting other
methods called during the test that access the project root directory.
"""
old_root_directory = fp.root_directory
fp.root_directory = lambda: PROJECT_DIRECTORY
operation_id = old_generate_id(self, aggregate, *args, **kwargs)
fp.root_directory = old_root_directory
return operation_id

flow.project.FlowGroup._generate_id = wrapped_generate_id
yield fp

finally:
sys.executable = old_executable
flow.project.FlowGroup._generate_id = old_generate_id


def main(args):
@@ -170,79 +197,93 @@ def main(args):

with signac.TemporaryProject(name=PROJECT_NAME) as p:
init(p)
fp = get_masked_flowproject(p)
# Here we set the appropriate executable for all the operations. This
# is necessary as otherwise the default executable between submitting
# and running could look different depending on the environment.
executable = "/usr/local/bin/python"
for group in fp.groups.values():
for op_key in group.operations:
if op_key in group.operation_directives:
group.operation_directives[op_key]["executable"] = executable
for job in fp:
with job:
kwargs = job.statepoint()
env = get_nested_attr(flow, kwargs["environment"])
fp._environment = env
parameters = kwargs["parameters"]
if "bundle" in parameters:
bundle = parameters.pop("bundle")
fn = "script_{}.sh".format("_".join(bundle))
tmp_out = io.TextIOWrapper(io.BytesIO(), sys.stdout.encoding)
with redirect_stdout(tmp_out):
try:
fp.submit(
jobs=[job],
names=bundle,
pretend=True,
force=True,
bundle_size=len(bundle),
**parameters,
)
except jinja2.TemplateError as e:
print("ERROR:", e) # Shows template error in output script

# Filter out non-header lines
tmp_out.seek(0)
with open(fn, "w") as f:
with redirect_stdout(f):
print(tmp_out.read(), end="")
else:
for op in {**fp.operations, **fp.groups}:
if "partition" in parameters:
# Don't try to submit GPU operations to CPU partitions
# and vice versa. We should be able to relax this
# requirement if we make our error checking more
# consistent.
if operator.xor(
"gpu" in parameters["partition"].lower(),
"gpu" in op.lower(),
):
continue
fn = f"script_{op}.sh"
with get_masked_flowproject(p) as fp:
# Here we set the appropriate executable for all the operations. This
# is necessary as otherwise the default executable between submitting
# and running could look different depending on the environment.
for group in fp.groups.values():
for op_key in group.operations:
if op_key in group.operation_directives:
group.operation_directives[op_key][
"executable"
] = MOCK_EXECUTABLE
for job in fp:
with job:
kwargs = job.statepoint()
env = get_nested_attr(flow, kwargs["environment"])
# We need to set the scheduler manually. The FakeScheduler
# is used for two reasons. First, the FakeScheduler prints
# scripts to screen on submission and we can capture that
# output. Second, the FakeScheduler won't try to call any
# cluster executable (e.g. squeue) associated with the real
# schedulers used on supported clusters. Otherwise
# submission would fail when attempting to determine what
# jobs already exist on the scheduler.
env.scheduler_type = FakeScheduler
fp._environment = env
parameters = kwargs["parameters"]
if "bundle" in parameters:
bundle = parameters.pop("bundle")
fn = "script_{}.sh".format("_".join(bundle))
tmp_out = io.TextIOWrapper(io.BytesIO(), sys.stdout.encoding)
with redirect_stdout(tmp_out):
try:
fp.submit(
jobs=[job],
names=[op],
names=bundle,
pretend=True,
force=True,
bundle_size=len(bundle),
**parameters,
)
except jinja2.TemplateError as e:
print(
"ERROR:", e
) # Shows template error in output script

# Filter out non-header lines and the job-name line
# Filter out non-header lines
tmp_out.seek(0)
with open(fn, "w") as f:
with redirect_stdout(f):
print(tmp_out.read(), end="")
else:
for op in {**fp.operations, **fp.groups}:
if "partition" in parameters:
# Don't try to submit GPU operations to CPU partitions
# and vice versa. We should be able to relax this
# requirement if we make our error checking more
# consistent.
if operator.xor(
"gpu" in parameters["partition"].lower(),
"gpu" in op.lower(),
):
continue
fn = f"script_{op}.sh"
tmp_out = io.TextIOWrapper(
io.BytesIO(), sys.stdout.encoding
)
with redirect_stdout(tmp_out):
try:
fp.submit(
jobs=[job],
names=[op],
pretend=True,
force=True,
**parameters,
)
except jinja2.TemplateError as e:
print(
"ERROR:", e
) # Shows template error in output script

# Filter out non-header lines and the job-name line
tmp_out.seek(0)
with open(fn, "w") as f:
with redirect_stdout(f):
print(tmp_out.read(), end="")

# For compactness, we move the output into an ARCHIVE_DIR then delete the original data.
fp.export_to(target=ARCHIVE_DIR)
# For compactness, we move the output into an ARCHIVE_DIR then delete the original data.
fp.export_to(target=ARCHIVE_DIR)


if __name__ == "__main__":
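
The mocking in get_masked_flowproject follows a patch-and-restore pattern: each global attribute is swapped out and then restored in a finally block so the test project cannot leak mocked state. A stripped-down sketch of the same idea, with illustrative names:

    import contextlib

    @contextlib.contextmanager
    def _patched_attr(obj, name, replacement):
        # Temporarily replace obj.<name>, restoring the original value even
        # if the body of the with-block raises.
        original = getattr(obj, name)
        setattr(obj, name, replacement)
        try:
            yield obj
        finally:
            setattr(obj, name, original)

    # For example, _patched_attr(sys, "executable", MOCK_EXECUTABLE) would
    # mock the interpreter path only while reference data is generated.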
Binary file modified tests/template_reference_data.tar.gz