Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle networking bug #907

Merged
merged 26 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 44 additions & 9 deletions signer/src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ mod signer_context;
mod signer_state;
mod termination;

use futures::stream::SelectAll;
use tokio_stream::wrappers::BroadcastStream;
use tokio::sync::broadcast::error::RecvError;
use tokio_stream::wrappers::ReceiverStream;

use crate::bitcoin::BitcoinInteract;
use crate::config::Settings;
use crate::emily_client::EmilyInteract;
use crate::error::Error;
use crate::network::MessageTransfer;
use crate::stacks::api::StacksInteract;
use crate::storage::DbRead;
use crate::storage::DbWrite;
use crate::SIGNER_CHANNEL_CAPACITY;

pub use messaging::*;
pub use signer_context::SignerContext;
Expand Down Expand Up @@ -58,14 +58,49 @@ pub trait Context: Clone + Sync + Send {
/// later return `Some(_)`. But if [`StreamExt::next`] yields `None`
/// three times then the stream is "fused" and will return `None`
/// forever after.
fn as_signal_stream<M>(&self, network: &M) -> SelectAll<BroadcastStream<SignerSignal>>
fn as_signal_stream<F>(&self, predicate: F) -> ReceiverStream<SignerSignal>
where
M: MessageTransfer,
F: Fn(&SignerSignal) -> bool + Send + Sync + 'static,
{
let term = self.get_termination_handle().as_stream();
let signal_stream = BroadcastStream::new(self.get_signal_receiver());
let network_stream = network.receiver_stream();
let (sender, receiver) = tokio::sync::mpsc::channel(SIGNER_CHANNEL_CAPACITY);

futures::stream::select_all([term, signal_stream, network_stream])
let mut watch_receiver = self.get_termination_handle();
let mut signal_stream = self.get_signal_receiver();

tokio::spawn(async move {
loop {
tokio::select! {
_ = watch_receiver.wait_for_shutdown() => {
let signal = SignerSignal::Command(SignerCommand::Shutdown);
// An error means that the channel has been closed.
// This is most likely due to the receiver being
// closed so we can bail.
if sender.send(signal).await.is_err() {
break;
}
}
item = signal_stream.recv() => {
match item {
Ok(signal) if predicate(&signal) => {
// See comment above, we can bail.
if sender.send(signal).await.is_err() {
break;
}
}
Ok(_) => continue,
Err(RecvError::Closed) => {
tracing::warn!("internal signal stream closed");
break;
}
Err(error @ RecvError::Lagged(_)) => {
tracing::warn!(%error, "internal signal stream lagging");
continue
}
}
}
}
}
});
ReceiverStream::new(receiver)
}
}
3 changes: 2 additions & 1 deletion signer/src/context/signer_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
error::Error,
stacks::api::StacksInteract,
storage::{DbRead, DbWrite},
SIGNER_CHANNEL_CAPACITY,
};

use super::{Context, SignerSignal, SignerState, TerminationHandle};
Expand Down Expand Up @@ -83,7 +84,7 @@ where
// TODO: Decide on the channel capacity and how we should handle slow consumers.
// NOTE: Ideally consumers which require processing time should pull the relevant
// messages into a local VecDeque and process them in their own time.
let (signal_tx, _) = tokio::sync::broadcast::channel(1024);
let (signal_tx, _) = tokio::sync::broadcast::channel(SIGNER_CHANNEL_CAPACITY);
let (term_tx, _) = tokio::sync::watch::channel(false);

Self {
Expand Down
21 changes: 0 additions & 21 deletions signer/src/context/termination.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
//! Module that contains termination-related code for the [`Context`].

use tokio_stream::wrappers::BroadcastStream;

use super::SignerCommand;
use super::SignerSignal;

/// Handle to the termination signal. This can be used to signal the application
/// to shutdown or to wait for a shutdown signal.
pub struct TerminationHandle(
Expand Down Expand Up @@ -61,20 +56,4 @@ impl TerminationHandle {
}
}
}

