feat(ingest/teradata): view parsing #9005
Changes from all commits
96331e2
de50545
2375f2d
8003ee0
7980121
246b28d
ff4a65e
b86c7a0
f26349a
1ebbdca
6abf101
6245a7b
fc280da
478c997
cf320eb
@@ -1,5 +1,6 @@
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, Optional, Set, Union

# This import verifies that the dependencies are available.
@@ -11,6 +12,7 @@

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
@@ -32,11 +34,18 @@
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
from datahub.metadata._schema_classes import (
    MetadataChangeEventClass,
    SchemaMetadataClass,
    ViewPropertiesClass,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
    BytesTypeClass,
    TimeTypeClass,
)
from datahub.utilities.file_backed_collections import FileBackedDict
from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage
from datahub.utilities.urns.dataset_urn import DatasetUrn

logger: logging.Logger = logging.getLogger(__name__)
@@ -64,6 +73,7 @@
@dataclass
class TeradataReport(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowReport):
    num_queries_parsed: int = 0
    num_view_ddl_parsed: int = 0
    num_table_parse_failures: int = 0
@@ -82,17 +92,16 @@ class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig):
        "This requires to have the table lineage feature enabled.",
    )

    include_view_lineage = Field(
        default=True,
        description="Whether to include view lineage in the ingestion. "
        "This requires to have the view lineage feature enabled.",
    )
    usage: BaseUsageConfig = Field(
        description="The usage config to use when generating usage statistics",
        default=BaseUsageConfig(),
    )

    use_schema_resolver: bool = Field(
        default=True,
        description="Read SchemaMetadata aspects from DataHub to aid in SQL parsing. Turn off only for testing.",
        hidden_from_docs=True,
    )

    default_db: Optional[str] = Field(
        default=None,
        description="The default database to use for unqualified table names",
@@ -141,46 +150,47 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext):
        self.report: TeradataReport = TeradataReport()
        self.graph: Optional[DataHubGraph] = ctx.graph

        if self.graph:
            if self.config.use_schema_resolver:
                self.schema_resolver = (
                    self.graph.initialize_schema_resolver_from_datahub(
                        platform=self.platform,
                        platform_instance=self.config.platform_instance,
                        env=self.config.env,
                    )
                )
                self.urns = self.schema_resolver.get_urns()
            else:
                self.schema_resolver = self.graph._make_schema_resolver(
                    platform=self.platform,
                    platform_instance=self.config.platform_instance,
                    env=self.config.env,
                )
                self.urns = None
        else:
            self.schema_resolver = SchemaResolver(
                platform=self.platform,
                platform_instance=self.config.platform_instance,
                graph=None,
                env=self.config.env,
            )
            self.urns = None

        self.builder: SqlParsingBuilder = SqlParsingBuilder(
            usage_config=self.config.usage
            if self.config.include_usage_statistics
            else None,
            generate_lineage=self.config.include_table_lineage,
            generate_lineage=True,
            generate_usage_statistics=self.config.include_usage_statistics,
            generate_operations=self.config.usage.include_operational_stats,
        )

        self.schema_resolver = SchemaResolver(
            platform=self.platform,
            platform_instance=self.config.platform_instance,
            graph=None,
            env=self.config.env,
        )

        self._view_definition_cache: FileBackedDict[str] = FileBackedDict()

    @classmethod
    def create(cls, config_dict, ctx):
        config = TeradataConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def get_view_lineage(self) -> Iterable[MetadataWorkUnit]:
        for key in self._view_definition_cache.keys():
            view_definition = self._view_definition_cache[key]
            dataset_urn = DatasetUrn.create_from_string(key)

            db_name: Optional[str] = None
            # We need to get the default db from the dataset urn otherwise the builder generates the wrong urns
            if "." in dataset_urn.get_dataset_name():
                db_name = dataset_urn.get_dataset_name().split(".", 1)[0]

            self.report.num_view_ddl_parsed += 1
            if self.report.num_view_ddl_parsed % 1000 == 0:
                logger.info(f"Parsed {self.report.num_queries_parsed} view ddl")

            yield from self.gen_lineage_from_query(
                query=view_definition, default_database=db_name, is_view_ddl=True
            )

    def get_audit_log_mcps(self) -> Iterable[MetadataWorkUnit]:
        engine = self.get_metadata_engine()
        for entry in engine.execute(
@@ -192,27 +202,43 @@ def get_audit_log_mcps(self) -> Iterable[MetadataWorkUnit]:
            if self.report.num_queries_parsed % 1000 == 0:
                logger.info(f"Parsed {self.report.num_queries_parsed} queries")

            result = sqlglot_lineage(
                sql=entry.query,
                schema_resolver=self.schema_resolver,
                default_db=None,
                default_schema=entry.default_database
                if entry.default_database
                else self.config.default_db,
            yield from self.gen_lineage_from_query(
                query=entry.query,
                default_database=entry.default_database,
                timestamp=entry.timestamp,
                user=entry.user,
                is_view_ddl=False,
            )
            if result.debug_info.table_error:
                logger.debug(
                    f"Error parsing table lineage, {result.debug_info.table_error}"
                )
                self.report.num_table_parse_failures += 1
                continue

    def gen_lineage_from_query(
        self,
        query: str,
        default_database: Optional[str] = None,
        timestamp: Optional[datetime] = None,
        user: Optional[str] = None,
        is_view_ddl: bool = False,
    ) -> Iterable[MetadataWorkUnit]:
        result = sqlglot_lineage(
            sql=query,
            schema_resolver=self.schema_resolver,
            default_db=None,
Review comment: We should set this right?
Follow-up: Nvm, seems good
            default_schema=default_database
            if default_database
            else self.config.default_db,
        )
        if result.debug_info.table_error:
            logger.debug(
                f"Error parsing table lineage, {result.debug_info.table_error}"
            )
            self.report.num_table_parse_failures += 1
        else:
            yield from self.builder.process_sql_parsing_result(
                result,
                query=entry.query,
                query_timestamp=entry.timestamp,
                user=f"urn:li:corpuser:{entry.user}",
                include_urns=self.urns,
                query=query,
                is_view_ddl=is_view_ddl,
                query_timestamp=timestamp,
                user=f"urn:li:corpuser:{user}",
                include_urns=self.schema_resolver.get_urns(),
            )

    def get_metadata_engine(self) -> Engine:
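On the default_db question in the review thread above: Teradata appears to be handled as a two-tier platform here (database.table), so the database is passed to sqlglot_lineage as default_schema and default_db can stay None. A minimal standalone sketch of that call, with a made-up database name and view DDL (not taken from the PR):

from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage

# Hypothetical illustration: unqualified table names resolve against default_schema.
resolver = SchemaResolver(platform="teradata", env="PROD", graph=None)
result = sqlglot_lineage(
    sql="CREATE VIEW sales_summary AS SELECT region, SUM(amount) AS total FROM sales GROUP BY region",
    schema_resolver=resolver,
    default_db=None,
    default_schema="analytics_db",  # the database; "sales" resolves as analytics_db.sales
)
print(result.in_tables, result.out_tables)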
@@ -221,8 +247,34 @@ def get_metadata_engine(self) -> Engine:
        return create_engine(url, **self.config.options)

    def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
        yield from super().get_workunits_internal()
        # Add all schemas to the schema resolver
        for wu in super().get_workunits_internal():
Review comment: Can prob make a helper for this in the future
            if isinstance(wu.metadata, MetadataChangeEventClass):
                if wu.metadata.proposedSnapshot:
                    for aspect in wu.metadata.proposedSnapshot.aspects:
                        if isinstance(aspect, SchemaMetadataClass):
                            self.schema_resolver.add_schema_metadata(
                                wu.metadata.proposedSnapshot.urn,
                                aspect,
                            )
                            break
            if isinstance(wu.metadata, MetadataChangeProposalWrapper):
                if (
                    wu.metadata.entityUrn
                    and isinstance(wu.metadata.aspect, ViewPropertiesClass)
                    and wu.metadata.aspect.viewLogic
                ):
                    self._view_definition_cache[
                        wu.metadata.entityUrn
                    ] = wu.metadata.aspect.viewLogic
Review comment on lines +252 to +269: I have a method
            yield wu

        if self.config.include_view_lineage:
            self.report.report_ingestion_stage_start("view lineage extraction")
            yield from self.get_view_lineage()

        if self.config.include_table_lineage or self.config.include_usage_statistics:
            self.report.report_ingestion_stage_start("audit log extraction")
            yield from self.get_audit_log_mcps()
            yield from self.builder.gen_workunits()

        yield from self.builder.gen_workunits()
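As the review comments above suggest, the schema-registration block in get_workunits_internal could be pulled out into a helper. A rough sketch of what that might look like, assuming it is handed the source's schema resolver and view-definition cache (the function name and free-function form are illustrative, not part of this PR):

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata._schema_classes import (
    MetadataChangeEventClass,
    SchemaMetadataClass,
    ViewPropertiesClass,
)
from datahub.utilities.file_backed_collections import FileBackedDict
from datahub.utilities.sqlglot_lineage import SchemaResolver


def register_workunit_metadata(
    schema_resolver: SchemaResolver,
    view_definition_cache: FileBackedDict,
    wu: MetadataWorkUnit,
) -> None:
    """Feed SchemaMetadata into the resolver and cache view DDL for later lineage parsing."""
    metadata = wu.metadata
    if isinstance(metadata, MetadataChangeEventClass) and metadata.proposedSnapshot:
        for aspect in metadata.proposedSnapshot.aspects:
            if isinstance(aspect, SchemaMetadataClass):
                schema_resolver.add_schema_metadata(metadata.proposedSnapshot.urn, aspect)
                break
    elif isinstance(metadata, MetadataChangeProposalWrapper) and (
        metadata.entityUrn
        and isinstance(metadata.aspect, ViewPropertiesClass)
        and metadata.aspect.viewLogic
    ):
        view_definition_cache[metadata.entityUrn] = metadata.aspect.viewLogic

With something like this, the loop body would reduce to a single call such as register_workunit_metadata(self.schema_resolver, self._view_definition_cache, wu) before yield wu.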
Review comment: If we want to be safe we should truncate this before putting in the cache, in case it's absurdly large... but I think this is fine for now
Follow-up: Actually, teradata has that query size cap right? Should be safe then
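For completeness, a minimal sketch of the truncation safeguard discussed in this thread; the size limit and helper name are made up, and the PR as written stores the full viewLogic string, relying on Teradata's own query-size cap:

# Hypothetical cap on cached view DDL size; the limit is illustrative only.
MAX_VIEW_DEFINITION_SIZE = 100_000  # characters


def truncate_view_definition(view_logic: str, limit: int = MAX_VIEW_DEFINITION_SIZE) -> str:
    """Return the view DDL unchanged unless it exceeds the limit, in which case cut it off."""
    if len(view_logic) <= limit:
        return view_logic
    return view_logic[:limit]


# Possible call site (hypothetical), before writing into the FileBackedDict cache:
# self._view_definition_cache[wu.metadata.entityUrn] = truncate_view_definition(
#     wu.metadata.aspect.viewLogic
# )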