adding sample MLOps for batch code to the GitHub repo
Hasan Shojaei committed Jun 22, 2023
1 parent e5656ca commit 0ef25a2
Showing 26 changed files with 2,679 additions and 6 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -0,0 +1,4 @@
# General
.DS_Store
.AppleDouble
.LSOverride
90 changes: 90 additions & 0 deletions .gitlab-ci.yml
@@ -0,0 +1,90 @@
variables:
  TF_ROOT: .
  TF_STATE_NAME: "${CI_COMMIT_REF_SLUG}"
  CACHE_FALLBACK_KEY: "test-state"
  LAMBDA_FILE_NAME: "lambda_function_payload.zip" # Must match the Lambda package file name referenced in the lambda.tf file.
  GIT_CLEAN_FLAGS: "none"
  REGION: "us-east-1" # Modify the default value if your resources are in a different region.

cache:
  key: "${CI_COMMIT_REF_SLUG}"
  paths:
    - ${TF_ROOT}/.terraform

stages:
  - pipeline
  - prepare
  - validate
  - build
  - deploy
  - destroy

buildpipeline:
  stage: pipeline
  script:
    - yum install python3 python3-pip unzip -y
    - curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
    - unzip -o awscliv2.zip
    - ./aws/install
    - export AWS_DEFAULT_REGION=${REGION}
    - aws sagemaker list-domains
    - pip3 install boto3 sagemaker pandas pyyaml
    - python3 ./pipeline_scripts/train_monitor_pipeline.py
    - python3 ./pipeline_scripts/hpo_with_monitor_pipeline.py
    - python3 ./pipeline_scripts/inference_with_monitor_pipeline.py

init:
  stage: prepare
  script:
    - gitlab-terraform init

validate:
  stage: validate
  script:
    - yum install zip -y
    - zip -j ${LAMBDA_FILE_NAME} code/lambda_function/index.py
    - gitlab-terraform validate
    - ls -lart
  artifacts:
    paths:
      - ${TF_ROOT}/${LAMBDA_FILE_NAME}
    expire_in: 1 week

plan:
  stage: build
  script:
    - yum install -y jq # jq is required by gitlab-terraform
    - gitlab-terraform plan -var-file env_files/dev_env.tfvars
    - gitlab-terraform plan-json -var-file env_files/dev_env.tfvars
  artifacts:
    name: plan
    paths:
      - ${TF_ROOT}/plan.cache
      - ${TF_ROOT}/${LAMBDA_FILE_NAME}
    reports:
      terraform: ${TF_ROOT}/plan.json

apply:
  stage: deploy
  script:
    - gitlab-terraform apply
  dependencies:
    - plan
  when: manual

destroy:
  stage: destroy
  script:
    - gitlab-terraform plan -var-file env_files/dev_env.tfvars
    - gitlab-terraform destroy -var-file env_files/dev_env.tfvars
  when: manual
  dependencies:
    - plan

.setup:
  script:
    - yum install -y git python3 python3-pip
    - pip3 --no-cache-dir install --upgrade awscli
    - echo "USERID - $(aws sts get-caller-identity --query 'UserId' --output text)"
    - echo "AccountID - $(aws sts get-caller-identity --query 'Account' --output text)"
    - echo "Region - $(aws configure get region)"
98 changes: 92 additions & 6 deletions README.md
@@ -1,17 +1,103 @@
# MLOps for Batch Inference with Amazon SageMaker, HashiCorp Terraform, and GitLab CI/CD
This repository provides an illustrative example of creating an MLOps workflow to manage batch inference workloads in production, including data/model monitoring and automatic re-training.

You'll find:

- Details of the MLOps pipeline components with SageMaker, Terraform, and GitLab CI/CD.
- A demonstration of Infrastructure as Code (IaC) with Terraform.
- A structured repository with scripts and data directories for easy reference.

## Prerequisites
Before using this repository, ensure that you meet the following prerequisites (a quick environment check is sketched after this list):

- An AWS account
- Amazon SageMaker Studio
- SageMaker execution role with S3 read/write and KMS encrypt/decrypt permissions
- One S3 bucket for storing data, scripts, and model artifacts
- Terraform version 0.13.5 or greater
- GitLab with a working Docker runner for pipeline execution
- AWS CLI
- jq
- unzip
- Python 3.7 or greater and the following Python packages:
  - boto3
  - sagemaker
  - pandas
  - pyyaml
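
A small sketch (not part of the repo) to confirm the Python packages and AWS credentials are in place, mirroring the identity echo done in the `.setup` job of `.gitlab-ci.yml`:

```python
"""Quick environment check: required packages importable and AWS credentials valid."""
import boto3
import pandas     # noqa: F401
import sagemaker  # noqa: F401
import yaml       # noqa: F401  (provided by the pyyaml package)

sts = boto3.client("sts")
identity = sts.get_caller_identity()
print(f"UserId:  {identity['UserId']}")
print(f"Account: {identity['Account']}")
print(f"Region:  {boto3.session.Session().region_name}")
```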

## Architecture Diagram
The figure below shows the architecture of the solution in this repository.

![example](diagram/architecture_diagram.png)

## Variables and Configuration
The table below shows the variables that are used to parameterize this solution.

| Name | Description | Type | Default |
|------|-------------|------|---------|
|bucket_name|An existing S3 bucket used to store data, scripts, and model artifacts|String|N/A|
|bucket_prefix|S3 prefix for the ML project|String|N/A|
|bucket_train_prefix|S3 prefix for training data|String|N/A|
|bucket_inf_prefix|S3 prefix for inference data|String|N/A|
|notification_function_name|Name of the Lambda function that prepares and sends notification messages about SageMaker Pipelines step state change|String|N/A|
|custom_notification_config|The configuration for customizing the notification message for specific SageMaker Pipelines steps when a specific step status is detected|List(Object)|N/A|
|email_recipient|The email address list for receiving SageMaker Pipelines step state change notifications|List(String)|N/A|
|pipeline_inf|Name of the SageMaker inference pipeline|String|InferencePipeline|
|pipeline_train|Name of the SageMaker training pipeline|String|TrainingPipeline|
|pipeline_trainwhpo|Name of SageMaker training pipeline with HPO|String|TrainingWHPOPipeline|
|recreate_pipelines|If set to true, the three existing SageMaker Pipelines (training, inference, training with HPO) will be deleted and new ones will be created when GitLab CI/CD is run|Bool|true|
|model_package_group_name|Name of the model package group|String|N/A|
|accuracy_mse_threshold|Maximum value of MSE before requiring an update to the model|Float|10.00|
|role_arn|The IAM role ARN of the SageMaker pipeline execution role|String|N/A|
|kms_key|KMS key ARN for S3 and SageMaker encryption|ARN|N/A|
|subnet_id|Subnet ID for SageMaker networking configuration|String|N/A|
|sg_id|Security group ID for SageMaker networking configuration|String|N/A|
|upload_training_data|If set to true, training data will be uploaded to S3, and this upload operation will trigger the execution of the Training Pipeline|Bool|true|
|upload_inference_data|If set to true, inference data will be uploaded to S3, and this upload operation will trigger the execution of the Inference Pipeline|Bool|false|
|user_id|The employee ID of the SageMaker user, which is added as a tag to SageMaker resources|String|N/A|


## Solution Deployment

Follow these steps to deploy the solution in your AWS account:

1. Clone the GitHub repository into your workspace.
2. Review and modify the GitLab CI/CD pipeline configuration to suit your environment. The configuration is specified in the `./.gitlab-ci.yml` file.
3. Update the general solution variables in the `./env_files/dev_env.tfvars` file. This file contains variables for both Python scripts and Terraform automation.

a. Check the additional SageMaker Pipelines parameters that are defined in the YAML files under `./batch_scoring_pipeline/pipeline_scripts`. Review and update the parameters if necessary.

4. Review the SageMaker Pipelines creation scripts in `./pipeline_scripts/` as well as the scripts they reference in the `./scripts/` folder. The example scripts provided in this repo are based on the [Abalone dataset](https://archive.ics.uci.edu/dataset/1/abalone). If you are going to use a different dataset, ensure you update the scripts to suit your particular problem.

5. Put your data files into the `./data` folder using the naming convention below (a data-preparation sketch follows this list). If you are using the Abalone dataset along with the provided example scripts, ensure that the data files are headerless, that the training data includes both the independent and target variables (with the original column order preserved), that the inference data includes only the independent variables, and that the ground-truth file includes only the target variable.

a. training-data.csv
b. inference-data.csv
c. ground-truth.csv

6. Commit and push the code to the repository to trigger the first GitLab CI/CD pipeline run.

a. Note: The first pipeline run will fail in the `pipeline` stage because there is no approved model version yet for the inference pipeline script to use. Review the step log and verify that a new SageMaker Pipeline named `TrainingPipeline` has been successfully created.

b. Open the SageMaker Studio UI, then review and execute the training pipeline (a programmatic alternative using boto3 is sketched after this list).
c. After the training pipeline executes successfully, approve the registered model version in the SageMaker Model Registry, and then re-run the entire GitLab CI/CD pipeline.
7. Review the Terraform plan output in the build stage. Approve the manual `apply` stage in the GitLab CI/CD pipeline to resume the pipeline execution and authorize Terraform to create the monitoring and notification resources in your AWS account.
8. Finally, review the SageMaker Pipelines execution status and output in the SageMaker Studio UI, and check your email for notification messages. The default message body is in JSON format.
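
For step 5, assuming the original UCI Abalone file (`abalone.data`, no header, with the target `Rings` as the last column), the three headerless files could be produced with a short pandas sketch like the one below; the input path is an assumption:

```python
"""Sketch: split the Abalone dataset into the three headerless files expected in ./data.
Assumes abalone.data from the UCI repository, with the target (Rings) as the last column."""
import pandas as pd

df = pd.read_csv("abalone.data", header=None)  # input path is an assumption

# Training data keeps all columns (features + target) in their original order.
df.to_csv("data/training-data.csv", header=False, index=False)

# Inference data contains only the independent variables (drop the last column).
df.iloc[:, :-1].to_csv("data/inference-data.csv", header=False, index=False)

# Ground truth contains only the target variable (the last column).
# In practice you would use separate row subsets for training and inference;
# this sketch only illustrates the expected column layout.
df.iloc[:, [-1]].to_csv("data/ground-truth.csv", header=False, index=False)
```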

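Steps 6b and 6c can also be done programmatically instead of through the Studio UI. A hedged boto3 sketch, using the default pipeline and model package group names from the variables table:

```python
"""Sketch: start the training pipeline and approve the latest registered model version
(a programmatic equivalent of steps 6b and 6c)."""
import boto3

sm = boto3.client("sagemaker")

# 6b: start an execution of the training pipeline.
execution = sm.start_pipeline_execution(PipelineName="TrainingPipeline")
print("Started:", execution["PipelineExecutionArn"])

# 6c (after the execution succeeds): approve the newest model package in the group.
packages = sm.list_model_packages(
    ModelPackageGroupName="poc-mpg",  # use your model_package_group_name value
    SortBy="CreationTime",
    SortOrder="Descending",
    MaxResults=1,
)
latest_arn = packages["ModelPackageSummaryList"][0]["ModelPackageArn"]
sm.update_model_package(ModelPackageArn=latest_arn, ModelApprovalStatus="Approved")
print("Approved:", latest_arn)
```
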
## Clean Up
When you are finished exploring this solution, clean up the resources by following these steps:

1. Use the `destroy` stage in the GitLab CI/CD pipeline to remove all resources provisioned by Terraform.
2. Use the AWS CLI (or boto3, as sketched below) to list and remove any remaining SageMaker Pipelines created by the Python scripts.
3. (Optional) Delete other AWS resources, such as the S3 bucket or IAM role, that were created outside the CI/CD pipeline.
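
For step 2, a boto3 equivalent of listing and deleting leftover pipelines (a sketch; the name filter is an assumption based on the default pipeline names):

```python
"""Sketch: list and delete SageMaker Pipelines left over after the Terraform destroy stage."""
import boto3

sm = boto3.client("sagemaker")

paginator = sm.get_paginator("list_pipelines")
for page in paginator.paginate():
    for summary in page["PipelineSummaries"]:
        name = summary["PipelineName"]
        # Default names from the variables table; adjust if you overrode them.
        if name in ("TrainingPipeline", "TrainingWHPOPipeline", "InferencePipeline"):
            print(f"Deleting pipeline: {name}")
            sm.delete_pipeline(PipelineName=name)
```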

## Security

See [CONTRIBUTING](CONTRIBUTING.md#security-issue-notifications) for more information.

## License

This sample code is made available under the MIT-0 license. See the LICENSE file.

91 changes: 91 additions & 0 deletions code/lambda_function/index.py
@@ -0,0 +1,91 @@
"""Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved"""
import os
import json
import logging
import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)
sns_client = boto3.client('sns')

TOPIC_ARN = os.environ['TOPIC_ARN']


def sm_pipeline_exec_notification(pipeline_event):
"""To send notification email for FAILED sagemaker pipeline execution."""

execution_details = pipeline_event["execution_details"]
exec_status = execution_details["currentPipelineExecutionStatus"]
pipeline_name = execution_details["pipelineArn"].split("/")[-1]
email_subject = f"SageMaker Pipeline: {pipeline_name} Execution - {exec_status}" # noqa: E501

try:
sns_response = sns_client.publish(
TopicArn=TOPIC_ARN,
Subject=email_subject,
Message=json.dumps(exec_status,sort_keys=True, indent=4)
)
logger.info("Email of pipeline failure was sent successfully. %s",sns_response)

except Exception as error:
logger.error(error)


def sm_pipeline_step_notification(pipeline_event):
"""To send notification email for SELECTED pipeline steps."""
execution_details = pipeline_event["execution_details"]
logger.debug(execution_details.items())
step_name = execution_details["stepName"]
current_step_status = execution_details["currentStepStatus"]
addon_msg = "None"
email_subject = f"SageMaker Pipeline Step: {step_name} Execution - {current_step_status}" # noqa: E501

###############################################################
# Adding extra custom message for predefined steps and status #
###############################################################
if "notification_setup_list" in pipeline_event:
for custom_notification in pipeline_event["notification_setup_list"]: # noqa: E501
if custom_notification["step_name"] == step_name and custom_notification["step_status"] == current_step_status: # noqa: E501
addon_msg = custom_notification["step_msg"]
email_subject += " - Action Required"

execution_details["Addon_Message"]=str(addon_msg)
try:
sns_response = sns_client.publish(
TopicArn=TOPIC_ARN,
Subject=email_subject,
Message=json.dumps(execution_details, sort_keys=True, indent=4)
)

logger.info("Email of pipeline step change was sent successfully.\n %s",sns_response)
except Exception as error:
logger.error(error)


def processing_sm_pipeline_notification(event):
"""To send notification email based on the pipeline execution types."""
if "Execution Step Status Change" in event["pipeline_update"]:
logger.info("Pipeline Step Status Notification")
sm_pipeline_step_notification(event)

elif "Pipeline Execution Status Change" in event["pipeline_update"]:
logger.info("Pipeline Execution Status Notification")
sm_pipeline_exec_notification(event)
else:
logger.error("Invalid pipeline update event.")


def lambda_handler(event, context):
"""Lambda_handler"""
logger.info(event)
logger.debug(context)

try:
if "pipeline_update" in event:
logger.info("Received pipeline status notification")
processing_sm_pipeline_notification(event)

except KeyError as client_error:
logger.error("The event trigger: %s is invalid!", client_error)
return False
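
The event shape the handler expects can be inferred from the code above. A hedged local test sketch follows — the field values are illustrative only, and the real events are presumably delivered by the notification resources that Terraform provisions:

```python
# Sketch: invoke the handler locally with a step-status event shaped like the
# fields the code above reads. Requires AWS credentials and a default region;
# the publish call will fail against the placeholder topic, which the handler
# catches and logs.
import os
os.environ["TOPIC_ARN"] = "arn:aws:sns:us-east-1:111122223333:pipeline-notifications"  # placeholder

import index  # the module above

test_event = {
    "pipeline_update": "SageMaker Model Building Pipeline Execution Step Status Change",
    "execution_details": {
        "stepName": "ModelRegisterStep",
        "currentStepStatus": "Succeeded",
        "pipelineArn": "arn:aws:sagemaker:us-east-1:111122223333:pipeline/TrainingPipeline",
    },
    "notification_setup_list": [
        {
            "step_name": "ModelRegisterStep",
            "step_status": "Succeeded",
            "step_msg": "Review and approve the registered model version.",
        }
    ],
}

index.lambda_handler(test_event, None)
```
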
Binary file added diagram/architecture_diagram.png
35 changes: 35 additions & 0 deletions env_files/dev_env.tfvars
@@ -0,0 +1,35 @@
bucket_name = # e.g. "sagemaker-bucket-abcdefg12345"
bucket_prefix = # e.g. "pipeline_shared/batch-scoring"
bucket_train_prefix = # e.g. "training/data/raw"
bucket_inf_prefix = # e.g. "batch-scoring/data/raw"
email_recipient = # e.g. ["[email protected]", "[email protected]"]
user_id = # e.g. "a12345"

custom_notification_config = [
  {
    step_name           = # e.g. "ModelRegisterStep"
    step_status         = # e.g. "Succeeded"
    step_custom_message = # e.g. "This is the custom message for Succeeded \"ModelRegisterStep\" step."
  },
  {
    step_name           = # e.g. "TrainingEval"
    step_status         = # e.g. "Failed"
    step_custom_message = # e.g. "This is the custom message for Failed \"TrainingEval\" step."
  }
]

# Pipeline information. Will be used by the Python helper script.
accuracy_mse_threshold = # e.g. "10.0"
kms_key = # e.g. "arn:aws:kms:us-east-1:112233445566:key/123456a1-12b1-123c-1234-12345df12e12"
model_package_group_name = # e.g. "poc-mpg"
notification_function_name = # e.g. "pipeline-notification-lambda"
pipeline_inf = # e.g. "InferencePipeline"
pipeline_train = # e.g. "TrainingPipeline"
pipeline_trainwhpo = # e.g. "TrainingWHPOPipeline"

recreate_pipelines = # e.g. "true"
role_arn = # e.g. "arn:aws:iam::112233445566:role/112233445566/sm_execution_role_batch_scoring"
sg_id = # e.g. "sg-0a12b3c45b67de8f9"
subnet_id = # e.g. "subnet-01a23bcdef45ghij6"
upload_inference_data = # e.g. "false"
upload_training_data = # e.g. "true"
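
The comment above notes that these values are also consumed by the Python helper scripts. One hedged way to read a `.tfvars` file from Python is the `python-hcl2` package (an assumption — the repo's actual scripts may read the values differently), and the file must have its placeholder values filled in before it will parse:

```python
"""Sketch: read dev_env.tfvars from Python using the python-hcl2 package
(pip install python-hcl2). The repo's actual helper scripts may parse it differently."""
import hcl2

with open("env_files/dev_env.tfvars", "r", encoding="utf-8") as f:
    tfvars = hcl2.load(f)

print(tfvars["bucket_name"])
print(tfvars["pipeline_train"])
print(tfvars["custom_notification_config"])
```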
