Runner changes: will use existing tokio::runtime or create one for itself and race condition fix #162
Conversation
If a shutdown message is sent before the receivers waiting for that shutdown message are created, shotover will panic on the send.
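For context, a minimal sketch (hypothetical names, not the shotover code) of the failure mode with the old broadcast-based trigger: tokio's broadcast::Sender::send returns an Err when there are no active receivers, so unwrapping a send that happens before the listeners exist panics.

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (trigger_shutdown_tx, initial_rx) = broadcast::channel::<()>(1);
    // The receivers that should be listening for shutdown have not been
    // created yet (and the initial one is gone), so send() returns Err
    // and the unwrap panics.
    drop(initial_rx);
    trigger_shutdown_tx.send(()).unwrap(); // panics: SendError(())
}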
Now that I can see the size of the race condition fix, I can see that it should have gone in a separate PR.
But the changes aren't too complicated, so I should be able to manage reviewing it as is.
// Cannot receive a "lag error" as only one value is ever sent.
self.notify.recv().await.unwrap();
// check we didn't receive a shutdown message before the receiver was created
if !*self.notify.borrow() {
this looks like a race condition, but also this whole shutdown abstraction looks kind of weird.
This code is adapted from the tokio mini-redis project.
I was wondering why there were so many comments... hehe.
I'll take a look.
This snippet demonstrates why we need the check:
#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::watch::channel(false);
    tx.send(true).unwrap();
    rx.changed().await.unwrap();
    println!("borrow rx: {}", *rx.borrow());
    let mut rx2 = rx.clone();
    rx2.changed().await.unwrap(); // hangs here!
    println!("borrow rx2: {}", *rx2.borrow());
}
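A small variation of the same snippet (same setup, only the guard from the diff above added) shows how the borrow() check sidesteps the hang:

#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::watch::channel(false);
    tx.send(true).unwrap();
    rx.changed().await.unwrap();
    let mut rx2 = rx.clone();
    // rx2 starts out having already "seen" the latest value, so awaiting
    // changed() unconditionally would hang; check the current value first.
    if !*rx2.borrow() {
        rx2.changed().await.unwrap();
    }
    println!("borrow rx2: {}", *rx2.borrow()); // prints: borrow rx2: true
}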
Alright, fair enough.
shotover-proxy/tests/helpers/mod.rs (outdated)
.unwrap()
.unwrap();
self.trigger_shutdown_tx.send(true).unwrap();
tokio::task::block_in_place(move || {
Is this doing what we think it is doing?
I don't see anything that actually uses the join handle in any way.
Consider this screenshot with the ; removed.
We can see that it's actually returning the JoinHandle (so the join handle must not be being used in any way).
If the block_in_place happens to give us a nice error, maybe we can just stick it somewhere to give us that error.
Or maybe we can do whatever detection it is doing and give an even better error, e.g. "Don't forget to include #[tokio::test(flavor = "multi_thread")] on your test!" or something.
Yeah, I'm pretty sure this isn't doing anything. 😂
Strange. With it removed, shotover doesn't shut down cleanly, so I was under the impression it was blocking until the join handle's task finishes.
Yeah, I think we need this instead. block_in_place lets us enter an async context, and then we can use the handle to run the join handle future to completion.
tokio::task::block_in_place(move || {
self.runtime_handle
.block_on(self.join_handle.take().unwrap())
.unwrap()
.unwrap();
});
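For reference, here is the same pattern as a standalone sketch (not the ShotoverManager code); note that block_in_place requires the multi-thread runtime flavor, which is why the #[tokio::test(flavor = "multi_thread")] attribute matters:

use tokio::runtime::Handle;

#[tokio::main(flavor = "multi_thread")]
async fn main() {
    let join_handle = tokio::spawn(async { "task finished" });
    let handle = Handle::current();

    // block_in_place tells the runtime this thread is about to block, which
    // makes it legal to call block_on here and drive the spawned task to
    // completion from synchronous code (e.g. a Drop impl).
    let result = tokio::task::block_in_place(move || {
        handle.block_on(join_handle).unwrap()
    });
    println!("{}", result);
}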
@@ -62,20 +62,20 @@ impl SourcesConfig {
         &self,
         chain: &TransformChain,
         topics: &mut TopicHolder,
-        trigger_shutdown_tx: broadcast::Sender<()>,
+        trigger_shutdown_rx: watch::Receiver<bool>,
Would it make sense to use Shutdown instead of watch everywhere else? The "whole shutdown abstraction looks kind of weird" part is encapsulated by the abstraction, so it seems a bit of a waste to reimplement it? Or is it important to keep the separation between what can initiate shutdowns and what can listen for them?
Yes, I think using Shutdown instead of watch is a good idea, but let's do that in a follow-up PR.
Otherwise we risk hitting more issues and further blowing up the size of this PR.
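For reference, a rough sketch (hypothetical names, not the shotover source code) of how a task can listen on the watch::Receiver<bool> passed in above, with the borrow() guard applied before waiting:

use tokio::sync::watch;

async fn run_until_shutdown(mut trigger_shutdown_rx: watch::Receiver<bool>) {
    // Check the current value first (the race fix), then wait for changes.
    while !*trigger_shutdown_rx.borrow() {
        if trigger_shutdown_rx.changed().await.is_err() {
            // The sender was dropped; treat that as a shutdown too.
            break;
        }
    }
    // ... clean up and exit ...
}

#[tokio::main]
async fn main() {
    let (trigger_shutdown_tx, trigger_shutdown_rx) = watch::channel(false);
    let task = tokio::spawn(run_until_shutdown(trigger_shutdown_rx));

    // Flipping the value to true tells every listener to shut down.
    trigger_shutdown_tx.send(true).unwrap();
    task.await.unwrap();
}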
LGTM
Shotover's Runner will use an existing runtime if it is running inside one (e.g. in async tests) or create one for itself (same behaviour as before). The Runtime builder was moved to another method (Runner::runtime), and we check if we can get a handle on the current runtime, otherwise we create one and return the Handle and Runtime objects. run_spawn, run_block and with_observability_interface use this method to get a handle to start their tasks from. ShotoverManager was modified to hold a handle and runtime object. We have to store the runtime inside the ShotoverManager so that it does not go out of scope during tests and shut down. I was also able to update tokio from 1.6.1 to 1.11.0.