From fab20b833906d374a20a536715536b2234af31f2 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 10 Nov 2023 12:00:54 +0100 Subject: [PATCH] Fix networking task having died when tried to send message to it (#1315) * Fix networking task having died when tried to send message to it * Docfix --- lib/src/libp2p/collection.rs | 50 +++++++++++++++++-------- lib/src/network/service.rs | 4 +- light-base/src/network_service.rs | 17 +++++---- light-base/src/network_service/tasks.rs | 4 +- 4 files changed, 49 insertions(+), 26 deletions(-) diff --git a/lib/src/libp2p/collection.rs b/lib/src/libp2p/collection.rs index edd030f728..49e7f1e0b0 100644 --- a/lib/src/libp2p/collection.rs +++ b/lib/src/libp2p/collection.rs @@ -196,6 +196,11 @@ pub struct Network { /// that concern this connection before processing any other incoming message. shutting_down_connection: Option, + /// List of connections for which a [`ConnectionToCoordinatorInner::ShutdownFinished`] has + /// been received and a [`CoordinatorToConnectionInner::ShutdownFinishedAck`] has been sent. + /// We can now remove these connections and generate a [`Event::Shutdown`]. + shutdown_finished_connections: VecDeque, + /// List of all outgoing notification substreams that we have opened. Can be either pending /// (waiting for the connection task to say whether it has been accepted or not) or fully /// open. @@ -325,6 +330,7 @@ where Default::default(), ), shutting_down_connection: None, + shutdown_finished_connections: VecDeque::with_capacity(config.capacity), outgoing_requests: BTreeSet::new(), ingoing_requests: hashbrown::HashMap::with_capacity_and_hasher( 4 * config.capacity, @@ -984,13 +990,21 @@ where /// /// This function guarantees that the [`ConnectionId`] always refers to a connection that /// is still alive, in the sense that [`SingleStreamConnectionTask::inject_coordinator_message`] - /// or [`MultiStreamConnectionTask::inject_coordinator_message`] has never returned `None`. + /// or [`MultiStreamConnectionTask::inject_coordinator_message`] has never returned `None` + /// and that no [`Event::Shutdown`] has been generated for this connection. pub fn pull_message_to_connection( &mut self, ) -> Option<(ConnectionId, CoordinatorToConnection)> { - self.messages_to_connections + let message = self + .messages_to_connections .pop_front() - .map(|(id, inner)| (id, CoordinatorToConnection { inner })) + .map(|(id, inner)| (id, CoordinatorToConnection { inner }))?; + + if let CoordinatorToConnectionInner::ShutdownFinishedAck = message.1.inner { + self.shutdown_finished_connections.push_back(message.0); + } + + Some(message) } /// Injects into the state machine a message generated by @@ -1024,6 +1038,22 @@ where /// [`Network::inject_connection_message`]. pub fn next_event(&mut self) -> Option> { loop { + if let Some(connection_id) = self.shutdown_finished_connections.pop_front() { + let connection = self.connections.remove(&connection_id).unwrap(); + let was_established = match &connection.state { + InnerConnectionState::ShuttingDown { + was_established, .. + } => *was_established, + _ => unreachable!(), + }; + + return Some(Event::Shutdown { + id: connection_id, + was_established, + user_data: connection.user_data, + }); + } + // When a connection starts its shutdown, its id is put in `shutting_down_connection`. // When that happens, we go through the local state and clean up all requests and // notification substreams that are in progress/open and return the cancellations @@ -1220,23 +1250,11 @@ where } } ConnectionToCoordinatorInner::ShutdownFinished => { - let was_established = match &connection.state { - InnerConnectionState::ShuttingDown { - was_established, .. - } => *was_established, - _ => unreachable!(), - }; - - let user_data = self.connections.remove(&connection_id).unwrap().user_data; self.messages_to_connections.push_back(( connection_id, CoordinatorToConnectionInner::ShutdownFinishedAck, )); - Event::Shutdown { - id: connection_id, - was_established, - user_data, - } + continue; } ConnectionToCoordinatorInner::HandshakeFinished(peer_id) => { debug_assert_eq!( diff --git a/lib/src/network/service.rs b/lib/src/network/service.rs index cd67995bf0..cf13ad9cb7 100644 --- a/lib/src/network/service.rs +++ b/lib/src/network/service.rs @@ -872,7 +872,9 @@ where /// /// This function guarantees that the [`ConnectionId`] always refers to a connection that /// is still alive, in the sense that [`SingleStreamConnectionTask::inject_coordinator_message`] - /// or [`MultiStreamConnectionTask::inject_coordinator_message`] has never returned `None`. + /// or [`MultiStreamConnectionTask::inject_coordinator_message`] has never returned `None` + /// and that no [`Event::Disconnected`] or [`Event::PreHandshakeDisconnected`] has been + /// generated for this connection. pub fn pull_message_to_connection( &mut self, ) -> Option<(ConnectionId, CoordinatorToConnection)> { diff --git a/light-base/src/network_service.rs b/light-base/src/network_service.rs index ee4b3c4b07..7ddb6fc007 100644 --- a/light-base/src/network_service.rs +++ b/light-base/src/network_service.rs @@ -2053,19 +2053,20 @@ async fn background_task(mut task: BackgroundTask) { connection_id, message, } => { - // Note that it is critical for the sending to not take too long here, in order to not - // block the process of the network service. - // In particular, if sending the message to the connection is blocked due to sending - // a message on the connection-to-coordinator channel, this will result in a deadlock. - // For this reason, the connection task is always ready to immediately accept a message - // on the coordinator-to-connection channel. - let _send_success = task + // Note that it is critical for the sending to not take too long here, in order to + // not block the process of the network service. + // In particular, if sending the message to the connection is blocked due to + // sending a message on the connection-to-coordinator channel, this will result + // in a deadlock. + // For this reason, the connection task is always ready to immediately accept a + // message on the coordinator-to-connection channel. + let _send_result = task .active_connections .get_mut(&connection_id) .unwrap() .send(message) .await; - // debug_assert!(_send_success.is_ok()); // TODO: panics right now /!\ + debug_assert!(_send_result.is_ok()); } } } diff --git a/light-base/src/network_service/tasks.rs b/light-base/src/network_service/tasks.rs index f07f99caec..87a178c550 100644 --- a/light-base/src/network_service/tasks.rs +++ b/light-base/src/network_service/tasks.rs @@ -51,7 +51,9 @@ pub(super) async fn single_stream_connection_task( // Because only one message should be sent to the coordinator at a time, and that // processing the socket might generate a message, we only process the socket if no // message is currently being sent. - if let (false, Some(mut task)) = (message_sending.is_some(), connection_task.take()) { + if message_sending.is_none() && connection_task.is_some() { + let mut task = connection_task.take().unwrap(); + match platform.read_write_access(socket.as_mut()) { Ok(mut socket_read_write) => { // The code in this block is a bit cumbersome due to the logging.