Skip to content

Commit

Permalink
Fix networking task having died when trying to send a message to it (#1315)
Browse files Browse the repository at this point in the history
* Fix networking task having died when trying to send a message to it

* Docfix
  • Loading branch information
tomaka authored Nov 10, 2023
1 parent fef01da commit fab20b8
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 26 deletions.
50 changes: 34 additions & 16 deletions lib/src/libp2p/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ pub struct Network<TConn, TNow> {
/// that concern this connection before processing any other incoming message.
shutting_down_connection: Option<ConnectionId>,

/// List of connections for which a [`ConnectionToCoordinatorInner::ShutdownFinished`] has
/// been received and a [`CoordinatorToConnectionInner::ShutdownFinishedAck`] has been sent.
/// We can now remove these connections and generate an [`Event::Shutdown`] for each of them.
shutdown_finished_connections: VecDeque<ConnectionId>,

/// List of all outgoing notification substreams that we have opened. Can be either pending
/// (waiting for the connection task to say whether it has been accepted or not) or fully
/// open.
Expand Down Expand Up @@ -325,6 +330,7 @@ where
Default::default(),
),
shutting_down_connection: None,
shutdown_finished_connections: VecDeque::with_capacity(config.capacity),
outgoing_requests: BTreeSet::new(),
ingoing_requests: hashbrown::HashMap::with_capacity_and_hasher(
4 * config.capacity,
Expand Down Expand Up @@ -984,13 +990,21 @@ where
///
/// This function guarantees that the [`ConnectionId`] always refers to a connection that
/// is still alive, in the sense that [`SingleStreamConnectionTask::inject_coordinator_message`]
/// or [`MultiStreamConnectionTask::inject_coordinator_message`] has never returned `None`.
/// or [`MultiStreamConnectionTask::inject_coordinator_message`] has never returned `None`
/// and that no [`Event::Shutdown`] has been generated for this connection.
pub fn pull_message_to_connection(
&mut self,
) -> Option<(ConnectionId, CoordinatorToConnection)> {
self.messages_to_connections
let message = self
.messages_to_connections
.pop_front()
.map(|(id, inner)| (id, CoordinatorToConnection { inner }))
.map(|(id, inner)| (id, CoordinatorToConnection { inner }))?;

if let CoordinatorToConnectionInner::ShutdownFinishedAck = message.1.inner {
self.shutdown_finished_connections.push_back(message.0);
}

Some(message)
}

/// Injects into the state machine a message generated by
Expand Down Expand Up @@ -1024,6 +1038,22 @@ where
/// [`Network::inject_connection_message`].
pub fn next_event(&mut self) -> Option<Event<TConn>> {
loop {
if let Some(connection_id) = self.shutdown_finished_connections.pop_front() {
let connection = self.connections.remove(&connection_id).unwrap();
let was_established = match &connection.state {
InnerConnectionState::ShuttingDown {
was_established, ..
} => *was_established,
_ => unreachable!(),
};

return Some(Event::Shutdown {
id: connection_id,
was_established,
user_data: connection.user_data,
});
}

// When a connection starts its shutdown, its id is put in `shutting_down_connection`.
// When that happens, we go through the local state and clean up all requests and
// notification substreams that are in progress/open and return the cancellations
Expand Down Expand Up @@ -1220,23 +1250,11 @@ where
}
}
ConnectionToCoordinatorInner::ShutdownFinished => {
let was_established = match &connection.state {
InnerConnectionState::ShuttingDown {
was_established, ..
} => *was_established,
_ => unreachable!(),
};

let user_data = self.connections.remove(&connection_id).unwrap().user_data;
self.messages_to_connections.push_back((
connection_id,
CoordinatorToConnectionInner::ShutdownFinishedAck,
));
Event::Shutdown {
id: connection_id,
was_established,
user_data,
}
continue;
}
ConnectionToCoordinatorInner::HandshakeFinished(peer_id) => {
debug_assert_eq!(
Expand Down
4 changes: 3 additions & 1 deletion lib/src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,9 @@ where
///
/// This function guarantees that the [`ConnectionId`] always refers to a connection that
/// is still alive, in the sense that [`SingleStreamConnectionTask::inject_coordinator_message`]
/// or [`MultiStreamConnectionTask::inject_coordinator_message`] has never returned `None`.
/// or [`MultiStreamConnectionTask::inject_coordinator_message`] has never returned `None`
/// and that no [`Event::Disconnected`] or [`Event::PreHandshakeDisconnected`] has been
/// generated for this connection.
pub fn pull_message_to_connection(
&mut self,
) -> Option<(ConnectionId, CoordinatorToConnection)> {
Expand Down
17 changes: 9 additions & 8 deletions light-base/src/network_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2053,19 +2053,20 @@ async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
connection_id,
message,
} => {
// Note that it is critical for the sending to not take too long here, in order to not
// block the process of the network service.
// In particular, if sending the message to the connection is blocked due to sending
// a message on the connection-to-coordinator channel, this will result in a deadlock.
// For this reason, the connection task is always ready to immediately accept a message
// on the coordinator-to-connection channel.
let _send_success = task
// Note that it is critical for the sending to not take too long here, in order to
// not block the processing of the network service.
// In particular, if sending the message to the connection is blocked because the
// connection task is itself blocked sending a message on the
// connection-to-coordinator channel, this will result in a deadlock.
// For this reason, the connection task is always ready to immediately accept a
// message on the coordinator-to-connection channel.
let _send_result = task
.active_connections
.get_mut(&connection_id)
.unwrap()
.send(message)
.await;
// debug_assert!(_send_success.is_ok()); // TODO: panics right now /!\
debug_assert!(_send_result.is_ok());
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion light-base/src/network_service/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ pub(super) async fn single_stream_connection_task<TPlat: PlatformRef>(
// Because only one message should be sent to the coordinator at a time, and because
// processing the socket might generate a message, we only process the socket if no
// message is currently being sent.
if let (false, Some(mut task)) = (message_sending.is_some(), connection_task.take()) {
if message_sending.is_none() && connection_task.is_some() {
let mut task = connection_task.take().unwrap();

match platform.read_write_access(socket.as_mut()) {
Ok(mut socket_read_write) => {
// The code in this block is a bit cumbersome due to the logging.
Expand Down

0 comments on commit fab20b8

Please sign in to comment.