RUST-565 Properly report connection closures due to error #258
Changes to the connection pool implementation:

```diff
@@ -33,6 +33,7 @@ use crate::{
     ConnectionCheckoutFailedEvent,
     ConnectionCheckoutFailedReason,
     ConnectionCheckoutStartedEvent,
+    ConnectionClosedEvent,
     ConnectionClosedReason,
     PoolClearedEvent,
     PoolClosedEvent,
@@ -179,15 +180,6 @@ impl ConnectionPool {
         Ok(conn)
     }

-    /// Checks a connection back into the pool and notifies the wait queue that a connection is
-    /// ready. If the connection is stale, it will be closed instead of being added to the set of
-    /// available connections. The time that the connection is checked in will be marked to
-    /// facilitate detecting if the connection becomes idle.
-    #[cfg(test)]
-    pub(crate) async fn check_in(&self, conn: Connection) {
-        self.inner.check_in(conn).await;
-    }
-
     /// Increments the generation of the pool. Rather than eagerly removing stale connections from
     /// the pool, they are left for the background thread to clean up.
     pub(crate) fn clear(&self) {
@@ -207,6 +199,10 @@ impl ConnectionPoolInner {
         }
     }

+    /// Checks a connection back into the pool and notifies the wait queue that a connection is
+    /// ready. If the connection is stale, it will be closed instead of being added to the set of
+    /// available connections. The time that the connection is checked in will be marked to
+    /// facilitate detecting if the connection becomes idle.
     async fn check_in(&self, mut conn: Connection) {
         self.emit_event(|handler| {
             handler.handle_connection_checked_in_event(conn.checked_in_event());
@@ -216,8 +212,9 @@ impl ConnectionPoolInner {
         let mut available_connections = self.available_connections.lock().await;

-        // Close the connection if it's stale.
-        if conn.is_stale(self.generation.load(Ordering::SeqCst)) {
+        if conn.has_errored() {
+            self.close_connection(conn, ConnectionClosedReason::Error);
+        } else if conn.is_stale(self.generation.load(Ordering::SeqCst)) {
             self.close_connection(conn, ConnectionClosedReason::Stale);
         } else if conn.is_executing() {
             self.close_connection(conn, ConnectionClosedReason::Dropped)
```
```diff
@@ -334,6 +331,9 @@ impl ConnectionPoolInner {
         &self,
         pending_connection: PendingConnection,
     ) -> Result<Connection> {
+        let address = pending_connection.address.clone();
+        let id = pending_connection.id;
+
         let establish_result = self
             .establisher
             .establish_connection(pending_connection)
@@ -348,6 +348,14 @@
                 // Establishing a pending connection failed, so that must be reflected in the total
                 // connection count.
                 self.total_connection_count.fetch_sub(1, Ordering::SeqCst);
+                self.emit_event(|handler| {
+                    let event = ConnectionClosedEvent {
+                        address,
+                        reason: ConnectionClosedReason::Error,
+                        connection_id: id,
+                    };
+                    handler.handle_connection_closed_event(event);
+                });
             }
             Ok(ref connection) => {
                 self.emit_event(|handler| {
```

Review comment on the new `emit_event` call: "According to CMAP, we are supposed to be emitting …"
Changes to the `EventHandler` test helper used by the CMAP spec tests:

```diff
@@ -1,25 +1,42 @@
-use std::sync::{Arc, RwLock};
+use std::{
+    sync::{Arc, RwLock},
+    time::Duration,
+};

 use serde::{de::Unexpected, Deserialize, Deserializer};

-use crate::{event::cmap::*, options::StreamAddress};
+use crate::{event::cmap::*, options::StreamAddress, RUNTIME};
+use tokio::sync::broadcast::{RecvError, SendError};

 #[derive(Clone, Debug)]
 pub struct EventHandler {
     pub events: Arc<RwLock<Vec<Event>>>,
+    channel_sender: tokio::sync::broadcast::Sender<Event>,
 }

 impl EventHandler {
+    pub fn new() -> Self {
+        let (channel_sender, _) = tokio::sync::broadcast::channel(500);
+        Self {
+            events: Default::default(),
+            channel_sender,
+        }
+    }
+
     fn handle<E: Into<Event>>(&self, event: E) {
+        let event = event.into();
+        // this only errors if no receivers are listening which isn't a concern here.
+        let _: std::result::Result<usize, SendError<Event>> =
+            self.channel_sender.send(event.clone());
         self.events.write().unwrap().push(event);
     }

+    pub fn subscribe(&self) -> EventSubscriber {
+        EventSubscriber {
+            _handler: self,
+            receiver: self.channel_sender.subscribe(),
+        }
+    }
 }

 impl CmapEventHandler for EventHandler {
```

Review comment on `subscribe`: "I added a mechanism by which threads could subscribe to the event handler and wait for a particular event. This seemed like a more async-y approach than our current sleep-for-a-little-and-check-again. I updated the CMAP spec tests to use it too, hopefully reducing those test failures we get every now and then due to not waiting long enough. I filed RUST-572 to cover the work for introducing this to the test suite as a whole."
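A sketch of how a test might use this subscriber, assuming the test module shown above (so `Duration` and the handler types are in scope). The `Event::ConnectionClosed` variant name is assumed from the CMAP event set and may not match the enum's actual variants; `#[tokio::test]` is used for brevity in place of the driver's own runtime harness:

```rust
#[tokio::test]
async fn observes_connection_closed() {
    let handler = EventHandler::new();
    let mut subscriber = handler.subscribe();

    // ... install `handler` on a pool and drive it so CMAP events fire ...

    let closed = subscriber
        .wait_for_event(Duration::from_secs(1), |event| {
            // Variant name assumed for illustration.
            matches!(event, Event::ConnectionClosed(_))
        })
        .await;
    assert!(closed.is_some(), "expected a ConnectionClosed event within 1s");
}
```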
```diff
@@ -64,8 +81,39 @@ impl CmapEventHandler for EventHandler {
     }
 }

+pub struct EventSubscriber<'a> {
+    /// A reference to the handler this subscriber is receiving events from.
+    /// Stored here to ensure this subscriber cannot outlive the handler that is generating its
+    /// events.
+    _handler: &'a EventHandler,
+
+    receiver: tokio::sync::broadcast::Receiver<Event>,
+}
+
+impl EventSubscriber<'_> {
+    pub async fn wait_for_event<F>(&mut self, timeout: Duration, filter: F) -> Option<Event>
+    where
+        F: Fn(&Event) -> bool,
+    {
+        RUNTIME
+            .timeout(timeout, async {
+                loop {
+                    match self.receiver.recv().await {
+                        Ok(event) if filter(&event) => return event.into(),
+                        // the channel hit capacity and the channel will skip a few to catch up.
+                        Err(RecvError::Lagged(_)) => continue,
+                        Err(_) => return None,
+                        _ => continue,
+                    }
+                }
+            })
+            .await
+            .ok()
+            .flatten()
+    }
+}
+
 #[allow(clippy::large_enum_variant)]
-#[derive(Debug, Deserialize, From, PartialEq)]
+#[derive(Clone, Debug, Deserialize, From, PartialEq)]
 #[serde(tag = "type")]
 pub enum Event {
     #[serde(deserialize_with = "self::deserialize_pool_created")]
```
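The `RecvError::Lagged` arm matters because `tokio::sync::broadcast` is a bounded, lossy channel: a slow receiver skips the oldest messages rather than blocking the sender. A standalone sketch of that behavior, written against tokio 1.x, where `RecvError` lives under `broadcast::error` (the diff above targets the older tokio the driver used at the time):

```rust
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Capacity 2: the third send evicts the oldest buffered message.
    let (tx, mut rx) = broadcast::channel(2);
    for i in 0..3 {
        tx.send(i).unwrap();
    }

    // The first recv reports how many messages this receiver missed...
    match rx.recv().await {
        Err(broadcast::error::RecvError::Lagged(skipped)) => {
            println!("lagged: skipped {} message(s)", skipped);
        }
        other => println!("unexpected: {:?}", other),
    }

    // ...and subsequent recvs resume from the oldest retained message.
    assert_eq!(rx.recv().await.unwrap(), 1);
    assert_eq!(rx.recv().await.unwrap(), 2);
}
```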
Review comment: "We weren't checking the initial handshake response to see if it returned an error, mistakenly returning `Ok(connection)` here even if the handshake failed."

Reply: "Good catch!"
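The bug the reviewers describe: a MongoDB command reply can carry `ok: 0` and an `errmsg` even when the network read itself succeeded, so establishment must inspect the reply before returning success. A minimal sketch with stand-in types, not the driver's actual `CommandResponse` API:

```rust
// Hypothetical stand-ins for illustration only.
struct CommandResponse {
    ok: f64,
    errmsg: Option<String>,
}
struct Connection;

/// Turn a successfully *read* handshake reply into a success/failure result.
fn finish_handshake(reply: CommandResponse) -> Result<Connection, String> {
    if reply.ok != 1.0 {
        // The server answered, but the handshake itself failed.
        return Err(reply
            .errmsg
            .unwrap_or_else(|| "handshake failed".to_string()));
    }
    Ok(Connection)
}

fn main() {
    let reply = CommandResponse { ok: 0.0, errmsg: Some("auth failed".into()) };
    assert!(finish_handshake(reply).is_err()); // no more mistaken Ok(connection)
}
```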