-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathcreate_predictor.py
154 lines (129 loc) · 5.74 KB
/
create_predictor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import json
from mindsdb_sql.parser.ast.base import ASTNode
from mindsdb_sql.parser.utils import indent
from mindsdb_sql.parser.ast.select import Identifier
from mindsdb_sql.parser.ast.select.operation import Object
class CreatePredictorBase(ASTNode):
def __init__(self,
name,
targets=None,
integration_name=None,
query_str=None,
datasource_name=None,
order_by=None,
group_by=None,
window=None,
horizon=None,
using=None,
is_replace=False,
if_not_exists=False,
task=None,
*args, **kwargs):
super().__init__(*args, **kwargs)
self.name = name
self.integration_name = integration_name
self.query_str = query_str
self.targets = targets
self.datasource_name = datasource_name
self.order_by = order_by
self.group_by = group_by
self.window = window
self.horizon = horizon
self.using = using
self.is_replace = is_replace
self.if_not_exists = if_not_exists
self.task = task
def to_tree(self, *args, level=0, **kwargs):
ind = indent(level)
ind1 = indent(level+1)
name_str = f'\n{ind1}name={self.name.to_tree()},'
if self.integration_name is not None:
integration_name_str = f'\n{ind1}integration_name={self.integration_name.to_tree()},'
else:
integration_name_str = 'None'
query_str = f'\n{ind1}query={self.query_str},'
datasource_name_str = ''
if self.datasource_name:
datasource_name_str = f'\n{ind1}datasource_name={self.datasource_name.to_tree()},'
if self.targets is not None:
target_trees = ',\n'.join([t.to_tree(level=level+2) for t in self.targets])
targets_str = f'\n{ind1}targets=[\n{target_trees}\n{ind1}],'
else:
targets_str = ''
group_by_str = ''
if self.group_by:
group_by_trees = ',\n'.join([t.to_tree(level=level+2) for t in self.group_by])
group_by_str = f'\n{ind1}group_by=[\n{group_by_trees}\n{ind1}],'
order_by_str = ''
if self.order_by:
order_by_trees = ',\n'.join([t.to_tree(level=level + 2) for t in self.order_by])
order_by_str = f'\n{ind1}order_by=[\n{order_by_trees}\n{ind1}],'
window_str = f'\n{ind1}window={repr(self.window)},'
horizon_str = f'\n{ind1}horizon={repr(self.horizon)},'
using_str = f'\n{ind1}using={repr(self.using)},'
if_not_exists_str = f'\n{ind1}if_not_exists={self.if_not_exists},' if self.if_not_exists else ''
out_str = f'{ind}{self.__class__.__name__}(' \
f'{if_not_exists_str}' \
f'{name_str}' \
f'{integration_name_str}' \
f'{query_str}' \
f'{datasource_name_str}' \
f'{targets_str}' \
f'{order_by_str}' \
f'{group_by_str}' \
f'{window_str}' \
f'{horizon_str}' \
f'{using_str}' \
f'\n{ind})'
return out_str
def get_string(self, *args, **kwargs):
if self.targets is not None:
targets_str = 'PREDICT ' + ', '.join([out.to_string() for out in self.targets])
else:
targets_str = ''
order_by_str = f'ORDER BY {", ".join([out.to_string() for out in self.order_by])} ' if self.order_by else ''
group_by_str = f'GROUP BY {", ".join([out.to_string() for out in self.group_by])} ' if self.group_by else ''
window_str = f'WINDOW {self.window} ' if self.window is not None else ''
horizon_str = f'HORIZON {self.horizon} ' if self.horizon is not None else ''
using_str = ''
if self.using:
using_ar = []
for key, value in self.using.items():
if isinstance(value, Object):
args = [
f'{k}={json.dumps(v)}'
for k, v in value.params.items()
]
args_str = ', '.join(args)
value = f'{value.type}({args_str})'
else:
value = json.dumps(value)
using_ar.append(f'{Identifier(key).to_string()}={value}')
using_str = f'USING ' + ', '.join(using_ar)
datasource_name_str = f'AS {self.datasource_name.to_string()} ' if self.datasource_name is not None else ''
query_str = ''
if self.query_str is not None:
query_str = f'({self.query_str}) '
integration_name_str = ''
if self.integration_name is not None:
integration_name_str = f'FROM {self.integration_name.to_string()} '
if_not_exists_str = 'IF NOT EXISTS ' if self.if_not_exists else ''
out_str = f'{self._command} {if_not_exists_str}{self.name.to_string()} {integration_name_str}{query_str}' \
f'{datasource_name_str}' \
f'{targets_str} ' \
f'{order_by_str}' \
f'{group_by_str}' \
f'{window_str}' \
f'{horizon_str}' \
f'{using_str}'
return out_str.strip()
class CreatePredictor(CreatePredictorBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._command = 'CREATE PREDICTOR'
# Models by task type
class CreateAnomalyDetectionModel(CreatePredictorBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._command = 'CREATE ANOMALY DETECTION MODEL'
self.task = Identifier('AnomalyDetection')