-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathplugin.py
145 lines (118 loc) · 4.37 KB
/
plugin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import json
from typing import Dict, Any, TextIO
from _pytest.pathlib import Path
import pytest
def pytest_addoption(parser):
    """Register the report-log command line options.

    Both options are added to the "terminal reporting" option group of
    ``parser``.
    """
    reporting = parser.getgroup("terminal reporting", "report-log plugin options")

    # Destination of the event stream; defaults to None (option not given).
    path_option = dict(
        action="store",
        metavar="path",
        default=None,
        help="Path to line-based json objects of test session events.",
    )
    reporting.addoption("--report-log", **path_option)

    # Boolean switch, off unless passed on the command line.
    exclude_option = dict(
        action="store_true",
        default=False,
        help="Don't capture logs for passing tests",
    )
    reporting.addoption("--report-log-exclude-logs-on-passed-tests", **exclude_option)
def pytest_configure(config):
    """Create and register the report-log plugin when ``--report-log`` is set.

    Nothing is done when the option value is empty, or when ``config`` has a
    ``workerinput`` attribute (presumably a pytest-xdist worker process --
    the code here only checks for the attribute's presence).
    """
    target = config.option.report_log
    if not target or hasattr(config, "workerinput"):
        return

    # Keep a reference on the config object so pytest_unconfigure can find
    # the plugin again and close its file.
    config._report_log_plugin = ReportLogPlugin(config, Path(target))
    config.pluginmanager.register(config._report_log_plugin)
def pytest_unconfigure(config):
    """Close the report-log plugin stored on ``config`` and drop the reference.

    Does nothing when no ``_report_log_plugin`` attribute is present (or it
    is falsy).
    """
    plugin = getattr(config, "_report_log_plugin", None)
    if not plugin:
        return
    plugin.close()
    del config._report_log_plugin
def _open_filtered_writer(log_path: Path) -> TextIO:
    """Open ``log_path`` for UTF-8 text writing, compressing by file suffix.

    ``.gz``, ``.bz2`` and ``.xz`` suffixes select gzip, bz2 and lzma
    respectively; any other suffix yields a plain, line-buffered text file.
    The compression module is imported only when its suffix is seen.
    """
    extension = log_path.suffix
    if extension == ".gz":
        import gzip as codec
    elif extension == ".bz2":
        import bz2 as codec
    elif extension == ".xz":
        import lzma as codec
    else:
        # line buffer for text mode to ease tail -f
        return log_path.open("wt", buffering=1, encoding="UTF-8")
    return codec.open(log_path, "wt", encoding="UTF-8")
class ReportLogPlugin:
    """Pytest plugin that writes session events as one JSON object per line.

    The output file is opened on construction (its parent directory is
    created if needed) and stays open until :meth:`close` is called.
    """

    # Titles of the report sections removed from passing tests when
    # --report-log-exclude-logs-on-passed-tests is enabled.
    _CAPTURED_LOG_SECTIONS = (
        "Captured log setup",
        "Captured log call",
        "Captured log teardown",
    )

    def __init__(self, config, log_path: Path):
        """Remember ``config`` and open ``log_path`` for writing."""
        self._config = config
        self._log_path = log_path
        log_path.parent.mkdir(parents=True, exist_ok=True)
        self._file = _open_filtered_writer(log_path)

    def close(self):
        """Close the output file; calling it again is a no-op."""
        if self._file is None:
            return
        self._file.close()
        self._file = None

    def _write_json_data(self, data):
        """Serialize ``data`` to JSON and append it as one flushed line.

        If ``json.dumps`` raises ``TypeError``, the offending top-level
        values are replaced by their ``str()`` and serialization is retried.
        """
        try:
            line = json.dumps(data)
        except TypeError:
            line = json.dumps(cleanup_unserializable(data))
        stream = self._file
        stream.write(line + "\n")
        # Flush after every record so the file is current on disk.
        stream.flush()

    def pytest_sessionstart(self):
        """Record the session start together with the pytest version."""
        self._write_json_data(
            {"pytest_version": pytest.__version__, "$report_type": "SessionStart"}
        )

    def pytest_sessionfinish(self, exitstatus):
        """Record the session end together with its exit status."""
        self._write_json_data(
            {"exitstatus": exitstatus, "$report_type": "SessionFinish"}
        )

    def pytest_runtest_logreport(self, report):
        """Record a test report, optionally without captured-log sections.

        The captured-log sections are removed only when the exclude option
        is set and the serialized report's outcome is ``"passed"``.
        """
        data = self._config.hook.pytest_report_to_serializable(
            config=self._config, report=report
        )
        drop_logs = self._config.option.report_log_exclude_logs_on_passed_tests
        if drop_logs and data.get("outcome", "") == "passed":
            # Each section is indexed by position; element 0 is its title.
            data["sections"] = [
                section
                for section in data["sections"]
                if section[0] not in self._CAPTURED_LOG_SECTIONS
            ]
        self._write_json_data(data)

    def pytest_warning_recorded(self, warning_message, when, nodeid, location):
        """Record a captured warning.

        ``nodeid`` is accepted as part of the hook signature but is not
        written to the log.
        """
        category = warning_message.category
        data = {
            "category": category.__name__ if category else None,
            "filename": warning_message.filename,
            "lineno": warning_message.lineno,
            "message": warning_message.message,
            "$report_type": "WarningMessage",
            "when": when,
            "location": location,
        }
        self._write_json_data(data)

    def pytest_collectreport(self, report):
        """Record a collection report in its serialized form."""
        serialized = self._config.hook.pytest_report_to_serializable(
            config=self._config, report=report
        )
        self._write_json_data(serialized)

    def pytest_terminal_summary(self, terminalreporter):
        """Print the location of the generated log file in the summary."""
        title = f"generated report log file: {self._log_path}"
        terminalreporter.write_sep("-", title)
def cleanup_unserializable(d: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new dict where values json cannot encode are replaced by str().

    Each top-level entry is probed on its own with ``json.dumps``; an entry
    that raises ``TypeError`` has its whole value converted with ``str()``.
    Serializable entries are carried over unchanged and ``d`` itself is not
    modified.
    """
    cleaned = {}
    for key, value in d.items():
        try:
            json.dumps({key: value})
        except TypeError:
            cleaned[key] = str(value)
        else:
            cleaned[key] = value
    return cleaned