-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
coufon
committed
Feb 4, 2024
1 parent
25d5d45
commit 2a8af4d
Showing
5 changed files
with
291 additions
and
31 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
## Cluster Setup and Performance Tuning | ||
|
||
Data operations in Space can run distributedly in a Ray cluster. Ray nodes access the same Space dataset files via Cloud Storage or distributed file systems. | ||
|
||
## Setup | ||
|
||
### Cloud Storage | ||
|
||
Setup [GCS FUSE](https://cloud.google.com/storage/docs/gcs-fuse) to use files on Google Cloud Storage (GCS) (or [S3](https://github.com/s3fs-fuse/s3fs-fuse), [Azure](https://github.com/Azure/azure-storage-fuse)): | ||
|
||
```bash | ||
gcsfuse <mybucket> "/path/to/<mybucket>" | ||
``` | ||
|
||
Space has not yet implemented Cloud Storage file systems. FUSE is the current suggested approach. | ||
|
||
### Cluster Setup | ||
|
||
On the Ray cluster head/worker nodes: | ||
```bash | ||
# Start a Ray head node (IP 123.45.67.89, for example). | ||
# See https://docs.ray.io/en/latest/ray-core/starting-ray.html for details. | ||
ray start --head --port=6379 | ||
``` | ||
|
||
Using [Cloud Storage + FUSE](#cloud-storage) is required in the distributed mode, because the Ray cluster and the client machine should operate on the same directory of files. The mapped local directory paths **must be the same**. | ||
|
||
Run the following code on the client machine to connect to the Ray cluster: | ||
```py | ||
import ray | ||
|
||
# Connect to the Ray cluster. | ||
ray.init(address="ray://123.45.67.89:10001") | ||
``` | ||
|
||
## Configure Space Options | ||
|
||
Create a Ray runner linking to a Space dataset or view to run operations in the Ray cluster. Use options to tune the performance. | ||
|
||
### Data Ingestion | ||
|
||
The [WebDataset ingestion example](/notebooks/webdataset_ingestion.ipynb) describes the setup in detail. The options to tune include: | ||
|
||
- `max_parallelism`: ingestion workload will run in parallel on Ray nodes, capped by this parallelism | ||
|
||
- `array_record_options`: set the [options of ArrayRecord lib](https://github.com/google/array_record/blob/2ac1d904f6be31e5aa2f09549774af65d84bff5a/cpp/array_record_writer.h#L83); Group size is the number of records to serialize together in one chunk. A lower value improves random access latency. However, a larger value is preferred on Cloud Storage, which performs better for batch read. A larger group size reduces the ArrayRecord file size. | ||
|
||
```py | ||
# `ds_or_view` is a Space dataset or (materialized) view. | ||
runner = ds_or_view.ray( | ||
ray_options=RayOptions(max_parallelism=4), | ||
file_options=FileOptions( | ||
array_record_options=ArrayRecordOptions(options="group_size:64") | ||
)) | ||
``` | ||
|
||
### Data Read | ||
|
||
Data read in Ray runner has the following steps: | ||
|
||
- Obtain a list of index files to read, based on the filter and version. If a read `batch size` is provided, further split a file into row ranges. Each row range will be a [Ray data block](https://docs.ray.io/en/latest/data/api/doc/ray.data.block.Block.html). | ||
|
||
- When reading a block, first read the index file as an Arrow table. If there are record fields, read these fields from ArrayRecord files. | ||
|
||
The options to tune include: | ||
|
||
- `max_parallelism`: Ray read parallelism, controlls `parallelism` of [Datasource.get_read_tasks](https://docs.ray.io/en/latest/data/api/doc/ray.data.Datasource.get_read_tasks.html#ray.data.Datasource.get_read_tasks) | ||
|
||
- `batch_size`: a too small batch size will produce too many Ray blocks and have a negative performance impact. A large batch size will require reading many records from ArrayRecord files for each Ray block, which can be slow. | ||
|
||
Examples of setting read batch size in different scenarios: | ||
|
||
```py | ||
ray_option = RayOptions(max_parallelism=4) | ||
|
||
iterator = ds.ray(ray_option).read(batch_size=64) | ||
|
||
mv.ray(ray_option).refresh(batch_size=64) | ||
|
||
ray_ds = ds.ray_dataset(ray_option, ReadOptions(batch_size=64)) | ||
``` | ||
|
||
#### Read Data for Training | ||
|
||
Users can choose to store data fields in **Parquet** or **ArrayRecord** files (record fields). Space performance is similar to other Parquet based datasets when all fields are in Parquet. | ||
|
||
The ArrayRecord reader uses random access read at the granularity of `group size` records. Random access read performs well on local or high performance distributed file systems attached to the reader node (e.g., training VMs). However, the read performance degrades drastically on Cloud Storage, because of too many read RPCs (e.g., per record). The tips for **Cloud Storage** are: | ||
|
||
- For quasi sequential read, use a larger `group size` when ingesting data to Space. It can effectively improve read throughput by reducing number of RPC. But it helps only when adjacent records are read together. | ||
|
||
- For fully randomized read (the order to read records are shuffled), the Space dataset files should be first cached in a high performance file system. | ||
|
||
Random access read training has the benefit of lightweight global shuffling, deterministic training, and checkpointing training state. See the [Grain](https://github.com/google/grain) framework for more details. Integration with Grain is a TODO. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
{ | ||
"cells": [ | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"## Parallel Data Ingestion from WebDataset\n", | ||
"\n", | ||
"This example ingests data from a [WebDataset](https://github.com/webdataset/webdataset) to a Space dataset. The ingestion operation uses Ray runner to distribute the workload in a Ray cluster.\n", | ||
"\n", | ||
"We use [img2dataset](https://github.com/rom1504/img2dataset) to download popular ML datasets in the WebDataset format. Install the packages and download the COCO dataset following the [guide](https://github.com/rom1504/img2dataset/blob/main/dataset_examples/mscoco.md):" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": { | ||
"vscode": { | ||
"languageId": "shellscript" | ||
} | ||
}, | ||
"outputs": [], | ||
"source": [ | ||
"pip install webdataset img2dataset \n", | ||
"\n", | ||
"wget https://huggingface.co/datasets/ChristophSchuhmann/MS_COCO_2017_URL_TEXT/resolve/main/mscoco.parquet\n", | ||
"\n", | ||
"img2dataset --url_list mscoco.parquet --input_format \"parquet\" \\\n", | ||
" --url_col \"URL\" --caption_col \"TEXT\" --output_format \"webdataset\" \\\n", | ||
" --output_folder \"mscoco\" --processes_count 16 --thread_count 64 --image_size 256 \\\n", | ||
" --enable_wandb True" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"Download the COCO WebDataset to \"COCO_DIR\" (local or in Cloud Storage). Then create an empty Space dataset:" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"import pyarrow as pa\n", | ||
"\n", | ||
"from space import DirCatalog\n", | ||
"from space import ArrayRecordOptions, FileOptions\n", | ||
"\n", | ||
"# The schema of a new Space dataset to create.\n", | ||
"schema = pa.schema([\n", | ||
" (\"key\", pa.string()),\n", | ||
" (\"caption\", pa.binary()),\n", | ||
" (\"jpg\", pa.binary())])\n", | ||
"\n", | ||
"catalog = DirCatalog(\"/path/to/my/tables\")\n", | ||
"ds = catalog.create_dataset(\"coco\", schema, primary_keys=[\"key\"],\n", | ||
" record_fields=[\"jpg\"]) # Store \"jpg\" in ArrayRecord files\n", | ||
"\n", | ||
"# Or load an existing dataset.\n", | ||
"# ds = catalog.dataset(\"images_size64\")" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"Connect to a Ray runner and create a Ray runner for the dataset. See the [setup and performance doc](/docs/performance.md#ray-runner-setup) for how to configure the options." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"import ray\n", | ||
"\n", | ||
"# Connect to a Ray cluster. Or skip it to use a local Ray instance.\n", | ||
"ray.init(address=\"ray://12.34.56.78:10001\")\n", | ||
"\n", | ||
"runner = ds.ray(\n", | ||
" ray_options=RayOptions(max_parallelism=4),\n", | ||
" file_options=FileOptions(\n", | ||
" array_record_options=ArrayRecordOptions(options=\"group_size:64\")\n", | ||
" ))" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"A WebDataset consists of a directory of tar files. A URL in form of `something-{000000..012345}.tar` represents a shard of the dataset, i.e., a subset of tar files. The following `read_webdataset` method returns an iterator to scan the shard described by a URL." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"import webdataset as wds\n", | ||
"\n", | ||
"# The size of a batch returned by the iterator.\n", | ||
"BATCH_SIZE = 64\n", | ||
"\n", | ||
"def read_webdataset(shard_url: str):\n", | ||
" print(f\"Processing URL: {shard_url}\")\n", | ||
"\n", | ||
" def to_dict(keys, captions, jpgs):\n", | ||
" return {\"key\": keys, \"caption\": captions, \"jpg\": jpgs}\n", | ||
"\n", | ||
" ds = wds.WebDataset(shard_url)\n", | ||
" keys, captions, jpgs = [], [], []\n", | ||
" for i, sample in enumerate(ds):\n", | ||
" keys.append(sample[\"__key__\"])\n", | ||
" captions.append(sample[\"txt\"])\n", | ||
" jpgs.append(sample[\"jpg\"])\n", | ||
"\n", | ||
" if len(keys) == BATCH_SIZE:\n", | ||
" yield (to_dict(keys, captions, jpgs))\n", | ||
" keys.clear()\n", | ||
" captions.clear()\n", | ||
" jpgs.clear()\n", | ||
"\n", | ||
" if len(keys) > 0:\n", | ||
" yield (to_dict(keys, captions, jpgs))" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"The whole COCO dataset has 60 tar files. We split it into multiple shards to ingest data in parallel:" | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"NUM_SHARDS = 60\n", | ||
"NUM_WORKERS = 4\n", | ||
"\n", | ||
"COCO_DIR = \"/path/to/my/downloaded/coco\"\n", | ||
"\n", | ||
"shards_per_worker = NUM_SHARDS // NUM_WORKERS\n", | ||
"shard_urls = []\n", | ||
"for i in range(NUM_WORKERS):\n", | ||
" start = f\"{i * shards_per_worker:05d}\"\n", | ||
" end = f\"{min(NUM_SHARDS-1, (i + 1) * shards_per_worker - 1):05d}\"\n", | ||
" shard_urls.append(COCO_DIR + f\"/{{{start}..{end}}}.tar\")" | ||
] | ||
}, | ||
{ | ||
"cell_type": "markdown", | ||
"metadata": {}, | ||
"source": [ | ||
"The `append_from` method takes a list of no-arg methods as input. Each method makes a new iterator that reads a shard of input data. The iterators are read in parallel on Ray nodes." | ||
] | ||
}, | ||
{ | ||
"cell_type": "code", | ||
"execution_count": null, | ||
"metadata": {}, | ||
"outputs": [], | ||
"source": [ | ||
"from functools import partial\n", | ||
"\n", | ||
"# A Ray remote fn to call `append_from` from the Ray cluster.\n", | ||
"@ray.remote\n", | ||
"def run():\n", | ||
" runner.append_from([partial(read_webdataset, url) for url in shard_urls])\n", | ||
"\n", | ||
"# Start ingestion\n", | ||
"ray.get(run.remote())" | ||
] | ||
} | ||
], | ||
"metadata": { | ||
"language_info": { | ||
"name": "python" | ||
} | ||
}, | ||
"nbformat": 4, | ||
"nbformat_minor": 2 | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
[project] | ||
name = "space-datasets" | ||
version = "0.0.8" | ||
version = "0.0.9" | ||
authors = [{ name = "Space team", email = "[email protected]" }] | ||
description = "Unified storage framework for machine learning datasets" | ||
readme = "README.md" | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters