Skip to content

Commit

Permalink
feat(ingestion): Tableau stateful ingestion (#6094)
Browse files Browse the repository at this point in the history
  • Loading branch information
amanda-her authored Oct 11, 2022
1 parent 95d1e01 commit ffa838e
Show file tree
Hide file tree
Showing 7 changed files with 9,712 additions and 86 deletions.
18 changes: 18 additions & 0 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
from datahub.metadata.schema_classes import (
AssertionKeyClass,
AuditStampClass,
ChartKeyClass,
ContainerKeyClass,
DashboardKeyClass,
DatasetKeyClass,
DatasetLineageTypeClass,
DatasetSnapshotClass,
Expand Down Expand Up @@ -214,11 +216,27 @@ def make_dashboard_urn(platform: str, name: str) -> str:
return f"urn:li:dashboard:({platform},{name})"


def dashboard_urn_to_key(dashboard_urn: str) -> Optional[DashboardKeyClass]:
pattern = r"urn:li:dashboard:\((.*),(.*)\)"
results = re.search(pattern, dashboard_urn)
if results is not None:
return DashboardKeyClass(dashboardTool=results[1], dashboardId=results[2])
return None


def make_chart_urn(platform: str, name: str) -> str:
# FIXME: charts don't currently include data platform urn prefixes.
return f"urn:li:chart:({platform},{name})"


def chart_urn_to_key(chart_urn: str) -> Optional[ChartKeyClass]:
pattern = r"urn:li:chart:\((.*),(.*)\)"
results = re.search(pattern, chart_urn)
if results is not None:
return ChartKeyClass(dashboardTool=results[1], chartId=results[2])
return None


def make_domain_urn(domain: str) -> str:
if domain.startswith("urn:li:domain:"):
return domain
Expand Down
131 changes: 131 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/state/tableau_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import logging
from typing import Callable, Dict, Iterable, List

import pydantic

from datahub.emitter.mce_builder import (
chart_urn_to_key,
dashboard_urn_to_key,
make_chart_urn,
make_dashboard_urn,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityCheckpointStateBase,
)
from datahub.utilities.checkpoint_state_util import CheckpointStateUtil

logger = logging.getLogger(__name__)


class TableauCheckpointState(StaleEntityCheckpointStateBase["TableauCheckpointState"]):
"""
Class for representing the checkpoint state for Tableau sources.
Stores all datasets, charts and dashboards being ingested and is
used to remove any stale entities.
"""

encoded_dataset_urns: List[str] = pydantic.Field(default_factory=list)
encoded_chart_urns: List[str] = pydantic.Field(default_factory=list)
encoded_dashboard_urns: List[str] = pydantic.Field(default_factory=list)

@classmethod
def get_supported_types(cls) -> List[str]:
return ["dataset", "chart", "dashboard"]

@staticmethod
def _get_dataset_lightweight_repr(dataset_urn: str) -> str:
"""Reduces the amount of text in the URNs for smaller state footprint."""
return CheckpointStateUtil.get_dataset_lightweight_repr(dataset_urn)

@staticmethod
def _get_chart_lightweight_repr(chart_urn: str) -> str:
"""Reduces the amount of text in the URNs for smaller state footprint."""
SEP = CheckpointStateUtil.get_separator()
key = chart_urn_to_key(chart_urn)
assert key is not None
return f"{key.dashboardTool}{SEP}{key.chartId}"

@staticmethod
def _get_dashboard_lightweight_repr(dashboard_urn: str) -> str:
"""Reduces the amount of text in the URNs for smaller state footprint."""
SEP = CheckpointStateUtil.get_separator()
key = dashboard_urn_to_key(dashboard_urn)
assert key is not None
return f"{key.dashboardTool}{SEP}{key.dashboardId}"

def _add_dataset_urn(self, dataset_urn: str) -> None:
self.encoded_dataset_urns.append(
self._get_dataset_lightweight_repr(dataset_urn)
)

def _add_chart_urn(self, chart_urn: str) -> None:
self.encoded_chart_urns.append(self._get_chart_lightweight_repr(chart_urn))

def _add_dashboard_urn(self, dashboard_urn: str) -> None:
self.encoded_dashboard_urns.append(
self._get_dashboard_lightweight_repr(dashboard_urn)
)

def _get_dataset_urns_not_in(
self, checkpoint: "TableauCheckpointState"
) -> Iterable[str]:
yield from CheckpointStateUtil.get_dataset_urns_not_in(
self.encoded_dataset_urns, checkpoint.encoded_dataset_urns
)

def _get_chart_urns_not_in(
self, checkpoint: "TableauCheckpointState"
) -> Iterable[str]:
difference = CheckpointStateUtil.get_encoded_urns_not_in(
self.encoded_chart_urns, checkpoint.encoded_chart_urns
)
for encoded_urn in difference:
platform, name = encoded_urn.split(CheckpointStateUtil.get_separator())
yield make_chart_urn(platform, name)

def _get_dashboard_urns_not_in(
self, checkpoint: "TableauCheckpointState"
) -> Iterable[str]:
difference = CheckpointStateUtil.get_encoded_urns_not_in(
self.encoded_dashboard_urns, checkpoint.encoded_dashboard_urns
)
for encoded_urn in difference:
platform, name = encoded_urn.split(CheckpointStateUtil.get_separator())
yield make_dashboard_urn(platform, name)

def add_checkpoint_urn(self, type: str, urn: str) -> None:
supported_entities_add_handlers: Dict[str, Callable[[str], None]] = {
"dataset": self._add_dataset_urn,
"chart": self._add_chart_urn,
"dashboard": self._add_dashboard_urn,
}

if type not in supported_entities_add_handlers:
logger.error(f"Can not save Unknown entity {type} to checkpoint.")

supported_entities_add_handlers[type](urn)

def get_urns_not_in(
self, type: str, other_checkpoint_state: "TableauCheckpointState"
) -> Iterable[str]:
assert type in self.get_supported_types()
if type == "dataset":
yield from self._get_dataset_urns_not_in(other_checkpoint_state)
elif type == "chart":
yield from self._get_chart_urns_not_in(other_checkpoint_state)
elif type == "dashboard":
yield from self._get_dashboard_urns_not_in(other_checkpoint_state)

def get_percent_entities_changed(
self, old_checkpoint_state: "TableauCheckpointState"
) -> float:
return StaleEntityCheckpointStateBase.compute_percent_entities_changed(
[
(self.encoded_dataset_urns, old_checkpoint_state.encoded_dataset_urns),
(self.encoded_chart_urns, old_checkpoint_state.encoded_chart_urns),
(
self.encoded_dashboard_urns,
old_checkpoint_state.encoded_dashboard_urns,
),
]
)
80 changes: 72 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,18 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import Source
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
StatefulIngestionSourceBase,
)
from datahub.ingestion.source.state.tableau_state import TableauCheckpointState
from datahub.ingestion.source.tableau_common import (
FIELD_TYPE_MAPPING,
MetadataQueryException,
Expand Down Expand Up @@ -96,6 +106,16 @@
REPLACE_SLASH_CHAR = "|"


class TableauStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
"""
Specialization of StatefulStaleMetadataRemovalConfig to adding custom config.
This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase
in the TableauConfig.
"""

_entity_types: List[str] = Field(default=["dataset", "chart", "dashboard"])


class TableauConnectionConfig(ConfigModel):
connect_uri: str = Field(description="Tableau host URL.")
username: Optional[str] = Field(
Expand Down Expand Up @@ -154,7 +174,11 @@ def make_tableau_client(self) -> Server:
raise ValueError(f"Unable to login: {str(e)}") from e


class TableauConfig(DatasetLineageProviderConfigBase, TableauConnectionConfig):
class TableauConfig(
DatasetLineageProviderConfigBase,
StatefulIngestionConfigBase,
TableauConnectionConfig,
):
projects: Optional[List[str]] = Field(
default=["default"], description="List of projects"
)
Expand Down Expand Up @@ -194,6 +218,10 @@ class TableauConfig(DatasetLineageProviderConfigBase, TableauConnectionConfig):
description="[experimental] Extract usage statistics for dashboards and charts.",
)

stateful_ingestion: Optional[TableauStatefulIngestionConfig] = Field(
default=None, description=""
)


class WorkbookKey(PlatformKey):
workbook_id: str
Expand All @@ -218,13 +246,16 @@ class UsageStat:
SourceCapability.USAGE_STATS,
"Dashboard/Chart view counts, enabled using extract_usage_stats config",
)
@capability(SourceCapability.DELETION_DETECTION, "", supported=False)
@capability(
SourceCapability.DELETION_DETECTION,
"Enabled by default when stateful ingestion is turned on.",
)
@capability(SourceCapability.OWNERSHIP, "Requires recipe configuration")
@capability(SourceCapability.TAGS, "Requires recipe configuration")
@capability(SourceCapability.LINEAGE_COARSE, "Enabled by default")
class TableauSource(Source):
class TableauSource(StatefulIngestionSourceBase):
config: TableauConfig
report: SourceReport
report: StaleEntityRemovalSourceReport
platform = "tableau"
server: Optional[Server]
upstream_tables: Dict[str, Tuple[Any, Optional[str], bool]] = {}
Expand All @@ -238,10 +269,10 @@ def __init__(
config: TableauConfig,
ctx: PipelineContext,
):
super().__init__(ctx)
super().__init__(config, ctx)

self.config = config
self.report = SourceReport()
self.report = StaleEntityRemovalSourceReport()
self.server = None

# This list keeps track of embedded datasources in workbooks so that we retrieve those
Expand All @@ -254,10 +285,20 @@ def __init__(
# when emitting custom SQL data sources.
self.custom_sql_ids_being_used: List[str] = []

# Create and register the stateful ingestion use-case handlers.
self.stale_entity_removal_handler = StaleEntityRemovalHandler(
source=self,
config=self.config,
state_type_class=TableauCheckpointState,
pipeline_name=self.ctx.pipeline_name,
run_id=self.ctx.run_id,
)

self._authenticate()

def close(self) -> None:
if self.server is not None:
self.prepare_for_commit()
self.server.auth.sign_out()

def _populate_usage_stat_registry(self):
Expand Down Expand Up @@ -712,6 +753,24 @@ def get_metadata_change_event(
mce = MetadataChangeEvent(proposedSnapshot=snap_shot)
work_unit = MetadataWorkUnit(id=snap_shot.urn, mce=mce)
self.report.report_workunit(work_unit)

# Add snapshot to the checkpoint state.
entity = None
if type(snap_shot).__name__ == "DatasetSnapshotClass":
entity = "dataset"
elif type(snap_shot).__name__ == "DashboardSnapshotClass":
entity = "dashboard"
elif type(snap_shot).__name__ == "ChartSnapshotClass":
entity = "chart"
else:
logger.warning(
f"Skipping snapshot {snap_shot.urn} from being added to the state"
f" since it is not of the expected type {type(snap_shot).__name__}."
)

if entity is not None:
self.stale_entity_removal_handler.add_entity_to_state(entity, snap_shot.urn)

return work_unit

def get_metadata_change_proposal(
Expand Down Expand Up @@ -1320,6 +1379,11 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
key="tableau-metadata",
reason=f"Unable to retrieve metadata from tableau. Information: {str(md_exception)}",
)
# Clean up stale entities.
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

def get_report(self) -> SourceReport:
def get_report(self) -> StaleEntityRemovalSourceReport:
return self.report

def get_platform_instance_id(self) -> str:
return self.platform
Original file line number Diff line number Diff line change
Expand Up @@ -4214,7 +4214,7 @@
"hasNextPage": false,
"endCursor": null
},
"totalCount": 3
"totalCount": 4
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"data": {
"workbooksConnection": {
"nodes": [],
"pageInfo": {
"hasNextPage": false,
"endCursor": null
},
"totalCount": 0
}
}
}
Loading

0 comments on commit ffa838e

Please sign in to comment.