-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathconftest.py
509 lines (407 loc) · 17.9 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
import copy
import io
import os
import socket
import threading
import uuid
# Workaround being able to use freezegun with pandas.
# https://github.com/spulec/freezegun/issues/98
import pandas # noqa F401
import paramiko
import patchy
import pytest
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from django.conf import settings
from django.contrib.gis.db.models.fields import get_srid_info
from django.core import management
from django.core.cache import cache
from django.core.files.storage import default_storage, storages
from django.core.management import call_command
from django.db import connection
from django.template import base as base_template
from django.test import override_settings
from factory import Faker
from paramiko import ServerInterface
from pytest_django.lazy_django import django_settings_is_configured
from pytest_django.plugin import INVALID_TEMPLATE_VARS_ENV
# Rewrite before importing itou code.
pytest.register_assert_rewrite("tests.utils.test", "tests.utils.htmx.test")
from itou.utils import faker_providers # noqa: E402
from itou.utils.storage.s3 import s3_client # noqa: E402
from tests.utils.htmx.test import HtmxClient # noqa: E402
from tests.utils.test import ItouClient # noqa: E402
@pytest.hookimpl(tryfirst=True)
def pytest_collection_modifyitems(config, items):
    """Add the ``django_db`` marker to every test that did not opt in or out itself.

    A collected item is left untouched when it already carries either the
    ``django_db`` or the ``no_django_db`` marker; every other item receives
    ``pytest.mark.django_db``.
    """
    db_related_markers = {"no_django_db", "django_db"}
    for test_item in items:
        marker_names = {mark.name for mark in test_item.iter_markers()}
        if marker_names.isdisjoint(db_related_markers):
            test_item.add_marker(pytest.mark.django_db)
@pytest.hookimpl(trylast=True)
def pytest_configure(config) -> None:
    """Reorder the collection hooks and register the project's custom marker."""

    def _execution_rank(hook_impl):
        # Sort key for the registered ``pytest_collection_modifyitems`` implementations.
        return (
            hook_impl.wrapper or hook_impl.hookwrapper,  # Keep hookwrappers last
            hook_impl.plugin_name == "randomly",  # Then pytest-randomly's and after that all the other ones
        )

    # Make sure pytest-randomly's pytest_collection_modifyitems hook runs before pytest-django's one
    # Note: _hookimpls execution order is reversed meaning that the last ones are run first
    collection_hook = config.pluginmanager.hook.pytest_collection_modifyitems
    collection_hook._hookimpls.sort(key=_execution_rank)

    marker_description = (
        "ignore_unknown_variable_template_error(*ignore_names): "
        "ignore unknown variable error in templates, optionally providing specific names to ignore"
    )
    config.addinivalue_line("markers", marker_description)
@pytest.fixture
def admin_client():
    """Return an ``ItouClient`` force-logged-in as a newly created superuser staff member."""
    # Local import: the factory module is only loaded when this fixture is requested.
    from tests.users.factories import ItouStaffFactory

    logged_in_client = ItouClient()
    superuser = ItouStaffFactory(is_superuser=True)
    logged_in_client.force_login(superuser)
    return logged_in_client
@pytest.fixture
def client():
    """Return a new, anonymous ``ItouClient`` instance for the test."""
    anonymous_client = ItouClient()
    return anonymous_client
@pytest.fixture
def api_client():
    """Return a new Django REST framework ``APIClient`` instance."""
    # Local import: rest_framework is only loaded when this fixture is requested.
    from rest_framework.test import APIClient as RestFrameworkClient

    return RestFrameworkClient()
@pytest.fixture
def htmx_client():
    """
    Return a new ``HtmxClient``, used to mimic a response to an HTMX request.

    Usage
    ```
    def my_test(htmx_client):
        response = htmx_client.get("/")
    ```
    """
    htmx_test_client = HtmxClient()
    return htmx_test_client
@pytest.fixture(autouse=True, scope="session")
def preload_spatial_reference(django_db_setup, django_db_blocker):
    """
    Warm up Django's in-memory spatial reference cache once per session.

    The first access to a PostGIS field with GeoDjango loads the associated spatial
    reference information into an in-memory cache within Django.
    Filling that cache up front keeps the number of database requests consistent
    between tests, removing a potential source of flakiness.

    One request is made for every spatial reference in use in the project.
    """
    srids_in_use = (4326,)
    with django_db_blocker.unblock():
        for srid in srids_in_use:
            get_srid_info(srid, connection)
