Skip to content

Commit

Permalink
Add Pool::swap_config method
Browse files Browse the repository at this point in the history
  • Loading branch information
bikeshedder committed Feb 14, 2022
1 parent 8386178 commit b67b689
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 9 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ deadpool-runtime = { version = "0.1", path = "./runtime" }
# `tokio::sync::Semaphore`. No other features of `tokio` are enabled or used
# unless the `rt_tokio_1` feature is enabled.
tokio = { version = "1.0", features = ["sync"] }
arc-swap = "1.5.0"

[dev-dependencies]
async-std = { version = "1.0", features = ["attributes"] }
Expand Down
43 changes: 34 additions & 9 deletions src/managed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,12 @@ use std::{
ops::{Deref, DerefMut},
sync::{
atomic::{AtomicIsize, Ordering},
Arc, Mutex, Weak,
Arc, Mutex, MutexGuard, Weak,
},
time::{Duration, Instant},
};

use arc_swap::ArcSwap;
use async_trait::async_trait;
use deadpool_runtime::Runtime;
use retain_mut::RetainMut;
Expand Down Expand Up @@ -287,7 +288,7 @@ impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
}),
available: AtomicIsize::new(0),
semaphore: Semaphore::new(builder.config.max_size),
config: builder.config,
config: ArcSwap::new(Arc::new(builder.config)),
hooks: builder.hooks,
runtime: builder.runtime,
}),
Expand Down Expand Up @@ -466,14 +467,24 @@ impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
* always reports a `max_size` of 0 for closed pools.
*/
pub fn resize(&self, max_size: usize) {
self.swap_config(PoolConfig {
max_size,
..self.inner.config.load()
});
}

/// Internal _resize method which uses an already
/// locked mutex. This method is used by the `swap_config`
/// method.
fn _resize(&self, slots: MutexGuard<'_, Slots<ObjectInner<M>>>) {
if self.inner.semaphore.is_closed() {
return;
}
let mut slots = self.inner.slots.lock().unwrap();
let old_max_size = slots.max_size;
slots.max_size = max_size;
let new_max_size = self.config().max_size;
slots.max_size = new_max_size;
// shrink pool
if max_size < old_max_size {
if new_max_size < old_max_size {
while slots.size > slots.max_size {
if let Ok(permit) = self.inner.semaphore.try_acquire() {
permit.forget();
Expand All @@ -485,14 +496,14 @@ impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
}
}
// Create a new VecDeque with a smaller capacity
let mut vec = VecDeque::with_capacity(max_size);
let mut vec = VecDeque::with_capacity(new_max_size);
for obj in slots.vec.drain(..) {
vec.push_back(obj);
}
slots.vec = vec;
}
// grow pool
if max_size > old_max_size {
if new_max_size > old_max_size {
let additional = slots.max_size - slots.size;
slots.vec.reserve_exact(additional);
self.inner.semaphore.add_permits(additional);
Expand Down Expand Up @@ -536,9 +547,23 @@ impl<M: Manager, W: From<Object<M>>> Pool<M, W> {
guard.size -= len_before - guard.vec.len();
}

/// Get current pool configuration
pub fn config(&self) -> Arc<PoolConfig> {
self.inner.config.load_full()
}

/// Swap config.
pub fn swap_config(&self, config: PoolConfig) {
let slots = self.inner.slots.lock().unwrap();
self.inner.config.swap(Arc::new(config));
if slots.max_size != config.max_size {
self._resize(config.max_size, slots);
}
}

/// Get current timeout configuration
pub fn timeouts(&self) -> Timeouts {
self.inner.config.timeouts
self.inner.config.load().timeouts
}

/// Closes this [`Pool`].
Expand Down Expand Up @@ -584,7 +609,7 @@ struct PoolInner<M: Manager> {
/// the number of [`Future`]s waiting for an [`Object`].
available: AtomicIsize,
semaphore: Semaphore,
config: PoolConfig,
config: ArcSwap<PoolConfig>,
runtime: Option<Runtime>,
hooks: hooks::Hooks<M>,
}
Expand Down
58 changes: 58 additions & 0 deletions tests/managed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,64 @@ async fn resize_pool_grow_concurrent() {
assert_eq!(pool.status().available, 1);
}

#[tokio::test]
async fn resize_pool_with_config() {
let mgr = Manager {};
let pool = Pool::builder(mgr).max_size(0).build().unwrap();
let join_handle = {
let pool = pool.clone();
tokio::spawn(async move { pool.get().await })
};
tokio::task::yield_now().await;
assert_eq!(
pool.status(),
Status {
max_size: 0,
size: 0,
available: -1
}
);
pool.swap_config(PoolConfig {
max_size: 1,
..pool.config()
});
assert_eq!(
pool.status(),
Status {
max_size: 1,
size: 0,
available: -1
}
);
tokio::task::yield_now().await;
assert_eq!(
pool.status(),
Status {
max_size: 1,
size: 1,
available: 0,
}
);
let obj0 = join_handle.await.unwrap().unwrap();
assert_eq!(
pool.status(),
Status {
max_size: 1,
size: 1,
available: 0
}
);
pool.swap_config(PoolConfig {
max_size: 2,
..pool.config()
});
assert_eq!(pool.status(), Status {
max_size: 2,
size: 1,
available, 0,
});
}

#[tokio::test]
async fn retain() {
let mgr = Manager {};
Expand Down

0 comments on commit b67b689

Please sign in to comment.