diff --git a/sagemaker-featurestore/index.rst b/sagemaker-featurestore/index.rst new file mode 100644 index 0000000000..8c3f921a78 --- /dev/null +++ b/sagemaker-featurestore/index.rst @@ -0,0 +1,9 @@ +Get started with SageMaker Feature Store +======================================== + +Feature Store is a centralized store for features and associated metadata so features can be reused. + +.. toctree:: + :maxdepth: 1 + + sagemaker_featurestore_fraud_detection_python_sdk diff --git a/sagemaker-featurestore/sagemaker_featurestore_fraud_detection_python_sdk.ipynb b/sagemaker-featurestore/sagemaker_featurestore_fraud_detection_python_sdk.ipynb new file mode 100644 index 0000000000..69e4eafdd9 --- /dev/null +++ b/sagemaker-featurestore/sagemaker_featurestore_fraud_detection_python_sdk.ipynb @@ -0,0 +1,743 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Fraud Detection with Amazon SageMaker FeatureStore\n", + "\n", + "Kernel `Python 3 (Data Science)` works well with this notebook.\n", + "\n", + "## Contents\n", + "1. [Background](#Background)\n", + "1. [Setup SageMaker FeatureStore](#Setup-SageMaker-FeatureStore)\n", + "1. [Inspect Dataset](#Inspect-Dataset)\n", + "1. [Ingest Data into FeatureStore](#Ingest-Data-into-FeatureStore)\n", + "1. [Build Training Dataset](#Build-Training-Dataset)\n", + "1. [Train and Deploy the Model](#Train-and-Deploy-the-Model)\n", + "1. [SageMaker FeatureStore During Inference](#SageMaker-FeatureStore-During-Inference)\n", + "1. [Cleanup Resources](#Cleanup-Resources)\n", + "\n", + "## Background\n", + "\n", + "Amazon SageMaker FeatureStore is a new SageMaker capability that makes it easy for customers to create and manage curated data for machine learning (ML) development. SageMaker FeatureStore enables data ingestion via a high TPS API and data consumption via the online and offline stores. \n", + "\n", + "This notebook provides an example of the APIs provided by SageMaker FeatureStore by walking through the process of training a fraud detection model. The notebook demonstrates how the dataset's tables can be ingested into the FeatureStore, queried to create a training dataset, and quickly accessed during inference. \n", + "\n", + "\n", + "### Terminology\n", + "\n", + "A **FeatureGroup** is the main resource that contains the metadata for all the data stored in SageMaker FeatureStore. A FeatureGroup contains a list of FeatureDefinitions. A **FeatureDefinition** consists of a name and one of the following data types: integral, fractional, or string. The FeatureGroup also contains an **OnlineStoreConfig** and an **OfflineStoreConfig** controlling where the data is stored. Enabling the online store allows quick access to the latest value for a Record via the GetRecord API. The offline store, a required configuration, allows storage of historical data in your S3 bucket. \n", + "\n", + "Once a FeatureGroup is created, data can be added as Records. **Records** can be thought of as rows in a table. Each record will have a unique **RecordIdentifier** along with values for all other FeatureDefinitions in the FeatureGroup. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ + "## Setup SageMaker FeatureStore\n", + "\n", + "Let's start by setting up the SageMaker Python SDK and boto client."
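, + "\n", + "As a brief aside on the Terminology above: a FeatureDefinition can also be declared explicitly with the SDK. The snippet below is a minimal, illustrative sketch (this notebook instead infers its feature definitions from a pandas DataFrame later on):\n", + "\n", + "```python\n", + "from sagemaker.feature_store.feature_definition import (\n", + "    FeatureDefinition,\n", + "    FeatureTypeEnum,\n", + ")\n", + "\n", + "# Illustrative only: one explicit definition for a single integral feature.\n", + "example_definition = FeatureDefinition(\n", + "    feature_name='TransactionID',\n", + "    feature_type=FeatureTypeEnum.INTEGRAL,\n", + ")\n", + "```"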
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# to get the latest sagemaker python sdk\n", + "!pip install -U sagemaker" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import boto3\n", + "import sagemaker\n", + "from sagemaker.session import Session\n", + "\n", + "print(sagemaker.__version__)\n", + "\n", + "region = boto3.Session().region_name\n", + "\n", + "boto_session = boto3.Session(region_name=region)\n", + "\n", + "sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)\n", + "featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)\n", + "\n", + "feature_store_session = Session(\n", + " boto_session=boto_session,\n", + " sagemaker_client=sagemaker_client,\n", + " sagemaker_featurestore_runtime_client=featurestore_runtime\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### S3 Bucket Setup For The OfflineStore\n", + "\n", + "SageMaker FeatureStore writes the data in the OfflineStore of a FeatureGroup to a S3 bucket owned by you. To be able to write to your S3 bucket, SageMaker FeatureStore assumes an IAM role which has access to it. The role is also owned by you.\n", + "Note that the same bucket can be re-used across FeatureGroups. Data in the bucket is partitioned by FeatureGroup." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Set the default s3 bucket name and it will be referenced throughout the notebook." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# You can modify the following to use a bucket of your choosing\n", + "default_s3_bucket_name = feature_store_session.default_bucket()\n", + "prefix = 'sagemaker-featurestore-demo'\n", + "\n", + "print(default_s3_bucket_name)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Set up the IAM role. This role gives SageMaker FeatureStore access to your S3 bucket. \n", + "\n", + "
\n", + "Note: In this example we use the default SageMaker role, assuming it has both AmazonSageMakerFullAccess and AmazonSageMakerFeatureStoreAccess managed policies. If not, please make sure to attach them to the role before proceeding.\n", + "
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker import get_execution_role\n", + "\n", + "# You can modify the following to use a role of your choosing. See the documentation for how to create this.\n", + "role = get_execution_role()\n", + "print (role)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Inspect Dataset\n", + "\n", + "The provided dataset is a synthetic dataset with two tables: identity and transactions. They can both be joined by the `TransactionId` column. The transaction table contains information about a particular transaction such as amount, credit or debit card while the identity table contains information about the user such as device type and browser. The transaction must exist in the transaction table, but might not always be available in the identity table.\n", + "\n", + "The objective of the model is to predict if a transaction is fraudulent or not, given the transaction record." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np \n", + "import pandas as pd\n", + "import matplotlib.pyplot as plt\n", + "import io\n", + "\n", + "s3_client = boto3.client('s3', region_name=region)\n", + "\n", + "fraud_detection_bucket_name = 'sagemaker-sample-files'\n", + "identity_file_key = 'datasets/tabular/fraud_detection/synthethic_fraud_detection_SA/sampled_identity.csv'\n", + "transaction_file_key = 'datasets/tabular/fraud_detection/synthethic_fraud_detection_SA/sampled_transactions.csv'\n", + "\n", + "identity_data_object = s3_client.get_object(Bucket=fraud_detection_bucket_name, Key=identity_file_key)\n", + "transaction_data_object = s3_client.get_object(Bucket=fraud_detection_bucket_name, Key=transaction_file_key)\n", + "\n", + "identity_data = pd.read_csv(io.BytesIO(identity_data_object['Body'].read()))\n", + "transaction_data = pd.read_csv(io.BytesIO(transaction_data_object['Body'].read()))\n", + "\n", + "identity_data = identity_data.round(5)\n", + "transaction_data = transaction_data.round(5)\n", + "\n", + "identity_data = identity_data.fillna(0)\n", + "transaction_data = transaction_data.fillna(0)\n", + "\n", + "# Feature transformations for this dataset are applied before ingestion into FeatureStore.\n", + "# One hot encode card4, card6\n", + "encoded_card_bank = pd.get_dummies(transaction_data['card4'], prefix = 'card_bank')\n", + "encoded_card_type = pd.get_dummies(transaction_data['card6'], prefix = 'card_type')\n", + "\n", + "transformed_transaction_data = pd.concat([transaction_data, encoded_card_type, encoded_card_bank], axis=1)\n", + "# blank space is not allowed in feature name\n", + "transformed_transaction_data = transformed_transaction_data.rename(columns={\"card_bank_american express\": \"card_bank_american_express\"})" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "identity_data.head()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "transformed_transaction_data.head()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Ingest Data into FeatureStore\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In this step we will create the FeatureGroups representing the transaction and identity tables." 
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Define FeatureGroups" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from time import gmtime, strftime, sleep\n", + "\n", + "identity_feature_group_name = 'identity-feature-group-' + strftime('%d-%H-%M-%S', gmtime())\n", + "transaction_feature_group_name = 'transaction-feature-group-' + strftime('%d-%H-%M-%S', gmtime())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.feature_store.feature_group import FeatureGroup\n", + "\n", + "identity_feature_group = FeatureGroup(name=identity_feature_group_name, sagemaker_session=feature_store_session)\n", + "transaction_feature_group = FeatureGroup(name=transaction_feature_group_name, sagemaker_session=feature_store_session)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import time\n", + "\n", + "current_time_sec = int(round(time.time()))\n", + "\n", + "def cast_object_to_string(data_frame):\n", + " for label in data_frame.columns:\n", + " if data_frame.dtypes[label] == 'object':\n", + " data_frame[label] = data_frame[label].astype(\"str\").astype(\"string\")\n", + "\n", + "# cast object dtype to string. The SageMaker FeatureStore Python SDK will then map the string dtype to String feature type.\n", + "cast_object_to_string(identity_data)\n", + "cast_object_to_string(transformed_transaction_data)\n", + "\n", + "# record identifier and event time feature names\n", + "record_identifier_feature_name = \"TransactionID\"\n", + "event_time_feature_name = \"EventTime\"\n", + "\n", + "# append EventTime feature\n", + "identity_data[event_time_feature_name] = pd.Series([current_time_sec]*len(identity_data), dtype=\"float64\")\n", + "transformed_transaction_data[event_time_feature_name] = pd.Series([current_time_sec]*len(transaction_data), dtype=\"float64\")\n", + "\n", + "# load feature definitions to the feature group. 
SageMaker FeatureStore Python SDK will auto-detect the data schema based on input data.\n", + "identity_feature_group.load_feature_definitions(data_frame=identity_data); # output is suppressed\n", + "transaction_feature_group.load_feature_definitions(data_frame=transformed_transaction_data); # output is suppressed" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Create FeatureGroups in SageMaker FeatureStore" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def wait_for_feature_group_creation_complete(feature_group):\n", + " status = feature_group.describe().get(\"FeatureGroupStatus\")\n", + " while status == \"Creating\":\n", + " print(\"Waiting for Feature Group Creation\")\n", + " time.sleep(5)\n", + " status = feature_group.describe().get(\"FeatureGroupStatus\")\n", + " if status != \"Created\":\n", + " raise RuntimeError(f\"Failed to create feature group {feature_group.name}\")\n", + " print(f\"FeatureGroup {feature_group.name} successfully created.\")\n", + "\n", + "identity_feature_group.create(\n", + " s3_uri=f\"s3://{default_s3_bucket_name}/{prefix}\",\n", + " record_identifier_name=record_identifier_feature_name,\n", + " event_time_feature_name=event_time_feature_name,\n", + " role_arn=role,\n", + " enable_online_store=True\n", + ")\n", + "\n", + "transaction_feature_group.create(\n", + " s3_uri=f\"s3://{default_s3_bucket_name}/{prefix}\",\n", + " record_identifier_name=record_identifier_feature_name,\n", + " event_time_feature_name=event_time_feature_name,\n", + " role_arn=role,\n", + " enable_online_store=True\n", + ")\n", + "\n", + "wait_for_feature_group_creation_complete(feature_group=identity_feature_group)\n", + "wait_for_feature_group_creation_complete(feature_group=transaction_feature_group)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Confirm the FeatureGroup has been created by using the DescribeFeatureGroup and ListFeatureGroups APIs." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "identity_feature_group.describe()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "transaction_feature_group.describe()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sagemaker_client.list_feature_groups() # use boto client to list FeatureGroups" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### PutRecords into FeatureGroup\n", + "\n", + "After the FeatureGroups have been created, we can put data into the FeatureGroups by using the PutRecord API. This API can handle high TPS and is designed to be called by different streams. The data from all of these Put requests is buffered and written to S3 in chunks. The files will be written to the offline store within a few minutes of ingestion. For this example, to accelerate the ingestion process, we are specifying multiple workers to do the job simultaneously. It will take ~1min to ingest data to the 2 FeatureGroups, respectively." 
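, + "\n", + "The `ingest` calls below wrap this same PutRecord API. For reference, a single record can also be written directly with the boto3 runtime client; the following is a minimal sketch in which the identifier and amount values are placeholders (a real call would typically supply values for the remaining features of the group as well):\n", + "\n", + "```python\n", + "# Hypothetical single-record write via the low-level PutRecord API.\n", + "# Every value is passed as a string, and the EventTime feature is required.\n", + "featurestore_runtime.put_record(\n", + "    FeatureGroupName=transaction_feature_group_name,\n", + "    Record=[\n", + "        {'FeatureName': 'TransactionID', 'ValueAsString': '1234567'},\n", + "        {'FeatureName': 'TransactionAmt', 'ValueAsString': '42.5'},\n", + "        {'FeatureName': 'EventTime', 'ValueAsString': str(current_time_sec)},\n", + "    ],\n", + ")\n", + "```"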
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "identity_feature_group.ingest(\n", + " data_frame=identity_data, max_workers=3, wait=True\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "transaction_feature_group.ingest(\n", + " data_frame=transformed_transaction_data, max_workers=5, wait=True\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To confirm that data has been ingested, we can quickly retrieve a record from the online store:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "record_identifier_value = str(2990130)\n", + "\n", + "featurestore_runtime.get_record(FeatureGroupName=transaction_feature_group_name, RecordIdentifierValueAsString=record_identifier_value)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The SageMaker Python SDK’s FeatureStore class also provides the functionality to generate Hive DDL commands. Schema of the table is generated based on the feature definitions. Columns are named after feature name and data-type are inferred based on feature type." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(identity_feature_group.as_hive_ddl())" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "print(transaction_feature_group.as_hive_ddl())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now let's wait for the data to appear in our offline store before moving forward to creating a dataset. This will take approximately 5 minutes." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "account_id = boto3.client('sts').get_caller_identity()[\"Account\"]\n", + "\n", + "identity_feature_group_s3_prefix = prefix + '/' + account_id + '/sagemaker/' + region + '/offline-store/' + identity_feature_group_name + '/data'\n", + "transaction_feature_group_s3_prefix = prefix + '/' + account_id + '/sagemaker/' + region + '/offline-store/' + transaction_feature_group_name + '/data'\n", + "\n", + "offline_store_contents = None\n", + "while (offline_store_contents is None):\n", + " objects_in_bucket = s3_client.list_objects(Bucket=default_s3_bucket_name,Prefix=transaction_feature_group_s3_prefix)\n", + " if ('Contents' in objects_in_bucket and len(objects_in_bucket['Contents']) > 1):\n", + " offline_store_contents = objects_in_bucket['Contents']\n", + " else:\n", + " print('Waiting for data in offline store...\\n')\n", + " sleep(60)\n", + " \n", + "print('Data available.')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "SageMaker FeatureStore adds metadata for each record that's ingested into the offline store." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Build Training Dataset\n", + "\n", + "SageMaker FeatureStore automatically builds the Glue Data Catalog for FeatureGroups (you can optionally turn it on/off while creating the FeatureGroup). In this example, we want to create one training dataset with FeatureValues from both identity and transaction FeatureGroups. This is done by utilizing the auto-built Catalog. We run an Athena query that joins the data stored in the offline store in S3 from the 2 FeatureGroups. 
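\n", + "\n", + "If you want to confirm which Glue/Athena table was created for a FeatureGroup, the DescribeFeatureGroup response exposes it. A small sketch (the key names follow the DescribeFeatureGroup API; the exact output will vary by account):\n", + "\n", + "```python\n", + "# Inspect the auto-created Glue Data Catalog entry for the transaction FeatureGroup.\n", + "catalog_config = transaction_feature_group.describe()['OfflineStoreConfig']['DataCatalogConfig']\n", + "print(catalog_config['Database'], catalog_config['TableName'])\n", + "```\n", + "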
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "identity_query = identity_feature_group.athena_query()\n", + "transaction_query = transaction_feature_group.athena_query()\n", + "\n", + "identity_table = identity_query.table_name\n", + "transaction_table = transaction_query.table_name\n", + "\n", + "query_string = 'SELECT * FROM \"'+transaction_table+'\" LEFT JOIN \"'+identity_table+'\" ON \"'+transaction_table+'\".transactionid = \"'+identity_table+'\".transactionid'\n", + "print('Running ' + query_string)\n", + "\n", + "# run Athena query. The output is loaded to a Pandas dataframe.\n", + "dataset = pd.DataFrame()\n", + "identity_query.run(query_string=query_string, output_location='s3://'+default_s3_bucket_name+'/'+prefix+'/query_results/')\n", + "identity_query.wait()\n", + "dataset = identity_query.as_dataframe()\n", + "\n", + "dataset" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Prepare query results for training.\n", + "query_execution = identity_query.get_query_execution()\n", + "query_result = 's3://'+default_s3_bucket_name+'/'+prefix+'/query_results/'+query_execution['QueryExecution']['QueryExecutionId']+'.csv'\n", + "print(query_result)\n", + "\n", + "# Select useful columns for training with target column as the first.\n", + "dataset = dataset[[\"isfraud\", \"transactiondt\", \"transactionamt\", \"card1\", \"card2\", \"card3\", \"card5\", \"card_type_credit\", \"card_type_debit\", \"card_bank_american_express\", \"card_bank_discover\", \"card_bank_mastercard\", \"card_bank_visa\", \"id_01\", \"id_02\", \"id_03\", \"id_04\", \"id_05\"]]\n", + "\n", + "# Write to csv in S3 without headers and index column.\n", + "dataset.to_csv('dataset.csv', header=False, index=False)\n", + "s3_client.upload_file('dataset.csv', default_s3_bucket_name, prefix+'/training_input/dataset.csv')\n", + "dataset_uri_prefix = 's3://'+default_s3_bucket_name+'/'+prefix+'/training_input/';\n", + "\n", + "dataset" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Train and Deploy the Model\n", + "\n", + "Now it's time to launch a Training job to fit our model." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "training_output_path = 's3://'+default_s3_bucket_name+'/'+prefix+'/training_output'\n", + "training_image = '257758044811.dkr.ecr.us-east-2.amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3'" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create an `Estimator` object. This estimator will launch the training job." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.estimator import Estimator\n", + "\n", + "training_model = Estimator(training_image,\n", + " role, \n", + " instance_count=1, \n", + " instance_type='ml.m5.2xlarge',\n", + " volume_size = 5,\n", + " max_run = 3600,\n", + " input_mode= 'File',\n", + " output_path=training_output_path,\n", + " sagemaker_session=feature_store_session)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "training_model.set_hyperparameters(objective = \"binary:logistic\",\n", + " num_round = 50)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Specify training dataset, which is the dataset we created above." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import sagemaker.inputs\n", + "\n", + "train_data = sagemaker.inputs.TrainingInput(dataset_uri_prefix, distribution='FullyReplicated', \n", + " content_type='text/csv', s3_data_type='S3Prefix')\n", + "data_channels = {'train': train_data}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "training_model.fit(inputs=data_channels, logs=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Set up Hosting for the Model\n", + "\n", + "Once the training is done, we can deploy the trained model as an Amazon SageMaker real-time hosted endpoint. This will allow us to make predictions (or inference) from the model. Note that we don't have to host on the same instance (or type of instance) that we used to train. The endpoint deployment can be accomplished as follows. This takes 8-10 minutes to complete." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "predictor = training_model.deploy(initial_instance_count = 1, instance_type = 'ml.m5.xlarge')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## SageMaker FeatureStore During Inference\n", + "\n", + "SageMaker FeatureStore can be useful in supplementing data for inference requests because of the low-latency GetRecord functionality. For this demo, we will be given a TransactionId and query our online FeatureGroups for data on the transaction to build our inference request. \n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Incoming inference request.\n", + "transaction_id = str(3450774)\n", + "\n", + "# Helper to parse the feature value from the record.\n", + "def get_feature_value(record, feature_name):\n", + " return str(list(filter(lambda r: r['FeatureName'] == feature_name, record))[0]['ValueAsString'])\n", + "\n", + "transaction_response = featurestore_runtime.get_record(FeatureGroupName=transaction_feature_group_name, RecordIdentifierValueAsString=transaction_id)\n", + "transaction_record = transaction_response['Record']\n", + "\n", + "transaction_test_data = [\n", + " get_feature_value(transaction_record, 'TransactionDT'),\n", + " get_feature_value(transaction_record, 'TransactionAmt'),\n", + " get_feature_value(transaction_record, 'card1'),\n", + " get_feature_value(transaction_record, 'card2'),\n", + " get_feature_value(transaction_record, 'card3'),\n", + " get_feature_value(transaction_record, 'card5'),\n", + " get_feature_value(transaction_record, 'card_type_credit'),\n", + " get_feature_value(transaction_record, 'card_type_debit'),\n", + " get_feature_value(transaction_record, 'card_bank_american_express'),\n", + " get_feature_value(transaction_record, 'card_bank_discover'),\n", + " get_feature_value(transaction_record, 'card_bank_mastercard'),\n", + " get_feature_value(transaction_record, 'card_bank_visa')\n", + "]\n", + "\n", + "identity_response = featurestore_runtime.get_record(FeatureGroupName=identity_feature_group_name, RecordIdentifierValueAsString=transaction_id)\n", + "identity_record = identity_response['Record']\n", + "id_test_data = [\n", + " get_feature_value(identity_record, 'id_01'),\n", + " get_feature_value(identity_record, 'id_02'),\n", + " get_feature_value(identity_record, 'id_03'),\n", + " get_feature_value(identity_record, 'id_04'),\n", + " 
get_feature_value(identity_record, 'id_05')\n", + "]\n", + "\n", + "# Join all pieces for inference request.\n", + "inference_request = []\n", + "inference_request.extend(transaction_test_data[:])\n", + "inference_request.extend(id_test_data[:])\n", + "\n", + "inference_request" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "\n", + "results = predictor.predict(','.join(inference_request), initial_args = {\"ContentType\": \"text/csv\"})\n", + "prediction = json.loads(results)\n", + "print(prediction)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Cleanup Resources" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "predictor.delete_endpoint()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "identity_feature_group.delete()\n", + "transaction_feature_group.delete()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "conda_python3", + "language": "python", + "name": "conda_python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/sagemaker-lineage/sagemaker-lineage.ipynb b/sagemaker-lineage/sagemaker-lineage.ipynb new file mode 100644 index 0000000000..720925eced --- /dev/null +++ b/sagemaker-lineage/sagemaker-lineage.ipynb @@ -0,0 +1,312 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Amazon SageMaker Lineage\n", + "Amazon SageMaker Lineage enables events that happen within SageMaker to be traced via a graph structure. The data simplifies generating reports, making comparisons, or discovering relationships between events. For example, you can easily trace both how a model was generated and where the model was deployed. \n", + "\n", + "The lineage graph is created automatically by SageMaker, and you can directly create or modify your own graphs.\n", + "\n", + "\n", + "## Key Concepts\n", + "\n", + "* **Lineage Graph** - A connected graph tracing your machine learning workflow end to end. \n", + "* **Artifacts** - Represents a URI addressable object or data. Artifacts are typically inputs or outputs to Actions. \n", + "* **Actions** - Represents an action taken such as a computation, transformation, or job. \n", + "* **Contexts** - Provides a method to logically group other entities.\n", + "* **Associations** - A directed edge in the lineage graph that links two entities.\n", + "* **Lineage Traversal** - Starting from an arbitrary point, trace the lineage graph to discover and analyze relationships between steps in your workflow.\n", + "* **Experiments** - Experiment entities (Experiments, Trials, and Trial Components) are also part of the lineage graph and can be associated with Artifacts, Actions, or Contexts.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ + "## Notebook Overview\n", + "\n", + "This notebook demonstrates how to:\n", + "* Understand the basics of lineage entities.\n", + "* Create and associate lineage entities to track your workflow.\n", + "* Traverse the associations between lineage entities."
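, + "\n", + "To make the last point concrete, here is a minimal sketch of traversal: starting from an arbitrary ARN (`start_arn` is a placeholder you supply), it walks outgoing associations breadth-first with the same `Association.list` call used later in this notebook:\n", + "\n", + "```python\n", + "from sagemaker.lineage.association import Association\n", + "\n", + "def print_downstream(start_arn, max_depth=3):\n", + "    # Walk outgoing associations from start_arn, printing each edge once.\n", + "    frontier, seen = [(start_arn, 0)], set()\n", + "    while frontier:\n", + "        arn, depth = frontier.pop(0)\n", + "        if arn in seen or depth >= max_depth:\n", + "            continue\n", + "        seen.add(arn)\n", + "        for assoc in Association.list(source_arn=arn):\n", + "            print(f'{arn} -> {assoc.destination_arn}')\n", + "            frontier.append((assoc.destination_arn, depth + 1))\n", + "```"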
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Prerequisites\n", + "\n", + "Select the `Python 3 (Data Science)` kernel in SageMaker Studio." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import boto3\n", + "import sagemaker\n", + "\n", + "region = boto3.Session().region_name\n", + "sagemaker_session = sagemaker.session.Session()\n", + "default_bucket = sagemaker_session.default_bucket()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from datetime import datetime\n", + "from sagemaker.lineage.context import Context\n", + "from sagemaker.lineage.action import Action\n", + "from sagemaker.lineage.association import Association\n", + "from sagemaker.lineage.artifact import Artifact\n", + "\n", + "unique_id = str(int(datetime.now().replace(microsecond=0).timestamp()))\n", + "\n", + "print(f'Unique id is {unique_id}')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# create an example context\n", + "\n", + "# the name must be unique across all other contexts\n", + "context_name = f'machine-learning-workflow-{unique_id}' \n", + "\n", + "ml_workflow_context = Context.create(\n", + "    context_name=context_name, \n", + "    context_type='MLWorkflow', \n", + "    source_uri=unique_id,\n", + "    # properties serve as a method to store metadata on lineage entities in addition to Tags\n", + "    properties={\"example\": \"true\"})" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# list all the contexts\n", + "\n", + "contexts = Context.list(sort_by='CreationTime', sort_order='Descending')\n", + "\n", + "for ctx in contexts:\n", + "    print(ctx.context_name)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# create an example action and associate it with the context\n", + "\n", + "model_build_action = Action.create(\n", + "    action_name=f\"model-build-step-{unique_id}\",\n", + "    action_type=\"ModelBuild\",\n", + "    source_uri=unique_id,\n", + "    properties={\"Example\": \"Metadata\"},\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Association Type can be Produced|DerivedFrom|AssociatedWith|ContributedTo\n", + "context_action_association = Association.create(\n", + "    source_arn=ml_workflow_context.context_arn,\n", + "    destination_arn=model_build_action.action_arn,\n", + "    association_type='AssociatedWith'\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# now the Action and Context are associated:\n", + "incoming_associations_to_action = Association.list(destination_arn=model_build_action.action_arn)\n", + "for association in incoming_associations_to_action:\n", + "    print(f'{model_build_action.action_name} has an incoming association from {association.source_name}')\n", + "\n", + "outgoing_associations_from_context = Association.list(source_arn=ml_workflow_context.context_arn)\n", + "for association in outgoing_associations_from_context:\n", + "    print(f'{ml_workflow_context.context_name} has an outgoing association to {association.destination_name}')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# create an artifact representing 
inputs to the model building action\n", + "input_test_images = Artifact.create(\n", + " artifact_name='mnist-test-images',\n", + " artifact_type='TestData',\n", + " source_types=[{\"SourceIdType\": \"Custom\", \"Value\": unique_id}],\n", + " source_uri='https://sagemaker-sample-files.s3.amazonaws.com/datasets/image/MNIST/t10k-images-idx3-ubyte.gz')\n", + "\n", + "input_test_labels = Artifact.create(\n", + " artifact_name='mnist-test-labels',\n", + " artifact_type='TestLabels',\n", + " source_types=[{\"SourceIdType\": \"Custom\", \"Value\": unique_id}],\n", + " source_uri='https://sagemaker-sample-files.s3.amazonaws.com/datasets/image/MNIST/t10k-labels-idx1-ubyte.gz')" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# create an artifact representing a trained model\n", + "output_model = Artifact.create(\n", + " artifact_name='mnist-model',\n", + " artifact_type='Model',\n", + " source_types=[{\"SourceIdType\": \"Custom\", \"Value\": unique_id}],\n", + " source_uri='s3://sagemaker-sample-files.s3.amazonaws.com/datasets/image/MNIST/model/tensorflow-training-2020-11-20-23-57-13-077/model.tar.gz'\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# associate the data set artifact with an incoming association to the example action\n", + "Association.create(source_arn=input_test_images.artifact_arn, destination_arn=model_build_action.action_arn)\n", + "Association.create(source_arn=input_test_labels.artifact_arn, destination_arn=model_build_action.action_arn)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# associate the example action with an outgoing association to the model artifact\n", + "Association.create(source_arn=model_build_action.action_arn, destination_arn=output_model.artifact_arn)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Cleanup" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def delete_associations(arn):\n", + " # delete incoming associations\n", + " incoming_associations = Association.list(destination_arn=arn)\n", + " for summary in incoming_associations:\n", + " assct = Association(\n", + " source_arn=summary.source_arn, \n", + " destination_arn=summary.destination_arn,\n", + " sagemaker_session=sagemaker_session)\n", + " assct.delete()\n", + " \n", + " # delete outgoing associations\n", + " outgoing_associations = Association.list(source_arn=arn)\n", + " for summary in outgoing_associations:\n", + " assct = Association(\n", + " source_arn=summary.source_arn, \n", + " destination_arn=summary.destination_arn,\n", + " sagemaker_session=sagemaker_session)\n", + " assct.delete()\n", + "\n", + "def delete_lineage_data():\n", + " for summary in Context.list():\n", + " print(f'Deleting context {summary.context_name}')\n", + " delete_associations(summary.context_arn)\n", + " ctx = Context(context_name=summary.context_name, sagemaker_session=sagemaker_session)\n", + " ctx.delete()\n", + "\n", + " for summary in Action.list():\n", + " print(f'Deleting action {summary.action_name}')\n", + " delete_associations(summary.action_arn)\n", + " actn = Action(action_name=summary.action_name, sagemaker_session=sagemaker_session)\n", + " actn.delete()\n", + "\n", + " for summary in Artifact.list():\n", + " print(f'Deleting artifact {summary.artifact_arn} {summary.artifact_name}')\n", + " 
delete_associations(summary.artifact_arn)\n", + " artfct = Artifact(artifact_arn=summary.artifact_arn, sagemaker_session=sagemaker_session)\n", + " artfct.delete()\n", + "\n", + "delete_lineage_data()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Caveats\n", + "\n", + "* Associations cannot be created between two experiment entities. For example between an Experiment and Trial.\n", + "* Associations can only be created between the following resources: Experiment, Trial, Trial Component, Action, Artifact, or Context.\n", + "* The maximum number of manually created lineage entities are:\n", + " * Artifacts: 6000\n", + " * Contexts: 500\n", + " * Actions: 3000\n", + " * Associations: 6000\n", + "* There is no limit on the number of lineage entities created automatically by SageMaker." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Contact\n", + "\n", + "Submit any questions or issues to https://github.com/aws/sagemaker-experiments/issues or mention @aws/sagemakerexperimentsadmin" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.0" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} \ No newline at end of file diff --git a/sagemaker-python-sdk/pipelines/img/pipeline-1.png b/sagemaker-python-sdk/pipelines/img/pipeline-1.png new file mode 100644 index 0000000000..abb112432d Binary files /dev/null and b/sagemaker-python-sdk/pipelines/img/pipeline-1.png differ diff --git a/sagemaker-python-sdk/pipelines/img/pipeline-2.png b/sagemaker-python-sdk/pipelines/img/pipeline-2.png new file mode 100644 index 0000000000..3bfc455bdc Binary files /dev/null and b/sagemaker-python-sdk/pipelines/img/pipeline-2.png differ diff --git a/sagemaker-python-sdk/pipelines/img/pipeline-3.png b/sagemaker-python-sdk/pipelines/img/pipeline-3.png new file mode 100644 index 0000000000..c272980081 Binary files /dev/null and b/sagemaker-python-sdk/pipelines/img/pipeline-3.png differ diff --git a/sagemaker-python-sdk/pipelines/img/pipeline-4.png b/sagemaker-python-sdk/pipelines/img/pipeline-4.png new file mode 100644 index 0000000000..f07e182b99 Binary files /dev/null and b/sagemaker-python-sdk/pipelines/img/pipeline-4.png differ diff --git a/sagemaker-python-sdk/pipelines/img/pipeline-5.png b/sagemaker-python-sdk/pipelines/img/pipeline-5.png new file mode 100644 index 0000000000..bfdba13560 Binary files /dev/null and b/sagemaker-python-sdk/pipelines/img/pipeline-5.png differ diff --git a/sagemaker-python-sdk/pipelines/img/pipeline-6.png b/sagemaker-python-sdk/pipelines/img/pipeline-6.png new file mode 100644 index 0000000000..f2d0254a3b Binary files /dev/null and b/sagemaker-python-sdk/pipelines/img/pipeline-6.png differ diff --git a/sagemaker-python-sdk/pipelines/img/pipeline-7.png b/sagemaker-python-sdk/pipelines/img/pipeline-7.png new file mode 100644 index 0000000000..c910c12383 Binary files /dev/null and b/sagemaker-python-sdk/pipelines/img/pipeline-7.png differ diff --git a/sagemaker-python-sdk/pipelines/img/pipeline-full.png b/sagemaker-python-sdk/pipelines/img/pipeline-full.png new file mode 100644 index 0000000000..4892e027e2 Binary files /dev/null and b/sagemaker-python-sdk/pipelines/img/pipeline-full.png differ diff --git 
a/sagemaker-python-sdk/pipelines/sagemaker-pipelines-preprocess-train-evaluate-batch-transform.ipynb b/sagemaker-python-sdk/pipelines/sagemaker-pipelines-preprocess-train-evaluate-batch-transform.ipynb new file mode 100644 index 0000000000..962290bf77 --- /dev/null +++ b/sagemaker-python-sdk/pipelines/sagemaker-pipelines-preprocess-train-evaluate-batch-transform.ipynb @@ -0,0 +1,1267 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Orchestrating Jobs with Amazon SageMaker Model Building Pipelines\n", + "\n", + "Amazon SageMaker Model Building Pipelines offers machine learning (ML) application developers and operations engineers the ability to orchestrate SageMaker jobs and author reproducible ML pipelines. It also enables them to deploy custom-build models for inference in real-time with low latency, run offline inferences with Batch Transform, and track lineage of artifacts. They can institute sound operational practices in deploying and monitoring production workflows, deploying model artifacts, and tracking artifact lineage through a simple interface, adhering to safety and best practice paradigms for ML application development.\n", + "\n", + "The SageMaker Pipelines service supports a SageMaker Pipeline domain specific language (DSL), which is a declarative JSON specification. This DSL defines a directed acyclic graph (DAG) of pipeline parameters and SageMaker job steps. The SageMaker Python Software Developer Kit (SDK) streamlines the generation of the pipeline DSL using constructs that engineers and scientists are already familiar with." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## SageMaker Pipelines\n", + "\n", + "SageMaker Pipelines supports the following activities, which are demonstrated in this notebook:\n", + "\n", + "* Pipelines - A DAG of steps and conditions to orchestrate SageMaker jobs and resource creation.\n", + "* Processing job steps - A simplified, managed experience on SageMaker to run data processing workloads, such as feature engineering, data validation, model evaluation, and model interpretation.\n", + "* Training job steps - An iterative process that teaches a model to make predictions by presenting examples from a training dataset.\n", + "* Conditional execution steps - A step that provides conditional execution of branches in a pipeline.\n", + "* Register model steps - A step that creates a model package resource in the Model Registry that can be used to create deployable models in Amazon SageMaker.\n", + "* Create model steps - A step that creates a model for use in transform steps or later publication as an endpoint.\n", + "* Transform job steps - A batch transform to preprocess datasets to remove noise or bias that interferes with training or inference from a dataset, get inferences from large datasets, and run inference when a persistent endpoint is not needed.\n", + "* Parametrized Pipeline executions - Enables variation in pipeline executions according to specified parameters." 
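, + "\n", + "Because the DSL is plain JSON, a pipeline's definition can be inspected directly. A minimal sketch, assuming the `pipeline` object constructed toward the end of this notebook:\n", + "\n", + "```python\n", + "import json\n", + "\n", + "# Pretty-print the declarative pipeline definition (the DAG of parameters and steps).\n", + "definition = json.loads(pipeline.definition())\n", + "print(json.dumps(definition, indent=2))\n", + "```"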
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Notebook Overview\n", + "\n", + "This notebook shows how to:\n", + "\n", + "* Define a set of Pipeline parameters that can be used to parametrize a SageMaker Pipeline.\n", + "* Define a Processing step that performs cleaning, feature engineering, and splitting the input data into train and test data sets.\n", + "* Define a Training step that trains a model on the preprocessed train data set.\n", + "* Define a Processing step that evaluates the trained model's performance on the test dataset.\n", + "* Define a Create Model step that creates a model from the model artifacts used in training.\n", + "* Define a Transform step that performs batch transformation based on the model that was created.\n", + "* Define a Register Model step that creates a model package from the estimator and model artifacts used to train the model.\n", + "* Define a Conditional step that measures a condition based on output from prior steps and conditionally executes other steps.\n", + "* Define and create a Pipeline definition in a DAG, with the defined parameters and steps.\n", + "* Start a Pipeline execution and wait for execution to complete.\n", + "* Download the model evaluation report from the S3 bucket for examination.\n", + "* Start a second Pipeline execution." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## A SageMaker Pipeline\n", + "\n", + "The pipeline that you create follows a typical machine learning (ML) application pattern of preprocessing, training, evaluation, model creation, batch transformation, and model registration:\n", + "\n", + "![A typical ML Application pipeline](img/pipeline-full.png)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Dataset\n", + "\n", + "The dataset you use is the [UCI Machine Learning Abalone Dataset](https://archive.ics.uci.edu/ml/datasets/abalone) [1]. The aim for this task is to determine the age of an abalone from its physical measurements. At the core, this is a regression problem.\n", + "\n", + "The dataset contains several features: length (longest shell measurement), diameter (diameter perpendicular to length), height (height with meat in the shell), whole_weight (weight of whole abalone), shucked_weight (weight of meat), viscera_weight (gut weight after bleeding), shell_weight (weight after being dried), sex ('M', 'F', 'I' where 'I' is Infant), and rings (integer).\n", + "\n", + "The number of rings turns out to be a good approximation for age (age is rings + 1.5). However, to obtain this number requires cutting the shell through the cone, staining the section, and counting the number of rings through a microscope, which is a time-consuming task. However, the other physical measurements are easier to determine. You use the dataset to build a predictive model of the variable rings through these other physical measurements.\n", + "\n", + "Before you upload the data to an S3 bucket, gather some constants you can use later in this notebook.\n", + "\n", + "[1] Dua, D. and Graff, C. (2019). [UCI Machine Learning Repository](http://archive.ics.uci.edu/ml). Irvine, CA: University of California, School of Information and Computer Science." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import boto3\n", + "import sagemaker\n", + "\n", + "\n", + "region = boto3.Session().region_name\n", + "sagemaker_session = sagemaker.session.Session()\n", + "role = sagemaker.get_execution_role()\n", + "default_bucket = sagemaker_session.default_bucket()\n", + "model_package_group_name = f\"AbaloneModelPackageGroupName\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now, upload the data into the default bucket. You can select our own data set for the `input_data_uri` as is appropriate." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!mkdir -p data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "local_path = \"data/abalone-dataset.csv\"\n", + "\n", + "s3 = boto3.resource(\"s3\")\n", + "s3.Bucket(f\"sagemaker-servicecatalog-seedcode-{region}\").download_file(\n", + " \"dataset/abalone-dataset.csv\",\n", + " local_path\n", + ")\n", + "\n", + "base_uri = f\"s3://{default_bucket}/abalone\"\n", + "input_data_uri = sagemaker.s3.S3Uploader.upload(\n", + " local_path=local_path, \n", + " desired_s3_uri=base_uri,\n", + ")\n", + "print(input_data_uri)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Download a second dataset for batch transformation after model creation. You can select our own dataset for the `batch_data_uri` as is appropriate." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "local_path = \"data/abalone-dataset-batch\"\n", + "\n", + "s3 = boto3.resource(\"s3\")\n", + "s3.Bucket(f\"sagemaker-servicecatalog-seedcode-{region}\").download_file(\n", + " \"dataset/abalone-dataset-batch\",\n", + " local_path\n", + ")\n", + "\n", + "base_uri = f\"s3://{default_bucket}/abalone\"\n", + "batch_data_uri = sagemaker.s3.S3Uploader.upload(\n", + " local_path=local_path, \n", + " desired_s3_uri=base_uri,\n", + ")\n", + "print(batch_data_uri)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import argparse\n", + "import os\n", + "import requests\n", + "import tempfile\n", + "\n", + "import numpy as np\n", + "import pandas as pd\n", + "\n", + "from sklearn.compose import ColumnTransformer\n", + "from sklearn.impute import SimpleImputer\n", + "from sklearn.pipeline import Pipeline\n", + "from sklearn.preprocessing import StandardScaler, OneHotEncoder\n", + "\n", + "\n", + "# Specify the column names for the .csv file.\n", + "feature_columns_names = [\n", + " \"sex\",\n", + " \"length\",\n", + " \"diameter\",\n", + " \"height\",\n", + " \"whole_weight\",\n", + " \"shucked_weight\",\n", + " \"viscera_weight\",\n", + " \"shell_weight\",\n", + "]\n", + "label_column = \"rings\"\n", + "\n", + "feature_columns_dtype = {\n", + " \"sex\": str,\n", + " \"length\": np.float64,\n", + " \"diameter\": np.float64,\n", + " \"height\": np.float64,\n", + " \"whole_weight\": np.float64,\n", + " \"shucked_weight\": np.float64,\n", + " \"viscera_weight\": np.float64,\n", + " \"shell_weight\": np.float64\n", + "}\n", + "label_column_dtype = {\"rings\": np.float64}\n", + "\n", + "\n", + "def merge_two_dicts(x, y):\n", + " z = x.copy()\n", + " z.update(y)\n", + " return z\n", + "\n", + "\n", + "\n", + "df = pd.read_csv(\n", + " f\"./data/abalone-dataset.csv\",\n", + " header=None, \n", + " 
names=feature_columns_names + [label_column],\n", + " dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)\n", + ")\n", + "numeric_features = list(feature_columns_names)\n", + "numeric_features.remove(\"sex\")\n", + "numeric_transformer = Pipeline(\n", + " steps=[\n", + " (\"imputer\", SimpleImputer(strategy=\"median\")),\n", + " (\"scaler\", StandardScaler())\n", + " ]\n", + ")\n", + "\n", + "categorical_features = [\"sex\"]\n", + "categorical_transformer = Pipeline(\n", + " steps=[\n", + " (\"imputer\", SimpleImputer(strategy=\"constant\", fill_value=\"missing\")),\n", + " (\"onehot\", OneHotEncoder(handle_unknown=\"ignore\"))\n", + " ]\n", + ")\n", + "\n", + "preprocess = ColumnTransformer(\n", + " transformers=[\n", + " (\"num\", numeric_transformer, numeric_features),\n", + " (\"cat\", categorical_transformer, categorical_features)\n", + " ]\n", + ")\n", + " \n", + "y = df.pop(\"rings\")\n", + "X_pre = preprocess.fit_transform(df)\n", + "y_pre = y.to_numpy().reshape(len(y), 1)\n", + " \n", + "X = np.concatenate((y_pre, X_pre), axis=1)\n", + "\n", + "pd.DataFrame(X).to_csv(f\"data/all.csv\", header=False, index=False)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Define Parameters to Parametrize Pipeline Execution\n", + "\n", + "Define Pipeline parameters that you can use to parametrize the pipeline. Parameters enable custom pipeline executions and schedules without having to modify the Pipeline definition.\n", + "\n", + "The supported parameter types include:\n", + "\n", + "* `ParameterString` - represents a `str` Python type\n", + "* `ParameterInteger` - represents an `int` Python type\n", + "* `ParameterFloat` - represents a `float` Python type\n", + "\n", + "These parameters support providing a default value, which can be overridden on pipeline execution. 
The default value specified should be an instance of the type of the parameter.\n", + "\n", + "The parameters defined in this workflow include:\n", + "\n", + "* `processing_instance_type` - The `ml.*` instance type of the processing job.\n", + "* `processing_instance_count` - The instance count of the processing job.\n", + "* `training_instance_type` - The `ml.*` instance type of the training job.\n", + "* `model_approval_status` - What approval status to register the trained model with for CI/CD purposes ( \"PendingManualApproval\" is the default).\n", + "* `input_data` - The S3 bucket URI location of the input data\n", + "* `batch_data` - The S3 bucket URI location of the batch data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.workflow.parameters import (\n", + " ParameterInteger,\n", + " ParameterString,\n", + ")\n", + "\n", + "\n", + "processing_instance_count = ParameterInteger(\n", + " name=\"ProcessingInstanceCount\",\n", + " default_value=1\n", + ")\n", + "processing_instance_type = ParameterString(\n", + " name=\"ProcessingInstanceType\",\n", + " default_value=\"ml.m5.xlarge\"\n", + ")\n", + "training_instance_type = ParameterString(\n", + " name=\"TrainingInstanceType\",\n", + " default_value=\"ml.m5.xlarge\"\n", + ")\n", + "model_approval_status = ParameterString(\n", + " name=\"ModelApprovalStatus\",\n", + " default_value=\"PendingManualApproval\"\n", + ")\n", + "input_data = ParameterString(\n", + " name=\"InputData\",\n", + " default_value=input_data_uri,\n", + ")\n", + "batch_data = ParameterString(\n", + " name=\"BatchData\",\n", + " default_value=batch_data_uri,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "![Define Parameters](img/pipeline-1.png)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Define a Processing Step for Feature Engineering\n", + "\n", + "First, develop a preprocessing script that is specified in the Processing step.\n", + "\n", + "This notebook cell writes a file `preprocessing_abalone.py`, which contains the preprocessing script. You can update the script, and rerun this cell to overwrite. The preprocessing script uses `scikit-learn` to do the following:\n", + "\n", + "* Fill in missing sex category data and encode it so that it is suitable for training.\n", + "* Scale and normalize all numerical fields, aside from sex and rings numerical data.\n", + "* Split the data into training, validation, and test datasets.\n", + "\n", + "The Processing step executes the script on the input data. The Training step uses the preprocessed training features and labels to train a model. The Evaluation step uses the trained model and preprocessed test features and labels to evaluate the model." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!mkdir -p abalone" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%writefile abalone/preprocessing.py\n", + "import argparse\n", + "import os\n", + "import requests\n", + "import tempfile\n", + "\n", + "import numpy as np\n", + "import pandas as pd\n", + "\n", + "from sklearn.compose import ColumnTransformer\n", + "from sklearn.impute import SimpleImputer\n", + "from sklearn.pipeline import Pipeline\n", + "from sklearn.preprocessing import StandardScaler, OneHotEncoder\n", + "\n", + "\n", + "# Since we get a headerless CSV file we specify the column names here.\n", + "feature_columns_names = [\n", + " \"sex\",\n", + " \"length\",\n", + " \"diameter\",\n", + " \"height\",\n", + " \"whole_weight\",\n", + " \"shucked_weight\",\n", + " \"viscera_weight\",\n", + " \"shell_weight\",\n", + "]\n", + "label_column = \"rings\"\n", + "\n", + "feature_columns_dtype = {\n", + " \"sex\": str,\n", + " \"length\": np.float64,\n", + " \"diameter\": np.float64,\n", + " \"height\": np.float64,\n", + " \"whole_weight\": np.float64,\n", + " \"shucked_weight\": np.float64,\n", + " \"viscera_weight\": np.float64,\n", + " \"shell_weight\": np.float64\n", + "}\n", + "label_column_dtype = {\"rings\": np.float64}\n", + "\n", + "\n", + "def merge_two_dicts(x, y):\n", + " z = x.copy()\n", + " z.update(y)\n", + " return z\n", + "\n", + "\n", + "if __name__ == \"__main__\":\n", + " base_dir = \"/opt/ml/processing\"\n", + "\n", + " df = pd.read_csv(\n", + " f\"{base_dir}/input/abalone-dataset.csv\",\n", + " header=None, \n", + " names=feature_columns_names + [label_column],\n", + " dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)\n", + " )\n", + " numeric_features = list(feature_columns_names)\n", + " numeric_features.remove(\"sex\")\n", + " numeric_transformer = Pipeline(\n", + " steps=[\n", + " (\"imputer\", SimpleImputer(strategy=\"median\")),\n", + " (\"scaler\", StandardScaler())\n", + " ]\n", + " )\n", + "\n", + " categorical_features = [\"sex\"]\n", + " categorical_transformer = Pipeline(\n", + " steps=[\n", + " (\"imputer\", SimpleImputer(strategy=\"constant\", fill_value=\"missing\")),\n", + " (\"onehot\", OneHotEncoder(handle_unknown=\"ignore\"))\n", + " ]\n", + " )\n", + "\n", + " preprocess = ColumnTransformer(\n", + " transformers=[\n", + " (\"num\", numeric_transformer, numeric_features),\n", + " (\"cat\", categorical_transformer, categorical_features)\n", + " ]\n", + " )\n", + " \n", + " y = df.pop(\"rings\")\n", + " X_pre = preprocess.fit_transform(df)\n", + " y_pre = y.to_numpy().reshape(len(y), 1)\n", + " \n", + " X = np.concatenate((y_pre, X_pre), axis=1)\n", + " \n", + " np.random.shuffle(X)\n", + " train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])\n", + "\n", + " \n", + " pd.DataFrame(train).to_csv(f\"{base_dir}/train/train.csv\", header=False, index=False)\n", + " pd.DataFrame(validation).to_csv(f\"{base_dir}/validation/validation.csv\", header=False, index=False)\n", + " pd.DataFrame(test).to_csv(f\"{base_dir}/test/test.csv\", header=False, index=False)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, create an instance of an `SKLearnProcessor` processor and use that in our `ProcessingStep`.\n", + "\n", + "You also specify the `framework_version` to use throughout this notebook.\n", + "\n", + "Note the `processing_instance_type` and 
`processing_instance_count` parameters used by the processor instance."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from sagemaker.sklearn.processing import SKLearnProcessor\n",
+    "\n",
+    "\n",
+    "framework_version = \"0.23-1\"\n",
+    "\n",
+    "sklearn_processor = SKLearnProcessor(\n",
+    "    framework_version=framework_version,\n",
+    "    instance_type=processing_instance_type,\n",
+    "    instance_count=processing_instance_count,\n",
+    "    base_job_name=\"sklearn-abalone-process\",\n",
+    "    role=role,\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Finally, use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code to run when the pipeline executes. This is similar to a processor instance's `run` method in the Python SDK.\n",
+    "\n",
+    "Note that the `input_data` parameter passed into the `ProcessingStep` is the input data used in the step. The processor instance reads this input data when it runs.\n",
+    "\n",
+    "Also, note the `\"train\"`, `\"validation\"`, and `\"test\"` named channels specified in the output configuration for the processing job. Step `Properties` can be used in subsequent steps and resolve to their runtime values at execution. Specifically, this usage is called out when you define the training step."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from sagemaker.processing import ProcessingInput, ProcessingOutput\n",
+    "from sagemaker.workflow.steps import ProcessingStep\n",
+    "\n",
+    "\n",
+    "step_process = ProcessingStep(\n",
+    "    name=\"AbaloneProcess\",\n",
+    "    processor=sklearn_processor,\n",
+    "    inputs=[\n",
+    "        ProcessingInput(source=input_data, destination=\"/opt/ml/processing/input\"),\n",
+    "    ],\n",
+    "    outputs=[\n",
+    "        ProcessingOutput(output_name=\"train\", source=\"/opt/ml/processing/train\"),\n",
+    "        ProcessingOutput(output_name=\"validation\", source=\"/opt/ml/processing/validation\"),\n",
+    "        ProcessingOutput(output_name=\"test\", source=\"/opt/ml/processing/test\")\n",
+    "    ],\n",
+    "    code=\"abalone/preprocessing.py\",\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "![Define a Processing Step for Feature Engineering](img/pipeline-2.png)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Define a Training Step to Train a Model\n",
+    "\n",
+    "In this section, use Amazon SageMaker's [XGBoost Algorithm](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) to train on this dataset. Configure an Estimator for the XGBoost algorithm and the input dataset. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves the model to `model_dir` so that it can be hosted later.\n",
+    "\n",
+    "The S3 path where the trained model artifacts are saved is also specified.\n",
+    "\n",
+    "Note that the `training_instance_type` parameter may be used in multiple places in the pipeline. In this case, it is passed into the estimator."
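+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "As an optional sanity check, the next cell peeks at how these placeholders are represented. This assumes the `expr` attribute that recent 2.x versions of the SageMaker Python SDK expose on pipeline parameters and step properties; if your SDK version differs, skip this cell and inspect `pipeline.definition()` after the pipeline is defined instead."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Optional: pipeline parameters and step properties are placeholders that\n",
+    "# resolve only at execution time. In recent SDK versions, `.expr` shows the\n",
+    "# underlying reference that is written into the pipeline definition.\n",
+    "print(training_instance_type.expr)\n",
+    "print(step_process.properties.ProcessingOutputConfig.Outputs[\"train\"].S3Output.S3Uri.expr)"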
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from sagemaker.estimator import Estimator\n",
+    "\n",
+    "\n",
+    "model_path = f\"s3://{default_bucket}/AbaloneTrain\"\n",
+    "image_uri = sagemaker.image_uris.retrieve(\n",
+    "    framework=\"xgboost\",\n",
+    "    region=region,\n",
+    "    version=\"1.0-1\",\n",
+    "    py_version=\"py3\",\n",
+    "    instance_type=training_instance_type,\n",
+    ")\n",
+    "xgb_train = Estimator(\n",
+    "    image_uri=image_uri,\n",
+    "    instance_type=training_instance_type,\n",
+    "    instance_count=1,\n",
+    "    output_path=model_path,\n",
+    "    role=role,\n",
+    ")\n",
+    "xgb_train.set_hyperparameters(\n",
+    "    objective=\"reg:linear\",\n",
+    "    num_round=50,\n",
+    "    max_depth=5,\n",
+    "    eta=0.2,\n",
+    "    gamma=4,\n",
+    "    min_child_weight=6,\n",
+    "    subsample=0.7,\n",
+    "    silent=0\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Finally, use the estimator instance to construct a `TrainingStep`, passing the `properties` of the prior `ProcessingStep` as its inputs. This is similar to an estimator's `fit` method in the Python SDK.\n",
+    "\n",
+    "Pass in the `S3Uri` of the `\"train\"` and `\"validation\"` output channels to the `TrainingStep`; the `\"test\"` output channel is used later for model evaluation in the pipeline. The `properties` attribute of a pipeline step matches the object model of the corresponding describe call response. These properties can be referenced as placeholder values and are resolved at runtime. For example, the `ProcessingStep` `properties` attribute matches the object model of the [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response object."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from sagemaker.inputs import TrainingInput\n",
+    "from sagemaker.workflow.steps import TrainingStep\n",
+    "\n",
+    "\n",
+    "step_train = TrainingStep(\n",
+    "    name=\"AbaloneTrain\",\n",
+    "    estimator=xgb_train,\n",
+    "    inputs={\n",
+    "        \"train\": TrainingInput(\n",
+    "            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\n",
+    "                \"train\"\n",
+    "            ].S3Output.S3Uri,\n",
+    "            content_type=\"text/csv\"\n",
+    "        ),\n",
+    "        \"validation\": TrainingInput(\n",
+    "            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[\n",
+    "                \"validation\"\n",
+    "            ].S3Output.S3Uri,\n",
+    "            content_type=\"text/csv\"\n",
+    "        )\n",
+    "    },\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "![Define a Training Step to Train a Model](img/pipeline-3.png)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Define a Model Evaluation Step to Evaluate the Trained Model\n",
+    "\n",
+    "First, develop an evaluation script that is specified in a Processing step that performs the model evaluation.\n",
+    "\n",
+    "After pipeline execution, you can examine the resulting `evaluation.json` for analysis.\n",
+    "\n",
+    "The evaluation script uses `xgboost` to do the following (a quick sketch of the resulting report format follows this list):\n",
+    "\n",
+    "* Load the model.\n",
+    "* Read the test data.\n",
+    "* Issue predictions against the test data.\n",
+    "* Build a regression report containing the mean squared error (MSE) and the standard deviation of the prediction errors.\n",
+    "* Save the evaluation report to the evaluation directory."
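+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The next cell sketches, on made-up numbers rather than real predictions, the report structure that the evaluation script produces, so you know what to expect inside `evaluation.json`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Illustration only: the shape of the report written by the evaluation script,\n",
+    "# computed here on made-up numbers rather than real model predictions.\n",
+    "import json\n",
+    "\n",
+    "import numpy as np\n",
+    "from sklearn.metrics import mean_squared_error\n",
+    "\n",
+    "y_true = np.array([9.0, 10.0, 11.0])\n",
+    "y_pred = np.array([8.5, 10.5, 10.0])\n",
+    "\n",
+    "sample_report = {\n",
+    "    \"regression_metrics\": {\n",
+    "        \"mse\": {\n",
+    "            \"value\": float(mean_squared_error(y_true, y_pred)),\n",
+    "            \"standard_deviation\": float(np.std(y_true - y_pred)),\n",
+    "        },\n",
+    "    },\n",
+    "}\n",
+    "print(json.dumps(sample_report, indent=2))"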
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%writefile abalone/evaluation.py\n", + "import json\n", + "import pathlib\n", + "import pickle\n", + "import tarfile\n", + "\n", + "import joblib\n", + "import numpy as np\n", + "import pandas as pd\n", + "import xgboost\n", + "\n", + "from sklearn.metrics import mean_squared_error\n", + "\n", + "\n", + "if __name__ == \"__main__\":\n", + " model_path = f\"/opt/ml/processing/model/model.tar.gz\"\n", + " with tarfile.open(model_path) as tar:\n", + " tar.extractall(path=\".\")\n", + " \n", + " model = pickle.load(open(\"xgboost-model\", \"rb\"))\n", + "\n", + " test_path = \"/opt/ml/processing/test/test.csv\"\n", + " df = pd.read_csv(test_path, header=None)\n", + " \n", + " y_test = df.iloc[:, 0].to_numpy()\n", + " df.drop(df.columns[0], axis=1, inplace=True)\n", + " \n", + " X_test = xgboost.DMatrix(df.values)\n", + " \n", + " predictions = model.predict(X_test)\n", + "\n", + " mse = mean_squared_error(y_test, predictions)\n", + " std = np.std(y_test - predictions)\n", + " report_dict = {\n", + " \"regression_metrics\": {\n", + " \"mse\": {\n", + " \"value\": mse,\n", + " \"standard_deviation\": std\n", + " },\n", + " },\n", + " }\n", + "\n", + " output_dir = \"/opt/ml/processing/evaluation\"\n", + " pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)\n", + " \n", + " evaluation_path = f\"{output_dir}/evaluation.json\"\n", + " with open(evaluation_path, \"w\") as f:\n", + " f.write(json.dumps(report_dict))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, create an instance of a `ScriptProcessor` processor and use it in the `ProcessingStep`.\n", + "\n", + "Note the `processing_instance_type` parameter passed into the processor." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.processing import ScriptProcessor\n", + "\n", + "\n", + "script_eval = ScriptProcessor(\n", + " image_uri=image_uri,\n", + " command=[\"python3\"],\n", + " instance_type=processing_instance_type,\n", + " instance_count=1,\n", + " base_job_name=\"script-abalone-eval\",\n", + " role=role,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code that will be executed when the pipeline invokes pipeline execution. This is similar to a processor instance's `run` method in the Python SDK.\n", + "\n", + "Specifically, the `S3ModelArtifacts` from the `step_train` `properties` and the `S3Uri` of the `\"test_data\"` output channel of the `step_process` `properties` are passed into the inputs. The `TrainingStep` and `ProcessingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) and [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response objects, respectively." 
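+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The `PropertyFile` defined below, together with the `JsonGet` used in the condition step later, reads values out of `evaluation.json` with a dotted path such as `regression_metrics.mse.value`. The next cell is a small local illustration of that idea; `resolve_path` is a throwaway helper for this notebook, not a SageMaker API."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Local illustration of the dotted-path lookup used later by `JsonGet`.\n",
+    "# `resolve_path` is a throwaway helper, not part of the SageMaker SDK.\n",
+    "sample_report = {\"regression_metrics\": {\"mse\": {\"value\": 4.2, \"standard_deviation\": 2.1}}}\n",
+    "\n",
+    "\n",
+    "def resolve_path(document, dotted_path):\n",
+    "    value = document\n",
+    "    for key in dotted_path.split(\".\"):\n",
+    "        value = value[key]\n",
+    "    return value\n",
+    "\n",
+    "\n",
+    "print(resolve_path(sample_report, \"regression_metrics.mse.value\"))"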
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.workflow.properties import PropertyFile\n", + "\n", + "\n", + "evaluation_report = PropertyFile(\n", + " name=\"EvaluationReport\",\n", + " output_name=\"evaluation\",\n", + " path=\"evaluation.json\"\n", + ")\n", + "step_eval = ProcessingStep(\n", + " name=\"AbaloneEval\",\n", + " processor=script_eval,\n", + " inputs=[\n", + " ProcessingInput(\n", + " source=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n", + " destination=\"/opt/ml/processing/model\"\n", + " ),\n", + " ProcessingInput(\n", + " source=step_process.properties.ProcessingOutputConfig.Outputs[\n", + " \"test\"\n", + " ].S3Output.S3Uri,\n", + " destination=\"/opt/ml/processing/test\"\n", + " )\n", + " ],\n", + " outputs=[\n", + " ProcessingOutput(output_name=\"evaluation\", source=\"/opt/ml/processing/evaluation\"),\n", + " ],\n", + " code=\"abalone/evaluation.py\",\n", + " property_files=[evaluation_report],\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "![Define a Model Evaluation Step to Evaluate the Trained Model](img/pipeline-4.png)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Define a Create Model Step to Create a Model\n", + "\n", + "In order to perform batch transformation using the example model, create a SageMaker model. \n", + "\n", + "Specifically, pass in the `S3ModelArtifacts` from the `TrainingStep`, `step_train` properties. The `TrainingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) response object." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.model import Model\n", + "\n", + "\n", + "model = Model(\n", + " image_uri=image_uri,\n", + " model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n", + " sagemaker_session=sagemaker_session,\n", + " role=role,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Supply the model input -- `instance_type` and `accelerator_type` for creating the SageMaker Model and then define the `CreateModelStep` passing in the inputs and the model instance defined before." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.inputs import CreateModelInput\n", + "from sagemaker.workflow.steps import CreateModelStep\n", + "\n", + "\n", + "inputs = CreateModelInput(\n", + " instance_type=\"ml.m5.large\",\n", + " accelerator_type=\"ml.eia1.medium\",\n", + ")\n", + "step_create_model = CreateModelStep(\n", + " name=\"AbaloneCreateModel\",\n", + " model=model,\n", + " inputs=inputs,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Define a Transform Step to Perform Batch Transformation\n", + "\n", + "Now that a model instance is defined, create a `Transformer` instance with the appropriate model type, compute instance type, and desired output S3 URI.\n", + "\n", + "Specifically, pass in the `ModelName` from the `CreateModelStep`, `step_create_model` properties. The `CreateModelStep` `properties` attribute matches the object model of the [DescribeModel](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeModel.html) response object." 
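+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "For comparison, outside of a pipeline you would start the batch transform job yourself by calling `transform` on the `Transformer`; inside the pipeline, the `TransformStep` defined below carries that information, and the model name resolves from the create-model step at run time. The direct call is shown commented out only as a sketch and is not meant to be run here."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Where the batch transform output defined below will be written.\n",
+    "print(f\"s3://{default_bucket}/AbaloneTransform\")\n",
+    "\n",
+    "# For comparison only -- the direct (non-pipeline) equivalent would be a call\n",
+    "# like the following on an already-created model; it is not executed here:\n",
+    "# transformer.transform(data=batch_data_uri, content_type=\"text/csv\")"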
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.transformer import Transformer\n", + "\n", + "\n", + "transformer = Transformer(\n", + " model_name=step_create_model.properties.ModelName,\n", + " instance_type=\"ml.m5.xlarge\",\n", + " instance_count=1,\n", + " output_path=f\"s3://{default_bucket}/AbaloneTransform\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Pass in the transformer instance and the `TransformInput` with the `batch_data` pipeline parameter defined earlier." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from sagemaker.inputs import TransformInput\n", + "from sagemaker.workflow.steps import TransformStep\n", + "\n", + "\n", + "step_transform = TransformStep(\n", + " name=\"AbaloneTransform\",\n", + " transformer=transformer,\n", + " inputs=TransformInput(data=batch_data)\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Define a Register Model Step to Create a Model Package\n", + "\n", + "Use the estimator instance specified in the training step to construct an instance of `RegisterModel`. The result of executing `RegisterModel` in a pipeline is a model package. A model package is a reusable model artifacts abstraction that packages all ingredients required for inference. Primarily, it consists of an inference specification that defines the inference image to use along with an optional model weights location.\n", + "\n", + "A model package group is a collection of model packages. A model package group can be created for a specific ML business problem, and new versions of the model packages can be added to it. Typically, customers are expected to create a ModelPackageGroup for a SageMaker pipeline so that model package versions can be added to the group for every SageMaker Pipeline run.\n", + "\n", + "The construction of `RegisterModel` is similar to an estimator instance's `register` method in the Python SDK.\n", + "\n", + "Specifically, pass in the `S3ModelArtifacts` from the `TrainingStep`, `step_train` properties. The `TrainingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) response object.\n", + "\n", + "Note that the specific model package group name provided in this notebook can be used in the model registry and CI/CD work with SageMaker Projects." 
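+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "After a pipeline execution has completed, you can confirm the registration by listing the versions in the model package group. The next cell is an optional check that uses the `ListModelPackages` API via `boto3` with the same `model_package_group_name` passed to the `RegisterModel` step below; run it only after an execution has finished."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Optional: run after a pipeline execution completes to see the registered versions.\n",
+    "import boto3\n",
+    "\n",
+    "sm_client = boto3.client(\"sagemaker\")\n",
+    "packages = sm_client.list_model_packages(ModelPackageGroupName=model_package_group_name)\n",
+    "for summary in packages[\"ModelPackageSummaryList\"]:\n",
+    "    print(summary[\"ModelPackageArn\"], summary[\"ModelApprovalStatus\"])"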
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from sagemaker.model_metrics import MetricsSource, ModelMetrics\n",
+    "from sagemaker.workflow.step_collections import RegisterModel\n",
+    "\n",
+    "\n",
+    "model_metrics = ModelMetrics(\n",
+    "    model_statistics=MetricsSource(\n",
+    "        s3_uri=\"{}/evaluation.json\".format(\n",
+    "            step_eval.arguments[\"ProcessingOutputConfig\"][\"Outputs\"][0][\"S3Output\"][\"S3Uri\"]\n",
+    "        ),\n",
+    "        content_type=\"application/json\"\n",
+    "    )\n",
+    ")\n",
+    "step_register = RegisterModel(\n",
+    "    name=\"AbaloneRegisterModel\",\n",
+    "    estimator=xgb_train,\n",
+    "    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,\n",
+    "    content_types=[\"text/csv\"],\n",
+    "    response_types=[\"text/csv\"],\n",
+    "    inference_instances=[\"ml.t2.medium\", \"ml.m5.xlarge\"],\n",
+    "    transform_instances=[\"ml.m5.xlarge\"],\n",
+    "    model_package_group_name=model_package_group_name,\n",
+    "    approval_status=model_approval_status,\n",
+    "    model_metrics=model_metrics,\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "![Define a Create Model Step and Batch Transform to Process Data in Batch at Scale](img/pipeline-5.png)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Define a Condition Step to Check Accuracy and Conditionally Create a Model and Run a Batch Transformation and Register a Model in the Model Registry\n",
+    "\n",
+    "In this step, the model is registered, created, and used for batch transformation only if its mean squared error (MSE), as computed by the evaluation step `step_eval`, does not exceed a specified threshold. A `ConditionStep` enables conditional execution in the pipeline DAG based on conditions over step properties.\n",
+    "\n",
+    "In the following section, you:\n",
+    "\n",
+    "* Define a `ConditionLessThanOrEqualTo` on the MSE value found in the output of the evaluation step, `step_eval`.\n",
+    "* Use the condition in the list of conditions in a `ConditionStep`.\n",
+    "* Pass the `CreateModelStep` and `TransformStep` steps, and the `RegisterModel` step collection, into the `if_steps` of the `ConditionStep`; these steps are executed only if the condition evaluates to `True`."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo\n",
+    "from sagemaker.workflow.condition_step import (\n",
+    "    ConditionStep,\n",
+    "    JsonGet,\n",
+    ")\n",
+    "\n",
+    "\n",
+    "cond_lte = ConditionLessThanOrEqualTo(\n",
+    "    left=JsonGet(\n",
+    "        step=step_eval,\n",
+    "        property_file=evaluation_report,\n",
+    "        json_path=\"regression_metrics.mse.value\",\n",
+    "    ),\n",
+    "    right=6.0\n",
+    ")\n",
+    "\n",
+    "step_cond = ConditionStep(\n",
+    "    name=\"AbaloneMSECond\",\n",
+    "    conditions=[cond_lte],\n",
+    "    if_steps=[step_register, step_create_model, step_transform],\n",
+    "    else_steps=[],\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "![Define a Condition Step to Check Accuracy and Conditionally Execute Steps](img/pipeline-6.png)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Define a Pipeline of Parameters, Steps, and Conditions\n",
+    "\n",
+    "In this section, combine the steps into a Pipeline so it can be executed.\n",
+    "\n",
+    "A pipeline requires a `name`, `parameters`, and `steps`. 
Names must be unique within an `(account, region)` pair.\n",
+    "\n",
+    "Note:\n",
+    "\n",
+    "* All of the parameters used in the definitions must be present.\n",
+    "* Steps passed into the pipeline do not have to be listed in the order of execution. The SageMaker Pipelines service resolves the _data dependency_ DAG to determine the order of execution.\n",
+    "* Steps must be unique across the pipeline step list and all condition step if/else lists."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from sagemaker.workflow.pipeline import Pipeline\n",
+    "\n",
+    "\n",
+    "pipeline_name = \"AbalonePipeline\"\n",
+    "pipeline = Pipeline(\n",
+    "    name=pipeline_name,\n",
+    "    parameters=[\n",
+    "        processing_instance_type,\n",
+    "        processing_instance_count,\n",
+    "        training_instance_type,\n",
+    "        model_approval_status,\n",
+    "        input_data,\n",
+    "        batch_data,\n",
+    "    ],\n",
+    "    steps=[step_process, step_train, step_eval, step_cond],\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "![Define a Pipeline of Parameters, Steps, and Conditions](img/pipeline-7.png)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "#### (Optional) Examining the pipeline definition\n",
+    "\n",
+    "The JSON of the pipeline definition can be examined to confirm the pipeline is well-defined and the parameters and step properties resolve correctly."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import json\n",
+    "\n",
+    "\n",
+    "definition = json.loads(pipeline.definition())\n",
+    "definition"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Submit the pipeline to SageMaker and start execution\n",
+    "\n",
+    "Submit the pipeline definition to the Pipeline service. The role passed in will be used by the Pipeline service to create all the jobs defined in the steps."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "pipeline.upsert(role_arn=role)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Start the pipeline and accept all of the default parameters."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "execution = pipeline.start()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Pipeline Operations: Examining and Waiting for Pipeline Execution\n",
+    "\n",
+    "Describe the pipeline execution."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "execution.describe()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Wait for the execution to complete."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "execution.wait()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "List the steps in the execution. These are the steps in the pipeline that have been resolved by the step executor service."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "execution.list_steps()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Examining the Evaluation\n",
+    "\n",
+    "Examine the resulting model evaluation after the pipeline completes. 
Download the resulting `evaluation.json` file from S3 and print the report." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from pprint import pprint\n", + "\n", + "\n", + "evaluation_json = sagemaker.s3.S3Downloader.read_file(\"{}/evaluation.json\".format(\n", + " step_eval.arguments[\"ProcessingOutputConfig\"][\"Outputs\"][0][\"S3Output\"][\"S3Uri\"]\n", + "))\n", + "pprint(json.loads(evaluation_json))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Lineage\n", + "\n", + "Review the lineage of the artifacts generated by the pipeline." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import time\n", + "from sagemaker.lineage.visualizer import LineageTableVisualizer\n", + "\n", + "\n", + "viz = LineageTableVisualizer(sagemaker.session.Session())\n", + "for execution_step in reversed(execution.list_steps()):\n", + " print(execution_step)\n", + " display(viz.show(pipeline_execution_step=execution_step))\n", + " time.sleep(5)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Parametrized Executions\n", + "\n", + "You can run additional executions of the pipeline and specify different pipeline parameters. The parameters argument is a dictionary containing parameter names, and where the values are used to override the defaults values.\n", + "\n", + "Based on the performance of the model, you might want to kick off another pipeline execution on a compute-optimized instance type and set the model approval status to \"Approved\" automatically. This means that the model package version generated by the `RegisterModel` step is automatically ready for deployment through CI/CD pipelines, such as with SageMaker Projects." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "execution = pipeline.start(\n", + " parameters=dict(\n", + " ProcessingInstanceType=\"ml.c5.xlarge\",\n", + " ModelApprovalStatus=\"Approved\",\n", + " )\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "execution.wait()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "execution.list_steps()" + ] + } + ], + "metadata": { + "instance_type": "ml.t3.medium", + "kernelspec": { + "display_name": "Python 3 (Data Science)", + "language": "python", + "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-2:429704687514:image/datascience-1.0" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.6" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}