@pytest.fixture(autouse=True, scope="session")
def test_bucket():
    """
    Session-wide, automatically used storage setup.

    Two things happen before the first test runs:

    1. ``URLValidator.__call__`` is patched so that a value starting with
       ``settings.AWS_S3_ENDPOINT_URL`` returns early, before the parent
       validator is called.
    2. The ``configure_bucket`` management command is run with ``autoexpire=True``.
    """
    # TODO: Remove this code block once we stop using a models.URLField() to store a S3 link (ie. `resume_link`)
    from django.core.validators import URLValidator

    # NOTE(review): the diff text below is reproduced exactly as it appears in the reviewed
    # text, where leading whitespace is absent. A unified diff needs its context-line
    # indentation to apply; confirm it against the real file.
    # The endpoint URL is spliced into the patch by string concatenation, hence `fmt: skip`.
    patchy.patch(
        URLValidator.__call__,
        '''\
@@ -16,3 +16,5 @@ def __call__(self, value):
try:
+ if value.startswith("''' + settings.AWS_S3_ENDPOINT_URL + '''"):
+ return
super().__call__(value)
except ValidationError as e:
''',
    )  # fmt: skip
    call_command("configure_bucket", autoexpire=True)
    yield
@pytest.fixture(autouse=True)
def storage_prefix_per_test():
    """
    Point the default and "public" storages at a random location for the test.

    Both storages get the same freshly generated UUID as ``location`` before the
    test runs, and their previous ``location`` values are put back afterwards.
    """
    isolated_storages = (default_storage, storages["public"])
    saved_locations = [storage.location for storage in isolated_storages]

    test_prefix = str(uuid.uuid4())
    for storage in isolated_storages:
        storage.location = test_prefix

    yield

    for storage, saved_location in zip(isolated_storages, saved_locations):
        storage.location = saved_location
@pytest.fixture(autouse=True)
def cache_per_test(settings):
    """
    Give every configured cache its own random ``KEY_PREFIX`` for the current test.

    The ``CACHES`` setting is deep-copied first so the original dictionaries are
    never mutated; the modified copy is assigned through the ``settings`` fixture.
    """
    isolated_caches = copy.deepcopy(settings.CACHES)
    for backend_options in isolated_caches.values():
        backend_options["KEY_PREFIX"] = str(uuid.uuid4())
    settings.CACHES = isolated_caches
@pytest.fixture(autouse=True)
def cached_announce_campaign():
    """
    Pre-populate the AnnouncementCampaign cache entry to avoid an extra database hit in many tests.

    The entry is stored as ``None`` with no expiry before the test and deleted afterwards.
    """
    from itou.communications.cache import CACHE_ACTIVE_ANNOUNCEMENTS_KEY as announcements_key

    cache.set(announcements_key, None, timeout=None)
    yield
    cache.delete(announcements_key)
@pytest.fixture
def empty_active_announcements_cache(cached_announce_campaign):
    """
    Delete the active announcements cache entry for the duration of the test.

    Depends on ``cached_announce_campaign`` so that the deletion happens after
    that fixture has populated the entry.
    """
    from itou.communications.cache import CACHE_ACTIVE_ANNOUNCEMENTS_KEY as announcements_key

    cache.delete(announcements_key)
    yield
@pytest.fixture
def temporary_bucket():
    """
    Provide a uniquely named bucket for one test and remove it afterwards.

    ``AWS_STORAGE_BUCKET_NAME`` is overridden with a random ``tests-<uuid>`` name for
    the whole lifetime of the fixture (setup, test and teardown). The bucket is set
    up with the ``configure_bucket`` management command. On teardown every listed
    object is deleted page by page, then the bucket itself is deleted. A bucket that
    no longer exists at teardown is not an error.
    """
    with override_settings(AWS_STORAGE_BUCKET_NAME=f"tests-{uuid.uuid4()}"):
        call_command("configure_bucket")
        yield
        client = s3_client()
        bucket_name = settings.AWS_STORAGE_BUCKET_NAME
        paginator = client.get_paginator("list_objects_v2")
        try:
            for page in paginator.paginate(Bucket=bucket_name):
                # Empty pages don’t have a Contents key.
                if "Contents" in page:
                    client.delete_objects(
                        Bucket=bucket_name,
                        Delete={"Objects": [{"Key": obj["Key"]} for obj in page["Contents"]]},
                    )
            client.delete_bucket(Bucket=bucket_name)
        # The modeled exception classes live on the client instance, not on the
        # ``s3_client`` factory: looking them up on the factory would itself fail
        # at the very moment the exception has to be matched.
        except client.exceptions.NoSuchBucket:
            pass
