Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): simplify more stateful ingestion state #6762

Merged
merged 6 commits into from
Dec 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
IcebergSourceReport,
)
from datahub.ingestion.source.iceberg.iceberg_profiler import IcebergProfiler
from datahub.ingestion.source.state.iceberg_state import IcebergCheckpointState
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)
Expand Down Expand Up @@ -123,7 +123,7 @@ def __init__(self, config: IcebergSourceConfig, ctx: PipelineContext) -> None:
self.stale_entity_removal_handler = StaleEntityRemovalHandler(
source=self,
config=self.config,
state_type_class=IcebergCheckpointState,
state_type_class=GenericCheckpointState,
pipeline_name=self.ctx.pipeline_name,
run_id=self.ctx.run_id,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,3 @@ class DbtCheckpointState(GenericCheckpointState):
"encoded_assertion_urns": "assertion",
}
)

def prepare_for_commit(self) -> None:
self.urns = list(set(self.urns))
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Iterable, List, Type
from typing import Any, Dict, Iterable, List, Type

import pydantic

Expand All @@ -7,19 +7,32 @@
StaleEntityCheckpointStateBase,
)
from datahub.utilities.checkpoint_state_util import CheckpointStateUtil
from datahub.utilities.dedup_list import deduplicate_list
from datahub.utilities.urns.urn import guess_entity_type


class GenericCheckpointState(StaleEntityCheckpointStateBase["GenericCheckpointState"]):
urns: List[str] = pydantic.Field(default_factory=list)

# We store a bit of extra internal-only state so that we can keep the urns list deduplicated.
# However, we still want `urns` to be a list so that it maintains its order.
# We can't used OrderedSet here because pydantic doesn't recognize it and
# it isn't JSON serializable.
_urns_set: set = pydantic.PrivateAttr(default_factory=set)

def __init__(self, **data: Any):  # type: ignore
    """Initialize state, de-duplicating any urns loaded from a prior checkpoint.

    The persisted ``urns`` list is deduplicated (order-preserving, via
    ``deduplicate_list``) and mirrored into the private ``_urns_set`` so that
    later membership checks are O(1) while ``urns`` keeps insertion order.
    """
    super().__init__(**data)
    # Dedup on load should normally be a no-op (persisted state shouldn't
    # contain duplicates), but it is cheap and makes the invariant explicit.
    self.urns = deduplicate_list(self.urns)
    self._urns_set = set(self.urns)

@classmethod
def get_supported_types(cls) -> List[str]:
    """Return the entity types this state handles; "*" means all types."""
    return ["*"]

def add_checkpoint_urn(self, type: str, urn: str) -> None:
# TODO: dedup
self.urns.append(urn)
if urn not in self._urns_set:
self.urns.append(urn)
self._urns_set.add(urn)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the set if we deduplicate the list in the beginning and we only add new items to the list?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're not guaranteed that items are added to the list only once — for example, I think the dbt source sometimes adds the same urn to the state multiple times.

Really the deduplication on initialization isn't necessary, since our state payloads already shouldn't have any duplicates. However, I still added it because it's cheap and improves code clarity.


