Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): SageMaker feature store ingestion #2758

Merged
merged 28 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ec8a385
Create common AWS config
kevinhu Jun 23, 2021
d3bf612
Init sagemaker
kevinhu Jun 23, 2021
3db0a58
Common AWS dependencies
kevinhu Jun 23, 2021
d282559
Get features in feature group
kevinhu Jun 23, 2021
8f455c9
Ingest feature groups
kevinhu Jun 23, 2021
2bfb882
Add example ingestion config
kevinhu Jun 23, 2021
5bff9f4
Fix feature ingestion
kevinhu Jun 23, 2021
44ecb58
Append Glue data catalog source
kevinhu Jun 23, 2021
d660a9b
Handle primary key ingestion
kevinhu Jun 24, 2021
0259845
Init tests and stubs
kevinhu Jun 24, 2021
cd4d233
Add sagemaker golden
kevinhu Jun 24, 2021
4ff8434
Clean up golden
kevinhu Jun 24, 2021
8971109
Add descriptions and filter primary keys
kevinhu Jun 24, 2021
9133c85
Include custom fields in feature tables
kevinhu Jun 24, 2021
777f7df
Add sagemaker custom properties
kevinhu Jun 24, 2021
3722726
Merge
kevinhu Jun 24, 2021
149584a
Cleanup
kevinhu Jun 24, 2021
fb70c0b
Fix old references
kevinhu Jun 24, 2021
1c248c3
Add test stub with offline store
kevinhu Jun 24, 2021
3a4012e
Update custom properties
kevinhu Jun 24, 2021
3b575b1
Merge
kevinhu Jun 25, 2021
ffcd8cc
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-fe…
kevinhu Jun 25, 2021
768393e
Refactor
kevinhu Jun 25, 2021
4bc4601
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-fe…
kevinhu Jun 28, 2021
63841e4
Update comments
kevinhu Jun 28, 2021
30564cc
Merge branch 'master' of github.com:kevinhu/datahub into sagemaker-fe…
kevinhu Jun 29, 2021
0bbe932
Merge
kevinhu Jun 29, 2021
8f96239
Fix imports order
kevinhu Jun 29, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3397,6 +3397,7 @@
"name" : "MLFeatureTableProperties",
"namespace" : "com.linkedin.ml.metadata",
"doc" : "Properties associated with a MLFeatureTable",
"include" : [ "com.linkedin.common.CustomProperties" ],
"fields" : [ {
"name" : "description",
"type" : "string",
Expand Down
22 changes: 22 additions & 0 deletions metadata-ingestion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ We use a plugin architecture so that you can install only the dependencies you a
| oracle | `pip install 'acryl-datahub[oracle]'` | Oracle source |
| postgres | `pip install 'acryl-datahub[postgres]'` | Postgres source |
| redshift | `pip install 'acryl-datahub[redshift]'` | Redshift source |
| sagemaker | `pip install 'acryl-datahub[sagemaker]'` | AWS SageMaker source |
| sqlalchemy | `pip install 'acryl-datahub[sqlalchemy]'` | Generic SQLAlchemy source |
| snowflake | `pip install 'acryl-datahub[snowflake]'` | Snowflake source |
| snowflake-usage | `pip install 'acryl-datahub[snowflake-usage]'` | Snowflake usage statistics source |
Expand Down Expand Up @@ -345,6 +346,27 @@ source:
# options is same as above
```

### AWS SageMaker `sagemaker`

Extracts:

- Feature groups (support for models, jobs, and more coming soon!)

```yml
source:
type: sagemaker
config:
aws_region: # aws_region_name, i.e. "eu-west-1"
env: # environment for the DatasetSnapshot URN, one of "DEV", "EI", "PROD" or "CORP". Defaults to "PROD".

# Credentials. If not specified here, these are picked up according to boto3 rules.
# (see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html)
aws_access_key_id: # Optional.
aws_secret_access_key: # Optional.
aws_session_token: # Optional.
aws_role: # Optional (Role chaining supported by using a sorted list).
```

### Snowflake `snowflake`

Extracts:
Expand Down
10 changes: 10 additions & 0 deletions metadata-ingestion/examples/recipes/sagemaker_to_datahub.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# in this example, AWS creds are detected automatically – see the README for more details
source:
type: sagemaker
config:
aws_region: "us-west-2"
kevinhu marked this conversation as resolved.
Show resolved Hide resolved

sink:
type: "datahub-rest"
config:
server: "http://localhost:8080"
23 changes: 12 additions & 11 deletions metadata-ingestion/scripts/update_golden_files.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@ set -euxo pipefail
pytest --basetemp=tmp || true

# Update the golden files.
cp tmp/test_serde_to_json_tests_unit_0/output.json tests/unit/serde/test_serde_large.json
cp tmp/test_serde_to_json_tests_unit_1/output.json tests/unit/serde/test_serde_chart_snapshot.json
cp tmp/test_ldap_ingest0/ldap_mces.json tests/integration/ldap/ldap_mces_golden.json
cp tmp/test_mysql_ingest0/mysql_mces.json tests/integration/mysql/mysql_mces_golden.json
cp tmp/test_mssql_ingest0/mssql_mces.json tests/integration/sql_server/mssql_mces_golden.json
cp tmp/test_mongodb_ingest0/mongodb_mces.json tests/integration/mongodb/mongodb_mces_golden.json
cp tmp/test_feast_ingest0/feast_mces.json tests/integration/feast/feast_mces_golden.json
cp tmp/test_dbt_ingest0/dbt_mces.json tests/integration/dbt/dbt_mces_golden.json
cp tmp/test_glue_ingest0/glue_mces.json tests/unit/glue/glue_mces_golden.json
cp tmp/test_lookml_ingest0/lookml_mces.json tests/integration/lookml/expected_output.json
cp tmp/test_looker_ingest0/looker_mces.json tests/integration/looker/expected_output.json
cp tmp/test_serde_to_json_tests_unit_0/output.json tests/unit/serde/test_serde_large.json
cp tmp/test_serde_to_json_tests_unit_1/output.json tests/unit/serde/test_serde_chart_snapshot.json
cp tmp/test_ldap_ingest0/ldap_mces.json tests/integration/ldap/ldap_mces_golden.json
cp tmp/test_mysql_ingest0/mysql_mces.json tests/integration/mysql/mysql_mces_golden.json
cp tmp/test_mssql_ingest0/mssql_mces.json tests/integration/sql_server/mssql_mces_golden.json
cp tmp/test_mongodb_ingest0/mongodb_mces.json tests/integration/mongodb/mongodb_mces_golden.json
cp tmp/test_feast_ingest0/feast_mces.json tests/integration/feast/feast_mces_golden.json
cp tmp/test_dbt_ingest0/dbt_mces.json tests/integration/dbt/dbt_mces_golden.json
cp tmp/test_glue_ingest0/glue_mces.json tests/unit/glue/glue_mces_golden.json
cp tmp/test_sagemaker_ingest0/sagemaker_mces.json tests/unit/sagemaker/sagemaker_mces_golden.json
cp tmp/test_lookml_ingest0/lookml_mces.json tests/integration/lookml/expected_output.json
cp tmp/test_looker_ingest0/looker_mces.json tests/integration/looker/expected_output.json
cp tmp/test_bq_usage_source0/bigquery_usages.json tests/integration/bigquery-usage/bigquery_usages_golden.json

# Print success message.
Expand Down
10 changes: 9 additions & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ def get_long_description():
"sqlalchemy==1.3.24",
}

aws_common = {
# AWS Python SDK
"boto3"
}

# Note: for all of these, framework_common will be added.
plugins: Dict[str, Set[str]] = {
# Sink plugins.
Expand All @@ -73,7 +78,7 @@ def get_long_description():
"bigquery-usage": {"google-cloud-logging", "cachetools"},
"druid": sql_common | {"pydruid>=0.6.2"},
"feast": {"docker"},
"glue": {"boto3"},
"glue": aws_common,
"hive": sql_common
| {
# Acryl Data maintains a fork of PyHive, which adds support for table comments
Expand All @@ -90,6 +95,7 @@ def get_long_description():
"oracle": sql_common | {"cx_Oracle"},
"postgres": sql_common | {"psycopg2-binary", "GeoAlchemy2"},
"redshift": sql_common | {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2"},
"sagemaker": aws_common,
"snowflake": sql_common | {"snowflake-sqlalchemy"},
"snowflake-usage": sql_common | {"snowflake-sqlalchemy"},
"superset": {"requests"},
Expand Down Expand Up @@ -150,6 +156,7 @@ def get_long_description():
"glue",
"hive",
"oracle",
"sagemaker",
"datahub-kafka",
"datahub-rest",
# airflow is added below
Expand Down Expand Up @@ -188,6 +195,7 @@ def get_long_description():
"druid = datahub.ingestion.source.druid:DruidSource",
"feast = datahub.ingestion.source.feast:FeastSource",
"glue = datahub.ingestion.source.glue:GlueSource",
"sagemaker = datahub.ingestion.source.sagemaker:SagemakerSource",
"hive = datahub.ingestion.source.hive:HiveSource",
"kafka = datahub.ingestion.source.kafka:KafkaSource",
"kafka-connect = datahub.ingestion.source.kafka_connect:KafkaConnectSource",
Expand Down
87 changes: 87 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/aws_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from functools import reduce
from typing import List, Optional, Union

import boto3

from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern


def assume_role(
role_arn: str, aws_region: str, credentials: Optional[dict] = None
) -> dict:
credentials = credentials or {}
sts_client = boto3.client(
"sts",
region_name=aws_region,
aws_access_key_id=credentials.get("AccessKeyId"),
aws_secret_access_key=credentials.get("SecretAccessKey"),
aws_session_token=credentials.get("SessionToken"),
)

assumed_role_object = sts_client.assume_role(
RoleArn=role_arn, RoleSessionName="DatahubIngestionSource"
)
return assumed_role_object["Credentials"]


class AwsSourceConfig(ConfigModel):
kevinhu marked this conversation as resolved.
Show resolved Hide resolved
"""
Common AWS credentials config.

Currently used by:
- Glue source
- SageMaker source
"""

env: str = "PROD"

database_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()

aws_access_key_id: Optional[str] = None
aws_secret_access_key: Optional[str] = None
aws_session_token: Optional[str] = None
aws_role: Optional[Union[str, List[str]]] = None
aws_region: str

def get_client(self, service: str) -> boto3.client:
if (
self.aws_access_key_id
and self.aws_secret_access_key
and self.aws_session_token
):
return boto3.client(
service,
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
aws_session_token=self.aws_session_token,
region_name=self.aws_region,
)
elif self.aws_access_key_id and self.aws_secret_access_key:
return boto3.client(
service,
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
region_name=self.aws_region,
)
elif self.aws_role:
if isinstance(self.aws_role, str):
credentials = assume_role(self.aws_role, self.aws_region)
else:
credentials = reduce(
lambda new_credentials, role_arn: assume_role(
role_arn, self.aws_region, new_credentials
),
self.aws_role,
{},
)
return boto3.client(
service,
aws_access_key_id=credentials["AccessKeyId"],
aws_secret_access_key=credentials["SecretAccessKey"],
aws_session_token=credentials["SessionToken"],
region_name=self.aws_region,
)
else:
return boto3.client(service, region_name=self.aws_region)
77 changes: 2 additions & 75 deletions metadata-ingestion/src/datahub/ingestion/source/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,14 @@
from collections import defaultdict
from dataclasses import dataclass
from dataclasses import field as dataclass_field
from functools import reduce
from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union
from urllib.parse import urlparse

import boto3

from datahub.configuration import ConfigModel
from datahub.configuration.common import AllowDenyPattern
from datahub.emitter import mce_builder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws_common import AwsSourceConfig
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp, Status
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
Expand Down Expand Up @@ -49,79 +45,10 @@
)


def assume_role(
role_arn: str, aws_region: str, credentials: Optional[dict] = None
) -> dict:
credentials = credentials or {}
sts_client = boto3.client(
"sts",
region_name=aws_region,
aws_access_key_id=credentials.get("AccessKeyId"),
aws_secret_access_key=credentials.get("SecretAccessKey"),
aws_session_token=credentials.get("SessionToken"),
)

assumed_role_object = sts_client.assume_role(
RoleArn=role_arn, RoleSessionName="DatahubIngestionSourceGlue"
)
return assumed_role_object["Credentials"]


class GlueSourceConfig(ConfigModel):
env: str = "PROD"

database_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
table_pattern: AllowDenyPattern = AllowDenyPattern.allow_all()
class GlueSourceConfig(AwsSourceConfig):

extract_transforms: Optional[bool] = True

aws_access_key_id: Optional[str] = None
aws_secret_access_key: Optional[str] = None
aws_session_token: Optional[str] = None
aws_role: Optional[Union[str, List[str]]] = None
aws_region: str

def get_client(self, service: str) -> boto3.client:
if (
self.aws_access_key_id
and self.aws_secret_access_key
and self.aws_session_token
):
return boto3.client(
service,
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
aws_session_token=self.aws_session_token,
region_name=self.aws_region,
)
elif self.aws_access_key_id and self.aws_secret_access_key:
return boto3.client(
service,
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
region_name=self.aws_region,
)
elif self.aws_role:
if isinstance(self.aws_role, str):
credentials = assume_role(self.aws_role, self.aws_region)
else:
credentials = reduce(
lambda new_credentials, role_arn: assume_role(
role_arn, self.aws_region, new_credentials
),
self.aws_role,
{},
)
return boto3.client(
service,
aws_access_key_id=credentials["AccessKeyId"],
aws_secret_access_key=credentials["SecretAccessKey"],
aws_session_token=credentials["SessionToken"],
region_name=self.aws_region,
)
else:
return boto3.client(service, region_name=self.aws_region)

@property
def glue_client(self):
return self.get_client("glue")
Expand Down
Loading