Runner changes: will use existing tokio::runtime or create one for itself and race condition fix #162

Merged
merged 20 commits into from
Sep 6, 2021

Conversation

conorbros
Member

Shotover's Runner will use an existing runtime if it is running inside one (e.g. in async tests) or create one for itself (same behaviour as before).

The Runtime builder was moved to another method (Runner::runtime) and we check if we can get a handle on the current runtime, otherwise create one and return the Handle and Runtime objects. run_spawn, run_block and with_observability_interface use this method to get a handle to start their tasks from. ShotoverManager was modified to hold a handle and runtime object. We have to store the runtime inside the ShotoverManager so that it does not go out of scope during tests and shut down.

I was also able to update tokio from 1.6.1 to 1.11.0.

@conorbros conorbros linked an issue Sep 1, 2021 that may be closed by this pull request
@conorbros conorbros changed the title Runner use existing tokio::runtime or create one for itself Runner changes: will existing tokio::runtime or create one for itself and race condition fix Sep 2, 2021
Member

@rukai rukai left a comment

Now that I can see the size of the race condition fix, I can see that it should have gone in a separate PR.
But the changes aren't too complicated, so I should be able to manage reviewing it as is.

// Cannot receive a "lag error" as only one value is ever sent.
self.notify.recv().await.unwrap();
// check we didn't receive a shutdown message before the receiver was created
if !*self.notify.borrow() {
Member

@rukai rukai Sep 3, 2021

this looks like a race condition, but also this whole shutdown abstraction looks kind of weird.

Member Author

@conorbros conorbros Sep 3, 2021

This code is adapted from the tokio mini-redis project.

Member

I was wondering why there were so many comments... hehe.
I'll take a look.

Member

This snippet demonstrates why we need the check:

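#[tokio::main]
async fn main() {
    let (tx, mut rx) = tokio::sync::watch::channel(false);

    // Signal shutdown before the second receiver exists.
    tx.send(true).unwrap();

    // The original receiver observes the change and marks it as seen.
    rx.changed().await.unwrap();
    println!("borrow rx: {}", *rx.borrow());

    // The clone inherits the "seen" state, so no change is pending for it.
    let mut rx2 = rx.clone();

    rx2.changed().await.unwrap(); // hangs here!
    println!("borrow rx2: {}", *rx2.borrow());
}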

Alright, fair enough.

.unwrap()
.unwrap();
self.trigger_shutdown_tx.send(true).unwrap();
tokio::task::block_in_place(move || {
Member

Is this doing what we think it is doing?
I don't see anything that actually uses the join handle in any way.
Consider this screenshot with the ; removed.
We can see that it's actually returning the JoinHandle (so the join handle must not be used in any way).
[screenshot]

If block_in_place happens to give us a nice error, maybe we can just stick it somewhere to give us that error.
Or maybe we can do whatever detection it is doing and give an even better error, something like: Don't forget to include #[tokio::test(flavor = "multi_thread")] on your test!

Member

Yeah, I'm pretty sure this isn't doing anything. 😂

Member Author

Strange. With it removed, shotover doesn't shut down cleanly, so I was under the impression it was blocking until the join handle's task finishes.

Member Author

@conorbros conorbros Sep 3, 2021

Yeah I think we need this instead. block_in_place lets us enter an async context and then we can use the handle to run the join handle future to completion.

tokio::task::block_in_place(move || {
    self.runtime_handle
        .block_on(self.join_handle.take().unwrap())
        .unwrap()
        .unwrap();
});

Member

@rukai rukai Sep 3, 2021

Ah that works.

However, I'm thinking we should use block_in_place in Runner to assert we got a multi-threaded runtime.
This way any user of Runner gets this assertion.
Using block_in_place is discouraged, so we should revert back to entering the runtime rather than using block_in_place twice.
[screenshot]

@@ -62,20 +62,20 @@ impl SourcesConfig {
&self,
chain: &TransformChain,
topics: &mut TopicHolder,
trigger_shutdown_tx: broadcast::Sender<()>,
trigger_shutdown_rx: watch::Receiver<bool>,
Member

Would it make sense to use Shutdown instead of watch everywhere else? The weirdness ("this whole shutdown abstraction looks kind of weird") is already encapsulated by Shutdown, so reimplementing it elsewhere seems like a bit of a waste. Or is it important to keep the separation between what can initiate shutdowns and what can listen for them?

Member

@rukai rukai Sep 3, 2021

Yes, I think using Shutdown instead of watch is a good idea, but let's do that in a follow-up PR.
Otherwise we risk hitting more issues and further blowing up the size of this PR.

@conorbros conorbros requested a review from rukai September 6, 2021 01:36
Member

@rukai rukai left a comment

LGTM

@conorbros conorbros mentioned this pull request Sep 6, 2021
@rukai rukai merged commit 3d713b4 into shotover:main Sep 6, 2021
@conorbros conorbros changed the title Runner changes: will existing tokio::runtime or create one for itself and race condition fix Runner changes: will use existing tokio::runtime or create one for itself and race condition fix Sep 6, 2021
@conorbros conorbros deleted the int-test-async branch September 6, 2021 04:08
@rukai rukai mentioned this pull request Sep 7, 2021
Development

Successfully merging this pull request may close these issues.

Add optionally configurable runtime to enable async integration tests
4 participants