export_log.py
"""
Parsing for bulk export log files
https://github.com/smart-on-fhir/bulk-data-client/wiki/Bulk-Data-Export-Log-Items
"""

import datetime
import json
import os
import re
import uuid

import httpx

import cumulus_etl
from cumulus_etl import common, errors, fhir, store


class BulkExportLogParser:
    """
    Parses the log file generated by bulk exports.

    These are the assumptions we make:
    - There will be a log.ndjson or log.*.ndjson file in the given folder.
      - There cannot be multiples (unless log.ndjson exists, in which case we always use that)
    - That log file will be for a single export.
      - e.g. We will generally grab the last "kickoff" event and ignore others.
    """

    class LogParsingError(Exception):
        pass

    class IncompleteLog(LogParsingError):
        pass

    class MultipleLogs(LogParsingError):
        pass

    class NoLogs(LogParsingError):
        pass

    def __init__(self, root: store.Root):
        self.group_name: str | None = None
        self.export_datetime: datetime.datetime | None = None
        self.export_url: str | None = None

        self._parse(root, self._find(root))

    def _parse(self, root: store.Root, path: str) -> None:
        # Go through every row, looking for the final kickoff event.
        # We only want to look at one series of events for one bulk export.
        # So we pick the last one, in case there are multiple in the log.
        # Those early events might be false starts.
        export_id = None
        for row in common.read_ndjson(root, path):
            if row.get("eventId") == "kickoff":
                export_id = row.get("exportId")
        if not export_id:
            raise self.IncompleteLog(f"No kickoff event found in '{path}'")

        # Now read through the log file again, only looking for the events from the one export.
        try:
            for row in common.read_ndjson(root, path):
                if row.get("exportId") != export_id:
                    continue
                match row.get("eventId"):
                    case "kickoff":
                        self._parse_kickoff(row)
                    case "status_complete":
                        self._parse_status_complete(row)
        except KeyError as exc:
            raise self.IncompleteLog(f"Error parsing '{path}'") from exc

        if self.export_datetime is None:
            raise self.IncompleteLog(f"No status_complete event found in '{path}'")

    def _parse_kickoff(self, row: dict) -> None:
        details = row["eventDetail"]
        self.group_name = fhir.FhirUrl(details["exportUrl"]).group
        self.export_url = details["exportUrl"]

    def _parse_status_complete(self, row: dict) -> None:
        details = row["eventDetail"]
        self.export_datetime = datetime.datetime.fromisoformat(details["transactionTime"])

    def _find(self, root: store.Root) -> str:
        """Finds the log file inside the root"""
        try:
            paths = root.ls()
        except FileNotFoundError as exc:
            raise self.NoLogs("Folder does not exist") from exc
        filenames = {os.path.basename(p): p for p in paths}

        # In the easy case, it's just sitting there at log.ndjson,
        # which is the filename that bulk-data-client uses.
        # Because this is the standard name, we prefer this and don't
        # error out even if there are other log.something.ndjson names in
        # the folder (see below). Maybe this is a symlink to the most recent...
        if full_path := filenames.get("log.ndjson"):
            return full_path

        # But possibly the user does some file renaming to manage different
        # exports, so allow log.something.ndjson as well. (Much like we do
        # for the input ndjson files.)
        pattern = re.compile(r"log\..+\.ndjson")
        log_files = list(filter(pattern.match, filenames.keys()))
        match len(log_files):
            case 0:
                raise self.NoLogs("No log.ndjson file found")
            case 1:
                return filenames[log_files[0]]
            case _:
                raise self.MultipleLogs("Multiple log.*.ndjson files found")
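
# Illustrative usage sketch, not part of the original module.  The folder path
# below is hypothetical; the parser raises one of the LogParsingError
# subclasses above if the folder has no usable log:
#
#   parser = BulkExportLogParser(store.Root("/path/to/export-folder"))
#   parser.group_name       # group name taken from the kickoff exportUrl
#   parser.export_datetime  # transactionTime from the status_complete event
#   parser.export_url       # the full kickoff export URL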


class BulkExportLogWriter:
    """Writes a standard bulk export log file."""

    def __init__(self, root: store.Root):
        self.root = root
        self._export_id = str(uuid.uuid4())
        self._filename = root.joinpath("log.ndjson")
        self._num_files = 0
        self._num_resources = 0
        self._num_bytes = 0
        self._start_time: datetime.datetime | None = None

    def _event(
        self, event_id: str, detail: dict, *, timestamp: datetime.datetime | None = None
    ) -> None:
        timestamp = timestamp or common.datetime_now(local=True)
        if self._start_time is None:
            self._start_time = timestamp

        # We open the file anew for each event because:
        # a) logging should be flushed often to disk
        # b) it makes the API of the class easier by avoiding a context manager
        with self.root.fs.open(self._filename, "a", encoding="utf8") as f:
            row = {
                "exportId": self._export_id,
                "timestamp": timestamp.isoformat(),
                "eventId": event_id,
                "eventDetail": detail,
            }
            if event_id == "kickoff":
                # The bulk logging spec says we can add whatever other keys we want,
                # but does not encourage a namespace to separate them or anything.
                # We use a sunder prefix, just in case the spec wants to add new keys itself.
                row["_client"] = "cumulus-etl"
                row["_clientVersion"] = cumulus_etl.__version__
            json.dump(row, f)
            f.write("\n")
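
    # For reference, one emitted log line looks roughly like this (values are
    # illustrative and wrapped here for readability; the real output is a
    # single ndjson line):
    #   {"exportId": "<uuid4>", "timestamp": "2024-01-01T12:00:00-05:00",
    #    "eventId": "kickoff", "eventDetail": {...},
    #    "_client": "cumulus-etl", "_clientVersion": "<version>"}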

    @staticmethod
    def _body(response: httpx.Response) -> dict | str:
        try:
            parsed = response.json()
            if isinstance(parsed, dict):
                return parsed
        except json.JSONDecodeError:
            pass  # fall back to text
        return response.text

    @staticmethod
    def _response_info(response: httpx.Response) -> dict:
        return {
            "body": BulkExportLogWriter._body(response),
            "code": response.status_code,
            "responseHeaders": dict(response.headers),
        }

    @staticmethod
    def _error_info(exc: Exception) -> dict:
        """Returns a dict of error details, ready to be merged into an event detail object"""
        info = {
            "body": None,
            "code": None,
            "message": str(exc),
            "responseHeaders": None,
        }

        if isinstance(exc, errors.NetworkError):
            info.update(BulkExportLogWriter._response_info(exc.response))

        return info

    def kickoff(self, url: str, capabilities: dict, response: httpx.Response | Exception):
        # https://www.hl7.org/fhir/R4/capabilitystatement.html
        software = capabilities.get("software", {})
        response_info = {}

        # Create a "merged" version of the request parameters.
        # (Merged in the sense that duplicates are converted to comma separated lists.)
        request_params = {}
        for k, v in httpx.URL(url).params.multi_items():
            if k in request_params:
                request_params[k] += f",{v}"
            else:
                request_params[k] = v
        # Spec says we shouldn't log the `patient` parameter, so strip it here.
        request_params.pop("patient", None)

        if isinstance(response, Exception):
            response_info = self._error_info(response)
            if response_info["body"] is None:  # for non-httpx error cases
                response_info["body"] = response_info["message"]
        else:
            response_info = BulkExportLogWriter._response_info(response)
            if response.status_code == 202:
                response_info["body"] = None
                response_info["code"] = None

        self._event(
            "kickoff",
            {
                "exportUrl": url,
                "softwareName": software.get("name"),
                "softwareVersion": software.get("version"),
                "softwareReleaseDate": software.get("releaseDate"),
                "fhirVersion": capabilities.get("fhirVersion"),
                "requestParameters": request_params,
                "errorCode": response_info["code"],
                "errorBody": response_info["body"],
                "responseHeaders": response_info["responseHeaders"],
            },
        )
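
    # For example, a hypothetical kickoff URL ending in
    # "$export?_type=Patient&_type=Condition" gets logged with
    # requestParameters == {"_type": "Patient,Condition"}; any `patient`
    # parameter is stripped before logging, per the spec note above.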

    def status_progress(self, response: httpx.Response):
        self._event(
            "status_progress",
            {
                "body": self._body(response),
                "xProgress": response.headers.get("X-Progress"),
                "retryAfter": response.headers.get("Retry-After"),
            },
        )

    def status_complete(self, response: httpx.Response):
        response_json = response.json()
        self._event(
            "status_complete",
            {
                "transactionTime": response_json.get("transactionTime"),
                "outputFileCount": len(response_json.get("output", [])),
                "deletedFileCount": len(response_json.get("deleted", [])),
                "errorFileCount": len(response_json.get("error", [])),
            },
        )

    def status_error(self, exc: Exception):
        self._event(
            "status_error",
            self._error_info(exc),
        )

    def download_request(
        self,
        file_url: str | httpx.URL,
        item_type: str,
        resource_type: str | None,
    ):
        self._event(
            "download_request",
            {
                "fileUrl": str(file_url),
                "itemType": item_type,
                "resourceType": resource_type,
            },
        )

    def download_complete(
        self,
        file_url: str | httpx.URL,
        resource_count: int | None,
        file_size: int,
    ):
        self._num_files += 1
        self._num_resources += resource_count or 0
        self._num_bytes += file_size
        self._event(
            "download_complete",
            {
                "fileUrl": str(file_url),
                "resourceCount": resource_count,
                "fileSize": file_size,
            },
        )

    def download_error(self, file_url: str | httpx.URL, exc: Exception):
        self._event(
            "download_error",
            {
                "fileUrl": str(file_url),
                **self._error_info(exc),
            },
        )

    def export_complete(self):
        timestamp = common.datetime_now(local=True)
        # Report the full duration in milliseconds; fall back to zero if no events were logged.
        duration = (timestamp - self._start_time) if self._start_time else datetime.timedelta(0)
        self._event(
            "export_complete",
            {
                "files": self._num_files,
                "resources": self._num_resources,
                "bytes": self._num_bytes,
                "attachments": None,
                "duration": duration // datetime.timedelta(milliseconds=1),
            },
            timestamp=timestamp,
        )
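

if __name__ == "__main__":
    # Illustrative sketch, not part of the original module: exercise the writer
    # against a temporary folder and dump the resulting log lines.  The export
    # URL here is hypothetical, and store.Root is assumed to accept a plain
    # local folder path, as it does elsewhere in this codebase.
    import tempfile

    with tempfile.TemporaryDirectory() as tmpdir:
        writer = BulkExportLogWriter(store.Root(tmpdir))
        writer.kickoff(
            "https://example.org/fhir/Group/hypothetical-group/$export?_type=Patient",
            capabilities={"fhirVersion": "4.0.1"},
            response=httpx.Response(202),
        )
        writer.download_request("https://example.org/output/patient.ndjson", "output", "Patient")
        writer.download_complete("https://example.org/output/patient.ndjson", 10, 2048)
        writer.export_complete()

        with open(os.path.join(tmpdir, "log.ndjson"), encoding="utf8") as f:
            print(f.read(), end="")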