From ab26a0b7aad190684050810c5a0a0253448a07de Mon Sep 17 00:00:00 2001 From: acerone85 Date: Tue, 15 Oct 2024 21:03:51 +0100 Subject: [PATCH 01/23] Add a module for versioning the request response protocols in the p2p_service --- crates/services/p2p/src/codecs/postcard.rs | 2 +- crates/services/p2p/src/p2p_service.rs | 1 - crates/services/p2p/src/request_response.rs | 2 + .../p2p/src/request_response/protocols.rs | 112 ++++++++++++++++++ 4 files changed, 115 insertions(+), 2 deletions(-) create mode 100644 crates/services/p2p/src/request_response/protocols.rs diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 94f23cd6fd2..1ec714b5f72 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -166,7 +166,7 @@ impl NetworkCodec for PostcardCodec { } } -#[derive(Default, Debug, Clone)] +#[derive(Default, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] pub struct MessageExchangePostcardProtocol; impl AsRef for MessageExchangePostcardProtocol { diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index eb5a5a75a6c..fdab0a51af2 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -719,7 +719,6 @@ impl FuelP2PService { let mut addresses = info.listen_addrs; let agent_version = info.agent_version; - if addresses.len() > MAX_IDENTIFY_ADDRESSES { let protocol_version = info.protocol_version; debug!( diff --git a/crates/services/p2p/src/request_response.rs b/crates/services/p2p/src/request_response.rs index ba63992f3cb..0fdacb37e66 100644 --- a/crates/services/p2p/src/request_response.rs +++ b/crates/services/p2p/src/request_response.rs @@ -1 +1,3 @@ pub mod messages; + +pub mod protocols; diff --git a/crates/services/p2p/src/request_response/protocols.rs b/crates/services/p2p/src/request_response/protocols.rs new file mode 100644 index 00000000000..7c3a3af39ae --- /dev/null +++ b/crates/services/p2p/src/request_response/protocols.rs @@ -0,0 +1,112 @@ +//! This module contains structs and functions for versioning +//! request response protocols, and for recovering the list +//! of different versions of the protocol understood by +//! connected peers. + +use libp2p::{ + identify, + StreamProtocol, +}; + +use crate::codecs::postcard::MessageExchangePostcardProtocol; + +use super::messages::REQUEST_RESPONSE_PROTOCOL_ID; + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub enum ProtocolVersion { + V1(MessageExchangePostcardProtocol), +} + +impl ProtocolVersion { + pub fn as_ref(&self) -> &str { + match self { + ProtocolVersion::V1(protocol) => protocol.as_ref(), + } + } + + pub fn latest_compatible_version_for_peer( + info: identify::Info, + ) -> Option { + info.protocols + .iter() + .filter_map(|protocol| ProtocolVersion::try_from(protocol.clone()).ok()) + .max() + } +} + +impl TryFrom for ProtocolVersion { + // TODO: Better error type + type Error = (); + + fn try_from(protocol: StreamProtocol) -> Result { + match protocol.as_ref() { + REQUEST_RESPONSE_PROTOCOL_ID => { + Ok(ProtocolVersion::V1(MessageExchangePostcardProtocol)) + } + _ => Err(()), + } + } +} + +#[cfg(test)] +mod tests { + use libp2p::{ + identify::{ + self, + }, + identity::PublicKey, + Multiaddr, + StreamProtocol, + }; + + use crate::{ + codecs::postcard::MessageExchangePostcardProtocol, + heartbeat::HEARTBEAT_PROTOCOL, + request_response::protocols::ProtocolVersion, + }; + + fn peer_info<'a>(protocols: &[impl AsRef]) -> identify::Info { + let public_key = PublicKey::try_decode_protobuf(&hex::decode( + "080112201ed1e8fae2c4a144b8be8fd4b47bf3d3b34b871c3cacf6010f0e42d474fce27e", + ).unwrap()).unwrap(); + + let mut stream_protocols: Vec = + Vec::with_capacity(protocols.len()); + for protocol in protocols { + stream_protocols.push( + StreamProtocol::try_from_owned(protocol.as_ref().to_string()).unwrap(), + ); + } + + identify::Info { + protocols: stream_protocols, + agent_version: "0.0.1".to_string(), + protocol_version: "0.0.1".to_string(), + public_key, + listen_addrs: vec![], + observed_addr: Multiaddr::empty(), + } + } + + #[test] + fn test_latest_protocol_version_defined() { + let peer_info = + peer_info(&[MessageExchangePostcardProtocol.as_ref(), HEARTBEAT_PROTOCOL]); + let latest_compatible_version_for_peer = + ProtocolVersion::latest_compatible_version_for_peer(peer_info).unwrap(); + assert_eq!( + latest_compatible_version_for_peer, + crate::request_response::protocols::ProtocolVersion::V1( + MessageExchangePostcardProtocol + ) + ); + } + + #[test] + fn test_latest_protocol_version_undefined() { + let peer_info = peer_info(&[HEARTBEAT_PROTOCOL, "/some/other/protocol/1.0.0"]); + let latest_compatible_version_for_peer = + ProtocolVersion::latest_compatible_version_for_peer(peer_info); + assert!(latest_compatible_version_for_peer.is_none(),); + } +} From c9ba07417a4670bc05a4e79b5e42b1482320eeb6 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Tue, 15 Oct 2024 21:20:39 +0100 Subject: [PATCH 02/23] Record the latest compatible version for the request response protocol in the PeerManager table --- crates/services/p2p/src/p2p_service.rs | 4 +++ crates/services/p2p/src/peer_manager.rs | 25 +++++++++++++++++++ crates/services/p2p/src/request_response.rs | 2 ++ .../p2p/src/request_response/protocols.rs | 6 ++--- crates/services/p2p/src/service.rs | 2 ++ 5 files changed, 36 insertions(+), 3 deletions(-) diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index fdab0a51af2..1f133a92640 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -26,6 +26,7 @@ use crate::{ Punisher, }, peer_report::PeerReportEvent, + request_response as fuel_request_response, request_response::messages::{ RequestError, RequestMessage, @@ -717,6 +718,8 @@ impl FuelP2PService { identify::Event::Received { peer_id, info } => { self.update_metrics(increment_unique_peers); + let request_response_protocol_version = + fuel_request_response::ProtocolVersion::latest_compatible_version_for_peer(&info); let mut addresses = info.listen_addrs; let agent_version = info.agent_version; if addresses.len() > MAX_IDENTIFY_ADDRESSES { @@ -733,6 +736,7 @@ impl FuelP2PService { &peer_id, addresses.clone(), agent_version, + request_response_protocol_version, ); self.swarm diff --git a/crates/services/p2p/src/peer_manager.rs b/crates/services/p2p/src/peer_manager.rs index 1c4cf6251d9..28e292b990a 100644 --- a/crates/services/p2p/src/peer_manager.rs +++ b/crates/services/p2p/src/peer_manager.rs @@ -31,6 +31,10 @@ use tracing::{ use crate::{ gossipsub_config::GRAYLIST_THRESHOLD, peer_manager::heartbeat_data::HeartbeatData, + request_response::{ + self, + ProtocolVersion, + }, }; pub mod heartbeat_data; @@ -45,6 +49,9 @@ pub struct PeerInfo { pub client_version: Option, pub heartbeat_data: HeartbeatData, pub score: AppScore, + /// The latest protocol version that the peer supports and that is + /// compatible with the current version of the node + pub request_response_protocol_version: Option, } impl PeerInfo { @@ -54,6 +61,7 @@ impl PeerInfo { client_version: None, heartbeat_data: HeartbeatData::new(heartbeat_avg_window), score: DEFAULT_APP_SCORE, + request_response_protocol_version: None, } } } @@ -135,10 +143,14 @@ impl PeerManager { peer_id: &PeerId, addresses: Vec, agent_version: String, + protocol_version: Option, ) { let peers = self.get_assigned_peer_table_mut(peer_id); insert_client_version(peers, peer_id, agent_version); insert_peer_addresses(peers, peer_id, addresses); + protocol_version.into_iter().for_each(|protocol| { + update_request_response_protocol_version(peers, peer_id, protocol) + }); } pub fn batch_update_score_with_decay(&mut self) { @@ -356,6 +368,19 @@ fn insert_client_version( } } +// Updates the latest request response protocol version that the peer supports +fn update_request_response_protocol_version( + peers: &mut HashMap, + peer_id: &PeerId, + protocol: ProtocolVersion, +) { + if let Some(peer) = peers.get_mut(peer_id) { + peer.request_response_protocol_version = Some(protocol); + } else { + log_missing_peer(peer_id); + } +} + fn log_missing_peer(peer_id: &PeerId) { debug!(target: "fuel-p2p", "Peer with PeerId: {:?} is not among the connected peers", peer_id) } diff --git a/crates/services/p2p/src/request_response.rs b/crates/services/p2p/src/request_response.rs index 0fdacb37e66..21e7c503087 100644 --- a/crates/services/p2p/src/request_response.rs +++ b/crates/services/p2p/src/request_response.rs @@ -1,3 +1,5 @@ pub mod messages; pub mod protocols; + +pub use protocols::ProtocolVersion; diff --git a/crates/services/p2p/src/request_response/protocols.rs b/crates/services/p2p/src/request_response/protocols.rs index 7c3a3af39ae..1fa7e6f0ba8 100644 --- a/crates/services/p2p/src/request_response/protocols.rs +++ b/crates/services/p2p/src/request_response/protocols.rs @@ -25,7 +25,7 @@ impl ProtocolVersion { } pub fn latest_compatible_version_for_peer( - info: identify::Info, + info: &identify::Info, ) -> Option { info.protocols .iter() @@ -93,7 +93,7 @@ mod tests { let peer_info = peer_info(&[MessageExchangePostcardProtocol.as_ref(), HEARTBEAT_PROTOCOL]); let latest_compatible_version_for_peer = - ProtocolVersion::latest_compatible_version_for_peer(peer_info).unwrap(); + ProtocolVersion::latest_compatible_version_for_peer(&peer_info).unwrap(); assert_eq!( latest_compatible_version_for_peer, crate::request_response::protocols::ProtocolVersion::V1( @@ -106,7 +106,7 @@ mod tests { fn test_latest_protocol_version_undefined() { let peer_info = peer_info(&[HEARTBEAT_PROTOCOL, "/some/other/protocol/1.0.0"]); let latest_compatible_version_for_peer = - ProtocolVersion::latest_compatible_version_for_peer(peer_info); + ProtocolVersion::latest_compatible_version_for_peer(&peer_info); assert!(latest_compatible_version_for_peer.is_none(),); } } diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 3ff09da0beb..1ca18f05a04 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -1534,6 +1534,7 @@ pub mod tests { client_version: None, heartbeat_data, score: 100.0, + request_response_protocol_version: None, }; let peer_info = vec![(peer_id, peer_info)]; let p2p_service = FakeP2PService { @@ -1624,6 +1625,7 @@ pub mod tests { client_version: None, heartbeat_data, score: 100.0, + request_response_protocol_version: None, }; let peer_info = vec![(peer_id, peer_info)]; let p2p_service = FakeP2PService { From b3d321928024b71aa44590475312c2d15e5ba2dc Mon Sep 17 00:00:00 2001 From: acerone85 Date: Tue, 15 Oct 2024 21:53:42 +0100 Subject: [PATCH 03/23] P2PService uses peer manager information to determine the version of the request response protocol to use --- crates/services/p2p/src/p2p_service.rs | 48 ++++++++++++++++++-------- 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 1f133a92640..dcb54c1b34d 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -4,7 +4,10 @@ use crate::{ FuelBehaviourEvent, }, codecs::{ - postcard::PostcardCodec, + postcard::{ + MessageExchangePostcardProtocol, + PostcardCodec, + }, GossipsubCodec, }, config::{ @@ -26,14 +29,16 @@ use crate::{ Punisher, }, peer_report::PeerReportEvent, - request_response as fuel_request_response, - request_response::messages::{ - RequestError, - RequestMessage, - ResponseError, - ResponseMessage, - ResponseSendError, - ResponseSender, + request_response::{ + self as fuel_request_response, + messages::{ + RequestError, + RequestMessage, + ResponseError, + ResponseMessage, + ResponseSendError, + ResponseSender, + }, }, TryPeerId, }; @@ -373,14 +378,27 @@ impl FuelP2PService { } }; - let request_id = self - .swarm - .behaviour_mut() - .send_request_msg(message_request, &peer_id); + let latest_compatible_request_response_protocol_version = self + .peer_manager + .get_peer_info(&peer_id) + .map(|peer_info| peer_info.request_response_protocol_version.as_ref()) + .flatten() + .unwrap_or(&fuel_request_response::ProtocolVersion::V1( + MessageExchangePostcardProtocol, + )); + + match latest_compatible_request_response_protocol_version { + fuel_request_response::ProtocolVersion::V1(_) => { + let request_id = self + .swarm + .behaviour_mut() + .send_request_msg(message_request, &peer_id); - self.outbound_requests_table.insert(request_id, on_response); + self.outbound_requests_table.insert(request_id, on_response); - Ok(request_id) + Ok(request_id) + } + } } /// Sends ResponseMessage to a peer that requested the data From 8817d88fcbfe867746f5e6a9199103cb1796fd02 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Wed, 16 Oct 2024 07:29:26 +0100 Subject: [PATCH 04/23] Remove argument from Protocol::V1 and minor improvements to tests --- crates/services/p2p/src/p2p_service.rs | 6 ++--- .../p2p/src/request_response/protocols.rs | 27 +++++++++---------- 2 files changed, 14 insertions(+), 19 deletions(-) diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index dcb54c1b34d..75c0a05473e 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -383,12 +383,10 @@ impl FuelP2PService { .get_peer_info(&peer_id) .map(|peer_info| peer_info.request_response_protocol_version.as_ref()) .flatten() - .unwrap_or(&fuel_request_response::ProtocolVersion::V1( - MessageExchangePostcardProtocol, - )); + .unwrap_or_default(); match latest_compatible_request_response_protocol_version { - fuel_request_response::ProtocolVersion::V1(_) => { + fuel_request_response::ProtocolVersion::V1 => { let request_id = self .swarm .behaviour_mut() diff --git a/crates/services/p2p/src/request_response/protocols.rs b/crates/services/p2p/src/request_response/protocols.rs index 1fa7e6f0ba8..a680bebd795 100644 --- a/crates/services/p2p/src/request_response/protocols.rs +++ b/crates/services/p2p/src/request_response/protocols.rs @@ -8,22 +8,22 @@ use libp2p::{ StreamProtocol, }; -use crate::codecs::postcard::MessageExchangePostcardProtocol; - use super::messages::REQUEST_RESPONSE_PROTOCOL_ID; #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub enum ProtocolVersion { - V1(MessageExchangePostcardProtocol), + /// The Version 1 of the protocol. This version does not have error codes + /// in the response messages. + V1, } -impl ProtocolVersion { - pub fn as_ref(&self) -> &str { - match self { - ProtocolVersion::V1(protocol) => protocol.as_ref(), - } +impl Default for &ProtocolVersion { + fn default() -> Self { + &ProtocolVersion::V1 } +} +impl ProtocolVersion { pub fn latest_compatible_version_for_peer( info: &identify::Info, ) -> Option { @@ -40,9 +40,7 @@ impl TryFrom for ProtocolVersion { fn try_from(protocol: StreamProtocol) -> Result { match protocol.as_ref() { - REQUEST_RESPONSE_PROTOCOL_ID => { - Ok(ProtocolVersion::V1(MessageExchangePostcardProtocol)) - } + REQUEST_RESPONSE_PROTOCOL_ID => Ok(ProtocolVersion::V1), _ => Err(()), } } @@ -66,9 +64,10 @@ mod tests { }; fn peer_info<'a>(protocols: &[impl AsRef]) -> identify::Info { + // This public key is valid, it has been copied from libp2p tests. let public_key = PublicKey::try_decode_protobuf(&hex::decode( "080112201ed1e8fae2c4a144b8be8fd4b47bf3d3b34b871c3cacf6010f0e42d474fce27e", - ).unwrap()).unwrap(); + ).expect("Decoding hexadecimal string cannot fail")).expect("Decoding valid public key cannot fail"); let mut stream_protocols: Vec = Vec::with_capacity(protocols.len()); @@ -96,9 +95,7 @@ mod tests { ProtocolVersion::latest_compatible_version_for_peer(&peer_info).unwrap(); assert_eq!( latest_compatible_version_for_peer, - crate::request_response::protocols::ProtocolVersion::V1( - MessageExchangePostcardProtocol - ) + crate::request_response::protocols::ProtocolVersion::V1 ); } From 185535092983bbc76c6f6b03517277b0be684ab8 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Wed, 16 Oct 2024 07:36:26 +0100 Subject: [PATCH 05/23] Add changelog entry --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fb76386829..7d4fe31fa09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Added - [2321](https://github.com/FuelLabs/fuel-core/pull/2321): New metrics for the txpool: "The size of transactions in the txpool" (`txpool_tx_size`), "The time spent by a transaction in the txpool in seconds" (`txpool_tx_time_in_txpool_seconds`), The number of transactions in the txpool (`txpool_number_of_transactions`), "The number of transactions pending verification before entering the txpool" (`txpool_number_of_transactions_pending_verification`), "The number of executable transactions in the txpool" (`txpool_number_of_executable_transactions`), "The time it took to select transactions for inclusion in a block in nanoseconds" (`txpool_select_transaction_time_nanoseconds`), The time it took to insert a transaction in the txpool in milliseconds (`txpool_insert_transaction_time_milliseconds`). +### Changed +- [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Preparation work for supporting multiple request response protocols in the P2P service. Information about request exchange protocols known by a peer is saved in the peer manager upon connection. Currently we only have `/fuel/req_res/0.0.1`, but a node will select the latest compatible version of the protocol that is known by a peer for sending messages. + ## [Version 0.40.0] ### Added From 1fa98a4ab9948e141e1b9c96156a2a940d5a7b4a Mon Sep 17 00:00:00 2001 From: acerone85 Date: Wed, 16 Oct 2024 07:46:50 +0100 Subject: [PATCH 06/23] Fix compilation error --- crates/services/p2p/src/p2p_service.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 75767575ca8..64afc63bca1 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -4,10 +4,7 @@ use crate::{ FuelBehaviourEvent, }, codecs::{ - postcard::{ - MessageExchangePostcardProtocol, - PostcardCodec, - }, + postcard::PostcardCodec, GossipsubCodec, }, config::{ From 6cebf0e18821c847b8a83efad412895e8f5a23f9 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Wed, 16 Oct 2024 08:59:30 +0100 Subject: [PATCH 07/23] Placate Clippy --- crates/services/p2p/src/p2p_service.rs | 3 +-- crates/services/p2p/src/request_response/protocols.rs | 2 +- crates/services/p2p/src/service.rs | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 64afc63bca1..b19289a79c9 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -420,8 +420,7 @@ impl FuelP2PService { let latest_compatible_request_response_protocol_version = self .peer_manager .get_peer_info(&peer_id) - .map(|peer_info| peer_info.request_response_protocol_version.as_ref()) - .flatten() + .and_then(|peer_info| peer_info.request_response_protocol_version.as_ref()) .unwrap_or_default(); match latest_compatible_request_response_protocol_version { diff --git a/crates/services/p2p/src/request_response/protocols.rs b/crates/services/p2p/src/request_response/protocols.rs index a680bebd795..bdb20ec91a4 100644 --- a/crates/services/p2p/src/request_response/protocols.rs +++ b/crates/services/p2p/src/request_response/protocols.rs @@ -63,7 +63,7 @@ mod tests { request_response::protocols::ProtocolVersion, }; - fn peer_info<'a>(protocols: &[impl AsRef]) -> identify::Info { + fn peer_info(protocols: &[impl AsRef]) -> identify::Info { // This public key is valid, it has been copied from libp2p tests. let public_key = PublicKey::try_decode_protobuf(&hex::decode( "080112201ed1e8fae2c4a144b8be8fd4b47bf3d3b34b871c3cacf6010f0e42d474fce27e", diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 1ca18f05a04..533b9a8656e 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -967,7 +967,7 @@ where #[derive(Clone)] pub struct SharedState { - /// Sender of p2p with peer gossip subscription (vec represent the peer_id) + /// Sender of p2p with peer gossip subscription (`Vec` represents the peer_id) new_tx_subscription_broadcast: broadcast::Sender, /// Sender of p2p transaction used for subscribing. tx_broadcast: broadcast::Sender, From 939939db46f83f4a98a3170f64a7a41e927d406e Mon Sep 17 00:00:00 2001 From: acerone85 Date: Wed, 16 Oct 2024 11:28:54 +0100 Subject: [PATCH 08/23] Version PostcardCodec --- crates/services/p2p/src/behavior.rs | 20 +++++++++------ crates/services/p2p/src/codecs/postcard.rs | 25 +++++++++++-------- crates/services/p2p/src/p2p_service.rs | 8 +++--- .../p2p/src/request_response/messages.rs | 2 +- .../p2p/src/request_response/protocols.rs | 12 +++++---- 5 files changed, 38 insertions(+), 29 deletions(-) diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index 2b689eb3949..a2e33a3d039 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -1,6 +1,6 @@ use crate::{ codecs::{ - postcard::PostcardCodec, + postcard::PostcardCodecV1, NetworkCodec, }, config::Config, @@ -60,12 +60,15 @@ pub struct FuelBehaviour { /// Node discovery discovery: discovery::Behaviour, - /// RequestResponse protocol - request_response: request_response::Behaviour, + /// RequestResponse protocol Version 1 + request_response_v1: request_response::Behaviour, } impl FuelBehaviour { - pub(crate) fn new(p2p_config: &Config, codec: PostcardCodec) -> anyhow::Result { + pub(crate) fn new( + p2p_config: &Config, + codec: PostcardCodecV1, + ) -> anyhow::Result { let local_public_key = p2p_config.keypair.public(); let local_peer_id = PeerId::from_public_key(&local_public_key); @@ -119,7 +122,7 @@ impl FuelBehaviour { .with_request_timeout(p2p_config.set_request_timeout) .with_max_concurrent_streams(p2p_config.max_concurrent_streams); - let request_response = request_response::Behaviour::with_codec( + let request_response_v1 = request_response::Behaviour::with_codec( codec, req_res_protocol, req_res_config, @@ -130,7 +133,7 @@ impl FuelBehaviour { discovery, gossipsub, peer_report, - request_response, + request_response_v1, blocked_peer: Default::default(), identify, heartbeat, @@ -160,7 +163,8 @@ impl FuelBehaviour { message_request: RequestMessage, peer_id: &PeerId, ) -> OutboundRequestId { - self.request_response.send_request(peer_id, message_request) + self.request_response_v1 + .send_request(peer_id, message_request) } pub fn send_response_msg( @@ -168,7 +172,7 @@ impl FuelBehaviour { channel: ResponseChannel, message: ResponseMessage, ) -> Result<(), ResponseMessage> { - self.request_response.send_response(channel, message) + self.request_response_v1.send_response(channel, message) } pub fn report_message_validation_result( diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 1ec714b5f72..9f6b0770df2 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -11,7 +11,7 @@ use crate::{ request_response::messages::{ RequestMessage, ResponseMessage, - REQUEST_RESPONSE_PROTOCOL_ID, + REQUEST_RESPONSE_PROTOCOL_ID_V1, }, }; use async_trait::async_trait; @@ -40,14 +40,14 @@ fn serialize(data: &D) -> Result, io::Error> { } #[derive(Debug, Clone)] -pub struct PostcardCodec { +pub struct PostcardCodec { /// Used for `max_size` parameter when reading Response Message /// Necessary in order to avoid DoS attacks /// Currently the size mostly depends on the max size of the Block max_response_size: usize, } -impl PostcardCodec { +impl PostcardCodec { pub fn new(max_block_size: usize) -> Self { assert_ne!( max_block_size, 0, @@ -60,6 +60,8 @@ impl PostcardCodec { } } +pub type PostcardCodecV1 = PostcardCodec<0>; + /// Since Postcard does not support async reads or writes out of the box /// We prefix Request & Response Messages with the length of the data in bytes /// We expect the substream to be properly closed when response channel is dropped. @@ -68,8 +70,8 @@ impl PostcardCodec { /// If the substream was not properly closed when dropped, the sender would instead /// run into a timeout waiting for the response. #[async_trait] -impl request_response::Codec for PostcardCodec { - type Protocol = MessageExchangePostcardProtocol; +impl request_response::Codec for PostcardCodecV1 { + type Protocol = MessageExchangePostcardProtocolV1; type Request = RequestMessage; type Response = ResponseMessage; @@ -135,7 +137,8 @@ impl request_response::Codec for PostcardCodec { } } -impl GossipsubCodec for PostcardCodec { +// GossipsubCodec is independent of the PostcardCoded version being used +impl GossipsubCodec for PostcardCodec { type RequestMessage = GossipsubBroadcastRequest; type ResponseMessage = GossipsubMessage; @@ -160,18 +163,18 @@ impl GossipsubCodec for PostcardCodec { } } -impl NetworkCodec for PostcardCodec { +impl NetworkCodec for PostcardCodecV1 { fn get_req_res_protocol(&self) -> ::Protocol { - MessageExchangePostcardProtocol {} + MessageExchangePostcardProtocolV1 {} } } #[derive(Default, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] -pub struct MessageExchangePostcardProtocol; +pub struct MessageExchangePostcardProtocolV1; -impl AsRef for MessageExchangePostcardProtocol { +impl AsRef for MessageExchangePostcardProtocolV1 { fn as_ref(&self) -> &str { - REQUEST_RESPONSE_PROTOCOL_ID + REQUEST_RESPONSE_PROTOCOL_ID_V1 } } diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index b19289a79c9..4c0a1a63012 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -4,7 +4,7 @@ use crate::{ FuelBehaviourEvent, }, codecs::{ - postcard::PostcardCodec, + postcard::PostcardCodecV1, GossipsubCodec, }, config::{ @@ -125,7 +125,7 @@ pub struct FuelP2PService { inbound_requests_table: HashMap>, /// NetworkCodec used as `` for encoding and decoding of Gossipsub messages - network_codec: PostcardCodec, + network_codec: PostcardCodecV1, /// Stores additional p2p network info network_metadata: NetworkMetadata, @@ -214,7 +214,7 @@ impl FuelP2PService { pub async fn new( reserved_peers_updates: broadcast::Sender, config: Config, - codec: PostcardCodec, + codec: PostcardCodecV1, ) -> anyhow::Result { let metrics = config.metrics; @@ -580,7 +580,7 @@ impl FuelP2PService { self.handle_gossipsub_event(event) } FuelBehaviourEvent::PeerReport(event) => self.handle_peer_report_event(event), - FuelBehaviourEvent::RequestResponse(event) => { + FuelBehaviourEvent::RequestResponseV1(event) => { self.handle_request_response_event(event) } FuelBehaviourEvent::Identify(event) => { diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 83f3f7a3a50..10fe110e762 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -18,7 +18,7 @@ use std::ops::Range; use thiserror::Error; use tokio::sync::oneshot; -pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1"; +pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID_V1: &str = "/fuel/req_res/0.0.1"; /// Max Size in Bytes of the Request Message #[cfg(test)] diff --git a/crates/services/p2p/src/request_response/protocols.rs b/crates/services/p2p/src/request_response/protocols.rs index bdb20ec91a4..963f2976bbe 100644 --- a/crates/services/p2p/src/request_response/protocols.rs +++ b/crates/services/p2p/src/request_response/protocols.rs @@ -8,7 +8,7 @@ use libp2p::{ StreamProtocol, }; -use super::messages::REQUEST_RESPONSE_PROTOCOL_ID; +use super::messages::REQUEST_RESPONSE_PROTOCOL_ID_V1; #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] pub enum ProtocolVersion { @@ -40,7 +40,7 @@ impl TryFrom for ProtocolVersion { fn try_from(protocol: StreamProtocol) -> Result { match protocol.as_ref() { - REQUEST_RESPONSE_PROTOCOL_ID => Ok(ProtocolVersion::V1), + REQUEST_RESPONSE_PROTOCOL_ID_V1 => Ok(ProtocolVersion::V1), _ => Err(()), } } @@ -58,7 +58,7 @@ mod tests { }; use crate::{ - codecs::postcard::MessageExchangePostcardProtocol, + codecs::postcard::MessageExchangePostcardProtocolV1, heartbeat::HEARTBEAT_PROTOCOL, request_response::protocols::ProtocolVersion, }; @@ -89,8 +89,10 @@ mod tests { #[test] fn test_latest_protocol_version_defined() { - let peer_info = - peer_info(&[MessageExchangePostcardProtocol.as_ref(), HEARTBEAT_PROTOCOL]); + let peer_info = peer_info(&[ + MessageExchangePostcardProtocolV1.as_ref(), + HEARTBEAT_PROTOCOL, + ]); let latest_compatible_version_for_peer = ProtocolVersion::latest_compatible_version_for_peer(&peer_info).unwrap(); assert_eq!( From e8c563ca6b82b3426d7803985bcb7595977febab Mon Sep 17 00:00:00 2001 From: acerone85 Date: Thu, 17 Oct 2024 18:17:21 +0100 Subject: [PATCH 09/23] Revert "Revert changes" This reverts commit f3b7db3b0f4aa92f2606c87ed6f65f4798734889. --- CHANGELOG.md | 3 - crates/services/p2p/src/behavior.rs | 20 ++-- crates/services/p2p/src/codecs/postcard.rs | 27 ++--- crates/services/p2p/src/p2p_service.rs | 51 +++----- crates/services/p2p/src/peer_manager.rs | 25 ---- crates/services/p2p/src/request_response.rs | 4 - .../p2p/src/request_response/messages.rs | 2 +- .../p2p/src/request_response/protocols.rs | 111 ------------------ crates/services/p2p/src/service.rs | 4 +- 9 files changed, 40 insertions(+), 207 deletions(-) delete mode 100644 crates/services/p2p/src/request_response/protocols.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d4fe31fa09..1fb76386829 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,9 +9,6 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Added - [2321](https://github.com/FuelLabs/fuel-core/pull/2321): New metrics for the txpool: "The size of transactions in the txpool" (`txpool_tx_size`), "The time spent by a transaction in the txpool in seconds" (`txpool_tx_time_in_txpool_seconds`), The number of transactions in the txpool (`txpool_number_of_transactions`), "The number of transactions pending verification before entering the txpool" (`txpool_number_of_transactions_pending_verification`), "The number of executable transactions in the txpool" (`txpool_number_of_executable_transactions`), "The time it took to select transactions for inclusion in a block in nanoseconds" (`txpool_select_transaction_time_nanoseconds`), The time it took to insert a transaction in the txpool in milliseconds (`txpool_insert_transaction_time_milliseconds`). -### Changed -- [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Preparation work for supporting multiple request response protocols in the P2P service. Information about request exchange protocols known by a peer is saved in the peer manager upon connection. Currently we only have `/fuel/req_res/0.0.1`, but a node will select the latest compatible version of the protocol that is known by a peer for sending messages. - ## [Version 0.40.0] ### Added diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index a2e33a3d039..2b689eb3949 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -1,6 +1,6 @@ use crate::{ codecs::{ - postcard::PostcardCodecV1, + postcard::PostcardCodec, NetworkCodec, }, config::Config, @@ -60,15 +60,12 @@ pub struct FuelBehaviour { /// Node discovery discovery: discovery::Behaviour, - /// RequestResponse protocol Version 1 - request_response_v1: request_response::Behaviour, + /// RequestResponse protocol + request_response: request_response::Behaviour, } impl FuelBehaviour { - pub(crate) fn new( - p2p_config: &Config, - codec: PostcardCodecV1, - ) -> anyhow::Result { + pub(crate) fn new(p2p_config: &Config, codec: PostcardCodec) -> anyhow::Result { let local_public_key = p2p_config.keypair.public(); let local_peer_id = PeerId::from_public_key(&local_public_key); @@ -122,7 +119,7 @@ impl FuelBehaviour { .with_request_timeout(p2p_config.set_request_timeout) .with_max_concurrent_streams(p2p_config.max_concurrent_streams); - let request_response_v1 = request_response::Behaviour::with_codec( + let request_response = request_response::Behaviour::with_codec( codec, req_res_protocol, req_res_config, @@ -133,7 +130,7 @@ impl FuelBehaviour { discovery, gossipsub, peer_report, - request_response_v1, + request_response, blocked_peer: Default::default(), identify, heartbeat, @@ -163,8 +160,7 @@ impl FuelBehaviour { message_request: RequestMessage, peer_id: &PeerId, ) -> OutboundRequestId { - self.request_response_v1 - .send_request(peer_id, message_request) + self.request_response.send_request(peer_id, message_request) } pub fn send_response_msg( @@ -172,7 +168,7 @@ impl FuelBehaviour { channel: ResponseChannel, message: ResponseMessage, ) -> Result<(), ResponseMessage> { - self.request_response_v1.send_response(channel, message) + self.request_response.send_response(channel, message) } pub fn report_message_validation_result( diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 9f6b0770df2..94f23cd6fd2 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -11,7 +11,7 @@ use crate::{ request_response::messages::{ RequestMessage, ResponseMessage, - REQUEST_RESPONSE_PROTOCOL_ID_V1, + REQUEST_RESPONSE_PROTOCOL_ID, }, }; use async_trait::async_trait; @@ -40,14 +40,14 @@ fn serialize(data: &D) -> Result, io::Error> { } #[derive(Debug, Clone)] -pub struct PostcardCodec { +pub struct PostcardCodec { /// Used for `max_size` parameter when reading Response Message /// Necessary in order to avoid DoS attacks /// Currently the size mostly depends on the max size of the Block max_response_size: usize, } -impl PostcardCodec { +impl PostcardCodec { pub fn new(max_block_size: usize) -> Self { assert_ne!( max_block_size, 0, @@ -60,8 +60,6 @@ impl PostcardCodec { } } -pub type PostcardCodecV1 = PostcardCodec<0>; - /// Since Postcard does not support async reads or writes out of the box /// We prefix Request & Response Messages with the length of the data in bytes /// We expect the substream to be properly closed when response channel is dropped. @@ -70,8 +68,8 @@ pub type PostcardCodecV1 = PostcardCodec<0>; /// If the substream was not properly closed when dropped, the sender would instead /// run into a timeout waiting for the response. #[async_trait] -impl request_response::Codec for PostcardCodecV1 { - type Protocol = MessageExchangePostcardProtocolV1; +impl request_response::Codec for PostcardCodec { + type Protocol = MessageExchangePostcardProtocol; type Request = RequestMessage; type Response = ResponseMessage; @@ -137,8 +135,7 @@ impl request_response::Codec for PostcardCodecV1 { } } -// GossipsubCodec is independent of the PostcardCoded version being used -impl GossipsubCodec for PostcardCodec { +impl GossipsubCodec for PostcardCodec { type RequestMessage = GossipsubBroadcastRequest; type ResponseMessage = GossipsubMessage; @@ -163,18 +160,18 @@ impl GossipsubCodec for PostcardCodec { } } -impl NetworkCodec for PostcardCodecV1 { +impl NetworkCodec for PostcardCodec { fn get_req_res_protocol(&self) -> ::Protocol { - MessageExchangePostcardProtocolV1 {} + MessageExchangePostcardProtocol {} } } -#[derive(Default, Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] -pub struct MessageExchangePostcardProtocolV1; +#[derive(Default, Debug, Clone)] +pub struct MessageExchangePostcardProtocol; -impl AsRef for MessageExchangePostcardProtocolV1 { +impl AsRef for MessageExchangePostcardProtocol { fn as_ref(&self) -> &str { - REQUEST_RESPONSE_PROTOCOL_ID_V1 + REQUEST_RESPONSE_PROTOCOL_ID } } diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 4c0a1a63012..024cc006785 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -4,7 +4,7 @@ use crate::{ FuelBehaviourEvent, }, codecs::{ - postcard::PostcardCodecV1, + postcard::PostcardCodec, GossipsubCodec, }, config::{ @@ -26,16 +26,13 @@ use crate::{ Punisher, }, peer_report::PeerReportEvent, - request_response::{ - self as fuel_request_response, - messages::{ - RequestError, - RequestMessage, - ResponseError, - ResponseMessage, - ResponseSendError, - ResponseSender, - }, + request_response::messages::{ + RequestError, + RequestMessage, + ResponseError, + ResponseMessage, + ResponseSendError, + ResponseSender, }, TryPeerId, }; @@ -125,7 +122,7 @@ pub struct FuelP2PService { inbound_requests_table: HashMap>, /// NetworkCodec used as `` for encoding and decoding of Gossipsub messages - network_codec: PostcardCodecV1, + network_codec: PostcardCodec, /// Stores additional p2p network info network_metadata: NetworkMetadata, @@ -214,7 +211,7 @@ impl FuelP2PService { pub async fn new( reserved_peers_updates: broadcast::Sender, config: Config, - codec: PostcardCodecV1, + codec: PostcardCodec, ) -> anyhow::Result { let metrics = config.metrics; @@ -417,24 +414,14 @@ impl FuelP2PService { } }; - let latest_compatible_request_response_protocol_version = self - .peer_manager - .get_peer_info(&peer_id) - .and_then(|peer_info| peer_info.request_response_protocol_version.as_ref()) - .unwrap_or_default(); - - match latest_compatible_request_response_protocol_version { - fuel_request_response::ProtocolVersion::V1 => { - let request_id = self - .swarm - .behaviour_mut() - .send_request_msg(message_request, &peer_id); + let request_id = self + .swarm + .behaviour_mut() + .send_request_msg(message_request, &peer_id); - self.outbound_requests_table.insert(request_id, on_response); + self.outbound_requests_table.insert(request_id, on_response); - Ok(request_id) - } - } + Ok(request_id) } /// Sends ResponseMessage to a peer that requested the data @@ -580,7 +567,7 @@ impl FuelP2PService { self.handle_gossipsub_event(event) } FuelBehaviourEvent::PeerReport(event) => self.handle_peer_report_event(event), - FuelBehaviourEvent::RequestResponseV1(event) => { + FuelBehaviourEvent::RequestResponse(event) => { self.handle_request_response_event(event) } FuelBehaviourEvent::Identify(event) => { @@ -785,10 +772,9 @@ impl FuelP2PService { identify::Event::Received { peer_id, info } => { self.update_metrics(increment_unique_peers); - let request_response_protocol_version = - fuel_request_response::ProtocolVersion::latest_compatible_version_for_peer(&info); let mut addresses = info.listen_addrs; let agent_version = info.agent_version; + if addresses.len() > MAX_IDENTIFY_ADDRESSES { let protocol_version = info.protocol_version; debug!( @@ -803,7 +789,6 @@ impl FuelP2PService { &peer_id, addresses.clone(), agent_version, - request_response_protocol_version, ); self.swarm diff --git a/crates/services/p2p/src/peer_manager.rs b/crates/services/p2p/src/peer_manager.rs index 28e292b990a..1c4cf6251d9 100644 --- a/crates/services/p2p/src/peer_manager.rs +++ b/crates/services/p2p/src/peer_manager.rs @@ -31,10 +31,6 @@ use tracing::{ use crate::{ gossipsub_config::GRAYLIST_THRESHOLD, peer_manager::heartbeat_data::HeartbeatData, - request_response::{ - self, - ProtocolVersion, - }, }; pub mod heartbeat_data; @@ -49,9 +45,6 @@ pub struct PeerInfo { pub client_version: Option, pub heartbeat_data: HeartbeatData, pub score: AppScore, - /// The latest protocol version that the peer supports and that is - /// compatible with the current version of the node - pub request_response_protocol_version: Option, } impl PeerInfo { @@ -61,7 +54,6 @@ impl PeerInfo { client_version: None, heartbeat_data: HeartbeatData::new(heartbeat_avg_window), score: DEFAULT_APP_SCORE, - request_response_protocol_version: None, } } } @@ -143,14 +135,10 @@ impl PeerManager { peer_id: &PeerId, addresses: Vec, agent_version: String, - protocol_version: Option, ) { let peers = self.get_assigned_peer_table_mut(peer_id); insert_client_version(peers, peer_id, agent_version); insert_peer_addresses(peers, peer_id, addresses); - protocol_version.into_iter().for_each(|protocol| { - update_request_response_protocol_version(peers, peer_id, protocol) - }); } pub fn batch_update_score_with_decay(&mut self) { @@ -368,19 +356,6 @@ fn insert_client_version( } } -// Updates the latest request response protocol version that the peer supports -fn update_request_response_protocol_version( - peers: &mut HashMap, - peer_id: &PeerId, - protocol: ProtocolVersion, -) { - if let Some(peer) = peers.get_mut(peer_id) { - peer.request_response_protocol_version = Some(protocol); - } else { - log_missing_peer(peer_id); - } -} - fn log_missing_peer(peer_id: &PeerId) { debug!(target: "fuel-p2p", "Peer with PeerId: {:?} is not among the connected peers", peer_id) } diff --git a/crates/services/p2p/src/request_response.rs b/crates/services/p2p/src/request_response.rs index 21e7c503087..ba63992f3cb 100644 --- a/crates/services/p2p/src/request_response.rs +++ b/crates/services/p2p/src/request_response.rs @@ -1,5 +1 @@ pub mod messages; - -pub mod protocols; - -pub use protocols::ProtocolVersion; diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 10fe110e762..83f3f7a3a50 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -18,7 +18,7 @@ use std::ops::Range; use thiserror::Error; use tokio::sync::oneshot; -pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID_V1: &str = "/fuel/req_res/0.0.1"; +pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1"; /// Max Size in Bytes of the Request Message #[cfg(test)] diff --git a/crates/services/p2p/src/request_response/protocols.rs b/crates/services/p2p/src/request_response/protocols.rs deleted file mode 100644 index 963f2976bbe..00000000000 --- a/crates/services/p2p/src/request_response/protocols.rs +++ /dev/null @@ -1,111 +0,0 @@ -//! This module contains structs and functions for versioning -//! request response protocols, and for recovering the list -//! of different versions of the protocol understood by -//! connected peers. - -use libp2p::{ - identify, - StreamProtocol, -}; - -use super::messages::REQUEST_RESPONSE_PROTOCOL_ID_V1; - -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] -pub enum ProtocolVersion { - /// The Version 1 of the protocol. This version does not have error codes - /// in the response messages. - V1, -} - -impl Default for &ProtocolVersion { - fn default() -> Self { - &ProtocolVersion::V1 - } -} - -impl ProtocolVersion { - pub fn latest_compatible_version_for_peer( - info: &identify::Info, - ) -> Option { - info.protocols - .iter() - .filter_map(|protocol| ProtocolVersion::try_from(protocol.clone()).ok()) - .max() - } -} - -impl TryFrom for ProtocolVersion { - // TODO: Better error type - type Error = (); - - fn try_from(protocol: StreamProtocol) -> Result { - match protocol.as_ref() { - REQUEST_RESPONSE_PROTOCOL_ID_V1 => Ok(ProtocolVersion::V1), - _ => Err(()), - } - } -} - -#[cfg(test)] -mod tests { - use libp2p::{ - identify::{ - self, - }, - identity::PublicKey, - Multiaddr, - StreamProtocol, - }; - - use crate::{ - codecs::postcard::MessageExchangePostcardProtocolV1, - heartbeat::HEARTBEAT_PROTOCOL, - request_response::protocols::ProtocolVersion, - }; - - fn peer_info(protocols: &[impl AsRef]) -> identify::Info { - // This public key is valid, it has been copied from libp2p tests. - let public_key = PublicKey::try_decode_protobuf(&hex::decode( - "080112201ed1e8fae2c4a144b8be8fd4b47bf3d3b34b871c3cacf6010f0e42d474fce27e", - ).expect("Decoding hexadecimal string cannot fail")).expect("Decoding valid public key cannot fail"); - - let mut stream_protocols: Vec = - Vec::with_capacity(protocols.len()); - for protocol in protocols { - stream_protocols.push( - StreamProtocol::try_from_owned(protocol.as_ref().to_string()).unwrap(), - ); - } - - identify::Info { - protocols: stream_protocols, - agent_version: "0.0.1".to_string(), - protocol_version: "0.0.1".to_string(), - public_key, - listen_addrs: vec![], - observed_addr: Multiaddr::empty(), - } - } - - #[test] - fn test_latest_protocol_version_defined() { - let peer_info = peer_info(&[ - MessageExchangePostcardProtocolV1.as_ref(), - HEARTBEAT_PROTOCOL, - ]); - let latest_compatible_version_for_peer = - ProtocolVersion::latest_compatible_version_for_peer(&peer_info).unwrap(); - assert_eq!( - latest_compatible_version_for_peer, - crate::request_response::protocols::ProtocolVersion::V1 - ); - } - - #[test] - fn test_latest_protocol_version_undefined() { - let peer_info = peer_info(&[HEARTBEAT_PROTOCOL, "/some/other/protocol/1.0.0"]); - let latest_compatible_version_for_peer = - ProtocolVersion::latest_compatible_version_for_peer(&peer_info); - assert!(latest_compatible_version_for_peer.is_none(),); - } -} diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 533b9a8656e..3ff09da0beb 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -967,7 +967,7 @@ where #[derive(Clone)] pub struct SharedState { - /// Sender of p2p with peer gossip subscription (`Vec` represents the peer_id) + /// Sender of p2p with peer gossip subscription (vec represent the peer_id) new_tx_subscription_broadcast: broadcast::Sender, /// Sender of p2p transaction used for subscribing. tx_broadcast: broadcast::Sender, @@ -1534,7 +1534,6 @@ pub mod tests { client_version: None, heartbeat_data, score: 100.0, - request_response_protocol_version: None, }; let peer_info = vec![(peer_id, peer_info)]; let p2p_service = FakeP2PService { @@ -1625,7 +1624,6 @@ pub mod tests { client_version: None, heartbeat_data, score: 100.0, - request_response_protocol_version: None, }; let peer_info = vec![(peer_id, peer_info)]; let p2p_service = FakeP2PService { From f50a4d3874c6b1d464ad83c529956a6c7a473819 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Thu, 17 Oct 2024 18:04:46 +0100 Subject: [PATCH 10/23] P2P Service: Add /fuel/req_res/0.0.2 that sends error codes in responses to unsuccessful requests --- Cargo.lock | 2 + crates/fuel-core/src/p2p_test_helpers.rs | 2 +- crates/services/p2p/Cargo.toml | 2 + crates/services/p2p/src/behavior.rs | 7 +- crates/services/p2p/src/codecs.rs | 4 +- crates/services/p2p/src/codecs/postcard.rs | 54 +++++++++++---- crates/services/p2p/src/p2p_service.rs | 19 +++--- .../p2p/src/request_response/messages.rs | 67 ++++++++++++++++++- crates/services/p2p/src/service.rs | 48 +++++++++---- 9 files changed, 164 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7574ea76ac7..d4d274e7f71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3558,6 +3558,8 @@ dependencies = [ "serde", "serde_with", "sha2 0.10.8", + "strum 0.25.0", + "strum_macros 0.25.3", "thiserror", "tokio", "tracing", diff --git a/crates/fuel-core/src/p2p_test_helpers.rs b/crates/fuel-core/src/p2p_test_helpers.rs index fbc9f32afb4..872835f4da8 100644 --- a/crates/fuel-core/src/p2p_test_helpers.rs +++ b/crates/fuel-core/src/p2p_test_helpers.rs @@ -178,7 +178,7 @@ impl Bootstrap { if request_message == RequestMessage::TxPoolAllTransactionsIds { let _ = bootstrap.send_response_msg( request_id, - ResponseMessage::TxPoolAllTransactionsIds(Some(vec![])), + ResponseMessage::TxPoolAllTransactionsIds(Ok(vec![])), ); } } diff --git a/crates/services/p2p/Cargo.toml b/crates/services/p2p/Cargo.toml index 5455e83c5f3..09662d26311 100644 --- a/crates/services/p2p/Cargo.toml +++ b/crates/services/p2p/Cargo.toml @@ -48,6 +48,8 @@ rayon = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_with = { workspace = true } sha2 = "0.10" +strum = { workspace = true } +strum_macros = { workspace = true } thiserror = "1.0.47" tokio = { workspace = true, features = ["sync"] } tracing = { workspace = true } diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index 2b689eb3949..9929dabbf46 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -112,15 +112,16 @@ impl FuelBehaviour { BlockHeight::default(), ); - let req_res_protocol = - core::iter::once((codec.get_req_res_protocol(), ProtocolSupport::Full)); + let req_res_protocol = codec + .get_req_res_protocols() + .map(|protocol| (protocol, ProtocolSupport::Full)); let req_res_config = request_response::Config::default() .with_request_timeout(p2p_config.set_request_timeout) .with_max_concurrent_streams(p2p_config.max_concurrent_streams); let request_response = request_response::Behaviour::with_codec( - codec, + codec.clone(), req_res_protocol, req_res_config, ); diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index c22aacd5671..283fef1ee6c 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -41,5 +41,7 @@ pub trait NetworkCodec: { /// Returns RequestResponse's Protocol /// Needed for initialization of RequestResponse Behaviour - fn get_req_res_protocol(&self) -> ::Protocol; + fn get_req_res_protocols( + &self, + ) -> impl Iterator::Protocol>; } diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 94f23cd6fd2..4fea7484786 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -9,9 +9,11 @@ use crate::{ GossipsubMessage, }, request_response::messages::{ + LegacyResponseMessage, RequestMessage, ResponseMessage, REQUEST_RESPONSE_PROTOCOL_ID, + REQUEST_RESPONSE_WITH_ERROR_CODES_PROTOCOL_ID, }, }; use async_trait::async_trait; @@ -26,6 +28,8 @@ use serde::{ Serialize, }; use std::io; +use strum::IntoEnumIterator; +use strum_macros::EnumIter; /// Helper method for decoding data /// Reusable across `RequestResponseCodec` and `GossipsubCodec` @@ -69,13 +73,13 @@ impl PostcardCodec { /// run into a timeout waiting for the response. #[async_trait] impl request_response::Codec for PostcardCodec { - type Protocol = MessageExchangePostcardProtocol; + type Protocol = PostcardProtocol; type Request = RequestMessage; type Response = ResponseMessage; async fn read_request( &mut self, - _: &Self::Protocol, + _protocol: &Self::Protocol, socket: &mut T, ) -> io::Result where @@ -91,7 +95,7 @@ impl request_response::Codec for PostcardCodec { async fn read_response( &mut self, - _: &Self::Protocol, + protocol: &Self::Protocol, socket: &mut T, ) -> io::Result where @@ -103,7 +107,13 @@ impl request_response::Codec for PostcardCodec { .read_to_end(&mut response) .await?; - deserialize(&response) + match protocol { + PostcardProtocol::V1 => { + let legacy_response = deserialize::(&response)?; + Ok(legacy_response.into()) + } + PostcardProtocol::V2 => deserialize::(&response), + } } async fn write_request( @@ -122,14 +132,20 @@ impl request_response::Codec for PostcardCodec { async fn write_response( &mut self, - _protocol: &Self::Protocol, + protocol: &Self::Protocol, socket: &mut T, res: Self::Response, ) -> io::Result<()> where T: futures::AsyncWrite + Unpin + Send, { - let encoded_data = serialize(&res)?; + let encoded_data = match protocol { + PostcardProtocol::V1 => { + let legacy_response: LegacyResponseMessage = res.into(); + serialize(&legacy_response)? + } + PostcardProtocol::V2 => serialize(&res)?, + }; socket.write_all(&encoded_data).await?; Ok(()) } @@ -161,17 +177,31 @@ impl GossipsubCodec for PostcardCodec { } impl NetworkCodec for PostcardCodec { - fn get_req_res_protocol(&self) -> ::Protocol { - MessageExchangePostcardProtocol {} + fn get_req_res_protocols( + &self, + ) -> impl Iterator::Protocol> { + PostcardProtocol::iter() } } -#[derive(Default, Debug, Clone)] -pub struct MessageExchangePostcardProtocol; +#[derive(Debug, Clone, EnumIter)] +pub enum PostcardProtocol { + V1, + V2, +} -impl AsRef for MessageExchangePostcardProtocol { +impl Default for PostcardProtocol { + fn default() -> Self { + PostcardProtocol::V1 + } +} + +impl AsRef for PostcardProtocol { fn as_ref(&self) -> &str { - REQUEST_RESPONSE_PROTOCOL_ID + match self { + PostcardProtocol::V1 => REQUEST_RESPONSE_PROTOCOL_ID, + PostcardProtocol::V2 => REQUEST_RESPONSE_WITH_ERROR_CODES_PROTOCOL_ID, + } } } diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 024cc006785..556c7b1de29 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -675,7 +675,8 @@ impl FuelP2PService { let send_ok = match channel { ResponseSender::SealedHeaders(c) => match response { ResponseMessage::SealedHeaders(v) => { - c.send((peer, Ok(v))).is_ok() + // TODO[AC]: Change type of ResponseSender and remove the .ok() here + c.send((peer, Ok(v.ok()))).is_ok() } _ => { warn!( @@ -687,7 +688,7 @@ impl FuelP2PService { }, ResponseSender::Transactions(c) => match response { ResponseMessage::Transactions(v) => { - c.send((peer, Ok(v))).is_ok() + c.send((peer, Ok(v.ok()))).is_ok() } _ => { warn!( @@ -699,7 +700,7 @@ impl FuelP2PService { }, ResponseSender::TxPoolAllTransactionsIds(c) => match response { ResponseMessage::TxPoolAllTransactionsIds(v) => { - c.send((peer, Ok(v))).is_ok() + c.send((peer, Ok(v.ok()))).is_ok() } _ => { warn!( @@ -711,7 +712,7 @@ impl FuelP2PService { }, ResponseSender::TxPoolFullTransactions(c) => match response { ResponseMessage::TxPoolFullTransactions(v) => { - c.send((peer, Ok(v))).is_ok() + c.send((peer, Ok(v.ok()))).is_ok() } _ => { warn!( @@ -1778,16 +1779,16 @@ mod tests { RequestMessage::SealedHeaders(range) => { let sealed_headers: Vec<_> = arbitrary_headers_for_range(range.clone()); - let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Some(sealed_headers))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Ok(sealed_headers))); } RequestMessage::Transactions(_) => { let txs = (0..5).map(|_| Transaction::default_test_tx()).collect(); let transactions = vec![Transactions(txs)]; - let _ = node_b.send_response_msg(*request_id, ResponseMessage::Transactions(Some(transactions))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::Transactions(Ok(transactions))); } RequestMessage::TxPoolAllTransactionsIds => { let tx_ids = (0..5).map(|_| Transaction::default_test_tx().id(&ChainId::new(1))).collect(); - let _ = node_b.send_response_msg(*request_id, ResponseMessage::TxPoolAllTransactionsIds(Some(tx_ids))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::TxPoolAllTransactionsIds(Ok(tx_ids))); } RequestMessage::TxPoolFullTransactions(tx_ids) => { let txs = tx_ids.iter().enumerate().map(|(i, _)| { @@ -1797,7 +1798,7 @@ mod tests { Some(NetworkableTransactionPool::Transaction(Transaction::default_test_tx())) } }).collect(); - let _ = node_b.send_response_msg(*request_id, ResponseMessage::TxPoolFullTransactions(Some(txs))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::TxPoolFullTransactions(Ok(txs))); } } } @@ -1905,7 +1906,7 @@ mod tests { // 2. Node B receives the RequestMessage from Node A initiated by the NetworkOrchestrator if let Some(FuelP2PEvent::InboundRequestMessage{ request_id, request_message: _ }) = &node_b_event { let sealed_headers: Vec<_> = arbitrary_headers_for_range(1..3); - let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Some(sealed_headers))); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Ok(sealed_headers))); } tracing::info!("Node B Event: {:?}", node_b_event); diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 83f3f7a3a50..ef0d49b69a2 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -19,6 +19,8 @@ use thiserror::Error; use tokio::sync::oneshot; pub(crate) const REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1"; +pub(crate) const REQUEST_RESPONSE_WITH_ERROR_CODES_PROTOCOL_ID: &str = + "/fuel/req_res/0.0.2"; /// Max Size in Bytes of the Request Message #[cfg(test)] @@ -32,14 +34,77 @@ pub enum RequestMessage { TxPoolFullTransactions(Vec), } +// TODO: Do we want explicit status codes or an Error type? +#[derive(Error, Debug, Clone, Serialize, Deserialize)] +pub enum ResponseMessageErrorCode { + /// The peer sent an empty response using protocol `/fuel/req_res/0.0.1` + #[error("Empty response sent by peer using legacy protocol /fuel/req_res/0.0.1")] + ProtocolV1EmptyResponse = 0, +} + #[derive(Debug, Clone, Serialize, Deserialize)] -pub enum ResponseMessage { +pub enum LegacyResponseMessage { SealedHeaders(Option>), Transactions(Option>), TxPoolAllTransactionsIds(Option>), TxPoolFullTransactions(Option>>), } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ResponseMessage { + SealedHeaders(Result, ResponseMessageErrorCode>), + Transactions(Result, ResponseMessageErrorCode>), + TxPoolAllTransactionsIds(Result, ResponseMessageErrorCode>), + TxPoolFullTransactions( + Result>, ResponseMessageErrorCode>, + ), +} + +impl From for ResponseMessage { + fn from(v1_response: LegacyResponseMessage) -> Self { + match v1_response { + LegacyResponseMessage::SealedHeaders(sealed_headers) => { + ResponseMessage::SealedHeaders( + sealed_headers + .ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse), + ) + } + LegacyResponseMessage::Transactions(vec) => ResponseMessage::Transactions( + vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse), + ), + LegacyResponseMessage::TxPoolAllTransactionsIds(vec) => { + ResponseMessage::TxPoolAllTransactionsIds( + vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse), + ) + } + LegacyResponseMessage::TxPoolFullTransactions(vec) => { + ResponseMessage::TxPoolFullTransactions( + vec.ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse), + ) + } + } + } +} + +impl From for LegacyResponseMessage { + fn from(response: ResponseMessage) -> Self { + match response { + ResponseMessage::SealedHeaders(sealed_headers) => { + LegacyResponseMessage::SealedHeaders(sealed_headers.ok()) + } + ResponseMessage::Transactions(transactions) => { + LegacyResponseMessage::Transactions(transactions.ok()) + } + ResponseMessage::TxPoolAllTransactionsIds(tx_ids) => { + LegacyResponseMessage::TxPoolAllTransactionsIds(tx_ids.ok()) + } + ResponseMessage::TxPoolFullTransactions(tx_pool) => { + LegacyResponseMessage::TxPoolFullTransactions(tx_pool.ok()) + } + } + } +} + pub type OnResponse = oneshot::Sender<(PeerId, Result)>; #[derive(Debug)] diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 3ff09da0beb..22f7d50c778 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -23,6 +23,7 @@ use crate::{ OnResponse, RequestMessage, ResponseMessage, + ResponseMessageErrorCode, ResponseSender, }, }; @@ -136,19 +137,20 @@ pub enum TaskRequest { reporting_service: &'static str, }, DatabaseTransactionsLookUp { - response: Option>, + response: Result, ResponseMessageErrorCode>, request_id: InboundRequestId, }, DatabaseHeaderLookUp { - response: Option>, + response: Result, ResponseMessageErrorCode>, request_id: InboundRequestId, }, TxPoolAllTransactionsIds { - response: Option>, + response: Result, ResponseMessageErrorCode>, request_id: InboundRequestId, }, TxPoolFullTransactions { - response: Option>>, + response: + Result>, ResponseMessageErrorCode>, request_id: InboundRequestId, }, } @@ -532,8 +534,11 @@ where where DbLookUpFn: Fn(&V::LatestView, Range) -> anyhow::Result> + Send + 'static, - ResponseSenderFn: Fn(Option) -> ResponseMessage + Send + 'static, - TaskRequestFn: Fn(Option, InboundRequestId) -> TaskRequest + Send + 'static, + ResponseSenderFn: + Fn(Result) -> ResponseMessage + Send + 'static, + TaskRequestFn: Fn(Result, InboundRequestId) -> TaskRequest + + Send + + 'static, R: Send + 'static, { let instant = Instant::now(); @@ -550,7 +555,8 @@ where "Requested range is too big" ); // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311 - let response = None; + // TODO[AC] Use more meaningful error codes + let response = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); let _ = self .p2p_service .send_response_msg(request_id, response_sender(response)); @@ -564,17 +570,23 @@ where return; } - let response = db_lookup(&view, range.clone()).ok().flatten(); + // TODO[AC] Assign an error code to this + let response = db_lookup(&view, range.clone()) + .ok() + .flatten() + .ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse); let _ = response_channel .try_send(task_request(response, request_id)) .trace_err("Failed to send response to the request channel"); }); + // TODO[AC]: Handle error cases and return meaningful status codes if result.is_err() { + let err = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); let _ = self .p2p_service - .send_response_msg(request_id, response_sender(None)); + .send_response_msg(request_id, response_sender(err)); } Ok(()) @@ -624,8 +636,11 @@ where task_request: TaskRequestFn, ) -> anyhow::Result<()> where - ResponseSenderFn: Fn(Option) -> ResponseMessage + Send + 'static, - TaskRequestFn: Fn(Option, InboundRequestId) -> TaskRequest + Send + 'static, + ResponseSenderFn: + Fn(Result) -> ResponseMessage + Send + 'static, + TaskRequestFn: Fn(Result, InboundRequestId) -> TaskRequest + + Send + + 'static, F: Future> + Send + 'static, { let instant = Instant::now(); @@ -644,14 +659,16 @@ where // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311 let _ = response_channel - .try_send(task_request(Some(response), request_id)) + .try_send(task_request(Ok(response), request_id)) .trace_err("Failed to send response to the request channel"); }); if result.is_err() { + // TODO[AC]: return better error code + let res = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); let _ = self .p2p_service - .send_response_msg(request_id, response_sender(None)); + .send_response_msg(request_id, response_sender(res)); } Ok(()) @@ -680,10 +697,13 @@ where request_id: InboundRequestId, ) -> anyhow::Result<()> { // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311 + // TODO[AC] Use more meaningful error codes if tx_ids.len() > self.max_txs_per_request { self.p2p_service.send_response_msg( request_id, - ResponseMessage::TxPoolFullTransactions(None), + ResponseMessage::TxPoolFullTransactions(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse, + )), )?; return Ok(()); } From 77841f08d71021b957fd085845fd5c0e0a036209 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 21 Oct 2024 11:10:52 +0100 Subject: [PATCH 11/23] Prefer V2 over V1 when peers support both versions of the protocol --- crates/services/p2p/src/codecs/postcard.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 4fea7484786..ae834248731 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -176,11 +176,15 @@ impl GossipsubCodec for PostcardCodec { } } +// TODO: Remove this NetworkCodec impl NetworkCodec for PostcardCodec { fn get_req_res_protocols( &self, ) -> impl Iterator::Protocol> { - PostcardProtocol::iter() + // TODO: Iterating over versions in reverse order should prefer + // peers to use V2 over V1 for exchanging messages. However, this is + // not guaranteed by the specs for the `request_response` protocol. + PostcardProtocol::iter().rev() } } From 71600349aa75f2eca8206af873178a5f6b1bbb38 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 21 Oct 2024 11:20:09 +0100 Subject: [PATCH 12/23] Changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fb76386829..50d8621d011 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Added - [2321](https://github.com/FuelLabs/fuel-core/pull/2321): New metrics for the txpool: "The size of transactions in the txpool" (`txpool_tx_size`), "The time spent by a transaction in the txpool in seconds" (`txpool_tx_time_in_txpool_seconds`), The number of transactions in the txpool (`txpool_number_of_transactions`), "The number of transactions pending verification before entering the txpool" (`txpool_number_of_transactions_pending_verification`), "The number of executable transactions in the txpool" (`txpool_number_of_executable_transactions`), "The time it took to select transactions for inclusion in a block in nanoseconds" (`txpool_select_transaction_time_nanoseconds`), The time it took to insert a transaction in the txpool in milliseconds (`txpool_insert_transaction_time_milliseconds`). +- [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Added a new request_response protocol version `/fuel/req_res/0.0.2`. In comparison with `/fuel/req/0.0.1`, which returns an empty response when a request cannot be fulfilled, this version returns more meaningful error codes. Nodes still support the version `0.0.1` of the protocol to guarantee backward compatibility with fuel-core nodes. Empty responses received from nodes using the old protocol `/fuel/req/0.0.1` are automatically converted into an error `ProtocolV1EmptyResponse` with error code 0, which is also the only error code implemented. More specific error codes will be added in the future. ## [Version 0.40.0] From b2b5b3cb6040b732eec70f0f3d5a24e77bb1110b Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 21 Oct 2024 12:28:20 +0100 Subject: [PATCH 13/23] PostcardProtocol default implementation is now derived --- crates/services/p2p/src/codecs/postcard.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index ae834248731..b2701c0be6f 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -188,18 +188,13 @@ impl NetworkCodec for PostcardCodec { } } -#[derive(Debug, Clone, EnumIter)] +#[derive(Debug, Default, Clone, EnumIter)] pub enum PostcardProtocol { + #[default] V1, V2, } -impl Default for PostcardProtocol { - fn default() -> Self { - PostcardProtocol::V1 - } -} - impl AsRef for PostcardProtocol { fn as_ref(&self) -> &str { match self { From 93b4d06900e79e58be7205a6f8bdffbed84d78ec Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 21 Oct 2024 19:25:38 +0100 Subject: [PATCH 14/23] Add tests to check serialization and deserialization of responses --- crates/services/p2p/src/codecs/postcard.rs | 156 ++++++++++++++++++++- 1 file changed, 155 insertions(+), 1 deletion(-) diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index b2701c0be6f..93e0cd99a0c 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -206,8 +206,14 @@ impl AsRef for PostcardProtocol { #[cfg(test)] mod tests { + use fuel_core_types::blockchain::SealedBlockHeader; + use request_response::Codec as _; + use super::*; - use crate::request_response::messages::MAX_REQUEST_SIZE; + use crate::request_response::messages::{ + ResponseMessageErrorCode, + MAX_REQUEST_SIZE, + }; #[test] fn test_request_size_fits() { @@ -215,4 +221,152 @@ mod tests { let m = RequestMessage::Transactions(arbitrary_range); assert!(postcard::to_stdvec(&m).unwrap().len() <= MAX_REQUEST_SIZE); } + + #[tokio::test] + async fn serialzation_roundtrip_using_v2() { + let sealed_block_headers = vec![SealedBlockHeader::default()]; + let response = ResponseMessage::SealedHeaders(Ok(sealed_block_headers.clone())); + let mut codec = PostcardCodec::new(1024); + let mut buf = Vec::with_capacity(1024); + codec + .write_response(&PostcardProtocol::V2, &mut buf, response) + .await + .expect("Valid Vec should be serialized using v1"); + + let deserialized = codec + .read_response(&PostcardProtocol::V2, &mut buf.as_slice()) + .await + .expect("Valid Vec should be deserialized using v1"); + + match deserialized { + ResponseMessage::SealedHeaders(Ok(sealed_block_headers_response)) => { + assert_eq!(sealed_block_headers, sealed_block_headers_response); + } + other => { + panic!("Deserialized to {other:?}, expected {sealed_block_headers:?}") + } + } + } + + #[tokio::test] + async fn serialzation_roundtrip_using_v1() { + let sealed_block_headers = vec![SealedBlockHeader::default()]; + let response = ResponseMessage::SealedHeaders(Ok(sealed_block_headers.clone())); + let mut codec = PostcardCodec::new(1024); + let mut buf = Vec::with_capacity(1024); + codec + .write_response(&PostcardProtocol::V1, &mut buf, response) + .await + .expect("Valid Vec should be serialized using v1"); + + let deserialized = codec + .read_response(&PostcardProtocol::V1, &mut buf.as_slice()) + .await + .expect("Valid Vec should be deserialized using v1"); + + match deserialized { + ResponseMessage::SealedHeaders(Ok(sealed_block_headers_response)) => { + assert_eq!(sealed_block_headers, sealed_block_headers_response); + } + other => { + panic!("Deserialized to {other:?}, expected {sealed_block_headers:?}") + } + } + } + + #[tokio::test] + async fn serialzation_roundtrip_using_v2_error_response() { + let response = ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse, + )); + let mut codec = PostcardCodec::new(1024); + let mut buf = Vec::with_capacity(1024); + codec + .write_response(&PostcardProtocol::V2, &mut buf, response.clone()) + .await + .expect("Valid Vec is serialized using v1"); + + let deserialized = codec + .read_response(&PostcardProtocol::V2, &mut buf.as_slice()) + .await + .expect("Valid Vec is deserialized using v1"); + + match deserialized { + ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse, + )) => {} + other => { + panic!("Deserialized to {other:?}, expected {response:?}") + } + } + } + + #[tokio::test] + async fn serialzation_roundtrip_using_v1_error_response() { + let response = ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse, + )); + let mut codec = PostcardCodec::new(1024); + let mut buf = Vec::with_capacity(1024); + codec + .write_response(&PostcardProtocol::V1, &mut buf, response.clone()) + .await + .expect("Valid Vec is serialized using v1"); + + let deserialized = codec + .read_response(&PostcardProtocol::V1, &mut buf.as_slice()) + .await + .expect("Valid Vec is deserialized using v1"); + + match deserialized { + ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse, + )) => {} + other => { + panic!("Deserialized to {other:?}, expected {response:?}") + } + } + } + + #[tokio::test] + async fn backward_compatibility_v1_read_error_response() { + let response = ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse, + )); + let mut codec = PostcardCodec::new(1024); + let mut buf = Vec::with_capacity(1024); + codec + .write_response(&PostcardProtocol::V1, &mut buf, response.clone()) + .await + .expect("Valid Vec is serialized using v1"); + + let deserialized_as_legacy = + // We cannot access the codec trait from an old node here, + // so we deserialize directly using the `LegacyResponseMessage` type. + deserialize::(&buf).expect("Deserialization as LegacyResponseMessage should succeed"); + match deserialized_as_legacy { + LegacyResponseMessage::SealedHeaders(None) => {} + other => { + panic!("Deserialized to {other:?}, expected {response:?}") + } + } + } + + #[tokio::test] + async fn backward_compatibility_v1_write_error_response() { + let response = LegacyResponseMessage::SealedHeaders(None); + let mut codec = PostcardCodec::new(1024); + let buf = serialize(&response).expect("Serialization as LegacyResponseMessage should succeed"); + + let deserialized = codec + .read_response(&PostcardProtocol::V1, &mut buf.as_slice()) + .await + .expect("Valid Vec is deserialized using v1"); + match deserialized { + ResponseMessage::SealedHeaders(Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse)) => {} + other => { + panic!("Deserialized to {other:?}, expected {response:?}") + } + } + } } From 06e417750d30988f489523213beb858d1ba8fd5e Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 21 Oct 2024 19:31:55 +0100 Subject: [PATCH 15/23] Formatting --- crates/services/p2p/src/codecs/postcard.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 93e0cd99a0c..f2f861c1b97 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -340,7 +340,7 @@ mod tests { .await .expect("Valid Vec is serialized using v1"); - let deserialized_as_legacy = + let deserialized_as_legacy = // We cannot access the codec trait from an old node here, // so we deserialize directly using the `LegacyResponseMessage` type. deserialize::(&buf).expect("Deserialization as LegacyResponseMessage should succeed"); @@ -356,14 +356,17 @@ mod tests { async fn backward_compatibility_v1_write_error_response() { let response = LegacyResponseMessage::SealedHeaders(None); let mut codec = PostcardCodec::new(1024); - let buf = serialize(&response).expect("Serialization as LegacyResponseMessage should succeed"); + let buf = serialize(&response) + .expect("Serialization as LegacyResponseMessage should succeed"); let deserialized = codec .read_response(&PostcardProtocol::V1, &mut buf.as_slice()) .await .expect("Valid Vec is deserialized using v1"); match deserialized { - ResponseMessage::SealedHeaders(Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse)) => {} + ResponseMessage::SealedHeaders(Err( + ResponseMessageErrorCode::ProtocolV1EmptyResponse, + )) => {} other => { panic!("Deserialized to {other:?}, expected {response:?}") } From 24dd5db021d74d5c632b6d2d016cd4f1fef4b9e3 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 21 Oct 2024 13:42:57 +0100 Subject: [PATCH 16/23] Request/response: Return meaningful error codes wehen using version 0.0.2 --- .../p2p/src/request_response/messages.rs | 6 ++++++ crates/services/p2p/src/service.rs | 19 ++++++------------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index ef0d49b69a2..d80bef35b50 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -40,6 +40,12 @@ pub enum ResponseMessageErrorCode { /// The peer sent an empty response using protocol `/fuel/req_res/0.0.1` #[error("Empty response sent by peer using legacy protocol /fuel/req_res/0.0.1")] ProtocolV1EmptyResponse = 0, + #[error("The requested range is too large")] + RequestedRangeTooLarge = 1, + #[error("Timeout while processing request")] + Timeout = 2, + #[error("Sync processor is out of capacity")] + SyncProcessorOutOfCapacity = 3, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 22f7d50c778..12d936b18d8 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -552,11 +552,9 @@ where tracing::error!( requested_length = range.len(), max_len, - "Requested range is too big" + "Requested range is too large" ); - // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311 - // TODO[AC] Use more meaningful error codes - let response = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); + let response = Err(ResponseMessageErrorCode::RequestedRangeTooLarge); let _ = self .p2p_service .send_response_msg(request_id, response_sender(response)); @@ -570,20 +568,18 @@ where return; } - // TODO[AC] Assign an error code to this let response = db_lookup(&view, range.clone()) .ok() .flatten() - .ok_or(ResponseMessageErrorCode::ProtocolV1EmptyResponse); + .ok_or(ResponseMessageErrorCode::Timeout); let _ = response_channel .try_send(task_request(response, request_id)) .trace_err("Failed to send response to the request channel"); }); - // TODO[AC]: Handle error cases and return meaningful status codes if result.is_err() { - let err = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); + let err = Err(ResponseMessageErrorCode::SyncProcessorOutOfCapacity); let _ = self .p2p_service .send_response_msg(request_id, response_sender(err)); @@ -664,8 +660,7 @@ where }); if result.is_err() { - // TODO[AC]: return better error code - let res = Err(ResponseMessageErrorCode::ProtocolV1EmptyResponse); + let res = Err(ResponseMessageErrorCode::SyncProcessorOutOfCapacity); let _ = self .p2p_service .send_response_msg(request_id, response_sender(res)); @@ -696,13 +691,11 @@ where tx_ids: Vec, request_id: InboundRequestId, ) -> anyhow::Result<()> { - // TODO: Return helpful error message to requester. https://github.com/FuelLabs/fuel-core/issues/1311 - // TODO[AC] Use more meaningful error codes if tx_ids.len() > self.max_txs_per_request { self.p2p_service.send_response_msg( request_id, ResponseMessage::TxPoolFullTransactions(Err( - ResponseMessageErrorCode::ProtocolV1EmptyResponse, + ResponseMessageErrorCode::RequestedRangeTooLarge, )), )?; return Ok(()); From 1bc42532f3ddc7eccddefd60472ec4273b98f793 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Mon, 21 Oct 2024 16:34:33 +0100 Subject: [PATCH 17/23] Log response error codes as warnings --- crates/services/p2p/src/p2p_service.rs | 8 +-- .../p2p/src/request_response/messages.rs | 12 ++-- crates/services/p2p/src/service.rs | 55 +++++++++++++------ 3 files changed, 50 insertions(+), 25 deletions(-) diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 556c7b1de29..c7354a8bf2f 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -676,7 +676,7 @@ impl FuelP2PService { ResponseSender::SealedHeaders(c) => match response { ResponseMessage::SealedHeaders(v) => { // TODO[AC]: Change type of ResponseSender and remove the .ok() here - c.send((peer, Ok(v.ok()))).is_ok() + c.send((peer, Ok(v))).is_ok() } _ => { warn!( @@ -688,7 +688,7 @@ impl FuelP2PService { }, ResponseSender::Transactions(c) => match response { ResponseMessage::Transactions(v) => { - c.send((peer, Ok(v.ok()))).is_ok() + c.send((peer, Ok(v))).is_ok() } _ => { warn!( @@ -700,7 +700,7 @@ impl FuelP2PService { }, ResponseSender::TxPoolAllTransactionsIds(c) => match response { ResponseMessage::TxPoolAllTransactionsIds(v) => { - c.send((peer, Ok(v.ok()))).is_ok() + c.send((peer, Ok(v))).is_ok() } _ => { warn!( @@ -712,7 +712,7 @@ impl FuelP2PService { }, ResponseSender::TxPoolFullTransactions(c) => match response { ResponseMessage::TxPoolFullTransactions(v) => { - c.send((peer, Ok(v.ok()))).is_ok() + c.send((peer, Ok(v))).is_ok() } _ => { warn!( diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index d80bef35b50..06f2a50d3ce 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -115,10 +115,14 @@ pub type OnResponse = oneshot::Sender<(PeerId, Result)>; #[derive(Debug)] pub enum ResponseSender { - SealedHeaders(OnResponse>>), - Transactions(OnResponse>>), - TxPoolAllTransactionsIds(OnResponse>>), - TxPoolFullTransactions(OnResponse>>>), + SealedHeaders(OnResponse, ResponseMessageErrorCode>>), + Transactions(OnResponse, ResponseMessageErrorCode>>), + TxPoolAllTransactionsIds(OnResponse, ResponseMessageErrorCode>>), + TxPoolFullTransactions( + OnResponse< + Result>, ResponseMessageErrorCode>, + >, + ), } #[derive(Debug, Error)] diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 12d936b18d8..eacfdd5ce48 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -113,21 +113,23 @@ pub enum TaskRequest { }, GetSealedHeaders { block_height_range: Range, - channel: OnResponse>>, + channel: OnResponse, ResponseMessageErrorCode>>, }, GetTransactions { block_height_range: Range, from_peer: PeerId, - channel: OnResponse>>, + channel: OnResponse, ResponseMessageErrorCode>>, }, TxPoolGetAllTxIds { from_peer: PeerId, - channel: OnResponse>>, + channel: OnResponse, ResponseMessageErrorCode>>, }, TxPoolGetFullTransactions { tx_ids: Vec, from_peer: PeerId, - channel: OnResponse>>>, + channel: OnResponse< + Result>, ResponseMessageErrorCode>, + >, }, // Responds back to the p2p network RespondWithGossipsubMessageReport((GossipsubMessageInfo, GossipsubMessageAcceptance)), @@ -1029,8 +1031,17 @@ impl SharedState { let (peer_id, response) = receiver.await.map_err(|e| anyhow!("{e}"))?; - let data = response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?; - Ok((peer_id.to_bytes(), data)) + let data = match response { + Err(request_response_protocol_error) => Err(anyhow!( + "Invalid response from peer {request_response_protocol_error:?}" + )), + Ok(Err(response_error_code)) => { + warn!("Peer {peer_id:?} failed to respond with sealed headers: {response_error_code:?}"); + Ok(None) + } + Ok(Ok(headers)) => Ok(Some(headers)), + }; + data.map(|data| (peer_id.to_bytes(), data)) } pub async fn get_transactions_from_peer( @@ -1056,7 +1067,18 @@ impl SharedState { "Bug: response from non-requested peer" ); - response.map_err(|e| anyhow!("Invalid response from peer {e:?}")) + match response { + Err(request_response_protocol_error) => Err(anyhow!( + "Invalid response from peer {request_response_protocol_error:?}" + )), + Ok(Err(response_error_code)) => { + warn!( + "Peer {peer_id:?} failed to respond with transactions: {response_error_code:?}" + ); + return Ok(None); + } + Ok(Ok(txs)) => Ok(Some(txs)), + } } pub async fn get_all_transactions_ids_from_peer( @@ -1080,11 +1102,11 @@ impl SharedState { "Bug: response from non-requested peer" ); - let Some(txs) = - response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))? - else { - return Ok(vec![]); - }; + let response = + response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?; + + let txs = response.inspect_err(|e| { warn!("Peer {peer_id:?} could not response to request to get all transactions ids: {e:?}"); } ).unwrap_or_default(); + if txs.len() > self.max_txs_per_request { return Err(anyhow!("Too many transactions requested: {}", txs.len())); } @@ -1113,11 +1135,10 @@ impl SharedState { "Bug: response from non-requested peer" ); - let Some(txs) = - response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))? - else { - return Ok(vec![]); - }; + let response = + response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?; + let txs = response.inspect_err(|e| { warn!("Peer {peer_id:?} could not response to request to get full transactions: {e:?}"); } ).unwrap_or_default(); + if txs.len() > self.max_txs_per_request { return Err(anyhow!("Too many transactions requested: {}", txs.len())); } From c76608be4bab449007dc8700ff9c84b98c1991e7 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Tue, 22 Oct 2024 15:26:29 +0100 Subject: [PATCH 18/23] Fix inconsistencies from git merge master --- crates/services/p2p/src/codecs/postcard.rs | 11 +++++------ crates/services/p2p/src/p2p_service.rs | 6 +++--- crates/services/p2p/src/service.rs | 2 +- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index a0e93d8a882..c71b7f80007 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -176,14 +176,15 @@ impl GossipsubCodec for PostcardCodec { } } -// TODO: Remove this NetworkCodec impl NetworkCodec for PostcardCodec { fn get_req_res_protocols( &self, ) -> impl Iterator::Protocol> { - // TODO: Iterating over versions in reverse order should prefer + // TODO: https://github.com/FuelLabs/fuel-core/issues/2374 + // Iterating over versions in reverse order should prefer // peers to use V2 over V1 for exchanging messages. However, this is - // not guaranteed by the specs for the `request_response` protocol. + // not guaranteed by the specs for the `request_response` protocol, + // and it should be tested. PostcardProtocol::iter().rev() } } @@ -310,10 +311,8 @@ mod tests { async fn codec__serialzation_roundtrip_using_v1_on_error_response_returns_predefined_error_code( ) { // Given - // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 - // Change this to a different ResponseMessageErrorCode once these have been implemented. let response = V2ResponseMessage::SealedHeaders(Err( - ResponseMessageErrorCode::ProtocolV1EmptyResponse, + ResponseMessageErrorCode::RequestedRangeTooLarge, )); let mut codec = PostcardCodec::new(1024); let mut buf = Vec::with_capacity(1024); diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index c1092f844f8..969d8cc3639 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -1717,7 +1717,7 @@ mod tests { tokio::spawn(async move { let response_message = rx_orchestrator.await; - if let Ok((_, Ok(Some(transactions)))) = response_message { + if let Ok((_, Ok(Ok(transactions)))) = response_message { let check = transactions.len() == 1 && transactions[0].0.len() == 5; let _ = tx_test_end.send(check).await; } else { @@ -1733,7 +1733,7 @@ mod tests { tokio::spawn(async move { let response_message = rx_orchestrator.await; - if let Ok((_, Ok(Some(transaction_ids)))) = response_message { + if let Ok((_, Ok(Ok(transaction_ids)))) = response_message { let tx_ids: Vec = (0..5).map(|_| Transaction::default_test_tx().id(&ChainId::new(1))).collect(); let check = transaction_ids.len() == 5 && transaction_ids.iter().zip(tx_ids.iter()).all(|(a, b)| a == b); let _ = tx_test_end.send(check).await; @@ -1750,7 +1750,7 @@ mod tests { tokio::spawn(async move { let response_message = rx_orchestrator.await; - if let Ok((_, Ok(Some(transactions)))) = response_message { + if let Ok((_, Ok(Ok(transactions)))) = response_message { let txs: Vec> = tx_ids.iter().enumerate().map(|(i, _)| { if i == 0 { None diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index c2181892b15..50d34b2d1e3 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -1078,7 +1078,7 @@ impl SharedState { warn!( "Peer {peer_id:?} failed to respond with transactions: {response_error_code:?}" ); - return Ok(None); + Ok(None) } Ok(Ok(txs)) => Ok(Some(txs)), } From aa79b3cde71808261f8dfe718e9bbcaa3cdaabdd Mon Sep 17 00:00:00 2001 From: acerone85 Date: Tue, 22 Oct 2024 16:59:40 +0100 Subject: [PATCH 19/23] add changelog entry --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 645237e21c9..cd7fc8cc5a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2321](https://github.com/FuelLabs/fuel-core/pull/2321): New metrics for the txpool: "The size of transactions in the txpool" (`txpool_tx_size`), "The time spent by a transaction in the txpool in seconds" (`txpool_tx_time_in_txpool_seconds`), The number of transactions in the txpool (`txpool_number_of_transactions`), "The number of transactions pending verification before entering the txpool" (`txpool_number_of_transactions_pending_verification`), "The number of executable transactions in the txpool" (`txpool_number_of_executable_transactions`), "The time it took to select transactions for inclusion in a block in nanoseconds" (`txpool_select_transaction_time_nanoseconds`), The time it took to insert a transaction in the txpool in milliseconds (`txpool_insert_transaction_time_milliseconds`). - [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Added a new request_response protocol version `/fuel/req_res/0.0.2`. In comparison with `/fuel/req/0.0.1`, which returns an empty response when a request cannot be fulfilled, this version returns more meaningful error codes. Nodes still support the version `0.0.1` of the protocol to guarantee backward compatibility with fuel-core nodes. Empty responses received from nodes using the old protocol `/fuel/req/0.0.1` are automatically converted into an error `ProtocolV1EmptyResponse` with error code 0, which is also the only error code implemented. More specific error codes will be added in the future. +### Changed +- [2377](https://github.com/FuelLabs/fuel-core/pull/2377): Add more errors that can be returned as responses when using protocol `/fuel/req_res/0.0.2`. The errors supported are `ProtocolV1EmptyResponse` (status code `0`) for converting empty responses sent via protocol `/fuel/req_res/0.0.1`, `RequestedRangeTooLarge`(status code `1`) if the client requests a range of objects such as sealed block headers or transactions too large, `Timeout` (status code `2`) if the remote peer takes too long to fulfill a request, or `SyncProcessorOutOfCapacity` if the remote peer is fulfilling too many requests concurrently. + ## [Version 0.40.0] ### Added From ea4842c3c67ad23ff743cda8810eacfceeac964f Mon Sep 17 00:00:00 2001 From: acerone85 Date: Wed, 23 Oct 2024 13:44:20 +0100 Subject: [PATCH 20/23] Remove default implementation for PostcardProtocol --- crates/services/p2p/src/codecs/postcard.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index c71b7f80007..e2b7a97f9b1 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -189,9 +189,8 @@ impl NetworkCodec for PostcardCodec { } } -#[derive(Debug, Default, Clone, EnumIter)] +#[derive(Debug, Clone, EnumIter)] pub enum PostcardProtocol { - #[default] V1, V2, } From 216dfb94d32b1da2dff33a92ae4ecc5f05f390e9 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Thu, 31 Oct 2024 17:33:39 +0000 Subject: [PATCH 21/23] Fix tests compilation --- crates/services/p2p/src/p2p_service.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 0947e10049c..85581c7427d 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -1719,12 +1719,12 @@ mod tests { if let Ok(response) = response_message { match response { - Ok((_, Ok(Some(sealed_headers)))) => { + Ok((_, Ok(Ok(sealed_headers)))) => { let check = expected.iter().zip(sealed_headers.iter()).all(|(a, b)| eq_except_metadata(a, b)); let _ = tx_test_end.send(check).await; }, - Ok((_, Ok(None))) => { - tracing::error!("Node A did not return any headers"); + Ok((_, Ok(Err(e)))) => { + tracing::error!("Node A did not return any headers: {:?}", e); let _ = tx_test_end.send(false).await; }, Ok((_, Err(e))) => { @@ -1752,12 +1752,12 @@ mod tests { if let Ok(response) = response_message { match response { - Ok((_, Ok(Some(transactions)))) => { + Ok((_, Ok(Ok(transactions)))) => { let check = transactions.len() == 1 && transactions[0].0.len() == 5; let _ = tx_test_end.send(check).await; }, - Ok((_, Ok(None))) => { - tracing::error!("Node A did not return any transactions"); + Ok((_, Ok(Err(e)))) => { + tracing::error!("Node A did not return any transactions: {:?}", e); let _ = tx_test_end.send(false).await; }, Ok((_, Err(e))) => { From fd2ba043a2f389cc206a906d9fa66c5029004abb Mon Sep 17 00:00:00 2001 From: acerone85 Date: Thu, 31 Oct 2024 17:34:53 +0000 Subject: [PATCH 22/23] Remove stale todos --- crates/services/p2p/src/p2p_service.rs | 2 -- crates/services/p2p/src/service.rs | 4 ---- 2 files changed, 6 deletions(-) diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 85581c7427d..94646d50815 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -675,8 +675,6 @@ impl FuelP2PService { let send_ok = match channel { ResponseSender::SealedHeaders(c) => match response { V2ResponseMessage::SealedHeaders(v) => { - // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 - // Change type of ResponseSender and remove the .ok() here c.send(Ok((peer, Ok(v)))).is_ok() } _ => { diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 4879eab9782..0004a15ea63 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -599,8 +599,6 @@ where .trace_err("Failed to send response to the request channel"); }); - // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 - // Handle error cases and return meaningful status codes if result.is_err() { let err = Err(ResponseMessageErrorCode::SyncProcessorOutOfCapacity); let _ = self @@ -676,8 +674,6 @@ where return; }; - // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 - // Return helpful error message to requester. let _ = response_channel .try_send(task_request(Ok(response), request_id)) .trace_err("Failed to send response to the request channel"); From 5ea9b30e4d409b85f76aa40cc60c52d409b11ec9 Mon Sep 17 00:00:00 2001 From: acerone85 Date: Thu, 31 Oct 2024 18:43:41 +0000 Subject: [PATCH 23/23] Add unknown error code --- .../p2p/src/request_response/messages.rs | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 6c9b23f431f..5a453b784cf 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -46,6 +46,9 @@ pub enum ResponseMessageErrorCode { Timeout = 2, #[error("Sync processor is out of capacity")] SyncProcessorOutOfCapacity = 3, + #[error("The peer sent an unknown error code")] + #[serde(skip_serializing, other)] + Unknown, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -163,3 +166,42 @@ pub enum ResponseSendError { #[error("Failed to convert response to intermediate format")] ConversionToIntermediateFailed, } + +#[cfg(test)] +#[allow(non_snake_case)] +mod tests { + use super::ResponseMessageErrorCode; + + #[test] + fn response_message_error_code__unknown_error_cannot_be_serialized() { + let error = super::ResponseMessageErrorCode::Unknown; + let serialized = postcard::to_allocvec(&error); + assert!(serialized.is_err()); + } + + #[test] + fn response_message_error_code__known_error_code_is_deserialized_to_variant() { + let serialized_error_code = + postcard::to_stdvec(&ResponseMessageErrorCode::ProtocolV1EmptyResponse) + .unwrap(); + println!("Error code: {:?}", serialized_error_code); + let response_message_error_code: ResponseMessageErrorCode = + postcard::from_bytes(&serialized_error_code).unwrap(); + assert!(matches!( + response_message_error_code, + ResponseMessageErrorCode::ProtocolV1EmptyResponse + )); + } + + #[test] + fn response_message_error_code__unknown_error_code_is_deserialized_to_unknown_variant( + ) { + let serialized_error_code = vec![42]; + let response_message_error_code: ResponseMessageErrorCode = + postcard::from_bytes(&serialized_error_code).unwrap(); + assert!(matches!( + response_message_error_code, + ResponseMessageErrorCode::Unknown + )); + } +}