From cb45a4b109365c65b7e5854bdde87e1cde20f654 Mon Sep 17 00:00:00 2001 From: Xinlu Tu Date: Thu, 10 Nov 2022 09:26:07 -0800 Subject: [PATCH] fix: bug on AutoMLInput to allow PipelineVariable (#736) Co-authored-by: Xinlu Tu --- src/sagemaker/automl/automl.py | 38 ++++++++----- src/sagemaker/workflow/automl_step.py | 12 ++-- .../sagemaker/workflow/test_automl_steps.py | 57 ++++++++++++++++--- tests/integ/test_auto_ml.py | 1 - tests/unit/sagemaker/automl/test_auto_ml.py | 44 +++++++++++++- .../sagemaker/workflow/test_automl_step.py | 8 +-- 6 files changed, 123 insertions(+), 37 deletions(-) diff --git a/src/sagemaker/automl/automl.py b/src/sagemaker/automl/automl.py index a1cb1442be..41192fbe03 100644 --- a/src/sagemaker/automl/automl.py +++ b/src/sagemaker/automl/automl.py @@ -22,6 +22,7 @@ from sagemaker.job import _Job from sagemaker.session import Session from sagemaker.utils import name_from_base +from sagemaker.workflow.entities import PipelineVariable from sagemaker.workflow.pipeline_context import runnable_by_pipeline logger = logging.getLogger("sagemaker") @@ -44,18 +45,20 @@ def __init__( ): """Convert an S3 Uri or a list of S3 Uri to an AutoMLInput object. - :param inputs (str, list[str]): a string or a list of string that points to (a) - S3 location(s) where input data is stored. - :param target_attribute_name (str): the target attribute name for regression - or classification. - :param compression (str): if training data is compressed, the compression type. - The default value is None. - :param channel_type (str): The channel type an enum to specify - whether the input resource is for training or validation. - Valid values: training or validation. - :param content_type (str): The content type of the data from the input source. - :param s3_data_type (str): The data type for S3 data source. - Valid values: ManifestFile or S3Prefix. + Args: + inputs (str, list[str], PipelineVariable): + a string or a list of string or a PipelineVariable that points to (a) + S3 location(s) where input data is stored. + target_attribute_name (str): the target attribute name for regression + or classification. + compression (str): if training data is compressed, the compression type. + The default value is None. + channel_type (str): The channel type an enum to specify + whether the input resource is for training or validation. + Valid values: training or validation. + content_type (str): The content type of the data from the input source. + s3_data_type (str): The data type for S3 data source. + Valid values: ManifestFile or S3Prefix. """ self.inputs = inputs self.target_attribute_name = target_attribute_name @@ -70,6 +73,8 @@ def to_request_dict(self): auto_ml_input = [] if isinstance(self.inputs, string_types): self.inputs = [self.inputs] + if isinstance(self.inputs, PipelineVariable): + self.inputs = [self.inputs] for entry in self.inputs: input_entry = { "DataSource": {"S3DataSource": {"S3DataType": "S3Prefix", "S3Uri": entry}}, @@ -106,7 +111,7 @@ def __init__( max_candidates: Optional[int] = None, max_runtime_per_training_job_in_seconds: Optional[int] = None, total_job_runtime_in_seconds: Optional[int] = None, - job_objective: Optional[str] = None, + job_objective: Optional[Dict[str, str]] = None, generate_candidate_definitions_only: Optional[bool] = False, tags: Optional[List[Dict[str, str]]] = None, content_type: Optional[str] = None, @@ -142,8 +147,9 @@ def __init__( that each training job executed inside hyperparameter tuning is allowed to run as part of a hyperparameter tuning job. 
total_job_runtime_in_seconds (int): the total wait time of an AutoML job. - job_objective (str): Defines the objective metric + job_objective (dict[str, str]): Defines the objective metric used to measure the predictive quality of an AutoML job. + In the format of: {"MetricName": str} generate_candidate_definitions_only (bool): Whether to generates possible candidates without training the models. tags (List[dict[str, str]]): The list of tags to attach to this @@ -969,8 +975,10 @@ def _prepare_auto_ml_stop_condition( Returns (dict): an AutoML CompletionCriteria. """ - stopping_condition = {"MaxCandidates": max_candidates} + stopping_condition = {} + if max_candidates is not None: + stopping_condition["MaxCandidates"] = max_candidates if max_runtime_per_training_job_in_seconds is not None: stopping_condition[ "MaxRuntimePerTrainingJobInSeconds" diff --git a/src/sagemaker/workflow/automl_step.py b/src/sagemaker/workflow/automl_step.py index e1127c956f..4878a648bb 100644 --- a/src/sagemaker/workflow/automl_step.py +++ b/src/sagemaker/workflow/automl_step.py @@ -71,15 +71,15 @@ def __init__( root_property = Properties(step_name=name, shape_name="DescribeAutoMLJobResponse") - best_candidate_properties = Properties(step_name=name, path="bestCandidateProperties") - best_candidate_properties.__dict__["modelInsightsJsonReportPath"] = Properties( - step_name=name, path="bestCandidateProperties.modelInsightsJsonReportPath" + best_candidate_properties = Properties(step_name=name, path="BestCandidateProperties") + best_candidate_properties.__dict__["ModelInsightsJsonReportPath"] = Properties( + step_name=name, path="BestCandidateProperties.ModelInsightsJsonReportPath" ) - best_candidate_properties.__dict__["explainabilityJsonReportPath"] = Properties( - step_name=name, path="bestCandidateProperties.explainabilityJsonReportPath" + best_candidate_properties.__dict__["ExplainabilityJsonReportPath"] = Properties( + step_name=name, path="BestCandidateProperties.ExplainabilityJsonReportPath" ) - root_property.__dict__["bestCandidateProperties"] = best_candidate_properties + root_property.__dict__["BestCandidateProperties"] = best_candidate_properties self._properties = root_property @property diff --git a/tests/integ/sagemaker/workflow/test_automl_steps.py b/tests/integ/sagemaker/workflow/test_automl_steps.py index c97fac0890..eb557777d8 100644 --- a/tests/integ/sagemaker/workflow/test_automl_steps.py +++ b/tests/integ/sagemaker/workflow/test_automl_steps.py @@ -14,14 +14,15 @@ import os +import boto3 import pytest from botocore.exceptions import WaiterError +from sagemaker.workflow import ParameterString from sagemaker.workflow.automl_step import AutoMLStep from sagemaker.automl.automl import AutoML, AutoMLInput -from sagemaker import utils, get_execution_role -from sagemaker.utils import unique_name_from_base +from sagemaker import utils, get_execution_role, ModelMetrics, MetricsSource from sagemaker.workflow.model_step import ModelStep from sagemaker.workflow.pipeline import Pipeline @@ -50,10 +51,8 @@ def test_automl_step(pipeline_session, role, pipeline_name): role=role, target_attribute_name=TARGET_ATTRIBUTE_NAME, sagemaker_session=pipeline_session, - max_candidates=1, mode=MODE, ) - job_name = unique_name_from_base("auto-ml", max_length=32) s3_input_training = pipeline_session.upload_data( path=TRAINING_DATA, key_prefix=PREFIX + "/input" ) @@ -72,7 +71,7 @@ def test_automl_step(pipeline_session, role, pipeline_name): ) inputs = [input_training, input_validation] - step_args = auto_ml.fit(inputs=inputs, 
job_name=job_name) + step_args = auto_ml.fit(inputs=inputs) automl_step = AutoMLStep( name="MyAutoMLStep", @@ -80,19 +79,48 @@ def test_automl_step(pipeline_session, role, pipeline_name): ) automl_model = automl_step.get_best_auto_ml_model(sagemaker_session=pipeline_session, role=role) - step_args_create_model = automl_model.create( instance_type="c4.4xlarge", ) - automl_model_step = ModelStep( name="MyAutoMLModelStep", step_args=step_args_create_model, ) + model_package_group_name = ParameterString( + name="ModelPackageName", default_value="AutoMlModelPackageGroup" + ) + model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="Approved") + model_metrics = ModelMetrics( + model_statistics=MetricsSource( + s3_uri=automl_step.properties.BestCandidateProperties.ModelInsightsJsonReportPath, + content_type="application/json", + ), + explainability=MetricsSource( + s3_uri=automl_step.properties.BestCandidateProperties.ExplainabilityJsonReportPath, + content_type="application/json", + ), + ) + step_args_register_model = automl_model.register( + content_types=["text/csv"], + response_types=["text/csv"], + inference_instances=["ml.m5.xlarge"], + transform_instances=["ml.m5.xlarge"], + model_package_group_name=model_package_group_name, + approval_status=model_approval_status, + model_metrics=model_metrics, + ) + register_model_step = ModelStep( + name="ModelRegistrationStep", step_args=step_args_register_model + ) + pipeline = Pipeline( name=pipeline_name, - steps=[automl_step, automl_model_step], + parameters=[ + model_approval_status, + model_package_group_name, + ], + steps=[automl_step, automl_model_step, register_model_step], sagemaker_session=pipeline_session, ) @@ -114,9 +142,20 @@ def test_automl_step(pipeline_session, role, pipeline_name): assert step["Metadata"]["AutoMLJob"]["Arn"] is not None assert has_automl_job - assert len(execution_steps) == 2 + assert len(execution_steps) == 3 finally: try: + sagemaker_client = boto3.client("sagemaker") + for model_package in sagemaker_client.list_model_packages( + ModelPackageGroupName="AutoMlModelPackageGroup" + )["ModelPackageSummaryList"]: + sagemaker_client.delete_model_package( + ModelPackageName=model_package["ModelPackageArn"] + ) + sagemaker_client.delete_model_package_group( + ModelPackageGroupName="AutoMlModelPackageGroup" + ) + pipeline.delete() except Exception: pass diff --git a/tests/integ/test_auto_ml.py b/tests/integ/test_auto_ml.py index 2ce21034ea..f8d1610d2b 100644 --- a/tests/integ/test_auto_ml.py +++ b/tests/integ/test_auto_ml.py @@ -78,7 +78,6 @@ def test_auto_ml_fit_local_input(sagemaker_session): role=ROLE, target_attribute_name=TARGET_ATTRIBUTE_NAME, sagemaker_session=sagemaker_session, - max_candidates=1, generate_candidate_definitions_only=True, ) diff --git a/tests/unit/sagemaker/automl/test_auto_ml.py b/tests/unit/sagemaker/automl/test_auto_ml.py index 42438338cc..2c997397c5 100644 --- a/tests/unit/sagemaker/automl/test_auto_ml.py +++ b/tests/unit/sagemaker/automl/test_auto_ml.py @@ -18,6 +18,7 @@ from mock import Mock, patch from sagemaker import AutoML, AutoMLJob, AutoMLInput, CandidateEstimator, PipelineModel from sagemaker.predictor import Predictor +from sagemaker.workflow.functions import Join MODEL_DATA = "s3://bucket/model.tar.gz" MODEL_IMAGE = "mi" @@ -52,7 +53,7 @@ MAX_RUNTIME_PER_TRAINING_JOB = 3600 TOTAL_JOB_RUNTIME = 36000 TARGET_OBJECTIVE = "0.01" -JOB_OBJECTIVE = {"fake job objective"} +JOB_OBJECTIVE = {"MetricName": "F1"} TAGS = [{"Name": "some-tag", "Value": "value-for-tag"}] 
CONTENT_TYPE = "x-application/vnd.amazon+parquet" S3_DATA_TYPE = "ManifestFile" @@ -503,7 +504,46 @@ def test_auto_ml_default_fit(strftime, sagemaker_session): ], "output_config": {"S3OutputPath": DEFAULT_OUTPUT_PATH}, "auto_ml_job_config": { - "CompletionCriteria": {"MaxCandidates": DEFAULT_MAX_CANDIDATES}, + "CompletionCriteria": {}, + "SecurityConfig": { + "EnableInterContainerTrafficEncryption": ENCRYPT_INTER_CONTAINER_TRAFFIC + }, + }, + "role": ROLE, + "job_name": DEFAULT_JOB_NAME, + "problem_type": None, + "job_objective": None, + "generate_candidate_definitions_only": GENERATE_CANDIDATE_DEFINITIONS_ONLY, + "tags": None, + } + + +@patch("time.strftime", return_value=TIMESTAMP) +def test_auto_ml_default_fit_with_pipeline_variable(strftime, sagemaker_session): + auto_ml = AutoML( + role=ROLE, + target_attribute_name=TARGET_ATTRIBUTE_NAME, + sagemaker_session=sagemaker_session, + ) + inputs = Join(on="/", values=[DEFAULT_S3_INPUT_DATA, "ProcessingJobName"]) + auto_ml.fit(inputs=AutoMLInput(inputs=inputs, target_attribute_name=TARGET_ATTRIBUTE_NAME)) + sagemaker_session.auto_ml.assert_called_once() + _, args = sagemaker_session.auto_ml.call_args + assert args == { + "input_config": [ + { + "DataSource": { + "S3DataSource": { + "S3DataType": "S3Prefix", + "S3Uri": Join(on="/", values=["s3://mybucket/data", "ProcessingJobName"]), + } + }, + "TargetAttributeName": TARGET_ATTRIBUTE_NAME, + } + ], + "output_config": {"S3OutputPath": DEFAULT_OUTPUT_PATH}, + "auto_ml_job_config": { + "CompletionCriteria": {}, "SecurityConfig": { "EnableInterContainerTrafficEncryption": ENCRYPT_INTER_CONTAINER_TRAFFIC }, diff --git a/tests/unit/sagemaker/workflow/test_automl_step.py b/tests/unit/sagemaker/workflow/test_automl_step.py index 248ed22257..891e38a37f 100644 --- a/tests/unit/sagemaker/workflow/test_automl_step.py +++ b/tests/unit/sagemaker/workflow/test_automl_step.py @@ -231,11 +231,11 @@ def test_single_automl_step_with_parameter(pipeline_session): step_args=step_args, ) - assert automl_step.properties.bestCandidateProperties.modelInsightsJsonReportPath.expr == { - "Get": "Steps.MyAutoMLStep.bestCandidateProperties.modelInsightsJsonReportPath" + assert automl_step.properties.BestCandidateProperties.ModelInsightsJsonReportPath.expr == { + "Get": "Steps.MyAutoMLStep.BestCandidateProperties.ModelInsightsJsonReportPath" } - assert automl_step.properties.bestCandidateProperties.explainabilityJsonReportPath.expr == { - "Get": "Steps.MyAutoMLStep.bestCandidateProperties.explainabilityJsonReportPath" + assert automl_step.properties.BestCandidateProperties.ExplainabilityJsonReportPath.expr == { + "Get": "Steps.MyAutoMLStep.BestCandidateProperties.ExplainabilityJsonReportPath" } pipeline = Pipeline(
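
Below is a minimal, illustrative sketch (not part of the patch) of the usage this change enables: passing a PipelineVariable -- here a Join over a ParameterString -- as AutoMLInput.inputs inside an AutoMLStep. The role ARN, bucket, target column name, and ENSEMBLING mode are placeholder assumptions, not values taken from the patch.

# Illustrative sketch only -- not part of the patch. ROLE, the bucket, the target
# column, and the ENSEMBLING mode below are placeholder assumptions.
from sagemaker.automl.automl import AutoML, AutoMLInput
from sagemaker.workflow.automl_step import AutoMLStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession

pipeline_session = PipelineSession()
ROLE = "arn:aws:iam::123456789012:role/SageMakerExecutionRole"  # placeholder

# A pipeline parameter holding the S3 prefix of the training data.
input_prefix = ParameterString(
    name="InputDataPrefix", default_value="s3://my-bucket/automl/input"
)

auto_ml = AutoML(
    role=ROLE,
    target_attribute_name="class",  # placeholder target column
    sagemaker_session=pipeline_session,
    mode="ENSEMBLING",
)

# With this fix, AutoMLInput.inputs may be a PipelineVariable (here a Join over a
# ParameterString) instead of a plain string; to_request_dict() now wraps it in a
# list the same way it wraps a single str.
input_training = AutoMLInput(
    inputs=Join(on="/", values=[input_prefix, "train"]),
    target_attribute_name="class",
    channel_type="training",
)

step_args = auto_ml.fit(inputs=[input_training])
automl_step = AutoMLStep(name="MyAutoMLStep", step_args=step_args)

# The renamed PascalCase step properties can then feed downstream steps, e.g.
# automl_step.properties.BestCandidateProperties.ModelInsightsJsonReportPath.

pipeline = Pipeline(
    name="MyAutoMLPipeline",
    parameters=[input_prefix],
    steps=[automl_step],
    sagemaker_session=pipeline_session,
)
# pipeline.upsert(role_arn=ROLE); pipeline.start()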