Skip to content

Commit

Permalink
Don't close connection in OneShotHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed Oct 24, 2023
1 parent b709a40 commit c40cafe
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 21 deletions.
11 changes: 8 additions & 3 deletions protocols/floodsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ use libp2p_identity::PeerId;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm};
use libp2p_swarm::{
dial_opts::DialOpts, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler,
OneShotHandler, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
OneShotHandler, PollParameters, StreamUpgradeError, THandler, THandlerInEvent,

Check failure on line 34 in protocols/floodsub/src/layer.rs

View workflow job for this annotation

GitHub Actions / Compile on wasm32-unknown-unknown

unused import: `StreamUpgradeError`

Check failure on line 34 in protocols/floodsub/src/layer.rs

View workflow job for this annotation

GitHub Actions / Compile on wasm32-unknown-emscripten

unused import: `StreamUpgradeError`

Check failure on line 34 in protocols/floodsub/src/layer.rs

View workflow job for this annotation

GitHub Actions / Compile on wasm32-wasi

unused import: `StreamUpgradeError`

Check failure on line 34 in protocols/floodsub/src/layer.rs

View workflow job for this annotation

GitHub Actions / Check rustdoc intra-doc links

unused import: `StreamUpgradeError`

Check failure on line 34 in protocols/floodsub/src/layer.rs

View workflow job for this annotation

GitHub Actions / clippy (nightly-2023-09-10)

unused import: `StreamUpgradeError`
THandlerOutEvent, ToSwarm,
};
use log::warn;
use smallvec::SmallVec;
Expand Down Expand Up @@ -359,8 +360,12 @@ impl NetworkBehaviour for Floodsub {
) {
// We ignore successful sends or timeouts.
let event = match event {
InnerMessage::Rx(event) => event,
InnerMessage::Sent => return,
Ok(InnerMessage::Rx(event)) => event,
Ok(InnerMessage::Sent) => return,
Err(e) => {
log::debug!("Failed to send floodsub message: {e}");
return;
}
};

// Update connected peers topics
Expand Down
3 changes: 3 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
See [PR 4225](https://github.com/libp2p/rust-libp2p/pull/4225).
- Remove deprecated `keep_alive_timeout` in `OneShotHandlerConfig`.
See [PR 4677](https://github.com/libp2p/rust-libp2p/pull/4677).
- Don't close entire connection upon `DialFailure`s within `OneShotHandler`.
Instead, the error is reported as `Err(e)` via `ConnectionHandler::ToBehaviour`.
See [PR XXXX](https://github.com/libp2p/rust-libp2p/pull/XXXX).

## 0.43.6

Expand Down
26 changes: 8 additions & 18 deletions swarm/src/handler/one_shot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

use crate::handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, StreamUpgradeError,
SubstreamProtocol,
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, SubstreamProtocol,
};
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend};
use crate::StreamUpgradeError;
use smallvec::SmallVec;
use std::{error, fmt::Debug, task::Context, task::Poll, time::Duration};

Expand All @@ -35,10 +35,8 @@ where
{
/// The upgrade for inbound substreams.
listen_protocol: SubstreamProtocol<TInbound, ()>,
/// If `Some`, something bad happened and we should shut down the handler with an error.
pending_error: Option<StreamUpgradeError<<TOutbound as OutboundUpgradeSend>::Error>>,
/// Queue of events to produce in `poll()`.
events_out: SmallVec<[TEvent; 4]>,
events_out: SmallVec<[Result<TEvent, StreamUpgradeError<TOutbound::Error>>; 4]>,
/// Queue of outbound substreams to open.
dial_queue: SmallVec<[TOutbound; 4]>,
/// Current number of concurrent outbound substreams being opened.
Expand All @@ -60,7 +58,6 @@ where
) -> Self {
OneShotHandler {
listen_protocol,
pending_error: None,
events_out: SmallVec::new(),
dial_queue: SmallVec::new(),
dial_negotiated: 0,
Expand Down Expand Up @@ -121,8 +118,8 @@ where
TEvent: Debug + Send + 'static,
{
type FromBehaviour = TOutbound;
type ToBehaviour = TEvent;
type Error = StreamUpgradeError<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>;
type ToBehaviour = Result<TEvent, StreamUpgradeError<TOutbound::Error>>;
type Error = void::Void;
type InboundProtocol = TInbound;
type OutboundProtocol = TOutbound;
type OutboundOpenInfo = ();
Expand Down Expand Up @@ -151,10 +148,6 @@ where
Self::Error,
>,
> {
if let Some(err) = self.pending_error.take() {
return Poll::Ready(ConnectionHandlerEvent::Close(err));
}

if !self.events_out.is_empty() {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
self.events_out.remove(0),
Expand Down Expand Up @@ -197,20 +190,17 @@ where
protocol: out,
..
}) => {
self.events_out.push(out.into());
self.events_out.push(Ok(out.into()));
}
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol: out,
..
}) => {
self.dial_negotiated -= 1;
self.events_out.push(out.into());
self.events_out.push(Ok(out.into()));
}
ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => {
if self.pending_error.is_none() {
log::debug!("DialUpgradeError: {error}");
self.keep_alive = KeepAlive::No;
}
self.events_out.push(Err(error));
}
ConnectionEvent::AddressChange(_)
| ConnectionEvent::ListenUpgradeError(_)
Expand Down

0 comments on commit c40cafe

Please sign in to comment.