Skip to content

Commit

Permalink
fix: introduce backpressure from consensus to the networking layer by…
Browse files Browse the repository at this point in the history
… using bounded channels (#2340)
  • Loading branch information
rumenov authored Jan 31, 2025
1 parent 4161756 commit eb4a6d5
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 42 deletions.
13 changes: 5 additions & 8 deletions rs/p2p/artifact_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,7 @@ pub fn create_ingress_handlers<
metrics_registry: MetricsRegistry,
) -> Box<dyn JoinGuard> {
let client = IngressProcessor::new(ingress_pool.clone(), ingress_handler);
let inbound_rx_stream =
tokio_stream::wrappers::UnboundedReceiverStream::new(channel.inbound_rx);
let inbound_rx_stream = tokio_stream::wrappers::ReceiverStream::new(channel.inbound_rx);
let user_ingress_rx_stream =
tokio_stream::wrappers::UnboundedReceiverStream::new(user_ingress_rx);
run_artifact_processor(
Expand All @@ -307,8 +306,7 @@ pub fn create_artifact_handler<
) -> Box<dyn JoinGuard> {
let inital_artifacts: Vec<_> = pool.read().unwrap().get_all_for_broadcast().collect();
let client = Processor::new(pool, change_set_producer);
let inbound_rx_stream =
tokio_stream::wrappers::UnboundedReceiverStream::new(channel.inbound_rx);
let inbound_rx_stream = tokio_stream::wrappers::ReceiverStream::new(channel.inbound_rx);
run_artifact_processor(
time_source.clone(),
metrics_registry,
Expand Down Expand Up @@ -446,7 +444,7 @@ mod tests {
use ic_types::artifact::UnvalidatedArtifactMutation;
use std::{convert::Infallible, sync::Arc};
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tokio_stream::wrappers::ReceiverStream;

use crate::{run_artifact_processor, ArtifactProcessor};

Expand Down Expand Up @@ -534,11 +532,10 @@ mod tests {

let time_source = Arc::new(SysTimeSource::new());
let (send_tx, mut send_rx) = tokio::sync::mpsc::channel(100);
#[allow(clippy::disallowed_methods)]
let (_, inbound_rx) = tokio::sync::mpsc::unbounded_channel();
let (_, inbound_rx) = tokio::sync::mpsc::channel(100);
run_artifact_processor::<
DummyArtifact,
UnboundedReceiverStream<UnvalidatedArtifactMutation<DummyArtifact>>,
ReceiverStream<UnvalidatedArtifactMutation<DummyArtifact>>,
>(
time_source,
MetricsRegistry::default(),
Expand Down
21 changes: 8 additions & 13 deletions rs/p2p/consensus_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use phantom_newtype::AmountOf;
use tokio::{
runtime::Handle,
sync::{
mpsc::{Receiver, Sender, UnboundedReceiver, UnboundedSender},
mpsc::{Receiver, Sender},
watch,
},
};
Expand All @@ -29,10 +29,12 @@ type StartConsensusManagerFn =
Box<dyn FnOnce(Arc<dyn Transport>, watch::Receiver<SubnetTopology>) -> Vec<Shutdown>>;

/// Same order of magnitude as the number of active artifacts.
const MAX_OUTBOUND_CHANNEL_SIZE: usize = 100_000;
/// Please note that we put fairly big number mainly for perfomance reasons so either side of a channel doesn't await.
/// The replica code should be designed in such a way that if we put a channel of size 1, the protocol should still work.
const MAX_IO_CHANNEL_SIZE: usize = 100_000;

pub type AbortableBroadcastSender<T> = Sender<ArtifactTransmit<T>>;
pub type AbortableBroadcastReceiver<T> = UnboundedReceiver<UnvalidatedArtifactMutation<T>>;
pub type AbortableBroadcastReceiver<T> = Receiver<UnvalidatedArtifactMutation<T>>;

pub struct AbortableBroadcastChannel<T: IdentifiableArtifact> {
pub outbound_tx: AbortableBroadcastSender<T>,
Expand Down Expand Up @@ -69,15 +71,8 @@ impl AbortableBroadcastChannelBuilder {
(assembler, assembler_router): (F, Router),
slot_limit: usize,
) -> AbortableBroadcastChannel<Artifact> {
let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(MAX_OUTBOUND_CHANNEL_SIZE);
// Making this channel bounded can be problematic since we don't have true multiplexing
// of P2P messages.
// Possible scenario is - adverts+chunks arrive on the same channel, slow consensus
// will result on slow consuption of chunks. Slow consumption of chunks will in turn
// result in slower consumptions of adverts. Ideally adverts are consumed at rate
// independent of consensus.
#[allow(clippy::disallowed_methods)]
let (inbound_tx, inbound_rx) = tokio::sync::mpsc::unbounded_channel();
let (outbound_tx, outbound_rx) = tokio::sync::mpsc::channel(MAX_IO_CHANNEL_SIZE);
let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(MAX_IO_CHANNEL_SIZE);

assert!(uri_prefix::<WireArtifact>()
.chars()
Expand Down Expand Up @@ -143,7 +138,7 @@ fn start_consensus_manager<Artifact, WireArtifact, Assembler>(
outbound_transmits: Receiver<ArtifactTransmit<Artifact>>,
// Slot updates received from peers
slot_updates_rx: Receiver<(SlotUpdate<WireArtifact>, NodeId, ConnId)>,
sender: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
sender: Sender<UnvalidatedArtifactMutation<Artifact>>,
assembler: Assembler,
transport: Arc<dyn Transport>,
topology_watcher: watch::Receiver<SubnetTopology>,
Expand Down
33 changes: 18 additions & 15 deletions rs/p2p/consensus_manager/src/receiver.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#![allow(clippy::disallowed_methods)]

use std::collections::{hash_map::Entry, HashMap, HashSet};

use crate::{
Expand Down Expand Up @@ -28,7 +26,7 @@ use tokio::{
runtime::Handle,
select,
sync::{
mpsc::{Receiver, Sender, UnboundedSender},
mpsc::{Receiver, Sender},
watch,
},
task::JoinSet,
Expand Down Expand Up @@ -181,7 +179,8 @@ pub(crate) struct ConsensusManagerReceiver<

// Receive side:
slot_updates_rx: Receiver<(SlotUpdate<WireArtifact>, NodeId, ConnId)>,
sender: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
sender: Sender<UnvalidatedArtifactMutation<Artifact>>,

artifact_assembler: Assembler,

slot_table: HashMap<NodeId, HashMap<SlotNumber, SlotEntry<WireArtifact::Id>>>,
Expand All @@ -207,7 +206,7 @@ where
rt_handle: Handle,
slot_updates_rx: Receiver<(SlotUpdate<WireArtifact>, NodeId, ConnId)>,
artifact_assembler: Assembler,
sender: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
sender: Sender<UnvalidatedArtifactMutation<Artifact>>,
topology_watcher: watch::Receiver<SubnetTopology>,
slot_limit: usize,
) -> Shutdown {
Expand Down Expand Up @@ -449,7 +448,7 @@ where
// Only first peer for specific artifact ID is considered for push
artifact: Option<(WireArtifact, NodeId)>,
mut peer_rx: watch::Receiver<PeerCounter>,
sender: UnboundedSender<UnvalidatedArtifactMutation<Artifact>>,
sender: Sender<UnvalidatedArtifactMutation<Artifact>>,
artifact_assembler: Assembler,
metrics: ConsensusManagerMetrics,
cancellation_token: CancellationToken,
Expand Down Expand Up @@ -480,17 +479,22 @@ where
match assemble_result {
AssembleResult::Done { message, peer_id } => {
let id = message.id();
// Send artifact to pool
if sender.send(UnvalidatedArtifactMutation::Insert((message, peer_id))).is_err() {
// Sends artifact to the pool. In theory this channel can get full if there is a bug in consensus and each round takes very long time.
// However, the duration of this await is not IO-bound so for the time being it is fine that sending over the channel is not done as
// part of a select.
if sender.send(UnvalidatedArtifactMutation::Insert((message, peer_id))).await.is_err() {

error!(log, "The receiving side of the channel, owned by the consensus thread, was closed. This should be infallible situation since a cancellation token should be received. If this happens then most likely there is very subnet synchonization bug.");
}

// wait for deletion from peers
// TODO: NET-1774
let _ = peer_rx.wait_for(|p| p.is_empty()).await;

// Purge from the unvalidated pool
if sender.send(UnvalidatedArtifactMutation::Remove(id)).is_err() {
// Purge artifact from the unvalidated pool. In theory this channel can get full if there is a bug in consensus and each round takes very long time.
// However, the duration of this await is not IO-bound so for the time being it is fine that sending over the channel is not done as
// part of a select.
if sender.send(UnvalidatedArtifactMutation::Remove(id)).await.is_err() {
error!(log, "The receiving side of the channel, owned by the consensus thread, was closed. This should be infallible situation since a cancellation token should be received. If this happens then most likely there is very subnet synchonization bug.");
}
metrics
Expand Down Expand Up @@ -596,7 +600,7 @@ mod tests {
use ic_test_utilities_logger::with_test_replica_logger;
use ic_types::{artifact::IdentifiableArtifact, RegistryVersion};
use ic_types_test_utils::ids::{NODE_1, NODE_2};
use tokio::{sync::mpsc::UnboundedReceiver, time::timeout};
use tokio::time::timeout;
use tower::util::ServiceExt;

use super::*;
Expand All @@ -606,7 +610,7 @@ mod tests {
struct ReceiverManagerBuilder {
// Slot updates received from peers
slot_updates_rx: Receiver<(SlotUpdate<U64Artifact>, NodeId, ConnId)>,
sender: UnboundedSender<UnvalidatedArtifactMutation<U64Artifact>>,
sender: Sender<UnvalidatedArtifactMutation<U64Artifact>>,
artifact_assembler: MockArtifactAssembler,
topology_watcher: watch::Receiver<SubnetTopology>,
slot_limit: usize,
Expand All @@ -618,7 +622,7 @@ mod tests {
ConsensusManagerReceiver<U64Artifact, U64Artifact, MockArtifactAssembler>;

struct Channels {
unvalidated_artifact_receiver: UnboundedReceiver<UnvalidatedArtifactMutation<U64Artifact>>,
unvalidated_artifact_receiver: Receiver<UnvalidatedArtifactMutation<U64Artifact>>,
}

impl ReceiverManagerBuilder {
Expand All @@ -634,8 +638,7 @@ mod tests {

fn new() -> Self {
let (_, slot_updates_rx) = tokio::sync::mpsc::channel(100);
#[allow(clippy::disallowed_methods)]
let (sender, unvalidated_artifact_receiver) = tokio::sync::mpsc::unbounded_channel();
let (sender, unvalidated_artifact_receiver) = tokio::sync::mpsc::channel(1000);
let (_, topology_watcher) = watch::channel(SubnetTopology::default());
let artifact_assembler =
Self::make_mock_artifact_assembler_with_clone(MockArtifactAssembler::default);
Expand Down
9 changes: 3 additions & 6 deletions rs/p2p/test_utils/src/turmoil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use tokio::{
select,
sync::{mpsc, oneshot, watch, Notify},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::wrappers::ReceiverStream;
use turmoil::Sim;

pub struct CustomUdp {
Expand Down Expand Up @@ -443,16 +443,13 @@ pub fn waiter_fut(
#[allow(clippy::type_complexity)]
pub fn start_test_processor(
outbound_tx: mpsc::Sender<ArtifactTransmit<U64Artifact>>,
inbound_rx: mpsc::UnboundedReceiver<UnvalidatedArtifactMutation<U64Artifact>>,
inbound_rx: mpsc::Receiver<UnvalidatedArtifactMutation<U64Artifact>>,
pool: Arc<RwLock<TestConsensus<U64Artifact>>>,
change_set_producer: TestConsensus<U64Artifact>,
) -> Box<dyn JoinGuard> {
let time_source = Arc::new(SysTimeSource::new());
let client = ic_artifact_manager::Processor::new(pool, change_set_producer);
run_artifact_processor::<
U64Artifact,
UnboundedReceiverStream<UnvalidatedArtifactMutation<U64Artifact>>,
>(
run_artifact_processor::<U64Artifact, ReceiverStream<UnvalidatedArtifactMutation<U64Artifact>>>(
time_source,
MetricsRegistry::default(),
Box::new(client),
Expand Down

0 comments on commit eb4a6d5

Please sign in to comment.