Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dbt build performance fix #6322

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 46 additions & 10 deletions core/dbt/graph/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,26 @@ def select_successors(self, selected: Set[UniqueId]) -> Set[UniqueId]:
successors.update(self.graph.successors(node))
return successors

def trim_graph(self, trimgraph: nx.DiGraph, lookup_node, all_nodes: set, selected_nodes: set):
"""introduced to boost performance in selecting nodes for dbt build,
this process untangles unnecessary nodes from lookup node without effecting selected nodes
"""
ancestors_set = set(nx.ancestors(trimgraph,lookup_node))
predecessors_set = set(nx.predecessor(trimgraph,lookup_node)).difference({lookup_node}) # make sure to remove self node

upflow_common_list = selected_nodes.intersection(ancestors_set)
downflow_common_list = selected_nodes.intersection(predecessors_set)

if not upflow_common_list:
trimgraph.remove_nodes_from(ancestors_set)
if not downflow_common_list:
trimgraph.remove_nodes_from(predecessors_set)

touched_nodes = ancestors_set.union(predecessors_set).union(selected_nodes)
untouched_nodes = all_nodes.difference(touched_nodes)
if untouched_nodes:
trimgraph.remove_nodes_from(untouched_nodes)

def get_subset_graph(self, selected: Iterable[UniqueId]) -> "Graph":
"""Create and return a new graph that is a shallow copy of the graph,
but with only the nodes in include_nodes. Transitive edges across
Expand All @@ -73,19 +93,35 @@ def get_subset_graph(self, selected: Iterable[UniqueId]) -> "Graph":

new_graph = self.graph.copy()
include_nodes = set(selected)
all_nodes = set(new_graph.nodes)

for singlenode in include_nodes:
self.trim_graph(new_graph,singlenode,set(all_nodes),set(include_nodes))

all_nodes = set(new_graph.nodes) # reset all_nodes after trim graph
remove_nodes = all_nodes - include_nodes # consider all remaining nodes to exclude

for singlenode in remove_nodes:
product_list = list(product(new_graph.predecessors(singlenode),new_graph.successors(singlenode)))
non_cyclic_new_edges = [
(source, target) for source, target in product_list if source != target
] # removes cyclic refs
new_graph.remove_node(singlenode)
if non_cyclic_new_edges:
new_graph.add_edges_from(non_cyclic_new_edges)

for node in self:
if node not in include_nodes:
source_nodes = [x for x, _ in new_graph.in_edges(node)]
target_nodes = [x for _, x in new_graph.out_edges(node)]
# for node in self:
# if node not in include_nodes:
# source_nodes = [x for x, _ in new_graph.in_edges(node)]
# target_nodes = [x for _, x in new_graph.out_edges(node)]

new_edges = product(source_nodes, target_nodes)
non_cyclic_new_edges = [
(source, target) for source, target in new_edges if source != target
] # removes cyclic refs
# new_edges = product(source_nodes, target_nodes)
# non_cyclic_new_edges = [
# (source, target) for source, target in new_edges if source != target
# ] # removes cyclic refs

new_graph.add_edges_from(non_cyclic_new_edges)
new_graph.remove_node(node)
# new_graph.add_edges_from(non_cyclic_new_edges)
# new_graph.remove_node(node)
Comment on lines +113 to +124
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't a need to keep commented out code, this can be removed.


for node in include_nodes:
if node not in new_graph:
Expand Down