def get_urns_not_in(
self, type: str, other_checkpoint_state: "GenericCheckpointState"
Expand All @@ -29,6 +42,8 @@ def get_urns_not_in(
# To maintain backwards compatibility, we provide this filtering mechanism.
if type == "*":
yield from diff
elif type == "topic":
yield from (urn for urn in diff if guess_entity_type(urn) == "dataset")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, these exceptions are scary

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

they're only for compatibility with our existing tests - we don't need them in our actual code

else:
yield from (urn for urn in diff if guess_entity_type(urn) == type)

Expand All @@ -51,6 +66,7 @@ def pydantic_state_migrator(mapping: Dict[str, str]) -> classmethod:
"dataset",
"container",
"assertion",
"topic",
]
assert set(mapping.values()) <= set(SUPPORTED_TYPES)

Expand All @@ -64,6 +80,11 @@ def _validate_field_rename(cls: Type, values: dict) -> dict:
value = values.pop(old_field)
if mapped_type == "dataset":
values["urns"] += CheckpointStateUtil.get_dataset_urns_not_in(value, [])
elif mapped_type == "topic":
values["urns"] += [
CheckpointStateUtil.get_urn_from_encoded_topic(encoded_urn)
for encoded_urn in value
]
elif mapped_type == "container":
values["urns"] += [make_container_urn(guid) for guid in value]
elif mapped_type == "assertion":
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,66 +1,17 @@
from typing import Iterable, List

import pydantic

from datahub.emitter.mce_builder import dataset_urn_to_key, make_dataset_urn
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityCheckpointStateBase,
from datahub.ingestion.source.state.entity_removal_state import (
GenericCheckpointState,
pydantic_state_migrator,
)


class KafkaCheckpointState(StaleEntityCheckpointStateBase["KafkaCheckpointState"]):
class KafkaCheckpointState(GenericCheckpointState):
"""
This Class represents the checkpoint state for Kafka based sources.
This class represents the checkpoint state for Kafka based sources.
Stores all the topics being ingested and it is used to remove any stale entities.
"""

encoded_topic_urns: List[str] = pydantic.Field(default_factory=list)

@classmethod
def get_supported_types(cls) -> List[str]:
    """Return the entity types tracked by this checkpoint state (topics only)."""
    return ["topic"]

@staticmethod
def _get_separator() -> str:
    """Return the field separator used in the compact encoded-urn representation."""
    # Unique small string not allowed in URNs.
    return "||"

@staticmethod
def _get_lightweight_repr(dataset_urn: str) -> str:
    """Encode a dataset urn as platform||name||origin to shrink the state footprint."""
    key = dataset_urn_to_key(dataset_urn)
    assert key is not None
    sep = KafkaCheckpointState._get_separator()
    return sep.join((key.platform, key.name, key.origin))

@staticmethod
def _get_urns_not_in(
    encoded_urns_1: List[str], encoded_urns_2: List[str]
) -> Iterable[str]:
    """Yield full dataset urns for encoded entries in the first list but not the second.

    Note: set difference discards ordering, so callers must not rely on the
    order of the yielded urns.
    """
    difference = set(encoded_urns_1) - set(encoded_urns_2)
    for encoded_urn in difference:
        # Reverse the platform||name||env encoding back into a dataset urn.
        platform, name, env = encoded_urn.split(
            KafkaCheckpointState._get_separator()
        )
        yield make_dataset_urn(platform, name, env)

def add_checkpoint_urn(self, type: str, urn: str) -> None:
    """Record an ingested entity urn in the checkpoint state.

    Only the "topic" type is supported; the urn is stored in a compact
    encoded form rather than verbatim to keep the state payload small.
    """
    assert type in self.get_supported_types()
    if type == "topic":
        self.encoded_topic_urns.append(self._get_lightweight_repr(urn))

def get_urns_not_in(
    self, type: str, other_checkpoint_state: "KafkaCheckpointState"
) -> Iterable[str]:
    """Yield urns of the given type present in this state but missing from the other.

    Used by stale-entity removal to find entities that disappeared between runs.
    """
    assert type in self.get_supported_types()
    if type == "topic":
        yield from self._get_urns_not_in(
            self.encoded_topic_urns, other_checkpoint_state.encoded_topic_urns
        )

def get_percent_entities_changed(
    self, old_checkpoint_state: "KafkaCheckpointState"
) -> float:
    """Return the percentage of topic entities changed relative to the old state."""
    return StaleEntityCheckpointStateBase.compute_percent_entities_changed(
        [(self.encoded_topic_urns, old_checkpoint_state.encoded_topic_urns)]
    )
_migration = pydantic_state_migrator(
{
"encoded_topic_urns": "topic",
}
)
Original file line number Diff line number Diff line change
@@ -1,39 +1,8 @@
from typing import Iterable, List
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState

import pydantic

from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityCheckpointStateBase,
)
from datahub.utilities.urns.urn import guess_entity_type


class LdapCheckpointState(StaleEntityCheckpointStateBase["LdapCheckpointState"]):
class LdapCheckpointState(GenericCheckpointState):
"""
Base class for representing the checkpoint state for all LDAP based sources.
Stores all corpuser and corpGroup and being ingested and is used to remove any stale entities.
"""

urns: List[str] = pydantic.Field(default_factory=list)

@classmethod
def get_supported_types(cls) -> List[str]:
    """Return the entity types tracked for LDAP sources: users and groups."""
    return ["corpuser", "corpGroup"]

def add_checkpoint_urn(self, type: str, urn: str) -> None:
    """Record an ingested entity urn; `type` must be corpuser or corpGroup."""
    assert type in self.get_supported_types()
    self.urns.append(urn)

def get_urns_not_in(
    self, type: str, other_checkpoint_state: "LdapCheckpointState"
) -> Iterable[str]:
    """Yield urns of the given type present in this state but not in the other."""
    assert type in self.get_supported_types()
    diff = set(self.urns) - set(other_checkpoint_state.urns)
    # The urns list mixes corpuser and corpGroup entries; filter by urn type.
    yield from (urn for urn in diff if guess_entity_type(urn) == type)

