Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest): remove duplicate mcps,more typing #12557

Merged
merged 3 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Callable, Dict, Iterable, List, MutableMapping, Optional

from datahub.ingestion.api.report import SupportsAsObj
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
from datahub.ingestion.source.snowflake.snowflake_connection import SnowflakeConnection
from datahub.ingestion.source.snowflake.snowflake_query import (
Expand Down Expand Up @@ -100,6 +101,9 @@
def is_hybrid(self) -> bool:
return self.type is not None and self.type == "HYBRID TABLE"

def get_subtype(self) -> DatasetSubTypes:
return DatasetSubTypes.TABLE

Check warning on line 105 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py#L105

Added line #L105 was not covered by tests


@dataclass
class SnowflakeView(BaseView):
Expand All @@ -109,6 +113,9 @@
column_tags: Dict[str, List[SnowflakeTag]] = field(default_factory=dict)
is_secure: bool = False

def get_subtype(self) -> DatasetSubTypes:
return DatasetSubTypes.VIEW

Check warning on line 117 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py#L117

Added line #L117 was not covered by tests


@dataclass
class SnowflakeSchema:
Expand Down Expand Up @@ -154,6 +161,9 @@
column_tags: Dict[str, List[SnowflakeTag]] = field(default_factory=dict)
last_altered: Optional[datetime] = None

def get_subtype(self) -> DatasetSubTypes:
return DatasetSubTypes.SNOWFLAKE_STREAM

Check warning on line 165 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py#L165

Added line #L165 was not covered by tests


class _SnowflakeTagCache:
def __init__(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
from datahub.ingestion.source.common.subtypes import (
DatasetContainerSubTypes,
DatasetSubTypes,
)
from datahub.ingestion.source.snowflake.constants import (
GENERIC_PERMISSION_ERROR_KEY,
Expand Down Expand Up @@ -467,7 +466,13 @@
context=f"{db_name}.{schema_name}",
)

def _process_tags(self, snowflake_schema, schema_name, db_name, domain):
def _process_tags(
self,
snowflake_schema: SnowflakeSchema,
schema_name: str,
db_name: str,
domain: str,
) -> None:
snowflake_schema.tags = self.tag_extractor.get_tags_on_object(
schema_name=schema_name, db_name=db_name, domain=domain
)
Expand Down Expand Up @@ -837,15 +842,7 @@
if dpi_aspect:
yield dpi_aspect

subTypes = SubTypes(
typeNames=(
[DatasetSubTypes.SNOWFLAKE_STREAM]
if isinstance(table, SnowflakeStream)
else [DatasetSubTypes.VIEW]
if isinstance(table, SnowflakeView)
else [DatasetSubTypes.TABLE]
)
)
subTypes = SubTypes(typeNames=[table.get_subtype()])

Check warning on line 845 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py#L845

Added line #L845 was not covered by tests

yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=subTypes
Expand Down Expand Up @@ -932,9 +929,9 @@
"OWNER_ROLE_TYPE": table.owner_role_type,
"TABLE_NAME": table.table_name,
"BASE_TABLES": table.base_tables,
"STALE_AFTER": table.stale_after.isoformat()
if table.stale_after
else None,
"STALE_AFTER": (
table.stale_after.isoformat() if table.stale_after else None
),
}.items()
if v
}
Expand Down
289 changes: 0 additions & 289 deletions metadata-ingestion/tests/integration/snowflake/snowflake_golden.json
Original file line number Diff line number Diff line change
Expand Up @@ -4240,295 +4240,6 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_01_28-00_01_52-5vkne0",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)",
"changeType": "UPSERT",
"aspectName": "schemaMetadata",
"aspect": {
"json": {
"schemaName": "test_db.test_schema.stream_1",
"platform": "urn:li:dataPlatform:snowflake",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.schema.MySqlDDL": {
"tableSchema": ""
}
},
"fields": [
{
"fieldPath": "col_1",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.NumberType": {}
}
},
"nativeDataType": "NUMBER(38,0)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_2",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_3",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_4",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_5",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_6",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_7",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_8",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_9",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "col_10",
"nullable": false,
"description": "Comment for column",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(255)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "metadata$action",
"nullable": false,
"description": "Type of DML operation (INSERT/DELETE)",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "VARCHAR(10)",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "metadata$isupdate",
"nullable": false,
"description": "Whether row is from UPDATE operation",
"type": {
"type": {
"com.linkedin.schema.BooleanType": {}
}
},
"nativeDataType": "BOOLEAN",
"recursive": false,
"isPartOfKey": false
},
{
"fieldPath": "metadata$row_id",
"nullable": false,
"description": "Unique row identifier",
"type": {
"type": {
"com.linkedin.schema.NumberType": {}
}
},
"nativeDataType": "NUMBER(38,0)",
"recursive": false,
"isPartOfKey": false
}
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_01_28-00_01_52-5vkne0",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {
"SOURCE_TYPE": "Table",
"TYPE": "DELTA",
"STALE": "false",
"MODE": "DEFAULT",
"OWNER_ROLE_TYPE": "ROLE",
"TABLE_NAME": "TEST_DB.TEST_SCHEMA.TABLE_1",
"BASE_TABLES": "TEST_DB.TEST_SCHEMA.TABLE_1",
"STALE_AFTER": "2021-06-22T00:00:00+00:00"
},
"externalUrl": "https://app.snowflake.com/ap-south-1.aws/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/view/STREAM_1/",
"name": "STREAM_1",
"qualifiedName": "TEST_DB.TEST_SCHEMA.STREAM_1",
"description": "Comment for Stream 1",
"created": {
"time": 1623110400000
},
"lastModified": {
"time": 1623110400000
},
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_01_28-00_01_52-5vkne0",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)",
"changeType": "UPSERT",
"aspectName": "container",
"aspect": {
"json": {
"container": "urn:li:container:94c696a054bab40b73e640a7f82e3b1c"
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_01_28-00_01_52-5vkne0",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Snowflake Stream"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "snowflake-2025_01_28-00_01_52-5vkne0",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:snowflake,test_db.test_schema.stream_1,PROD)",
Expand Down
Loading