Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/tableau): emit status aspects + streamline stateful ingestion #6188

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/workunit.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ def __init__(
def get_metadata(self) -> dict:
    """Return this work unit's payload wrapped in a dict under the "metadata" key."""
    payload = self.metadata
    return dict(metadata=payload)

def get_urn(self) -> str:
    """Return the URN of the entity that this work unit's metadata refers to.

    For a MetadataChangeEvent the URN is read from the proposed snapshot;
    for any other payload it is read from ``entityUrn``, which must be set.
    """
    payload = self.metadata
    if not isinstance(payload, MetadataChangeEvent):
        # Proposal-style payloads carry the URN directly; it must be non-empty.
        assert payload.entityUrn
        return payload.entityUrn
    return payload.proposedSnapshot.urn

def decompose_mce_into_mcps(self) -> Iterable["MetadataWorkUnit"]:
from datahub.emitter.mcp_builder import mcps_from_mce

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from datahub.ingestion.source.state.use_case_handler import (
StatefulIngestionUsecaseHandlerBase,
)
from datahub.metadata.schema_classes import ChangeTypeClass, StatusClass
from datahub.metadata.schema_classes import StatusClass

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -223,19 +223,12 @@ def create_checkpoint(self) -> Optional[Checkpoint]:
return None

def _create_soft_delete_workunit(self, urn: str, type: str) -> MetadataWorkUnit:
entity_type = type
if entity_type in ["view", "table", "topic"]:
entity_type = "dataset"

logger.info(f"Soft-deleting stale entity of type {type} - {urn}.")
logger.info(f"Soft-deleting stale entity - {urn}")
mcp = MetadataChangeProposalWrapper(
entityType=entity_type,
entityUrn=urn,
changeType=ChangeTypeClass.UPSERT,
aspectName="status",
aspect=StatusClass(removed=True),
)
wu = MetadataWorkUnit(id=f"soft-delete-{type}-{urn}", mcp=mcp)
wu = MetadataWorkUnit(id=f"soft-delete-{urn}", mcp=mcp)
report = self.source.get_report()
assert isinstance(report, StaleEntityRemovalSourceReport)
report.report_workunit(wu)
Expand Down
111 changes: 13 additions & 98 deletions metadata-ingestion/src/datahub/ingestion/source/state/tableau_state.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
import logging
from typing import Callable, Dict, Iterable, List
from typing import Iterable, List

import pydantic

from datahub.emitter.mce_builder import (
chart_urn_to_key,
dashboard_urn_to_key,
make_chart_urn,
make_dashboard_urn,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityCheckpointStateBase,
)
from datahub.utilities.checkpoint_state_util import CheckpointStateUtil
from datahub.utilities.urns.urn import guess_entity_type

logger = logging.getLogger(__name__)

Expand All @@ -24,108 +18,29 @@ class TableauCheckpointState(StaleEntityCheckpointStateBase["TableauCheckpointSt
used to remove any stale entities.
"""

encoded_dataset_urns: List[str] = pydantic.Field(default_factory=list)
encoded_chart_urns: List[str] = pydantic.Field(default_factory=list)
encoded_dashboard_urns: List[str] = pydantic.Field(default_factory=list)
urns: List[str] = pydantic.Field(default_factory=list)

@classmethod
def get_supported_types(cls) -> List[str]:
return ["dataset", "chart", "dashboard"]

@staticmethod
def _get_dataset_lightweight_repr(dataset_urn: str) -> str:
"""Reduces the amount of text in the URNs for smaller state footprint."""
return CheckpointStateUtil.get_dataset_lightweight_repr(dataset_urn)

@staticmethod
def _get_chart_lightweight_repr(chart_urn: str) -> str:
"""Reduces the amount of text in the URNs for smaller state footprint."""
SEP = CheckpointStateUtil.get_separator()
key = chart_urn_to_key(chart_urn)
assert key is not None
return f"{key.dashboardTool}{SEP}{key.chartId}"

@staticmethod
def _get_dashboard_lightweight_repr(dashboard_urn: str) -> str:
"""Reduces the amount of text in the URNs for smaller state footprint."""
SEP = CheckpointStateUtil.get_separator()
key = dashboard_urn_to_key(dashboard_urn)
assert key is not None
return f"{key.dashboardTool}{SEP}{key.dashboardId}"

def _add_dataset_urn(self, dataset_urn: str) -> None:
self.encoded_dataset_urns.append(
self._get_dataset_lightweight_repr(dataset_urn)
)

def _add_chart_urn(self, chart_urn: str) -> None:
self.encoded_chart_urns.append(self._get_chart_lightweight_repr(chart_urn))

def _add_dashboard_urn(self, dashboard_urn: str) -> None:
self.encoded_dashboard_urns.append(
self._get_dashboard_lightweight_repr(dashboard_urn)
)

def _get_dataset_urns_not_in(
self, checkpoint: "TableauCheckpointState"
) -> Iterable[str]:
yield from CheckpointStateUtil.get_dataset_urns_not_in(
self.encoded_dataset_urns, checkpoint.encoded_dataset_urns
)

def _get_chart_urns_not_in(
self, checkpoint: "TableauCheckpointState"
) -> Iterable[str]:
difference = CheckpointStateUtil.get_encoded_urns_not_in(
self.encoded_chart_urns, checkpoint.encoded_chart_urns
)
for encoded_urn in difference:
platform, name = encoded_urn.split(CheckpointStateUtil.get_separator())
yield make_chart_urn(platform, name)

def _get_dashboard_urns_not_in(
self, checkpoint: "TableauCheckpointState"
) -> Iterable[str]:
difference = CheckpointStateUtil.get_encoded_urns_not_in(
self.encoded_dashboard_urns, checkpoint.encoded_dashboard_urns
)
for encoded_urn in difference:
platform, name = encoded_urn.split(CheckpointStateUtil.get_separator())
yield make_dashboard_urn(platform, name)
return ["*"]

def add_checkpoint_urn(self, type: str, urn: str) -> None:
supported_entities_add_handlers: Dict[str, Callable[[str], None]] = {
"dataset": self._add_dataset_urn,
"chart": self._add_chart_urn,
"dashboard": self._add_dashboard_urn,
}

if type not in supported_entities_add_handlers:
logger.error(f"Can not save Unknown entity {type} to checkpoint.")

supported_entities_add_handlers[type](urn)
self.urns.append(urn)

def get_urns_not_in(
self, type: str, other_checkpoint_state: "TableauCheckpointState"
) -> Iterable[str]:
assert type in self.get_supported_types()
if type == "dataset":
yield from self._get_dataset_urns_not_in(other_checkpoint_state)
elif type == "chart":
yield from self._get_chart_urns_not_in(other_checkpoint_state)
elif type == "dashboard":
yield from self._get_dashboard_urns_not_in(other_checkpoint_state)
diff = set(self.urns) - set(other_checkpoint_state.urns)

# To maintain backwards compatibility, we provide this filtering mechanism.
if type == "*":
yield from diff
else:
yield from (urn for urn in diff if guess_entity_type(urn) == type)

def get_percent_entities_changed(
self, old_checkpoint_state: "TableauCheckpointState"
) -> float:
return StaleEntityCheckpointStateBase.compute_percent_entities_changed(
[
(self.encoded_dataset_urns, old_checkpoint_state.encoded_dataset_urns),
(self.encoded_chart_urns, old_checkpoint_state.encoded_chart_urns),
(
self.encoded_dashboard_urns,
old_checkpoint_state.encoded_dashboard_urns,
),
]
[(self.urns, old_checkpoint_state.urns)]
)
29 changes: 10 additions & 19 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@
ViewPropertiesClass,
)
from datahub.utilities import config_clean
from datahub.utilities.source_helpers import (
auto_stale_entity_removal,
auto_status_aspect,
)

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -754,23 +758,6 @@ def get_metadata_change_event(
work_unit = MetadataWorkUnit(id=snap_shot.urn, mce=mce)
self.report.report_workunit(work_unit)

# Add snapshot to the checkpoint state.
entity = None
if type(snap_shot).__name__ == "DatasetSnapshotClass":
entity = "dataset"
elif type(snap_shot).__name__ == "DashboardSnapshotClass":
entity = "dashboard"
elif type(snap_shot).__name__ == "ChartSnapshotClass":
entity = "chart"
else:
logger.warning(
f"Skipping snapshot {snap_shot.urn} from being added to the state"
f" since it is not of the expected type {type(snap_shot).__name__}."
)

if entity is not None:
self.stale_entity_removal_handler.add_entity_to_state(entity, snap_shot.urn)

return work_unit

def get_metadata_change_proposal(
Expand Down Expand Up @@ -1365,6 +1352,12 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
return cls(config, ctx)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
    """Return the source's work units, post-processed by the shared helpers.

    The raw stream from ``get_workunits_internal`` is first passed through
    ``auto_status_aspect`` and the result is then passed through
    ``auto_stale_entity_removal`` together with this source's
    stale-entity-removal handler.
    """
    with_status = auto_status_aspect(self.get_workunits_internal())
    return auto_stale_entity_removal(
        self.stale_entity_removal_handler,
        with_status,
    )

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
if self.server is None or not self.server.is_signed_in():
return
try:
Expand All @@ -1384,8 +1377,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
key="tableau-metadata",
reason=f"Unable to retrieve metadata from tableau. Information: {str(md_exception)}",
)
# Clean up stale entities.
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

def get_report(self) -> StaleEntityRemovalSourceReport:
return self.report
Expand Down
71 changes: 71 additions & 0 deletions metadata-ingestion/src/datahub/utilities/source_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from typing import Callable, Iterable, Optional, Set

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)
from datahub.metadata.schema_classes import MetadataChangeEventClass, StatusClass
from datahub.utilities.urns.urn import guess_entity_type


def auto_status_aspect(
    stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
    """
    For all entities that don't have a status aspect, add one with removed set to false.

    Every work unit from ``stream`` is yielded unchanged. While streaming, the
    URN of each work unit is recorded, along with whether that work unit
    carried a ``StatusClass`` aspect. After the stream is exhausted, one
    additional work unit with ``StatusClass(removed=False)`` is yielded for
    each URN that was seen without a status aspect.

    Raises:
        ValueError: if a work unit's metadata is neither a
            ``MetadataChangeEventClass`` nor a ``MetadataChangeProposalWrapper``.
    """

    all_urns: Set[str] = set()
    status_urns: Set[str] = set()
    for wu in stream:
        urn = wu.get_urn()
        # Record every URN seen. Without this the set difference computed
        # below is always empty and no status aspect is ever emitted.
        all_urns.add(urn)

        if isinstance(wu.metadata, MetadataChangeEventClass):
            if any(
                isinstance(aspect, StatusClass)
                for aspect in wu.metadata.proposedSnapshot.aspects
            ):
                status_urns.add(urn)
        elif isinstance(wu.metadata, MetadataChangeProposalWrapper):
            if isinstance(wu.metadata.aspect, StatusClass):
                status_urns.add(urn)
        else:
            raise ValueError(f"Unexpected type {type(wu.metadata)}")

        yield wu

    # Sorted so the order of the synthesized work units is deterministic
    # across runs (plain set iteration order is not).
    for urn in sorted(all_urns - status_urns):
        yield MetadataChangeProposalWrapper(
            entityUrn=urn,
            aspect=StatusClass(removed=False),
        ).as_workunit()


def _default_entity_type_fn(wu: MetadataWorkUnit) -> Optional[str]:
    """Derive the entity type for a work unit by passing its URN to ``guess_entity_type``."""
    return guess_entity_type(wu.get_urn())


def auto_stale_entity_removal(
    stale_entity_removal_handler: StaleEntityRemovalHandler,
    stream: Iterable[MetadataWorkUnit],
    entity_type_fn: Callable[
        [MetadataWorkUnit], Optional[str]
    ] = _default_entity_type_fn,
) -> Iterable[MetadataWorkUnit]:
    """
    Record all entities that are found, and emit removals for any that disappeared in this run.

    Each work unit from ``stream`` is yielded unchanged. Before it is yielded,
    its URN is registered with ``stale_entity_removal_handler`` under the type
    returned by ``entity_type_fn``; work units for which that function returns
    ``None`` are not registered. Once the stream is exhausted, the work units
    produced by the handler's ``gen_removed_entity_workunits()`` are yielded.
    """

    for work_unit in stream:
        entity_urn = work_unit.get_urn()
        detected_type = entity_type_fn(work_unit)

        # A None type means "do not track this work unit in the state".
        if detected_type is not None:
            stale_entity_removal_handler.add_entity_to_state(
                detected_type, entity_urn
            )

        yield work_unit

    # Clean up stale entities.
    yield from stale_entity_removal_handler.gen_removed_entity_workunits()
Loading