Parallel node execution in Graphs #704
Comments
These are my preliminary thoughts, so please bear with me 😂

Do we need to define incoming edges too?

I think so. Any node that is a precondition for the current node should be registered on the graph, so that we know which nodes to wait for before kicking off the current node. The types of those precondition nodes should be registered on the current node, and we can parse them later, when the node definitions and edges are registered in nodes.py [1].

    from __future__ import annotations

    from abc import ABC, abstractmethod
    from functools import cache
    from typing import Any, Generic

    # StateT, DepsT, NodeRunEndT, GraphRunContext, End and NodeDef are the
    # names already used by the existing BaseNode; method bodies are elided.

    class BaseNode(ABC, Generic[StateT, DepsT, NodeRunEndT]):
        """Base class for a node."""

        @abstractmethod
        async def run(
            self, ctx: GraphRunContext[StateT, DepsT]
        ) -> BaseNode[StateT, DepsT, Any] | End[NodeRunEndT] | list[type[BaseNode]]:
            """May also return a list of BaseNode types when several nodes follow this one in parallel."""
            ...

        @classmethod
        def get_precondition_nodes(cls) -> list[type[BaseNode]]:
            """Return the node types that must complete before this node starts."""
            ...

        @classmethod
        @cache
        def get_id(cls) -> str: ...

        @classmethod
        def get_note(cls) -> str | None: ...

        @classmethod
        def get_node_def(cls, local_ns: dict[str, Any] | None) -> NodeDef[StateT, DepsT, NodeRunEndT]: ...

How do we know when to start nodes that rely on multiple nodes as a precondition to their kickoff?

I think we could use asyncio.Event() so that the current node waits until all of its preconditions have signalled that they are done before it proceeds.

How do we manage state updates with parallel nodes?

I think this can be done by setting up a special type for state-management scenarios, one that requires reducer() functions to be defined on the state class, or annotations on the specific fields that need cooperation with other nodes. The current node would then use the reducer function to update the state (in today's API). Currently, state.py [2] allows any data type, including simple types like strings and booleans, but I think this might need to be restricted to a complex type for the approach to work. The reducer(s) can be defined as annotation metadata on the state object/class, and we can use asyncio.Lock() primitives to prevent concurrent updates from multiple tasks that could corrupt the state values.

References:
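To make the two ideas above concrete, here is a minimal sketch of a diamond-shaped graph (a, then b and c in parallel, then d) in which each node waits on an asyncio.Event per precondition and writes to shared state through a reducer guarded by an asyncio.Lock. Everything in it (SharedState, append_reducer, run_node, the string node names) is hypothetical and only illustrates the mechanism; it is not the library's API and not a proposed implementation.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Callable

# Hypothetical reducer type: takes the current value and an update,
# returns the new value.
Reducer = Callable[[list[str], str], list[str]]


def append_reducer(current: list[str], item: str) -> list[str]:
    """Hypothetical reducer that merges an update by appending it."""
    return [*current, item]


@dataclass
class SharedState:
    """Hypothetical state container: all updates go through a reducer under a lock."""

    values: list[str] = field(default_factory=list)
    _lock: asyncio.Lock = field(default_factory=asyncio.Lock, repr=False)

    async def update(self, reducer: Reducer, item: str) -> None:
        # Serialise the read-modify-write so concurrent tasks cannot interleave it.
        async with self._lock:
            self.values = reducer(self.values, item)


async def run_node(
    name: str,
    preconditions: list[str],
    done: dict[str, asyncio.Event],
    state: SharedState,
) -> None:
    # Block until every precondition node has signalled completion.
    for pre in preconditions:
        await done[pre].wait()
    await state.update(append_reducer, name)
    # Signal completion to any node that lists this one as a precondition.
    done[name].set()


async def main() -> list[str]:
    # Each node maps to the nodes that must finish before it starts.
    graph = {"a": [], "b": ["a"], "c": ["a"], "d": ["b", "c"]}
    state = SharedState()
    done = {name: asyncio.Event() for name in graph}
    # Start every node as its own task; the events enforce the ordering.
    await asyncio.gather(
        *(run_node(name, pre, done, state) for name, pre in graph.items())
    )
    return state.values


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because asyncio tasks only switch at an await, the lock in this sketch is not strictly required: update() never awaits between reading and writing values. It starts to matter once a reducer or state update spans an await, for example when it performs I/O.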
Follows #528.
It would be great to be able to run multiple nodes in parallel.
While starting two nodes is easy (basically just create multiple asyncio tasks), and defining multiple nodes to run is easy (return a list of nodes instead of a single node), some other stuff is hard:
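For reference, the "easy" part might look roughly like the sketch below: a node's run() returns a list of successor nodes, and the runner starts one asyncio task per node in the current frontier. The Node class and run_graph function are hypothetical stand-ins invented for this illustration, not the library's BaseNode or Graph.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Node:
    """Hypothetical stand-in for a graph node."""

    name: str
    successors: list["Node"] = field(default_factory=list)

    async def run(self, log: list[str]) -> list["Node"]:
        # Yield control once, as a real node would while awaiting I/O.
        await asyncio.sleep(0)
        log.append(self.name)
        # Returning a list means "run all of these next, in parallel".
        return self.successors


async def run_graph(start: Node) -> list[str]:
    log: list[str] = []
    frontier = [start]
    while frontier:
        # gather() wraps each coroutine in its own task and runs them concurrently.
        results = await asyncio.gather(*(node.run(log) for node in frontier))
        # The next frontier is every successor returned by the nodes just run.
        frontier = [nxt for successors in results for nxt in successors]
    return log


if __name__ == "__main__":
    b, c = Node("b"), Node("c")
    a = Node("a", successors=[b, c])
    print(asyncio.run(run_graph(a)))
```

This level-by-level runner handles fan-out only. A node reached from two predecessors would be scheduled twice, and nothing coordinates writes to shared state.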