Skip to content

Commit

Permalink
cycle-safe root_stale_causes (#19298)
Browse files Browse the repository at this point in the history
similar to #18516, update the root cause calculation to be safe in the
event a cycle happens

## How I Tested These Changes

was able to verify the fix against a complex standing deployment, have
not been able to distill to minimal repro
  • Loading branch information
alangenfeld authored Jan 19, 2024
1 parent 14438e8 commit aa1289c
Showing 1 changed file with 22 additions and 20 deletions.
42 changes: 22 additions & 20 deletions python_modules/dagster/dagster/_core/definitions/data_version.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
import functools
from collections import OrderedDict
from enum import Enum
from hashlib import sha256
from typing import (
TYPE_CHECKING,
Callable,
Dict,
Iterator,
List,
Mapping,
NamedTuple,
Optional,
Sequence,
Tuple,
Union,
)

Expand Down Expand Up @@ -347,6 +345,12 @@ def sort_key(self) -> str:
self._sort_key = f"{self.key}/{self.dependency}" if self.dependency else str(self.key)
return self._sort_key

@property
def dedupe_key(self) -> int:
# subset of properties that are safe to hash
safe_tup = (self.key, self.category, self.reason, self.dependency)
return hash(safe_tup)


# If a partition has greater than this number of dependencies, we don't check
# this edge for updated data or propagate other stale causes through this edge.
Expand Down Expand Up @@ -581,23 +585,21 @@ def _get_stale_causes_materialized(self, key: "AssetKeyPartitionKey") -> Iterato

@cached_method
def _get_stale_root_causes(self, key: "AssetKeyPartitionKey") -> Sequence[StaleCause]:
causes = self._get_stale_causes(key=key)
root_pairs = sorted([pair for cause in causes for pair in self._gather_leaves(cause)])
# After sorting the pairs, we can drop the level and de-dup using an
# ordered dict as an ordered set. This will give us unique root causes,
# sorted by level.
roots: Dict[StaleCause, None] = OrderedDict()
for root_cause in [leaf_cause for _, leaf_cause in root_pairs]:
roots[root_cause] = None
return list(roots.keys())

# The leaves of the cause tree for an asset are the root causes of its staleness.
def _gather_leaves(self, cause: StaleCause, level: int = 0) -> Iterator[Tuple[int, StaleCause]]:
if cause.children is None:
yield (level, cause)
else:
for child in cause.children:
yield from self._gather_leaves(child, level=level + 1)
candidates = self._get_stale_causes(key=key)
visited = set()
root_causes = []
while candidates:
next_candidates: List[StaleCause] = []
for cause in candidates:
if cause.dedupe_key not in visited:
if cause.children is None:
root_causes.append(cause)
else:
next_candidates.extend(cause.children)
visited.add(cause.dedupe_key)

candidates = next_candidates
return root_causes

@property
def asset_graph(self) -> "AssetGraph":
Expand Down

0 comments on commit aa1289c

Please sign in to comment.