Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update stresstest command #641

Merged
merged 52 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
6fae8f7
Print message at start of test
stveit Apr 27, 2023
bf4c007
Return created IDs
stveit Apr 27, 2023
0ed8300
Make success message more accurate
stveit Apr 27, 2023
df22a2f
Add funcs for verifying created incidents
stveit Apr 27, 2023
8c7a498
Build break condition into while condition
stveit Apr 27, 2023
f81170e
Include tags in post data
stveit Apr 27, 2023
3da2af0
Put stresstest logic in its own class
stveit Apr 27, 2023
37e8485
Return joined list instead of fragments
stveit Apr 27, 2023
85ca01c
Rename func
stveit Apr 27, 2023
119135d
Add func for running verification workers
stveit Apr 27, 2023
ffad42b
Verify created incidents
stveit Apr 27, 2023
f93d2e9
Use options args directly
stveit Apr 27, 2023
e421a79
Add timeout arg
stveit Apr 27, 2023
17f0d45
Add get function for auth header
stveit May 3, 2023
7876a60
Add option for bulk ack created incidents
stveit May 3, 2023
13c234e
Put main logic in one try except
stveit May 3, 2023
16ec75d
Use copy of list so ids are kept on original list
stveit May 3, 2023
b6349c6
Support v1 and v2 api
stveit May 3, 2023
43cd1b5
Change default runtime to 10s
stveit May 3, 2023
de9de06
Include default values in helptext
stveit May 3, 2023
21fc578
Rename variable
stveit Sep 18, 2023
df13b6d
Remove second unit from worker default value
stveit Sep 18, 2023
077cf93
Include requests per second in output
stveit Sep 18, 2023
ca1dd9e
Separate code for verifying one incident
stveit Sep 18, 2023
45935e8
Add type hints
stveit Sep 18, 2023
f836855
Remove unecessary return
stveit Sep 18, 2023
c6d49b8
Split up handle function to reduce complexity
stveit Sep 18, 2023
60151c2
Build loop handling into stresstester
stveit Sep 18, 2023
33ae773
Rename functions
stveit Sep 18, 2023
18b5254
Add docstrings
stveit Sep 18, 2023
b61d5de
Fix tuple typehint
stveit Sep 19, 2023
ece532a
Handle all httpx errors
stveit Sep 19, 2023
f230cd3
Update exception strings
stveit Sep 19, 2023
57508d6
Await coroutine
stveit Sep 20, 2023
72c808b
Make requests/s more accurate
stveit Sep 20, 2023
c0db0b2
Reorder functions
stveit Sep 20, 2023
863a53a
Restructure handle code and print outputs
stveit Sep 20, 2023
12b9a7a
Import post directly
stveit Sep 21, 2023
6585d78
Update url help string
stveit Sep 21, 2023
f7bdfe3
Add docs for stresstest command
stveit Feb 16, 2024
133788b
Add towncrier file
stveit Apr 8, 2024
3e51217
Update src/argus/dev/management/commands/stresstest.py
stveit Apr 18, 2024
28e5dd5
Update src/argus/dev/management/commands/stresstest.py
stveit Apr 18, 2024
cba787e
create url and headers once per worker
stveit Apr 18, 2024
8e85834
Use modern docker commands
stveit Apr 18, 2024
bec870b
Move StressTester class to utils file
stveit Apr 18, 2024
1454134
Add tests
stveit Apr 18, 2024
1f7f3e3
Fix spacing
stveit Apr 18, 2024
a24945e
Merge branch 'master' into stresstest.verify
hmpf Apr 19, 2024
7b58247
Merge branch 'master' into stresstest.verify
stveit Apr 22, 2024
d083214
Merge branch 'master' into stresstest.verify
hmpf Apr 22, 2024
78bbd9c
Use typing supported by 3.8
stveit Apr 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/641.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Adds more capability to the stresstest command, including incident creation verification, bulk ACKing and timeout configuration.
70 changes: 70 additions & 0 deletions docs/development/management-commands.rst
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,73 @@ arguments:
$ python manage.py toggle_profile_activation 1 2 3 4

It will lead to an error if no ids are given.


.. _stresstest:

Stresstest
----------
.. warning::
You should be careful using the `stresstest` command against a production environment,
as the incidents created during the stresstest can trigger notifications
like any other real incidents.

You can run stresstests against the incident creation API with the command `stresstest`:

.. code:: console

$ python manage.py stresstest

