Add RayPipeline class (#769)

* Create N replicas per `Step`

* Update `_BatchManager` to handle batch sorting uncertainty

* Add multiple replicas test

* Fix unit tests

* Fix `next_expected_seq_no` needed to be updated if `routing_batch_function`

* Update `set_next_expected_batch_seq_no` only if no `data`

* Fix `next_expected_seq_no` with `routing_batch_function`

* Remove prints

* Add `StepResource` import

* Add missing return type hint

* Add `StepResources` docs

* Add `get_steps_load_stages` method

* Update to load steps in stages

* Add `_teardown` method

* Add load stages

* Add printing info about stages

* Refactor load stages to avoid race conditions

* Add load stages integration test

* Fix unit tests

* Add unit tests for new methods

* Move send last batch message

* Refactor to make it work with routing batch function

* Add integration test for load stages & routing batch function

* Update docs to tell about resources as runtime parameters

* Add missing doc pages

* Add `ray>=2.31.0` optional dependency

* Initial work for `RayPipeline`

* Update to load stages from cache

* Fix bugs requesting initial batches

* Add integration tests for recovering states from cache

* Remove atexit

* Move `_ProcessWrapper` to different file

* `RayPipeline` mvp

* Install `ray` if `python!=3.12`

* Assign ray actor name

* Fix setting `options` for Ray actor

* Set name for all the queues

* Add requirements

* Add docstrings

* Remove unit test

* Add extra `resources`

* Add `ray` method

* Add `ray[default]` as dependency

* Add `script_executed_in_ray_cluster` function

* Fix step load fail didn't stop the pipeline

* Run with `RayPipeline` if detected Ray cluster

* Set built dag

* Fix unit tests

* Add `Pipeline` to `RayPipeline` unit tests

* Add `ray_init_kwargs` argument

* Add `memory` attribute

* Add simple `RayPipeline` integration test

* Override `RayPipeline.dump` method

* Add docs for `RayPipeline`

* Fix close PR docs
gabrielmbmb authored Jul 9, 2024 · 1 parent 3098ecd · commit 3d07c22
Showing 30 changed files with 1,429 additions and 370 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docs-pr-close.yml
@@ -19,7 +19,7 @@ jobs:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies
        run: pip install -e .[docs]
        run: pip install mike

      - name: Set git credentials
        run: |
210 changes: 210 additions & 0 deletions docs/sections/how_to_guides/advanced/scaling_with_ray.md
@@ -0,0 +1,210 @@
# Scaling and distributing a pipeline with Ray

Although the local [Pipeline][distilabel.pipeline.local.Pipeline] based on [`multiprocessing`](https://docs.python.org/3/library/multiprocessing.html) + [serving LLMs with an external service](serving_an_llm_for_reuse.md) is enough to execute most of the pipelines used to create SFT and preference datasets, there are scenarios where you might need to scale your pipeline across multiple machines. In such cases, distilabel leverages [Ray](https://www.ray.io/) to distribute the workload efficiently. This allows you to generate larger datasets, reduce execution time, and maximize resource utilization across a cluster of machines, without needing to change a single line of code.

## Relation between distilabel steps and Ray Actors

A `distilabel` pipeline consists of several [`Step`][distilabel.steps.base.Step]s. A `Step` is a class that defines a basic life-cycle:

1. It will load or create the resources (LLMs, clients, etc.) required to run its logic.
2. It will run a loop, waiting for incoming batches on an input queue. Once it receives a batch, it will process it and put the processed batch into an output queue.
3. When it finishes processing the final batch, or receives a special signal, the loop will stop and the unload logic will be executed.

A `Step` therefore needs to maintain a minimal state, and the best way to achieve that with Ray is using [actors](https://docs.ray.io/en/latest/ray-core/actors.html), as illustrated by the diagram and the sketch below.

``` mermaid
graph TD
    A[Step] -->|has| B[Multiple Replicas]
    B -->|wrapped in| C[Ray Actor]
    C -->|maintains| D[Step Replica State]
    C -->|executes| E[Step Lifecycle]
    E -->|1. Load/Create Resources| F[LLMs, Clients, etc.]
    E -->|2. Process batches from| G[Input Queue]
    E -->|3. Processed batches are put in| H[Output Queue]
    E -->|4. Unload| I[Cleanup]
```
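
To make the idea more concrete, here's a minimal sketch of a step replica wrapped in a Ray actor. This is illustrative only, not `distilabel`'s actual implementation: the `_StepReplica` class and the `load`/`process`/`unload` method names are stand-ins for the real life-cycle hooks.

```python
import ray
from ray.util.queue import Queue  # distributed queue that actors can share


@ray.remote
class _StepReplica:
    """Illustrative stand-in for one replica of a `Step` running as a Ray actor."""

    def __init__(self, step, input_queue: Queue, output_queue: Queue) -> None:
        self.step = step
        self.input_queue = input_queue
        self.output_queue = output_queue

    def run(self) -> None:
        self.step.load()  # 1. load/create resources (LLMs, clients, etc.)
        while True:
            batch = self.input_queue.get()  # 2. wait for an incoming batch
            if batch is None:  # special signal: no more batches coming
                break
            self.output_queue.put(self.step.process(batch))  # 3. output queue
        self.step.unload()  # 4. cleanup
```

Because the actor stays alive between batches, the loaded resources (e.g. an LLM) are created once and reused for every batch the replica processes.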

## Executing a pipeline with Ray

The recommended way to execute a `distilabel` pipeline using Ray is using the [Ray Jobs API](https://docs.ray.io/en/latest/cluster/running-applications/job-submission/index.html#ray-jobs-api).

Before jumping into the explanation, let's first install the prerequisites:
```bash
pip install distilabel[ray]
```

!!! tip

    It's recommended to create a virtual environment.

For the purpose of explaining how to execute a pipeline with Ray, we'll use the following pipeline throughout the examples:

```python
from distilabel.llms import vLLM
from distilabel.pipeline import Pipeline
from distilabel.steps import LoadDataFromHub
from distilabel.steps.tasks import TextGeneration

with Pipeline(name="text-generation-ray-pipeline") as pipeline:
    load_data_from_hub = LoadDataFromHub(output_mappings={"prompt": "instruction"})

    text_generation = TextGeneration(
        llm=vLLM(
            model="meta-llama/Meta-Llama-3-8B-Instruct",
            tokenizer="meta-llama/Meta-Llama-3-8B-Instruct",
        )
    )

    load_data_from_hub >> text_generation

if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={
            load_data_from_hub.name: {
                "repo_id": "HuggingFaceH4/instruction-dataset",
                "split": "test",
            },
            text_generation.name: {
                "llm": {
                    "generation_kwargs": {
                        "temperature": 0.7,
                        "max_new_tokens": 4096,
                    }
                },
                "resources": {"replicas": 2, "gpus": 1},  # (1)
            },
        }
    )

    distiset.push_to_hub(
        "<YOUR_HF_USERNAME_OR_ORGANIZATION>/text-generation-distilabel-ray"  # (2)
    )
```

1. We're setting [resources](assigning_resources_to_step.md) for the `text_generation` step, defining that we want two replicas and one GPU per replica. `distilabel` will create two replicas of the step, i.e. two actors in the Ray cluster, and each actor will request to be allocated on a node of the cluster that has at least one GPU. You can read more about how Ray manages the resources [here](https://docs.ray.io/en/latest/ray-core/scheduling/resources.html#resources).
2. You should modify this to use your username or organization on the Hugging Face Hub.

It's a basic pipeline with just two steps: one to load a dataset from the Hub with an `instruction` column and one to generate a `response` for that instruction using Llama 3 8B Instruct with [vLLM](/distilabel/components-gallery/llms/vllm/). Simple but enough to demonstrate how to distribute and scale the workload using a Ray cluster!
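
For intuition on how the `resources` runtime parameter relates to Ray, here's a rough sketch (not `distilabel`'s actual internals; `TextGenerationReplica` is a hypothetical class): two replicas become two actors, each asking the scheduler for a node with one free GPU.

```python
import ray


@ray.remote
class TextGenerationReplica:
    """Hypothetical actor standing in for one replica of the step."""

    def process(self, batch: list) -> list:
        ...  # generate a response for each row in the batch
        return batch


# "replicas": 2 and "gpus": 1 roughly translate to two actors, each
# requesting one GPU from the Ray scheduler via `options(num_gpus=1)`.
replicas = [TextGenerationReplica.options(num_gpus=1).remote() for _ in range(2)]
```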

### Using Ray Jobs API

If you're not familiar with the Ray Jobs API, it's recommended to read the [Ray Jobs Overview](https://docs.ray.io/en/latest/cluster/running-applications/job-submission/index.html#ray-jobs-overview). Quick summary: Ray Jobs is the recommended way to execute a job in a Ray cluster, as it handles packaging, deploying and managing the Ray application.

To execute the pipeline above, we first need to create a directory (a kind of package) with the pipeline script (or scripts) that we will submit to the Ray cluster:

```bash
mkdir ray-pipeline
```

The content of the directory `ray-pipeline` should be:

```
ray-pipeline/
├── pipeline.py
└── runtime_env.yaml
```

The first file contains the code of the pipeline, while the second one (`runtime_env.yaml`) is a Ray-specific file containing the [environment dependencies](https://docs.ray.io/en/latest/ray-core/handling-dependencies.html#environment-dependencies) required to run the job:

```yaml
pip:
  - distilabel[ray,vllm] >= 1.3.0

env_vars:
  HF_TOKEN: <YOUR_HF_TOKEN>
```

With this file, we're telling the Ray cluster that it will have to install `distilabel` with the `vllm` and `ray` extra dependencies in order to run the job. In addition, we're defining the `HF_TOKEN` environment variable that will be used (by the `push_to_hub` method) to upload the resulting dataset to the Hugging Face Hub.

After that, we can proceed to execute the `ray` command that will submit the job to the Ray cluster:
```bash
ray job submit \
    --address http://localhost:8265 \
    --working-dir ray-pipeline \
    --runtime-env ray-pipeline/runtime_env.yaml -- python pipeline.py
```

This will upload the contents of `--working-dir` to the Ray cluster, install the dependencies, and then execute the `python pipeline.py` command from the head node.
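
The same submission can be done programmatically with Ray's Python SDK for the Jobs API. A minimal sketch that should be equivalent to the command above (assuming the same directory layout):

```python
from ray.job_submission import JobSubmissionClient

# Same address as `--address` in the CLI: the Ray dashboard of the cluster.
client = JobSubmissionClient("http://localhost:8265")

submission_id = client.submit_job(
    entrypoint="python pipeline.py",
    runtime_env={
        "working_dir": "ray-pipeline",
        "pip": ["distilabel[ray,vllm] >= 1.3.0"],
        "env_vars": {"HF_TOKEN": "<YOUR_HF_TOKEN>"},
    },
)
print(f"Submitted job: {submission_id}")
```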

## File system requirements

As described in [Using a file system to pass data to steps](fs_to_pass_data.md), `distilabel` relies on the file system to pass data to the `GlobalStep`s, so if the pipeline to be executed in the Ray cluster has any `GlobalStep`, or if you want to set `use_fs_to_pass_data=True` in the [run][distilabel.pipeline.local.Pipeline.run] method, then you will need to set up a file system to which all the nodes of the Ray cluster have access:

```python
if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={...},
        storage_parameters={"path": "file:///mnt/data"},  # (1)
        use_fs_to_pass_data=True,
    )
```

1. All the nodes of the Ray cluster should have access to `/mnt/data`.
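
If the nodes don't share a mounted volume, a cloud object store path can be used instead. A hedged sketch, assuming the path scheme is resolved by an fsspec-compatible package installed on every node (the bucket name is hypothetical):

```python
if __name__ == "__main__":
    distiset = pipeline.run(
        parameters={...},
        # Hypothetical bucket; assumes e.g. `s3fs` is installed on every
        # node so that the `s3://` scheme can be resolved.
        storage_parameters={"path": "s3://my-distilabel-bucket/pipeline-data"},
        use_fs_to_pass_data=True,
    )
```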

## Executing a `RayPipeline` in a cluster with Slurm

If you have access to an HPC, then you're probably also a user of [Slurm](https://slurm.schedmd.com/), a workload manager typically used on HPCs. We can create a Slurm job that takes a few nodes and deploys a Ray cluster to run a distributed `distilabel` pipeline:

```bash
#!/bin/bash
#SBATCH --job-name=distilabel-ray-text-generation
#SBATCH --partition=your-partition
#SBATCH --qos=normal
#SBATCH --nodes=2 # (1)
#SBATCH --exclusive
#SBATCH --ntasks-per-node=1 # (2)
#SBATCH --gpus-per-node=1 # (3)
#SBATCH --time=0:30:00

set -ex

echo "SLURM_JOB_ID: $SLURM_JOB_ID"
echo "SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST"

# Activate virtual environment
source /path/to/virtualenv/.venv/bin/activate

# Getting the node names
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
nodes_array=($nodes)

# Get the IP address of the head node
head_node=${nodes_array[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)

# Start Ray head node
port=6379
ip_head=$head_node_ip:$port
export ip_head
echo "IP Head: $ip_head"

echo "Starting HEAD at $head_node"
srun --nodes=1 --ntasks=1 -w "$head_node" \
    ray start --head --node-ip-address="$head_node_ip" --port=$port \
    --dashboard-host=0.0.0.0 \
    --block &

# Give some time to head node to start...
sleep 10

# Start Ray worker nodes
worker_num=$((SLURM_JOB_NUM_NODES - 1))

# Start from 1 (0 is head node)
for ((i = 1; i <= worker_num; i++)); do
    node_i=${nodes_array[$i]}
    echo "Starting WORKER $i at $node_i"
    srun --nodes=1 --ntasks=1 -w "$node_i" \
        ray start --address "$ip_head" \
        --block &
    sleep 5
done

# Finally submit the job to the cluster
ray job submit --address http://localhost:8265 --working-dir ray-pipeline -- python -u pipeline.py
```

1. In this case, we just want two nodes: one to run the Ray head node and one to run a worker.
2. We just want to run one task per node, i.e. the Ray command that starts the head/worker node.
3. We have selected 1 GPU per node, but we could have selected more depending on the pipeline.
9 changes: 5 additions & 4 deletions mkdocs.yml
@@ -155,14 +155,15 @@ nav:
      - Execute Steps and Tasks in a Pipeline: "sections/how_to_guides/basic/pipeline/index.md"
    - Advanced:
      - Using the Distiset dataset object: "sections/how_to_guides/advanced/distiset.md"
      - Cache and recover pipeline executions: "sections/how_to_guides/advanced/caching.md"
      - Export data to Argilla: "sections/how_to_guides/advanced/argilla.md"
      - Using a file system to pass data of batches between steps: "sections/how_to_guides/advanced/fs_to_pass_data.md"
      - Structured data generation: "sections/how_to_guides/advanced/structured_generation.md"
      - Specify requirements for pipelines and steps: "sections/how_to_guides/advanced/pipeline_requirements.md"
      - Using CLI to explore and re-run existing Pipelines: "sections/how_to_guides/advanced/cli/index.md"
      - Cache and recover pipeline executions: "sections/how_to_guides/advanced/caching.md"
      - Using a file system to pass data of batches between steps: "sections/how_to_guides/advanced/fs_to_pass_data.md"
      - Assigning resources to a step: "sections/how_to_guides/advanced/assigning_resources_to_step.md"
      - Structured data generation: "sections/how_to_guides/advanced/structured_generation.md"
      - Serving an LLM for sharing it between several tasks: "sections/how_to_guides/advanced/serving_an_llm_for_reuse.md"
      - Specify requirements for pipelines and steps: "sections/how_to_guides/advanced/pipeline_requirements.md"
      - Scaling and distributing a pipeline with Ray: "sections/how_to_guides/advanced/scaling_with_ray.md"
    - Pipeline Samples:
      - Examples: "sections/pipeline_samples/examples/index.md"
      - Papers:
1 change: 1 addition & 0 deletions pyproject.toml
@@ -83,6 +83,7 @@ mistralai = ["mistralai >= 0.1.0"]
ollama = ["ollama >= 0.1.7"]
openai = ["openai >= 1.0.0"]
outlines = ["outlines >= 0.0.40"]
ray = ["ray[default] >= 2.31.0"]
vertexai = ["google-cloud-aiplatform >= 1.38.0"]
vllm = ["vllm >= 0.4.0", "outlines == 0.0.34", "filelock >= 3.13.4"]

4 changes: 4 additions & 0 deletions scripts/install_dependencies.sh
@@ -8,4 +8,8 @@ python -m pip install uv

uv pip install --system -e ".[dev,tests,anthropic,argilla,cohere,groq,hf-inference-endpoints,hf-transformers,litellm,llama-cpp,ollama,openai,outlines,vertexai,mistralai,instructor]"

if [ "${python_version}" != "(3, 12)" ]; then
uv pip install --system -e .[ray]
fi

uv pip install --system git+https://github.com/argilla-io/LLM-Blender.git
3 changes: 2 additions & 1 deletion src/distilabel/pipeline/__init__.py
@@ -13,9 +13,10 @@
# limitations under the License.

from distilabel.pipeline.local import Pipeline
from distilabel.pipeline.ray import RayPipeline
from distilabel.pipeline.routing_batch_function import (
    routing_batch_function,
    sample_n_steps,
)

__all__ = ["Pipeline", "routing_batch_function", "sample_n_steps"]
__all__ = ["Pipeline", "RayPipeline", "routing_batch_function", "sample_n_steps"]