Suggestions and issues for the workflow class #756
Thanks for the testing and feedback @JNmpi! Here's a summary of the TODOs I see until this issue can be closed:
This needs a clearer error message, but is behaving as expected. The issue is

I'll leave this issue open until I patch in a clearer error message so users immediately know why this breaks, but right now I have no intention to change the behaviour. It might be possible to code it up so the class "intelligently" knows whether to parse
💯 The IO classes can and should have better human-readable representation. This should be straightforward to add.
Yep, good idea. Given our other recent discussion, I think this can instead return a
On We want such behaviour available for nodes that are super expensive (think VASP calculations), where we don't necessarily want the node re-running when updating the structure, again on kpoints, etc. However, I totally agree that the "fast" behaviour is what is intuitively expected! Over in #729 we reverse things so that the standard

EDIT: I missed the IO formatting request on my first pass. It's absolutely a good idea though. |
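The trade-off described above -- eager updates for cheap nodes, delayed runs for expensive ones -- can be sketched with a toy `run_on_updates` flag (the flag name mirrors the signature quoted later in this thread; the classes here are illustrative, not the library's):

```python
# Minimal sketch: eager nodes recompute on every input change, while delayed
# nodes wait for an explicit run() -- the behaviour we want for expensive
# (VASP-like) nodes, so partial input edits don't trigger runs.

class ToyNode:
    def __init__(self, fn, run_on_updates=True):
        self.fn = fn
        self.run_on_updates = run_on_updates
        self.inputs = {}
        self.output = None
        self.run_count = 0

    def set_input(self, **kwargs):
        self.inputs.update(kwargs)
        if self.run_on_updates:  # eager: recompute immediately on update
            self.run()

    def run(self):
        self.run_count += 1
        self.output = self.fn(**self.inputs)
        return self.output


eager = ToyNode(lambda x: x + 1)                        # recomputes per update
lazy = ToyNode(lambda x: x + 1, run_on_updates=False)   # waits for run()

eager.set_input(x=1)
eager.set_input(x=2)  # eager has now run twice
lazy.set_input(x=1)
lazy.set_input(x=2)   # lazy has not run at all yet
lazy.run()            # exactly one (expensive) run, with the final inputs
```

With the flag off, any number of input edits cost nothing until the single explicit `run()`.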
Thanks, @liam for the fast response and the detailed explanations. Below are some thoughts and comments on the various issues:
|
@JNmpi, and thanks for getting back to me so fast! I like asynchronous stuff, but fast-paced async is even better 😄

Initial input as args
Yeah, I totally agree with the sentiment. So the issue is that for, e.g., the base node class that wraps functions, the initialization signature looks like this:

def __init__(
self,
node_function: callable,
*output_labels: str,
label: Optional[str] = None,
run_on_updates: bool = True,
update_on_instantiation: bool = True,
channels_requiring_update_after_run: Optional[list[str]] = None,
parent: Optional[Composite] = None,
**kwargs,
):

Here, the Of course, we could take our "mandatory" input, make it a keyword argument, e.g. This is all further complicated when we think about the nodes created by decorators. There, we accept the output labels and node function kwargs as arguments to the decorator, and return a dynamically defined class. When we instantiate that class, we override some of its arguments, but additional positional arguments are still interpreted as output labels (hence the error message from your example where the code complains that the number of output labels doesn't match the number of returned values). Here's the decorator code for convenience to see in detail the implementation I'm talking about:

def function_node(*output_labels: str, **node_class_kwargs):
"""
A decorator for dynamically creating node classes from functions.
Decorates a function.
Takes an output label for each returned value of the function.
Returns a `Function` subclass whose name is the camel-case version of the function node,
and whose signature is modified to exclude the node function and output labels
(which are explicitly defined in the process of using the decorator).
"""
def as_node(node_function: callable):
return type(
node_function.__name__.title().replace("_", ""), # fnc_name to CamelCase
(Function,), # Define parentage
{
"__init__": partialmethod(
Function.__init__,
node_function,
*output_labels,
**node_class_kwargs,
)
},
)
return as_node

I do really like your idea of making input positional-arg-accessible, it's just that at this exact moment the implementation cost seems very high. @samwaseda had a really nice idea for automatically extracting output labels; if/when we get this implemented, it makes much more sense to move

Push vs pull workflows
Ahhh, ok! Unfortunately, to the best of my understanding, this is incompatible with cyclic graphs. We could imagine supporting both modes, with "pull" only being eligible for DAGs -- ryven supports both modes, although few or perhaps no others do. IMO, it's not worth the extra implementation headache to support both. I have zero expectation that you've read this -- it's not even merged yet! -- but in the spec for workflows, I've claimed that one of our differentiating features is ground-up support for cyclic graphs. Dask graphs, in comparison, offer a "directed acyclic graph of tasks with data dependencies". With the disclaimer that I may simply not be sufficiently imaginative or clever, I believe that writing cyclic workflows, e.g. with a "while loop", is not only impossible in a pull-mode, but also requires execution dependencies and not just data dependencies -- and I don't understand how one would implement a pull-mode for execution dependencies. To make it concrete, suppose we wanted a graph that encodes "generate a random number between 0 and 20; while this number is <10, keep generating new random numbers; finally, take the square root of this number". Below is a minimal push-based, "execution dependency" sketch.

So I really don't have any intrinsic objection to a pull-mode, but I absolutely want to support cyclic graphs and am not smart enough to see a way to do both at once. I'm open to conditional dual-support, but wouldn't want to implement that until we're really happy with the cyclic-supporting push-mode.
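To make the incompatibility concrete, here is a tiny sketch (toy classes, not the library API) of naive pull-mode evaluation: each node pulls all of its upstream dependencies before computing, which never terminates on a cycle. Here the infinite recursion is caught explicitly instead of blowing the stack:

```python
# Naive "pull" evaluation: fine on a DAG, ill-defined on a cycle.

class PullNode:
    def __init__(self, name, upstream=None):
        self.name = name
        self.upstream = upstream if upstream is not None else []

    def pull(self, _visiting=None):
        visiting = _visiting if _visiting is not None else set()
        if self.name in visiting:  # we arrived back at ourselves: a cycle
            raise RuntimeError(f"cycle detected at node {self.name!r}")
        visiting.add(self.name)
        for dep in self.upstream:  # evaluate all upstream nodes first
            dep.pull(visiting)


# A DAG pulls fine...
a = PullNode("a")
b = PullNode("b", upstream=[a])
b.pull()
dag_ok = True

# ...but closing the loop (a <-> b) makes pull-mode recurse without end
a.upstream.append(b)
try:
    b.pull()
    cycle_ok = True
except RuntimeError:
    cycle_ok = False
```

A push-mode graph with explicit execution signals, as in the demo further down this thread, sidesteps this because nothing ever asks "who do I depend on?" recursively.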
I 100% agree we want some sort of single-button "run it all" access exposed to users. There is definitely some tension here, as simply making all nodes delayed by default conflicts with your user frustration in #756. Ultimately there may be some element of "no free lunch", but -- unlike the cyclic-pull-graphs -- I don't see any deep and fundamental barrier here. We just need to play around and come up with more and more clever UI tricks and interfaces. Certainly the current implementation falls flat on its face here, as |
Thank you very much for all these asynchronous discussions in a very well expressed form! I enjoy reading these discussions and I get an idea of what is happening! :)
That sounds really like the way to go for me: populate a workflow object with all the nodes. In the best case I would like to support even something like (sorry for not being able to dig out the correct/available syntax here!):

wf = Workflow()
@node('y')
def identity(x: int=0) -> int:
return x
node_identity = identity(0)
@node("p1", "m1")
def my_mwe_node(
x: int | float, y: int | float) -> tuple[int | float, int | float]:
return x+1, y-1
node_instance = my_mwe_node(x=0)
node_instance.inputs.y = node_identity.outputs.y
wf = Workflow([node_identity,node_instance])
wf.run(1, 2)

to populate the missing input fields ( |
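The requested `wf.run(1, 2)` behaviour could be sketched roughly like this -- walk the nodes in definition order and bind each positional argument to the next input channel that is neither connected nor already set. All class and attribute names here are hypothetical, not the actual library API:

```python
# Toy sketch: positional run() args fill the workflow's unconnected inputs.

class Channel:
    def __init__(self, name):
        self.name = name
        self.value = None
        self.connected = False  # True if fed by another node's output


class ToyWorkflow:
    def __init__(self, node_inputs):
        # node_inputs: {node_label: [Channel, ...]} in definition order
        self.node_inputs = node_inputs

    def run(self, *args):
        free = [ch for chans in self.node_inputs.values()
                for ch in chans if not ch.connected and ch.value is None]
        if len(args) != len(free):
            raise TypeError(
                f"expected {len(free)} positional args, got {len(args)}")
        for ch, val in zip(free, args):
            ch.value = val
        return {ch.name: ch.value
                for chans in self.node_inputs.values() for ch in chans}


x = Channel("x")            # identity node's free input
y = Channel("y")            # my_mwe_node's free input
y_conn = Channel("y_conn")  # already wired to identity's output
y_conn.connected = True

wf = ToyWorkflow({"identity": [x], "my_mwe_node": [y, y_conn]})
values = wf.run(1, 2)  # 1 -> x, 2 -> y; the connected channel is skipped
```

Connected channels are excluded so positional data never overwrites a graph edge.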
🥳
I hadn't thought of (optionally!) passing input data to the However, at this stage it gets reeeaaally hard to do it with |
Yes, I thought similarly; however, would not a |
Indeed, |
Thanks, @liamhuber and @niklassiemer. A quick thought regarding the last item. The syntax |
Some (super pragmatic) thoughts regarding push vs pull workflows. The reason we want delayed execution is to avoid, for time-consuming executions such as DFT runs, that changing the input immediately starts execution, even if the changes are not complete. If we call run on a top node or the workflow, we know that all input changes have been finalised and that the computation should run with exactly these input parameters. My suggestion would therefore be to temporarily change the status of all nodes below the top node from delayed to running instantaneously. This approach should also work for cyclic workflows since in the running mode each node gets this status. Once the workflow calculation is finished, the status is reverted to its original state. Of course, to prevent undesired side effects the workflow/nodes should be locked, i.e., they should raise an error when trying to modify their input. Since we know that the present workflow implementation works when being in push mode, introducing such a temporary mode switch should work right out of the box. |
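The temporary mode switch suggested above can be sketched as a context manager (toy classes and hypothetical flag names, not the library API): while the workflow runs, every node is flipped from delayed to run-instantaneously and its input is locked; afterwards the original status is restored, even if the run fails:

```python
from contextlib import contextmanager

class ToyNode:
    def __init__(self):
        self.delayed = True   # default: do not run on input updates
        self.locked = False

    def set_input(self, value):
        if self.locked:
            raise RuntimeError("input is locked while the workflow is running")
        self.value = value


@contextmanager
def running_mode(nodes):
    saved = [node.delayed for node in nodes]
    for node in nodes:
        node.delayed, node.locked = False, True  # run instantaneously, lock
    try:
        yield
    finally:  # revert to the original status even on failure
        for node, was_delayed in zip(nodes, saved):
            node.delayed, node.locked = was_delayed, False


nodes = [ToyNode(), ToyNode()]
with running_mode(nodes):
    during = [(n.delayed, n.locked) for n in nodes]
    try:
        nodes[0].set_input(5)  # modifying input mid-run raises
        lock_raised = False
    except RuntimeError:
        lock_raised = True
after = [(n.delayed, n.locked) for n in nodes]
```

The `finally` clause is what makes the switch safe to use around runs that may throw.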
Also some thoughts regarding the issue Initial input as args: A possible solution may be to rewrite the decorator to convert the function into a class. This is what I did in my code example when discussing the conversion of nodes into files and vice versa. You may have a look into it. Once I have a class I can introduce my own call definition and I should be free to use the various arguments from the node and the decorator in whatever way we want. Using this freedom I expect that it becomes possible to realize the desired behavior. |
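The function-to-class idea above can be sketched as follows (hypothetical names throughout): the decorator wraps the function in a class and returns an instance whose `__call__` is free to route positional arguments to the function's own parameters, instead of having them swallowed as output labels:

```python
import inspect

class FunctionNode:
    """Toy wrapper: owns the function, its output labels, and a custom call."""

    def __init__(self, fn, *output_labels):
        self.fn = fn
        self.output_labels = output_labels
        self.param_names = list(inspect.signature(fn).parameters)

    def __call__(self, *args, **kwargs):
        # positional args bind to the wrapped function's inputs, in order
        kwargs.update(zip(self.param_names, args))
        result = self.fn(**kwargs)
        return dict(zip(self.output_labels, [result]))


def node(*output_labels):
    def decorate(fn):
        return FunctionNode(fn, *output_labels)
    return decorate


@node("y")
def identity(x: int = 0) -> int:
    return x


with_arg = identity(5)    # 5 is routed to x, not read as an output label
with_default = identity() # defaults still apply
```

Because the labels are stored on the instance rather than baked into `__init__`, the call signature is entirely under our control.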
Some notes following a quick synchronous discussion between me and @JNmpi:
I share this concern. Something like using double underscores as suggested is indeed a good first-step to solving it.
We have a couple tools for this:
We talked through the "while" schematic above, as well as a hypothetical macro where a DFT node gets its

I'll try to get the schematic example running before Monday -- it just requires defining an
This was a misunderstanding on my part. I was confused because our decorators do return a class! Joerg clarified his code returns a class instance. He's spot-on that we can simply modify |
@JNmpi I made a demo that implements the sketch in the comment above. It runs on the latest

The setup looks like this:

# Node definitions
# This is pretty verbose, but just because I want print statements and
# because there are no flow-control classes implemented in the library yet
# So in real cases these would mostly be imports from node packages
# I'm glad we've got a big enough toolbox to do this type of development
# right in the notebook though!
@Workflow.wrap_as.single_value_node("rand")
def numpy_randint(low=0, high=20):
rand = np.random.randint(low=low, high=high)
print(f"Generating random number between {low} and {high}...{rand}!")
return rand
class GreaterThanLimitSwitch(Function):
"""
A switch class for sending signal output depending on a '>' check
applied to input
"""
def __init__(self, **kwargs):
super().__init__(self.greater_than, "value_gt_limit", **kwargs)
self.signals.output.true = OutputSignal("true", self)
self.signals.output.false = OutputSignal("false", self)
@staticmethod
def greater_than(value, limit=10):
return value > limit
def process_run_result(self, function_output):
"""
Process the output as usual, then fire signals accordingly.
"""
super().process_run_result(function_output)
if self.outputs.value_gt_limit.value:
print(f"{self.inputs.value.value} > {self.inputs.limit.value}")
self.signals.output.true()
else:
print(f"{self.inputs.value.value} <= {self.inputs.limit.value}")
self.signals.output.false()
@Workflow.wrap_as.single_value_node("sqrt")
def numpy_sqrt(value=0):
sqrt = np.sqrt(value)
print(f"sqrt({value}) = {sqrt}")
return sqrt
# Graph definition
# Including data flow
# To handle execution control for cyclic flows we turn off some of the
# automated running
# I also stop the headmost node from running at instantiation,
# but that's for output prettiness, not functionality
wf = Workflow("rand_until_big_then_sqrt")
wf.rand = numpy_randint(update_on_instantiation=False)
wf.gt_switch = GreaterThanLimitSwitch(run_on_updates=False)
wf.gt_switch.inputs.value = wf.rand
wf.sqrt = numpy_sqrt(run_on_updates=False)
wf.sqrt.inputs.value = wf.rand
# Finally, define the flow control
# To avoid a race condition, we let rand push out its data,
# _then_ trigger the switch with signal control
wf.gt_switch.signals.input.run = wf.rand.signals.output.ran
wf.sqrt.signals.input.run = wf.gt_switch.signals.output.true
wf.rand.signals.input.run = wf.gt_switch.signals.output.false

This is a bit long, but without the comments, and if you assume the individual nodes are part of a node package somewhere and compress it down, it's just seven lines:

wf = Workflow("rand_until_big_then_sqrt")
wf.rand = numpy_randint(update_on_instantiation=False)
wf.gt_switch = GreaterThanLimitSwitch(run_on_updates=False, value=wf.rand)
wf.sqrt = numpy_sqrt(run_on_updates=False, value=wf.rand)
wf.gt_switch.signals.input.run = wf.rand.signals.output.ran
wf.sqrt.signals.input.run = wf.gt_switch.signals.output.true
wf.rand.signals.input.run = wf.gt_switch.signals.output.false

Ideally we want to call

wf.rand.update()
print(wf.sqrt)

We can repeat those two lines as many times as we like to get output like:
I'm a little sad that we need to think about race conditions, but by setting it in #761 so that data gets pushed first, then signals get fired, we've got all the necessary tools to make sure everyone has the data they need before they run. There's lots of room for improvement, but the idea is working! |
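The "data first, then signals" ordering mentioned above is the whole trick, and can be sketched in a few lines (toy classes, not the library API): when a node finishes, it first pushes its result to all connected inputs, and only then fires its "ran" signal to trigger downstream nodes, so every listener already has fresh data when it starts:

```python
# Toy sketch of the race-free ordering: push data, then fire signals.

class ToyNode:
    def __init__(self, name, log):
        self.name = name
        self.log = log
        self.data_targets = []    # downstream nodes receiving our value
        self.signal_targets = []  # downstream nodes we trigger
        self.value = None

    def run(self, value):
        self.value = value
        for target in self.data_targets:    # 1) push data first...
            target.value = value
            self.log.append(f"data->{target.name}")
        for target in self.signal_targets:  # 2) ...then fire signals
            self.log.append(f"signal->{target.name}")
            target.on_signal()

    def on_signal(self):
        # by the time the signal arrives, self.value is already fresh
        self.log.append(f"{self.name} runs with value={self.value}")


log = []
rand = ToyNode("rand", log)
switch = ToyNode("switch", log)
rand.data_targets.append(switch)
rand.signal_targets.append(switch)
rand.run(17)
```

Swapping the two loops would reintroduce the race: the switch would run before its input value arrived.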
Thanks, @liam, for implementing the example so quickly. Inspired by it I played a bit with workflow concepts and came up with a solution that does not require users to explicitly set signals. It is completely based on connecting data input and output. It is only a simple demonstrator (no decorators, shortcomings in the syntax, etc.) but should be sufficient to highlight the concept. I have a couple of ideas to extend it, but to start the discussion I attach it below. I provide two examples: a generic one and one which reproduces your workflow above.

Class definition

class Node:
def __init__(self, name, dependencies=None):
self.name = name
self.dependencies = dependencies if dependencies else []
self.result = None
def add_dependency(self, dependency):
self.dependencies.append(dependency)
def process(self):
# Override this method in derived classes
return self.result
def pull(self):
self.process()
return
# delete above two lines to get back to original code
if self.result is None:
for dependency in self.dependencies:
dependency.pull()
self.process()
class Workflow:
def __init__(self):
self.nodes = []
def add_node(self, node):
self.nodes.append(node)
def run(self):
# TODO: use user input or graph analysis to set output
# The following is a very pragmatic and simple solution
top_node_index = -1
top_node = self.nodes[top_node_index]
top_node.pull()
# for node in self.nodes:
# node.pull()
self.output = top_node.result
return self.output
def visualize(self):
from graphviz import Digraph
from IPython.display import display
dot = Digraph(comment='Workflow')
# Add nodes to the graph
for node in self.nodes:
dot.node(node.name, node.name)
# Add edges for dependencies
for node in self.nodes:
for dependency in node.dependencies:
dot.edge(dependency.name, node.name)
# Display the graph
display(dot)

Example 1

# Example usage:
class AddNode(Node):
def __init__(self, name, a, b):
super().__init__(name, dependencies=[a, b])
self.a = a
self.b = b
def process(self):
self.result = self.a.process() + self.b.process()
return self.result
class MultiplyNode(Node):
def __init__(self, name, a, b):
super().__init__(name, dependencies=[a, b])
self.a = a
self.b = b
def process(self):
self.result = self.a.process() * self.b.process()
return self.result
# Creating nodes
a = Node("a")
b = Node("b")
add_node = AddNode("add", a, b)
multiply_node = MultiplyNode("multiply", add_node, b)
# Setting initial values
a.result = 3
b.result = 4
# Creating a workflow and adding nodes
workflow = Workflow()
workflow.add_node(a)
workflow.add_node(b)
workflow.add_node(add_node)
workflow.add_node(multiply_node)
# Running the workflow
workflow.run()
workflow.visualize()

Liam's example

# Liam's example:
class Random(Node):
def __init__(self, name, low=0, high=20):
super().__init__(name, dependencies=[])
self.low = low
self.high = high
def process(self):
import numpy as np
rand = np.random.randint(low=self.low, high=self.high)
print(f"Generating random number between {self.low} and {self.high}...{rand}!")
self.result = rand
return self.result
class GreaterThanLimitSwitch(Node):
def __init__(self, name, rand, threshold=10, max_attempts=100):
super().__init__(name, dependencies=[rand])
self.rand = rand
self.threshold = threshold
self.max_attempts = max_attempts
def process(self):
print (f'compute {self.name}')
for i in range(self.max_attempts):
r = self.rand.process()
if r > self.threshold:
self.result = r
return self.result
raise ValueError(f'Threshold={self.threshold} not exceeded within max_attempts={self.max_attempts} steps')
class SquareRoot(Node):
def __init__(self, name, arg):
super().__init__(name, dependencies=[arg])
self.arg = arg
def process(self):
import numpy as np
print (f'compute {self.name}')
arg = self.arg.process()
self.result = np.sqrt(arg)
print(f"sqrt({arg}) = {self.result}")
return self.result

rand_node = Random('rand')
switch_node = GreaterThanLimitSwitch('switch', rand_node)
sqrt_node = SquareRoot('sqrt', switch_node)
# Creating a workflow and adding nodes
workflow = Workflow()
workflow.add_node(rand_node)
workflow.add_node(switch_node)
workflow.add_node(sqrt_node)
workflow.visualize()

workflow.run()
|
@JNmpi, nice, I like defining a "dependencies" list for pull modes. However, I don't think it's the pull mode that is making it possible to avoid using signals. Rather, the two big differences I see are that (a) one of the nodes takes a node instance as input -- I think we eventually want to allow this type of behavior, but I think it represents a more complex concept of a "meta node"; it is certainly harder to make a GUI for than "regular" -- and (b) there is explicitly a

I believe we can easily have a similar "data only" push-mode approach with the existing architecture if we allow the same features (node instance as input and internal for loop). I agree that going forward we will want to allow for such meta nodes that allow other nodes as input, but I also really like that it's possible to avoid this with the signals architecture. |
@liamhuber, thanks for your super-fast reply. It would be good to have a more in-depth discussion via Zoom. Just a few quick thoughts/ideas regarding your comments:
|
Hi @JNmpi, We can absolutely talk about it in more detail on Monday. I think I can offer a quick explanation here to get that ball rolling though: My serious concern is how this implementation destroys idempotency. In this particular example it's totally benign, because I think there is room for this type of behaviour, where the accepting node either makes a copy or just accepts a class instead of an instance to begin with -- then we can internally have for loops that are either creating new sub-graphs (e.g. for parallel computation) or doing something extremely similar to here, but now on totally internally owned data (copies of the passed node) such that we don't worry about modification of externally used data. It's also possible we may find a solution that uses a raw node instance as here, but somehow manages to otherwise control and constrain the side-effects of being non-idempotent. I also didn't mean to imply that we should never have for-loops inside node functions, just that I was very happy to be able to make a loop over a node purely at the graph level, without having to write a for loop. |
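The copy-based approach floated above -- the accepting node makes a copy so its internal loop mutates only privately owned data -- can be sketched like this (toy classes, hypothetical names; the while-loop-over-a-node idea is from this thread):

```python
import copy
import random

class RandomNode:
    def __init__(self, low=0, high=20):
        self.low, self.high = low, high
        self.result = None

    def process(self):
        self.result = random.randint(self.low, self.high)
        return self.result


class LoopUntilGreater:
    def __init__(self, node, threshold=10):
        self.node = copy.deepcopy(node)  # private copy: no external side effects
        self.threshold = threshold
        self.result = None

    def process(self):
        while self.node.process() <= self.threshold:
            pass  # keep drawing until we exceed the threshold
        self.result = self.node.result
        return self.result


random.seed(0)  # deterministic only for demonstration purposes
original = RandomNode()
looper = LoopUntilGreater(original)
value = looper.process()
# `original` was never run: from the outside, using it in the loop
# had no side effects on externally owned data
```

Accepting the class itself instead of an instance, as also suggested above, would achieve the same isolation by constructing the private node internally.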
Thanks, @liam. We can talk, e.g., tomorrow after the pyiron meeting. I fully agree with your concern regarding idempotency. I had thought about it and see a couple of solutions, which we can discuss tomorrow. While I am fully open and happy to use explicit signals to realize constructs like for-loops, I see the potential pitfalls of the proposed concept but also how things can get much simpler. The concept reminds me a lot of the yield statement in Python, which cleanly separates the generation of new list items from the execution of the for-loop. This is one of the features I really like in Python. Let's talk more about these issues in the next few days. I am already looking forward to it. |
Please stop using @liam. It doesn't inform the person you want.
|
@liamhuber, do you have time for a Zoom meeting now? (my other meeting is already over) |
Yep, getting a coffee refill and brt |
Shall we take the pyiron Zoom link? |
@liamhuber, thanks for all the new features and enhancements for the workflow class. In the following, I list some issues and suggestions regarding the workflow class. For the following jupyter notebook I used the main branch. Thus, some of the issues may have been fixed in the latest branch.
Using the decorator for a single output variable fails
The following construction without a decorator works as expected
Some suggestions for making working with the nodes more convenient
Return the available input channels (same for outputs):
Return the output (e.g. node.outputs.to_value_dict()), similar to dask.compute(), when performing run:
Connecting two nodes
The following code should 'see' that the input of the first node has been updated (i.e., should print 5 rather than 10):
To get the expected behavior I have to manually run the first node (which is not what I would like to have for a delayed operation):