Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

service to service events #21

Merged
merged 5 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ repos:
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.5.7
rev: v0.9.4
hooks:
- id: ruff
args: [ --fix ]
Expand Down
5 changes: 3 additions & 2 deletions examples/2_counting/counting_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from dataclasses import dataclass
from typing import Optional

from pydantic import BaseModel, Field
from typing_extensions import Annotated

from intersect_sdk import (
HierarchyConfig,
IntersectBaseCapabilityImplementation,
Expand All @@ -13,8 +16,6 @@
intersect_message,
intersect_status,
)
from pydantic import BaseModel, Field
from typing_extensions import Annotated

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand Down
Empty file.
78 changes: 78 additions & 0 deletions examples/4_service_to_service_events/example_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""Client for service to service example.

Listens for events from the exposed_service, and prints each one out.
Once it gets two events, it terminates itself.
"""

import logging

from intersect_sdk import (
INTERSECT_JSON_VALUE,
IntersectClient,
IntersectClientCallback,
IntersectClientConfig,
default_intersect_lifecycle_loop,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class SampleOrchestrator:
"""Simply contains an event callback for events from the exposed service.

In this example, we just want to receive two events from the exposed service before killing the client.
"""

def __init__(self) -> None:
"""Straightforward constructor, just initializes global variable which counts events."""
self.got_first_event = False

def event_callback(
self, _source: str, _operation: str, _event_name: str, payload: INTERSECT_JSON_VALUE
) -> None:
"""This simply prints the event from the exposed service to your console.

Params:
source: the source of the response message.
operation: the name of the function we called in the original message.
_has_error: Boolean value which represents an error.
payload: Value of the response from the Service.
"""
print(payload)
if self.got_first_event:
# break out of pubsub loop
raise Exception
self.got_first_event = True
# empty return, don't send any additional messages or modify the events listened to


if __name__ == '__main__':
from_config_file = {
'brokers': [
{
'username': 'intersect_username',
'password': 'intersect_password',
'port': 5672,
'protocol': 'amqp0.9.1',
},
],
}

# Listen for an event on the exposed service
config = IntersectClientConfig(
initial_message_event_config=IntersectClientCallback(
services_to_start_listening_for_events=[
'example-organization.example-facility.example-system.example-subsystem.exposed-service'
],
),
**from_config_file,
)
orchestrator = SampleOrchestrator()
client = IntersectClient(
config=config,
event_callback=orchestrator.event_callback,
)
default_intersect_lifecycle_loop(
client,
)
87 changes: 87 additions & 0 deletions examples/4_service_to_service_events/exposed_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""Exposed service.

This service listens for events from the internal service, and then emits its own
'exposed_service_event' event. The client listens for the 'exposed_service_event'.
"""

import logging