The stresstest will create as many incidents as it can during a given timespan by
sending requests to the incident creation API. Afterwards, it will verify that the
incidents added to the database by the stresstest were created correctly.

`stresstest` requires a URL to the target host and a token related to a source system
as positional arguments. The URL can point to a local or remote argus instance.
A valid token can be generated via the Django admin interface.

Example usage pointing to a local testing instance of Argus:

.. code:: console

$ python manage.py stresstest http://localhost:8000 $TOKEN

See the inbuilt help for flags and toggles:

.. code:: console

$ python manage.py stresstest --help

The duration of the stresstest in seconds can be set using the `-s` flag:

.. code:: console

$ python manage.py stresstest http://localhost:8000 $TOKEN -s 10

Timeout of requests in seconds can be set with the `-t` flag:

.. code:: console

$ python manage.py stresstest http://localhost:8000 $TOKEN -t 5


Multiple asynchronous workers can be used to send requests in parallel
using the -w flag:

.. code:: console

$ python manage.py stresstest http://localhost:8000 $TOKEN -w 5

The created incidents can be bulk ACKed at the end of the test by setting
the `-b` flag:

.. code:: console

$ python manage.py stresstest http://localhost:8000 $TOKEN -b


If you are running Argus inside a Docker container, the stresstest can be run with:

.. code:: console

$ docker compose exec api python manage.py stresstest
76 changes: 23 additions & 53 deletions src/argus/dev/management/commands/stresstest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from datetime import datetime, timedelta
from urllib.parse import urljoin
import asyncio

from httpx import AsyncClient, TimeoutException, HTTPStatusError
from httpx import TimeoutException, HTTPStatusError, HTTPError

from django.core.management.base import BaseCommand

from argus.dev.utils import StressTester, DatabaseMismatchError


class Command(BaseCommand):
help = "Stresstests incident creation API"
Expand All @@ -14,7 +12,7 @@ def add_arguments(self, parser):
parser.add_argument(
"url",
type=str,
help="URL for target argus host including port, ex https://argushost.no:443",
help="URL for target argus host. Port may be specified (defaults to 443 for HTTPS and 80 for HTTP), e.g. https://argushost.no:8080",
)
parser.add_argument(
"token",
Expand All @@ -25,58 +23,30 @@ def add_arguments(self, parser):
"-s",
"--seconds",
type=int,
help="Number of seconds to send http requests. After this no more requests will be sent but responses will be waited for",
default=100,
help="Number of seconds to send http requests. After this no more requests will be sent but responses will be waited for. Default 10s",
default=10,
)
parser.add_argument("-w", "--workers", type=int, help="Number of workers", default=1)

def get_incident_data(self):
return {
"start_time": datetime.now().isoformat(),
"description": "Stresstest",
"tags": [],
}

async def post_incidents_until_end_time(self, url, end_time, token, client):
request_counter = 0
incident_data = self.get_incident_data()
while True:
if datetime.now() >= end_time:
break
try:
response = await client.post(url, json=incident_data, headers={"Authorization": f"Token {token}"})
response.raise_for_status()
except TimeoutException:
raise TimeoutException(f"Timeout waiting for POST response to {url}")
except HTTPStatusError as e:
msg = f"HTTP error {e.response.status_code}: {e.response.content.decode('utf-8')}"
raise HTTPStatusError(msg, request=e.request, response=e.response)
request_counter += 1
return request_counter

async def run_workers(self, url, end_time, token, worker_count):
async with AsyncClient() as client:
return await asyncio.gather(
*(self.post_incidents_until_end_time(url, end_time, token, client) for _ in range(worker_count))
)
parser.add_argument("-t", "--timeout", type=int, help="Timeout for requests. Default 5s", default=5)
parser.add_argument("-w", "--workers", type=int, help="Number of workers. Default 1", default=1)
parser.add_argument("-b", "--bulk", action="store_true", help="Bulk ACK created incidents")

