-
Notifications
You must be signed in to change notification settings - Fork 432
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[RayJob] [Doc] Add real-world Ray Job use case tutorial for KubeRay #1361
Changes from 2 commits
0cee164
2b44fc5
32ca1b4
0f3a44b
fd3c353
5372daf
79db9fd
d6bbf58
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,188 @@ | ||
# RayJob Batch Inference Example | ||
|
||
This page demonstrates how to use the RayJob custom resource to run a batch inference job on a Ray cluster. | ||
|
||
We will use an image classification workload. The example is based on <https://docs.ray.io/en/latest/data/examples/huggingface_vit_batch_prediction.html>. Please see that page for a full explanation of the code. | ||
|
||
## Prerequisites | ||
|
||
You must have a Kubernetes cluster running and `kubectl` configured to use it, and GPUs available. We provide a brief tutorial for setting up the necessary GPUs on Google Kubernetes Engine (GKE), but you can use any Kubernetes cluster with GPUs. | ||
|
||
## Deploy KubeRay | ||
|
||
Make sure your KubeRay operator version is at least v0.6.0. | ||
|
||
The latest released KubeRay version is recommended. | ||
|
||
For installation instructions, please follow [the documentation](../deploy/installation.md). | ||
|
||
## Step 0: Create a Kubernetes cluster on GKE (Optional) | ||
|
||
If you already have a Kubernetes cluster with GPUs, you can skip this step. | ||
|
||
Run this command and all following commands on your local machine or on the [Google Cloud Shell](https://cloud.google.com/shell). If running from your local machine, you will need to install the [Google Cloud SDK](https://cloud.google.com/sdk/docs/install). | ||
|
||
```bash | ||
|
||
gcloud container clusters create batch-gpu-cluster \ | ||
--num-nodes=1 --min-nodes 0 --max-nodes 1 --enable-autoscaling \ | ||
--zone=us-west1-b --machine-type e2-standard-8 | ||
|
||
``` | ||
|
||
This command creates a Kubernetes cluster named `batch-gpu-cluster` with 1 node in the `us-west1-b` zone. In this example, we use the `e2-standard-8` machine type, which has 8 vCPUs and 32 GB RAM. | ||
|
||
You can also create a cluster from the [Google Cloud Console](https://console.cloud.google.com/kubernetes/list). | ||
|
||
Run the following command to create a GPU node pool for the Ray cluster. | ||
(You can also create it from the Google Cloud Console; see the [GKE documentation](https://cloud.google.com/kubernetes-engine/docs/how-to/node-taints#create_a_node_pool_with_node_taints) for more details.) | ||
|
||
```bash | ||
|
||
gcloud container node-pools create gpu-node-pool \ | ||
--accelerator type=nvidia-tesla-t4,count=4,gpu-driver-version=default \ | ||
--zone us-west1-b \ | ||
--cluster batch-gpu-cluster \ | ||
--num-nodes 1 \ | ||
--min-nodes 0 \ | ||
--max-nodes 1 \ | ||
--enable-autoscaling \ | ||
--machine-type n1-standard-64 | ||
|
||
``` | ||
|
||
The `--accelerator` flag specifies the type and number of GPUs for each node in the node pool. In this example, we use the [NVIDIA L4](https://cloud.google.com/compute/docs/gpus#l4-gpus) GPU. The machine type is `n1-standard-64`, which has [64 vCPUs and 240 GB RAM](https://cloud.google.com/compute/docs/general-purpose-machines#n1_machine_types). The `--min-nodes 0` and `--max-nodes 1` flags enable autoscaling for the node pool. The `--num-nodes 1` flag specifies the initial number of nodes in the node pool. | ||
|
||
GKE will automatically prevent CPU-only pods such as the Kuberay operator from being scheduled on this GPU node pool. This is because GPUs are expensive, so we want to use this node pool for Ray GPU nodes only. To set this behavior up manually, you can use taints and tolerations; see the [Kubernetes documentation](https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/). | ||
|
||
Finally, run the following command to download credentials and configure the Kubernetes CLI to use them. | ||
|
||
```sh | ||
gcloud container clusters get-credentials batch-gpu-cluster --zone us-west1-b | ||
``` | ||
|
||
For more details, see the [GKE documentation](https://cloud.google.com/kubernetes-engine/docs/how-to/cluster-access-for-kubectl). | ||
|
||
## Step 1: Install the KubeRay Operator | ||
|
||
Once `kubectl` is configured to connect to your cluster, you can install the KubeRay operator. | ||
|
||
```sh | ||
# Install both CRDs and KubeRay operator v0.5.0. | ||
helm repo add kuberay https://ray-project.github.io/kuberay-helm/ | ||
helm repo update | ||
helm install kuberay-operator kuberay/kuberay-operator --version 0.5.0 | ||
|
||
# It should be scheduled on the CPU node. If it is not, something is wrong. | ||
``` | ||
|
||
## Step 2: Submit the RayJob | ||
|
||
Now we can submit the RayJob. Our RayJob spec is defined in [ray_v1alpha1_rayjob.batch-inference.yaml](https://github.com/ray-project/kuberay/blob/master/ray-operator/config/samples/ray_v1alpha1_rayjob.batch-inference.yaml). | ||
|
||
Note that the `RayJob` spec contains a spec for the `RayCluster` that is to be created for the job. For this tutorial, we use a single-node cluster with 4 GPUs. For production use cases, we recommend using a multi-node cluster where the head node does not have GPUs, so that Ray can automatically schedule GPU workloads on worker nodes and they won't interfere with critical Ray processes on the head node. | ||
|
||
Note the following fields in the `RayJob` spec, which specify the Ray image and the GPU resources for our Ray node: | ||
```yaml | ||
spec: | ||
containers: | ||
- name: ray-head | ||
image: rayproject/ray-ml:2.6.3-gpu | ||
resources: | ||
limits: | ||
nvidia.com/gpu: "4" | ||
requests: | ||
cpu: "54" | ||
memory: "54Gi" | ||
volumeMounts: | ||
- mountPath: /home/ray/samples | ||
name: code-sample | ||
nodeSelector: | ||
cloud.google.com/gke-accelerator: nvidia-tesla-t4 # This is the GPU type we used in the GPU node pool. | ||
``` | ||
|
||
To submit the job, run the following command: | ||
|
||
```bash | ||
kubectl apply -f ray_v1alpha1_rayjob.batch-inference.yaml | ||
``` | ||
|
||
Here the cluster is running because we did not set `shutdownAfterJobFinishes` in the `RayJob` spec. If you set `shutdownAfterJobFinishes` to `true`, the cluster will be shut down after the job finishes. | ||
|
||
We can check the status with `kubectl describe rayjob rayjob-sample`. | ||
|
||
Sample output: | ||
|
||
``` | ||
[...] | ||
Status: | ||
Dashboard URL: rayjob-sample-raycluster-j6t8n-head-svc.default.svc.cluster.local:8265 | ||
End Time: 2023-08-22T22:48:35Z | ||
Job Deployment Status: Running | ||
Job Id: rayjob-sample-ft8lh | ||
Job Status: SUCCEEDED | ||
Message: Job finished successfully. | ||
Observed Generation: 2 | ||
Ray Cluster Name: rayjob-sample-raycluster-j6t8n | ||
Ray Cluster Status: | ||
Endpoints: | ||
Client: 10001 | ||
Dashboard: 8265 | ||
Gcs - Server: 6379 | ||
Metrics: 8080 | ||
Head: | ||
Pod IP: 10.112.1.3 | ||
Service IP: 10.116.1.93 | ||
Last Update Time: 2023-08-22T22:47:44Z | ||
Observed Generation: 1 | ||
State: ready | ||
Start Time: 2023-08-22T22:48:02Z | ||
Events: | ||
Type Reason Age From Message | ||
---- ------ ---- ---- ------- | ||
Normal Created 36m rayjob-controller Created cluster rayjob-sample-raycluster-j6t8n | ||
Normal Created 32m rayjob-controller Created k8s job rayjob-sample | ||
``` | ||
|
||
To view the logs, first find the name of the pod running the job with `kubectl get pods`. | ||
|
||
Sample output: | ||
|
||
```bash | ||
NAME READY STATUS RESTARTS AGE | ||
kuberay-operator-8b86754c-r4rc2 1/1 Running 0 25h | ||
rayjob-sample-raycluster-j6t8n-head-kx2gz 1/1 Running 0 35m | ||
rayjob-sample-w98c7 0/1 Completed 0 30m | ||
``` | ||
|
||
Next, run | ||
|
||
```text | ||
kubetcl logs rayjob-sample-w98c7 | ||
``` | ||
|
||
to get the standard output of the `entrypoint` command for the `RayJob`. Sample output: | ||
|
||
```text | ||
[...] | ||
Running: 62.0/64.0 CPU, 4.0/4.0 GPU, 955.57 MiB/12.83 GiB object_store_memory: 0%| | 0/200 [00:05<?, ?it/s] | ||
Running: 61.0/64.0 CPU, 4.0/4.0 GPU, 999.41 MiB/12.83 GiB object_store_memory: 0%| | 0/200 [00:05<?, ?it/s] | ||
Running: 61.0/64.0 CPU, 4.0/4.0 GPU, 999.41 MiB/12.83 GiB object_store_memory: 0%| | 1/200 [00:05<17:04, 5.15s/it] | ||
Running: 61.0/64.0 CPU, 4.0/4.0 GPU, 1008.68 MiB/12.83 GiB object_store_memory: 0%| | 1/200 [00:05<17:04, 5.15s/it] | ||
Running: 61.0/64.0 CPU, 4.0/4.0 GPU, 1008.68 MiB/12.83 GiB object_store_memory: 100%|██████████| 1/1 [00:05<00:00, 5.15s/it] | ||
|
||
2023-08-22 15:48:33,905 WARNING actor_pool_map_operator.py:267 -- To ensure full parallelization across an actor pool of size 4, the specified batch size should be at most 5. Your configured batch size for this operator was 16. | ||
<PIL.Image.Image image mode=RGB size=500x375 at 0x7B37546CF7F0> | ||
Label: tench, Tinca tinca | ||
<PIL.Image.Image image mode=RGB size=500x375 at 0x7B37546AE430> | ||
Label: tench, Tinca tinca | ||
<PIL.Image.Image image mode=RGB size=500x375 at 0x7B37546CF430> | ||
Label: tench, Tinca tinca | ||
<PIL.Image.Image image mode=RGB size=500x375 at 0x7B37546AE430> | ||
Label: tench, Tinca tinca | ||
<PIL.Image.Image image mode=RGB size=500x375 at 0x7B37546CF7F0> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is also suspicious, why are there only two distinct memory locations across the five lines? |
||
Label: tench, Tinca tinca | ||
2023-08-22 15:48:36,522 SUCC cli.py:33 -- ----------------------------------- | ||
2023-08-22 15:48:36,522 SUCC cli.py:34 -- Job 'rayjob-sample-ft8lh' succeeded | ||
2023-08-22 15:48:36,522 SUCC cli.py:35 -- ----------------------------------- | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,16 +8,6 @@ spec: | |
# shutdownAfterJobFinishes: false | ||
# ttlSecondsAfterFinished specifies the number of seconds after which the RayCluster will be deleted after the RayJob finishes. | ||
# ttlSecondsAfterFinished: 10 | ||
# Runtime env decoded to { | ||
# { | ||
# "pip": [ | ||
# "torch", | ||
# "torchvision", | ||
# "Pillow", | ||
# "transformers" | ||
# ] | ||
# } | ||
runtimeEnv: ewogICJwaXAiOiBbCiAgICAidG9yY2giLAogICAgInRvcmNodmlzaW9uIiwKICAgICJQaWxsb3ciLAogICAgInRyYW5zZm9ybWVycyIKICBdCn0= | ||
# Suspend specifies whether the RayJob controller should create a RayCluster instance. | ||
# If a job is applied with the suspend field set to true, the RayCluster will not be created and we will wait for the transition to false. | ||
# If the RayCluster is already created, it will be deleted. In the case of transition to false, a new RayCluste rwill be created. | ||
|
@@ -47,14 +37,15 @@ spec: | |
name: client | ||
resources: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The resource config is pretty weird. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will update it to make limits=requests |
||
limits: | ||
cpu: 2 | ||
memory: 8Gi | ||
nvidia.com/gpu: "4" | ||
requests: | ||
cpu: 2 | ||
memory: 8Gi | ||
cpu: "54" | ||
memory: "54Gi" | ||
volumeMounts: | ||
- mountPath: /home/ray/samples | ||
name: code-sample | ||
nodeSelector: | ||
cloud.google.com/gke-accelerator: nvidia-tesla-t4 | ||
volumes: | ||
# You set volumes at the Pod level, then mount them into containers inside that Pod | ||
- name: code-sample | ||
|
@@ -65,39 +56,6 @@ spec: | |
items: | ||
- key: sample_code.py | ||
path: sample_code.py | ||
workerGroupSpecs: | ||
# the pod replicas in this group typed worker | ||
- replicas: 1 | ||
minReplicas: 1 | ||
maxReplicas: 5 | ||
# logical group name, for this called small-group, also can be functional | ||
groupName: small-group | ||
# The `rayStartParams` are used to configure the `ray start` command. | ||
# See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay. | ||
# See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`. | ||
rayStartParams: | ||
resources: '"{\"accelerator_type_cpu\": 48, \"accelerator_type_a10\": 2, \"accelerator_type_a100\": 2}"' | ||
#pod template | ||
template: | ||
spec: | ||
containers: | ||
- name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name', or '123-abc' | ||
image: rayproject/ray-ml:2.6.3-gpu | ||
lifecycle: | ||
preStop: | ||
exec: | ||
command: [ "/bin/sh","-c","ray stop" ] | ||
resources: | ||
limits: | ||
cpu: "48" | ||
memory: "192G" | ||
nvidia.com/gpu: 4 | ||
requests: | ||
cpu: "36" | ||
memory: "128G" | ||
nvidia.com/gpu: 4 | ||
nodeSelector: | ||
cloud.google.com/gke-accelerator: nvidia-tesla-t4 | ||
# SubmitterPodTemplate is the template for the pod that will run the `ray job submit` command against the RayCluster. | ||
# If SubmitterPodTemplate is specified, the first container is assumed to be the submitter container. | ||
# submitterPodTemplate: | ||
|
@@ -129,51 +87,48 @@ data: | |
s3_uri, mode="RGB" | ||
) | ||
ds | ||
# TODO(archit) need to install Pillow, pytorch or tf or flax (pip install torch torchvision torchaudio) | ||
from typing import Dict | ||
import numpy as np | ||
|
||
from transformers import pipeline | ||
from PIL import Image | ||
|
||
# Pick the largest batch size that can fit on our GPUs | ||
BATCH_SIZE = 1024 | ||
BATCH_SIZE = 16 | ||
|
||
# TODO(archit) basic step | ||
class ImageClassifier: | ||
def __init__(self): | ||
# If doing CPU inference, set `device="cpu"` instead. | ||
self.classifier = pipeline("image-classification", model="google/vit-base-patch16-224", device=0) # TODO:archit | ||
|
||
# single_batch = ds.take_batch(10) | ||
def __call__(self, batch: Dict[str, np.ndarray]): | ||
# Convert the numpy array of images into a list of PIL images which is the format the HF pipeline expects. | ||
outputs = self.classifier( | ||
[Image.fromarray(image_array) for image_array in batch["image"]], | ||
top_k=1, | ||
batch_size=BATCH_SIZE) | ||
|
||
# `outputs` is a list of length-one lists. For example: | ||
# [[{'score': '...', 'label': '...'}], ..., [{'score': '...', 'label': '...'}]] | ||
batch["score"] = [output[0]["score"] for output in outputs] | ||
batch["label"] = [output[0]["label"] for output in outputs] | ||
return batch | ||
|
||
# from PIL import Image | ||
|
||
# img = Image.fromarray(single_batch["image"][0]) | ||
# # display image | ||
# img.show() | ||
# from transformers import pipeline | ||
# from PIL import Image | ||
|
||
# # If doing CPU inference, set device="cpu" instead. | ||
# classifier = pipeline("image-classification", model="google/vit-base-patch16-224", device="cuda:0") | ||
# outputs = classifier([Image.fromarray(image_array) for image_array in single_batch["image"]], top_k=1, batch_size=10) | ||
# del classifier # Delete the classifier to free up GPU memory. | ||
# print(outputs) | ||
|
||
@ray.remote(num_gpus=1) | ||
def do_single_batch(): | ||
single_batch = ds.take_batch(10) | ||
predictions = ds.map_batches( | ||
ImageClassifier, | ||
compute=ray.data.ActorPoolStrategy(size=4), # Change this number based on the number of GPUs in your cluster. | ||
num_gpus=1, # Specify 1 GPU per model replica. | ||
batch_size=BATCH_SIZE # Use the largest batch size that can fit on our GPUs | ||
) | ||
|
||
from PIL import Image | ||
prediction_batch = predictions.take_batch(5) | ||
|
||
img = Image.fromarray(single_batch["image"][0]) | ||
# display image | ||
from PIL import Image | ||
print("A few sample predictions: ") | ||
for image, prediction in zip(prediction_batch["image"], prediction_batch["label"]): | ||
img = Image.fromarray(image) | ||
# Display the image | ||
img.show() | ||
from transformers import pipeline | ||
from PIL import Image | ||
|
||
# If doing CPU inference, set device="cpu" instead. | ||
classifier = pipeline("image-classification", model="google/vit-base-patch16-224", device="cuda:0") | ||
outputs = classifier([Image.fromarray(image_array) for image_array in single_batch["image"]], top_k=1, batch_size=10) | ||
del classifier # Delete the classifier to free up GPU memory. | ||
print(outputs) | ||
return outputs | ||
print("Label: ", prediction) | ||
|
||
print(ray.get(do_single_batch.remote())) | ||
# Write to local disk, or external storage, e.g. S3 | ||
# ds.write_parquet("s3://my_bucket/my_folder") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This output is a little suspicious.