From db23f5eecbc377f8be2f40b2d29bf318b8bd956b Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Fri, 10 Nov 2023 15:49:20 +0100 Subject: [PATCH 01/14] listen to wallet events in payment system --- .../core/application.py | 4 + .../services/auto_recharge_listener.py | 36 +++++++++ .../services/auto_recharge_process_message.py | 20 +++++ .../test_services_auto_recharge_listener.py | 75 +++++++++++++++++++ 4 files changed, 135 insertions(+) create mode 100644 services/payments/src/simcore_service_payments/services/auto_recharge_listener.py create mode 100644 services/payments/src/simcore_service_payments/services/auto_recharge_process_message.py create mode 100644 services/payments/tests/unit/test_services_auto_recharge_listener.py diff --git a/services/payments/src/simcore_service_payments/core/application.py b/services/payments/src/simcore_service_payments/core/application.py index 899f2735d0f..815355b1cf9 100644 --- a/services/payments/src/simcore_service_payments/core/application.py +++ b/services/payments/src/simcore_service_payments/core/application.py @@ -11,6 +11,7 @@ ) from ..api.rest.routes import setup_rest_api from ..api.rpc.routes import setup_rpc_api_routes +from ..services.auto_recharge_listener import setup_auto_recharge_listener from ..services.payments_gateway import setup_payments_gateway from ..services.postgres import setup_postgres from ..services.rabbitmq import setup_rabbitmq @@ -51,6 +52,9 @@ def create_app(settings: ApplicationSettings | None = None) -> FastAPI: # APIs w/ RUT setup_resource_usage_tracker(app) + # Listening to Rabbitmq + setup_auto_recharge_listener(app) + # ERROR HANDLERS # ... add here ... diff --git a/services/payments/src/simcore_service_payments/services/auto_recharge_listener.py b/services/payments/src/simcore_service_payments/services/auto_recharge_listener.py new file mode 100644 index 00000000000..2949c847b02 --- /dev/null +++ b/services/payments/src/simcore_service_payments/services/auto_recharge_listener.py @@ -0,0 +1,36 @@ +import functools +import logging + +from fastapi import FastAPI +from models_library.rabbitmq_messages import WalletCreditsMessage +from servicelib.logging_utils import log_context +from servicelib.rabbitmq import RabbitMQClient + +from .auto_recharge_process_message import process_message +from .rabbitmq import get_rabbitmq_client + +_logger = logging.getLogger(__name__) + + +async def _subscribe_to_rabbitmq(app) -> str: + with log_context(_logger, logging.INFO, msg="Subscribing to rabbitmq channel"): + rabbit_client: RabbitMQClient = get_rabbitmq_client(app) + subscribed_queue: str = await rabbit_client.subscribe( + WalletCreditsMessage.get_channel_name(), + message_handler=functools.partial(process_message, app), + exclusive_queue=False, + topics=["#"], + ) + return subscribed_queue + + +def setup_auto_recharge_listener(app: FastAPI) -> None: + async def _on_startup() -> None: + app.state.auto_recharge_rabbitmq_consumer = await _subscribe_to_rabbitmq(app) + + async def _on_shutdown() -> None: + # NOTE: We want to have persistent queue, therefore we will not unsubscribe + ... + + app.add_event_handler("startup", _on_startup) + app.add_event_handler("shutdown", _on_shutdown) diff --git a/services/payments/src/simcore_service_payments/services/auto_recharge_process_message.py b/services/payments/src/simcore_service_payments/services/auto_recharge_process_message.py new file mode 100644 index 00000000000..8480063b19b --- /dev/null +++ b/services/payments/src/simcore_service_payments/services/auto_recharge_process_message.py @@ -0,0 +1,20 @@ +import logging + +from fastapi import FastAPI +from models_library.rabbitmq_messages import WalletCreditsMessage +from pydantic import parse_raw_as + +_logger = logging.getLogger(__name__) + + +async def process_message(app: FastAPI, data: bytes) -> bool: + assert app # nosec + rabbit_message = parse_raw_as(WalletCreditsMessage, data) + _logger.debug("Process msg: %s", rabbit_message) + + # 1. Check if auto-recharge functionality is ON for wallet_id + # 2. Check if wallet credits are bellow the threshold + # 3. Get Payment method + # 4. Pay with payment method + + return True diff --git a/services/payments/tests/unit/test_services_auto_recharge_listener.py b/services/payments/tests/unit/test_services_auto_recharge_listener.py new file mode 100644 index 00000000000..34e82ec1993 --- /dev/null +++ b/services/payments/tests/unit/test_services_auto_recharge_listener.py @@ -0,0 +1,75 @@ +from collections.abc import Callable +from decimal import Decimal +from unittest import mock + +import pytest +from fastapi import FastAPI +from models_library.rabbitmq_messages import WalletCreditsMessage +from pytest_mock.plugin import MockerFixture +from pytest_simcore.helpers.typing_env import EnvVarsDict +from pytest_simcore.helpers.utils_envs import setenvs_from_dict +from servicelib.rabbitmq import RabbitMQClient +from tenacity._asyncio import AsyncRetrying +from tenacity.retry import retry_if_exception_type +from tenacity.stop import stop_after_delay +from tenacity.wait import wait_fixed + +pytest_simcore_core_services_selection = [ + "postgres", + "rabbit", +] +pytest_simcore_ops_services_selection = [ + "adminer", +] + + +@pytest.fixture +def app_environment( + monkeypatch: pytest.MonkeyPatch, + app_environment: EnvVarsDict, + rabbit_env_vars_dict: EnvVarsDict, # rabbitMQ settings from 'rabbit' service + postgres_env_vars_dict: EnvVarsDict, + wait_for_postgres_ready_and_db_migrated: None, + external_environment: EnvVarsDict, +): + # set environs + monkeypatch.delenv("PAYMENTS_RABBITMQ", raising=False) + monkeypatch.delenv("PAYMENTS_POSTGRES", raising=False) + + return setenvs_from_dict( + monkeypatch, + { + **app_environment, + **rabbit_env_vars_dict, + **postgres_env_vars_dict, + **external_environment, + "POSTGRES_CLIENT_NAME": "payments-service-pg-client", + }, + ) + + +@pytest.fixture +async def mocked_message_parser(mocker: MockerFixture) -> mock.AsyncMock: + # return mocker.AsyncMock(return_value=True) + return mocker.patch( + "simcore_service_payments.services.auto_recharge_listener.process_message" + ) + + +async def test_process_event_functions( + mocked_message_parser: mock.AsyncMock, + app: FastAPI, + rabbitmq_client: Callable[[str], RabbitMQClient], +): + publisher = rabbitmq_client("publisher") + msg = WalletCreditsMessage(wallet_id=1, credits=Decimal(120.5)) + await publisher.publish(WalletCreditsMessage.get_channel_name(), msg) + + async for attempt in AsyncRetrying( + wait=wait_fixed(0.1), + stop=stop_after_delay(5), + retry=retry_if_exception_type(AssertionError), + reraise=True, + ): + with attempt: + mocked_message_parser.assert_called_once() From bd38e85063ef8b7f01d9f987c80e99cfc43f224b Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Fri, 10 Nov 2023 16:03:19 +0100 Subject: [PATCH 02/14] fix test --- .../tests/unit/test_services_auto_recharge_listener.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/services/payments/tests/unit/test_services_auto_recharge_listener.py b/services/payments/tests/unit/test_services_auto_recharge_listener.py index 34e82ec1993..78ff4097d02 100644 --- a/services/payments/tests/unit/test_services_auto_recharge_listener.py +++ b/services/payments/tests/unit/test_services_auto_recharge_listener.py @@ -1,3 +1,9 @@ +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name +# pylint: disable=too-many-arguments +# pylint: disable=unused-argument +# pylint: disable=unused-variable + from collections.abc import Callable from decimal import Decimal from unittest import mock From caefaf185f0472b6c20f94fa168d60ec0649ddb5 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Fri, 10 Nov 2023 17:03:53 +0100 Subject: [PATCH 03/14] fix tests --- services/payments/tests/unit/conftest.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/services/payments/tests/unit/conftest.py b/services/payments/tests/unit/conftest.py index f177e747db8..b46bc3a4673 100644 --- a/services/payments/tests/unit/conftest.py +++ b/services/payments/tests/unit/conftest.py @@ -57,6 +57,9 @@ def _do(): # The following services are affected if rabbitmq is not in place mocker.patch("simcore_service_payments.core.application.setup_rabbitmq") mocker.patch("simcore_service_payments.core.application.setup_rpc_api_routes") + mocker.patch( + "simcore_service_payments.core.application.setup_auto_recharge_listener" + ) return _do From 1967ea48b65c9f0bc8f0c05036935f55d5a6c48e Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Sun, 12 Nov 2023 11:03:00 +0100 Subject: [PATCH 04/14] review @sanderegg --- .../src/servicelib/rabbitmq/_client.py | 14 +++++++++++++- .../tests/rabbitmq/test_rabbitmq.py | 17 +++++++++++++++++ .../services/auto_recharge_listener.py | 13 +++++++++++-- .../test_services_auto_recharge_listener.py | 1 - 4 files changed, 41 insertions(+), 4 deletions(-) diff --git a/packages/service-library/src/servicelib/rabbitmq/_client.py b/packages/service-library/src/servicelib/rabbitmq/_client.py index ae668f26737..542bd2ba2ee 100644 --- a/packages/service-library/src/servicelib/rabbitmq/_client.py +++ b/packages/service-library/src/servicelib/rabbitmq/_client.py @@ -63,6 +63,9 @@ async def _get_channel(self) -> aio_pika.abc.AbstractChannel: channel.close_callbacks.add(self._channel_close_callback) return channel + async def _get_consumer_tag(self, exchange_name) -> str: + return f"{get_rabbitmq_client_unique_name(self.client_name)}_{exchange_name}" + async def subscribe( self, exchange_name: str, @@ -139,10 +142,11 @@ async def _on_message( ) await message.nack() + _consumer_tag = await self._get_consumer_tag(exchange_name) await queue.consume( _on_message, exclusive=exclusive_queue, - consumer_tag=f"{get_rabbitmq_client_unique_name(self.client_name)}_{exchange_name}", + consumer_tag=_consumer_tag, ) output: str = queue.name return output @@ -214,3 +218,11 @@ async def publish(self, exchange_name: str, message: RabbitMessage) -> None: aio_pika.Message(message.body()), routing_key=message.routing_key() or "", ) + + async def unsubscribe_consumer(self, exchange_name: str): + assert self._channel_pool # nosec + async with self._channel_pool.acquire() as channel: + queue_name = exchange_name + queue = await channel.get_queue(queue_name) + _consumer_tag = await self._get_consumer_tag(exchange_name) + await queue.cancel(_consumer_tag) diff --git a/packages/service-library/tests/rabbitmq/test_rabbitmq.py b/packages/service-library/tests/rabbitmq/test_rabbitmq.py index bf95e267d7c..af93023ea5f 100644 --- a/packages/service-library/tests/rabbitmq/test_rabbitmq.py +++ b/packages/service-library/tests/rabbitmq/test_rabbitmq.py @@ -435,3 +435,20 @@ async def test_rabbit_not_using_the_same_exchange_type_raises( # now do a second subscribtion wiht topics, will create a TOPICS exchange with pytest.raises(aio_pika.exceptions.ChannelPreconditionFailed): await client.subscribe(exchange_name, mocked_message_parser, topics=[]) + + +@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors() +async def test_unsubscribe_consumer( + rabbitmq_client: Callable[[str], RabbitMQClient], + random_exchange_name: Callable[[], str], + mocked_message_parser: mock.AsyncMock, +): + exchange_name = f"{random_exchange_name()}" + client = rabbitmq_client("consumer") + await client.subscribe(exchange_name, mocked_message_parser, exclusive_queue=False) + # Unsubsribe just a consumer, the queue will be still there + await client.unsubscribe_consumer(exchange_name) + # Unsubsribe the queue + await client.unsubscribe(exchange_name) + with pytest.raises(aio_pika.exceptions.ChannelNotFoundEntity): + await client.unsubscribe(exchange_name) diff --git a/services/payments/src/simcore_service_payments/services/auto_recharge_listener.py b/services/payments/src/simcore_service_payments/services/auto_recharge_listener.py index 2949c847b02..a71038cafb5 100644 --- a/services/payments/src/simcore_service_payments/services/auto_recharge_listener.py +++ b/services/payments/src/simcore_service_payments/services/auto_recharge_listener.py @@ -24,13 +24,22 @@ async def _subscribe_to_rabbitmq(app) -> str: return subscribed_queue +async def _unsubscribe_consumer(app) -> None: + with log_context(_logger, logging.INFO, msg="Unsubscribing from rabbitmq queue"): + rabbit_client: RabbitMQClient = get_rabbitmq_client(app) + await rabbit_client.unsubscribe_consumer( + WalletCreditsMessage.get_channel_name(), + ) + return None + + def setup_auto_recharge_listener(app: FastAPI) -> None: async def _on_startup() -> None: app.state.auto_recharge_rabbitmq_consumer = await _subscribe_to_rabbitmq(app) async def _on_shutdown() -> None: - # NOTE: We want to have persistent queue, therefore we will not unsubscribe - ... + # NOTE: We want to have persistent queue, therefore we will unsubscribe only consumer + app.state.auto_recharge_rabbitmq_constumer = await _unsubscribe_consumer(app) app.add_event_handler("startup", _on_startup) app.add_event_handler("shutdown", _on_shutdown) diff --git a/services/payments/tests/unit/test_services_auto_recharge_listener.py b/services/payments/tests/unit/test_services_auto_recharge_listener.py index 78ff4097d02..9557dba382d 100644 --- a/services/payments/tests/unit/test_services_auto_recharge_listener.py +++ b/services/payments/tests/unit/test_services_auto_recharge_listener.py @@ -56,7 +56,6 @@ def app_environment( @pytest.fixture async def mocked_message_parser(mocker: MockerFixture) -> mock.AsyncMock: - # return mocker.AsyncMock(return_value=True) return mocker.patch( "simcore_service_payments.services.auto_recharge_listener.process_message" ) From 928cf2ca4fcb9798ca7bbaa7053d94037e2923d7 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Mon, 4 Dec 2023 18:50:24 +0100 Subject: [PATCH 05/14] adding experimental RUT test --- .../tests/resource_usage_tracker.py | 89 +++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 tests/e2e-playwright/tests/resource_usage_tracker.py diff --git a/tests/e2e-playwright/tests/resource_usage_tracker.py b/tests/e2e-playwright/tests/resource_usage_tracker.py new file mode 100644 index 00000000000..13ea26569de --- /dev/null +++ b/tests/e2e-playwright/tests/resource_usage_tracker.py @@ -0,0 +1,89 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=unused-variable +# pylint: disable=too-many-arguments +# pylint: disable=too-many-statements +# pylint: disable=unnecessary-lambda + +import os +import pytest +from playwright.sync_api import APIRequestContext, Page +from tenacity import Retrying +from tenacity.retry import retry_if_exception_type +from tenacity.stop import stop_after_delay +from tenacity.wait import wait_fixed +from datetime import datetime, timezone + +PRODUCT_URL = os.environ["PRODUCT_URL"] +PRODUCT_BILLABLE = os.environ["PRODUCT_BILLABLE"] +USER_NAME = os.environ["USER_NAME"] +USER_PASSWORD = os.environ["USER_PASSWORD"] +NUM_OF_SLEEPERS = os.environ["NUM_OF_SLEEPERS"] +WALLET_ID = os.environ["WALLET_ID"] +STUDY_ID = os.environ["STUDY_ID"] + + +@pytest.fixture +def product_and_user() -> tuple: + product_url = PRODUCT_URL + user_name = USER_NAME + user_password = USER_PASSWORD + return (product_url, user_name, user_password) + +@pytest.mark.testit +def test_resource_usage_tracker( + page: Page, + log_in_and_out: None, + api_request_context: APIRequestContext, + product_and_user: tuple, +): + # 1. Resource usage before + rut_before = api_request_context.get( + f"{PRODUCT_URL}v0/services/-/resource-usages?wallet_id={WALLET_ID}&offset=0&limit={NUM_OF_SLEEPERS}" + ) + assert rut_before.status == 200 + service_runs_before = rut_before.json()['data'] + service_run_ids_before = set() + for service_run in service_runs_before: + service_run_ids_before.add(service_run['service_run_id']) + print(service_run_ids_before) + + # 2. Start computations + data = {"subgraph": [], "force_restart": True} + resp = api_request_context.post( + f"{PRODUCT_URL}v0/computations/{STUDY_ID}:start", + data=data, + ) + assert resp.status == 201 + + for attempt in Retrying( + wait=wait_fixed(60), + stop=stop_after_delay(1000), + retry=retry_if_exception_type(AssertionError), + reraise=True, + ): + with attempt: + print(f"====================={datetime.now(tz=timezone.utc)}=============================") + output = api_request_context.get( + f"{PRODUCT_URL}v0/projects/{STUDY_ID}" + ) + assert output.status == 200 + workbench = output.json()['data']['workbench'] + assert len(workbench.keys()) == NUM_OF_SLEEPERS + status_check = set() + for node in list(workbench.keys()): + node_label = workbench[node]['label'] + node_current_status = workbench[node]['state']['currentStatus'] + print((node_label, node_current_status, node)) + status_check.add(node_current_status) + + assert len(status_check.union({"SUCCESS", "FAILED"})) == 2 + + # 3. Check Resource usage after + rut_after = api_request_context.get(f"{PRODUCT_URL}v0/services/-/resource-usages?wallet_id={WALLET_ID}&offset=0&limit={NUM_OF_SLEEPERS}") + service_runs_after = rut_after.json()['data'] + service_run_ids_after = set() + for service_run in service_runs_after: + service_run_ids_after.add(service_run['service_run_id']) + + assert service_run_ids_before.intersection(service_run_ids_after) == {} From 564538a4969491fb2e1055132470771e0c4de1ef Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Mon, 4 Dec 2023 19:20:43 +0100 Subject: [PATCH 06/14] fix experimental test checkout --- .../tests/resource_usage_tracker.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/e2e-playwright/tests/resource_usage_tracker.py b/tests/e2e-playwright/tests/resource_usage_tracker.py index 13ea26569de..9e60580f686 100644 --- a/tests/e2e-playwright/tests/resource_usage_tracker.py +++ b/tests/e2e-playwright/tests/resource_usage_tracker.py @@ -13,6 +13,7 @@ from tenacity.stop import stop_after_delay from tenacity.wait import wait_fixed from datetime import datetime, timezone +from http import HTTPStatus PRODUCT_URL = os.environ["PRODUCT_URL"] PRODUCT_BILLABLE = os.environ["PRODUCT_BILLABLE"] @@ -30,9 +31,8 @@ def product_and_user() -> tuple: user_password = USER_PASSWORD return (product_url, user_name, user_password) -@pytest.mark.testit + def test_resource_usage_tracker( - page: Page, log_in_and_out: None, api_request_context: APIRequestContext, product_and_user: tuple, @@ -41,7 +41,7 @@ def test_resource_usage_tracker( rut_before = api_request_context.get( f"{PRODUCT_URL}v0/services/-/resource-usages?wallet_id={WALLET_ID}&offset=0&limit={NUM_OF_SLEEPERS}" ) - assert rut_before.status == 200 + assert rut_before.status == HTTPStatus.OK service_runs_before = rut_before.json()['data'] service_run_ids_before = set() for service_run in service_runs_before: @@ -54,7 +54,7 @@ def test_resource_usage_tracker( f"{PRODUCT_URL}v0/computations/{STUDY_ID}:start", data=data, ) - assert resp.status == 201 + assert resp.status == HTTPStatus.CREATED for attempt in Retrying( wait=wait_fixed(60), @@ -67,9 +67,9 @@ def test_resource_usage_tracker( output = api_request_context.get( f"{PRODUCT_URL}v0/projects/{STUDY_ID}" ) - assert output.status == 200 + assert output.status == HTTPStatus.OK workbench = output.json()['data']['workbench'] - assert len(workbench.keys()) == NUM_OF_SLEEPERS + assert len(workbench.keys()) == int(NUM_OF_SLEEPERS) status_check = set() for node in list(workbench.keys()): node_label = workbench[node]['label'] @@ -81,9 +81,12 @@ def test_resource_usage_tracker( # 3. Check Resource usage after rut_after = api_request_context.get(f"{PRODUCT_URL}v0/services/-/resource-usages?wallet_id={WALLET_ID}&offset=0&limit={NUM_OF_SLEEPERS}") + assert rut_after.status == HTTPStatus.OK service_runs_after = rut_after.json()['data'] service_run_ids_after = set() for service_run in service_runs_after: service_run_ids_after.add(service_run['service_run_id']) + # If there is an intersection with old service run id, that means that + # RUT didn't created a new service run id assert service_run_ids_before.intersection(service_run_ids_after) == {} From efa9d6abee6e2ed15b3e4c6e02d20db97fd3096b Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Mon, 4 Dec 2023 19:21:30 +0100 Subject: [PATCH 07/14] fix experimental test checkout --- tests/e2e-playwright/tests/resource_usage_tracker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e-playwright/tests/resource_usage_tracker.py b/tests/e2e-playwright/tests/resource_usage_tracker.py index 9e60580f686..e2ad27513f1 100644 --- a/tests/e2e-playwright/tests/resource_usage_tracker.py +++ b/tests/e2e-playwright/tests/resource_usage_tracker.py @@ -58,7 +58,7 @@ def test_resource_usage_tracker( for attempt in Retrying( wait=wait_fixed(60), - stop=stop_after_delay(1000), + stop=stop_after_delay(800), retry=retry_if_exception_type(AssertionError), reraise=True, ): From b2bd8a7cb246f2bd1af1c3d133d42f2e2f5aaa4a Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Mon, 4 Dec 2023 20:22:20 +0100 Subject: [PATCH 08/14] fix experimental test checkout --- tests/e2e-playwright/tests/resource_usage_tracker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e-playwright/tests/resource_usage_tracker.py b/tests/e2e-playwright/tests/resource_usage_tracker.py index e2ad27513f1..bc6d4655984 100644 --- a/tests/e2e-playwright/tests/resource_usage_tracker.py +++ b/tests/e2e-playwright/tests/resource_usage_tracker.py @@ -89,4 +89,4 @@ def test_resource_usage_tracker( # If there is an intersection with old service run id, that means that # RUT didn't created a new service run id - assert service_run_ids_before.intersection(service_run_ids_after) == {} + assert service_run_ids_before.intersection(service_run_ids_after) == set() From f4e52c419ece0a6cc67f20eae304f7aaaa8f6d16 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Mon, 4 Dec 2023 20:26:15 +0100 Subject: [PATCH 09/14] fix experimental test print --- tests/e2e-playwright/tests/resource_usage_tracker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/e2e-playwright/tests/resource_usage_tracker.py b/tests/e2e-playwright/tests/resource_usage_tracker.py index bc6d4655984..6abeb7ecca7 100644 --- a/tests/e2e-playwright/tests/resource_usage_tracker.py +++ b/tests/e2e-playwright/tests/resource_usage_tracker.py @@ -46,7 +46,7 @@ def test_resource_usage_tracker( service_run_ids_before = set() for service_run in service_runs_before: service_run_ids_before.add(service_run['service_run_id']) - print(service_run_ids_before) + print(f"Service runs after: {service_run_ids_before}") # 2. Start computations data = {"subgraph": [], "force_restart": True} @@ -86,6 +86,7 @@ def test_resource_usage_tracker( service_run_ids_after = set() for service_run in service_runs_after: service_run_ids_after.add(service_run['service_run_id']) + print(f"Service runs after: {service_run_ids_after}") # If there is an intersection with old service run id, that means that # RUT didn't created a new service run id From a5d4b7b9111a4556372029c94cf1f95e3d3f9180 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Mon, 4 Dec 2023 22:26:17 +0100 Subject: [PATCH 10/14] fgenerate after some manual modification during testing --- tests/e2e-playwright/tests/resource_usage_tracker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e-playwright/tests/resource_usage_tracker.py b/tests/e2e-playwright/tests/resource_usage_tracker.py index 6abeb7ecca7..6ae798f1178 100644 --- a/tests/e2e-playwright/tests/resource_usage_tracker.py +++ b/tests/e2e-playwright/tests/resource_usage_tracker.py @@ -58,7 +58,7 @@ def test_resource_usage_tracker( for attempt in Retrying( wait=wait_fixed(60), - stop=stop_after_delay(800), + stop=stop_after_delay(1000), retry=retry_if_exception_type(AssertionError), reraise=True, ): From b91dfacc77ac685aaa88c7f374da7b8ce0f6b398 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 5 Dec 2023 09:26:58 +0100 Subject: [PATCH 11/14] adding stopping in case of fail --- .../tests/resource_usage_tracker.py | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/tests/e2e-playwright/tests/resource_usage_tracker.py b/tests/e2e-playwright/tests/resource_usage_tracker.py index 6ae798f1178..337adb2884d 100644 --- a/tests/e2e-playwright/tests/resource_usage_tracker.py +++ b/tests/e2e-playwright/tests/resource_usage_tracker.py @@ -6,6 +6,7 @@ # pylint: disable=unnecessary-lambda import os +from typing import Iterator import pytest from playwright.sync_api import APIRequestContext, Page from tenacity import Retrying @@ -32,20 +33,28 @@ def product_and_user() -> tuple: return (product_url, user_name, user_password) +def stop_pipeline(api_request_context) -> Iterator[None]: + + yield + + api_request_context.post(f"{PRODUCT_URL}v0/computations/{STUDY_ID}:stop") + + def test_resource_usage_tracker( log_in_and_out: None, api_request_context: APIRequestContext, product_and_user: tuple, + stop_pipeline: None, ): # 1. Resource usage before rut_before = api_request_context.get( f"{PRODUCT_URL}v0/services/-/resource-usages?wallet_id={WALLET_ID}&offset=0&limit={NUM_OF_SLEEPERS}" ) assert rut_before.status == HTTPStatus.OK - service_runs_before = rut_before.json()['data'] + service_runs_before = rut_before.json()["data"] service_run_ids_before = set() for service_run in service_runs_before: - service_run_ids_before.add(service_run['service_run_id']) + service_run_ids_before.add(service_run["service_run_id"]) print(f"Service runs after: {service_run_ids_before}") # 2. Start computations @@ -63,29 +72,31 @@ def test_resource_usage_tracker( reraise=True, ): with attempt: - print(f"====================={datetime.now(tz=timezone.utc)}=============================") - output = api_request_context.get( - f"{PRODUCT_URL}v0/projects/{STUDY_ID}" + print( + f"====================={datetime.now(tz=timezone.utc)}=============================" ) + output = api_request_context.get(f"{PRODUCT_URL}v0/projects/{STUDY_ID}") assert output.status == HTTPStatus.OK - workbench = output.json()['data']['workbench'] + workbench = output.json()["data"]["workbench"] assert len(workbench.keys()) == int(NUM_OF_SLEEPERS) status_check = set() for node in list(workbench.keys()): - node_label = workbench[node]['label'] - node_current_status = workbench[node]['state']['currentStatus'] + node_label = workbench[node]["label"] + node_current_status = workbench[node]["state"]["currentStatus"] print((node_label, node_current_status, node)) status_check.add(node_current_status) assert len(status_check.union({"SUCCESS", "FAILED"})) == 2 # 3. Check Resource usage after - rut_after = api_request_context.get(f"{PRODUCT_URL}v0/services/-/resource-usages?wallet_id={WALLET_ID}&offset=0&limit={NUM_OF_SLEEPERS}") + rut_after = api_request_context.get( + f"{PRODUCT_URL}v0/services/-/resource-usages?wallet_id={WALLET_ID}&offset=0&limit={NUM_OF_SLEEPERS}" + ) assert rut_after.status == HTTPStatus.OK - service_runs_after = rut_after.json()['data'] + service_runs_after = rut_after.json()["data"] service_run_ids_after = set() for service_run in service_runs_after: - service_run_ids_after.add(service_run['service_run_id']) + service_run_ids_after.add(service_run["service_run_id"]) print(f"Service runs after: {service_run_ids_after}") # If there is an intersection with old service run id, that means that From 6e29910c885a7927c9e3ef9813e65115f7cf6fb0 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 5 Dec 2023 09:29:56 +0100 Subject: [PATCH 12/14] adding stopping in case of fail --- tests/e2e-playwright/tests/resource_usage_tracker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/e2e-playwright/tests/resource_usage_tracker.py b/tests/e2e-playwright/tests/resource_usage_tracker.py index 337adb2884d..1a09ca5f7ac 100644 --- a/tests/e2e-playwright/tests/resource_usage_tracker.py +++ b/tests/e2e-playwright/tests/resource_usage_tracker.py @@ -33,6 +33,7 @@ def product_and_user() -> tuple: return (product_url, user_name, user_password) +@pytest.fixture def stop_pipeline(api_request_context) -> Iterator[None]: yield From c86541336cc55c50d47213d4c4134ade8788c3f4 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Tue, 5 Dec 2023 10:02:29 +0100 Subject: [PATCH 13/14] minor --- tests/e2e-playwright/tests/resource_usage_tracker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/e2e-playwright/tests/resource_usage_tracker.py b/tests/e2e-playwright/tests/resource_usage_tracker.py index 1a09ca5f7ac..54dec5adaca 100644 --- a/tests/e2e-playwright/tests/resource_usage_tracker.py +++ b/tests/e2e-playwright/tests/resource_usage_tracker.py @@ -56,7 +56,7 @@ def test_resource_usage_tracker( service_run_ids_before = set() for service_run in service_runs_before: service_run_ids_before.add(service_run["service_run_id"]) - print(f"Service runs after: {service_run_ids_before}") + print(f"Service runs before: {service_run_ids_before}") # 2. Start computations data = {"subgraph": [], "force_restart": True} From e33c21b022cf89ceccaf72f7f346dc6a66545c24 Mon Sep 17 00:00:00 2001 From: matusdrobuliak66 Date: Fri, 8 Dec 2023 10:44:30 +0100 Subject: [PATCH 14/14] fix test --- tests/e2e-playwright/tests/resource_usage_tracker.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/e2e-playwright/tests/resource_usage_tracker.py b/tests/e2e-playwright/tests/resource_usage_tracker.py index 54dec5adaca..bc2a336abe1 100644 --- a/tests/e2e-playwright/tests/resource_usage_tracker.py +++ b/tests/e2e-playwright/tests/resource_usage_tracker.py @@ -6,15 +6,16 @@ # pylint: disable=unnecessary-lambda import os +from datetime import datetime, timezone +from http import HTTPStatus from typing import Iterator + import pytest -from playwright.sync_api import APIRequestContext, Page +from playwright.sync_api import APIRequestContext from tenacity import Retrying from tenacity.retry import retry_if_exception_type from tenacity.stop import stop_after_delay from tenacity.wait import wait_fixed -from datetime import datetime, timezone -from http import HTTPStatus PRODUCT_URL = os.environ["PRODUCT_URL"] PRODUCT_BILLABLE = os.environ["PRODUCT_BILLABLE"]