Parallel node execution in Graphs #704

Open
samuelcolvin opened this issue Jan 16, 2025 · 1 comment
Labels: enhancement (New feature or request), graph

@samuelcolvin
Member

Follows #528.

It would be great to be able to run multiple nodes in parallel.

While starting two nodes is easy (basically just create multiple asyncio tasks), and defining multiple nodes to run is easy (return a list of nodes instead of a single node), some other things are hard:

  • how do we manage recording state (see State persistence #695) when multiple nodes run in parallel?
  • how do we know when to start a node that relies on multiple other nodes completing?
  • currently nodes define just their outgoing edges, do we need nodes to define their incoming edges too?
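The "easy" part described above can be sketched in a few lines. This is a minimal illustration, assuming a hypothetical `Node` dataclass and `run_branch` helper (not pydantic_graph's `BaseNode` or `Graph` API): a node's `run()` returns a list of successors, and each successor is started as its own asyncio task.

```python
import asyncio
from dataclasses import dataclass, field


# Hypothetical, minimal stand-in for a graph node; NOT pydantic_graph's BaseNode.
@dataclass
class Node:
    name: str
    next_nodes: list["Node"] = field(default_factory=list)

    async def run(self, log: list[str]) -> list["Node"]:
        await asyncio.sleep(0)  # simulate some async work
        log.append(self.name)
        # Returning a list (instead of a single node) fans out to parallel branches.
        return self.next_nodes


async def run_branch(node: Node, log: list[str]) -> None:
    """Run a node, then run all of its successors concurrently as asyncio tasks."""
    successors = await node.run(log)
    if successors:
        # Starting several nodes is just creating several tasks and awaiting them.
        await asyncio.gather(*(asyncio.create_task(run_branch(n, log)) for n in successors))


start = Node("start", [Node("a"), Node("b")])
log: list[str] = []
asyncio.run(run_branch(start, log))
print(log)  # "start" first, then "a" and "b"
```

Note that this naive fan-out has no notion of joins: if two branches both led to the same node, that node would simply run twice. That is exactly the second question above.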

@izzyacademy
Contributor

izzyacademy commented Jan 18, 2025

These are my preliminary thoughts, so please bear with me 😂

do we need to define incoming edges too?

I think so. Any nodes that are preconditions for the current node should be registered on the graph, so that we know which nodes to wait for before kicking off the current node. The types of those precondition nodes could be declared on the current node itself, and we can parse them later when registering the node definitions and edges in nodes.py [1].

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from functools import cache
from typing import Any, Generic

# GraphRunContext, End, NodeDef and the StateT / DepsT / NodeRunEndT type
# variables are the existing pydantic_graph definitions (nodes.py [1],
# state.py [2]); only the parts relevant to this proposal are shown.


class BaseNode(ABC, Generic[StateT, DepsT, NodeRunEndT]):
    """Base class for a node."""

    @abstractmethod
    async def run(
        self, ctx: GraphRunContext[StateT, DepsT]
    ) -> BaseNode[StateT, DepsT, Any] | End[NodeRunEndT] | list[type[BaseNode]]:
        """We might also need to return a list of BaseNode types if multiple nodes follow this node in parallel."""
        ...

    @classmethod
    def get_precondition_nodes(cls) -> list[type[BaseNode]]:
        """Returns a set or list of nodes that must be completed before starting this node."""
        ...

    @classmethod
    @cache
    def get_id(cls) -> str:
        ...

    @classmethod
    def get_note(cls) -> str | None:
        ...

    @classmethod
    def get_node_def(cls, local_ns: dict[str, Any] | None) -> NodeDef[StateT, DepsT, NodeRunEndT]:
        ...
```
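To make the incoming-edge idea concrete, here is a sketch of how the graph could turn `get_precondition_nodes()` into an incoming-edge map at registration time and then decide which nodes are ready to start. `build_incoming_edges`, `ready_nodes` and the `FetchA`/`FetchB`/`Merge` classes are hypothetical names for illustration; they are plain Python classes, not subclasses of pydantic_graph's `BaseNode`.

```python
from __future__ import annotations


# Hypothetical node classes mirroring the proposed get_precondition_nodes() hook;
# plain classes here, not pydantic_graph's BaseNode.
class Node:
    @classmethod
    def get_precondition_nodes(cls) -> list[type[Node]]:
        return []  # by default a node has no incoming dependencies


class FetchA(Node):
    pass


class FetchB(Node):
    pass


class Merge(Node):
    @classmethod
    def get_precondition_nodes(cls) -> list[type[Node]]:
        return [FetchA, FetchB]  # Merge may only start once both fetches are done


def build_incoming_edges(nodes: list[type[Node]]) -> dict[type[Node], set[type[Node]]]:
    """Collect each node's incoming edges when the graph registers its nodes."""
    registered = set(nodes)
    incoming: dict[type[Node], set[type[Node]]] = {}
    for node in nodes:
        preconditions = set(node.get_precondition_nodes())
        missing = preconditions - registered
        if missing:
            raise ValueError(f"{node.__name__} depends on unregistered nodes: {missing}")
        incoming[node] = preconditions
    return incoming


def ready_nodes(
    incoming: dict[type[Node], set[type[Node]]], completed: set[type[Node]]
) -> list[type[Node]]:
    """Nodes that have not run yet and whose preconditions have all completed."""
    return [n for n, deps in incoming.items() if n not in completed and deps <= completed]


incoming = build_incoming_edges([FetchA, FetchB, Merge])
print([n.__name__ for n in ready_nodes(incoming, set())])  # ['FetchA', 'FetchB']
print([n.__name__ for n in ready_nodes(incoming, {FetchA})])  # ['FetchB']
print([n.__name__ for n in ready_nodes(incoming, {FetchA, FetchB})])  # ['Merge']
```

Validating preconditions when the graph is built means a missing dependency fails early instead of leaving a node waiting forever at run time.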

how do we know when to start nodes that rely on multiple nodes as a pre-condition to their kickoff?

I think we could use asyncio.Event() so that the current node gathers and waits for all of its preconditions to signal that they are done before it proceeds.
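A minimal sketch of the asyncio.Event() approach, assuming a hypothetical graph described as a dict mapping each node name to its precondition names (not how pydantic_graph stores edges): every node gets its own `Event`, waits on the events of all its preconditions, and sets its own event when it finishes.

```python
import asyncio


async def run_node(
    name: str,
    preconditions: list[str],
    done: dict[str, asyncio.Event],
    order: list[str],
) -> None:
    # Block until every precondition node has signalled completion.
    await asyncio.gather(*(done[p].wait() for p in preconditions))
    await asyncio.sleep(0)  # stand-in for the node's real work
    order.append(name)
    done[name].set()  # signal any downstream nodes waiting on this one


async def main() -> list[str]:
    # Hypothetical graph: "merge" must wait for both "a" and "b".
    graph = {"a": [], "b": [], "merge": ["a", "b"]}
    done = {name: asyncio.Event() for name in graph}
    order: list[str] = []
    # All nodes are started at once; the events enforce the ordering.
    await asyncio.gather(*(run_node(n, deps, done, order) for n, deps in graph.items()))
    return order


order = asyncio.run(main())
print(order)  # "merge" is always last
```

One caveat with this design: if a precondition node fails or is never scheduled, its event is never set and the waiting node blocks forever, so a real implementation would need error propagation or a timeout (for example `asyncio.wait_for`).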

how do we manage state updates with parallel nodes?

I think this can be done by introducing a special type for state-management scenarios that requires certain reducer() functions to be defined on the state class, or annotations on the specific fields that need cooperation with other nodes. The current node would then use the reducer function to update the state (within today's API). Currently state.py [2] allows any data type, including simple types like strings and booleans, but I think you might need to restrict this to a complex type for this to work.

The reducer(s) can be defined as annotation metadata on the state object/class, and we can use asyncio.Lock() primitives to prevent concurrent updates from multiple tasks from corrupting the state values.
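The reducer-plus-lock idea can be sketched as follows, assuming reducers are attached to dataclass fields via `typing.Annotated` metadata. `Reducer`, `StateManager` and `MyState` are hypothetical names for illustration, not part of pydantic_graph: nodes never assign to the state directly, they pass their changes to `update()`, which merges each value with the field's reducer while holding an `asyncio.Lock`.

```python
import asyncio
import operator
from dataclasses import dataclass, field
from typing import Annotated, Any, Callable, get_type_hints


@dataclass
class Reducer:
    """Hypothetical marker: how concurrent updates to a field are combined."""
    fn: Callable[[Any, Any], Any]


@dataclass
class MyState:
    # Lists from parallel nodes are concatenated; counts are summed.
    messages: Annotated[list[str], Reducer(operator.add)] = field(default_factory=list)
    total: Annotated[int, Reducer(operator.add)] = 0


class StateManager:
    """Applies node updates through each field's reducer, one task at a time."""

    def __init__(self, state: Any) -> None:
        self.state = state
        self._lock = asyncio.Lock()
        # include_extras=True keeps the Annotated metadata on the type hints.
        hints = get_type_hints(type(state), include_extras=True)
        self._reducers = {
            name: meta.fn
            for name, hint in hints.items()
            for meta in getattr(hint, "__metadata__", ())
            if isinstance(meta, Reducer)
        }

    async def update(self, **changes: Any) -> None:
        async with self._lock:  # serialise read-modify-write across tasks
            for name, value in changes.items():
                reducer = self._reducers.get(name)
                if reducer is None:
                    raise ValueError(f"field {name!r} has no reducer")
                setattr(self.state, name, reducer(getattr(self.state, name), value))


async def node(manager: StateManager, label: str) -> None:
    await asyncio.sleep(0)  # simulate work before writing back to the state
    await manager.update(messages=[label], total=1)


async def main() -> MyState:
    manager = StateManager(MyState())
    await asyncio.gather(*(node(manager, f"node-{i}") for i in range(3)))
    return manager.state


state = asyncio.run(main())
print(state.total)  # 3
print(sorted(state.messages))  # ['node-0', 'node-1', 'node-2']
```

Requiring every updated field to carry a reducer is one way of restricting state to a structured type, as suggested above. Strictly speaking, asyncio tasks only interleave at `await` points, so the lock becomes essential once a reducer or a persistence step awaits inside the critical section (e.g. persisting state as discussed in #695); with purely synchronous reducers it is a cheap safeguard.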

References:
[1] https://github.com/pydantic/pydantic-ai/blob/main/pydantic_graph/pydantic_graph/nodes.py
[2] https://github.com/pydantic/pydantic-ai/blob/main/pydantic_graph/pydantic_graph/state.py

@sydney-runkle added the enhancement (New feature or request) label on Jan 23, 2025