Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): improve tableau lineage, workbooks query, fix pagination #5756

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 79 additions & 14 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
)

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel, ConfigurationError
from datahub.configuration.common import ConfigurationError
from datahub.configuration.source_common import DatasetLineageProviderConfigBase
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import (
PlatformKey,
Expand All @@ -36,8 +37,10 @@
from datahub.ingestion.source.tableau_common import (
FIELD_TYPE_MAPPING,
MetadataQueryException,
TableauLineageOverrides,
clean_query,
custom_sql_graphql_query,
embedded_datasource_graphql_query,
get_field_value_in_sheet,
get_tags_from_params,
get_unique_custom_sql,
Expand Down Expand Up @@ -89,7 +92,7 @@
REPLACE_SLASH_CHAR = "|"


class TableauConfig(ConfigModel):
class TableauConfig(DatasetLineageProviderConfigBase):
connect_uri: str = Field(description="Tableau host URL.")
username: Optional[str] = Field(
default=None,
Expand Down Expand Up @@ -147,6 +150,11 @@ class TableauConfig(ConfigModel):
description="Environment to use in namespace when constructing URNs.",
)

lineage_overrides: Optional[TableauLineageOverrides] = Field(
default=None,
description="Mappings to change generated dataset urns. Use only if you really know what you are doing.",
)

@validator("connect_uri")
def remove_trailing_slash(cls, v):
return config_clean.remove_trailing_slashes(v)
Expand Down Expand Up @@ -210,6 +218,10 @@ def __init__(
self.config = config
self.report = SourceReport()
self.server = None

# This list keeps track of embedded datasources in workbooks so that we retrieve those
# when emitting embedded data sources.
self.embedded_datasource_ids_being_used: List[str] = []
# This list keeps track of datasources being actively used by workbooks so that we only retrieve those
# when emitting published data sources.
self.datasource_ids_being_used: List[str] = []
Expand Down Expand Up @@ -329,8 +341,8 @@ def emit_workbooks(self) -> Iterable[MetadataWorkUnit]:
yield from self.emit_workbook_as_container(workbook)
yield from self.emit_sheets_as_charts(workbook)
yield from self.emit_dashboards(workbook)
yield from self.emit_embedded_datasource(workbook)
yield from self.emit_upstream_tables()
for ds in workbook.get("embeddedDatasources", []):
self.embedded_datasource_ids_being_used.append(ds["id"])

def _track_custom_sql_ids(self, field: dict) -> None:
# Tableau shows custom sql datasource as a table in ColumnField.
Expand Down Expand Up @@ -419,12 +431,15 @@ def _create_upstream_table_lineage(
f"Omitting schema for upstream table {table['id']}, schema included in table name"
)
schema = ""

table_urn = make_table_urn(
self.config.env,
upstream_db,
table.get("connectionType", ""),
schema,
table_name,
self.config.platform_instance_map,
self.config.lineage_overrides,
)

upstream_table = UpstreamClass(
Expand All @@ -447,7 +462,7 @@ def _create_upstream_table_lineage(
return upstream_tables

def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
count_on_query = len(self.custom_sql_ids_being_used)
count_on_query = self.config.page_size
custom_sql_filter = f"idWithin: {json.dumps(self.custom_sql_ids_being_used)}"
custom_sql_connection, total_count, has_next_page = self.get_connection_object(
custom_sql_graphql_query, "customSQLTablesConnection", custom_sql_filter
Expand Down Expand Up @@ -509,11 +524,14 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
and datasource.get("workbook").get("name")
else None
)
yield from add_entity_to_container(
workunits = add_entity_to_container(
self.gen_workbook_key(datasource["workbook"]),
"dataset",
dataset_snapshot.urn,
)
for wu in workunits:
self.report.report_workunit(wu)
yield wu
project = self._get_project(datasource)

# lineage from custom sql -> datasets/tables #
Expand Down Expand Up @@ -555,7 +573,7 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
yield self.get_metadata_change_proposal(
dataset_snapshot.urn,
aspect_name="subTypes",
aspect=SubTypesClass(typeNames=["View", "Custom SQL"]),
aspect=SubTypesClass(typeNames=["view", "Custom SQL"]),
)

def get_schema_metadata_for_custom_sql(
Expand Down Expand Up @@ -818,12 +836,15 @@ def emit_datasource(
)

if is_embedded_ds:
yield from add_entity_to_container(
workunits = add_entity_to_container(
self.gen_workbook_key(workbook), "dataset", dataset_snapshot.urn
)
for wu in workunits:
self.report.report_workunit(wu)
yield wu

def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
count_on_query = len(self.datasource_ids_being_used)
count_on_query = self.config.page_size
datasource_filter = f"idWithin: {json.dumps(self.datasource_ids_being_used)}"
(
published_datasource_conn,
Expand Down Expand Up @@ -1007,9 +1028,12 @@ def emit_sheets_as_charts(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:

yield self.get_metadata_change_event(chart_snapshot)

yield from add_entity_to_container(
workunits = add_entity_to_container(
self.gen_workbook_key(workbook), "chart", chart_snapshot.urn
)
for wu in workunits:
self.report.report_workunit(wu)
yield wu

def emit_workbook_as_container(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:

Expand Down Expand Up @@ -1115,13 +1139,51 @@ def emit_dashboards(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:

yield self.get_metadata_change_event(dashboard_snapshot)

yield from add_entity_to_container(
workunits = add_entity_to_container(
self.gen_workbook_key(workbook), "dashboard", dashboard_snapshot.urn
)
for wu in workunits:
self.report.report_workunit(wu)
yield wu

def emit_embedded_datasources(self) -> Iterable[MetadataWorkUnit]:
count_on_query = self.config.page_size
datasource_filter = (
f"idWithin: {json.dumps(self.embedded_datasource_ids_being_used)}"
)
(
embedded_datasource_conn,
total_count,
has_next_page,
) = self.get_connection_object(
embedded_datasource_graphql_query,
"embeddedDatasourcesConnection",
datasource_filter,
)
current_count = 0
while has_next_page:
count = (
count_on_query
if current_count + count_on_query < total_count
else total_count - current_count
)
(
embedded_datasource_conn,
total_count,
has_next_page,
) = self.get_connection_object(
embedded_datasource_graphql_query,
"embeddedDatasourcesConnection",
datasource_filter,
count,
current_count,
)

def emit_embedded_datasource(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:
for datasource in workbook.get("embeddedDatasources", []):
yield from self.emit_datasource(datasource, workbook, is_embedded_ds=True)
current_count += count
for datasource in embedded_datasource_conn.get("nodes", []):
yield from self.emit_datasource(
datasource, datasource.get("workbook"), is_embedded_ds=True
)

@lru_cache(maxsize=None)
def _get_schema(self, schema_provided: str, database: str, fullName: str) -> str:
Expand Down Expand Up @@ -1194,10 +1256,13 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return
try:
yield from self.emit_workbooks()
if self.embedded_datasource_ids_being_used:
yield from self.emit_embedded_datasources()
if self.datasource_ids_being_used:
yield from self.emit_published_datasources()
if self.custom_sql_ids_being_used:
yield from self.emit_custom_sql_datasources()
yield from self.emit_upstream_tables()
except MetadataQueryException as md_exception:
self.report.report_failure(
key="tableau-metadata",
Expand Down
Loading