feat(ingest/teradata): view parsing (datahub-project#9005)
treff7es authored Oct 13, 2023
1 parent 6bc7425 commit 1007204
Showing 3 changed files with 106 additions and 55 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/docs/sources/teradata/teradata_pre.md
@@ -18,7 +18,7 @@

If you want to run profiling, you need to grant select permission on all the tables you want to profile.

3. If linege or usage extraction is enabled, please, check if query logging is enabled and it is set to size which
3. If lineage or usage extraction is enabled, please check that query logging is enabled and that it is set to a size
   that will fit your queries (by default, Teradata captures at most 200 characters of query text).
   An example of how you can set it for all users:
```sql
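-- The committed statement is truncated in this view; the line below is an
-- illustrative sketch (not the original text) of raising the captured SQL
-- text size for all users. Adjust the limit to fit your longest queries.
REPLACE QUERY LOGGING LIMIT SQLTEXT=2000 ON ALL;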
```
3 changes: 1 addition & 2 deletions metadata-ingestion/docs/sources/teradata/teradata_recipe.yml
@@ -3,12 +3,11 @@ source:
  type: teradata
  config:
    host_port: "myteradatainstance.teradata.com:1025"
    #platform_instance: "myteradatainstance"
    username: myuser
    password: mypassword
    #database_pattern:
    #  allow:
    #    - "demo_user"
    #    - "my_database"
    #  ignoreCase: true
    include_table_lineage: true
    include_usage_statistics: true
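The options in this recipe map directly onto the source's pydantic config model (see `TeradataConfig` in the teradata.py diff below). A minimal sketch of that mapping, assuming only the fields visible in this commit and that all other fields keep their defaults:

```python
# Hypothetical snippet (not part of the commit): parse a recipe-style config
# dict into the source's config model, mirroring TeradataSource.create().
from datahub.ingestion.source.sql.teradata import TeradataConfig

config = TeradataConfig.parse_obj(
    {
        "host_port": "myteradatainstance.teradata.com:1025",
        "username": "myuser",
        "password": "mypassword",
        "include_table_lineage": True,
        "include_usage_statistics": True,
    }
)

# The flag added by this commit defaults to True, so view lineage is
# extracted even when the recipe does not mention it.
print(config.include_view_lineage)
```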
156 changes: 104 additions & 52 deletions metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
@@ -1,5 +1,6 @@
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, Optional, Set, Union

# This import verifies that the dependencies are available.
@@ -11,6 +12,7 @@

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
@@ -32,11 +34,18 @@
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
from datahub.metadata._schema_classes import (
    MetadataChangeEventClass,
    SchemaMetadataClass,
    ViewPropertiesClass,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
    BytesTypeClass,
    TimeTypeClass,
)
from datahub.utilities.file_backed_collections import FileBackedDict
from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage
from datahub.utilities.urns.dataset_urn import DatasetUrn

logger: logging.Logger = logging.getLogger(__name__)

@@ -64,6 +73,7 @@
@dataclass
class TeradataReport(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowReport):
    num_queries_parsed: int = 0
    num_view_ddl_parsed: int = 0
    num_table_parse_failures: int = 0


@@ -82,17 +92,16 @@ class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig):
"This requires to have the table lineage feature enabled.",
)

    include_view_lineage = Field(
        default=True,
        description="Whether to include view lineage in the ingestion. "
        "This requires the view lineage feature to be enabled.",
    )
    usage: BaseUsageConfig = Field(
        description="The usage config to use when generating usage statistics",
        default=BaseUsageConfig(),
    )

    use_schema_resolver: bool = Field(
        default=True,
        description="Read SchemaMetadata aspects from DataHub to aid in SQL parsing. Turn off only for testing.",
        hidden_from_docs=True,
    )

    default_db: Optional[str] = Field(
        default=None,
        description="The default database to use for unqualified table names",
@@ -141,46 +150,47 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext):
        self.report: TeradataReport = TeradataReport()
        self.graph: Optional[DataHubGraph] = ctx.graph

        if self.graph:
            if self.config.use_schema_resolver:
                self.schema_resolver = (
                    self.graph.initialize_schema_resolver_from_datahub(
                        platform=self.platform,
                        platform_instance=self.config.platform_instance,
                        env=self.config.env,
                    )
                )
                self.urns = self.schema_resolver.get_urns()
            else:
                self.schema_resolver = self.graph._make_schema_resolver(
                    platform=self.platform,
                    platform_instance=self.config.platform_instance,
                    env=self.config.env,
                )
                self.urns = None
        else:
            self.schema_resolver = SchemaResolver(
                platform=self.platform,
                platform_instance=self.config.platform_instance,
                graph=None,
                env=self.config.env,
            )
            self.urns = None

        self.builder: SqlParsingBuilder = SqlParsingBuilder(
            usage_config=self.config.usage
            if self.config.include_usage_statistics
            else None,
            generate_lineage=self.config.include_table_lineage,
            generate_lineage=True,
            generate_usage_statistics=self.config.include_usage_statistics,
            generate_operations=self.config.usage.include_operational_stats,
        )

        self.schema_resolver = SchemaResolver(
            platform=self.platform,
            platform_instance=self.config.platform_instance,
            graph=None,
            env=self.config.env,
        )

        self._view_definition_cache: FileBackedDict[str] = FileBackedDict()

    @classmethod
    def create(cls, config_dict, ctx):
        config = TeradataConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def get_view_lineage(self) -> Iterable[MetadataWorkUnit]:
        for key in self._view_definition_cache.keys():
            view_definition = self._view_definition_cache[key]
            dataset_urn = DatasetUrn.create_from_string(key)

            db_name: Optional[str] = None
            # We need to get the default db from the dataset urn, otherwise the builder generates the wrong urns
            if "." in dataset_urn.get_dataset_name():
                db_name = dataset_urn.get_dataset_name().split(".", 1)[0]

            self.report.num_view_ddl_parsed += 1
            if self.report.num_view_ddl_parsed % 1000 == 0:
                logger.info(f"Parsed {self.report.num_view_ddl_parsed} view DDLs")

            yield from self.gen_lineage_from_query(
                query=view_definition, default_database=db_name, is_view_ddl=True
            )

    def get_audit_log_mcps(self) -> Iterable[MetadataWorkUnit]:
        engine = self.get_metadata_engine()
        for entry in engine.execute(
@@ -192,27 +202,43 @@ def get_audit_log_mcps(self) -> Iterable[MetadataWorkUnit]:
            if self.report.num_queries_parsed % 1000 == 0:
                logger.info(f"Parsed {self.report.num_queries_parsed} queries")

            result = sqlglot_lineage(
                sql=entry.query,
                schema_resolver=self.schema_resolver,
                default_db=None,
                default_schema=entry.default_database
                if entry.default_database
                else self.config.default_db,
            yield from self.gen_lineage_from_query(
                query=entry.query,
                default_database=entry.default_database,
                timestamp=entry.timestamp,
                user=entry.user,
                is_view_ddl=False,
            )
            if result.debug_info.table_error:
                logger.debug(
                    f"Error parsing table lineage, {result.debug_info.table_error}"
                )
                self.report.num_table_parse_failures += 1
                continue

    def gen_lineage_from_query(
        self,
        query: str,
        default_database: Optional[str] = None,
        timestamp: Optional[datetime] = None,
        user: Optional[str] = None,
        is_view_ddl: bool = False,
    ) -> Iterable[MetadataWorkUnit]:
        result = sqlglot_lineage(
            sql=query,
            schema_resolver=self.schema_resolver,
            default_db=None,
            default_schema=default_database
            if default_database
            else self.config.default_db,
        )
        if result.debug_info.table_error:
            logger.debug(
                f"Error parsing table lineage, {result.debug_info.table_error}"
            )
            self.report.num_table_parse_failures += 1
        else:
            yield from self.builder.process_sql_parsing_result(
                result,
                query=entry.query,
                query_timestamp=entry.timestamp,
                user=f"urn:li:corpuser:{entry.user}",
                include_urns=self.urns,
                query=query,
                is_view_ddl=is_view_ddl,
                query_timestamp=timestamp,
                user=f"urn:li:corpuser:{user}",
                include_urns=self.schema_resolver.get_urns(),
            )

    def get_metadata_engine(self) -> Engine:
@@ -221,8 +247,34 @@ def get_metadata_engine(self) -> Engine:
        return create_engine(url, **self.config.options)

    def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
        yield from super().get_workunits_internal()
        # Add all schemas to the schema resolver
        for wu in super().get_workunits_internal():
            if isinstance(wu.metadata, MetadataChangeEventClass):
                if wu.metadata.proposedSnapshot:
                    for aspect in wu.metadata.proposedSnapshot.aspects:
                        if isinstance(aspect, SchemaMetadataClass):
                            self.schema_resolver.add_schema_metadata(
                                wu.metadata.proposedSnapshot.urn,
                                aspect,
                            )
                            break
            # Cache view definitions so their lineage can be parsed once all schemas are known
            if isinstance(wu.metadata, MetadataChangeProposalWrapper):
                if (
                    wu.metadata.entityUrn
                    and isinstance(wu.metadata.aspect, ViewPropertiesClass)
                    and wu.metadata.aspect.viewLogic
                ):
                    self._view_definition_cache[
                        wu.metadata.entityUrn
                    ] = wu.metadata.aspect.viewLogic
            yield wu

        if self.config.include_view_lineage:
            self.report.report_ingestion_stage_start("view lineage extraction")
            yield from self.get_view_lineage()

        if self.config.include_table_lineage or self.config.include_usage_statistics:
            self.report.report_ingestion_stage_start("audit log extraction")
            yield from self.get_audit_log_mcps()
            yield from self.builder.gen_workunits()

        yield from self.builder.gen_workunits()

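Taken together, the teradata.py changes implement a two-phase flow: while schema work units stream through `get_workunits_internal`, `SchemaMetadata` aspects populate the `SchemaResolver` and `ViewProperties` aspects fill a `FileBackedDict` keyed by dataset URN; afterwards, `get_view_lineage` parses each cached definition. Below is a minimal, self-contained sketch of that flow (not part of the commit; the URN and view SQL are illustrative stand-ins):

```python
# Minimal sketch of the new view-lineage flow, using only helpers that appear
# in this diff. The URN and view DDL below are hypothetical examples.
from datahub.utilities.file_backed_collections import FileBackedDict
from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage
from datahub.utilities.urns.dataset_urn import DatasetUrn

schema_resolver = SchemaResolver(
    platform="teradata", platform_instance=None, graph=None, env="PROD"
)
view_definition_cache: FileBackedDict[str] = FileBackedDict()

# Phase 1: while schema work units stream by, the source stores each
# ViewProperties.viewLogic under the view's dataset URN.
view_urn = "urn:li:dataset:(urn:li:dataPlatform:teradata,demo_user.my_view,PROD)"
view_definition_cache[view_urn] = "CREATE VIEW my_view AS SELECT id, name FROM my_table"

# Phase 2: once all schemas are known, each cached definition is parsed.
for key in view_definition_cache.keys():
    view_definition = view_definition_cache[key]
    dataset_name = DatasetUrn.create_from_string(key).get_dataset_name()
    # The database prefix of "<database>.<view>" becomes the default schema,
    # so unqualified table names in the DDL resolve to the right database.
    db_name = dataset_name.split(".", 1)[0] if "." in dataset_name else None

    result = sqlglot_lineage(
        sql=view_definition,
        schema_resolver=schema_resolver,
        default_db=None,
        default_schema=db_name,
    )
    if result.debug_info.table_error:
        print(f"Failed to parse view DDL: {result.debug_info.table_error}")
    else:
        print(result.in_tables, "->", result.out_tables)
```

In the source itself, the parse result is instead handed to `SqlParsingBuilder.process_sql_parsing_result` with `is_view_ddl=True`, which converts it into lineage work units.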