Skip to content

Commit

Permalink
gmxapi-221 Allow an array of plugins.
Browse files Browse the repository at this point in the history
If a tuple or list of dependencies is added to a work element, map the
context_rank to the list item when setting up the graph builders in
Context.__enter__

Fixes #221
  • Loading branch information
eirrgang committed Jun 24, 2019
1 parent 4848070 commit 9630656
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 14 deletions.
10 changes: 9 additions & 1 deletion src/gmx/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,15 @@ def __enter__(self):
# are a facility provided by the Context, in which case they may be member functions
# of the Context. We will probably need to pass at least some
# of the Session to the `launch()` method, though...
for name in element.depends:
dependencies = element.depends
for dependency in dependencies:
# If a dependency is a list, assume it is an "ensemble" of dependencies
# and pick the element for corresponding to the local rank.
if isinstance(dependency, (list, tuple)):
assert len(dependency) > context_rank
name = str(dependency[context_rank])
else:
name = dependency
logger.info("Subscribing {} to {}.".format(element.name, name))
builders[name].add_subscriber(new_builder)
builders[element.name] = new_builder
Expand Down
41 changes: 41 additions & 0 deletions src/gmx/test/test_pymd.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,5 +205,46 @@ def test_plugin_with_ensemble(caplog):
session.run()


@pytest.mark.usefixtures("cleandir")
@pytest.mark.usefixtures("caplog")
@withmpi_only
def test_plugin_array(caplog):
# Test in ensemble.
md = gmx.workflow.from_tpr([tpr_filename, tpr_filename], threads_per_rank=1)

# Create a WorkElement for the potential
#potential = gmx.core.TestModule()
potential1 = gmx.workflow.WorkElement(namespace="testing", operation="create_test")
potential1.name = "test_module1"

potential2 = gmx.workflow.WorkElement(namespace="testing", operation="create_test")
potential2.name = "test_module2"

# before = md.workspec.elements[md.name]
# md.add_dependency(potential_element)
# assert potential_element.name in md.workspec.elements
# assert potential_element.workspec is md.workspec
# after = md.workspec.elements[md.name]
# assert not before is after

md.add_dependency([potential1, potential2])

# Workaround for https://github.com/kassonlab/gmxapi/issues/42
# We can't add an operation to a context that doesn't exist yet, but we can't
# add a work graph with an operation that is not defined in a context.
context = gmx.get_context()
context.add_operation(potential1.namespace, potential1.operation, my_plugin)
context.work = md

with warnings.catch_warnings():
# Swallow warning about wide MPI context
warnings.simplefilter("ignore")
with context as session:
if context.rank == 0:
print(context.work)
session.run()



if __name__ == '__main__':
unittest.main()
68 changes: 55 additions & 13 deletions src/gmx/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,20 +178,54 @@ def _chase_deps(self, source_set, name_list):
times, giving us extra flexibility in implementation and arguments.
Args:
source_set: a copy of a set of element names (will be consumed during execution)
name_list: name list to be expanded with dependencies and sequenced
sources: a (super)set of element names from the current work spec (will be consumed)
name_list: subset of *sources* to be sequenced
Note that source_set is a reference to an object that is modified arbitrarily.
Returns:
Sequence of WorkElement objects drawn from the names in *source_set*
Requires that WorkElements named in *name_list* and any elements on which
they depend are all named in *source_list* and available in the current
work spec.
Note: *source_set* is a reference to an object that is modified arbitrarily.
The caller should not re-use the object after calling _chase_deps().
TODO: Separate out DAG topology operations from here and Context.__enter__()
Our needs are simple enough that we probably don't need an external dependency
like networkx...
"""
# Recursively (depth-first) generate a topologically valid serialized DAG from source_set.
assert isinstance(source_set, set)
# Warning: This is not at all a rigorous check.
# It is hard to check whether this is string-like or list-like in both Py 2.7 and 3.x
if not isinstance(name_list, (list, tuple, set)):
raise exceptions.ValueError('Must disambiguate "name_list" by passing a list or tuple.')
# Make a copy of name_list in case the input reference is being used elsewhere during
# iteration, such as for source_set, which is modified during the loop.
for name in tuple(name_list):
if name in source_set:
source_set.remove(name)
element = WorkElement.deserialize(self.elements[name], name=name, workspec=self)
for dep in self._chase_deps(source_set, element.depends):
yield dep
dependencies = element.depends
# items in element.depends are either element names or ensembles of element names.
for item in dependencies:
if isinstance(item, (list, tuple, set)):
dependency_list = item
else:
if not isinstance(item, str):
raise exceptions.ValueError(
'Dependencies should be a string or sequence of strings. Got {}'.format(type(item)))
dependency_list = [item]
for dependency in dependency_list:
for recursive_dep in self._chase_deps(source_set, (dependency,)):
yield recursive_dep
yield element
else:
# Note: The user is responsible for ensuring that source_set is complete.
# Otherwise, we would need to maintain a list of elements previously yielded.
pass


def __iter__(self):
source_set = set(self.elements.keys())
Expand Down Expand Up @@ -412,14 +446,22 @@ def add_dependency(self, element):
First move the provided element to the same WorkSpec, if not already here.
Then, add to ``depends`` and update the WorkSpec.
"""
if element.workspec is None:
self.workspec.add_element(element)
assert element.workspec is self.workspec
assert element.name in self.workspec.elements
elif element.workspec is not self.workspec:
raise exceptions.ApiError("Element will need to be moved to the same workspec.")

self.depends.append(element.name)
def check_element(element):
if element.workspec is None:
self.workspec.add_element(element)
assert element.workspec is self.workspec
assert element.name in self.workspec.elements
elif element.workspec is not self.workspec:
raise exceptions.ApiError("Element will need to be moved to the same workspec.")
return True

if hasattr(element, 'workspec') and hasattr(element, 'name'):
check_element(element)
self.depends.append(element.name)
else:
assert isinstance(element, (list, tuple))
self.depends.append(tuple([item.name for item in element if check_element(item)]))

self.workspec.elements[self.name] = self.serialize()

def serialize(self):
Expand Down

0 comments on commit 9630656

Please sign in to comment.