-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path_models.py
191 lines (168 loc) · 5.19 KB
/
_models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
"""
Models for Arcane
"""
from abc import ABC, abstractmethod
from enum import Enum
from typing import Dict, Optional
from dataclasses import dataclass
from dataclasses_json import DataClassJsonMixin, dataclass_json, LetterCase
class StreamConfiguration(ABC):
    """
    Base configuration for all streams.

    Concrete subclasses describe a single stream source and know how to
    serialize themselves into the payload expected by the streaming start
    endpoint (see `to_dict`) and which endpoint to target (see `url_path`).
    """
    @abstractmethod
    def to_dict(self) -> Dict:
        """
        Converts this to the payload accepted by streaming start endpoint.

        :return: Dict payload for the stream start request.
        """
    @property
    @abstractmethod
    def url_path(self) -> str:
        """
        Url path for streams matching this configuration.

        :return: Relative endpoint path, e.g. ``start/sqlserverct``.
        """
@dataclass
class SqlServerStreamConfiguration(StreamConfiguration):
    """
    Stream configuration for the Sql Server Change Tracking source.

    Serialized by `to_dict` into the payload accepted by the
    ``start/sqlserverct`` endpoint.
    """
    connection_string: str
    schema: str
    table: str
    rows_per_group: int
    grouping_interval: str
    groups_per_file: int
    sink_location: str
    sink_filename: str
    full_load_on_start: bool
    client_tag: str
    lookback_interval: int = 86400
    change_capture_interval: str = "0.00:00:15"
    command_timeout: int = 3600

    @property
    def url_path(self) -> str:
        """Relative path of the start endpoint for this source."""
        return "start/sqlserverct"

    def to_dict(self) -> Dict:
        """Serialize this configuration into the start-endpoint payload."""
        # dict(...) keyword form keeps the PascalCase payload keys and the
        # same insertion order as the field declarations above.
        return dict(
            ConnectionString=self.connection_string,
            Schema=self.schema,
            Table=self.table,
            RowsPerGroup=self.rows_per_group,
            GroupingInterval=self.grouping_interval,
            GroupsPerFile=self.groups_per_file,
            SinkLocation=self.sink_location,
            SinkFileName=self.sink_filename,
            FullLoadOnStart=self.full_load_on_start,
            ClientTag=self.client_tag,
            LookbackInterval=self.lookback_interval,
            ChangeCaptureInterval=self.change_capture_interval,
            CommandTimeout=self.command_timeout,
        )
@dataclass
class CdmChangeFeedStreamConfiguration(StreamConfiguration):
    """
    Stream configuration for the Microsoft CDM Change Feed source.

    Serialized by `to_dict` into the payload accepted by the
    ``start/microsoft_cdm`` endpoint.
    """
    storage_account_connection_string: str
    base_location: str
    entity_name: str
    rows_per_group: int
    grouping_interval: str
    groups_per_file: int
    sink_location: str
    sink_filename: str
    full_load_on_start: bool
    client_tag: str
    http_client_max_retries: int = 3
    http_client_retry_delay: str = "0.00:00:01"
    change_capture_interval: str = "0.00:00:15"

    @property
    def url_path(self) -> str:
        """Relative path of the start endpoint for this source."""
        return "start/microsoft_cdm"

    def to_dict(self) -> Dict:
        """Serialize this configuration into the start-endpoint payload."""
        # dict(...) keyword form keeps the PascalCase payload keys and the
        # same key order the endpoint payload used originally.
        return dict(
            StorageAccountConnectionString=self.storage_account_connection_string,
            HttpClientMaxRetries=self.http_client_max_retries,
            HttpClientRetryDelay=self.http_client_retry_delay,
            BaseLocation=self.base_location,
            EntityName=self.entity_name,
            FullLoadOnStart=self.full_load_on_start,
            ChangeCaptureInterval=self.change_capture_interval,
            RowsPerGroup=self.rows_per_group,
            GroupingInterval=self.grouping_interval,
            GroupsPerFile=self.groups_per_file,
            SinkLocation=self.sink_location,
            SinkFileName=self.sink_filename,
            ClientTag=self.client_tag,
        )
@dataclass
class BigQueryStreamConfiguration(StreamConfiguration):
    """
    Stream configuration for the Google BigQuery source.

    Serialized by `to_dict` into the payload accepted by the
    ``start/bigquery`` endpoint.
    """
    project: str
    dataset: str
    table: str
    entity_name: str
    secret: str
    partition_column_name: str
    change_capture_interval: str
    # NOTE(review): declared str here but int (seconds) in
    # SqlServerStreamConfiguration — confirm the expected wire format.
    lookback_interval: str
    # Was annotated `str`; both sibling configurations declare bool and
    # annotations are not enforced at runtime, so this is runtime-neutral.
    full_load_on_start: bool
    sink_location: str
    partition_column_name_format: str
    client_tag: str

    @property
    def url_path(self) -> str:
        """Relative path of the start endpoint for this source."""
        return "start/bigquery"

    def to_dict(self) -> Dict:
        """Serialize this configuration into the start-endpoint payload."""
        return {
            "Project": self.project,
            "Dataset": self.dataset,
            "Table": self.table,
            "EntityName": self.entity_name,
            "Secret": self.secret,
            "PartitionColumnName": self.partition_column_name,
            "ChangeCaptureInterval": self.change_capture_interval,
            "LookbackInterval": self.lookback_interval,
            "FullLoadOnStart": self.full_load_on_start,
            "SinkLocation": self.sink_location,
            "PartitionColumnNameFormat": self.partition_column_name_format,
            "ClientTag": self.client_tag
        }
@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class StreamError(DataClassJsonMixin):
    """
    Arcane stream failure information.

    Serialized to/from JSON with camelCase keys (``errorType``,
    ``errorMessage``, ``errorStack``) via dataclasses_json.
    """
    error_type: str  # presumably the error kind/class name — TODO confirm against producer
    error_message: str  # human-readable error description
    error_stack: str  # presumably a stack trace captured at failure time — TODO confirm
@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class StreamInfo(DataClassJsonMixin):
    """
    Arcane stream information.

    Serialized to/from JSON with camelCase keys via dataclasses_json.
    """
    id: str  # pylint: disable=C0103
    stream_source: str
    started_at: str  # presumably a timestamp string — format not visible here; confirm
    owner: str
    tag: str
    stream_configuration: str
    stream_metadata: str
    stream_state: str
    error: StreamError  # NOTE(review): not Optional — looks like the payload always carries it; confirm
    stopped_at: Optional[str] = None  # presumably None while the stream is still running — confirm
class StreamState(Enum):
    """
    Stream states in Arcane.

    Each member's value equals its name, so both ``StreamState(s)`` and
    ``StreamState[s]`` resolve a state from its string form.
    """
    RUNNING = 'RUNNING'
    STOPPED = 'STOPPED'
    TERMINATING = 'TERMINATING'
    RESTARTING = 'RESTARTING'
    FAILED = 'FAILED'