-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathsql.py
110 lines (90 loc) · 3.59 KB
/
sql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import traceback
from abc import abstractmethod
from datetime import datetime, timezone
from typing import Generic, TypeVar

import dbt.exceptions
import dbt_common.exceptions.base
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.sql import (
    RemoteCompileResult,
    RemoteCompileResultMixin,
    RemoteRunResult,
    ResultTable,
)
from dbt.events.types import SQLRunnerException
from dbt.task.compile import CompileRunner
from dbt_common.events.functions import fire_event
# Type variable for the result object a SQL runner produces; its upper bound
# is RemoteCompileResultMixin, so only that class or its subclasses may be used.
SQLResult = TypeVar("SQLResult", bound=RemoteCompileResultMixin)
class GenericSqlRunner(CompileRunner, Generic[SQLResult]):
    """Base runner for a single SQL node, parameterized by its result type.

    Subclasses choose the concrete ``SQLResult`` and implement ``execute`` and
    ``from_run_result``. Errors are re-raised rather than converted into a
    result object (see ``error_result``).
    """

    def __init__(self, config, adapter, node, node_index, num_nodes) -> None:
        """Forward all arguments unchanged to ``CompileRunner.__init__``."""
        CompileRunner.__init__(self, config, adapter, node, node_index, num_nodes)

    def handle_exception(self, e, ctx):
        """Emit a ``SQLRunnerException`` event for ``e`` and return ``e``.

        Args:
            e: The exception being handled. ``traceback.format_exc()`` is used
                for the event, so this is expected to be called while ``e`` is
                the active exception.
            ctx: Execution context; only ``ctx.node`` is read.

        Returns:
            The same exception object, with the node attached when it is a
            ``DbtRuntimeError``.
        """
        fire_event(
            SQLRunnerException(
                exc=str(e), exc_info=traceback.format_exc(), node_info=self.node.node_info
            )
        )
        # The earlier guard, ``isinstance(e, dbt.exceptions.Exception)``, was
        # flagged in review as referencing a name that does not exist, so the
        # handler itself blew up. It also let any non-matching exception fall
        # through to an implicit ``None``. Only the node annotation is
        # type-specific; the exception is always handed back to the caller.
        if isinstance(e, dbt_common.exceptions.DbtRuntimeError):
            e.add_node(ctx.node)
        return e

    def before_execute(self) -> None:
        """Hook invoked before execution; intentionally a no-op here."""
        pass

    def after_execute(self, result) -> None:
        """Hook invoked after execution; intentionally a no-op here."""
        pass

    def compile(self, manifest: Manifest):
        """Compile ``self.node`` against ``manifest`` and return the compiler's output.

        Passes an empty extra-context dict and ``write=False`` to
        ``compile_node``.
        """
        return self.compiler.compile_node(self.node, manifest, {}, write=False)

    @abstractmethod
    def execute(self, compiled_node, manifest) -> SQLResult:
        """Produce the runner's result for ``compiled_node``; implemented by subclasses."""
        pass

    @abstractmethod
    def from_run_result(self, result, start_time, timing_info) -> SQLResult:
        """Rebuild a result from ``result`` with ``timing_info``; implemented by subclasses."""
        pass

    def error_result(self, node, error, start_time, timing_info):
        """Re-raise ``error`` instead of building an error result.

        Raises:
            Whatever ``error`` is; the other arguments are ignored.
        """
        raise error

    def ephemeral_result(self, node, start_time, timing_info):
        """Reject ephemeral nodes, which this runner does not support.

        Raises:
            dbt_common.exceptions.base.NotImplementedError: always.
        """
        raise dbt_common.exceptions.base.NotImplementedError(
            "cannot execute ephemeral nodes remotely!"
        )
class SqlCompileRunner(GenericSqlRunner[RemoteCompileResult]):
    """Runner that compiles a SQL node and reports the compiled code without running it."""

    def execute(self, compiled_node, manifest) -> RemoteCompileResult:
        """Wrap ``compiled_node`` in a ``RemoteCompileResult``.

        Args:
            compiled_node: Node providing ``raw_code`` and ``compiled_code``.
            manifest: Unused here.

        Returns:
            A result with empty ``timing`` and ``generated_at`` set to the
            current UTC time as a naive ``datetime``.
        """
        return RemoteCompileResult(
            raw_code=compiled_node.raw_code,
            compiled_code=compiled_node.compiled_code,
            node=compiled_node,
            timing=[],  # this will get added later
            # datetime.utcnow() is deprecated since Python 3.12; this yields the
            # same naive-UTC value.
            generated_at=datetime.now(timezone.utc).replace(tzinfo=None),
        )

    def from_run_result(self, result, start_time, timing_info) -> RemoteCompileResult:
        """Copy ``result`` into a new ``RemoteCompileResult`` carrying ``timing_info``.

        Args:
            result: Earlier result providing ``raw_code``, ``compiled_code`` and ``node``.
            start_time: Unused here.
            timing_info: Value stored as the new result's ``timing``.

        Returns:
            A new result with a fresh naive-UTC ``generated_at`` timestamp.
        """
        return RemoteCompileResult(
            raw_code=result.raw_code,
            compiled_code=result.compiled_code,
            node=result.node,
            timing=timing_info,
            generated_at=datetime.now(timezone.utc).replace(tzinfo=None),
        )
class SqlExecuteRunner(GenericSqlRunner[RemoteRunResult]):
    """Runner that executes a compiled SQL node through the adapter and returns its rows."""

    def execute(self, compiled_node, manifest) -> RemoteRunResult:
        """Run the node's compiled SQL and package the fetched rows.

        Args:
            compiled_node: Node providing ``raw_code`` and ``compiled_code``.
            manifest: Unused here.

        Returns:
            A ``RemoteRunResult`` whose ``table`` holds the column names and
            rows, with empty ``timing`` and a naive-UTC ``generated_at``.
        """
        # The first element of the adapter's return value is not needed here.
        _, execute_result = self.adapter.execute(compiled_node.compiled_code, fetch=True)

        # Materialize columns and rows as plain lists.
        table = ResultTable(
            column_names=list(execute_result.column_names),
            rows=[list(row) for row in execute_result],
        )

        return RemoteRunResult(
            raw_code=compiled_node.raw_code,
            compiled_code=compiled_node.compiled_code,
            node=compiled_node,
            table=table,
            timing=[],
            # datetime.utcnow() is deprecated since Python 3.12; this yields the
            # same naive-UTC value.
            generated_at=datetime.now(timezone.utc).replace(tzinfo=None),
        )

    def from_run_result(self, result, start_time, timing_info) -> RemoteRunResult:
        """Copy ``result`` into a new ``RemoteRunResult`` carrying ``timing_info``.

        Args:
            result: Earlier result providing ``raw_code``, ``compiled_code``,
                ``node`` and ``table``.
            start_time: Unused here.
            timing_info: Value stored as the new result's ``timing``.

        Returns:
            A new result with a fresh naive-UTC ``generated_at`` timestamp.
        """
        return RemoteRunResult(
            raw_code=result.raw_code,
            compiled_code=result.compiled_code,
            node=result.node,
            table=result.table,
            timing=timing_info,
            generated_at=datetime.now(timezone.utc).replace(tzinfo=None),
        )