Add default aggregate support to flow #335

Merged
Changes from 62 commits
Commits
69 commits
d74b3a0
Enable storing aggregate information while submission and fetch befor…
kidrahahjo Jul 29, 2020
bc75900
Make a method for fetching aggregates
kidrahahjo Aug 3, 2020
f05cb70
Refactor fetching aggregates
kidrahahjo Aug 3, 2020
6c3c21b
Style fix
kidrahahjo Aug 4, 2020
339345f
Apply suggestions from code review
kidrahahjo Aug 4, 2020
4b58566
Changes made
kidrahahjo Aug 12, 2020
1481754
Add PersmissionError check
kidrahahjo Aug 12, 2020
0899121
Apply suggested changes
kidrahahjo Aug 13, 2020
c9c2610
Fixed _fn_stored method
kidrahahjo Aug 13, 2020
4a29c6b
Refactor _fetch_status
kidrahahjo Aug 13, 2020
09b599d
Apply suggested change
kidrahahjo Aug 14, 2020
03d8590
Generate aggregates on initialization
kidrahahjo Aug 17, 2020
3d4e73a
Change expected number of submit steps
kidrahahjo Aug 17, 2020
c22c74d
Refactor creation of aggregates
kidrahahjo Aug 17, 2020
faa6571
Unable static aggregation and some refactoring
kidrahahjo Aug 18, 2020
d73c75c
Minor refactoring, apply suggested changes
kidrahahjo Aug 18, 2020
4126377
Fix execution command
kidrahahjo Aug 18, 2020
05c8118
Parallelize status update completely, apply suggested change
kidrahahjo Aug 19, 2020
beb2804
Implement by-job order option
kidrahahjo Aug 20, 2020
4496a47
Apply suggested change of merging if else block
kidrahahjo Aug 20, 2020
0c6da40
Apply suggestions from code review and revert the deprecation of get_…
kidrahahjo Aug 21, 2020
0d3402d
Apply suggested changes
kidrahahjo Aug 22, 2020
5493b4b
Refactor changes
kidrahahjo Aug 22, 2020
a3f9dd1
Merge feature/enable-aggregation into this branch
kidrahahjo Sep 1, 2020
d9ec08f
Merge remote-tracking branch 'origin/feature/enable-aggregation' into…
kidrahahjo Sep 9, 2020
6eb00cd
Add support for aggregator classes
kidrahahjo Sep 10, 2020
e0cc590
Resolve issues with pickling
kidrahahjo Sep 14, 2020
98e61f5
Add tests for aggregation
kidrahahjo Sep 19, 2020
4343915
diff-1
kidrahahjo Sep 29, 2020
be961c1
Limit aggregation to aggregates of a single job
kidrahahjo Oct 5, 2020
a5563ac
Apply suggested changes except the status part
kidrahahjo Oct 19, 2020
8ef3910
Merge with origin/feature/enable-aggregation
kidrahahjo Oct 19, 2020
c803ee5
change comparison operator
kidrahahjo Oct 19, 2020
4f9276f
Move get_aggregate_id to match import order.
bdice Oct 29, 2020
d1c2913
Fix formatting of UserOperationError string.
bdice Oct 29, 2020
bf4efc0
Add missing "of" to docstrings.
bdice Oct 29, 2020
ad572a6
Revise _is_selected_aggregate.
bdice Oct 29, 2020
a673662
Fix wording of comments.
bdice Oct 29, 2020
5dbf89d
Improve docs and error message text.
bdice Oct 29, 2020
012d110
Use more descriptive variable name.
bdice Oct 29, 2020
86dfd81
Simplify group registration logic.
bdice Oct 29, 2020
0ad9291
Rename aggregate -> aggregates for consistency with other methods.
bdice Oct 29, 2020
1c2dcb4
Clarify wording of comments.
bdice Oct 29, 2020
4aa0be2
Use flow_config.get_config_value.
bdice Nov 3, 2020
791e0b5
Revert "Use flow_config.get_config_value."
bdice Nov 3, 2020
3932832
Use select to ignore unwanted flake8 plugins.
bdice Nov 6, 2020
48cbf7a
Apply suggested changes and fix linting issues
kidrahahjo Nov 8, 2020
d6890fe
Update with master
kidrahahjo Nov 8, 2020
d063e52
make _is_selected_aggregate a static method
kidrahahjo Nov 8, 2020
103bb74
Merge branch 'feature/enable-aggregation' into feature/enable-aggrega…
bdice Nov 9, 2020
a4f0fe6
standardize serialization
kidrahahjo Nov 9, 2020
5a8da4d
Store error state in "else" after "try."
bdice Nov 11, 2020
702c3f1
Rename singleton_groups.
bdice Nov 11, 2020
43ec9a9
Use if instead of try.
bdice Nov 11, 2020
d42c1da
Refactor _job_operations, inlined into deprecated next_operations fun…
bdice Nov 11, 2020
d4dc187
Remove useless except block.
bdice Nov 11, 2020
deed3db
Convert list to tuple, edit comments, simplify try except by adding f…
kidrahahjo Nov 11, 2020
5087981
correct status profile related math error
kidrahahjo Nov 11, 2020
b540cf2
Simplify iteration logic in aggregate stores
kidrahahjo Nov 12, 2020
84091ef
Merge with enable-aggregation to fix exec CLI issue
kidrahahjo Nov 12, 2020
3b70cc8
fix doc
kidrahahjo Nov 12, 2020
66baf64
get status of only singleton groups in _get_operation_status
kidrahahjo Nov 12, 2020
59aae9d
Make aggregate store classes act like mapping, improve variable names…
kidrahahjo Nov 13, 2020
0c60b18
Fix docstring.
bdice Nov 14, 2020
4c8c118
Re-raise LookupError from job contains check.
bdice Nov 14, 2020
a29edc9
Use Mapping instead of MutableMapping.
bdice Nov 14, 2020
00fd1d2
Minor docstring improvements.
bdice Nov 14, 2020
ee66593
Merge branch 'feature/enable-aggregate-status-logic' of github.com:gl…
bdice Nov 14, 2020
a5bcfa8
Improve docstring.
bdice Nov 14, 2020
2 changes: 2 additions & 0 deletions flow/__init__.py
@@ -12,6 +12,7 @@
from . import scheduling
from . import errors
from . import testing
+ from .aggregates import get_aggregate_id
from .project import IgnoreConditions
from .project import FlowProject
from .project import JobOperation
@@ -38,6 +39,7 @@
'scheduling',
'errors',
'testing',
+ 'get_aggregate_id',
'IgnoreConditions',
'FlowProject',
'JobOperation',
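
Note on the newly exported `get_aggregate_id`: the rest of this diff uses it as the key for looking up aggregates, and `flow/aggregates.py` imports `md5`. The following is only a sketch of how such an id could be derived. The helper name `sketch_aggregate_id`, the `agg-` prefix, and the use of plain id strings instead of job objects are assumptions made for illustration; the real function may differ.

```python
from hashlib import md5


def sketch_aggregate_id(job_ids):
    """Hypothetical helper: derive one id for an aggregate of job ids.

    Assumption: a single-job aggregate reuses the job id, and a larger
    aggregate gets an md5-based id built from the ordered job ids.
    """
    job_ids = tuple(job_ids)
    if len(job_ids) == 1:
        # The job id is already unique, so it can serve as the aggregate id.
        return job_ids[0]
    blob = ",".join(job_ids)
    return "agg-" + md5(blob.encode("utf-8")).hexdigest()
```

Under these assumptions the id depends on job order, so the same jobs in a different order form a different aggregate.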
49 changes: 28 additions & 21 deletions flow/aggregates.py
@@ -2,6 +2,7 @@
# All rights reserved.
# This software is licensed under the BSD 3-Clause License.
import itertools
+ from collections import OrderedDict
from collections.abc import Iterable
from hashlib import md5

@@ -44,7 +45,7 @@ def foo(*jobs):
def __init__(self, aggregator_function=None, sort_by=None, sort_ascending=True, select=None):
if aggregator_function is None:
def aggregator_function(jobs):
- return [jobs]
+ return tuple([jobs]) if jobs else ()

if not callable(aggregator_function):
raise TypeError("Expected callable for aggregator_function, got "
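
Note on the default aggregator change above: the old default returned `[jobs]` even for an empty input, which yields one empty aggregate. The new default returns no aggregates in that case. A minimal standalone sketch, where the function name `default_aggregator_function` and the string job placeholders are illustrative only:

```python
def default_aggregator_function(jobs):
    """Mirror of the new default: one aggregate holding all jobs, or none."""
    return tuple([jobs]) if jobs else ()


# One aggregate containing every job that was passed in.
print(default_aggregator_function(["job1", "job2"]))  # (['job1', 'job2'],)

# No jobs means no aggregates, rather than a single empty aggregate.
print(default_aggregator_function([]))  # ()
```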
@@ -278,35 +279,35 @@ def __init__(self, aggregator, project):
self._aggregator = aggregator

# We need to register the aggregates for this instance using the
- # project provided.
- self._aggregates = []
- self._aggregate_ids = {}
+ # project provided. After registering, we store the aggregates
+ # mapped with the ids using the `get_aggregate_id` method.
+ self._aggregate_per_id = OrderedDict()
self._register_aggregates(project)

def __iter__(self):
- yield from self._aggregates
+ yield from self._aggregate_per_id.values()

def __getitem__(self, id):
"Return an aggregate, if exists, using the id provided"
try:
- return self._aggregate_ids[id]
+ return self._aggregate_per_id[id]
except KeyError:
raise LookupError(f'Unable to find the aggregate having id {id} in '
'the FlowProject')

- def __contains__(self, aggregate):
+ def __contains__(self, id):
"""Return whether an aggregate is stored in this
instance of :py:class:`_AggregateStore`

- :param aggregate:
- An aggregate of jobs.
- :type aggregate:
- tuple of :py:class:`signac.contrib.job.Job`
+ :param id:
+ The id of an aggregate of jobs.
+ :type id:
+ str
"""
- return get_aggregate_id(aggregate) in self._aggregate_ids
+ return id in self._aggregate_per_id

def __len__(self):
- return len(self._aggregates)
+ return len(self._aggregate_per_id)

def __eq__(self, other):
return type(self) == type(other) and self._aggregator == other._aggregator
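
Note on the hunk above: the store now keeps a single ordered mapping from aggregate id to aggregate, in place of a separate list and dict. The sketch below shows the resulting behavior in isolation: iteration yields aggregates in registration order, while membership and lookup work on ids. `SketchAggregateStore` and its `id_function` parameter are hypothetical stand-ins, not part of signac-flow.

```python
from collections import OrderedDict


class SketchAggregateStore:
    """Hypothetical stand-in for an id-keyed aggregate store."""

    def __init__(self, aggregates, id_function):
        # One ordered mapping replaces the former list-plus-dict pair.
        self._aggregate_per_id = OrderedDict()
        for aggregate in aggregates:
            aggregate = tuple(aggregate)
            self._aggregate_per_id[id_function(aggregate)] = aggregate

    def __iter__(self):
        # Iteration yields the aggregates themselves, in insertion order.
        yield from self._aggregate_per_id.values()

    def __getitem__(self, id):
        try:
            return self._aggregate_per_id[id]
        except KeyError:
            raise LookupError(f"Unable to find the aggregate having id {id}")

    def __contains__(self, id):
        # Membership is checked by id, not by aggregate.
        return id in self._aggregate_per_id

    def __len__(self):
        return len(self._aggregate_per_id)


# Illustrative ids only: join the placeholder job names with "-".
store = SketchAggregateStore([("a", "b"), ("c",)], id_function="-".join)
```

Callers that previously wrote `aggregate in store` would need to pass an id instead, for example `get_aggregate_id(aggregate) in store`.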
@@ -316,7 +317,7 @@ def __hash__(self):

def _register_aggregates(self, project):
"""If the instance of this class is called then we will
- generate aggregates and store them in ``self._aggregates``.
+ generate aggregates and store them in ``self._aggregate_per_id``.
"""
aggregated_jobs = self._generate_aggregates(project)
self._create_nested_aggregate_list(aggregated_jobs, project)
@@ -355,10 +356,8 @@ def _validate_and_filter_job(job):
filter_aggregate = tuple(filter(_validate_and_filter_job, aggregate))
except TypeError: # aggregate is not iterable
raise ValueError("Invalid aggregator_function provided by the user.")
- # Store aggregate in this instance
- self._aggregates.append(filter_aggregate)
# Store aggregate by their ids in order to search through id
- self._aggregate_ids[get_aggregate_id(filter_aggregate)] = filter_aggregate
+ self._aggregate_per_id[get_aggregate_id(filter_aggregate)] = filter_aggregate


class _DefaultAggregateStore:
@@ -387,13 +386,21 @@ def __getitem__(self, id):
except KeyError:
raise LookupError(f"Did not find aggregate with id {id}.")

- def __contains__(self, aggregate):
+ def __contains__(self, id):
"""Return whether the job is present in the project associated with this
instance of :py:class:`_DefaultAggregateStore`.
+
+ :param id:
+ The job id.
+ :type id:
+ str
"""
- # signac-flow internally assumes every aggregate to be a tuple.
- # Hence this method will also get a tuple as an input.
- return len(aggregate) == 1 and aggregate[0] in self._project
+ try:
+ self._project.open_job(id=id)
+ except KeyError:
+ return False
+ else:
+ return True

def __len__(self):
return len(self._project)
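
Note on the new `__contains__` in `_DefaultAggregateStore`: membership is now decided by trying to open the job with the given id and treating `KeyError` as "not present". The sketch below reproduces that try/except/else pattern against a fake project. `FakeProject` and `contains_job_id` are hypothetical, and the assumption that `open_job(id=...)` raises `KeyError` for an unknown id is taken from the diff rather than verified against signac.

```python
class FakeProject:
    """Hypothetical stand-in for a signac project; not the real API."""

    def __init__(self, job_ids):
        self._job_ids = set(job_ids)

    def open_job(self, id):
        # Assumption: an unknown id raises KeyError, as the diff expects.
        if id not in self._job_ids:
            raise KeyError(id)
        return id


def contains_job_id(project, id):
    """Return True if the project can open a job with this id."""
    try:
        project.open_job(id=id)
    except KeyError:
        return False
    else:
        # Reached only when open_job raised nothing.
        return True


project = FakeProject(["abc123", "def456"])
```

Keeping `return True` in the `else` branch limits the `try` body to the one call that is expected to fail.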