-
Notifications
You must be signed in to change notification settings - Fork 248
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DSN. Fix async handlers issue for request-response protocol. #1206
Conversation
- rename variables - delete unused code
tokio::spawn(async move { | ||
handler.run().await; | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This might fix immediate issue, but it will also mean future is not cancellable anymore. And certainly not synchronously.
This doesn't look like a proper fix to me, you removed essential part of the implementation (protocol_handlers
property is now set to an empty vector), which is a hack that even if works is not the way it should be done.
I'm also skeptical as to whether it actually fixes anything, so a simple test case that creates two node and makes async request from one to another would have been helpful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I initially kept the JoinHandlers in the original vector (protocol_handlers ) in order to have some control (like aborting them) but eventually, I removed that vector. Those handlers are meant to work during the whole app cycle.
I also created a local branch of #1172 merged with the latest main and this PR. It successfully uses this code in the complex import block scenario. Do you want me to publish that testing branch?
The patch seems to fix the original issue without compromising on any features. I'm not following your comment, please, describe the issue in more detail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those handlers are meant to work during the whole app cycle.
This is not true. subspace-networking
is a library. It should support both starting and stopping. When you drop everything nothing should be running, yet this PR introduces the first case where this is no longer true.
Do you want me to publish that testing branch?
No, I want a small isolated test care that creates requester, responder, send request, requests is processed asynchronously and then response is successfully received by requester. #1172 is huge and not a great test case as such.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not true. subspace-networking is a library. It should support both starting and stopping. When you drop everything nothing should be running, yet this PR introduces the first case where this is no longer true.
I could reintroduce the saved join handles vector and abort all of them on some cancellation event. I'm not sure we have any for the request-responses protocol factory but I can attach it to the Drop
implementation.
No, I want a small isolated test care that creates requester, responder, send request, requests is processed asynchronously and then response is successfully received by requester. #1172 is huge and not a great test case as such.
I will probably modify one of our examples then. It will be similar to what we have using the separate test client and async sleep
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could reintroduce the saved join handles vector and abort all of them on some cancellation event. I'm not sure we have any for the request-responses protocol factory but I can attach it to the Drop implementation.
Right, but then you need to block to wait for them to actually be cancelled because cancellation is not synchronous.
I still don't quite get what was wrong exactly with future driving and why this PR fixes it. Can you elaborate, please?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.
The initial implementation drove the handlers like this:
for rq_rs_runner in &mut self.protocol_handlers {
// Future.Output == (), so we don't need a result here
let _ = rq_rs_runner.run().poll_unpin(cx);
}
It didn't care about the Poll result.
This is the key part of the handlers` implementation:
async fn run(&mut self) {
while let Some(request) = self.request_receiver.next().await {
let IncomingRequest {
peer,
payload,
pending_response,
} = request;
match self.handle_request(peer, payload).await {
......
The future can return Pending
on two occasions - a request is not ready or the inner future (actually configured handler) is pending. Previously, we had an actual handler as synchronous so it never returned Pending and the external handler returned Pending only for a valid reason - when the request is not ready. This is why it worked even in pseudo-async handlers like async { result }
. However, if we introduce a handler that could return Pending on its own - the previous implementation dropped it without ever returning back to it and that caused our problems. Initially, I tried to move to the separate tasks only handler runs
but the borrow checker prevented it (and for good reason), so I ended up moving the handlers in the true async context in order to drive them properly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is nothing wrong with future returning Poll::Pending
sometimes and you're still calling .run()
just like before, so that doesn't fully explain what the root issue.
I read the code carefully now and found that the issue is that rq_rs_runner.run()
returns a future that is polled once and dropped regardless of whether it has finished. THAT is the reason, you're dropping an incomplete future, not just some result.
There are multiple proper solutions here and by far the simplest would be to store futures that you put into tokio::spawn()
into protocol_handlers
rather than protocol handlers themselves. They will essentially never resolve, but you'll also never drop them either. It is still a somewhat ugly and inefficient implementation of manually written future, it might be worth rewriting it as an async function at some point.
This proper fix is also upstreamable into Substrate even if they don't support async handlers otherwise as such, but it'll be helpful for them when they do.
tokio::spawn(async move { | ||
handler.run().await; | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is nothing wrong with future returning Poll::Pending
sometimes and you're still calling .run()
just like before, so that doesn't fully explain what the root issue.
I read the code carefully now and found that the issue is that rq_rs_runner.run()
returns a future that is polled once and dropped regardless of whether it has finished. THAT is the reason, you're dropping an incomplete future, not just some result.
There are multiple proper solutions here and by far the simplest would be to store futures that you put into tokio::spawn()
into protocol_handlers
rather than protocol handlers themselves. They will essentially never resolve, but you'll also never drop them either. It is still a somewhat ugly and inefficient implementation of manually written future, it might be worth rewriting it as an async function at some point.
This proper fix is also upstreamable into Substrate even if they don't support async handlers otherwise as such, but it'll be helpful for them when they do.
@@ -35,6 +36,8 @@ async fn main() { | |||
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()], | |||
allow_non_global_addresses_in_dht: true, | |||
request_response_protocols: vec![GenericRequestHandler::create(|&ExampleRequest| async { | |||
sleep(Duration::from_secs(2)).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Come on, we need a simple quick regression test that runs in CI every time, not example that waits for 2 seconds and reproduces the issue only when someone runs it, which, admittedly, isn't very often.
f519601
to
6213be8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
} | ||
} | ||
} | ||
} | ||
|
||
// Poll request-responses protocol handlers. | ||
for rq_rs_runner in &mut self.protocol_handlers { | ||
for rq_rs_runner in &mut self.request_handlers { | ||
// Future.Output == (), so we don't need a result here |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now I see that this comment was misleading in original codebase, it assumed future always resolved immediately, which is not true as we found out
This PR fixes the problem introduced in #1168
It seems the original design of request-responses module from Substrate even having the interface looks like async doesn't actually imply the async handlers.
The problem seems to be the incorrect "future driving" of the handlers in the
poll
method of the request response protocol factory.The fix moves request-response handlers into separate tokio tasks and this seems to fix the problem. In addition, I removed the old unused code.
Code contributor checklist: