[core][compiled graphs] Support experimental_compile(_default_communicator=comm) #50023

Open · wants to merge 29 commits into base: master
Changes from 25 commits
398 changes: 237 additions & 161 deletions python/ray/dag/compiled_dag_node.py

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions python/ray/dag/dag_node.py
@@ -228,6 +228,7 @@ def experimental_compile(
enable_asyncio: bool = False,
_max_inflight_executions: Optional[int] = None,
_overlap_gpu_communication: Optional[bool] = None,
_default_communicator: Optional[Union[Communicator, str]] = "create",
) -> "ray.dag.CompiledDAG":
"""Compile an accelerated execution path for this DAG.

@@ -250,6 +251,21 @@ def experimental_compile(
communication and computation can be overlapped, which can improve
the performance of the DAG execution. If None, the default value
will be used.
_default_communicator: The default communicator to use to transfer
tensors. Three types of values are valid. (1) Communicator:
For p2p operations, this is the default communicator to use for nodes
annotated with `with_tensor_transport()` when shared memory is not the
desired option (e.g., when transport="nccl", or when transport="auto"
for communication between two different GPUs). For collective
operations, this is the default communicator to use when a custom
communicator is not specified.
(2) "create": For each collective operation without a custom
communicator specified, a communicator is created and initialized on
its involved actors, or an already created communicator is reused if
the set of actors is the same. For all p2p operations without a custom
communicator specified, an already created collective communicator is
reused if the p2p actors are a subset of its actors; otherwise, a new
communicator is created.
(3) None: A ValueError is raised if a custom communicator is not
specified.

Returns:
A compiled DAG.
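
For quick reference, the following is a minimal usage sketch of the three accepted values, distilled from the docstring above. It is not part of the diff: `sender`, `receiver`, and `my_comm` (a user-defined `Communicator` implementation) are assumed names, and only one of the three `experimental_compile()` calls would typically be used for a given DAG.

    # Sketch under assumed names: `sender`/`receiver` are GPU actors such as
    # TorchTensorWorker in the tests below; `my_comm` implements Communicator.
    import ray
    import torch
    from ray.dag import InputNode

    with InputNode() as inp:
        dag = sender.send.bind(inp.shape, inp.dtype, inp.value)
        # Transfer the returned tensor over NCCL instead of shared memory.
        dag = dag.with_tensor_transport(transport="nccl")
        dag = receiver.recv.bind(dag)

    # (1) Communicator instance: default for p2p transfers and for collective
    #     operations that do not specify their own communicator.
    compiled_dag = dag.experimental_compile(_default_communicator=my_comm)

    # (2) "create" (the default): communicators are created on the involved
    #     actors and reused when the actor sets allow it.
    compiled_dag = dag.experimental_compile()  # same as _default_communicator="create"

    # (3) None: compilation raises ValueError unless every tensor transport and
    #     collective specifies a custom communicator.
    compiled_dag = dag.experimental_compile(_default_communicator=None)

    # Execute as usual once compiled.
    ref = compiled_dag.execute(shape=(10,), dtype=torch.float16, value=1)
    assert ray.get(ref) == (1, (10,), torch.float16)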
@@ -278,6 +294,7 @@ def experimental_compile(
enable_asyncio,
_max_inflight_executions,
_overlap_gpu_communication,
_default_communicator,
)

def execute(
21 changes: 11 additions & 10 deletions python/ray/dag/tests/experimental/test_collective_dag.py
@@ -159,7 +159,6 @@ def test_comm_deduplicate_p2p_and_collective(ray_start_regular, monkeypatch):
monkeypatch,
dag,
{(frozenset(workers), None)},
(frozenset(workers), None),
)

check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set)
@@ -177,7 +176,6 @@ def test_comm_deduplicate_p2p_and_collective(ray_start_regular, monkeypatch):
monkeypatch,
dag,
{(frozenset(workers), None)},
(frozenset(workers), None),
)

check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set)
@@ -186,9 +184,10 @@ def test_comm_deduplicate_p2p_and_collective(ray_start_regular, monkeypatch):
@pytest.mark.parametrize(
"ray_start_regular", [{"num_cpus": 4, "num_gpus": 4}], indirect=True
)
def test_custom_comm_deduplicate(ray_start_regular, monkeypatch):
def test_custom_comm(ray_start_regular, monkeypatch):
"""
Test a custom GPU communicator is reused when possible.
Test a custom GPU communicator is used when specified and a default
communicator is used otherwise.
"""
actor_cls = CPUTorchTensorWorker.options(num_cpus=0, num_gpus=1)

@@ -208,8 +207,10 @@ def test_custom_comm_deduplicate(ray_start_regular, monkeypatch):
compiled_dag, mock_nccl_group_set = check_nccl_group_init(
monkeypatch,
dag,
{(frozenset(workers), comm)},
(frozenset(workers), comm),
{
(frozenset(workers), comm),
(frozenset(workers), None),
},
)

check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set)
@@ -225,8 +226,10 @@ def test_custom_comm_deduplicate(ray_start_regular, monkeypatch):
compiled_dag, mock_nccl_group_set = check_nccl_group_init(
monkeypatch,
dag,
{(frozenset(workers), comm)},
(frozenset(workers), comm),
{
(frozenset(workers), comm),
(frozenset(workers), None),
},
)

check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set)
@@ -259,7 +262,6 @@ def test_custom_comm_init_teardown(ray_start_regular, monkeypatch):
monkeypatch,
dag,
{(frozenset(workers), comm)},
(frozenset(workers), comm),
)

check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set)
@@ -285,7 +287,6 @@ def test_custom_comm_init_teardown(ray_start_regular, monkeypatch):
(frozenset(workers), comm_2),
(frozenset(workers), comm_3),
},
(frozenset(workers), comm_3),
)

check_nccl_group_teardown(monkeypatch, compiled_dag, mock_nccl_group_set)
165 changes: 103 additions & 62 deletions python/ray/dag/tests/experimental/test_torch_tensor_dag.py
@@ -240,7 +240,7 @@ def test_torch_tensor_nccl(
dag = receiver.recv.bind(dag)

compiled_dag = dag.experimental_compile(
_overlap_gpu_communication=overlap_gpu_communication
_overlap_gpu_communication=overlap_gpu_communication,
)

# Test that we can pass different shapes and data.
@@ -351,7 +351,7 @@ def test_torch_tensor_nccl_overlap_timed(ray_start_regular, overlap_gpu_communic

# Test normal execution.
compiled_dag = dag.experimental_compile(
_overlap_gpu_communication=overlap_gpu_communication
_overlap_gpu_communication=overlap_gpu_communication,
)

start = time.monotonic()
@@ -530,22 +530,37 @@ def get_transport_name(self) -> str:


@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_torch_tensor_custom_comm_invalid(ray_start_regular):
def test_torch_tensor_custom_comm_inited(ray_start_regular):
if not USE_GPU:
pytest.skip("NCCL tests require GPUs")

assert (
sum(node["Resources"].get("GPU", 0) for node in ray.nodes()) > 1
), "This test requires at least 2 GPUs"
runtime_env = {
"env_vars": {
"MASTER_ADDR": socket.gethostbyname(socket.gethostname()),
"MASTER_PORT": "8888",
}
}
actor_cls = TorchTensorWorker.options(
num_cpus=0, num_gpus=1, runtime_env=runtime_env
)

actor_cls = TorchTensorWorker.options(num_cpus=0, num_gpus=1)
sender = actor_cls.remote()
receiver = actor_cls.remote()

actor1 = actor_cls.remote()
actor2 = actor_cls.remote()
# Simulates that the distributed environment (e.g., torch.distributed)
# has already been set up
refs = [
sender.init_distributed.remote(2, 0),
receiver.init_distributed.remote(2, 1),
]
ray.wait(refs)

class MockNcclGroup(Communicator):
class InitedNcclGroup(Communicator):
"""
A mock NCCL group for testing. Send and recv are not implemented.
A custom NCCL group based on existing torch.distributed setup.
"""

import cupy as cp
@@ -581,7 +596,7 @@ def get_actor_handles(self) -> List["ray.actor.ActorHandle"]:
return self._actor_handles

def send(self, value: "torch.Tensor", peer_rank: int) -> None:
return None
torch.distributed.send(value, peer_rank)

def recv(
self,
@@ -590,7 +605,9 @@ def recv(
peer_rank: int,
allocator: Optional[TorchTensorAllocator] = None,
) -> "torch.Tensor":
return None
tensor = torch.empty(torch.Size(shape), dtype=dtype, device=self._device)
torch.distributed.recv(tensor, peer_rank)
return tensor

def allreduce(
self,
@@ -602,72 +619,47 @@ def allreduce(

@property
def recv_stream(self) -> Optional["cp.cuda.ExternalStream"]:
return None
import cupy as cp

return cp.cuda.get_current_stream()

@property
def send_stream(self) -> Optional["cp.cuda.ExternalStream"]:
return None
import cupy as cp

return cp.cuda.get_current_stream()

def destroy(self) -> None:
pass

def get_transport_name(self) -> str:
return "nccl"

nccl_group = MockNcclGroup(2, [actor1, actor2])

# Mixed usage of NCCL groups should throw an error
# Case 1: custom NCCL group first, then default NCCL group
with InputNode() as inp:
dag = actor1.send.bind(inp.shape, inp.dtype, inp.value)
dag = dag.with_tensor_transport(transport=nccl_group)
dag = actor2.recv.options(num_returns=3).bind(dag)
dag = actor2.send.bind(*dag)
dag = dag.with_tensor_transport(transport="nccl")
dag = actor1.recv.bind(dag)
with pytest.raises(
ValueError,
match=r"Compiled Graphs do not support mixed usage of type hints.*",
):
dag.experimental_compile()
nccl_group = InitedNcclGroup(2, [sender, receiver])

# Case 2: default NCCL group first, then custom NCCL group
with InputNode() as inp:
dag = actor1.send.bind(inp.shape, inp.dtype, inp.value)
dag = dag.with_tensor_transport(transport="nccl")
dag = actor2.recv.options(num_returns=3).bind(dag)
dag = actor2.send.bind(*dag)
dag = sender.send.bind(inp.shape, inp.dtype, inp.value)
dag = dag.with_tensor_transport(transport=nccl_group)
dag = actor1.recv.bind(dag)
with pytest.raises(
ValueError,
match=r"Compiled Graphs do not support mixed usage of type hints.*",
):
dag.experimental_compile()

nccl_group2 = MockNcclGroup(2, [actor1, actor2])
dag = receiver.recv.bind(dag)

# Using two different custom NCCL groups are currently not supported
with InputNode() as inp:
dag = actor1.send.bind(inp.shape, inp.dtype, inp.value)
dag = dag.with_tensor_transport(transport=nccl_group)
dag = actor2.recv.options(num_returns=3).bind(dag)
dag = actor2.send.bind(*dag)
dag = dag.with_tensor_transport(transport=nccl_group2)
dag = actor1.recv.bind(dag)
with pytest.raises(
ValueError,
match=(
"Compiled Graphs currently only support "
"a single custom NCCL group, but multiple "
"have been specified."
),
):
dag.experimental_compile()
compiled_dag = dag.experimental_compile()
for i in range(3):
i += 1
shape = (i * 10,)
dtype = torch.float16
kwargs = {
"shape": shape,
"dtype": dtype,
"value": i,
}
ref = compiled_dag.execute(**kwargs)
result = ray.get(ref)
assert result == (i, shape, dtype)


@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_torch_tensor_custom_comm_inited(ray_start_regular):
@pytest.mark.parametrize("transport", ["auto", "nccl"])
def test_torch_tensor_default_comm(ray_start_regular, transport):
if not USE_GPU:
pytest.skip("NCCL tests require GPUs")

@@ -776,10 +768,10 @@ def get_transport_name(self) -> str:

with InputNode() as inp:
dag = sender.send.bind(inp.shape, inp.dtype, inp.value)
dag = dag.with_tensor_transport(transport=nccl_group)
dag = dag.with_tensor_transport(transport=transport)
dag = receiver.recv.bind(dag)

compiled_dag = dag.experimental_compile()
compiled_dag = dag.experimental_compile(_default_communicator=nccl_group)
for i in range(3):
i += 1
shape = (i * 10,)
Expand All @@ -793,6 +785,11 @@ def get_transport_name(self) -> str:
result = ray.get(ref)
assert result == (i, shape, dtype)

with InputNode() as inp:
dag = sender.send.bind(inp.shape, inp.dtype, inp.value)
dag = dag.with_tensor_transport(transport="auto")
dag = receiver.recv.bind(dag)


@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_torch_tensor_nccl_static_shape(ray_start_regular):
@@ -937,7 +934,7 @@ def test_torch_tensor_exceptions(
dag = receiver.recv.bind(dag)

compiled_dag = dag.experimental_compile(
_overlap_gpu_communication=overlap_gpu_communication
_overlap_gpu_communication=overlap_gpu_communication,
)

shape = (10,)
@@ -1036,6 +1033,50 @@ def test_torch_tensor_exceptions2(
ref = compiled_dag.execute(2)


@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_torch_tensor_explicit_communicator(ray_start_regular):
if not USE_GPU:
pytest.skip("NCCL tests require GPUs")

assert (
sum(node["Resources"].get("GPU", 0) for node in ray.nodes()) > 1
), "This test requires at least 2 GPUs"

actor_cls = TorchTensorWorker.options(num_cpus=0, num_gpus=1)

sender = actor_cls.remote()
receiver = actor_cls.remote()

with InputNode() as inp:
dag = sender.send.bind(inp.shape, inp.dtype, inp[0])
dag = dag.with_tensor_transport(transport="nccl")
dag = receiver.recv.bind(dag)

with pytest.raises(
ValueError,
match=(
"Please specify a custom communicator for the DAGNode using "
"`with_tensor_transport\(\)`, or specify a communicator or 'create' for "
"_default_communicator when calling experimental_compile()."
),
):
dag.experimental_compile(_default_communicator=None)

with InputNode() as inp:
dag = sender.send.bind(inp.shape, inp.dtype, inp[0])
dag = dag.with_tensor_transport(transport="auto")
dag = receiver.recv.bind(dag)

with pytest.raises(
ValueError,
match=(
"This requires specifying a default communicator or 'create' for "
"_default_communicator when calling experimental_compile()."
),
):
dag.experimental_compile(_default_communicator=None)


@pytest.mark.parametrize("ray_start_regular", [{"num_cpus": 4}], indirect=True)
def test_torch_tensor_nccl_all_reduce(ray_start_regular):
"""
2 changes: 0 additions & 2 deletions python/ray/experimental/channel/common.py
@@ -109,8 +109,6 @@ def get_custom_communicator(self) -> Optional[Communicator]:
"""
Return the custom NCCL group if one is specified.
"""
if self._contains_type is not None:
return self._contains_type.get_custom_nccl_group()
return None

def set_communicator_id(self, group_id: str) -> None:
4 changes: 2 additions & 2 deletions python/ray/experimental/channel/torch_tensor_nccl_channel.py
@@ -396,7 +396,7 @@ def __init__(
ctx = ChannelContext.get_current()
assert isinstance(
typ.communicator_id, str
), "NCCL group ID ({nccl_group_id}) must be a str."
), f"NCCL group ID ({typ.communicator_id}) must be a str."
self._typ = typ

assert self._typ.communicator_id is not None, "No NCCL group specified."
@@ -732,7 +732,7 @@ def _init_communicator(
custom_communicator: A custom NCCL group to initialize.
use_communication_streams: Whether to use dedicated send and recv
streams for communication. If True, communication and computation
can be overlapped to improve perfomrance.
can be overlapped to improve performance.
"""
ctx = ChannelContext.get_current()

Expand Down
Loading