diff --git a/python/aistore/pytorch/iter_dataset.py b/python/aistore/pytorch/iter_dataset.py index 94e201d1005..0351593b256 100644 --- a/python/aistore/pytorch/iter_dataset.py +++ b/python/aistore/pytorch/iter_dataset.py @@ -57,5 +57,4 @@ def __iter__(self): disable=not self._show_progress, force_tty=False, ): - # Update each object to use WorkerRequestSession for multithreading support yield obj.name, obj.get(etl_name=self._etl_name).read_all() diff --git a/python/aistore/sdk/bucket.py b/python/aistore/sdk/bucket.py index efb42f8bd93..4f541c81bba 100644 --- a/python/aistore/sdk/bucket.py +++ b/python/aistore/sdk/bucket.py @@ -760,7 +760,7 @@ def put_files( Args: path (str): Local filepath, can be relative or absolute prefix_filter (str, optional): Only put files with names starting with this prefix - pattern (str, optional): Regex pattern to filter files + pattern (str, optional): Shell-style wildcard pattern to filter files basename (bool, optional): Whether to use the file names only as object names and omit the path information prepend (str, optional): Optional string to use as a prefix in the object name for all objects uploaded No delimiter ("/", "-", etc.) is automatically applied between the prepend value and the object name diff --git a/python/examples/aisio-pytorch/cifar10_training_example.ipynb b/python/examples/aisio-pytorch/cifar10_training_example.ipynb new file mode 100644 index 00000000000..eead2fb2473 --- /dev/null +++ b/python/examples/aisio-pytorch/cifar10_training_example.ipynb @@ -0,0 +1,666 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# PyTorch: Training CIFAR-10 using the AIStore PyTorch Plugin" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We train a classifier using the CIFAR-10 Dataset ([Learning Multiple Layers of Features from Tiny Images](https://www.cs.toronto.edu/~kriz/learning-features-2009-TR.pdf), Alex Krizhevsky, 2009). 
Note that this notebook was run on `Python 3.11.9`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "! pip install -r ../../aistore/common_requirements --quiet\n", + "! pip install -r ../../aistore/pytorch/dev_requirements --quiet\n", + "! pip install pillow torchvision" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### 1). Import necessary packages, define constants, and create AIS Client." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "try:\n", + " from aistore.sdk import Client\n", + " from aistore.pytorch import AISBaseIterDataset\n", + " from aistore.sdk.ais_source import AISSource\n", + "except:\n", + "\n", + " # Use local version of aistore if pip version is too old\n", + " import sys\n", + " sys.path.append(\"../../\")\n", + "\n", + " from aistore.sdk import Client\n", + " from aistore.pytorch import AISBaseIterDataset\n", + " from aistore.sdk.ais_source import AISSource\n", + "import requests\n", + "import tarfile\n", + "from io import BytesIO\n", + "import pickle\n", + "from torch.utils.data import DataLoader\n", + "import torchvision.transforms as transforms\n", + "from torch.utils.data import DataLoader\n", + "import torch\n", + "from PIL import Image\n", + "\n", + "from itertools import islice\n", + "from typing import List, Union, Dict\n", + "from torch.utils.data import get_worker_info" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "AIS_ENDPOINT = \"http://localhost:8080\"\n", + "AIS_PROVIDER = \"ais\"\n", + "BCK_NAME = \"data-bck\"\n", + "\n", + "DATASET_URL = \"https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz\"\n", + "\n", + "client = Client(endpoint=AIS_ENDPOINT)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### 2). Load the CIFAR-10 dataset into a bucket in our AIS Cluster." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Downloading dataset...\n", + "Putting files into buckets...\n", + "Done putting files into buckets.\n", + "Cleaned up downloaded dataset.\n" + ] + } + ], + "source": [ + "bucket = client.bucket(BCK_NAME, AIS_PROVIDER)\n", + "try:\n", + " bucket.create(exist_ok=False)\n", + "\n", + " print(\"Downloading dataset...\")\n", + " filename = \"cifar-10-python.tar.gz\"\n", + "\n", + " headers = {\n", + " \"User-Agent\": \"Mozilla/5.0\" \n", + " }\n", + " response = requests.get(DATASET_URL, headers=headers, stream=True)\n", + " response.raise_for_status()\n", + "\n", + " data = BytesIO(response.content)\n", + "\n", + " members = []\n", + " with tarfile.open(fileobj=data, mode=\"r:gz\") as file:\n", + " members = file.getmembers()\n", + "\n", + " print(\"Putting files into buckets...\")\n", + "\n", + " for member in members:\n", + " if member.isfile():\n", + " extracted_file = file.extractfile(member)\n", + " bucket.object(member.name).put_content(extracted_file.read())\n", + " \n", + " print(\"Done putting files into buckets.\")\n", + "\n", + " print(\"Cleaned up downloaded dataset.\")\n", + "except Exception as e:\n", + " print(\"Bucket already has dataset! Nothing will be done.\")\n", + " bucket.create(exist_ok=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### 3). Extend AISBaseIterDataset to easily create an Iterable-style dataset that can parse CIFAR-10 pickle files." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [], + "source": [ + "class CIFAR10Dataset(AISBaseIterDataset):\n", + " def __init__(\n", + " self,\n", + " cifar_ais_source: AISSource,\n", + " transform: transforms.Compose,\n", + " prefix_map: Dict[AISSource, Union[str, List[str]]] = {}\n", + " ):\n", + " super().__init__(cifar_ais_source, prefix_map)\n", + " self._transform = transform\n", + " self._reset_iterator()\n", + "\n", + " def __iter__(self):\n", + " worker_info = get_worker_info() \n", + " \n", + " if worker_info is None or worker_info.num_workers == 1:\n", + " # If not using multiple workers, load directly\n", + " current_iterator = self._iterator\n", + " else:\n", + " current_iterator = islice(self._iterator, worker_info.id, None, worker_info.num_workers)\n", + "\n", + " # Slice iterator based on worker id as starting index (0, 1, 2, ..) and steps of total workers\n", + " for obj in current_iterator:\n", + " obj_file = BytesIO(obj.get().read_all())\n", + "\n", + " cifar_dict = pickle.load(obj_file, encoding='bytes')\n", + "\n", + " data = cifar_dict[b\"data\"]\n", + " labels = cifar_dict[b\"labels\"]\n", + "\n", + " # Ensure the data is in the correct shape\n", + " assert data.shape == (10000, 3072), \"CIFAR data files each have shape (10000, 3072)\"\n", + " assert len(labels) == 10000, \"CIFAR files each store 10000 labels\"\n", + " \n", + " # Reshape the data to (10000, 3, 32, 32)\n", + " reshaped_data = data.reshape(10000, 3, 32, 32)\n", + " \n", + " # Transpose the data to (10000, 32, 32, 3) to match the image format (height, width, channels)\n", + " reshaped_data = reshaped_data.transpose(0, 2, 3, 1)\n", + " \n", + " # Get list of images tensors\n", + " image_tensors = [self._transform(Image.fromarray(image.astype('uint8'), 'RGB')) for image in reshaped_data]\n", + "\n", + " yield from iter(zip(image_tensors, labels))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### 4). 
Define our transformation (Image -> Tensor and normalizing mean/stdev)." + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [], + "source": [ + "transform = transforms.Compose(\n", + " [\n", + " transforms.Resize((128, 128)), # reduce this size if your training exits due to OOM or decrease batch size\n", + " transforms.ToTensor(),\n", + " transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))\n", + " ]\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### 5). Create our training and validation datasets. \n", + "\n", + "Note that we use the same bucket for both, but by using the `prefix_map` arg, we can control which AIS objects are used in each dataset" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + "Prefix map allows us to choose specific objects from the bucket to be included in the dataset.\n", + "The map is a dictionary mapping the AISSource (bucket) to a list of prefixes (in this case object names).\n", + "If prefix map is None or not specified, then all objects will be included.\n", + "\"\"\"\n", + "\n", + "train_dataset = CIFAR10Dataset(\n", + " cifar_ais_source=bucket,\n", + " prefix_map={\n", + " bucket: [\n", + " \"cifar-10-batches-py/data_batch_1\",\n", + " \"cifar-10-batches-py/data_batch_2\",\n", + " \"cifar-10-batches-py/data_batch_3\",\n", + " \"cifar-10-batches-py/data_batch_4\",\n", + " \"cifar-10-batches-py/data_batch_5\",\n", + " ]\n", + " },\n", + " transform=transform\n", + ")\n", + "\n", + "test_dataset = CIFAR10Dataset(\n", + " cifar_ais_source=bucket,\n", + " prefix_map={\n", + " bucket: \"cifar-10-batches-py/test_batch\"\n", + " },\n", + " transform=transform\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 6). Create our training and validation dataloaders and configure workers and batch size. 
" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [], + "source": [ + "BATCH_SIZE = 5\n", + "NUM_WORKERS = 3\n", + "\n", + "training_loader = DataLoader(\n", + " dataset=train_dataset,\n", + " num_workers=NUM_WORKERS,\n", + " batch_size = BATCH_SIZE,\n", + ")\n", + "\n", + "validation_loader = DataLoader(\n", + " dataset=test_dataset,\n", + " num_workers=NUM_WORKERS,\n", + " batch_size = BATCH_SIZE,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 7). Define a multi-layer Convolutional Neural Network (CNN) to use as our model." + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [], + "source": [ + "class CIFARClassifier(torch.nn.Module):\n", + " def __init__(self):\n", + " super(CIFARClassifier, self).__init__()\n", + " \n", + " # Convolutional layers with batch normalization\n", + " self.conv1 = torch.nn.Conv2d(3, 32, 3, padding=1)\n", + " self.bn1 = torch.nn.BatchNorm2d(32)\n", + " self.conv2 = torch.nn.Conv2d(32, 64, 3, padding=1)\n", + " self.bn2 = torch.nn.BatchNorm2d(64)\n", + " self.conv3 = torch.nn.Conv2d(64, 128, 3, padding=1)\n", + " self.bn3 = torch.nn.BatchNorm2d(128)\n", + " self.conv4 = torch.nn.Conv2d(128, 256, 3, padding=1)\n", + " self.bn4 = torch.nn.BatchNorm2d(256)\n", + " \n", + " # Pooling layer\n", + " self.pool = torch.nn.MaxPool2d(2, 2)\n", + " \n", + " # Adaptive pooling layer to handle variable input sizes\n", + " self.adaptive_pool = torch.nn.AdaptiveAvgPool2d((2, 2))\n", + " \n", + " # Fully connected layers\n", + " self.fc1 = torch.nn.Linear(256 * 2 * 2, 512)\n", + " self.fc2 = torch.nn.Linear(512, 10)\n", + " \n", + " # Dropout layer\n", + " self.dropout = torch.nn.Dropout(0.5)\n", + " \n", + " def forward(self, x):\n", + " x = self.pool(torch.functional.F.relu(self.bn1(self.conv1(x))))\n", + " x = self.pool(torch.functional.F.relu(self.bn2(self.conv2(x))))\n", + " x = 
self.pool(torch.functional.F.relu(self.bn3(self.conv3(x))))\n", + " x = self.pool(torch.functional.F.relu(self.bn4(self.conv4(x))))\n", + " \n", + " # Apply adaptive pooling to ensure the output size is (2, 2)\n", + " x = self.adaptive_pool(x)\n", + " \n", + " # Flatten the image\n", + " x = x.view(-1, 256 * 2 * 2)\n", + " x = self.dropout(x)\n", + " x = torch.functional.F.relu(self.fc1(x))\n", + " x = self.dropout(x)\n", + " x = self.fc2(x)\n", + " return x" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 8). Define additional hyperparameters, create our classifier, loss function, and optimizer, and define class names." + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [], + "source": [ + "LEARNING_RATE = 0.0001 #0.001\n", + "MOMENTUM = 0.9\n", + "EPOCH_NUM = 19\n", + "WEIGHT_DECAY = 0.000125\n", + "\n", + "classifier = CIFARClassifier()\n", + "criterion = torch.nn.CrossEntropyLoss()\n", + "optimizer = torch.optim.SGD(\n", + " classifier.parameters(), \n", + " lr=LEARNING_RATE, \n", + " momentum=MOMENTUM,\n", + " weight_decay=WEIGHT_DECAY,\n", + ")\n", + "\n", + "classes = (\n", + " \"airplane\",\t\t\t\t\t\t\t\t\t\n", + " \"automobile\",\t\t\t\t\t\t\t\t\t\n", + " \"bird\",\t\t\t\t\t\t\t\t\n", + " \"cat\",\t\t\t\t\t\t\t\t\t\n", + " \"deer\",\t\t\t\t\t\t\t\t\t\n", + " \"dog\",\t\t\t\t\t\t\t\t\t\n", + " \"frog\",\t\t\t\t\t\t\t\t\t\t\n", + " \"horse\",\t\t\t\t\t\t\t\t\t\t\n", + " \"ship\",\t\t\t\t\t\t\t\t\t\n", + " \"truck\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### 9). Train the model by passing the data through our model and performing back prop to calculate gradients. \n", + "\n", + "Then, validate the trained model at each epoch." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "EPOCH 1\n", + "-----------\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Samples Processed: 50000, Loss: 0.04675844684243202\n", + "Accuracy: 44.52%\n", + "\n", + "EPOCH 2\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.04545032978057861\n", + "Accuracy: 57.24%\n", + "\n", + "EPOCH 3\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.04300606623291969\n", + "Accuracy: 65.3%\n", + "\n", + "EPOCH 4\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.0419011227786541\n", + "Accuracy: 69.52%\n", + "\n", + "EPOCH 5\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.039300207048654556\n", + "Accuracy: 72.0%\n", + "\n", + "EPOCH 6\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.03518728166818619\n", + "Accuracy: 73.37%\n", + "\n", + "EPOCH 7\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.03255374729633331\n", + "Accuracy: 74.45%\n", + "\n", + "EPOCH 8\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.03144019469618797\n", + "Accuracy: 75.51%\n", + "\n", + "EPOCH 9\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.025155983865261078\n", + "Accuracy: 75.28%\n", + "\n", + "EPOCH 10\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.020765958353877068\n", + "Accuracy: 74.65%\n", + "\n", + "EPOCH 11\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.019731147214770317\n", + "Accuracy: 74.52%\n", + "\n", + "EPOCH 12\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.017680812627077103\n", + "Accuracy: 76.17%\n", + "\n", + "EPOCH 13\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.015884069725871086\n", + "Accuracy: 76.02%\n", + "\n", + "EPOCH 14\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.0066224741749465466\n", + "Accuracy: 74.6%\n", + "\n", + "EPOCH 
15\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.009886597283184528\n", + "Accuracy: 74.41%\n", + "\n", + "EPOCH 16\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.011452939361333847\n", + "Accuracy: 75.75%\n", + "\n", + "EPOCH 17\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.0064519476145505905\n", + "Accuracy: 76.68%\n", + "\n", + "EPOCH 18\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.0025990409776568413\n", + "Accuracy: 76.2%\n", + "\n", + "EPOCH 19\n", + "-----------\n", + "Samples Processed: 50000, Loss: 0.0025861309841275215\n", + "Accuracy for class airplane: 84.9%\n", + "Accuracy for class automobile: 92.3%\n", + "Accuracy for class bird: 76.4%\n", + "Accuracy for class cat: 55.7%\n", + "Accuracy for class deer: 70.3%\n", + "Accuracy for class dog: 74.0%\n", + "Accuracy for class frog: 79.0%\n", + "Accuracy for class horse: 76.5%\n", + "Accuracy for class ship: 83.6%\n", + "Accuracy for class truck: 75.7%\n", + "Accuracy: 76.84%\n", + "\n", + "-----------\n", + "Finished Training\n" + ] + } + ], + "source": [ + "for epoch in range(EPOCH_NUM): # loop over the dataset multiple times\n", + "\n", + " print(f\"EPOCH {epoch + 1}\\n-----------\")\n", + "\n", + " loss = 0\n", + " i = 0\n", + " for i, (images, labels) in enumerate(training_loader):\n", + " \n", + " # zero the parameter gradients\n", + " optimizer.zero_grad()\n", + "\n", + " # forward + backward + optimize\n", + " outputs = classifier(images)\n", + " loss = criterion(outputs, labels)\n", + " loss.backward()\n", + " optimizer.step()\n", + "\n", + " # print statistics\n", + " loss += loss.item()\n", + "\n", + " print(f\"Samples Processed: {(i+1) * BATCH_SIZE}, Loss: {loss / 100}\")\n", + "\n", + " # Validation\n", + " classifier.eval()\n", + " predictions = {name: [0, 0] for name in classes}\n", + " with torch.no_grad():\n", + " correct = 0\n", + " total = 0\n", + " for images, labels in validation_loader:\n", + "\n", + " if len(labels) != 
BATCH_SIZE:\n", + " print(len(labels))\n", + " outputs = classifier(images)\n", + " _, predicted = torch.max(outputs.data, 1)\n", + "\n", + " correct += (predicted == labels).sum().item()\n", + " total += len(labels)\n", + "\n", + " if epoch == EPOCH_NUM - 1:\n", + " for label, prediction in zip(labels, predicted):\n", + " if label == prediction:\n", + " predictions[classes[label]][0] += 1\n", + " predictions[classes[label]][1] += 1\n", + "\n", + " if epoch == EPOCH_NUM - 1:\n", + " for name, data in predictions.items():\n", + " successes, guesses = data[0], data[1]\n", + " print(f\"Accuracy for class {name}: {100 * successes / guesses}%\")\n", + "\n", + " print(f\"Accuracy: {100 * correct / total}%\\n\")\n", + "\n", + "print(\"-----------\\nFinished Training\")\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### EXTRA). Visualize each class using our dataset and matplotlib." + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "cat ship airplane frog automobile truck dog horse deer bird " + ] + }, + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "import matplotlib.pyplot as plt\n", + "from torchvision.utils import make_grid\n", + "\n", + "# Same as test dataset without normalization\n", + "vis_dataset = CIFAR10Dataset(\n", + " cifar_ais_source=bucket,\n", + " prefix_map={\n", + " bucket: \"cifar-10-batches-py/test_batch\"\n", + " },\n", + " transform=transforms.Compose([\n", + " transforms.ToTensor(),\n", + " ])\n", + ")\n", + "\n", + "matched_images = {}\n", + "\n", + "for image, label in vis_dataset: \n", + " if label not in matched_images:\n", + " matched_images[label] = image\n", + " if len(matched_images.keys()) == len(classes):\n", + " break\n", + "\n", + "# Create a grid from the images and show them\n", + "img_grid = make_grid([image for image in matched_images.values()])\n", + "plt.imshow(img_grid.permute(1, 2, 0))\n", + "for label in matched_images.keys():\n", + " print(classes[label], end=\" \")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "dev", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.9" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/python/examples/aisio-pytorch/multishard_dataset_example.ipynb b/python/examples/aisio-pytorch/multishard_dataset_example.ipynb index eb59cbbb5ab..22075b7588d 100644 --- a/python/examples/aisio-pytorch/multishard_dataset_example.ipynb +++ b/python/examples/aisio-pytorch/multishard_dataset_example.ipynb @@ -152,7 +152,7 @@ "metadata": {}, "outputs": [], "source": [ - "dataset = AISMultiShardStream(data_sorces=[shard1, shard2])\n", + "dataset = AISMultiShardStream(data_sources=[shard1, shard2])\n", "\n", "for data in dataset:\n", " text_content, class_content = data\n", diff --git 
a/python/examples/aisio-pytorch/shard_reader_example.ipynb b/python/examples/aisio-pytorch/shard_reader_example.ipynb index 82f8882f037..a7bcdb3c834 100644 --- a/python/examples/aisio-pytorch/shard_reader_example.ipynb +++ b/python/examples/aisio-pytorch/shard_reader_example.ipynb @@ -52,7 +52,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### Populate the bucket with WebDataset formatted shards using the AIS CLI\"\"\"\n", + "### Populate the bucket with WebDataset formatted shards using the AIS CLI\n", "\n", "To only get one shard (recommended, each shard is quite big).\n", "```console\n",