feat(ingest): tableau - improve lineage, workbooks query, fix pagination (#5756)
mayurinehate authored Sep 6, 2022
1 parent 7e16ce0 commit a8c1397
Showing 7 changed files with 24,272 additions and 42,375 deletions.
93 changes: 79 additions & 14 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -15,7 +15,8 @@
)

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel, ConfigurationError
from datahub.configuration.common import ConfigurationError
from datahub.configuration.source_common import DatasetLineageProviderConfigBase
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import (
PlatformKey,
@@ -36,8 +36,10 @@
from datahub.ingestion.source.tableau_common import (
FIELD_TYPE_MAPPING,
MetadataQueryException,
TableauLineageOverrides,
clean_query,
custom_sql_graphql_query,
embedded_datasource_graphql_query,
get_field_value_in_sheet,
get_tags_from_params,
get_unique_custom_sql,
@@ -89,7 +92,7 @@
REPLACE_SLASH_CHAR = "|"


class TableauConfig(ConfigModel):
class TableauConfig(DatasetLineageProviderConfigBase):
connect_uri: str = Field(description="Tableau host URL.")
username: Optional[str] = Field(
default=None,
@@ -147,6 +150,11 @@ class TableauConfig(ConfigModel):
description="Environment to use in namespace when constructing URNs.",
)

lineage_overrides: Optional[TableauLineageOverrides] = Field(
default=None,
description="Mappings to change generated dataset urns. Use only if you really know what you are doing.",
)

@validator("connect_uri")
def remove_trailing_slash(cls, v):
return config_clean.remove_trailing_slashes(v)
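
As an aside, here is a minimal sketch of how the new lineage options might appear in a Tableau source config, written as a Python dict. `platform_instance_map` is inherited from `DatasetLineageProviderConfigBase`; the key names inside `lineage_overrides` (e.g. `platform_override_map`) are illustrative assumptions, not confirmed by this diff — check `TableauLineageOverrides` in tableau_common.py for the actual fields.

# Hypothetical config fragment; field names inside lineage_overrides are assumed.
tableau_config_dict = {
    "connect_uri": "https://tableau.example.com",
    "username": "ingest_user",
    "password": "example-password",
    # From DatasetLineageProviderConfigBase: upstream platform -> platform instance
    # applied when constructing upstream dataset urns.
    "platform_instance_map": {"postgres": "analytics_pg"},
    # Remaps generated upstream dataset urns; the field description above warns
    # to use this only if you know what you are doing.
    "lineage_overrides": {
        "platform_override_map": {"presto": "trino"},  # assumed key name
    },
}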
@@ -210,6 +218,10 @@ def __init__(
self.config = config
self.report = SourceReport()
self.server = None

# This list keeps track of embedded datasources in workbooks so that we retrieve those
# when emitting embedded data sources.
self.embedded_datasource_ids_being_used: List[str] = []
# This list keeps track of datasource being actively used by workbooks so that we only retrieve those
# when emitting published data sources.
self.datasource_ids_being_used: List[str] = []
@@ -329,8 +341,8 @@ def emit_workbooks(self) -> Iterable[MetadataWorkUnit]:
yield from self.emit_workbook_as_container(workbook)
yield from self.emit_sheets_as_charts(workbook)
yield from self.emit_dashboards(workbook)
yield from self.emit_embedded_datasource(workbook)
yield from self.emit_upstream_tables()
for ds in workbook.get("embeddedDatasources", []):
self.embedded_datasource_ids_being_used.append(ds["id"])

def _track_custom_sql_ids(self, field: dict) -> None:
# Tableau shows custom sql datasource as a table in ColumnField.
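
The `emit_workbooks` change above is part of the reworked workbooks handling: rather than emitting embedded datasources inline while iterating a workbook, the source now only records their ids and emits them later from a dedicated paged query (`emit_embedded_datasources`, added further down). A self-contained sketch of that collect-then-batch pattern, using sample data in place of the Tableau GraphQL response:

from typing import Dict, List

# Sample workbook nodes standing in for the paged workbooksConnection response.
workbooks: List[Dict] = [
    {"name": "Sales", "embeddedDatasources": [{"id": "ds-1"}, {"id": "ds-2"}]},
    {"name": "Ops", "embeddedDatasources": [{"id": "ds-3"}]},
]

# Phase 1: while walking workbooks, only remember embedded datasource ids.
embedded_datasource_ids: List[str] = []
for workbook in workbooks:
    for ds in workbook.get("embeddedDatasources", []):
        embedded_datasource_ids.append(ds["id"])

# Phase 2 (done later): fetch exactly these datasources in pages, e.g. via an
# idWithin filter, instead of emitting them per workbook.
print(embedded_datasource_ids)  # ['ds-1', 'ds-2', 'ds-3']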
@@ -419,12 +431,15 @@ def _create_upstream_table_lineage(
f"Omitting schema for upstream table {table['id']}, schema included in table name"
)
schema = ""

table_urn = make_table_urn(
self.config.env,
upstream_db,
table.get("connectionType", ""),
schema,
table_name,
self.config.platform_instance_map,
self.config.lineage_overrides,
)

upstream_table = UpstreamClass(
@@ -447,7 +462,7 @@
return upstream_tables

def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
count_on_query = len(self.custom_sql_ids_being_used)
count_on_query = self.config.page_size
custom_sql_filter = f"idWithin: {json.dumps(self.custom_sql_ids_being_used)}"
custom_sql_connection, total_count, has_next_page = self.get_connection_object(
custom_sql_graphql_query, "customSQLTablesConnection", custom_sql_filter
@@ -509,11 +524,14 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
and datasource.get("workbook").get("name")
else None
)
yield from add_entity_to_container(
workunits = add_entity_to_container(
self.gen_workbook_key(datasource["workbook"]),
"dataset",
dataset_snapshot.urn,
)
for wu in workunits:
self.report.report_workunit(wu)
yield wu
project = self._get_project(datasource)

# lineage from custom sql -> datasets/tables #
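
Throughout this diff, `yield from add_entity_to_container(...)` is replaced by an explicit loop so that each container work unit is recorded in the source report before being yielded. A minimal sketch of that wrap-and-report pattern, with stand-ins for the report and the work-unit generator:

from typing import Iterable, List


class StubReport:
    """Stand-in for SourceReport: just remembers which work units were reported."""

    def __init__(self) -> None:
        self.reported: List[str] = []

    def report_workunit(self, wu: str) -> None:
        self.reported.append(wu)


def add_entity_to_container_stub() -> Iterable[str]:
    # Stand-in for add_entity_to_container, which yields container work units.
    yield "container-wu-1"
    yield "container-wu-2"


def emit(report: StubReport) -> Iterable[str]:
    # A bare `yield from` would skip reporting; iterating lets us report each unit.
    for wu in add_entity_to_container_stub():
        report.report_workunit(wu)
        yield wu


report = StubReport()
assert list(emit(report)) == report.reported == ["container-wu-1", "container-wu-2"]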
@@ -555,7 +573,7 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
yield self.get_metadata_change_proposal(
dataset_snapshot.urn,
aspect_name="subTypes",
aspect=SubTypesClass(typeNames=["View", "Custom SQL"]),
aspect=SubTypesClass(typeNames=["view", "Custom SQL"]),
)

def get_schema_metadata_for_custom_sql(
@@ -818,12 +836,15 @@ def emit_datasource(
)

if is_embedded_ds:
yield from add_entity_to_container(
workunits = add_entity_to_container(
self.gen_workbook_key(workbook), "dataset", dataset_snapshot.urn
)
for wu in workunits:
self.report.report_workunit(wu)
yield wu

def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
count_on_query = len(self.datasource_ids_being_used)
count_on_query = self.config.page_size
datasource_filter = f"idWithin: {json.dumps(self.datasource_ids_being_used)}"
(
published_datasource_conn,
@@ -1007,9 +1028,12 @@ def emit_sheets_as_charts(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:

yield self.get_metadata_change_event(chart_snapshot)

yield from add_entity_to_container(
workunits = add_entity_to_container(
self.gen_workbook_key(workbook), "chart", chart_snapshot.urn
)
for wu in workunits:
self.report.report_workunit(wu)
yield wu

def emit_workbook_as_container(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:

@@ -1115,13 +1139,51 @@ def emit_dashboards(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:

yield self.get_metadata_change_event(dashboard_snapshot)

yield from add_entity_to_container(
workunits = add_entity_to_container(
self.gen_workbook_key(workbook), "dashboard", dashboard_snapshot.urn
)
for wu in workunits:
self.report.report_workunit(wu)
yield wu

def emit_embedded_datasources(self) -> Iterable[MetadataWorkUnit]:
count_on_query = self.config.page_size
datasource_filter = (
f"idWithin: {json.dumps(self.embedded_datasource_ids_being_used)}"
)
(
embedded_datasource_conn,
total_count,
has_next_page,
) = self.get_connection_object(
embedded_datasource_graphql_query,
"embeddedDatasourcesConnection",
datasource_filter,
)
current_count = 0
while has_next_page:
count = (
count_on_query
if current_count + count_on_query < total_count
else total_count - current_count
)
(
embedded_datasource_conn,
total_count,
has_next_page,
) = self.get_connection_object(
embedded_datasource_graphql_query,
"embeddedDatasourcesConnection",
datasource_filter,
count,
current_count,
)

def emit_embedded_datasource(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:
for datasource in workbook.get("embeddedDatasources", []):
yield from self.emit_datasource(datasource, workbook, is_embedded_ds=True)
current_count += count
for datasource in embedded_datasource_conn.get("nodes", []):
yield from self.emit_datasource(
datasource, datasource.get("workbook"), is_embedded_ds=True
)

@lru_cache(maxsize=None)
def _get_schema(self, schema_provided: str, database: str, fullName: str) -> str:
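
The new `emit_embedded_datasources` above (and the earlier switch from `len(...)` to `self.config.page_size` for `count_on_query`) follows the connection-paging pattern of requesting at most `page_size` nodes per call and shrinking the final request to whatever remains. A small sketch of just that count arithmetic, using `min()` as an equivalent of the conditional expression in the source, with no Tableau calls:

from typing import List, Tuple


def page_counts(total_count: int, page_size: int) -> List[Tuple[int, int]]:
    """Return (offset, count) pairs so that no request asks past total_count."""
    pages: List[Tuple[int, int]] = []
    current_count = 0
    while current_count < total_count:
        count = min(page_size, total_count - current_count)
        pages.append((current_count, count))
        current_count += count
    return pages


# e.g. 23 embedded datasources fetched with page_size=10:
print(page_counts(total_count=23, page_size=10))  # [(0, 10), (10, 10), (20, 3)]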
@@ -1194,10 +1256,13 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return
try:
yield from self.emit_workbooks()
if self.embedded_datasource_ids_being_used:
yield from self.emit_embedded_datasources()
if self.datasource_ids_being_used:
yield from self.emit_published_datasources()
if self.custom_sql_ids_being_used:
yield from self.emit_custom_sql_datasources()
yield from self.emit_upstream_tables()
except MetadataQueryException as md_exception:
self.report.report_failure(
key="tableau-metadata",
(Diff truncated; the remaining changed files in this commit are not shown here.)