Add idle check to connection cleaner and rework when it runs #60

Closed · wants to merge 2 commits
12 changes: 8 additions & 4 deletions README.md
@@ -107,11 +107,15 @@ Sets the maximum idle connection count maintained by the pool. The pool will mai

#### max_lifetime
Sets the maximum lifetime of connections in the pool. Expired connections may be closed lazily before reuse.
->None meas reuse forever, defaults to None.
+>None means reuse forever, defaults to None.

#### max_idle_lifetime
Sets the maximum idle lifetime of connections in the pool. Expired connections may be closed lazily before reuse.
>None means reuse forever, defaults to None.

#### get_timeout
Sets the get timeout used by the pool. Calls to Pool::get will wait this long for a connection to become available before returning an error.
->None meas never timeout, defaults to 30 seconds.
+>None means never timeout, defaults to 30 seconds.


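The two lifetime options above differ only in which timestamp they compare against: `max_lifetime` looks at when the connection was created, `max_idle_lifetime` at when it was last used. A minimal std-only sketch of that rule (the `PooledConn` fields are hypothetical stand-ins, not the pool's real types):

```rust
use std::time::{Duration, Instant};

// Hypothetical per-connection bookkeeping mirroring the two options above.
struct PooledConn {
    created_at: Instant,   // compared against max_lifetime
    last_used_at: Instant, // compared against max_idle_lifetime
}

// `None` means "reuse forever" for either limit.
fn is_expired(
    conn: &PooledConn,
    now: Instant,
    max_lifetime: Option<Duration>,
    max_idle_lifetime: Option<Duration>,
) -> bool {
    let too_old = max_lifetime.map_or(false, |l| now.duration_since(conn.created_at) > l);
    let too_idle = max_idle_lifetime.map_or(false, |l| now.duration_since(conn.last_used_at) > l);
    too_old || too_idle
}

fn main() {
    let now = Instant::now();
    // Created 1200ms ago but used just now: expired under max_lifetime=600ms
    // even though it is not idle.
    let conn = PooledConn {
        created_at: now - Duration::from_millis(1200),
        last_used_at: now,
    };
    println!("{}", is_expired(&conn, now, Some(Duration::from_millis(600)), None));
}
```

Note that either check alone can close a connection; a busy connection still dies at `max_lifetime`, while `max_idle_lifetime` only catches ones sitting unused in the free list.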
## Variable
@@ -121,6 +125,7 @@ Some of the connection pool configurations can be adjusted dynamically. Each con
* set_max_open_conns
* set_max_idle_conns
* set_conn_max_lifetime
* set_conn_max_idle_lifetime

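The setters above take effect at runtime because the config lives behind the pool's internal lock. A toy model of that pattern (not mobc's actual types), including the zero-duration guard the PR adds for `set_conn_max_idle_lifetime`:

```rust
use std::sync::Mutex;
use std::time::Duration;

// Toy stand-in for the pool's shared, runtime-adjustable config.
struct Config {
    max_idle_lifetime: Option<Duration>,
}

struct Pool {
    config: Mutex<Config>,
}

impl Pool {
    // Mirrors the PR's setter: a zero duration is rejected,
    // `None` disables the limit entirely.
    fn set_conn_max_idle_lifetime(&self, max_idle_lifetime: Option<Duration>) {
        assert_ne!(
            max_idle_lifetime,
            Some(Duration::from_secs(0)),
            "max_idle_lifetime must be positive"
        );
        self.config.lock().unwrap().max_idle_lifetime = max_idle_lifetime;
    }
}

fn main() {
    let pool = Pool {
        config: Mutex::new(Config { max_idle_lifetime: None }),
    };
    pool.set_conn_max_idle_lifetime(Some(Duration::from_secs(30)));
    println!("{:?}", pool.config.lock().unwrap().max_idle_lifetime);
}
```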
## Stats
* max_open - Maximum number of open connections to the database.
@@ -131,8 +136,7 @@ Some of the connection pool configurations can be adjusted dynamically. Each con
* wait_duration - The total time blocked waiting for a new connection.
* max_idle_closed - The total number of connections closed due to max_idle.
* max_lifetime_closed - The total number of connections closed due to max_lifetime.
* max_idle_lifetime_closed - The total number of connections closed due to max_idle_lifetime.

## Compatibility
Because tokio is not compatible with other runtimes such as async-std, a database driver written with tokio cannot run in the async-std runtime. For example, you can't use redis-rs in tide, because redis-rs uses tokio, so a connection pool based on redis-rs can't be used in tide either.


97 changes: 87 additions & 10 deletions src/lib.rs
@@ -248,6 +248,7 @@ struct PoolInternals<C, E> {
conn_requests: VecDeque<ReqSender<Conn<C, E>>>,
num_open: u64,
max_lifetime_closed: u64,
max_idle_lifetime_closed: u64,
max_idle_closed: u64,
wait_count: u64,
wait_duration: Duration,
@@ -292,6 +293,8 @@ pub struct State {
pub max_idle_closed: u64,
/// The total number of connections closed due to `max_lifetime`.
pub max_lifetime_closed: u64,
/// The total number of connections closed due to `max_idle_lifetime`.
pub max_idle_lifetime_closed: u64,
}

impl fmt::Debug for State {
@@ -305,6 +308,7 @@ impl fmt::Debug for State {
.field("wait_duration", &self.wait_duration)
.field("max_idle_closed", &self.max_idle_closed)
.field("max_lifetime_closed", &self.max_lifetime_closed)
.field("max_idle_lifetime_closed", &self.max_idle_lifetime_closed)
.finish()
}
}
@@ -371,10 +375,8 @@ impl<M: Manager> Pool<M> {

/// Sets the maximum lifetime of connections in the pool.
///
-/// Expired connections may be closed lazily before reuse.
-///
-/// None meas reuse forever.
-/// Defaults to None.
+/// - `None` means reuse forever.
+/// - Defaults to `None`.
///
/// # Panics
///
@@ -385,8 +387,8 @@ impl<M: Manager> Pool<M> {
Some(Duration::from_secs(0)),
"max_lifetime must be positive"
);

let mut internals = self.0.internals.lock().await;
-internals.config.max_lifetime = max_lifetime;
if let Some(lifetime) = max_lifetime {
match internals.config.max_lifetime {
Some(prev) if lifetime < prev && internals.cleaner_ch.is_some() => {
@@ -395,9 +397,45 @@
}
_ => (),
}
self.start_conn_cleaner(&mut internals);
}
internals.config.max_lifetime = max_lifetime;
}

-if max_lifetime.is_some() && internals.num_open > 0 && internals.cleaner_ch.is_none() {
/// Sets the maximum idle lifetime of connections in the pool.
///
/// - `None` means reuse forever.
/// - Defaults to `None`.
///
/// # Panics
///
/// Panics if `max_idle_lifetime` is the zero `Duration`.
pub async fn set_conn_max_idle_lifetime(&self, max_idle_lifetime: Option<Duration>) {
assert_ne!(
max_idle_lifetime,
Some(Duration::from_secs(0)),
"max_idle_lifetime must be positive"
);

let mut internals = self.0.internals.lock().await;
if let Some(lifetime) = max_idle_lifetime {
match internals.config.max_idle_lifetime {
Some(prev) if lifetime < prev && internals.cleaner_ch.is_some() => {
// FIXME
> **Review comment (Collaborator Author):** Not sure what you had in mind for the result here @importcjj. Returning a result would break the API for `set_conn_max_lifetime`.
let _ = internals.cleaner_ch.as_mut().unwrap().send(()).await;
}
_ => (),
}
self.start_conn_cleaner(&mut internals)
}
internals.config.max_idle_lifetime = max_idle_lifetime;
}

fn start_conn_cleaner(
&self,
internals: &mut MutexGuard<'_, PoolInternals<M::Connection, M::Error>>,
) {
if internals.cleaner_ch.is_none() {
log::debug!("run connection cleaner");
let shared1 = Arc::downgrade(&self.0);
let clean_rate = self.0.config.clean_rate;
@@ -415,6 +453,13 @@ impl<M: Manager> Pool<M> {
} else {
config.max_open as usize
};
let (cleaner_ch_sender, cleaner_ch) =
if config.max_lifetime.is_some() || config.max_idle_lifetime.is_some() {
let (cleaner_ch_sender, cleaner_ch) = mpsc::channel(1);
(Some(cleaner_ch_sender), Some(cleaner_ch))
} else {
(None, None)
};
let (opener_ch_sender, mut opener_ch) = mpsc::channel(max_open);
let (share_config, internal_config) = config.split();
let internals = Mutex::new(PoolInternals {
@@ -423,11 +468,12 @@
conn_requests: VecDeque::new(),
num_open: 0,
max_lifetime_closed: 0,
max_idle_lifetime_closed: 0,
wait_count: 0,
max_idle_closed: 0,
opener_ch: opener_ch_sender,
wait_duration: Duration::from_secs(0),
-            cleaner_ch: None,
+            cleaner_ch: cleaner_ch_sender,
});
let shared = Arc::new(SharedPool {
config: share_config,
@@ -442,6 +488,14 @@
}
});

if cleaner_ch.is_some() {
let shared2 = Arc::downgrade(&shared);
let clean_rate = shared.config.clean_rate;
shared.manager.spawn_task(async move {
connection_cleaner(shared2, cleaner_ch.unwrap(), clean_rate).await;
});
}

Pool(shared)
}

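The cleaner task spawned above sweeps on a fixed `clean_rate` tick, but the channel lets a setter wake it early when a lifetime limit is shortened. A std-only sketch of that wake-up mechanism (threads and `std::sync::mpsc` standing in for the pool's async tasks and channel):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// The cleaner loop: sweep every `clean_rate`, but a message on `rx`
// (sent when a lifetime limit is lowered) triggers a sweep immediately.
// `sweeps` reports what happened, for demonstration only.
fn run_cleaner(rx: mpsc::Receiver<()>, clean_rate: Duration, sweeps: mpsc::Sender<&'static str>) {
    for _ in 0..2 {
        match rx.recv_timeout(clean_rate) {
            Ok(()) => sweeps.send("early sweep").unwrap(),
            Err(mpsc::RecvTimeoutError::Timeout) => sweeps.send("scheduled sweep").unwrap(),
            Err(mpsc::RecvTimeoutError::Disconnected) => return, // pool dropped
        }
    }
}

fn main() {
    let (wake_tx, wake_rx) = mpsc::channel();
    let (sweep_tx, sweep_rx) = mpsc::channel();
    let cleaner =
        thread::spawn(move || run_cleaner(wake_rx, Duration::from_millis(50), sweep_tx));
    wake_tx.send(()).unwrap(); // e.g. set_conn_max_lifetime lowered the limit
    cleaner.join().unwrap();
    println!("{}", sweep_rx.recv().unwrap());
}
```

This also shows why the PR can create the channel only when `max_lifetime` or `max_idle_lifetime` is set: with neither limit configured there is nothing for the cleaner to do, so no task or channel is needed.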
@@ -590,6 +644,7 @@ impl<M: Manager> Pool<M> {
wait_duration: internals.wait_duration,
max_idle_closed: internals.max_idle_closed,
max_lifetime_closed: internals.max_lifetime_closed,
max_idle_lifetime_closed: internals.max_idle_lifetime_closed,
}
}
}
@@ -739,13 +794,24 @@ async fn clean_connection<M: Manager>(shared: &Weak<SharedPool<M>>) -> bool {
log::debug!("Clean connections");

let mut internals = shared.internals.lock().await;
-    if internals.num_open == 0 || internals.config.max_lifetime.is_none() {
+    if internals.config.max_lifetime.is_none() && internals.config.max_idle_lifetime.is_none() {
internals.cleaner_ch.take();
return false;
}
if internals.num_open == 0 {
return true;
}

-    let expired = Instant::now() - internals.config.max_lifetime.unwrap();
+    let expired = internals
+        .config
+        .max_lifetime
+        .map(|lifetime| Instant::now() - lifetime);
let mut closing = vec![];
let idle_expired = internals
.config
.max_idle_lifetime
.map(|lifetime| Instant::now() - lifetime);
let mut idle_closing = vec![];

let mut i = 0;
log::debug!(
@@ -758,18 +824,29 @@
break;
}

-        if internals.free_conns[i].created_at < expired {
+        if expired.is_some() && internals.free_conns[i].created_at < expired.unwrap() {
let c = internals.free_conns.swap_remove(i);
closing.push(c);
continue;
}
if idle_expired.is_some() && internals.free_conns[i].last_used_at < idle_expired.unwrap() {
let c = internals.free_conns.swap_remove(i);
idle_closing.push(c);
continue;
}

i += 1;
}

internals.max_lifetime_closed += closing.len() as u64;
for conn in closing {
conn.close(&mut internals);
}
internals.max_idle_lifetime_closed += idle_closing.len() as u64;
for conn in idle_closing {
conn.close(&mut internals);
}

return true;
}

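The sweep loop in `clean_connection` walks the free list once, removing a connection for whichever limit it trips first; `swap_remove` keeps removal O(1), which is why the index only advances when the current slot is kept. A self-contained sketch of that loop (std types only, with simplified stand-ins for the pool's connection struct):

```rust
use std::time::{Duration, Instant};

struct FreeConn {
    created_at: Instant,
    last_used_at: Instant,
}

// Returns (closed due to max_lifetime, closed due to max_idle_lifetime),
// mirroring the two counters the PR adds.
fn sweep(
    free_conns: &mut Vec<FreeConn>,
    expired: Option<Instant>,      // created before this => max_lifetime hit
    idle_expired: Option<Instant>, // last used before this => max_idle_lifetime hit
) -> (usize, usize) {
    let (mut lifetime_closed, mut idle_closed) = (0, 0);
    let mut i = 0;
    while i < free_conns.len() {
        if expired.map_or(false, |e| free_conns[i].created_at < e) {
            free_conns.swap_remove(i);
            lifetime_closed += 1;
            continue; // swap_remove moved a new element into slot i
        }
        if idle_expired.map_or(false, |e| free_conns[i].last_used_at < e) {
            free_conns.swap_remove(i);
            idle_closed += 1;
            continue;
        }
        i += 1;
    }
    (lifetime_closed, idle_closed)
}

fn main() {
    let now = Instant::now();
    let cutoff = now - Duration::from_millis(600);
    let mut conns = vec![
        // Old but recently used: caught by the lifetime check.
        FreeConn { created_at: now - Duration::from_millis(1200), last_used_at: now },
        // Fresh but long idle: caught by the idle check.
        FreeConn { created_at: now, last_used_at: now - Duration::from_millis(900) },
        // Fresh and recently used: survives.
        FreeConn { created_at: now, last_used_at: now },
    ];
    let counts = sweep(&mut conns, Some(cutoff), Some(cutoff));
    println!("{:?} remaining={}", counts, conns.len()); // prints "(1, 1) remaining=1"
}
```

Checking `created_at` before `last_used_at` matches the original ordering, so a connection that is both over-age and over-idle is attributed to `max_lifetime_closed`, not counted twice.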