✨🐛Webserver: enable socketio horizontal scaling + ensure only 1 update goes through (⚠️ devops) #4286
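For context on the first half of the title: horizontal scaling means several webserver replicas serve the same socket.io clients, so emits must be relayed between replicas through an external message queue. Below is a minimal sketch of that setup, assuming python-socketio with its aio-pika (RabbitMQ) manager; the AMQP URL and the helper name are illustrative, not the PR's actual code:

```python
# a minimal sketch, not the PR's actual code: python-socketio scaled
# horizontally by relaying emits through RabbitMQ (needs python-socketio
# installed with the aio-pika extra)
import socketio
from aiohttp import web


def setup_socketio(
    app: web.Application, amqp_url: str = "amqp://rabbit:5672"
) -> socketio.AsyncServer:
    # every replica attaches to the same message queue, so an event emitted
    # on one replica also reaches clients connected to any other replica
    mgr = socketio.AsyncAioPikaManager(amqp_url)
    sio = socketio.AsyncServer(async_mode="aiohttp", client_manager=mgr)
    sio.attach(app)
    return sio
```

The second half of the title ("ensure only 1 update goes through") is sketched after the commit list below.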

Merged

Changes from 1 commit (50 commits in this PR):
09c730f  setup socketio horizontal scaling (sanderegg, May 30, 2023)
5317425  add dependency (sanderegg, May 30, 2023)
97a84d6  create new service to get db notifications only once (sanderegg, May 30, 2023)
e395ea5  ensure envs are set correctly (sanderegg, May 30, 2023)
94dba1d  new db_listener plugin (sanderegg, May 30, 2023)
3176639  renaming stuff (sanderegg, May 30, 2023)
e397401  upgrade import (sanderegg, May 30, 2023)
3be0e6c  add documentation (sanderegg, May 30, 2023)
a52b088  ensure we do have rabbit in the tests (sanderegg, May 30, 2023)
8575337  this shall be moved (sanderegg, May 30, 2023)
de92a58  adding rabbit to test suite (sanderegg, May 30, 2023)
c30f282  typo (sanderegg, May 30, 2023)
2a06d2a  pylance (sanderegg, May 30, 2023)
f536489  add rabbit settings (sanderegg, May 30, 2023)
6d8e541  fix mock (sanderegg, May 30, 2023)
6a57f16  type (sanderegg, May 30, 2023)
17cf4ee  fix tests (sanderegg, May 30, 2023)
437d600  fix missing rabbitmq (sanderegg, May 31, 2023)
6b9af3d  type (sanderegg, May 31, 2023)
0db3359  enable rabbit (sanderegg, May 31, 2023)
1363940  remove todos (sanderegg, May 31, 2023)
d14b567  setup of socketio now done as event. (sanderegg, May 31, 2023)
edb506d  linter (sanderegg, May 31, 2023)
9f2b087  fix dependencies (sanderegg, May 31, 2023)
bd3d328  fix closing (sanderegg, May 31, 2023)
72c2a0a  use hasattr (sanderegg, May 31, 2023)
fe93688  closing is now handled without warnings (sanderegg, May 31, 2023)
0a22612  refactor (sanderegg, May 31, 2023)
be4e43e  revert (sanderegg, Jun 1, 2023)
58b566c  type (sanderegg, Jun 1, 2023)
a8dfea0  clean (sanderegg, Jun 1, 2023)
96c183c  fix init (sanderegg, Jun 1, 2023)
25d5131  clean (sanderegg, Jun 1, 2023)
689602e  make storage more reactive in tests (sanderegg, Jun 1, 2023)
a4c6fc3  clean (sanderegg, Jun 1, 2023)
792436c  clean stuff (sanderegg, Jun 1, 2023)
7f7940d  fix fixtures (sanderegg, Jun 1, 2023)
cc86f38  sql 2.0 (sanderegg, Jun 1, 2023)
acdb448  @pcrespov review: use parse_obj_as (sanderegg, Jun 1, 2023)
3befcd9  sql 2.0 (sanderegg, Jun 1, 2023)
7eac9a3  @pcrespov review: carefully select what is enabled (sanderegg, Jun 1, 2023)
b052161  @pcrespov review: fix comment (sanderegg, Jun 1, 2023)
aaea70e  cleanup (sanderegg, Jun 1, 2023)
294df4f  remove unused stuff (sanderegg, Jun 1, 2023)
7612321  sql 2.0 (sanderegg, Jun 1, 2023)
f986f57  fix tests (sanderegg, Jun 1, 2023)
f914b5b  linter (sanderegg, Jun 1, 2023)
67e5c71  fix test (sanderegg, Jun 1, 2023)
0ec1936  @pcrespov review: rename (sanderegg, Jun 1, 2023)
8f3ea94  increase wait timeout during tests (sanderegg, Jun 1, 2023)
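On the second half of the title: per the commits above ("create new service to get db notifications only once", "new db_listener plugin"), Postgres notifications should be consumed exactly once and then fanned out to all replicas. A minimal sketch of that idea, assuming asyncpg and aio-pika; the channel name, exchange name, and function are illustrative, not the plugin's actual API:

```python
# a minimal sketch, assuming asyncpg and aio-pika; all names are illustrative
import asyncio

import aio_pika
import asyncpg


async def forward_db_events_once(pg_dsn: str, amqp_url: str) -> None:
    # a single dedicated listener consumes each Postgres NOTIFY exactly once...
    pg = await asyncpg.connect(pg_dsn)
    amqp = await aio_pika.connect_robust(amqp_url)
    channel = await amqp.channel()
    # ...and re-publishes it on a fanout exchange, from which every webserver
    # replica forwards the update to its own connected socket.io clients
    exchange = await channel.declare_exchange(
        "comp_task_updates", aio_pika.ExchangeType.FANOUT
    )

    def on_notify(conn, pid, pg_channel, payload) -> None:
        # asyncpg invokes this from the running event loop, so scheduling is safe
        asyncio.create_task(
            exchange.publish(aio_pika.Message(payload.encode()), routing_key="")
        )

    await pg.add_listener("comp_tasks_output_events", on_notify)
    await asyncio.Future()  # keep the listener alive
```

Without this split, every webserver replica would react to the same database notification and emit duplicate updates; with it, the database event is read once and the fan-out happens in the message queue.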
Commit f986f5713343b5da4ebc3283ba0205ba51cc5ed4: fix tests
sanderegg committed Jun 1, 2023
Verified: this commit was created on GitHub.com and signed with GitHub's verified signature (the key has expired).
140 changes: 73 additions & 67 deletions services/web/server/tests/integration/02/test_computation.py
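For orientation when reading the diff below: besides wiring up the new `setup_db_listener` plugin, the commit replaces the ORM-session test fixture (`postgres_session: session.Session`) with a plain engine (`postgres_db: sa.engine.Engine`) and rewrites the 1.x query API into SQLAlchemy 2.0-style Core statements. A condensed before/after of that pattern; the engine DSN and the table definition are illustrative stand-ins for the real fixtures and models:

```python
# before/after of the query-style migration in this commit; the DSN and the
# table definition are illustrative stand-ins for the test fixtures
import sqlalchemy as sa
from sqlalchemy.orm import Session

engine = sa.create_engine("postgresql://user:pass@localhost/db")  # illustrative
metadata = sa.MetaData()
comp_pipeline = sa.Table(  # stand-in for simcore_postgres_database.webserver_models
    "comp_pipeline",
    metadata,
    sa.Column("project_id", sa.String, primary_key=True),
    sa.Column("dag_adjacency_list", sa.JSON),
)
project_id = "some-project-uuid"

# before: SQLAlchemy 1.x ORM query API on a Session fixture (postgres_session)
with Session(engine) as postgres_session:
    pipeline_db = (
        postgres_session.query(comp_pipeline)
        .filter(comp_pipeline.c.project_id == project_id)
        .one()
    )

# after: 2.0-style Core select executed on an engine connection (postgres_db)
with engine.connect() as conn:
    pipeline_db = conn.execute(
        sa.select(comp_pipeline).where(comp_pipeline.c.project_id == project_id)
    ).fetchone()
    assert pipeline_db
```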
@@ -12,15 +12,16 @@
from typing import Any, Callable, NamedTuple

import pytest
import sqlalchemy as sa
from aiohttp import web
from aiohttp.test_utils import TestClient
from models_library.projects_state import RunningState
from pytest_mock import MockerFixture
from pytest_simcore.helpers.utils_assert import assert_status
from servicelib.aiohttp.application import create_safe_application
from servicelib.json_serialization import json_dumps
from settings_library.rabbit import RabbitSettings
from settings_library.redis import RedisSettings
from simcore_postgres_database.models.comp_tasks import comp_tasks
from simcore_postgres_database.models.projects import projects
from simcore_postgres_database.models.users import UserRole
from simcore_postgres_database.webserver_models import (
@@ -33,6 +34,7 @@
from simcore_service_webserver.application_settings import setup_settings
from simcore_service_webserver.db import setup_db
from simcore_service_webserver.db_listener._utils import DB_TO_RUNNING_STATE
from simcore_service_webserver.db_listener.plugin import setup_db_listener
from simcore_service_webserver.diagnostics.plugin import setup_diagnostics
from simcore_service_webserver.director_v2.plugin import setup_director_v2
from simcore_service_webserver.login.plugin import setup_login
@@ -45,7 +47,6 @@
from simcore_service_webserver.session import setup_session
from simcore_service_webserver.socketio.plugin import setup_socketio
from simcore_service_webserver.users.plugin import setup_users
from sqlalchemy.orm import session
from tenacity._asyncio import AsyncRetrying
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_delay
@@ -142,14 +143,13 @@ def standard_role_response() -> tuple[str, list[tuple[UserRole, ExpectedResponse
@pytest.fixture
def client(
event_loop: asyncio.AbstractEventLoop,
postgres_session: session.Session,
postgres_db: sa.engine.Engine,
rabbit_service: RabbitSettings,
redis_settings: RedisSettings,
simcore_services_ready: None,
aiohttp_client: Callable,
app_config: dict[str, Any], # waits until swarm with *_services are up
mocker: MockerFixture,
monkeypatch_setenv_from_app_config: Callable,
simcore_services_ready: None,
) -> TestClient:
cfg = deepcopy(app_config)

@@ -176,6 +176,7 @@ def client(
setup_director_v2(app)
setup_resource_manager(app)
setup_products(app)
setup_db_listener(app)
# no garbage collector

return event_loop.run_until_complete(
@@ -198,78 +199,84 @@ def fake_workbench_adjacency_list(tests_data_dir: Path) -> dict[str, Any]:

def _assert_db_contents(
project_id: str,
postgres_session: session.Session,
postgres_db: sa.engine.Engine,
fake_workbench_payload: dict[str, Any],
fake_workbench_adjacency_list: dict[str, Any],
check_outputs: bool,
):
# pylint: disable=no-member
pipeline_db = (
postgres_session.query(comp_pipeline)
.filter(comp_pipeline.c.project_id == project_id)
.one()
)
assert pipeline_db.project_id == project_id
assert pipeline_db.dag_adjacency_list == fake_workbench_adjacency_list

# check db comp_tasks
tasks_db = (
postgres_session.query(comp_tasks)
.filter(comp_tasks.c.project_id == project_id)
.all()
)
mock_pipeline = fake_workbench_payload
assert len(tasks_db) == len(mock_pipeline)
) -> None:
with postgres_db.connect() as conn:
pipeline_db = conn.execute(
sa.select(comp_pipeline).where(comp_pipeline.c.project_id == project_id)
).fetchone()
assert pipeline_db

assert pipeline_db[comp_pipeline.c.project_id] == project_id
assert (
pipeline_db[comp_pipeline.c.dag_adjacency_list]
== fake_workbench_adjacency_list
)

# check db comp_tasks
tasks_db = conn.execute(
sa.select(comp_tasks).where(comp_tasks.c.project_id == project_id)
).fetchall()
assert tasks_db

for task_db in tasks_db:
assert task_db.project_id == project_id
assert task_db.node_id in mock_pipeline.keys()
mock_pipeline = fake_workbench_payload
assert len(tasks_db) == len(mock_pipeline)

assert task_db.inputs == mock_pipeline[task_db.node_id].get("inputs")
for task_db in tasks_db:
assert task_db.project_id == project_id
assert task_db.node_id in mock_pipeline.keys()

if check_outputs:
assert task_db.outputs == mock_pipeline[task_db.node_id].get("outputs")
assert task_db.inputs == mock_pipeline[task_db.node_id].get("inputs")

assert task_db.image["name"] == mock_pipeline[task_db.node_id]["key"]
assert task_db.image["tag"] == mock_pipeline[task_db.node_id]["version"]
if check_outputs:
assert task_db.outputs == mock_pipeline[task_db.node_id].get("outputs")

assert task_db.image["name"] == mock_pipeline[task_db.node_id]["key"]
assert task_db.image["tag"] == mock_pipeline[task_db.node_id]["version"]


NodeIdStr = str


def _get_computational_tasks_from_db(
project_id: str,
postgres_session: session.Session,
postgres_db: sa.engine.Engine,
) -> dict[NodeIdStr, Any]:
# this check is only there to check the comp_pipeline is there
assert (
postgres_session.query(comp_pipeline)
.filter(comp_pipeline.c.project_id == project_id)
.one()
), f"missing pipeline in the database under comp_pipeline {project_id}"
with postgres_db.connect() as conn:
assert (
conn.execute(
sa.select(comp_pipeline).where(comp_pipeline.c.project_id == project_id)
).fetchone()
is not None
), f"missing pipeline in the database under comp_pipeline {project_id}"

# get the computational tasks
tasks_db = conn.execute(
sa.select(comp_tasks).where(
(comp_tasks.c.project_id == project_id)
& (comp_tasks.c.node_class == NodeClass.COMPUTATIONAL)
)
).fetchall()

# get the computational tasks
tasks_db = (
postgres_session.query(comp_tasks)
.filter(
(comp_tasks.c.project_id == project_id)
& (comp_tasks.c.node_class == NodeClass.COMPUTATIONAL)
)
.all()
)
print(f"--> tasks from DB: {tasks_db=}")
return {t.node_id: t for t in tasks_db}


def _get_project_workbench_from_db(
project_id: str,
postgres_session: session.Session,
postgres_db: sa.engine.Engine,
) -> dict[str, Any]:
# this check is only there to check the comp_pipeline is there
print(f"--> looking for project {project_id=} in projects table...")
project_in_db = (
postgres_session.query(projects).filter(projects.c.uuid == project_id).one()
)
with postgres_db.connect() as conn:
project_in_db = conn.execute(
sa.select(projects).where(projects.c.uuid == project_id)
).fetchone()

assert (
project_in_db
), f"missing pipeline in the database under comp_pipeline {project_id}"
@@ -284,7 +291,7 @@ async def _assert_and_wait_for_pipeline_state(
project_id: str,
expected_state: RunningState,
expected_api_response: ExpectedResponse,
):
) -> None:
assert client.app
url_project_state = client.app.router["get_project_state"].url_for(
project_id=project_id
@@ -315,8 +322,8 @@ async def _assert_and_wait_for_pipeline_state(

async def _assert_and_wait_for_comp_task_states_to_be_transmitted_in_projects(
project_id: str,
postgres_session: session.Session,
):
postgres_db: sa.engine.Engine,
) -> None:
async for attempt in AsyncRetrying(
reraise=True,
stop=stop_after_delay(120),
@@ -328,10 +335,10 @@ async def _assert_and_wait_for_comp_task_states_to_be_transmitted_in_projects(
f"--> waiting for pipeline results to move to projects table, attempt {attempt.retry_state.attempt_number}..."
)
comp_tasks_in_db: dict[NodeIdStr, Any] = _get_computational_tasks_from_db(
project_id, postgres_session
project_id, postgres_db
)
workbench_in_db: dict[NodeIdStr, Any] = _get_project_workbench_from_db(
project_id, postgres_session
project_id, postgres_db
)
for node_id, node_values in comp_tasks_in_db.items():
assert (
@@ -359,12 +366,11 @@ async def _assert_and_wait_for_comp_task_states_to_be_transmitted_in_projects(
)


@pytest.mark.skip(reason="FIXME: still not bullet proof")
@pytest.mark.parametrize(*standard_role_response(), ids=str)
async def test_start_stop_computation(
client: TestClient,
sleeper_service: dict[str, str],
postgres_session: session.Session,
postgres_db: sa.engine.Engine,
logged_user: dict[str, Any],
user_project: dict[str, Any],
fake_workbench_adjacency_list: dict[str, Any],
@@ -392,7 +398,7 @@ async def test_start_stop_computation(

_assert_db_contents(
project_id,
postgres_session,
postgres_db,
fake_workbench_payload,
fake_workbench_adjacency_list,
check_outputs=False,
@@ -403,7 +409,7 @@ async def test_start_stop_computation(
)
# we need to wait until the webserver has updated the projects DB before starting another round
await _assert_and_wait_for_comp_task_states_to_be_transmitted_in_projects(
project_id, postgres_session
project_id, postgres_db
)
# restart the computation, this should produce a 422 since the computation was complete
resp = await client.post(f"{url_start}")
@@ -429,19 +435,19 @@ async def test_start_stop_computation(
)
# we need to wait until the webserver has updated the projects DB
await _assert_and_wait_for_comp_task_states_to_be_transmitted_in_projects(
project_id, postgres_session
project_id, postgres_db
)


@pytest.mark.parametrize(*standard_role_response(), ids=str)
async def test_run_pipeline_and_check_state(
client: TestClient,
sleeper_service: dict[str, str],
postgres_session: session.Session,
logged_user: dict[str, Any],
postgres_db: sa.engine.Engine,
# logged_user: dict[str, Any],
user_project: dict[str, Any],
fake_workbench_adjacency_list: dict[str, Any],
user_role: UserRole,
# user_role: UserRole,
expected: ExpectedResponse,
):
assert client.app
@@ -463,7 +469,7 @@ async def test_run_pipeline_and_check_state(

_assert_db_contents(
project_id,
postgres_session,
postgres_db,
fake_workbench_payload,
fake_workbench_adjacency_list,
check_outputs=False,
@@ -528,7 +534,7 @@ async def test_run_pipeline_and_check_state(
)
assert pipeline_state == RunningState.SUCCESS
comp_tasks_in_db: dict[NodeIdStr, Any] = _get_computational_tasks_from_db(
project_id, postgres_session
project_id, postgres_db
)
assert all( # pylint: disable=use-a-generator
[t.state == StateType.SUCCESS for t in comp_tasks_in_db.values()]
@@ -538,7 +544,7 @@ async def test_run_pipeline_and_check_state(
)
# we need to wait until the webserver has updated the projects DB
await _assert_and_wait_for_comp_task_states_to_be_transmitted_in_projects(
project_id, postgres_session
project_id, postgres_db
)

print(f"<-- pipeline completed successfully in {time.monotonic() - start} seconds")