From fd53d8537c58512b1b4deacd1207bbc3de18405c Mon Sep 17 00:00:00 2001 From: votti Date: Wed, 15 Feb 2023 12:12:19 +0100 Subject: [PATCH] Adds example for tuning a kfp v1 pipeline with Katib This example illustrates how a full kfp pipeline can be tuned using Katib. It is based on a metrics collector to collect kubeflow pipeline metrics (#2019). This is used as a Custom Collector. Addresses: #1914, #2019 --- examples/v1beta1/kubeflow-pipelines/README.md | 14 +- .../kubeflow-kfpv1-opt-mnist.ipynb | 1084 +++++++++++++++++ 2 files changed, 1095 insertions(+), 3 deletions(-) create mode 100644 examples/v1beta1/kubeflow-pipelines/kubeflow-kfpv1-opt-mnist.ipynb diff --git a/examples/v1beta1/kubeflow-pipelines/README.md b/examples/v1beta1/kubeflow-pipelines/README.md index df1e2bf0041..b6e53c21555 100644 --- a/examples/v1beta1/kubeflow-pipelines/README.md +++ b/examples/v1beta1/kubeflow-pipelines/README.md @@ -3,6 +3,10 @@ The following examples show how to use Katib with [Kubeflow Pipelines](https://github.com/kubeflow/pipelines). +Two different aspects are illustrated here: +A) How to orchestrate Katib experiments from Kubeflow pipelines using the Katib Kubeflow Component (Example 1 & 2) +B) How to use Katib to tune parameters of Kubeflow pipelines + You can find the Katib Component source code for the Kubeflow Pipelines [here](https://github.com/kubeflow/pipelines/tree/master/components/kubeflow/katib-launcher). @@ -13,6 +17,8 @@ You have to install the following Python SDK to run these examples: - [`kfp`](https://pypi.org/project/kfp/) >= 1.8.12 - [`kubeflow-katib`](https://pypi.org/project/kubeflow-katib/) >= 0.13.0 +In order to run parameter tuning over Kubeflow pipelines, additionally Katib needs to be setup to run with Argo workflow tasks. The setup is described within the example notebook (3). + ## Multi-User Pipelines Setup The Notebooks examples run Pipelines in multi-user mode and your Kubeflow Notebook @@ -25,10 +31,12 @@ to give an access Kubeflow Notebook to run Kubeflow Pipelines. The following Pipelines are deployed from Kubeflow Notebook: -- [Kubeflow E2E MNIST](kubeflow-e2e-mnist.ipynb) +1) [Kubeflow E2E MNIST](kubeflow-e2e-mnist.ipynb) + +2) [Katib Experiment with Early Stopping](early-stopping.ipynb) -- [Katib Experiment with Early Stopping](early-stopping.ipynb) +3) [Tune parameters of a `MNIST` kubeflow pipeline with Katib](pipeline-parameters.ipynb) -The following Pipelines have to be compiled and uploaded to the Kubeflow Pipelines UI: +The following Pipelines have to be compiled and uploaded to the Kubeflow Pipelines UI for examples 1 & 2: - [MPIJob Horovod](mpi-job-horovod.py) diff --git a/examples/v1beta1/kubeflow-pipelines/kubeflow-kfpv1-opt-mnist.ipynb b/examples/v1beta1/kubeflow-pipelines/kubeflow-kfpv1-opt-mnist.ipynb new file mode 100644 index 00000000000..cc16c6d528a --- /dev/null +++ b/examples/v1beta1/kubeflow-pipelines/kubeflow-kfpv1-opt-mnist.ipynb @@ -0,0 +1,1084 @@ +{ + "cells": [ + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Katib parameter tuning over Kubeflow Pipelines (V1)\n", + "\n", + "This example shows how parameter tunning can be done over a multistep Kubeflow pipeline.\n", + "\n", + "The pipeline consists of 4 steps:\n", + "- Download of the training images and labels from the original MNIST publication\n", + "- Prepartion of the training dataset\n", + "- Image pre-processing\n", + "- Model fitting\n", + "\n", + "The pipeline has the model has model fitting parameters as well as image pre-processing parameters exposed as a pipeline parameter for tuning. Katib will be used to explore the question if image preprocessing using a simple histogram normalization might improve a neural network training on MNIST.\n", + "\n", + "## Requirements\n", + "\n", + "This requires a Kubeflow installation with Katib and Pipelines.\n", + "\n", + "Additionally the Katib-Argo integration needs to be setup:\n", + "\n", + "If you are running on a full Kubeflow installation *do not reinstall or update Argo* as this will likely break your installation.\n", + "\n", + "Just run the following commands:\n", + "\n", + "Enable side-car injection:\n", + "\n", + "`kubectl patch namespace argo -p '{\"metadata\":{\"labels\":{\"katib.kubeflow.org/metrics-collector-injection\":\"enabled\"}}}'`\n", + "\n", + "\n", + "Verify that the emissary executor is active (should be default in newer Kubeflow installations):\n", + "\n", + "` kubectl get ConfigMap -n argo workflow-controller-configmap -o yaml | grep containerRuntimeExecutor`\n", + "\n", + "Patch the Katib controller:\n", + "\n", + "`kubectl patch ClusterRole katib-controller -n kubeflow --type=json \\\n", + " -p='[{\"op\": \"add\", \"path\": \"/rules/-\", \"value\": {\"apiGroups\":[\"argoproj.io\"],\"resources\":[\"workflows\"],\"verbs\":[\"get\", \"list\", \"watch\", \"create\", \"delete\"]}}]'\n", + "`\n", + "\n", + "`kubectl patch Deployment katib-controller -n kubeflow --type=json \\\n", + " -p='[{\"op\": \"add\", \"path\": \"/spec/template/spec/containers/0/args/-\", \"value\": \"--trial-resources=Workflow.v1alpha1.argoproj.io\"}]'`\n", + "\n", + "For more details and how to set this up on a partial Kubeflow installation follow:\n", + "https://github.com/kubeflow/katib/tree/master/examples/v1beta1/argo/README.mdd\n", + "If you are running on a full Kubeflow installation *DO NOT INSTALL ARGO* as this will likely break your installation.\n", + "\n", + "Just run the following commands:\n", + "\n", + "Enable side-car injection:\n", + "\n", + "`kubectl patch namespace argo -p '{\"metadata\":{\"labels\":{\"katib.kubeflow.org/metrics-collector-injection\":\"enabled\"}}}'`\n", + "\n", + "\n", + "Verify that the emissary executor is active (should be default in newer Kubeflow installations):\n", + "\n", + "` kubectl get ConfigMap -n argo workflow-controller-configmap -o yaml | grep containerRuntimeExecutor`\n", + "\n", + "Patch the Katib controller:\n", + "\n", + "`kubectl patch ClusterRole katib-controller -n kubeflow --type=json \\\n", + " -p='[{\"op\": \"add\", \"path\": \"/rules/-\", \"value\": {\"apiGroups\":[\"argoproj.io\"],\"resources\":[\"workflows\"],\"verbs\":[\"get\", \"list\", \"watch\", \"create\", \"delete\"]}}]'\n", + "`\n", + "\n", + "`kubectl patch Deployment katib-controller -n kubeflow --type=json \\\n", + " -p='[{\"op\": \"add\", \"path\": \"/spec/template/spec/containers/0/args/-\", \"value\": \"--trial-resources=Workflow.v1alpha1.argoproj.io\"}]'`\n", + "\n", + "For more details and how to set this up on a partial Kubeflow installation follow:\n", + "https://github.com/kubeflow/katib/tree/master/examples/v1beta1/argo/README.md\n", + "\n" + ] + }, + { + "attachments": { + "image.png": { + "image/png": "" + } + }, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Building the base Kubeflow pipeline\n", + "\n", + "The next steps will build up the following Kubeflow pipeline:\n", + "\n", + "![image.png](attachment:image.png)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Set default variables\n", + "\n", + "The following default variables should be changed when running the notebook" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Namespace to run the workloads under\n", + "USER_NAMESPACE = \"vito-zanotelli\"\n", + "# Pipeline service account\n", + "# On a Kubeflow instance on GCP this should be 'default-editor'\n", + "KFP_SERVICE_ACCOUNT = \"default-editor\"\n", + "\n", + "\n", + "# Consmetic variables\n", + "# Pipeline run variables\n", + "KFP_EXPERIMENT = \"katib-kfp-example\"\n", + "KFP_RUN = \"mnist-pipeline-v1\"\n", + "\n", + "# Katib run variables\n", + "KATIB_EXPERIMENT = \"katib-kfp-example-v1\"" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Install and load required python packages" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Install required packages (Kubeflow Pipelines and Katib SDK).\n", + "!pip install kfp==1.8.12\n", + "!pip install kubeflow-katib==0.13.0" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import kfp\n", + "import kfp.components as components\n", + "import kfp.dsl as dsl\n", + "from kfp.components import InputPath, OutputPath, create_component_from_func" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Initialize the Kubeflow pipeline client\n", + "\n", + "Documentation how this is done in various environments: https://www.kubeflow.org/docs/components/pipelines/v1/sdk/connect-api/" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "kpf_client = kfp.Client()" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Get the downloader component\n", + "\n", + "This is a publicly available, generic downloader we use to download the raw MNIST data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "download_data_op = components.load_component_from_url(\n", + " \"https://raw.githubusercontent.com/kubeflow/pipelines/master/components/contrib/web/Download/component.yaml\"\n", + ")" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Parse the MNIST raw data format\n", + "\n", + "This is a component from text that converts the raw MNIST data format into a tensorflow compatible format." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "parse_mnist_op = components.load_component_from_text(\n", + " \"\"\"\n", + "name: Parse MNIST\n", + "inputs:\n", + "- {name: Images, description: gziped images in the idx format}\n", + "- {name: Labels, description: gziped labels in the idx format}\n", + "outputs:\n", + "- {name: Dataset}\n", + "metadata:\n", + " annotations:\n", + " author: Vito Zanotelli, D-ONE.ai\n", + " description: Based on https://github.com/kubeflow/pipelines/blob/master/components/contrib/sample/Python_script/component.yaml\n", + "implementation:\n", + " container:\n", + " image: tensorflow/tensorflow:2.7.1\n", + " command:\n", + " - sh\n", + " - -ec\n", + " - |\n", + " # This is how additional packages can be installed dynamically\n", + " python3 -m pip install pip idx2numpy\n", + " # Run the rest of the command after installing the packages.\n", + " \"$0\" \"$@\"\n", + " - python3\n", + " - -u # Auto-flush. We want the logs to appear in the console immediately.\n", + " - -c # Inline scripts are easy, but have size limitaions and the error traces do not show source lines.\n", + " - |\n", + " import gzip\n", + " import idx2numpy\n", + " import sys\n", + " from pathlib import Path\n", + " import pickle\n", + " import tensorflow as tf\n", + " img_path = sys.argv[1]\n", + " label_path = sys.argv[2]\n", + " output_path = sys.argv[3]\n", + " with gzip.open(img_path, 'rb') as f:\n", + " x = idx2numpy.convert_from_string(f.read())\n", + " with gzip.open(label_path, 'rb') as f:\n", + " y = idx2numpy.convert_from_string(f.read())\n", + " #one-hot encode the categories\n", + " x_out = tf.convert_to_tensor(x)\n", + " y_out = tf.keras.utils.to_categorical(y)\n", + " Path(output_path).parent.mkdir(parents=True, exist_ok=True)\n", + " with open(output_path, 'wb') as output_file:\n", + " pickle.dump((x_out, y_out), output_file)\n", + " - {inputPath: Images}\n", + " - {inputPath: Labels}\n", + " - {outputPath: Dataset}\n", + "\"\"\"\n", + ")" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Process the images\n", + "\n", + "This does the pre-processing of the images, including a training-validation split.\n", + "\n", + "Here also an optional `histogram_norm` image normalization step can be activated" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def process(\n", + " data_raw_path: InputPath(str), # type: ignore\n", + " data_processed_path: OutputPath(str), # type: ignore\n", + " val_pct: float = 0.2,\n", + " trainset_flag: bool = True,\n", + " histogram_norm: bool = False,\n", + "):\n", + " \"\"\"\n", + " Here we do all the preprocessing\n", + " if the data path is for training data we:\n", + " (1) Normalize the data\n", + " (2) split the train and val data\n", + " If it is for unseen test data, we:\n", + " (1) Normalize the data\n", + " This function returns in any case the processed data path\n", + " \"\"\"\n", + " # sklearn\n", + " import pickle\n", + " from sklearn.model_selection import train_test_split\n", + " import tensorflow as tf\n", + " import tensorflow_addons as tfa\n", + "\n", + " def img_norm(x):\n", + " x_ = tf.reshape(x / 255, list(x.shape) + [1])\n", + "\n", + " if histogram_norm:\n", + " x_ = tfa.image.equalize(x_)\n", + " return x_\n", + "\n", + " with open(data_raw_path, \"rb\") as f:\n", + " x, y = pickle.load(f)\n", + " if trainset_flag:\n", + "\n", + " x_ = img_norm(x)\n", + " x_train, x_val, y_train, y_val = train_test_split(\n", + " x_.numpy(), y, test_size=val_pct, stratify=y, random_state=42\n", + " )\n", + "\n", + " with open(data_processed_path, \"wb\") as output_file:\n", + " pickle.dump((x_train, y_train, x_val, y_val), output_file)\n", + "\n", + " else:\n", + " x_ = img_norm(x)\n", + " with open(data_processed_path, \"wb\") as output_file:\n", + " pickle.dump((x_, y), output_file)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "process_op = create_component_from_func(\n", + " func=process,\n", + " base_image=\"tensorflow/tensorflow:2.7.1\", # Optional\n", + " packages_to_install=[\"scikit-learn\", \"tensorflow-addons[tensorflow]\"], # Optional\n", + ")" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Training component\n", + "\n", + "Component with ML hyperparameters as parameters.\n", + "Note that the `metrics` that should be tracked by Katib need to be\n", + "saved as ML metrics output artifacts.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def train(\n", + " data_train_path: InputPath(str), # type: ignore\n", + " model_out_path: OutputPath(str), # type: ignore\n", + " mlpipeline_metrics_path: OutputPath(\"Metrics\"), # type: ignore # noqa: F821\n", + " lr: float = 1e-4,\n", + " optimizer: str = \"Adam\",\n", + " loss: str = \"categorical_crossentropy\",\n", + " epochs: int = 1,\n", + " batch_size: int = 32,\n", + "):\n", + " \"\"\"\n", + " This is the simulated train part of our ML pipeline where training is performed\n", + " \"\"\"\n", + "\n", + " import tensorflow as tf\n", + " import pickle\n", + " from tensorflow.keras.preprocessing.image import ImageDataGenerator\n", + " import json\n", + "\n", + " with open(data_train_path, \"rb\") as f:\n", + " x_train, y_train, x_val, y_val = pickle.load(f)\n", + "\n", + " model = tf.keras.Sequential(\n", + " [\n", + " tf.keras.layers.Conv2D(\n", + " 64, (3, 3), activation=\"relu\", input_shape=(28, 28, 1)\n", + " ),\n", + " tf.keras.layers.MaxPooling2D(2, 2),\n", + " tf.keras.layers.Conv2D(64, (3, 3), activation=\"relu\"),\n", + " tf.keras.layers.MaxPooling2D(2, 2),\n", + " tf.keras.layers.Flatten(),\n", + " tf.keras.layers.Dense(128, activation=\"relu\"),\n", + " tf.keras.layers.Dense(10, activation=\"softmax\"),\n", + " ]\n", + " )\n", + "\n", + " if optimizer.lower() == \"sgd\":\n", + " optimizer = tf.keras.optimizers.SGD(lr)\n", + " else:\n", + " optimizer = tf.keras.optimizers.Adam(lr)\n", + "\n", + " model.compile(loss=loss, optimizer=optimizer, metrics=[\"accuracy\"])\n", + "\n", + " # fit the model\n", + " model_early_stopping_callback = tf.keras.callbacks.EarlyStopping(\n", + " monitor=\"val_accuracy\", patience=10, verbose=1, restore_best_weights=True\n", + " )\n", + "\n", + " train_datagen = ImageDataGenerator()\n", + "\n", + " validation_datagen = ImageDataGenerator()\n", + " history = model.fit(\n", + " train_datagen.flow(x_train, y_train, batch_size=batch_size),\n", + " epochs=epochs,\n", + " validation_data=validation_datagen.flow(x_val, y_val, batch_size=batch_size),\n", + " shuffle=False,\n", + " callbacks=[model_early_stopping_callback],\n", + " )\n", + "\n", + " model.save(model_out_path, save_format=\"tf\")\n", + "\n", + " metrics = {\n", + " \"metrics\": [\n", + " {\n", + " \"name\": \"accuracy\", # The name of the metric. Visualized as the column name in the runs table.\n", + " \"numberValue\": history.history[\"accuracy\"][\n", + " -1\n", + " ], # The value of the metric. Must be a numeric value.\n", + " \"format\": \"PERCENTAGE\", # The optional format of the metric. Supported values are \"RAW\" (displayed in raw format) and \"PERCENTAGE\" (displayed in percentage format).\n", + " },\n", + " {\n", + " \"name\": \"val-accuracy\", # The name of the metric. Visualized as the column name in the runs table.\n", + " \"numberValue\": history.history[\"val_accuracy\"][\n", + " -1\n", + " ], # The value of the metric. Must be a numeric value.\n", + " \"format\": \"PERCENTAGE\", # The optional format of the metric. Supported values are \"RAW\" (displayed in raw format) and \"PERCENTAGE\" (displayed in percentage format).\n", + " },\n", + " ]\n", + " }\n", + " with open(mlpipeline_metrics_path, \"w\") as f:\n", + " json.dump(metrics, f)\n", + "\n", + "\n", + "train_op = create_component_from_func(\n", + " func=train, base_image=\"tensorflow/tensorflow:2.7.1\", packages_to_install=[\"scipy\"]\n", + ")" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Build the full pipeline\n", + "\n", + "These wires the components to a full pipeline.\n", + "\n", + "The only thing required to make the pipeline Katib compatible is:\n", + "\n", + "1) A pod label to mark the pod from which the metrics tracked by Katib should be collected from: \"katib.kubeflow.org/model-training\", \"true\"\n", + "2) A mark to prevent caching on this pod: `execution_options.caching_strategy.max_cache_staleness = \"P0D\"`\n", + "\n", + "In addition, currently the pod label for caching seems not be added by default and thus the cache is not used. To enable cache usage, the cache label is added to all the steps.\n", + "\n", + "Apart from these two requirements, there is no restriction on how the pipeline is build. The pipeline remains a normal Kubeflow pipeline." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def _label_cache(step):\n", + " \"\"\"Helper to add pod cache label\n", + "\n", + " Currently there seems to be an issue with pod labeling.\n", + " \"\"\"\n", + " step.add_pod_label(\"pipelines.kubeflow.org/cache_enabled\", \"true\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "@dsl.pipeline(\n", + " name=\"Download MNIST dataset\",\n", + " description=\"A pipeline to download the MNIST dataset files\",\n", + ")\n", + "def mnist_training_pipeline(\n", + " lr: float = 1e-4,\n", + " optimizer: str = \"Adam\",\n", + " loss: str = \"categorical_crossentropy\",\n", + " epochs: int = 3,\n", + " batch_size: int = 5,\n", + " histogram_norm: bool = False,\n", + "):\n", + " TRAIN_IMG_URL = \"http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz\"\n", + " TRAIN_LAB_URL = \"http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz\"\n", + "\n", + " train_imgs = download_data_op(TRAIN_IMG_URL)\n", + " train_imgs.set_display_name(\"Download training images\")\n", + " _label_cache(train_imgs)\n", + "\n", + " train_y = download_data_op(TRAIN_LAB_URL)\n", + " train_y.set_display_name(\"Download training labels\")\n", + " _label_cache(train_y)\n", + "\n", + " mnist_train = parse_mnist_op(train_imgs.output, train_y.output)\n", + " mnist_train.set_display_name(\"Prepare train dataset\")\n", + " _label_cache(mnist_train)\n", + "\n", + " processed_train = (\n", + " process_op(\n", + " mnist_train.output,\n", + " val_pct=0.2,\n", + " trainset_flag=True,\n", + " histogram_norm=histogram_norm,\n", + " )\n", + " .set_cpu_limit(\"1\")\n", + " .set_memory_limit(\"2Gi\")\n", + " .set_display_name(\"Preprocess images\")\n", + " )\n", + " _label_cache(processed_train)\n", + "\n", + " training_output = (\n", + " train_op(\n", + " processed_train.outputs[\"data_processed\"],\n", + " lr=lr,\n", + " optimizer=optimizer,\n", + " epochs=epochs,\n", + " batch_size=batch_size,\n", + " loss=loss,\n", + " )\n", + " .set_cpu_limit(\"1\")\n", + " .set_memory_limit(\"2Gi\")\n", + " )\n", + " training_output.set_display_name(\"Fit the model\")\n", + " # This pod label indicates which pod Katib should collect the metric from.\n", + " # A metrics collecting sidecar container will be added\n", + " training_output.add_pod_label(\"katib.kubeflow.org/model-training\", \"true\")\n", + " # This step needs to run always, as otherwise the metrics for Katib could not\n", + " # be collected.\n", + " training_output.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n", + "\n", + " return mnist_train.output" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "run = kfp_client.create_run_from_pipeline_func(\n", + " mnist_training_pipeline,\n", + " mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY,\n", + " # You can optionally override your pipeline_root when submitting the run too:\n", + " # pipeline_root='gs://my-pipeline-root/example-pipeline',\n", + " arguments={\"histogram_norm\": \"0\"},\n", + " experiment_name=KFP_EXPERIMENT,\n", + " run_name=KFP_RUN,\n", + " namespace=USER_NAMESPACE,\n", + ")" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Parameter tuning with Katib\n", + "\n", + "We now want to do parameter tuning over the whole pipeline with Katib.\n", + "\n", + "This requires us to build up a specificaiton for the Katib experiment" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "First import the Katib python components:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import yaml\n", + "from typing import List\n", + "\n", + "from kubernetes.client.models import V1ObjectMeta\n", + "from kubeflow.katib import ApiClient\n", + "from kubeflow.katib import KatibClient\n", + "from kubeflow.katib import V1beta1Experiment\n", + "from kubeflow.katib import V1beta1ExperimentSpec\n", + "from kubeflow.katib import V1beta1AlgorithmSpec\n", + "from kubeflow.katib import V1beta1ObjectiveSpec\n", + "from kubeflow.katib import V1beta1ParameterSpec\n", + "from kubeflow.katib import V1beta1FeasibleSpace\n", + "from kubeflow.katib import V1beta1TrialTemplate\n", + "from kubeflow.katib import V1beta1TrialParameterSpec\n", + "from kubeflow.katib import V1beta1MetricsCollectorSpec\n", + "from kubeflow.katib import V1beta1CollectorSpec" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In order to build a katib experiment, we require a trial spec.\n", + "\n", + "In this case the trial spec is an Argo workflow produced form the Kubeflow pipeline.\n", + "\n", + "This workflow can be run thanks to the Katib-Argo integration that was setup in the requirements section.\n", + "\n", + "\n", + "The Katib Experiment consists of many components, that we next will setup using custom built helper functions:" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Helper functions to build the individual Katib Experiment Components\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def create_trial_spec(\n", + " pipeline, params_list: List[dsl.PipelineParam], service_account: str | None = None\n", + "):\n", + " \"\"\"\n", + " Create an Argo workflow specification from a KFP pipeline function\n", + "\n", + " The Argo worklow CRD will be the basis for the trial_template used\n", + " by Katib.\n", + "\n", + " Args:\n", + " pipeline: a kubeflow pipeline function\n", + " params_list (List[dsl.PipelineParam]): a list of mappings of Kubeflow pipeline parameters\n", + " to Katib trialParameters.\n", + " These need to map the pipeline parameter to the Katib parameter.\n", + " Eg: [dsl.PipelineParam(name='lr', value='${trialParameters.learningRate}')]\n", + " here `lr` is the PipelineParam and `trialParameters.learningRate` the Katib trialParameter.\n", + "\n", + " \"\"\"\n", + " compiler = kfp.compiler.Compiler(\n", + " mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY,\n", + " )\n", + " # Here the pipeline parameters are passed.\n", + " # These will be generated in the Katib trials\n", + " trial_spec = compiler._create_workflow(pipeline, params_list=params_list)\n", + " # Somehow the pipeline is configured with the wrong serviceAccountName by default\n", + " if service_account is not None:\n", + " trial_spec[\"spec\"][\"serviceAccountName\"] = service_account\n", + "\n", + " return trial_spec" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def create_trial_template(\n", + " trial_spec,\n", + " trial_param_specs: List[V1beta1TrialParameterSpec],\n", + " retain_pods: bool = False,\n", + ") -> V1beta1TrialTemplate:\n", + " \"\"\"Generate a trial template from the spec\n", + "\n", + " This takes the Argo workflow CRD and wrapps it as a\n", + " Katib trial template.\n", + " Here the Katib trial parameters are defined.\n", + "\n", + " Args:\n", + " trial_spec (Argo workflow spec): The workflow/pipeline to tune\n", + " trial_params_spec (List[V1beta1TrialParameterSpec]): The trial parameter specifications\n", + " Note that the `name` of the parameters needs to match the names refered to by the\n", + " create_trial_spec `params_list` arguments.\n", + " The `ref` needs to match the names used in the parameter space defined in `V1beta1ParameterSpec`.\n", + "\n", + " Returns:\n", + " V1beta1TrialTemplate: the trial template\n", + " \"\"\"\n", + "\n", + " trial_template = V1beta1TrialTemplate(\n", + " primary_container_name=\"main\", # Name of the primary container returning the metrics in the workflow\n", + " # The label used for the pipeline component returning the pipeline specs\n", + " primary_pod_labels={\"katib.kubeflow.org/model-training\": \"true\"},\n", + " trial_parameters=trial_param_specs,\n", + " trial_spec=trial_spec,\n", + " success_condition='status.[@this].#(phase==\"Succeeded\")#',\n", + " failure_condition='status.[@this].#(phase==\"Failed\")#',\n", + " retain=retain_pods, # Retain completed pods - left hear for easier debugging\n", + " )\n", + " return trial_template" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def create_metrics_collector_spec(objective: V1beta1ObjectiveSpec):\n", + " \"\"\"This defines the custom metrics collector\n", + "\n", + " This custom metrics connector was built to collect\n", + " Kubeflow pipeline MLmetrics from a step.\n", + "\n", + " Args:\n", + " objective (V1beta1ObjectiveSpec): the objective spec used to get the metrics names\n", + "\n", + " \"\"\"\n", + "\n", + " metric_names = [objective.objective_metric_name] + list(\n", + " objective.additional_metric_names\n", + " )\n", + " collector = V1beta1MetricsCollectorSpec(\n", + " source={\n", + " \"fileSystemPath\": {\n", + " # In KFP v1 this seems to be the hardcoded location\n", + " # for this output file..\n", + " \"path\": \"/tmp/outputs/mlpipeline_metrics/data\",\n", + " \"kind\": \"File\",\n", + " }\n", + " },\n", + " collector=V1beta1CollectorSpec(\n", + " kind=\"Custom\",\n", + " custom_collector={\n", + " \"args\": [\n", + " \"-m\",\n", + " f\"{';'.join(metric_names)}\",\n", + " \"-s\",\n", + " \"katib-db-manager.kubeflow:6789\",\n", + " \"-t\",\n", + " \"$(PodName)\",\n", + " \"-path\",\n", + " \"/tmp/outputs/mlpipeline_metrics\",\n", + " ],\n", + " \"image\": \"votti/kfpv1-metricscollector:v0.0.10\",\n", + " \"imagePullPolicy\": \"Always\",\n", + " \"name\": \"custom-metrics-logger-and-collector\",\n", + " \"env\": [\n", + " {\n", + " # In this setup the PodName can be used to\n", + " # infer the `trial name` required to report back\n", + " # the metrics.\n", + " \"name\": \"PodName\",\n", + " \"valueFrom\": {\"fieldRef\": {\"fieldPath\": \"metadata.name\"}},\n", + " }\n", + " ],\n", + " },\n", + " ),\n", + " )\n", + " return collector" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Final helper function to create experiments from pipelines\n", + "\n", + "\n", + "This helper function is the main entry point to train pipelines." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def create_katib_experiment_spec(\n", + " pipeline: dsl.Pipeline,\n", + " pipeline_params: List[dsl.PipelineParam],\n", + " trial_params: List[V1beta1TrialParameterSpec],\n", + " trial_params_space: List[V1beta1ParameterSpec],\n", + " objective: V1beta1ObjectiveSpec,\n", + " algorithm: V1beta1AlgorithmSpec,\n", + " max_trial_count: int = 2,\n", + " max_failed_trial_count: int = 2,\n", + " parallel_trial_count: int = 2,\n", + " pipeline_service_account: str | None = None,\n", + " retain_pods: bool = False,\n", + ") -> V1beta1ExperimentSpec:\n", + " \"\"\"Construct a Katib experiment over a KFP pipeline\n", + "\n", + " Args:\n", + " pipeline (dsl.Pipeline): The Kubeflow Pipeline\n", + " pipeline_params (List[dsl.PipelineParam]): A mapping of trial-parameters to pipeline parameters.\n", + " Example: [\n", + " dsl.PipelineParam(name=\"lr\", value=\"${trialParameters.learningRate}\"),\n", + " ...\n", + " ]\n", + " trial_params (List[V1beta1TrialParameterSpec]): Spec for Trial parameters. Note that name\n", + " and refs need to match the ones used in `pipeline_params` and `trial_params_space`\n", + " Example: [\n", + " V1beta1TrialParameterSpec(\n", + " name=\"learningRate\",\n", + " description=\"Learning rate for the training model\",\n", + " reference=\"learning_rate\",\n", + " ), ...]\n", + " trial_params_space (List[V1beta1ParameterSpec]): The spec for the parameter space explored in the\n", + " Trials\n", + " Example: [\n", + " V1beta1ParameterSpec(\n", + " name=\"learning_rate\",\n", + " parameter_type=\"double\",\n", + " feasible_space=V1beta1FeasibleSpace(min=\"0.00001\", max=\"0.001\"),\n", + " ), ...]\n", + " objective (V1beta1ObjectiveSpec): objective spec. The names used here\n", + " need to match the metrics reported by the pipeline.\n", + " Example: V1beta1ObjectiveSpec(\n", + " type=\"maximize\",\n", + " goal=0.9,\n", + " objective_metric_name=\"val-accuracy\",\n", + " additional_metric_names=[\"accuracy\"],\n", + " )\n", + " algorithm (V1beta1AlgorithmSpec): algorithm spec\n", + " Example: V1beta1AlgorithmSpec(\n", + " algorithm_name=\"random\",\n", + " )\n", + " max_trial_count (int, optional): Max total number of trials. Defaults to 2.\n", + " max_failed_trial_count (int, optional): Number of failed trials tolerated. Defaults to 2.\n", + " parallel_trial_count (int, optional): Number of trials run in parallel. Defaults to 2.\n", + " pipeline_service_account (str | None, optional): Name of the service account to run\n", + " pipelines with. Defaults to None (uses pre-configured default).\n", + " On a Kubeflow GCP deployment this should be set to `default-editor`\n", + " retain_pods (bool): retain pods (good for debugging). Default: false\n", + "\n", + " Returns:\n", + " V1beta1ExperimentSpec: Katib experiment spec\n", + " \"\"\"\n", + "\n", + " trial_spec = create_trial_spec(\n", + " pipeline, pipeline_params, service_account=pipeline_service_account\n", + " )\n", + "\n", + " # Configure parameters for the Trial template.\n", + " trial_template = create_trial_template(\n", + " trial_spec, trial_params, retain_pods=retain_pods\n", + " )\n", + "\n", + " # Metrics collector spec\n", + " metrics_collector = create_metrics_collector_spec(objective=objective)\n", + "\n", + " # Create an Experiment from the above parameters.\n", + " experiment_spec = V1beta1ExperimentSpec(\n", + " # Experimental Budget\n", + " max_trial_count=max_trial_count,\n", + " max_failed_trial_count=max_failed_trial_count,\n", + " parallel_trial_count=parallel_trial_count,\n", + " # Optimization Objective\n", + " objective=objective,\n", + " # Optimization Algorithm\n", + " algorithm=algorithm,\n", + " # Optimization Parameters\n", + " parameters=trial_params_space,\n", + " # Trial Template\n", + " trial_template=trial_template,\n", + " # Metrics collector\n", + " metrics_collector_spec=metrics_collector,\n", + " )\n", + "\n", + " return experiment_spec" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Tune the MNIST pipeline using Katib\n", + "\n", + "First prepare all required input" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_params = [\n", + " dsl.PipelineParam(name=\"lr\", value=\"${trialParameters.learningRate}\"),\n", + " dsl.PipelineParam(name=\"batch_size\", value=\"${trialParameters.batchSize}\"),\n", + " dsl.PipelineParam(name=\"histogram_norm\", value=\"${trialParameters.histogramNorm}\"),\n", + "]\n", + "trial_params_specs = [\n", + " V1beta1TrialParameterSpec(\n", + " name=\"learningRate\", # the parameter name that is replaced in your template (see Trial Specification).\n", + " description=\"Learning rate for the training model\",\n", + " reference=\"learning_rate\", # the parameter name that experiment’s suggestion returns (parameter name in the Parameters Specification).\n", + " ),\n", + " V1beta1TrialParameterSpec(\n", + " name=\"batchSize\",\n", + " description=\"Batch size for NN training\",\n", + " reference=\"batch_size\",\n", + " ),\n", + " V1beta1TrialParameterSpec(\n", + " name=\"histogramNorm\",\n", + " description=\"Histogram normalization of image on?\",\n", + " reference=\"histogram_norm\",\n", + " ),\n", + "]\n", + "parameter_space = [\n", + " V1beta1ParameterSpec(\n", + " name=\"learning_rate\",\n", + " parameter_type=\"double\",\n", + " feasible_space=V1beta1FeasibleSpace(min=\"0.00001\", max=\"0.001\"),\n", + " ),\n", + " V1beta1ParameterSpec(\n", + " name=\"batch_size\",\n", + " parameter_type=\"int\",\n", + " feasible_space=V1beta1FeasibleSpace(min=\"16\", max=\"64\"),\n", + " ),\n", + " V1beta1ParameterSpec(\n", + " name=\"histogram_norm\",\n", + " parameter_type=\"discrete\",\n", + " feasible_space=V1beta1FeasibleSpace(list=[\"0\", \"1\"]),\n", + " ),\n", + "]\n", + "objective = V1beta1ObjectiveSpec(\n", + " type=\"maximize\",\n", + " goal=0.9,\n", + " objective_metric_name=\"val-accuracy\",\n", + " additional_metric_names=[\"accuracy\"],\n", + ")\n", + "\n", + "algorithm = V1beta1AlgorithmSpec(\n", + " algorithm_name=\"random\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Prepare the full spec\n", + "\n", + "katib_spec = create_katib_experiment_spec(\n", + " pipeline=mnist_training_pipeline,\n", + " pipeline_params=pipeline_params,\n", + " trial_params=trial_params_specs,\n", + " trial_params_space=parameter_space,\n", + " objective=objective,\n", + " algorithm=algorithm,\n", + " pipeline_service_account=KFP_SERVICE_ACCOUNT,\n", + " max_trial_count=5,\n", + " parallel_trial_count=5,\n", + " retain_pods=False,\n", + ")" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In order to generate a full experiment the api_version, kind and namespace need to be defined:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "katib_experiment = V1beta1Experiment(\n", + " api_version=\"kubeflow.org/v1beta1\",\n", + " kind=\"Experiment\",\n", + " metadata=V1ObjectMeta(\n", + " name=KATIB_EXPERIMENT,\n", + " namespace=USER_NAMESPACE,\n", + " ),\n", + " spec=katib_spec,\n", + ")" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The generated yaml can written out to submit via the web ui:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "with open(\"experiment_template_kfp_mnist_v1.yaml\", \"w\") as f:\n", + " yaml.dump(ApiClient().sanitize_for_serialization(katib_experiment), f)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Or sumitted via the KatibClient:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "katib_client = KatibClient()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "katib_client.create_experiment(katib_experiment)" + ] + }, + { + "attachments": {}, + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You should now be able to observe in the Web UI how the Katib\n", + "Experiment is running.\n", + "\n", + "To see how the `Argo Workflows` are started, you can also check the Kubernetes cluster:\n", + "\n", + "`kubectl get Workflow -n `" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "katib-exp", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.0" + }, + "vscode": { + "interpreter": { + "hash": "346a4e9d8b8e6802b68a0916b92683cfb1882082eeafaaae0a3525ab995e1047" + } + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}