from intersect_sdk import (
HierarchyConfig,
IntersectBaseCapabilityImplementation,
IntersectEventDefinition,
IntersectService,
IntersectServiceConfig,
default_intersect_lifecycle_loop,
intersect_event,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ExposedServiceCapabilityImplementation(IntersectBaseCapabilityImplementation):
"""Exposed service capability."""

intersect_sdk_capability_name = 'ExposedService'

def on_service_startup(self) -> None:
"""This function will get called when starting up the Service.

Note that while we could call this function earlier, we should not call this function in the constructor. The order of things which need to happen:

1) Capability constructor is called
2) Service constructor is called (which will include this capability)
3) We can now start listening for events.

Note that you do not have to explicitly start the Service, you only need to follow steps one and two.
"""
self.intersect_sdk_listen_for_service_event(
HierarchyConfig(
organization='example-organization',
facility='example-facility',
system='example-system',
subsystem='example-subsystem',
service='internal-service',
),
'internal_service_event',
self.on_internal_service_event,
)
marshallmcdonnell marked this conversation as resolved.
Show resolved Hide resolved

@intersect_event(events={'exposed_service_event': IntersectEventDefinition(event_type=str)})
def on_internal_service_event(
self, source: str, _operation: str, event_name: str, payload: str
) -> None:
"""When we get an event back from the internal_service, we will emit our own event."""
self.intersect_sdk_emit_event(
'exposed_service_event',
f'From event "{event_name}", received message "{payload}" from "{source}"',
)


if __name__ == '__main__':
from_config_file = {
'brokers': [
{
'username': 'intersect_username',
'password': 'intersect_password',
'port': 5672,
'protocol': 'amqp0.9.1',
},
],
}
config = IntersectServiceConfig(
hierarchy=HierarchyConfig(
organization='example-organization',
facility='example-facility',
system='example-system',
subsystem='example-subsystem',
service='exposed-service',
),
status_interval=30.0,
**from_config_file,
)
capability = ExposedServiceCapabilityImplementation()
service = IntersectService([capability], config)
logger.info('Starting Service 1, use Ctrl+C to exit.')
default_intersect_lifecycle_loop(service, post_startup_callback=capability.on_service_startup)
73 changes: 73 additions & 0 deletions examples/4_service_to_service_events/internal_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Internal service.

This service periodically emits an 'internal_service_event' string, as an event.
The exposed service listens to the events of this service.
"""

import logging
import threading
import time

from intersect_sdk import (
HierarchyConfig,
IntersectBaseCapabilityImplementation,
IntersectEventDefinition,
IntersectService,
IntersectServiceConfig,
default_intersect_lifecycle_loop,
intersect_event,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class InternalServiceCapabilityImplementation(IntersectBaseCapabilityImplementation):
"""Internal service capability."""

intersect_sdk_capability_name = 'InternalService'

def after_service_startup(self) -> None:
"""Called after service startup."""
self.thread = threading.Thread(
target=self.internal_service_event_generator, daemon=True, name='event_thread'
)
self.thread.start()

@intersect_event(events={'internal_service_event': IntersectEventDefinition(event_type=str)})
def internal_service_event_generator(self) -> str:
"""Emits a periodic internal_service_event event."""
while True:
time.sleep(2.0)
self.intersect_sdk_emit_event('internal_service_event', 'not_feeling_creative')


if __name__ == '__main__':
from_config_file = {
'brokers': [
{
'username': 'intersect_username',
'password': 'intersect_password',
'port': 5672,
'protocol': 'amqp0.9.1',
},
],
}
config = IntersectServiceConfig(
hierarchy=HierarchyConfig(
organization='example-organization',
facility='example-facility',
system='example-system',
subsystem='example-subsystem',
service='internal-service',
),
status_interval=30.0,
**from_config_file,
)
capability = InternalServiceCapabilityImplementation()
service = IntersectService([capability], config)
logger.info('Starting Service 2, use Ctrl+C to exit.')
default_intersect_lifecycle_loop(
service,
post_startup_callback=capability.after_service_startup,
)
40 changes: 20 additions & 20 deletions pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,6 @@ ignore = [
'COM812', # formatter, handled by Ruff format
'ISC001', # formatter, handled by Ruff format
'SIM105', # "with contextlib.suppress():" is slower than try-except-pass
'ANN101', # don't need to annotate "self" typing
'ANN102', # don't need to annotate "cls" typing for class methods
'ANN401', # allow explicit "Any" typing, use with care
'PLR2004', # allow "magic numbers"
]
Expand Down Expand Up @@ -171,7 +169,7 @@ exclude_also = [
[tool.pdm.dev-dependencies]
lint = [
"pre-commit>=3.3.1",
"ruff==0.5.7",
"ruff==0.9.4",
"mypy>=1.10.0",
"types-paho-mqtt>=1.6.0.20240106",
"codespell>=2.3.0",
Expand All @@ -184,7 +182,8 @@ test-all = "pytest tests/ --cov=src/intersect_sdk/ --cov-fail-under=80 --cov-rep
test-all-debug = "pytest tests/ --cov=src/intersect_sdk/ --cov-fail-under=80 --cov-report=html:reports/htmlcov/ --cov-report=xml:reports/coverage_report.xml --junitxml=reports/junit.xml -s"
test-unit = "pytest tests/unit --cov=src/intersect_sdk/"
test-e2e = "pytest tests/e2e --cov=src/intersect_sdk/"
lint = { composite = ["lint-format", "lint-ruff", "lint-mypy", "lint-spelling"] }
lint = { composite = ["lint-format", "lint-ruff", "lint-mypy", "lint-spelling", "lint-docs"] }
lint-docs = "sphinx-build -W --keep-going docs docs/_build"
lint-format = "ruff format"
lint-ruff = "ruff check --fix"
lint-mypy = "mypy src/intersect_sdk/"
Expand Down
Loading
Loading