Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bevy tasks #3

Merged
merged 2 commits into from
Aug 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/bevy_app/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ keywords = ["bevy"]
bevy_derive = { path = "../bevy_derive", version = "0.1" }
bevy_ecs = { path = "../bevy_ecs", version = "0.1" }
bevy_tasks = { path = "../bevy_tasks" }
num_cpus = "1"
bevy_math = { path = "../bevy_math", version = "0.1" }

# other
libloading = "0.6"
Expand Down
16 changes: 3 additions & 13 deletions crates/bevy_app/src/task_pool_options.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,6 @@
use bevy_ecs::Resources;
use bevy_tasks::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool, TaskPoolBuilder};

fn clamp_usize(value: usize, min: usize, max: usize) -> usize {
if value > max {
max
} else if value < min {
min
} else {
value
}
}

/// Defines a simple way to determine how many threads to use given the number of remaining cores
/// and number of total cores
#[derive(Clone)]
Expand All @@ -36,7 +26,7 @@ impl TaskPoolThreadAssignmentPolicy {
// Clamp by min_threads, max_threads. (This may result in us using more threads than are
// available, this is intended. An example case where this might happen is a device with
// <= 2 threads.
clamp_usize(desired, self.min_threads, self.max_threads)
bevy_math::clamp(desired, self.min_threads, self.max_threads)
}
}

Expand Down Expand Up @@ -101,8 +91,8 @@ impl DefaultTaskPoolOptions {

/// Inserts the default thread pools into the given resource map based on the configured values
pub fn create_default_pools(&self, resources: &mut Resources) {
let total_threads = clamp_usize(
num_cpus::get(),
let total_threads = bevy_math::clamp(
bevy_tasks::logical_core_count(),
self.min_total_threads,
self.max_total_threads,
);
Expand Down
19 changes: 19 additions & 0 deletions crates/bevy_math/src/clamp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/// A value bounded by a minimum and a maximum
///
/// If input is less than min then this returns min.
/// If input is greater than max then this returns max.
/// Otherwise this returns input.
///
/// **Panics** in debug mode if `!(min <= max)`.
///
/// Original implementation from num-traits licensed as MIT
pub fn clamp<T: PartialOrd>(input: T, min: T, max: T) -> T {
debug_assert!(min <= max, "min must be less than or equal to max");
if input < min {
min
} else if input > max {
max
} else {
input
}
}
2 changes: 2 additions & 0 deletions crates/bevy_math/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod clamp;
mod face_toward;
mod geometry;

pub use clamp::*;
pub use face_toward::*;
pub use geometry::*;
pub use glam::*;
Expand Down
8 changes: 8 additions & 0 deletions crates/bevy_tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,11 @@ pub mod prelude {
usages::{AsyncComputeTaskPool, ComputeTaskPool, IOTaskPool},
};
}

pub fn logical_core_count() -> usize {
num_cpus::get()
}

pub fn physical_core_count() -> usize {
num_cpus::get_physical()
}
15 changes: 13 additions & 2 deletions crates/bevy_tasks/src/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ mod tests {
let task_pool = TaskPool::new();
let outputs = v.par_splat_map(&task_pool, None, |numbers| -> i32 { numbers.iter().sum() });

println!("outputs: {:?}", outputs);
let mut sum = 0;
for output in outputs {
sum += output;
}

assert_eq!(sum, 1000 * 42);
}

#[test]
Expand All @@ -100,6 +105,12 @@ mod tests {
numbers.iter().sum()
});

println!("outputs: {:?}", outputs);
let mut sum = 0;
for output in outputs {
sum += output;
}

assert_eq!(sum, 1000 * 42 * 2);
assert_eq!(v[0], 84);
}
}
41 changes: 23 additions & 18 deletions crates/bevy_tasks/src/task_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,6 @@ use std::{
thread::{self, JoinHandle},
};

macro_rules! pin_mut {
($($x:ident),*) => { $(
// Move the value to ensure that it is owned
let mut $x = $x;
// Shadow the original binding so that it can't be directly accessed
// ever again.
#[allow(unused_mut)]
let mut $x = unsafe {
Pin::new_unchecked(&mut $x)
};
)* }
}

/// Used to create a TaskPool
#[derive(Debug, Default, Clone)]
pub struct TaskPoolBuilder {
Expand Down Expand Up @@ -103,7 +90,7 @@ pub struct TaskPool {
/// Vec<Task<T>> contained within TaskPoolInner
executor: Arc<multitask::Executor>,

///
/// Inner state of the pool
inner: Arc<TaskPoolInner>,
}

Expand Down Expand Up @@ -187,7 +174,12 @@ impl TaskPool {
F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send,
T: Send + 'static,
{
let executor: &'scope multitask::Executor = unsafe { mem::transmute(&*self.executor) };
// SAFETY: This function blocks until all futures complete, so this future must return
// before this function returns. However, rust has no way of knowing
// this so we must convert to 'static here to appease the compiler as it is unable to
// validate safety.
let executor: &multitask::Executor = &*self.executor as &multitask::Executor;
let executor: &'scope multitask::Executor = unsafe { mem::transmute(executor) };

let fut = async move {
let mut scope = Scope {
Expand All @@ -205,11 +197,20 @@ impl TaskPool {
results
};

pin_mut!(fut);
// Move the value to ensure that it is owned
let mut fut = fut;

// Shadow the original binding so that it can't be directly accessed
// ever again.
let fut = unsafe { Pin::new_unchecked(&mut fut) };

// let fut: Pin<&mut (dyn Future<Output=()> + Send)> = fut;
// SAFETY: This function blocks until all futures complete, so we do not read/write the
// data from futures outside of the 'scope lifetime. However, rust has no way of knowing
// this so we must convert to 'static here to appease the compiler as it is unable to
// validate safety.
let fut: Pin<&mut (dyn Future<Output = Vec<T>> + Send)> = fut;
let fut: Pin<&'static mut (dyn Future<Output = Vec<T>> + Send + 'static)> =
unsafe { mem::transmute(fut as Pin<&mut (dyn Future<Output = Vec<T>> + Send)>) };
unsafe { mem::transmute(fut) };

pollster::block_on(self.executor.spawn(fut))
}
Expand Down Expand Up @@ -241,6 +242,10 @@ pub struct Scope<'scope, T> {

impl<'scope, T: Send + 'static> Scope<'scope, T> {
pub fn spawn<Fut: Future<Output = T> + 'scope + Send>(&mut self, f: Fut) {
// SAFETY: This function blocks until all futures complete, so we do not read/write the
// data from futures outside of the 'scope lifetime. However, rust has no way of knowing
// this so we must convert to 'static here to appease the compiler as it is unable to
// validate safety.
let fut: Pin<Box<dyn Future<Output = T> + 'scope + Send>> = Box::pin(f);
let fut: Pin<Box<dyn Future<Output = T> + 'static + Send>> = unsafe { mem::transmute(fut) };

Expand Down