diff --git a/crates/core/src/network/types.rs b/crates/core/src/network/types.rs index 990ad11177..3386200715 100644 --- a/crates/core/src/network/types.rs +++ b/crates/core/src/network/types.rs @@ -127,8 +127,7 @@ impl Incoming { &self, response: O, ) -> Result<(), NetworkSendError> { - let (connection, versions, response) = self.respond_prep_inner(response)?; - connection.try_send(response, versions) + self.prepare_response(response).try_send() } /// Sends a response on the same connection where we received the request. This will @@ -139,27 +138,7 @@ impl Incoming { &self, response: O, ) -> Result<(), NetworkSendError> { - let (connection, versions, response) = self.respond_prep_inner(response)?; - connection.send(response, versions).await - } - - fn respond_prep_inner( - &self, - response: O, - ) -> Result<(Arc, HeaderMetadataVersions, Outgoing), NetworkSendError> { - let response = self.prepare_response(response); - - let connection = match self.connection.upgrade() { - Some(connection) => connection, - None => { - return Err(NetworkSendError::new( - response, - NetworkError::ConnectionClosed, - )); - } - }; - let versions = with_metadata(HeaderMetadataVersions::from_metadata); - Ok((connection, versions, response)) + self.prepare_response(response).send().await } } @@ -179,8 +158,7 @@ impl Incoming { &self, response: M::ResponseMessage, ) -> Result<(), NetworkSendError> { - let (connection, versions, response) = self.respond_prep_inner(response)?; - connection.try_send(response, versions) + self.prepare_response(response).try_send() } /// Sends a response on the same connection where we received the request. This will @@ -191,8 +169,7 @@ impl Incoming { &self, response: M::ResponseMessage, ) -> Result<(), NetworkSendError> { - let (connection, versions, response) = self.respond_prep_inner(response)?; - connection.send(response, versions).await + self.prepare_response(response).send().await } } @@ -255,7 +232,9 @@ impl Outgoing { self.connection.upgrade() } - pub(crate) fn reset_connection(&mut self) { + /// Detaches this message from the associated connection (if set). This allows this message to + /// be sent on any connection if NetworkSender is used to send this message. + pub fn reset_connection(&mut self) { self.connection = Weak::new(); } @@ -267,6 +246,7 @@ impl Outgoing { self.body } + /// If this is a response to a request, what is the message id of the original request? pub fn in_response_to(&self) -> Option { self.in_response_to } @@ -291,3 +271,36 @@ impl Outgoing { } } } + +impl Outgoing { + /// Sends a response on the same connection where we received the request. This will + /// fail with [`NetworkError::ConnectionClosed`] if the connection is terminated. + /// + /// This blocks until there is capacity on the connection stream. + pub async fn send(self) -> Result<(), NetworkSendError> { + let (connection, versions, outgoing) = self.prepare_send()?; + connection.send(outgoing, versions).await + } + + /// Sends a response on the same connection where we received the request. This will + /// fail with [`NetworkError::ConnectionClosed`] if the connection is terminated. + /// + /// This fails immediately with [`NetworkError::Full`] if connection stream is out of capacity. + pub fn try_send(self) -> Result<(), NetworkSendError> { + let (connection, versions, outgoing) = self.prepare_send()?; + connection.try_send(outgoing, versions) + } + + fn prepare_send( + self, + ) -> Result<(Arc, HeaderMetadataVersions, Self), NetworkSendError> { + let connection = match self.connection.upgrade() { + Some(connection) => connection, + None => { + return Err(NetworkSendError::new(self, NetworkError::ConnectionClosed)); + } + }; + let versions = with_metadata(HeaderMetadataVersions::from_metadata); + Ok((connection, versions, self)) + } +}