Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Aggregation Feature to Flow #336

Closed
wants to merge 96 commits into from
Closed
Changes from all commits
Commits
Show all changes
96 commits
Select commit Hold shift + click to select a range
4eb97a0
Add the private Aggregate class having fundamental concepts
kidrahahjo Jul 15, 2020
07c5e16
Tests for _Aggregate class added
kidrahahjo Jul 15, 2020
5cc7912
Logic for _condition, BaseFlowOperation, FlowGroup added
kidrahahjo Jul 15, 2020
b9c4655
Logic extended for exec command
kidrahahjo Jul 15, 2020
9a7ca7e
Logic extended for next command
kidrahahjo Jul 15, 2020
15b79cf
Logic for run command
kidrahahjo Jul 15, 2020
df99e50
Logic for script command added
kidrahahjo Jul 15, 2020
5f1e060
Logic for submit and status enabled
kidrahahjo Jul 16, 2020
0a33a27
Make tests pass, replaced by-job grouping with by-op grouping
kidrahahjo Jul 16, 2020
6a37920
Style Fix
kidrahahjo Jul 16, 2020
39b4a3c
Remove print statement
kidrahahjo Jul 16, 2020
7478f4d
Suggested changes implemented with refactoring of Aggregate class and…
kidrahahjo Jul 16, 2020
b5ba89f
Documentation Edit and implement suggested changes
kidrahahjo Jul 17, 2020
1b07f7b
Improve status fetching performance
kidrahahjo Jul 18, 2020
6da1cf1
Use get_id() instead of the id property
kidrahahjo Jul 18, 2020
eadb031
Remove the use of Aggregate class and refactored
kidrahahjo Jul 23, 2020
a2446e5
Resolve merge conflicts
kidrahahjo Jul 23, 2020
b937d46
Minute document change
kidrahahjo Jul 23, 2020
be56b51
Apply suggested changes
kidrahahjo Jul 26, 2020
f266987
Merge upstream/master
kidrahahjo Jul 26, 2020
a4f51e2
Deprecate eligible and complete methods of Flow{Group|Operation}
kidrahahjo Jul 27, 2020
729dca0
Add support with private class _JobOperation
kidrahahjo Jul 27, 2020
0fd78b5
Merge remote-tracking branch 'upstream/master' into enable-aggregate-…
kidrahahjo Jul 27, 2020
fab2f68
Move property method job to JobOperation only
kidrahahjo Jul 28, 2020
fcb29de
Change the submission ID for aggregate support
kidrahahjo Jul 29, 2020
abbd6d1
Indentation fix
kidrahahjo Jul 29, 2020
26928dc
Merge remote-tracking branch 'upstream/master' into enable-submission…
kidrahahjo Jul 30, 2020
14f9fa9
Merge remote-tracking branch 'upstream/master' into enable-aggregate-…
kidrahahjo Jul 30, 2020
c11f40b
Merge branch 'enable-aggregate-logic' into enable-submission-aggregat…
kidrahahjo Jul 30, 2020
8dc5a95
Apply suggested changes
kidrahahjo Jul 31, 2020
4efa480
Merge remote-tracking branch 'origin/enable-aggregate-logic' into ena…
kidrahahjo Jul 31, 2020
bd26aa8
Apply suggested changes
kidrahahjo Aug 3, 2020
39ec72b
Merge branch 'enable-aggregate-logic' into enable-submission-aggregat…
kidrahahjo Aug 3, 2020
702ae4e
Enable storing aggregate information while submission and fetch befor…
kidrahahjo Jul 29, 2020
05855af
Make a method for fetching aggregates
kidrahahjo Aug 3, 2020
fd2aaee
Implementation of _aggregate and _select classes
kidrahahjo Jun 30, 2020
857b54c
Tests added for aggregate and select class with refactoring
kidrahahjo Jun 30, 2020
c3154f1
Indentation fix
kidrahahjo Jun 30, 2020
2cca72a
Merged select and aggregate class into one aggregate class
kidrahahjo Jul 7, 2020
7e66463
Improved visual indentation
kidrahahjo Jul 7, 2020
355a9bf
Used autopep8 for indentation fix
kidrahahjo Jul 7, 2020
35e8295
Bad Lint
kidrahahjo Jul 8, 2020
3b41151
Aggregate class now also handles making of aggregated while calling
kidrahahjo Jul 9, 2020
c1e69cc
Minor edit
kidrahahjo Jul 9, 2020
760de94
Support for JobCursor added
kidrahahjo Jul 10, 2020
8a7f630
Style fix
kidrahahjo Jul 10, 2020
417ff18
Suggested changes made
kidrahahjo Jul 16, 2020
b720363
Minor change
kidrahahjo Jul 16, 2020
e460b70
Make a seperate function to create MakeAggregate instance
kidrahahjo Jul 23, 2020
441db26
Introduce Aggregator class in flow
kidrahahjo Aug 3, 2020
c300013
Refactor fetching aggregates
kidrahahjo Aug 3, 2020
810850a
Refactor status check
kidrahahjo Aug 3, 2020
2ff36c0
Style fix
kidrahahjo Aug 3, 2020
cc3d42a
Style fix
kidrahahjo Aug 4, 2020
fe17853
Merge branch 'feature/enable-aggregate-status-logic' into feature/int…
kidrahahjo Aug 4, 2020
1f3c268
Refactor status check for deleted aggregates
kidrahahjo Aug 4, 2020
024921a
Add make_aggregates method for signac-examples support
kidrahahjo Aug 6, 2020
86d784f
Change message of progress bar
kidrahahjo Aug 6, 2020
e64c09c
Add aggregate CLA for status which tells whether to fetch previously …
kidrahahjo Aug 6, 2020
b2e21b7
Refactor all the jinja templates for status print to support aggregation
kidrahahjo Aug 8, 2020
4322323
raise ValueError if num <=0
kidrahahjo Aug 8, 2020
1d5f6cc
Changes made
kidrahahjo Aug 12, 2020
a9d5b1a
Resolve merge conflicts
kidrahahjo Aug 12, 2020
d74b3a0
Enable storing aggregate information while submission and fetch befor…
kidrahahjo Jul 29, 2020
bc75900
Make a method for fetching aggregates
kidrahahjo Aug 3, 2020
f05cb70
Refactor fetching aggregates
kidrahahjo Aug 3, 2020
6c3c21b
Style fix
kidrahahjo Aug 4, 2020
339345f
Apply suggestions from code review
kidrahahjo Aug 4, 2020
9bbbcfc
Merge remote-tracking branch 'origin/feature/enable-submission-aggreg…
kidrahahjo Aug 12, 2020
d5e558c
Add PersmissionError check
kidrahahjo Aug 12, 2020
6598004
Apply suggested changes
kidrahahjo Aug 13, 2020
49007b6
Merge feature/enable-aggregation
kidrahahjo Aug 13, 2020
66d78eb
Fixed _fn_stored method
kidrahahjo Aug 13, 2020
f196cdd
Refactor _fetch_status
kidrahahjo Aug 13, 2020
e0d1483
Update this branch and add tests in test_project.py
kidrahahjo Aug 14, 2020
cfbc133
Add template tests
kidrahahjo Aug 14, 2020
5881a69
Refactor templates
kidrahahjo Aug 14, 2020
07dd725
Remove template test for aggregates
kidrahahjo Aug 14, 2020
e1f815a
Apply suggested change
kidrahahjo Aug 14, 2020
4cbe754
Generate aggregates on initialization
kidrahahjo Aug 17, 2020
5f4f09d
Change expected number of submit steps
kidrahahjo Aug 17, 2020
63d6810
Refactor creation of aggregates
kidrahahjo Aug 17, 2020
cd3def4
Unable static aggregation and some refactoring
kidrahahjo Aug 18, 2020
ec8a04b
Minor refactoring, apply suggested changes
kidrahahjo Aug 18, 2020
d16edeb
Merge feature/enable-aggregate-status-logic, Refactor hashing, cmd ar…
kidrahahjo Aug 18, 2020
ef70af7
Fix execution command
kidrahahjo Aug 18, 2020
ea4bccb
Parallelize status update completely, apply suggested change
kidrahahjo Aug 19, 2020
a76c5b2
Merge #335 into this branch, and refactor
kidrahahjo Aug 20, 2020
8925ad4
Fix issue with dummy operations
kidrahahjo Aug 20, 2020
8d4313b
Implement by-job order option
kidrahahjo Aug 20, 2020
35b8bda
use get_id instead of id
kidrahahjo Aug 20, 2020
97714c9
Implement by-job order option
kidrahahjo Aug 20, 2020
fbb9e15
Apply suggested change of merging if else block
kidrahahjo Aug 20, 2020
92b425b
Apply suggestions from code review and revert the deprecation of get_…
kidrahahjo Aug 21, 2020
ad2da19
Change the --aggregate CLI option for status check to --orphan and me…
kidrahahjo Aug 21, 2020
c7e463d
Apply suggestions from code review
bdice Nov 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions flow/__init__.py
Original file line number Diff line number Diff line change
@@ -12,12 +12,15 @@
from . import scheduling
from . import errors
from . import testing
from .aggregate import Aggregate
from .aggregate import get_aggregate_id
from .project import IgnoreConditions
from .project import FlowProject
from .project import JobOperation
from .project import label
from .project import classlabel
from .project import staticlabel
from .project import make_aggregates
from .operations import cmd
from .operations import directives
from .operations import run
@@ -34,6 +37,9 @@


