From 963065646306a4bef493de611da9200eff8395e1 Mon Sep 17 00:00:00 2001 From: "M. Eric Irrgang" Date: Mon, 24 Jun 2019 11:39:57 +0300 Subject: [PATCH] gmxapi-221 Allow an array of plugins. If a tuple or list of dependencies is added to a work element, map the context_rank to the list item when setting up the graph builders in Context.__enter__ Fixes kassonlab#221 --- src/gmx/context.py | 10 +++++- src/gmx/test/test_pymd.py | 41 +++++++++++++++++++++++ src/gmx/workflow.py | 68 +++++++++++++++++++++++++++++++-------- 3 files changed, 105 insertions(+), 14 deletions(-) diff --git a/src/gmx/context.py b/src/gmx/context.py index 3879a61bb2..700e3991cf 100644 --- a/src/gmx/context.py +++ b/src/gmx/context.py @@ -928,7 +928,15 @@ def __enter__(self): # are a facility provided by the Context, in which case they may be member functions # of the Context. We will probably need to pass at least some # of the Session to the `launch()` method, though... - for name in element.depends: + dependencies = element.depends + for dependency in dependencies: + # If a dependency is a list, assume it is an "ensemble" of dependencies + # and pick the element for corresponding to the local rank. + if isinstance(dependency, (list, tuple)): + assert len(dependency) > context_rank + name = str(dependency[context_rank]) + else: + name = dependency logger.info("Subscribing {} to {}.".format(element.name, name)) builders[name].add_subscriber(new_builder) builders[element.name] = new_builder diff --git a/src/gmx/test/test_pymd.py b/src/gmx/test/test_pymd.py index f10520e1a8..43b974cf91 100644 --- a/src/gmx/test/test_pymd.py +++ b/src/gmx/test/test_pymd.py @@ -205,5 +205,46 @@ def test_plugin_with_ensemble(caplog): session.run() +@pytest.mark.usefixtures("cleandir") +@pytest.mark.usefixtures("caplog") +@withmpi_only +def test_plugin_array(caplog): + # Test in ensemble. + md = gmx.workflow.from_tpr([tpr_filename, tpr_filename], threads_per_rank=1) + + # Create a WorkElement for the potential + #potential = gmx.core.TestModule() + potential1 = gmx.workflow.WorkElement(namespace="testing", operation="create_test") + potential1.name = "test_module1" + + potential2 = gmx.workflow.WorkElement(namespace="testing", operation="create_test") + potential2.name = "test_module2" + + # before = md.workspec.elements[md.name] + # md.add_dependency(potential_element) + # assert potential_element.name in md.workspec.elements + # assert potential_element.workspec is md.workspec + # after = md.workspec.elements[md.name] + # assert not before is after + + md.add_dependency([potential1, potential2]) + + # Workaround for https://github.com/kassonlab/gmxapi/issues/42 + # We can't add an operation to a context that doesn't exist yet, but we can't + # add a work graph with an operation that is not defined in a context. + context = gmx.get_context() + context.add_operation(potential1.namespace, potential1.operation, my_plugin) + context.work = md + + with warnings.catch_warnings(): + # Swallow warning about wide MPI context + warnings.simplefilter("ignore") + with context as session: + if context.rank == 0: + print(context.work) + session.run() + + + if __name__ == '__main__': unittest.main() diff --git a/src/gmx/workflow.py b/src/gmx/workflow.py index c4fa7a92f5..871184228d 100644 --- a/src/gmx/workflow.py +++ b/src/gmx/workflow.py @@ -178,20 +178,54 @@ def _chase_deps(self, source_set, name_list): times, giving us extra flexibility in implementation and arguments. Args: - source_set: a copy of a set of element names (will be consumed during execution) - name_list: name list to be expanded with dependencies and sequenced + sources: a (super)set of element names from the current work spec (will be consumed) + name_list: subset of *sources* to be sequenced - Note that source_set is a reference to an object that is modified arbitrarily. + Returns: + Sequence of WorkElement objects drawn from the names in *source_set* + + Requires that WorkElements named in *name_list* and any elements on which + they depend are all named in *source_list* and available in the current + work spec. + Note: *source_set* is a reference to an object that is modified arbitrarily. + The caller should not re-use the object after calling _chase_deps(). + + TODO: Separate out DAG topology operations from here and Context.__enter__() + Our needs are simple enough that we probably don't need an external dependency + like networkx... """ + # Recursively (depth-first) generate a topologically valid serialized DAG from source_set. assert isinstance(source_set, set) + # Warning: This is not at all a rigorous check. + # It is hard to check whether this is string-like or list-like in both Py 2.7 and 3.x + if not isinstance(name_list, (list, tuple, set)): + raise exceptions.ValueError('Must disambiguate "name_list" by passing a list or tuple.') + # Make a copy of name_list in case the input reference is being used elsewhere during + # iteration, such as for source_set, which is modified during the loop. for name in tuple(name_list): if name in source_set: source_set.remove(name) element = WorkElement.deserialize(self.elements[name], name=name, workspec=self) - for dep in self._chase_deps(source_set, element.depends): - yield dep + dependencies = element.depends + # items in element.depends are either element names or ensembles of element names. + for item in dependencies: + if isinstance(item, (list, tuple, set)): + dependency_list = item + else: + if not isinstance(item, str): + raise exceptions.ValueError( + 'Dependencies should be a string or sequence of strings. Got {}'.format(type(item))) + dependency_list = [item] + for dependency in dependency_list: + for recursive_dep in self._chase_deps(source_set, (dependency,)): + yield recursive_dep yield element + else: + # Note: The user is responsible for ensuring that source_set is complete. + # Otherwise, we would need to maintain a list of elements previously yielded. + pass + def __iter__(self): source_set = set(self.elements.keys()) @@ -412,14 +446,22 @@ def add_dependency(self, element): First move the provided element to the same WorkSpec, if not already here. Then, add to ``depends`` and update the WorkSpec. """ - if element.workspec is None: - self.workspec.add_element(element) - assert element.workspec is self.workspec - assert element.name in self.workspec.elements - elif element.workspec is not self.workspec: - raise exceptions.ApiError("Element will need to be moved to the same workspec.") - - self.depends.append(element.name) + def check_element(element): + if element.workspec is None: + self.workspec.add_element(element) + assert element.workspec is self.workspec + assert element.name in self.workspec.elements + elif element.workspec is not self.workspec: + raise exceptions.ApiError("Element will need to be moved to the same workspec.") + return True + + if hasattr(element, 'workspec') and hasattr(element, 'name'): + check_element(element) + self.depends.append(element.name) + else: + assert isinstance(element, (list, tuple)) + self.depends.append(tuple([item.name for item in element if check_element(item)])) + self.workspec.elements[self.name] = self.serialize() def serialize(self):