@pytest.fixture(autouse=True, scope="session", name="django_loaddata")
def django_loaddata_fixture(django_db_setup, django_db_blocker):
    """Load the INSEE communes and countries JSON fixtures once per test session."""
    fixture_files = {
        "test_asp_INSEE_communes_factory.json",
        "test_asp_INSEE_countries_factory.json",
    }
    # Database access is blocked outside of tests, so it is unblocked for the load.
    with django_db_blocker.unblock():
        management.call_command("loaddata", *fixture_files)
@pytest.fixture(autouse=True)
def django_test_environment_email_fixup(django_test_environment, settings) -> None:
    """
    Restore the project's email backends after Django's test environment setup.

    Django forcefully sets the EMAIL_BACKEND to
    "django.core.mail.backends.locmem.EmailBackend" in
    django.test.utils.setup_test_environment, so the async backend is put back
    and the in-memory backend is assigned to ``ASYNC_EMAIL_BACKEND`` instead.
    """
    backend_overrides = {
        "EMAIL_BACKEND": "itou.emails.tasks.AsyncEmailBackend",
        "ASYNC_EMAIL_BACKEND": "django.core.mail.backends.locmem.EmailBackend",
    }
    for setting_name, backend_path in backend_overrides.items():
        setattr(settings, setting_name, backend_path)
@pytest.fixture(autouse=True, scope="session")
def itou_faker_provider(_session_faker):
    """Register ``ItouProvider`` on both the session faker and factory_boy's ``Faker``."""
    provider_class = faker_providers.ItouProvider
    _session_faker.add_provider(provider_class)  # For faker
    Faker.add_provider(provider_class)  # For factory_boy
@pytest.fixture(scope="function")
def unittest_compatibility(request, faker, pdf_file, snapshot, mocker, xlsx_file, tmp_path):
    """
    Expose a set of pytest fixtures as attributes of the test class instance.

    Each requested fixture value is stored on ``request.instance`` under the
    fixture's own name, so class-based tests can reach them through ``self``.
    """
    test_case = request.instance
    exposed_fixtures = {
        "faker": faker,
        "pdf_file": pdf_file,
        "snapshot": snapshot,
        "mocker": mocker,
        "xlsx_file": xlsx_file,
        "tmp_path": tmp_path,
    }
    for attribute_name, fixture_value in exposed_fixtures.items():
        setattr(test_case, attribute_name, fixture_value)
@pytest.fixture(autouse=True)
def django_ensure_matomo_titles(monkeypatch) -> None:
    """
    On CI, fail any ``render_to_string`` call whose template has a variable title
    but whose context lacks ``matomo_custom_title``.

    Nothing is patched when the ``CI`` environment variable is unset or empty.
    Otherwise ``django.template.loader.render_to_string`` is replaced, for the
    duration of the test, by a wrapper that inspects the template's ``title``
    block and raises ``AssertionError`` before delegating to the original function.
    """
    is_running_on_ci = os.getenv("CI", False)
    if not is_running_on_ci:
        return

    from django.template import base, defaulttags, loader, loader_tags

    original_render = loader.render_to_string

    def _walk_template_nodes(nodelist, condition_fn):
        # Return the first node matching ``condition_fn``, or None.
        # The walk descends into the first ExtendsNode/IfNode it meets and returns
        # whatever is found there, without looking at that node's later siblings.
        for node in nodelist:
            if isinstance(node, loader_tags.ExtendsNode | defaulttags.IfNode):
                return _walk_template_nodes(node.nodelist, condition_fn)
            if condition_fn(node):
                return node

    def is_title_node(node):
        return isinstance(node, loader_tags.BlockNode) and node.name == "title"

    def is_variable_node(node):
        # ``block.super`` and ``CSP_NONCE`` variables do not count as a variable title.
        return (
            isinstance(node, base.VariableNode) and "block.super" not in str(node) and "CSP_NONCE" not in str(node)
        )

    def assertive_render(template_name, context=None, request=None, using=None):
        if isinstance(template_name, list | tuple):
            template = loader.select_template(template_name, using=using)
        else:
            template = loader.get_template(template_name, using=using)

        title_node = _walk_template_nodes(template.template.nodelist, is_title_node)
        if title_node:
            var_node = _walk_template_nodes(title_node.nodelist, is_variable_node)
            # ``context`` defaults to None: treat it as a context without the key so the
            # explicit assertion below is raised instead of a TypeError from ``in``.
            if var_node and (context is None or "matomo_custom_title" not in context):
                raise AssertionError(
                    f"template={template_name} uses a variable title; "
                    "please provide a `matomo_custom_title` in the context !"
                )
        return original_render(template_name, context, request, using)

    monkeypatch.setattr(loader, "render_to_string", assertive_render)
@pytest.fixture(autouse=True, scope="session")
def _fail_for_invalid_template_variable_improved(_fail_for_invalid_template_variable):
    """
    Adjust pytest-django's invalid-template-variable handling, once per session.

    Only active when the ``INVALID_TEMPLATE_VARS_ENV`` environment variable is
    ``"true"`` and Django settings are configured. In that case the class of the
    configured ``string_if_invalid`` object is made falsy, and
    ``FilterExpression.resolve`` is patched so the invalid-variable branch is taken
    whenever the ``default`` filter is not among the expression's filters.
    """
    # Workaround following https://github.com/pytest-dev/pytest-django/issues/1059#issue-1665002785
    # rationale to better handle OneToOne field
    if os.environ.get(INVALID_TEMPLATE_VARS_ENV, "false") == "true" and django_settings_is_configured():
        from django.conf import settings as dj_settings

        # Object installed as ``string_if_invalid`` on the first template engine
        # (presumably by the ``_fail_for_invalid_template_variable`` fixture — verify).
        invalid_var_exception = dj_settings.TEMPLATES[0]["OPTIONS"]["string_if_invalid"]

        # Make InvalidVarException falsy to keep the behavior consistent for OneToOneField
        invalid_var_exception.__class__.__bool__ = lambda self: False

        # but adapt Django's template code to behave as if it was truthy in resolve
        # (except when the default filter is used)
        # NOTE(review): the diff text below is reproduced exactly as it appears in the
        # reviewed text, where leading whitespace is absent. Confirm the context-line
        # indentation against the real file.
        patchy.patch(
            base_template.FilterExpression.resolve,
            """\
@@ -7,7 +7,8 @@
obj = None
else:
string_if_invalid = context.template.engine.string_if_invalid
- if string_if_invalid:
+ from django.template.defaultfilters import default as default_filter
+ if default_filter not in {func for func, _args in self.filters}:
if "%s" in string_if_invalid:
return string_if_invalid % self.var
else:
""",
        )
@pytest.fixture(autouse=True, scope="function")
def unknown_variable_template_error(monkeypatch, request):
    """
    Make unknown template variables fail even where Django would ignore the failure.

    Only active when the ``INVALID_TEMPLATE_VARS_ENV`` environment variable is ``"true"``.
    ``FilterExpression.resolve`` is then wrapped so that ``ignore_failures`` is forced
    to ``False`` when the first lookup of a variable is neither in the context nor in
    the ignore list.

    The ``ignore_unknown_variable_template_error`` marker tunes this per test: with
    arguments, those names are added to the ignore list; without arguments, the
    wrapper is not installed at all.
    """
    marker = request.keywords.get("ignore_unknown_variable_template_error", None)
    if os.environ.get(INVALID_TEMPLATE_VARS_ENV, "false") != "true":
        return
    if marker is not None and not marker.args:
        # Marker without list
        return

    # debug can be injected by django.template.context_processors.debug
    # user can be injected by django.contrib.auth.context_processors.auth
    # TODO(xfernandez): remove user from allow list (and remove the matching processor ?)
    ignored_names = {"debug", "user"}
    if marker is not None:
        ignored_names = ignored_names | set(marker.args)

    unpatched_resolve = base_template.FilterExpression.resolve

    def stricter_resolve(self, context, ignore_failures=False):
        first_lookup_is_unknown = (
            self.is_var
            and self.var.lookups is not None
            and self.var.lookups[0] not in context
            and self.var.lookups[0] not in ignored_names
        )
        if first_lookup_is_unknown:
            ignore_failures = False
        return unpatched_resolve(self, context, ignore_failures)

    monkeypatch.setattr(base_template.FilterExpression, "resolve", stricter_resolve)
