From 0f564240b7bea0630f1951ce0df91ce1b084d147 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Thu, 23 Jan 2025 02:28:17 +0000 Subject: [PATCH 01/29] wip Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 270 +++++++++++++++------------- python/ray/dag/dag_node.py | 8 + 2 files changed, 154 insertions(+), 124 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 122a1921af2c9..ecdc9154186d2 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -798,6 +798,7 @@ def __init__( enable_asyncio: bool = False, max_inflight_executions: Optional[int] = None, overlap_gpu_communication: Optional[bool] = None, + default_communicator: Optional[Union[Communicator, str]] = None, ): """ Args: @@ -822,6 +823,12 @@ def __init__( communication and computation can be overlapped, which can improve the performance of the DAG execution. If None, the default value will be used. + default_communicator: The default communicator to use to transport tensors + for nodes annotated with `with_tensor_transport()` and when shared memory + is not the desired option (e.g., when transport="nccl", or when + transport="auto" for communication between two different GPUs). + If it is "create", a default communicator is created when needed. + If None, an error will be thrown. All other values are invalid. Returns: Channel: A wrapper around ray.ObjectRef. @@ -846,6 +853,17 @@ def __init__( self._overlap_gpu_communication: Optional[bool] = overlap_gpu_communication if self._overlap_gpu_communication is None: self._overlap_gpu_communication = ctx.overlap_gpu_communication + self._create_default_communicator = False + if isinstance(default_communicator, str): + if default_communicator == "create": + self._create_default_communicator = True + default_communicator = None + else: + raise ValueError( + "The only allowed string for default_communicator is 'create', " + f"got {default_communicator}" + ) + self._default_communicator: Optional[Communicator] = default_communicator self._default_type_hint: ChannelOutputType = SharedMemoryType( buffer_size_bytes=self._buffer_size_bytes, @@ -1006,6 +1024,9 @@ def _preprocess(self) -> None: self.input_task_idx, self.output_task_idx = None, None nccl_actors_p2p: Set["ray.actor.ActorHandle"] = set() + communicator_to_actors: Dict[ + Optional[Communicator], Set["ray.actor.ActorHandle"] + ] = {} collective_ops: Set[_CollectiveOperation] = set() input_attributes: Set[str] = set() @@ -1098,34 +1119,8 @@ def _preprocess(self) -> None: # Collect actors for NCCL P2P methods. if dag_node.type_hint.requires_nccl(): - nccl_actors_p2p.add(actor_handle) - custom_communicator = dag_node.type_hint.get_custom_communicator() - mixed_nccl_group_error_message = ( - "Compiled Graphs do not support mixed usage of " - "type hints of default NCCL group " - '(i.e., TorchTensor(transport="nccl"))' - "and custom NCCL group " - "(i.e., TorchTensor(transport=nccl_group)). " - "Please check all the TorchTensor type hints and " - "make sure only one type of NCCL transport is specified." 
- ) - if custom_communicator is None: - if self._custom_communicator_p2p is not None: - raise ValueError(mixed_nccl_group_error_message) - self._use_default_nccl_group = True - else: - if self._use_default_nccl_group: - raise ValueError(mixed_nccl_group_error_message) - if self._custom_communicator_p2p is not None: - if self._custom_communicator_p2p != custom_communicator: - raise ValueError( - "Compiled Graphs currently only support " - "a single custom NCCL group, but multiple " - "have been specified. Check all the " - "TorchTensor(transport=nccl_group) type hints " - "to make sure only one NCCL group is used." - ) - self._custom_communicator_p2p = custom_communicator + communicator = self._select_communicator(dag_node) + communicator_to_actors[communicator].add(actor_handle) # Collect NCCL collective operations. if isinstance(dag_node, CollectiveOutputNode): @@ -1213,61 +1208,39 @@ def _preprocess(self) -> None: upstream_task.downstream_task_idxs[task_idx] = downstream_actor_handle if upstream_task.dag_node.type_hint.requires_nccl(): - # Add all readers to the NCCL actors of P2P. - nccl_actors_p2p.add(downstream_actor_handle) + communicator = self._select_communicator(upstream_task.dag_node) + communicator_to_actors[communicator].add(downstream_actor_handle) # Check that all specified input attributes, e.g., InputNode()["x"], # are used in the DAG. _check_unused_dag_input_attributes(output_node, input_attributes) - # Collect all leaf nodes. - leaf_nodes: DAGNode = [] - for idx, task in self.idx_to_task.items(): - if not isinstance(task.dag_node, ClassMethodNode): - continue - if ( - len(task.downstream_task_idxs) == 0 - and not task.dag_node.is_cgraph_output_node - ): - leaf_nodes.append(task.dag_node) - # Leaf nodes are not allowed because the exception thrown by the leaf - # node will not be propagated to the driver. - if len(leaf_nodes) != 0: - raise ValueError( - "Compiled DAG doesn't support leaf nodes, i.e., nodes that don't have " - "downstream nodes and are not output nodes. There are " - f"{len(leaf_nodes)} leaf nodes in the DAG. Please add the outputs of " - f"{[leaf_node.get_method_name() for leaf_node in leaf_nodes]} to the " - f"the MultiOutputNode." - ) + self._check_leaf_nodes() - type_hint_resolver = TypeHintResolver(self.actor_to_gpu_ids) - # Resolve AutoChannelType type hints and track the actors that use NCCL. - # This is needed so that the NCCL group can be initialized for these - # actors that use NCCL. - for task in auto_transport_tasks: - writer = task.dag_node._get_actor_handle() - readers = task.downstream_task_idxs.values() - writer_and_node = (writer, self._get_node_id(writer)) - reader_and_node_list = [ - (reader, self._get_node_id(reader)) for reader in readers - ] - # Update the type hint to the resolved one. This is needed because - # the resolved type hint's `register_custom_serializer` will be called - # in preparation for channel I/O. 
- task.dag_node.type_hint = type_hint_resolver.resolve( - task.dag_node.type_hint, - writer_and_node, - reader_and_node_list, - ) - if task.dag_node.type_hint.requires_nccl(): - nccl_actors_p2p.add(writer) - nccl_actors_p2p.update(readers) + self._resolve_auto_transport(auto_transport_tasks, communicator_to_actors) - nccl_actors_p2p = list(nccl_actors_p2p) - if None in nccl_actors_p2p: - raise ValueError("Driver cannot participate in the NCCL group.") + self._init_communicators(communicator_to_actors, collective_ops) + if direct_input: + self._input_num_positional_args = 1 + elif not input_positional_args: + self._input_num_positional_args = 0 + else: + self._input_num_positional_args = max(input_positional_args) + 1 + self._input_kwargs = tuple(input_kwargs) + + def _init_communicators( + self, + communicator_to_actors: Dict[ + Optional[Communicator], Set["ray.actor.ActorHandle"] + ], + collective_ops: Set[ + "ray.experimental.channel.collective_node._CollectiveOperation" + ], + ) -> None: + """ + Initialize communicators for the DAG. + """ # Initialize and cache a NCCL group for each custom NCCL group. All the # custom NCCL groups are initialized before the default NCCL groups. custom_communicator_to_id: Dict[Communicator, str] = {} @@ -1275,26 +1248,19 @@ def _preprocess(self) -> None: # can perform P2P send/recv and collective operations. If there are multiple # custom NCCL groups for a set of actors, only one is cached. actors_to_communicator_id: Dict[FrozenSet["ray.actor.ActorHandle"], str] = {} + for custom_communicator, actors in communicator_to_actors.items(): + if None in actors: + raise ValueError("Driver cannot participate in the NCCL group.") - # If a custom NCCL group is specified for P2P actors, initialize and cache - # the NCCL group ID. - if nccl_actors_p2p and self._custom_communicator_p2p: - if not set(nccl_actors_p2p).issubset( - set(self._custom_communicator_p2p.get_actor_handles()) - ): - raise ValueError( - "Expected P2P actor handles to be a subset of the custom NCCL group" - ) - self._communicator_id_p2p = _init_communicator( - nccl_actors_p2p, - self._custom_communicator_p2p, + communicator_id = _init_communicator( + actors, + custom_communicator, self._overlap_gpu_communication, ) - custom_communicator_to_id[ - self._custom_communicator_p2p - ] = self._communicator_id_p2p - actors = frozenset(nccl_actors_p2p) - actors_to_communicator_id[actors] = self._communicator_id_p2p + custom_communicator_to_id[custom_communicator] = communicator_id + actors = frozenset(actors) + if actors not in actors_to_communicator_id: + actors_to_communicator_id[actors] = communicator_id # If a custom communicator is specified for collective actors, initialize and # cache the communicator ID. @@ -1310,43 +1276,97 @@ def _preprocess(self) -> None: if actors not in actors_to_communicator_id: actors_to_communicator_id[actors] = communicator_id - # If a NCCL group for P2P actors is not initialized, initialize and cache - # the NCCL group ID. - if nccl_actors_p2p and self._communicator_id_p2p is None: - actors = frozenset(nccl_actors_p2p) - if actors in actors_to_communicator_id: - self._communicator_id_p2p = actors_to_communicator_id[actors] - else: - self._communicator_id_p2p = _init_communicator( - nccl_actors_p2p, - self._custom_communicator_p2p, - self._overlap_gpu_communication, + def _select_communicator( + self, dag_node: "ray.dag.DAGNode" + ) -> Optional[Communicator]: + """ + If custom_communicator is provided (i.e., not None), use it. + Otherwise, use the default communicator. 
+ """ + custom_communicator = dag_node.type_hint.get_custom_communicator() + if custom_communicator is not None: + return custom_communicator + if not self._create_default_communicator: + if dag_node._original_type_hint is not None: + assert isinstance(dag_node._original_type_hint, AutoTransportType) + raise ValueError( + f"AutoTransportType is used for DAGNode {dag_node}, " + "This requires specifying a default communicator or 'create' for " + "default_communicator when calling experimental_compile()." ) - actors_to_communicator_id[actors] = self._communicator_id_p2p + raise ValueError( + f"DAGNode {dag_node} has no custom communicator specified. " + "Please specify a custom communicator for the DAGNode using" + "`with_tensor_transport()`, or specify a communicator or 'create' for " + "default_communicator when calling experimental_compile()." + ) + return self._default_communicator - # If a NCCL group for collective actors is not initialized, initialize and - # cache the NCCL group ID. - for collective_op in collective_ops: - if collective_op.type_hint.communicator_id is None: - actors = frozenset(collective_op.actor_handles) - communicator_id = collective_op.init_communicator( - actors_to_communicator_id.get(actors, None) - ) - if actors not in actors_to_communicator_id: - actors_to_communicator_id[actors] = communicator_id + def _resolve_auto_transport( + self, + auto_transport_tasks: Set["CompiledTask"], + communicator_to_actors: Dict[ + Optional[Communicator], Set["ray.actor.ActorHandle"] + ], + ) -> None: + """ + Resolve the auto transport type hint for the DAG. + """ + type_hint_resolver = TypeHintResolver(self.actor_to_gpu_ids) + # For AutoTransportType, there is no custom communicator provided. + # So call _select_communicator with None to get the default communicator, + # and also perform validation. + default_communicator = self._select_communicator(None) + # Resolve AutoChannelType type hints and track the actors that use NCCL. + # This is needed so that the NCCL group can be initialized for these + # actors that use NCCL. + for task in auto_transport_tasks: + writer = task.dag_node._get_actor_handle() + readers = task.downstream_task_idxs.values() + writer_and_node = (writer, self._get_node_id(writer)) + reader_and_node_list = [ + (reader, self._get_node_id(reader)) for reader in readers + ] + # Update the type hint to the resolved one. This is needed because + # the resolved type hint's `register_custom_serializer` will be called + # in preparation for channel I/O. + task.dag_node.type_hint = type_hint_resolver.resolve( + task.dag_node.type_hint, + writer_and_node, + reader_and_node_list, + ) + if task.dag_node.type_hint.requires_nccl(): + communicator_to_actors[default_communicator].add(writer) + communicator_to_actors[default_communicator].update(readers) - # Store all the NCCL group IDs for P2P send/recv and collective operations. - self._communicator_ids = set(actors_to_communicator_id.values()).union( - set(custom_communicator_to_id.values()) + def _check_leaf_nodes(self) -> None: + """ + Check if there are leaf nodes in the DAG and raise an error if there are. 
+ """ + from ray.dag import ( + DAGNode, + ClassMethodNode, ) - if direct_input: - self._input_num_positional_args = 1 - elif not input_positional_args: - self._input_num_positional_args = 0 - else: - self._input_num_positional_args = max(input_positional_args) + 1 - self._input_kwargs = tuple(input_kwargs) + leaf_nodes: List[DAGNode] = [] + for _, task in self.idx_to_task.items(): + if not isinstance(task.dag_node, ClassMethodNode): + continue + if ( + len(task.downstream_task_idxs) == 0 + and not task.dag_node.is_cgraph_output_node + ): + leaf_nodes.append(task.dag_node) + # Leaf nodes are not allowed because the exception thrown by the leaf + # node will not be propagated to the driver. + if len(leaf_nodes) != 0: + raise ValueError( + "Compiled DAG doesn't support leaf nodes, i.e., nodes that don't have " + "downstream nodes and are not output nodes. There are " + f"{len(leaf_nodes)} leaf nodes in the DAG. Please add the outputs of " + f"{[leaf_node.get_method_name() for leaf_node in leaf_nodes]} to the " + f"the MultiOutputNode." + ) @staticmethod def _get_gpu_ids(actor_handle: "ray.actor.ActorHandle") -> List[str]: @@ -2965,6 +2985,7 @@ def build_compiled_dag_from_ray_dag( enable_asyncio: bool = False, max_inflight_executions: Optional[int] = None, overlap_gpu_communication: Optional[bool] = None, + default_communicator: Optional[Union[Communicator, str]] = None, ) -> "CompiledDAG": compiled_dag = CompiledDAG( submit_timeout, @@ -2972,6 +2993,7 @@ def build_compiled_dag_from_ray_dag( enable_asyncio, max_inflight_executions, overlap_gpu_communication, + default_communicator, ) def _build_compiled_dag(node): diff --git a/python/ray/dag/dag_node.py b/python/ray/dag/dag_node.py index 4cd4fd0a7e97a..c7b51a63619d6 100644 --- a/python/ray/dag/dag_node.py +++ b/python/ray/dag/dag_node.py @@ -228,6 +228,7 @@ def experimental_compile( enable_asyncio: bool = False, _max_inflight_executions: Optional[int] = None, _overlap_gpu_communication: Optional[bool] = None, + _default_communicator: Optional[Union[Communicator, str]] = None, ) -> "ray.dag.CompiledDAG": """Compile an accelerated execution path for this DAG. @@ -250,6 +251,12 @@ def experimental_compile( communication and computation can be overlapped, which can improve the performance of the DAG execution. If None, the default value will be used. + _default_communicator: The default communicator to use to transport tensors + for nodes annotated with `with_tensor_transport()` and when shared memory + is not the desired option (e.g., when transport="nccl", or when + transport="auto" for communication between two different GPUs). + If it is "create", a default communicator is created when needed. + If None, an error will be thrown. All other values are invalid. Returns: A compiled DAG. 
@@ -278,6 +285,7 @@ def experimental_compile( enable_asyncio, _max_inflight_executions, _overlap_gpu_communication, + _default_communicator, ) def execute( From f362475384eaa1aba6da5fa3b835b74fb7a86834 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Thu, 23 Jan 2025 22:10:30 +0000 Subject: [PATCH 02/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 92 +++++++------------ .../channel/torch_tensor_nccl_channel.py | 2 +- 2 files changed, 35 insertions(+), 59 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index ecdc9154186d2..7cb51c2536eba 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -7,7 +7,6 @@ TYPE_CHECKING, Any, Dict, - FrozenSet, List, Tuple, Union, @@ -864,6 +863,7 @@ def __init__( f"got {default_communicator}" ) self._default_communicator: Optional[Communicator] = default_communicator + self._default_communicator_id: Optional[str] = None self._default_type_hint: ChannelOutputType = SharedMemoryType( buffer_size_bytes=self._buffer_size_bytes, @@ -932,16 +932,6 @@ def __init__( # Mapping from the actor handle to the node ID that the actor is on. # A None actor handle means the actor is the driver. self.actor_to_node_id: Dict[Optional["ray.actor.ActorHandle"], str] = {} - - # This is set to true when type hint of `transport="nccl"` is used. - self._use_default_nccl_group = False - # This is set to the specified custom communicator - # if there exists a type hint of `transport=custom_communicator`. - self._custom_communicator_p2p: Optional[Communicator] = None - # The NCCL group ID for P2P send/recv operations. - self._communicator_id_p2p: Optional[str] = None - # All the NCCL group IDs for P2P send/recv and collective operations. - self._communicator_ids: Set[str] = set() # The index of the current execution. It is incremented each time # the DAG is executed. self._execution_index: int = -1 @@ -976,18 +966,10 @@ def _create_proxy_actor() -> "ray.actor.ActorHandle": # we can lazily release the native buffers self._destructed_ref_idxs: Dict[int, Set[Optional[int]]] = defaultdict(set) - @property - def communicator_id_p2p(self) -> Optional[str]: - return self._communicator_id_p2p - @property def is_teardown(self) -> bool: return self._is_teardown - @property - def communicator_ids(self) -> Set[str]: - return self._communicator_ids - def get_id(self) -> str: """ Get the unique ID of the compiled DAG. @@ -1027,6 +1009,10 @@ def _preprocess(self) -> None: communicator_to_actors: Dict[ Optional[Communicator], Set["ray.actor.ActorHandle"] ] = {} + communicator_to_type_hint: Dict[ + Optional[Communicator], + Set["ray.experimental.channel.torch_tensor_type.TorchTensorType"], + ] = {} collective_ops: Set[_CollectiveOperation] = set() input_attributes: Set[str] = set() @@ -1121,10 +1107,16 @@ def _preprocess(self) -> None: if dag_node.type_hint.requires_nccl(): communicator = self._select_communicator(dag_node) communicator_to_actors[communicator].add(actor_handle) - + communicator_to_type_hint[communicator].add(dag_node.type_hint) # Collect NCCL collective operations. 
if isinstance(dag_node, CollectiveOutputNode): - collective_ops.add(dag_node.collective_op) + communicator = self._select_communicator(dag_node) + communicator_to_actors[communicator].update( + dag_node.collective_op.actor_handles + ) + communicator_to_type_hint[communicator].add( + dag_node.collective_op.type_hint + ) assert not self._overlap_gpu_communication, ( "Currently, the overlap_gpu_communication option is not " "supported for NCCL collective operations. Please set " @@ -1210,7 +1202,9 @@ def _preprocess(self) -> None: if upstream_task.dag_node.type_hint.requires_nccl(): communicator = self._select_communicator(upstream_task.dag_node) communicator_to_actors[communicator].add(downstream_actor_handle) - + communicator_to_type_hint[communicator].add( + upstream_task.dag_node.type_hint + ) # Check that all specified input attributes, e.g., InputNode()["x"], # are used in the DAG. _check_unused_dag_input_attributes(output_node, input_attributes) @@ -1219,7 +1213,7 @@ def _preprocess(self) -> None: self._resolve_auto_transport(auto_transport_tasks, communicator_to_actors) - self._init_communicators(communicator_to_actors, collective_ops) + self._init_communicators(communicator_to_actors, communicator_to_type_hint) if direct_input: self._input_num_positional_args = 1 @@ -1234,20 +1228,14 @@ def _init_communicators( communicator_to_actors: Dict[ Optional[Communicator], Set["ray.actor.ActorHandle"] ], - collective_ops: Set[ - "ray.experimental.channel.collective_node._CollectiveOperation" + communicator_to_type_hint: Dict[ + Optional[Communicator], + Set["ray.experimental.channel.torch_tensor_type.TorchTensorType"], ], ) -> None: """ Initialize communicators for the DAG. """ - # Initialize and cache a NCCL group for each custom NCCL group. All the - # custom NCCL groups are initialized before the default NCCL groups. - custom_communicator_to_id: Dict[Communicator, str] = {} - # Initialize and cache a NCCL group for each set of actors. A set of actors - # can perform P2P send/recv and collective operations. If there are multiple - # custom NCCL groups for a set of actors, only one is cached. - actors_to_communicator_id: Dict[FrozenSet["ray.actor.ActorHandle"], str] = {} for custom_communicator, actors in communicator_to_actors.items(): if None in actors: raise ValueError("Driver cannot participate in the NCCL group.") @@ -1257,24 +1245,10 @@ def _init_communicators( custom_communicator, self._overlap_gpu_communication, ) - custom_communicator_to_id[custom_communicator] = communicator_id - actors = frozenset(actors) - if actors not in actors_to_communicator_id: - actors_to_communicator_id[actors] = communicator_id - - # If a custom communicator is specified for collective actors, initialize and - # cache the communicator ID. 
- for collective_op in collective_ops: - type_hint = collective_op.type_hint - custom_communicator = type_hint.get_custom_communicator() - if custom_communicator: - communicator_id = collective_op.init_communicator( - custom_communicator_to_id.get(custom_communicator, None) - ) - custom_communicator_to_id[custom_communicator] = communicator_id - actors = frozenset(collective_op.actor_handles) - if actors not in actors_to_communicator_id: - actors_to_communicator_id[actors] = communicator_id + for type_hint in communicator_to_type_hint[custom_communicator]: + type_hint.set_communicator_id(communicator_id) + if custom_communicator == self._default_communicator: + self._default_communicator_id = communicator_id def _select_communicator( self, dag_node: "ray.dag.DAGNode" @@ -1283,7 +1257,14 @@ def _select_communicator( If custom_communicator is provided (i.e., not None), use it. Otherwise, use the default communicator. """ - custom_communicator = dag_node.type_hint.get_custom_communicator() + from ray.dag.collective_node import CollectiveOutputNode + + if isinstance(dag_node, CollectiveOutputNode): + custom_communicator = ( + dag_node.collective_op.type_hint.get_custom_communicator() + ) + else: + custom_communicator = dag_node.type_hint.get_custom_communicator() if custom_communicator is not None: return custom_communicator if not self._create_default_communicator: @@ -1443,10 +1424,6 @@ def _get_or_compile( visited.add(cur_idx) task = self.idx_to_task[cur_idx] - type_hint = task.dag_node.type_hint - if type_hint.requires_nccl(): - type_hint.set_communicator_id(self._communicator_id_p2p) - if ( isinstance(task.dag_node, ClassMethodNode) and task.dag_node.is_class_method_call @@ -1517,7 +1494,7 @@ def _get_or_compile( fn.remote( do_allocate_channel, reader_and_node_list, - type_hint, + task.dag_node.type_hint, driver_actor_id, ) ) @@ -2036,8 +2013,7 @@ def teardown(self, kill_actors: bool = False): logger.exception("Error cancelling worker task") pass - for communicator_id in outer._communicator_ids: - _destroy_communicator(communicator_id) + _destroy_communicator(outer._default_communicator_id) logger.info("Waiting for worker tasks to exit") self.wait_teardown(kill_actors=kill_actors) diff --git a/python/ray/experimental/channel/torch_tensor_nccl_channel.py b/python/ray/experimental/channel/torch_tensor_nccl_channel.py index dcf42f7ec47ed..74729a54770f3 100644 --- a/python/ray/experimental/channel/torch_tensor_nccl_channel.py +++ b/python/ray/experimental/channel/torch_tensor_nccl_channel.py @@ -732,7 +732,7 @@ def _init_communicator( custom_communicator: A custom NCCL group to initialize. use_communication_streams: Whether to use dedicated send and recv streams for communication. If True, communication and computation - can be overlapped to improve perfomrance. + can be overlapped to improve performance. 
""" ctx = ChannelContext.get_current() From 3a568e49584dc2311b1be5a20511c258bbf9183f Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 24 Jan 2025 02:35:03 +0000 Subject: [PATCH 03/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 7cb51c2536eba..06bb4458ff1ce 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -1267,6 +1267,12 @@ def _select_communicator( custom_communicator = dag_node.type_hint.get_custom_communicator() if custom_communicator is not None: return custom_communicator + return self._get_default_communicator(dag_node) + + def _get_default_communicator( + self, + dag_node: "ray.dag.DAGNode", + ) -> Optional[Communicator]: if not self._create_default_communicator: if dag_node._original_type_hint is not None: assert isinstance(dag_node._original_type_hint, AutoTransportType) @@ -1294,14 +1300,11 @@ def _resolve_auto_transport( Resolve the auto transport type hint for the DAG. """ type_hint_resolver = TypeHintResolver(self.actor_to_gpu_ids) - # For AutoTransportType, there is no custom communicator provided. - # So call _select_communicator with None to get the default communicator, - # and also perform validation. - default_communicator = self._select_communicator(None) # Resolve AutoChannelType type hints and track the actors that use NCCL. # This is needed so that the NCCL group can be initialized for these # actors that use NCCL. for task in auto_transport_tasks: + default_communicator = self._get_default_communicator(task.dag_node) writer = task.dag_node._get_actor_handle() readers = task.downstream_task_idxs.values() writer_and_node = (writer, self._get_node_id(writer)) From 91259c6b1261a4ef07f086d032a55b5c625eb0aa Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 24 Jan 2025 21:50:10 +0000 Subject: [PATCH 04/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 36 +++++---- .../experimental/test_accelerated_dag.py | 4 +- .../experimental/test_torch_tensor_dag.py | 77 ++++++++----------- .../channel/torch_tensor_nccl_channel.py | 2 +- 4 files changed, 54 insertions(+), 65 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 06bb4458ff1ce..edc72239dfe2a 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -1001,19 +1001,16 @@ def _preprocess(self) -> None: InputNode, MultiOutputNode, ) - from ray.dag.collective_node import _CollectiveOperation self.input_task_idx, self.output_task_idx = None, None - nccl_actors_p2p: Set["ray.actor.ActorHandle"] = set() communicator_to_actors: Dict[ Optional[Communicator], Set["ray.actor.ActorHandle"] - ] = {} - communicator_to_type_hint: Dict[ + ] = defaultdict(set) + communicator_to_type_hints: Dict[ Optional[Communicator], Set["ray.experimental.channel.torch_tensor_type.TorchTensorType"], - ] = {} - collective_ops: Set[_CollectiveOperation] = set() + ] = defaultdict(set) input_attributes: Set[str] = set() # Find the input node and input attribute nodes in the DAG. 
@@ -1107,14 +1104,14 @@ def _preprocess(self) -> None: if dag_node.type_hint.requires_nccl(): communicator = self._select_communicator(dag_node) communicator_to_actors[communicator].add(actor_handle) - communicator_to_type_hint[communicator].add(dag_node.type_hint) + communicator_to_type_hints[communicator].add(dag_node.type_hint) # Collect NCCL collective operations. if isinstance(dag_node, CollectiveOutputNode): communicator = self._select_communicator(dag_node) communicator_to_actors[communicator].update( dag_node.collective_op.actor_handles ) - communicator_to_type_hint[communicator].add( + communicator_to_type_hints[communicator].add( dag_node.collective_op.type_hint ) assert not self._overlap_gpu_communication, ( @@ -1202,7 +1199,7 @@ def _preprocess(self) -> None: if upstream_task.dag_node.type_hint.requires_nccl(): communicator = self._select_communicator(upstream_task.dag_node) communicator_to_actors[communicator].add(downstream_actor_handle) - communicator_to_type_hint[communicator].add( + communicator_to_type_hints[communicator].add( upstream_task.dag_node.type_hint ) # Check that all specified input attributes, e.g., InputNode()["x"], @@ -1211,9 +1208,11 @@ def _preprocess(self) -> None: self._check_leaf_nodes() - self._resolve_auto_transport(auto_transport_tasks, communicator_to_actors) + self._resolve_auto_transport( + auto_transport_tasks, communicator_to_actors, communicator_to_type_hints + ) - self._init_communicators(communicator_to_actors, communicator_to_type_hint) + self._init_communicators(communicator_to_actors, communicator_to_type_hints) if direct_input: self._input_num_positional_args = 1 @@ -1228,7 +1227,7 @@ def _init_communicators( communicator_to_actors: Dict[ Optional[Communicator], Set["ray.actor.ActorHandle"] ], - communicator_to_type_hint: Dict[ + communicator_to_type_hints: Dict[ Optional[Communicator], Set["ray.experimental.channel.torch_tensor_type.TorchTensorType"], ], @@ -1241,11 +1240,11 @@ def _init_communicators( raise ValueError("Driver cannot participate in the NCCL group.") communicator_id = _init_communicator( - actors, + list(actors), custom_communicator, self._overlap_gpu_communication, ) - for type_hint in communicator_to_type_hint[custom_communicator]: + for type_hint in communicator_to_type_hints[custom_communicator]: type_hint.set_communicator_id(communicator_id) if custom_communicator == self._default_communicator: self._default_communicator_id = communicator_id @@ -1283,7 +1282,7 @@ def _get_default_communicator( ) raise ValueError( f"DAGNode {dag_node} has no custom communicator specified. " - "Please specify a custom communicator for the DAGNode using" + "Please specify a custom communicator for the DAGNode using " "`with_tensor_transport()`, or specify a communicator or 'create' for " "default_communicator when calling experimental_compile()." ) @@ -1295,6 +1294,10 @@ def _resolve_auto_transport( communicator_to_actors: Dict[ Optional[Communicator], Set["ray.actor.ActorHandle"] ], + communicator_to_type_hints: Dict[ + Optional[Communicator], + Set["ray.experimental.channel.torch_tensor_type.TorchTensorType"], + ], ) -> None: """ Resolve the auto transport type hint for the DAG. 
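For context, a hedged sketch of how a `transport="auto"` edge flows through this resolution step (reusing the illustrative `sender`/`receiver` GPU actors from the sketch above):

    with InputNode() as inp:
        data = sender.send.bind(inp).with_tensor_transport(transport="auto")
        dag = receiver.recv.bind(data)

    # If the writer and reader are placed on two different GPUs, the resolver
    # turns the "auto" hint into an NCCL TorchTensorType, so both actors and
    # the resolved hint are registered under the default communicator;
    # otherwise the hint resolves to shared memory and no group is needed.
    compiled_dag = dag.experimental_compile(_default_communicator="create")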
@@ -1322,6 +1325,9 @@ def _resolve_auto_transport( if task.dag_node.type_hint.requires_nccl(): communicator_to_actors[default_communicator].add(writer) communicator_to_actors[default_communicator].update(readers) + communicator_to_type_hints[default_communicator].add( + task.dag_node.type_hint + ) def _check_leaf_nodes(self) -> None: """ diff --git a/python/ray/dag/tests/experimental/test_accelerated_dag.py b/python/ray/dag/tests/experimental/test_accelerated_dag.py index 43235a97e0991..db9f4492ab298 100644 --- a/python/ray/dag/tests/experimental/test_accelerated_dag.py +++ b/python/ray/dag/tests/experimental/test_accelerated_dag.py @@ -898,7 +898,7 @@ def test_multi_args_and_torch_type(self, ray_start_regular): dag = c.collect_two.bind(branch2, branch1) dag.with_tensor_transport() - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") cpu_tensors = [torch.tensor([0, 0, 0, 0, 0]), torch.tensor([1, 1, 1, 1, 1])] ref = compiled_dag.execute(cpu_tensors[0], cpu_tensors[1]) @@ -2851,7 +2851,7 @@ def __init__(self): inp, ).with_tensor_transport(), ) - self._cdag = dag.experimental_compile() + self._cdag = dag.experimental_compile(_default_communicator="create") def call(self, value): return ray.get(self._cdag.execute(value)) diff --git a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py index 98bf318cd94e4..6d5ba5fd9913e 100644 --- a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py +++ b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py @@ -162,7 +162,7 @@ def test_torch_tensor_p2p(ray_start_regular): dag = dag.with_tensor_transport() dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") for i in range(3): ref = compiled_dag.execute(i, shape=shape, dtype=dtype) assert ray.get(ref) == (i, shape, dtype) @@ -240,7 +240,8 @@ def test_torch_tensor_nccl( dag = receiver.recv.bind(dag) compiled_dag = dag.experimental_compile( - _overlap_gpu_communication=overlap_gpu_communication + _overlap_gpu_communication=overlap_gpu_communication, + _default_communicator="create", ) # Test that we can pass different shapes and data. @@ -255,7 +256,7 @@ def test_torch_tensor_nccl( dag = dag.with_tensor_transport(transport="nccl") dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") # Test that we can pass different shapes and data. for i in range(3): @@ -290,7 +291,7 @@ def test_torch_tensor_auto(ray_start_regular, num_gpus): data_annotated = data.with_tensor_transport(transport="auto") dag = receiver.recv.bind(data_annotated) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") assert isinstance(data_annotated.type_hint, TorchTensorType) assert data_annotated.type_hint.transport == expected_transport @@ -306,7 +307,7 @@ def test_torch_tensor_auto(ray_start_regular, num_gpus): dag = dag.with_tensor_transport(transport="auto") dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") assert isinstance(data_annotated.type_hint, TorchTensorType) assert data_annotated.type_hint.transport == expected_transport @@ -351,7 +352,8 @@ def test_torch_tensor_nccl_overlap_timed(ray_start_regular, overlap_gpu_communic # Test normal execution. 
compiled_dag = dag.experimental_compile( - _overlap_gpu_communication=overlap_gpu_communication + _overlap_gpu_communication=overlap_gpu_communication, + _default_communicator="create", ) start = time.monotonic() @@ -397,7 +399,7 @@ def test_torch_tensor_nccl_disallows_driver(ray_start_regular): "via NCCL because the driver cannot participate in the NCCL group" ), ): - dag.experimental_compile() + dag.experimental_compile(_default_communicator="create") # Test that OutputNode cannot cannot participate in the NCCL group. with InputNode() as inp: @@ -408,7 +410,7 @@ def test_torch_tensor_nccl_disallows_driver(ray_start_regular): ValueError, match=(r"Driver cannot participate in the NCCL group\."), ): - dag.experimental_compile() + dag.experimental_compile(_default_communicator="create") @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) @@ -629,7 +631,7 @@ def get_transport_name(self) -> str: ValueError, match=r"Compiled Graphs do not support mixed usage of type hints.*", ): - dag.experimental_compile() + dag.experimental_compile(_default_communicator="create") # Case 2: default NCCL group first, then custom NCCL group with InputNode() as inp: @@ -643,27 +645,7 @@ def get_transport_name(self) -> str: ValueError, match=r"Compiled Graphs do not support mixed usage of type hints.*", ): - dag.experimental_compile() - - nccl_group2 = MockNcclGroup(2, [actor1, actor2]) - - # Using two different custom NCCL groups are currently not supported - with InputNode() as inp: - dag = actor1.send.bind(inp.shape, inp.dtype, inp.value) - dag = dag.with_tensor_transport(transport=nccl_group) - dag = actor2.recv.options(num_returns=3).bind(dag) - dag = actor2.send.bind(*dag) - dag = dag.with_tensor_transport(transport=nccl_group2) - dag = actor1.recv.bind(dag) - with pytest.raises( - ValueError, - match=( - "Compiled Graphs currently only support " - "a single custom NCCL group, but multiple " - "have been specified." - ), - ): - dag.experimental_compile() + dag.experimental_compile(_default_communicator="create") @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) @@ -813,7 +795,7 @@ def test_torch_tensor_nccl_static_shape(ray_start_regular): dag = dag.with_tensor_transport(transport="nccl", _static_shape=True) dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") # Test that the DAG works as long as we send the same shape. 
shape = (10,) @@ -851,7 +833,7 @@ def test_torch_tensor_nccl_direct_return(ray_start_regular): dag = dag.with_tensor_transport(transport="nccl", _direct_return=True) dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") for i in range(3): shape = (10 * (i + 1),) @@ -892,7 +874,7 @@ def test_torch_tensor_nccl_nested_dynamic(ray_start_regular): dag = dag.with_tensor_transport(transport="nccl") dag = receiver.recv_dict.bind(dag) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") for i in range(3): dtype = torch.float16 @@ -937,7 +919,8 @@ def test_torch_tensor_exceptions( dag = receiver.recv.bind(dag) compiled_dag = dag.experimental_compile( - _overlap_gpu_communication=overlap_gpu_communication + _overlap_gpu_communication=overlap_gpu_communication, + _default_communicator="create", ) shape = (10,) @@ -1018,7 +1001,7 @@ def test_torch_tensor_exceptions2( ) dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") ref = compiled_dag.execute(1) with pytest.raises( @@ -1065,7 +1048,7 @@ def test_torch_tensor_nccl_all_reduce(ray_start_regular): ] dag = MultiOutputNode(recvs) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") for i in range(3): i += 1 @@ -1109,7 +1092,7 @@ def test_torch_tensor_nccl_all_reduce_get_partial(ray_start_regular): tensor = workers[1].recv_tensor.bind(collectives[0]) dag = MultiOutputNode([recv, tensor, collectives[1]]) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") for i in range(3): ref = compiled_dag.execute( @@ -1155,7 +1138,7 @@ def test_torch_tensor_nccl_all_reduce_wrong_shape(ray_start_regular): ] dag = MultiOutputNode(recvs) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") ref = compiled_dag.execute([((20,), dtype, idx + 1) for idx in range(num_workers)]) reduced_val = (1 + num_workers) * num_workers / 2 @@ -1347,7 +1330,7 @@ def test_torch_tensor_nccl_all_reduce_scheduling(ray_start_regular): recv = workers[1].recv.bind(t) dag = MultiOutputNode([collectives[0], collectives[1], recv]) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") value = 10 ref = compiled_dag.execute(value) @@ -1382,7 +1365,7 @@ def test_nccl_all_reduce_with_class_method_output_node(ray_start_regular): tensors = collective.allreduce.bind([t1, t4], ReduceOp.SUM) dag = MultiOutputNode(tensors + [t2, t3]) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") t1 = torch.tensor([1], device="cuda") t2 = torch.tensor([2], device="cuda") @@ -1423,7 +1406,7 @@ def recv(self, tensor): torch_inp = inp.with_tensor_transport() dag = receiver.recv.bind(torch_inp) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") ref = compiled_dag.execute(torch.tensor([1])) assert ray.get(ref) == 1 # This should timeout because actor shouldn't print anything. 
@@ -1461,7 +1444,7 @@ def test_input_node_without_type_hint(self, ray_start_regular, tensor_device): with InputNode() as inp: dag = worker.echo.bind(inp) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") tensor = torch.tensor([5]) if tensor_device == "cuda": tensor = tensor.cuda() @@ -1496,7 +1479,7 @@ def test_input_node_with_tensor_transport(self, ray_start_regular, tensor_device with InputNode() as inp: dag = worker.echo.bind(inp.with_tensor_transport()) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") cpu_tensor = torch.tensor([1]) input_tensor = cpu_tensor if tensor_device == "cuda": @@ -1547,7 +1530,7 @@ def test_input_attr_nodes_with_all_tensor_type_hint(self, ray_start_regular): branch2 = worker2.echo.bind(dag) dag = MultiOutputNode([branch1, branch2]) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") cpu_tensor_1 = torch.tensor([1]) cpu_tensor_2 = torch.tensor([2]) ref = compiled_dag.execute(cpu_tensor_1, cpu_tensor_2) @@ -1651,7 +1634,7 @@ def test_torch_nccl_channel_with_local_reader(ray_start_regular): branch1 = w1.recv.bind(dag) branch2 = w2.recv.bind(dag) dag = MultiOutputNode([branch1, branch2]) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") for i in range(3): ref = compiled_dag.execute(i, shape=shape, dtype=dtype) assert ray.get(ref) == [(i, shape, dtype), (i, shape, dtype)] @@ -1687,7 +1670,7 @@ def test_torch_nccl_channel_with_two_local_readers(ray_start_regular): branch2 = w1.recv.bind(dag) branch3 = w2.recv.bind(dag) dag = MultiOutputNode([branch1, branch2, branch3]) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") for i in range(3): ref = compiled_dag.execute(i, shape=shape, dtype=dtype) assert ray.get(ref) == [(i, shape, dtype), (i, shape, dtype), (i, shape, dtype)] @@ -1722,7 +1705,7 @@ def test_torch_nccl_channel_with_all_local_readers(ray_start_regular): "is not needed. No NCCL channel will be created." ), ): - dag.experimental_compile() + dag.experimental_compile(_default_communicator="create") if __name__ == "__main__": diff --git a/python/ray/experimental/channel/torch_tensor_nccl_channel.py b/python/ray/experimental/channel/torch_tensor_nccl_channel.py index 74729a54770f3..4001268956211 100644 --- a/python/ray/experimental/channel/torch_tensor_nccl_channel.py +++ b/python/ray/experimental/channel/torch_tensor_nccl_channel.py @@ -396,7 +396,7 @@ def __init__( ctx = ChannelContext.get_current() assert isinstance( typ.communicator_id, str - ), "NCCL group ID ({nccl_group_id}) must be a str." + ), f"NCCL group ID ({typ.communicator_id}) must be a str." self._typ = typ assert self._typ.communicator_id is not None, "No NCCL group specified." 
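By the end of this commit the communicator-selection rule is in place. A simplified, standalone restatement of `_select_communicator` / `_get_default_communicator` (not the actual implementation, which also special-cases collectives and auto-transport nodes for error reporting):

    from typing import Optional

    def select_communicator(edge_custom_comm, default_comm, create_default) -> Optional[object]:
        # 1. A communicator attached to the type hint always wins.
        if edge_custom_comm is not None:
            return edge_custom_comm
        # 2. Otherwise use the communicator passed via
        #    experimental_compile(_default_communicator=<Communicator>).
        if default_comm is not None:
            return default_comm
        # 3. _default_communicator="create": key the edge under None and build
        #    one shared default NCCL group during communicator initialization.
        if create_default:
            return None
        # 4. Nothing specified: compilation fails.
        raise ValueError(
            "Specify a communicator via with_tensor_transport(), or pass a "
            "Communicator or 'create' as _default_communicator."
        )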
From 41174962abb1b449431bdcbeaf057007021012cc Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Mon, 27 Jan 2025 02:39:06 +0000 Subject: [PATCH 05/29] up Signed-off-by: Rui Qiao --- .../ray/dag/tests/experimental/test_collective_dag.py | 3 +-- python/ray/experimental/collective/conftest.py | 10 ++++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/python/ray/dag/tests/experimental/test_collective_dag.py b/python/ray/dag/tests/experimental/test_collective_dag.py index 26c07ee6b48ad..5f35d50e01f74 100644 --- a/python/ray/dag/tests/experimental/test_collective_dag.py +++ b/python/ray/dag/tests/experimental/test_collective_dag.py @@ -92,8 +92,7 @@ def test_comm_all_reduces(ray_start_regular, monkeypatch): monkeypatch, dag, { - (frozenset([workers[0]]), None), - (frozenset([workers[1]]), None), + (frozenset(workers), None), }, ) diff --git a/python/ray/experimental/collective/conftest.py b/python/ray/experimental/collective/conftest.py index 70013d600018b..b7265be4d3821 100644 --- a/python/ray/experimental/collective/conftest.py +++ b/python/ray/experimental/collective/conftest.py @@ -1,4 +1,3 @@ -import copy import uuid from typing import Dict, FrozenSet, List, Optional, Set, Tuple @@ -153,7 +152,7 @@ def check_init( set(self.ids_to_actors_and_custom_comms.values()) == actors_and_custom_comms ) - nccl_group_id_p2p = compiled_dag.communicator_id_p2p + nccl_group_id_p2p = compiled_dag._default_communicator if p2p_actors_and_custom_comm is None: assert nccl_group_id_p2p is None else: @@ -228,7 +227,7 @@ def check_nccl_group_init( mock_nccl_group_set, ) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") mock_nccl_group_set.check_init( compiled_dag, actors_and_custom_comms, @@ -248,6 +247,9 @@ def check_nccl_group_teardown( mock_nccl_group_set.mock_destroy_nccl_group, ) - nccl_group_ids = copy.deepcopy(compiled_dag.communicator_ids) + if compiled_dag._create_default_communicator: + nccl_group_ids = [compiled_dag._default_communicator_id] + else: + nccl_group_ids = [] compiled_dag.teardown() mock_nccl_group_set.check_teardown(nccl_group_ids) From 5d6b5a9d9456e5d4e536a14856b0aecaaca276b8 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Tue, 28 Jan 2025 06:12:03 +0000 Subject: [PATCH 06/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 7 +++---- .../dag/tests/experimental/test_collective_dag.py | 13 ++++++------- python/ray/experimental/collective/conftest.py | 9 ++++----- 3 files changed, 13 insertions(+), 16 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index edc72239dfe2a..5e5ac51811a23 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -1107,7 +1107,7 @@ def _preprocess(self) -> None: communicator_to_type_hints[communicator].add(dag_node.type_hint) # Collect NCCL collective operations. if isinstance(dag_node, CollectiveOutputNode): - communicator = self._select_communicator(dag_node) + communicator = self._select_communicator(dag_node, collective=True) communicator_to_actors[communicator].update( dag_node.collective_op.actor_handles ) @@ -1250,15 +1250,14 @@ def _init_communicators( self._default_communicator_id = communicator_id def _select_communicator( - self, dag_node: "ray.dag.DAGNode" + self, dag_node: "ray.dag.DAGNode", collective: bool = False ) -> Optional[Communicator]: """ If custom_communicator is provided (i.e., not None), use it. Otherwise, use the default communicator. 
""" - from ray.dag.collective_node import CollectiveOutputNode - if isinstance(dag_node, CollectiveOutputNode): + if collective: custom_communicator = ( dag_node.collective_op.type_hint.get_custom_communicator() ) diff --git a/python/ray/dag/tests/experimental/test_collective_dag.py b/python/ray/dag/tests/experimental/test_collective_dag.py index 5f35d50e01f74..17db6c0569f60 100644 --- a/python/ray/dag/tests/experimental/test_collective_dag.py +++ b/python/ray/dag/tests/experimental/test_collective_dag.py @@ -94,6 +94,7 @@ def test_comm_all_reduces(ray_start_regular, monkeypatch): { (frozenset(workers), None), }, + (frozenset(workers), None), ) check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set) @@ -119,9 +120,7 @@ def test_comm_deduplicate_all_reduces(ray_start_regular, monkeypatch): dag = MultiOutputNode(collectives) compiled_dag, mock_nccl_group_set = check_nccl_group_init( - monkeypatch, - dag, - {(frozenset(workers), None)}, + monkeypatch, dag, {(frozenset(workers), None)}, (frozenset(workers), None) ) check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set) @@ -207,8 +206,10 @@ def test_custom_comm_deduplicate(ray_start_regular, monkeypatch): compiled_dag, mock_nccl_group_set = check_nccl_group_init( monkeypatch, dag, - {(frozenset(workers), comm)}, - (frozenset(workers), comm), + { + (frozenset(workers), comm), + }, + (frozenset(workers), None), ) check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set) @@ -258,7 +259,6 @@ def test_custom_comm_init_teardown(ray_start_regular, monkeypatch): monkeypatch, dag, {(frozenset(workers), comm)}, - (frozenset(workers), comm), ) check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set) @@ -284,7 +284,6 @@ def test_custom_comm_init_teardown(ray_start_regular, monkeypatch): (frozenset(workers), comm_2), (frozenset(workers), comm_3), }, - (frozenset(workers), comm_3), ) check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set) diff --git a/python/ray/experimental/collective/conftest.py b/python/ray/experimental/collective/conftest.py index b7265be4d3821..26927681d4942 100644 --- a/python/ray/experimental/collective/conftest.py +++ b/python/ray/experimental/collective/conftest.py @@ -147,18 +147,17 @@ def check_init( Tuple[FrozenSet["ray.actor.ActorHandle"], Optional[Communicator]] ], ) -> None: - assert len(self.ids_to_actors_and_custom_comms) == len(actors_and_custom_comms) assert ( set(self.ids_to_actors_and_custom_comms.values()) == actors_and_custom_comms ) - nccl_group_id_p2p = compiled_dag._default_communicator + nccl_group_id_p2p_id = compiled_dag._default_communicator_id if p2p_actors_and_custom_comm is None: - assert nccl_group_id_p2p is None + assert nccl_group_id_p2p_id is None else: - assert nccl_group_id_p2p + assert nccl_group_id_p2p_id assert ( - self.ids_to_actors_and_custom_comms[nccl_group_id_p2p] + self.ids_to_actors_and_custom_comms[nccl_group_id_p2p_id] == p2p_actors_and_custom_comm ) From bb49f713b0a6d7aab45ba44580b6f8f60b8e75cc Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Tue, 28 Jan 2025 06:34:06 +0000 Subject: [PATCH 07/29] up Signed-off-by: Rui Qiao --- python/ray/dag/tests/experimental/test_collective_dag.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python/ray/dag/tests/experimental/test_collective_dag.py b/python/ray/dag/tests/experimental/test_collective_dag.py index 17db6c0569f60..fd5dfe3ef4c49 100644 --- a/python/ray/dag/tests/experimental/test_collective_dag.py +++ 
b/python/ray/dag/tests/experimental/test_collective_dag.py @@ -208,6 +208,7 @@ def test_custom_comm_deduplicate(ray_start_regular, monkeypatch): dag, { (frozenset(workers), comm), + (frozenset(workers), None), }, (frozenset(workers), None), ) @@ -225,8 +226,11 @@ def test_custom_comm_deduplicate(ray_start_regular, monkeypatch): compiled_dag, mock_nccl_group_set = check_nccl_group_init( monkeypatch, dag, - {(frozenset(workers), comm)}, - (frozenset(workers), comm), + { + (frozenset(workers), comm), + (frozenset(workers), None), + }, + (frozenset(workers), None), ) check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set) From b29d1e27ada45e7c5d2bd83a5b57b4b23b78e49a Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Tue, 28 Jan 2025 06:36:39 +0000 Subject: [PATCH 08/29] up Signed-off-by: Rui Qiao --- .../experimental/test_torch_tensor_dag.py | 117 ------------------ 1 file changed, 117 deletions(-) diff --git a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py index 6d5ba5fd9913e..86294113b8496 100644 --- a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py +++ b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py @@ -531,123 +531,6 @@ def get_transport_name(self) -> str: assert result == (i, shape, dtype) -@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) -def test_torch_tensor_custom_comm_invalid(ray_start_regular): - if not USE_GPU: - pytest.skip("NCCL tests require GPUs") - - assert ( - sum(node["Resources"].get("GPU", 0) for node in ray.nodes()) > 1 - ), "This test requires at least 2 GPUs" - - actor_cls = TorchTensorWorker.options(num_cpus=0, num_gpus=1) - - actor1 = actor_cls.remote() - actor2 = actor_cls.remote() - - class MockNcclGroup(Communicator): - """ - A mock NCCL group for testing. Send and recv are not implemented. 
- """ - - import cupy as cp - - def __init__(self, world_size, actor_handles): - self._world_size = world_size - self._actor_handles = actor_handles - self._rank = None - - def initialize(self, rank: int) -> None: - expected_rank = self.get_rank(ray.get_runtime_context().current_actor) - assert ( - rank == expected_rank - ), f"NCCL actor's rank {rank} does not match expected rank {expected_rank}" - self._rank = rank - self._device = torch_utils.get_devices()[0] - - def get_rank(self, actor: ray.actor.ActorHandle) -> int: - actor_ids = [a._ray_actor_id for a in self._actor_handles] - try: - rank = actor_ids.index(actor._ray_actor_id) - except ValueError: - raise ValueError("Actor is not in the NCCL group.") - return rank - - def get_world_size(self) -> int: - return self._world_size - - def get_self_rank(self) -> Optional[int]: - return self._rank - - def get_actor_handles(self) -> List["ray.actor.ActorHandle"]: - return self._actor_handles - - def send(self, value: "torch.Tensor", peer_rank: int) -> None: - return None - - def recv( - self, - shape: Tuple[int], - dtype: "torch.dtype", - peer_rank: int, - allocator: Optional[TorchTensorAllocator] = None, - ) -> "torch.Tensor": - return None - - def allreduce( - self, - send_buf: "torch.Tensor", - recv_buf: "torch.Tensor", - op: ReduceOp, - ) -> None: - raise NotImplementedError - - @property - def recv_stream(self) -> Optional["cp.cuda.ExternalStream"]: - return None - - @property - def send_stream(self) -> Optional["cp.cuda.ExternalStream"]: - return None - - def destroy(self) -> None: - pass - - def get_transport_name(self) -> str: - return "nccl" - - nccl_group = MockNcclGroup(2, [actor1, actor2]) - - # Mixed usage of NCCL groups should throw an error - # Case 1: custom NCCL group first, then default NCCL group - with InputNode() as inp: - dag = actor1.send.bind(inp.shape, inp.dtype, inp.value) - dag = dag.with_tensor_transport(transport=nccl_group) - dag = actor2.recv.options(num_returns=3).bind(dag) - dag = actor2.send.bind(*dag) - dag = dag.with_tensor_transport(transport="nccl") - dag = actor1.recv.bind(dag) - with pytest.raises( - ValueError, - match=r"Compiled Graphs do not support mixed usage of type hints.*", - ): - dag.experimental_compile(_default_communicator="create") - - # Case 2: default NCCL group first, then custom NCCL group - with InputNode() as inp: - dag = actor1.send.bind(inp.shape, inp.dtype, inp.value) - dag = dag.with_tensor_transport(transport="nccl") - dag = actor2.recv.options(num_returns=3).bind(dag) - dag = actor2.send.bind(*dag) - dag = dag.with_tensor_transport(transport=nccl_group) - dag = actor1.recv.bind(dag) - with pytest.raises( - ValueError, - match=r"Compiled Graphs do not support mixed usage of type hints.*", - ): - dag.experimental_compile(_default_communicator="create") - - @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) def test_torch_tensor_custom_comm_inited(ray_start_regular): if not USE_GPU: From f9bca7b5ecc576db399d96873e38e46f1743cc00 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Tue, 28 Jan 2025 16:57:06 +0000 Subject: [PATCH 09/29] up Signed-off-by: Rui Qiao --- .../dag/tests/experimental/test_mocked_nccl_dag.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/ray/dag/tests/experimental/test_mocked_nccl_dag.py b/python/ray/dag/tests/experimental/test_mocked_nccl_dag.py index 3235220f87d92..0887c1ba85fcd 100644 --- a/python/ray/dag/tests/experimental/test_mocked_nccl_dag.py +++ 
b/python/ray/dag/tests/experimental/test_mocked_nccl_dag.py @@ -96,7 +96,7 @@ def test_p2p(ray_start_cluster): dag = dag.with_tensor_transport(transport="nccl") dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") for i in range(3): ref = compiled_dag.execute(i, shape=shape, dtype=dtype, send_as_dict=False) assert ray.get(ref) == (i, shape, dtype) @@ -149,7 +149,7 @@ def test_p2p_static_shape(ray_start_cluster, send_as_dict): dag = dag.with_tensor_transport(transport="nccl", _static_shape=True) dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") for i in range(3): ref = compiled_dag.execute(i, shape=shape, dtype=dtype) assert ray.get(ref) == (i, shape, dtype) @@ -189,7 +189,7 @@ def test_p2p_static_shape_error(capsys, ray_start_cluster, send_as_dict): dag = dag.with_tensor_transport(transport="nccl", _static_shape=True) dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") for i in range(3): ref = compiled_dag.execute(i, shape=shape, dtype=dtype) assert ray.get(ref) == (i, shape, dtype) @@ -243,7 +243,7 @@ def test_p2p_direct_return(ray_start_cluster): dag = dag.with_tensor_transport(transport="nccl", _direct_return=True) dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") dtype = torch.float16 for i in range(3): @@ -285,7 +285,7 @@ def test_p2p_direct_return_error(capsys, ray_start_cluster): dag = dag.with_tensor_transport(transport="nccl", _direct_return=True) dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") dtype = torch.float16 for i in range(3): @@ -353,7 +353,7 @@ def test_p2p_static_shape_and_direct_return( ) dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile() + compiled_dag = dag.experimental_compile(_default_communicator="create") shape = (10,) dtype = torch.float16 From 9af232054baf684031d65a6364ffc87bf3e2bd64 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Tue, 28 Jan 2025 17:16:34 +0000 Subject: [PATCH 10/29] clean up Signed-off-by: Rui Qiao --- .../tests/experimental/test_collective_dag.py | 22 +++++++++++-------- .../ray/experimental/collective/conftest.py | 18 +++++++-------- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/python/ray/dag/tests/experimental/test_collective_dag.py b/python/ray/dag/tests/experimental/test_collective_dag.py index fd5dfe3ef4c49..2609d101903d6 100644 --- a/python/ray/dag/tests/experimental/test_collective_dag.py +++ b/python/ray/dag/tests/experimental/test_collective_dag.py @@ -73,7 +73,7 @@ def test_all_reduce_custom_comm_wrong_actors(ray_start_regular): ) def test_comm_all_reduces(ray_start_regular, monkeypatch): """ - Test different communicators are used for different all-reduce calls of + Test the same default communicator is used for different all-reduce calls of different sets of actors. 
""" actor_cls = CPUTorchTensorWorker.options(num_cpus=0, num_gpus=1) @@ -103,10 +103,10 @@ def test_comm_all_reduces(ray_start_regular, monkeypatch): @pytest.mark.parametrize( "ray_start_regular", [{"num_cpus": 4, "num_gpus": 4}], indirect=True ) -def test_comm_deduplicate_all_reduces(ray_start_regular, monkeypatch): +def test_comm_all_reduces2(ray_start_regular, monkeypatch): """ - Test communicators are deduplicated when all-reduces are called on the same - group of actors more than once. + Test the same default communicator is used when all-reduces are called on the + same group of actors more than once. """ actor_cls = CPUTorchTensorWorker.options(num_cpus=0, num_gpus=1) @@ -120,7 +120,10 @@ def test_comm_deduplicate_all_reduces(ray_start_regular, monkeypatch): dag = MultiOutputNode(collectives) compiled_dag, mock_nccl_group_set = check_nccl_group_init( - monkeypatch, dag, {(frozenset(workers), None)}, (frozenset(workers), None) + monkeypatch, + dag, + {(frozenset(workers), None)}, + (frozenset(workers), None), ) check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set) @@ -129,9 +132,9 @@ def test_comm_deduplicate_all_reduces(ray_start_regular, monkeypatch): @pytest.mark.parametrize( "ray_start_regular", [{"num_cpus": 4, "num_gpus": 4}], indirect=True ) -def test_comm_deduplicate_p2p_and_collective(ray_start_regular, monkeypatch): +def test_comm_p2p_and_collective(ray_start_regular, monkeypatch): """ - Test communicators are deduplicated when the collective and the P2P are on + Test the same default communicator is used when the collective and the P2P are on the same set of actors. """ actor_cls = CPUTorchTensorWorker.options(num_cpus=0, num_gpus=1) @@ -184,9 +187,10 @@ def test_comm_deduplicate_p2p_and_collective(ray_start_regular, monkeypatch): @pytest.mark.parametrize( "ray_start_regular", [{"num_cpus": 4, "num_gpus": 4}], indirect=True ) -def test_custom_comm_deduplicate(ray_start_regular, monkeypatch): +def test_custom_comm(ray_start_regular, monkeypatch): """ - Test a custom GPU communicator is reused when possible. + Test a custom GPU communicator is used when specified and a default + communicator is used otherwise. 
""" actor_cls = CPUTorchTensorWorker.options(num_cpus=0, num_gpus=1) diff --git a/python/ray/experimental/collective/conftest.py b/python/ray/experimental/collective/conftest.py index 26927681d4942..a470c252bfa5f 100644 --- a/python/ray/experimental/collective/conftest.py +++ b/python/ray/experimental/collective/conftest.py @@ -143,7 +143,7 @@ def check_init( actors_and_custom_comms: Set[ Tuple[FrozenSet["ray.actor.ActorHandle"], Optional[Communicator]] ], - p2p_actors_and_custom_comm: Optional[ + default_actors_and_custom_comm: Optional[ Tuple[FrozenSet["ray.actor.ActorHandle"], Optional[Communicator]] ], ) -> None: @@ -151,14 +151,14 @@ def check_init( set(self.ids_to_actors_and_custom_comms.values()) == actors_and_custom_comms ) - nccl_group_id_p2p_id = compiled_dag._default_communicator_id - if p2p_actors_and_custom_comm is None: - assert nccl_group_id_p2p_id is None + default_communicator_id = compiled_dag._default_communicator_id + if default_actors_and_custom_comm is None: + assert default_communicator_id is None else: - assert nccl_group_id_p2p_id + assert default_communicator_id assert ( - self.ids_to_actors_and_custom_comms[nccl_group_id_p2p_id] - == p2p_actors_and_custom_comm + self.ids_to_actors_and_custom_comms[default_communicator_id] + == default_actors_and_custom_comm ) def check_teardown(self, nccl_group_ids: List[str]) -> None: @@ -212,7 +212,7 @@ def check_nccl_group_init( actors_and_custom_comms: Set[ Tuple[FrozenSet["ray.actor.ActorHandle"], Optional[Communicator]] ], - p2p_actors_and_custom_comm: Optional[ + default_actors_and_custom_comm: Optional[ Tuple[FrozenSet["ray.actor.ActorHandle"], Optional[Communicator]] ] = None, ) -> "ray.dag.CompiledDAG": @@ -230,7 +230,7 @@ def check_nccl_group_init( mock_nccl_group_set.check_init( compiled_dag, actors_and_custom_comms, - p2p_actors_and_custom_comm, + default_actors_and_custom_comm, ) return compiled_dag, mock_nccl_group_set From 4b9ce1cc5a70762c7c7247208d818eab6d0a0032 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 31 Jan 2025 00:37:17 +0000 Subject: [PATCH 11/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 147 +++++++++++++---------- python/ray/dag/dag_node.py | 2 +- python/ray/util/collective/collective.py | 4 +- 3 files changed, 88 insertions(+), 65 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 5e5ac51811a23..b8a11bf540ba0 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -865,6 +865,16 @@ def __init__( self._default_communicator: Optional[Communicator] = default_communicator self._default_communicator_id: Optional[str] = None + self._pending_p2p_communicator_actors: Set["ray.actor.ActorHandle"] = set() + self._pending_p2p_communicator_dag_nodes: Set["ray.dag.DAGNode"] = set() + self._pending_collective_ops: Set[ + "ray.dag.collective_node._CollectiveOperation" + ] = set() + self._communicator_to_type_hints: Dict[ + Optional[Communicator], + Set["ray.experimental.channel.torch_tensor_type.TorchTensorType"], + ] = defaultdict(set) + self._default_type_hint: ChannelOutputType = SharedMemoryType( buffer_size_bytes=self._buffer_size_bytes, # We conservatively set num_shm_buffers to _max_inflight_executions. 
@@ -1004,14 +1014,6 @@ def _preprocess(self) -> None: self.input_task_idx, self.output_task_idx = None, None - communicator_to_actors: Dict[ - Optional[Communicator], Set["ray.actor.ActorHandle"] - ] = defaultdict(set) - communicator_to_type_hints: Dict[ - Optional[Communicator], - Set["ray.experimental.channel.torch_tensor_type.TorchTensorType"], - ] = defaultdict(set) - input_attributes: Set[str] = set() # Find the input node and input attribute nodes in the DAG. for idx, task in self.idx_to_task.items(): @@ -1102,17 +1104,13 @@ def _preprocess(self) -> None: # Collect actors for NCCL P2P methods. if dag_node.type_hint.requires_nccl(): - communicator = self._select_communicator(dag_node) - communicator_to_actors[communicator].add(actor_handle) - communicator_to_type_hints[communicator].add(dag_node.type_hint) + communicator = self._track_communicator_usage( + dag_node, actor_handle + ) # Collect NCCL collective operations. if isinstance(dag_node, CollectiveOutputNode): - communicator = self._select_communicator(dag_node, collective=True) - communicator_to_actors[communicator].update( - dag_node.collective_op.actor_handles - ) - communicator_to_type_hints[communicator].add( - dag_node.collective_op.type_hint + self._track_communicator_usage( + dag_node, actor_handle, collective_op=True ) assert not self._overlap_gpu_communication, ( "Currently, the overlap_gpu_communication option is not " @@ -1197,10 +1195,9 @@ def _preprocess(self) -> None: upstream_task.downstream_task_idxs[task_idx] = downstream_actor_handle if upstream_task.dag_node.type_hint.requires_nccl(): - communicator = self._select_communicator(upstream_task.dag_node) - communicator_to_actors[communicator].add(downstream_actor_handle) - communicator_to_type_hints[communicator].add( - upstream_task.dag_node.type_hint + self._track_communicator_usage( + upstream_task.dag_node, + downstream_actor_handle, ) # Check that all specified input attributes, e.g., InputNode()["x"], # are used in the DAG. @@ -1208,11 +1205,9 @@ def _preprocess(self) -> None: self._check_leaf_nodes() - self._resolve_auto_transport( - auto_transport_tasks, communicator_to_actors, communicator_to_type_hints - ) + self._resolve_auto_transport(auto_transport_tasks) - self._init_communicators(communicator_to_actors, communicator_to_type_hints) + self._init_communicators() if direct_input: self._input_num_positional_args = 1 @@ -1222,50 +1217,87 @@ def _preprocess(self) -> None: self._input_num_positional_args = max(input_positional_args) + 1 self._input_kwargs = tuple(input_kwargs) - def _init_communicators( - self, - communicator_to_actors: Dict[ - Optional[Communicator], Set["ray.actor.ActorHandle"] - ], - communicator_to_type_hints: Dict[ - Optional[Communicator], - Set["ray.experimental.channel.torch_tensor_type.TorchTensorType"], - ], - ) -> None: + def _init_communicators(self) -> None: """ Initialize communicators for the DAG. 
""" - for custom_communicator, actors in communicator_to_actors.items(): + for communicator, actors in self._communicator_to_actors.items(): if None in actors: raise ValueError("Driver cannot participate in the NCCL group.") communicator_id = _init_communicator( list(actors), - custom_communicator, + communicator, self._overlap_gpu_communication, ) - for type_hint in communicator_to_type_hints[custom_communicator]: + for type_hint in self._communicator_to_type_hints[communicator]: type_hint.set_communicator_id(communicator_id) - if custom_communicator == self._default_communicator: + if communicator == self._default_communicator: self._default_communicator_id = communicator_id - def _select_communicator( - self, dag_node: "ray.dag.DAGNode", collective: bool = False - ) -> Optional[Communicator]: + actors_to_created_communicator_id = Dict[Set["ray.actor.ActorHandle"], str] + for collective_op in self._pending_collective_ops: + if not self._create_default_communicator: + raise ValueError( + "Communicator creation is not allowed for collective operations." + ) + actors = collective_op.actor_handles + if frozenset(actors) in actors_to_created_communicator_id: + communicator_id = actors_to_created_communicator_id[frozenset(actors)] + else: + communicator_id = _init_communicator( + actors, + None, + self._overlap_gpu_communication, + ) + actors_to_created_communicator_id[frozenset(actors)] = communicator_id + collective_op.set_communicator_id(communicator_id) + + if self._pending_p2p_communicator_actors in actors_to_created_communicator_id: + p2p_communicator_id = actors_to_created_communicator_id[ + frozenset(self._pending_p2p_communicator_actors) + ] + else: + p2p_communicator_id = _init_communicator( + list(self._pending_p2p_communicator_actors), + None, + self._overlap_gpu_communication, + ) + for dag_node in self._pending_p2p_communicator_dag_nodes: + dag_node.type_hint.set_communicator_id(p2p_communicator_id) + + def _track_communicator_usage( + self, + dag_node: "ray.dag.DAGNode", + actors: Set["ray.actor.ActorHandle"], + collective_op: bool = False, + ) -> None: """ If custom_communicator is provided (i.e., not None), use it. Otherwise, use the default communicator. """ - if collective: - custom_communicator = ( - dag_node.collective_op.type_hint.get_custom_communicator() - ) + custom_communicator = dag_node.type_hint.get_custom_communicator() + communicator = ( + self._get_default_communicator(dag_node) + if custom_communicator is None + else custom_communicator + ) + if communicator is None: + if collective_op: + self._pending_collective_ops.add(dag_node) + else: + self._pending_p2p_communicator_actors.update(actors) else: - custom_communicator = dag_node.type_hint.get_custom_communicator() - if custom_communicator is not None: - return custom_communicator - return self._get_default_communicator(dag_node) + self._communicator_to_type_hints[communicator].update(dag_node.type_hint) + if collective_op: + if communicator.get_actor_handles() != actors: + raise ValueError( + "Actor sets are different for collective operation and communicator." 
+ ) + else: + if actors not in communicator.get_actor_handles(): + raise ValueError("Actor is not in the communicator group.") def _get_default_communicator( self, @@ -1290,13 +1322,6 @@ def _get_default_communicator( def _resolve_auto_transport( self, auto_transport_tasks: Set["CompiledTask"], - communicator_to_actors: Dict[ - Optional[Communicator], Set["ray.actor.ActorHandle"] - ], - communicator_to_type_hints: Dict[ - Optional[Communicator], - Set["ray.experimental.channel.torch_tensor_type.TorchTensorType"], - ], ) -> None: """ Resolve the auto transport type hint for the DAG. @@ -1306,7 +1331,6 @@ def _resolve_auto_transport( # This is needed so that the NCCL group can be initialized for these # actors that use NCCL. for task in auto_transport_tasks: - default_communicator = self._get_default_communicator(task.dag_node) writer = task.dag_node._get_actor_handle() readers = task.downstream_task_idxs.values() writer_and_node = (writer, self._get_node_id(writer)) @@ -1322,10 +1346,9 @@ def _resolve_auto_transport( reader_and_node_list, ) if task.dag_node.type_hint.requires_nccl(): - communicator_to_actors[default_communicator].add(writer) - communicator_to_actors[default_communicator].update(readers) - communicator_to_type_hints[default_communicator].add( - task.dag_node.type_hint + self._track_communicator_usage( + task.dag_node, + set(readers + [writer]), ) def _check_leaf_nodes(self) -> None: diff --git a/python/ray/dag/dag_node.py b/python/ray/dag/dag_node.py index c7b51a63619d6..7358a0f05b0b3 100644 --- a/python/ray/dag/dag_node.py +++ b/python/ray/dag/dag_node.py @@ -228,7 +228,7 @@ def experimental_compile( enable_asyncio: bool = False, _max_inflight_executions: Optional[int] = None, _overlap_gpu_communication: Optional[bool] = None, - _default_communicator: Optional[Union[Communicator, str]] = None, + _default_communicator: Optional[Union[Communicator, str]] = "create", ) -> "ray.dag.CompiledDAG": """Compile an accelerated execution path for this DAG. diff --git a/python/ray/util/collective/collective.py b/python/ray/util/collective/collective.py index 9399cdb88c016..c9b7759a4f98d 100644 --- a/python/ray/util/collective/collective.py +++ b/python/ray/util/collective/collective.py @@ -558,8 +558,8 @@ def send_multigpu( ): """Send a tensor to a remote GPU synchronously. - The function asssume each process owns >1 GPUs, and the sender - process and receiver process has equal nubmer of GPUs. + The function assumes each process owns >1 GPUs, and the sender + process and receiver process has equal number of GPUs. Args: tensor: the tensor to send, located on a GPU. 
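
Illustrative sketch (not taken from the patches themselves): once PATCH 11/29 flips the default of `_default_communicator` to "create", a DAG annotated with `with_tensor_transport(transport="nccl")` can be compiled without passing a communicator; one is created for the participating actors at compile time. The snippet below assumes a cluster with at least two GPUs, and the `Worker` actor is a hypothetical stand-in for the sender/receiver actors used in the tests in this series.

    import ray
    import torch
    from ray.dag import InputNode

    @ray.remote(num_gpus=1)
    class Worker:
        # Hypothetical actor mirroring the senders/receivers in the tests above.
        def send(self, value: int) -> torch.Tensor:
            # Produce a GPU tensor to be shipped over the default NCCL communicator.
            return torch.ones(10, dtype=torch.float16, device="cuda") * value

        def recv(self, tensor: torch.Tensor) -> int:
            # Consume the tensor on the receiving GPU and return a plain int.
            return int(tensor[0].item())

    sender, receiver = Worker.remote(), Worker.remote()

    with InputNode() as inp:
        dag = sender.send.bind(inp)
        # No custom communicator is passed: with the new "create" default,
        # a communicator covering {sender, receiver} is created at compile time.
        dag = dag.with_tensor_transport(transport="nccl")
        dag = receiver.recv.bind(dag)

    compiled_dag = dag.experimental_compile()  # no _default_communicator argument needed
    assert ray.get(compiled_dag.execute(3)) == 3
    compiled_dag.teardown()
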
From 028957cf689580b35235304df0879de7ad3eca9b Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 31 Jan 2025 00:52:43 +0000 Subject: [PATCH 12/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 48 +++++++++++++++++------------ 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index b8a11bf540ba0..32e439b5a5724 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -7,6 +7,7 @@ TYPE_CHECKING, Any, Dict, + FrozenSet, List, Tuple, Union, @@ -797,7 +798,7 @@ def __init__( enable_asyncio: bool = False, max_inflight_executions: Optional[int] = None, overlap_gpu_communication: Optional[bool] = None, - default_communicator: Optional[Union[Communicator, str]] = None, + default_communicator: Optional[Union[Communicator, str]] = "create", ): """ Args: @@ -865,6 +866,9 @@ def __init__( self._default_communicator: Optional[Communicator] = default_communicator self._default_communicator_id: Optional[str] = None + self._provided_communicators: Set[Communicator] = set() + self._created_communicators: Set[Communicator] = set() + self._pending_p2p_communicator_actors: Set["ray.actor.ActorHandle"] = set() self._pending_p2p_communicator_dag_nodes: Set["ray.dag.DAGNode"] = set() self._pending_collective_ops: Set[ @@ -1221,12 +1225,9 @@ def _init_communicators(self) -> None: """ Initialize communicators for the DAG. """ - for communicator, actors in self._communicator_to_actors.items(): - if None in actors: - raise ValueError("Driver cannot participate in the NCCL group.") - + for communicator in self._provided_communicators: communicator_id = _init_communicator( - list(actors), + communicator.get_actor_handles(), communicator, self._overlap_gpu_communication, ) @@ -1235,7 +1236,9 @@ def _init_communicators(self) -> None: if communicator == self._default_communicator: self._default_communicator_id = communicator_id - actors_to_created_communicator_id = Dict[Set["ray.actor.ActorHandle"], str] + actors_to_created_communicator_id: Dict[ + FrozenSet["ray.actor.ActorHandle"], str + ] = {} for collective_op in self._pending_collective_ops: if not self._create_default_communicator: raise ValueError( @@ -1253,18 +1256,22 @@ def _init_communicators(self) -> None: actors_to_created_communicator_id[frozenset(actors)] = communicator_id collective_op.set_communicator_id(communicator_id) - if self._pending_p2p_communicator_actors in actors_to_created_communicator_id: - p2p_communicator_id = actors_to_created_communicator_id[ + if self._pending_p2p_communicator_actors: + if ( frozenset(self._pending_p2p_communicator_actors) - ] - else: - p2p_communicator_id = _init_communicator( - list(self._pending_p2p_communicator_actors), - None, - self._overlap_gpu_communication, - ) - for dag_node in self._pending_p2p_communicator_dag_nodes: - dag_node.type_hint.set_communicator_id(p2p_communicator_id) + in actors_to_created_communicator_id + ): + p2p_communicator_id = actors_to_created_communicator_id[ + frozenset(self._pending_p2p_communicator_actors) + ] + else: + p2p_communicator_id = _init_communicator( + list(self._pending_p2p_communicator_actors), + None, + self._overlap_gpu_communication, + ) + for dag_node in self._pending_p2p_communicator_dag_nodes: + dag_node.type_hint.set_communicator_id(p2p_communicator_id) def _track_communicator_usage( self, @@ -1276,7 +1283,8 @@ def _track_communicator_usage( If custom_communicator is provided (i.e., not None), use it. 
Otherwise, use the default communicator. """ - + if None in actors: + raise ValueError("Driver cannot participate in the NCCL group.") custom_communicator = dag_node.type_hint.get_custom_communicator() communicator = ( self._get_default_communicator(dag_node) @@ -2992,7 +3000,7 @@ def build_compiled_dag_from_ray_dag( enable_asyncio: bool = False, max_inflight_executions: Optional[int] = None, overlap_gpu_communication: Optional[bool] = None, - default_communicator: Optional[Union[Communicator, str]] = None, + default_communicator: Optional[Union[Communicator, str]] = "create", ) -> "CompiledDAG": compiled_dag = CompiledDAG( submit_timeout, From 312376808082c84a0951eda60d14f44b12dd7907 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 31 Jan 2025 01:16:00 +0000 Subject: [PATCH 13/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 32e439b5a5724..6da73f25ab263 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -867,7 +867,9 @@ def __init__( self._default_communicator_id: Optional[str] = None self._provided_communicators: Set[Communicator] = set() - self._created_communicators: Set[Communicator] = set() + self._actors_to_created_communicator_id: Dict[ + FrozenSet["ray.actor.ActorHandle"], str + ] = {} self._pending_p2p_communicator_actors: Set["ray.actor.ActorHandle"] = set() self._pending_p2p_communicator_dag_nodes: Set["ray.dag.DAGNode"] = set() @@ -1236,32 +1238,33 @@ def _init_communicators(self) -> None: if communicator == self._default_communicator: self._default_communicator_id = communicator_id - actors_to_created_communicator_id: Dict[ - FrozenSet["ray.actor.ActorHandle"], str - ] = {} for collective_op in self._pending_collective_ops: if not self._create_default_communicator: raise ValueError( "Communicator creation is not allowed for collective operations." 
) actors = collective_op.actor_handles - if frozenset(actors) in actors_to_created_communicator_id: - communicator_id = actors_to_created_communicator_id[frozenset(actors)] + if frozenset(actors) in self._actors_to_created_communicator_id: + communicator_id = self._actors_to_created_communicator_id[ + frozenset(actors) + ] else: communicator_id = _init_communicator( actors, None, self._overlap_gpu_communication, ) - actors_to_created_communicator_id[frozenset(actors)] = communicator_id + self._actors_to_created_communicator_id[ + frozenset(actors) + ] = communicator_id collective_op.set_communicator_id(communicator_id) if self._pending_p2p_communicator_actors: if ( frozenset(self._pending_p2p_communicator_actors) - in actors_to_created_communicator_id + in self._actors_to_created_communicator_id ): - p2p_communicator_id = actors_to_created_communicator_id[ + p2p_communicator_id = self._actors_to_created_communicator_id[ frozenset(self._pending_p2p_communicator_actors) ] else: @@ -2052,7 +2055,10 @@ def teardown(self, kill_actors: bool = False): logger.exception("Error cancelling worker task") pass - _destroy_communicator(outer._default_communicator_id) + for ( + communicator_id + ) in self._actors_to_created_communicator_id.values(): + _destroy_communicator(communicator_id) logger.info("Waiting for worker tasks to exit") self.wait_teardown(kill_actors=kill_actors) From 2586df217ceaf8779d0903a6029f8eb008d5e346 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 31 Jan 2025 04:33:23 +0000 Subject: [PATCH 14/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 6da73f25ab263..8a25a45a8a0c5 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -877,7 +877,7 @@ def __init__( "ray.dag.collective_node._CollectiveOperation" ] = set() self._communicator_to_type_hints: Dict[ - Optional[Communicator], + Communicator, Set["ray.experimental.channel.torch_tensor_type.TorchTensorType"], ] = defaultdict(set) @@ -1110,13 +1110,11 @@ def _preprocess(self) -> None: # Collect actors for NCCL P2P methods. if dag_node.type_hint.requires_nccl(): - communicator = self._track_communicator_usage( - dag_node, actor_handle - ) + self._track_communicator_usage(dag_node, {actor_handle}) # Collect NCCL collective operations. if isinstance(dag_node, CollectiveOutputNode): self._track_communicator_usage( - dag_node, actor_handle, collective_op=True + dag_node, {actor_handle}, collective_op=True ) assert not self._overlap_gpu_communication, ( "Currently, the overlap_gpu_communication option is not " @@ -1203,7 +1201,7 @@ def _preprocess(self) -> None: if upstream_task.dag_node.type_hint.requires_nccl(): self._track_communicator_usage( upstream_task.dag_node, - downstream_actor_handle, + {downstream_actor_handle}, ) # Check that all specified input attributes, e.g., InputNode()["x"], # are used in the DAG. @@ -1227,13 +1225,13 @@ def _init_communicators(self) -> None: """ Initialize communicators for the DAG. 
""" - for communicator in self._provided_communicators: + for communicator, type_hints in self._communicator_to_type_hints.items(): communicator_id = _init_communicator( communicator.get_actor_handles(), communicator, self._overlap_gpu_communication, ) - for type_hint in self._communicator_to_type_hints[communicator]: + for type_hint in type_hints: type_hint.set_communicator_id(communicator_id) if communicator == self._default_communicator: self._default_communicator_id = communicator_id @@ -1298,16 +1296,17 @@ def _track_communicator_usage( if collective_op: self._pending_collective_ops.add(dag_node) else: + self._pending_p2p_communicator_dag_nodes.add(dag_node) self._pending_p2p_communicator_actors.update(actors) else: - self._communicator_to_type_hints[communicator].update(dag_node.type_hint) + self._communicator_to_type_hints[communicator].add(dag_node.type_hint) if collective_op: if communicator.get_actor_handles() != actors: raise ValueError( "Actor sets are different for collective operation and communicator." ) else: - if actors not in communicator.get_actor_handles(): + if not actors.issubset(communicator.get_actor_handles()): raise ValueError("Actor is not in the communicator group.") def _get_default_communicator( @@ -1359,7 +1358,7 @@ def _resolve_auto_transport( if task.dag_node.type_hint.requires_nccl(): self._track_communicator_usage( task.dag_node, - set(readers + [writer]), + set(readers).union({writer}), ) def _check_leaf_nodes(self) -> None: @@ -2057,7 +2056,7 @@ def teardown(self, kill_actors: bool = False): for ( communicator_id - ) in self._actors_to_created_communicator_id.values(): + ) in outer._actors_to_created_communicator_id.values(): _destroy_communicator(communicator_id) logger.info("Waiting for worker tasks to exit") From a753ca230490d1b0115e8eff496e149414ff1213 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 31 Jan 2025 05:33:47 +0000 Subject: [PATCH 15/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 34 +++++++++++------------ python/ray/experimental/channel/common.py | 4 +-- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 8a25a45a8a0c5..4417280018482 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -1114,7 +1114,9 @@ def _preprocess(self) -> None: # Collect NCCL collective operations. if isinstance(dag_node, CollectiveOutputNode): self._track_communicator_usage( - dag_node, {actor_handle}, collective_op=True + dag_node, + dag_node._collective_op.actor_handles, + collective_op=True, ) assert not self._overlap_gpu_communication, ( "Currently, the overlap_gpu_communication option is not " @@ -1241,21 +1243,12 @@ def _init_communicators(self) -> None: raise ValueError( "Communicator creation is not allowed for collective operations." 
) - actors = collective_op.actor_handles - if frozenset(actors) in self._actors_to_created_communicator_id: - communicator_id = self._actors_to_created_communicator_id[ - frozenset(actors) - ] - else: - communicator_id = _init_communicator( - actors, - None, - self._overlap_gpu_communication, - ) - self._actors_to_created_communicator_id[ - frozenset(actors) - ] = communicator_id - collective_op.set_communicator_id(communicator_id) + actors = frozenset(collective_op.actor_handles) + communicator_id = collective_op.init_communicator( + self._actors_to_created_communicator_id.get(actors, None) + ) + if actors not in self._actors_to_created_communicator_id: + self._actors_to_created_communicator_id[actors] = communicator_id if self._pending_p2p_communicator_actors: if ( @@ -1286,7 +1279,12 @@ def _track_communicator_usage( """ if None in actors: raise ValueError("Driver cannot participate in the NCCL group.") - custom_communicator = dag_node.type_hint.get_custom_communicator() + if collective_op: + custom_communicator = ( + dag_node._collective_op.type_hint.get_custom_communicator() + ) + else: + custom_communicator = dag_node.type_hint.get_custom_communicator() communicator = ( self._get_default_communicator(dag_node) if custom_communicator is None @@ -1294,7 +1292,7 @@ def _track_communicator_usage( ) if communicator is None: if collective_op: - self._pending_collective_ops.add(dag_node) + self._pending_collective_ops.add(dag_node._collective_op) else: self._pending_p2p_communicator_dag_nodes.add(dag_node) self._pending_p2p_communicator_actors.update(actors) diff --git a/python/ray/experimental/channel/common.py b/python/ray/experimental/channel/common.py index c395422d5daef..1f83637c59595 100644 --- a/python/ray/experimental/channel/common.py +++ b/python/ray/experimental/channel/common.py @@ -109,8 +109,8 @@ def get_custom_communicator(self) -> Optional[Communicator]: """ Return the custom NCCL group if one is specified. """ - if self._contains_type is not None: - return self._contains_type.get_custom_nccl_group() + if hasattr(self, "contains_type") and self.contains_type is not None: + return self.contains_type.get_custom_nccl_group() return None def set_communicator_id(self, group_id: str) -> None: From fa98929646b8ff48a6d399cba9988faa0727a701 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 31 Jan 2025 05:42:41 +0000 Subject: [PATCH 16/29] up Signed-off-by: Rui Qiao --- .../experimental/test_torch_tensor_dag.py | 47 +++++++++---------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py index 86294113b8496..dc06ea3d3d51b 100644 --- a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py +++ b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py @@ -162,7 +162,7 @@ def test_torch_tensor_p2p(ray_start_regular): dag = dag.with_tensor_transport() dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() for i in range(3): ref = compiled_dag.execute(i, shape=shape, dtype=dtype) assert ray.get(ref) == (i, shape, dtype) @@ -241,7 +241,6 @@ def test_torch_tensor_nccl( compiled_dag = dag.experimental_compile( _overlap_gpu_communication=overlap_gpu_communication, - _default_communicator="create", ) # Test that we can pass different shapes and data. 
@@ -256,7 +255,7 @@ def test_torch_tensor_nccl( dag = dag.with_tensor_transport(transport="nccl") dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() # Test that we can pass different shapes and data. for i in range(3): @@ -291,7 +290,7 @@ def test_torch_tensor_auto(ray_start_regular, num_gpus): data_annotated = data.with_tensor_transport(transport="auto") dag = receiver.recv.bind(data_annotated) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() assert isinstance(data_annotated.type_hint, TorchTensorType) assert data_annotated.type_hint.transport == expected_transport @@ -307,7 +306,7 @@ def test_torch_tensor_auto(ray_start_regular, num_gpus): dag = dag.with_tensor_transport(transport="auto") dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() assert isinstance(data_annotated.type_hint, TorchTensorType) assert data_annotated.type_hint.transport == expected_transport @@ -353,7 +352,6 @@ def test_torch_tensor_nccl_overlap_timed(ray_start_regular, overlap_gpu_communic # Test normal execution. compiled_dag = dag.experimental_compile( _overlap_gpu_communication=overlap_gpu_communication, - _default_communicator="create", ) start = time.monotonic() @@ -399,7 +397,7 @@ def test_torch_tensor_nccl_disallows_driver(ray_start_regular): "via NCCL because the driver cannot participate in the NCCL group" ), ): - dag.experimental_compile(_default_communicator="create") + dag.experimental_compile() # Test that OutputNode cannot cannot participate in the NCCL group. with InputNode() as inp: @@ -410,7 +408,7 @@ def test_torch_tensor_nccl_disallows_driver(ray_start_regular): ValueError, match=(r"Driver cannot participate in the NCCL group\."), ): - dag.experimental_compile(_default_communicator="create") + dag.experimental_compile() @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) @@ -678,7 +676,7 @@ def test_torch_tensor_nccl_static_shape(ray_start_regular): dag = dag.with_tensor_transport(transport="nccl", _static_shape=True) dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() # Test that the DAG works as long as we send the same shape. 
shape = (10,) @@ -716,7 +714,7 @@ def test_torch_tensor_nccl_direct_return(ray_start_regular): dag = dag.with_tensor_transport(transport="nccl", _direct_return=True) dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() for i in range(3): shape = (10 * (i + 1),) @@ -757,7 +755,7 @@ def test_torch_tensor_nccl_nested_dynamic(ray_start_regular): dag = dag.with_tensor_transport(transport="nccl") dag = receiver.recv_dict.bind(dag) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() for i in range(3): dtype = torch.float16 @@ -803,7 +801,6 @@ def test_torch_tensor_exceptions( compiled_dag = dag.experimental_compile( _overlap_gpu_communication=overlap_gpu_communication, - _default_communicator="create", ) shape = (10,) @@ -884,7 +881,7 @@ def test_torch_tensor_exceptions2( ) dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() ref = compiled_dag.execute(1) with pytest.raises( @@ -931,7 +928,7 @@ def test_torch_tensor_nccl_all_reduce(ray_start_regular): ] dag = MultiOutputNode(recvs) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() for i in range(3): i += 1 @@ -975,7 +972,7 @@ def test_torch_tensor_nccl_all_reduce_get_partial(ray_start_regular): tensor = workers[1].recv_tensor.bind(collectives[0]) dag = MultiOutputNode([recv, tensor, collectives[1]]) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() for i in range(3): ref = compiled_dag.execute( @@ -1021,7 +1018,7 @@ def test_torch_tensor_nccl_all_reduce_wrong_shape(ray_start_regular): ] dag = MultiOutputNode(recvs) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() ref = compiled_dag.execute([((20,), dtype, idx + 1) for idx in range(num_workers)]) reduced_val = (1 + num_workers) * num_workers / 2 @@ -1213,7 +1210,7 @@ def test_torch_tensor_nccl_all_reduce_scheduling(ray_start_regular): recv = workers[1].recv.bind(t) dag = MultiOutputNode([collectives[0], collectives[1], recv]) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() value = 10 ref = compiled_dag.execute(value) @@ -1248,7 +1245,7 @@ def test_nccl_all_reduce_with_class_method_output_node(ray_start_regular): tensors = collective.allreduce.bind([t1, t4], ReduceOp.SUM) dag = MultiOutputNode(tensors + [t2, t3]) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() t1 = torch.tensor([1], device="cuda") t2 = torch.tensor([2], device="cuda") @@ -1289,7 +1286,7 @@ def recv(self, tensor): torch_inp = inp.with_tensor_transport() dag = receiver.recv.bind(torch_inp) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() ref = compiled_dag.execute(torch.tensor([1])) assert ray.get(ref) == 1 # This should timeout because actor shouldn't print anything. 
@@ -1327,7 +1324,7 @@ def test_input_node_without_type_hint(self, ray_start_regular, tensor_device): with InputNode() as inp: dag = worker.echo.bind(inp) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() tensor = torch.tensor([5]) if tensor_device == "cuda": tensor = tensor.cuda() @@ -1362,7 +1359,7 @@ def test_input_node_with_tensor_transport(self, ray_start_regular, tensor_device with InputNode() as inp: dag = worker.echo.bind(inp.with_tensor_transport()) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() cpu_tensor = torch.tensor([1]) input_tensor = cpu_tensor if tensor_device == "cuda": @@ -1413,7 +1410,7 @@ def test_input_attr_nodes_with_all_tensor_type_hint(self, ray_start_regular): branch2 = worker2.echo.bind(dag) dag = MultiOutputNode([branch1, branch2]) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() cpu_tensor_1 = torch.tensor([1]) cpu_tensor_2 = torch.tensor([2]) ref = compiled_dag.execute(cpu_tensor_1, cpu_tensor_2) @@ -1517,7 +1514,7 @@ def test_torch_nccl_channel_with_local_reader(ray_start_regular): branch1 = w1.recv.bind(dag) branch2 = w2.recv.bind(dag) dag = MultiOutputNode([branch1, branch2]) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() for i in range(3): ref = compiled_dag.execute(i, shape=shape, dtype=dtype) assert ray.get(ref) == [(i, shape, dtype), (i, shape, dtype)] @@ -1553,7 +1550,7 @@ def test_torch_nccl_channel_with_two_local_readers(ray_start_regular): branch2 = w1.recv.bind(dag) branch3 = w2.recv.bind(dag) dag = MultiOutputNode([branch1, branch2, branch3]) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() for i in range(3): ref = compiled_dag.execute(i, shape=shape, dtype=dtype) assert ray.get(ref) == [(i, shape, dtype), (i, shape, dtype), (i, shape, dtype)] @@ -1588,7 +1585,7 @@ def test_torch_nccl_channel_with_all_local_readers(ray_start_regular): "is not needed. No NCCL channel will be created." ), ): - dag.experimental_compile(_default_communicator="create") + dag.experimental_compile() if __name__ == "__main__": From 5e92d38bcf2b2b9917aa72879f6ffcd83d1f89a0 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 31 Jan 2025 05:59:23 +0000 Subject: [PATCH 17/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 8 +++++++- .../dag/tests/experimental/test_accelerated_dag.py | 4 ++-- .../dag/tests/experimental/test_mocked_nccl_dag.py | 12 ++++++------ python/ray/experimental/collective/conftest.py | 2 +- 4 files changed, 16 insertions(+), 10 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 4417280018482..7c81628a9ecbb 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -866,16 +866,22 @@ def __init__( self._default_communicator: Optional[Communicator] = default_communicator self._default_communicator_id: Optional[str] = None - self._provided_communicators: Set[Communicator] = set() + # Dict from set of actors to created communicator ID. + # These communicators are created by Compiled Graph, rather than passed in. + # Communicators are only created when self._create_default_communicator is True. 
self._actors_to_created_communicator_id: Dict[ FrozenSet["ray.actor.ActorHandle"], str ] = {} + # Set of actors involved in P2P communication but pending creation of communicator. self._pending_p2p_communicator_actors: Set["ray.actor.ActorHandle"] = set() + # Set of DAG nodes involved in P2P communication but pending creation of communicator. self._pending_p2p_communicator_dag_nodes: Set["ray.dag.DAGNode"] = set() + # Set of collective operations pending creation of communicator. self._pending_collective_ops: Set[ "ray.dag.collective_node._CollectiveOperation" ] = set() + # Dict from passed-in communicator to set of type hints that refer to it. self._communicator_to_type_hints: Dict[ Communicator, Set["ray.experimental.channel.torch_tensor_type.TorchTensorType"], diff --git a/python/ray/dag/tests/experimental/test_accelerated_dag.py b/python/ray/dag/tests/experimental/test_accelerated_dag.py index db9f4492ab298..43235a97e0991 100644 --- a/python/ray/dag/tests/experimental/test_accelerated_dag.py +++ b/python/ray/dag/tests/experimental/test_accelerated_dag.py @@ -898,7 +898,7 @@ def test_multi_args_and_torch_type(self, ray_start_regular): dag = c.collect_two.bind(branch2, branch1) dag.with_tensor_transport() - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() cpu_tensors = [torch.tensor([0, 0, 0, 0, 0]), torch.tensor([1, 1, 1, 1, 1])] ref = compiled_dag.execute(cpu_tensors[0], cpu_tensors[1]) @@ -2851,7 +2851,7 @@ def __init__(self): inp, ).with_tensor_transport(), ) - self._cdag = dag.experimental_compile(_default_communicator="create") + self._cdag = dag.experimental_compile() def call(self, value): return ray.get(self._cdag.execute(value)) diff --git a/python/ray/dag/tests/experimental/test_mocked_nccl_dag.py b/python/ray/dag/tests/experimental/test_mocked_nccl_dag.py index 0887c1ba85fcd..3235220f87d92 100644 --- a/python/ray/dag/tests/experimental/test_mocked_nccl_dag.py +++ b/python/ray/dag/tests/experimental/test_mocked_nccl_dag.py @@ -96,7 +96,7 @@ def test_p2p(ray_start_cluster): dag = dag.with_tensor_transport(transport="nccl") dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() for i in range(3): ref = compiled_dag.execute(i, shape=shape, dtype=dtype, send_as_dict=False) assert ray.get(ref) == (i, shape, dtype) @@ -149,7 +149,7 @@ def test_p2p_static_shape(ray_start_cluster, send_as_dict): dag = dag.with_tensor_transport(transport="nccl", _static_shape=True) dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() for i in range(3): ref = compiled_dag.execute(i, shape=shape, dtype=dtype) assert ray.get(ref) == (i, shape, dtype) @@ -189,7 +189,7 @@ def test_p2p_static_shape_error(capsys, ray_start_cluster, send_as_dict): dag = dag.with_tensor_transport(transport="nccl", _static_shape=True) dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() for i in range(3): ref = compiled_dag.execute(i, shape=shape, dtype=dtype) assert ray.get(ref) == (i, shape, dtype) @@ -243,7 +243,7 @@ def test_p2p_direct_return(ray_start_cluster): dag = dag.with_tensor_transport(transport="nccl", _direct_return=True) dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() 
dtype = torch.float16 for i in range(3): @@ -285,7 +285,7 @@ def test_p2p_direct_return_error(capsys, ray_start_cluster): dag = dag.with_tensor_transport(transport="nccl", _direct_return=True) dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() dtype = torch.float16 for i in range(3): @@ -353,7 +353,7 @@ def test_p2p_static_shape_and_direct_return( ) dag = receiver.recv.bind(dag) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() shape = (10,) dtype = torch.float16 diff --git a/python/ray/experimental/collective/conftest.py b/python/ray/experimental/collective/conftest.py index a470c252bfa5f..41537dc566945 100644 --- a/python/ray/experimental/collective/conftest.py +++ b/python/ray/experimental/collective/conftest.py @@ -226,7 +226,7 @@ def check_nccl_group_init( mock_nccl_group_set, ) - compiled_dag = dag.experimental_compile(_default_communicator="create") + compiled_dag = dag.experimental_compile() mock_nccl_group_set.check_init( compiled_dag, actors_and_custom_comms, From 2d8b80d903b18f7966c978d299276ab266686655 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 31 Jan 2025 16:53:15 +0000 Subject: [PATCH 18/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 41 +++++++++++-------- .../tests/experimental/test_collective_dag.py | 17 +++----- .../ray/experimental/collective/conftest.py | 36 ++-------------- 3 files changed, 33 insertions(+), 61 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 7c81628a9ecbb..d1b3dac8f6d87 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -866,6 +866,11 @@ def __init__( self._default_communicator: Optional[Communicator] = default_communicator self._default_communicator_id: Optional[str] = None + # Dict from passed-in communicator to set of type hints that refer to it. + self._communicator_to_type_hints: Dict[ + Communicator, + Set["ray.experimental.channel.torch_tensor_type.TorchTensorType"], + ] = defaultdict(set) # Dict from set of actors to created communicator ID. # These communicators are created by Compiled Graph, rather than passed in. # Communicators are only created when self._create_default_communicator is True. @@ -881,11 +886,6 @@ def __init__( self._pending_collective_ops: Set[ "ray.dag.collective_node._CollectiveOperation" ] = set() - # Dict from passed-in communicator to set of type hints that refer to it. 
- self._communicator_to_type_hints: Dict[ - Communicator, - Set["ray.experimental.channel.torch_tensor_type.TorchTensorType"], - ] = defaultdict(set) self._default_type_hint: ChannelOutputType = SharedMemoryType( buffer_size_bytes=self._buffer_size_bytes, @@ -1256,15 +1256,16 @@ def _init_communicators(self) -> None: if actors not in self._actors_to_created_communicator_id: self._actors_to_created_communicator_id[actors] = communicator_id + p2p_communicator_id = None if self._pending_p2p_communicator_actors: - if ( - frozenset(self._pending_p2p_communicator_actors) - in self._actors_to_created_communicator_id - ): - p2p_communicator_id = self._actors_to_created_communicator_id[ - frozenset(self._pending_p2p_communicator_actors) - ] - else: + for ( + actors, + communicator_id, + ) in self._actors_to_created_communicator_id.items(): + if self._pending_p2p_communicator_actors.issubset(actors): + p2p_communicator_id = communicator_id + break + if p2p_communicator_id is None: p2p_communicator_id = _init_communicator( list(self._pending_p2p_communicator_actors), None, @@ -1286,11 +1287,10 @@ def _track_communicator_usage( if None in actors: raise ValueError("Driver cannot participate in the NCCL group.") if collective_op: - custom_communicator = ( - dag_node._collective_op.type_hint.get_custom_communicator() - ) + type_hint = dag_node._collective_op.type_hint else: - custom_communicator = dag_node.type_hint.get_custom_communicator() + type_hint = dag_node.type_hint + custom_communicator = type_hint.get_custom_communicator() communicator = ( self._get_default_communicator(dag_node) if custom_communicator is None @@ -1303,7 +1303,12 @@ def _track_communicator_usage( self._pending_p2p_communicator_dag_nodes.add(dag_node) self._pending_p2p_communicator_actors.update(actors) else: - self._communicator_to_type_hints[communicator].add(dag_node.type_hint) + self._communicator_to_type_hints[communicator].add(type_hint) + # if isinstance(dag_node.type_hint, ChannelOutputType): + # logger.error( + # "ChannelOutputType is not allowed for tracking communicator usage.", + # stack_info=True, + # ) if collective_op: if communicator.get_actor_handles() != actors: raise ValueError( diff --git a/python/ray/dag/tests/experimental/test_collective_dag.py b/python/ray/dag/tests/experimental/test_collective_dag.py index 2609d101903d6..b519bfd87544b 100644 --- a/python/ray/dag/tests/experimental/test_collective_dag.py +++ b/python/ray/dag/tests/experimental/test_collective_dag.py @@ -73,7 +73,7 @@ def test_all_reduce_custom_comm_wrong_actors(ray_start_regular): ) def test_comm_all_reduces(ray_start_regular, monkeypatch): """ - Test the same default communicator is used for different all-reduce calls of + Test different communicators are used for different all-reduce calls of different sets of actors. 
""" actor_cls = CPUTorchTensorWorker.options(num_cpus=0, num_gpus=1) @@ -92,9 +92,9 @@ def test_comm_all_reduces(ray_start_regular, monkeypatch): monkeypatch, dag, { - (frozenset(workers), None), + (frozenset([workers[0]]), None), + (frozenset([workers[1]]), None), }, - (frozenset(workers), None), ) check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set) @@ -103,10 +103,10 @@ def test_comm_all_reduces(ray_start_regular, monkeypatch): @pytest.mark.parametrize( "ray_start_regular", [{"num_cpus": 4, "num_gpus": 4}], indirect=True ) -def test_comm_all_reduces2(ray_start_regular, monkeypatch): +def test_comm_deduplicate_all_reduces(ray_start_regular, monkeypatch): """ - Test the same default communicator is used when all-reduces are called on the - same group of actors more than once. + Test communicators are deduplicated when all-reduces are called on the same + group of actors more than once. """ actor_cls = CPUTorchTensorWorker.options(num_cpus=0, num_gpus=1) @@ -123,7 +123,6 @@ def test_comm_all_reduces2(ray_start_regular, monkeypatch): monkeypatch, dag, {(frozenset(workers), None)}, - (frozenset(workers), None), ) check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set) @@ -160,7 +159,6 @@ def test_comm_p2p_and_collective(ray_start_regular, monkeypatch): monkeypatch, dag, {(frozenset(workers), None)}, - (frozenset(workers), None), ) check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set) @@ -178,7 +176,6 @@ def test_comm_p2p_and_collective(ray_start_regular, monkeypatch): monkeypatch, dag, {(frozenset(workers), None)}, - (frozenset(workers), None), ) check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set) @@ -214,7 +211,6 @@ def test_custom_comm(ray_start_regular, monkeypatch): (frozenset(workers), comm), (frozenset(workers), None), }, - (frozenset(workers), None), ) check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set) @@ -234,7 +230,6 @@ def test_custom_comm(ray_start_regular, monkeypatch): (frozenset(workers), comm), (frozenset(workers), None), }, - (frozenset(workers), None), ) check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set) diff --git a/python/ray/experimental/collective/conftest.py b/python/ray/experimental/collective/conftest.py index 41537dc566945..676b96a7372cf 100644 --- a/python/ray/experimental/collective/conftest.py +++ b/python/ray/experimental/collective/conftest.py @@ -137,30 +137,6 @@ def mock_destroy_nccl_group(self, group_id: str) -> None: ctx.communicators[group_id].destroy() del ctx.communicators[group_id] - def check_init( - self, - compiled_dag: "ray.dag.CompiledDAG", - actors_and_custom_comms: Set[ - Tuple[FrozenSet["ray.actor.ActorHandle"], Optional[Communicator]] - ], - default_actors_and_custom_comm: Optional[ - Tuple[FrozenSet["ray.actor.ActorHandle"], Optional[Communicator]] - ], - ) -> None: - assert ( - set(self.ids_to_actors_and_custom_comms.values()) == actors_and_custom_comms - ) - - default_communicator_id = compiled_dag._default_communicator_id - if default_actors_and_custom_comm is None: - assert default_communicator_id is None - else: - assert default_communicator_id - assert ( - self.ids_to_actors_and_custom_comms[default_communicator_id] - == default_actors_and_custom_comm - ) - def check_teardown(self, nccl_group_ids: List[str]) -> None: ctx = ChannelContext.get_current() for nccl_group_id in nccl_group_ids: @@ -209,12 +185,9 @@ def mock_do_destroy_nccl_group(self, group_id: str) -> None: def check_nccl_group_init( monkeypatch, dag: 
"ray.dag.DAGNode", - actors_and_custom_comms: Set[ + actors_and_initialized_comms: Set[ Tuple[FrozenSet["ray.actor.ActorHandle"], Optional[Communicator]] ], - default_actors_and_custom_comm: Optional[ - Tuple[FrozenSet["ray.actor.ActorHandle"], Optional[Communicator]] - ] = None, ) -> "ray.dag.CompiledDAG": mock_nccl_group_set = MockNcclGroupSet() monkeypatch.setattr( @@ -227,10 +200,9 @@ def check_nccl_group_init( ) compiled_dag = dag.experimental_compile() - mock_nccl_group_set.check_init( - compiled_dag, - actors_and_custom_comms, - default_actors_and_custom_comm, + assert ( + set(mock_nccl_group_set.ids_to_actors_and_custom_comms.values()) + == actors_and_initialized_comms ) return compiled_dag, mock_nccl_group_set From 2379d2a89176b7bae1c81ca93695f580debd963f Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 31 Jan 2025 17:09:21 +0000 Subject: [PATCH 19/29] up Signed-off-by: Rui Qiao --- .../ray/dag/tests/experimental/test_collective_dag.py | 4 ++-- python/ray/experimental/collective/conftest.py | 11 ++++------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/python/ray/dag/tests/experimental/test_collective_dag.py b/python/ray/dag/tests/experimental/test_collective_dag.py index b519bfd87544b..bacf61abd94f6 100644 --- a/python/ray/dag/tests/experimental/test_collective_dag.py +++ b/python/ray/dag/tests/experimental/test_collective_dag.py @@ -131,9 +131,9 @@ def test_comm_deduplicate_all_reduces(ray_start_regular, monkeypatch): @pytest.mark.parametrize( "ray_start_regular", [{"num_cpus": 4, "num_gpus": 4}], indirect=True ) -def test_comm_p2p_and_collective(ray_start_regular, monkeypatch): +def test_comm_deduplicate_p2p_and_collective(ray_start_regular, monkeypatch): """ - Test the same default communicator is used when the collective and the P2P are on + Test communicators are deduplicated when the collective and the P2P are on the same set of actors. 
""" actor_cls = CPUTorchTensorWorker.options(num_cpus=0, num_gpus=1) diff --git a/python/ray/experimental/collective/conftest.py b/python/ray/experimental/collective/conftest.py index 676b96a7372cf..35ec386fc13bf 100644 --- a/python/ray/experimental/collective/conftest.py +++ b/python/ray/experimental/collective/conftest.py @@ -185,7 +185,7 @@ def mock_do_destroy_nccl_group(self, group_id: str) -> None: def check_nccl_group_init( monkeypatch, dag: "ray.dag.DAGNode", - actors_and_initialized_comms: Set[ + actors_and_custom_comms: Set[ Tuple[FrozenSet["ray.actor.ActorHandle"], Optional[Communicator]] ], ) -> "ray.dag.CompiledDAG": @@ -202,7 +202,7 @@ def check_nccl_group_init( compiled_dag = dag.experimental_compile() assert ( set(mock_nccl_group_set.ids_to_actors_and_custom_comms.values()) - == actors_and_initialized_comms + == actors_and_custom_comms ) return compiled_dag, mock_nccl_group_set @@ -218,9 +218,6 @@ def check_nccl_group_teardown( mock_nccl_group_set.mock_destroy_nccl_group, ) - if compiled_dag._create_default_communicator: - nccl_group_ids = [compiled_dag._default_communicator_id] - else: - nccl_group_ids = [] + created_communicator_ids = compiled_dag._actors_to_created_communicator_id.values() compiled_dag.teardown() - mock_nccl_group_set.check_teardown(nccl_group_ids) + mock_nccl_group_set.check_teardown(created_communicator_ids) From da10b63fb8c5c21d1cd977b7cca5d6e1b3eba598 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 31 Jan 2025 17:40:48 +0000 Subject: [PATCH 20/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 60 +++++++++++++++++++++++------ python/ray/dag/dag_node.py | 7 +++- 2 files changed, 54 insertions(+), 13 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index d1b3dac8f6d87..d050de30b64a3 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -823,12 +823,18 @@ def __init__( communication and computation can be overlapped, which can improve the performance of the DAG execution. If None, the default value will be used. - default_communicator: The default communicator to use to transport tensors + default_communicator: The default communicator to use to transfer + tensors. For p2p operations, this is the default communicator to use for nodes annotated with `with_tensor_transport()` and when shared memory is not the desired option (e.g., when transport="nccl", or when transport="auto" for communication between two different GPUs). If it is "create", a default communicator is created when needed. If None, an error will be thrown. All other values are invalid. + For collective operations, this is the default communicator to use + when a custom communicator is not specified. If it is "create", a + communicator is created for each collective operation and initialized + on the involved actors, or an already created communicator is reused + if the set of actors is the same. Returns: Channel: A wrapper around ray.ObjectRef. @@ -1233,6 +1239,8 @@ def _init_communicators(self) -> None: """ Initialize communicators for the DAG. """ + + # First, initialize communicators that are passed in by the user. for communicator, type_hints in self._communicator_to_type_hints.items(): communicator_id = _init_communicator( communicator.get_actor_handles(), @@ -1244,6 +1252,8 @@ def _init_communicators(self) -> None: if communicator == self._default_communicator: self._default_communicator_id = communicator_id + # Then, create communicators for collective operations. 
+ # Reuse an already created communicator for the same set of actors. for collective_op in self._pending_collective_ops: if not self._create_default_communicator: raise ValueError( @@ -1256,6 +1266,9 @@ def _init_communicators(self) -> None: if actors not in self._actors_to_created_communicator_id: self._actors_to_created_communicator_id[actors] = communicator_id + # Finally, create a communicator for P2P operations. + # Reuse an already created collective op communicator when p2p actors + # are a subset of the actors in the collective op communicator. p2p_communicator_id = None if self._pending_p2p_communicator_actors: for ( @@ -1281,8 +1294,19 @@ def _track_communicator_usage( collective_op: bool = False, ) -> None: """ - If custom_communicator is provided (i.e., not None), use it. - Otherwise, use the default communicator. + Track the usage of a communicator. + + This method first determines the communicator to use: if a custom + communicator is specified, use it; if not and a default communicator + is available, use it; otherwise, it records necessary information to + create a new communicator later. + + This method also performs validation checks on the passed-in communicator. + + Args: + dag_node: The DAG node that uses the communicator. + actors: The set of actors that use the communicator. + collective_op: Whether the communicator is used for a collective operation. """ if None in actors: raise ValueError("Driver cannot participate in the NCCL group.") @@ -1303,30 +1327,42 @@ def _track_communicator_usage( self._pending_p2p_communicator_dag_nodes.add(dag_node) self._pending_p2p_communicator_actors.update(actors) else: - self._communicator_to_type_hints[communicator].add(type_hint) - # if isinstance(dag_node.type_hint, ChannelOutputType): - # logger.error( - # "ChannelOutputType is not allowed for tracking communicator usage.", - # stack_info=True, - # ) if collective_op: if communicator.get_actor_handles() != actors: raise ValueError( - "Actor sets are different for collective operation and communicator." + "The passed-in communicator must have the same set " + "of actors as the collective operation. " + f"The passed-in communicator has actors {communicator.get_actor_handles()} " + f"while the collective operation has actors {actors}." ) else: if not actors.issubset(communicator.get_actor_handles()): - raise ValueError("Actor is not in the communicator group.") + raise ValueError( + "The passed-in communicator must include all of the actors " + "used in the P2P operation. " + f"The passed-in communicator has actors {communicator.get_actor_handles()} " + f"while the P2P operation has actors {actors}." + ) + self._communicator_to_type_hints[communicator].add(type_hint) def _get_default_communicator( self, dag_node: "ray.dag.DAGNode", ) -> Optional[Communicator]: + """ + Get the default communicator for the DAG node when a custom communicator + is not specified. + + Args: + dag_node: The DAG node that uses the communicator. + Returns: + The default communicator for the DAG node. + """ if not self._create_default_communicator: if dag_node._original_type_hint is not None: assert isinstance(dag_node._original_type_hint, AutoTransportType) raise ValueError( - f"AutoTransportType is used for DAGNode {dag_node}, " + f"with_tensor_transport(transport='auto') is used for DAGNode {dag_node}, " "This requires specifying a default communicator or 'create' for " "default_communicator when calling experimental_compile()." 
) diff --git a/python/ray/dag/dag_node.py b/python/ray/dag/dag_node.py index 7358a0f05b0b3..847f456097301 100644 --- a/python/ray/dag/dag_node.py +++ b/python/ray/dag/dag_node.py @@ -251,12 +251,17 @@ def experimental_compile( communication and computation can be overlapped, which can improve the performance of the DAG execution. If None, the default value will be used. - _default_communicator: The default communicator to use to transport tensors + _default_communicator: The default communicator to use to transfer + tensors. For p2p operations, this is the default communicator to use for nodes annotated with `with_tensor_transport()` and when shared memory is not the desired option (e.g., when transport="nccl", or when transport="auto" for communication between two different GPUs). If it is "create", a default communicator is created when needed. If None, an error will be thrown. All other values are invalid. + For collective operations, this is the default communicator to use + when a custom communicator is not specified. If it is "create", a communicator + is created for each collective operation and initialized on the involved actors, + or an already created communicator is reused if the set of actors is the same. Returns: A compiled DAG. From 80847a6a6e12f49f714f4fd109d542e14b1a0872 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 31 Jan 2025 22:39:41 +0000 Subject: [PATCH 21/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 25 ++++++++++++++----------- python/ray/dag/dag_node.py | 20 ++++++++++++-------- 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index d050de30b64a3..6d954883ca204 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -823,18 +823,21 @@ def __init__( communication and computation can be overlapped, which can improve the performance of the DAG execution. If None, the default value will be used. - default_communicator: The default communicator to use to transfer - tensors. For p2p operations, this is the default communicator to use - for nodes annotated with `with_tensor_transport()` and when shared memory - is not the desired option (e.g., when transport="nccl", or when - transport="auto" for communication between two different GPUs). - If it is "create", a default communicator is created when needed. - If None, an error will be thrown. All other values are invalid. + _default_communicator: The default communicator to use to transfer + tensors. Three types of values are valid. (1) Communicator: + For p2p operations, this is the default communicator + to use for nodes annotated with `with_tensor_transport()` and when + shared memory is not the desired option (e.g., when transport="nccl", + or when transport="auto" for communication between two different GPUs). For collective operations, this is the default communicator to use - when a custom communicator is not specified. If it is "create", a - communicator is created for each collective operation and initialized - on the involved actors, or an already created communicator is reused - if the set of actors is the same. + when a custom communicator is not specified. + (2) "create": for each collective operation without a custom communicator + specified, a communicator is created and initialized on its involved actors, + or an already created communicator is reused if the set of actors is the same. 
+ For all p2p operations without a custom communicator specified, it reuses + an already created collective communicator if the p2p actors are a subset. + Otherwise, a new communicator is created. + (3) None: a ValueError will be thrown if a custom communicator is not specified. Returns: Channel: A wrapper around ray.ObjectRef. diff --git a/python/ray/dag/dag_node.py b/python/ray/dag/dag_node.py index 847f456097301..5b304c8064910 100644 --- a/python/ray/dag/dag_node.py +++ b/python/ray/dag/dag_node.py @@ -252,16 +252,20 @@ def experimental_compile( the performance of the DAG execution. If None, the default value will be used. _default_communicator: The default communicator to use to transfer - tensors. For p2p operations, this is the default communicator to use - for nodes annotated with `with_tensor_transport()` and when shared memory - is not the desired option (e.g., when transport="nccl", or when - transport="auto" for communication between two different GPUs). - If it is "create", a default communicator is created when needed. - If None, an error will be thrown. All other values are invalid. + tensors. Three types of values are valid. (1) Communicator: + For p2p operations, this is the default communicator + to use for nodes annotated with `with_tensor_transport()` and when + shared memory is not the desired option (e.g., when transport="nccl", + or when transport="auto" for communication between two different GPUs). For collective operations, this is the default communicator to use - when a custom communicator is not specified. If it is "create", a communicator - is created for each collective operation and initialized on the involved actors, + when a custom communicator is not specified. + (2) "create": for each collective operation without a custom communicator + specified, a communicator is created and initialized on its involved actors, or an already created communicator is reused if the set of actors is the same. + For all p2p operations without a custom communicator specified, it reuses + an already created collective communicator if the p2p actors are a subset. + Otherwise, a new communicator is created. + (3) None: a ValueError will be thrown if a custom communicator is not specified. Returns: A compiled DAG. From 699e556b03921c5a3e2d0886894df3f83cea746c Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 31 Jan 2025 23:14:48 +0000 Subject: [PATCH 22/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 57 +++++++------------ python/ray/experimental/channel/common.py | 2 - .../experimental/channel/torch_tensor_type.py | 3 - 3 files changed, 20 insertions(+), 42 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 6d954883ca204..9d91756196928 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -873,7 +873,6 @@ def __init__( f"got {default_communicator}" ) self._default_communicator: Optional[Communicator] = default_communicator - self._default_communicator_id: Optional[str] = None # Dict from passed-in communicator to set of type hints that refer to it. self._communicator_to_type_hints: Dict[ @@ -1252,8 +1251,6 @@ def _init_communicators(self) -> None: ) for type_hint in type_hints: type_hint.set_communicator_id(communicator_id) - if communicator == self._default_communicator: - self._default_communicator_id = communicator_id # Then, create communicators for collective operations. # Reuse an already created communicator for the same set of actors. 
@@ -1318,11 +1315,26 @@ def _track_communicator_usage( else: type_hint = dag_node.type_hint custom_communicator = type_hint.get_custom_communicator() - communicator = ( - self._get_default_communicator(dag_node) - if custom_communicator is None - else custom_communicator - ) + + if custom_communicator is None: + if self._default_communicator is None: + if dag_node._original_type_hint is not None: + assert isinstance(dag_node._original_type_hint, AutoTransportType) + raise ValueError( + f"with_tensor_transport(transport='auto') is used for DAGNode {dag_node}, " + "This requires specifying a default communicator or 'create' for " + "default_communicator when calling experimental_compile()." + ) + raise ValueError( + f"DAGNode {dag_node} has no custom communicator specified. " + "Please specify a custom communicator for the DAGNode using " + "`with_tensor_transport()`, or specify a communicator or 'create' for " + "default_communicator when calling experimental_compile()." + ) + communicator = self._default_communicator + else: + communicator = custom_communicator + if communicator is None: if collective_op: self._pending_collective_ops.add(dag_node._collective_op) @@ -1348,35 +1360,6 @@ def _track_communicator_usage( ) self._communicator_to_type_hints[communicator].add(type_hint) - def _get_default_communicator( - self, - dag_node: "ray.dag.DAGNode", - ) -> Optional[Communicator]: - """ - Get the default communicator for the DAG node when a custom communicator - is not specified. - - Args: - dag_node: The DAG node that uses the communicator. - Returns: - The default communicator for the DAG node. - """ - if not self._create_default_communicator: - if dag_node._original_type_hint is not None: - assert isinstance(dag_node._original_type_hint, AutoTransportType) - raise ValueError( - f"with_tensor_transport(transport='auto') is used for DAGNode {dag_node}, " - "This requires specifying a default communicator or 'create' for " - "default_communicator when calling experimental_compile()." - ) - raise ValueError( - f"DAGNode {dag_node} has no custom communicator specified. " - "Please specify a custom communicator for the DAGNode using " - "`with_tensor_transport()`, or specify a communicator or 'create' for " - "default_communicator when calling experimental_compile()." - ) - return self._default_communicator - def _resolve_auto_transport( self, auto_transport_tasks: Set["CompiledTask"], diff --git a/python/ray/experimental/channel/common.py b/python/ray/experimental/channel/common.py index 1f83637c59595..32d146be65d37 100644 --- a/python/ray/experimental/channel/common.py +++ b/python/ray/experimental/channel/common.py @@ -109,8 +109,6 @@ def get_custom_communicator(self) -> Optional[Communicator]: """ Return the custom NCCL group if one is specified. 
""" - if hasattr(self, "contains_type") and self.contains_type is not None: - return self.contains_type.get_custom_nccl_group() return None def set_communicator_id(self, group_id: str) -> None: diff --git a/python/ray/experimental/channel/torch_tensor_type.py b/python/ray/experimental/channel/torch_tensor_type.py index 4d5db286865cb..004aa2b43d06d 100644 --- a/python/ray/experimental/channel/torch_tensor_type.py +++ b/python/ray/experimental/channel/torch_tensor_type.py @@ -117,9 +117,6 @@ def deserialize(b): deserializer=deserialize, ) - def set_contains_type(self, typ: "ChannelOutputType") -> None: - raise ValueError("TorchTensorType cannot contain other types") - def create_channel( self, writer: Optional["ray.actor.ActorHandle"], From 9296310a30694166d7a67e275535a47c7d027249 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Fri, 31 Jan 2025 23:49:06 +0000 Subject: [PATCH 23/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 4 +- .../experimental/test_torch_tensor_dag.py | 44 +++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 9d91756196928..1570b7351f38e 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -1323,13 +1323,13 @@ def _track_communicator_usage( raise ValueError( f"with_tensor_transport(transport='auto') is used for DAGNode {dag_node}, " "This requires specifying a default communicator or 'create' for " - "default_communicator when calling experimental_compile()." + "_default_communicator when calling experimental_compile()." ) raise ValueError( f"DAGNode {dag_node} has no custom communicator specified. " "Please specify a custom communicator for the DAGNode using " "`with_tensor_transport()`, or specify a communicator or 'create' for " - "default_communicator when calling experimental_compile()." + "_default_communicator when calling experimental_compile()." ) communicator = self._default_communicator else: diff --git a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py index dc06ea3d3d51b..7feeb91d12f45 100644 --- a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py +++ b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py @@ -899,6 +899,50 @@ def test_torch_tensor_exceptions2( ref = compiled_dag.execute(2) +@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) +def test_torch_tensor_explicit_communicator(ray_start_regular): + if not USE_GPU: + pytest.skip("NCCL tests require GPUs") + + assert ( + sum(node["Resources"].get("GPU", 0) for node in ray.nodes()) > 1 + ), "This test requires at least 2 GPUs" + + actor_cls = TorchTensorWorker.options(num_cpus=0, num_gpus=1) + + sender = actor_cls.remote() + receiver = actor_cls.remote() + + with InputNode() as inp: + dag = sender.send.bind(inp.shape, inp.dtype, inp[0]) + dag = dag.with_tensor_transport(transport="nccl") + dag = receiver.recv.bind(dag) + + with pytest.raises( + ValueError, + match=( + "Please specify a custom communicator for the DAGNode using " + "`with_tensor_transport\(\)`, or specify a communicator or 'create' for " + "_default_communicator when calling experimental_compile()." 
+ ), + ): + dag.experimental_compile(_default_communicator=None) + + with InputNode() as inp: + dag = sender.send.bind(inp.shape, inp.dtype, inp[0]) + dag = dag.with_tensor_transport(transport="auto") + dag = receiver.recv.bind(dag) + + with pytest.raises( + ValueError, + match=( + "This requires specifying a default communicator or 'create' for " + "_default_communicator when calling experimental_compile()." + ), + ): + dag.experimental_compile(_default_communicator=None) + + @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) def test_torch_tensor_nccl_all_reduce(ray_start_regular): """ From 6ebbeced537122ffeed8025a959d666f41cfb358 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Sat, 1 Feb 2025 02:25:10 +0000 Subject: [PATCH 24/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 1570b7351f38e..1943c3e8a62dc 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -1317,7 +1317,10 @@ def _track_communicator_usage( custom_communicator = type_hint.get_custom_communicator() if custom_communicator is None: - if self._default_communicator is None: + if ( + self._default_communicator is None + and not self._create_default_communicator + ): if dag_node._original_type_hint is not None: assert isinstance(dag_node._original_type_hint, AutoTransportType) raise ValueError( From 9bd36891104630302f9efbe6bc7d955cf2d04c35 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Mon, 3 Feb 2025 18:55:59 +0000 Subject: [PATCH 25/29] up Signed-off-by: Rui Qiao --- .../experimental/test_torch_tensor_dag.py | 134 ++++++++++++++++++ 1 file changed, 134 insertions(+) diff --git a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py index 7feeb91d12f45..20bf613b11abb 100644 --- a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py +++ b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py @@ -657,6 +657,140 @@ def get_transport_name(self) -> str: assert result == (i, shape, dtype) +@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) +@pytest.mark.parametrize("transport", ["auto", "nccl"]) +def test_torch_tensor_default_comm(ray_start_regular, transport): + if not USE_GPU: + pytest.skip("NCCL tests require GPUs") + + assert ( + sum(node["Resources"].get("GPU", 0) for node in ray.nodes()) > 1 + ), "This test requires at least 2 GPUs" + runtime_env = { + "env_vars": { + "MASTER_ADDR": socket.gethostbyname(socket.gethostname()), + "MASTER_PORT": "8888", + } + } + actor_cls = TorchTensorWorker.options( + num_cpus=0, num_gpus=1, runtime_env=runtime_env + ) + + sender = actor_cls.remote() + receiver = actor_cls.remote() + + # Simulates that the distributed environment (e.g., torch.distributed) + # have already been set up + refs = [ + sender.init_distributed.remote(2, 0), + receiver.init_distributed.remote(2, 1), + ] + ray.wait(refs) + + class InitedNcclGroup(Communicator): + """ + A custom NCCL group based on existing torch.distributed setup. 
+ """ + + import cupy as cp + + def __init__(self, world_size, actor_handles): + self._world_size = world_size + self._actor_handles = actor_handles + self._rank = None + + def initialize(self, rank: int) -> None: + expected_rank = self.get_rank(ray.get_runtime_context().current_actor) + assert ( + rank == expected_rank + ), f"NCCL actor's rank {rank} does not match expected rank {expected_rank}" + self._rank = rank + self._device = torch_utils.get_devices()[0] + + def get_rank(self, actor: ray.actor.ActorHandle) -> int: + actor_ids = [a._ray_actor_id for a in self._actor_handles] + try: + rank = actor_ids.index(actor._ray_actor_id) + except ValueError: + raise ValueError("Actor is not in the NCCL group.") + return rank + + def get_world_size(self) -> int: + return self._world_size + + def get_self_rank(self) -> Optional[int]: + return self._rank + + def get_actor_handles(self) -> List["ray.actor.ActorHandle"]: + return self._actor_handles + + def send(self, value: "torch.Tensor", peer_rank: int) -> None: + torch.distributed.send(value, peer_rank) + + def recv( + self, + shape: Tuple[int], + dtype: "torch.dtype", + peer_rank: int, + allocator: Optional[TorchTensorAllocator] = None, + ) -> "torch.Tensor": + tensor = torch.empty(torch.Size(shape), dtype=dtype, device=self._device) + torch.distributed.recv(tensor, peer_rank) + return tensor + + def allreduce( + self, + send_buf: "torch.Tensor", + recv_buf: "torch.Tensor", + op: ReduceOp, + ) -> None: + raise NotImplementedError + + @property + def recv_stream(self) -> Optional["cp.cuda.ExternalStream"]: + import cupy as cp + + return cp.cuda.get_current_stream() + + @property + def send_stream(self) -> Optional["cp.cuda.ExternalStream"]: + import cupy as cp + + return cp.cuda.get_current_stream() + + def destroy(self) -> None: + pass + + def get_transport_name(self) -> str: + return "nccl" + + nccl_group = InitedNcclGroup(2, [sender, receiver]) + + with InputNode() as inp: + dag = sender.send.bind(inp.shape, inp.dtype, inp.value) + dag = dag.with_tensor_transport(transport=transport) + dag = receiver.recv.bind(dag) + + compiled_dag = dag.experimental_compile(_default_communicator=nccl_group) + for i in range(3): + i += 1 + shape = (i * 10,) + dtype = torch.float16 + kwargs = { + "shape": shape, + "dtype": dtype, + "value": i, + } + ref = compiled_dag.execute(**kwargs) + result = ray.get(ref) + assert result == (i, shape, dtype) + + with InputNode() as inp: + dag = sender.send.bind(inp.shape, inp.dtype, inp.value) + dag = dag.with_tensor_transport(transport="auto") + dag = receiver.recv.bind(dag) + + @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) def test_torch_tensor_nccl_static_shape(ray_start_regular): if not USE_GPU: From 4afe03b685cb02fa7cc0b0048a024572d1cad33f Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Mon, 3 Feb 2025 23:27:01 +0000 Subject: [PATCH 26/29] up Signed-off-by: Rui Qiao --- .../experimental/test_torch_tensor_dag.py | 53 +++++++++++++------ 1 file changed, 36 insertions(+), 17 deletions(-) diff --git a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py index 20bf613b11abb..24af727718330 100644 --- a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py +++ b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py @@ -658,14 +658,17 @@ def get_transport_name(self) -> str: @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) -@pytest.mark.parametrize("transport", ["auto", "nccl"]) -def 
test_torch_tensor_default_comm(ray_start_regular, transport): +@pytest.mark.parametrize( + "transports", + [["auto", "nccl"], ["custom", "nccl"], ["auto", "nccl"], ["custom", "custom"]], +) +def test_torch_tensor_default_comm(ray_start_regular, transports): if not USE_GPU: pytest.skip("NCCL tests require GPUs") assert ( - sum(node["Resources"].get("GPU", 0) for node in ray.nodes()) > 1 - ), "This test requires at least 2 GPUs" + sum(node["Resources"].get("GPU", 0) for node in ray.nodes()) > 2 + ), "This test requires at least 3 GPUs" runtime_env = { "env_vars": { "MASTER_ADDR": socket.gethostbyname(socket.gethostname()), @@ -676,14 +679,16 @@ def test_torch_tensor_default_comm(ray_start_regular, transport): num_cpus=0, num_gpus=1, runtime_env=runtime_env ) - sender = actor_cls.remote() - receiver = actor_cls.remote() + worker0 = actor_cls.remote() + worker1 = actor_cls.remote() + worker2 = actor_cls.remote() # Simulates that the distributed environment (e.g., torch.distributed) # have already been set up refs = [ - sender.init_distributed.remote(2, 0), - receiver.init_distributed.remote(2, 1), + worker0.init_distributed.remote(3, 0), + worker1.init_distributed.remote(3, 1), + worker2.init_distributed.remote(3, 2), ] ray.wait(refs) @@ -764,14 +769,22 @@ def destroy(self) -> None: def get_transport_name(self) -> str: return "nccl" - nccl_group = InitedNcclGroup(2, [sender, receiver]) + default_comm = InitedNcclGroup(3, [worker0, worker1, worker2]) + custom_comm = InitedNcclGroup(3, [worker0, worker1, worker2]) + custom_comm_count = 0 + for i in range(2): + if transports[i] == "custom": + transports[i] = custom_comm + custom_comm_count += 1 with InputNode() as inp: - dag = sender.send.bind(inp.shape, inp.dtype, inp.value) - dag = dag.with_tensor_transport(transport=transport) - dag = receiver.recv.bind(dag) + dag = worker0.send.bind(inp.shape, inp.dtype, inp.value) + dag = dag.with_tensor_transport(transport=transports[0]) + dag = worker1.recv_tensor.bind(dag) + dag = dag.with_tensor_transport(transport=transports[1]) + dag = worker2.recv.bind(dag) - compiled_dag = dag.experimental_compile(_default_communicator=nccl_group) + compiled_dag = dag.experimental_compile(_default_communicator=default_comm) for i in range(3): i += 1 shape = (i * 10,) @@ -785,10 +798,16 @@ def get_transport_name(self) -> str: result = ray.get(ref) assert result == (i, shape, dtype) - with InputNode() as inp: - dag = sender.send.bind(inp.shape, inp.dtype, inp.value) - dag = dag.with_tensor_transport(transport="auto") - dag = receiver.recv.bind(dag) + # No communicators are created, the default communicator is used + assert len(compiled_dag._actors_to_created_communicator_id) == 0 + assert compiled_dag._default_communicator == default_comm + if custom_comm_count == 0: + assert len(compiled_dag._communicator_to_type_hints) == 1 + elif custom_comm_count == 1: + assert len(compiled_dag._communicator_to_type_hints) == 2 + else: + assert custom_comm_count == 2 + assert len(compiled_dag._communicator_to_type_hints) == 1 @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) From 5081e59fdd7d3fd68951166071fe288a24619c05 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Mon, 3 Feb 2025 23:37:11 +0000 Subject: [PATCH 27/29] up Signed-off-by: Rui Qiao --- .../experimental/test_torch_tensor_dag.py | 132 ++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py index 
24af727718330..c0dd2d8a404f2 100644 --- a/python/ray/dag/tests/experimental/test_torch_tensor_dag.py +++ b/python/ray/dag/tests/experimental/test_torch_tensor_dag.py @@ -810,6 +810,138 @@ def get_transport_name(self) -> str: assert len(compiled_dag._communicator_to_type_hints) == 1 +@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) +def test_torch_tensor_invalid_custom_comm(ray_start_regular): + if not USE_GPU: + pytest.skip("NCCL tests require GPUs") + + assert ( + sum(node["Resources"].get("GPU", 0) for node in ray.nodes()) > 1 + ), "This test requires at least 2 GPUs" + runtime_env = { + "env_vars": { + "MASTER_ADDR": socket.gethostbyname(socket.gethostname()), + "MASTER_PORT": "8888", + } + } + actor_cls = TorchTensorWorker.options( + num_cpus=0, num_gpus=1, runtime_env=runtime_env + ) + + sender = actor_cls.remote() + receiver = actor_cls.remote() + + # Simulates that the distributed environment (e.g., torch.distributed) + # have already been set up + refs = [ + sender.init_distributed.remote(2, 0), + receiver.init_distributed.remote(2, 1), + ] + ray.wait(refs) + + class InitedNcclGroup(Communicator): + """ + A custom NCCL group based on existing torch.distributed setup. + """ + + import cupy as cp + + def __init__(self, world_size, actor_handles): + self._world_size = world_size + self._actor_handles = actor_handles + self._rank = None + + def initialize(self, rank: int) -> None: + expected_rank = self.get_rank(ray.get_runtime_context().current_actor) + assert ( + rank == expected_rank + ), f"NCCL actor's rank {rank} does not match expected rank {expected_rank}" + self._rank = rank + self._device = torch_utils.get_devices()[0] + + def get_rank(self, actor: ray.actor.ActorHandle) -> int: + actor_ids = [a._ray_actor_id for a in self._actor_handles] + try: + rank = actor_ids.index(actor._ray_actor_id) + except ValueError: + raise ValueError("Actor is not in the NCCL group.") + return rank + + def get_world_size(self) -> int: + return self._world_size + + def get_self_rank(self) -> Optional[int]: + return self._rank + + def get_actor_handles(self) -> List["ray.actor.ActorHandle"]: + return self._actor_handles + + def send(self, value: "torch.Tensor", peer_rank: int) -> None: + torch.distributed.send(value, peer_rank) + + def recv( + self, + shape: Tuple[int], + dtype: "torch.dtype", + peer_rank: int, + allocator: Optional[TorchTensorAllocator] = None, + ) -> "torch.Tensor": + tensor = torch.empty(torch.Size(shape), dtype=dtype, device=self._device) + torch.distributed.recv(tensor, peer_rank) + return tensor + + def allreduce( + self, + send_buf: "torch.Tensor", + recv_buf: "torch.Tensor", + op: ReduceOp, + ) -> None: + raise NotImplementedError + + @property + def recv_stream(self) -> Optional["cp.cuda.ExternalStream"]: + import cupy as cp + + return cp.cuda.get_current_stream() + + @property + def send_stream(self) -> Optional["cp.cuda.ExternalStream"]: + import cupy as cp + + return cp.cuda.get_current_stream() + + def destroy(self) -> None: + pass + + def get_transport_name(self) -> str: + return "nccl" + + comm2 = InitedNcclGroup(2, [sender, receiver]) + comm1 = InitedNcclGroup(1, [sender]) + + with InputNode() as inp: + dag = sender.send.bind(inp.shape, inp.dtype, inp.value) + dag = dag.with_tensor_transport(transport="auto") + dag = receiver.recv.bind(dag) + + with pytest.raises( + ValueError, + match="The passed-in communicator must include all of the actors used in the P2P operation.", + ): + dag.experimental_compile(_default_communicator=comm1) + + 
with InputNode() as inp: + dag = sender.send.bind(inp.shape, inp.dtype, inp.value) + dag = dag.with_tensor_transport(transport=comm1) + dag = receiver.recv.bind(dag) + + with pytest.raises( + ValueError, + match="The passed-in communicator must include all of the actors used in the P2P operation.", + ): + dag.experimental_compile(_default_communicator=comm2) + + @pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True) def test_torch_tensor_nccl_static_shape(ray_start_regular): if not USE_GPU: From d307c89a0288f3b18ef136e3476794c93f243bf3 Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Mon, 3 Feb 2025 23:50:21 +0000 Subject: [PATCH 28/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 1943c3e8a62dc..0434c698bddf9 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -1129,7 +1129,7 @@ def _preprocess(self) -> None: if isinstance(dag_node, CollectiveOutputNode): self._track_communicator_usage( dag_node, - dag_node._collective_op.actor_handles, + set(dag_node._collective_op.actor_handles), collective_op=True, ) assert not self._overlap_gpu_communication, ( @@ -1304,8 +1304,12 @@ def _track_communicator_usage( This method also performs validation checks on the passed-in communicator. Args: - dag_node: The DAG node that uses the communicator. - actors: The set of actors that use the communicator. + dag_node: The DAG node that uses the communicator, this is the node + that has the `with_tensor_transport()` type hint for p2p communication, + or a `CollectiveOutputNode` for collective operations. + actors: The full or partial set of actors that use the communicator. + This method should be called one or multiple times so that all actors + of the communicator are tracked. collective_op: Whether the communicator is used for a collective operation. """ if None in actors: @@ -1314,9 +1318,9 @@ def _track_communicator_usage( type_hint = dag_node._collective_op.type_hint else: type_hint = dag_node.type_hint - custom_communicator = type_hint.get_custom_communicator() + communicator = type_hint.get_custom_communicator() - if custom_communicator is None: + if communicator is None: if ( self._default_communicator is None and not self._create_default_communicator @@ -1335,8 +1339,6 @@ def _track_communicator_usage( "_default_communicator when calling experimental_compile()." ) communicator = self._default_communicator - else: - communicator = custom_communicator if communicator is None: if collective_op: From 86421989983ac6797ac609d29639fc27c84ebf6e Mon Sep 17 00:00:00 2001 From: Rui Qiao Date: Tue, 4 Feb 2025 02:37:05 +0000 Subject: [PATCH 29/29] up Signed-off-by: Rui Qiao --- python/ray/dag/compiled_dag_node.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/dag/compiled_dag_node.py b/python/ray/dag/compiled_dag_node.py index 0434c698bddf9..36c817f436e88 100644 --- a/python/ray/dag/compiled_dag_node.py +++ b/python/ray/dag/compiled_dag_node.py @@ -1348,7 +1348,7 @@ def _track_communicator_usage( self._pending_p2p_communicator_actors.update(actors) else: if collective_op: - if communicator.get_actor_handles() != actors: + if set(communicator.get_actor_handles()) != actors: raise ValueError( "The passed-in communicator must have the same set " "of actors as the collective operation. 
" @@ -1356,7 +1356,7 @@ def _track_communicator_usage( f"while the collective operation has actors {actors}." ) else: - if not actors.issubset(communicator.get_actor_handles()): + if not actors.issubset(set(communicator.get_actor_handles())): raise ValueError( "The passed-in communicator must include all of the actors " "used in the P2P operation. "
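
The patches above converge on a single user-facing knob, `_default_communicator`, accepted by `experimental_compile()`. A minimal sketch of the intended usage follows; it is illustrative only, not part of the diffs: `ExampleWorker` is a hypothetical stand-in for the `TorchTensorWorker` actor used in the tests, and the sketch assumes a Ray cluster with at least two GPUs and NCCL available.

import torch
import ray
from ray.dag import InputNode


# Hypothetical GPU actor, modeled on the send/recv workers in the tests above.
@ray.remote(num_cpus=0, num_gpus=1)
class ExampleWorker:
    def send(self, shape, dtype, value):
        # Produce a tensor on this worker's GPU.
        return torch.ones(shape, dtype=dtype, device="cuda") * value

    def recv(self, tensor):
        # Return (value, shape, dtype) so the driver can verify the transfer.
        tensor = tensor.cpu()
        return tensor[0].item(), tuple(tensor.shape), tensor.dtype


ray.init()
sender = ExampleWorker.remote()
receiver = ExampleWorker.remote()

with InputNode() as inp:
    dag = sender.send.bind(inp.shape, inp.dtype, inp.value)
    # transport="auto" resolves to the default communicator for a GPU-to-GPU hop.
    dag = dag.with_tensor_transport(transport="auto")
    dag = receiver.recv.bind(dag)

# "create": communicators are created on demand and reused for the same set of
# actors. Passing a Communicator instance instead makes it the default for all
# un-annotated transfers; passing None makes compilation raise a ValueError,
# because the transport="auto" node has no custom communicator to fall back on.
compiled_dag = dag.experimental_compile(_default_communicator="create")

ref = compiled_dag.execute(shape=(10,), dtype=torch.float16, value=1)
assert ray.get(ref) == (1, (10,), torch.float16)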