def get_percent_entities_changed(
    self, old_checkpoint_state: "LdapCheckpointState"
) -> float:
    """Return the percentage of entities changed relative to the old state."""
    return StaleEntityCheckpointStateBase.compute_percent_entities_changed(
        [(self.urns, old_checkpoint_state.urns)]
    )

This file was deleted.

4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
)
from datahub.ingestion.api.source import Source
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
Expand All @@ -46,7 +47,6 @@
StatefulIngestionConfigBase,
StatefulIngestionSourceBase,
)
from datahub.ingestion.source.state.tableau_state import TableauCheckpointState
from datahub.ingestion.source.tableau_common import (
FIELD_TYPE_MAPPING,
MetadataQueryException,
Expand Down Expand Up @@ -316,7 +316,7 @@ def __init__(
self.stale_entity_removal_handler = StaleEntityRemovalHandler(
source=self,
config=self.config,
state_type_class=TableauCheckpointState,
state_type_class=GenericCheckpointState,
pipeline_name=self.ctx.pipeline_name,
run_id=self.ctx.run_id,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from typing import Iterable, List, Set

from datahub.emitter.mce_builder import dataset_key_to_urn, dataset_urn_to_key
from datahub.emitter.mce_builder import (
dataset_key_to_urn,
dataset_urn_to_key,
make_dataset_urn,
)
from datahub.metadata.schema_classes import DatasetKeyClass


Expand Down Expand Up @@ -39,3 +43,8 @@ def get_dataset_urns_not_in(
yield dataset_key_to_urn(
DatasetKeyClass(platform=platform, name=name, origin=env)
)

@staticmethod
def get_urn_from_encoded_topic(encoded_urn: str) -> str:
    """Decode a compact separator-joined topic entry back into a full dataset urn."""
    sep = CheckpointStateUtil.get_separator()
    platform, name, env = encoded_urn.split(sep)
    return make_dataset_urn(platform, name, env)
8 changes: 4 additions & 4 deletions metadata-ingestion/tests/integration/iceberg/test_iceberg.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.iceberg.iceberg import IcebergSource
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.iceberg_state import IcebergCheckpointState
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from tests.test_helpers import mce_helpers
from tests.test_helpers.state_helpers import (
run_and_get_pipeline,
Expand All @@ -24,7 +24,7 @@

def get_current_checkpoint_from_pipeline(
pipeline: Pipeline,
) -> Optional[Checkpoint]:
) -> Optional[Checkpoint[GenericCheckpointState]]:
iceberg_source = cast(IcebergSource, pipeline.source)
return iceberg_source.get_current_checkpoint(
iceberg_source.stale_entity_removal_handler.job_id
Expand Down Expand Up @@ -153,8 +153,8 @@ def test_iceberg_stateful_ingest(pytestconfig, tmp_path, mock_time, mock_datahub

# Perform all assertions on the states. The deleted table should not be
# part of the second state
state1 = cast(IcebergCheckpointState, checkpoint1.state)
state2 = cast(IcebergCheckpointState, checkpoint2.state)
state1 = checkpoint1.state
state2 = checkpoint2.state
difference_urns = list(
state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from datahub.configuration.source_common import DEFAULT_ENV
from datahub.ingestion.run.pipeline import Pipeline, PipelineContext
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.tableau_state import TableauCheckpointState
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.ingestion.source.tableau import TableauSource
from datahub.ingestion.source.tableau_common import (
TableauLineageOverrides,
Expand Down Expand Up @@ -145,7 +145,7 @@ def tableau_ingest_common(

def get_current_checkpoint_from_pipeline(
pipeline: Pipeline,
) -> Optional[Checkpoint]:
) -> Optional[Checkpoint[GenericCheckpointState]]:
tableau_source = cast(TableauSource, pipeline.source)
return tableau_source.get_current_checkpoint(
tableau_source.stale_entity_removal_handler.job_id
Expand Down Expand Up @@ -320,8 +320,8 @@ def test_tableau_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph)

# Perform all assertions on the states. The deleted table should not be
# part of the second state
state1 = cast(TableauCheckpointState, checkpoint1.state)
state2 = cast(TableauCheckpointState, checkpoint2.state)
state1 = checkpoint1.state
state2 = checkpoint2.state

difference_dataset_urns = list(
state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2)
Expand Down
Loading