# feat(ingest): simplify more stateful ingestion state #6762
Changes from all commits:

- 9f6f598
- a9e20fa
- 22be745
- 1235e09
- 0565933
- e4e252a
In the `entity_removal_state` module:

```diff
@@ -1,4 +1,4 @@
-from typing import Dict, Iterable, List, Type
+from typing import Any, Dict, Iterable, List, Type

 import pydantic

```
```diff
@@ -7,19 +7,32 @@
     StaleEntityCheckpointStateBase,
 )
 from datahub.utilities.checkpoint_state_util import CheckpointStateUtil
+from datahub.utilities.dedup_list import deduplicate_list
 from datahub.utilities.urns.urn import guess_entity_type


 class GenericCheckpointState(StaleEntityCheckpointStateBase["GenericCheckpointState"]):
     urns: List[str] = pydantic.Field(default_factory=list)

+    # We store a bit of extra internal-only state so that we can keep the urns list deduplicated.
+    # However, we still want `urns` to be a list so that it maintains its order.
+    # We can't use OrderedSet here because pydantic doesn't recognize it and
+    # it isn't JSON serializable.
+    _urns_set: set = pydantic.PrivateAttr(default_factory=set)
+
+    def __init__(self, **data: Any):  # type: ignore
+        super().__init__(**data)
+        self.urns = deduplicate_list(self.urns)
+        self._urns_set = set(self.urns)
+
     @classmethod
     def get_supported_types(cls) -> List[str]:
         return ["*"]

     def add_checkpoint_urn(self, type: str, urn: str) -> None:
-        # TODO: dedup
-        self.urns.append(urn)
+        if urn not in self._urns_set:
+            self.urns.append(urn)
+            self._urns_set.add(urn)

     def get_urns_not_in(
         self, type: str, other_checkpoint_state: "GenericCheckpointState"
```
```diff
@@ -29,6 +42,8 @@ def get_urns_not_in(
         # To maintain backwards compatibility, we provide this filtering mechanism.
         if type == "*":
             yield from diff
+        elif type == "topic":
+            yield from (urn for urn in diff if guess_entity_type(urn) == "dataset")
         else:
             yield from (urn for urn in diff if guess_entity_type(urn) == type)
```

> **Reviewer** (on the `elif type == "topic"` branch): Wow, these exceptions are scary.
>
> **Author:** They're only for compatibility with our existing tests; we don't need them in our actual code.
```diff
@@ -51,6 +66,7 @@ def pydantic_state_migrator(mapping: Dict[str, str]) -> classmethod:
         "dataset",
         "container",
         "assertion",
+        "topic",
     ]
     assert set(mapping.values()) <= set(SUPPORTED_TYPES)

@@ -64,6 +80,11 @@ def _validate_field_rename(cls: Type, values: dict) -> dict:
             value = values.pop(old_field)
             if mapped_type == "dataset":
                 values["urns"] += CheckpointStateUtil.get_dataset_urns_not_in(value, [])
+            elif mapped_type == "topic":
+                values["urns"] += [
+                    CheckpointStateUtil.get_urn_from_encoded_topic(encoded_urn)
+                    for encoded_urn in value
+                ]
             elif mapped_type == "container":
                 values["urns"] += [make_container_urn(guid) for guid in value]
             elif mapped_type == "assertion":
```
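For orientation, the overall shape of this migrator can be sketched in isolation. This is a simplified, hedged reconstruction: it skips the per-type decoding shown in the hunk above and splices legacy values straight into `urns`, and `state_migrator`, `MyState`, and `old_urns` are illustrative names, not DataHub's.

```python
from typing import Dict, List, Type

import pydantic


def state_migrator(mapping: Dict[str, str]) -> classmethod:
    # mapping: legacy field name -> entity type. The real migrator decodes
    # each legacy value per type; here we treat them as ready-made urns.
    def _validate_field_rename(cls: Type, values: dict) -> dict:
        for old_field in mapping:
            if old_field in values:
                values.setdefault("urns", [])
                values["urns"] += values.pop(old_field)
        return values

    # pre=True runs before field validation, so legacy payloads are
    # rewritten before pydantic ever sees the unknown old field names.
    return pydantic.root_validator(pre=True, allow_reuse=True)(_validate_field_rename)


class MyState(pydantic.BaseModel):
    urns: List[str] = pydantic.Field(default_factory=list)

    _migration = state_migrator({"old_urns": "dataset"})


# Old serialized payloads still parse into the new field:
assert MyState.parse_obj({"old_urns": ["urn:li:dataset:(x,y,PROD)"]}).urns == [
    "urn:li:dataset:(x,y,PROD)"
]
```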
This file was deleted.
In the Kafka checkpoint state module:

```diff
@@ -1,66 +1,17 @@
-from typing import Iterable, List
-
-import pydantic
-
-from datahub.emitter.mce_builder import dataset_urn_to_key, make_dataset_urn
-from datahub.ingestion.source.state.stale_entity_removal_handler import (
-    StaleEntityCheckpointStateBase,
+from datahub.ingestion.source.state.entity_removal_state import (
+    GenericCheckpointState,
+    pydantic_state_migrator,
 )


-class KafkaCheckpointState(StaleEntityCheckpointStateBase["KafkaCheckpointState"]):
+class KafkaCheckpointState(GenericCheckpointState):
     """
-    This Class represents the checkpoint state for Kafka based sources.
+    This class represents the checkpoint state for Kafka based sources.
     Stores all the topics being ingested and it is used to remove any stale entities.
     """

-    encoded_topic_urns: List[str] = pydantic.Field(default_factory=list)
-
-    @classmethod
-    def get_supported_types(cls) -> List[str]:
-        return ["topic"]
-
-    @staticmethod
-    def _get_separator() -> str:
-        # Unique small string not allowed in URNs.
-        return "||"
-
-    @staticmethod
-    def _get_lightweight_repr(dataset_urn: str) -> str:
-        """Reduces the amount of text in the URNs for smaller state footprint."""
-        SEP = KafkaCheckpointState._get_separator()
-        key = dataset_urn_to_key(dataset_urn)
-        assert key is not None
-        return f"{key.platform}{SEP}{key.name}{SEP}{key.origin}"
-
-    @staticmethod
-    def _get_urns_not_in(
-        encoded_urns_1: List[str], encoded_urns_2: List[str]
-    ) -> Iterable[str]:
-        difference = set(encoded_urns_1) - set(encoded_urns_2)
-        for encoded_urn in difference:
-            platform, name, env = encoded_urn.split(
-                KafkaCheckpointState._get_separator()
-            )
-            yield make_dataset_urn(platform, name, env)
-
-    def add_checkpoint_urn(self, type: str, urn: str) -> None:
-        assert type in self.get_supported_types()
-        if type == "topic":
-            self.encoded_topic_urns.append(self._get_lightweight_repr(urn))
-
-    def get_urns_not_in(
-        self, type: str, other_checkpoint_state: "KafkaCheckpointState"
-    ) -> Iterable[str]:
-        assert type in self.get_supported_types()
-        if type == "topic":
-            yield from self._get_urns_not_in(
-                self.encoded_topic_urns, other_checkpoint_state.encoded_topic_urns
-            )
-
-    def get_percent_entities_changed(
-        self, old_checkpoint_state: "KafkaCheckpointState"
-    ) -> float:
-        return StaleEntityCheckpointStateBase.compute_percent_entities_changed(
-            [(self.encoded_topic_urns, old_checkpoint_state.encoded_topic_urns)]
-        )
+    _migration = pydantic_state_migrator(
+        {
+            "encoded_topic_urns": "topic",
+        }
+    )
```
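Assuming `CheckpointStateUtil.get_urn_from_encoded_topic` mirrors the decode logic deleted above (splitting on the `||` separator and rebuilding a dataset urn via `make_dataset_urn`), the migrator lets old serialized payloads parse into the new generic shape. The payload and expected urn below are illustrative:

```python
# Legacy payload in the old lightweight `platform||name||env` encoding.
old_payload = {"encoded_topic_urns": ["kafka||my_topic||PROD"]}

state = KafkaCheckpointState.parse_obj(old_payload)
# The migration decodes each entry back into a full dataset urn.
assert state.urns == ["urn:li:dataset:(urn:li:dataPlatform:kafka,my_topic,PROD)"]
```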
In the LDAP checkpoint state module:

```diff
@@ -1,39 +1,8 @@
-from typing import Iterable, List
-
-import pydantic
-
-from datahub.ingestion.source.state.stale_entity_removal_handler import (
-    StaleEntityCheckpointStateBase,
-)
-from datahub.utilities.urns.urn import guess_entity_type
+from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState


-class LdapCheckpointState(StaleEntityCheckpointStateBase["LdapCheckpointState"]):
+class LdapCheckpointState(GenericCheckpointState):
     """
     Base class for representing the checkpoint state for all LDAP based sources.
     Stores all corpuser and corpGroup and being ingested and is used to remove any stale entities.
     """
-
-    urns: List[str] = pydantic.Field(default_factory=list)
-
-    @classmethod
-    def get_supported_types(cls) -> List[str]:
-        return ["corpuser", "corpGroup"]
-
-    def add_checkpoint_urn(self, type: str, urn: str) -> None:
-        assert type in self.get_supported_types()
-        self.urns.append(urn)
-
-    def get_urns_not_in(
-        self, type: str, other_checkpoint_state: "LdapCheckpointState"
-    ) -> Iterable[str]:
-        assert type in self.get_supported_types()
-        diff = set(self.urns) - set(other_checkpoint_state.urns)
-        yield from (urn for urn in diff if guess_entity_type(urn) == type)
-
-    def get_percent_entities_changed(
-        self, old_checkpoint_state: "LdapCheckpointState"
-    ) -> float:
-        return StaleEntityCheckpointStateBase.compute_percent_entities_changed(
-            [(self.urns, old_checkpoint_state.urns)]
-        )
```
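With the per-source boilerplate gone, the stale-entity computation comes entirely from the inherited `GenericCheckpointState`: `get_urns_not_in` takes a set difference and filters it by entity type via `guess_entity_type`. A hypothetical before/after comparison (the urns are made up):

```python
# State from the previous run vs. the current run.
previous = LdapCheckpointState(urns=["urn:li:corpuser:alice", "urn:li:corpGroup:eng"])
current = LdapCheckpointState(urns=["urn:li:corpGroup:eng"])

# Urns present before but missing now, filtered to corpuser entities only.
stale_users = list(previous.get_urns_not_in("corpuser", current))
assert stale_users == ["urn:li:corpuser:alice"]
```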
This file was deleted.
> **Reviewer** (on the dedup logic in `GenericCheckpointState`): Do we need the set if we deduplicate the list in the beginning and we only add new items to the list?
>
> **Author:** We're not guaranteed that urns are only added once; for example, I think the dbt source sometimes adds urns to the state multiple times. Really, the deduplication on initialization isn't necessary, since our state payloads already shouldn't contain any duplicates. However, I still added it because it's cheap and improves code clarity.
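A small illustration of the behavior the reply describes: repeated adds of the same urn, as a source like dbt might produce, collapse into a single entry (the urn below is made up).

```python
state = GenericCheckpointState()
# A source may report the same urn more than once; only the first add lands.
state.add_checkpoint_urn("dataset", "urn:li:dataset:(urn:li:dataPlatform:dbt,db.tbl,PROD)")
state.add_checkpoint_urn("dataset", "urn:li:dataset:(urn:li:dataPlatform:dbt,db.tbl,PROD)")
assert state.urns == ["urn:li:dataset:(urn:li:dataPlatform:dbt,db.tbl,PROD)"]
```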