Skip to content

Commit

Permalink
fix(postgres): avoid recursively spawning tasks in `PgListener::drop(…
Browse files Browse the repository at this point in the history
…)` (#1393)

refactor(pool): deprecate `PoolConnection::release()`, provide renamed alts
  • Loading branch information
abonander authored Aug 20, 2021
1 parent c04f83b commit 0e8ffb5
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 26 deletions.
89 changes: 63 additions & 26 deletions sqlx-core/src/pool/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::database::Database;
use crate::error::Error;

use super::inner::{DecrementSizeGuard, SharedPool};
use std::future::Future;

/// A connection managed by a [`Pool`][crate::pool::Pool].
///
Expand Down Expand Up @@ -60,43 +61,79 @@ impl<DB: Database> DerefMut for PoolConnection<DB> {

impl<DB: Database> PoolConnection<DB> {
/// Explicitly release a connection from the pool
pub fn release(mut self) -> DB::Connection {
#[deprecated = "renamed to `.detach()` for clarity"]
pub fn release(self) -> DB::Connection {
self.detach()
}

/// Detach this connection from the pool, allowing it to open a replacement.
///
/// Note that if your application uses a single shared pool, this
/// effectively lets the application exceed the `max_connections` setting.
///
/// If you want the pool to treat this connection as permanently checked-out,
/// use [`.leak()`][Self::leak] instead.
pub fn detach(mut self) -> DB::Connection {
self.live
.take()
.expect("PoolConnection double-dropped")
.float(&self.pool)
.detach()
}

/// Detach this connection from the pool, treating it as permanently checked-out.
///
/// This effectively will reduce the maximum capacity of the pool by 1 every time it is used.
///
/// If you don't want to impact the pool's capacity, use [`.detach()`][Self::detach] instead.
pub fn leak(mut self) -> DB::Connection {
self.live.take().expect("PoolConnection double-dropped").raw
}

/// Test the connection to make sure it is still live before returning it to the pool.
///
/// This effectively runs the drop handler eagerly instead of spawning a task to do it.
pub(crate) fn return_to_pool(&mut self) -> impl Future<Output = ()> + Send + 'static {
// we want these to happen synchronously so the drop handler doesn't try to spawn a task anyway
// this also makes the returned future `'static`
let live = self.live.take();
let pool = self.pool.clone();

async move {
let mut floating = if let Some(live) = live {
live.float(&pool)
} else {
return;
};

// test the connection on-release to ensure it is still viable
// if an Executor future/stream is dropped during an `.await` call, the connection
// is likely to be left in an inconsistent state, in which case it should not be
// returned to the pool; also of course, if it was dropped due to an error
// this is simply a band-aid as SQLx-next (0.6) connections should be able
// to recover from cancellations
if let Err(e) = floating.raw.ping().await {
log::warn!(
"error occurred while testing the connection on-release: {}",
e
);

// we now consider the connection to be broken; just drop it to close
// trying to close gracefully might cause something weird to happen
drop(floating);
} else {
// if the connection is still viable, release it to the pool
pool.release(floating);
}
}
}
}

/// Returns the connection to the [`Pool`][crate::pool::Pool] it was checked-out from.
impl<DB: Database> Drop for PoolConnection<DB> {
fn drop(&mut self) {
if let Some(live) = self.live.take() {
let pool = self.pool.clone();
sqlx_rt::spawn(async move {
let mut floating = live.float(&pool);

// test the connection on-release to ensure it is still viable
// if an Executor future/stream is dropped during an `.await` call, the connection
// is likely to be left in an inconsistent state, in which case it should not be
// returned to the pool; also of course, if it was dropped due to an error
// this is simply a band-aid as SQLx-next (0.6) connections should be able
// to recover from cancellations
if let Err(e) = floating.raw.ping().await {
log::warn!(
"error occurred while testing the connection on-release: {}",
e
);

// we now consider the connection to be broken; just drop it to close
// trying to close gracefully might cause something weird to happen
drop(floating);
} else {
// if the connection is still viable, release it to th epool
pool.release(floating);
}
});
if self.live.is_some() {
sqlx_rt::spawn(self.return_to_pool());
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions sqlx-core/src/postgres/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ impl Drop for PgListener {
// Unregister any listeners before returning the connection to the pool.
sqlx_rt::spawn(async move {
let _ = conn.execute("UNLISTEN *").await;

// inline the drop handler from `PoolConnection` so it doesn't try to spawn another task
// otherwise, it may trigger a panic if this task is dropped because the runtime is going away:
// https://github.com/launchbadge/sqlx/issues/1389
conn.return_to_pool().await;
});
}
}
Expand Down

0 comments on commit 0e8ffb5

Please sign in to comment.