Skip to content

Commit

Permalink
[Networking] Allow sending messages through Outgoing<M>
Browse files Browse the repository at this point in the history
Allows Outgoing<M> values to be sent directly if they were reciprocal to Incoming.
  • Loading branch information
AhmedSoliman committed Sep 9, 2024
1 parent 464a699 commit f632f65
Showing 1 changed file with 41 additions and 28 deletions.
69 changes: 41 additions & 28 deletions crates/core/src/network/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,7 @@ impl<M> Incoming<M> {
&self,
response: O,
) -> Result<(), NetworkSendError<O>> {
let (connection, versions, response) = self.respond_prep_inner(response)?;
connection.try_send(response, versions)
self.prepare_response(response).try_send()
}

/// Sends a response on the same connection where we received the request. This will
Expand All @@ -139,27 +138,7 @@ impl<M> Incoming<M> {
&self,
response: O,
) -> Result<(), NetworkSendError<O>> {
let (connection, versions, response) = self.respond_prep_inner(response)?;
connection.send(response, versions).await
}

fn respond_prep_inner<O>(
&self,
response: O,
) -> Result<(Arc<Connection>, HeaderMetadataVersions, Outgoing<O>), NetworkSendError<O>> {
let response = self.prepare_response(response);

let connection = match self.connection.upgrade() {
Some(connection) => connection,
None => {
return Err(NetworkSendError::new(
response,
NetworkError::ConnectionClosed,
));
}
};
let versions = with_metadata(HeaderMetadataVersions::from_metadata);
Ok((connection, versions, response))
self.prepare_response(response).send().await
}
}

Expand All @@ -179,8 +158,7 @@ impl<M: RpcRequest> Incoming<M> {
&self,
response: M::ResponseMessage,
) -> Result<(), NetworkSendError<M::ResponseMessage>> {
let (connection, versions, response) = self.respond_prep_inner(response)?;
connection.try_send(response, versions)
self.prepare_response(response).try_send()
}

/// Sends a response on the same connection where we received the request. This will
Expand All @@ -191,8 +169,7 @@ impl<M: RpcRequest> Incoming<M> {
&self,
response: M::ResponseMessage,
) -> Result<(), NetworkSendError<M::ResponseMessage>> {
let (connection, versions, response) = self.respond_prep_inner(response)?;
connection.send(response, versions).await
self.prepare_response(response).send().await
}
}

Expand Down Expand Up @@ -255,7 +232,9 @@ impl<M> Outgoing<M> {
self.connection.upgrade()
}

pub(crate) fn reset_connection(&mut self) {
/// Detaches this message from the associated connection (if set). This allows this message to
/// be sent on any connection if NetworkSender is used to send this message.
pub fn reset_connection(&mut self) {
self.connection = Weak::new();
}

Expand All @@ -267,6 +246,7 @@ impl<M> Outgoing<M> {
self.body
}

/// If this is a response to a request, what is the message id of the original request?
pub fn in_response_to(&self) -> Option<u64> {
self.in_response_to
}
Expand All @@ -291,3 +271,36 @@ impl<M> Outgoing<M> {
}
}
}

impl<M: Targeted + WireEncode> Outgoing<M> {
/// Sends a response on the same connection where we received the request. This will
/// fail with [`NetworkError::ConnectionClosed`] if the connection is terminated.
///
/// This blocks until there is capacity on the connection stream.
pub async fn send(self) -> Result<(), NetworkSendError<M>> {
let (connection, versions, outgoing) = self.prepare_send()?;
connection.send(outgoing, versions).await
}

/// Sends a response on the same connection where we received the request. This will
/// fail with [`NetworkError::ConnectionClosed`] if the connection is terminated.
///
/// This fails immediately with [`NetworkError::Full`] if connection stream is out of capacity.
pub fn try_send(self) -> Result<(), NetworkSendError<M>> {
let (connection, versions, outgoing) = self.prepare_send()?;
connection.try_send(outgoing, versions)
}

fn prepare_send(
self,
) -> Result<(Arc<Connection>, HeaderMetadataVersions, Self), NetworkSendError<M>> {
let connection = match self.connection.upgrade() {
Some(connection) => connection,
None => {
return Err(NetworkSendError::new(self, NetworkError::ConnectionClosed));
}
};
let versions = with_metadata(HeaderMetadataVersions::from_metadata);
Ok((connection, versions, self))
}
}

0 comments on commit f632f65

Please sign in to comment.