-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathcve.py
228 lines (173 loc) · 6.47 KB
/
cve.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
#! /usr/bin/env python3
from dataclasses import dataclass
from datetime import datetime, UTC
from functools import cached_property, lru_cache
from json import load as json_load
from pathlib import Path
from time import gmtime
from typing import Self
import logging
import re
# Module-level logger; handlers and levels are left to the application.
log = logging.getLogger(__name__)

# Matches "CVE-<year>-<serial>"; group 1 is the year, group 2 the serial.
# The pattern is anchored with \Z instead of $ because $ also matches just
# before a trailing newline, which would let "CVE-2024-0001\n" pass as valid.
cve_id_re = re.compile(r"^CVE-(\d+)-(\d+)\Z")
def is_valid_id(cve_id: str) -> bool:
    """Tell whether *cve_id* is spelled like a CVE identifier.

    The check is purely syntactic: the string is matched against the
    module-level ``cve_id_re`` pattern and nothing else is verified.
    """
    matched = cve_id_re.match(cve_id)
    return matched is not None
def parse_id(cve_id: str) -> tuple[int, int]:
    """Split a CVE identifier string into its numeric components.

    Returns a ``(year, serial)`` tuple of integers taken from the two
    capture groups of ``cve_id_re``. Raises ``ValueError`` when the string
    does not match that pattern.
    """
    if (found := cve_id_re.match(cve_id)) is not None:
        year_text, serial_text = found.groups()
        return (int(year_text), int(serial_text))
    raise ValueError(f"Invalid CVE string: {cve_id!r}")
def unparse_id(cve_id: tuple[int, int]) -> str:
    """Format a ``(year, serial)`` pair as a CVE identifier string.

    Both numbers are zero-padded to at least four digits; longer numbers
    are emitted in full.
    """
    year = cve_id[0]
    serial = cve_id[1]
    return "CVE-{:04}-{:04}".format(year, serial)
@dataclass(frozen=True, slots=True)
class Id:
    """Represents a CVE identifier.

    Instances are frozen, so they are hashable and can be used as
    dictionary or cache keys. ``str(id)`` yields the canonical
    ``CVE-<year>-<serial>`` spelling produced by ``unparse_id``.
    """

    year: int
    serial: int

    # Wrapped in ``staticmethod`` so the validator can be called on an
    # instance as well as on the class. As a bare function attribute,
    # ``some_id.is_valid(text)`` would bind ``some_id`` as the first
    # argument and fail with a TypeError.
    is_valid = staticmethod(is_valid_id)

    @classmethod
    def parse(cls, cve_id: str) -> Self:
        """Parses a CVE identifier string into an identifier.

        Raises ``ValueError`` (from ``parse_id``) when the string is not a
        well-formed CVE identifier.
        """
        year, serial = parse_id(cve_id)
        return cls(year=year, serial=serial)

    def __str__(self) -> str:
        """Represents a CVE identifier as a string."""
        return unparse_id((self.year, self.serial))
class Entry:
    """Convenience class to access CVE JSON data.

    Wraps the decoded JSON document of one CVE record. Every property is a
    ``cached_property``: it is computed from the wrapped dictionary on first
    access and remembered afterwards.
    """

    _data: dict

    def __init__(self, data: dict):
        """Stores *data*, the decoded JSON document, without copying it."""
        self._data = data

    @cached_property
    def identifier(self) -> Id:
        """Identifier for the CVE entry.

        Read from ``cveMetadata.cveId``. Raises ``KeyError`` when that field
        is absent and ``ValueError`` when it is not a well-formed identifier.
        """
        return Id.parse(self._data["cveMetadata"]["cveId"])

    @cached_property
    def published(self) -> bool:
        """Whether the CVE entry has been published.

        True exactly when ``cveMetadata.state`` equals ``"PUBLISHED"``.
        """
        return self._data["cveMetadata"]["state"] == "PUBLISHED"

    @cached_property
    def description(self) -> None | str:
        """CVE entry description.

        Returns the first English description found while walking the
        containers in document order, or ``None`` when there is none.
        """
        for container in self._containers():
            for desc in container.get("descriptions", ()):
                if not isinstance(desc, dict) or "value" not in desc:
                    continue
                lang = desc.get("lang")
                if not isinstance(lang, str):
                    continue
                # Compare only the primary language subtag, so that
                # region-qualified tags such as "en-US" count as English.
                primary = lang.replace("_", "-").split("-", 1)[0].lower()
                if primary in ("en", "eng"):
                    return desc["value"]
        return None

    def _containers(self):
        """Yields every container dictionary of the record.

        A value under ``containers`` may be a single object or a list of
        objects (the CVE record format stores ``adp`` as a list), so lists
        are flattened here. Anything that is not a dictionary is skipped
        rather than crashing on a missing ``.get``.
        """
        for container in self._data.get("containers", {}).values():
            if isinstance(container, dict):
                yield container
            elif isinstance(container, list):
                for item in container:
                    if isinstance(item, dict):
                        yield item