@pytest.fixture(scope="session", autouse=True)
def make_unordered_queries_randomly_ordered():
    """
    Patch Django’s ORM to randomly order all queries without a specified
    order.

    This discovers problems where code expects a given order but the
    database doesn’t guarantee one.

    The patch replaces the empty default ordering in
    ``SQLCompiler._order_by_pairs`` with ``["?"]``, except for queries with
    ``distinct`` set, which keep the empty ordering.

    https://adamj.eu/tech/2023/07/04/django-test-random-order-querysets/
    """
    from django.db.models.sql.compiler import SQLCompiler

    # NOTE(review): the diff text below is reproduced exactly as it appears in the
    # reviewed text, where leading whitespace is absent. Confirm the context-line
    # indentation against the real file.
    patchy.patch(
        SQLCompiler._order_by_pairs,
        """\
@@ -9,7 +9,7 @@
ordering = meta.ordering
self._meta_ordering = ordering
else:
- ordering = []
+ ordering = ["?"] if not self.query.distinct else []
if self.query.standard_ordering:
default_order, _ = ORDER_DIR["ASC"]
else:
""",
    )
@pytest.fixture
def pdf_file():
    """Yield ``tests/data/empty.pdf`` opened in binary mode; it is closed after the test."""
    sample_path = "tests/data/empty.pdf"
    with open(sample_path, "rb") as pdf_handle:
        yield pdf_handle
@pytest.fixture
def xlsx_file():
    """Yield ``tests/data/empty.xlsx`` opened in binary mode; it is closed after the test."""
    sample_path = "tests/data/empty.xlsx"
    with open(sample_path, "rb") as xlsx_handle:
        yield xlsx_handle
# SFTP related fixtures
# ------------------------------------------------------------------------------
class Server(paramiko.ServerInterface):
    """Permissive SSH server stub: accepts every password and every channel request."""

    def check_auth_password(self, *args, **kwargs):
        """Report successful authentication whatever the arguments."""
        # all are allowed
        auth_result = paramiko.AUTH_SUCCESSFUL
        return auth_result

    def check_channel_request(self, *args, **kwargs):
        """Report that the channel was opened whatever the arguments."""
        channel_result = paramiko.OPEN_SUCCEEDED
        return channel_result
