Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Runtime diagnostics for leaked messages in unbounded channels #12971

Merged
merged 15 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion client/api/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ impl<Block: BlockT> StorageNotifications<Block> {
filter_keys: Option<&[StorageKey]>,
filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
) -> StorageEventStream<Block::Hash> {
let receiver = self.0.subscribe(registry::SubscribeOp { filter_keys, filter_child_keys });
let receiver = self
.0
.subscribe(registry::SubscribeOp { filter_keys, filter_child_keys }, 100_000);

StorageEventStream(receiver)
}
Expand Down
4 changes: 2 additions & 2 deletions client/beefy/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ where
) -> Result<Self, Error> {
let beefy_best_block = Arc::new(RwLock::new(None));

let stream = best_block_stream.subscribe();
let stream = best_block_stream.subscribe(100_000);
let closure_clone = beefy_best_block.clone();
let future = stream.for_each(move |best_beefy| {
let async_clone = closure_clone.clone();
Expand All @@ -141,7 +141,7 @@ where
fn subscribe_justifications(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
let stream = self
.finality_proof_stream
.subscribe()
.subscribe(100_000)
.map(|vfp| notification::EncodedVersionedFinalityProof::new::<Block>(vfp));

let fut = async move {
Expand Down
2 changes: 1 addition & 1 deletion client/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ where
// Subscribe to finality notifications and justifications before waiting for runtime pallet and
// reuse the streams, so we don't miss notifications while waiting for pallet to be available.
let mut finality_notifications = client.finality_notification_stream().fuse();
let block_import_justif = links.from_block_import_justif_stream.subscribe().fuse();
let block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();

// Wait for BEEFY pallet to be active before starting voter.
let persisted_state =
Expand Down
10 changes: 5 additions & 5 deletions client/beefy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,8 @@ pub(crate) fn get_beefy_streams(
let beefy_rpc_links = net.peer(index).data.beefy_rpc_links.lock().clone().unwrap();
let BeefyRPCLinks { from_voter_justif_stream, from_voter_best_beefy_stream } =
beefy_rpc_links;
best_block_streams.push(from_voter_best_beefy_stream.subscribe());
versioned_finality_proof_streams.push(from_voter_justif_stream.subscribe());
best_block_streams.push(from_voter_best_beefy_stream.subscribe(100_000));
versioned_finality_proof_streams.push(from_voter_justif_stream.subscribe(100_000));
});
(best_block_streams, versioned_finality_proof_streams)
}
Expand Down Expand Up @@ -736,7 +736,7 @@ async fn beefy_importing_blocks() {
let hashof1 = block.header.hash();

// Import without justifications.
let mut justif_recv = justif_stream.subscribe();
let mut justif_recv = justif_stream.subscribe(100_000);
assert_eq!(
block_import
.import_block(params(block.clone(), None), HashMap::new())
Expand Down Expand Up @@ -779,7 +779,7 @@ async fn beefy_importing_blocks() {
let builder = full_client.new_block_at(&parent_id, Default::default(), false).unwrap();
let block = builder.build().unwrap().block;
let hashof2 = block.header.hash();
let mut justif_recv = justif_stream.subscribe();
let mut justif_recv = justif_stream.subscribe(100_000);
assert_eq!(
block_import.import_block(params(block, justif), HashMap::new()).await.unwrap(),
ImportResult::Imported(ImportedAux {
Expand Down Expand Up @@ -823,7 +823,7 @@ async fn beefy_importing_blocks() {
let builder = full_client.new_block_at(&parent_id, Default::default(), false).unwrap();
let block = builder.build().unwrap().block;
let hashof3 = block.header.hash();
let mut justif_recv = justif_stream.subscribe();
let mut justif_recv = justif_stream.subscribe(100_000);
assert_eq!(
block_import.import_block(params(block, justif), HashMap::new()).await.unwrap(),
ImportResult::Imported(ImportedAux {
Expand Down
8 changes: 4 additions & 4 deletions client/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl<B: BlockT, Transaction: Send + 'static> BasicQueue<B, Transaction> {
spawner: &impl sp_core::traits::SpawnEssentialNamed,
prometheus_registry: Option<&Registry>,
) -> Self {
let (result_sender, result_port) = buffered_link::buffered_link();
let (result_sender, result_port) = buffered_link::buffered_link(100_000);

let metrics = prometheus_registry.and_then(|r| {
Metrics::register(r)
Expand Down Expand Up @@ -276,10 +276,10 @@ impl<B: BlockT> BlockImportWorker<B> {
use worker_messages::*;

let (justification_sender, mut justification_port) =
tracing_unbounded("mpsc_import_queue_worker_justification");
tracing_unbounded("mpsc_import_queue_worker_justification", 100_000);

let (block_import_sender, block_import_port) =
tracing_unbounded("mpsc_import_queue_worker_blocks");
tracing_unbounded("mpsc_import_queue_worker_blocks", 100_000);

let mut worker = BlockImportWorker { result_sender, justification_import, metrics };

Expand Down Expand Up @@ -595,7 +595,7 @@ mod tests {

#[test]
fn prioritizes_finality_work_over_block_import() {
let (result_sender, mut result_port) = buffered_link::buffered_link();
let (result_sender, mut result_port) = buffered_link::buffered_link(100_000);

let (worker, mut finality_sender, mut block_import_sender) =
BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None);
Expand Down
12 changes: 7 additions & 5 deletions client/consensus/common/src/import_queue/buffered_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
//! # use sp_test_primitives::Block;
//! # struct DummyLink; impl Link<Block> for DummyLink {}
//! # let mut my_link = DummyLink;
//! let (mut tx, mut rx) = buffered_link::<Block>();
//! let (mut tx, mut rx) = buffered_link::<Block>(100_000);
//! tx.blocks_processed(0, 0, vec![]);
//!
//! // Calls `my_link.blocks_processed(0, 0, vec![])` when polled.
Expand All @@ -51,9 +51,11 @@ use super::BlockImportResult;

/// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and
/// can be used to buffer commands, and the receiver can be used to poll said commands and transfer
/// them to another link.
pub fn buffered_link<B: BlockT>() -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
let (tx, rx) = tracing_unbounded("mpsc_buffered_link");
/// them to another link. `queue_size_warning` sets the warning threshold of the channel queue size.
pub fn buffered_link<B: BlockT>(
queue_size_warning: i64,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need signed integer here?

Copy link
Contributor Author

@dmitry-markin dmitry-markin Dec 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's explained in the comment for a struct field: to avoid underflow if, due to the lack of ordering, the counter happens to go < 0.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Internally: yes. But public API doesn't need to be signed integer. This should have been u32 instead that should still be plenty big for all intents and purposes. Same for tracing_unbounded function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also wondering how much of a performance difference it actually makes using Relaxed ordering here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not that relaxed ordering makes sense in terms of performance, it's about not having to bother about synchronization of increments/decrements, why signed integer is used. Relaxed ordering is just a consequence of this decision, because more strong guarantees are not needed if we use the unsigned integer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've looked into the issue, and as far as I understand it's impossible to guarantee that the counter is never decremented before it's incremented not relying on internals of mpsc::unbounded(). Basically, we have the following events:

Thread A Thread B
increment pull
push decrement

In order for decrement to never happen before increment, push in thread A must synchronize with pull in thread B. Note that this is not a synchronization between operations with our atomic counter, but a synchronization of mpsc::unbounded() operations we are not in control of. We can try setting the strongest sequentially consistent ordering guarantee for increment and decrement, but for this to work push and pull must also be sequentially consistent operations, what is unlikely and cannot be relied on.

Please correct me if I'm missing something.

CC @bkchr

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you use Acquire/Release it should work: https://en.cppreference.com/w/cpp/atomic/memory_order

The compiler should add some barrier that ensures that reads/writes are not reordered.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW @nazar-pc why aren't you just use a channel with a size of 0 and using try_send?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I didn't see try_send in there, also it is usually for different access patterns. I'd expect it to still produce a warning regardless.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a PR implementing exact queue size warning (#13117), but I'd like it to be reviewed by somebody with good understanding of concurrency, atomics, and memory order of operations. If you know who to invite for review, please invite them.

) -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
let (tx, rx) = tracing_unbounded("mpsc_buffered_link", queue_size_warning);
let tx = BufferedLinkSender { tx };
let rx = BufferedLinkReceiver { rx: rx.fuse() };
(tx, rx)
Expand Down Expand Up @@ -175,7 +177,7 @@ mod tests {

#[test]
fn is_closed() {
let (tx, rx) = super::buffered_link::<Block>();
let (tx, rx) = super::buffered_link::<Block>(1);
assert!(!tx.is_closed());
drop(rx);
assert!(tx.is_closed());
Expand Down
2 changes: 1 addition & 1 deletion client/finality-grandpa/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ where
}

fn subscribe_justifications(&self, mut sink: SubscriptionSink) -> SubscriptionResult {
let stream = self.justification_stream.subscribe().map(
let stream = self.justification_stream.subscribe(100_000).map(
|x: sc_finality_grandpa::GrandpaJustification<Block>| {
JustificationNotification::from(x)
},
Expand Down
2 changes: 1 addition & 1 deletion client/finality-grandpa/src/communication/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1364,7 +1364,7 @@ impl<Block: BlockT> GossipValidator<Block> {
None => None,
};

let (tx, rx) = tracing_unbounded("mpsc_grandpa_gossip_validator");
let (tx, rx) = tracing_unbounded("mpsc_grandpa_gossip_validator", 100_000);
let val = GossipValidator {
inner: parking_lot::RwLock::new(Inner::new(config)),
set_state,
Expand Down
1 change: 1 addition & 0 deletions client/finality-grandpa/src/communication/periodic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl<B: BlockT> NeighborPacketWorker<B> {
pub(super) fn new(rebroadcast_period: Duration) -> (Self, NeighborPacketSender<B>) {
let (tx, rx) = tracing_unbounded::<(Vec<PeerId>, NeighborPacket<NumberFor<B>>)>(
"mpsc_grandpa_neighbor_packet_worker",
100_000,
);
let delay = Delay::new(rebroadcast_period);

Expand Down
4 changes: 2 additions & 2 deletions client/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl NetworkEventStream for TestNetwork {
&self,
_name: &'static str,
) -> Pin<Box<dyn Stream<Item = NetworkEvent> + Send>> {
let (tx, rx) = tracing_unbounded("test");
let (tx, rx) = tracing_unbounded("test", 100_000);
let _ = self.sender.unbounded_send(Event::EventStream(tx));
Box::pin(rx)
}
Expand Down Expand Up @@ -253,7 +253,7 @@ fn voter_set_state() -> SharedVoterSetState<Block> {

// needs to run in a tokio runtime.
pub(crate) fn make_test_network() -> (impl Future<Output = Tester>, TestNetwork) {
let (tx, rx) = tracing_unbounded("test");
let (tx, rx) = tracing_unbounded("test", 100_000);
let net = TestNetwork { sender: tx };

#[derive(Clone)]
Expand Down
3 changes: 2 additions & 1 deletion client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,8 @@ where
}
})?;

let (voter_commands_tx, voter_commands_rx) = tracing_unbounded("mpsc_grandpa_voter_command");
let (voter_commands_tx, voter_commands_rx) =
tracing_unbounded("mpsc_grandpa_voter_command", 100_000);

let (justification_sender, justification_stream) = GrandpaJustificationStream::channel();

Expand Down
2 changes: 1 addition & 1 deletion client/finality-grandpa/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ mod tests {
aux_schema::load_persistent(&*backend, client.info().genesis_hash, 0, || Ok(voters))
.unwrap();

let (_tx, voter_command_rx) = tracing_unbounded("");
let (_tx, voter_command_rx) = tracing_unbounded("test_mpsc_voter_command", 100_000);

let observer = ObserverWork::new(
client,
Expand Down
8 changes: 4 additions & 4 deletions client/finality-grandpa/src/until_imported.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ mod tests {

impl TestChainState {
fn new() -> (Self, ImportNotifications<Block>) {
let (tx, rx) = tracing_unbounded("test");
let (tx, rx) = tracing_unbounded("test", 100_000);
let state =
TestChainState { sender: tx, known_blocks: Arc::new(Mutex::new(HashMap::new())) };

Expand Down Expand Up @@ -680,7 +680,7 @@ mod tests {
// enact all dependencies before importing the message
enact_dependencies(&chain_state);

let (global_tx, global_rx) = tracing_unbounded("test");
let (global_tx, global_rx) = tracing_unbounded("test", 100_000);

let until_imported = UntilGlobalMessageBlocksImported::new(
import_notifications,
Expand Down Expand Up @@ -708,7 +708,7 @@ mod tests {
let (chain_state, import_notifications) = TestChainState::new();
let block_status = chain_state.block_status();

let (global_tx, global_rx) = tracing_unbounded("test");
let (global_tx, global_rx) = tracing_unbounded("test", 100_000);

let until_imported = UntilGlobalMessageBlocksImported::new(
import_notifications,
Expand Down Expand Up @@ -896,7 +896,7 @@ mod tests {
let (chain_state, import_notifications) = TestChainState::new();
let block_status = chain_state.block_status();

let (global_tx, global_rx) = tracing_unbounded("test");
let (global_tx, global_rx) = tracing_unbounded("test", 100_000);

let block_sync_requester = TestBlockSyncRequester::default();

Expand Down
4 changes: 2 additions & 2 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ where
&params.network_config.transport,
)?;

let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker");
let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker", 100_000);

if let Some(path) = &params.network_config.net_config_path {
fs::create_dir_all(path)?;
Expand Down Expand Up @@ -1003,7 +1003,7 @@ where
H: ExHashT,
{
fn event_stream(&self, name: &'static str) -> Pin<Box<dyn Stream<Item = Event> + Send>> {
let (tx, rx) = out_events::channel(name);
let (tx, rx) = out_events::channel(name, 100_000);
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
Box::pin(rx)
}
Expand Down
Loading