Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion): Tableau stateful ingestion #6094

Merged
18 changes: 18 additions & 0 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
from datahub.metadata.schema_classes import (
AssertionKeyClass,
AuditStampClass,
ChartKeyClass,
ContainerKeyClass,
DashboardKeyClass,
DatasetKeyClass,
DatasetLineageTypeClass,
DatasetSnapshotClass,
Expand Down Expand Up @@ -214,11 +216,27 @@ def make_dashboard_urn(platform: str, name: str) -> str:
return f"urn:li:dashboard:({platform},{name})"


def dashboard_urn_to_key(dashboard_urn: str) -> Optional[DashboardKeyClass]:
    """Parse a dashboard URN into a ``DashboardKeyClass``.

    The expected shape is ``urn:li:dashboard:(<tool>,<id>)``, i.e. the string
    produced by ``make_dashboard_urn``.

    Args:
        dashboard_urn: The URN string to parse.

    Returns:
        A key with ``dashboardTool`` and ``dashboardId`` populated, or ``None``
        when no dashboard URN can be found in the string.
    """
    # Split on the FIRST comma: the tool group excludes commas so that an id
    # which itself contains commas stays intact. A greedy ``.*`` for the tool
    # would split on the last comma and move part of the id into the tool.
    pattern = r"urn:li:dashboard:\(([^,]*),(.*)\)"
    results = re.search(pattern, dashboard_urn)
    if results is None:
        return None
    return DashboardKeyClass(dashboardTool=results[1], dashboardId=results[2])


def make_chart_urn(platform: str, name: str) -> str:
    """Build the chart URN string for the chart ``name`` on ``platform``.

    Example: ``make_chart_urn("tableau", "abc")`` returns
    ``"urn:li:chart:(tableau,abc)"``.
    """
    # FIXME: charts don't currently include data platform urn prefixes.
    chart_urn = f"urn:li:chart:({platform},{name})"
    return chart_urn


def chart_urn_to_key(chart_urn: str) -> Optional[ChartKeyClass]:
    """Parse a chart URN into a ``ChartKeyClass``.

    The expected shape is ``urn:li:chart:(<tool>,<id>)``, i.e. the string
    produced by ``make_chart_urn``.

    Args:
        chart_urn: The URN string to parse.

    Returns:
        A key with ``dashboardTool`` and ``chartId`` populated, or ``None``
        when no chart URN can be found in the string.
    """
    # Split on the FIRST comma: the tool group excludes commas so that an id
    # which itself contains commas stays intact. A greedy ``.*`` for the tool
    # would split on the last comma and move part of the id into the tool.
    pattern = r"urn:li:chart:\(([^,]*),(.*)\)"
    results = re.search(pattern, chart_urn)
    if results is None:
        return None
    return ChartKeyClass(dashboardTool=results[1], chartId=results[2])


def make_domain_urn(domain: str) -> str:
if domain.startswith("urn:li:domain:"):
return domain
Expand Down
131 changes: 131 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/state/tableau_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import logging
from typing import Callable, Dict, Iterable, List

import pydantic

from datahub.emitter.mce_builder import (
chart_urn_to_key,
dashboard_urn_to_key,
make_chart_urn,
make_dashboard_urn,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityCheckpointStateBase,
)
from datahub.utilities.checkpoint_state_util import CheckpointStateUtil

logger = logging.getLogger(__name__)


class TableauCheckpointState(StaleEntityCheckpointStateBase["TableauCheckpointState"]):
    """
    Class for representing the checkpoint state for Tableau sources.
    Stores all datasets, charts and dashboards being ingested and is
    used to remove any stale entities.

    URNs are stored in a shortened ("lightweight") encoded form, one list per
    supported entity type, to keep the state footprint small.
    """

    # Encoded URNs recorded so far, one list per supported entity type.
    encoded_dataset_urns: List[str] = pydantic.Field(default_factory=list)
    encoded_chart_urns: List[str] = pydantic.Field(default_factory=list)
    encoded_dashboard_urns: List[str] = pydantic.Field(default_factory=list)

    @classmethod
    def get_supported_types(cls) -> List[str]:
        """Return the entity type names this state can track."""
        return ["dataset", "chart", "dashboard"]

    @staticmethod
    def _get_dataset_lightweight_repr(dataset_urn: str) -> str:
        """Reduces the amount of text in the URNs for smaller state footprint."""
        return CheckpointStateUtil.get_dataset_lightweight_repr(dataset_urn)

    @staticmethod
    def _get_chart_lightweight_repr(chart_urn: str) -> str:
        """Reduces the amount of text in the URNs for smaller state footprint.

        The result is ``<tool><separator><chart id>``.
        """
        SEP = CheckpointStateUtil.get_separator()
        key = chart_urn_to_key(chart_urn)
        # chart_urn_to_key returns None for a string it cannot parse.
        assert key is not None
        return f"{key.dashboardTool}{SEP}{key.chartId}"

    @staticmethod
    def _get_dashboard_lightweight_repr(dashboard_urn: str) -> str:
        """Reduces the amount of text in the URNs for smaller state footprint.

        The result is ``<tool><separator><dashboard id>``.
        """
        SEP = CheckpointStateUtil.get_separator()
        key = dashboard_urn_to_key(dashboard_urn)
        # dashboard_urn_to_key returns None for a string it cannot parse.
        assert key is not None
        return f"{key.dashboardTool}{SEP}{key.dashboardId}"

    def _add_dataset_urn(self, dataset_urn: str) -> None:
        """Record the encoded form of ``dataset_urn`` in the state."""
        self.encoded_dataset_urns.append(
            self._get_dataset_lightweight_repr(dataset_urn)
        )

    def _add_chart_urn(self, chart_urn: str) -> None:
        """Record the encoded form of ``chart_urn`` in the state."""
        self.encoded_chart_urns.append(self._get_chart_lightweight_repr(chart_urn))

    def _add_dashboard_urn(self, dashboard_urn: str) -> None:
        """Record the encoded form of ``dashboard_urn`` in the state."""
        self.encoded_dashboard_urns.append(
            self._get_dashboard_lightweight_repr(dashboard_urn)
        )

    def _get_dataset_urns_not_in(
        self, checkpoint: "TableauCheckpointState"
    ) -> Iterable[str]:
        """Yield dataset URNs recorded here that ``checkpoint`` does not have."""
        yield from CheckpointStateUtil.get_dataset_urns_not_in(
            self.encoded_dataset_urns, checkpoint.encoded_dataset_urns
        )

    def _get_chart_urns_not_in(
        self, checkpoint: "TableauCheckpointState"
    ) -> Iterable[str]:
        """Yield chart URNs recorded here that ``checkpoint`` does not have."""
        difference = CheckpointStateUtil.get_encoded_urns_not_in(
            self.encoded_chart_urns, checkpoint.encoded_chart_urns
        )
        sep = CheckpointStateUtil.get_separator()
        for encoded_urn in difference:
            # The encoded form is "<tool><sep><id>"; split only on the first
            # separator so an id that happens to contain the separator text
            # does not break the two-value unpacking.
            platform, name = encoded_urn.split(sep, 1)
            yield make_chart_urn(platform, name)

    def _get_dashboard_urns_not_in(
        self, checkpoint: "TableauCheckpointState"
    ) -> Iterable[str]:
        """Yield dashboard URNs recorded here that ``checkpoint`` does not have."""
        difference = CheckpointStateUtil.get_encoded_urns_not_in(
            self.encoded_dashboard_urns, checkpoint.encoded_dashboard_urns
        )
        sep = CheckpointStateUtil.get_separator()
        for encoded_urn in difference:
            # See _get_chart_urns_not_in: split on the first separator only.
            platform, name = encoded_urn.split(sep, 1)
            yield make_dashboard_urn(platform, name)

    def add_checkpoint_urn(self, type: str, urn: str) -> None:
        """Record ``urn`` in the list that belongs to entity ``type``.

        An unsupported ``type`` is logged as an error and otherwise ignored.
        """
        supported_entities_add_handlers: Dict[str, Callable[[str], None]] = {
            "dataset": self._add_dataset_urn,
            "chart": self._add_chart_urn,
            "dashboard": self._add_dashboard_urn,
        }

        add_handler = supported_entities_add_handlers.get(type)
        if add_handler is None:
            logger.error(f"Can not save Unknown entity {type} to checkpoint.")
            # Stop here: falling through to the handler lookup would raise
            # KeyError right after the error has been logged.
            return

        add_handler(urn)

    def get_urns_not_in(
        self, type: str, other_checkpoint_state: "TableauCheckpointState"
    ) -> Iterable[str]:
        """Yield URNs of entity ``type`` recorded here but not in the other state.

        ``type`` must be one of the values from ``get_supported_types()``.
        """
        assert type in self.get_supported_types()
        if type == "dataset":
            yield from self._get_dataset_urns_not_in(other_checkpoint_state)
        elif type == "chart":
            yield from self._get_chart_urns_not_in(other_checkpoint_state)
        elif type == "dashboard":
            yield from self._get_dashboard_urns_not_in(other_checkpoint_state)

    def get_percent_entities_changed(
        self, old_checkpoint_state: "TableauCheckpointState"
    ) -> float:
        """Return the change percentage computed by the base-class helper.

        The helper receives one ``(current, old)`` pair of encoded URN lists
        per entity type.
        """
        return StaleEntityCheckpointStateBase.compute_percent_entities_changed(
            [
                (self.encoded_dataset_urns, old_checkpoint_state.encoded_dataset_urns),
                (self.encoded_chart_urns, old_checkpoint_state.encoded_chart_urns),
                (
                    self.encoded_dashboard_urns,
                    old_checkpoint_state.encoded_dashboard_urns,
                ),
            ]
        )
80 changes: 72 additions & 8 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,18 @@
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import Source
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
StatefulIngestionSourceBase,
)
from datahub.ingestion.source.state.tableau_state import TableauCheckpointState
from datahub.ingestion.source.tableau_common import (
FIELD_TYPE_MAPPING,
MetadataQueryException,
Expand Down Expand Up @@ -96,6 +106,16 @@
REPLACE_SLASH_CHAR = "|"


class TableauStatefulIngestionConfig(StatefulStaleMetadataRemovalConfig):
"""
Specialization of StatefulStaleMetadataRemovalConfig that adds Tableau-specific config.
This is used to override the stateful_ingestion config param of StatefulIngestionConfigBase
in the TableauConfig.
"""

# Entity types named for this config: datasets, charts and dashboards.
# NOTE(review): pydantic gives underscore-prefixed attributes special treatment
# (they are not regular model fields) - confirm Field(...) has the intended effect here.
_entity_types: List[str] = Field(default=["dataset", "chart", "dashboard"])


class TableauConnectionConfig(ConfigModel):
connect_uri: str = Field(description="Tableau host URL.")
username: Optional[str] = Field(
Expand Down Expand Up @@ -154,7 +174,11 @@ def make_tableau_client(self) -> Server:
raise ValueError(f"Unable to login: {str(e)}") from e


class TableauConfig(DatasetLineageProviderConfigBase, TableauConnectionConfig):
class TableauConfig(
DatasetLineageProviderConfigBase,
StatefulIngestionConfigBase,
TableauConnectionConfig,
):
projects: Optional[List[str]] = Field(
default=["default"], description="List of projects"
)
Expand Down Expand Up @@ -194,6 +218,10 @@ class TableauConfig(DatasetLineageProviderConfigBase, TableauConnectionConfig):
description="[experimental] Extract usage statistics for dashboards and charts.",
)

stateful_ingestion: Optional[TableauStatefulIngestionConfig] = Field(
default=None, description=""
)


class WorkbookKey(PlatformKey):
workbook_id: str
Expand All @@ -218,13 +246,16 @@ class UsageStat:
SourceCapability.USAGE_STATS,
"Dashboard/Chart view counts, enabled using extract_usage_stats config",
)
@capability(SourceCapability.DELETION_DETECTION, "", supported=False)
@capability(
SourceCapability.DELETION_DETECTION,
"Enabled by default when stateful ingestion is turned on.",
)
@capability(SourceCapability.OWNERSHIP, "Requires recipe configuration")
@capability(SourceCapability.TAGS, "Requires recipe configuration")
@capability(SourceCapability.LINEAGE_COARSE, "Enabled by default")
class TableauSource(Source):
class TableauSource(StatefulIngestionSourceBase):
config: TableauConfig
report: SourceReport
report: StaleEntityRemovalSourceReport
platform = "tableau"
server: Optional[Server]
upstream_tables: Dict[str, Tuple[Any, Optional[str], bool]] = {}
Expand All @@ -238,10 +269,10 @@ def __init__(
config: TableauConfig,
ctx: PipelineContext,
):
super().__init__(ctx)
super().__init__(config, ctx)

