From ec8a38524fb09630769487a597f85cb821c4c6b2 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Wed, 23 Jun 2021 14:28:18 -0700 Subject: [PATCH 01/22] Create common AWS config --- .../datahub/ingestion/source/aws_common.py | 87 +++++++++++++++++++ .../src/datahub/ingestion/source/glue.py | 85 +----------------- 2 files changed, 89 insertions(+), 83 deletions(-) create mode 100644 metadata-ingestion/src/datahub/ingestion/source/aws_common.py diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws_common.py b/metadata-ingestion/src/datahub/ingestion/source/aws_common.py new file mode 100644 index 00000000000000..073a80e3a1f9fe --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/aws_common.py @@ -0,0 +1,87 @@ +from functools import reduce +from typing import List, Optional, Union + +import boto3 + +from datahub.configuration import ConfigModel +from datahub.configuration.common import AllowDenyPattern + + +def assume_role( + role_arn: str, aws_region: str, credentials: Optional[dict] = None +) -> dict: + credentials = credentials or {} + sts_client = boto3.client( + "sts", + region_name=aws_region, + aws_access_key_id=credentials.get("AccessKeyId"), + aws_secret_access_key=credentials.get("SecretAccessKey"), + aws_session_token=credentials.get("SessionToken"), + ) + + assumed_role_object = sts_client.assume_role( + RoleArn=role_arn, RoleSessionName="DatahubIngestionSourceGlue" + ) + return assumed_role_object["Credentials"] + + +class AwsSourceConfig(ConfigModel): + env: str = "PROD" + + database_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() + table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() + + aws_access_key_id: Optional[str] = None + aws_secret_access_key: Optional[str] = None + aws_session_token: Optional[str] = None + aws_role: Optional[Union[str, List[str]]] = None + aws_region: str + + def get_client(self, service: str) -> boto3.client: + if ( + self.aws_access_key_id + and self.aws_secret_access_key + and self.aws_session_token + ): + return boto3.client( + service, + aws_access_key_id=self.aws_access_key_id, + aws_secret_access_key=self.aws_secret_access_key, + aws_session_token=self.aws_session_token, + region_name=self.aws_region, + ) + elif self.aws_access_key_id and self.aws_secret_access_key: + return boto3.client( + service, + aws_access_key_id=self.aws_access_key_id, + aws_secret_access_key=self.aws_secret_access_key, + region_name=self.aws_region, + ) + elif self.aws_role: + if isinstance(self.aws_role, str): + credentials = assume_role(self.aws_role, self.aws_region) + else: + credentials = reduce( + lambda new_credentials, role_arn: assume_role( + role_arn, self.aws_region, new_credentials + ), + self.aws_role, + {}, + ) + return boto3.client( + service, + aws_access_key_id=credentials["AccessKeyId"], + aws_secret_access_key=credentials["SecretAccessKey"], + aws_session_token=credentials["SessionToken"], + region_name=self.aws_region, + ) + else: + return boto3.client(service, region_name=self.aws_region) + + @property + def glue_client(self): + return self.get_client("glue") + + @property + def s3_client(self): + return self.get_client("s3") diff --git a/metadata-ingestion/src/datahub/ingestion/source/glue.py b/metadata-ingestion/src/datahub/ingestion/source/glue.py index 1692760ac327f1..13add19a92febe 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/glue.py @@ -4,17 +4,13 @@ from collections import defaultdict from dataclasses import dataclass from 
dataclasses import field as dataclass_field -from functools import reduce from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union from urllib.parse import urlparse -import boto3 - -from datahub.configuration import ConfigModel -from datahub.configuration.common import AllowDenyPattern from datahub.emitter import mce_builder from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport +from datahub.ingestion.source.aws_common import AwsSourceConfig from datahub.ingestion.source.metadata_common import MetadataWorkUnit from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp, Status from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot @@ -50,87 +46,10 @@ ) -def assume_role( - role_arn: str, aws_region: str, credentials: Optional[dict] = None -) -> dict: - credentials = credentials or {} - sts_client = boto3.client( - "sts", - region_name=aws_region, - aws_access_key_id=credentials.get("AccessKeyId"), - aws_secret_access_key=credentials.get("SecretAccessKey"), - aws_session_token=credentials.get("SessionToken"), - ) - - assumed_role_object = sts_client.assume_role( - RoleArn=role_arn, RoleSessionName="DatahubIngestionSourceGlue" - ) - return assumed_role_object["Credentials"] - - -class GlueSourceConfig(ConfigModel): - env: str = "PROD" - - database_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() - table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() +class GlueSourceConfig(AwsSourceConfig): extract_transforms: Optional[bool] = True - aws_access_key_id: Optional[str] = None - aws_secret_access_key: Optional[str] = None - aws_session_token: Optional[str] = None - aws_role: Optional[Union[str, List[str]]] = None - aws_region: str - - def get_client(self, service: str) -> boto3.client: - if ( - self.aws_access_key_id - and self.aws_secret_access_key - and self.aws_session_token - ): - return boto3.client( - service, - aws_access_key_id=self.aws_access_key_id, - aws_secret_access_key=self.aws_secret_access_key, - aws_session_token=self.aws_session_token, - region_name=self.aws_region, - ) - elif self.aws_access_key_id and self.aws_secret_access_key: - return boto3.client( - service, - aws_access_key_id=self.aws_access_key_id, - aws_secret_access_key=self.aws_secret_access_key, - region_name=self.aws_region, - ) - elif self.aws_role: - if isinstance(self.aws_role, str): - credentials = assume_role(self.aws_role, self.aws_region) - else: - credentials = reduce( - lambda new_credentials, role_arn: assume_role( - role_arn, self.aws_region, new_credentials - ), - self.aws_role, - {}, - ) - return boto3.client( - service, - aws_access_key_id=credentials["AccessKeyId"], - aws_secret_access_key=credentials["SecretAccessKey"], - aws_session_token=credentials["SessionToken"], - region_name=self.aws_region, - ) - else: - return boto3.client(service, region_name=self.aws_region) - - @property - def glue_client(self): - return self.get_client("glue") - - @property - def s3_client(self): - return self.get_client("s3") - @dataclass class GlueSourceReport(SourceReport): From d3bf61248a0f04bfcc282152cd1f5c7117d93820 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Wed, 23 Jun 2021 14:37:21 -0700 Subject: [PATCH 02/22] Init sagemaker --- metadata-ingestion/README.md | 22 ++++++++ .../datahub/ingestion/source/aws_common.py | 8 --- .../src/datahub/ingestion/source/glue.py | 8 +++ .../src/datahub/ingestion/source/sagemaker.py | 56 +++++++++++++++++++ 4 files changed, 
86 insertions(+), 8 deletions(-) create mode 100644 metadata-ingestion/src/datahub/ingestion/source/sagemaker.py diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md index 918440ff2c8594..38e89ab87c0892 100644 --- a/metadata-ingestion/README.md +++ b/metadata-ingestion/README.md @@ -45,6 +45,7 @@ We use a plugin architecture so that you can install only the dependencies you a | oracle | `pip install 'acryl-datahub[oracle]'` | Oracle source | | postgres | `pip install 'acryl-datahub[postgres]'` | Postgres source | | redshift | `pip install 'acryl-datahub[redshift]'` | Redshift source | +| sagemaker | `pip install 'acryl-datahub[sagemaker]'` | AWS SageMaker source | | sqlalchemy | `pip install 'acryl-datahub[sqlalchemy]'` | Generic SQLAlchemy source | | snowflake | `pip install 'acryl-datahub[snowflake]'` | Snowflake source | | superset | `pip install 'acryl-datahub[superset]'` | Superset source | @@ -344,6 +345,27 @@ source: # options is same as above ``` +### AWS SageMaker `sagemaker` + +Extracts: + +- Feature groups (support for models and jobs coming soon!) + +```yml +source: + type: glue + config: + aws_region: # aws_region_name, i.e. "eu-west-1" + env: # environment for the DatasetSnapshot URN, one of "DEV", "EI", "PROD" or "CORP". Defaults to "PROD". + + # Credentials. If not specified here, these are picked up according to boto3 rules. + # (see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) + aws_access_key_id: # Optional. + aws_secret_access_key: # Optional. + aws_session_token: # Optional. + aws_role: # Optional (Role chaining supported by using a sorted list). +``` + ### Snowflake `snowflake` Extracts: diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws_common.py b/metadata-ingestion/src/datahub/ingestion/source/aws_common.py index 073a80e3a1f9fe..3b52d83882741d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws_common.py @@ -77,11 +77,3 @@ def get_client(self, service: str) -> boto3.client: ) else: return boto3.client(service, region_name=self.aws_region) - - @property - def glue_client(self): - return self.get_client("glue") - - @property - def s3_client(self): - return self.get_client("s3") diff --git a/metadata-ingestion/src/datahub/ingestion/source/glue.py b/metadata-ingestion/src/datahub/ingestion/source/glue.py index 13add19a92febe..dcca265e94be22 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/glue.py @@ -50,6 +50,14 @@ class GlueSourceConfig(AwsSourceConfig): extract_transforms: Optional[bool] = True + @property + def glue_client(self): + return self.get_client("glue") + + @property + def s3_client(self): + return self.get_client("s3") + @dataclass class GlueSourceReport(SourceReport): diff --git a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py new file mode 100644 index 00000000000000..6fad358c15ab9e --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py @@ -0,0 +1,56 @@ +from dataclasses import dataclass +from dataclasses import field as dataclass_field +from typing import List + +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.source import Source, SourceReport +from datahub.ingestion.source.aws_common import AwsSourceConfig + + +class SagemakerSourceConfig(AwsSourceConfig): + @property + def 
sagemaker_client(self): + return self.get_client("samgemaker") + + +@dataclass +class SagemakerSourceReport(SourceReport): + tables_scanned = 0 + filtered: List[str] = dataclass_field(default_factory=list) + + def report_table_scanned(self) -> None: + self.tables_scanned += 1 + + def report_table_dropped(self, table: str) -> None: + self.filtered.append(table) + + +class GlueSource(Source): + source_config: SagemakerSourceConfig + report = SagemakerSourceReport() + + def __init__(self, config: SagemakerSourceConfig, ctx: PipelineContext): + super().__init__(ctx) + self.source_config = config + self.report = SagemakerSourceReport() + self.sagemaker_client = config.sagemaker_client + self.env = config.env + + @classmethod + def create(cls, config_dict, ctx): + config = SagemakerSourceConfig.parse_obj(config_dict) + return cls(config, ctx) + + def get_all_feature_groups(self): + """ + List all feature groups in SageMaker. + """ + + feature_groups = [] + + # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.list_feature_groups + paginator = self.sagemaker_client.get_paginator("list_feature_groups") + for page in paginator.paginate(): + feature_groups += page["FeatureGroupSummaries"] + + return feature_groups From 3db0a58d5e2681a54683b7396a07aaaadc7dc5eb Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Wed, 23 Jun 2021 14:38:25 -0700 Subject: [PATCH 03/22] Common AWS dependencies --- metadata-ingestion/setup.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 869c846985a5f8..831abfdf54a998 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -57,6 +57,11 @@ def get_long_description(): "sqlalchemy==1.3.24", } +aws_common = { + # AWS Python SDK + "boto3" +} + # Note: for all of these, framework_common will be added. plugins: Dict[str, Set[str]] = { # Sink plugins. 
@@ -72,7 +77,7 @@ def get_long_description(): "bigquery": sql_common | {"pybigquery >= 0.6.0"}, "druid": sql_common | {"pydruid>=0.6.2"}, "feast": {"docker"}, - "glue": {"boto3"}, + "glue": aws_common, "hive": sql_common | { # Acryl Data maintains a fork of PyHive, which adds support for table comments @@ -89,6 +94,7 @@ def get_long_description(): "oracle": sql_common | {"cx_Oracle"}, "postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"}, "redshift": sql_common | {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"}, + "sagemaker": aws_common, "snowflake": sql_common | {"snowflake-sqlalchemy"}, "superset": {"requests"}, } From d282559e51fc8be42f81922d34deccdcdb835e50 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Wed, 23 Jun 2021 14:50:27 -0700 Subject: [PATCH 04/22] Get features in feature group --- .../src/datahub/ingestion/source/sagemaker.py | 27 +++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py index 6fad358c15ab9e..9dfcfb4b65fe61 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py @@ -1,6 +1,6 @@ from dataclasses import dataclass from dataclasses import field as dataclass_field -from typing import List +from typing import Any, Dict, List from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport @@ -41,7 +41,7 @@ def create(cls, config_dict, ctx): config = SagemakerSourceConfig.parse_obj(config_dict) return cls(config, ctx) - def get_all_feature_groups(self): + def get_all_feature_groups(self) -> List[Dict[str, Any]]: """ List all feature groups in SageMaker. """ @@ -54,3 +54,26 @@ def get_all_feature_groups(self): feature_groups += page["FeatureGroupSummaries"] return feature_groups + + def get_feature_group_details(self, feature_group_name: str) -> Dict[str, Any]: + """ + Get details of a feature group (including list of component features). 
+ """ + + feature_group = self.sagemaker_client.describe_feature_group( + FeatureGroupName=feature_group_name + ) + + next_token = feature_group.get("NextToken") + + # paginate over feature group features + while next_token is not None: + next_features = self.sagemaker_client.describe_feature_group( + FeatureGroupName=feature_group_name, NextToken=next_token + ) + feature_group["FeatureDefinitions"].append( + next_features["FeatureDefinitions"] + ) + next_token = feature_group.get("NextToken") + + return feature_group From 8f455c9184ecc86a4529796bdc76a6b51dbc9e96 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Wed, 23 Jun 2021 14:58:30 -0700 Subject: [PATCH 05/22] Ingest feature groups --- .../src/datahub/ingestion/source/sagemaker.py | 76 ++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py index 9dfcfb4b65fe61..b8b03aeb9d7874 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py @@ -1,10 +1,25 @@ from dataclasses import dataclass from dataclasses import field as dataclass_field -from typing import Any, Dict, List +from typing import Any, Dict, Iterable, List +import datahub.emitter.mce_builder as builder +from datahub.configuration.common import ConfigModel from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.source.aws_common import AwsSourceConfig +from datahub.ingestion.source.metadata_common import MetadataWorkUnit +from datahub.metadata.com.linkedin.pegasus2avro.common import MLFeatureDataType +from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import ( + MLFeatureSnapshot, + MLFeatureTableSnapshot, + MLPrimaryKeySnapshot, +) +from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent +from datahub.metadata.schema_classes import ( + MLFeaturePropertiesClass, + MLFeatureTablePropertiesClass, + MLPrimaryKeyPropertiesClass, +) class SagemakerSourceConfig(AwsSourceConfig): @@ -77,3 +92,62 @@ def get_feature_group_details(self, feature_group_name: str) -> Dict[str, Any]: next_token = feature_group.get("NextToken") return feature_group + + def get_feature_group_wu( + self, feature_group_details: Dict[str, Any] + ) -> MetadataWorkUnit: + """ + Generate an MLFeatureTable workunit for a SageMaker feature group. 
+ + Parameters + ---------- + feature_group_details: + ingested SageMaker feature group from get_feature_group_details() + """ + + feature_group_name = feature_group_details["FeatureGroupName"] + + feature_group_snapshot = MLFeatureTableSnapshot( + urn=builder.make_ml_feature_table_urn("sagemaker", feature_group_name), + aspects=[], + ) + + feature_group_snapshot.aspects.append( + MLFeatureTablePropertiesClass( + mlFeatures=[ + builder.make_ml_feature_urn( + feature_group_name, + feature["FeatureName"], + ) + for feature in feature_group_details["FeatureDefinitions"] + ], + mlPrimaryKeys=[ + builder.make_ml_primary_key_urn( + feature_group_name, + feature_group_details["RecordIdentifierFeatureName"], + ) + ], + ) + ) + + # make the MCE and workunit + mce = MetadataChangeEvent(proposedSnapshot=feature_group_snapshot) + return MetadataWorkUnit(id=feature_group_name, mce=mce) + + def get_workunits(self) -> Iterable[MetadataWorkUnit]: + + feature_groups = self.get_all_feature_groups() + + for feature_group in feature_groups: + + feature_group_details = self.get_feature_group_details( + feature_group["FeatureGroupName"] + ) + + yield self.get_feature_group_wu(feature_group_details) + + def get_report(self): + return self.report + + def close(self): + pass From 2bfb88235c3029391082e65d435f14a28da36e97 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Wed, 23 Jun 2021 16:37:18 -0700 Subject: [PATCH 06/22] Add example ingestion config --- metadata-ingestion/README.md | 2 +- .../examples/recipes/sagemaker_to_datahub.yml | 9 ++ .../src/datahub/ingestion/source/sagemaker.py | 87 ++++++++++++++++++- 3 files changed, 96 insertions(+), 2 deletions(-) create mode 100644 metadata-ingestion/examples/recipes/sagemaker_to_datahub.yml diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md index 38e89ab87c0892..8e73225e0b0935 100644 --- a/metadata-ingestion/README.md +++ b/metadata-ingestion/README.md @@ -349,7 +349,7 @@ source: Extracts: -- Feature groups (support for models and jobs coming soon!) +- Feature groups (support for models, jobs, and more coming soon!) 
```yml source: diff --git a/metadata-ingestion/examples/recipes/sagemaker_to_datahub.yml b/metadata-ingestion/examples/recipes/sagemaker_to_datahub.yml new file mode 100644 index 00000000000000..45fba2535663ab --- /dev/null +++ b/metadata-ingestion/examples/recipes/sagemaker_to_datahub.yml @@ -0,0 +1,9 @@ +source: + type: glue + config: + aws_region: "us-west-2" + +sink: + type: "datahub-rest" + config: + server: "http://localhost:8080" diff --git a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py index b8b03aeb9d7874..a917304f15da3f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py @@ -40,7 +40,7 @@ def report_table_dropped(self, table: str) -> None: self.filtered.append(table) -class GlueSource(Source): +class SagemakerSource(Source): source_config: SagemakerSourceConfig report = SagemakerSourceReport() @@ -134,6 +134,89 @@ def get_feature_group_wu( mce = MetadataChangeEvent(proposedSnapshot=feature_group_snapshot) return MetadataWorkUnit(id=feature_group_name, mce=mce) + def get_feature_type(self, aws_type: str, feature_name: str) -> str: + + mapped_type = { + "String": MLFeatureDataType.TEXT, + "Integral": MLFeatureDataType.ORDINAL, + "Fractional": MLFeatureDataType.CONTINUOUS, + }.get(aws_type) + + if mapped_type is None: + self.report.report_warning( + feature_name, f"unable to map type {aws_type} to metadata schema" + ) + mapped_type = MLFeatureDataType.UNKNOWN + + return mapped_type + + def get_feature_wu( + self, feature_group_details: Dict[str, Any], feature: Dict[str, Any] + ) -> MetadataWorkUnit: + """ + Generate an MLFeature workunit for a SageMaker feature. + + Parameters + ---------- + feature_group_details: + ingested SageMaker feature group from get_feature_group_details() + feature: + ingested SageMaker feature + """ + + # create snapshot instance for the feature + feature_snapshot = MLFeatureSnapshot( + urn=builder.make_ml_feature_urn( + feature_group_details["FeatureGroupName"], + feature["FeatureName"], + ), + aspects=[], + ) + + feature_sources = [] + + if "OfflineStoreConfig" in feature_group_details: + + # remove S3 prefix (s3://) + s3_name = feature_group_details["OfflineStoreConfig"]["S3StorageConfig"]["S3Uri"][5:] + + if s3_name.endswith("/"): + s3_name = s3_name[:-1] + + feature_sources.append( + builder.make_dataset_urn( + "s3", + s3_name, + self.source_config.env, + ) + ) + + glue_database = feature_group_details["OfflineStoreConfig"]["DataCatalogConfig"]["Database"] + glue_table = feature_group_details["OfflineStoreConfig"]["DataCatalogConfig"]["TableName"] + + full_table_name = f"{node_args['database']}.{node_args['table_name']}" + + # we know that the table will already be covered when ingesting Glue tables + node_urn = f"urn:li:dataset:(urn:li:dataPlatform:glue,{full_table_name},{self.env})" + + # note that there's also an OnlineStoreConfig field, but this + # lack enough metadata to create a dataset + # (only specifies the security config and whether it's enabled at all) + + # append feature name and type + feature_snapshot.aspects.append( + MLFeaturePropertiesClass( + dataType=self.get_feature_type( + feature["FeatureType"], feature["FeatureName"] + ), + sources=feature_sources, + ) + ) + + # make the MCE and workunit + mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot) + return MetadataWorkUnit(id=feature["FeatureName"], mce=mce) + def get_workunits(self) -> 
Iterable[MetadataWorkUnit]: feature_groups = self.get_all_feature_groups() @@ -144,6 +227,8 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: feature_group["FeatureGroupName"] ) + for feature in feature_groups + yield self.get_feature_group_wu(feature_group_details) def get_report(self): From 5bff9f4f34b9b6dc989d3e5d8634824f482a3535 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Wed, 23 Jun 2021 16:51:51 -0700 Subject: [PATCH 07/22] Fix feature ingestion --- .../examples/recipes/sagemaker_to_datahub.yml | 2 +- metadata-ingestion/setup.py | 2 ++ .../src/datahub/ingestion/source/sagemaker.py | 22 ++++++++++++++----- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/examples/recipes/sagemaker_to_datahub.yml b/metadata-ingestion/examples/recipes/sagemaker_to_datahub.yml index 45fba2535663ab..3f94d2c7ba901f 100644 --- a/metadata-ingestion/examples/recipes/sagemaker_to_datahub.yml +++ b/metadata-ingestion/examples/recipes/sagemaker_to_datahub.yml @@ -1,5 +1,5 @@ source: - type: glue + type: sagemaker config: aws_region: "us-west-2" diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 831abfdf54a998..f6b01b55c47393 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -151,6 +151,7 @@ def get_long_description(): "glue", "hive", "oracle", + "sagemaker", "datahub-kafka", "datahub-rest", # airflow is added below @@ -188,6 +189,7 @@ def get_long_description(): "druid = datahub.ingestion.source.druid:DruidSource", "feast = datahub.ingestion.source.feast:FeastSource", "glue = datahub.ingestion.source.glue:GlueSource", + "sagemaker = datahub.ingestion.source.sagemaker:SagemakerSource", "hive = datahub.ingestion.source.hive:HiveSource", "kafka = datahub.ingestion.source.kafka:KafkaSource", "kafka-connect = datahub.ingestion.source.kafka_connect:KafkaConnectSource", diff --git a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py index a917304f15da3f..5e0e2480665015 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py @@ -25,7 +25,7 @@ class SagemakerSourceConfig(AwsSourceConfig): @property def sagemaker_client(self): - return self.get_client("samgemaker") + return self.get_client("sagemaker") @dataclass @@ -178,7 +178,9 @@ def get_feature_wu( if "OfflineStoreConfig" in feature_group_details: # remove S3 prefix (s3://) - s3_name = feature_group_details["OfflineStoreConfig"]["S3StorageConfig"]["S3Uri"][5:] + s3_name = feature_group_details["OfflineStoreConfig"]["S3StorageConfig"][ + "S3Uri" + ][5:] if s3_name.endswith("/"): s3_name = s3_name[:-1] @@ -191,8 +193,12 @@ def get_feature_wu( ) ) - glue_database = feature_group_details["OfflineStoreConfig"]["DataCatalogConfig"]["Database"] - glue_table = feature_group_details["OfflineStoreConfig"]["DataCatalogConfig"]["TableName"] + glue_database = feature_group_details["OfflineStoreConfig"][ + "DataCatalogConfig" + ]["Database"] + glue_table = feature_group_details["OfflineStoreConfig"][ + "DataCatalogConfig" + ]["TableName"] full_table_name = f"{node_args['database']}.{node_args['table_name']}" @@ -215,7 +221,10 @@ def get_feature_wu( # make the MCE and workunit mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot) - return MetadataWorkUnit(id=feature["FeatureName"], mce=mce) + return MetadataWorkUnit( + id=f'{feature_group_details["FeatureGroupName"]}-{feature["FeatureName"]}', + mce=mce, + ) def 
get_workunits(self) -> Iterable[MetadataWorkUnit]: @@ -227,7 +236,8 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: feature_group["FeatureGroupName"] ) - for feature in feature_groups + for feature in feature_group_details["FeatureDefinitions"]: + yield self.get_feature_wu(feature_group_details, feature) yield self.get_feature_group_wu(feature_group_details) From 44ecb58a033576430c0921a27ccbd12f54432ad6 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Wed, 23 Jun 2021 16:55:31 -0700 Subject: [PATCH 08/22] Append Glue data catalog source --- .../src/datahub/ingestion/source/sagemaker.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py index 5e0e2480665015..b788e25b1ca266 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py @@ -200,10 +200,18 @@ def get_feature_wu( "DataCatalogConfig" ]["TableName"] - full_table_name = f"{node_args['database']}.{node_args['table_name']}" + full_table_name = f"{glue_database}.{glue_table}" - # we know that the table will already be covered when ingesting Glue tables - node_urn = f"urn:li:dataset:(urn:li:dataPlatform:glue,{full_table_name},{self.env})" + self.report.report_warning( + full_table_name, + f"""Note: table {full_table_name} is an AWS Glue object. + To view full table metadata, run Glue ingestion + (see https://datahubproject.io/docs/metadata-ingestion/#aws-glue-glue)""", + ) + + feature_sources.append( + f"urn:li:dataset:(urn:li:dataPlatform:glue,{full_table_name},{self.source_config.env})" + ) # note that there's also an OnlineStoreConfig field, but this # lack enough metadata to create a dataset From d660a9b247c3576fe20fa9c9911c47bc3c2514eb Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Wed, 23 Jun 2021 17:18:58 -0700 Subject: [PATCH 09/22] Handle primary key ingestion --- .../src/datahub/ingestion/source/sagemaker.py | 69 +++++++++++++------ 1 file changed, 48 insertions(+), 21 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py index b788e25b1ca266..8359ad8a09408e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py @@ -1,6 +1,6 @@ from dataclasses import dataclass from dataclasses import field as dataclass_field -from typing import Any, Dict, Iterable, List +from typing import Any, Dict, Iterable, List, Union import datahub.emitter.mce_builder as builder from datahub.configuration.common import ConfigModel @@ -164,13 +164,10 @@ def get_feature_wu( ingested SageMaker feature """ - # create snapshot instance for the feature - feature_snapshot = MLFeatureSnapshot( - urn=builder.make_ml_feature_urn( - feature_group_details["FeatureGroupName"], - feature["FeatureName"], - ), - aspects=[], + # if the feature acts as the record identifier, then we ingest it as an MLPrimaryKey + is_record_identifier = ( + feature_group_details["RecordIdentifierFeatureName"] + == feature["FeatureName"] ) feature_sources = [] @@ -214,25 +211,55 @@ def get_feature_wu( ) # note that there's also an OnlineStoreConfig field, but this - # lack enough metadata to create a dataset + # lacks enough metadata to create a dataset # (only specifies the security config and whether it's enabled at all) # append feature name and type - 
feature_snapshot.aspects.append( - MLFeaturePropertiesClass( - dataType=self.get_feature_type( - feature["FeatureType"], feature["FeatureName"] + if is_record_identifier: + primary_key_snapshot: MLPrimaryKeySnapshot = MLPrimaryKeySnapshot( + urn=builder.make_ml_primary_key_urn( + feature_group_details["FeatureGroupName"], + feature["FeatureName"], ), - sources=feature_sources, + aspects=[ + MLPrimaryKeyPropertiesClass( + dataType=self.get_feature_type( + feature["FeatureType"], feature["FeatureName"] + ), + sources=feature_sources, + ), + ], ) - ) - # make the MCE and workunit - mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot) - return MetadataWorkUnit( - id=f'{feature_group_details["FeatureGroupName"]}-{feature["FeatureName"]}', - mce=mce, - ) + # make the MCE and workunit + mce = MetadataChangeEvent(proposedSnapshot=primary_key_snapshot) + return MetadataWorkUnit( + id=f'{feature_group_details["FeatureGroupName"]}-{feature["FeatureName"]}', + mce=mce, + ) + else: + # create snapshot instance for the feature + feature_snapshot: MLFeatureSnapshot = MLFeatureSnapshot( + urn=builder.make_ml_feature_urn( + feature_group_details["FeatureGroupName"], + feature["FeatureName"], + ), + aspects=[ + MLFeaturePropertiesClass( + dataType=self.get_feature_type( + feature["FeatureType"], feature["FeatureName"] + ), + sources=feature_sources, + ) + ], + ) + + # make the MCE and workunit + mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot) + return MetadataWorkUnit( + id=f'{feature_group_details["FeatureGroupName"]}-{feature["FeatureName"]}', + mce=mce, + ) def get_workunits(self) -> Iterable[MetadataWorkUnit]: From 0259845c0a94ff6f6b66e592fc380a1e36453878 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Wed, 23 Jun 2021 17:34:48 -0700 Subject: [PATCH 10/22] Init tests and stubs --- .../tests/unit/test_sagemaker_source.py | 69 +++++++++++++++++++ .../tests/unit/test_sagemaker_source_stubs.py | 57 +++++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 metadata-ingestion/tests/unit/test_sagemaker_source.py create mode 100644 metadata-ingestion/tests/unit/test_sagemaker_source_stubs.py diff --git a/metadata-ingestion/tests/unit/test_sagemaker_source.py b/metadata-ingestion/tests/unit/test_sagemaker_source.py new file mode 100644 index 00000000000000..98c68d39302fa5 --- /dev/null +++ b/metadata-ingestion/tests/unit/test_sagemaker_source.py @@ -0,0 +1,69 @@ +import json + +from botocore.stub import Stubber +from freezegun import freeze_time + +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.glue import ( + SagemakerSource, + SagemakerSourceConfig, + get_column_type, +) +from datahub.metadata.com.linkedin.pegasus2avro.schema import ( + ArrayTypeClass, + MapTypeClass, + SchemaFieldDataType, + StringTypeClass, +) +from tests.test_helpers import mce_helpers +from tests.unit.test_sagemaker_source_stubs import ( + describe_feature_group_response_1, + describe_feature_group_response_2, + list_feature_groups_response, +) + +FROZEN_TIME = "2020-04-14 07:00:00" + + +def sagemaker_source() -> SagemakerSource: + return SagemakerSource( + ctx=PipelineContext(run_id="glue-source-test"), + config=SagemakerSourceConfig(aws_region="us-west-2"), + ) + + +@freeze_time(FROZEN_TIME) +def test_sagemaker_ingest(tmp_path, pytestconfig): + + sagemaker_source_instance = sagemaker_source() + + with Stubber(sagemaker_source_instance.sagemaker_client) as sagemaker_stubber: + + sagemaker_stubber.add_response( + "list_feature_groups", list_feature_groups_response, {} 
+ ) + sagemaker_stubber.add_response( + "describe_feature_group", + describe_feature_group_response_1, + {"FeatureGroupName": "test"}, + ) + sagemaker_stubber.add_response( + "describe_feature_group", + describe_feature_group_response_2, + {"FeatureGroupName": "test-1"}, + ) + + mce_objects = [ + wu.mce.to_obj() for wu in sagemaker_source_instance.get_workunits() + ] + + with open(str(tmp_path / "sagemaker_mces.json"), "w") as f: + json.dump(mce_objects, f, indent=2) + + output = mce_helpers.load_json_file(str(tmp_path / "sagemaker_mces.json")) + + test_resources_dir = pytestconfig.rootpath / "tests/unit/sagemaker" + golden = mce_helpers.load_json_file( + str(test_resources_dir / "sagemaker_mces_golden.json") + ) + mce_helpers.assert_mces_equal(output, golden) diff --git a/metadata-ingestion/tests/unit/test_sagemaker_source_stubs.py b/metadata-ingestion/tests/unit/test_sagemaker_source_stubs.py new file mode 100644 index 00000000000000..a216e504eb58ee --- /dev/null +++ b/metadata-ingestion/tests/unit/test_sagemaker_source_stubs.py @@ -0,0 +1,57 @@ +import datetime + +list_feature_groups_response = { + "FeatureGroupSummaries": [ + { + "FeatureGroupName": "test-1", + "FeatureGroupArn": "arn:aws:sagemaker:us-west-2:123412341234:feature-group/test-1", + "CreationTime": datetime.datetime(2021, 6, 23, 13, 58, 10, 264000), + "FeatureGroupStatus": "Created", + }, + { + "FeatureGroupName": "test", + "FeatureGroupArn": "arn:aws:sagemaker:us-west-2:123412341234:feature-group/test", + "CreationTime": datetime.datetime(2021, 6, 14, 11, 3, 0, 803000), + "FeatureGroupStatus": "Created", + }, + ], +} + +describe_feature_group_response_1 = { + "FeatureGroupArn": "arn:aws:sagemaker:us-west-2:123412341234:feature-group/test", + "FeatureGroupName": "test", + "RecordIdentifierFeatureName": "feature_1", + "EventTimeFeatureName": "feature_3", + "FeatureDefinitions": [ + {"FeatureName": "feature_1", "FeatureType": "String"}, + {"FeatureName": "feature_2", "FeatureType": "Integral"}, + {"FeatureName": "feature_3", "FeatureType": "Fractional"}, + ], + "CreationTime": datetime.datetime( + 2021, + 6, + 14, + 11, + 3, + 0, + 803000, + ), + "OnlineStoreConfig": {"EnableOnlineStore": True}, + "FeatureGroupStatus": "Created", +} +describe_feature_group_response_2 = { + "FeatureGroupArn": "arn:aws:sagemaker:us-west-2:123412341234:feature-group/test-1", + "FeatureGroupName": "test-1", + "RecordIdentifierFeatureName": "id", + "EventTimeFeatureName": "time", + "FeatureDefinitions": [ + {"FeatureName": "name", "FeatureType": "String"}, + {"FeatureName": "id", "FeatureType": "Integral"}, + {"FeatureName": "height", "FeatureType": "Fractional"}, + {"FeatureName": "time", "FeatureType": "String"}, + ], + "CreationTime": datetime.datetime(2021, 6, 23, 13, 58, 10, 264000), + "OnlineStoreConfig": {"EnableOnlineStore": True}, + "FeatureGroupStatus": "Created", + "Description": "First test feature group", +} From cd4d233bb77eba25dd10a8eae096b5a279de9bb8 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Wed, 23 Jun 2021 18:03:51 -0700 Subject: [PATCH 11/22] Add sagemaker golden --- .../scripts/update_golden_files.sh | 1 + .../src/datahub/ingestion/source/sagemaker.py | 6 +- .../unit/sagemaker/sagemaker_mces_golden.json | 184 ++++++++++++++++++ .../tests/unit/test_sagemaker_source.py | 28 ++- .../tests/unit/test_sagemaker_source_stubs.py | 4 + 5 files changed, 204 insertions(+), 19 deletions(-) create mode 100644 metadata-ingestion/tests/unit/sagemaker/sagemaker_mces_golden.json diff --git 
a/metadata-ingestion/scripts/update_golden_files.sh b/metadata-ingestion/scripts/update_golden_files.sh index 68a737b5e7a55d..b9c19b26f87538 100755 --- a/metadata-ingestion/scripts/update_golden_files.sh +++ b/metadata-ingestion/scripts/update_golden_files.sh @@ -15,6 +15,7 @@ cp tmp/test_mongodb_ingest0/mongodb_mces.json tests/integration/mongodb/mongodb_ cp tmp/test_feast_ingest0/feast_mces.json tests/integration/feast/feast_mces_golden.json cp tmp/test_dbt_ingest0/dbt_mces.json tests/integration/dbt/dbt_mces_golden.json cp tmp/test_glue_ingest0/glue_mces.json tests/unit/glue/glue_mces_golden.json +cp tmp/test_glue_ingest0/sagemaker_mces.json tests/unit/sagemaker/sagemaker_mces_golden.json cp tmp/test_lookml_ingest0/lookml_mces.json tests/integration/lookml/expected_output.json cp tmp/test_looker_ingest0/looker_mces.json tests/integration/looker/expected_output.json diff --git a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py index 8359ad8a09408e..c50ace924cd5f0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py @@ -79,17 +79,17 @@ def get_feature_group_details(self, feature_group_name: str) -> Dict[str, Any]: FeatureGroupName=feature_group_name ) - next_token = feature_group.get("NextToken") + next_token = feature_group.get("NextToken", "") # paginate over feature group features - while next_token is not None: + while next_token: next_features = self.sagemaker_client.describe_feature_group( FeatureGroupName=feature_group_name, NextToken=next_token ) feature_group["FeatureDefinitions"].append( next_features["FeatureDefinitions"] ) - next_token = feature_group.get("NextToken") + next_token = feature_group.get("NextToken", "") return feature_group diff --git a/metadata-ingestion/tests/unit/sagemaker/sagemaker_mces_golden.json b/metadata-ingestion/tests/unit/sagemaker/sagemaker_mces_golden.json new file mode 100644 index 00000000000000..3e98d8bfc24ca2 --- /dev/null +++ b/metadata-ingestion/tests/unit/sagemaker/sagemaker_mces_golden.json @@ -0,0 +1,184 @@ +[ + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(test-1,name)", + "aspects": [ + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "description": null, + "dataType": "TEXT", + "version": null, + "sources": [] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLPrimaryKeySnapshot": { + "urn": "urn:li:mlPrimaryKey:(test-1,id)", + "aspects": [ + { + "com.linkedin.pegasus2avro.ml.metadata.MLPrimaryKeyProperties": { + "description": null, + "dataType": "ORDINAL", + "version": null, + "sources": [] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(test-1,height)", + "aspects": [ + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "description": null, + "dataType": "CONTINUOUS", + "version": null, + "sources": [] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(test-1,time)", + "aspects": [ + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + 
"description": null, + "dataType": "TEXT", + "version": null, + "sources": [] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureTableSnapshot": { + "urn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:sagemaker,test-1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { + "description": null, + "mlFeatures": [ + "urn:li:mlFeature:(test-1,name)", + "urn:li:mlFeature:(test-1,id)", + "urn:li:mlFeature:(test-1,height)", + "urn:li:mlFeature:(test-1,time)" + ], + "mlPrimaryKeys": [ + "urn:li:mlPrimaryKey:(test-1,id)" + ] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLPrimaryKeySnapshot": { + "urn": "urn:li:mlPrimaryKey:(test,feature_1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.ml.metadata.MLPrimaryKeyProperties": { + "description": null, + "dataType": "TEXT", + "version": null, + "sources": [] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(test,feature_2)", + "aspects": [ + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "description": null, + "dataType": "ORDINAL", + "version": null, + "sources": [] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(test,feature_3)", + "aspects": [ + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "description": null, + "dataType": "CONTINUOUS", + "version": null, + "sources": [] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureTableSnapshot": { + "urn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:sagemaker,test)", + "aspects": [ + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { + "description": null, + "mlFeatures": [ + "urn:li:mlFeature:(test,feature_1)", + "urn:li:mlFeature:(test,feature_2)", + "urn:li:mlFeature:(test,feature_3)" + ], + "mlPrimaryKeys": [ + "urn:li:mlPrimaryKey:(test,feature_1)" + ] + } + } + ] + } + }, + "proposedDelta": null + } +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/test_sagemaker_source.py b/metadata-ingestion/tests/unit/test_sagemaker_source.py index 98c68d39302fa5..9f3941a25f868f 100644 --- a/metadata-ingestion/tests/unit/test_sagemaker_source.py +++ b/metadata-ingestion/tests/unit/test_sagemaker_source.py @@ -4,17 +4,7 @@ from freezegun import freeze_time from datahub.ingestion.api.common import PipelineContext -from datahub.ingestion.source.glue import ( - SagemakerSource, - SagemakerSourceConfig, - get_column_type, -) -from datahub.metadata.com.linkedin.pegasus2avro.schema import ( - ArrayTypeClass, - MapTypeClass, - SchemaFieldDataType, - StringTypeClass, -) +from datahub.ingestion.source.sagemaker import SagemakerSource, SagemakerSourceConfig from tests.test_helpers import mce_helpers from tests.unit.test_sagemaker_source_stubs import ( describe_feature_group_response_1, @@ -40,17 +30,23 @@ def test_sagemaker_ingest(tmp_path, pytestconfig): with Stubber(sagemaker_source_instance.sagemaker_client) as sagemaker_stubber: sagemaker_stubber.add_response( - "list_feature_groups", 
list_feature_groups_response, {} + "list_feature_groups", + list_feature_groups_response, + {}, ) sagemaker_stubber.add_response( "describe_feature_group", - describe_feature_group_response_1, - {"FeatureGroupName": "test"}, + describe_feature_group_response_2, + { + "FeatureGroupName": "test-1", + }, ) sagemaker_stubber.add_response( "describe_feature_group", - describe_feature_group_response_2, - {"FeatureGroupName": "test-1"}, + describe_feature_group_response_1, + { + "FeatureGroupName": "test", + }, ) mce_objects = [ diff --git a/metadata-ingestion/tests/unit/test_sagemaker_source_stubs.py b/metadata-ingestion/tests/unit/test_sagemaker_source_stubs.py index a216e504eb58ee..530eab53ff299c 100644 --- a/metadata-ingestion/tests/unit/test_sagemaker_source_stubs.py +++ b/metadata-ingestion/tests/unit/test_sagemaker_source_stubs.py @@ -15,6 +15,7 @@ "FeatureGroupStatus": "Created", }, ], + "NextToken": "", } describe_feature_group_response_1 = { @@ -38,7 +39,9 @@ ), "OnlineStoreConfig": {"EnableOnlineStore": True}, "FeatureGroupStatus": "Created", + "NextToken": "", } + describe_feature_group_response_2 = { "FeatureGroupArn": "arn:aws:sagemaker:us-west-2:123412341234:feature-group/test-1", "FeatureGroupName": "test-1", @@ -54,4 +57,5 @@ "OnlineStoreConfig": {"EnableOnlineStore": True}, "FeatureGroupStatus": "Created", "Description": "First test feature group", + "NextToken": "", } From 4ff84347557d81e70a06f44d2ece1a8fdd151dc6 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Wed, 23 Jun 2021 18:21:06 -0700 Subject: [PATCH 12/22] Clean up golden --- .../scripts/update_golden_files.sh | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/metadata-ingestion/scripts/update_golden_files.sh b/metadata-ingestion/scripts/update_golden_files.sh index b9c19b26f87538..d182bf310dfcea 100755 --- a/metadata-ingestion/scripts/update_golden_files.sh +++ b/metadata-ingestion/scripts/update_golden_files.sh @@ -6,18 +6,18 @@ set -euxo pipefail pytest --basetemp=tmp || true # Update the golden files. 
-cp tmp/test_serde_to_json_tests_unit_0/output.json tests/unit/serde/test_serde_large.json -cp tmp/test_serde_to_json_tests_unit_1/output.json tests/unit/serde/test_serde_chart_snapshot.json -cp tmp/test_ldap_ingest0/ldap_mces.json tests/integration/ldap/ldap_mces_golden.json -cp tmp/test_mysql_ingest0/mysql_mces.json tests/integration/mysql/mysql_mces_golden.json -cp tmp/test_mssql_ingest0/mssql_mces.json tests/integration/sql_server/mssql_mces_golden.json -cp tmp/test_mongodb_ingest0/mongodb_mces.json tests/integration/mongodb/mongodb_mces_golden.json -cp tmp/test_feast_ingest0/feast_mces.json tests/integration/feast/feast_mces_golden.json -cp tmp/test_dbt_ingest0/dbt_mces.json tests/integration/dbt/dbt_mces_golden.json -cp tmp/test_glue_ingest0/glue_mces.json tests/unit/glue/glue_mces_golden.json -cp tmp/test_glue_ingest0/sagemaker_mces.json tests/unit/sagemaker/sagemaker_mces_golden.json -cp tmp/test_lookml_ingest0/lookml_mces.json tests/integration/lookml/expected_output.json -cp tmp/test_looker_ingest0/looker_mces.json tests/integration/looker/expected_output.json +cp tmp/test_serde_to_json_tests_unit_0/output.json tests/unit/serde/test_serde_large.json +cp tmp/test_serde_to_json_tests_unit_1/output.json tests/unit/serde/test_serde_chart_snapshot.json +cp tmp/test_ldap_ingest0/ldap_mces.json tests/integration/ldap/ldap_mces_golden.json +cp tmp/test_mysql_ingest0/mysql_mces.json tests/integration/mysql/mysql_mces_golden.json +cp tmp/test_mssql_ingest0/mssql_mces.json tests/integration/sql_server/mssql_mces_golden.json +cp tmp/test_mongodb_ingest0/mongodb_mces.json tests/integration/mongodb/mongodb_mces_golden.json +cp tmp/test_feast_ingest0/feast_mces.json tests/integration/feast/feast_mces_golden.json +cp tmp/test_dbt_ingest0/dbt_mces.json tests/integration/dbt/dbt_mces_golden.json +cp tmp/test_glue_ingest0/glue_mces.json tests/unit/glue/glue_mces_golden.json +cp tmp/test_glue_ingest0/sagemaker_mces.json tests/unit/sagemaker/sagemaker_mces_golden.json +cp tmp/test_lookml_ingest0/lookml_mces.json tests/integration/lookml/expected_output.json +cp tmp/test_looker_ingest0/looker_mces.json tests/integration/looker/expected_output.json # Print success message. 
set +x From 8971109b79da99248cefd7f23fdefd233bedd2a9 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Thu, 24 Jun 2021 08:55:59 -0700 Subject: [PATCH 13/22] Add descriptions and filter primary keys --- metadata-ingestion/src/datahub/ingestion/source/sagemaker.py | 3 +++ .../tests/unit/sagemaker/sagemaker_mces_golden.json | 4 +--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py index c50ace924cd5f0..3b09df4076deb3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py @@ -114,12 +114,15 @@ def get_feature_group_wu( feature_group_snapshot.aspects.append( MLFeatureTablePropertiesClass( + description=feature_group_details.get("Description"), mlFeatures=[ builder.make_ml_feature_urn( feature_group_name, feature["FeatureName"], ) for feature in feature_group_details["FeatureDefinitions"] + if feature["FeatureName"] + != feature_group_details["RecordIdentifierFeatureName"] ], mlPrimaryKeys=[ builder.make_ml_primary_key_urn( diff --git a/metadata-ingestion/tests/unit/sagemaker/sagemaker_mces_golden.json b/metadata-ingestion/tests/unit/sagemaker/sagemaker_mces_golden.json index 3e98d8bfc24ca2..0f8650e1613e10 100644 --- a/metadata-ingestion/tests/unit/sagemaker/sagemaker_mces_golden.json +++ b/metadata-ingestion/tests/unit/sagemaker/sagemaker_mces_golden.json @@ -83,10 +83,9 @@ "aspects": [ { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { - "description": null, + "description": "First test feature group", "mlFeatures": [ "urn:li:mlFeature:(test-1,name)", - "urn:li:mlFeature:(test-1,id)", "urn:li:mlFeature:(test-1,height)", "urn:li:mlFeature:(test-1,time)" ], @@ -167,7 +166,6 @@ "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { "description": null, "mlFeatures": [ - "urn:li:mlFeature:(test,feature_1)", "urn:li:mlFeature:(test,feature_2)", "urn:li:mlFeature:(test,feature_3)" ], From 9133c85f4c856b02c7616129e2dc18dc6703dbde Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Thu, 24 Jun 2021 09:08:27 -0700 Subject: [PATCH 14/22] Include custom fields in feature tables --- .../com.linkedin.entity.entities.snapshot.json | 1 + .../src/datahub/ingestion/source/sagemaker.py | 3 +-- .../src/datahub/metadata/schema.avsc | 9 +++++++++ .../src/datahub/metadata/schema_classes.py | 18 ++++++++++++++++++ .../metadata/schemas/MetadataAuditEvent.avsc | 9 +++++++++ .../metadata/schemas/MetadataChangeEvent.avsc | 9 +++++++++ .../ml/metadata/MLFeatureTableProperties.pdl | 3 ++- 7 files changed, 49 insertions(+), 3 deletions(-) diff --git a/gms/api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json b/gms/api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json index 6ce21bf3756032..6ee6ff6970a203 100644 --- a/gms/api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json +++ b/gms/api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json @@ -3397,6 +3397,7 @@ "name" : "MLFeatureTableProperties", "namespace" : "com.linkedin.ml.metadata", "doc" : "Properties associated with a MLFeatureTable", + "include" : [ "com.linkedin.common.CustomProperties" ], "fields" : [ { "name" : "description", "type" : "string", diff --git a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py index 3b09df4076deb3..48c65765840230 100644 --- 
a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py @@ -1,9 +1,8 @@ from dataclasses import dataclass from dataclasses import field as dataclass_field -from typing import Any, Dict, Iterable, List, Union +from typing import Any, Dict, Iterable, List import datahub.emitter.mce_builder as builder -from datahub.configuration.common import ConfigModel from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.source.aws_common import AwsSourceConfig diff --git a/metadata-ingestion/src/datahub/metadata/schema.avsc b/metadata-ingestion/src/datahub/metadata/schema.avsc index 4b4e88c416b69f..105546ea9b223f 100644 --- a/metadata-ingestion/src/datahub/metadata/schema.avsc +++ b/metadata-ingestion/src/datahub/metadata/schema.avsc @@ -4200,6 +4200,15 @@ "name": "MLFeatureTableProperties", "namespace": "com.linkedin.pegasus2avro.ml.metadata", "fields": [ + { + "type": { + "type": "map", + "values": "string" + }, + "name": "customProperties", + "default": {}, + "doc": "Custom property bag." + }, { "type": [ "null", diff --git a/metadata-ingestion/src/datahub/metadata/schema_classes.py b/metadata-ingestion/src/datahub/metadata/schema_classes.py index 23b7b6a4ec30c0..6d2abe7add7393 100644 --- a/metadata-ingestion/src/datahub/metadata/schema_classes.py +++ b/metadata-ingestion/src/datahub/metadata/schema_classes.py @@ -5455,12 +5455,17 @@ class MLFeatureTablePropertiesClass(DictWrapper): RECORD_SCHEMA = get_schema_type("com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties") def __init__(self, + customProperties: Optional[Dict[str, str]]=None, description: Union[None, str]=None, mlFeatures: Union[None, List[str]]=None, mlPrimaryKeys: Union[None, List[str]]=None, ): super().__init__() + if customProperties is None: + self.customProperties = {} + else: + self.customProperties = customProperties self.description = description self.mlFeatures = mlFeatures self.mlPrimaryKeys = mlPrimaryKeys @@ -5473,11 +5478,24 @@ def construct_with_defaults(cls) -> "MLFeatureTablePropertiesClass": return self def _restore_defaults(self) -> None: + self.customProperties = dict() self.description = self.RECORD_SCHEMA.field_map["description"].default self.mlFeatures = self.RECORD_SCHEMA.field_map["mlFeatures"].default self.mlPrimaryKeys = self.RECORD_SCHEMA.field_map["mlPrimaryKeys"].default + @property + def customProperties(self) -> Dict[str, str]: + """Getter: Custom property bag.""" + return self._inner_dict.get('customProperties') # type: ignore + + + @customProperties.setter + def customProperties(self, value: Dict[str, str]) -> None: + """Setter: Custom property bag.""" + self._inner_dict['customProperties'] = value + + @property def description(self) -> Union[None, str]: """Getter: Documentation of the MLFeatureTable""" diff --git a/metadata-ingestion/src/datahub/metadata/schemas/MetadataAuditEvent.avsc b/metadata-ingestion/src/datahub/metadata/schemas/MetadataAuditEvent.avsc index b547ea471f4598..80d3c3f93c3e06 100644 --- a/metadata-ingestion/src/datahub/metadata/schemas/MetadataAuditEvent.avsc +++ b/metadata-ingestion/src/datahub/metadata/schemas/MetadataAuditEvent.avsc @@ -4143,6 +4143,15 @@ "namespace": "com.linkedin.pegasus2avro.ml.metadata", "doc": "Properties associated with a MLFeatureTable", "fields": [ + { + "name": "customProperties", + "type": { + "type": "map", + "values": "string" + }, + "doc": "Custom property bag.", + "default": {} + }, { 
"name": "description", "type": [ diff --git a/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeEvent.avsc b/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeEvent.avsc index 49b407ded70d24..ae3452bd9d38bd 100644 --- a/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeEvent.avsc +++ b/metadata-ingestion/src/datahub/metadata/schemas/MetadataChangeEvent.avsc @@ -4142,6 +4142,15 @@ "namespace": "com.linkedin.pegasus2avro.ml.metadata", "doc": "Properties associated with a MLFeatureTable", "fields": [ + { + "name": "customProperties", + "type": { + "type": "map", + "values": "string" + }, + "doc": "Custom property bag.", + "default": {} + }, { "name": "description", "type": [ diff --git a/metadata-models/src/main/pegasus/com/linkedin/ml/metadata/MLFeatureTableProperties.pdl b/metadata-models/src/main/pegasus/com/linkedin/ml/metadata/MLFeatureTableProperties.pdl index 7666e8296cf9be..034d1dbaf11db2 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/ml/metadata/MLFeatureTableProperties.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/ml/metadata/MLFeatureTableProperties.pdl @@ -1,6 +1,7 @@ namespace com.linkedin.ml.metadata import com.linkedin.common.Urn +import com.linkedin.common.CustomProperties /** * Properties associated with a MLFeatureTable @@ -8,7 +9,7 @@ import com.linkedin.common.Urn @Aspect = { "name": "mlFeatureTableProperties" } -record MLFeatureTableProperties { +record MLFeatureTableProperties includes CustomProperties { /** * Documentation of the MLFeatureTable From 777f7df214133ead3bb71da2334847394b1e1a8f Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Thu, 24 Jun 2021 09:16:37 -0700 Subject: [PATCH 15/22] Add sagemaker custom properties --- .../haskell/bin/dataset-hive-generator.py | 4 ++-- .../src/datahub/ingestion/source/sagemaker.py | 5 +++++ .../tests/unit/sagemaker/sagemaker_mces_golden.json | 10 ++++++++++ 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py b/contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py index 9da5ae198017a4..0671d24632e351 100755 --- a/contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py +++ b/contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py @@ -3,10 +3,10 @@ import sys import time -from pyhive import hive -from TCLIService.ttypes import TOperationState import simplejson as json +from pyhive import hive +from TCLIService.ttypes import TOperationState HIVESTORE='localhost' diff --git a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py index 48c65765840230..1e56d44a2e384a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py @@ -129,6 +129,11 @@ def get_feature_group_wu( feature_group_details["RecordIdentifierFeatureName"], ) ], + customProperties={ + "arn": feature_group_details["FeatureGroupArn"], + "creation_time": str(feature_group_details["CreationTime"]), + "status": feature_group_details["FeatureGroupStatus"], + }, ) ) diff --git a/metadata-ingestion/tests/unit/sagemaker/sagemaker_mces_golden.json b/metadata-ingestion/tests/unit/sagemaker/sagemaker_mces_golden.json index 0f8650e1613e10..906719ff9b46d5 100644 --- a/metadata-ingestion/tests/unit/sagemaker/sagemaker_mces_golden.json +++ b/metadata-ingestion/tests/unit/sagemaker/sagemaker_mces_golden.json @@ -83,6 +83,11 @@ "aspects": [ { 
"com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { + "customProperties": { + "arn": "arn:aws:sagemaker:us-west-2:123412341234:feature-group/test-1", + "creation_time": "2021-06-23 13:58:10.264000", + "status": "Created" + }, "description": "First test feature group", "mlFeatures": [ "urn:li:mlFeature:(test-1,name)", @@ -164,6 +169,11 @@ "aspects": [ { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { + "customProperties": { + "arn": "arn:aws:sagemaker:us-west-2:123412341234:feature-group/test", + "creation_time": "2021-06-14 11:03:00.803000", + "status": "Created" + }, "description": null, "mlFeatures": [ "urn:li:mlFeature:(test,feature_2)", From 149584a7549e9e896ad72745f25dae15f24bd22b Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Thu, 24 Jun 2021 09:25:31 -0700 Subject: [PATCH 16/22] Cleanup --- .../haskell/bin/dataset-hive-generator.py | 4 ++-- .../src/datahub/ingestion/source/sagemaker.py | 16 ++++++++-------- .../tests/unit/test_sagemaker_source.py | 8 ++++---- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py b/contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py index 0671d24632e351..9da5ae198017a4 100755 --- a/contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py +++ b/contrib/metadata-ingestion/haskell/bin/dataset-hive-generator.py @@ -3,11 +3,11 @@ import sys import time - -import simplejson as json from pyhive import hive from TCLIService.ttypes import TOperationState +import simplejson as json + HIVESTORE='localhost' AVROLOADPATH = '../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc' diff --git a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py index 1e56d44a2e384a..1c97de8bb1ad7a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py @@ -74,6 +74,7 @@ def get_feature_group_details(self, feature_group_name: str) -> Dict[str, Any]: Get details of a feature group (including list of component features). 
""" + # see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.describe_feature_group feature_group = self.sagemaker_client.describe_feature_group( FeatureGroupName=feature_group_name ) @@ -114,6 +115,7 @@ def get_feature_group_wu( feature_group_snapshot.aspects.append( MLFeatureTablePropertiesClass( description=feature_group_details.get("Description"), + # non-primary key features mlFeatures=[ builder.make_ml_feature_urn( feature_group_name, @@ -129,6 +131,7 @@ def get_feature_group_wu( feature_group_details["RecordIdentifierFeatureName"], ) ], + # additional metadata customProperties={ "arn": feature_group_details["FeatureGroupArn"], "creation_time": str(feature_group_details["CreationTime"]), @@ -240,10 +243,6 @@ def get_feature_wu( # make the MCE and workunit mce = MetadataChangeEvent(proposedSnapshot=primary_key_snapshot) - return MetadataWorkUnit( - id=f'{feature_group_details["FeatureGroupName"]}-{feature["FeatureName"]}', - mce=mce, - ) else: # create snapshot instance for the feature feature_snapshot: MLFeatureSnapshot = MLFeatureSnapshot( @@ -263,10 +262,11 @@ def get_feature_wu( # make the MCE and workunit mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot) - return MetadataWorkUnit( - id=f'{feature_group_details["FeatureGroupName"]}-{feature["FeatureName"]}', - mce=mce, - ) + + return MetadataWorkUnit( + id=f'{feature_group_details["FeatureGroupName"]}-{feature["FeatureName"]}', + mce=mce, + ) def get_workunits(self) -> Iterable[MetadataWorkUnit]: diff --git a/metadata-ingestion/tests/unit/test_sagemaker_source.py b/metadata-ingestion/tests/unit/test_sagemaker_source.py index 9f3941a25f868f..d4a4b31ca208d4 100644 --- a/metadata-ingestion/tests/unit/test_sagemaker_source.py +++ b/metadata-ingestion/tests/unit/test_sagemaker_source.py @@ -36,16 +36,16 @@ def test_sagemaker_ingest(tmp_path, pytestconfig): ) sagemaker_stubber.add_response( "describe_feature_group", - describe_feature_group_response_2, + describe_feature_group_response_1, { - "FeatureGroupName": "test-1", + "FeatureGroupName": "test", }, ) sagemaker_stubber.add_response( "describe_feature_group", - describe_feature_group_response_1, + describe_feature_group_response_2, { - "FeatureGroupName": "test", + "FeatureGroupName": "test-1", }, ) From fb70c0b08e78abe4a8a494b39f712886ece9cdaf Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Thu, 24 Jun 2021 09:31:06 -0700 Subject: [PATCH 17/22] Fix old references --- metadata-ingestion/README.md | 2 +- .../datahub/ingestion/source/aws_common.py | 2 +- .../src/datahub/ingestion/source/sagemaker.py | 39 ++++++++++--------- .../tests/unit/test_sagemaker_source.py | 10 ++--- 4 files changed, 28 insertions(+), 25 deletions(-) diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md index 9266e89f88edde..5b6e8ec0298ed8 100644 --- a/metadata-ingestion/README.md +++ b/metadata-ingestion/README.md @@ -353,7 +353,7 @@ Extracts: ```yml source: - type: glue + type: sagemaker config: aws_region: # aws_region_name, i.e. "eu-west-1" env: # environment for the DatasetSnapshot URN, one of "DEV", "EI", "PROD" or "CORP". Defaults to "PROD". 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws_common.py b/metadata-ingestion/src/datahub/ingestion/source/aws_common.py index 3b52d83882741d..1d77e72f62df4d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws_common.py @@ -20,7 +20,7 @@ def assume_role( ) assumed_role_object = sts_client.assume_role( - RoleArn=role_arn, RoleSessionName="DatahubIngestionSourceGlue" + RoleArn=role_arn, RoleSessionName="DatahubIngestionSource" ) return assumed_role_object["Credentials"] diff --git a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py index 1c97de8bb1ad7a..d1536d21180bf1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py @@ -200,25 +200,28 @@ def get_feature_wu( ) ) - glue_database = feature_group_details["OfflineStoreConfig"][ - "DataCatalogConfig" - ]["Database"] - glue_table = feature_group_details["OfflineStoreConfig"][ - "DataCatalogConfig" - ]["TableName"] - - full_table_name = f"{glue_database}.{glue_table}" - - self.report.report_warning( - full_table_name, - f"""Note: table {full_table_name} is an AWS Glue object. - To view full table metadata, run Glue ingestion - (see https://datahubproject.io/docs/metadata-ingestion/#aws-glue-glue)""", - ) + if "DataCatalogConfig" in feature_group_details["OfflineStoreConfig"]: + + # if Glue catalog associated with offline store + glue_database = feature_group_details["OfflineStoreConfig"][ + "DataCatalogConfig" + ]["Database"] + glue_table = feature_group_details["OfflineStoreConfig"][ + "DataCatalogConfig" + ]["TableName"] + + full_table_name = f"{glue_database}.{glue_table}" + + self.report.report_warning( + full_table_name, + f"""Note: table {full_table_name} is an AWS Glue object. 
+ To view full table metadata, run Glue ingestion + (see https://datahubproject.io/docs/metadata-ingestion/#aws-glue-glue)""", + ) - feature_sources.append( - f"urn:li:dataset:(urn:li:dataPlatform:glue,{full_table_name},{self.source_config.env})" - ) + feature_sources.append( + f"urn:li:dataset:(urn:li:dataPlatform:glue,{full_table_name},{self.source_config.env})" + ) # note that there's also an OnlineStoreConfig field, but this # lacks enough metadata to create a dataset diff --git a/metadata-ingestion/tests/unit/test_sagemaker_source.py b/metadata-ingestion/tests/unit/test_sagemaker_source.py index d4a4b31ca208d4..f316715ca1166d 100644 --- a/metadata-ingestion/tests/unit/test_sagemaker_source.py +++ b/metadata-ingestion/tests/unit/test_sagemaker_source.py @@ -17,7 +17,7 @@ def sagemaker_source() -> SagemakerSource: return SagemakerSource( - ctx=PipelineContext(run_id="glue-source-test"), + ctx=PipelineContext(run_id="sagemaker-source-test"), config=SagemakerSourceConfig(aws_region="us-west-2"), ) @@ -36,16 +36,16 @@ def test_sagemaker_ingest(tmp_path, pytestconfig): ) sagemaker_stubber.add_response( "describe_feature_group", - describe_feature_group_response_1, + describe_feature_group_response_2, { - "FeatureGroupName": "test", + "FeatureGroupName": "test-1", }, ) sagemaker_stubber.add_response( "describe_feature_group", - describe_feature_group_response_2, + describe_feature_group_response_1, { - "FeatureGroupName": "test-1", + "FeatureGroupName": "test", }, ) From 1c248c38b17f63c5ecfa9d1921aad65bc92af520 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Thu, 24 Jun 2021 09:54:07 -0700 Subject: [PATCH 18/22] Add test stub with offline store --- .../src/datahub/ingestion/source/sagemaker.py | 1 + .../unit/sagemaker/sagemaker_mces_golden.json | 94 +++++++++++++++++++ .../tests/unit/test_sagemaker_source.py | 10 +- .../tests/unit/test_sagemaker_source_stubs.py | 68 ++++++++++---- 4 files changed, 156 insertions(+), 17 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py index d1536d21180bf1..0aaba70f50bbdb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py @@ -79,6 +79,7 @@ def get_feature_group_details(self, feature_group_name: str) -> Dict[str, Any]: FeatureGroupName=feature_group_name ) + # use falsy fallback since AWS stubs require this to be a string in tests next_token = feature_group.get("NextToken", "") # paginate over feature group features diff --git a/metadata-ingestion/tests/unit/sagemaker/sagemaker_mces_golden.json b/metadata-ingestion/tests/unit/sagemaker/sagemaker_mces_golden.json index 906719ff9b46d5..06ec895eb219dd 100644 --- a/metadata-ingestion/tests/unit/sagemaker/sagemaker_mces_golden.json +++ b/metadata-ingestion/tests/unit/sagemaker/sagemaker_mces_golden.json @@ -1,4 +1,98 @@ [ + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(test-2,some-feature-1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "description": null, + "dataType": "TEXT", + "version": null, + "sources": [ + "urn:li:dataset:(urn:li:dataPlatform:s3,datahub-sagemaker-outputs,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:glue,sagemaker_featurestore.test-2-123412341234,PROD)" + ] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + 
"com.linkedin.pegasus2avro.metadata.snapshot.MLPrimaryKeySnapshot": { + "urn": "urn:li:mlPrimaryKey:(test-2,some-feature-2)", + "aspects": [ + { + "com.linkedin.pegasus2avro.ml.metadata.MLPrimaryKeyProperties": { + "description": null, + "dataType": "ORDINAL", + "version": null, + "sources": [ + "urn:li:dataset:(urn:li:dataPlatform:s3,datahub-sagemaker-outputs,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:glue,sagemaker_featurestore.test-2-123412341234,PROD)" + ] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(test-2,some-feature-3)", + "aspects": [ + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "description": null, + "dataType": "CONTINUOUS", + "version": null, + "sources": [ + "urn:li:dataset:(urn:li:dataPlatform:s3,datahub-sagemaker-outputs,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:glue,sagemaker_featurestore.test-2-123412341234,PROD)" + ] + } + } + ] + } + }, + "proposedDelta": null + }, + { + "auditHeader": null, + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureTableSnapshot": { + "urn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:sagemaker,test-2)", + "aspects": [ + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { + "customProperties": { + "arn": "arn:aws:sagemaker:us-west-2:123412341234:feature-group/test-2", + "creation_time": "2021-06-24 09:48:37.035000", + "status": "Created" + }, + "description": "Yet another test feature group", + "mlFeatures": [ + "urn:li:mlFeature:(test-2,some-feature-1)", + "urn:li:mlFeature:(test-2,some-feature-3)" + ], + "mlPrimaryKeys": [ + "urn:li:mlPrimaryKey:(test-2,some-feature-2)" + ] + } + } + ] + } + }, + "proposedDelta": null + }, { "auditHeader": null, "proposedSnapshot": { diff --git a/metadata-ingestion/tests/unit/test_sagemaker_source.py b/metadata-ingestion/tests/unit/test_sagemaker_source.py index f316715ca1166d..ee2f23441f527c 100644 --- a/metadata-ingestion/tests/unit/test_sagemaker_source.py +++ b/metadata-ingestion/tests/unit/test_sagemaker_source.py @@ -9,6 +9,7 @@ from tests.unit.test_sagemaker_source_stubs import ( describe_feature_group_response_1, describe_feature_group_response_2, + describe_feature_group_response_3, list_feature_groups_response, ) @@ -34,6 +35,13 @@ def test_sagemaker_ingest(tmp_path, pytestconfig): list_feature_groups_response, {}, ) + sagemaker_stubber.add_response( + "describe_feature_group", + describe_feature_group_response_1, + { + "FeatureGroupName": "test-2", + }, + ) sagemaker_stubber.add_response( "describe_feature_group", describe_feature_group_response_2, @@ -43,7 +51,7 @@ def test_sagemaker_ingest(tmp_path, pytestconfig): ) sagemaker_stubber.add_response( "describe_feature_group", - describe_feature_group_response_1, + describe_feature_group_response_3, { "FeatureGroupName": "test", }, diff --git a/metadata-ingestion/tests/unit/test_sagemaker_source_stubs.py b/metadata-ingestion/tests/unit/test_sagemaker_source_stubs.py index 530eab53ff299c..aebf0e524acc15 100644 --- a/metadata-ingestion/tests/unit/test_sagemaker_source_stubs.py +++ b/metadata-ingestion/tests/unit/test_sagemaker_source_stubs.py @@ -2,6 +2,12 @@ list_feature_groups_response = { "FeatureGroupSummaries": [ + { + "FeatureGroupName": "test-2", + "FeatureGroupArn": "arn:aws:sagemaker:us-west-2:123412341234:feature-group/test-2", + "CreationTime": datetime.datetime(2021, 6, 24, 9, 48, 37, 35000), + 
"FeatureGroupStatus": "Created", + }, { "FeatureGroupName": "test-1", "FeatureGroupArn": "arn:aws:sagemaker:us-west-2:123412341234:feature-group/test-1", @@ -19,26 +25,32 @@ } describe_feature_group_response_1 = { - "FeatureGroupArn": "arn:aws:sagemaker:us-west-2:123412341234:feature-group/test", - "FeatureGroupName": "test", - "RecordIdentifierFeatureName": "feature_1", - "EventTimeFeatureName": "feature_3", + "FeatureGroupArn": "arn:aws:sagemaker:us-west-2:123412341234:feature-group/test-2", + "FeatureGroupName": "test-2", + "RecordIdentifierFeatureName": "some-feature-2", + "EventTimeFeatureName": "some-feature-3", "FeatureDefinitions": [ - {"FeatureName": "feature_1", "FeatureType": "String"}, - {"FeatureName": "feature_2", "FeatureType": "Integral"}, - {"FeatureName": "feature_3", "FeatureType": "Fractional"}, + {"FeatureName": "some-feature-1", "FeatureType": "String"}, + {"FeatureName": "some-feature-2", "FeatureType": "Integral"}, + {"FeatureName": "some-feature-3", "FeatureType": "Fractional"}, ], - "CreationTime": datetime.datetime( - 2021, - 6, - 14, - 11, - 3, - 0, - 803000, - ), + "CreationTime": datetime.datetime(2021, 6, 24, 9, 48, 37, 35000), "OnlineStoreConfig": {"EnableOnlineStore": True}, + "OfflineStoreConfig": { + "S3StorageConfig": { + "S3Uri": "s3://datahub-sagemaker-outputs", + "ResolvedOutputS3Uri": "s3://datahub-sagemaker-outputs/123412341234/sagemaker/us-west-2/offline-store/test-2-123412341234/data", + }, + "DisableGlueTableCreation": False, + "DataCatalogConfig": { + "TableName": "test-2-123412341234", + "Catalog": "AwsDataCatalog", + "Database": "sagemaker_featurestore", + }, + }, + "RoleArn": "arn:aws:iam::123412341234:role/service-role/AmazonSageMaker-ExecutionRole-20210614T104201", "FeatureGroupStatus": "Created", + "Description": "Yet another test feature group", "NextToken": "", } @@ -59,3 +71,27 @@ "Description": "First test feature group", "NextToken": "", } + +describe_feature_group_response_3 = { + "FeatureGroupArn": "arn:aws:sagemaker:us-west-2:123412341234:feature-group/test", + "FeatureGroupName": "test", + "RecordIdentifierFeatureName": "feature_1", + "EventTimeFeatureName": "feature_3", + "FeatureDefinitions": [ + {"FeatureName": "feature_1", "FeatureType": "String"}, + {"FeatureName": "feature_2", "FeatureType": "Integral"}, + {"FeatureName": "feature_3", "FeatureType": "Fractional"}, + ], + "CreationTime": datetime.datetime( + 2021, + 6, + 14, + 11, + 3, + 0, + 803000, + ), + "OnlineStoreConfig": {"EnableOnlineStore": True}, + "FeatureGroupStatus": "Created", + "NextToken": "", +} From 3a4012e42df41e948df22629caea1876357b3d67 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Thu, 24 Jun 2021 10:16:13 -0700 Subject: [PATCH 19/22] Update custom properties --- .../tests/integration/feast/feast_mces_golden.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/metadata-ingestion/tests/integration/feast/feast_mces_golden.json b/metadata-ingestion/tests/integration/feast/feast_mces_golden.json index 2956bdd9dcdac0..c253da63b49749 100644 --- a/metadata-ingestion/tests/integration/feast/feast_mces_golden.json +++ b/metadata-ingestion/tests/integration/feast/feast_mces_golden.json @@ -343,6 +343,7 @@ "aspects": [ { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { + "customProperties": {}, "description": null, "mlFeatures": [ "urn:li:mlFeature:(test_feature_table_all_feature_dtypes,test_BOOL_LIST_feature)", @@ -421,6 +422,7 @@ "aspects": [ { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { + "customProperties": {}, 
"description": null, "mlFeatures": [ "urn:li:mlFeature:(test_feature_table_no_labels,test_BYTES_feature)" @@ -485,6 +487,7 @@ "aspects": [ { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { + "customProperties": {}, "description": null, "mlFeatures": [ "urn:li:mlFeature:(test_feature_table_single_feature,test_BYTES_feature)" From 768393e1c43a0179b8bc928da577334ae9d569fb Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Fri, 25 Jun 2021 10:40:01 -0700 Subject: [PATCH 20/22] Refactor --- metadata-ingestion/README.md | 57 ++++++++++--------- .../examples/recipes/sagemaker_to_datahub.yml | 1 + .../src/datahub/ingestion/source/sagemaker.py | 20 ++++--- .../{ => sagemaker}/test_sagemaker_source.py | 0 4 files changed, 43 insertions(+), 35 deletions(-) rename metadata-ingestion/tests/unit/{ => sagemaker}/test_sagemaker_source.py (100%) diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md index 175f76976474a9..fbd77aad3df95d 100644 --- a/metadata-ingestion/README.md +++ b/metadata-ingestion/README.md @@ -31,34 +31,35 @@ If you run into an error, try checking the [_common setup issues_](./developing. We use a plugin architecture so that you can install only the dependencies you actually need. -| Plugin Name | Install Command | Provides | -| -------------- | ---------------------------------------------------------- | ----------------------------------- | -| file | _included by default_ | File source and sink | -| console | _included by default_ | Console sink | -| athena | `pip install 'acryl-datahub[athena]'` | AWS Athena source | -| bigquery | `pip install 'acryl-datahub[bigquery]'` | BigQuery source | -| bigquery-usage | `pip install 'acryl-datahub[bigquery-usage]'` | BigQuery usage statistics source | -| feast | `pip install 'acryl-datahub[feast]'` | Feast source | -| glue | `pip install 'acryl-datahub[glue]'` | AWS Glue source | -| hive | `pip install 'acryl-datahub[hive]'` | Hive source | -| mssql | `pip install 'acryl-datahub[mssql]'` | SQL Server source | -| mysql | `pip install 'acryl-datahub[mysql]'` | MySQL source | -| oracle | `pip install 'acryl-datahub[oracle]'` | Oracle source | -| postgres | `pip install 'acryl-datahub[postgres]'` | Postgres source | -| redshift | `pip install 'acryl-datahub[redshift]'` | Redshift source | -| sagemaker | `pip install 'acryl-datahub[sagemaker]'` | AWS SageMaker source | -| sqlalchemy | `pip install 'acryl-datahub[sqlalchemy]'` | Generic SQLAlchemy source | -| snowflake | `pip install 'acryl-datahub[snowflake]'` | Snowflake source | -| superset | `pip install 'acryl-datahub[superset]'` | Superset source | -| mongodb | `pip install 'acryl-datahub[mongodb]'` | MongoDB source | -| ldap | `pip install 'acryl-datahub[ldap]'` ([extra requirements]) | LDAP source | -| looker | `pip install 'acryl-datahub[looker]'` | Looker source | -| lookml | `pip install 'acryl-datahub[lookml]'` | LookML source, requires Python 3.7+ | -| kafka | `pip install 'acryl-datahub[kafka]'` | Kafka source | -| druid | `pip install 'acryl-datahub[druid]'` | Druid Source | -| dbt | _no additional dependencies_ | dbt source | -| datahub-rest | `pip install 'acryl-datahub[datahub-rest]'` | DataHub sink over REST API | -| datahub-kafka | `pip install 'acryl-datahub[datahub-kafka]'` | DataHub sink over Kafka | +| Plugin Name | Install Command | Provides | +| --------------- | ---------------------------------------------------------- | ----------------------------------- | +| file | _included by default_ | File source and sink | +| console | _included 
by default_ | Console sink | +| athena | `pip install 'acryl-datahub[athena]'` | AWS Athena source | +| bigquery | `pip install 'acryl-datahub[bigquery]'` | BigQuery source | +| bigquery-usage | `pip install 'acryl-datahub[bigquery-usage]'` | BigQuery usage statistics source | +| feast | `pip install 'acryl-datahub[feast]'` | Feast source | +| glue | `pip install 'acryl-datahub[glue]'` | AWS Glue source | +| hive | `pip install 'acryl-datahub[hive]'` | Hive source | +| mssql | `pip install 'acryl-datahub[mssql]'` | SQL Server source | +| mysql | `pip install 'acryl-datahub[mysql]'` | MySQL source | +| oracle | `pip install 'acryl-datahub[oracle]'` | Oracle source | +| postgres | `pip install 'acryl-datahub[postgres]'` | Postgres source | +| redshift | `pip install 'acryl-datahub[redshift]'` | Redshift source | +| sagemaker | `pip install 'acryl-datahub[sagemaker]'` | AWS SageMaker source | +| sqlalchemy | `pip install 'acryl-datahub[sqlalchemy]'` | Generic SQLAlchemy source | +| snowflake | `pip install 'acryl-datahub[snowflake]'` | Snowflake source | +| snowflake-usage | `pip install 'acryl-datahub[snowflake-usage]'` | Snowflake usage statistics source | +| superset | `pip install 'acryl-datahub[superset]'` | Superset source | +| mongodb | `pip install 'acryl-datahub[mongodb]'` | MongoDB source | +| ldap | `pip install 'acryl-datahub[ldap]'` ([extra requirements]) | LDAP source | +| looker | `pip install 'acryl-datahub[looker]'` | Looker source | +| lookml | `pip install 'acryl-datahub[lookml]'` | LookML source, requires Python 3.7+ | +| kafka | `pip install 'acryl-datahub[kafka]'` | Kafka source | +| druid | `pip install 'acryl-datahub[druid]'` | Druid Source | +| dbt | _no additional dependencies_ | dbt source | +| datahub-rest | `pip install 'acryl-datahub[datahub-rest]'` | DataHub sink over REST API | +| datahub-kafka | `pip install 'acryl-datahub[datahub-kafka]'` | DataHub sink over Kafka | These plugins can be mixed and matched as desired. 
For example: diff --git a/metadata-ingestion/examples/recipes/sagemaker_to_datahub.yml b/metadata-ingestion/examples/recipes/sagemaker_to_datahub.yml index 3f94d2c7ba901f..e1914d38bf31f0 100644 --- a/metadata-ingestion/examples/recipes/sagemaker_to_datahub.yml +++ b/metadata-ingestion/examples/recipes/sagemaker_to_datahub.yml @@ -1,3 +1,4 @@ +# in this example, AWS creds are detected automatically – see the README for more details source: type: sagemaker config: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py index 0aaba70f50bbdb..8c0e389a865909 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py @@ -145,13 +145,15 @@ def get_feature_group_wu( mce = MetadataChangeEvent(proposedSnapshot=feature_group_snapshot) return MetadataWorkUnit(id=feature_group_name, mce=mce) + field_type_mappings = { + "String": MLFeatureDataType.TEXT, + "Integral": MLFeatureDataType.ORDINAL, + "Fractional": MLFeatureDataType.CONTINUOUS, + } + def get_feature_type(self, aws_type: str, feature_name: str) -> str: - mapped_type = { - "String": MLFeatureDataType.TEXT, - "Integral": MLFeatureDataType.ORDINAL, - "Fractional": MLFeatureDataType.CONTINUOUS, - }.get(aws_type) + mapped_type = self.field_type_mappings.get(aws_type) if mapped_type is None: self.report.report_warning( @@ -283,9 +285,13 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: ) for feature in feature_group_details["FeatureDefinitions"]: - yield self.get_feature_wu(feature_group_details, feature) + wu = self.get_feature_wu(feature_group_details, feature) + self.report.report_workunit(wu) + yield wu - yield self.get_feature_group_wu(feature_group_details) + wu = self.get_feature_group_wu(feature_group_details) + self.report.report_workunit(wu) + yield wu def get_report(self): return self.report diff --git a/metadata-ingestion/tests/unit/test_sagemaker_source.py b/metadata-ingestion/tests/unit/sagemaker/test_sagemaker_source.py similarity index 100% rename from metadata-ingestion/tests/unit/test_sagemaker_source.py rename to metadata-ingestion/tests/unit/sagemaker/test_sagemaker_source.py From 63841e4faf4036d9680f49864a2236a4d8e99a82 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Mon, 28 Jun 2021 14:26:23 -0700 Subject: [PATCH 21/22] Update comments --- .../src/datahub/ingestion/source/aws_common.py | 8 ++++++++ .../src/datahub/ingestion/source/sagemaker.py | 1 + 2 files changed, 9 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws_common.py b/metadata-ingestion/src/datahub/ingestion/source/aws_common.py index 1d77e72f62df4d..03a0c44699fb1f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws_common.py @@ -26,6 +26,14 @@ def assume_role( class AwsSourceConfig(ConfigModel): + """ + Common AWS credentials config. 
+ + Currently used by: + - Glue source + - SageMaker source + """ + env: str = "PROD" database_pattern: AllowDenyPattern = AllowDenyPattern.allow_all() diff --git a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py index 8c0e389a865909..db4e762843106a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sagemaker.py @@ -178,6 +178,7 @@ def get_feature_wu( """ # if the feature acts as the record identifier, then we ingest it as an MLPrimaryKey + # the RecordIdentifierFeatureName is guaranteed to exist as it's required on creation is_record_identifier = ( feature_group_details["RecordIdentifierFeatureName"] == feature["FeatureName"] From 8f96239983a13b68e90c0849fbfa6a2211100351 Mon Sep 17 00:00:00 2001 From: Kevin Hu Date: Tue, 29 Jun 2021 12:51:07 -0700 Subject: [PATCH 22/22] Fix imports order --- metadata-ingestion/src/datahub/ingestion/source/glue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/glue.py b/metadata-ingestion/src/datahub/ingestion/source/glue.py index 1dd0450cc4e41d..e22bcfd8a19295 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/glue.py @@ -9,8 +9,8 @@ from datahub.emitter import mce_builder from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.source import Source, SourceReport -from datahub.ingestion.source.aws_common import AwsSourceConfig from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.aws_common import AwsSourceConfig from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp, Status from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
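Taken together, the series leaves the Glue and SageMaker sources sharing a single credentials model. A short usage sketch of that shared config — the region and role ARNs below are placeholders, and aws_role may equally be a single string instead of a list:

    from datahub.ingestion.source.aws_common import AwsSourceConfig

    # Placeholder ARNs; with a list, each role is assumed in turn via the
    # assume-role helper this series renames, before service clients are built.
    config = AwsSourceConfig(
        aws_region="us-west-2",
        aws_role=[
            "arn:aws:iam::123412341234:role/ingestion-hop",
            "arn:aws:iam::123412341234:role/ingestion-target",
        ],
    )

    glue = config.glue_client  # boto3 Glue client using the resolved credentials
    s3 = config.s3_client      # boto3 S3 client sharing the same credentials

The field and property names here follow the aws_common module touched by patches 17 and 21; actual credential resolution depends on which of the static-key, role, or default-provider options are set in the environment.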