Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to write it: RPC client with out of order responses #369

Closed
bluetech opened this issue Dec 6, 2017 · 7 comments
Closed

How to write it: RPC client with out of order responses #369

bluetech opened this issue Dec 6, 2017 · 7 comments

Comments

@bluetech
Copy link
Member

bluetech commented Dec 6, 2017

This is a fun little problem and I was wondering what is the best way to use trio to solve it. I hope this is appropriate here.

Consider a simple RPC protocol:

Example request: {"id": 15353, "params": ...}

Example successful response: {"id": 15353, "result": ...}

Example error response: {"id": 15353, "error": ...}

The client can send multiple requests concurrently. The server can respond out of order.

Suppose we want to write a client for this protocol using trio. We want to send many concurrent requests and handle their responses. We want to have a synchronous-looking API, e.g. try { response = await send(request) } except { ... }.


Here is how I would solve it (I've hidden it in case someone wants to come up with an independent solution 💭):

  • Add a future/promise abstraction using trio.Event internally. Curio has a simple one.

  • The sending function creates a future, stores it in a request id -> future dict, sends the request, and waits on the future.

  • Spawn a received task which receives responses from the server and sets the result/exception on the corresponding future in the request id -> future dict.

  • For each request, spawn a task which would use the sending function.

Is there a more elegant way?

@njsmith
Copy link
Member

njsmith commented Dec 7, 2017

Well, if we're avoiding spoilers :-)

Sure, that would work. There's a sample (untested) future implementation [here](https://gist.github.com/njsmith/aa269020e40cc137936805d5d5bbdd91), that takes advantage of trio's `Result` abstraction to simplify things a bit compared to the curio one.

You could also drop down a layer of abstraction fairly easily, using Trio's wait_task_rescheduled primitive. This is how things like Event are implemented internally:

@attr.s
class Connection:
    # maps request id -> waiting task
    _outstanding_requests = attr.ib(default=attr.Factory(dict))

    # Let's assume someone calls nursery.start_soon on this when the connection is opened
    async def response_task(self):
        async for response in ...:
            waiting_task = self._outstanding_requests.get(response["id"])
            if waiting_task is None:  # must have been cancelled XXX
                continue
            if "value" in response:
                result = trio.hazmat.Value(response["value"])
            else:
                result = trio.hazmat.Error(RequestFailedError(response["error"]))
            trio.hazmat.reschedule(waiting_task, result)

    async def send_request(self, ...):
        id = self._get_next_id()
        await self._send_the_actual_request_on_the_wire(id, ...)
        # Register ourselves to be woken up with the response when it arrives
        this_task = trio.current_task()
        self._outstanding_requests[id] = this_task
        # If we get cancelled, unregister ourselves and carry on (XXX discussed below XXX)
        def abort_fn(_):
            del self._outstanding_requests[id]
            return trio.hazmat.Abort.SUCCEEDED
        # Go to sleep until response_task wakes us, or we're cancelled,
        # returning/raising whatever value/error it gives us
        return await trio.hazmat.wait_task_rescheduled(abort_fn)

I don't have a strong feeling about which way is better. This one uses slightly trickier interfaces, so might be more error prone. But we've dropped a whole layer of abstraction, and the end result isn't really any more complicated -- it might actually be fewer lines of code than the Promise based approach? -- so maybe it's simpler overall. And one thing this lower-level interface does do is remind you to think about cancellation.

Cancellation is an important and interesting issue for this problem, specifically: what do you want to happen if conn.send_request gets cancelled? Does our protocol have some way to propagate a cancellation across the wire? If so, do we want to wait for the cancel to be acknowledged? My solution above assumes (specifically in the lines marked XXX) that in response to cancellation we want to just abandon our outstanding request to continue on without us, and ignore whatever response comes back... but this is definitely not the only plausible semantics :-).

@bluetech
Copy link
Member Author

bluetech commented Dec 8, 2017

Thanks! I'm trying it out, and will report back.

future.py needs this diff:

diff --git a/future.py b/future.py
index ca97701..0e55a96 100644
--- a/future.py
+++ b/future.py
@@ -8,9 +8,9 @@ class TrioFuture:
         self._result = result
         self._finished.set()
 
-    def get(self):
+    async def get(self):
         await self._finished.wait()
