How to write it: RPC client with out of order responses #369
Well, if we're avoiding spoilers :-)
Sure, that would work. There's a sample (untested) future implementation [here](https://gist.github.com/njsmith/aa269020e40cc137936805d5d5bbdd91), that takes advantage of trio's `Result` abstraction to simplify things a bit compared to the curio one.
You could also drop down a layer of abstraction fairly easily, using trio's low-level `wait_task_rescheduled` API:

```python
@attr.s
class Connection:
    # maps request id -> waiting task
    _outstanding_requests = attr.ib(default=attr.Factory(dict))

    # Let's assume someone calls nursery.start_soon on this when the
    # connection is opened
    async def response_task(self):
        async for response in ...:
            waiting_task = self._outstanding_requests.get(response["id"])
            if waiting_task is None:  # must have been cancelled XXX
                continue
            if "value" in response:
                result = trio.hazmat.Value(response["value"])
            else:
                result = trio.hazmat.Error(RequestFailedError(response["error"]))
            trio.hazmat.reschedule(waiting_task, result)

    async def send_request(self, ...):
        id = self._get_next_id()
        await self._send_the_actual_request_on_the_wire(id, ...)

        # Register ourselves to be woken up with the response when it arrives
        this_task = trio.current_task()
        self._outstanding_requests[id] = this_task

        # If we get cancelled, unregister ourselves and carry on
        # (XXX discussed below XXX)
        def abort_fn(_):
            del self._outstanding_requests[id]
            return trio.hazmat.Abort.SUCCEEDED

        # Go to sleep until response_task wakes us, or we're cancelled,
        # returning/raising whatever value/error it gives us
        return await trio.hazmat.wait_task_rescheduled(abort_fn)
```

I don't have a strong feeling about which way is better. This one uses slightly trickier interfaces, so it might be more error prone. But we've dropped a whole layer of abstraction, and the end result isn't really any more complicated -- it might actually be fewer lines of code than the Promise-based approach? -- so maybe it's simpler overall. And one thing this lower-level interface does do is remind you to think about cancellation. Cancellation is an important and interesting issue for this problem, specifically: what do you want to happen if a call to `send_request` gets cancelled after the request has already gone out on the wire?
Thanks! I'm trying it out, and will report back. future.py needs this diff:

```diff
diff --git a/future.py b/future.py
index ca97701..0e55a96 100644
--- a/future.py
+++ b/future.py
@@ -8,9 +8,9 @@ class TrioFuture:
         self._result = result
         self._finished.set()
 
-    def get(self):
+    async def get(self):
         await self._finished.wait()
-        return self._result.unpack()
+        return self._result.unwrap()
 
     # Sugar
     def set_value(self, value):
```

I haven't dug into cancellation in trio yet, but I wonder if something like this makes sense in principle, or if it's a misuse of the exception?

```diff
diff --git a/future.py b/future.py
index 0e55a96..d9bd8e7 100644
--- a/future.py
+++ b/future.py
@@ -12,6 +12,9 @@ class TrioFuture:
         await self._finished.wait()
         return self._result.unwrap()
 
+    def cancel(self):
+        self.set_exception(trio.Cancelled())
+
     # Sugar
     def set_value(self, value):
         self.set_result(trio.hazmat.Value(value))
```
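For reference, after the first diff the future amounts to roughly the following shape. This is a sketch only: it uses the stdlib `asyncio.Event` in place of `trio.Event` so it runs standalone, and a plain value/exception pair in place of trio's `Result`; the names are illustrative rather than taken from the gist.

```python
import asyncio

class Future:
    """Sketch of a one-shot future built on an Event (asyncio stand-in for trio.Event)."""

    def __init__(self):
        self._finished = asyncio.Event()
        self._value = None
        self._exception = None

    async def get(self):
        # Block until a value or exception has been set, then return/raise it
        await self._finished.wait()
        if self._exception is not None:
            raise self._exception
        return self._value

    def set_value(self, value):
        self._value = value
        self._finished.set()

    def set_exception(self, exc):
        self._exception = exc
        self._finished.set()
```

In the trio version, the value/exception pair is replaced by a single `Result` object whose `unwrap()` either returns the value or raises the stored exception.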
You definitely should never instantiate a `trio.Cancelled` yourself. The reason is, when you see a `Cancelled` exception, it means that an enclosing cancel scope has been cancelled, and trio's internals rely on that exception propagating back to exactly the scope that injected it. If you make your own `Cancelled`, there is no cancel scope expecting it, so it confuses that bookkeeping. A dedicated exception type would make more sense here.
Makes sense! Thinking about cancellation in the future-based solution: with both approaches (just abandoning the request, or also proactively sending a cancellation) there is actually no reason to cancel the future itself, I think -- only the task running the sending coroutine.

I've written a quick proof of concept (server + client + common protocol code): gist. I took many shortcuts (for one, the client never terminates!) but AFAICS the idea can be made production ready.

Trio is excellent, the documentation as well; I hope it will mature and become stable :)
Cool! BTW, a useful trick for allocating sequential ids:

```python
In [3]: counter = itertools.count()

In [4]: next(counter)
Out[4]: 0

In [5]: next(counter)
Out[5]: 1

In [6]: next(counter)
Out[6]: 2
```
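In a client, that counter typically lives on the connection object; a trivial self-contained sketch (the `IdAllocator` name and `_get_next_id` method are made up for illustration):

```python
import itertools

class IdAllocator:
    """Hands out sequential request ids; itertools.count never repeats a value."""

    def __init__(self):
        self._counter = itertools.count()

    def _get_next_id(self):
        return next(self._counter)

alloc = IdAllocator()
print(alloc._get_next_id())  # 0
print(alloc._get_next_id())  # 1
```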
Thank you!
Using `itertools.count()` now. I've updated the gist (revision 2). I've handled most real-life concerns I could think of, except invalid messages (e.g. bad JSON), which I am too lazy to fix.

Code-wise, I need to find a way to encapsulate the nursery.

I'll close this now - thanks for the discussion!
Yeah, this is annoying! My highest priority for 0.3.0 is implementing "simplified nurseries", where it will stop being mandatory for the parent task to park itself in the `async with open_nursery()` block. Then you could write:

```python
async with open_rpc_client(...) as client:
    ...
```

and then:

```python
@asynccontextmanager
async def open_rpc_client(...):
    client = await _make_connection(...)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(client.receiver)
        try:
            yield client
        finally:
            # shut down the receiver
            nursery.cancel_scope.cancel()
```
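The same "background receiver owned by an async context manager" shape can be sketched with stdlib asyncio, which makes it runnable here without trio; all names below (`open_client`, `receiver`) are invented for illustration:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def open_client():
    # Stand-in for the real connection setup
    stop = asyncio.Event()
    events = []

    async def receiver():
        # Stand-in for the receive loop; runs until told to stop
        events.append("receiver started")
        await stop.wait()
        events.append("receiver stopped")

    task = asyncio.create_task(receiver())
    try:
        yield events
    finally:
        # Shut down the receiver when the caller leaves the block
        stop.set()
        await task

async def main():
    async with open_client() as events:
        await asyncio.sleep(0)  # let the receiver start
    return events

# asyncio.run(main()) -> ["receiver started", "receiver stopped"]
```

The point of the pattern is that the caller never sees the background task: its lifetime is bounded by the `async with` block, just as the nursery bounds `client.receiver` in the trio version.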
This is a fun little problem and I was wondering about the best way to solve it with trio. I hope this is appropriate here.
Consider a simple RPC protocol.

Example request:

```json
{"id": 15353, "params": ...}
```

Example successful response:

```json
{"id": 15353, "result": ...}
```

Example error response:

```json
{"id": 15353, "error": ...}
```

The client can send multiple requests concurrently. The server can respond out of order.
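The three message shapes can be captured in a couple of helpers; a minimal sketch, with all function names invented here rather than taken from the thread:

```python
import json

def make_request(req_id, params):
    """Serialize a request message."""
    return json.dumps({"id": req_id, "params": params})

def parse_response(raw):
    """Return (id, result, error); exactly one of result/error is non-None."""
    msg = json.loads(raw)
    return msg["id"], msg.get("result"), msg.get("error")

req = make_request(15353, [1, 2, 3])
ok = parse_response('{"id": 15353, "result": 6}')
bad = parse_response('{"id": 15353, "error": "nope"}')
```

Because responses carry the request `id`, the client can match each one back to the request that produced it, regardless of arrival order.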
Suppose we want to write a client for this protocol using trio. We want to send many concurrent requests and handle their responses, with a synchronous-looking API, e.g.:

```python
try:
    response = await send(request)
except ...:
    ...
```
Here is how I would solve it (I've hidden it in case someone wants to come up with an independent solution 💭):

1. Add a future/promise abstraction using `trio.Event` internally. Curio has a simple one.
2. The sending function creates a future, stores it in a `request id -> future` dict, sends the request, and waits on the future.
3. Spawn a receiver task which receives responses from the server and sets the result/exception on the corresponding future in the `request id -> future` dict.
4. For each request, spawn a task which uses the sending function.
Is there a more elegant way?