-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RPC capability #151
Comments
Just for you, maybe for an internal discussion, our current implementation of such an RPC structure: import uuid
import asyncio
import logging
from typing import Dict, Any, Coroutine, Set
from enum import Enum
from dataclasses import dataclass
from mango import Role, RoleContext, AgentAddress, sender_addr, json_serializable
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)
@json_serializable
@dataclass
class RequestMessage:
request: Any
@json_serializable
@dataclass
class ResponseMessage:
content: Any
class Response:
corr_id: str
future: asyncio.Future
request: Any
content: Any
def __init__(self, request: Any = None, future: asyncio.Future = None, corr_id: str = None):
self.request = request
self.future = future if future is not None else asyncio.Future()
self.corr_id = corr_id if corr_id is not None else str(uuid.uuid4())
class RPCContainer:
class SCHEDULER(Enum):
ASYNCIO = 1
MANGO = 2
def __init__(self, context: RoleContext, timeout=None):
self.context = context
self._requests_container: Dict[str, Response] = {}
self.timeout = timeout
self._registered_roles: Set[int] = {}
def handle_response(self, response_message: ResponseMessage, meta):
corr_id = meta["corr_id"]
response = self._requests_container.pop(corr_id)
response.content = response_message.content
response.future.set_result(response.content)
async def send_rpc(self, src: Role, message: Any, receiver_addr: AgentAddress, timeout: float=None, scheduler: SCHEDULER=SCHEDULER.ASYNCIO):
LOGGER.debug(f"Send rpc to {receiver_addr}")
timeout = self.timeout if timeout is None else timeout
if id(src) not in self._registered_roles:
src.context.subscribe_message(src, self.handle_response, lambda content, meta: isinstance(content, ResponseMessage) and meta.get("corr_id", "") in self._requests_container)
response = Response(message)
corr_id = response.corr_id
self._requests_container[corr_id] = response
if timeout is not None and timeout > 0:
def at_timeout():
if not response.future.done() and not response.future.cancelled():
LOGGER.info(f"Timeout of rpc of correlation id {corr_id}")
response.future.set_exception(TimeoutError(f"Request not successfull after {timeout} seconds for correlation id {corr_id}."))
match scheduler:
case RPCContainer.SCHEDULER.MANGO:
async def at_timeout_async():
at_timeout()
self.context.schedule_timestamp_task(at_timeout_async(), self.context.current_timestamp+timeout)
case RPCContainer.SCHEDULER.ASYNCIO: # ToDo: Just use asyncio.wait_for(coro, timeout)
asyncio.get_event_loop().call_later(timeout, at_timeout)
case _:
raise AssertionError("Scheduler not found") # ToDo: Allow additional schedulers
await self.context.send_message(content=RequestMessage(message), receiver_addr=receiver_addr, corr_id=corr_id)
LOGGER.debug(f"Await rpc for correlation id {corr_id}")
return await response.future
async def handle_request_async(self, coro, content, meta):
response = await coro(content, meta)
if response is not None:
if not isinstance(response, ResponseMessage):
response = ResponseMessage(response)
self.context.schedule_instant_message(response, sender_addr(meta), corr_id=meta["corr_id"])
def handle_request(self, coro, content, meta):
LOGGER.info(f"Got request for {meta["corr_id"]}: Will schedule instant task")
self.context.schedule_instant_task(self.handle_request_async(coro, content.request, meta))
def register_response(self, coro: Coroutine, role: Role, request_class):
role.context.subscribe_message(role,
lambda content, meta: self.handle_request(coro, content, meta), # Wrapper for receiving the data
lambda content, meta: isinstance(content, RequestMessage) and isinstance(content.request, request_class) # Filter for request response pattern
)
# ToDo: Think about making a more general subscription and then handle request in here instead, to also handle invalid requests And usage: rpc = RPCContainer(self.context) # rpc container should be part of the agent and not initialized every time
msg = MyCustomRequest(data)
try:
response = await rpc.send_rpc(src=self, message=msg, receiver_addr=self.history_agent[key])
LOGGER.info(f"Answer: {response}")
except TimeoutError as e:
LOGGER.info(f"Got a timeout error {e}") If MQTT is used, version 5 now also supports extra options that can be used for RPCs. |
As a quick note for myself, this is very similar to the tracked messages we implemented in Mango.jl, so maybe we should just transfer this feature to mango-python. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Communication between two agents often times contain request-reply structures, as also shown in the tutorial.
It would be nice to have an inbuilt way RPCs, which could also facilitate
await
. This way, the control flow is easier to track.The text was updated successfully, but these errors were encountered: