Skip to content

Commit

Permalink
fix: fixed read task breaking too early, caused error during proof gen
Browse files Browse the repository at this point in the history
  • Loading branch information
fabian1409 authored and 0xThemis committed Oct 15, 2024
1 parent 4851f11 commit 6a8e829
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 78 deletions.
2 changes: 1 addition & 1 deletion mpc-core/src/protocols/rep3/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,9 @@ pub trait Rep3Network: Send {
#[derive(Debug)]
pub struct Rep3MpcNet {
pub(crate) id: PartyID,
pub(crate) net_handler: Arc<MpcNetworkHandlerWrapper>,
pub(crate) chan_next: ChannelHandle<Bytes, BytesMut>,
pub(crate) chan_prev: ChannelHandle<Bytes, BytesMut>,
pub(crate) net_handler: Arc<MpcNetworkHandlerWrapper>,
}

impl Rep3MpcNet {
Expand Down
2 changes: 1 addition & 1 deletion mpc-core/src/protocols/shamir/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ pub trait ShamirNetwork: Send {
pub struct ShamirMpcNet {
pub(crate) id: usize, // 0 <= id < num_parties
pub(crate) num_parties: usize,
pub(crate) net_handler: Arc<MpcNetworkHandlerWrapper>,
pub(crate) channels: HashMap<usize, ChannelHandle<Bytes, BytesMut>>,
pub(crate) net_handler: Arc<MpcNetworkHandlerWrapper>,
}

impl ShamirMpcNet {
Expand Down
87 changes: 11 additions & 76 deletions mpc-net/src/channel.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! A channel abstraction for sending and receiving messages.
use futures::{Sink, SinkExt, Stream, StreamExt};
use std::{collections::VecDeque, io, marker::Unpin, pin::Pin};
use std::{io, marker::Unpin, pin::Pin};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::{mpsc, oneshot},
Expand All @@ -12,8 +12,6 @@ pub type ReadChannel<T, D> = FramedRead<T, D>;
/// A write end of the channel, just a type alias for [`FramedWrite`].
pub type WriteChannel<T, E> = FramedWrite<T, E>;

const READ_BUFFER_SIZE: usize = 16;

/// A channel that uses a [`Encoder`] and [`Decoder`] to send and receive messages.
#[derive(Debug)]
pub struct Channel<R, W, C> {
Expand Down Expand Up @@ -156,64 +154,6 @@ where
MRecv: Send + std::fmt::Debug + 'static,
MSend: Send + std::fmt::Debug + 'static,
{
async fn handle_read_job<R, C>(
job: ReadJob<MRecv>,
buffer: &mut VecDeque<Result<MRecv, io::Error>>,
frame_reader: &mut FramedRead<R, C>,
) where
C: 'static,
R: AsyncReadExt + Unpin + 'static,
FramedRead<R, C>: Stream<Item = Result<MRecv, io::Error>> + Send,
{
//we got a read job - do we have something in buffer?
let frame = if let Some(frame) = buffer.pop_front() {
//send the frame - even if it is an error
//this means the pipe is broken
frame
} else {
//wait for frame
match frame_reader.next().await {
None => Err(io::Error::new(io::ErrorKind::BrokenPipe, "closed pipe")),
Some(res) => res,
}
};
// we don't really care if the receiver is gone, although most of the time this would be a usage error, so at least emit a warning
if job.ret.send(frame).is_err() {
tracing::warn!("Warning: Read Job finished but receiver is gone!");
}
}

async fn handle_read_frame<R, C>(
frame: Option<Result<MRecv, io::Error>>,
buffer: &mut VecDeque<Result<MRecv, io::Error>>,
read_recv: &mut mpsc::Receiver<ReadJob<MRecv>>,
frame_reader: &mut FramedRead<R, C>,
) -> bool
where
C: 'static,
R: AsyncReadExt + Unpin + 'static,
FramedRead<R, C>: Stream<Item = Result<MRecv, io::Error>> + Send,
{
// we did not get a job so far so just put into buffer
// if the frame is None (either because the other party is done, or the connection was closed) we return false and stop read task
if let Some(read_result) = frame {
if buffer.len() >= READ_BUFFER_SIZE {
//wait for a read job as buffer is full
if let Some(read_job) = read_recv.recv().await {
Self::handle_read_job(read_job, buffer, frame_reader).await;
} else {
tracing::warn!(
"[handel_read_frame] still have frames in buffer but channel dropped?"
);
}
}
buffer.push_back(read_result);
true
} else {
false
}
}

/// Create a new [`ChannelHandle`] from a [`Channel`]. This spawns a new tokio task that handles the read and write jobs so they can happen concurrently.
pub fn manage<R, W, C>(chan: Channel<R, W, C>) -> ChannelHandle<MSend, MRecv>
where
Expand All @@ -229,24 +169,19 @@ where
let (mut write, mut read) = chan.split();

tokio::spawn(async move {
let mut buffer = VecDeque::with_capacity(READ_BUFFER_SIZE);
loop {
tokio::select! {
read_job = read_recv.recv() => {
if let Some(read_job) = read_job {
Self::handle_read_job(read_job, &mut buffer, &mut read).await;
} else {
break;
while let Some(frame) = read.next().await {
let job = read_recv.recv().await;
match job {
Some(job) => {
if job.ret.send(frame).is_err() {
tracing::warn!("Warning: Read Job finished but receiver is gone!");
}
}
// is this cancellation safe??? According to tokio::select docs a call to
//futures::stream::StreamExt::next on any Stream is cancellation safe but also
//when using quinn? Should be...
frame = read.next() => {
//if this method returns false we break
if !Self::handle_read_frame(frame, &mut buffer, &mut read_recv, &mut read).await {
break;
None => {
if frame.is_ok() {
tracing::warn!("Warning: received Ok frame but receiver is gone!");
}
break;
}
}
}
Expand Down

0 comments on commit 6a8e829

Please sign in to comment.