def path_datetime(p: Path) -> datetime:
mtime = p.stat().st_mtime
ms = 1000 * (mtime - int(mtime))
t = gmtime(mtime)
return datetime(
t.tm_year, t.tm_mon, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec, int(ms), UTC
)
@dataclass(slots=True)
class CacheEntry:
    """A cache lookup result: a timestamp plus the record it refers to.

    ``DiskCache.get_entry`` builds these with ``time`` set to the
    modification time of the JSON file on disk and ``data`` set to the
    wrapped record.
    """

    # Timestamp associated with the cached copy.
    time: datetime
    # The cached record; defaults to None when no record is attached.
    data: None | Entry = None
class DiskCache:
    """Cache of CVE JSON records stored in a directory tree.

    Records live at ``<path>/<year>/<CVE id>.json``. This base class only
    reads files that already exist; subclasses can override
    ``materialize_file`` to create a missing file on demand.
    """

    __path: Path

    def __init__(self, path: Path):
        """Uses *path* as the cache root, creating it when missing.

        Raises ``NotADirectoryError`` when *path* exists but is not a
        directory.
        """
        self.__path = path.absolute()
        if self.__path.exists():
            # An explicit check instead of ``assert``, which is stripped
            # when Python runs with -O.
            if not self.__path.is_dir():
                raise NotADirectoryError(
                    f"Cache path is not a directory: {str(self.__path)!r}"
                )
        else:
            self.__path.mkdir(parents=True, exist_ok=True)
        # NOTE(review): never read within this class; kept in case code
        # outside this file relies on the attribute existing.
        self._data = None

    def get_json_path(self, cve_id: Id) -> Path:
        """Returns the location of the JSON file for *cve_id*.

        The directory name is the plain year number and the file name is
        the canonical identifier string followed by ``.json``.
        """
        return self.__path / str(cve_id.year) / f"{cve_id!s}.json"

    def get(self, cve_id: Id | str) -> None | Entry:
        """Returns the record for *cve_id*, or ``None`` when unavailable.

        Accepts either an ``Id`` or an identifier string. A string is
        parsed first, which raises ``ValueError`` when it is malformed.
        Any other argument type raises ``TypeError``.
        """
        if isinstance(cve_id, str):
            cve_id = Id.parse(cve_id)
        elif not isinstance(cve_id, Id):
            raise TypeError(
                f"cve_id must be Id or str, not {type(cve_id).__name__}"
            )
        entry = self.get_entry(cve_id)
        return None if entry is None else entry.data

    def get_entry(self, cve_id: Id) -> None | CacheEntry:
        """Loads the record for *cve_id* together with its file timestamp.

        Returns ``None`` when the file is absent and cannot be
        materialized, when the document is not a JSON object, or when its
        ``dataType`` is not ``"CVE_RECORD"`` or its ``dataVersion`` is not
        ``"5.1"``. Malformed JSON raises the decoder's exception.
        """
        json_path = self.get_json_path(cve_id)
        if not (json_path.is_file() or self.materialize_file(cve_id, json_path)):
            return None
        # JSON text is UTF-8; do not depend on the locale's default encoding.
        with json_path.open(encoding="utf-8") as fp:
            json_data = json_load(fp)
        if not isinstance(json_data, dict):
            return None
        if json_data.get("dataType") != "CVE_RECORD":
            return None
        if json_data.get("dataVersion") != "5.1":
            return None
        return CacheEntry(data=Entry(json_data), time=path_datetime(json_path))

    def materialize_file(self, cve_id: Id, json_path: Path) -> bool:
        """Hook for creating a missing cache file.

        Called by ``get_entry`` when *json_path* does not exist. Returns
        True when the file was created. This base implementation never
        creates anything and always returns False.
        """
        return False
