Skip to content

Commit

Permalink
feat(ingest): fix tableau stateful ingestion
Browse files Browse the repository at this point in the history
- Emit status aspects for all entities
- Track containers in stateful ingestion
- Simplify stateful ingestion code
  • Loading branch information
hsheth2 committed Oct 12, 2022
1 parent c7db7da commit ef89764
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 190 deletions.
7 changes: 7 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/workunit.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ def __init__(
def get_metadata(self) -> dict:
return {"metadata": self.metadata}

def get_urn(self) -> str:
if isinstance(self.metadata, MetadataChangeEvent):
return self.metadata.proposedSnapshot.urn
else:
assert self.metadata.entityUrn
return self.metadata.entityUrn

def decompose_mce_into_mcps(self) -> Iterable["MetadataWorkUnit"]:
from datahub.emitter.mcp_builder import mcps_from_mce

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from datahub.ingestion.source.state.use_case_handler import (
StatefulIngestionUsecaseHandlerBase,
)
from datahub.metadata.schema_classes import ChangeTypeClass, StatusClass
from datahub.metadata.schema_classes import StatusClass

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -223,19 +223,12 @@ def create_checkpoint(self) -> Optional[Checkpoint]:
return None

def _create_soft_delete_workunit(self, urn: str, type: str) -> MetadataWorkUnit:
entity_type = type
if entity_type in ["view", "table", "topic"]:
entity_type = "dataset"

logger.info(f"Soft-deleting stale entity of type {type} - {urn}.")
logger.info(f"Soft-deleting stale entity - {urn}")
mcp = MetadataChangeProposalWrapper(
entityType=entity_type,
entityUrn=urn,
changeType=ChangeTypeClass.UPSERT,
aspectName="status",
aspect=StatusClass(removed=True),
)
wu = MetadataWorkUnit(id=f"soft-delete-{type}-{urn}", mcp=mcp)
wu = MetadataWorkUnit(id=f"soft-delete-{urn}", mcp=mcp)
report = self.source.get_report()
assert isinstance(report, StaleEntityRemovalSourceReport)
report.report_workunit(wu)
Expand Down
111 changes: 13 additions & 98 deletions metadata-ingestion/src/datahub/ingestion/source/state/tableau_state.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
import logging
from typing import Callable, Dict, Iterable, List
from typing import Iterable, List

import pydantic

from datahub.emitter.mce_builder import (
chart_urn_to_key,
dashboard_urn_to_key,
make_chart_urn,
make_dashboard_urn,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityCheckpointStateBase,
)
from datahub.utilities.checkpoint_state_util import CheckpointStateUtil
from datahub.utilities.urns.urn import guess_entity_type

logger = logging.getLogger(__name__)

Expand All @@ -24,108 +18,29 @@ class TableauCheckpointState(StaleEntityCheckpointStateBase["TableauCheckpointSt
used to remove any stale entities.
"""

encoded_dataset_urns: List[str] = pydantic.Field(default_factory=list)
encoded_chart_urns: List[str] = pydantic.Field(default_factory=list)
encoded_dashboard_urns: List[str] = pydantic.Field(default_factory=list)
urns: List[str] = pydantic.Field(default_factory=list)

@classmethod
def get_supported_types(cls) -> List[str]:
return ["dataset", "chart", "dashboard"]

@staticmethod
def _get_dataset_lightweight_repr(dataset_urn: str) -> str:
"""Reduces the amount of text in the URNs for smaller state footprint."""
return CheckpointStateUtil.get_dataset_lightweight_repr(dataset_urn)

@staticmethod
def _get_chart_lightweight_repr(chart_urn: str) -> str:
"""Reduces the amount of text in the URNs for smaller state footprint."""
SEP = CheckpointStateUtil.get_separator()
key = chart_urn_to_key(chart_urn)
assert key is not None
return f"{key.dashboardTool}{SEP}{key.chartId}"

@staticmethod
def _get_dashboard_lightweight_repr(dashboard_urn: str) -> str:
"""Reduces the amount of text in the URNs for smaller state footprint."""
SEP = CheckpointStateUtil.get_separator()
key = dashboard_urn_to_key(dashboard_urn)
assert key is not None
return f"{key.dashboardTool}{SEP}{key.dashboardId}"

def _add_dataset_urn(self, dataset_urn: str) -> None:
self.encoded_dataset_urns.append(
self._get_dataset_lightweight_repr(dataset_urn)
)

def _add_chart_urn(self, chart_urn: str) -> None:
self.encoded_chart_urns.append(self._get_chart_lightweight_repr(chart_urn))

def _add_dashboard_urn(self, dashboard_urn: str) -> None:
self.encoded_dashboard_urns.append(
self._get_dashboard_lightweight_repr(dashboard_urn)
)

def _get_dataset_urns_not_in(
self, checkpoint: "TableauCheckpointState"
) -> Iterable[str]:
yield from CheckpointStateUtil.get_dataset_urns_not_in(
self.encoded_dataset_urns, checkpoint.encoded_dataset_urns
)

def _get_chart_urns_not_in(
self, checkpoint: "TableauCheckpointState"
) -> Iterable[str]:
difference = CheckpointStateUtil.get_encoded_urns_not_in(
self.encoded_chart_urns, checkpoint.encoded_chart_urns
)
for encoded_urn in difference:
platform, name = encoded_urn.split(CheckpointStateUtil.get_separator())
yield make_chart_urn(platform, name)

def _get_dashboard_urns_not_in(
self, checkpoint: "TableauCheckpointState"
) -> Iterable[str]:
difference = CheckpointStateUtil.get_encoded_urns_not_in(
self.encoded_dashboard_urns, checkpoint.encoded_dashboard_urns
)
for encoded_urn in difference:
platform, name = encoded_urn.split(CheckpointStateUtil.get_separator())
yield make_dashboard_urn(platform, name)
return ["*"]

def add_checkpoint_urn(self, type: str, urn: str) -> None:
supported_entities_add_handlers: Dict[str, Callable[[str], None]] = {
"dataset": self._add_dataset_urn,
"chart": self._add_chart_urn,
"dashboard": self._add_dashboard_urn,
}

if type not in supported_entities_add_handlers:
logger.error(f"Can not save Unknown entity {type} to checkpoint.")

supported_entities_add_handlers[type](urn)
self.urns.append(urn)

def get_urns_not_in(
self, type: str, other_checkpoint_state: "TableauCheckpointState"
) -> Iterable[str]:
assert type in self.get_supported_types()
if type == "dataset":
yield from self._get_dataset_urns_not_in(other_checkpoint_state)
elif type == "chart":
yield from self._get_chart_urns_not_in(other_checkpoint_state)
elif type == "dashboard":
yield from self._get_dashboard_urns_not_in(other_checkpoint_state)
diff = set(self.urns) - set(other_checkpoint_state.urns)

# To maintain backwards compatibility, we provide this filtering mechanism.
if type == "*":
yield from diff
else:
yield from (urn for urn in diff if guess_entity_type(urn) == type)

def get_percent_entities_changed(
self, old_checkpoint_state: "TableauCheckpointState"
) -> float:
return StaleEntityCheckpointStateBase.compute_percent_entities_changed(
[
(self.encoded_dataset_urns, old_checkpoint_state.encoded_dataset_urns),
(self.encoded_chart_urns, old_checkpoint_state.encoded_chart_urns),
(
self.encoded_dashboard_urns,
old_checkpoint_state.encoded_dashboard_urns,
),
]
[(self.urns, old_checkpoint_state.urns)]
)
29 changes: 10 additions & 19 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@
ViewPropertiesClass,
)
from datahub.utilities import config_clean
from datahub.utilities.source_helpers import (
auto_stale_entity_removal,
auto_status_aspect,
)

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -754,23 +758,6 @@ def get_metadata_change_event(
work_unit = MetadataWorkUnit(id=snap_shot.urn, mce=mce)
self.report.report_workunit(work_unit)

# Add snapshot to the checkpoint state.
entity = None
if type(snap_shot).__name__ == "DatasetSnapshotClass":
entity = "dataset"
elif type(snap_shot).__name__ == "DashboardSnapshotClass":
entity = "dashboard"
elif type(snap_shot).__name__ == "ChartSnapshotClass":
entity = "chart"
else:
logger.warning(
f"Skipping snapshot {snap_shot.urn} from being added to the state"
f" since it is not of the expected type {type(snap_shot).__name__}."
)

if entity is not None:
self.stale_entity_removal_handler.add_entity_to_state(entity, snap_shot.urn)

return work_unit

def get_metadata_change_proposal(
Expand Down Expand Up @@ -1365,6 +1352,12 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
return cls(config, ctx)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_stale_entity_removal(
self.stale_entity_removal_handler,
auto_status_aspect(self.get_workunits_internal()),
)

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
if self.server is None or not self.server.is_signed_in():
return
try:
Expand All @@ -1384,8 +1377,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
key="tableau-metadata",
reason=f"Unable to retrieve metadata from tableau. Information: {str(md_exception)}",
)
# Clean up stale entities.
yield from self.stale_entity_removal_handler.gen_removed_entity_workunits()

def get_report(self) -> StaleEntityRemovalSourceReport:
return self.report
Expand Down
71 changes: 71 additions & 0 deletions metadata-ingestion/src/datahub/utilities/source_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from typing import Callable, Iterable, Optional, Set

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)
from datahub.metadata.schema_classes import MetadataChangeEventClass, StatusClass
from datahub.utilities.urns.urn import guess_entity_type


def auto_status_aspect(
stream: Iterable[MetadataWorkUnit],
) -> Iterable[MetadataWorkUnit]:
"""
For all entities that don't have a status aspect, add one with removed set to false.
"""

all_urns: Set[str] = set()
status_urns: Set[str] = set()
for wu in stream:
urn = wu.get_urn()
if isinstance(wu.metadata, MetadataChangeEventClass):
if any(
isinstance(aspect, StatusClass)
for aspect in wu.metadata.proposedSnapshot.aspects
):
status_urns.add(urn)
elif isinstance(wu.metadata, MetadataChangeProposalWrapper):
if isinstance(wu.metadata.aspect, StatusClass):
status_urns.add(urn)
else:
raise ValueError(f"Unexpected type {type(wu.metadata)}")

yield wu

for urn in all_urns - status_urns:
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=StatusClass(removed=False),
).as_workunit()


def _default_entity_type_fn(wu: MetadataWorkUnit) -> Optional[str]:
urn = wu.get_urn()
entity_type = guess_entity_type(urn)
return entity_type


def auto_stale_entity_removal(
stale_entity_removal_handler: StaleEntityRemovalHandler,
stream: Iterable[MetadataWorkUnit],
entity_type_fn: Callable[
[MetadataWorkUnit], Optional[str]
] = _default_entity_type_fn,
) -> Iterable[MetadataWorkUnit]:
"""
Record all entities that are found, and emit removals for any that disappeared in this run.
"""

for wu in stream:
urn = wu.get_urn()

entity_type = entity_type_fn(wu)
if entity_type is not None:
stale_entity_removal_handler.add_entity_to_state(entity_type, urn)

yield wu

# Clean up stale entities.
yield from stale_entity_removal_handler.gen_removed_entity_workunits()
Loading

0 comments on commit ef89764

Please sign in to comment.