Factor Out Repeated Logic in the PartialParsing Class #7952

Merged (3 commits) on Jun 26, 2023
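The repeated blocks in schedule_nodes_for_parsing that handled sources, exposures, and metrics differed only in the YAML dict key, the element name, and the delete routine, so in this change they collapse into a single _schedule_for_parsing helper that receives the varying pieces as arguments. Below is a minimal, self-contained sketch of that callback-dispatch pattern; the SchemaFile stand-in, the merge_patch parameter, and the simplified lookup are illustrative only, not dbt's real classes or signatures.

from typing import Callable, Dict, List, Optional


class SchemaFile:
    """Illustrative stand-in for dbt's SchemaSourceFile."""

    def __init__(self, dict_from_yaml: Dict[str, List[dict]]):
        self.dict_from_yaml = dict_from_yaml


def get_schema_element(elements: List[dict], name: str) -> Optional[dict]:
    # Find the YAML entry whose "name" matches, as the real helper does.
    return next((e for e in elements if e.get("name") == name), None)


def schedule_for_parsing(
    schema_file: SchemaFile,
    dict_key: str,
    name: str,
    delete: Callable[[SchemaFile, dict], None],
    merge_patch: Callable[[SchemaFile, str, dict], None],
) -> None:
    # One code path serves sources, exposures, and metrics alike: look up
    # the element under dict_key, delete the stale parse results, then
    # merge the patch back in so the element is scheduled for re-parsing.
    elements = schema_file.dict_from_yaml.get(dict_key, [])
    element = get_schema_element(elements, name)
    if element:
        delete(schema_file, element)
        merge_patch(schema_file, dict_key, element)

With the helper in place, each call site shrinks to a single call that passes its own delete callback, as the diff below shows.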
171 changes: 51 additions & 120 deletions core/dbt/parser/partial.py
@@ -1,11 +1,12 @@
import os
from copy import deepcopy
from typing import MutableMapping, Dict, List
from typing import MutableMapping, Dict, List, Callable
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.files import (
AnySourceFile,
ParseFileType,
parse_file_type_to_parser,
SchemaSourceFile,
)
from dbt.events.functions import fire_event
from dbt.events.base_types import EventLevel
@@ -403,41 +404,19 @@ def schedule_nodes_for_parsing(self, unique_ids):
self.add_to_pp_files(self.saved_files[file_id])
elif unique_id in self.saved_manifest.sources:
source = self.saved_manifest.sources[unique_id]
file_id = source.file_id
if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
schema_file = self.saved_files[file_id]
sources = []
if "sources" in schema_file.dict_from_yaml:
sources = schema_file.dict_from_yaml["sources"]
source_element = self.get_schema_element(sources, source.source_name)
if source_element:
self.delete_schema_source(schema_file, source_element)
self.remove_tests(schema_file, "sources", source_element["name"])
self.merge_patch(schema_file, "sources", source_element)
self._schedule_for_parsing(
"sources", source, source.source_name, self.delete_schema_source
)
elif unique_id in self.saved_manifest.exposures:
exposure = self.saved_manifest.exposures[unique_id]
file_id = exposure.file_id
if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
schema_file = self.saved_files[file_id]
exposures = []
if "exposures" in schema_file.dict_from_yaml:
exposures = schema_file.dict_from_yaml["exposures"]
exposure_element = self.get_schema_element(exposures, exposure.name)
if exposure_element:
self.delete_schema_exposure(schema_file, exposure_element)
self.merge_patch(schema_file, "exposures", exposure_element)
self._schedule_for_parsing(
"exposures", exposure, exposure.name, self.delete_schema_exposure
)
elif unique_id in self.saved_manifest.metrics:
metric = self.saved_manifest.metrics[unique_id]
file_id = metric.file_id
if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
schema_file = self.saved_files[file_id]
metrics = []
if "metrics" in schema_file.dict_from_yaml:
metrics = schema_file.dict_from_yaml["metrics"]
metric_element = self.get_schema_element(metrics, metric.name)
if metric_element:
self.delete_schema_metric(schema_file, metric_element)
self.merge_patch(schema_file, "metrics", metric_element)
self._schedule_for_parsing(
"metrics", metric, metric.name, self.delete_schema_metric
)
elif unique_id in self.saved_manifest.macros:
macro = self.saved_manifest.macros[unique_id]
file_id = macro.file_id
@@ -447,6 +426,19 @@ def schedule_nodes_for_parsing(self, unique_ids):
self.saved_files[file_id] = deepcopy(self.new_files[file_id])
self.add_to_pp_files(self.saved_files[file_id])

def _schedule_for_parsing(self, dict_key: str, element, name, delete: Callable) -> None:
file_id = element.file_id
if file_id in self.saved_files and file_id not in self.file_diff["deleted"]:
schema_file = self.saved_files[file_id]
elements = []
assert isinstance(schema_file, SchemaSourceFile)
if dict_key in schema_file.dict_from_yaml:
elements = schema_file.dict_from_yaml[dict_key]
schema_element = self.get_schema_element(elements, name)
if schema_element:
delete(schema_file, schema_element)
self.merge_patch(schema_file, dict_key, schema_element)

def delete_macro_file(self, source_file, follow_references=False):
self.check_for_special_deleted_macros(source_file)
self.handle_macro_file_links(source_file, follow_references)
@@ -538,7 +530,6 @@ def schedule_macro_nodes_for_parsing(self, unique_ids):
# This is a source patch; need to re-parse orig source
self.remove_source_override_target(patch)
self.delete_schema_source(schema_file, patch)
self.remove_tests(schema_file, "sources", patch["name"])
self.merge_patch(schema_file, "sources", patch)
else:
file_id = node.file_id
@@ -639,14 +630,12 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict
if "overrides" in source: # This is a source patch; need to re-parse orig source
self.remove_source_override_target(source)
self.delete_schema_source(schema_file, source)
self.remove_tests(schema_file, dict_key, source["name"])
self.merge_patch(schema_file, dict_key, source)
if source_diff["deleted"]:
for source in source_diff["deleted"]:
if "overrides" in source: # This is a source patch; need to re-parse orig source
self.remove_source_override_target(source)
self.delete_schema_source(schema_file, source)
self.remove_tests(schema_file, dict_key, source["name"])
if source_diff["added"]:
for source in source_diff["added"]:
if "overrides" in source: # This is a source patch; need to re-parse orig source
@@ -662,99 +651,40 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict
if "overrides" in source:
self.remove_source_override_target(source)
self.delete_schema_source(schema_file, source)
self.remove_tests(schema_file, dict_key, source["name"])
self.merge_patch(schema_file, dict_key, source)

# macros
dict_key = "macros"
macro_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
if macro_diff["changed"]:
for macro in macro_diff["changed"]:
self.delete_schema_macro_patch(schema_file, macro)
self.merge_patch(schema_file, dict_key, macro)
if macro_diff["deleted"]:
for macro in macro_diff["deleted"]:
self.delete_schema_macro_patch(schema_file, macro)
if macro_diff["added"]:
for macro in macro_diff["added"]:
self.merge_patch(schema_file, dict_key, macro)
# Handle schema file updates due to env_var changes
if dict_key in env_var_changes and dict_key in new_yaml_dict:
for name in env_var_changes[dict_key]:
if name in macro_diff["changed_or_deleted_names"]:
continue
elem = self.get_schema_element(new_yaml_dict[dict_key], name)
if elem:
self.delete_schema_macro_patch(schema_file, elem)
self.merge_patch(schema_file, dict_key, elem)

# exposures
dict_key = "exposures"
exposure_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
if exposure_diff["changed"]:
for exposure in exposure_diff["changed"]:
self.delete_schema_exposure(schema_file, exposure)
self.merge_patch(schema_file, dict_key, exposure)
if exposure_diff["deleted"]:
for exposure in exposure_diff["deleted"]:
self.delete_schema_exposure(schema_file, exposure)
if exposure_diff["added"]:
for exposure in exposure_diff["added"]:
self.merge_patch(schema_file, dict_key, exposure)
# Handle schema file updates due to env_var changes
if dict_key in env_var_changes and dict_key in new_yaml_dict:
for name in env_var_changes[dict_key]:
if name in exposure_diff["changed_or_deleted_names"]:
continue
elem = self.get_schema_element(new_yaml_dict[dict_key], name)
if elem:
self.delete_schema_exposure(schema_file, elem)
self.merge_patch(schema_file, dict_key, elem)

# metrics
dict_key = "metrics"
metric_diff = self.get_diff_for("metrics", saved_yaml_dict, new_yaml_dict)
if metric_diff["changed"]:
for metric in metric_diff["changed"]:
self.delete_schema_metric(schema_file, metric)
self.merge_patch(schema_file, dict_key, metric)
if metric_diff["deleted"]:
for metric in metric_diff["deleted"]:
self.delete_schema_metric(schema_file, metric)
if metric_diff["added"]:
for metric in metric_diff["added"]:
self.merge_patch(schema_file, dict_key, metric)
# Handle schema file updates due to env_var changes
if dict_key in env_var_changes and dict_key in new_yaml_dict:
for name in env_var_changes[dict_key]:
if name in metric_diff["changed_or_deleted_names"]:
continue
elem = self.get_schema_element(new_yaml_dict[dict_key], name)
if elem:
self.delete_schema_metric(schema_file, elem)
self.merge_patch(schema_file, dict_key, elem)
def handle_change(key: str, delete: Callable):
self._handle_element_change(
schema_file, saved_yaml_dict, new_yaml_dict, env_var_changes, key, delete
)

# groups
dict_key = "groups"
group_diff = self.get_diff_for("groups", saved_yaml_dict, new_yaml_dict)
if group_diff["changed"]:
for group in group_diff["changed"]:
self.delete_schema_group(schema_file, group)
self.merge_patch(schema_file, dict_key, group)
if group_diff["deleted"]:
for group in group_diff["deleted"]:
self.delete_schema_group(schema_file, group)
if group_diff["added"]:
for group in group_diff["added"]:
self.merge_patch(schema_file, dict_key, group)
handle_change("macros", self.delete_schema_macro_patch)
handle_change("exposures", self.delete_schema_exposure)
handle_change("metrics", self.delete_schema_metric)
handle_change("groups", self.delete_schema_group)

def _handle_element_change(
self, schema_file, saved_yaml_dict, new_yaml_dict, env_var_changes, dict_key: str, delete
):
element_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict)
if element_diff["changed"]:
for element in element_diff["changed"]:
delete(schema_file, element)
self.merge_patch(schema_file, dict_key, element)
if element_diff["deleted"]:
for element in element_diff["deleted"]:
delete(schema_file, element)
if element_diff["added"]:
for element in element_diff["added"]:
self.merge_patch(schema_file, dict_key, element)
# Handle schema file updates due to env_var changes
if dict_key in env_var_changes and dict_key in new_yaml_dict:
for name in env_var_changes[dict_key]:
if name in group_diff["changed_or_deleted_names"]:
if name in element_diff["changed_or_deleted_names"]:
continue
elem = self.get_schema_element(new_yaml_dict[dict_key], name)
if elem:
self.delete_schema_group(schema_file, elem)
delete(schema_file, elem)
self.merge_patch(schema_file, dict_key, elem)

# Take a "section" of the schema file yaml dictionary from saved and new schema files
@@ -887,6 +817,8 @@ def delete_schema_source(self, schema_file, source_dict):
schema_file.sources.remove(unique_id)
self.schedule_referencing_nodes_for_parsing(unique_id)

self.remove_tests(schema_file, "sources", source_name)

def delete_schema_macro_patch(self, schema_file, macro):
# This is just macro patches that need to be reapplied
macro_unique_id = None
@@ -970,7 +902,6 @@ def remove_source_override_target(self, source_dict):
(orig_file, orig_source) = self.get_source_override_file_and_dict(source_dict)
if orig_source:
self.delete_schema_source(orig_file, orig_source)
self.remove_tests(orig_file, "sources", orig_source["name"])
self.merge_patch(orig_file, "sources", orig_source)
self.add_to_pp_files(orig_file)

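The same factoring applies inside handle_schema_file_changes: the macro, exposure, metric, and group sections each walked an identical changed/deleted/added diff plus an env_var update list, varying only in the dict key and delete routine, so they become four one-line calls into _handle_element_change. A rough sketch of that loop follows; the simplified diff dict and the delete/merge_patch callbacks passed as parameters are assumptions for illustration, shaped like what get_diff_for appears to return rather than the exact dbt API.

from typing import Callable, Dict, List


def handle_element_change(
    schema_file,
    element_diff: Dict[str, List[dict]],
    dict_key: str,
    delete: Callable,
    merge_patch: Callable,
) -> None:
    # Changed entries are deleted and re-merged, deleted entries are only
    # deleted, and added entries are only merged; the per-type delete
    # callback is the single thing that differs between macros, exposures,
    # metrics, and groups.
    for element in element_diff.get("changed", []):
        delete(schema_file, element)
        merge_patch(schema_file, dict_key, element)
    for element in element_diff.get("deleted", []):
        delete(schema_file, element)
    for element in element_diff.get("added", []):
        merge_patch(schema_file, dict_key, element)

A related simplification in the diff moves the remove_tests call for sources into delete_schema_source itself, so the call sites that previously repeated it after each source deletion can drop it.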