Skip to content

Commit

Permalink
feat(ingest): add timestamps for snowflake objects
Browse files Browse the repository at this point in the history
add query to populate last altered time of snowflake database

add note in updating datahub doc
  • Loading branch information
mayurinehate committed Dec 7, 2022
1 parent 626a064 commit 8df9157
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 37 deletions.
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

### Breaking Changes

- #6570 `snowflake` connector now populates created, last modified timestamps for snowflake datasets and containers. This version of snowflake connnector will not work with **datahub-gms** version order than `v0.9.3`

### Potential Downtime

### Deprecations
Expand Down
11 changes: 10 additions & 1 deletion metadata-ingestion/src/datahub/emitter/mcp_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance
from datahub.metadata.com.linkedin.pegasus2avro.common import (
DataPlatformInstance,
TimeStamp,
)
from datahub.metadata.com.linkedin.pegasus2avro.container import ContainerProperties
from datahub.metadata.com.linkedin.pegasus2avro.events.metadata import ChangeType
from datahub.metadata.schema_classes import (
Expand Down Expand Up @@ -205,6 +208,8 @@ def gen_containers(
external_url: Optional[str] = None,
tags: Optional[List[str]] = None,
qualified_name: Optional[str] = None,
created: Optional[int] = None,
last_modified: Optional[int] = None,
) -> Iterable[MetadataWorkUnit]:
container_urn = make_container_urn(
guid=container_key.guid(),
Expand All @@ -220,6 +225,10 @@ def gen_containers(
customProperties=container_key.guid_dict(),
externalUrl=external_url,
qualifiedName=qualified_name,
created=TimeStamp(time=created) if created is not None else None,
lastModified=TimeStamp(time=last_modified)
if last_modified is not None
else None,
),
)
wu = MetadataWorkUnit(id=f"container-info-{name}-{container_urn}", mcp=mcp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,9 @@ def gen_dataset_workunits(
name=datahub_dataset_name.get_table_display_name(),
description=table.comment,
qualifiedName=str(datahub_dataset_name),
created=TimeStamp(time=int(table.created.timestamp() * 1000)),
created=TimeStamp(time=int(table.created.timestamp() * 1000))
if table.created is not None
else None,
lastModified=TimeStamp(time=int(table.last_altered.timestamp() * 1000))
if table.last_altered is not None
else None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ def show_databases() -> str:
def use_database(db_name: str) -> str:
return f'use database "{db_name}"'

@staticmethod
def get_databases(db_name: Optional[str]) -> str:
db_clause = f'"{db_name}".' if db_name is not None else ""
return f"""
SELECT database_name AS "DATABASE_NAME",
created AS "CREATED",
last_altered AS "LAST_ALTERED",
comment AS "COMMENT"
from {db_clause}information_schema.databases
order by database_name"""

@staticmethod
def schemas_for_database(db_name: Optional[str]) -> str:
db_clause = f'"{db_name}".' if db_name is not None else ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ class SnowflakeView(BaseView):
@dataclass
class SnowflakeSchema:
name: str
created: datetime
last_altered: datetime
created: Optional[datetime]
last_altered: Optional[datetime]
comment: Optional[str]
tables: List[SnowflakeTable] = field(default_factory=list)
views: List[SnowflakeView] = field(default_factory=list)
Expand All @@ -82,16 +82,17 @@ class SnowflakeSchema:
@dataclass
class SnowflakeDatabase:
name: str
created: datetime
created: Optional[datetime]
comment: Optional[str]
last_altered: Optional[datetime] = None
schemas: List[SnowflakeSchema] = field(default_factory=list)


class SnowflakeDataDictionary(SnowflakeQueryMixin):
def __init__(self) -> None:
self.logger = logger

def get_databases(self, conn: SnowflakeConnection) -> List[SnowflakeDatabase]:
def show_databases(self, conn: SnowflakeConnection) -> List[SnowflakeDatabase]:

databases: List[SnowflakeDatabase] = []

Expand All @@ -110,6 +111,28 @@ def get_databases(self, conn: SnowflakeConnection) -> List[SnowflakeDatabase]:

return databases

def get_databases(
self, conn: SnowflakeConnection, db_name: str
) -> List[SnowflakeDatabase]:

databases: List[SnowflakeDatabase] = []

cur = self.query(
conn,
SnowflakeQuery.get_databases(db_name),
)

for database in cur:
snowflake_db = SnowflakeDatabase(
name=database["DATABASE_NAME"],
created=database["CREATED"],
last_altered=database["LAST_ALTERED"],
comment=database["COMMENT"],
)
databases.append(snowflake_db)

return databases

def get_schemas_for_database(
self, conn: SnowflakeConnection, db_name: str
) -> List[SnowflakeSchema]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import Status, SubTypes
from datahub.metadata.com.linkedin.pegasus2avro.common import (
Status,
SubTypes,
TimeStamp,
)
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetProperties,
UpstreamLineage,
Expand Down Expand Up @@ -438,7 +442,8 @@ def get_workunits(self) -> Iterable[WorkUnit]:
self.report.include_technical_schema = self.config.include_technical_schema
databases: List[SnowflakeDatabase] = []

databases = self.data_dictionary.get_databases(conn)
databases = self.get_databases(conn)

for snowflake_db in databases:
self.report.report_entity_scanned(snowflake_db.name, "database")

Expand Down Expand Up @@ -481,6 +486,31 @@ def get_workunits(self) -> Iterable[WorkUnit]:
]
yield from self.usage_extractor.get_workunits(discovered_datasets)

def get_databases(self, conn):
databases = self.data_dictionary.show_databases(conn)

# Below code block is required to enrich database with additional
# information that is missing in `show databases` results
# For example - last modified time of database
ischema_database_map: Dict[str, SnowflakeDatabase] = {}
for database in databases:
try:
ischema_databases = self.data_dictionary.get_databases(
conn, database.name
)
ischema_database_map = {db.name: db for db in ischema_databases}
break
except Exception:
# query fails if "USAGE" access is not granted for database
logger.debug(
f"Failed to list databases {database.name} information_schema"
)
for database in databases:
if database.name in ischema_database_map.keys():
database.last_altered = ischema_database_map[database.name].last_altered

return databases

def _process_database(
self, conn: SnowflakeConnection, snowflake_db: SnowflakeDatabase
) -> Iterable[MetadataWorkUnit]:
Expand Down Expand Up @@ -649,6 +679,12 @@ def gen_dataset_workunits(

dataset_properties = DatasetProperties(
name=table.name,
created=TimeStamp(time=int(table.created.timestamp() * 1000))
if table.created is not None
else None,
lastModified=TimeStamp(time=int(table.last_altered.timestamp() * 1000))
if table.last_altered is not None
else None,
description=table.comment,
qualifiedName=dataset_name,
customProperties={**upstream_column_props},
Expand Down Expand Up @@ -906,6 +942,12 @@ def gen_database_containers(
external_url=self.get_external_url_for_database(database.name)
if self.config.include_external_url
else None,
created=int(database.created.timestamp() * 1000)
if database.created is not None
else None,
last_modified=int(database.last_altered.timestamp() * 1000)
if database.last_altered is not None
else None,
)

self.stale_entity_removal_handler.add_entity_to_state(
Expand Down Expand Up @@ -945,6 +987,12 @@ def gen_schema_containers(
external_url=self.get_external_url_for_schema(schema.name, db_name)
if self.config.include_external_url
else None,
created=int(schema.created.timestamp() * 1000)
if schema.created is not None
else None,
last_modified=int(schema.last_altered.timestamp() * 1000)
if schema.last_altered is not None
else None,
)

for wu in container_workunits:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ class BaseTable(Generic[SqlTableColumn]):
name: str
comment: Optional[str]
created: datetime
last_altered: datetime
size_in_bytes: int
rows_count: int
last_altered: Optional[datetime]
size_in_bytes: Optional[int]
rows_count: Optional[int]
columns: List[SqlTableColumn] = field(default_factory=list)
ddl: Optional[str] = None

Expand All @@ -44,8 +44,8 @@ class BaseTable(Generic[SqlTableColumn]):
class BaseView(Generic[SqlTableColumn]):
name: str
comment: Optional[str]
created: datetime
last_altered: datetime
created: Optional[datetime]
last_altered: Optional[datetime]
view_definition: str
size_in_bytes: Optional[int] = None
rows_count: Optional[int] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"value": "{\"customProperties\": {\"platform\": \"snowflake\", \"instance\": \"PROD\", \"database\": \"test_db\"}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/\", \"name\": \"TEST_DB\", \"description\": \"Comment for TEST_DB\"}",
"value": "{\"customProperties\": {\"platform\": \"snowflake\", \"instance\": \"PROD\", \"database\": \"test_db\"}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/\", \"name\": \"TEST_DB\", \"description\": \"Comment for TEST_DB\", \"created\": {\"time\": 1623110400000}, \"lastModified\": {\"time\": 1623110400000}}",
"contentType": "application/json"
},
"systemMetadata": {
Expand Down Expand Up @@ -61,7 +61,7 @@
"changeType": "UPSERT",
"aspectName": "containerProperties",
"aspect": {
"value": "{\"customProperties\": {\"platform\": \"snowflake\", \"instance\": \"PROD\", \"database\": \"test_db\", \"schema\": \"test_schema\"}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/\", \"name\": \"TEST_SCHEMA\", \"description\": \"comment for TEST_DB.TEST_SCHEMA\"}",
"value": "{\"customProperties\": {\"platform\": \"snowflake\", \"instance\": \"PROD\", \"database\": \"test_db\", \"schema\": \"test_schema\"}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/\", \"name\": \"TEST_SCHEMA\", \"description\": \"comment for TEST_DB.TEST_SCHEMA\", \"created\": {\"time\": 1623110400000}, \"lastModified\": {\"time\": 1623110400000}}",
"contentType": "application/json"
},
"systemMetadata": {
Expand Down Expand Up @@ -159,7 +159,7 @@
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_1/\", \"name\": \"TABLE_1\", \"qualifiedName\": \"test_db.test_schema.table_1\", \"description\": \"Comment for Table\", \"tags\": []}",
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_1/\", \"name\": \"TABLE_1\", \"qualifiedName\": \"test_db.test_schema.table_1\", \"description\": \"Comment for Table\", \"created\": {\"time\": 1623110400000}, \"lastModified\": {\"time\": 1623110400000}, \"tags\": []}",
"contentType": "application/json"
},
"systemMetadata": {
Expand Down Expand Up @@ -243,7 +243,7 @@
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_2/\", \"name\": \"TABLE_2\", \"qualifiedName\": \"test_db.test_schema.table_2\", \"description\": \"Comment for Table\", \"tags\": []}",
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_2/\", \"name\": \"TABLE_2\", \"qualifiedName\": \"test_db.test_schema.table_2\", \"description\": \"Comment for Table\", \"created\": {\"time\": 1623110400000}, \"lastModified\": {\"time\": 1623110400000}, \"tags\": []}",
"contentType": "application/json"
},
"systemMetadata": {
Expand Down Expand Up @@ -327,7 +327,7 @@
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_3/\", \"name\": \"TABLE_3\", \"qualifiedName\": \"test_db.test_schema.table_3\", \"description\": \"Comment for Table\", \"tags\": []}",
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_3/\", \"name\": \"TABLE_3\", \"qualifiedName\": \"test_db.test_schema.table_3\", \"description\": \"Comment for Table\", \"created\": {\"time\": 1623110400000}, \"lastModified\": {\"time\": 1623110400000}, \"tags\": []}",
"contentType": "application/json"
},
"systemMetadata": {
Expand Down Expand Up @@ -411,7 +411,7 @@
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_4/\", \"name\": \"TABLE_4\", \"qualifiedName\": \"test_db.test_schema.table_4\", \"description\": \"Comment for Table\", \"tags\": []}",
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_4/\", \"name\": \"TABLE_4\", \"qualifiedName\": \"test_db.test_schema.table_4\", \"description\": \"Comment for Table\", \"created\": {\"time\": 1623110400000}, \"lastModified\": {\"time\": 1623110400000}, \"tags\": []}",
"contentType": "application/json"
},
"systemMetadata": {
Expand Down Expand Up @@ -495,7 +495,7 @@
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_5/\", \"name\": \"TABLE_5\", \"qualifiedName\": \"test_db.test_schema.table_5\", \"description\": \"Comment for Table\", \"tags\": []}",
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_5/\", \"name\": \"TABLE_5\", \"qualifiedName\": \"test_db.test_schema.table_5\", \"description\": \"Comment for Table\", \"created\": {\"time\": 1623110400000}, \"lastModified\": {\"time\": 1623110400000}, \"tags\": []}",
"contentType": "application/json"
},
"systemMetadata": {
Expand Down Expand Up @@ -579,7 +579,7 @@
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_6/\", \"name\": \"TABLE_6\", \"qualifiedName\": \"test_db.test_schema.table_6\", \"description\": \"Comment for Table\", \"tags\": []}",
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_6/\", \"name\": \"TABLE_6\", \"qualifiedName\": \"test_db.test_schema.table_6\", \"description\": \"Comment for Table\", \"created\": {\"time\": 1623110400000}, \"lastModified\": {\"time\": 1623110400000}, \"tags\": []}",
"contentType": "application/json"
},
"systemMetadata": {
Expand Down Expand Up @@ -663,7 +663,7 @@
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_7/\", \"name\": \"TABLE_7\", \"qualifiedName\": \"test_db.test_schema.table_7\", \"description\": \"Comment for Table\", \"tags\": []}",
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_7/\", \"name\": \"TABLE_7\", \"qualifiedName\": \"test_db.test_schema.table_7\", \"description\": \"Comment for Table\", \"created\": {\"time\": 1623110400000}, \"lastModified\": {\"time\": 1623110400000}, \"tags\": []}",
"contentType": "application/json"
},
"systemMetadata": {
Expand Down Expand Up @@ -747,7 +747,7 @@
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_8/\", \"name\": \"TABLE_8\", \"qualifiedName\": \"test_db.test_schema.table_8\", \"description\": \"Comment for Table\", \"tags\": []}",
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_8/\", \"name\": \"TABLE_8\", \"qualifiedName\": \"test_db.test_schema.table_8\", \"description\": \"Comment for Table\", \"created\": {\"time\": 1623110400000}, \"lastModified\": {\"time\": 1623110400000}, \"tags\": []}",
"contentType": "application/json"
},
"systemMetadata": {
Expand Down Expand Up @@ -831,7 +831,7 @@
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_9/\", \"name\": \"TABLE_9\", \"qualifiedName\": \"test_db.test_schema.table_9\", \"description\": \"Comment for Table\", \"tags\": []}",
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_9/\", \"name\": \"TABLE_9\", \"qualifiedName\": \"test_db.test_schema.table_9\", \"description\": \"Comment for Table\", \"created\": {\"time\": 1623110400000}, \"lastModified\": {\"time\": 1623110400000}, \"tags\": []}",
"contentType": "application/json"
},
"systemMetadata": {
Expand Down Expand Up @@ -915,7 +915,7 @@
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_10/\", \"name\": \"TABLE_10\", \"qualifiedName\": \"test_db.test_schema.table_10\", \"description\": \"Comment for Table\", \"tags\": []}",
"value": "{\"customProperties\": {}, \"externalUrl\": \"https://app.snowflake.com/ap-south-1/abc12345/#/data/databases/TEST_DB/schemas/TEST_SCHEMA/table/TABLE_10/\", \"name\": \"TABLE_10\", \"qualifiedName\": \"test_db.test_schema.table_10\", \"description\": \"Comment for Table\", \"created\": {\"time\": 1623110400000}, \"lastModified\": {\"time\": 1623110400000}, \"tags\": []}",
"contentType": "application/json"
},
"systemMetadata": {
Expand Down
Loading

0 comments on commit 8df9157

Please sign in to comment.