
Unify group and aggregate storage with a bi-map data structure. #415

Merged
merged 13 commits into next on Jan 6, 2021

Conversation

Member

@bdice bdice commented Jan 3, 2021

Description

Introduces a bidirectional map data structure that unifies the group/aggregate registration and storage mechanisms in the FlowProject.

Motivation and Context

Using a bidict data structure reduces redundancy. It eliminates superfluous private methods and makes all accesses O(1) whether by group or by aggregator.

This is an intermediate step for unifying and optimizing the status fetching logic with run/submit logic.
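As a rough sketch of the dual O(1) access this enables (the names below are illustrative only, not the actual attributes introduced in this PR): the forward map answers "which aggregate store does this group use?" and the inverse map answers "which groups share this aggregate store?".

# Hypothetical illustration of the two lookup directions a bidirectional map provides.
group_to_store = {"group1": "default_store", "group2": "default_store"}
store_to_groups = {}
for group, store in group_to_store.items():
    # A plain dict plus a manually maintained inverse; the PR wraps this pattern in a class.
    store_to_groups.setdefault(store, []).append(group)

assert group_to_store["group1"] == "default_store"  # O(1) lookup by group
assert store_to_groups["default_store"] == ["group1", "group2"]  # O(1) lookup by store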

Types of Changes

  • Documentation update
  • Bug fix
  • New feature
  • Breaking change¹

¹ The change breaks (or has the potential to break) existing functionality.

Checklist:

If necessary:

  • I have updated the API documentation as part of the package doc-strings.
  • I have created a separate pull request to update the framework documentation on signac-docs and linked it here.
  • I have updated the changelog.

@bdice bdice requested review from a team as code owners January 3, 2021 07:06
@bdice bdice requested review from atravitz and Charlottez112 and removed request for a team January 3, 2021 07:06
Collaborator

@atravitz atravitz left a comment

This seems much more intuitive and efficient! I just have a few things I'd like to understand.

@@ -2082,7 +2079,7 @@ def _get_operations_status(self, jobs, cached_status):

         groups = [self._groups[name] for name in self.operations]
         for group in groups:
-            if get_aggregate_id(jobs) in self._get_aggregate_store(group.name):
+            if get_aggregate_id(jobs) in self._group_to_aggregator[group]:
Collaborator

This and line 2186 are where we get the performance boost, yes?

Member Author

@bdice bdice Jan 5, 2021

That is a small speedup, yes. The bigger performance boost comes from replacing code like:

for group in groups:
    for aggregate in self._get_aggregate_store(group.name).values():
        ...

with selection logic that iterates over aggregates first, then groups. This is partially implemented in this PR and more thoroughly implemented in #416.
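As a toy illustration of that swapped iteration order (plain dicts with made-up names, not flow's actual API): each aggregate store is shared by several groups, so iterating store -> aggregate -> group walks each store's aggregates once instead of once per group.

# Illustrative only: walk each aggregate store a single time.
store_to_groups = {
    "default_store": ["group1", "group2"],
    "custom_store": ["group3"],
}
store_contents = {
    "default_store": {"agg-1": ("job1",), "agg-2": ("job2",)},
    "custom_store": {"agg-3": ("job1", "job2")},
}
for store, groups in store_to_groups.items():
    for aggregate_id, aggregate in store_contents[store].items():
        for group in groups:
            # Selection/eligibility checks would go here.
            print(store, aggregate_id, group)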

Member Author

bdice commented Jan 4, 2021

There are some template tests that are failing due to (nondeterministic?) re-ordering of operations that run in parallel. From what I can tell, we don't enforce a sorting order for the output script when operations are launched in parallel. I think this choice of ordering is arbitrary (it shouldn't matter for execution) but we may want to enforce a sorting order (e.g. sorted by the _JobOperation id) to prevent template test failures.

@kidrahahjo
Collaborator

@bdice this is fantastic. I loved the way it is implemented.

One question: the reason we stored values the way we did before was to avoid potential memory issues. Now we're storing both the mapping from each group to its aggregate store ({group1: default_store, group2: default_store}) and the mapping from each aggregate store to all the groups it's associated with ({default_store: [group1, group2]}).

Wouldn't this be a problem?

@kidrahahjo
Collaborator

> There are some template tests that are failing due to (nondeterministic?) re-ordering of operations that run in parallel. From what I can tell, we don't enforce a sorting order for the output script when operations are launched in parallel. I think this choice of ordering is arbitrary (it shouldn't matter for execution) but we may want to enforce a sorting order (e.g. sorted by the _JobOperation id) to prevent template test failures.

I agree with @bdice. While analysing the test failure output, I saw that when bundling, the order of operations is not the same as intended. A quick fix would be something like this:

for bundle in _make_bundles(operations, bundle_size):
    bundle = sorted(bundle, key=lambda op: op._id)

I am not sure whether we should go in this direction, but it's a small start.

Member

@b-butler b-butler left a comment

I like the use of the bidirectional mapping.


def __delitem__(self, key):
    """Delete the provided key."""
    self.inverse.setdefault(self[key], []).remove(key)
Member

Do we need setdefault here? If setdefault were to create a new empty list at this point, the subsequent remove would just raise a ValueError anyway. self.inverse[self[key]] should already exist, or the class would be in an invalid state.

Member Author

@bdice bdice Jan 5, 2021

This might be true. I got this implementation from a StackOverflow post and cleaned up a few pieces of it, but I don't have any tests for the class. Your reasoning seems sound, and making that change worked in my limited local testing. I also looked at the comments on the StackOverflow post, but nobody mentioned that piece of the code. Fixed in adf5370.

Contributor

@vyasr vyasr Jan 5, 2021

I agree with Brandon's analysis. setdefault is required in __setitem__ and in the constructor (to account for duplicate keys).
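For context, here is a minimal sketch of the dict-subclass bidirectional map pattern under discussion (adapted from the common recipe; not necessarily the exact code in this PR):

class _BiDictSketch(dict):
    """Dict subclass that also maintains an inverse map from values to lists of keys."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.inverse = {}
        for key, value in self.items():
            # setdefault is needed here: multiple keys may share one value.
            self.inverse.setdefault(value, []).append(key)

    def __setitem__(self, key, value):
        if key in self:
            self.inverse[self[key]].remove(key)
        super().__setitem__(key, value)
        # setdefault is needed here: the value may not have been seen before.
        self.inverse.setdefault(value, []).append(key)

    def __delitem__(self, key):
        # No setdefault needed: in a valid state, self.inverse[self[key]] must exist.
        self.inverse[self[key]].remove(key)
        if not self.inverse[self[key]]:
            del self.inverse[self[key]]
        super().__delitem__(key)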

@bdice bdice requested review from vyasr, atravitz and b-butler January 6, 2021 01:19
Contributor

vyasr commented Jan 6, 2021

I'm fine with the contents of this PR and with how my comment was resolved, but @atravitz, @b-butler, and @kidrahahjo have given this a more thorough review than I have, so I'm going to remove myself as a reviewer and defer to their evaluations rather than approving.

@vyasr vyasr removed their request for review January 6, 2021 01:31
Member Author

bdice commented Jan 6, 2021

I fixed the template tests in #416. Reviewers, please ignore the failing tests on this PR.

Collaborator

@kidrahahjo kidrahahjo left a comment

I have some minor suggestions/questions; apart from those, the PR is looking good.

        # Skip iteration over aggregate store if no groups match the
        # specified names
        continue
    for aggregate in aggregate_store.values():
Collaborator

Suggested change:

-for aggregate in aggregate_store.values():
+for aggregate in aggregate_store.values():
+    if aggregate not in aggregates:
+        continue

This is a really small optimization which makes a difference when only a few aggregates/jobs are provided by the user.

Given this, we can now remove the exact same check in the select method above.

Member Author

I will look at this in #416; I refactored this code there and may have already made the change you're suggesting.

Member Author

I think the new approach in #416 is similar to what you were looking for here. Please comment on that PR in the method _generate_selected_aggregate_groups if not. Here's the logic:

if selected_aggregates is not None:
    # Use selected aggregates in the aggregate store
    for aggregate in aggregate_progress_wrapper(selected_aggregates):
        aggregate_id = get_aggregate_id(aggregate)
        if aggregate_id in aggregate_store:
            for group in matching_groups:
                yield aggregate_id, aggregate, group
else:
    # Use all aggregates in the aggregate store
    for aggregate_id, aggregate in aggregate_progress_wrapper(
        aggregate_store.items()
    ):
        for group in matching_groups:
            yield aggregate_id, aggregate, group

Comment on lines +3479 to +3484

if (
    group._eligible(aggregate, ignore_conditions)
    and self._eligible_for_submission(
        group, aggregate, cached_status
    )
    and self._is_selected_aggregate(aggregate, aggregates)
Collaborator

This check is not group-specific, so we could move it to the outer loop and continue if the aggregate is not selected.

Also, I feel like the _is_selected_aggregate method is now redundant, because it simply checks whether the aggregate is in aggregates.

Member Author

This is handled in #416.

for (
    aggregate_store,
    aggregate_groups,
) in self._group_to_aggregator.inverse.items():
    matching_groups = set(aggregate_groups) & run_groups
Collaborator

Do we have tests to check that the groups don't get executed unless the user specifically asked for them?

Member Author

I'm not entirely sure but I think we have coverage of that. I'll defer this to a follow-up PR.

Member Author

I confirmed that tests fail on #416 if this check is not performed correctly (if more groups are run/status-fetched/submitted than were requested).

@@ -251,3 +251,50 @@ def _cached_partial(func, *args, maxsize=None, **kwargs):

     """
     return lru_cache(maxsize=maxsize)(partial(func, *args, **kwargs))


 class _bidict(dict):
Member

@b-butler b-butler Jan 6, 2021

@bdice Also note that when subclassing dict, update will not call __setitem__, pop will not call __delitem__, setdefault will not call __setitem__, and so on. The CPython dict class is optimized and does not route through these methods the way collections.abc does. We should probably either disable these methods (raising NotImplementedError) or implement all methods that involve setting or deleting. I forgot this when reviewing yesterday.

Member Author

@b-butler That's a really helpful observation; I didn't realize that. I re-implemented the class as a MutableMapping with an internal dictionary, so it gets all of those methods "for free." I also added tests for all of the methods expected from a MutableMapping. Fixed in 0789f2a.
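For illustration, a minimal sketch of the MutableMapping approach described above, where an internal dict is wrapped so that update, pop, setdefault, etc. all route through __setitem__ and __delitem__ (names are illustrative, not necessarily the exact code in 0789f2a):

from collections.abc import MutableMapping


class _BidirectionalMappingSketch(MutableMapping):
    """Mapping with an inverse map from values to lists of keys.

    MutableMapping derives update, pop, setdefault, etc. from the abstract
    methods below, so the inverse map stays consistent no matter which
    mutation method is used.
    """

    def __init__(self, *args, **kwargs):
        self._data = {}
        self.inverse = {}
        self.update(*args, **kwargs)  # Routes through __setitem__.

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        if key in self._data:
            self.inverse[self._data[key]].remove(key)
        self._data[key] = value
        self.inverse.setdefault(value, []).append(key)

    def __delitem__(self, key):
        value = self._data.pop(key)
        self.inverse[value].remove(key)
        if not self.inverse[value]:
            del self.inverse[value]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)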

Member

@b-butler b-butler left a comment

I left a comment about the implementation of _bidict. I would also like the _bidict tests to cover some of the methods mentioned in that comment. Beyond that, I am ready to merge.

Member Author

bdice commented Jan 6, 2021

I'm going to go ahead and merge this into next. I will handle the remaining suggestions/comments in #416. The two PRs are very closely related and I think #416 will fix most of the problems that we have identified in this PR. Thank you so much to all reviewers for the thoughtful input!

@bdice bdice merged commit d7f022f into next Jan 6, 2021
@bdice bdice deleted the feature/unify-group-aggregate-storage branch January 6, 2021 22:11
bdice added a commit that referenced this pull request Jan 20, 2021
… and documentation. (#427)

* Change the submission ID for aggregate logic (#334)

Changes the submission id generation to prepare for aggregation. The `__str__` method of `_JobOperation` is also changed to better handle job aggregation. The unique id generated using a hash of all the `_JobOperation` object's data is still the same, so the uniqueness should not be in question; the only change is to the readable part of the id.

The change to the `str` implementation mimics NumPy, and the submission id favors brevity.

* Add aggregator classes in flow (#348)

Add a new decorator by which operations can be tagged for various types of aggregation, along with classes to store actual aggregates once they have been generated.

* Fix hashing of aggregate classes, rename reverse_order to sort_ascending (#351)

* Use sort_ascending instead of reverse_order.

* Fix and simplify hash methods.

* Clarify docstrings.

* Initialize list/dict with literals.

* Use one-line function instead of defining a separate method.

* Don't wrap line.

* class -> object.

* use itertools reference while using groupby

* Use reference for zip_longest too

* Document _get_unique_function_id

Co-authored-by: Hardik Ojha <[email protected]>

* Add default aggregate support to flow (#335)

Change the internals of flow so that everything operates on default aggregates (aggregates of size 1) rather than individual jobs.

* Use tqdm.auto for better notebook compatibility. (#371)

* Remove unused attributes/methods. (#380)

* Remove unused __init__, the implicit parent constructor is equivalent.

* Use public .job attribute.

* Remove unused argument from _add_operation_selection_arg_group.

* Use logger.warning instead of deprecated logger.warn.

* Refactor error variables and messages. (#373)

* Remove unused **kwargs from _FlowProjectClass metaclass constructor.

* Raise ImportError instead of RuntimeWarning if pprofile is not installed (it's a hard error).

* Deprecate CPUEnvironment and GPUEnvironment (#381)

* Deprecate CPUEnvironment and GPUEnvironment. The classes are unused and untested.

* Remove unused private functions for importing from files.

* Update changelog.

* Make _verify_group_compatibility a class method. (#379)

* Use isinstance check for IgnoreConditions. (#378)

* Remove dangling else/elif clauses after raise/return statements. (#377)

Introduces stylistic changes suggested by pylint.

* Use f-string.

* Linting dicts. (#374)

* Refactor dicts, OrderedDict, and comprehensions. Resolves #323.

* Import Mapping from collections.abc.

* Use {}

* Use descriptive variable names. (#376)

* Use longer/clearer variable names.

* Update _no_aggregation.

* Rename _operation_to/from_tuple to _job_operation_to/from_tuple

Co-authored-by: Brandon Butler <[email protected]>

* Make _to_hashable a private function. (#384)

* Make _to_hashable private.

* Copy _to_hashable method from signac.

* Adding/improving docstrings and comments. (#375)

* Add and improve docstrings.

* Minor edits to docstrings.

* Use shorter one-line summaries in some long docstrings.

* Cleanup docstrings, variable names. (#372)

* Clean up docs and variables relating to pre-conditions and post-conditions.

* Clean up docstrings, variable names, defaults, remove obsolete helper functions.

* Use f-strings, avoid second-person "you".

* Use full name for template_filters.

* Docstring revisions.

* Use job_id consistently (adds to #363). (#386)

* Fix other instances of job_id missed in #363.

* Update changelog.

* Use descriptive variable name job_id.

* Add all missing docstrings, enforce pydocstyle. (#387)

Validates docstrings using pydocstyle and adds all the necessary documentation for this to pass.

* Revise Sphinx docs. (#389)

* Enforce pydocstyle more broadly.

* Ignore hidden folders.

* Use temporary config that allows missing docstrings.

* Apply changes from pydocstyle and minor edits for clarity.

* Add more docstrings.

* Re-enable D102.

* Add all missing docstrings for public methods.

* Update changelog.

* Remove duplicate summary of FlowProject.

* Don't use syntax highlighting in CLI help message.

* Revise environment docstrings.

* Update link for Comet docs.

* Document class property ALIASES.

* Revise copy_from.

* Revise label docstring.

* Add operations and base environment classes to docs.

* Standardize references to signac Jobs.

* Update APIs and references.

* Revise docstrings.

* Rename dummy state to placeholder.

* Update ComputeEnvironmentType docstrings.

* Explicitly list classes/functions to document.

* Remove all API from root of scheduling subpackage.

* Revise aggregation docs.

* Improve IgnoreConditions docs.

* Remove :py: (role is unnecessary).

* Docstring revisions.

* Apply suggestions from code review

Co-authored-by: Vyas Ramasubramani <[email protected]>

* Apply suggestions from code review

Co-authored-by: Vyas Ramasubramani <[email protected]>

* Update flow/project.py

Co-authored-by: Bradley Dice <[email protected]>

* Update flow/project.py

* Clarify simple-scheduler purpose.

Co-authored-by: Vyas Ramasubramani <[email protected]>

* Use NumPy style docstrings. (#392)

* Use napoleon to render numpydoc style.

* NumPy-doc-ify aggregates.

* Fix name of aggregator class.

* Revisions to environment docstrings.

* Revisions to aggregate docstrings.

* NumPy-doc-ify INCITE environments.

* NumPy-doc-ify remaining environments.

* NumPy-doc-ify render_status.

* NumPy-doc-ify scheduler base classes.

* NumPy-doc-ify LSF scheduler.

* NumPy-doc-ify simple scheduler.

* NumPy-doc-ify SLURM scheduler.

* NumPy-doc-ify Torque scheduler.

* NumPy-doc-ify template.

* NumPy-doc-ify testing.

* NumPy-doc-ify util.

* NumPy-doc-ify project.

* Fix remaining sphinx-style docstrings.

* Updated docstrings with velin (https://github.com/Carreau/velin).

* Adopt numpy convention for pydocstyle.

* Remove methods that are implemented identically in the parent class.

* Add call operators to docs.

* Fix FlowGroup reference.

* Reformat examples.

* Remove trailing colons where no type is specified.

* Fix class reference.

* Fix (deprecated) JobOperation references.

* Use preconditions/postconditions.

* Add changelog line.

* Apply suggestions from code review

Co-authored-by: Brandon Butler <[email protected]>

* Move env argument.

* Fix typo.

* Apply suggested changes.

* Add docs to simple-scheduler, don't git ignore the bin directory containing simple-scheduler.

Co-authored-by: Brandon Butler <[email protected]>

* Optimization: Don't use min_len_unique_id in str(_JobOperation). (#400)

* Don't call min_len_unique_id when converting _JobOperation to a string. It's O(N) and is called O(N) times, leading to O(N^2) behavior and significant performance penalties.

* Update template reference data to use long job ids.

* Evaluate run commands lazily. (#396)

Cache lazily generated run commands to avoid side effects associated with the generation.
Also avoids introducing any side effects due to f-string evaluation when generating logging entries.

* Add missing raise statement.

* Initialize variables early to prevent unbound values.

* Use same boolean logic as elsewhere for determining eligibility.

* Use job id instead of string call.

* Try to return immediately instead of calling both __contains__ and __getitem__.

* Add codecov support to CI. (#405)

* Add codecov support.

* Remove mock (not used).

* Get executable directive at call time instead of definition time (#402)

* Make DefaultAggregateStore picklable (#383)

* Make default aggregate class picklable

* Refactor pickling

* Refactor code comments

* Add __getstate__, __setstate__ methods to FlowProject

* Make cloudpickle a hard dependency

* Fix linting issues

* Minor changes to variable names.

* Use f-string.

* Solve circular reference issue during unpickling by pre-computing hash value.

* Cache actual hash value.

Co-authored-by: Bradley Dice <[email protected]>

* Show total for aggregate status info.

* Simplify implementation of __iter__, avoid duplicated logic.

* Optimize _get_aggregate_from_id to avoid open_job call. Speeds up function by about 2x for default aggregators.

* Fail gracefully when expanding bundles if no jobs are on the scheduler (e.g. using TestEnvironment with a fake scheduler).

* Minor changes to variable names and docstrings.

* Fix Iteration Bug on next and add AggregatesCursor to flow (#390)

* Change method name, fix return for _select_jobs_from_args

* Add AggregatesCursor class to flow

* make AggregatesCursor private

* Fix import error

* Simplify iteration logic.

* Update docstring.

* Fix iteration order.

* Use set comprehension.

* Parse filter arguments in the CLI, pass through filters to JobsCursor.

* Rename internal variable to _jobs_cursor.

* Fix docstring errors.

* Refactor jobs-to-aggregates conversion.

* Update changelog.

* Optimize __contains__ for case with no filters.

* Simplify tqdm bar description.

* Update comment.

* Update comment.

* Update comment.

* Update variable name.

Co-authored-by: Bradley Dice <[email protected]>

* Small updates to docstrings and small refactors.

* Optimization: use cached status everywhere. (#410)

Prevents repeatedly checking the project document's file for status, by using a cached status for all methods that require submission status.

Squashed commit messages:
* Ensure that all status-fetching operations use a cached status dictionary.

* Use cached status in script command.

* Use cached_status and keyword arguments in tests.

* Fix test of submission operations.

* Update changelog.

* Remove FlowGroup._get_status. Resolves #368.

* Ensure that job operations are created.

* Update docstring.

* Remove pass-through argument for cached_status.

* Optimize job id generation to avoid expensive JSON encoding step (it's a plain string).

* Use context manager functionality of pools (requires Python 3.4+ and hadn't been updated).

* Add explicit test for threaded parallelism.

* Unify group and aggregate storage with a bi-map data structure. (#415)

* Unify group and aggregate storage with a bidirectional map data structure.

* Update comment.

* Swap iteration order of generating run operations.

* Improve __eq__ implementation.

* Update comment.

* Improve bidict.__delitem__.

* Fix bidict docstring.

* Only create aggregate stores once.

* Add tests for bidict.

* Make bidict private.

* Make bidict a MutableMapping, add more bidict tests.

* Add comment about insertion ordering.

* Optimization: don't deepcopy directives (#421)

* Modify _JobOperation construction to not deepcopy directives

Deepcopying directives is expensive since it is not a pure data
structure. This commit just converts the directives to a pure data
structure (i.e. a ``dict``). In addition, we pass the user specified
directives (as opposed to the environment specified directives) to
``_JobOperation``. This should improve the performance of run and
submit.

* Only evaluate directives when unevaluated.

Adds a check to prevent the same directive from being evaluated twice.
This should speed up the evaluation of multi-operation groups.

* Add back (fixed) comment about user directives.

* Update docstring.

* Use dict constructor instead of dict comprehension.

Co-authored-by: Bradley Dice <[email protected]>

* Unify group/aggregate selection (#416)

* Update argument name for get_aggregate_id.

* Update hashing.

* Add shared logic for group/aggregate selection.

* Reduce redundancy in methods using group/aggregate selection.

* Revert to by-job ordering in pending operations.

* Rename test.

* Add optimization if all selected groups share an aggregate store.

* Refactor group aggregate store optimization.

* Yield aggregate_id during selection iteration. Rename jobs to aggregate. Unify more selection logic.

* Use aggregate id in group id generation.

* Simplify _SubmissionJobOperation constructor.

* Make order of run operations deterministic (avoid set subtraction).

* Remove duplicate code for label fetching.

* Update template reference data.

* Remove _is_selected_aggregate method (replaced with "in" check).

* Skip postconditions check for eligibility if the group/aggregate is known to be incomplete.

* Rename variable.

* Only consider selected aggregates.

* Fix comment.

* Rename blob to id_string.

* Move check for zero selected groups.

* Add TODO.

* Add comment explaining eligibility ignoring postconditions.

* Remove index parameter (#423)

* Remove index parameter (resolves #418).

* Update template reference data.

* Remove the use of arguments scheduled to be removed in 0.12 (#424)

* Remove the use of  argument

* Remove the use of env attribute while submit

* Pass environment as keyword argument.

* Remove env argument in a few missed places.

* Fix template reference data. It was using mpiexec (default) and should use the cluster-specific MPI commands. I don't know why the reference data was wrong.

* Remove extra comment about status parallelization.

Co-authored-by: Bradley Dice <[email protected]>

* Update tqdm requirement.

* Deprecate unbuffered mode (#425)

* Always buffer, add deprecation warning if use_buffered_mode=False.

* Remove unbuffered tests.

* Reduce number of jobs to speed up tests.

* Update changelog.

* Update deprecation notice.

* Refactoring tasks (#426)

* Refactor status rendering into a private function.

Co-authored-by: Alyssa Travitz <[email protected]>
Co-authored-by: Hardik Ojha <[email protected]>

* Renamed fakescheduler to fake_scheduler.

Co-authored-by: Alyssa Travitz <[email protected]>
Co-authored-by: Hardik Ojha <[email protected]>

* Make environment metaclass private.

Co-authored-by: Alyssa Travitz <[email protected]>
Co-authored-by: Hardik Ojha <[email protected]>

* Remove unused status.py file.

Co-authored-by: Alyssa Travitz <[email protected]>
Co-authored-by: Hardik Ojha <[email protected]>

* Remove out argument from templates.

Co-authored-by: Alyssa Travitz <[email protected]>
Co-authored-by: Hardik Ojha <[email protected]>

* Refactor scheduler classes, unify code.

Co-authored-by: Alyssa Travitz <[email protected]>
Co-authored-by: Hardik Ojha <[email protected]>

* Remove metaclass from docs.

Co-authored-by: Alyssa Travitz <[email protected]>
Co-authored-by: Hardik Ojha <[email protected]>

* Update docstrings.

* Update docstrings.

* Update changelog.

* Update docstrings.

* Update changelog.txt

* Update changelog.txt

Co-authored-by: Alyssa Travitz <[email protected]>
Co-authored-by: Hardik Ojha <[email protected]>

* Refactor aggregation (#422)

Cleans up various internals associated with aggregation, introducing additional abstraction layers and adding docstrings.  

* Refactor aggregation.

* Fix failing tests from refactor.

* Improve clarity and coverage of tests.

* Add more tests to enforce that aggregates are tuples.

* Expand tests of default aggregator.

* Update tests.

* Unify __iter__.

* __getitem__ should raise a KeyError if not found.

* Update error message.

* Update flow/aggregates.py

Co-authored-by: Vyas Ramasubramani <[email protected]>

* Simplify _register_aggregates method. (#430)

* New _fetch_status code. (#417)

Rewrite the _fetch_status method, which was extremely large and unwieldy. The new code is more efficient since it avoids making multiple passes through the jobs. It also uses some newly introduced functions and context managers as well as newer tqdm functions to clean up existing code.

* Initial rewrite of _fetch_status.

* Update _fetch_scheduler_status to require aggregates.

* Clarify docstring for status_parallelization.

* Experiment with callback approach (only iterate over aggregates/groups once).

* Skip evaluation of post-conditions during eligibility if known to be incomplete.

* Change operations to groups.

* Use context manager for updating the cached status in bulk.

* Separate scheduler query from cached status update.

* Fix docstring.

* Use context manager for _fetch_scheduler_status cache update.

* Use cached status context manager for _fetch_status.

* Refactor status updates so the scheduler query happens on-the-fly and only considers selected groups/aggregates.

* Intermediate commit with notes.

* Simplify eligibility check.

* Update docstring.

* Update tests to account for removed method.

* Rename cid to cluster_id.

* Define parallel executor.

* Only update status if the update dictionary has data.

* Add parallel execution.

* Remove status callback.

* Clean up status fetching.

* Remove unnecessary internal methods.

* Fix test (scheduler is queried internally).

* Rename jobs -> aggregate.

* Fix bug in tests (the dynamically altered jobs should NOT be resubmitted, this was probably an error due to the use of cached status rather than querying the scheduler).

* Show aggregate id in str of _JobOperation.

* Fix script output.

* Remove keyword argument.

* Reset MockScheduler in setUp for all tests.

* Mock root directory (can't override Project.root_directory() because it is used for all job document paths and buffering, and must reflect an actual path on disk).

* Update template reference data.

* Refactor mocked root.

* Pickle function to be executed in parallel using cloudpickle so that it can be sent across threads.

* Fix pre-commit hooks.

* Fix root directory mocking.

* Improve progress iterators.

* Use chunked job label fetching.

* Refactor parallel executor to accept one iterable and use Pool for process parallelism.

* Use integers in the cached status update.

* Fix mocking of system executable. Resolves #413.

* Update changelog.

* Mock environment directly rather than using **kwargs for compatibility with signac 1.0.0.

* Buffer during project status update.

* Use ordered results.

* Don't buffer during status update (no performance difference).

* Refactor job labels so that the list of individual jobs is generated during the same single loop over the project to generate aggregates.

* Update flow/util/misc.py

Co-authored-by: Vyas Ramasubramani <[email protected]>

* Fix function parameters in test, use kwargs for _fetch_status.

* Use process_map.

* Use MOCK_EXECUTABLE.

* Add comments explaining use of FakeScheduler.

* Collect errors that occur during status evaluation.

* Mock the id generation method instead of injecting mocked attributes.

Co-authored-by: Vyas Ramasubramani <[email protected]>
Co-authored-by: Vyas Ramasubramani <[email protected]>

* Change reference name from fakescheduler to fake_scheduler (#431)

* Make class aggregator and method get_aggregate_id private (#432)

* Make aggregator and get_aggregate_id private

* Remove reference from docs

* Use private name for _aggregator in test_aggregates.py.

* Remove aggregates from documentation, add TODOs.

* Fix docstring reference.

* Update changelog.

Co-authored-by: Bradley Dice <[email protected]>

Co-authored-by: Hardik Ojha <[email protected]>
Co-authored-by: Vyas Ramasubramani <[email protected]>
Co-authored-by: Brandon Butler <[email protected]>
Co-authored-by: Alyssa Travitz <[email protected]>
Co-authored-by: Vyas Ramasubramani <[email protected]>