-        return self._result.unpack()
+        return self._result.unwrap()
 
     # Sugar
     def set_value(self, value):

I haven't dug into cancellation in trio yet, but I wonder if something like this makes sense in principle, or it's a misuse of the exception?

diff --git a/future.py b/future.py
index 0e55a96..d9bd8e7 100644
--- a/future.py
+++ b/future.py
@@ -12,6 +12,9 @@ class TrioFuture:
         await self._finished.wait()
         return self._result.unwrap()
 
+    def cancel(self):
+        self.set_exception(trio.Cancelled())
+
     # Sugar
     def set_value(self, value):
         self.set_result(trio.hazmat.Value(value))

@njsmith
Copy link
Member

njsmith commented Dec 8, 2017

You definitely should never instantiate a Cancelled object directly. We actually have an issue filed (#342) to make the above an error, so that you don't do it without realizing :-)

The reason is, when you see a Cancelled exception, you can't catch it, because you don't know who created it -- it might be the cancel scope you created, or it might some cancel scope that your caller imposed, because they want you to exit, OK? So the only thing that can catch a Cancelled exception is a cancel scope (like with move_on_after or with open_cancel_scope). To make this work, we need to ensure that only the correct cancel scope actually catches the Cancelled exception, and this is done by marking each Cancelled exception with some invisible metadata that lets the generating cancel scope recognize it.

If you make your own Cancelled exception, then you can't catch it because that's just true of Cancelled exceptions in general, and there's no cancel scope that can catch it because you didn't attach the invisible metadata. So the only possible outcome of raise trio.Cancelled is that your whole program exits with a traceback. Which is probably not what you wanted.

Anyway, Future.cancel is just a bad API in general – it can't work. a Future is a value that can be consumed multiple times; a computation is a one-off thing that either happens or doesn't. Trying to support a cancel method on a Future class leads to the "spooky cancellation at a distance" problem that asyncio has. See: https://vorpus.org/blog/some-thoughts-on-asynchronous-api-design-in-a-post-asyncawait-world/#timeouts-and-cancellation

@bluetech
Copy link
Member Author

bluetech commented Dec 8, 2017

Makes sense! Thinking about cancellation in the future-based solution, with both approaches (just abandoning the request or also proactively sending a cancellation) there is actually no reason to cancel the future itself, I think, only the task running future.get. Because the future's result is only fetched once in one place.

I've written a quick proof of concept (server + client + common protocol code): gist. I took many shortcuts (for one, the client never terminates!) but AFAICS the idea can be made production ready.

Trio is excellent, the documentation as well; I hope it will mature and become stable :)

@njsmith
Copy link
Member

njsmith commented Dec 10, 2017

I've written a quick proof of concept

Cool! BTW, a useful trick for allocating sequential ideas:

In [3]: counter = itertools.count()

In [4]: next(counter)
Out[4]: 0

In [5]: next(counter)
Out[5]: 1

In [6]: next(counter)
Out[6]: 2

Trio is excellent, the documentation as well; I hope it will mature and become stable :)

Thank you!

@bluetech
Copy link
Member Author

Using itertools.count() is a good idea.

I've updated the gist (revision 2). I've handled most real-life concerns I could think of, except invalid messages (e.g. bad JSON) which I am too lazy to fix.

Code-wise, I need to find a way to encapsulate the RpcClient.receiver task; right now the user of the class has to spawn it themselves which feels odd.

I'll close this now - thanks for the discussion!

@njsmith
Copy link
Member

njsmith commented Dec 12, 2017

I need to find a way to encapsulate the RpcClient.receiver task; right now the user of the class has to spawn it themselves which feels odd.

Yeah, this is annoying! My highest priority for 0.3.0 is implementing "simplified nurseries", where it will stop being mandatory for the parent task to park itself in __aexit__ (more details: #136 (comment)). One of the reasons I'm excited about this is that it helps in this kind of case: you can make it so your users write

async with open_rpc_client(...) as client:
    ...

and then open_rpc_client is something like:

@asynccontextmanager
async def open_rpc_client(...):
    client = await _make_connection(...)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(client.receiver)
        try:
            yield client
        finally:
            # shut down the receiver
            nursery.cancel_scope.cancel()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants