Skip to content

Commit

Permalink
Fix percent change computation in stale_entity_removal & default value
Browse files Browse the repository at this point in the history
of threshold
  • Loading branch information
rslanka committed Oct 4, 2022
1 parent 3c0f63c commit 5c43c41
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class StatefulStaleMetadataRemovalConfig(StatefulIngestionConfig):
description=f"Soft-deletes the entities of type {', '.join(_entity_types)} in the last successful run but missing in the current run with stateful_ingestion enabled.",
)
fail_safe_threshold: float = pydantic.Field(
default=95.0,
default=20.0,
description="Prevents large amount of soft deletes & the state from committing from accidental changes to the source configuration if the relative change percent in entities compared to the previous state is above the 'fail_safe_threshold'.",
le=100.0, # mypy does not work with pydantic.confloat. This is the recommended work-around.
ge=0.0,
Expand Down Expand Up @@ -94,7 +94,7 @@ def get_percent_entities_changed(self, old_checkpoint_state: Derived) -> float:
"""
Returns the percentage of entities that have changed relative to `old_checkpoint_state`.
:param old_checkpoint_state: the old checkpoint state to compute the relative change percent against.
:return: (|intersection(self, old_checkpoint_state)| * 100.0 / |old_checkpoint_state|)
:return: (1-|intersection(self, old_checkpoint_state)| / |old_checkpoint_state|) * 100.0
"""
pass

Expand All @@ -115,7 +115,7 @@ def compute_percent_entities_changed(
overlap_count_all += overlap_count
old_count_all += old_count
if old_count_all:
return overlap_count * 100.0 / old_count_all
return (1 - overlap_count / old_count_all) * 100.0
return 0.0

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from typing import Dict, List, Tuple

import pytest

from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityCheckpointStateBase,
)

# Input shape of one scenario: a list of entity-list pairs. Judging by the case
# names below (e.g. "..._empty_new" has an empty first list), each pair appears
# to be ordered (new_entities, old_entities), despite the alias name.
OldNewEntLists = List[Tuple[List[str], List[str]]]

# Maps a test-case id to (input pairs, expected percent change).
old_new_ent_tests: Dict[str, Tuple[OldNewEntLists, float]] = {
    # --- Scenarios expected to report 0% change ---
    "no_change_empty_old_and_new": (
        [([], [])],
        0.0,
    ),
    "no_change_empty_old_and_non_empty_new": (
        [(["a"], [])],
        0.0,
    ),
    "no_change_non_empty_old_new_equals_old": (
        [(["a", "b", "c"], ["c", "b", "a"])],
        0.0,
    ),
    "no_change_non_empty_old_new_superset_old": (
        [(["a", "b", "c", "d"], ["c", "b", "a"])],
        0.0,
    ),
    # --- Scenarios expected to report a non-zero change ---
    "change_25_percent_delta": (
        [(["a", "b", "c"], ["d", "c", "b", "a"])],
        25.0,
    ),
    "change_50_percent_delta": (
        [(["b", "a"], ["a", "b", "c", "d"])],
        50.0,
    ),
    "change_75_percent_delta": (
        [(["a"], ["a", "b", "c", "d"])],
        75.0,
    ),
    "change_100_percent_delta_empty_new": (
        [([], ["a", "b", "c", "d"])],
        100.0,
    ),
    "change_100_percent_delta_non_empty_new": (
        [(["e"], ["a", "b", "c", "d"])],
        100.0,
    ),
}


@pytest.mark.parametrize(
    "new_old_entity_list, expected_percent_change",
    old_new_ent_tests.values(),
    ids=old_new_ent_tests.keys(),
)
def test_change_percent(
    new_old_entity_list: OldNewEntLists, expected_percent_change: float
) -> None:
    """Check the computed percent change for one parametrized scenario.

    :param new_old_entity_list: the entity-list pairs handed unchanged to
        `StaleEntityCheckpointStateBase.compute_percent_entities_changed`.
    :param expected_percent_change: the percent change expected for that input.
    """
    actual_percent_change = (
        StaleEntityCheckpointStateBase.compute_percent_entities_changed(
            new_old_entity_list
        )
    )
    # The value under test is a float, so compare with a tolerance: exact `==`
    # would fail spuriously for ratios that are not exactly representable
    # (e.g. thirds) if such cases are added later.
    assert actual_percent_change == pytest.approx(expected_percent_change)

0 comments on commit 5c43c41

Please sign in to comment.