/// Get a stream for the shutdown signal.
///
/// Spawns a background task that watches the termination flag and
/// publishes a [`SignerCommand::Shutdown`] signal on the returned stream
/// every time the flag is observed to be `true` after a change.
///
/// The background task exits when the watch channel behind the
/// termination flag is closed, or the next time it tries to publish a
/// signal after the returned stream has been dropped.
pub fn as_stream(&self) -> BroadcastStream<SignerSignal> {
    let (sender, receiver) = tokio::sync::broadcast::channel(5);
    let mut watch_receiver = self.0.subscribe();
    tokio::spawn(async move {
        // `changed` fails once the sending half of the watch channel is
        // gone. From then on it would fail immediately on every call, so
        // ignoring the error would turn this loop into a busy loop.
        while watch_receiver.changed().await.is_ok() {
            if *watch_receiver.borrow_and_update() {
                let signal = SignerSignal::Command(SignerCommand::Shutdown);
                // A send error means there are no receivers left, which
                // happens when the returned stream has been dropped, so
                // there is nobody left to notify and we can bail.
                if sender.send(signal).is_err() {
                    break;
                }
            }
        }
    });
    BroadcastStream::new(receiver)
}
}
4 changes: 2 additions & 2 deletions signer/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,8 @@ pub enum Error {
CoordinatorTimeout(u64),

/// Wsts state machine returned unexpected operation result
#[error("unexpected operation result")]
UnexpectedOperationResult,
#[error("unexpected operation result: {0:?}")]
UnexpectedOperationResult(Box<wsts::state_machine::OperationResult>),

/// The smart contract has already been deployed
#[error("smart contract already deployed, contract name: {0}")]
Expand Down
4 changes: 4 additions & 0 deletions signer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ const MAX_KEYS: u16 = 128;
/// it were spendable on block 1002, then the signers WOULD attempt to sweep
/// the deposit.
pub const DEPOSIT_LOCKTIME_BLOCK_BUFFER: u16 = 3;

/// Capacity of the channels that carry messages sent within the signer.
pub const SIGNER_CHANNEL_CAPACITY: usize = 1 << 10;
15 changes: 9 additions & 6 deletions signer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,13 @@ async fn run_libp2p_swarm(ctx: impl Context) -> Result<(), Error> {

// Build the swarm.
tracing::debug!("building the libp2p swarm");
let mut swarm = SignerSwarmBuilder::new(&ctx.config().signer.private_key)
.add_listen_endpoints(&ctx.config().signer.p2p.listen_on)
.add_seed_addrs(&ctx.config().signer.p2p.seeds)
.build()?;
let config = ctx.config();
let mut swarm =
SignerSwarmBuilder::new(&config.signer.private_key, config.signer.p2p.enable_mdns)
.add_listen_endpoints(&ctx.config().signer.p2p.listen_on)
.add_seed_addrs(&ctx.config().signer.p2p.seeds)
.add_external_addresses(&ctx.config().signer.p2p.public_endpoints)
.build()?;

// Start the libp2p swarm. This will run until either the shutdown signal is
// received, or an unrecoverable error has occurred.
Expand Down Expand Up @@ -295,9 +298,9 @@ async fn run_transaction_coordinator(ctx: impl Context) -> Result<(), Error> {
context: ctx,
context_window: 10000,
private_key,
signing_round_max_duration: Duration::from_secs(10),
signing_round_max_duration: Duration::from_secs(15),
threshold: 2,
dkg_max_duration: Duration::from_secs(10),
dkg_max_duration: Duration::from_secs(15),
sbtc_contracts_deployed: false,
is_epoch3: false,
};
Expand Down
30 changes: 2 additions & 28 deletions signer/src/network/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ use std::{
sync::{atomic::AtomicU16, Arc},
};

use tokio::sync::{broadcast, Mutex};
use tokio_stream::wrappers::BroadcastStream;
use tokio::sync::broadcast;
use tokio::sync::Mutex;

use crate::context::P2PEvent;
use crate::context::SignerSignal;
use crate::error::Error;

const BROADCAST_CHANNEL_CAPACITY: usize = 10_000;
Expand Down Expand Up @@ -105,30 +103,6 @@ impl super::MessageTransfer for MpmcBroadcaster {

Ok(msg)
}

fn receiver_stream(&self) -> BroadcastStream<SignerSignal> {
let (sender, receiver) = tokio::sync::broadcast::channel(1000);
let mut signal_rx = self.receiver.resubscribe();
let recently_sent = self.recently_sent.clone();
tokio::spawn(async move {
loop {
match signal_rx.recv().await {
Ok(mut msg) => {
while Some(&msg.id()) == recently_sent.lock().await.front() {
recently_sent.lock().await.pop_front();
msg = signal_rx.recv().await.map_err(Error::ChannelReceive)?;
}
let _ = sender.send(P2PEvent::MessageReceived(msg).into());
}
Err(error) => {
tracing::error!(%error, "got a receive error");
return Err::<(), _>(Error::SignerShutdown);
}
}
}
});
BroadcastStream::new(receiver)
}
}

#[cfg(test)]
Expand Down
Loading
Loading