Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ct 1716 cleanup logging events #6561

Merged
merged 8 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20230110-114233.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Combine some logging events
time: 2023-01-10T11:42:33.580756-05:00
custom:
Author: gshank
Issue: 1716 1717 1719
77 changes: 41 additions & 36 deletions core/dbt/adapters/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,7 @@
TruncatedModelNameCausedCollision,
)
from dbt.events.functions import fire_event, fire_event_if
from dbt.events.types import (
AddLink,
AddRelation,
DropCascade,
DropMissingRelation,
DropRelation,
DumpAfterAddGraph,
DumpAfterRenameSchema,
DumpBeforeAddGraph,
DumpBeforeRenameSchema,
RenameSchema,
TemporaryRelation,
UncachedRelation,
UpdateReference,
)
from dbt.events.types import CacheAction, CacheDumpGraph
import dbt.flags as flags
from dbt.utils import lowercase

Expand Down Expand Up @@ -281,7 +267,7 @@ def _add_link(self, referenced_key, dependent_key):

referenced.add_reference(dependent)

# TODO: Is this dead code? I can't seem to find it grepping the codebase.
# This is called in plugins/postgres/dbt/adapters/postgres/impl.py
def add_link(self, referenced, dependent):
"""Add a link between two relations to the database. If either relation
does not exist, it will be added as an "external" relation.
Expand All @@ -303,9 +289,9 @@ def add_link(self, referenced, dependent):
# referring to a table outside our control. There's no need to make
# a link - we will never drop the referenced relation during a run.
fire_event(
UncachedRelation(
dep_key=_make_msg_from_ref_key(dep_key),
CacheAction(
ref_key=_make_msg_from_ref_key(ref_key),
ref_key_2=_make_msg_from_ref_key(dep_key),
)
)
return
Expand All @@ -318,8 +304,10 @@ def add_link(self, referenced, dependent):
dependent = dependent.replace(type=referenced.External)
self.add(dependent)
fire_event(
AddLink(
dep_key=_make_msg_from_ref_key(dep_key), ref_key=_make_msg_from_ref_key(ref_key)
CacheAction(
action="add_link",
ref_key=_make_msg_from_ref_key(dep_key),
ref_key_2=_make_msg_from_ref_key(ref_key),
)
)
with self.lock:
Expand All @@ -332,12 +320,18 @@ def add(self, relation):
:param BaseRelation relation: The underlying relation.
"""
cached = _CachedRelation(relation)
fire_event(AddRelation(relation=_make_ref_key_msg(cached)))
fire_event_if(flags.LOG_CACHE_EVENTS, lambda: DumpBeforeAddGraph(dump=self.dump_graph()))
fire_event_if(
flags.LOG_CACHE_EVENTS,
lambda: CacheDumpGraph(before_after="before", action="adding", dump=self.dump_graph()),
)
fire_event(CacheAction(action="add_relation", ref_key=_make_ref_key_msg(cached)))

with self.lock:
self._setdefault(cached)
fire_event_if(flags.LOG_CACHE_EVENTS, lambda: DumpAfterAddGraph(dump=self.dump_graph()))
fire_event_if(
flags.LOG_CACHE_EVENTS,
lambda: CacheDumpGraph(before_after="after", action="adding", dump=self.dump_graph()),
)

def _remove_refs(self, keys):
"""Removes all references to all entries in keys. This does not
Expand Down Expand Up @@ -365,16 +359,19 @@ def drop(self, relation):
"""
dropped_key = _make_ref_key(relation)
dropped_key_msg = _make_ref_key_msg(relation)
fire_event(DropRelation(dropped=dropped_key_msg))
fire_event(CacheAction(action="drop_relation", ref_key=dropped_key_msg))
with self.lock:
if dropped_key not in self.relations:
fire_event(DropMissingRelation(relation=dropped_key_msg))
fire_event(CacheAction(action="drop_missing_relation", ref_key=dropped_key_msg))
return
consequences = self.relations[dropped_key].collect_consequences()
# convert from a list of _ReferenceKeys to a list of ReferenceKeyMsgs
consequence_msgs = [_make_msg_from_ref_key(key) for key in consequences]

fire_event(DropCascade(dropped=dropped_key_msg, consequences=consequence_msgs))
fire_event(
CacheAction(
action="drop_cascade", ref_key=dropped_key_msg, ref_list=consequence_msgs
)
)
self._remove_refs(consequences)

def _rename_relation(self, old_key, new_relation):
Expand All @@ -397,12 +394,14 @@ def _rename_relation(self, old_key, new_relation):
for cached in self.relations.values():
if cached.is_referenced_by(old_key):
fire_event(
UpdateReference(
old_key=_make_ref_key_msg(old_key),
new_key=_make_ref_key_msg(new_key),
cached_key=_make_ref_key_msg(cached.key()),
CacheAction(
action="update_reference",
ref_key=_make_ref_key_msg(old_key),
ref_key_2=_make_ref_key_msg(new_key),
ref_key_3=_make_ref_key_msg(cached.key()),
)
)

cached.rename_key(old_key, new_key)

self.relations[new_key] = relation
Expand Down Expand Up @@ -430,7 +429,9 @@ def _check_rename_constraints(self, old_key, new_key):
raise TruncatedModelNameCausedCollision(new_key, self.relations)

if old_key not in self.relations:
fire_event(TemporaryRelation(key=_make_msg_from_ref_key(old_key)))
fire_event(
CacheAction(action="temporary_relation", ref_key=_make_msg_from_ref_key(old_key))
)
return False
return True

Expand All @@ -449,13 +450,16 @@ def rename(self, old, new):
old_key = _make_ref_key(old)
new_key = _make_ref_key(new)
fire_event(
RenameSchema(
old_key=_make_msg_from_ref_key(old_key), new_key=_make_msg_from_ref_key(new)
CacheAction(
action="rename_relation",
ref_key=_make_msg_from_ref_key(old_key),
ref_key_2=_make_msg_from_ref_key(new),
)
)

fire_event_if(
flags.LOG_CACHE_EVENTS, lambda: DumpBeforeRenameSchema(dump=self.dump_graph())
flags.LOG_CACHE_EVENTS,
lambda: CacheDumpGraph(before_after="before", action="rename", dump=self.dump_graph()),
)

with self.lock:
Expand All @@ -465,7 +469,8 @@ def rename(self, old, new):
self._setdefault(_CachedRelation(new))

fire_event_if(
flags.LOG_CACHE_EVENTS, lambda: DumpAfterRenameSchema(dump=self.dump_graph())
flags.LOG_CACHE_EVENTS,
lambda: CacheDumpGraph(before_after="after", action="rename", dump=self.dump_graph()),
)

def get_relations(self, database: Optional[str], schema: Optional[str]) -> List[Any]:
Expand Down
Loading