Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CT 1808 diff based partial parsing #6873

Merged
merged 19 commits into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230206-084749.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Enable diff based partial parsing
time: 2023-02-06T08:47:49.688889-05:00
custom:
Author: gshank
Issue: "6592"
7 changes: 1 addition & 6 deletions core/dbt/clients/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from dbt.events.functions import fire_event
from dbt.events.types import (
SystemCouldNotWrite,
SystemErrorRetrievingModTime,
SystemExecutingCmd,
SystemStdOut,
SystemStdErr,
Expand Down Expand Up @@ -77,11 +76,7 @@ def find_matching(
relative_path = os.path.relpath(absolute_path, absolute_path_to_search)
relative_path_to_root = os.path.join(relative_path_to_search, relative_path)

modification_time = 0.0
try:
modification_time = os.path.getmtime(absolute_path)
except OSError:
fire_event(SystemErrorRetrievingModTime(path=absolute_path))
modification_time = os.path.getmtime(absolute_path)
if reobj.match(local_file) and (
not ignore_spec or not ignore_spec.match_file(relative_path_to_root)
):
Expand Down
2 changes: 0 additions & 2 deletions core/dbt/contracts/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ def absolute_path(self) -> str:

@property
def original_file_path(self) -> str:
# this is mostly used for reporting errors. It doesn't show the project
# name, should it?
return os.path.join(self.searched_path, self.relative_path)

def seed_too_large(self) -> bool:
Expand Down
27 changes: 14 additions & 13 deletions core/dbt/events/proto_types.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 13 additions & 10 deletions core/dbt/events/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,18 @@ message FinishedRunningStatsMsg {

// I - Project parsing

// Skipping I001, I002, I003, I004, I005, I006, I007
// I001
message InputFileDiffError {
string category = 1;
string file_id = 2;
}

message InputFileDiffErrorMsg {
EventInfo info = 1;
InputFileDiffError data = 2;
}

// Skipping I002, I003, I004, I005, I006, I007

// I008
message InvalidValueForField {
Expand Down Expand Up @@ -1808,15 +1819,7 @@ message MainStackTraceMsg {
MainStackTrace data = 2;
}

// Z004
message SystemErrorRetrievingModTime {
string path = 1;
}

message SystemErrorRetrievingModTimeMsg {
EventInfo info = 1;
SystemErrorRetrievingModTime data = 2;
}
// skipping Z004

// Z005
message SystemCouldNotWrite {
Expand Down
19 changes: 11 additions & 8 deletions core/dbt/events/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,16 @@ def message(self) -> str:
# =======================================================


# Skipping I001, I002, I003, I004, I005, I006, I007
@dataclass
class InputFileDiffError(DebugLevel, pt.InputFileDiffError):
def code(self):
return "I001"

def message(self) -> str:
return f"Error processing file diff: {self.category}, {self.file_id}"


# Skipping I002, I003, I004, I005, I006, I007


@dataclass
Expand Down Expand Up @@ -1891,13 +1900,7 @@ def message(self) -> str:
return self.stack_trace


@dataclass
class SystemErrorRetrievingModTime(ErrorLevel, pt.SystemErrorRetrievingModTime):
def code(self):
return "Z004"

def message(self) -> str:
return f"Error retrieving modification time for file {self.path}"
# Skipped Z004
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't look like you removed this event from types.proto.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pfft. Good catch. Have removed.



@dataclass
Expand Down
57 changes: 45 additions & 12 deletions core/dbt/parser/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,19 @@
from dbt.node_types import NodeType, AccessType
from dbt.clients.jinja import get_rendered, MacroStack
from dbt.clients.jinja_static import statically_extract_macro_calls
from dbt.clients.system import make_directory, write_file
from dbt.clients.system import make_directory, path_exists, read_json, write_file
from dbt.config import Project, RuntimeConfig
from dbt.context.docs import generate_runtime_docs_context
from dbt.context.macro_resolver import MacroResolver, TestMacroNamespace
from dbt.context.configured import generate_macro_context
from dbt.context.providers import ParseProvider
from dbt.contracts.files import FileHash, ParseFileType, SchemaSourceFile
from dbt.parser.read_files import read_files, load_source_file
from dbt.parser.read_files import (
ReadFilesFromFileSystem,
load_source_file,
FileDiff,
ReadFilesFromDiff,
)
from dbt.parser.partial import PartialParsing, special_override_macros
from dbt.contracts.graph.manifest import (
Manifest,
Expand Down Expand Up @@ -153,9 +158,11 @@ def __init__(
root_project: RuntimeConfig,
all_projects: Mapping[str, Project],
macro_hook: Optional[Callable[[Manifest], Any]] = None,
file_diff: Optional[FileDiff] = None,
) -> None:
self.root_project: RuntimeConfig = root_project
self.all_projects: Mapping[str, Project] = all_projects
self.file_diff = file_diff
self.manifest: Manifest = Manifest()
self.new_manifest = self.manifest
self.manifest.metadata = root_project.get_metadata()
Expand Down Expand Up @@ -190,6 +197,7 @@ def get_full_manifest(
cls,
config: RuntimeConfig,
*,
file_diff: Optional[FileDiff] = None,
reset: bool = False,
write_perf_info=False,
) -> Manifest:
Expand All @@ -202,12 +210,19 @@ def get_full_manifest(
adapter.clear_macro_manifest()
macro_hook = adapter.connections.set_query_header

# Hack to test file_diffs
if os.environ.get("DBT_PP_FILE_DIFF_TEST"):
file_diff_path = "file_diff.json"
if path_exists(file_diff_path):
file_diff_dct = read_json(file_diff_path)
file_diff = FileDiff.from_dict(file_diff_dct)

with PARSING_STATE: # set up logbook.Processor for parsing
# Start performance counting
start_load_all = time.perf_counter()

projects = config.load_dependencies()
loader = cls(config, projects, macro_hook)
loader = cls(config, projects, macro_hook=macro_hook, file_diff=file_diff)

manifest = loader.load()

Expand All @@ -229,17 +244,35 @@ def get_full_manifest(

# This is where the main action happens
def load(self):
# Read files creates a dictionary of projects to a dictionary
start_read_files = time.perf_counter()

# This updates the "files" dictionary in self.manifest, and creates
# the partial_parser_files dictionary (see read_files.py),
# which is a dictionary of projects to a dictionary
# of parsers to lists of file strings. The file strings are
# used to get the SourceFiles from the manifest files.
start_read_files = time.perf_counter()
project_parser_files = {}
saved_files = {}
if self.saved_manifest:
saved_files = self.saved_manifest.files
for project in self.all_projects.values():
read_files(project, self.manifest.files, project_parser_files, saved_files)
orig_project_parser_files = project_parser_files
saved_files = self.saved_manifest.files if self.saved_manifest else {}
if self.file_diff:
# We're getting files from a file diff
file_reader = ReadFilesFromDiff(
all_projects=self.all_projects,
files=self.manifest.files,
saved_files=saved_files,
root_project_name=self.root_project.project_name,
file_diff=self.file_diff,
)
else:
# We're getting files from the file system
file_reader = ReadFilesFromFileSystem(
all_projects=self.all_projects,
files=self.manifest.files,
saved_files=saved_files,
)

# Set the files in the manifest and save the project_parser_files
file_reader.read_files()
self.manifest.files = file_reader.files
project_parser_files = orig_project_parser_files = file_reader.project_parser_files
self._perf_info.path_count = len(self.manifest.files)
self._perf_info.read_files_elapsed = time.perf_counter() - start_read_files

Expand Down
33 changes: 9 additions & 24 deletions core/dbt/parser/partial.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ def __init__(self, saved_manifest: Manifest, new_files: MutableMapping[str, AnyS
self.project_parser_files: Dict = {}
self.saved_files = self.saved_manifest.files
self.project_parser_files = {}
self.deleted_manifest = Manifest()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the deleted_manifest because it's not necessary. Originally I thought it might be helpful for diagnosing, but it wasn't.

self.macro_child_map: Dict[str, List[str]] = {}
(
self.env_vars_changed_source_files,
Expand Down Expand Up @@ -268,7 +267,7 @@ def delete_from_saved(self, file_id):
# macros/tests
if saved_source_file.parse_file_type in mssat_files:
self.remove_mssat_file(saved_source_file)
self.deleted_manifest.files[file_id] = self.saved_manifest.files.pop(file_id)
self.saved_manifest.files.pop(file_id)

# macros
if saved_source_file.parse_file_type in mg_files:
Expand Down Expand Up @@ -311,7 +310,6 @@ def update_mssat_in_saved(self, new_source_file, old_source_file):

# replace source_file in saved and add to parsing list
file_id = new_source_file.file_id
self.deleted_manifest.files[file_id] = old_source_file
self.saved_files[file_id] = deepcopy(new_source_file)
self.add_to_pp_files(new_source_file)
for unique_id in unique_ids:
Expand All @@ -321,7 +319,6 @@ def remove_node_in_saved(self, source_file, unique_id):
if unique_id in self.saved_manifest.nodes:
# delete node in saved
node = self.saved_manifest.nodes.pop(unique_id)
self.deleted_manifest.nodes[unique_id] = node
elif (
source_file.file_id in self.disabled_by_file_id
and unique_id in self.saved_manifest.disabled
Expand Down Expand Up @@ -456,7 +453,7 @@ def delete_macro_file(self, source_file, follow_references=False):
file_id = source_file.file_id
# It's not clear when this file_id would not exist in saved_files
if file_id in self.saved_files:
self.deleted_manifest.files[file_id] = self.saved_files.pop(file_id)
self.saved_files.pop(file_id)

def check_for_special_deleted_macros(self, source_file):
for unique_id in source_file.macros:
Expand Down Expand Up @@ -487,7 +484,6 @@ def handle_macro_file_links(self, source_file, follow_references=False):
continue

base_macro = self.saved_manifest.macros.pop(unique_id)
self.deleted_manifest.macros[unique_id] = base_macro

# Recursively check children of this macro
# The macro_child_map might not exist if a macro is removed by
Expand Down Expand Up @@ -565,16 +561,14 @@ def delete_doc_node(self, source_file):
# remove the nodes in the 'docs' dictionary
docs = source_file.docs.copy()
for unique_id in docs:
self.deleted_manifest.docs[unique_id] = self.saved_manifest.docs.pop(unique_id)
self.saved_manifest.docs.pop(unique_id)
source_file.docs.remove(unique_id)
# The unique_id of objects that contain a doc call are stored in the
# doc source_file.nodes
self.schedule_nodes_for_parsing(source_file.nodes)
source_file.nodes = []
# Remove the file object
self.deleted_manifest.files[source_file.file_id] = self.saved_manifest.files.pop(
source_file.file_id
)
self.saved_manifest.files.pop(source_file.file_id)

# Schema files -----------------------
# Changed schema files
Expand Down Expand Up @@ -608,7 +602,7 @@ def delete_schema_file(self, file_id):
saved_yaml_dict = saved_schema_file.dict_from_yaml
new_yaml_dict = {}
self.handle_schema_file_changes(saved_schema_file, saved_yaml_dict, new_yaml_dict)
self.deleted_manifest.files[file_id] = self.saved_manifest.files.pop(file_id)
self.saved_manifest.files.pop(file_id)

# For each key in a schema file dictionary, process the changed, deleted, and added
# elemnts for the key lists
Expand Down Expand Up @@ -876,8 +870,7 @@ def remove_tests(self, schema_file, dict_key, name):
tests = schema_file.get_tests(dict_key, name)
for test_unique_id in tests:
if test_unique_id in self.saved_manifest.nodes:
node = self.saved_manifest.nodes.pop(test_unique_id)
self.deleted_manifest.nodes[test_unique_id] = node
self.saved_manifest.nodes.pop(test_unique_id)
schema_file.remove_tests(dict_key, name)

def delete_schema_source(self, schema_file, source_dict):
Expand All @@ -892,7 +885,6 @@ def delete_schema_source(self, schema_file, source_dict):
source = self.saved_manifest.sources[unique_id]
if source.source_name == source_name:
source = self.saved_manifest.sources.pop(unique_id)
self.deleted_manifest.sources[unique_id] = source
schema_file.sources.remove(unique_id)
self.schedule_referencing_nodes_for_parsing(unique_id)

Expand All @@ -904,7 +896,6 @@ def delete_schema_macro_patch(self, schema_file, macro):
del schema_file.macro_patches[macro["name"]]
if macro_unique_id and macro_unique_id in self.saved_manifest.macros:
macro = self.saved_manifest.macros.pop(macro_unique_id)
self.deleted_manifest.macros[macro_unique_id] = macro
macro_file_id = macro.file_id
if macro_file_id in self.new_files:
self.saved_files[macro_file_id] = deepcopy(self.new_files[macro_file_id])
Expand All @@ -919,9 +910,7 @@ def delete_schema_exposure(self, schema_file, exposure_dict):
if unique_id in self.saved_manifest.exposures:
exposure = self.saved_manifest.exposures[unique_id]
if exposure.name == exposure_name:
self.deleted_manifest.exposures[unique_id] = self.saved_manifest.exposures.pop(
unique_id
)
self.saved_manifest.exposures.pop(unique_id)
schema_file.exposures.remove(unique_id)
elif unique_id in self.saved_manifest.disabled:
self.delete_disabled(unique_id, schema_file.file_id)
Expand All @@ -935,9 +924,7 @@ def delete_schema_group(self, schema_file, group_dict):
group = self.saved_manifest.groups[unique_id]
if group.name == group_name:
self.schedule_nodes_for_parsing(self.saved_manifest.group_map[group.name])
self.deleted_manifest.groups[unique_id] = self.saved_manifest.groups.pop(
unique_id
)
self.saved_manifest.groups.pop(unique_id)
schema_file.groups.remove(unique_id)

# metrics are created only from schema files, but also can be referred to by other nodes
Expand All @@ -951,9 +938,7 @@ def delete_schema_metric(self, schema_file, metric_dict):
# Need to find everything that referenced this metric and schedule for parsing
if unique_id in self.saved_manifest.child_map:
self.schedule_nodes_for_parsing(self.saved_manifest.child_map[unique_id])
self.deleted_manifest.metrics[unique_id] = self.saved_manifest.metrics.pop(
unique_id
)
self.saved_manifest.metrics.pop(unique_id)
schema_file.metrics.remove(unique_id)
elif unique_id in self.saved_manifest.disabled:
self.delete_disabled(unique_id, schema_file.file_id)
Expand Down
Loading