class RootedSFTPServer(paramiko.SFTPServerInterface):
    """
    SFTP server interface that serves files located below a given root directory.

    Every operation maps the client path below ``root_path`` and converts an
    ``OSError`` into the matching SFTP error code instead of raising.

    Taken and adapted from https://github.com/paramiko/paramiko/blob/main/tests/_stub_sftp.py
    """

    def __init__(self, server: ServerInterface, *args, root_path, **kwargs):
        """Remember ``root_path`` (keyword-only) and forward everything else to the parent."""
        self._root_path = root_path
        super().__init__(server, *args, **kwargs)

    def _realpath(self, path):
        """Return the local filesystem path for a client path: root + canonicalized path."""
        return str(self._root_path) + self.canonicalize(path)

    def list_folder(self, path):
        """Return one ``SFTPAttributes`` per directory entry, or an SFTP error code."""
        path = self._realpath(path)
        try:
            out = []
            for file_name in os.listdir(path):
                attr = paramiko.SFTPAttributes.from_stat(os.stat(os.path.join(path, file_name)))
                attr.filename = file_name
                out.append(attr)
            return out
        except OSError as e:
            return paramiko.SFTPServer.convert_errno(e.errno)

    def stat(self, path):
        """Return the ``SFTPAttributes`` of ``path``, or an SFTP error code."""
        try:
            return paramiko.SFTPAttributes.from_stat(os.stat(self._realpath(path)))
        except OSError as e:
            return paramiko.SFTPServer.convert_errno(e.errno)

    def open(self, path, flags, attr):
        """
        Open ``path`` with the given ``os.open`` flags.

        Returns an ``SFTPHandle`` whose read and write file objects are the same
        binary file object, or an SFTP error code when opening fails.
        """
        path = self._realpath(path)
        # O_BINARY is added only when the platform defines it.
        flags = flags | getattr(os, "O_BINARY", 0)
        mode = getattr(attr, "st_mode", None) or 0o777
        try:
            fd = os.open(path, flags, mode)
        except OSError as e:
            return paramiko.SFTPServer.convert_errno(e.errno)
        if (flags & os.O_CREAT) and (attr is not None):
            # Permissions are excluded from the attributes applied below
            # (presumably because the mode was already given to os.open — verify).
            attr._flags &= ~attr.FLAG_PERMISSIONS
            paramiko.SFTPServer.set_file_attr(path, attr)
        if flags & os.O_WRONLY:
            mode_from_flags = "a" if flags & os.O_APPEND else "w"
        elif flags & os.O_RDWR:
            mode_from_flags = "a+" if flags & os.O_APPEND else "r+"
        else:
            mode_from_flags = "r"  # O_RDONLY (== 0)
        try:
            f = os.fdopen(fd, mode_from_flags + "b")
        except OSError as e:
            # No file object took ownership of the descriptor (e.g. the path is a
            # directory), so release it here instead of leaking it.
            try:
                os.close(fd)
            except OSError:
                # Best effort: the descriptor may already have been closed.
                pass
            return paramiko.SFTPServer.convert_errno(e.errno)
        handle = paramiko.SFTPHandle(flags)
        handle.filename = path
        handle.readfile = f
        handle.writefile = f
        return handle

    def remove(self, path):
        """Delete ``path``; return ``SFTP_OK`` on success, or an SFTP error code."""
        try:
            os.remove(self._realpath(path))
        except OSError as e:
            return paramiko.SFTPServer.convert_errno(e.errno)
        return paramiko.SFTP_OK
@pytest.fixture(scope="session", name="sftp_host_key")
def sftp_host_key_fixture():
    """Generate, once per session, an unencrypted PEM-encoded RSA private key as text."""
    # Use a 1024-bits key otherwise we get an OpenSSLError("digest too big for rsa key")
    private_key = rsa.generate_private_key(key_size=1024, public_exponent=65537)
    pem_bytes = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=serialization.NoEncryption(),
    )
    return pem_bytes.decode()
@pytest.fixture(name="sftp_directory")
def sftp_directory_fixture(tmp_path_factory):
    """Return a new temporary directory, created with the base name "sftp"."""
    sftp_root = tmp_path_factory.mktemp("sftp")
    return sftp_root
@pytest.fixture(name="sftp_client_factory")
def sftp_client_factory_fixture(sftp_host_key, sftp_directory):
    """
    Set up an in-memory SFTP server thread and yield a factory of SFTP clients.

    A connected socket pair links a server Transport (serving ``sftp_directory``
    through ``RootedSFTPServer``) and a client Transport. Both live for the
    duration of the requesting test; the yielded ``sftp_client_factory`` callable
    creates new higher level client objects wrapped around the client Transport,
    as necessary. Its arguments are accepted and ignored.

    On teardown both transports and both sockets are closed so that no thread or
    file descriptor outlives the test.
    """
    # Sockets & transports
    server_socket, client_socket = socket.socketpair()
    server_transport = paramiko.Transport(server_socket)
    client_transport = paramiko.Transport(client_socket)

    try:
        # Auth
        server_transport.add_server_key(paramiko.RSAKey.from_private_key(io.StringIO(sftp_host_key)))

        # Server setup
        server_transport.set_subsystem_handler(
            "sftp", paramiko.SFTPServer, RootedSFTPServer, root_path=sftp_directory
        )
        # The event parameter is here to not block waiting for a client connection
        server_transport.start_server(event=threading.Event(), server=Server())

        client_transport.connect(username="user", password="password")

        def sftp_client_factory(*args, **kwargs):
            return paramiko.SFTPClient.from_transport(client_transport)

        yield sftp_client_factory
    finally:
        # Close the client side first so the server sees the disconnection.
        client_transport.close()
        server_transport.close()
        # Closing the sockets again is harmless and covers a transport that never started.
        client_socket.close()
        server_socket.close()