RUST-565 Properly report connection closures due to error #258

Merged

Commits (24)
343acd8
properly report Connection closures due to errors
patrickfreed Sep 30, 2020
b25146d
fix failing tests
patrickfreed Sep 30, 2020
1978770
use subscriber in cmap spec tests
patrickfreed Oct 1, 2020
cf7e3b5
use large timeout when waiting for events
patrickfreed Oct 1, 2020
cf019b9
up broadcast channel capacity, handle lag
patrickfreed Oct 1, 2020
6b5958c
update spec tests to use longer timeout, wait for events in ops
patrickfreed Oct 1, 2020
9b9405c
remove ClientPool::check_in
patrickfreed Oct 1, 2020
e369b27
fail monitor checks due to command errors
patrickfreed Oct 1, 2020
68a4813
accept impl Into in fail_command
patrickfreed Oct 1, 2020
d5edff1
monitoring threads wait for check request
patrickfreed Oct 1, 2020
b699e15
limit max pool size in integration test
patrickfreed Oct 1, 2020
214b3ea
rename test
patrickfreed Oct 1, 2020
676a97e
fix heartbeat frequency test
patrickfreed Oct 1, 2020
c0558af
acquire lock in tests
patrickfreed Oct 1, 2020
8c30198
remove redundant lock
patrickfreed Oct 2, 2020
bec0200
switch message manager to a subscription model
patrickfreed Oct 2, 2020
7efd55d
acquire lock at beginning of crudv1 tests
patrickfreed Oct 2, 2020
af7f239
remove heartbeat frequency test
patrickfreed Oct 2, 2020
3f84524
fix more test lock acquisition
patrickfreed Oct 2, 2020
d448df2
increase times on failpoints
patrickfreed Oct 2, 2020
04a6b9c
fix clippy
patrickfreed Oct 6, 2020
2234827
is_errored -> has_errored
patrickfreed Oct 19, 2020
4a58c32
add comment to EventSubscriber
patrickfreed Oct 19, 2020
d469eca
fix rustfmt
patrickfreed Oct 19, 2020
10 changes: 5 additions & 5 deletions src/client/mod.rs
@@ -269,18 +269,18 @@ impl Client {
return Ok(server);
}

let mut topology_change_subscriber =
self.inner.topology.subscribe_to_topology_changes().await;
self.inner.topology.request_topology_check();

let time_passed = start_time.to(PreciseTime::now());
let time_remaining = std::cmp::max(time::Duration::zero(), timeout - time_passed);

let message_received = self
.inner
.topology
.wait_for_topology_change(time_remaining.to_std()?)
let change_occurred = topology_change_subscriber
.wait_for_message(time_remaining.to_std()?)
.await;

if !message_received {
if !change_occurred {
return Err(ErrorKind::ServerSelectionError {
message: self
.inner
5 changes: 2 additions & 3 deletions src/client/session/test.rs
@@ -8,7 +8,6 @@ use crate::{
error::Result,
options::{Acknowledgment, FindOptions, InsertOneOptions, ReadPreference, WriteConcern},
test::{EventClient, TestClient, CLIENT_OPTIONS, LOCK},
Client,
Collection,
RUNTIME,
};
@@ -240,7 +239,7 @@ async fn cluster_time_in_commands() {

async fn cluster_time_test<F, G, R>(command_name: &str, operation: F)
where
F: Fn(Client) -> G,
F: Fn(EventClient) -> G,
G: Future<Output = Result<R>>,
{
let mut options = CLIENT_OPTIONS.clone();
@@ -327,7 +326,7 @@ async fn session_usage() {

async fn session_usage_test<F, G>(command_name: &str, operation: F)
where
F: Fn(Client) -> G,
F: Fn(EventClient) -> G,
G: Future<Output = ()>,
{
let client = EventClient::new().await;
30 changes: 26 additions & 4 deletions src/cmap/conn/mod.rs
@@ -66,6 +66,11 @@ pub(crate) struct Connection {
/// been read.
command_executing: bool,

/// Whether or not this connection has experienced a network error while reading or writing.
/// Once the connection has received an error, it should not be used again or checked back
/// into a pool.
error: bool,

stream: AsyncStream,

#[derivative(Debug = "ignore")]
@@ -95,6 +100,7 @@ impl Connection {
address,
handler: options.and_then(|options| options.event_handler),
stream_description: None,
error: false,
};

Ok(conn)
@@ -187,6 +193,11 @@ impl Connection {
self.command_executing
}

/// Checks if the connection experienced a network error and should be closed.
pub(super) fn has_errored(&self) -> bool {
self.error
}

/// Helper to create a `ConnectionCheckedOutEvent` for the connection.
pub(super) fn checked_out_event(&self) -> ConnectionCheckedOutEvent {
ConnectionCheckedOutEvent {
@@ -233,12 +244,15 @@ impl Connection {
let message = Message::with_command(command, request_id.into());

self.command_executing = true;
message.write_to(&mut self.stream).await?;
let write_result = message.write_to(&mut self.stream).await;
self.error = write_result.is_err();
write_result?;

let response_message = Message::read_from(&mut self.stream).await?;
let response_message_result = Message::read_from(&mut self.stream).await;
self.command_executing = false;
self.error = response_message_result.is_err();

CommandResponse::new(self.address.clone(), response_message)
CommandResponse::new(self.address.clone(), response_message_result?)
}

/// Gets the connection's StreamDescription.
@@ -276,6 +290,7 @@ impl Connection {
handler: self.handler.take(),
stream_description: self.stream_description.take(),
command_executing: self.command_executing,
error: self.error,
}
}
}
@@ -327,6 +342,7 @@ struct DroppedConnectionState {
handler: Option<Arc<dyn CmapEventHandler>>,
stream_description: Option<StreamDescription>,
command_executing: bool,
error: bool,
}

impl Drop for DroppedConnectionState {
@@ -335,10 +351,15 @@ impl Drop for DroppedConnectionState {
/// again, we just close the connection directly.
fn drop(&mut self) {
if let Some(ref handler) = self.handler {
let reason = if self.error {
ConnectionClosedReason::Error
} else {
ConnectionClosedReason::PoolClosed
};
handler.handle_connection_closed_event(ConnectionClosedEvent {
address: self.address.clone(),
connection_id: self.id,
reason: ConnectionClosedReason::PoolClosed,
reason,
});
}
}
@@ -356,6 +377,7 @@ impl From<DroppedConnectionState> for Connection {
stream_description: state.stream_description.take(),
ready_and_available_time: None,
pool: None,
error: state.error,
}
}
}
1 change: 1 addition & 0 deletions src/cmap/establish/handshake/mod.rs
@@ -201,6 +201,7 @@ impl Handshaker {
let response = conn.send_command(command, None).await?;
let end_time = PreciseTime::now();

response.validate()?;
Comment from the PR author:

We weren't checking the initial handshake response to see if it returned an error, mistakenly returning Ok(connection) here even if the handshake failed.

Reply from a reviewer:

Good catch!

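For illustration, here is a minimal, hypothetical sketch of the kind of check the added `validate()` call performs — the `ok`/`errmsg`/`code` field names are the usual server reply fields, but this sketch is an assumption for illustration, not the driver's actual implementation:

```rust
// Hypothetical sketch only: a MongoDB-style command reply signals failure via `ok: 0`
// plus error details. Without a check like this, a failed handshake reply still
// deserializes cleanly and the connection is handed back as if it were healthy.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct RawCommandReply {
    ok: f64,
    #[serde(default)]
    errmsg: Option<String>,
    #[serde(default)]
    code: Option<i32>,
}

fn validate_reply(reply: &RawCommandReply) -> Result<(), String> {
    if reply.ok != 1.0 {
        // Surface the server-side error instead of silently treating the handshake as successful.
        return Err(format!(
            "handshake command failed (code {:?}): {}",
            reply.code,
            reply.errmsg.as_deref().unwrap_or("unknown error")
        ));
    }
    Ok(())
}
```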
let mut command_response: IsMasterCommandResponse = response.body()?;

// Record the client's message and the server's response from speculative authentication if
30 changes: 19 additions & 11 deletions src/cmap/mod.rs
@@ -33,6 +33,7 @@ use crate::{
ConnectionCheckoutFailedEvent,
ConnectionCheckoutFailedReason,
ConnectionCheckoutStartedEvent,
ConnectionClosedEvent,
ConnectionClosedReason,
PoolClearedEvent,
PoolClosedEvent,
@@ -179,15 +180,6 @@ impl ConnectionPool {
Ok(conn)
}

/// Checks a connection back into the pool and notifies the wait queue that a connection is
/// ready. If the connection is stale, it will be closed instead of being added to the set of
/// available connections. The time that the connection is checked in will be marked to
/// facilitate detecting if the connection becomes idle.
#[cfg(test)]
pub(crate) async fn check_in(&self, conn: Connection) {
self.inner.check_in(conn).await;
}

/// Increments the generation of the pool. Rather than eagerly removing stale connections from
/// the pool, they are left for the background thread to clean up.
pub(crate) fn clear(&self) {
Expand All @@ -207,6 +199,10 @@ impl ConnectionPoolInner {
}
}

/// Checks a connection back into the pool and notifies the wait queue that a connection is
/// ready. If the connection is stale, it will be closed instead of being added to the set of
/// available connections. The time that the connection is checked in will be marked to
/// facilitate detecting if the connection becomes idle.
async fn check_in(&self, mut conn: Connection) {
self.emit_event(|handler| {
handler.handle_connection_checked_in_event(conn.checked_in_event());
@@ -216,8 +212,9 @@

let mut available_connections = self.available_connections.lock().await;

// Close the connection if it's stale.
if conn.is_stale(self.generation.load(Ordering::SeqCst)) {
if conn.has_errored() {
self.close_connection(conn, ConnectionClosedReason::Error);
} else if conn.is_stale(self.generation.load(Ordering::SeqCst)) {
self.close_connection(conn, ConnectionClosedReason::Stale);
} else if conn.is_executing() {
self.close_connection(conn, ConnectionClosedReason::Dropped)
@@ -334,6 +331,9 @@ impl ConnectionPoolInner {
&self,
pending_connection: PendingConnection,
) -> Result<Connection> {
let address = pending_connection.address.clone();
let id = pending_connection.id;

let establish_result = self
.establisher
.establish_connection(pending_connection)
@@ -348,6 +348,14 @@
// Establishing a pending connection failed, so that must be reflected in the total
// connection count.
self.total_connection_count.fetch_sub(1, Ordering::SeqCst);
self.emit_event(|handler| {
Comment from the PR author:

According to CMAP, we are supposed to be emitting ConnectionClosed events here. This makes sense because we emitted a ConnectionCreatedEvent when this pending connection was created.

let event = ConnectionClosedEvent {
address,
reason: ConnectionClosedReason::Error,
connection_id: id,
};
handler.handle_connection_closed_event(event);
});
}
Ok(ref connection) => {
self.emit_event(|handler| {
54 changes: 51 additions & 3 deletions src/cmap/test/event.rs
@@ -1,25 +1,42 @@
use std::sync::{Arc, RwLock};
use std::{
sync::{Arc, RwLock},
time::Duration,
};

use serde::{de::Unexpected, Deserialize, Deserializer};

use crate::{event::cmap::*, options::StreamAddress};
use crate::{event::cmap::*, options::StreamAddress, RUNTIME};
use tokio::sync::broadcast::{RecvError, SendError};

#[derive(Clone, Debug)]
pub struct EventHandler {
pub events: Arc<RwLock<Vec<Event>>>,
channel_sender: tokio::sync::broadcast::Sender<Event>,
}

impl EventHandler {
pub fn new() -> Self {
let (channel_sender, _) = tokio::sync::broadcast::channel(500);
Self {
events: Default::default(),
channel_sender,
}
}

fn handle<E: Into<Event>>(&self, event: E) {
let event = event.into();
// this only errors if no receivers are listening, which isn't a concern here.
let _: std::result::Result<usize, SendError<Event>> =
self.channel_sender.send(event.clone());
self.events.write().unwrap().push(event);
}

pub fn subscribe(&self) -> EventSubscriber {
Comment from the PR author:

I added a mechanism by which threads can subscribe to the event handler and wait for a particular event. This seemed like a more async-friendly approach than our current pattern of sleeping briefly and checking again. I updated the cmap spec tests to use it too, hopefully reducing the test failures we occasionally see from not waiting long enough. I filed RUST-572 to cover introducing this to the test suite as a whole. (A hypothetical usage sketch follows the EventSubscriber impl below.)

EventSubscriber {
_handler: self,
receiver: self.channel_sender.subscribe(),
}
}
}

impl CmapEventHandler for EventHandler {
Expand Down Expand Up @@ -64,8 +81,39 @@ impl CmapEventHandler for EventHandler {
}
}

pub struct EventSubscriber<'a> {
/// A reference to the handler this subscriber is receiving events from.
/// Stored here to ensure this subscriber cannot outlive the handler that is generating its
/// events.
_handler: &'a EventHandler,
receiver: tokio::sync::broadcast::Receiver<Event>,
}

impl EventSubscriber<'_> {
pub async fn wait_for_event<F>(&mut self, timeout: Duration, filter: F) -> Option<Event>
where
F: Fn(&Event) -> bool,
{
RUNTIME
.timeout(timeout, async {
loop {
match self.receiver.recv().await {
Ok(event) if filter(&event) => return event.into(),
// the channel hit capacity and will skip a few events to catch up.
Err(RecvError::Lagged(_)) => continue,
Err(_) => return None,
_ => continue,
}
}
})
.await
.ok()
.flatten()
}
}
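For context, here is a hypothetical usage sketch of the subscription model in a test. The `Event::ConnectionClosed` variant name, the timeout value, and the surrounding setup are assumptions for illustration; only `subscribe()` and `wait_for_event()` come from this change:

```rust
use std::time::Duration;

// Hypothetical test helper: wait (up to a timeout) for a ConnectionClosed event
// instead of sleeping for a while and re-checking the handler's event list.
async fn wait_for_connection_closed(handler: &EventHandler) -> Option<Event> {
    let mut subscriber = handler.subscribe();

    // ...trigger the behavior under test here, e.g. enable a failpoint and run an
    // operation that is expected to close its connection...

    subscriber
        .wait_for_event(Duration::from_secs(30), |event| {
            // Assumed variant name; the full Event enum is not shown in this diff.
            matches!(event, Event::ConnectionClosed(_))
        })
        .await
}
```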

#[allow(clippy::large_enum_variant)]
#[derive(Debug, Deserialize, From, PartialEq)]
#[derive(Clone, Debug, Deserialize, From, PartialEq)]
#[serde(tag = "type")]
pub enum Event {
#[serde(deserialize_with = "self::deserialize_pool_created")]