-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathcompile.py
88 lines (72 loc) · 2.82 KB
/
compile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import threading
from typing import AbstractSet, Optional
from .runnable import GraphRunnableTask
from .base import BaseRunner
from dbt.contracts.graph.manifest import WritableManifest
from dbt.contracts.results import RunStatus, RunResult
from dbt.exceptions import InternalException, RuntimeException
from dbt.graph import ResourceTypeSelector
from dbt.events.functions import fire_event
from dbt.events.types import CompileComplete
from dbt.node_types import NodeType
class CompileRunner(BaseRunner):
    """Runner whose ``execute`` step only reports success for a compiled node.

    Compilation itself is delegated to the adapter's compiler in ``compile``;
    ``execute`` performs no additional work on the node.
    """

    def before_execute(self):
        """Do nothing before execution."""
        pass

    def after_execute(self, result):
        """Do nothing after execution; ``result`` is ignored."""
        pass

    def execute(self, compiled_node, manifest):
        """Wrap ``compiled_node`` in a successful ``RunResult``.

        ``manifest`` is accepted but not used. The result carries no timing
        entries, a zero execution time, an empty adapter response, no message
        and no failures; the thread id is the name of the calling thread.
        """
        worker_name = threading.current_thread().name
        return RunResult(
            node=compiled_node,
            status=RunStatus.Success,
            timing=[],
            thread_id=worker_name,
            execution_time=0,
            message=None,
            adapter_response={},
            failures=None,
        )

    def compile(self, manifest):
        """Compile this runner's node against ``manifest``.

        Uses the compiler obtained from ``self.adapter`` and passes an empty
        dict as the third argument to ``compile_node``.
        """
        return self.adapter.get_compiler().compile_node(self.node, manifest, {})
class CompileTask(GraphRunnableTask):
    """Graph task that runs ``CompileRunner`` over the selected nodes.

    Also provides the ``--defer`` plumbing: when deferral is requested, the
    manifest found in the previous state is merged into the runtime manifest.
    """

    def raise_on_first_error(self):
        """Return ``True`` unconditionally."""
        return True

    def get_node_selector(self) -> ResourceTypeSelector:
        """Build a selector limited to ``NodeType.executable()`` resource types.

        Raises:
            InternalException: if either the manifest or the graph is unset.
        """
        if self.manifest is not None and self.graph is not None:
            return ResourceTypeSelector(
                graph=self.graph,
                manifest=self.manifest,
                previous_state=self.previous_state,
                resource_types=NodeType.executable(),
            )
        # NOTE(review): the message below reads "to get perform"; it is kept
        # verbatim because callers or tests may match on the exact text.
        raise InternalException("manifest and graph must be set to get perform node selection")

    def get_runner_type(self, _):
        """Return ``CompileRunner`` regardless of the argument."""
        return CompileRunner

    def task_end_messages(self, results):
        """Fire a ``CompileComplete`` event; ``results`` is not inspected."""
        fire_event(CompileComplete())

    def _get_deferred_manifest(self) -> Optional[WritableManifest]:
        """Return the manifest to defer to, or ``None`` when not deferring.

        Raises:
            RuntimeException: if ``--defer`` was given without a previous
                state, or the previous state holds no manifest.
        """
        if not self.args.defer:
            return None

        previous = self.previous_state
        if previous is None:
            raise RuntimeException(
                "Received a --defer argument, but no value was provided to --state"
            )

        if previous.manifest is not None:
            return previous.manifest
        raise RuntimeException(f'Could not find manifest in --state path: "{self.args.state}"')

    def defer_to_manifest(self, adapter, selected_uids: AbstractSet[str]):
        """Merge the deferred manifest into the runtime manifest and write it.

        Does nothing when there is no deferred manifest.

        Raises:
            InternalException: if a deferred manifest exists but the runtime
                manifest is unset.
        """
        fallback_manifest = self._get_deferred_manifest()
        if fallback_manifest is None:
            return

        if self.manifest is None:
            raise InternalException(
                "Expected to defer to manifest, but there is no runtime manifest to defer from!"
            )

        self.manifest.merge_from_artifact(
            adapter=adapter,
            other=fallback_manifest,
            selected=selected_uids,
        )
        # TODO: is it wrong to write the manifest here? I think it's right...
        self.write_manifest()