Skip to content

Commit

Permalink
chore: upgrade libp2p to 0.53 (#3671)
Browse files Browse the repository at this point in the history
  • Loading branch information
hanabi1224 authored Nov 21, 2023
1 parent 7b4eef0 commit efcff85
Show file tree
Hide file tree
Showing 11 changed files with 784 additions and 788 deletions.
1,408 changes: 701 additions & 707 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ libc = "0.2"
libipld = { version = "0.16", default-features = false, features = ["dag-cbor", "dag-json", "derive", "serde-codec"] }
libipld-core = { version = "0.16", features = ['arb', 'serde-codec'] }
libipld-macro = "0.16"
libp2p = { version = "0.52.1", default-features = false, features = [
libp2p = { version = "0.53", default-features = false, features = [
'autonat',
'gossipsub',
'kad',
Expand Down Expand Up @@ -134,7 +134,7 @@ pin-project-lite = "0.2"
positioned-io = "0.3.2"
pretty_assertions = "1.3.0"
prometheus = { version = "0.13", features = ["process"] }
prometheus-client = "0.21"
prometheus-client = "0.22"
quick-protobuf = "0.8"
quick-protobuf-codec = "0.2"
rand = "0.8"
Expand Down Expand Up @@ -203,8 +203,8 @@ derive-quickcheck-arbitrary = "0.1.1"
fvm3 = { package = "fvm", default-features = false, version = "~3.8", features = ["arb"] }
fvm_shared3 = { package = "fvm_shared", version = "~3.6", default-features = false, features = ["arb"] }
http-range-header = "0.4.0"
libp2p = { version = "0.52.1", features = ['tcp', 'noise', 'yamux', 'request-response', 'tokio'] }
libp2p-swarm-test = "0.2"
libp2p = { version = "0.53", features = ['tcp', 'noise', 'yamux', 'request-response', 'tokio'] }
libp2p-swarm-test = "0.3"
num-bigint = { version = "0.4", features = ['quickcheck'] }
predicates = "3.0"
proc-macro2 = { version = "1.0.68", default-features = false, features = ["span-locations"] }
Expand Down
2 changes: 1 addition & 1 deletion src/chain_sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ where
}
// Ignore dropping peer on timeout for now. Can't be confident yet that the
// specified timeout is adequate time.
RequestResponseError::Timeout => {
RequestResponseError::Timeout | RequestResponseError::Io(_) => {
peer_manager.log_failure(peer_id, res_duration).await;
}
}
Expand Down
21 changes: 12 additions & 9 deletions src/libp2p/chain_exchange/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

use ahash::HashMap;
use libp2p::{
request_response::{self, OutboundFailure, ProtocolSupport, RequestId, ResponseChannel},
request_response::{
self, OutboundFailure, OutboundRequestId, ProtocolSupport, ResponseChannel,
},
swarm::{derive_prelude::*, NetworkBehaviour, THandlerOutEvent},
PeerId,
};
Expand All @@ -16,8 +18,10 @@ type InnerBehaviour = request_response::Behaviour<ChainExchangeCodec>;

pub struct ChainExchangeBehaviour {
inner: InnerBehaviour,
response_channels:
HashMap<RequestId, flume::Sender<Result<ChainExchangeResponse, RequestResponseError>>>,
response_channels: HashMap<
OutboundRequestId,
flume::Sender<Result<ChainExchangeResponse, RequestResponseError>>,
>,
}

impl ChainExchangeBehaviour {
Expand All @@ -26,7 +30,7 @@ impl ChainExchangeBehaviour {
peer: &PeerId,
request: ChainExchangeRequest,
response_channel: flume::Sender<Result<ChainExchangeResponse, RequestResponseError>>,
) -> RequestId {
) -> OutboundRequestId {
let request_id = self.inner.send_request(peer, request);
self.response_channels.insert(request_id, response_channel);
self.track_metrics();
Expand All @@ -43,7 +47,7 @@ impl ChainExchangeBehaviour {

pub async fn handle_inbound_response(
&mut self,
request_id: &RequestId,
request_id: &OutboundRequestId,
response: ChainExchangeResponse,
) {
if let Some(channel) = self.response_channels.remove(request_id) {
Expand All @@ -57,7 +61,7 @@ impl ChainExchangeBehaviour {
}
}

pub fn on_outbound_error(&mut self, request_id: &RequestId, error: OutboundFailure) {
pub fn on_outbound_error(&mut self, request_id: &OutboundRequestId, error: OutboundFailure) {
self.track_metrics();
if let Some(tx) = self.response_channels.remove(request_id) {
if let Err(err) = tx.send(Err(error.into())) {
Expand Down Expand Up @@ -155,15 +159,14 @@ impl NetworkBehaviour for ChainExchangeBehaviour {
.on_connection_handler_event(peer_id, connection_id, event)
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
fn on_swarm_event(&mut self, event: FromSwarm) {
self.inner.on_swarm_event(event)
}

fn poll(
&mut self,
cx: &mut std::task::Context<'_>,
params: &mut impl PollParameters,
) -> std::task::Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
self.inner.poll(cx, params)
self.inner.poll(cx)
}
}
23 changes: 14 additions & 9 deletions src/libp2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ use libp2p::{
core::Multiaddr,
identify,
identity::{PeerId, PublicKey},
kad::{self, record::store::MemoryStore},
kad::{self, store::MemoryStore},
mdns::{tokio::Behaviour as Mdns, Event as MdnsEvent},
multiaddr::Protocol,
swarm::{
behaviour::toggle::Toggle,
derive_prelude::*,
dial_opts::{DialOpts, PeerCondition},
NetworkBehaviour, PollParameters, ToSwarm,
NetworkBehaviour, ToSwarm,
},
StreamProtocol,
};
Expand Down Expand Up @@ -316,7 +316,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
)
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
fn on_swarm_event(&mut self, event: FromSwarm) {
match &event {
FromSwarm::ConnectionEstablished(e) => {
if e.other_established == 0 {
Expand Down Expand Up @@ -354,7 +354,6 @@ impl NetworkBehaviour for DiscoveryBehaviour {
fn poll(
&mut self,
cx: &mut Context,
params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
// Immediately process the content of `discovered`.
if let Some(ev) = self.pending_events.pop_front() {
Expand Down Expand Up @@ -391,7 +390,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
}

// Poll discovery events.
while let Poll::Ready(ev) = self.discovery.poll(cx, params) {
while let Poll::Ready(ev) = self.discovery.poll(cx) {
match ev {
ToSwarm::GenerateEvent(ev) => {
match &ev {
Expand Down Expand Up @@ -475,6 +474,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
ToSwarm::ExternalAddrExpired(addr) => {
return Poll::Ready(ToSwarm::ExternalAddrExpired(addr))
}
_ => {}
}
}

Expand Down Expand Up @@ -506,7 +506,7 @@ mod tests {
}

let mut b = Swarm::new_ephemeral(|k| new_discovery(k, vec![]));
b.listen().await;
b.listen().with_memory_addr_external().await;
let b_peer_id = *b.local_peer_id();
let b_addresses: Vec<_> = b
.external_addresses()
Expand All @@ -517,11 +517,16 @@ mod tests {
})
.collect();

let mut c = Swarm::new_ephemeral(|k| new_discovery(k, b_addresses.clone()));
c.listen().await;
let mut c = Swarm::new_ephemeral(|k| new_discovery(k, vec![]));
c.listen().with_memory_addr_external().await;
let c_peer_id = *c.local_peer_id();
if let Some(c_kad) = c.behaviour_mut().discovery.kademlia.as_mut() {
for addr in b.external_addresses() {
c_kad.add_address(&b_peer_id, addr.clone());
}
}

let mut a = Swarm::new_ephemeral(|k| new_discovery(k, b_addresses.clone()));
let mut a = Swarm::new_ephemeral(|k| new_discovery(k, b_addresses));

// Bootstrap `a` and `c`
a.behaviour_mut().bootstrap().unwrap();
Expand Down
19 changes: 11 additions & 8 deletions src/libp2p/hello/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use ahash::HashMap;
use libp2p::{
request_response::{self, ProtocolSupport, RequestId, ResponseChannel},
request_response::{self, OutboundRequestId, ProtocolSupport, ResponseChannel},
swarm::{derive_prelude::*, NetworkBehaviour, THandlerOutEvent},
PeerId,
};
Expand All @@ -16,7 +16,7 @@ type InnerBehaviour = request_response::Behaviour<HelloCodec>;

pub struct HelloBehaviour {
inner: InnerBehaviour,
response_channels: HashMap<RequestId, flume::Sender<HelloResponse>>,
response_channels: HashMap<OutboundRequestId, flume::Sender<HelloResponse>>,
}

impl HelloBehaviour {
Expand All @@ -25,7 +25,7 @@ impl HelloBehaviour {
peer: &PeerId,
request: HelloRequest,
response_channel: flume::Sender<HelloResponse>,
) -> RequestId {
) -> OutboundRequestId {
let request_id = self.inner.send_request(peer, request);
self.response_channels.insert(request_id, response_channel);
self.track_metrics();
Expand All @@ -40,7 +40,11 @@ impl HelloBehaviour {
self.inner.send_response(channel, response)
}

pub async fn handle_response(&mut self, request_id: &RequestId, response: HelloResponse) {
pub async fn handle_response(
&mut self,
request_id: &OutboundRequestId,
response: HelloResponse,
) {
if let Some(channel) = self.response_channels.remove(request_id) {
self.track_metrics();
if let Err(err) = channel.send_async(response).await {
Expand All @@ -49,7 +53,7 @@ impl HelloBehaviour {
}
}

pub fn on_error(&mut self, request_id: &RequestId) {
pub fn on_outbound_failure(&mut self, request_id: &OutboundRequestId) {
if self.response_channels.remove(request_id).is_some() {
self.track_metrics();
}
Expand Down Expand Up @@ -140,15 +144,14 @@ impl NetworkBehaviour for HelloBehaviour {
.on_connection_handler_event(peer_id, connection_id, event)
}

fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
fn on_swarm_event(&mut self, event: FromSwarm) {
self.inner.on_swarm_event(event)
}

fn poll(
&mut self,
cx: &mut std::task::Context<'_>,
params: &mut impl PollParameters,
) -> std::task::Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
self.inner.poll(cx, params)
self.inner.poll(cx)
}
}
3 changes: 3 additions & 0 deletions src/libp2p/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub enum RequestResponseError {
ConnectionClosed,
/// The remote supports none of the requested protocols.
UnsupportedProtocols,
/// An IO failure happened on an outbound stream.
Io(io::Error),
}

impl From<OutboundFailure> for RequestResponseError {
Expand All @@ -61,6 +63,7 @@ impl From<OutboundFailure> for RequestResponseError {
OutboundFailure::Timeout => Self::Timeout,
OutboundFailure::ConnectionClosed => Self::ConnectionClosed,
OutboundFailure::UnsupportedProtocols => Self::UnsupportedProtocols,
OutboundFailure::Io(e) => Self::Io(e),
}
}
}
Expand Down
33 changes: 13 additions & 20 deletions src/libp2p/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ use libp2p::{
identity::Keypair,
metrics::{Metrics, Recorder},
multiaddr::Protocol,
noise, ping,
request_response::{self, RequestId, ResponseChannel},
noise, ping, request_response,
swarm::{self, SwarmEvent},
yamux, PeerId, Swarm, Transport,
};
Expand Down Expand Up @@ -108,22 +107,22 @@ pub enum NetworkEvent {
request: HelloRequest,
},
HelloRequestOutbound {
request_id: RequestId,
request_id: request_response::OutboundRequestId,
},
HelloResponseInbound {
request_id: RequestId,
request_id: request_response::OutboundRequestId,
},
ChainExchangeRequestOutbound {
request_id: RequestId,
request_id: request_response::OutboundRequestId,
},
ChainExchangeResponseInbound {
request_id: RequestId,
request_id: request_response::OutboundRequestId,
},
ChainExchangeRequestInbound {
request_id: RequestId,
request_id: request_response::InboundRequestId,
},
ChainExchangeResponseOutbound {
request_id: RequestId,
request_id: request_response::InboundRequestId,
},
PeerConnected(PeerId),
PeerDisconnected(PeerId),
Expand Down Expand Up @@ -650,16 +649,10 @@ async fn handle_hello_event(
peer,
error: _,
} => {
hello.on_error(&request_id);
hello.on_outbound_failure(&request_id);
peer_manager.mark_peer_bad(peer).await;
}
request_response::Event::InboundFailure {
request_id,
peer: _,
error: _,
} => {
hello.on_error(&request_id);
}
request_response::Event::InboundFailure { .. } => {}
request_response::Event::ResponseSent { .. } => (),
}
}
Expand Down Expand Up @@ -697,8 +690,8 @@ async fn handle_chain_exchange_event<DB>(
db: &Arc<ChainStore<DB>>,
network_sender_out: &Sender<NetworkEvent>,
cx_response_tx: Sender<(
RequestId,
ResponseChannel<ChainExchangeResponse>,
request_response::InboundRequestId,
request_response::ResponseChannel<ChainExchangeResponse>,
ChainExchangeResponse,
)>,
) where
Expand Down Expand Up @@ -781,8 +774,8 @@ async fn handle_forest_behaviour_event<DB>(
genesis_cid: &Cid,
network_sender_out: &Sender<NetworkEvent>,
cx_response_tx: Sender<(
RequestId,
ResponseChannel<ChainExchangeResponse>,
request_response::InboundRequestId,
request_response::ResponseChannel<ChainExchangeResponse>,
ChainExchangeResponse,
)>,
pubsub_block_str: &str,
Expand Down
Loading

0 comments on commit efcff85

Please sign in to comment.