From e9cedada0d8dd01bc6e5ae57255bd06ab0c2d3fb Mon Sep 17 00:00:00 2001 From: coufon Date: Sun, 4 Feb 2024 04:33:05 +0000 Subject: [PATCH] Add setup and performance docs --- README.md | 34 +---- docs/performance.md | 93 +++++++++++++ notebooks/webdataset_ingestion.ipynb | 191 +++++++++++++++++++++++++++ python/pyproject.toml | 2 +- python/src/space/__init__.py | 3 +- 5 files changed, 292 insertions(+), 31 deletions(-) create mode 100644 docs/performance.md create mode 100644 notebooks/webdataset_ingestion.ipynb diff --git a/README.md b/README.md index 239d5bd..71e995e 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,7 @@ Unify data in your entire machine learning lifecycle with **Space**, a comprehen - [Ground truth database of LabelStudio](notebooks/label_studio_tutorial.ipynb) - [Transforms and materialized views: Segment Anything as example](notebooks/segment_anything_tutorial.ipynb) - [Incrementally build embedding vector indexes](notebooks/incremental_embedding_index.ipynb) +- [Parallel ingestion from WebDataset](notebooks/webdataset_ingestion.ipynb) ## Space 101 @@ -37,13 +38,12 @@ Unify data in your entire machine learning lifecycle with **Space**, a comprehen - All file paths in Space are [relative](./docs/design.md#relative-paths); datasets are immediately usable after downloading or moving. - Space stores data itself, or a reference of data, in Parquet files. The reference can be the address of a row in ArrayRecord file, or the path of a standalone file (limitted support, see `space.core.schema.types.files`). - `space.TfFeatures` is a built-in field type providing serializers for nested dicts of numpy arrays, based on [TFDS FeaturesDict](https://www.tensorflow.org/datasets/api_docs/python/tfds/features/FeaturesDict). -- Please find more information in [the design page](docs/design.md). +- Please find more information in the [design](docs/design.md) and [performance](docs/performance.md) docs. ## Quick Start - [Install](#install) -- [Cloud Storage](#cloud-storage) -- [Cluster setup](#cluster-setup) +- [Cluster Setup and Performance Tuning](#cluster-setup-and-performance-tuning) - [Create and Load Datasets](#create-and-load-datasets) - [Write and Read](#write-and-read) - [Transform and Materialized Views](#transform-and-materialized-views) @@ -63,33 +63,9 @@ cd python pip install .[dev] ``` -### Cloud Storage +### Cluster Setup and Performance Tuning -Optionally, setup [GCS FUSE](https://cloud.google.com/storage/docs/gcs-fuse) to use files on Google Cloud Storage (GCS) (or [S3](https://github.com/s3fs-fuse/s3fs-fuse), [Azure](https://github.com/Azure/azure-storage-fuse)): - -```bash -gcsfuse "/path/to/" -``` - -Space has not yet implemented Cloud Storage file systems. FUSE is the current suggested approach. - -### Cluster Setup - -Optionally, setup a cluster to run Space operations distributedly. We support Ray clusters, on the Ray cluster head/worker nodes: -```bash -# Start a Ray head node (IP 123.45.67.89, for example). -# See https://docs.ray.io/en/latest/ray-core/starting-ray.html for details. -ray start --head --port=6379 -``` - -Using [Cloud Storage + FUSE](#cloud-storage) is required in the distributed mode, because the Ray cluster and the client machine should operate on the same directory of files. Run `gcsfuse` on all machines and the mapped local directory paths **must be the same**. - -Run the following code on the client machine to connect to the Ray cluster: -```py -import ray -# Connect to the Ray cluster. -ray.init(address="ray://123.45.67.89:10001") -``` +See the [setup and performance doc](/docs/performance.md#ray-runner-setup). ### Create and Load Datasets diff --git a/docs/performance.md b/docs/performance.md new file mode 100644 index 0000000..701bd9d --- /dev/null +++ b/docs/performance.md @@ -0,0 +1,93 @@ +## Cluster Setup and Performance Tuning + +Data operations in Space can run distributedly in a Ray cluster. Ray nodes access the same Space dataset files via Cloud Storage or distributed file systems. + +## Setup + +### Cloud Storage + +Setup [GCS FUSE](https://cloud.google.com/storage/docs/gcs-fuse) to use files on Google Cloud Storage (GCS) (or [S3](https://github.com/s3fs-fuse/s3fs-fuse), [Azure](https://github.com/Azure/azure-storage-fuse)): + +```bash +gcsfuse "/path/to/" +``` + +Space has not yet implemented Cloud Storage file systems. FUSE is the current suggested approach. + +### Cluster Setup + +On the Ray cluster head/worker nodes: +```bash +# Start a Ray head node (IP 123.45.67.89, for example). +# See https://docs.ray.io/en/latest/ray-core/starting-ray.html for details. +ray start --head --port=6379 +``` + +Using [Cloud Storage + FUSE](#cloud-storage) is required in the distributed mode, because the Ray cluster and the client machine should operate on the same directory of files. The mapped local directory paths **must be the same**. + +Run the following code on the client machine to connect to the Ray cluster: +```py +import ray + +# Connect to the Ray cluster. +ray.init(address="ray://123.45.67.89:10001") +``` + +## Configure Space Options + +Create a Ray runner linking to a Space dataset or view to run operations in the Ray cluster. Use options to tune the performance. + +### Data Ingestion + +The [WebDataset ingestion example](/notebooks/webdataset_ingestion.ipynb) describes the setup in detail. The options to tune include: + +- `max_parallelism`: ingestion workload will run in parallel on Ray nodes, capped by this parallelism + +- `array_record_options`: set the [options of ArrayRecord lib](https://github.com/google/array_record/blob/2ac1d904f6be31e5aa2f09549774af65d84bff5a/cpp/array_record_writer.h#L83); Group size is the number of records to serialize together in one chunk. A lower value improves random access latency. However, a larger value is preferred on Cloud Storage, which performs better for batch read. A larger group size reduces the ArrayRecord file size. + +```py +# `ds_or_view` is a Space dataset or (materialized) view. +runner = ds_or_view.ray( + ray_options=RayOptions(max_parallelism=4), + file_options=FileOptions( + array_record_options=ArrayRecordOptions(options="group_size:64") + )) +``` + +### Data Read + +Data read in Ray runner has the following steps: + +- Obtain a list of index files to read, based on the filter and version. If a read `batch size` is provided, further split a file into row ranges. Each row range will be a [Ray data block](https://docs.ray.io/en/latest/data/api/doc/ray.data.block.Block.html). + +- When reading a block, first read the index file as an Arrow table. If there are record fields, read these fields from ArrayRecord files. + +The options to tune include: + +- `max_parallelism`: Ray read parallelism, controlls `parallelism` of [Datasource.get_read_tasks](https://docs.ray.io/en/latest/data/api/doc/ray.data.Datasource.get_read_tasks.html#ray.data.Datasource.get_read_tasks) + +- `batch_size`: a too small batch size will produce too many Ray blocks and have a negative performance impact. A large batch size will require reading many records from ArrayRecord files for each Ray block, which can be slow. + +Examples of setting read batch size in different scenarios: + +```py +ray_option = RayOptions(max_parallelism=4) + +iterator = ds.ray(ray_option).read(batch_size=64) + +mv.ray(ray_option).refresh(batch_size=64) + +ray_ds = ds.ray_dataset(ray_option, ReadOptions(batch_size=64)) +``` + +#### Read Data for Training + +Users can choose to store data fields in **Parquet** or **ArrayRecord** files (record fields). Space performance is similar to other Parquet based datasets when all fields are in Parquet. + +The ArrayRecord reader uses random access read at the granularity of `group size` records. Random access read performs well on local or high performance distributed file systems attached to the reader node (e.g., training VMs). However, the read performance degrades drastically on Cloud Storage, because of too many read RPCs (e.g., per record). The tips for **Cloud Storage** are: + +- For quasi sequential read, use a larger `group size` when ingesting data to Space. It can effectively improve read throughput by reducing number of RPC. But it helps only when adjacent records are read together. + +- For fully randomized read (the order to read records are shuffled), the Space dataset files should be first cached in a high performance file system. + +Random access read training has the benefit of lightweight global shuffling, deterministic training, and checkpointing training state. See the [Grain](https://github.com/google/grain) framework for more details. Integration with Grain is a TODO. diff --git a/notebooks/webdataset_ingestion.ipynb b/notebooks/webdataset_ingestion.ipynb new file mode 100644 index 0000000..6922eff --- /dev/null +++ b/notebooks/webdataset_ingestion.ipynb @@ -0,0 +1,191 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Parallel Data Ingestion from WebDataset\n", + "\n", + "This example ingests data from a [WebDataset](https://github.com/webdataset/webdataset) to a Space dataset. The ingestion operation uses Ray runner to distribute the workload in a Ray cluster.\n", + "\n", + "We use [img2dataset](https://github.com/rom1504/img2dataset) to download popular ML datasets in the WebDataset format. Install the packages and download the COCO dataset following the [guide](https://github.com/rom1504/img2dataset/blob/main/dataset_examples/mscoco.md):" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "vscode": { + "languageId": "shellscript" + } + }, + "outputs": [], + "source": [ + "pip install webdataset img2dataset \n", + "\n", + "wget https://huggingface.co/datasets/ChristophSchuhmann/MS_COCO_2017_URL_TEXT/resolve/main/mscoco.parquet\n", + "\n", + "img2dataset --url_list mscoco.parquet --input_format \"parquet\" \\\n", + " --url_col \"URL\" --caption_col \"TEXT\" --output_format \"webdataset\" \\\n", + " --output_folder \"mscoco\" --processes_count 16 --thread_count 64 --image_size 256 \\\n", + " --enable_wandb True" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Downloaded the COCO WebDataset to \"COCO_DIR\" (local or in Cloud Storage). The next step is creating an empty Space dataset:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import pyarrow as pa\n", + "\n", + "from space import DirCatalog\n", + "from space import ArrayRecordOptions, FileOptions\n", + "\n", + "# The schema of a new Space dataset to create.\n", + "schema = pa.schema([\n", + " (\"key\", pa.string()),\n", + " (\"caption\", pa.binary()),\n", + " (\"jpg\", pa.binary())])\n", + "\n", + "catalog = DirCatalog(\"/path/to/my/tables\")\n", + "ds = catalog.create_dataset(\"coco\", schema, primary_keys=[\"key\"],\n", + " record_fields=[\"jpg\"]) # Store \"jpg\" in ArrayRecord files\n", + "\n", + "# Or load an existing dataset.\n", + "# ds = catalog.dataset(\"images_size64\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Connect to a Ray runner and create a Ray runner for the dataset. See how to configure the options in the [setup and performance doc](/docs/performance.md#ray-runner-setup)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import ray\n", + "\n", + "# Connect to a Ray cluster. Or skip it to use a local Ray instance.\n", + "ray.init(address=\"ray://12.34.56.78:10001\")\n", + "\n", + "runner = ds.ray(\n", + " ray_options=RayOptions(max_parallelism=4),\n", + " file_options=FileOptions(\n", + " array_record_options=ArrayRecordOptions(options=\"group_size:64\")\n", + " ))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "A WebDataset consists of a directory of tar files. A URL in form of `something-{000000..012345}.tar` representing a shard of the dataset, i.e., a subset of tar files. The following `read_webdataset` method returns an iterator to scan the shard described by a URL." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import webdataset as wds\n", + "\n", + "# The size of a batch returned by the iterator.\n", + "BATCH_SIZE = 64\n", + "\n", + "def read_webdataset(shard_url: str):\n", + " print(f\"Processing URL: {shard_url}\")\n", + "\n", + " def to_dict(keys, captions, jpgs):\n", + " return {\"key\": keys, \"caption\": captions, \"jpg\": jpgs}\n", + "\n", + " ds = wds.WebDataset(shard_url)\n", + " keys, captions, jpgs = [], [], []\n", + " for i, sample in enumerate(ds):\n", + " keys.append(sample[\"__key__\"])\n", + " captions.append(sample[\"txt\"])\n", + " jpgs.append(sample[\"jpg\"])\n", + "\n", + " if len(keys) == BATCH_SIZE:\n", + " yield (to_dict(keys, captions, jpgs))\n", + " keys.clear()\n", + " captions.clear()\n", + " jpgs.clear()\n", + "\n", + " if len(keys) > 0:\n", + " yield (to_dict(keys, captions, jpgs))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The whole COCO dataset has 60 tar files. We split it into multiple shards to ingest data in parallel:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "NUM_SHARDS = 60\n", + "NUM_WORKERS = 4\n", + "\n", + "COCO_DIR = \"/path/to/your/downloaded/coco\"\n", + "\n", + "shards_per_worker = NUM_SHARDS // NUM_WORKERS\n", + "shard_urls = []\n", + "for i in range(NUM_WORKERS):\n", + " start = f\"{i * shards_per_worker:05d}\"\n", + " end = f\"{min(NUM_SHARDS-1, (i + 1) * shards_per_worker - 1):05d}\"\n", + " shard_urls.append(COCO_DIR + f\"/{{{start}..{end}}}.tar\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The `append_from` method takes a list of no-arg methods as input. Each method makes a new iterator that reads a shard of input data. The iterators are read in parallel on Ray nodes." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from functools import partial\n", + "import time\n", + "\n", + "# A Ray remote fn to call `append_from` from the Ray cluster.\n", + "@ray.remote\n", + "def run():\n", + " runner.append_from([partial(read_webdataset, url) for url in shard_urls])\n", + "\n", + "# Start ingestion:\n", + "ray.get(run.remote())" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/python/pyproject.toml b/python/pyproject.toml index eacb378..f466ce0 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "space-datasets" -version = "0.0.8" +version = "0.0.9" authors = [{ name = "Space team", email = "no-reply@google.com" }] description = "Unified storage framework for machine learning datasets" readme = "README.md" diff --git a/python/src/space/__init__.py b/python/src/space/__init__.py index 599adab..056ff2b 100644 --- a/python/src/space/__init__.py +++ b/python/src/space/__init__.py @@ -17,7 +17,8 @@ from space.catalogs.base import DatasetInfo from space.catalogs.directory import DirCatalog from space.core.datasets import Dataset -from space.core.options import JoinOptions, Range, ReadOptions +from space.core.options import (ArrayRecordOptions, FileOptions, JoinOptions, + ParquetWriterOptions, Range, ReadOptions) from space.core.runners import LocalRunner from space.core.random_access import RandomAccessDataSource from space.core.schema.types import File, TfFeatures