def handle(self, *args, **options):
test_duration = options.get("seconds")
url = urljoin(options.get("url"), "/api/v1/incidents/")
token = options.get("token")
worker_count = options.get("workers")
loop = asyncio.get_event_loop()
start_time = datetime.now()
end_time = start_time + timedelta(seconds=test_duration)
tester = StressTester(options.get("url"), options.get("token"), options.get("timeout"), options.get("workers"))
try:
result = loop.run_until_complete(self.run_workers(url, end_time, token, worker_count))
except (TimeoutException, HTTPStatusError) as e:
self.stderr.write(self.style.ERROR(e))
else:
total_requests = sum(result)
seconds_run = (datetime.now() - start_time).seconds
self.stdout.write("Running stresstest ...")
incident_ids, runtime = tester.run(options.get("seconds"))
requests_per_second = round(len(incident_ids) / runtime.total_seconds(), 2)
self.stdout.write("Verifying incidents were created correctly ...")
tester.verify(incident_ids)
if options.get("bulk"):
self.stdout.write("Bulk ACKing incidents ...")
tester.bulk_ack(incident_ids)
self.stdout.write(
self.style.SUCCESS(
f"Stresstest complete with no errors. {total_requests} requests were sent in {seconds_run} seconds."
f"Stresstest completed. Runtime: {runtime}. Incidents created: {len(incident_ids)}. Average incidents per second: {requests_per_second}."
)
)
except (DatabaseMismatchError, HTTPStatusError, TimeoutException) as e:
self.stderr.write(self.style.ERROR(e))
except HTTPError as e:
self.stderr.write(self.style.ERROR(f"HTTP Error: {e}"))
133 changes: 133 additions & 0 deletions src/argus/dev/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
from datetime import datetime, timedelta
from urllib.parse import urljoin
import asyncio
import itertools
from typing import Any, Dict, AnyStr, List, Tuple

from httpx import AsyncClient, TimeoutException, HTTPStatusError, post


class DatabaseMismatchError(Exception):
pass


class StressTester:
def __init__(self, url: str, token: str, timeout: int, worker_count: int):
self.url = url
self.token = token
self.timeout = timeout
self.worker_count = worker_count
self._loop = asyncio.get_event_loop()

def _get_incident_data(self) -> Dict[str, Any]:
return {
"start_time": datetime.now().isoformat(),
"description": "Stresstest",
"tags": [{"tag": "problem_type=stresstest"}],
}

def _get_auth_header(self) -> Dict[str, str]:
return {"Authorization": f"Token {self.token}"}

def _get_incidents_v1_url(self) -> AnyStr:
return urljoin(self.url, "/api/v1/incidents/")

def _get_incidents_v2_url(self) -> AnyStr:
return urljoin(self.url, "/api/v2/incidents/")

def run(self, seconds: int) -> Tuple[List[int], timedelta]:
"""Runs a stresstest against the configured URL.
The test will continually send requests for `seconds` seconds and stop when all requests have gotten a response.
Returns a list containing the IDs of all created incidents and a timedelta detailing how long the test ran for.
Since the stresstest waits for responses to all requests, the total runtime should exceed `seconds` to varying degrees.
"""
start_time = datetime.now()
end_time = start_time + timedelta(seconds=seconds)
incident_ids = self._loop.run_until_complete(self._run_stresstest_workers(end_time))
runtime = datetime.now() - start_time
return incident_ids, runtime

async def _run_stresstest_workers(self, end_time: datetime) -> List[int]:
async with AsyncClient(timeout=self.timeout) as client:
results = await asyncio.gather(*(self._post_incidents(end_time, client) for _ in range(self.worker_count)))
return list(itertools.chain.from_iterable(results))

async def _post_incidents(self, end_time: datetime, client: AsyncClient) -> List[int]:
created_ids = []
incident_data = self._get_incident_data()
url = self._get_incidents_v1_url()
headers = self._get_auth_header()
while datetime.now() < end_time:
try:
response = await client.post(url, json=incident_data, headers=headers)
response.raise_for_status()
incident = response.json()
created_ids.append(incident["pk"])
except TimeoutException:
raise TimeoutException(f"Timeout waiting for POST response to {self.url}")
except HTTPStatusError as e:
msg = f"HTTP Error {e.response.status_code}: {e.response.content.decode('utf-8')}"
raise HTTPStatusError(msg, request=e.request, response=e.response)
return created_ids

def verify(self, incident_ids: List[int]):
"""Verifies that the incidents included in `incident_ids` exist and contain the expected values"""
self._loop.run_until_complete(self._run_verification_workers(incident_ids))

async def _run_verification_workers(self, incident_ids: List[int]):
ids = incident_ids.copy()
async with AsyncClient(timeout=self.timeout) as client:
await asyncio.gather(*(self._verify_created_incidents(ids, client) for _ in range(self.worker_count)))

async def _verify_created_incidents(self, incident_ids: List[int], client: AsyncClient):
while incident_ids:
incident_id = incident_ids.pop()
await self._verify_incident(incident_id, client)