class DiskFetcherCache(DiskCache):
    """Disk cache that downloads missing CVE records over HTTP.

    The download URL is built from a template containing a ``{cve_id}``
    placeholder, which is filled in with the identifier.
    """

    __base_url: str = "https://cveawg.mitre.org/api/cve/{cve_id}"

    def __init__(self, path: Path, base_url: str | None = None):
        """Sets up the cache rooted at *path*.

        *base_url*, when given, replaces the default URL template for this
        instance.
        """
        super().__init__(path)
        if base_url is not None:
            self.__base_url = base_url

    def materialize_file(self, cve_id: Id, json_path: Path) -> bool:
        """Downloads the record for *cve_id* and stores it at *json_path*.

        Returns True when the file was written and False when the server
        answered 404. Any other HTTP error, and any failure while creating
        the directory or writing the file, is re-raised.

        The body is written to a sibling ``.fetch`` file first and moved
        into place only once it is complete, so a failed download never
        leaves a truncated ``.json`` file behind.
        """
        from contextlib import suppress
        from gzip import GzipFile
        from urllib.error import HTTPError
        from urllib.request import urlopen

        json_url = self.__base_url.format(cve_id=cve_id)
        log.info("%s, fetching: %s", cve_id, json_url)
        try:
            response = urlopen(json_url)
        except HTTPError as err:
            log.debug("%s, exception: %s", cve_id, err)
            if err.status == 404:
                return False
            raise

        temp_path = json_path.with_suffix(".fetch")
        temp_dir = temp_path.parent

        # The response is a context manager; this closes the connection on
        # every exit path, including the error ones below.
        with response:
            body = response
            content_encoding = response.getheader("content-encoding")
            if content_encoding:
                content_encoding = content_encoding.lower()
                log.debug("%s, content encoding: %s", cve_id, content_encoding)
                if content_encoding in ("gzip", "x-gzip"):
                    body = GzipFile(fileobj=response)

            try:
                temp_dir.mkdir(exist_ok=True)
            except Exception as e:
                log.error("%s, cannot mkdir '%s': %s", cve_id, temp_dir, e)
                raise

            try:
                with temp_path.open("wb") as fp:
                    while data := body.read(64 * 1024):
                        fp.write(data)
            except Exception as e:
                log.error("%s, cannot write '%s': %s", cve_id, temp_path, e)
                # Best effort: do not leave a partial download behind.
                with suppress(OSError):
                    temp_path.unlink()
                raise

        # replace() overwrites an existing target on every platform,
        # whereas rename() fails on Windows when the target exists.
        temp_path.replace(json_path)
        log.debug("%s, saved: %s", cve_id, json_path)
        return True
class LRUMemDiskFetcherCache(DiskFetcherCache):
    """Fetching disk cache with an in-memory LRU layer in front of it.

    Each instance owns its own LRU cache. Decorating the method itself
    with ``lru_cache`` would create one cache shared by all instances,
    keyed on ``self``, that keeps every instance alive for as long as the
    class exists.
    """

    def __init__(
        self,
        path: Path,
        base_url: str | None = None,
        maxsize: int | None = 128,
    ):
        """Sets up the cache rooted at *path*.

        *base_url* is forwarded to ``DiskFetcherCache``. *maxsize* bounds
        the number of remembered lookups; ``None`` means unbounded. The
        default of 128 matches the default of ``lru_cache``.
        """
        super().__init__(path, base_url)
        self._cached_get_entry = lru_cache(maxsize=maxsize)(super().get_entry)

    def get_entry(self, cve_id: Id) -> None | CacheEntry:
        """Returns the entry for *cve_id*, consulting the memory cache first.

        Results are remembered per identifier, including ``None`` results
        for records that could not be loaded.
        """
        return self._cached_get_entry(cve_id)

    def cache_info(self):
        """Returns the ``lru_cache`` statistics of this instance's cache."""
        return self._cached_get_entry.cache_info()

    def cache_clear(self):
        """Drops every lookup remembered by this instance."""
        self._cached_get_entry.cache_clear()
# Public API of this module: the names bound by a star-import.
# The path_datetime helper is not part of this list.
__all__ = [
"is_valid_id",
"parse_id",
"unparse_id",
"Id",
"Entry",
"CacheEntry",
"DiskCache",
"DiskFetcherCache",
"LRUMemDiskFetcherCache",
]
if __name__ == "__main__":
    # Manual check: look up the CVE identifier given as the first argument,
    # using a "cve" directory next to this file as the cache, and print its
    # description. Exits with status 1 when the record is unavailable.
    import sys

    if len(sys.argv) < 2:
        raise SystemExit(f"usage: {sys.argv[0]} CVE-YYYY-NNNN")

    logging.basicConfig(level=logging.DEBUG)
    cve_id = Id.parse(sys.argv[1])
    cc = LRUMemDiskFetcherCache(Path(__file__).parent / "cve")
    cve = cc.get(cve_id)
    # Fail only when nothing was found; a successful lookup used to end
    # with exit status 1 as well.
    if cve is None:
        raise SystemExit(1)
    print(f"{cve.identifier}: {cve.description}")