Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
client/network/req-resp: Don't emit two events for busy builder
Browse files Browse the repository at this point in the history
When a response builder is busy incoming requests are dropped.
Previously this was reported both via a `ResponseFailure::Busy` and a
`ReponseFailure::Network(InboundFailure::Omisssion)` event.

With this commit the former is removed, leaving only the latter in
place.
  • Loading branch information
mxinden committed Dec 9, 2020
1 parent 467206a commit e2cd8f1
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 51 deletions.
89 changes: 40 additions & 49 deletions client/network/src/request_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,26 +158,19 @@ pub struct RequestResponsesBehaviour {
/// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
/// response to send back to the remote.
pending_responses: stream::FuturesUnordered<
Pin<Box<dyn Future<Output = RequestProcessingOutcome> + Send>>
Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>
>,

/// Whenever an incoming request arrives, the arrival [`Instant`] is recorded here.
pending_responses_arrival_time: LruCache<RequestId, Instant>,
}

/// Generated by the response builder and waiting to be processed.
enum RequestProcessingOutcome {
Response {
request_id: RequestId,
peer: PeerId,
protocol: Cow<'static, str>,
inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
response: Vec<u8>,
},
Busy {
peer: PeerId,
protocol: Cow<'static, str>,
},
struct RequestProcessingOutcome {
request_id: RequestId,
protocol: Cow<'static, str>,
inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
response: Vec<u8>,
}

impl RequestResponsesBehaviour {
Expand Down Expand Up @@ -357,33 +350,31 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
> {
'poll_all: loop {
// Poll to see if any response is ready to be sent back.
while let Poll::Ready(Some(result)) = self.pending_responses.poll_next_unpin(cx) {
match result {
RequestProcessingOutcome::Response {
request_id, peer, protocol: protocol_name, inner_channel, response
} => {
if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
if let Err(_) = protocol.send_response(inner_channel, Ok(response)) {
self.pending_responses_arrival_time.pop(&request_id);
log::debug!(
target: "sub-libp2p",
"Failed to send response for {:?} on protocol {:?} due to a \
timeout or due to the connection to the peer being closed. \
Dropping response",
request_id, protocol_name,
);
}
}
}
RequestProcessingOutcome::Busy { peer, protocol } => {
// Note: Request is removed from self.pending_responses_arrival_time when
// handling [`InboundFailure::ResponseOmission`].
let out = Event::InboundRequest {
peer,
protocol,
result: Err(ResponseFailure::Busy),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
let RequestProcessingOutcome {
request_id,
protocol: protocol_name,
inner_channel,
response
} = match outcome {
Some(outcome) => outcome,
// The response builder was too busy and thus the request was dropped. This is
// later on reported as a `InboundFailure::Omission`.
None => continue,
};

if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
if let Err(_) = protocol.send_response(inner_channel, Ok(response)) {
// Note: In case this happened due to a timeout, the corresponding
// `RequestResponse` behaviour will emit an `InboundFailure::Timeout` event.
self.pending_responses_arrival_time.pop(&request_id);
log::debug!(
target: "sub-libp2p",
"Failed to send response for {:?} on protocol {:?} due to a \
timeout or due to the connection to the peer being closed. \
Dropping response",
request_id, protocol_name,
);
}
}
}
Expand Down Expand Up @@ -442,8 +433,9 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
// Submit the request to the "response builder" passed by the user at
// initialization.
if let Some(resp_builder) = resp_builder {
// If the response builder is too busy, silently drop `tx`.
// This will be reported as a `Busy` error.
// If the response builder is too busy, silently drop `tx`. This
// will be reported by the corresponding `RequestResponse` through
// an `InboundFailure::Omission` event.
let _ = resp_builder.try_send(IncomingRequest {
peer: peer.clone(),
payload: request,
Expand All @@ -454,13 +446,14 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
let protocol = protocol.clone();
self.pending_responses.push(Box::pin(async move {
// The `tx` created above can be dropped if we are not capable of
// processing this request, which is reflected as a "Busy" error.
// processing this request, which is reflected as a
// `InboundFailure::Omission` event.
if let Ok(response) = rx.await {
RequestProcessingOutcome::Response {
request_id, peer, protocol, inner_channel: channel, response
}
Some(RequestProcessingOutcome {
request_id, protocol, inner_channel: channel, response
})
} else {
RequestProcessingOutcome::Busy { peer, protocol }
None
}
}));

Expand Down Expand Up @@ -565,8 +558,6 @@ pub enum RequestFailure {
/// Error when processing a request sent by a remote.
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum ResponseFailure {
/// Internal response builder is too busy to process this request.
Busy,
/// Problem on the network.
#[display(fmt = "Problem on the network")]
Network(#[error(ignore)] InboundFailure),
Expand Down
3 changes: 1 addition & 2 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1383,12 +1383,11 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
Ok(None) => {},
Err(err) => {
let reason = match err {
ResponseFailure::Busy => "busy",
ResponseFailure::Network(InboundFailure::Timeout) => "timeout",
ResponseFailure::Network(InboundFailure::UnsupportedProtocols) =>
"unsupported",
ResponseFailure::Network(InboundFailure::ResponseOmission) =>
"response-omission",
"busy-omitted",
};

metrics.requests_in_failure_total
Expand Down

0 comments on commit e2cd8f1

Please sign in to comment.