diff --git a/core/dbt/parser/partial.py b/core/dbt/parser/partial.py index bc925c9cc1f..8a70ba7525a 100644 --- a/core/dbt/parser/partial.py +++ b/core/dbt/parser/partial.py @@ -1,11 +1,12 @@ import os from copy import deepcopy -from typing import MutableMapping, Dict, List +from typing import MutableMapping, Dict, List, Callable from dbt.contracts.graph.manifest import Manifest from dbt.contracts.files import ( AnySourceFile, ParseFileType, parse_file_type_to_parser, + SchemaSourceFile, ) from dbt.events.functions import fire_event from dbt.events.base_types import EventLevel @@ -403,41 +404,19 @@ def schedule_nodes_for_parsing(self, unique_ids): self.add_to_pp_files(self.saved_files[file_id]) elif unique_id in self.saved_manifest.sources: source = self.saved_manifest.sources[unique_id] - file_id = source.file_id - if file_id in self.saved_files and file_id not in self.file_diff["deleted"]: - schema_file = self.saved_files[file_id] - sources = [] - if "sources" in schema_file.dict_from_yaml: - sources = schema_file.dict_from_yaml["sources"] - source_element = self.get_schema_element(sources, source.source_name) - if source_element: - self.delete_schema_source(schema_file, source_element) - self.remove_tests(schema_file, "sources", source_element["name"]) - self.merge_patch(schema_file, "sources", source_element) + self._schedule_for_parsing( + "sources", source, source.source_name, self.delete_schema_source + ) elif unique_id in self.saved_manifest.exposures: exposure = self.saved_manifest.exposures[unique_id] - file_id = exposure.file_id - if file_id in self.saved_files and file_id not in self.file_diff["deleted"]: - schema_file = self.saved_files[file_id] - exposures = [] - if "exposures" in schema_file.dict_from_yaml: - exposures = schema_file.dict_from_yaml["exposures"] - exposure_element = self.get_schema_element(exposures, exposure.name) - if exposure_element: - self.delete_schema_exposure(schema_file, exposure_element) - self.merge_patch(schema_file, "exposures", exposure_element) + self._schedule_for_parsing( + "exposures", exposure, exposure.name, self.delete_schema_exposure + ) elif unique_id in self.saved_manifest.metrics: metric = self.saved_manifest.metrics[unique_id] - file_id = metric.file_id - if file_id in self.saved_files and file_id not in self.file_diff["deleted"]: - schema_file = self.saved_files[file_id] - metrics = [] - if "metrics" in schema_file.dict_from_yaml: - metrics = schema_file.dict_from_yaml["metrics"] - metric_element = self.get_schema_element(metrics, metric.name) - if metric_element: - self.delete_schema_metric(schema_file, metric_element) - self.merge_patch(schema_file, "metrics", metric_element) + self._schedule_for_parsing( + "metrics", metric, metric.name, self.delete_schema_metric + ) elif unique_id in self.saved_manifest.macros: macro = self.saved_manifest.macros[unique_id] file_id = macro.file_id @@ -447,6 +426,19 @@ def schedule_nodes_for_parsing(self, unique_ids): self.saved_files[file_id] = deepcopy(self.new_files[file_id]) self.add_to_pp_files(self.saved_files[file_id]) + def _schedule_for_parsing(self, dict_key: str, element, name, delete: Callable) -> None: + file_id = element.file_id + if file_id in self.saved_files and file_id not in self.file_diff["deleted"]: + schema_file = self.saved_files[file_id] + elements = [] + assert isinstance(schema_file, SchemaSourceFile) + if dict_key in schema_file.dict_from_yaml: + elements = schema_file.dict_from_yaml[dict_key] + schema_element = self.get_schema_element(elements, name) + if schema_element: + delete(schema_file, schema_element) + self.merge_patch(schema_file, dict_key, schema_element) + def delete_macro_file(self, source_file, follow_references=False): self.check_for_special_deleted_macros(source_file) self.handle_macro_file_links(source_file, follow_references) @@ -538,7 +530,6 @@ def schedule_macro_nodes_for_parsing(self, unique_ids): # This is a source patch; need to re-parse orig source self.remove_source_override_target(patch) self.delete_schema_source(schema_file, patch) - self.remove_tests(schema_file, "sources", patch["name"]) self.merge_patch(schema_file, "sources", patch) else: file_id = node.file_id @@ -639,14 +630,12 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict if "overrides" in source: # This is a source patch; need to re-parse orig source self.remove_source_override_target(source) self.delete_schema_source(schema_file, source) - self.remove_tests(schema_file, dict_key, source["name"]) self.merge_patch(schema_file, dict_key, source) if source_diff["deleted"]: for source in source_diff["deleted"]: if "overrides" in source: # This is a source patch; need to re-parse orig source self.remove_source_override_target(source) self.delete_schema_source(schema_file, source) - self.remove_tests(schema_file, dict_key, source["name"]) if source_diff["added"]: for source in source_diff["added"]: if "overrides" in source: # This is a source patch; need to re-parse orig source @@ -662,99 +651,40 @@ def handle_schema_file_changes(self, schema_file, saved_yaml_dict, new_yaml_dict if "overrides" in source: self.remove_source_override_target(source) self.delete_schema_source(schema_file, source) - self.remove_tests(schema_file, dict_key, source["name"]) self.merge_patch(schema_file, dict_key, source) - # macros - dict_key = "macros" - macro_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict) - if macro_diff["changed"]: - for macro in macro_diff["changed"]: - self.delete_schema_macro_patch(schema_file, macro) - self.merge_patch(schema_file, dict_key, macro) - if macro_diff["deleted"]: - for macro in macro_diff["deleted"]: - self.delete_schema_macro_patch(schema_file, macro) - if macro_diff["added"]: - for macro in macro_diff["added"]: - self.merge_patch(schema_file, dict_key, macro) - # Handle schema file updates due to env_var changes - if dict_key in env_var_changes and dict_key in new_yaml_dict: - for name in env_var_changes[dict_key]: - if name in macro_diff["changed_or_deleted_names"]: - continue - elem = self.get_schema_element(new_yaml_dict[dict_key], name) - if elem: - self.delete_schema_macro_patch(schema_file, elem) - self.merge_patch(schema_file, dict_key, elem) - - # exposures - dict_key = "exposures" - exposure_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict) - if exposure_diff["changed"]: - for exposure in exposure_diff["changed"]: - self.delete_schema_exposure(schema_file, exposure) - self.merge_patch(schema_file, dict_key, exposure) - if exposure_diff["deleted"]: - for exposure in exposure_diff["deleted"]: - self.delete_schema_exposure(schema_file, exposure) - if exposure_diff["added"]: - for exposure in exposure_diff["added"]: - self.merge_patch(schema_file, dict_key, exposure) - # Handle schema file updates due to env_var changes - if dict_key in env_var_changes and dict_key in new_yaml_dict: - for name in env_var_changes[dict_key]: - if name in exposure_diff["changed_or_deleted_names"]: - continue - elem = self.get_schema_element(new_yaml_dict[dict_key], name) - if elem: - self.delete_schema_exposure(schema_file, elem) - self.merge_patch(schema_file, dict_key, elem) - - # metrics - dict_key = "metrics" - metric_diff = self.get_diff_for("metrics", saved_yaml_dict, new_yaml_dict) - if metric_diff["changed"]: - for metric in metric_diff["changed"]: - self.delete_schema_metric(schema_file, metric) - self.merge_patch(schema_file, dict_key, metric) - if metric_diff["deleted"]: - for metric in metric_diff["deleted"]: - self.delete_schema_metric(schema_file, metric) - if metric_diff["added"]: - for metric in metric_diff["added"]: - self.merge_patch(schema_file, dict_key, metric) - # Handle schema file updates due to env_var changes - if dict_key in env_var_changes and dict_key in new_yaml_dict: - for name in env_var_changes[dict_key]: - if name in metric_diff["changed_or_deleted_names"]: - continue - elem = self.get_schema_element(new_yaml_dict[dict_key], name) - if elem: - self.delete_schema_metric(schema_file, elem) - self.merge_patch(schema_file, dict_key, elem) + def handle_change(key: str, delete: Callable): + self._handle_element_change( + schema_file, saved_yaml_dict, new_yaml_dict, env_var_changes, key, delete + ) - # groups - dict_key = "groups" - group_diff = self.get_diff_for("groups", saved_yaml_dict, new_yaml_dict) - if group_diff["changed"]: - for group in group_diff["changed"]: - self.delete_schema_group(schema_file, group) - self.merge_patch(schema_file, dict_key, group) - if group_diff["deleted"]: - for group in group_diff["deleted"]: - self.delete_schema_group(schema_file, group) - if group_diff["added"]: - for group in group_diff["added"]: - self.merge_patch(schema_file, dict_key, group) + handle_change("macros", self.delete_schema_macro_patch) + handle_change("exposures", self.delete_schema_exposure) + handle_change("metrics", self.delete_schema_metric) + handle_change("groups", self.delete_schema_group) + + def _handle_element_change( + self, schema_file, saved_yaml_dict, new_yaml_dict, env_var_changes, dict_key: str, delete + ): + element_diff = self.get_diff_for(dict_key, saved_yaml_dict, new_yaml_dict) + if element_diff["changed"]: + for element in element_diff["changed"]: + delete(schema_file, element) + self.merge_patch(schema_file, dict_key, element) + if element_diff["deleted"]: + for element in element_diff["deleted"]: + delete(schema_file, element) + if element_diff["added"]: + for element in element_diff["added"]: + self.merge_patch(schema_file, dict_key, element) # Handle schema file updates due to env_var changes if dict_key in env_var_changes and dict_key in new_yaml_dict: for name in env_var_changes[dict_key]: - if name in group_diff["changed_or_deleted_names"]: + if name in element_diff["changed_or_deleted_names"]: continue elem = self.get_schema_element(new_yaml_dict[dict_key], name) if elem: - self.delete_schema_group(schema_file, elem) + delete(schema_file, elem) self.merge_patch(schema_file, dict_key, elem) # Take a "section" of the schema file yaml dictionary from saved and new schema files @@ -887,6 +817,8 @@ def delete_schema_source(self, schema_file, source_dict): schema_file.sources.remove(unique_id) self.schedule_referencing_nodes_for_parsing(unique_id) + self.remove_tests(schema_file, "sources", source_name) + def delete_schema_macro_patch(self, schema_file, macro): # This is just macro patches that need to be reapplied macro_unique_id = None @@ -970,7 +902,6 @@ def remove_source_override_target(self, source_dict): (orig_file, orig_source) = self.get_source_override_file_and_dict(source_dict) if orig_source: self.delete_schema_source(orig_file, orig_source) - self.remove_tests(orig_file, "sources", orig_source["name"]) self.merge_patch(orig_file, "sources", orig_source) self.add_to_pp_files(orig_file)