__all__ = [
'Aggregate',
'make_aggregates',
'get_aggregate_id',
'environment',
'scheduling',
'errors',
215 changes: 215 additions & 0 deletions flow/aggregate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
# Copyright (c) 2020 The Regents of the University of Michigan
# All rights reserved.
# This software is licensed under the BSD 3-Clause License.
from collections.abc import Iterable
from hashlib import md5
from itertools import groupby
from itertools import zip_longest
from tqdm import tqdm


class Aggregate:
"""Decorator for operation functions that are to be aggregated.

By default, if the aggregator parameter is not passed,
an aggregate of all jobs will be created.

.. code-block:: python

example_aggregate = Aggregate()
@example_aggregate
@FlowProject.operation
def foo(*jobs):
print(len(jobs))

:param aggregator:
Information describing how to aggregate jobs. Is a callable that
takes in a list of jobs and can return or yield subsets of jobs as
an iterable. The default behavior is creating a single aggregate
of all jobs
:type aggregator:
callable
:param sort:
Before aggregating, sort the jobs by a given statepoint parameter.
The default behavior is no sorting.
:type sort:
str or NoneType
:param reverse:
States if the jobs are to be sorted in reverse order.
The default value is False.
:type reverse:
bool
:param select:
Condition for filtering individual jobs. This is passed as the
callable argument to `filter`.
The default behavior is no filtering.
:type select:
callable or NoneType
"""

def __init__(self, aggregator=None, sort=None, reverse=False, select=None):
if aggregator is None:
def aggregator(jobs):
return [jobs]

if not callable(aggregator):
raise TypeError("Expected callable for aggregator, got {}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use f-strings, here and below.

"".format(type(aggregator)))

if sort is not None and not isinstance(sort, str):
raise TypeError("Expected string sort parameter, got {}"
"".format(type(sort)))

if select is not None and not callable(select):
raise TypeError("Expected callable for select, got {}"
"".format(type(select)))

if getattr(aggregator, '_num', False):
self._is_aggregate = False if aggregator._num == 1 else True
else:
self._is_aggregate = True
Comment on lines +67 to +70
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is equivalent - please check it.

Suggested change
if getattr(aggregator, '_num', False):
self._is_aggregate = False if aggregator._num == 1 else True
else:
self._is_aggregate = True
self._is_aggregate = getattr(aggregator, '_num', 0) != 1


self._aggregator = aggregator
self._sort = sort
self._reverse = reverse
self._select = select

@classmethod
def groupsof(cls, num=1, sort=None, reverse=False, select=None):
# copied from: https://docs.python.org/3/library/itertools.html#itertools.zip_longest
try:
num = int(num)
if num <= 0:
raise ValueError('The num parameter should have a value greater than 0')
except TypeError:
raise TypeError('The num parameter should be an integer')

def aggregator(jobs):
args = [iter(jobs)] * num
return zip_longest(*args)
setattr(aggregator, '_num', num)
return cls(aggregator, sort, reverse, select)

@classmethod
def groupby(cls, key, default=None, sort=None, reverse=False, select=None):
Copy link
Member

@bdice bdice Aug 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't shadow sort and reverse functions.
edit: This is fine. See above.

if isinstance(key, str):
if default is None:
def keyfunction(job):
return job.sp[key]
else:
def keyfunction(job):
return job.sp.get(key, default)

elif isinstance(key, Iterable):
keys = list(key)

if default is None:
def keyfunction(job):
return [job.sp[key] for key in keys]
else:
if isinstance(default, Iterable):
if len(default) != len(keys):
raise ValueError("Expected length of default argument is {}, "
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use f-strings here and below.

"got {}.".format(len(keys), len(default)))
else:
raise TypeError("Invalid default argument. Expected Iterable, "
"got {}".format(type(default)))

def keyfunction(job):
return [job.sp.get(key, default[i]) for i, key in enumerate(keys)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rewrite this to use zip instead of enumerate. This might work:

Suggested change
return [job.sp.get(key, default[i]) for i, key in enumerate(keys)]
return [job.sp.get(key, default_value) for key, default_value in zip(keys, default)]


elif callable(key):
keyfunction = key

else:
raise TypeError("Invalid key argument. Expected either str, Iterable "
"or a callable, got {}".format(type(key)))

def aggregator(jobs):
for key, group in groupby(sorted(jobs, key=keyfunction), key=keyfunction):
yield group

return cls(aggregator, sort, reverse, select)

def _create_MakeAggregate(self):
return MakeAggregate(self._aggregator, self._sort, self._reverse, self._select)

def __call__(self, func=None):
if callable(func):
setattr(func, '_flow_aggregate', self._create_MakeAggregate())
return func
else:
raise TypeError('Invalid argument passed while calling '
'the aggregate instance. Expected a callable, '
'got {}.'.format(type(func)))


class MakeAggregate(Aggregate):
r"""This class handles the creation of aggregates.

.. note::
This class should not be instantiated by users directly.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class should be private, according to this comment.

:param \*args:
Passed to the constructor of :py:class:`Aggregate`.
"""
def __init__(self, *args):
super(MakeAggregate, self).__init__(*args)

def __call__(self, obj, group_name='unknown-operation', project=None):
"Return aggregated jobs."
aggregated_jobs = list(obj)
if self._select is not None:
aggregated_jobs = list(filter(self._select, aggregated_jobs))
if self._sort is not None:
aggregated_jobs = list(sorted(aggregated_jobs,
key=lambda job: job.sp[self._sort],
reverse=bool(self._reverse)))

aggregated_jobs = self._aggregator([job for job in aggregated_jobs])
aggregated_jobs = self._create_nested_aggregate_list(aggregated_jobs, group_name, project)
if not len(aggregated_jobs):
return []
return aggregated_jobs

def _create_nested_aggregate_list(self, aggregated_jobs, group_name, project):
# This method converts the returned subset of jobs as an Iterable
# from an aggregator function to a subset of jobs as list.
aggregated_jobs = list(aggregated_jobs)
nested_aggregates = []

desc = f"Collecting aggregates for {group_name}"
for aggregate in tqdm(aggregated_jobs, total=len(aggregated_jobs),
desc=desc, leave=False):
try:
filter_aggregate = []
for job in aggregate:
if job is None:
continue
if project is not None:
if job not in project:
raise ValueError(f'The signac job {str(job)} not found in {project}')
filter_aggregate.append(job)
filter_aggregate = tuple(filter_aggregate)
if project is not None:
project._aggregates_ids[get_aggregate_id(filter_aggregate)] = \
filter_aggregate
nested_aggregates.append(filter_aggregate)
except Exception:
raise ValueError("Invalid aggregator function provided by "
"the user.")
return nested_aggregates


def get_aggregate_id(jobs):
"""Generate hashed id for an aggregate of jobs.

:param jobs:
The signac job handles
:type jobs:
tuple
"""
if len(jobs) == 1:
return str(jobs[0]) # Return job id as it's already unique

blob = ''.join((job.get_id() for job in jobs))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to separate the job ids with a comma for conceptual clarity. I know this isn't really "meaningful" since it's hashed, but I think it makes the purpose clearer.

Suggested change
blob = ''.join((job.get_id() for job in jobs))
blob = ','.join((job.get_id() for job in jobs))

return f'agg-{md5(blob.encode()).hexdigest()}'
Loading