self.config = config
self.report = SourceReport()
self.report = StaleEntityRemovalSourceReport()
self.server = None

# This list keeps track of embedded datasources in workbooks so that we retrieve those
Expand All @@ -254,10 +285,20 @@ def __init__(
# when emitting custom SQL data sources.
self.custom_sql_ids_being_used: List[str] = []

# Create and register the stateful ingestion use-case handlers.
self.stale_entity_removal_handler = StaleEntityRemovalHandler(
source=self,
config=self.config,
state_type_class=TableauCheckpointState,
pipeline_name=self.ctx.pipeline_name,
run_id=self.ctx.run_id,
)

self._authenticate()

def close(self) -> None:
# Shut the source down; nothing is done when no server object was set.
if self.server is not None:
# Presumably finalizes the stateful-ingestion state before the session
# ends - verify against StatefulIngestionSourceBase.prepare_for_commit.
self.prepare_for_commit()
# Sign out through the server object's auth endpoint.
self.server.auth.sign_out()

def _populate_usage_stat_registry(self):
Expand Down Expand Up @@ -712,6 +753,24 @@ def get_metadata_change_event(
mce = MetadataChangeEvent(proposedSnapshot=snap_shot)
work_unit = MetadataWorkUnit(id=snap_shot.urn, mce=mce)
self.report.report_workunit(work_unit)

# Add snapshot to the checkpoint state.
entity = None
if type(snap_shot).__name__ == "DatasetSnapshotClass":
entity = "dataset"
elif type(snap_shot).__name__ == "DashboardSnapshotClass":
entity = "dashboard"
elif type(snap_shot).__name__ == "ChartSnapshotClass":
entity = "chart"
else:
logger.warning(
f"Skipping snapshot {snap_shot.urn} from being added to the state"
f" since it is not of the expected type {type(snap_shot).__name__}."
)

if entity is not None:
self.stale_entity_removal_handler.add_entity_to_state(entity, snap_shot.urn)

return work_unit

def get_metadata_change_proposal(
Expand Down Expand Up @@ -1320,6 +1379,11 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
key="tableau-metadata",
reason=f"Unable to retrieve metadata from tableau. Information: {str(md_exception)}",
)
# Clean up stale entities.
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

def get_report(self) -> SourceReport:
def get_report(self) -> StaleEntityRemovalSourceReport:
return self.report

def get_platform_instance_id(self) -> str:
# The platform name (the `platform` attribute) is used as the instance id.
return self.platform
Original file line number Diff line number Diff line change
Expand Up @@ -4214,7 +4214,7 @@
"hasNextPage": false,
"endCursor": null
},
"totalCount": 3
"totalCount": 4
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"data": {
"workbooksConnection": {
"nodes": [],
"pageInfo": {
"hasNextPage": false,
"endCursor": null
},
"totalCount": 0
}
}
}
Loading