Skip to content

Commit

Permalink
WIP: Allow passing Tokio Handle on ntex-rt
Browse files Browse the repository at this point in the history
  • Loading branch information
pavlospt committed Apr 3, 2024
1 parent 395cf69 commit 9049c36
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 29 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ cobertura.xml

# These are backup files generated by rustfmt
**/*.rs.bk
.idea/
4 changes: 4 additions & 0 deletions ntex-rt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ tok-io = { version = "1", package = "tokio", default-features = false, features
"net",
], optional = true }
async_std = { version = "1", package = "async-std", optional = true }
env_logger = "0.11.3"

[target.'cfg(target_os = "linux")'.dependencies]
glomm-io = { version = "0.8", package = "glommio", optional = true }
futures-channel = { version = "0.3", optional = true }

[dev-dependencies]
tok-io = { version = "1.37.0", package = "tokio", features = ["rt", "macros"] }
12 changes: 12 additions & 0 deletions ntex-rt/src/arbiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ where
mod tests {
use super::*;

#[cfg(not(feature = "tokio"))]
#[test]
fn test_arbiter_local_storage() {
let _s = System::new("test");
Expand All @@ -379,4 +380,15 @@ mod tests {
assert!(Arbiter::contains_item::<&'static str>());
assert!(format!("{:?}", Arbiter::current()).contains("Arbiter"));
}

#[tok_io::test]
#[cfg(feature = "tokio")]
async fn test_arbiter_local_storage() {
let _s = System::new_with_handle("test");
Arbiter::set_item("test");
assert!(Arbiter::get_item::<&'static str, _, _>(|s| *s == "test"));
assert!(Arbiter::get_mut_item::<&'static str, _, _>(|s| *s == "test"));
assert!(Arbiter::contains_item::<&'static str>());
assert!(format!("{:?}", Arbiter::current()).contains("Arbiter"));
}
}
197 changes: 168 additions & 29 deletions ntex-rt/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
#![allow(clippy::let_underscore_future)]

use std::{cell::RefCell, future::Future, io, rc::Rc};

use async_channel::unbounded;

#[cfg(feature = "tokio")]
use tok_io::runtime::Handle;

use crate::arbiter::{Arbiter, ArbiterController, SystemArbiter};
use crate::System;

Expand All @@ -16,13 +20,24 @@ pub struct Builder {
name: String,
/// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false.
stop_on_panic: bool,
/// Use current Handler
use_current_handle: bool,
}

impl Builder {
pub(super) fn new_with_handle() -> Self {
Builder {
name: "ntex".into(),
stop_on_panic: false,
use_current_handle: true,
}
}

pub(super) fn new() -> Self {
Builder {
name: "ntex".into(),
stop_on_panic: false,
use_current_handle: false,
}
}

Expand Down Expand Up @@ -56,15 +71,36 @@ impl Builder {
let arb = SystemArbiter::new(stop_tx, sys_receiver);

// init system arbiter and run configuration method
SystemRunner {
stop,
arb,
arb_controller,
system,
#[cfg(feature = "tokio")]
{
let handle = if self.use_current_handle {
Some(Handle::current())
} else {
None
};

SystemRunner {
stop,
arb,
arb_controller,
system,
handle,
}
}

#[cfg(not(feature = "tokio"))]
{
SystemRunner {
stop,
arb,
arb_controller,
system,
}
}
}
}

#[cfg(not(feature = "tokio"))]
/// Helper object that runs System's event loop
#[must_use = "SystemRunner must be run"]
pub struct SystemRunner {
Expand All @@ -74,6 +110,15 @@ pub struct SystemRunner {
system: System,
}

#[cfg(feature = "tokio")]
pub struct SystemRunner {
stop: oneshot::Receiver<i32>,
arb: SystemArbiter,
arb_controller: ArbiterController,
system: System,
handle: Option<Handle>,
}

impl SystemRunner {
/// Get current system.
pub fn system(&self) -> System {
Expand All @@ -93,26 +138,58 @@ impl SystemRunner {
where
F: FnOnce() -> io::Result<()> + 'static,
{
let SystemRunner {
stop,
arb,
arb_controller,
..
} = self;
#[cfg(feature = "tokio")]
{
let SystemRunner {
arb,
arb_controller,
stop,
handle,
..
} = self;

if let Some(handle) = handle {
let result = handle.block_on(async { f() });
return result;
}

match crate::builder::block_on(stop, arb, arb_controller, f).take()? {
Ok(code) => {
if code != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", code),
))
} else {
Ok(())
}
}
Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Closed")),
}
}

// run loop
match block_on(stop, arb, arb_controller, f).take()? {
Ok(code) => {
if code != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", code),
))
} else {
Ok(())
#[cfg(not(feature = "tokio"))]
{
let SystemRunner {
arb,
arb_controller,
stop,
..
} = self;
match crate::builder::block_on(stop, arb, arb_controller, f).take()? {
Ok(code) => {
if code != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", code),
))
} else {
Ok(())
}
}
Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Closed")),
}
Err(_) => Err(io::Error::new(io::ErrorKind::Other, "Closed")),
}
}

Expand All @@ -123,16 +200,38 @@ impl SystemRunner {
F: Future<Output = R> + 'static,
R: 'static,
{
let SystemRunner {
arb,
arb_controller,
..
} = self;
#[cfg(feature = "tokio")]
{
let SystemRunner {
handle,
arb,
arb_controller,
..
} = self;

if let Some(handle) = handle {
let result = handle.block_on(fut);
return result;
}

match crate::builder::block_on(fut, arb, arb_controller, || Ok(())).take() {
Ok(result) => result,
Err(_) => unreachable!(),
}
}

// run loop
match block_on(fut, arb, arb_controller, || Ok(())).take() {
Ok(result) => result,
Err(_) => unreachable!(),
#[cfg(not(feature = "tokio"))]
{
let SystemRunner {
arb,
arb_controller,
..
} = self;
match crate::builder::block_on(fut, arb, arb_controller, || Ok(())).take() {
Ok(result) => result,
Err(_) => unreachable!(),
}
}
}
}
Expand Down Expand Up @@ -179,6 +278,7 @@ mod tests {

use super::*;

#[cfg(not(feature = "tokio"))]
#[test]
fn test_async() {
let (tx, rx) = mpsc::channel();
Expand Down Expand Up @@ -212,4 +312,43 @@ mod tests {
let id2 = rx.recv().unwrap();
assert_eq!(id, id2);
}

#[cfg(feature = "tokio")]
#[tok_io::test]
async fn test_async() {
_ = env_logger::builder().is_test(true).try_init();

let (tx, rx) = mpsc::channel();
let s = System::new_with_handle("test");

thread::spawn(move || {
let runner = crate::System::build_with_handle()
.stop_on_panic(true)
.finish();

tx.send(runner.system()).unwrap();
let _ = runner.run_until_stop();
});

let sys = rx.recv().unwrap();
let id = sys.id();
let (tx, rx) = mpsc::channel();
sys.arbiter().exec_fn(move || {
let _ = tx.send(System::current().id());
});
let id2 = rx.recv().unwrap();
assert_eq!(id, id2);

let id2 = s
.block_on(sys.arbiter().exec(|| System::current().id()))
.unwrap();
assert_eq!(id, id2);

let (tx, rx) = mpsc::channel();
sys.arbiter().spawn(Box::pin(async move {
let _ = tx.send(System::current().id());
}));
let id2 = rx.recv().unwrap();
assert_eq!(id, id2);
}
}
20 changes: 20 additions & 0 deletions ntex-rt/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,30 @@ impl System {
///
/// This allows to customize the runtime. See struct level docs on
/// `Builder` for more information.
#[cfg(feature = "tokio")]
pub fn build_with_handle() -> Builder {
Builder::new_with_handle()
}

/// Build a new system with a customized tokio runtime.
///
/// This allows to customize the runtime. See struct level docs on
/// `Builder` for more information.
#[cfg(not(feature = "tokio"))]
pub fn build() -> Builder {
Builder::new()
}

#[cfg(feature = "tokio")]
#[allow(clippy::new_ret_no_self)]
/// Create new system.
///
/// This method panics if it can not create tokio runtime
pub fn new_with_handle(name: &str) -> SystemRunner {
Self::build_with_handle().name(name).finish()
}

#[cfg(not(feature = "tokio"))]
#[allow(clippy::new_ret_no_self)]
/// Create new system.
///
Expand Down

0 comments on commit 9049c36

Please sign in to comment.