Skip to content

Commit

Permalink
protocols/*: Update
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Aug 19, 2021
1 parent 9262c03 commit 62c5e13
Show file tree
Hide file tree
Showing 11 changed files with 48 additions and 34 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ libp2p-pnet = { version = "0.21.0", path = "transports/pnet", optional = true }
libp2p-relay = { version = "0.4.0", path = "protocols/relay", optional = true }
libp2p-request-response = { version = "0.13.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.31.0", path = "swarm" }
libp2p-swarm-derive = { version = "0.24.0", path = "swarm-derive" }
libp2p-swarm-derive = { version = "0.25.0", path = "swarm-derive" }
libp2p-uds = { version = "0.30.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.30.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.34.0", path = "muxers/yamux", optional = true }
Expand Down
6 changes: 3 additions & 3 deletions core/tests/connection_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ fn max_outgoing() {
.dial(Multiaddr::empty(), Vec::new(), TestHandler())
.expect_err("Unexpected dialing success.")
{
DialError::ConnectionLimit(err) => {
assert_eq!(err.current, outgoing_limit);
assert_eq!(err.limit, outgoing_limit);
DialError::ConnectionLimit{limit, handler: _} => {
assert_eq!(limit.current, outgoing_limit);
assert_eq!(limit.limit, outgoing_limit);
}
e => panic!("Unexpected error: {:?}", e),
}
Expand Down
6 changes: 3 additions & 3 deletions misc/metrics/src/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ enum PendingConnectionError {
InvalidPeerId,
TransportErrorMultiaddrNotSupported,
TransportErrorOther,
ConnectionLimit,
Aborted,
Io,
}

Expand All @@ -248,8 +248,8 @@ impl<TTransErr> From<&libp2p_core::connection::PendingConnectionError<TTransErr>
libp2p_core::connection::PendingConnectionError::Transport(
libp2p_core::transport::TransportError::Other(_),
) => PendingConnectionError::TransportErrorOther,
libp2p_core::connection::PendingConnectionError::ConnectionLimit(_) => {
PendingConnectionError::ConnectionLimit
libp2p_core::connection::PendingConnectionError::Aborted => {
PendingConnectionError::Aborted
}
libp2p_core::connection::PendingConnectionError::IO(_) => PendingConnectionError::Io,
}
Expand Down
3 changes: 2 additions & 1 deletion protocols/floodsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,11 @@ impl Floodsub {
}

if self.target_peers.insert(peer_id) {
let handler = self.new_handler();
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id,
condition: DialPeerCondition::Disconnected,
handler: self.new_handler(),
handler,
});
}
}
Expand Down
10 changes: 7 additions & 3 deletions protocols/identify/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ use libp2p_core::{
ConnectedPoint, Multiaddr, PeerId, PublicKey,
};
use libp2p_swarm::{
AddressScore, DialPeerCondition, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction,
NotifyHandler, PollParameters, ProtocolsHandler, ProtocolsHandlerUpgrErr,
AddressScore, DialPeerCondition, IntoProtocolsHandler, NegotiatedSubstream, NetworkBehaviour,
NetworkBehaviourAction, NotifyHandler, PollParameters, ProtocolsHandler,
ProtocolsHandlerUpgrErr,
};
use std::{
collections::{HashMap, HashSet, VecDeque},
Expand Down Expand Up @@ -173,9 +174,11 @@ impl Identify {
for p in peers {
if self.pending_push.insert(p) {
if !self.connected.contains_key(&p) {
let handler = self.new_handler();
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: p,
condition: DialPeerCondition::Disconnected,
handler,
});
}
}
Expand Down Expand Up @@ -213,13 +216,14 @@ impl NetworkBehaviour for Identify {
peer_id: &PeerId,
conn: &ConnectionId,
_: &ConnectedPoint,
_: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
) {
if let Some(addrs) = self.connected.get_mut(peer_id) {
addrs.remove(conn);
}
}

fn inject_dial_failure(&mut self, peer_id: &PeerId) {
fn inject_dial_failure(&mut self, peer_id: &PeerId, _: Self::ProtocolsHandler) {
if !self.connected.contains_key(peer_id) {
self.pending_push.remove(peer_id);
}
Expand Down
7 changes: 4 additions & 3 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ impl KademliaConfig {
impl<TStore> Kademlia<TStore>
where
for<'a> TStore: RecordStore<'a>,
TStore: Send + 'static,
{
/// Creates a new `Kademlia` network behaviour with a default configuration.
pub fn new(id: PeerId, store: TStore) -> Self {
Expand Down Expand Up @@ -561,7 +562,7 @@ where
RoutingUpdate::Failed
}
kbucket::InsertResult::Pending { disconnected } => {
let handler = self.new_handler(),
let handler = self.new_handler();
self.queued_events
.push_back(NetworkBehaviourAction::DialPeer {
peer_id: disconnected.into_preimage(),
Expand Down Expand Up @@ -1142,7 +1143,7 @@ where
//
// Only try dialing peer if not currently connected.
if !self.connected_peers.contains(disconnected.preimage()) {
let handler = self.new_handler(),
let handler = self.new_handler();
self.queued_events
.push_back(NetworkBehaviourAction::DialPeer {
peer_id: disconnected.into_preimage(),
Expand Down Expand Up @@ -2258,7 +2259,7 @@ where
});
} else if &peer_id != self.kbuckets.local_key().preimage() {
query.inner.pending_rpcs.push((peer_id, event));
let handler = self.new_handler(),
let handler = self.new_handler();
self.queued_events
.push_back(NetworkBehaviourAction::DialPeer {
peer_id,
Expand Down
3 changes: 2 additions & 1 deletion protocols/relay/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,11 +474,12 @@ impl NetworkBehaviour for Relay {
src_connection_id: connection,
},
);
let handler = self.new_handler();
self.outbox_to_swarm
.push_back(NetworkBehaviourAction::DialPeer {
peer_id: dest_id,
condition: DialPeerCondition::NotDialing,
handler: self.new_handler(),
handler,
});
} else {
self.outbox_to_swarm
Expand Down
2 changes: 2 additions & 0 deletions swarm-derive/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# 0.25.0 [unreleased]

# 0.24.0 [2021-07-12]

- Handle `NetworkBehaviourAction::CloseConnection`. See [PR 2110] for details.
Expand Down
2 changes: 1 addition & 1 deletion swarm-derive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "libp2p-swarm-derive"
edition = "2018"
description = "Procedural macros of libp2p-core"
version = "0.24.0"
version = "0.25.0"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
12 changes: 6 additions & 6 deletions swarm-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,11 +518,11 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
loop {
match #trait_to_impl::poll(&mut #field_name, cx, poll_params) {
#generate_event_match_arm
std::task::Poll::Ready(#network_behaviour_action::DialAddress { address }) => {
return std::task::Poll::Ready(#network_behaviour_action::DialAddress { address });
std::task::Poll::Ready(#network_behaviour_action::DialAddress { address, handler }) => {
return std::task::Poll::Ready(#network_behaviour_action::DialAddress { address, handler });
}
std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition }) => {
return std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition });
std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition, handler }) => {
return std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition, handler });
}
std::task::Poll::Ready(#network_behaviour_action::NotifyHandler { peer_id, handler, event }) => {
return std::task::Poll::Ready(#network_behaviour_action::NotifyHandler {
Expand Down Expand Up @@ -586,7 +586,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
#(#inject_addr_reach_failure_stmts);*
}

fn inject_dial_failure(&mut self, peer_id: &#peer_id) {
fn inject_dial_failure(&mut self, peer_id: &#peer_id, handler: Self::ProtocolsHandler) {
#(#inject_dial_failure_stmts);*
}

Expand Down Expand Up @@ -629,7 +629,7 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
}
}

fn poll(&mut self, cx: &mut std::task::Context, poll_params: &mut impl #poll_parameters) -> std::task::Poll<#network_behaviour_action<<<Self::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::InEvent, Self::OutEvent>> {
fn poll(&mut self, cx: &mut std::task::Context, poll_params: &mut impl #poll_parameters) -> std::task::Poll<#network_behaviour_action<Self::OutEvent, Self::ProtocolsHandler>> {
use libp2p::futures::prelude::*;
#(#poll_stmts)*
let f: std::task::Poll<#network_behaviour_action<<<Self::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::InEvent, Self::OutEvent>> = #poll_method;
Expand Down
29 changes: 17 additions & 12 deletions swarm/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
/// The next action to return from `poll`.
///
/// An action is only returned once.
pub next_action: Option<NetworkBehaviourAction<THandler::InEvent, TOutEvent>>,
pub next_action: Option<NetworkBehaviourAction<TOutEvent, THandler>>,
}

impl<THandler, TOutEvent> MockBehaviour<THandler, TOutEvent>
Expand Down Expand Up @@ -84,7 +84,7 @@ where
&mut self,
_: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<THandler::InEvent, Self::OutEvent>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
self.next_action.take().map_or(Poll::Pending, Poll::Ready)
}
}
Expand Down Expand Up @@ -202,10 +202,16 @@ where
self.inner.inject_disconnected(peer);
}

fn inject_connection_closed(&mut self, p: &PeerId, c: &ConnectionId, e: &ConnectedPoint) {
fn inject_connection_closed(
&mut self,
p: &PeerId,
c: &ConnectionId,
e: &ConnectedPoint,
handler: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
) {
self.inject_connection_closed
.push((p.clone(), c.clone(), e.clone()));
self.inner.inject_connection_closed(p, c, e);
self.inner.inject_connection_closed(p, c, e, handler);
}

fn inject_event(
Expand All @@ -228,9 +234,9 @@ where
self.inner.inject_addr_reach_failure(p, a, e);
}

fn inject_dial_failure(&mut self, p: &PeerId) {
fn inject_dial_failure(&mut self, p: &PeerId, handler: Self::ProtocolsHandler) {
self.inject_dial_failure.push(p.clone());
self.inner.inject_dial_failure(p);
self.inner.inject_dial_failure(p, handler);
}

fn inject_new_listener(&mut self, id: ListenerId) {
Expand Down Expand Up @@ -268,12 +274,11 @@ where
self.inner.inject_listener_closed(l, r);
}

fn poll(&mut self, cx: &mut Context, args: &mut impl PollParameters) ->
Poll<NetworkBehaviourAction<
<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
Self::OutEvent
>>
{
fn poll(
&mut self,
cx: &mut Context,
args: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
self.poll += 1;
self.inner.poll(cx, args)
}
Expand Down

0 comments on commit 62c5e13

Please sign in to comment.