-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsetuptools_ext.py
264 lines (225 loc) · 9.71 KB
/
setuptools_ext.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
"""Extension of setuptools to support all core metadata fields"""
import base64
import email.policy
import hashlib
import io
import shutil
import sys
import tarfile
import typing
import zipfile
from importlib.metadata import version
from pathlib import Path
from setuptools.build_meta import * # noqa
from setuptools.build_meta import build_sdist as orig_build_sdist
from setuptools.build_meta import build_wheel as orig_build_wheel
if sys.version_info >= (3, 11):
import tomllib
else:
import tomli as tomllib
allowed_fields = {
x.lower(): x
for x in [
"Platform",
"Supported-Platform",
"Download-URL",
"Requires-External",
"Provides-Dist",
"Obsoletes-Dist",
]
}
def parse_extra_metadata():
project = tomllib.loads(Path("pyproject.toml").read_text())
ours = project.get("tool", {}).get("setuptools-ext", {})
extra_metadata = {}
for key, vals in ours.items():
try:
header = allowed_fields[key.lower()]
except KeyError:
print(f"WARNING: ignored an unsupported option {key} = {vals}")
continue
if not isinstance(vals, list):
t = type(vals).__name__
print(f"WARNING: coercing the value of {key} from {t} to list")
vals = [vals]
extra_metadata[header] = vals
return extra_metadata
def build_wheel(wheel_directory, config_settings=None, metadata_directory=None):
extra_metadata = parse_extra_metadata()
whl = orig_build_wheel(wheel_directory, config_settings, metadata_directory)
rewrite_whl(Path(wheel_directory) / whl, extra_metadata)
return whl
def rewrite_metadata(data, extra_metadata):
"""
Rewrite the METADATA file to include the given additional metadata.
"""
pkginfo = email.message_from_bytes(data)
# delete some annoying kv that distutils seems to put in there for no reason
for key in dict(pkginfo):
if pkginfo.get_all(key) == ["UNKNOWN"]:
if key.lower() not in ["metadata-version", "name", "version"]:
del pkginfo[key]
new_headers = extra_metadata.items()
for key, vals in new_headers:
already_present = pkginfo.get_all(key, [])
for val in vals:
if val not in already_present:
pkginfo.add_header(key, val)
policy = email.policy.EmailPolicy(refold_source="none")
result = pkginfo.as_bytes(policy=policy)
return result
def rewrite_archive_metadata(orig_bytes):
lines = orig_bytes.splitlines()
for i, line in enumerate(lines):
if line.startswith(b"Generator: "):
suffix = b" + setuptools-ext (%s)"
line += suffix % version("setuptools-ext").encode()
lines[i] = line
return b"\n".join(lines) + b"\n"
class WheelRecord:
"""
Represents the RECORD file of a wheel, which can be updated with new checksums
using the record_file method.
See also the (limited) spec on RECORD at
https://packaging.python.org/en/latest/specifications/binary-distribution-format/#signed-wheel-files
"""
def __init__(self, record_content: str = ""):
#: Records mapping filename to (hash, length) tuples.
self._records: typing.Dict[str, typing.Tuple[str, str]] = {}
self.update_from_record(record_content)
def update_from_record(
self, record_content: typing.Union[str, "WheelRecord"]
) -> None:
"""
Update this WheelRecord given another WheelRecord, or RECORD contents
"""
if isinstance(record_content, WheelRecord):
record_content = record_content.record_contents()
for line in record_content.splitlines():
path, file_hash, length = line.split(",")
self._records[path] = file_hash, length
def record_file(self, filename, file_content: typing.Union[bytes, str]):
"""
Record the filename and appropriate digests of its contents
"""
if isinstance(file_content, str):
file_content = file_content.encode("utf-8")
digest = hashlib.sha256(file_content).digest()
checksum = base64.urlsafe_b64encode(digest).rstrip(b"=").decode()
self._records[filename] = f"sha256={checksum}", str(len(file_content))
def record_contents(self) -> str:
"""
Output the representation of this WheelRecord as a string, suitable for
writing to a RECORD file.
"""
contents = []
for path, (file_hash, length) in self._records.items():
contents.append(f"{path},{file_hash},{length}")
return "\n".join(contents)
class WheelModifier:
"""
Representation of an existing wheel with lazily modified contents that
can be written on-demand with the write_wheel method.
"""
def __init__(self, wheel_zipfile: zipfile.ZipFile):
self._wheel_zipfile = wheel_zipfile
# Track updated file contents.
self._updates: typing.Dict[str, typing.Tuple[zipfile.ZipInfo, bytes]] = {}
def dist_info_dirname(self):
for filename in self._wheel_zipfile.namelist():
if filename.endswith(".dist-info/METADATA"):
return filename.rsplit("/", 1)[0]
def read(self, filename: str) -> bytes:
if filename in self._updates:
return self._updates[filename][1]
else:
return self._wheel_zipfile.read(filename)
def zipinfo(self, filename: str) -> zipfile.ZipInfo:
if filename in self._updates:
return self._updates[filename][0]
return self._wheel_zipfile.getinfo(filename)
def write(
self,
filename: typing.Union[str, zipfile.ZipInfo],
content: bytes,
) -> None:
zinfo: zipfile.ZipInfo
if isinstance(filename, zipfile.ZipInfo):
zinfo = filename
else:
try:
zinfo = self.zipinfo(filename)
except KeyError:
raise ValueError(
f"Unable to write filename {filename} as there is no existing "
"file information in the archive. Please provide a zipinfo "
"instance when writing."
)
self._updates[typing.cast(str, zinfo.filename)] = zinfo, content
def write_wheel(self, file: typing.Union[str, Path, typing.IO[bytes]]) -> None:
distinfo_dir = self.dist_info_dirname()
record_filename = f"{distinfo_dir}/RECORD"
orig_record = WheelRecord(self.read(record_filename).decode())
with zipfile.ZipFile(file, "w") as z_out:
for zinfo in self._wheel_zipfile.infolist():
if zinfo.filename == record_filename:
# We deal with record last.
continue
if zinfo.filename in self._updates:
zinfo, content = self._updates.pop(zinfo.filename)
orig_record.record_file(zinfo.filename, content)
else:
content = self._wheel_zipfile.read(zinfo.filename)
z_out.writestr(zinfo, content)
for zinfo, content in self._updates.values():
orig_record.record_file(zinfo.filename, content)
z_out.writestr(zinfo, content)
record_zinfo = self._wheel_zipfile.getinfo(record_filename)
z_out.writestr(record_zinfo, orig_record.record_contents())
def rewrite_whl(path, extra_metadata):
"""Add extra fields into the wheel's METADATA file"""
# It would be a lot simpler if we could have just dropped the additional fields
# into the dist-info as part of the `prepare_metadata_for_build_wheel` hook.
# however, setuptools ignores this .dist-info directory when building the
# wheel, and regenerates the metadata again:
# https://github.com/pypa/setuptools/blob/v62.1.0/setuptools/build_meta.py#L241-L245
# that's potentially a setuptools bug (seems like a PEP 517 violation), so it might
# be changed later on, but unfortunately for now the only option is to rewrite the
# generated .whl with our modifications
tmppath = path.parent.joinpath("." + path.name)
with zipfile.ZipFile(str(path), "r") as whl_zip:
whl = WheelModifier(whl_zip)
metadata_filename = f"{whl.dist_info_dirname()}/METADATA"
metadata = rewrite_metadata(whl.read(metadata_filename), extra_metadata)
whl.write(metadata_filename, metadata)
archive_metadata_filename = f"{whl.dist_info_dirname()}/WHEEL"
archive_metadata = rewrite_archive_metadata(whl.read(archive_metadata_filename))
whl.write(archive_metadata_filename, archive_metadata)
with tmppath.open("wb") as whl_fh:
whl.write_wheel(whl_fh)
shutil.move(tmppath, path)
def rewrite_sdist(path, extra_metadata):
orig_sdist = path.parent.joinpath("orig_" + path.name)
shutil.move(path, orig_sdist)
with tarfile.open(orig_sdist, "r:gz") as tf_in:
with tarfile.open(path, "w:gz", format=tarfile.PAX_FORMAT) as tf_out:
for tarinfo in tf_in.getmembers():
obj = tf_in.extractfile(tarinfo)
if obj is None:
tf_out.addfile(tarinfo)
continue
content = obj.read()
if tarinfo.name.endswith("/PKG-INFO"):
content = rewrite_metadata(content, extra_metadata)
tarinfo.size = len(content)
tf_out.addfile(tarinfo, io.BytesIO(content))
orig_sdist.unlink()
def build_sdist(sdist_directory, config_settings=None):
result = orig_build_sdist(sdist_directory, config_settings=config_settings)
extra_metadata = parse_extra_metadata()
if not extra_metadata:
# nothing to do
return result
path = Path(sdist_directory) / result
rewrite_sdist(path, extra_metadata)
return result