Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/teradata): view parsing #9005

Merged
merged 15 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion metadata-ingestion/docs/sources/teradata/teradata_pre.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

If you want to run profiling, you need to grant select permission on all the tables you want to profile.

3. If linege or usage extraction is enabled, please, check if query logging is enabled and it is set to size which
3. If lineage or usage extraction is enabled, please, check if query logging is enabled and it is set to size which
will fit for your queries (the default query text size Teradata captures is max 200 chars)
An example how you can set it for all users:
```sql
Expand Down
3 changes: 1 addition & 2 deletions metadata-ingestion/docs/sources/teradata/teradata_recipe.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ source:
type: teradata
config:
host_port: "myteradatainstance.teradata.com:1025"
#platform_instance: "myteradatainstance"
username: myuser
password: mypassword
#database_pattern:
# allow:
# - "demo_user"
# - "my_database"
# ignoreCase: true
include_table_lineage: true
include_usage_statistics: true
Expand Down
156 changes: 104 additions & 52 deletions metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, Optional, Set, Union

# This import verifies that the dependencies are available.
Expand All @@ -11,6 +12,7 @@

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
Expand All @@ -32,11 +34,18 @@
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
from datahub.metadata._schema_classes import (
MetadataChangeEventClass,
SchemaMetadataClass,
ViewPropertiesClass,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
BytesTypeClass,
TimeTypeClass,
)
from datahub.utilities.file_backed_collections import FileBackedDict
from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage
from datahub.utilities.urns.dataset_urn import DatasetUrn

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -64,6 +73,7 @@
@dataclass
class TeradataReport(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowReport):
    # Number of audit-log query entries fed to the SQL lineage parser.
    num_queries_parsed: int = 0
    # Number of cached view DDL statements fed to the SQL lineage parser.
    num_view_ddl_parsed: int = 0
    # Statements (queries or view DDL) whose table-level lineage failed to parse.
    num_table_parse_failures: int = 0


Expand All @@ -82,17 +92,16 @@ class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig):
"This requires to have the table lineage feature enabled.",
)

include_view_lineage = Field(
default=True,
description="Whether to include view lineage in the ingestion. "
"This requires to have the view lineage feature enabled.",
)
usage: BaseUsageConfig = Field(
description="The usage config to use when generating usage statistics",
default=BaseUsageConfig(),
)

use_schema_resolver: bool = Field(
default=True,
description="Read SchemaMetadata aspects from DataHub to aid in SQL parsing. Turn off only for testing.",
hidden_from_docs=True,
)

default_db: Optional[str] = Field(
default=None,
description="The default database to use for unqualified table names",
Expand Down Expand Up @@ -141,46 +150,47 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext):
self.report: TeradataReport = TeradataReport()
self.graph: Optional[DataHubGraph] = ctx.graph

if self.graph:
if self.config.use_schema_resolver:
self.schema_resolver = (
self.graph.initialize_schema_resolver_from_datahub(
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
)
self.urns = self.schema_resolver.get_urns()
else:
self.schema_resolver = self.graph._make_schema_resolver(
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
self.urns = None
else:
self.schema_resolver = SchemaResolver(
platform=self.platform,
platform_instance=self.config.platform_instance,
graph=None,
env=self.config.env,
)
self.urns = None

self.builder: SqlParsingBuilder = SqlParsingBuilder(
usage_config=self.config.usage
if self.config.include_usage_statistics
else None,
generate_lineage=self.config.include_table_lineage,
generate_lineage=True,
generate_usage_statistics=self.config.include_usage_statistics,
generate_operations=self.config.usage.include_operational_stats,
)

self.schema_resolver = SchemaResolver(
platform=self.platform,
platform_instance=self.config.platform_instance,
graph=None,
env=self.config.env,
)

self._view_definition_cache: FileBackedDict[str] = FileBackedDict()

@classmethod
def create(cls, config_dict, ctx):
    """Factory entry point used by the ingestion framework: validate the raw
    config dict into a TeradataConfig and build the source from it."""
    return cls(TeradataConfig.parse_obj(config_dict), ctx)

def get_view_lineage(self) -> Iterable[MetadataWorkUnit]:
    """Emit lineage workunits for every view DDL collected during schema extraction.

    Iterates the view definitions cached in ``self._view_definition_cache``
    (keyed by dataset urn) and feeds each one through the SQL lineage parser
    via :meth:`gen_lineage_from_query` with ``is_view_ddl=True``.
    """
    for key in self._view_definition_cache.keys():
        view_definition = self._view_definition_cache[key]
        dataset_urn = DatasetUrn.create_from_string(key)

        db_name: Optional[str] = None
        # We need to get the default db from the dataset urn otherwise the
        # builder generates the wrong urns.
        if "." in dataset_urn.get_dataset_name():
            db_name = dataset_urn.get_dataset_name().split(".", 1)[0]

        self.report.num_view_ddl_parsed += 1
        if self.report.num_view_ddl_parsed % 1000 == 0:
            # Fix: log the view-DDL counter here; the original logged
            # num_queries_parsed, which reports the wrong progress number.
            logger.info(f"Parsed {self.report.num_view_ddl_parsed} view ddl")

        yield from self.gen_lineage_from_query(
            query=view_definition, default_database=db_name, is_view_ddl=True
        )

def get_audit_log_mcps(self) -> Iterable[MetadataWorkUnit]:
engine = self.get_metadata_engine()
for entry in engine.execute(
Expand All @@ -192,27 +202,43 @@ def get_audit_log_mcps(self) -> Iterable[MetadataWorkUnit]:
if self.report.num_queries_parsed % 1000 == 0:
logger.info(f"Parsed {self.report.num_queries_parsed} queries")

result = sqlglot_lineage(
sql=entry.query,
schema_resolver=self.schema_resolver,
default_db=None,
default_schema=entry.default_database
if entry.default_database
else self.config.default_db,
yield from self.gen_lineage_from_query(
query=entry.query,
default_database=entry.default_database,
timestamp=entry.timestamp,
user=entry.user,
is_view_ddl=False,
)
if result.debug_info.table_error:
logger.debug(
f"Error parsing table lineage, {result.debug_info.table_error}"
)
self.report.num_table_parse_failures += 1
continue

def gen_lineage_from_query(
    self,
    query: str,
    default_database: Optional[str] = None,
    timestamp: Optional[datetime] = None,
    user: Optional[str] = None,
    is_view_ddl: bool = False,
) -> Iterable[MetadataWorkUnit]:
    """Parse one SQL statement for lineage and feed the result to the builder.

    Args:
        query: The SQL text (audit-log query or view DDL) to parse.
        default_database: Database to resolve unqualified table names against;
            falls back to ``self.config.default_db`` when not given.
        timestamp: When the query ran, if known (audit-log path only).
        user: The executing user, if known (audit-log path only).
        is_view_ddl: True when ``query`` is a view definition rather than a
            logged query.

    Parse failures are counted in ``self.report.num_table_parse_failures``
    and produce no workunits.
    """
    result = sqlglot_lineage(
        sql=query,
        schema_resolver=self.schema_resolver,
        # Teradata has a two-level hierarchy, so the "database" goes into
        # the schema slot and default_db stays unset.
        default_db=None,
        default_schema=default_database
        if default_database
        else self.config.default_db,
    )
    if result.debug_info.table_error:
        logger.debug(
            f"Error parsing table lineage, {result.debug_info.table_error}"
        )
        self.report.num_table_parse_failures += 1
    else:
        yield from self.builder.process_sql_parsing_result(
            result,
            query=query,
            is_view_ddl=is_view_ddl,
            query_timestamp=timestamp,
            # Fix: only build a corpuser urn when a user is actually known.
            # The original always formatted the urn, producing the bogus
            # "urn:li:corpuser:None" for the view-DDL path (user is None).
            user=f"urn:li:corpuser:{user}" if user else None,
            include_urns=self.schema_resolver.get_urns(),
        )

def get_metadata_engine(self) -> Engine:
Expand All @@ -221,8 +247,34 @@ def get_metadata_engine(self) -> Engine:
return create_engine(url, **self.config.options)

def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
yield from super().get_workunits_internal()
# Add all schemas to the schema resolver
for wu in super().get_workunits_internal():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can prob make a helper for this in the future

if isinstance(wu.metadata, MetadataChangeEventClass):
if wu.metadata.proposedSnapshot:
for aspect in wu.metadata.proposedSnapshot.aspects:
if isinstance(aspect, SchemaMetadataClass):
self.schema_resolver.add_schema_metadata(
wu.metadata.proposedSnapshot.urn,
aspect,
)
break
if isinstance(wu.metadata, MetadataChangeProposalWrapper):
if (
wu.metadata.entityUrn
and isinstance(wu.metadata.aspect, ViewPropertiesClass)
and wu.metadata.aspect.viewLogic
):
self._view_definition_cache[
wu.metadata.entityUrn
] = wu.metadata.aspect.viewLogic
Comment on lines +252 to +269
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a method wu.get_aspect_of_type that we can use in the future

yield wu

if self.config.include_view_lineage:
self.report.report_ingestion_stage_start("view lineage extraction")
yield from self.get_view_lineage()

if self.config.include_table_lineage or self.config.include_usage_statistics:
self.report.report_ingestion_stage_start("audit log extraction")
yield from self.get_audit_log_mcps()
yield from self.builder.gen_workunits()

yield from self.builder.gen_workunits()