Add a DirectedGossip struct #6803
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,318 @@ | ||
// This file is part of Substrate. | ||
|
||
// Copyright (C) 2017-2020 Parity Technologies (UK) Ltd. | ||
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 | ||
|
||
// This program is free software: you can redistribute it and/or modify | ||
// it under the terms of the GNU General Public License as published by | ||
// the Free Software Foundation, either version 3 of the License, or | ||
// (at your option) any later version. | ||
|
||
// This program is distributed in the hope that it will be useful, | ||
// but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
// GNU General Public License for more details. | ||
|
||
// You should have received a copy of the GNU General Public License | ||
// along with this program. If not, see <https://www.gnu.org/licenses/>. | ||
|
||
//! Helper for sending rate-limited gossip messages. | ||
|
||
//! | ||
//! # Context | ||
//! | ||
//! The [`NetworkService`] struct provides a way to send notifications to a certain peer through | ||
//! the [`NetworkService::notification_sender`] method. This method is quite low level and isn't | ||
//! expected to be used directly. | ||
//! | ||
//! The [`DirectedGossip`] struct provided by this module is built on top of | ||
//! [`NetworkService::notification_sender`] and provides a cleaner way to send notifications. | ||
//! | ||
//! # Behaviour | ||
//! | ||
//! An instance of [`DirectedGossip`] is specific to a certain combination of `PeerId` and | ||
//! protocol name. It maintains a buffer of messages waiting to be sent out. The user of this API | ||
//! is able to manipulate that queue, adding or removing obsolete messages. | ||
//! | ||
//! Creating a [`DirectedGossip`] also returns an opaque `Future` whose responsibility is to | ||
//! drain that queue and actually send the messages. If the substream with the given combination | ||
//! of peer and protocol is closed, the queue is silently discarded. It is the role of the user | ||
//! to track which peers we are connected to. | ||
//! | ||
//! If multiple instances of [`DirectedGossip`] exist for the same peer and protocol, or if some | ||
//! other code uses the [`NetworkService`] to send notifications to this peer and protocol, then | ||
//! the notifications will be interleaved in an unpredictable way. | ||
|
||
//! | ||
|
||
use crate::{ExHashT, NetworkService, service::{NotificationSender, NotificationSenderError}}; | ||
|
||
use async_std::sync::{Condvar, Mutex, MutexGuard}; | ||
use futures::prelude::*; | ||
use libp2p::PeerId; | ||
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; | ||
use std::{ | ||
collections::VecDeque, | ||
fmt, | ||
sync::{atomic, Arc}, | ||
time::Duration, | ||
}; | ||
|
||
/// Notifications sender for a specific combination of network service, peer, and protocol. | ||
pub struct DirectedGossip<M> { | ||
/// Shared between the front and the back task. | ||
shared: Arc<Shared<M>>, | ||
} | ||
|
||
impl<M> DirectedGossip<M> { | ||
/// Returns a new [`DirectedGossip`] containing a queue of messages for this specific | ||
/// combination of peer and protocol. | ||
/// | ||
/// In addition to the [`DirectedGossip`], also returns a `Future` whose role is to drive | ||
/// the sending of messages forward. | ||
pub fn new<B, H, F>( | ||
service: Arc<NetworkService<B, H>>, | ||
peer_id: PeerId, | ||
protocol: ConsensusEngineId, | ||
queue_size_limit: usize, | ||
messages_encode: F | ||
) -> (Self, impl Future<Output = ()> + Send + 'static) | ||
where | ||
M: Send + 'static, | ||
B: BlockT + 'static, | ||
H: ExHashT, | ||
F: Fn(M) -> Vec<u8> + Send + 'static, | ||
{ | ||
DirectedGossipPrototype::new(service, peer_id) | ||
.build(protocol, queue_size_limit, messages_encode) | ||
} | ||
|
||
/// Locks the queue of messages towards this peer. | ||
/// | ||
/// The returned `Future` is expected to be ready quite quickly. | ||
pub async fn lock_queue<'a>(&'a self) -> QueueLock<'a, M> { | ||
QueueLock { | ||
lock: self.shared.locked.lock().await, | ||
condvar: &self.shared.condvar, | ||
queue_size_limit: self.shared.queue_size_limit, | ||
} | ||
} | ||
|
||
/// Pushes a message to the queue, or discards it if the queue is full. | ||
/// | ||
/// The returned `Future` is expected to be ready quite quickly. | ||
pub async fn queue_or_discard(&self, message: M) | ||
Why offer two ways to
This API could return a
It's a convenient shortcut. |
||
where | ||
M: Send + 'static | ||
{ | ||
self.lock_queue().await.push_or_discard(message); | ||
} | ||
} | ||
|
||
impl<M> fmt::Debug for DirectedGossip<M> { | ||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||
f.debug_struct("DirectedGossip").finish() | ||
} | ||
} | ||
|
||
impl<M> Drop for DirectedGossip<M> { | ||
fn drop(&mut self) { | ||
// The "clean" way to notify the `Condvar` here is normally to first lock the `Mutex`, | ||
// then notify the `Condvar` while the `Mutex` is locked. Unfortunately, the `Mutex` | ||
// being asynchronous, it can't reasonably be locked from within a destructor. | ||
// For this reason, this destructor is a "best effort" destructor. | ||
|
||
// See also the corresponding code in the background task. | ||
self.shared.stop_task.store(true, atomic::Ordering::Release); | ||
|
||
self.shared.condvar.notify_all(); | ||
|
||
} | ||
} | ||
|
||
/// Utility. Generic over the type of the messages. Holds a [`NetworkService`] and a [`PeerId`]. | ||
/// Provides a [`DirectedGossipPrototype::build`] function that builds a [`DirectedGossip`]. | ||
#[derive(Clone)] | ||
pub struct DirectedGossipPrototype { | ||
This is
I admit that this type is specifically targeted for the Polkadot use case. In Polkadot's code, the so-called network bridge communicates via messages with other subsystems. For example, when we connect, a The network bridge doesn't know what is the type of the networking messages that the various subsystems would manipulate (the This
Should Polkadot-specific code not be put in Polkadot?
I am actually changing this right now, so we may not need this type. Will push a PR shortly
paritytech/polkadot#1535 - now the network bridge is aware of the specific information flowing over the network and we may be able to avoid these prototypes. However, different subsystems will still want
To answer specifically this: the way I see it, this code isn't really Polkadot-specific. I think that, in general, we want to be as "universal" as possible when designing low-level code, but as long as the lower-level primitives are exposed, higher-level code can be a bit more targeted towards certain use-cases. |
||
service: Arc<dyn AbstractNotificationSender + Send + Sync + 'static>, | ||
peer_id: PeerId, | ||
} | ||
|
||
impl DirectedGossipPrototype { | ||
/// Builds a new [`DirectedGossipPrototype`] containing the given components. | ||
pub fn new<B, H>( | ||
service: Arc<NetworkService<B, H>>, | ||
peer_id: PeerId, | ||
) -> Self | ||
where | ||
B: BlockT + 'static, | ||
H: ExHashT, | ||
{ | ||
DirectedGossipPrototype { | ||
service, | ||
peer_id, | ||
} | ||
} | ||
|
||
/// Turns this [`DirectedGossipPrototype`] into a [`DirectedGossip`] and a future. | ||
/// | ||
/// See [`DirectedGossip::new`] for details. | ||
pub fn build<M, F>( | ||
self, | ||
protocol: ConsensusEngineId, | ||
queue_size_limit: usize, | ||
messages_encode: F | ||
) -> (DirectedGossip<M>, impl Future<Output = ()> + Send + 'static) | ||
where | ||
M: Send + 'static, | ||
F: Fn(M) -> Vec<u8> + Send + 'static, | ||
{ | ||
let shared = Arc::new(Shared { | ||
stop_task: atomic::AtomicBool::new(false), | ||
condvar: Condvar::new(), | ||
queue_size_limit, | ||
locked: Mutex::new(SharedLock { | ||
messages_queue: VecDeque::with_capacity(queue_size_limit), | ||
}), | ||
}); | ||
|
||
let task = spawn_task( | ||
self.service, | ||
self.peer_id, | ||
protocol, | ||
shared.clone(), | ||
messages_encode | ||
); | ||
|
||
(DirectedGossip { shared }, task) | ||
} | ||
} | ||
|
||
impl fmt::Debug for DirectedGossipPrototype { | ||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||
f.debug_struct("DirectedGossipPrototype") | ||
.field("peer_id", &self.peer_id) | ||
.finish() | ||
} | ||
} | ||
|
||
/// Locked queue of messages to the given peer. | ||
/// | ||
/// As long as this struct exists, the background task is asleep and the owner of the [`QueueLock`] | ||
/// is in total control of the buffer. | ||
|
||
#[must_use] | ||
pub struct QueueLock<'a, M> { | ||
|
||
lock: MutexGuard<'a, SharedLock<M>>, | ||
condvar: &'a Condvar, | ||
/// Same as [`Shared::queue_size_limit`]. | ||
queue_size_limit: usize, | ||
} | ||
|
||
impl<'a, M: Send + 'static> QueueLock<'a, M> { | ||
|
||
/// Pushes a message to the queue, or discards it if the queue is full. | ||
pub fn push_or_discard(&mut self, message: M) { | ||
Same here. It would be good to get back the
But what would you do with the message that is returned? Put it in another queue? Additionally, what if the connection with the remote is closed? Are we supposed to return back the message as well? If so, then it's very problematic because we can't detect this.
Isn't And is there no sane thing you can do? Why can't you wait for space to appear in the queue or something like that?
The way I see
The entire reason for this API to exist is to remove the need for any waiting. See also this paragraph. Ultimately there has to be code somewhere that holds some sort of
It would be nice to have a way to wait, although I agree that the API for that should be used sparingly so you don't degrade to the performance of the slowest peer. There are cases where we don't want to drop messages, for instance when responding to a validator's request.
I believe that everything that would fall in this category should be covered by request-response protocols. Rather than adding a wait, I could restore the |
||
if self.lock.messages_queue.len() < self.queue_size_limit { | ||
self.lock.messages_queue.push_back(message); | ||
} | ||
} | ||
|
||
/// Pushes a message to the queue. Does not enforce any limit to the size of the queue. | ||
/// Use this method only if the message is extremely important. | ||
pub fn push_unbounded(&mut self, message: M) { | ||
|
||
self.lock.messages_queue.push_back(message); | ||
} | ||
|
||
/// Calls `filter` for each message in the queue, and removes the ones for which `false` is | ||
/// returned. | ||
/// | ||
/// > **Note**: The parameter of `filter` is a `&M` and not a `&mut M` (which would be | ||
/// > better) because the underlying implementation relies on `VecDeque::retain`. | ||
pub fn retain(&mut self, filter: impl FnMut(&M) -> bool) { | ||
self.lock.messages_queue.retain(filter); | ||
} | ||
} | ||
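
The review thread on `push_or_discard` asks whether a rejected message could be handed back to the caller instead of being dropped. A minimal sketch of that variant is shown below. It assumes a standalone bounded queue: the `BoundedQueue` type and its `try_push` method are hypothetical names used for illustration and are not part of this PR.

```rust
use std::collections::VecDeque;

/// Hypothetical standalone queue with the same size-limit rule as `QueueLock`.
pub struct BoundedQueue<M> {
    messages: VecDeque<M>,
    limit: usize,
}

impl<M> BoundedQueue<M> {
    /// Creates an empty queue that holds at most `limit` messages.
    pub fn new(limit: usize) -> Self {
        BoundedQueue { messages: VecDeque::with_capacity(limit), limit }
    }

    /// Pushes `message`, or gives it back in `Err` if the queue is full.
    pub fn try_push(&mut self, message: M) -> Result<(), M> {
        if self.messages.len() < self.limit {
            self.messages.push_back(message);
            Ok(())
        } else {
            Err(message)
        }
    }

    /// Keeps only the messages for which `filter` returns `true`.
    pub fn retain(&mut self, filter: impl FnMut(&M) -> bool) {
        self.messages.retain(filter);
    }

    /// Number of messages currently waiting in the queue.
    pub fn len(&self) -> usize {
        self.messages.len()
    }
}
```

With this shape the caller decides what to do with a rejected message, for example dropping it explicitly or removing obsolete entries with `retain` and trying again. It does not address the other concern raised in the thread, namely that a message already queued can still be lost if the connection closes.
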
|
||
impl<'a, M> Drop for QueueLock<'a, M> { | ||
fn drop(&mut self) { | ||
// We notify the `Condvar` in the destructor in order to be able to push multiple | ||
// messages and wake up the background task only once afterwards. | ||
|
||
self.condvar.notify_one(); | ||
} | ||
} | ||
|
||
#[derive(Debug)] | ||
struct Shared<M> { | ||
/// Read by the background task after locking `locked`. If true, the task stops. | ||
stop_task: atomic::AtomicBool, | ||
locked: Mutex<SharedLock<M>>, | ||
/// Must be notified every time the content of `locked` changes. | ||
condvar: Condvar, | ||
/// Maximum number of elements in `messages_queue`. | ||
queue_size_limit: usize, | ||
} | ||
|
||
#[derive(Debug)] | ||
struct SharedLock<M> { | ||
|
||
/// Queue of messages waiting to be sent out. | ||
messages_queue: VecDeque<M>, | ||
} | ||
|
||
async fn spawn_task<M, F: Fn(M) -> Vec<u8>>( | ||
service: Arc<dyn AbstractNotificationSender + Send + Sync + 'static>, | ||
peer_id: PeerId, | ||
protocol: ConsensusEngineId, | ||
shared: Arc<Shared<M>>, | ||
messages_encode: F, | ||
) { | ||
loop { | ||
let next_message = 'next_msg: loop { | ||
let mut locked = shared.locked.lock().await; | ||
|
||
loop { | ||
if shared.stop_task.load(atomic::Ordering::Acquire) { | ||
return; | ||
} | ||
|
||
if let Some(msg) = locked.messages_queue.pop_front() { | ||
break 'next_msg msg; | ||
} | ||
|
||
// It is possible that the destructor of `DirectedGossip` sets `stop_task` to | ||
// true and notifies the `Condvar` after the background task loads `stop_task` | ||
// and before it calls `Condvar::wait`. | ||
// See also the corresponding comment in `DirectedGossip::drop`. | ||
// For this reason, we use `wait_timeout`. In the worst case scenario, | ||
// `stop_task` will always be checked again after the timeout is reached. | ||
locked = shared.condvar.wait_timeout(locked, Duration::from_secs(10)).await.0; | ||
} | ||
}; | ||
|
||
// Starting from below, we try to send the message. If an error happens when sending, | ||
// the only sane option we have is to silently discard the message. | ||
let sender = match service.notification_sender(peer_id.clone(), protocol) { | ||
Ok(s) => s, | ||
Err(_) => continue, | ||
}; | ||
|
||
let ready = match sender.ready().await { | ||
Ok(r) => r, | ||
Err(_) => continue, | ||
}; | ||
|
||
let _ = ready.send(messages_encode(next_message)); | ||
} | ||
} | ||
|
||
/// Abstraction around `NetworkService` that permits removing the `B` and `H` parameters. | ||
trait AbstractNotificationSender { | ||
I suggest to remove this trait. The two type parameters
It's necessary for |
||
fn notification_sender( | ||
&self, | ||
target: PeerId, | ||
engine_id: ConsensusEngineId, | ||
) -> Result<NotificationSender, NotificationSenderError>; | ||
} | ||
|
||
impl<B: BlockT, H: ExHashT> AbstractNotificationSender for NetworkService<B, H> { | ||
fn notification_sender( | ||
&self, | ||
target: PeerId, | ||
engine_id: ConsensusEngineId, | ||
) -> Result<NotificationSender, NotificationSenderError> { | ||
NetworkService::notification_sender(self, target, engine_id) | ||
} | ||
} |

Do I understand correctly that this uses the `unstable` feature to have `futures-timer`? If so, should we not do that across the entire crate for all usage of `futures-timer`?

No, it is for `Condvar`. It is unfortunate that we have to depend on the `unstable` feature, but I couldn't find any crate other than `async-std` that provides an asynchronous `Condvar`.

If this is something we want to avoid, one can use a channel as a condvar.
Problem is that this can't be used in the `Drop` implementations and thus cleanup would depend on the 10 sec timeout.

It does not always need to. When the `Sender` is dropped, the receiver will notice and the task can terminate. Instead of `let _ = rx.next().await;` one would write `if rx.next().await.is_some() { ... }`. In the Drop impl of QueueGuard one can use `Sender::try_send`.

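The channel-based alternative described in this comment can be sketched with standard-library primitives. This is only an illustration under assumptions: `std::sync::mpsc` and a thread stand in for the asynchronous channel and background task discussed in the review, and the names `Queue`, `QueueGuard`, and `spawn_worker` are hypothetical. The guard signals the worker with `try_send` from its `Drop` impl, and the worker exits once the sender has been dropped.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread::{self, JoinHandle};

/// Hypothetical front half: owns the queue and the wake-up sender.
pub struct Queue {
    messages: Arc<Mutex<VecDeque<u32>>>,
    wake: SyncSender<()>,
}

/// Hypothetical guard: wakes the worker when it is dropped.
pub struct QueueGuard<'a> {
    lock: MutexGuard<'a, VecDeque<u32>>,
    wake: &'a SyncSender<()>,
}

impl Queue {
    /// Locks the queue so that several messages can be pushed at once.
    pub fn lock(&self) -> QueueGuard<'_> {
        QueueGuard { lock: self.messages.lock().unwrap(), wake: &self.wake }
    }
}

impl QueueGuard<'_> {
    pub fn push(&mut self, message: u32) {
        self.lock.push_back(message);
    }
}

impl Drop for QueueGuard<'_> {
    fn drop(&mut self) {
        // `try_send` never blocks. If the channel is full, a wake-up is
        // already pending and the worker will see the new messages anyway.
        let _ = self.wake.try_send(());
    }
}

/// Starts the worker and returns the queue plus a handle that yields
/// every message the worker "sent", in order.
pub fn spawn_worker() -> (Queue, JoinHandle<Vec<u32>>) {
    let messages: Arc<Mutex<VecDeque<u32>>> = Arc::new(Mutex::new(VecDeque::new()));
    let (wake, rx) = sync_channel::<()>(1);
    let shared = Arc::clone(&messages);
    let handle = thread::spawn(move || {
        let mut sent = Vec::new();
        // `recv` returns `Err` once the `Queue` (and its sender) is dropped
        // and no wake-up is left in the channel, which ends the loop.
        while rx.recv().is_ok() {
            let drained: Vec<u32> = shared.lock().unwrap().drain(..).collect();
            sent.extend(drained);
        }
        sent
    });
    (Queue { messages, wake }, handle)
}
```

In this sketch, dropping the `Queue` is what stops the worker, so no timeout is needed for cleanup. The cost is the extra channel allocation that the following comment objects to.
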
I find it a bit weird to use a channel, which involves an additional `Arc`, `Mutex` and `Vec`, just to wake up a task, rather than a `Waker`.
But I have now also tried using a `Waker`, and the implementation is considerably more tricky and difficult to read because of potential race conditions and having to introduce manual polling within an `async` function and having to implement your own `Waker`.
Before going on, I'd like to understand what is wrong with the `Condvar` solution, as a `Condvar` is exactly the tool that is designed for this specific job.
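
For comparison, the `Condvar`-based design in this diff can be sketched with the blocking primitives from the standard library. This is an analogue under stated assumptions, not the PR's code: `std::sync::Condvar` and a thread stand in for the asynchronous `Condvar` from `async-std` and the background future, and the names `Shared`, `stop`, and `run_worker` are illustrative. As in the diff, the stop flag is set without holding the lock, so the worker uses `wait_timeout` to bound how long a missed notification can delay shutdown.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// State shared between the producer and the worker (illustrative names).
pub struct Shared {
    stop: AtomicBool,
    queue: Mutex<VecDeque<u32>>,
    condvar: Condvar,
    limit: usize,
}

impl Shared {
    pub fn new(limit: usize) -> Arc<Self> {
        Arc::new(Shared {
            stop: AtomicBool::new(false),
            queue: Mutex::new(VecDeque::new()),
            condvar: Condvar::new(),
            limit,
        })
    }

    /// Pushes `message` unless the queue already holds `limit` items.
    /// Returns whether the message was queued.
    pub fn push_or_discard(&self, message: u32) -> bool {
        let mut queue = self.queue.lock().unwrap();
        if queue.len() >= self.limit {
            return false;
        }
        queue.push_back(message);
        self.condvar.notify_one();
        true
    }

    /// Whether the queue is currently empty.
    pub fn is_empty(&self) -> bool {
        self.queue.lock().unwrap().is_empty()
    }

    /// Best-effort stop: the flag is set without taking the lock.
    pub fn stop(&self) {
        self.stop.store(true, Ordering::Release);
        self.condvar.notify_all();
    }
}

/// Spawns the worker; the handle yields the messages it "sent", in order.
pub fn run_worker(shared: Arc<Shared>) -> JoinHandle<Vec<u32>> {
    thread::spawn(move || {
        let mut sent = Vec::new();
        let mut queue = shared.queue.lock().unwrap();
        loop {
            if shared.stop.load(Ordering::Acquire) {
                return sent;
            }
            if let Some(message) = queue.pop_front() {
                // A real sender would release the lock before sending.
                sent.push(message);
                continue;
            }
            // `stop` may race with this wait, so the timeout guarantees
            // that the flag is checked again even if the notify is missed.
            queue = shared
                .condvar
                .wait_timeout(queue, Duration::from_millis(50))
                .unwrap()
                .0;
        }
    })
}
```

The timeout is the price of not locking in `stop`: in the worst case the worker notices the flag one timeout period late, which is the trade-off described in the comments of the diff.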