Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): add db/schema properties hook to SQL common #6847

Merged
merged 2 commits into from
Dec 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion metadata-ingestion/src/datahub/emitter/mcp_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ def gen_containers(
name: str,
sub_types: List[str],
parent_container_key: Optional[PlatformKey] = None,
extra_properties: Optional[Dict[str, str]] = None,
domain_urn: Optional[str] = None,
description: Optional[str] = None,
owner_urn: Optional[str] = None,
Expand All @@ -221,7 +222,10 @@ def gen_containers(
aspect=ContainerProperties(
name=name,
description=description,
customProperties=container_key.guid_dict(),
customProperties={
**container_key.guid_dict(),
**(extra_properties or {}),
},
externalUrl=external_url,
qualifiedName=qualified_name,
created=TimeStamp(time=created) if created is not None else None,
Expand Down
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def get_schema_names(self, inspector: Inspector) -> List[str]:
return schemas

def gen_database_containers(
self, database: str
self, inspector: Inspector, database: str
) -> typing.Iterable[MetadataWorkUnit]:
# In Athena the schema is the database and database is not existing
return []
Expand All @@ -177,7 +177,7 @@ def gen_schema_key(self, db_name: str, schema: str) -> DatabaseKey:
)

def gen_schema_containers(
self, schema: str, db_name: str
self, inspector: Inspector, schema: str, db_name: str
) -> typing.Iterable[MetadataWorkUnit]:
database_container_key = self.gen_database_key(database=schema)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1196,7 +1196,9 @@ def gen_database_key(self, database: str) -> PlatformKey:
backcompat_instance_for_guid=self.config.env,
)

def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
def gen_database_containers(
self, inspector: Inspector, database: str
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(database)

database_container_key = self.gen_database_key(database)
Expand All @@ -1213,7 +1215,7 @@ def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
yield wu

def gen_schema_containers(
self, schema: str, db_name: str
self, inspector: Inspector, schema: str, db_name: str
) -> Iterable[MetadataWorkUnit]:
schema_container_key = self.gen_schema_key(db_name, schema)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,9 @@ def create(cls, config_dict, ctx):
config = PrestoOnHiveConfig.parse_obj(config_dict)
return cls(config, ctx)

def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
def gen_database_containers(
self, inspector: Inspector, database: str
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(database)

database_container_key = self.gen_database_key(database)
Expand All @@ -333,7 +335,7 @@ def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
yield wu

def gen_schema_containers(
self, schema: str, db_name: str
self, inspector: Inspector, schema: str, db_name: str
) -> Iterable[MetadataWorkUnit]:
assert isinstance(self.config, PrestoOnHiveConfig)
where_clause_suffix: str = ""
Expand Down
37 changes: 28 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,9 @@ def gen_database_key(self, database: str) -> PlatformKey:
backcompat_instance_for_guid=self.config.env,
)

def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
def gen_database_containers(
self, inspector: Inspector, database: str
) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(database)

database_container_key = self.gen_database_key(database)
Expand All @@ -554,6 +556,7 @@ def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
name=database,
sub_types=[SqlContainerSubTypes.DATABASE],
domain_urn=domain_urn,
extra_properties=self.get_database_properties(inspector, database=database),
)

# Add container to the checkpoint state
Expand All @@ -567,7 +570,7 @@ def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
yield wu

def gen_schema_containers(
self, schema: str, db_name: str
self, inspector: Inspector, schema: str, db_name: str
) -> Iterable[MetadataWorkUnit]:
schema_container_key = self.gen_schema_key(db_name, schema)

Expand All @@ -576,11 +579,13 @@ def gen_schema_containers(
database_container_key = self.gen_database_key(database=db_name)

container_workunits = gen_containers(
# TODO: this one is bad
schema_container_key,
schema,
[SqlContainerSubTypes.SCHEMA],
database_container_key,
container_key=schema_container_key,
name=schema,
sub_types=[SqlContainerSubTypes.SCHEMA],
parent_container_key=database_container_key,
extra_properties=self.get_schema_properties(
inspector, database=db_name, schema=schema
),
)

# Add container to the checkpoint state
Expand Down Expand Up @@ -623,12 +628,16 @@ def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
profiler = self.get_profiler_instance(inspector)

db_name = self.get_db_name(inspector)
yield from self.gen_database_containers(db_name)
yield from self.gen_database_containers(
inspector=inspector, database=db_name
)

for schema in self.get_allowed_schemas(inspector, db_name):
self.add_information_for_schema(inspector, schema)

yield from self.gen_schema_containers(schema, db_name)
yield from self.gen_schema_containers(
inspector=inspector, schema=schema, db_name=db_name
)

if sql_config.include_tables:
yield from self.loop_tables(inspector, schema, sql_config)
Expand Down Expand Up @@ -893,6 +902,16 @@ def _process_table(
sql_config=sql_config,
)

def get_database_properties(
self, inspector: Inspector, database: str
) -> Optional[Dict[str, str]]:
return None

def get_schema_properties(
self, inspector: Inspector, database: str, schema: str
) -> Optional[Dict[str, str]]:
return None

def get_table_properties(
self, inspector: Inspector, schema: str, table: str
) -> Tuple[Optional[str], Dict[str, str], Optional[str]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def get_inspectors(self):
yield inspector

def gen_schema_containers(
self, schema: str, db_name: str
self, inspector: Inspector, schema: str, db_name: str
) -> typing.Iterable[MetadataWorkUnit]:
return []

Expand Down