def trim_graph(self, trimgraph: nx.DiGraph, lookup_node, all_nodes: set, selected_nodes: set) -> None:
    """Prune, in place, nodes around ``lookup_node`` that cannot matter to a subset graph.

    This is a pre-pass meant to shrink the graph before unselected nodes are
    contracted one by one. Only two groups of nodes are removed:

    * every ancestor of ``lookup_node``, when none of them is selected;
    * every descendant of ``lookup_node``, when none of them is selected.

    Both removals are safe for transitive edges between selected nodes: a
    node lying on a path between two selected nodes S -> ... -> T that is also
    an ancestor of ``lookup_node`` would make S a selected ancestor of
    ``lookup_node`` (and symmetrically for descendants), so such a group is
    never removed.

    Args:
        trimgraph: Directed graph to prune; it is mutated in place.
        lookup_node: Node whose ancestors/descendants are examined. The node
            itself is never removed.
        all_nodes: Accepted for backward compatibility and intentionally
            unused. Earlier revisions removed every node in ``all_nodes``
            that was unrelated to ``lookup_node`` and not selected, but such
            a node can still connect two *other* selected nodes; deleting it
            without re-linking its neighbours silently dropped the transitive
            edge between them.
        selected_nodes: Nodes that must survive pruning.

    Returns:
        None. ``trimgraph`` is modified in place.
    """
    # nx.ancestors/nx.descendants raise NetworkXError for a node missing from
    # the graph. Do nothing here so the caller can report unknown nodes with
    # its own error handling.
    if lookup_node not in trimgraph:
        return

    # Both helpers return sets that exclude lookup_node itself.
    ancestors = nx.ancestors(trimgraph, lookup_node)
    descendants = nx.descendants(trimgraph, lookup_node)

    # Nothing selected upstream: the whole upstream cone is irrelevant.
    if selected_nodes.isdisjoint(ancestors):
        trimgraph.remove_nodes_from(ancestors)
    # Nothing selected downstream: the whole downstream cone is irrelevant.
    if selected_nodes.isdisjoint(descendants):
        trimgraph.remove_nodes_from(descendants)
Transitive edges across @@ -73,19 +93,35 @@ def get_subset_graph(self, selected: Iterable[UniqueId]) -> "Graph": new_graph = self.graph.copy() include_nodes = set(selected) + all_nodes = set(new_graph.nodes) + + for singlenode in include_nodes: + self.trim_graph(new_graph,singlenode,set(all_nodes),set(include_nodes)) + + all_nodes = set(new_graph.nodes) # reset all_nodes after trim graph + remove_nodes = all_nodes - include_nodes # consider all remaining nodes to exclude + + for singlenode in remove_nodes: + product_list = list(product(new_graph.predecessors(singlenode),new_graph.successors(singlenode))) + non_cyclic_new_edges = [ + (source, target) for source, target in product_list if source != target + ] # removes cyclic refs + new_graph.remove_node(singlenode) + if non_cyclic_new_edges: + new_graph.add_edges_from(non_cyclic_new_edges) - for node in self: - if node not in include_nodes: - source_nodes = [x for x, _ in new_graph.in_edges(node)] - target_nodes = [x for _, x in new_graph.out_edges(node)] + # for node in self: + # if node not in include_nodes: + # source_nodes = [x for x, _ in new_graph.in_edges(node)] + # target_nodes = [x for _, x in new_graph.out_edges(node)] - new_edges = product(source_nodes, target_nodes) - non_cyclic_new_edges = [ - (source, target) for source, target in new_edges if source != target - ] # removes cyclic refs + # new_edges = product(source_nodes, target_nodes) + # non_cyclic_new_edges = [ + # (source, target) for source, target in new_edges if source != target + # ] # removes cyclic refs - new_graph.add_edges_from(non_cyclic_new_edges) - new_graph.remove_node(node) + # new_graph.add_edges_from(non_cyclic_new_edges) + # new_graph.remove_node(node) for node in include_nodes: if node not in new_graph: