Use new custom metrics collector
This uses the new custom KFP V1 metrics collector that can extract metrics
directly from the Kubeflow Pipelines metrics output.

With this collector, measuring metrics of a Kubeflow pipeline only requires:
a) adding a label marking which step is the `model-training` step,
b) disabling caching for this step, and
c) configuring the Katib metrics collector.

Additionally, all the information is now included such that the Katib
experiment can be run via the KatibClient.

Addresses:
- kubeflow/katib#1914
- kubeflow/katib#2019
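The collector configuration from step c) can be sketched as a plain dict; this mirrors the `create_metrics_collector_spec` helper added in this commit. The collector image, metric names, and katib-db-manager address are taken from this diff and are specific to the cluster used here:

```python
def create_metrics_collector_spec():
    """Build the Katib metricsCollectorSpec for the custom KFP v1 collector.

    The collector image and the katib-db-manager address below are specific
    to the cluster used in this commit and may need to be adapted.
    """
    return {
        # Where the KFP v1 step writes its metrics artifact inside the pod.
        "source": {
            "fileSystemPath": {
                "path": "/tmp/outputs/mlpipeline_metrics/data",
                "kind": "File",
            }
        },
        "collector": {
            "kind": "Custom",
            "customCollector": {
                "name": "custom-metrics-logger-and-collector",
                "image": "votti/kfpv1-metricscollector:v0.0.10",
                "imagePullPolicy": "Always",
                "args": [
                    "-m", "val-accuracy;accuracy",           # metrics to extract
                    "-s", "katib-db-manager.kubeflow:6789",  # Katib DB manager
                    "-t", "$(PodName)",                      # trial pod name
                    "-path", "/tmp/outputs/mlpipeline_metrics",
                ],
                # Expose the pod name to the collector via the downward API.
                "env": [
                    {
                        "name": "PodName",
                        "valueFrom": {"fieldRef": {"fieldPath": "metadata.name"}},
                    }
                ],
            },
        },
    }


spec = create_metrics_collector_spec()
print(spec["collector"]["kind"])  # Custom
```

This dict is passed as `metrics_collector_spec` when building the `V1beta1ExperimentSpec`, as shown further down in the diff.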

votti committed Feb 10, 2023
1 parent a33cbf4 commit e196b9b
Showing 3 changed files with 231 additions and 150 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -24,7 +24,7 @@ repos:
rev: 5.0.4
hooks:
- id: flake8
args: ['--ignore=E501,W503,E203']
args: ['--ignore=E501,W503,E203,E402']

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.982
207 changes: 127 additions & 80 deletions notebooks/mnist_pipeline_v1.ipynb
@@ -247,13 +247,28 @@
" with open(metrics_log_path, \"w\") as f:\n",
" f.write(f\"val-accuracy={history.history['val_accuracy'][0]}\\n\")\n",
" f.write(f\"accuracy={history.history['accuracy'][0]}\\n\")\n",
" print(metrics_log_path)\n",
"\n",
"\n",
"train_op = create_component_from_func(\n",
" func=train, base_image=\"tensorflow/tensorflow:2.7.1\", packages_to_install=[\"scipy\"]\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def _label_cache(step):\n",
" \"\"\"Helper to add pod cache label\n",
"\n",
" Somehow in our configuration the wrong cache label is applied :/\n",
" \"\"\"\n",
" step.add_pod_label(\"pipelines.kubeflow.org/cache_enabled\", \"true\")"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -276,24 +291,31 @@
"\n",
" train_imgs = download_data_op(TRAIN_IMG_URL)\n",
" train_imgs.set_display_name(\"Download training images\")\n",
" _label_cache(train_imgs)\n",
"\n",
" train_y = download_data_op(TRAIN_LAB_URL)\n",
" train_y.set_display_name(\"Download training labels\")\n",
" _label_cache(train_y)\n",
"\n",
" mnist_train = parse_mnist_op(train_imgs.output, train_y.output)\n",
" mnist_train.set_display_name(\"Prepare train dataset\")\n",
"\n",
" processed_train = process_op(mnist_train.output, val_pct=0.2, trainset_flag=True)\n",
" processed_train.set_display_name(\"Preprocess images\")\n",
"\n",
" model_out = train_op(\n",
" processed_train.output,\n",
" lr=lr,\n",
" optimizer=optimizer,\n",
" epochs=epochs,\n",
" batch_size=batch_size,\n",
" loss=loss,\n",
" training_output = (\n",
" train_op(\n",
" processed_train.outputs[\"data_processed\"],\n",
" lr=lr,\n",
" optimizer=optimizer,\n",
" epochs=epochs,\n",
" batch_size=batch_size,\n",
" loss=loss,\n",
" )\n",
" .set_cpu_limit(\"1\")\n",
" .set_memory_limit(\"1Gi\")\n",
" )\n",
" model_out.set_display_name(\"Fit the model\")\n",
" training_output.set_display_name(\"Fit the model\")\n",
" return mnist_train.output"
]
},
@@ -310,65 +332,23 @@
" # pipeline_root='gs://my-pipeline-root/example-pipeline',\n",
" arguments={},\n",
" experiment_name=\"mnist\",\n",
" run_name=\"training_mnist_classifier\",\n",
" run_name=\"training_mnist_classifier_13\",\n",
" namespace=\"vito-zanotelli\",\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Parameter tuning with Katib\n",
"\n",
"We now want to do parameter tuning over the pipeline with Katib\n",
"\n",
"To do this, we need to extend the pipeline with a step to do metrics collection.\n",
"This is currently done in a hacky way using a separate step/component"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print_file_op = components.load_component_from_text(\n",
" \"\"\"\n",
"name: Sleepy print file\n",
"description: Helper container to print input to stdout.\n",
" Contains a sleep step to prevent race conditions in container\n",
" startup.\n",
" Note that this command is broken - but currently this seems to work\n",
" well together with the broken metrics collection sidecar injection of Katib\n",
" that will modify this command by concatenating it.\n",
" \n",
"inputs:\n",
"- {name: data, type: String}\n",
"We now want to do parameter tuning over the pipeline with Katib.\n",
"\n",
"implementation:\n",
" container:\n",
" image: busybox\n",
" command:\n",
" - sleep 60 &&\n",
" - cat\n",
" - {inputPath: data}\n",
"\"\"\"\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def _label_cache(step):\n",
" \"\"\"Helper to add pod cache label\n",
"\n",
" Somehow in our configuration the wrong cache label is applied :/\n",
" \"\"\"\n",
" step.add_pod_label(\"pipelines.kubeflow.org/cache_enabled\", \"true\")"
"This requires:\n",
"- adding a label to the step from which metrics should be collected\n",
"- preventing the step generating metrics from being skipped due to caching\n"
]
},
{
@@ -402,28 +382,27 @@
" processed_train = process_op(mnist_train.output, val_pct=0.2, trainset_flag=True)\n",
" processed_train.set_display_name(\"Preprocess images\")\n",
"\n",
" training_output = train_op(\n",
" processed_train.outputs[\"data_processed\"],\n",
" lr=lr,\n",
" optimizer=optimizer,\n",
" epochs=epochs,\n",
" batch_size=batch_size,\n",
" loss=loss,\n",
" training_output = (\n",
" train_op(\n",
" processed_train.outputs[\"data_processed\"],\n",
" lr=lr,\n",
" optimizer=optimizer,\n",
" epochs=epochs,\n",
" batch_size=batch_size,\n",
" loss=loss,\n",
" )\n",
" .set_cpu_limit(\"1\")\n",
" .set_memory_limit(\"1Gi\")\n",
" )\n",
" _label_cache(training_output)\n",
"\n",
" # The printing op is required such that the metrics collector sidecar\n",
" # can collect the metrics.\n",
" # This is a workaround required as the side-car injection of katib\n",
" # breaks complex commands, such as the one of the python training operator\n",
" po = print_file_op(training_output.outputs[\"metrics_log\"])\n",
" training_output.set_display_name(\"Fit the model\")\n",
" # This step needs to run always, as otherwise the metrics cannot be collected.\n",
" # Other steps are cached if appropriate\n",
" po.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
" training_output.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
"\n",
" # This pod label indicates which pod Katib should collect the metric from.\n",
" # A metrics collecting sidecar container will be added\n",
" po.add_pod_label(\"katib.kubeflow.org/model-training\", \"true\")\n",
" _label_cache(po)"
" training_output.add_pod_label(\"katib.kubeflow.org/model-training\", \"true\")"
]
},
{
@@ -432,8 +411,6 @@
"metadata": {},
"outputs": [],
"source": [
"# Note: this pipeline will not work, as the print op is only working when the Metrics collector sidecar is injected\n",
"\n",
"run = client.create_run_from_pipeline_func(\n",
" mnist_training_pipeline_katib,\n",
" mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY,\n",
@@ -568,6 +545,48 @@
" return trial_template"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def create_metrics_collector_spec():\n",
" \"\"\"This defines the custom metrics collector\"\"\"\n",
" return {\n",
" \"source\": {\n",
" \"fileSystemPath\": {\n",
" \"path\": \"/tmp/outputs/mlpipeline_metrics/data\",\n",
" \"kind\": \"File\",\n",
" }\n",
" },\n",
" \"collector\": {\n",
" \"kind\": \"Custom\",\n",
" \"customCollector\": {\n",
" \"args\": [\n",
" \"-m\",\n",
" \"val-accuracy;accuracy\",\n",
" \"-s\",\n",
" \"katib-db-manager.kubeflow:6789\",\n",
" \"-t\",\n",
" \"$(PodName)\",\n",
" \"-path\",\n",
" \"/tmp/outputs/mlpipeline_metrics\",\n",
" ],\n",
" \"image\": \"votti/kfpv1-metricscollector:v0.0.10\",\n",
" \"imagePullPolicy\": \"Always\",\n",
" \"name\": \"custom-metrics-logger-and-collector\",\n",
" \"env\": [\n",
" {\n",
" \"name\": \"PodName\",\n",
" \"valueFrom\": {\"fieldRef\": {\"fieldPath\": \"metadata.name\"}},\n",
" }\n",
" ],\n",
" },\n",
" },\n",
" }"
]
},
{
"cell_type": "code",
"execution_count": null,
@@ -613,6 +632,9 @@
" # Configure parameters for the Trial template.\n",
" trial_template = create_trial_template(trial_spec)\n",
"\n",
" # Metrics collector spec\n",
" metrics_collector = create_metrics_collector_spec()\n",
"\n",
" # Create an Experiment from the above parameters.\n",
" experiment_spec = V1beta1ExperimentSpec(\n",
" # Experimental Budget\n",
@@ -627,6 +649,8 @@
" parameters=parameters,\n",
" # Trial Template\n",
" trial_template=trial_template,\n",
" # Metrics collector\n",
" metrics_collector_spec=metrics_collector,\n",
" )\n",
"\n",
" return experiment_spec"
@@ -664,10 +688,11 @@
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"In order to generate a full experiment, name and namespace need to be defined"
"In order to generate a full experiment, the api_version, kind, name, and namespace need to be defined:"
]
},
{
@@ -677,19 +702,22 @@
"outputs": [],
"source": [
"katib_experiment = V1beta1Experiment(\n",
" api_version=\"kubeflow.org/v1beta1\",\n",
" kind=\"Experiment\",\n",
" metadata=V1ObjectMeta(\n",
" name=\"katib-kfp-mnist\",\n",
" namespace=client.get_user_namespace(),\n",
" name=\"katib-kfp-mnist-custom-31\",\n",
" namespace=\"vito-zanotelli\",\n",
" ),\n",
" spec=katib_spec,\n",
")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"The generated yaml can be submitted via web ui - I did not manage yet to submitt via sdk"
"The generated YAML can be written out and submitted via the web UI:"
]
},
{
@@ -698,16 +726,35 @@
"metadata": {},
"outputs": [],
"source": [
"with open(\"experiment_template_kfp_mnist.yaml\", \"w\") as f:\n",
"with open(\"experiment_template_kfp_mnist_4.yaml\", \"w\") as f:\n",
" yaml.dump(ApiClient().sanitize_for_serialization(katib_experiment), f)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Or submitted via the KatibClient:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
"source": [
"client = KatibClient()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client.create_experiment(katib_experiment)"
]
}
],
"metadata": {
