diff --git a/rs/p2p/artifact_manager/src/lib.rs b/rs/p2p/artifact_manager/src/lib.rs index 1d8d4c4225c..313e5d715e3 100644 --- a/rs/p2p/artifact_manager/src/lib.rs +++ b/rs/p2p/artifact_manager/src/lib.rs @@ -279,8 +279,7 @@ pub fn create_ingress_handlers< metrics_registry: MetricsRegistry, ) -> Box { let client = IngressProcessor::new(ingress_pool.clone(), ingress_handler); - let inbound_rx_stream = - tokio_stream::wrappers::UnboundedReceiverStream::new(channel.inbound_rx); + let inbound_rx_stream = tokio_stream::wrappers::ReceiverStream::new(channel.inbound_rx); let user_ingress_rx_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(user_ingress_rx); run_artifact_processor( @@ -307,8 +306,7 @@ pub fn create_artifact_handler< ) -> Box { let inital_artifacts: Vec<_> = pool.read().unwrap().get_all_for_broadcast().collect(); let client = Processor::new(pool, change_set_producer); - let inbound_rx_stream = - tokio_stream::wrappers::UnboundedReceiverStream::new(channel.inbound_rx); + let inbound_rx_stream = tokio_stream::wrappers::ReceiverStream::new(channel.inbound_rx); run_artifact_processor( time_source.clone(), metrics_registry, @@ -446,7 +444,7 @@ mod tests { use ic_types::artifact::UnvalidatedArtifactMutation; use std::{convert::Infallible, sync::Arc}; use tokio::sync::mpsc::channel; - use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; + use tokio_stream::wrappers::ReceiverStream; use crate::{run_artifact_processor, ArtifactProcessor}; @@ -534,11 +532,10 @@ mod tests { let time_source = Arc::new(SysTimeSource::new()); let (send_tx, mut send_rx) = tokio::sync::mpsc::channel(100); - #[allow(clippy::disallowed_methods)] - let (_, inbound_rx) = tokio::sync::mpsc::unbounded_channel(); + let (_, inbound_rx) = tokio::sync::mpsc::channel(100); run_artifact_processor::< DummyArtifact, - UnboundedReceiverStream>, + ReceiverStream>, >( time_source, MetricsRegistry::default(), diff --git a/rs/p2p/consensus_manager/src/lib.rs 
b/rs/p2p/consensus_manager/src/lib.rs index 4adc37f4c8c..cfe97522e68 100644 --- a/rs/p2p/consensus_manager/src/lib.rs +++ b/rs/p2p/consensus_manager/src/lib.rs @@ -16,7 +16,7 @@ use phantom_newtype::AmountOf; use tokio::{ runtime::Handle, sync::{ - mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender}, + mpsc::{Receiver, Sender}, watch, }, }; @@ -29,10 +29,12 @@ type StartConsensusManagerFn = Box, watch::Receiver) -> Vec>; /// Same order of magnitude as the number of active artifacts. -const MAX_OUTBOUND_CHANNEL_SIZE: usize = 100_000; +/// Please note that we use a fairly big number mainly for performance reasons, so that neither side of the channel has to await. +/// The replica code should be designed in such a way that the protocol still works even with a channel of size 1. +const MAX_IO_CHANNEL_SIZE: usize = 100_000; pub type AbortableBroadcastSender = Sender>; -pub type AbortableBroadcastReceiver = UnboundedReceiver>; +pub type AbortableBroadcastReceiver = Receiver>; pub struct AbortableBroadcastChannel { pub outbound_tx: AbortableBroadcastSender, @@ -69,15 +71,8 @@ impl AbortableBroadcastChannelBuilder { (assembler, assembler_router): (F, Router), slot_limit: usize, ) -> AbortableBroadcastChannel { - let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(MAX_OUTBOUND_CHANNEL_SIZE); - // Making this channel bounded can be problematic since we don't have true multiplexing - // of P2P messages. - // Possible scenario is - adverts+chunks arrive on the same channel, slow consensus - // will result on slow consuption of chunks. Slow consumption of chunks will in turn - // result in slower consumptions of adverts. Ideally adverts are consumed at rate - // independent of consensus. 
- #[allow(clippy::disallowed_methods)] - let (inbound_tx, inbound_rx) = tokio::sync::mpsc::unbounded_channel(); + let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(MAX_IO_CHANNEL_SIZE); + let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(MAX_IO_CHANNEL_SIZE); assert!(uri_prefix::() .chars() @@ -143,7 +138,7 @@ fn start_consensus_manager( outbound_transmits: Receiver>, // Slot updates received from peers slot_updates_rx: Receiver<(SlotUpdate, NodeId, ConnId)>, - sender: UnboundedSender>, + sender: Sender>, assembler: Assembler, transport: Arc, topology_watcher: watch::Receiver, diff --git a/rs/p2p/consensus_manager/src/receiver.rs b/rs/p2p/consensus_manager/src/receiver.rs index 9714613fcb0..444e47e983f 100644 --- a/rs/p2p/consensus_manager/src/receiver.rs +++ b/rs/p2p/consensus_manager/src/receiver.rs @@ -1,5 +1,3 @@ -#![allow(clippy::disallowed_methods)] - use std::collections::{hash_map::Entry, HashMap, HashSet}; use crate::{ @@ -28,7 +26,7 @@ use tokio::{ runtime::Handle, select, sync::{ - mpsc::{Receiver, Sender, UnboundedSender}, + mpsc::{Receiver, Sender}, watch, }, task::JoinSet, @@ -181,7 +179,8 @@ pub(crate) struct ConsensusManagerReceiver< // Receive side: slot_updates_rx: Receiver<(SlotUpdate, NodeId, ConnId)>, - sender: UnboundedSender>, + sender: Sender>, + artifact_assembler: Assembler, slot_table: HashMap>>, @@ -207,7 +206,7 @@ where rt_handle: Handle, slot_updates_rx: Receiver<(SlotUpdate, NodeId, ConnId)>, artifact_assembler: Assembler, - sender: UnboundedSender>, + sender: Sender>, topology_watcher: watch::Receiver, slot_limit: usize, ) -> Shutdown { @@ -449,7 +448,7 @@ where // Only first peer for specific artifact ID is considered for push artifact: Option<(WireArtifact, NodeId)>, mut peer_rx: watch::Receiver, - sender: UnboundedSender>, + sender: Sender>, artifact_assembler: Assembler, metrics: ConsensusManagerMetrics, cancellation_token: CancellationToken, @@ -480,8 +479,11 @@ where match assemble_result { 
AssembleResult::Done { message, peer_id } => { let id = message.id(); - // Send artifact to pool - if sender.send(UnvalidatedArtifactMutation::Insert((message, peer_id))).is_err() { + // Sends artifact to the pool. In theory this channel can get full if there is a bug in consensus and each round takes a very long time. + // However, the duration of this await is not IO-bound so for the time being it is fine that sending over the channel is not done as + // part of a select. + if sender.send(UnvalidatedArtifactMutation::Insert((message, peer_id))).await.is_err() { + error!(log, "The receiving side of the channel, owned by the consensus thread, was closed. This should be infallible situation since a cancellation token should be received. If this happens then most likely there is very subnet synchonization bug."); } @@ -489,8 +491,10 @@ where // TODO: NET-1774 let _ = peer_rx.wait_for(|p| p.is_empty()).await; - // Purge from the unvalidated pool - if sender.send(UnvalidatedArtifactMutation::Remove(id)).is_err() { + // Purge artifact from the unvalidated pool. In theory this channel can get full if there is a bug in consensus and each round takes a very long time. + // However, the duration of this await is not IO-bound so for the time being it is fine that sending over the channel is not done as + // part of a select. + if sender.send(UnvalidatedArtifactMutation::Remove(id)).await.is_err() { error!(log, "The receiving side of the channel, owned by the consensus thread, was closed. This should be infallible situation since a cancellation token should be received. 
If this happens then most likely there is very subnet synchonization bug."); } metrics @@ -596,7 +600,7 @@ mod tests { use ic_test_utilities_logger::with_test_replica_logger; use ic_types::{artifact::IdentifiableArtifact, RegistryVersion}; use ic_types_test_utils::ids::{NODE_1, NODE_2}; - use tokio::{sync::mpsc::UnboundedReceiver, time::timeout}; + use tokio::time::timeout; use tower::util::ServiceExt; use super::*; @@ -606,7 +610,7 @@ mod tests { struct ReceiverManagerBuilder { // Slot updates received from peers slot_updates_rx: Receiver<(SlotUpdate, NodeId, ConnId)>, - sender: UnboundedSender>, + sender: Sender>, artifact_assembler: MockArtifactAssembler, topology_watcher: watch::Receiver, slot_limit: usize, @@ -618,7 +622,7 @@ mod tests { ConsensusManagerReceiver; struct Channels { - unvalidated_artifact_receiver: UnboundedReceiver>, + unvalidated_artifact_receiver: Receiver>, } impl ReceiverManagerBuilder { @@ -634,8 +638,7 @@ mod tests { fn new() -> Self { let (_, slot_updates_rx) = tokio::sync::mpsc::channel(100); - #[allow(clippy::disallowed_methods)] - let (sender, unvalidated_artifact_receiver) = tokio::sync::mpsc::unbounded_channel(); + let (sender, unvalidated_artifact_receiver) = tokio::sync::mpsc::channel(1000); let (_, topology_watcher) = watch::channel(SubnetTopology::default()); let artifact_assembler = Self::make_mock_artifact_assembler_with_clone(MockArtifactAssembler::default); diff --git a/rs/p2p/test_utils/src/turmoil.rs b/rs/p2p/test_utils/src/turmoil.rs index 512e2dbf042..0621ec2e483 100644 --- a/rs/p2p/test_utils/src/turmoil.rs +++ b/rs/p2p/test_utils/src/turmoil.rs @@ -36,7 +36,7 @@ use tokio::{ select, sync::{mpsc, oneshot, watch, Notify}, }; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_stream::wrappers::ReceiverStream; use turmoil::Sim; pub struct CustomUdp { @@ -443,16 +443,13 @@ pub fn waiter_fut( #[allow(clippy::type_complexity)] pub fn start_test_processor( outbound_tx: mpsc::Sender>, - inbound_rx: 
mpsc::UnboundedReceiver>, + inbound_rx: mpsc::Receiver>, pool: Arc>>, change_set_producer: TestConsensus, ) -> Box { let time_source = Arc::new(SysTimeSource::new()); let client = ic_artifact_manager::Processor::new(pool, change_set_producer); - run_artifact_processor::< - U64Artifact, - UnboundedReceiverStream>, - >( + run_artifact_processor::>>( time_source, MetricsRegistry::default(), Box::new(client),