From 6384b85d0b25a99e01cfb62a81a551d27aa5c74b Mon Sep 17 00:00:00 2001
From: Pierre Krieger
Date: Mon, 3 Aug 2020 14:36:42 +0200
Subject: [PATCH 01/12] Add a DirectedGossip struct

---
 client/network/Cargo.toml     |   2 +-
 client/network/src/gossip.rs  | 305 ++++++++++++++++++++++++++++++++++
 client/network/src/lib.rs     |   2 +
 client/network/src/service.rs |   4 +
 4 files changed, 312 insertions(+), 1 deletion(-)
 create mode 100644 client/network/src/gossip.rs

diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml
index 11346fdd3ffc6..9870cb73c2ecf 100644
--- a/client/network/Cargo.toml
+++ b/client/network/Cargo.toml
@@ -16,6 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"]
 prost-build = "0.6.1"
 
 [dependencies]
+async-std = { version = "1.6.2", features = ["unstable"] }
 bitflags = "1.2.0"
 bs58 = "0.3.1"
 bytes = "0.5.0"
@@ -66,7 +67,6 @@ default-features = false
 features = ["identify", "kad", "mdns", "mplex", "noise", "ping", "tcp-async-std", "websocket", "yamux"]
 
 [dev-dependencies]
-async-std = "1.6.2"
 assert_matches = "1.3"
 env_logger = "0.7.0"
 libp2p = { version = "0.22.0", default-features = false, features = ["secio"] }
diff --git a/client/network/src/gossip.rs b/client/network/src/gossip.rs
new file mode 100644
index 0000000000000..69655186b647e
--- /dev/null
+++ b/client/network/src/gossip.rs
@@ -0,0 +1,305 @@
+// This file is part of Substrate.
+
+// Copyright (C) 2017-2020 Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+//! Helper for sending rate-limited gossip messages.
+//!
+//! # Context
+//!
+//! The [`NetworkService`] struct provides a way to send notifications to a certain peer through
+//! the [`NetworkService::notification_sender`] method. This method is quite low level and isn't
+//! expected to be used directly.
+//!
+//! The [`DirectedGossip`] struct provided by this module is built on top of
+//! [`NetworkService::notification_sender`] and provides a cleaner way to send notifications.
+//!
+//! # Behaviour
+//!
+//! An instance of [`DirectedGossip`] is specific to a certain combination of `PeerId` and
+//! protocol name. It maintains a buffer of messages waiting to be sent out. The user of this API
+//! is able to manipulate that queue, adding or removing obsolete messages.
+//!
+//! Creating a [`DirectedGossip`] also returns an opaque `Future` whose responsibility is to
+//! drain that queue and actually send the messages. If the substream with the given combination
+//! of peer and protocol is closed, the queue is silently discarded. It is the role of the user
+//! to track which peers we are connected to.
+//!
+//! If multiple instances of [`DirectedGossip`] exist for the same peer and protocol, or if some
+//! other code uses the [`NetworkService`] to send notifications to this peer and protocol, then
+//! the notifications will be interleaved in an unpredictable way.
+//!
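NOTE (illustrative sketch, not part of the patch): the following shows how the API documented
above is meant to be used. It assumes an already-built `NetworkService` handle and a connected
peer; the crate paths, the `*b"gsip"` engine id and the queue size of 256 are placeholder
assumptions.

    use std::sync::Arc;

    use sc_network::{gossip::DirectedGossip, ExHashT, NetworkService, PeerId};
    use sp_runtime::traits::Block as BlockT;

    /// Creates a rate-limited sender towards `peer_id` and queues one message on it.
    async fn send_hello<B: BlockT + 'static, H: ExHashT>(
        service: Arc<NetworkService<B, H>>,
        peer_id: PeerId,
    ) {
        // One `DirectedGossip` per combination of peer and protocol. The returned future
        // drains the queue and must be spawned, otherwise nothing is ever sent out.
        let (sender, bg_future) = DirectedGossip::new(
            service,
            peer_id,
            *b"gsip",           // placeholder `ConsensusEngineId`
            256,                // `queue_size_limit`: messages beyond this are discarded
            |msg: Vec<u8>| msg, // messages are assumed to be already encoded
        );
        async_std::task::spawn(bg_future);

        // Pushes to the queue (or drops the message if the queue is full); the background
        // future performs the actual sending once the notifications substream is ready.
        sender.queue_or_discard(b"hello world".to_vec()).await;
    }

`async-std` is used to spawn `bg_future` here only because this patch adds it as a dependency;
any executor able to run a `Send` future would do.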
+ +use crate::{ExHashT, NetworkService, service::{NotificationSender, NotificationSenderError}}; + +use async_std::sync::{Condvar, Mutex, MutexGuard}; +use futures::prelude::*; +use libp2p::PeerId; +use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; +use std::{ + collections::VecDeque, + sync::{atomic, Arc}, + time::Duration, +}; + +/// Notifications sender for a specific combination of network service, peer, and protocol. +pub struct DirectedGossip { + /// Shared between the front and the back task. + shared: Arc>, +} + +impl DirectedGossip { + /// Returns a new [`DirectedGossip`] containing a queue of message for this specific + /// combination of peer and protocol. + /// + /// In addition to the [`DirectedGossip`], also returns a `Future` whose role is to drive + /// the messages sending forward. + pub fn new( + service: Arc>, + peer_id: PeerId, + protocol: ConsensusEngineId, + queue_size_limit: usize, + messages_encode: F + ) -> (Self, impl Future + Send + 'static) + where + M: Send + 'static, + B: BlockT + 'static, + H: ExHashT, + F: Fn(M) -> Vec + Send + 'static, + { + DirectedGossipPrototype::new(service, peer_id, protocol) + .build(queue_size_limit, messages_encode) + } + + /// Locks the queue of messages towards this peer. + /// + /// The returned `Future` is expected to be ready quite quickly. + pub async fn lock_queue<'a>(&'a self) -> QueueLock<'a, M> { + QueueLock { + lock: self.shared.locked.lock().await, + condvar: &self.shared.condvar, + queue_size_limit: self.shared.queue_size_limit, + } + } + + /// Pushes a message to the queue, or discards it if the queue is full. + /// + /// The returned `Future` is expected to be ready quite quickly. + pub async fn queue_or_discard(&self, message: M) + where + M: Send + 'static + { + self.lock_queue().await.push_or_discard(message); + } +} + +impl Drop for DirectedGossip { + fn drop(&mut self) { + // The "clean" way to notify the `Condvar` here is normally to first lock the `Mutex`, + // then notify the `Condvar` while the `Mutex` is locked. Unfortunately, the `Mutex` + // being asynchronous, it can't reasonably be locked from within a destructor. + // For this reason, this destructor is a "best effort" destructor. + // See also the corresponding code in the background task. + self.shared.stop_task.store(true, atomic::Ordering::Acquire); + self.shared.condvar.notify_all(); + } +} + +/// Utility. Generic over the type of the messages. Holds a [`NetworkService`], a [`PeerId`], +/// and a [`ConsensusEngineId`]. Provides a [`DirectedGossipPrototype::build`] function that +/// builds a [`DirectedGossip`]. +pub struct DirectedGossipPrototype { + service: Arc, + peer_id: PeerId, + protocol: ConsensusEngineId, +} + +impl DirectedGossipPrototype { + /// Builds a new [`DirectedGossipPrototype`] containing the given components. + pub fn new( + service: Arc>, + peer_id: PeerId, + protocol: ConsensusEngineId, + ) -> Self + where + B: BlockT + 'static, + H: ExHashT, + { + DirectedGossipPrototype { + service, + peer_id, + protocol, + } + } + + /// Turns this [`DirectedGossipPrototype`] into a [`DirectedGossip`] and a future. + /// + /// See [`DirectGossip::new`] for details. 
+ pub fn build( + self, + queue_size_limit: usize, + messages_encode: F + ) -> (DirectedGossip, impl Future + Send + 'static) + where + M: Send + 'static, + F: Fn(M) -> Vec + Send + 'static, + { + let shared = Arc::new(Shared { + stop_task: atomic::AtomicBool::new(false), + condvar: Condvar::new(), + queue_size_limit, + locked: Mutex::new(SharedLock { + messages_queue: VecDeque::with_capacity(queue_size_limit), + }), + }); + + let task = spawn_task( + self.service, + self.peer_id, + self.protocol, + shared.clone(), + messages_encode + ); + + (DirectedGossip { shared }, task) + } +} + +/// Locked queue of messages to the given peer. +/// +/// As long as this struct exists, the background task is asleep and the owner of the [`QueueLock`] +/// is in total control of the buffer. +#[must_use] +pub struct QueueLock<'a, M> { + lock: MutexGuard<'a, SharedLock>, + condvar: &'a Condvar, + /// Same as [`Shared::queue_size_limit`]. + queue_size_limit: usize, +} + +impl<'a, M: Send + 'static> QueueLock<'a, M> { + /// Pushes a message to the queue, or discards it if the queue is full. + pub fn push_or_discard(&mut self, message: M) { + if self.lock.messages_queue.len() < self.queue_size_limit { + self.lock.messages_queue.push_back(message); + } + } + + /// Pushes a message to the queue. Does not enforce any limit to the size of the queue. + /// Use this method only if the message is extremely important. + pub fn push_unbounded(&mut self, message: M) { + self.lock.messages_queue.push_back(message); + } + + /// Calls `filter` for each message in the queue, and removes the ones for which `false` is + /// returned. + /// + /// > **Note**: The parameter of `filter` is a `&M` and not a `&mut M` (which would be + /// > better) because the underlying implementation relies on `VecDeque::retain`. + pub fn retain(&mut self, filter: impl FnMut(&M) -> bool) { + self.lock.messages_queue.retain(filter); + } +} + +impl<'a, M> Drop for QueueLock<'a, M> { + fn drop(&mut self) { + // We notify the `Condvar` in the destructor in order to be able to push multiple + // messages and wake up the background task only one afterwards. + self.condvar.notify_one(); + } +} + +#[derive(Debug)] +struct Shared { + /// Read by the background task after locking `locked`. If true, the task stops. + stop_task: atomic::AtomicBool, + locked: Mutex>, + /// Must be notified every time the content of `locked` changes. + condvar: Condvar, + /// Maximum number of elements in `messages_queue`. + queue_size_limit: usize, +} + +#[derive(Debug)] +struct SharedLock { + /// Queue of messages waiting to be sent out. + messages_queue: VecDeque, +} + +async fn spawn_task Vec>( + service: Arc, + peer_id: PeerId, + protocol: ConsensusEngineId, + shared: Arc>, + messages_encode: F, +) { + loop { + let next_message = 'next_msg: loop { + let mut locked = shared.locked.lock().await; + + loop { + if shared.stop_task.load(atomic::Ordering::Acquire) { + return; + } + + if let Some(msg) = locked.messages_queue.pop_front() { + break 'next_msg msg; + } + + // It is possible that the destructor of `DirectedGossip` sets `stop_task` to + // true and notifies the `Condvar` after the background task loads `stop_task` + // and before it calls `Condvar::wait`. + // See also the corresponding comment in `DirectedGossip::drop`. + // For this reason, we use `wait_timeout`. In the worst case scenario, + // `stop_task` will always be checked again after the timeout is reached. 
+                locked = shared.condvar.wait_timeout(locked, Duration::from_secs(10)).await.0;
+            }
+        };
+
+        // Starting from below, we try to send the message. If an error happens when sending,
+        // the only sane option we have is to silently discard the message.
+        let sender = match service.notification_sender(peer_id.clone(), protocol) {
+            Ok(s) => s,
+            Err(_) => continue,
+        };
+
+        let ready = match sender.ready().await {
+            Ok(r) => r,
+            Err(_) => continue,
+        };
+
+        let _ = ready.send(messages_encode(next_message));
+    }
+}
+
+/// Abstraction around `NetworkService` that permits removing the `B` and `H` parameters.
+trait AbstractNotificationSender {
+    fn notification_sender(
+        &self,
+        target: PeerId,
+        engine_id: ConsensusEngineId,
+    ) -> Result<NotificationSender, NotificationSenderError>;
+}
+
+impl AbstractNotificationSender for NetworkService {
+    fn notification_sender(
+        &self,
+        target: PeerId,
+        engine_id: ConsensusEngineId,
+    ) -> Result<NotificationSender, NotificationSenderError> {
+        NetworkService::notification_sender(self, target, engine_id)
+    }
+}
diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs
index fc5cab321d127..e387d8ca28f37 100644
--- a/client/network/src/lib.rs
+++ b/client/network/src/lib.rs
@@ -15,6 +15,7 @@
 // You should have received a copy of the GNU General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
+
 #![warn(unused_extern_crates)]
 #![warn(missing_docs)]
@@ -259,6 +260,7 @@ mod utils;
 
 pub mod config;
 pub mod error;
+pub mod gossip;
 pub mod network_state;
 
 pub use service::{NetworkService, NetworkWorker};
diff --git a/client/network/src/service.rs b/client/network/src/service.rs
index e4ba36be587fe..197675d41a39f 100644
--- a/client/network/src/service.rs
+++ b/client/network/src/service.rs
@@ -686,6 +686,10 @@ impl NetworkService {
     /// Notifications should be dropped
     /// if buffer is full
     ///
+    ///
+    /// See also the [`gossip`](crate::gossip) module for a higher-level way to send
+    /// notifications.
+    ///
     pub fn notification_sender(
         &self,
         target: PeerId,
From b5b0bcac9dcabf884e8e96c3d4eb4837c07eebc2 Mon Sep 17 00:00:00 2001
From: Pierre Krieger
Date: Mon, 3 Aug 2020 17:05:37 +0200
Subject: [PATCH 02/12] Move protocol from prototype::new to build

---
 client/network/src/gossip.rs | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/client/network/src/gossip.rs b/client/network/src/gossip.rs
index 69655186b647e..38b3317e119aa 100644
--- a/client/network/src/gossip.rs
+++ b/client/network/src/gossip.rs
@@ -80,8 +80,8 @@ impl DirectedGossip {
         H: ExHashT,
         F: Fn(M) -> Vec + Send + 'static,
     {
-        DirectedGossipPrototype::new(service, peer_id, protocol)
-            .build(queue_size_limit, messages_encode)
+        DirectedGossipPrototype::new(service, peer_id)
+            .build(protocol, queue_size_limit, messages_encode)
     }
 
     /// Locks the queue of messages towards this peer.
@@ -118,13 +118,11 @@ impl Drop for DirectedGossip {
     }
 }
 
-/// Utility. Generic over the type of the messages. Holds a [`NetworkService`], a [`PeerId`],
-/// and a [`ConsensusEngineId`]. Provides a [`DirectedGossipPrototype::build`] function that
-/// builds a [`DirectedGossip`].
+/// Utility. Generic over the type of the messages. Holds a [`NetworkService`] and a [`PeerId`].
+/// Provides a [`DirectedGossipPrototype::build`] function that builds a [`DirectedGossip`].
pub struct DirectedGossipPrototype { service: Arc, peer_id: PeerId, - protocol: ConsensusEngineId, } impl DirectedGossipPrototype { @@ -132,7 +130,6 @@ impl DirectedGossipPrototype { pub fn new( service: Arc>, peer_id: PeerId, - protocol: ConsensusEngineId, ) -> Self where B: BlockT + 'static, @@ -141,7 +138,6 @@ impl DirectedGossipPrototype { DirectedGossipPrototype { service, peer_id, - protocol, } } @@ -150,6 +146,7 @@ impl DirectedGossipPrototype { /// See [`DirectGossip::new`] for details. pub fn build( self, + protocol: ConsensusEngineId, queue_size_limit: usize, messages_encode: F ) -> (DirectedGossip, impl Future + Send + 'static) @@ -169,7 +166,7 @@ impl DirectedGossipPrototype { let task = spawn_task( self.service, self.peer_id, - self.protocol, + protocol, shared.clone(), messages_encode ); From 02dff055dd51254019f43bff2301f802fbbe8e7a Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 3 Aug 2020 17:09:13 +0200 Subject: [PATCH 03/12] More traits impls --- client/network/src/gossip.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/client/network/src/gossip.rs b/client/network/src/gossip.rs index 38b3317e119aa..c93ef51db536b 100644 --- a/client/network/src/gossip.rs +++ b/client/network/src/gossip.rs @@ -51,6 +51,7 @@ use libp2p::PeerId; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; use std::{ collections::VecDeque, + fmt, sync::{atomic, Arc}, time::Duration, }; @@ -106,6 +107,12 @@ impl DirectedGossip { } } +impl fmt::Debug for DirectedGossip { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("DirectedGossip").finish() + } +} + impl Drop for DirectedGossip { fn drop(&mut self) { // The "clean" way to notify the `Condvar` here is normally to first lock the `Mutex`, @@ -120,6 +127,7 @@ impl Drop for DirectedGossip { /// Utility. Generic over the type of the messages. Holds a [`NetworkService`] and a [`PeerId`]. /// Provides a [`DirectedGossipPrototype::build`] function that builds a [`DirectedGossip`]. +#[derive(Clone)] pub struct DirectedGossipPrototype { service: Arc, peer_id: PeerId, @@ -175,6 +183,14 @@ impl DirectedGossipPrototype { } } +impl fmt::Debug for DirectedGossipPrototype { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("DirectedGossipPrototype") + .field("peer_id", &self.peer_id) + .finish() + } +} + /// Locked queue of messages to the given peer. /// /// As long as this struct exists, the background task is asleep and the owner of the [`QueueLock`] From 67e531d2f452838c03b3e0d2e4630cde17c61e58 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 4 Aug 2020 12:11:38 +0200 Subject: [PATCH 04/12] Explain ordering --- client/network/src/gossip.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/client/network/src/gossip.rs b/client/network/src/gossip.rs index c93ef51db536b..0119e5bdd7af2 100644 --- a/client/network/src/gossip.rs +++ b/client/network/src/gossip.rs @@ -38,9 +38,13 @@ //! of peer and protocol is closed, the queue is silently discarded. It is the role of the user //! to track which peers we are connected to. //! -//! If multiple instances of [`DirectedGossip`] exist for the same peer and protocol, or if some -//! other code uses the [`NetworkService`] to send notifications to this peer and protocol, then -//! the notifications will be interleaved in an unpredictable way. +//! In normal situations, messages sent through a [`DirectedGossip`] will arrive in the same +//! order as they have been sent. +//! 
It is possible, in the situation of disconnects and reconnects, that messages arrive in a +//! different order. See also https://github.com/paritytech/substrate/issues/6756. +//! However, if multiple instances of [`DirectedGossip`] exist for the same peer and protocol, or +//! if some other code uses the [`NetworkService`] to send notifications to this combination or +//! peer and protocol, then the notifications will be interleaved in an unpredictable way. //! use crate::{ExHashT, NetworkService, service::{NotificationSender, NotificationSenderError}}; From bf5fbb013b23c29e63d6d00cdff50512287a6495 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 4 Aug 2020 12:26:52 +0200 Subject: [PATCH 05/12] Apply suggestions from code review Co-authored-by: Toralf Wittner --- client/network/src/gossip.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/network/src/gossip.rs b/client/network/src/gossip.rs index 0119e5bdd7af2..8d451d558c604 100644 --- a/client/network/src/gossip.rs +++ b/client/network/src/gossip.rs @@ -124,7 +124,7 @@ impl Drop for DirectedGossip { // being asynchronous, it can't reasonably be locked from within a destructor. // For this reason, this destructor is a "best effort" destructor. // See also the corresponding code in the background task. - self.shared.stop_task.store(true, atomic::Ordering::Acquire); + self.shared.stop_task.store(true, atomic::Ordering::Release); self.shared.condvar.notify_all(); } } @@ -234,7 +234,7 @@ impl<'a, M: Send + 'static> QueueLock<'a, M> { impl<'a, M> Drop for QueueLock<'a, M> { fn drop(&mut self) { // We notify the `Condvar` in the destructor in order to be able to push multiple - // messages and wake up the background task only one afterwards. + // messages and wake up the background task only once afterwards. self.condvar.notify_one(); } } From c11bcfb5cecd983085eacf3f2b08f9a6e27c6651 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 4 Aug 2020 12:35:21 +0200 Subject: [PATCH 06/12] Address concerns --- client/network/src/gossip.rs | 35 +++++++++++------------------------ 1 file changed, 11 insertions(+), 24 deletions(-) diff --git a/client/network/src/gossip.rs b/client/network/src/gossip.rs index 8d451d558c604..410d43e8790d3 100644 --- a/client/network/src/gossip.rs +++ b/client/network/src/gossip.rs @@ -94,7 +94,7 @@ impl DirectedGossip { /// The returned `Future` is expected to be ready quite quickly. pub async fn lock_queue<'a>(&'a self) -> QueueLock<'a, M> { QueueLock { - lock: self.shared.locked.lock().await, + messages_queue: self.shared.messages_queue.lock().await, condvar: &self.shared.condvar, queue_size_limit: self.shared.queue_size_limit, } @@ -170,9 +170,7 @@ impl DirectedGossipPrototype { stop_task: atomic::AtomicBool::new(false), condvar: Condvar::new(), queue_size_limit, - locked: Mutex::new(SharedLock { - messages_queue: VecDeque::with_capacity(queue_size_limit), - }), + messages_queue: Mutex::new(VecDeque::with_capacity(queue_size_limit)), }); let task = spawn_task( @@ -201,7 +199,7 @@ impl fmt::Debug for DirectedGossipPrototype { /// is in total control of the buffer. #[must_use] pub struct QueueLock<'a, M> { - lock: MutexGuard<'a, SharedLock>, + messages_queue: MutexGuard<'a, VecDeque>, condvar: &'a Condvar, /// Same as [`Shared::queue_size_limit`]. queue_size_limit: usize, @@ -210,24 +208,18 @@ pub struct QueueLock<'a, M> { impl<'a, M: Send + 'static> QueueLock<'a, M> { /// Pushes a message to the queue, or discards it if the queue is full. 
pub fn push_or_discard(&mut self, message: M) { - if self.lock.messages_queue.len() < self.queue_size_limit { - self.lock.messages_queue.push_back(message); + if self.messages_queue.len() < self.queue_size_limit { + self.messages_queue.push_back(message); } } - /// Pushes a message to the queue. Does not enforce any limit to the size of the queue. - /// Use this method only if the message is extremely important. - pub fn push_unbounded(&mut self, message: M) { - self.lock.messages_queue.push_back(message); - } - /// Calls `filter` for each message in the queue, and removes the ones for which `false` is /// returned. /// /// > **Note**: The parameter of `filter` is a `&M` and not a `&mut M` (which would be /// > better) because the underlying implementation relies on `VecDeque::retain`. pub fn retain(&mut self, filter: impl FnMut(&M) -> bool) { - self.lock.messages_queue.retain(filter); + self.messages_queue.retain(filter); } } @@ -243,19 +235,14 @@ impl<'a, M> Drop for QueueLock<'a, M> { struct Shared { /// Read by the background task after locking `locked`. If true, the task stops. stop_task: atomic::AtomicBool, - locked: Mutex>, + /// Queue of messages waiting to be sent out. + messages_queue: Mutex>, /// Must be notified every time the content of `locked` changes. condvar: Condvar, /// Maximum number of elements in `messages_queue`. queue_size_limit: usize, } -#[derive(Debug)] -struct SharedLock { - /// Queue of messages waiting to be sent out. - messages_queue: VecDeque, -} - async fn spawn_task Vec>( service: Arc, peer_id: PeerId, @@ -265,14 +252,14 @@ async fn spawn_task Vec>( ) { loop { let next_message = 'next_msg: loop { - let mut locked = shared.locked.lock().await; + let mut lock = shared.messages_queue.lock().await; loop { if shared.stop_task.load(atomic::Ordering::Acquire) { return; } - if let Some(msg) = locked.messages_queue.pop_front() { + if let Some(msg) = lock.pop_front() { break 'next_msg msg; } @@ -282,7 +269,7 @@ async fn spawn_task Vec>( // See also the corresponding comment in `DirectedGossip::drop`. // For this reason, we use `wait_timeout`. In the worst case scenario, // `stop_task` will always be checked again after the timeout is reached. - locked = shared.condvar.wait_timeout(locked, Duration::from_secs(10)).await.0; + lock = shared.condvar.wait_timeout(lock, Duration::from_secs(10)).await.0; } }; From da88d2b91a20faa2a54ab36442266f58407ab8cd Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 4 Aug 2020 12:58:55 +0200 Subject: [PATCH 07/12] Add basic test --- client/network/src/gossip.rs | 3 + client/network/src/gossip/tests.rs | 201 +++++++++++++++++++++++++++++ 2 files changed, 204 insertions(+) create mode 100644 client/network/src/gossip/tests.rs diff --git a/client/network/src/gossip.rs b/client/network/src/gossip.rs index 410d43e8790d3..4848e83b431e5 100644 --- a/client/network/src/gossip.rs +++ b/client/network/src/gossip.rs @@ -60,6 +60,9 @@ use std::{ time::Duration, }; +#[cfg(test)] +mod tests; + /// Notifications sender for a specific combination of network service, peer, and protocol. pub struct DirectedGossip { /// Shared between the front and the back task. diff --git a/client/network/src/gossip/tests.rs b/client/network/src/gossip/tests.rs new file mode 100644 index 0000000000000..07779835c0884 --- /dev/null +++ b/client/network/src/gossip/tests.rs @@ -0,0 +1,201 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2020 Parity Technologies (UK) Ltd. 
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::{config, gossip::DirectedGossip, Event, NetworkService, NetworkWorker}; + +use futures::prelude::*; +use sp_runtime::traits::{Block as BlockT, Header as _}; +use std::{sync::Arc, time::Duration}; +use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _}; + +type TestNetworkService = NetworkService< + substrate_test_runtime_client::runtime::Block, + substrate_test_runtime_client::runtime::Hash, +>; + +/// Builds a full node to be used for testing. Returns the node service and its associated events +/// stream. +/// +/// > **Note**: We return the events stream in order to not possibly lose events between the +/// > construction of the service and the moment the events stream is grabbed. +fn build_test_full_node(config: config::NetworkConfiguration) + -> (Arc, impl Stream) +{ + let client = Arc::new( + TestClientBuilder::with_default_backend() + .build_with_longest_chain() + .0, + ); + + #[derive(Clone)] + struct PassThroughVerifier(bool); + impl sp_consensus::import_queue::Verifier for PassThroughVerifier { + fn verify( + &mut self, + origin: sp_consensus::BlockOrigin, + header: B::Header, + justification: Option, + body: Option>, + ) -> Result< + ( + sp_consensus::BlockImportParams, + Option)>>, + ), + String, + > { + let maybe_keys = header + .digest() + .log(|l| { + l.try_as_raw(sp_runtime::generic::OpaqueDigestItemId::Consensus(b"aura")) + .or_else(|| { + l.try_as_raw(sp_runtime::generic::OpaqueDigestItemId::Consensus(b"babe")) + }) + }) + .map(|blob| { + vec![( + sp_blockchain::well_known_cache_keys::AUTHORITIES, + blob.to_vec(), + )] + }); + + let mut import = sp_consensus::BlockImportParams::new(origin, header); + import.body = body; + import.finalized = self.0; + import.justification = justification; + import.fork_choice = Some(sp_consensus::ForkChoiceStrategy::LongestChain); + Ok((import, maybe_keys)) + } + } + + let import_queue = Box::new(sp_consensus::import_queue::BasicQueue::new( + PassThroughVerifier(false), + Box::new(client.clone()), + None, + None, + &sp_core::testing::TaskExecutor::new(), + None, + )); + + let worker = NetworkWorker::new(config::Params { + role: config::Role::Full, + executor: None, + network_config: config, + chain: client.clone(), + finality_proof_provider: None, + finality_proof_request_builder: None, + on_demand: None, + transaction_pool: Arc::new(crate::config::EmptyTransactionPool), + protocol_id: config::ProtocolId::from(&b"/test-protocol-name"[..]), + import_queue, + block_announce_validator: Box::new( + sp_consensus::block_validation::DefaultBlockAnnounceValidator, + ), + metrics_registry: None, + }) + .unwrap(); + + let service = worker.service().clone(); + let event_stream = service.event_stream("test"); + + async_std::task::spawn(async move { + futures::pin_mut!(worker); + let _ = worker.await; + }); + + (service, 
event_stream) +} + +const ENGINE_ID: sp_runtime::ConsensusEngineId = *b"foo\0"; + +/// Builds two nodes and their associated events stream. +/// The nodes are connected together and have the `ENGINE_ID` protocol registered. +fn build_nodes_one_proto() + -> (Arc, impl Stream, Arc, impl Stream) +{ + let listen_addr = config::build_multiaddr![Memory(rand::random::())]; + + let (node1, events_stream1) = build_test_full_node(config::NetworkConfiguration { + notifications_protocols: vec![(ENGINE_ID, From::from(&b"/foo"[..]))], + listen_addresses: vec![listen_addr.clone()], + transport: config::TransportConfig::MemoryOnly, + .. config::NetworkConfiguration::new_local() + }); + + let (node2, events_stream2) = build_test_full_node(config::NetworkConfiguration { + notifications_protocols: vec![(ENGINE_ID, From::from(&b"/foo"[..]))], + listen_addresses: vec![], + reserved_nodes: vec![config::MultiaddrWithPeerId { + multiaddr: listen_addr, + peer_id: node1.local_peer_id().clone(), + }], + transport: config::TransportConfig::MemoryOnly, + .. config::NetworkConfiguration::new_local() + }); + + (node1, events_stream1, node2, events_stream2) +} + +#[test] +fn basic_works() { + const NUM_NOTIFS: usize = 256; + + let (node1, mut events_stream1, node2, mut events_stream2) = build_nodes_one_proto(); + let node2_id = node2.local_peer_id().clone(); + + let receiver = async_std::task::spawn(async move { + let mut received_notifications = 0; + + while received_notifications < NUM_NOTIFS { + match events_stream2.next().await.unwrap() { + Event::NotificationStreamClosed { .. } => panic!(), + Event::NotificationsReceived { messages, .. } => { + for message in messages { + assert_eq!(message.0, ENGINE_ID); + assert_eq!(message.1, &b"message"[..]); + received_notifications += 1; + } + } + _ => {} + }; + + if rand::random::() < 2 { + async_std::task::sleep(Duration::from_millis(rand::random::() % 750)).await; + } + } + }); + + async_std::task::block_on(async move { + let (sender, bg_future) = + DirectedGossip::new(node1, node2_id, ENGINE_ID, NUM_NOTIFS, |msg| msg); + async_std::task::spawn(bg_future); + + // Wait for the `NotificationStreamOpened`. + loop { + match events_stream1.next().await.unwrap() { + Event::NotificationStreamOpened { .. } => break, + _ => {} + }; + } + + for _ in 0..NUM_NOTIFS { + sender.queue_or_discard(b"message".to_vec()).await; + } + + receiver.await; + }); +} From 126c3a7be19768f960ad0ca835c5c4cd0a64f246 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 6 Aug 2020 10:27:26 +0200 Subject: [PATCH 08/12] Concerns --- client/network/src/gossip.rs | 67 ++++++++++++++++-------------- client/network/src/gossip/tests.rs | 4 +- 2 files changed, 37 insertions(+), 34 deletions(-) diff --git a/client/network/src/gossip.rs b/client/network/src/gossip.rs index 4848e83b431e5..1a24c8005d04e 100644 --- a/client/network/src/gossip.rs +++ b/client/network/src/gossip.rs @@ -24,25 +24,25 @@ //! the [`NetworkService::notification_sender`] method. This method is quite low level and isn't //! expected to be used directly. //! -//! The [`DirectedGossip`] struct provided by this module is built on top of +//! The [`QueueSender`] struct provided by this module is built on top of //! [`NetworkService::notification_sender`] and provides a cleaner way to send notifications. //! //! # Behaviour //! -//! An instance of [`DirectedGossip`] is specific to a certain combination of `PeerId` and +//! An instance of [`QueueSender`] is specific to a certain combination of `PeerId` and //! protocol name. 
It maintains a buffer of messages waiting to be sent out. The user of this API //! is able to manipulate that queue, adding or removing obsolete messages. //! -//! Creating a [`DirectedGossip`] also returns a opaque `Future` whose responsibility it to +//! Creating a [`QueueSender`] also returns a opaque `Future` whose responsibility it to //! drain that queue and actually send the messages. If the substream with the given combination //! of peer and protocol is closed, the queue is silently discarded. It is the role of the user //! to track which peers we are connected to. //! -//! In normal situations, messages sent through a [`DirectedGossip`] will arrive in the same +//! In normal situations, messages sent through a [`QueueSender`] will arrive in the same //! order as they have been sent. //! It is possible, in the situation of disconnects and reconnects, that messages arrive in a //! different order. See also https://github.com/paritytech/substrate/issues/6756. -//! However, if multiple instances of [`DirectedGossip`] exist for the same peer and protocol, or +//! However, if multiple instances of [`QueueSender`] exist for the same peer and protocol, or //! if some other code uses the [`NetworkService`] to send notifications to this combination or //! peer and protocol, then the notifications will be interleaved in an unpredictable way. //! @@ -64,16 +64,16 @@ use std::{ mod tests; /// Notifications sender for a specific combination of network service, peer, and protocol. -pub struct DirectedGossip { +pub struct QueueSender { /// Shared between the front and the back task. shared: Arc>, } -impl DirectedGossip { - /// Returns a new [`DirectedGossip`] containing a queue of message for this specific +impl QueueSender { + /// Returns a new [`QueueSender`] containing a queue of message for this specific /// combination of peer and protocol. /// - /// In addition to the [`DirectedGossip`], also returns a `Future` whose role is to drive + /// In addition to the [`QueueSender`], also returns a `Future` whose role is to drive /// the messages sending forward. pub fn new( service: Arc>, @@ -88,15 +88,15 @@ impl DirectedGossip { H: ExHashT, F: Fn(M) -> Vec + Send + 'static, { - DirectedGossipPrototype::new(service, peer_id) + QueueSenderPrototype::new(service, peer_id) .build(protocol, queue_size_limit, messages_encode) } /// Locks the queue of messages towards this peer. /// /// The returned `Future` is expected to be ready quite quickly. - pub async fn lock_queue<'a>(&'a self) -> QueueLock<'a, M> { - QueueLock { + pub async fn lock_queue<'a>(&'a self) -> QueueGuard<'a, M> { + QueueGuard { messages_queue: self.shared.messages_queue.lock().await, condvar: &self.shared.condvar, queue_size_limit: self.shared.queue_size_limit, @@ -114,13 +114,13 @@ impl DirectedGossip { } } -impl fmt::Debug for DirectedGossip { +impl fmt::Debug for QueueSender { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("DirectedGossip").finish() + f.debug_struct("QueueSender").finish() } } -impl Drop for DirectedGossip { +impl Drop for QueueSender { fn drop(&mut self) { // The "clean" way to notify the `Condvar` here is normally to first lock the `Mutex`, // then notify the `Condvar` while the `Mutex` is locked. Unfortunately, the `Mutex` @@ -133,15 +133,15 @@ impl Drop for DirectedGossip { } /// Utility. Generic over the type of the messages. Holds a [`NetworkService`] and a [`PeerId`]. -/// Provides a [`DirectedGossipPrototype::build`] function that builds a [`DirectedGossip`]. 
+/// Provides a [`QueueSenderPrototype::build`] function that builds a [`QueueSender`]. #[derive(Clone)] -pub struct DirectedGossipPrototype { +pub struct QueueSenderPrototype { service: Arc, peer_id: PeerId, } -impl DirectedGossipPrototype { - /// Builds a new [`DirectedGossipPrototype`] containing the given components. +impl QueueSenderPrototype { + /// Builds a new [`QueueSenderPrototype`] containing the given components. pub fn new( service: Arc>, peer_id: PeerId, @@ -150,13 +150,13 @@ impl DirectedGossipPrototype { B: BlockT + 'static, H: ExHashT, { - DirectedGossipPrototype { + QueueSenderPrototype { service, peer_id, } } - /// Turns this [`DirectedGossipPrototype`] into a [`DirectedGossip`] and a future. + /// Turns this [`QueueSenderPrototype`] into a [`QueueSender`] and a future. /// /// See [`DirectGossip::new`] for details. pub fn build( @@ -164,7 +164,7 @@ impl DirectedGossipPrototype { protocol: ConsensusEngineId, queue_size_limit: usize, messages_encode: F - ) -> (DirectedGossip, impl Future + Send + 'static) + ) -> (QueueSender, impl Future + Send + 'static) where M: Send + 'static, F: Fn(M) -> Vec + Send + 'static, @@ -184,13 +184,13 @@ impl DirectedGossipPrototype { messages_encode ); - (DirectedGossip { shared }, task) + (QueueSender { shared }, task) } } -impl fmt::Debug for DirectedGossipPrototype { +impl fmt::Debug for QueueSenderPrototype { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("DirectedGossipPrototype") + f.debug_struct("QueueSenderPrototype") .field("peer_id", &self.peer_id) .finish() } @@ -198,18 +198,21 @@ impl fmt::Debug for DirectedGossipPrototype { /// Locked queue of messages to the given peer. /// -/// As long as this struct exists, the background task is asleep and the owner of the [`QueueLock`] -/// is in total control of the buffer. +/// As long as this struct exists, the background task is asleep and the owner of the [`QueueGuard`] +/// is in total control of the buffer. Messages can only ever be sent out after the [`QueueGuard`] +/// is dropped. #[must_use] -pub struct QueueLock<'a, M> { +pub struct QueueGuard<'a, M> { messages_queue: MutexGuard<'a, VecDeque>, condvar: &'a Condvar, /// Same as [`Shared::queue_size_limit`]. queue_size_limit: usize, } -impl<'a, M: Send + 'static> QueueLock<'a, M> { +impl<'a, M: Send + 'static> QueueGuard<'a, M> { /// Pushes a message to the queue, or discards it if the queue is full. + /// + /// The message will only start being sent out after the [`QueueGuard`] is dropped. pub fn push_or_discard(&mut self, message: M) { if self.messages_queue.len() < self.queue_size_limit { self.messages_queue.push_back(message); @@ -226,7 +229,7 @@ impl<'a, M: Send + 'static> QueueLock<'a, M> { } } -impl<'a, M> Drop for QueueLock<'a, M> { +impl<'a, M> Drop for QueueGuard<'a, M> { fn drop(&mut self) { // We notify the `Condvar` in the destructor in order to be able to push multiple // messages and wake up the background task only once afterwards. @@ -266,10 +269,10 @@ async fn spawn_task Vec>( break 'next_msg msg; } - // It is possible that the destructor of `DirectedGossip` sets `stop_task` to + // It is possible that the destructor of `QueueSender` sets `stop_task` to // true and notifies the `Condvar` after the background task loads `stop_task` // and before it calls `Condvar::wait`. - // See also the corresponding comment in `DirectedGossip::drop`. + // See also the corresponding comment in `QueueSender::drop`. // For this reason, we use `wait_timeout`. 
In the worst case scenario, // `stop_task` will always be checked again after the timeout is reached. lock = shared.condvar.wait_timeout(lock, Duration::from_secs(10)).await.0; diff --git a/client/network/src/gossip/tests.rs b/client/network/src/gossip/tests.rs index 07779835c0884..55fd352b17f27 100644 --- a/client/network/src/gossip/tests.rs +++ b/client/network/src/gossip/tests.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::{config, gossip::DirectedGossip, Event, NetworkService, NetworkWorker}; +use crate::{config, gossip::QueueSender, Event, NetworkService, NetworkWorker}; use futures::prelude::*; use sp_runtime::traits::{Block as BlockT, Header as _}; @@ -181,7 +181,7 @@ fn basic_works() { async_std::task::block_on(async move { let (sender, bg_future) = - DirectedGossip::new(node1, node2_id, ENGINE_ID, NUM_NOTIFS, |msg| msg); + QueueSender::new(node1, node2_id, ENGINE_ID, NUM_NOTIFS, |msg| msg); async_std::task::spawn(bg_future); // Wait for the `NotificationStreamOpened`. From ffbe0bbfe257900bb2f62edf55b9a53b54b43d79 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 6 Aug 2020 11:02:22 +0200 Subject: [PATCH 09/12] More concerns --- client/network/src/gossip.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/client/network/src/gossip.rs b/client/network/src/gossip.rs index 1a24c8005d04e..31d3105010fb5 100644 --- a/client/network/src/gossip.rs +++ b/client/network/src/gossip.rs @@ -125,7 +125,6 @@ impl Drop for QueueSender { // The "clean" way to notify the `Condvar` here is normally to first lock the `Mutex`, // then notify the `Condvar` while the `Mutex` is locked. Unfortunately, the `Mutex` // being asynchronous, it can't reasonably be locked from within a destructor. - // For this reason, this destructor is a "best effort" destructor. // See also the corresponding code in the background task. self.shared.stop_task.store(true, atomic::Ordering::Release); self.shared.condvar.notify_all(); @@ -258,14 +257,14 @@ async fn spawn_task Vec>( ) { loop { let next_message = 'next_msg: loop { - let mut lock = shared.messages_queue.lock().await; + let mut queue = shared.messages_queue.lock().await; loop { if shared.stop_task.load(atomic::Ordering::Acquire) { return; } - if let Some(msg) = lock.pop_front() { + if let Some(msg) = queue.pop_front() { break 'next_msg msg; } @@ -275,7 +274,7 @@ async fn spawn_task Vec>( // See also the corresponding comment in `QueueSender::drop`. // For this reason, we use `wait_timeout`. In the worst case scenario, // `stop_task` will always be checked again after the timeout is reached. - lock = shared.condvar.wait_timeout(lock, Duration::from_secs(10)).await.0; + queue = shared.condvar.wait_timeout(queue, Duration::from_secs(10)).await.0; } }; From 3a79f62a1c2c62d2a45519087026d27f97d9b2df Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 7 Aug 2020 11:39:40 +0200 Subject: [PATCH 10/12] Remove QueueSenderPrototype --- client/network/src/gossip.rs | 107 +++++++---------------------------- 1 file changed, 19 insertions(+), 88 deletions(-) diff --git a/client/network/src/gossip.rs b/client/network/src/gossip.rs index 31d3105010fb5..a8cf96bb3dc19 100644 --- a/client/network/src/gossip.rs +++ b/client/network/src/gossip.rs @@ -47,7 +47,7 @@ //! peer and protocol, then the notifications will be interleaved in an unpredictable way. //! 
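NOTE (illustrative sketch, not part of the patch): the module documentation above says the user
can manipulate the queue, adding or removing obsolete messages. A sketch of that pattern with the
`QueueSender`/`QueueGuard` API as it stands at this point in the series (the type is renamed to
`QueuedSender` in a later commit); the `"status:"` prefix, the message type `Vec<u8>` and the
crate path are placeholder assumptions.

    use sc_network::gossip::QueueSender;

    /// Drops queued messages that a fresh batch of updates supersedes, then queues the batch.
    async fn replace_status_messages(sender: &QueueSender<Vec<u8>>, updates: Vec<Vec<u8>>) {
        // While the guard is alive the background task stays asleep, so the whole batch
        // is applied as one unit.
        let mut queue = sender.lock_queue().await;

        // Remove the now-obsolete messages (placeholder predicate).
        queue.retain(|queued| !queued.starts_with(b"status:"));

        // Queue the replacements; anything past the size limit is silently discarded.
        for update in updates {
            queue.push_or_discard(update);
        }

        // Dropping `queue` wakes the background task exactly once for the whole batch.
    }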
-use crate::{ExHashT, NetworkService, service::{NotificationSender, NotificationSenderError}}; +use crate::{ExHashT, NetworkService}; use async_std::sync::{Condvar, Mutex, MutexGuard}; use futures::prelude::*; @@ -88,8 +88,22 @@ impl QueueSender { H: ExHashT, F: Fn(M) -> Vec + Send + 'static, { - QueueSenderPrototype::new(service, peer_id) - .build(protocol, queue_size_limit, messages_encode) + let shared = Arc::new(Shared { + stop_task: atomic::AtomicBool::new(false), + condvar: Condvar::new(), + queue_size_limit, + messages_queue: Mutex::new(VecDeque::with_capacity(queue_size_limit)), + }); + + let task = spawn_task( + service, + peer_id, + protocol, + shared.clone(), + messages_encode + ); + + (QueueSender { shared }, task) } /// Locks the queue of messages towards this peer. @@ -131,70 +145,6 @@ impl Drop for QueueSender { } } -/// Utility. Generic over the type of the messages. Holds a [`NetworkService`] and a [`PeerId`]. -/// Provides a [`QueueSenderPrototype::build`] function that builds a [`QueueSender`]. -#[derive(Clone)] -pub struct QueueSenderPrototype { - service: Arc, - peer_id: PeerId, -} - -impl QueueSenderPrototype { - /// Builds a new [`QueueSenderPrototype`] containing the given components. - pub fn new( - service: Arc>, - peer_id: PeerId, - ) -> Self - where - B: BlockT + 'static, - H: ExHashT, - { - QueueSenderPrototype { - service, - peer_id, - } - } - - /// Turns this [`QueueSenderPrototype`] into a [`QueueSender`] and a future. - /// - /// See [`DirectGossip::new`] for details. - pub fn build( - self, - protocol: ConsensusEngineId, - queue_size_limit: usize, - messages_encode: F - ) -> (QueueSender, impl Future + Send + 'static) - where - M: Send + 'static, - F: Fn(M) -> Vec + Send + 'static, - { - let shared = Arc::new(Shared { - stop_task: atomic::AtomicBool::new(false), - condvar: Condvar::new(), - queue_size_limit, - messages_queue: Mutex::new(VecDeque::with_capacity(queue_size_limit)), - }); - - let task = spawn_task( - self.service, - self.peer_id, - protocol, - shared.clone(), - messages_encode - ); - - (QueueSender { shared }, task) - } -} - -impl fmt::Debug for QueueSenderPrototype { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("QueueSenderPrototype") - .field("peer_id", &self.peer_id) - .finish() - } -} - /// Locked queue of messages to the given peer. /// /// As long as this struct exists, the background task is asleep and the owner of the [`QueueGuard`] @@ -248,8 +198,8 @@ struct Shared { queue_size_limit: usize, } -async fn spawn_task Vec>( - service: Arc, +async fn spawn_task Vec>( + service: Arc>, peer_id: PeerId, protocol: ConsensusEngineId, shared: Arc>, @@ -293,22 +243,3 @@ async fn spawn_task Vec>( let _ = ready.send(messages_encode(next_message)); } } - -/// Abstraction around `NetworkService` that permits removing the `B` and `H` parameters. 
-trait AbstractNotificationSender { - fn notification_sender( - &self, - target: PeerId, - engine_id: ConsensusEngineId, - ) -> Result; -} - -impl AbstractNotificationSender for NetworkService { - fn notification_sender( - &self, - target: PeerId, - engine_id: ConsensusEngineId, - ) -> Result { - NetworkService::notification_sender(self, target, engine_id) - } -} From 9ebc8f82b154bf6b0d644148198543568f7f152c Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 13 Aug 2020 10:45:33 +0200 Subject: [PATCH 11/12] Rename --- client/network/src/gossip.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/client/network/src/gossip.rs b/client/network/src/gossip.rs index a8cf96bb3dc19..0650e7a2f818b 100644 --- a/client/network/src/gossip.rs +++ b/client/network/src/gossip.rs @@ -24,25 +24,25 @@ //! the [`NetworkService::notification_sender`] method. This method is quite low level and isn't //! expected to be used directly. //! -//! The [`QueueSender`] struct provided by this module is built on top of +//! The [`QueuedSender`] struct provided by this module is built on top of //! [`NetworkService::notification_sender`] and provides a cleaner way to send notifications. //! //! # Behaviour //! -//! An instance of [`QueueSender`] is specific to a certain combination of `PeerId` and +//! An instance of [`QueuedSender`] is specific to a certain combination of `PeerId` and //! protocol name. It maintains a buffer of messages waiting to be sent out. The user of this API //! is able to manipulate that queue, adding or removing obsolete messages. //! -//! Creating a [`QueueSender`] also returns a opaque `Future` whose responsibility it to +//! Creating a [`QueuedSender`] also returns a opaque `Future` whose responsibility it to //! drain that queue and actually send the messages. If the substream with the given combination //! of peer and protocol is closed, the queue is silently discarded. It is the role of the user //! to track which peers we are connected to. //! -//! In normal situations, messages sent through a [`QueueSender`] will arrive in the same +//! In normal situations, messages sent through a [`QueuedSender`] will arrive in the same //! order as they have been sent. //! It is possible, in the situation of disconnects and reconnects, that messages arrive in a //! different order. See also https://github.com/paritytech/substrate/issues/6756. -//! However, if multiple instances of [`QueueSender`] exist for the same peer and protocol, or +//! However, if multiple instances of [`QueuedSender`] exist for the same peer and protocol, or //! if some other code uses the [`NetworkService`] to send notifications to this combination or //! peer and protocol, then the notifications will be interleaved in an unpredictable way. //! @@ -64,16 +64,16 @@ use std::{ mod tests; /// Notifications sender for a specific combination of network service, peer, and protocol. -pub struct QueueSender { +pub struct QueuedSender { /// Shared between the front and the back task. shared: Arc>, } -impl QueueSender { - /// Returns a new [`QueueSender`] containing a queue of message for this specific +impl QueuedSender { + /// Returns a new [`QueuedSender`] containing a queue of message for this specific /// combination of peer and protocol. /// - /// In addition to the [`QueueSender`], also returns a `Future` whose role is to drive + /// In addition to the [`QueuedSender`], also returns a `Future` whose role is to drive /// the messages sending forward. 
pub fn new( service: Arc>, @@ -103,7 +103,7 @@ impl QueueSender { messages_encode ); - (QueueSender { shared }, task) + (QueuedSender { shared }, task) } /// Locks the queue of messages towards this peer. @@ -128,13 +128,13 @@ impl QueueSender { } } -impl fmt::Debug for QueueSender { +impl fmt::Debug for QueuedSender { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("QueueSender").finish() + f.debug_struct("QueuedSender").finish() } } -impl Drop for QueueSender { +impl Drop for QueuedSender { fn drop(&mut self) { // The "clean" way to notify the `Condvar` here is normally to first lock the `Mutex`, // then notify the `Condvar` while the `Mutex` is locked. Unfortunately, the `Mutex` @@ -218,10 +218,10 @@ async fn spawn_task Vec>( break 'next_msg msg; } - // It is possible that the destructor of `QueueSender` sets `stop_task` to + // It is possible that the destructor of `QueuedSender` sets `stop_task` to // true and notifies the `Condvar` after the background task loads `stop_task` // and before it calls `Condvar::wait`. - // See also the corresponding comment in `QueueSender::drop`. + // See also the corresponding comment in `QueuedSender::drop`. // For this reason, we use `wait_timeout`. In the worst case scenario, // `stop_task` will always be checked again after the timeout is reached. queue = shared.condvar.wait_timeout(queue, Duration::from_secs(10)).await.0; From b1b1d628cb5d69909b8f9a0cc391a6ffa069c99f Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 14 Aug 2020 09:23:01 +0200 Subject: [PATCH 12/12] Apply suggestions from code review Co-authored-by: Max Inden --- client/network/src/gossip/tests.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/network/src/gossip/tests.rs b/client/network/src/gossip/tests.rs index 55fd352b17f27..9b16e057461bf 100644 --- a/client/network/src/gossip/tests.rs +++ b/client/network/src/gossip/tests.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::{config, gossip::QueueSender, Event, NetworkService, NetworkWorker}; +use crate::{config, gossip::QueuedSender, Event, NetworkService, NetworkWorker}; use futures::prelude::*; use sp_runtime::traits::{Block as BlockT, Header as _}; @@ -181,7 +181,7 @@ fn basic_works() { async_std::task::block_on(async move { let (sender, bg_future) = - QueueSender::new(node1, node2_id, ENGINE_ID, NUM_NOTIFS, |msg| msg); + QueuedSender::new(node1, node2_id, ENGINE_ID, NUM_NOTIFS, |msg| msg); async_std::task::spawn(bg_future); // Wait for the `NotificationStreamOpened`.