From 7190bf3757a33143e4969598a55685344c73a15b Mon Sep 17 00:00:00 2001
From: Vismita Uppalli <32617204+vuppalli@users.noreply.github.com>
Date: Wed, 15 Jul 2020 19:24:29 +0000
Subject: [PATCH] Add guide for AI Platform (previously Machine Learning
Engine) Operators (#9798)
---
.../cloud/example_dags/example_mlengine.py | 30 +++
.../google/cloud/operators/mlengine.py | 36 +++
docs/build | 1 -
docs/howto/operator/google/cloud/mlengine.rst | 233 ++++++++++++++++++
docs/operators-and-hooks-ref.rst | 4 +-
5 files changed, 301 insertions(+), 3 deletions(-)
create mode 100644 docs/howto/operator/google/cloud/mlengine.rst
diff --git a/airflow/providers/google/cloud/example_dags/example_mlengine.py b/airflow/providers/google/cloud/example_dags/example_mlengine.py
index b3ed17249e55e..f805340814f3d 100644
--- a/airflow/providers/google/cloud/example_dags/example_mlengine.py
+++ b/airflow/providers/google/cloud/example_dags/example_mlengine.py
@@ -62,6 +62,7 @@
schedule_interval=None, # Override to match your needs
tags=['example'],
) as dag:
+ # [START howto_operator_gcp_mlengine_training]
training = MLEngineStartTrainingJobOperator(
task_id="training",
project_id=PROJECT_ID,
@@ -74,7 +75,9 @@
training_python_module=TRAINER_PY_MODULE,
training_args=[],
)
+ # [END howto_operator_gcp_mlengine_training]
+ # [START howto_operator_gcp_mlengine_create_model]
create_model = MLEngineCreateModelOperator(
task_id="create-model",
project_id=PROJECT_ID,
@@ -82,18 +85,24 @@
"name": MODEL_NAME,
},
)
+ # [END howto_operator_gcp_mlengine_create_model]
+ # [START howto_operator_gcp_mlengine_get_model]
get_model = MLEngineGetModelOperator(
task_id="get-model",
project_id=PROJECT_ID,
model_name=MODEL_NAME,
)
+ # [END howto_operator_gcp_mlengine_get_model]
+ # [START howto_operator_gcp_mlengine_print_model]
get_model_result = BashOperator(
bash_command="echo \"{{ task_instance.xcom_pull('get-model') }}\"",
task_id="get-model-result",
)
+ # [END howto_operator_gcp_mlengine_print_model]
+ # [START howto_operator_gcp_mlengine_create_version1]
create_version = MLEngineCreateVersionOperator(
task_id="create-version",
project_id=PROJECT_ID,
@@ -108,7 +117,9 @@
"pythonVersion": "3.7"
}
)
+ # [END howto_operator_gcp_mlengine_create_version1]
+ # [START howto_operator_gcp_mlengine_create_version2]
create_version_2 = MLEngineCreateVersionOperator(
task_id="create-version-2",
project_id=PROJECT_ID,
@@ -123,25 +134,33 @@
"pythonVersion": "3.7"
}
)
+ # [END howto_operator_gcp_mlengine_create_version2]
+ # [START howto_operator_gcp_mlengine_default_version]
set_defaults_version = MLEngineSetDefaultVersionOperator(
task_id="set-default-version",
project_id=PROJECT_ID,
model_name=MODEL_NAME,
version_name="v2",
)
+ # [END howto_operator_gcp_mlengine_default_version]
+ # [START howto_operator_gcp_mlengine_list_versions]
list_version = MLEngineListVersionsOperator(
task_id="list-version",
project_id=PROJECT_ID,
model_name=MODEL_NAME,
)
+ # [END howto_operator_gcp_mlengine_list_versions]
+ # [START howto_operator_gcp_mlengine_print_versions]
list_version_result = BashOperator(
bash_command="echo \"{{ task_instance.xcom_pull('list-version') }}\"",
task_id="list-version-result",
)
+ # [END howto_operator_gcp_mlengine_print_versions]
+ # [START howto_operator_gcp_mlengine_get_prediction]
prediction = MLEngineStartBatchPredictionJobOperator(
task_id="prediction",
project_id=PROJECT_ID,
@@ -152,20 +171,25 @@
input_paths=[PREDICTION_INPUT],
output_path=PREDICTION_OUTPUT,
)
+ # [END howto_operator_gcp_mlengine_get_prediction]
+ # [START howto_operator_gcp_mlengine_delete_version]
delete_version = MLEngineDeleteVersionOperator(
task_id="delete-version",
project_id=PROJECT_ID,
model_name=MODEL_NAME,
version_name="v1"
)
+ # [END howto_operator_gcp_mlengine_delete_version]
+ # [START howto_operator_gcp_mlengine_delete_model]
delete_model = MLEngineDeleteModelOperator(
task_id="delete-model",
project_id=PROJECT_ID,
model_name=MODEL_NAME,
delete_contents=True
)
+ # [END howto_operator_gcp_mlengine_delete_model]
training >> create_version
training >> create_version_2
@@ -178,6 +202,7 @@
list_version >> delete_version
delete_version >> delete_model
+ # [START howto_operator_gcp_mlengine_get_metric]
def get_metric_fn_and_keys():
"""
Gets metric function and keys used to generate summary
@@ -186,7 +211,9 @@ def normalize_value(inst: Dict):
val = float(inst['dense_4'][0])
return tuple([val]) # returns a tuple.
return normalize_value, ['val'] # key order must match.
+ # [END howto_operator_gcp_mlengine_get_metric]
+ # [START howto_operator_gcp_mlengine_validate_error]
def validate_err_and_count(summary: Dict) -> Dict:
"""
Validate summary result
@@ -198,7 +225,9 @@ def validate_err_and_count(summary: Dict) -> Dict:
if summary['count'] != 20:
raise ValueError('Invalid value val != 20; summary={}'.format(summary))
return summary
+ # [END howto_operator_gcp_mlengine_validate_error]
+ # [START howto_operator_gcp_mlengine_evaluate]
evaluate_prediction, evaluate_summary, evaluate_validation = mlengine_operator_utils.create_evaluate_ops(
task_prefix="evaluate-ops",
data_format="TEXT",
@@ -218,6 +247,7 @@ def validate_err_and_count(summary: Dict) -> Dict:
version_name="v1",
py_interpreter="python3",
)
+ # [END howto_operator_gcp_mlengine_evaluate]
create_model >> create_version >> evaluate_prediction
evaluate_validation >> delete_version
diff --git a/airflow/providers/google/cloud/operators/mlengine.py b/airflow/providers/google/cloud/operators/mlengine.py
index c402c478f38cd..623c14d540c2c 100644
--- a/airflow/providers/google/cloud/operators/mlengine.py
+++ b/airflow/providers/google/cloud/operators/mlengine.py
@@ -71,6 +71,10 @@ class MLEngineStartBatchPredictionJobOperator(BaseOperator):
"""
Start a Google Cloud ML Engine prediction job.
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:MLEngineStartBatchPredictionJobOperator`
+
NOTE: For model origin, users should consider exactly one from the
three options below:
@@ -351,6 +355,10 @@ class MLEngineCreateModelOperator(BaseOperator):
"""
Creates a new model.
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:MLEngineCreateModelOperator`
+
The model should be provided by the `model` parameter.
:param model: A dictionary containing the information about the model.
@@ -395,6 +403,10 @@ class MLEngineGetModelOperator(BaseOperator):
"""
Gets a particular model
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:MLEngineGetModelOperator`
+
The name of model shold be specified in `model_name`.
:param model_name: The name of the model.
@@ -438,6 +450,10 @@ class MLEngineDeleteModelOperator(BaseOperator):
"""
Deletes a model.
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:MLEngineDeleteModelOperator`
+
The model should be provided by the `model_name` parameter.
:param model_name: The name of the model.
@@ -615,6 +631,10 @@ class MLEngineCreateVersionOperator(BaseOperator):
"""
Creates a new version in the model
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:MLEngineCreateVersionOperator`
+
Model should be specified by `model_name`, in which case the `version` parameter should contain all the
information to create that version
@@ -678,6 +698,10 @@ class MLEngineSetDefaultVersionOperator(BaseOperator):
"""
Sets a version in the model.
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:MLEngineSetDefaultVersionOperator`
+
The model should be specified by `model_name` to be the default. The name of the version should be
specified in the `version_name` parameter.
@@ -741,6 +765,10 @@ class MLEngineListVersionsOperator(BaseOperator):
"""
Lists all available versions of the model
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:MLEngineListVersionsOperator`
+
The model should be specified by `model_name`.
:param model_name: The name of the Google Cloud ML Engine model that the version
@@ -794,6 +822,10 @@ class MLEngineDeleteVersionOperator(BaseOperator):
"""
Deletes the version from the model.
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:MLEngineDeleteVersionOperator`
+
The name of the version should be specified in `version_name` parameter from the model specified
by `model_name`.
@@ -874,6 +906,10 @@ class MLEngineStartTrainingJobOperator(BaseOperator):
"""
Operator for launching a MLEngine training job.
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:MLEngineStartTrainingJobOperator`
+
:param job_id: A unique templated id for the submitted Google MLEngine
training job. (templated)
:type job_id: str
diff --git a/docs/build b/docs/build
index a30b6a1a9215b..d8a93da286acd 100755
--- a/docs/build
+++ b/docs/build
@@ -358,7 +358,6 @@ MISSING_GOOGLLE_DOC_GUIDES = {
'datastore',
'dlp',
'gcs_to_bigquery',
- 'mlengine',
'mssql_to_gcs',
'mysql_to_gcs',
'postgres_to_gcs',
diff --git a/docs/howto/operator/google/cloud/mlengine.rst b/docs/howto/operator/google/cloud/mlengine.rst
new file mode 100644
index 0000000000000..0b3dd66f816ed
--- /dev/null
+++ b/docs/howto/operator/google/cloud/mlengine.rst
@@ -0,0 +1,233 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+
+Google Cloud AI Platform Operators
+==================================
+
+`Google Cloud AI Platform <https://cloud.google.com/ai-platform/>`__ (formerly known
+as ML Engine) can be used to train machine learning models at scale, host trained models
+in the cloud, and use models to make predictions for new data. AI Platform is a collection
+of tools for training, evaluating, and tuning machine learning models. AI Platform can also
+be used to deploy a trained model, make predictions, and manage various model versions.
+
+.. contents::
+ :depth: 1
+ :local:
+
+Prerequisite tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: ../_partials/prerequisite_tasks.rst
+
+.. _howto/operator:MLEngineStartTrainingJobOperator:
+
+Launching a Job
+^^^^^^^^^^^^^^^
+To start a machine learning operation with AI Platform, you must launch a training job.
+This creates a virtual machine that can run code specified in the trainer file, which
+contains the main application code. A job can be initiated with the
+:class:`~airflow.providers.google.cloud.operators.mlengine.MLEngineStartTrainingJobOperator`.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_mlengine.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcp_mlengine_training]
+ :end-before: [END howto_operator_gcp_mlengine_training]
+
+.. _howto/operator:MLEngineCreateModelOperator:
+
+Creating a model
+^^^^^^^^^^^^^^^^
+A model is a container that can hold multiple model versions. A new model can be created through the
+:class:`~airflow.providers.google.cloud.operators.mlengine.MLEngineCreateModelOperator`.
+The ``model`` field should be defined with a dictionary containing the information about the model.
+``name`` is a required field in this dictionary.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_mlengine.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcp_mlengine_create_model]
+ :end-before: [END howto_operator_gcp_mlengine_create_model]
+
+.. _howto/operator:MLEngineGetModelOperator:
+
+Getting a model
+^^^^^^^^^^^^^^^
+The :class:`~airflow.providers.google.cloud.operators.mlengine.MLEngineGetModelOperator`
+can be used to obtain a model previously created. To obtain the correct model, ``model_name``
+must be defined in the operator.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_mlengine.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcp_mlengine_get_model]
+ :end-before: [END howto_operator_gcp_mlengine_get_model]
+
+You can use :ref:`Jinja templating <jinja-templating>` with the ``project_id`` and ``model_name``
+fields to dynamically determine their values. The results are saved to :ref:`XCom <concepts:xcom>`,
+allowing them to be used by other operators. In this case, the
+:class:`~airflow.operators.bash.BashOperator` is used to print the model information.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_mlengine.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcp_mlengine_print_model]
+ :end-before: [END howto_operator_gcp_mlengine_print_model]
+
+.. _howto/operator:MLEngineCreateVersionOperator:
+
+Creating model versions
+^^^^^^^^^^^^^^^^^^^^^^^
+A model version is a subset of the model container where the code runs. A new version of the model can be created
+through the :class:`~airflow.providers.google.cloud.operators.mlengine.MLEngineCreateVersionOperator`.
+The model must be specified by ``model_name``, and the ``version`` parameter should contain a dictionary of
+all the information about the version. Within the ``version`` parameter’s dictionary, the ``name`` field is
+required.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_mlengine.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcp_mlengine_create_version1]
+ :end-before: [END howto_operator_gcp_mlengine_create_version1]
+
+The :class:`~airflow.providers.google.cloud.operators.mlengine.MLEngineCreateVersionOperator`
+can also be used to create more versions with varying parameters.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_mlengine.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcp_mlengine_create_version2]
+ :end-before: [END howto_operator_gcp_mlengine_create_version2]
+
+.. _howto/operator:MLEngineSetDefaultVersionOperator:
+.. _howto/operator:MLEngineListVersionsOperator:
+
+Managing model versions
+^^^^^^^^^^^^^^^^^^^^^^^
+By default, the model code will run using the default model version. You can set the default model version through the
+:class:`~airflow.providers.google.cloud.operators.mlengine.MLEngineSetDefaultVersionOperator`
+by specifying the ``model_name`` and ``version_name`` parameters.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_mlengine.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcp_mlengine_default_version]
+ :end-before: [END howto_operator_gcp_mlengine_default_version]
+
+To list the model versions available, use the
+:class:`~airflow.providers.google.cloud.operators.mlengine.MLEngineListVersionsOperator`
+while specifying the ``model_name`` parameter.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_mlengine.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcp_mlengine_list_versions]
+ :end-before: [END howto_operator_gcp_mlengine_list_versions]
+
+You can use :ref:`Jinja templating <jinja-templating>` with the ``project_id`` and ``model_name``
+fields to dynamically determine their values. The results are saved to :ref:`XCom <concepts:xcom>`,
+allowing them to be used by other operators. In this case, the
+:class:`~airflow.operators.bash.BashOperator` is used to print the version information.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_mlengine.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcp_mlengine_print_versions]
+ :end-before: [END howto_operator_gcp_mlengine_print_versions]
+
+.. _howto/operator:MLEngineStartBatchPredictionJobOperator:
+
+Making predictions
+^^^^^^^^^^^^^^^^^^
+A Google Cloud AI Platform prediction job can be started with the
+:class:`~airflow.providers.google.cloud.operators.mlengine.MLEngineStartBatchPredictionJobOperator`.
+To specify the model origin, provide exactly one of the following: ``model_name``, ``uri``, or ``model_name`` together with
+``version_name``. If you do not provide the ``version_name``, the operator will use the default model version.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_mlengine.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcp_mlengine_get_prediction]
+ :end-before: [END howto_operator_gcp_mlengine_get_prediction]
+
+.. _howto/operator:MLEngineDeleteVersionOperator:
+.. _howto/operator:MLEngineDeleteModelOperator:
+
+Cleaning up
+^^^^^^^^^^^
+A model version can be deleted with the
+:class:`~airflow.providers.google.cloud.operators.mlengine.MLEngineDeleteVersionOperator` by
+specifying the ``version_name`` and ``model_name`` parameters.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_mlengine.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcp_mlengine_delete_version]
+ :end-before: [END howto_operator_gcp_mlengine_delete_version]
+
+You can also delete a model with the
+:class:`~airflow.providers.google.cloud.operators.mlengine.MLEngineDeleteModelOperator`
+by providing the ``model_name`` parameter.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_mlengine.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcp_mlengine_delete_model]
+ :end-before: [END howto_operator_gcp_mlengine_delete_model]
+
+Evaluating a model
+^^^^^^^^^^^^^^^^^^
+To evaluate a prediction and model, specify a metric function to generate a summary and customize
+the evaluation of the model. This function receives a dictionary derived from a json in the batch
+prediction result, then returns a tuple of metrics.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_mlengine.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcp_mlengine_get_metric]
+ :end-before: [END howto_operator_gcp_mlengine_get_metric]
+
+To evaluate a prediction and model, it’s useful to have a function to validate the summary result.
+This function receives a dictionary of the averaged metrics the function above generated. It then
+raises an exception if a task fails or should not proceed.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_mlengine.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcp_mlengine_validate_error]
+ :end-before: [END howto_operator_gcp_mlengine_validate_error]
+
+Prediction results and a model summary can be generated through a function such as
+:func:`~airflow.providers.google.cloud.utils.mlengine_operator_utils.create_evaluate_ops`.
+It makes predictions using the specified inputs and then summarizes and validates the result. The
+functions created above should be passed in through the ``metric_fn_and_keys`` and ``validate_fn`` fields.
+
+.. exampleinclude:: /../airflow/providers/google/cloud/example_dags/example_mlengine.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcp_mlengine_evaluate]
+ :end-before: [END howto_operator_gcp_mlengine_evaluate]
+
+Reference
+^^^^^^^^^
+
+For further information, look at:
+
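+* `Client Library Documentation <https://cloud.google.com/ai-platform/prediction/docs/reference/rest>`__
+* `Product Documentation <https://cloud.google.com/ai-platform/docs/>`__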
diff --git a/docs/operators-and-hooks-ref.rst b/docs/operators-and-hooks-ref.rst
index 12d61cd31d248..985e4a96647b4 100644
--- a/docs/operators-and-hooks-ref.rst
+++ b/docs/operators-and-hooks-ref.rst
@@ -758,8 +758,8 @@ These integrations allow you to perform various operations within the Google Clo
- :mod:`airflow.providers.google.cloud.operators.kubernetes_engine`
-
- * - `Machine Learning Engine <https://cloud.google.com/ml-engine/>`__
- -
+ * - `Machine Learning Engine <https://cloud.google.com/ai-platform/>`__
- :doc:`How to use <howto/operator/google/cloud/mlengine>`
- :mod:`airflow.providers.google.cloud.hooks.mlengine`
- :mod:`airflow.providers.google.cloud.operators.mlengine`
-