Create PlacementGroup for steps using vLLM #842

Merged · 7 commits · Jul 30, 2024
8 changes: 0 additions & 8 deletions docs/sections/how_to_guides/advanced/scaling_with_ray.md
@@ -232,12 +232,4 @@ with Pipeline(name="text-generation-ray-pipeline") as pipeline:
     load_data_from_hub >> text_generation
 ```
 
-Finally, we need to define two environment variables in our `runtime_env.yaml` file:
-
-```yaml
-env_vars:
-  VLLM_USE_RAY_COMPILED_DAG: "1"
-  VLLM_USE_RAY_SPMD_WORKER: "1"
-```
-
 More information about distributed inference with `vLLM` can be found here: [vLLM - Distributed Serving](https://docs.vllm.ai/en/latest/serving/distributed_serving.html)
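
For reference, this is the kind of step the guide configures. A minimal sketch (the model name and `tensor_parallel_size` value are placeholder assumptions) showing the `extra_kwargs` that the placement-group logic added below reads:

```python
from distilabel.llms.vllm import vLLM
from distilabel.pipeline import Pipeline
from distilabel.steps.tasks import TextGeneration

with Pipeline(name="text-generation-ray-pipeline") as pipeline:
    text_generation = TextGeneration(
        llm=vLLM(
            # Hypothetical model; any vLLM-compatible checkpoint works the same way.
            model="meta-llama/Meta-Llama-3-8B-Instruct",
            # Read by `_create_vllm_placement_group` below: 2 -> two GPU bundles.
            extra_kwargs={"tensor_parallel_size": 2},
        ),
    )
```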
76 changes: 68 additions & 8 deletions src/distilabel/pipeline/ray.py
@@ -16,6 +16,7 @@
 from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union
 
 from distilabel.distiset import create_distiset
+from distilabel.llms.vllm import vLLM
 from distilabel.pipeline.base import BasePipeline
 from distilabel.pipeline.constants import INPUT_QUEUE_ATTR_NAME
 from distilabel.pipeline.step_wrapper import _StepWrapper
@@ -26,6 +27,8 @@
     from os import PathLike
     from queue import Queue
 
+    from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
+
     from distilabel.distiset import Distiset
     from distilabel.pipeline.typing import InputDataset
     from distilabel.steps.base import _Step
@@ -69,6 +72,7 @@ def __init__(
 
         self._ray_head_node_url = ray_head_node_url
         self._ray_init_kwargs = ray_init_kwargs or {}
+        self._ray_node_ids = {}
 
     def run(
         self,
@@ -171,6 +175,8 @@ def _init_ray(self) -> None:
         else:
             ray.init(**self._ray_init_kwargs)
 
+        self._ray_node_ids = {node["NodeID"]: False for node in ray.nodes()}
+
     @property
     def QueueClass(self) -> Callable:
         from ray.util.queue import Queue
@@ -218,17 +224,20 @@ def run(self) -> str:
             "name": f"distilabel-{self.name}-{step.name}-{replica}"
         }
 
-        if step.resources.cpus is not None:
-            resources["num_cpus"] = step.resources.cpus
+        if hasattr(step, "llm") and isinstance(step.llm, vLLM):  # type: ignore
+            resources["scheduling_strategy"] = self._create_vllm_placement_group(step)
+        else:
+            if step.resources.cpus is not None:
+                resources["num_cpus"] = step.resources.cpus
 
-        if step.resources.gpus is not None:
-            resources["num_gpus"] = step.resources.gpus
+            if step.resources.gpus is not None:
+                resources["num_gpus"] = step.resources.gpus
 
-        if step.resources.memory is not None:
-            resources["memory"] = step.resources.memory
+            if step.resources.memory is not None:
+                resources["memory"] = step.resources.memory
 
-        if step.resources.resources is not None:
-            resources["resources"] = step.resources.resources
+            if step.resources.resources is not None:
+                resources["resources"] = step.resources.resources
 
         _StepWrapperRay = _StepWrapperRay.options(**resources)  # type: ignore
@@ -255,6 +264,57 @@ def run(self) -> str:
         )
         step_wrapper.run.remote()
 
+    def _create_vllm_placement_group(
+        self, step: "_Step"
+    ) -> "PlacementGroupSchedulingStrategy":
+        """Creates a Ray placement group with as many GPU bundles as the `tensor_parallel_size`
+        specified in the `vLLM` initialisation. The created placement group uses the
+        `STRICT_PACK` strategy if `pipeline_parallel_size` is less than or equal to 1,
+        and the `SPREAD` strategy otherwise (a placement group with GPU bundles spread
+        across several nodes). In addition, the placement group is targeted at a specific
+        node. This avoids `vLLM` raising the exception `Ray does not allocate any GPUs
+        on the driver node...`, as it ensures that the `_StepWrapperRay` driver actor
+        resides on the same node as the Ray actors created by `vLLM` for distributed
+        inference.
+
+        Args:
+            step: the step which uses `vLLM`.
+
+        Returns:
+            A `PlacementGroupSchedulingStrategy` using the created `PlacementGroup`.
+        """
+        import ray
+
+        llm = step.llm  # type: ignore
+        tensor_parallel_size = llm.extra_kwargs.get("tensor_parallel_size", 1)  # type: ignore
+        pipeline_parallel_size = llm.extra_kwargs.get(  # type: ignore
+            "pipeline_parallel_size", 1
+        )
+
+        # Pick the first node that doesn't have a placement group assigned yet
+        node_id = next(
+            node_id for node_id, used in self._ray_node_ids.items() if not used
+        )
+
+        self._ray_node_ids[node_id] = True
+
+        pg = ray.util.placement_group(
+            # Create `tensor_parallel_size` GPU bundles and at least one CPU bundle
+            # so the actors can be scheduled and executed (a single CPU bundle can
+            # host many actors):
+            # https://docs.ray.io/en/latest/ray-core/scheduling/placement-group.html#schedule-tasks-and-actors-to-placement-groups-use-reserved-resources
+            bundles=[{"CPU": 1}] + [{"GPU": 1}] * tensor_parallel_size,
+            strategy="SPREAD" if pipeline_parallel_size > 1 else "STRICT_PACK",
+            _soft_target_node_id=node_id if pipeline_parallel_size <= 1 else None,
+        )
+
+        self._logger.info(
+            f"Step '{step.name}' uses `vLLM`. Created a Ray placement group with bundle"
+            f" specs: {pg.bundle_specs}"
+        )
+
+        return ray.util.scheduling_strategies.PlacementGroupSchedulingStrategy(  # type: ignore
+            placement_group=pg,
+        )

     def _teardown(self) -> None:
         """Clean/release/stop resources reserved to run the pipeline."""
         if self._write_buffer:
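
The scheduling pattern the new method relies on can be reproduced with plain Ray. A minimal, standalone sketch (hypothetical actor and values; `pg.ready()` only resolves on a cluster with two free GPUs):

```python
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init()

tensor_parallel_size = 2  # hypothetical value

# One CPU bundle for the driver actor plus one GPU bundle per tensor-parallel
# worker, all packed onto a single node (the `STRICT_PACK` branch above).
pg = ray.util.placement_group(
    bundles=[{"CPU": 1}] + [{"GPU": 1}] * tensor_parallel_size,
    strategy="STRICT_PACK",
)
ray.get(pg.ready())  # blocks until all bundles are reserved


@ray.remote(num_cpus=1)
class Driver:
    def node_id(self) -> str:
        # Runs on the same node as the GPU bundles reserved above.
        return ray.get_runtime_context().get_node_id()


driver = Driver.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
).remote()
print(ray.get(driver.node_id.remote()))
```

With `pipeline_parallel_size > 1` the method switches the strategy to `SPREAD`, so the GPU bundles may be reserved on several nodes instead.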