From e5c48f467527230b22769c6859e42410550bd0f6 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 22 Dec 2022 14:58:46 -0500 Subject: [PATCH 1/2] feat(ingest): add db/schema properties hook to SQL common --- .../src/datahub/emitter/mcp_builder.py | 6 ++- .../datahub/ingestion/source/sql/athena.py | 4 +- .../datahub/ingestion/source/sql/bigquery.py | 4 +- .../ingestion/source/sql/presto_on_hive.py | 6 ++- .../ingestion/source/sql/sql_common.py | 37 ++++++++++++++----- .../source/sql/two_tier_sql_source.py | 2 +- 6 files changed, 42 insertions(+), 17 deletions(-) diff --git a/metadata-ingestion/src/datahub/emitter/mcp_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_builder.py index 3bcfcd9909148..a288e807affaa 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_builder.py @@ -201,6 +201,7 @@ def gen_containers( name: str, sub_types: List[str], parent_container_key: Optional[PlatformKey] = None, + extra_properties: Optional[Dict[str, str]] = None, domain_urn: Optional[str] = None, description: Optional[str] = None, owner_urn: Optional[str] = None, @@ -221,7 +222,10 @@ def gen_containers( aspect=ContainerProperties( name=name, description=description, - customProperties=container_key.guid_dict(), + customProperties={ + **container_key.guid_dict(), + **(extra_properties or {}), + }, externalUrl=external_url, qualifiedName=qualified_name, created=TimeStamp(time=created) if created is not None else None, diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py index 54efda036df09..b8f341cf27733 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/athena.py @@ -163,7 +163,7 @@ def get_schema_names(self, inspector: Inspector) -> List[str]: return schemas def gen_database_containers( - self, database: str + self, inspector: Inspector, database: str ) -> typing.Iterable[MetadataWorkUnit]: # In Athena the schema is the database and database is not existing return [] @@ -177,7 +177,7 @@ def gen_schema_key(self, db_name: str, schema: str) -> DatabaseKey: ) def gen_schema_containers( - self, schema: str, db_name: str + self, inspector: Inspector, schema: str, db_name: str ) -> typing.Iterable[MetadataWorkUnit]: database_container_key = self.gen_database_key(database=schema) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py index cc4affbe7d363..d45febc253ea8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py @@ -1196,7 +1196,7 @@ def gen_database_key(self, database: str) -> PlatformKey: backcompat_instance_for_guid=self.config.env, ) - def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]: + def gen_database_containers(self, inspector: Inspector, database: str) -> Iterable[MetadataWorkUnit]: domain_urn = self._gen_domain_urn(database) database_container_key = self.gen_database_key(database) @@ -1213,7 +1213,7 @@ def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]: yield wu def gen_schema_containers( - self, schema: str, db_name: str + self, inspector: Inspector, schema: str, db_name: str ) -> Iterable[MetadataWorkUnit]: schema_container_key = self.gen_schema_key(db_name, schema) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py b/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py index d89e07ccb00bf..718585b49283f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py @@ -317,7 +317,9 @@ def create(cls, config_dict, ctx): config = PrestoOnHiveConfig.parse_obj(config_dict) return cls(config, ctx) - def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]: + def gen_database_containers( + self, inspector: Inspector, database: str + ) -> Iterable[MetadataWorkUnit]: domain_urn = self._gen_domain_urn(database) database_container_key = self.gen_database_key(database) @@ -333,7 +335,7 @@ def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]: yield wu def gen_schema_containers( - self, schema: str, db_name: str + self, inspector: Inspector, schema: str, db_name: str ) -> Iterable[MetadataWorkUnit]: assert isinstance(self.config, PrestoOnHiveConfig) where_clause_suffix: str = "" diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 021459329626f..84118b305b93e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -545,7 +545,9 @@ def gen_database_key(self, database: str) -> PlatformKey: backcompat_instance_for_guid=self.config.env, ) - def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]: + def gen_database_containers( + self, inspector: Inspector, database: str + ) -> Iterable[MetadataWorkUnit]: domain_urn = self._gen_domain_urn(database) database_container_key = self.gen_database_key(database) @@ -554,6 +556,7 @@ def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]: name=database, sub_types=[SqlContainerSubTypes.DATABASE], domain_urn=domain_urn, + extra_properties=self.get_database_properties(inspector, database=database), ) # Add container to the checkpoint state @@ -567,7 +570,7 @@ def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]: yield wu def gen_schema_containers( - self, schema: str, db_name: str + self, inspector: Inspector, schema: str, db_name: str ) -> Iterable[MetadataWorkUnit]: schema_container_key = self.gen_schema_key(db_name, schema) @@ -576,11 +579,13 @@ def gen_schema_containers( database_container_key = self.gen_database_key(database=db_name) container_workunits = gen_containers( - # TODO: this one is bad - schema_container_key, - schema, - [SqlContainerSubTypes.SCHEMA], - database_container_key, + container_key=schema_container_key, + name=schema, + sub_types=[SqlContainerSubTypes.SCHEMA], + parent_container_key=database_container_key, + extra_properties=self.get_schema_properties( + inspector, database=db_name, schema=schema + ), ) # Add container to the checkpoint state @@ -623,12 +628,16 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]: profiler = self.get_profiler_instance(inspector) db_name = self.get_db_name(inspector) - yield from self.gen_database_containers(db_name) + yield from self.gen_database_containers( + inspector=inspector, database=db_name + ) for schema in self.get_allowed_schemas(inspector, db_name): self.add_information_for_schema(inspector, schema) - yield from self.gen_schema_containers(schema, db_name) + yield from self.gen_schema_containers( + inspector=inspector, schema=schema, db_name=db_name + ) if sql_config.include_tables: yield from self.loop_tables(inspector, schema, sql_config) @@ -893,6 +902,16 @@ def _process_table( sql_config=sql_config, ) + def get_database_properties( + self, inspector: Inspector, database: str + ) -> Optional[Dict[str, str]]: + return None + + def get_schema_properties( + self, inspector: Inspector, database: str, schema: str + ) -> Optional[Dict[str, str]]: + return None + def get_table_properties( self, inspector: Inspector, schema: str, table: str ) -> Tuple[Optional[str], Dict[str, str], Optional[str]]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py b/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py index f2914ae2c1fa4..3691a22ff596b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/two_tier_sql_source.py @@ -90,7 +90,7 @@ def get_inspectors(self): yield inspector def gen_schema_containers( - self, schema: str, db_name: str + self, inspector: Inspector, schema: str, db_name: str ) -> typing.Iterable[MetadataWorkUnit]: return [] From 4ee12dcffd6622ea588ffa446c56930ebed54991 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 22 Dec 2022 15:26:41 -0500 Subject: [PATCH 2/2] lint --- .../src/datahub/ingestion/source/sql/bigquery.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py index d45febc253ea8..77d0832865213 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/bigquery.py @@ -1196,7 +1196,9 @@ def gen_database_key(self, database: str) -> PlatformKey: backcompat_instance_for_guid=self.config.env, ) - def gen_database_containers(self, inspector: Inspector, database: str) -> Iterable[MetadataWorkUnit]: + def gen_database_containers( + self, inspector: Inspector, database: str + ) -> Iterable[MetadataWorkUnit]: domain_urn = self._gen_domain_urn(database) database_container_key = self.gen_database_key(database)