async def _verify_incident(self, incident_id: int, client: AsyncClient):
expected_data = self._get_incident_data()
id_url = urljoin(self._get_incidents_v1_url(), str(incident_id) + "/")
try:
response = await client.get(id_url, headers=self._get_auth_header())
response.raise_for_status()
except TimeoutException:
raise TimeoutException(f"Timeout waiting for GET response to {id_url}")
except HTTPStatusError as e:
msg = f"HTTP Error {e.response.status_code}: {e.response.content.decode('utf-8')}"
raise HTTPStatusError(msg, request=e.request, response=e.response)
response_data = response.json()
self._verify_tags(response_data, expected_data)
self._verify_description(response_data, expected_data)

def _verify_tags(self, response_data: Dict[str, Any], expected_data: Dict[str, Any]):
expected_tags = set([tag["tag"] for tag in expected_data["tags"]])
response_tags = set([tag["tag"] for tag in response_data["tags"]])
if expected_tags != response_tags:
msg = f'Actual tag(s) "{response_tags}" differ(s) from expected tag(s) "{expected_tags}"'
raise DatabaseMismatchError(msg)

def _verify_description(self, response_data: Dict[str, Any], expected_data: Dict[str, Any]):
expected_descr = expected_data["description"]
response_descr = response_data["description"]
if response_descr != expected_descr:
msg = f'Actual description "{response_descr}" differs from expected description "{expected_descr}"'
raise DatabaseMismatchError(msg)

def bulk_ack(self, incident_ids: List[int]):
"""Sends a request to ACK all incidents included in `incident_ids`"""
request_data = {
"ids": incident_ids,
"ack": {
"timestamp": datetime.now().isoformat(),
"description": "Stresstest",
},
}
url = urljoin(self._get_incidents_v2_url(), "acks/bulk/")
try:
response = post(url, json=request_data, headers=self._get_auth_header())
response.raise_for_status()
except TimeoutException:
raise TimeoutException(f"Timeout waiting for POST response to {url}")
except HTTPStatusError as e:
msg = f"HTTP Error {e.response.status_code}: {e.response.content.decode('utf-8')}"
raise HTTPStatusError(msg, request=e.request, response=e.response)
61 changes: 61 additions & 0 deletions tests/dev/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from django.test import TestCase

from argus.dev.utils import StressTester, DatabaseMismatchError

from argus.util.testing import connect_signals, disconnect_signals


class StressTesterTests(TestCase):
def setUp(self):
disconnect_signals()
self.stresstester = StressTester("http://localhost.com", "token", 10, 1)

def tearDown(self):
connect_signals()

def test_verify_tags_raise_error_for_incorrect_tags(self):
expected_data = {
"tags": [{"tag": "tag1=value1"}, {"tag": "tag2=value2"}],
}
actual_data = {
"tags": [{"tag": "tag1=value2"}, {"tag": "tag2=value1"}],
}
with self.assertRaises(DatabaseMismatchError):
self.stresstester._verify_tags(actual_data, expected_data)

def test_verify_tags_does_not_raise_exception_for_correct_tags(self):
expected_data = {
"tags": [{"tag": "tag1=value1"}, {"tag": "tag2=value2"}],
}
actual_data = {
"tags": [{"tag": "tag1=value1"}, {"tag": "tag2=value2"}],
}
self.stresstester._verify_tags(actual_data, expected_data)

def test_verify_description_raise_error_for_incorrect_description(self):
expected_data = {
"description": "correct description",
}
actual_data = {
"description": "incorrect description",
}
with self.assertRaises(DatabaseMismatchError):
self.stresstester._verify_description(actual_data, expected_data)

def test_verify_description_does_not_raise_exception_for_correct_description(self):
expected_data = {
"description": "correct description",
}
actual_data = {
"description": "correct description",
}
self.stresstester._verify_description(actual_data, expected_data)

def test_get_auth_header_returns_correct_header_values(self):
self.assertEqual(self.stresstester._get_auth_header(), {"Authorization": f"Token token"})

def test_get_incidents_v1_url_returns_correct_url(self):
self.assertEqual(self.stresstester._get_incidents_v1_url(), "http://localhost.com/api/v1/incidents/")

def test_get_incidents_v2_url_returns_correct_url(self):
self.assertEqual(self.stresstester._get_incidents_v2_url(), "http://localhost.com/api/v2/incidents/")
Loading