diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7b1f6b6..6a0fb22 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -8,7 +8,7 @@ repos: - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.5.7 + rev: v0.9.4 hooks: - id: ruff args: [ --fix ] diff --git a/examples/2_counting/counting_service.py b/examples/2_counting/counting_service.py index 6304d51..a02f4b9 100644 --- a/examples/2_counting/counting_service.py +++ b/examples/2_counting/counting_service.py @@ -4,6 +4,9 @@ from dataclasses import dataclass from typing import Optional +from pydantic import BaseModel, Field +from typing_extensions import Annotated + from intersect_sdk import ( HierarchyConfig, IntersectBaseCapabilityImplementation, @@ -13,8 +16,6 @@ intersect_message, intersect_status, ) -from pydantic import BaseModel, Field -from typing_extensions import Annotated logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) diff --git a/examples/4_service_to_service_events/__init__.py b/examples/4_service_to_service_events/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/examples/4_service_to_service_events/example_client.py b/examples/4_service_to_service_events/example_client.py new file mode 100644 index 0000000..1f8198f --- /dev/null +++ b/examples/4_service_to_service_events/example_client.py @@ -0,0 +1,78 @@ +"""Client for service to service example. + +Listens for events from the exposed_service, and prints each one out. +Once it gets two events, it terminates itself. +""" + +import logging + +from intersect_sdk import ( + INTERSECT_JSON_VALUE, + IntersectClient, + IntersectClientCallback, + IntersectClientConfig, + default_intersect_lifecycle_loop, +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class SampleOrchestrator: + """Simply contains an event callback for events from the exposed service. + + In this example, we just want to receive two events from the exposed service before killing the client. + """ + + def __init__(self) -> None: + """Straightforward constructor, just initializes global variable which counts events.""" + self.got_first_event = False + + def event_callback( + self, _source: str, _operation: str, _event_name: str, payload: INTERSECT_JSON_VALUE + ) -> None: + """This simply prints the event from the exposed service to your console. + + Params: + source: the source of the response message. + operation: the name of the function we called in the original message. + _has_error: Boolean value which represents an error. + payload: Value of the response from the Service. + """ + print(payload) + if self.got_first_event: + # break out of pubsub loop + raise Exception + self.got_first_event = True + # empty return, don't send any additional messages or modify the events listened to + + +if __name__ == '__main__': + from_config_file = { + 'brokers': [ + { + 'username': 'intersect_username', + 'password': 'intersect_password', + 'port': 5672, + 'protocol': 'amqp0.9.1', + }, + ], + } + + # Listen for an event on the exposed service + config = IntersectClientConfig( + initial_message_event_config=IntersectClientCallback( + services_to_start_listening_for_events=[ + 'example-organization.example-facility.example-system.example-subsystem.exposed-service' + ], + ), + **from_config_file, + ) + orchestrator = SampleOrchestrator() + client = IntersectClient( + config=config, + event_callback=orchestrator.event_callback, + ) + default_intersect_lifecycle_loop( + client, + ) diff --git a/examples/4_service_to_service_events/exposed_service.py b/examples/4_service_to_service_events/exposed_service.py new file mode 100644 index 0000000..73a46e4 --- /dev/null +++ b/examples/4_service_to_service_events/exposed_service.py @@ -0,0 +1,87 @@ +"""Exposed service. + +This service listens for events from the internal service, and then emits its own +'exposed_service_event' event. The client listens for the 'exposed_service_event'. +""" + +import logging + +from intersect_sdk import ( + HierarchyConfig, + IntersectBaseCapabilityImplementation, + IntersectEventDefinition, + IntersectService, + IntersectServiceConfig, + default_intersect_lifecycle_loop, + intersect_event, +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class ExposedServiceCapabilityImplementation(IntersectBaseCapabilityImplementation): + """Exposed service capability.""" + + intersect_sdk_capability_name = 'ExposedService' + + def on_service_startup(self) -> None: + """This function will get called when starting up the Service. + + Note that while we could call this function earlier, we should not call this function in the constructor. The order of things which need to happen: + + 1) Capability constructor is called + 2) Service constructor is called (which will include this capability) + 3) We can now start listening for events. + + Note that you do not have to explicitly start the Service, you only need to follow steps one and two. + """ + self.intersect_sdk_listen_for_service_event( + HierarchyConfig( + organization='example-organization', + facility='example-facility', + system='example-system', + subsystem='example-subsystem', + service='internal-service', + ), + 'internal_service_event', + self.on_internal_service_event, + ) + + @intersect_event(events={'exposed_service_event': IntersectEventDefinition(event_type=str)}) + def on_internal_service_event( + self, source: str, _operation: str, event_name: str, payload: str + ) -> None: + """When we get an event back from the internal_service, we will emit our own event.""" + self.intersect_sdk_emit_event( + 'exposed_service_event', + f'From event "{event_name}", received message "{payload}" from "{source}"', + ) + + +if __name__ == '__main__': + from_config_file = { + 'brokers': [ + { + 'username': 'intersect_username', + 'password': 'intersect_password', + 'port': 5672, + 'protocol': 'amqp0.9.1', + }, + ], + } + config = IntersectServiceConfig( + hierarchy=HierarchyConfig( + organization='example-organization', + facility='example-facility', + system='example-system', + subsystem='example-subsystem', + service='exposed-service', + ), + status_interval=30.0, + **from_config_file, + ) + capability = ExposedServiceCapabilityImplementation() + service = IntersectService([capability], config) + logger.info('Starting Service 1, use Ctrl+C to exit.') + default_intersect_lifecycle_loop(service, post_startup_callback=capability.on_service_startup) diff --git a/examples/4_service_to_service_events/internal_service.py b/examples/4_service_to_service_events/internal_service.py new file mode 100644 index 0000000..fdfc9c4 --- /dev/null +++ b/examples/4_service_to_service_events/internal_service.py @@ -0,0 +1,73 @@ +"""Internal service. + +This service periodically emits an 'internal_service_event' string, as an event. +The exposed service listens to the events of this service. +""" + +import logging +import threading +import time + +from intersect_sdk import ( + HierarchyConfig, + IntersectBaseCapabilityImplementation, + IntersectEventDefinition, + IntersectService, + IntersectServiceConfig, + default_intersect_lifecycle_loop, + intersect_event, +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class InternalServiceCapabilityImplementation(IntersectBaseCapabilityImplementation): + """Internal service capability.""" + + intersect_sdk_capability_name = 'InternalService' + + def after_service_startup(self) -> None: + """Called after service startup.""" + self.thread = threading.Thread( + target=self.internal_service_event_generator, daemon=True, name='event_thread' + ) + self.thread.start() + + @intersect_event(events={'internal_service_event': IntersectEventDefinition(event_type=str)}) + def internal_service_event_generator(self) -> str: + """Emits a periodic internal_service_event event.""" + while True: + time.sleep(2.0) + self.intersect_sdk_emit_event('internal_service_event', 'not_feeling_creative') + + +if __name__ == '__main__': + from_config_file = { + 'brokers': [ + { + 'username': 'intersect_username', + 'password': 'intersect_password', + 'port': 5672, + 'protocol': 'amqp0.9.1', + }, + ], + } + config = IntersectServiceConfig( + hierarchy=HierarchyConfig( + organization='example-organization', + facility='example-facility', + system='example-system', + subsystem='example-subsystem', + service='internal-service', + ), + status_interval=30.0, + **from_config_file, + ) + capability = InternalServiceCapabilityImplementation() + service = IntersectService([capability], config) + logger.info('Starting Service 2, use Ctrl+C to exit.') + default_intersect_lifecycle_loop( + service, + post_startup_callback=capability.after_service_startup, + ) diff --git a/pdm.lock b/pdm.lock index f305f6f..ede1c27 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "amqp", "docs", "lint", "test"] strategy = ["inherit_metadata"] lock_version = "4.5.0" -content_hash = "sha256:ff374db8858c76f77a76bfeea9be055aeda34404a49abbf60b64a3543deee5eb" +content_hash = "sha256:01496ba2183edd321b27ad6408825bff436ac2152d58364254ff713e086f3fa1" [[metadata.targets]] requires_python = ">=3.8.10,<4.0" @@ -1477,29 +1477,29 @@ files = [ [[package]] name = "ruff" -version = "0.5.7" +version = "0.9.4" requires_python = ">=3.7" summary = "An extremely fast Python linter and code formatter, written in Rust." groups = ["lint"] files = [ - {file = "ruff-0.5.7-py3-none-linux_armv6l.whl", hash = "sha256:548992d342fc404ee2e15a242cdbea4f8e39a52f2e7752d0e4cbe88d2d2f416a"}, - {file = "ruff-0.5.7-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:00cc8872331055ee017c4f1071a8a31ca0809ccc0657da1d154a1d2abac5c0be"}, - {file = "ruff-0.5.7-py3-none-macosx_11_0_arm64.whl", hash = "sha256:eaf3d86a1fdac1aec8a3417a63587d93f906c678bb9ed0b796da7b59c1114a1e"}, - {file = "ruff-0.5.7-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a01c34400097b06cf8a6e61b35d6d456d5bd1ae6961542de18ec81eaf33b4cb8"}, - {file = "ruff-0.5.7-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:fcc8054f1a717e2213500edaddcf1dbb0abad40d98e1bd9d0ad364f75c763eea"}, - {file = "ruff-0.5.7-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7f70284e73f36558ef51602254451e50dd6cc479f8b6f8413a95fcb5db4a55fc"}, - {file = "ruff-0.5.7-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:a78ad870ae3c460394fc95437d43deb5c04b5c29297815a2a1de028903f19692"}, - {file = "ruff-0.5.7-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:9ccd078c66a8e419475174bfe60a69adb36ce04f8d4e91b006f1329d5cd44bcf"}, - {file = "ruff-0.5.7-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:7e31c9bad4ebf8fdb77b59cae75814440731060a09a0e0077d559a556453acbb"}, - {file = "ruff-0.5.7-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8d796327eed8e168164346b769dd9a27a70e0298d667b4ecee6877ce8095ec8e"}, - {file = "ruff-0.5.7-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:4a09ea2c3f7778cc635e7f6edf57d566a8ee8f485f3c4454db7771efb692c499"}, - {file = "ruff-0.5.7-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:a36d8dcf55b3a3bc353270d544fb170d75d2dff41eba5df57b4e0b67a95bb64e"}, - {file = "ruff-0.5.7-py3-none-musllinux_1_2_i686.whl", hash = "sha256:9369c218f789eefbd1b8d82a8cf25017b523ac47d96b2f531eba73770971c9e5"}, - {file = "ruff-0.5.7-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:b88ca3db7eb377eb24fb7c82840546fb7acef75af4a74bd36e9ceb37a890257e"}, - {file = "ruff-0.5.7-py3-none-win32.whl", hash = "sha256:33d61fc0e902198a3e55719f4be6b375b28f860b09c281e4bdbf783c0566576a"}, - {file = "ruff-0.5.7-py3-none-win_amd64.whl", hash = "sha256:083bbcbe6fadb93cd86709037acc510f86eed5a314203079df174c40bbbca6b3"}, - {file = "ruff-0.5.7-py3-none-win_arm64.whl", hash = "sha256:2dca26154ff9571995107221d0aeaad0e75a77b5a682d6236cf89a58c70b76f4"}, - {file = "ruff-0.5.7.tar.gz", hash = "sha256:8dfc0a458797f5d9fb622dd0efc52d796f23f0a1493a9527f4e49a550ae9a7e5"}, + {file = "ruff-0.9.4-py3-none-linux_armv6l.whl", hash = "sha256:64e73d25b954f71ff100bb70f39f1ee09e880728efb4250c632ceed4e4cdf706"}, + {file = "ruff-0.9.4-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:6ce6743ed64d9afab4fafeaea70d3631b4d4b28b592db21a5c2d1f0ef52934bf"}, + {file = "ruff-0.9.4-py3-none-macosx_11_0_arm64.whl", hash = "sha256:54499fb08408e32b57360f6f9de7157a5fec24ad79cb3f42ef2c3f3f728dfe2b"}, + {file = "ruff-0.9.4-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:37c892540108314a6f01f105040b5106aeb829fa5fb0561d2dcaf71485021137"}, + {file = "ruff-0.9.4-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:de9edf2ce4b9ddf43fd93e20ef635a900e25f622f87ed6e3047a664d0e8f810e"}, + {file = "ruff-0.9.4-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:87c90c32357c74f11deb7fbb065126d91771b207bf9bfaaee01277ca59b574ec"}, + {file = "ruff-0.9.4-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:56acd6c694da3695a7461cc55775f3a409c3815ac467279dfa126061d84b314b"}, + {file = "ruff-0.9.4-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e0c93e7d47ed951b9394cf352d6695b31498e68fd5782d6cbc282425655f687a"}, + {file = "ruff-0.9.4-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:1d4c8772670aecf037d1bf7a07c39106574d143b26cfe5ed1787d2f31e800214"}, + {file = "ruff-0.9.4-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bfc5f1d7afeda8d5d37660eeca6d389b142d7f2b5a1ab659d9214ebd0e025231"}, + {file = "ruff-0.9.4-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:faa935fc00ae854d8b638c16a5f1ce881bc3f67446957dd6f2af440a5fc8526b"}, + {file = "ruff-0.9.4-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:a6c634fc6f5a0ceae1ab3e13c58183978185d131a29c425e4eaa9f40afe1e6d6"}, + {file = "ruff-0.9.4-py3-none-musllinux_1_2_i686.whl", hash = "sha256:433dedf6ddfdec7f1ac7575ec1eb9844fa60c4c8c2f8887a070672b8d353d34c"}, + {file = "ruff-0.9.4-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:d612dbd0f3a919a8cc1d12037168bfa536862066808960e0cc901404b77968f0"}, + {file = "ruff-0.9.4-py3-none-win32.whl", hash = "sha256:db1192ddda2200671f9ef61d9597fcef89d934f5d1705e571a93a67fb13a4402"}, + {file = "ruff-0.9.4-py3-none-win_amd64.whl", hash = "sha256:05bebf4cdbe3ef75430d26c375773978950bbf4ee3c95ccb5448940dc092408e"}, + {file = "ruff-0.9.4-py3-none-win_arm64.whl", hash = "sha256:585792f1e81509e38ac5123492f8875fbc36f3ede8185af0a26df348e5154f41"}, + {file = "ruff-0.9.4.tar.gz", hash = "sha256:6907ee3529244bb0ed066683e075f09285b38dd5b4039370df6ff06041ca19e7"}, ] [[package]] diff --git a/pyproject.toml b/pyproject.toml index cbc34be..c8081f7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -104,8 +104,6 @@ ignore = [ 'COM812', # formatter, handled by Ruff format 'ISC001', # formatter, handled by Ruff format 'SIM105', # "with contextlib.suppress():" is slower than try-except-pass - 'ANN101', # don't need to annotate "self" typing - 'ANN102', # don't need to annotate "cls" typing for class methods 'ANN401', # allow explicit "Any" typing, use with care 'PLR2004', # allow "magic numbers" ] @@ -171,7 +169,7 @@ exclude_also = [ [tool.pdm.dev-dependencies] lint = [ "pre-commit>=3.3.1", - "ruff==0.5.7", + "ruff==0.9.4", "mypy>=1.10.0", "types-paho-mqtt>=1.6.0.20240106", "codespell>=2.3.0", @@ -184,7 +182,8 @@ test-all = "pytest tests/ --cov=src/intersect_sdk/ --cov-fail-under=80 --cov-rep test-all-debug = "pytest tests/ --cov=src/intersect_sdk/ --cov-fail-under=80 --cov-report=html:reports/htmlcov/ --cov-report=xml:reports/coverage_report.xml --junitxml=reports/junit.xml -s" test-unit = "pytest tests/unit --cov=src/intersect_sdk/" test-e2e = "pytest tests/e2e --cov=src/intersect_sdk/" -lint = { composite = ["lint-format", "lint-ruff", "lint-mypy", "lint-spelling"] } +lint = { composite = ["lint-format", "lint-ruff", "lint-mypy", "lint-spelling", "lint-docs"] } +lint-docs = "sphinx-build -W --keep-going docs docs/_build" lint-format = "ruff format" lint-ruff = "ruff check --fix" lint-mypy = "mypy src/intersect_sdk/" diff --git a/src/intersect_sdk/__init__.py b/src/intersect_sdk/__init__.py index 74c0023..9eb584d 100644 --- a/src/intersect_sdk/__init__.py +++ b/src/intersect_sdk/__init__.py @@ -43,31 +43,31 @@ from .version import __version__, version_info, version_string __all__ = [ - 'IntersectDataHandler', - 'IntersectEventDefinition', - 'IntersectMimeType', - 'intersect_event', - 'intersect_message', - 'intersect_status', - 'get_schema_from_capability_implementations', - 'IntersectService', - 'IntersectClient', - 'IntersectClientCallback', - 'IntersectDirectMessageParams', - 'INTERSECT_CLIENT_RESPONSE_CALLBACK_TYPE', 'INTERSECT_CLIENT_EVENT_CALLBACK_TYPE', + 'INTERSECT_CLIENT_RESPONSE_CALLBACK_TYPE', 'INTERSECT_JSON_VALUE', 'INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE', - 'IntersectBaseCapabilityImplementation', - 'default_intersect_lifecycle_loop', - 'IntersectClientConfig', - 'IntersectServiceConfig', - 'HierarchyConfig', 'ControlPlaneConfig', 'ControlProvider', 'DataStoreConfig', 'DataStoreConfigMap', + 'HierarchyConfig', + 'IntersectBaseCapabilityImplementation', + 'IntersectClient', + 'IntersectClientCallback', + 'IntersectClientConfig', + 'IntersectDataHandler', + 'IntersectDirectMessageParams', + 'IntersectEventDefinition', + 'IntersectMimeType', + 'IntersectService', + 'IntersectServiceConfig', '__version__', + 'default_intersect_lifecycle_loop', + 'get_schema_from_capability_implementations', + 'intersect_event', + 'intersect_message', + 'intersect_status', 'version_info', 'version_string', ] diff --git a/src/intersect_sdk/_internal/interfaces.py b/src/intersect_sdk/_internal/interfaces.py index 1634e7d..31bccae 100644 --- a/src/intersect_sdk/_internal/interfaces.py +++ b/src/intersect_sdk/_internal/interfaces.py @@ -1,11 +1,12 @@ from __future__ import annotations -from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Protocol if TYPE_CHECKING: from uuid import UUID + from ..client_callback_definitions import INTERSECT_CLIENT_EVENT_CALLBACK_TYPE + from ..config.shared import HierarchyConfig from ..service_callback_definitions import ( INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE, ) @@ -14,13 +15,12 @@ ) -class IntersectEventObserver(ABC): +class IntersectEventObserver(Protocol): """Abstract definition of an entity which observes an INTERSECT event (i.e. IntersectService). Used as the common interface for event emitters (i.e. CapabilityImplementations). """ - @abstractmethod def _on_observe_event(self, event_name: str, event_value: Any, operation: str) -> None: """How to react to an event being fired. @@ -31,7 +31,6 @@ def _on_observe_event(self, event_name: str, event_value: Any, operation: str) - """ ... - @abstractmethod def create_external_request( self, request: IntersectDirectMessageParams, @@ -49,3 +48,18 @@ def create_external_request( - generated RequestID associated with your request """ ... + + def register_event( + self, + service: HierarchyConfig, + event_name: str, + response_handler: INTERSECT_CLIENT_EVENT_CALLBACK_TYPE, + ) -> None: + """Observed entity (capability) tells observer (i.e. service) to subscribe to a specific event. + + Params: + - service: HierarchyConfig of the service we want to talk to + - event_name: name of event to subscribe to + - response_handler: callback for how to handle the reception of an event + """ + ... diff --git a/src/intersect_sdk/_internal/messages/userspace.py b/src/intersect_sdk/_internal/messages/userspace.py index 22bc75e..f2a0b9d 100644 --- a/src/intersect_sdk/_internal/messages/userspace.py +++ b/src/intersect_sdk/_internal/messages/userspace.py @@ -25,13 +25,13 @@ from pydantic import AwareDatetime, Field, TypeAdapter from typing_extensions import Annotated, TypedDict -from ...constants import SYSTEM_OF_SYSTEM_REGEX # noqa: TCH001 (this is runtime checked) -from ...core_definitions import ( # noqa: TCH001 (this is runtime checked) +from ...constants import SYSTEM_OF_SYSTEM_REGEX +from ...core_definitions import ( IntersectDataHandler, IntersectMimeType, ) from ...version import version_string -from ..data_plane.minio_utils import MinioPayload # noqa: TCH001 (this is runtime checked) +from ..data_plane.minio_utils import MinioPayload # noqa: TC001 (this is runtime checked) class UserspaceMessageHeader(TypedDict): diff --git a/src/intersect_sdk/app_lifecycle.py b/src/intersect_sdk/app_lifecycle.py index 976d398..13977b8 100644 --- a/src/intersect_sdk/app_lifecycle.py +++ b/src/intersect_sdk/app_lifecycle.py @@ -82,7 +82,7 @@ def _on_signal_caught(self, signal: int, _: Any) -> None: ---------- signal (int): signal code. """ - logger.warning('shutting down and handling signal %d' % signal) + logger.warning('shutting down and handling signal %s', signal) if self._cleanup_callback: self._cleanup_callback(signal) self._exit.set() diff --git a/src/intersect_sdk/capability/base.py b/src/intersect_sdk/capability/base.py index f908392..bf25a57 100644 --- a/src/intersect_sdk/capability/base.py +++ b/src/intersect_sdk/capability/base.py @@ -14,6 +14,8 @@ from uuid import UUID from .._internal.interfaces import IntersectEventObserver + from ..client_callback_definitions import INTERSECT_CLIENT_EVENT_CALLBACK_TYPE + from ..config.shared import HierarchyConfig from ..service_callback_definitions import ( INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE, ) @@ -137,6 +139,8 @@ def intersect_sdk_call_service( ) -> list[UUID]: """Create an external request that we'll send to a different Service. + Note: You should generally NOT call this function until after you have initialized the IntersectService class. + Params: - request: the request we want to send out, encapsulated as an IntersectDirectMessageParams object - response_handler: optional callback for how we want to handle the response from this request. @@ -153,3 +157,27 @@ def intersect_sdk_call_service( observer.create_external_request(request, response_handler, timeout) for observer in self.__intersect_sdk_observers__ ] + + @final + def intersect_sdk_listen_for_service_event( + self, + service: HierarchyConfig, + event_name: str, + response_handler: INTERSECT_CLIENT_EVENT_CALLBACK_TYPE, + ) -> None: + """Start listening to events from a specific Service. + + Note: You should generally NOT call this function until after you have initialized the IntersectService class. + + Params: + - service: The system-of-system hierarchy which points to the specific service + - event_name: The name of the event we want to listen for + - response_handler: callback for how to handle the reception of an event + The callback submits these parameters: + 1) message source + 2) name of operation + 3) name of event + 4) payload + """ + for observer in self.__intersect_sdk_observers__: + observer.register_event(service, event_name, response_handler) diff --git a/src/intersect_sdk/client.py b/src/intersect_sdk/client.py index bae67c1..7a2389c 100644 --- a/src/intersect_sdk/client.py +++ b/src/intersect_sdk/client.py @@ -129,7 +129,7 @@ def __init__( if user_callback: # Do not persist, as we use the temporary client information to build this. self._control_plane_manager.add_subscription_channel( - f"{self._hierarchy.hierarchy_string('/')}/response", + f'{self._hierarchy.hierarchy_string("/")}/response', {self._handle_userspace_message_raw}, persist=False, ) @@ -140,7 +140,7 @@ def __init__( service ) in config.initial_message_event_config.services_to_start_listening_for_events: self._control_plane_manager.add_subscription_channel( - f"{service.replace('.', '/')}/events", + f'{service.replace(".", "/")}/events', {self._handle_event_message_raw}, persist=False, ) @@ -390,13 +390,13 @@ def _handle_client_callback(self, user_value: IntersectClientCallback | None) -> if self._event_callback: for add_event in validated_result.services_to_start_listening_for_events: self._control_plane_manager.add_subscription_channel( - f"{add_event.replace('.', '/')}/events", + f'{add_event.replace(".", "/")}/events', {self._handle_event_message_raw}, persist=False, ) for remove_event in validated_result.services_to_stop_listening_for_events: self._control_plane_manager.remove_subscription_channel( - f"{remove_event.replace('.', '/')}/events" + f'{remove_event.replace(".", "/")}/events' ) # sending userspace messages without the callback is okay, we just won't get the response @@ -431,7 +431,7 @@ def _send_userspace_message(self, params: IntersectDirectMessageParams) -> None: payload=out_payload, ) logger.debug(f'Send userspace message:\n{msg}') - channel = f"{params.destination.replace('.', '/')}/request" + channel = f'{params.destination.replace(".", "/")}/request' # WARNING: If both the Service and the Client drop, the Service will execute the command # but cannot communicate the response to the Client. # in experiment controllers or production, you'll want to set persist to True diff --git a/src/intersect_sdk/service.py b/src/intersect_sdk/service.py index 7dc38e1..37dfef3 100644 --- a/src/intersect_sdk/service.py +++ b/src/intersect_sdk/service.py @@ -17,6 +17,7 @@ from __future__ import annotations import time +from collections import defaultdict from threading import Lock from types import MappingProxyType from typing import TYPE_CHECKING, Any, Callable, Literal, Union @@ -40,7 +41,10 @@ from ._internal.exceptions import IntersectApplicationError, IntersectError from ._internal.interfaces import IntersectEventObserver from ._internal.logger import logger -from ._internal.messages.event import create_event_message +from ._internal.messages.event import ( + create_event_message, + deserialize_and_validate_event_message, +) from ._internal.messages.lifecycle import LifecycleType, create_lifecycle_message from ._internal.messages.userspace import ( UserspaceMessage, @@ -52,19 +56,24 @@ from ._internal.utils import die from ._internal.version_resolver import resolve_user_version from .capability.base import IntersectBaseCapabilityImplementation +from .client_callback_definitions import ( + INTERSECT_CLIENT_EVENT_CALLBACK_TYPE, # noqa: TC001 (runtime-checked-annotation) +) from .config.service import IntersectServiceConfig +from .config.shared import HierarchyConfig from .core_definitions import IntersectDataHandler, IntersectMimeType from .service_callback_definitions import ( - INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE, # noqa: TCH001 (runtime-checked annotation) + INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE, # noqa: TC001 (runtime-checked annotation) ) from .shared_callback_definitions import ( - INTERSECT_JSON_VALUE, # noqa: TCH001 (runtime-checked annotation) - IntersectDirectMessageParams, # noqa: TCH001 (runtime-checked annotation) + INTERSECT_JSON_VALUE, # noqa: TC001 (runtime-checked annotation) + IntersectDirectMessageParams, # noqa: TC001 (runtime-checked annotation) ) from .version import version_string if TYPE_CHECKING: from ._internal.function_metadata import FunctionMetadata + from .config.shared import HierarchyConfig @final @@ -241,7 +250,9 @@ def __init__( self._status_retrieval_fn: Callable[[], bytes] = ( ( lambda: status_type_adapter.dump_json( - getattr(status_fn_capability, status_fn_name)(), by_alias=True, warnings='error' + getattr(status_fn_capability, status_fn_name)(), + by_alias=True, + warnings='error', ) ) if status_type_adapter and status_fn_name @@ -255,25 +266,49 @@ def __init__( self._external_requests: dict[str, IntersectService._ExternalRequest] = {} self._external_request_ctr = 0 + self._svc2svc_events: defaultdict[ + str, defaultdict[str, set[INTERSECT_CLIENT_EVENT_CALLBACK_TYPE]] + ] = defaultdict(lambda: defaultdict(set)) + """ + tree of other service events we're subscribed to, and what we need to call when we get an event + + i.e. + + { + "org.fac.sys1.subsys.service": { + "event_name_1": [ + # user_function_1, + # user_function_2 + ] + } + } + """ + self._startup_messages: list[ - tuple[IntersectDirectMessageParams, INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None] + tuple[ + IntersectDirectMessageParams, + INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None, + ] ] = [] self._resend_startup_messages = True self._sent_startup_messages = False self._shutdown_messages: list[ - tuple[IntersectDirectMessageParams, INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None] + tuple[ + IntersectDirectMessageParams, + INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None, + ] ] = [] self._data_plane_manager = DataPlaneManager(self._hierarchy, config.data_stores) # we PUBLISH messages on this channel - self._lifecycle_channel_name = f"{config.hierarchy.hierarchy_string('/')}/lifecycle" + self._lifecycle_channel_name = f'{config.hierarchy.hierarchy_string("/")}/lifecycle' # we PUBLISH event messages on this channel - self._events_channel_name = f"{config.hierarchy.hierarchy_string('/')}/events" + self._events_channel_name = f'{config.hierarchy.hierarchy_string("/")}/events' # we SUBSCRIBE to messages on this channel to receive requests - self._service_channel_name = f"{config.hierarchy.hierarchy_string('/')}/request" + self._service_channel_name = f'{config.hierarchy.hierarchy_string("/")}/request' # we SUBSCRIBE to messages on this channel to receive responses - self._client_channel_name = f"{config.hierarchy.hierarchy_string('/')}/response" + self._client_channel_name = f'{config.hierarchy.hierarchy_string("/")}/response' self._control_plane_manager = ControlPlaneManager( control_configs=config.brokers, @@ -323,7 +358,8 @@ def startup(self) -> Self: # Start the status thread if it doesn't already exist if self._status_thread is None: self._status_thread = StoppableThread( - target=self._status_ticker, name=f'IntersectService_{self._uuid}_status_thread' + target=self._status_ticker, + name=f'IntersectService_{self._uuid}_status_thread', ) self._status_thread.start() @@ -483,7 +519,10 @@ def get_blocked_keys(self) -> set[str]: def add_startup_messages( self, messages: list[ - tuple[IntersectDirectMessageParams, INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None] + tuple[ + IntersectDirectMessageParams, + INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None, + ] ], ) -> None: """Add request messages to send out to various microservices when this service starts. @@ -497,7 +536,10 @@ def add_startup_messages( def add_shutdown_messages( self, messages: list[ - tuple[IntersectDirectMessageParams, INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None] + tuple[ + IntersectDirectMessageParams, + INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE | None, + ] ], ) -> None: """Add request messages to send out to various microservices on shutdown. @@ -508,6 +550,67 @@ def add_shutdown_messages( """ self._shutdown_messages.extend(messages) + @validate_call(config=ConfigDict(revalidate_instances='always')) + def register_event( + self, + service: HierarchyConfig, + event_name: str, + response_handler: INTERSECT_CLIENT_EVENT_CALLBACK_TYPE, + ) -> None: + """Begin subscribing to events from a different Service. + + Params: + - service: HierarchyConfig of the service we want to talk to + - event_name: name of event to subscribe to + - response_handler: callback for how to handle the reception of an event + """ + hierarchy = service.hierarchy_string('.') + self._svc2svc_events[hierarchy][event_name].add(response_handler) + + self._control_plane_manager.add_subscription_channel( + f'{service.hierarchy_string("/")}/events', + {self._svc2svc_event_callback}, + persist=True, + ) + + def _svc2svc_event_callback(self, raw: bytes) -> None: + """Callback received when this service gets an event from another service. + + Deserializes and validates an EventMessage, will call a userspace function accordingly. + """ + try: + message = deserialize_and_validate_event_message(raw) + except ValidationError as e: + logger.warning( + "Invalid message received from another service's events channel, ignoring. Full message:\n{}", + e, + ) + return + logger.debug('Received event message:\n{}', message) + try: + payload = GENERIC_MESSAGE_SERIALIZER.validate_json( + self._data_plane_manager.incoming_message_data_handler(message) + ) + except ValidationError as e: + logger.warning( + 'Invalid payload message received as an event, ignoring. Full message: {}', + e, + ) + return + source = message['headers']['source'] + event_name = message['headers']['event_name'] + for user_callback in self._svc2svc_events[source][event_name]: + try: + user_callback(source, message['operationId'], event_name, payload) + except Exception as e: # noqa: BLE001 (need to catch any possible user exception) + logger.warning( + '!!! INTERSECT: event callback function "%s" produced uncaught exception when handling event "%s" from "%s"', + user_callback.__name__, + event_name, + source, + ) + logger.warning(e) + @validate_call(config=ConfigDict(revalidate_instances='always')) def create_external_request( self, @@ -625,7 +728,7 @@ def _handle_service_message_raw(self, raw: bytes) -> None: 'error' if response_msg['headers']['has_error'] else 'userspace', response_msg, ) - response_channel = f"{message['headers']['source'].replace('.', '/')}/response" + response_channel = f'{message["headers"]["source"].replace(".", "/")}/response' # Persistent userspace messages may be useful for orchestration. # Persistence will not hurt anything. self._control_plane_manager.publish_message( @@ -796,7 +899,7 @@ def _send_client_message(self, request_id: UUID, params: IntersectDirectMessageP message_id=request_id, ) logger.debug(f'Sending client message:\n{msg}') - request_channel = f"{params.destination.replace('.', '/')}/request" + request_channel = f'{params.destination.replace(".", "/")}/request' self._control_plane_manager.publish_message(request_channel, msg, persist=True) return True @@ -923,8 +1026,7 @@ def _on_observe_event(self, event_name: str, event_value: Any, operation: str) - event_name=event_name, payload=response_payload, ) - # Event messages are meant to be short-lived and should not persist. - self._control_plane_manager.publish_message(self._events_channel_name, msg, persist=False) + self._control_plane_manager.publish_message(self._events_channel_name, msg, persist=True) def _make_error_message( self, error_string: str, original_message: UserspaceMessage diff --git a/tests/e2e/test_examples.py b/tests/e2e/test_examples.py index bab8a99..33dd880 100644 --- a/tests/e2e/test_examples.py +++ b/tests/e2e/test_examples.py @@ -143,3 +143,10 @@ def test_example_4_service_to_service(): 'Received Response from Service 2: Acknowledging service one text -> Kicking off the example!\n' 'Received Second Response from Service 2: Acknowledging service one text -> Final Verification\n' ) + + +def test_example_4_service_to_service_events(): + assert run_example_test('4_service_to_service_events') == ( + 'From event "internal_service_event", received message "not_feeling_creative" from "example-organization.example-facility.example-system.example-subsystem.internal-service"\n' + 'From event "internal_service_event", received message "not_feeling_creative" from "example-organization.example-facility.example-system.example-subsystem.internal-service"\n' + ) diff --git a/tests/fixtures/example_schema.py b/tests/fixtures/example_schema.py index 6f16654..5dfa01d 100644 --- a/tests/fixtures/example_schema.py +++ b/tests/fixtures/example_schema.py @@ -28,16 +28,6 @@ from uuid import UUID from annotated_types import Ge, Le -from intersect_sdk import ( - HierarchyConfig, - IntersectBaseCapabilityImplementation, - IntersectDataHandler, - IntersectEventDefinition, - IntersectMimeType, - intersect_event, - intersect_message, - intersect_status, -) from pydantic import ( BaseModel, Field, @@ -49,6 +39,17 @@ from pydantic_core import PydanticCustomError, Url from typing_extensions import Annotated, TypeAliasType, TypedDict +from intersect_sdk import ( + HierarchyConfig, + IntersectBaseCapabilityImplementation, + IntersectDataHandler, + IntersectEventDefinition, + IntersectMimeType, + intersect_event, + intersect_message, + intersect_status, +) + FAKE_HIERARCHY_CONFIG = HierarchyConfig( organization='test', facility='test', @@ -450,7 +451,7 @@ def test_decimal(self, input_value: Decimal) -> Decimal: self.update_status('test_decimal') decimal.getcontext().prec = 20 decimal.getcontext().rounding = decimal.ROUND_HALF_UP - return input_value / Decimal(3.14159265358979323846) + return input_value / Decimal('3.14159265358979323846') @intersect_message( response_content_type=IntersectMimeType.STRING, diff --git a/tests/integration/test_return_type_mismatch.py b/tests/integration/test_return_type_mismatch.py index b37d5cb..2ef15e5 100644 --- a/tests/integration/test_return_type_mismatch.py +++ b/tests/integration/test_return_type_mismatch.py @@ -28,7 +28,6 @@ create_userspace_message, deserialize_and_validate_userspace_message, ) - from tests.fixtures.example_schema import FAKE_HIERARCHY_CONFIG # FIXTURE ############################# diff --git a/tests/integration/test_service.py b/tests/integration/test_service.py index e8a8ecd..190ed70 100644 --- a/tests/integration/test_service.py +++ b/tests/integration/test_service.py @@ -31,7 +31,6 @@ create_userspace_message, deserialize_and_validate_userspace_message, ) - from tests.fixtures.example_schema import FAKE_HIERARCHY_CONFIG, DummyCapabilityImplementation # HELPERS ############################# diff --git a/tests/unit/test_annotations.py b/tests/unit/test_annotations.py index 6685203..3874ce4 100644 --- a/tests/unit/test_annotations.py +++ b/tests/unit/test_annotations.py @@ -1,4 +1,6 @@ import pytest +from pydantic import ValidationError + from intersect_sdk import ( IntersectBaseCapabilityImplementation, IntersectEventDefinition, @@ -6,7 +8,6 @@ intersect_message, intersect_status, ) -from pydantic import ValidationError # this should immediately fail when trying to define the class itself diff --git a/tests/unit/test_base_capability_implementation.py b/tests/unit/test_base_capability_implementation.py index ac63d1c..ebac778 100644 --- a/tests/unit/test_base_capability_implementation.py +++ b/tests/unit/test_base_capability_implementation.py @@ -4,6 +4,7 @@ from uuid import UUID, uuid4 import pytest + from intersect_sdk import ( INTERSECT_SERVICE_RESPONSE_CALLBACK_TYPE, IntersectBaseCapabilityImplementation, diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index d4f6028..0990473 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -1,4 +1,6 @@ import pytest +from pydantic import TypeAdapter, ValidationError + from intersect_sdk import ( ControlPlaneConfig, DataStoreConfig, @@ -8,7 +10,6 @@ IntersectClientConfig, IntersectServiceConfig, ) -from pydantic import TypeAdapter, ValidationError # TESTS ##################### diff --git a/tests/unit/test_invalid_schema_runtime.py b/tests/unit/test_invalid_schema_runtime.py index 6586b07..4d80db0 100644 --- a/tests/unit/test_invalid_schema_runtime.py +++ b/tests/unit/test_invalid_schema_runtime.py @@ -6,6 +6,7 @@ """ import pytest + from intersect_sdk import ( ControlPlaneConfig, IntersectBaseCapabilityImplementation, diff --git a/tests/unit/test_lifecycle_message.py b/tests/unit/test_lifecycle_message.py index fd2b2e6..fa7f38f 100644 --- a/tests/unit/test_lifecycle_message.py +++ b/tests/unit/test_lifecycle_message.py @@ -6,13 +6,14 @@ import uuid import pytest +from pydantic import ValidationError + from intersect_sdk import version_string from intersect_sdk._internal.messages.lifecycle import ( LifecycleType, create_lifecycle_message, deserialize_and_validate_lifecycle_message, ) -from pydantic import ValidationError def test_valid_lifecycle_message_deserializes(): diff --git a/tests/unit/test_schema_invalids.py b/tests/unit/test_schema_invalids.py index 7d28c33..02755f0 100644 --- a/tests/unit/test_schema_invalids.py +++ b/tests/unit/test_schema_invalids.py @@ -21,6 +21,9 @@ import pytest from annotated_types import Gt +from pydantic import BaseModel, Field +from typing_extensions import Annotated, TypeAliasType, TypedDict + from intersect_sdk import ( HierarchyConfig, IntersectBaseCapabilityImplementation, @@ -32,8 +35,6 @@ intersect_message, intersect_status, ) -from pydantic import BaseModel, Field -from typing_extensions import Annotated, TypeAliasType, TypedDict # HELPERS ######################### diff --git a/tests/unit/test_schema_valid.py b/tests/unit/test_schema_valid.py index 1d2b12c..1ba7568 100644 --- a/tests/unit/test_schema_valid.py +++ b/tests/unit/test_schema_valid.py @@ -10,7 +10,6 @@ ) from intersect_sdk._internal.schema import get_schema_and_functions_from_capability_implementations from intersect_sdk.schema import get_schema_from_capability_implementations - from tests.fixtures.example_schema import ( FAKE_HIERARCHY_CONFIG, DummyCapabilityImplementation, diff --git a/tests/unit/test_userspace_message.py b/tests/unit/test_userspace_message.py index 6302bb7..ffa0a28 100644 --- a/tests/unit/test_userspace_message.py +++ b/tests/unit/test_userspace_message.py @@ -6,12 +6,13 @@ import uuid import pytest +from pydantic import ValidationError + from intersect_sdk import IntersectDataHandler, IntersectMimeType, version_string from intersect_sdk._internal.messages.userspace import ( create_userspace_message, deserialize_and_validate_userspace_message, ) -from pydantic import ValidationError def test_valid_userspace_message_deserializes(): diff --git a/tests/unit/test_version_resolver.py b/tests/unit/test_version_resolver.py index 1f90b9e..1a3943d 100644 --- a/tests/unit/test_version_resolver.py +++ b/tests/unit/test_version_resolver.py @@ -14,6 +14,7 @@ from uuid import uuid4 import pytest + from intersect_sdk import ( IntersectDataHandler, IntersectMimeType,