RPC capability #151

Open
ahilloffis opened this issue Jan 13, 2025 · 2 comments

Communication between two agents often follows a request-reply structure, as the tutorial also shows.

It would be nice to have a built-in way to make RPCs that also supports await. That way, the control flow is easier to track.
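
To make the idea concrete, here is a minimal sketch of an awaitable request-reply built on plain asyncio, without mango. The names PendingReplies, responder, call, and demo are hypothetical and only illustrate the pattern: each request gets a correlation ID, and the caller awaits a future that is resolved when the reply with that ID arrives.

```python
import asyncio
import uuid


class PendingReplies:
    """Maps correlation IDs to futures awaiting a reply (hypothetical helper)."""

    def __init__(self) -> None:
        self._futures: dict[str, asyncio.Future] = {}

    def create(self) -> tuple[str, asyncio.Future]:
        # Register a new pending request and hand back its ID and future.
        corr_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self._futures[corr_id] = future
        return corr_id, future

    def resolve(self, corr_id: str, result) -> bool:
        # Complete the matching future; unknown or finished IDs are ignored.
        future = self._futures.pop(corr_id, None)
        if future is None or future.done():
            return False
        future.set_result(result)
        return True


async def responder(inbox: asyncio.Queue, pending: PendingReplies) -> None:
    # Stand-in for the remote agent: doubles the request and replies.
    corr_id, request = await inbox.get()
    pending.resolve(corr_id, request * 2)


async def call(inbox: asyncio.Queue, pending: PendingReplies, request: int) -> int:
    # The caller sends the request and simply awaits the reply.
    corr_id, future = pending.create()
    await inbox.put((corr_id, request))
    return await future


async def demo() -> int:
    inbox: asyncio.Queue = asyncio.Queue()
    pending = PendingReplies()
    task = asyncio.create_task(responder(inbox, pending))
    result = await call(inbox, pending, 21)
    await task
    return result
```

With this in place, asyncio.run(demo()) returns 42, and the requesting side reads as straight-line code instead of a send plus a separate reply handler.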

ahilloffis commented Jan 13, 2025

For reference, and perhaps for an internal discussion, here is our current implementation of such an RPC structure:

import uuid
import asyncio
import logging
from typing import Dict, Any, Coroutine, Set
from enum import Enum
from dataclasses import dataclass

from mango import Role, RoleContext, AgentAddress, sender_addr, json_serializable

logging.basicConfig(level=logging.INFO)

LOGGER = logging.getLogger(__name__)

@json_serializable
@dataclass
class RequestMessage:
    request: Any

@json_serializable
@dataclass
class ResponseMessage:
    content: Any

class Response:
    corr_id: str
    future: asyncio.Future
    request: Any
    content: Any

    def __init__(self, request: Any = None, future: asyncio.Future = None, corr_id: str = None):
        self.request = request
        self.future = future if future is not None else asyncio.Future()
        self.corr_id = corr_id if corr_id is not None else str(uuid.uuid4())

class RPCContainer:
    class SCHEDULER(Enum):
        ASYNCIO = 1
        MANGO = 2

    def __init__(self, context: RoleContext, timeout=None):
        self.context = context
        self._requests_container: Dict[str, Response] = {}
        self.timeout = timeout
        self._registered_roles: Set[int] = set()  # ids of roles that are already subscribed
        
    def handle_response(self, response_message: ResponseMessage, meta):
        corr_id = meta["corr_id"]
        response = self._requests_container.pop(corr_id)
        response.content = response_message.content
        if not response.future.done():  # Ignore replies that arrive after a timeout
            response.future.set_result(response.content)

    async def send_rpc(self, src: Role, message: Any, receiver_addr: AgentAddress, timeout: float=None, scheduler: SCHEDULER=SCHEDULER.ASYNCIO):
        LOGGER.debug(f"Send rpc to {receiver_addr}")
        timeout = self.timeout if timeout is None else timeout
        if id(src) not in self._registered_roles:
            # Subscribe the response handler only once per role
            src.context.subscribe_message(src, self.handle_response, lambda content, meta: isinstance(content, ResponseMessage) and meta.get("corr_id", "") in self._requests_container)
            self._registered_roles.add(id(src))

        response = Response(message)
        corr_id = response.corr_id
        self._requests_container[corr_id] = response
        if timeout is not None and timeout > 0:
            def at_timeout():
                if not response.future.done():  # done() is also True for cancelled futures
                    LOGGER.info(f"Timeout of rpc of correlation id {corr_id}")
                    # Drop the pending entry so a late reply is filtered out
                    self._requests_container.pop(corr_id, None)
                    response.future.set_exception(TimeoutError(f"Request not successful after {timeout} seconds for correlation id {corr_id}."))
            
            match scheduler:
                case RPCContainer.SCHEDULER.MANGO:
                    async def at_timeout_async():
                        at_timeout()
                    self.context.schedule_timestamp_task(at_timeout_async(), self.context.current_timestamp+timeout)
                case RPCContainer.SCHEDULER.ASYNCIO: # ToDo: Just use asyncio.wait_for(coro, timeout)
                    asyncio.get_running_loop().call_later(timeout, at_timeout)
                case _:
                    raise AssertionError("Scheduler not found") # ToDo: Allow additional schedulers
            
        await self.context.send_message(content=RequestMessage(message), receiver_addr=receiver_addr, corr_id=corr_id)
        LOGGER.debug(f"Await rpc for correlation id {corr_id}")
        return await response.future
    
    async def handle_request_async(self, coro, content, meta):
        response = await coro(content, meta)
        if response is not None:
            if not isinstance(response, ResponseMessage):
                response = ResponseMessage(response)
            self.context.schedule_instant_message(response, sender_addr(meta), corr_id=meta["corr_id"])

    def handle_request(self, coro, content, meta):
        LOGGER.info(f"Got request for {meta['corr_id']}: Will schedule instant task")
        self.context.schedule_instant_task(self.handle_request_async(coro, content.request, meta))

    def register_response(self, coro: Coroutine, role: Role, request_class):
        role.context.subscribe_message(role,
                                       lambda content, meta: self.handle_request(coro, content, meta), # Wrapper for receiving the data
                                       lambda content, meta: isinstance(content, RequestMessage) and isinstance(content.request, request_class) # Filter for request response pattern
                                       )
        # ToDo: Think about making a more general subscription and then handle request in here instead, to also handle invalid requests

Usage:

rpc = RPCContainer(self.context) # rpc container should be part of the agent and not initialized every time
msg = MyCustomRequest(data)
try:
    response = await rpc.send_rpc(src=self, message=msg, receiver_addr=self.history_agent[key])
    LOGGER.info(f"Answer: {response}")
except TimeoutError as e:
    LOGGER.info(f"Got a timeout error {e}")
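
The ToDo in the ASYNCIO branch suggests replacing the call_later timer with asyncio.wait_for. A minimal sketch of that variant follows, assuming the pending futures live in a plain dict keyed by correlation ID; await_reply and demo are hypothetical names and not part of mango. Note that before Python 3.11, asyncio.TimeoutError is a different class from the builtin TimeoutError caught in the usage above.

```python
import asyncio


async def await_reply(pending: dict, corr_id: str, timeout: float):
    """Wait for the reply registered under corr_id, or raise on timeout."""
    future = pending[corr_id]
    try:
        # wait_for cancels the future and raises asyncio.TimeoutError on expiry.
        return await asyncio.wait_for(future, timeout)
    finally:
        # Remove the entry whether a reply arrived, the wait timed out,
        # or the caller was cancelled, so late replies find nothing to resolve.
        pending.pop(corr_id, None)


async def demo() -> tuple:
    loop = asyncio.get_running_loop()
    pending = {"fast": loop.create_future(), "lost": loop.create_future()}
    # Simulate a reply for "fast" arriving after 10 ms; "lost" never gets one.
    loop.call_later(0.01, pending["fast"].set_result, "pong")
    fast = await await_reply(pending, "fast", timeout=1.0)
    try:
        await await_reply(pending, "lost", timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return fast, timed_out, len(pending)
```

Because the cleanup sits in a finally block, the pending table cannot accumulate entries for requests that were never answered. This sketch only covers wall-clock timeouts; the MANGO scheduler branch would still need its own timer.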

If MQTT is used, note that MQTT version 5 adds the Response Topic and Correlation Data properties, which can be used for RPCs.

@rcschrg rcschrg self-assigned this Jan 13, 2025

rcschrg commented Jan 13, 2025

As a quick note for myself, this is very similar to the tracked messages we implemented in Mango.jl, so maybe we should just transfer this feature to mango-python.
