From 6a8e82913b88414ee05a7159fbd390a32db70b9d Mon Sep 17 00:00:00 2001
From: Fabian Gruber
Date: Mon, 14 Oct 2024 17:26:44 +0200
Subject: [PATCH] fix: fixed read task breaking too early, caused error during proof gen

---
 mpc-core/src/protocols/rep3/network.rs   |  2 +-
 mpc-core/src/protocols/shamir/network.rs |  2 +-
 mpc-net/src/channel.rs                   | 87 +++---------------------
 3 files changed, 13 insertions(+), 78 deletions(-)

diff --git a/mpc-core/src/protocols/rep3/network.rs b/mpc-core/src/protocols/rep3/network.rs
index c20b0a217..ea4d3c689 100644
--- a/mpc-core/src/protocols/rep3/network.rs
+++ b/mpc-core/src/protocols/rep3/network.rs
@@ -216,9 +216,9 @@ pub trait Rep3Network: Send {
 #[derive(Debug)]
 pub struct Rep3MpcNet {
     pub(crate) id: PartyID,
-    pub(crate) net_handler: Arc,
     pub(crate) chan_next: ChannelHandle,
     pub(crate) chan_prev: ChannelHandle,
+    pub(crate) net_handler: Arc,
 }
 
 impl Rep3MpcNet {
diff --git a/mpc-core/src/protocols/shamir/network.rs b/mpc-core/src/protocols/shamir/network.rs
index 38ec1d8f3..89a686e47 100644
--- a/mpc-core/src/protocols/shamir/network.rs
+++ b/mpc-core/src/protocols/shamir/network.rs
@@ -77,8 +77,8 @@ pub trait ShamirNetwork: Send {
 pub struct ShamirMpcNet {
     pub(crate) id: usize, // 0 <= id < num_parties
     pub(crate) num_parties: usize,
-    pub(crate) net_handler: Arc,
     pub(crate) channels: HashMap>,
+    pub(crate) net_handler: Arc,
 }
 
 impl ShamirMpcNet {
diff --git a/mpc-net/src/channel.rs b/mpc-net/src/channel.rs
index baf2bdf7b..a26a22755 100644
--- a/mpc-net/src/channel.rs
+++ b/mpc-net/src/channel.rs
@@ -1,6 +1,6 @@
 //! A channel abstraction for sending and receiving messages.
 use futures::{Sink, SinkExt, Stream, StreamExt};
-use std::{collections::VecDeque, io, marker::Unpin, pin::Pin};
+use std::{io, marker::Unpin, pin::Pin};
 use tokio::{
     io::{AsyncReadExt, AsyncWriteExt},
     sync::{mpsc, oneshot},
@@ -12,8 +12,6 @@ pub type ReadChannel = FramedRead;
 /// A write end of the channel, just a type alias for [`FramedWrite`].
 pub type WriteChannel = FramedWrite;
 
-const READ_BUFFER_SIZE: usize = 16;
-
 /// A channel that uses a [`Encoder`] and [`Decoder`] to send and receive messages.
 #[derive(Debug)]
 pub struct Channel {
@@ -156,64 +154,6 @@
 where
     MRecv: Send + std::fmt::Debug + 'static,
     MSend: Send + std::fmt::Debug + 'static,
 {
-    async fn handle_read_job(
-        job: ReadJob,
-        buffer: &mut VecDeque>,
-        frame_reader: &mut FramedRead,
-    ) where
-        C: 'static,
-        R: AsyncReadExt + Unpin + 'static,
-        FramedRead: Stream> + Send,
-    {
-        //we got a read job - do we have something in buffer?
-        let frame = if let Some(frame) = buffer.pop_front() {
-            //send the frame - even if it is an error
-            //this means the pipe is broken
-            frame
-        } else {
-            //wait for frame
-            match frame_reader.next().await {
-                None => Err(io::Error::new(io::ErrorKind::BrokenPipe, "closed pipe")),
-                Some(res) => res,
-            }
-        };
-        // we don't really care if the receiver is gone, although most of the time this would be a usage error, so at least emit a warning
-        if job.ret.send(frame).is_err() {
-            tracing::warn!("Warning: Read Job finished but receiver is gone!");
-        }
-    }
-
-    async fn handle_read_frame(
-        frame: Option>,
-        buffer: &mut VecDeque>,
-        read_recv: &mut mpsc::Receiver>,
-        frame_reader: &mut FramedRead,
-    ) -> bool
-    where
-        C: 'static,
-        R: AsyncReadExt + Unpin + 'static,
-        FramedRead: Stream> + Send,
-    {
-        // we did not get a job so far so just put into buffer
-        // if the frame is None (either because the other party is done, or the connection was closed) we return false and stop read task
-        if let Some(read_result) = frame {
-            if buffer.len() >= READ_BUFFER_SIZE {
-                //wait for a read job as buffer is full
-                if let Some(read_job) = read_recv.recv().await {
-                    Self::handle_read_job(read_job, buffer, frame_reader).await;
-                } else {
-                    tracing::warn!(
-                        "[handel_read_frame] still have frames in buffer but channel dropped?"
-                    );
-                }
-            }
-            buffer.push_back(read_result);
-            true
-        } else {
-            false
-        }
-    }
-
     /// Create a new [`ChannelHandle`] from a [`Channel`]. This spawns a new tokio task that handles the read and write jobs so they can happen concurrently.
     pub fn manage(chan: Channel) -> ChannelHandle
     where
@@ -229,24 +169,19 @@
         let (mut write, mut read) = chan.split();
 
         tokio::spawn(async move {
-            let mut buffer = VecDeque::with_capacity(READ_BUFFER_SIZE);
-            loop {
-                tokio::select! {
-                    read_job = read_recv.recv() => {
-                        if let Some(read_job) = read_job {
-                            Self::handle_read_job(read_job, &mut buffer, &mut read).await;
-                        } else {
-                            break;
+            while let Some(frame) = read.next().await {
+                let job = read_recv.recv().await;
+                match job {
+                    Some(job) => {
+                        if job.ret.send(frame).is_err() {
+                            tracing::warn!("Warning: Read Job finished but receiver is gone!");
                         }
                     }
-                    // is this cancellation safe??? According to tokio::select docs a call to
-                    //futures::stream::StreamExt::next on any Stream is cancellation safe but also
-                    //when using quinn? Should be...
-                    frame = read.next() => {
-                        //if this method returns false we break
-                        if !Self::handle_read_frame(frame, &mut buffer, &mut read_recv, &mut read).await {
-                            break;
+                    None => {
+                        if frame.is_ok() {
+                            tracing::warn!("Warning: received Ok frame but receiver is gone!");
                         }
+                        break;
                     }
                 }
             }
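
Note on the new read loop (an illustrative sketch, not taken from the patch itself): the buffered `tokio::select!` loop is replaced by a single `while let` that only exits once the framed stream itself ends or every read-job sender has been dropped. The standalone program below mimics that shape under stand-in assumptions: `u32` frames fed through `futures::stream::iter` instead of the crate's `FramedRead` over a real connection, and a `ReadJob` struct modeled on the one in mpc-net.

use futures::{stream, StreamExt};
use std::io;
use tokio::sync::{mpsc, oneshot};

/// A read request: the caller receives the next frame over the oneshot channel.
/// (Modeled on mpc-net's ReadJob; the u32 frame type is a stand-in.)
struct ReadJob<M> {
    ret: oneshot::Sender<Result<M, io::Error>>,
}

#[tokio::main]
async fn main() {
    // Stand-in for the framed network reader: three frames, then end-of-stream.
    let frames: Vec<Result<u32, io::Error>> = vec![Ok(1), Ok(2), Ok(3)];
    let mut reader = stream::iter(frames);

    let (job_tx, mut job_rx) = mpsc::channel::<ReadJob<u32>>(8);

    // Read task in the shape of the patched manage(): it stops only when the
    // stream ends or when all job senders are gone, never on a select race.
    let read_task = tokio::spawn(async move {
        while let Some(frame) = reader.next().await {
            match job_rx.recv().await {
                Some(job) => {
                    if job.ret.send(frame).is_err() {
                        eprintln!("read job finished but receiver is gone");
                    }
                }
                None => {
                    // Every handle was dropped; stop reading.
                    break;
                }
            }
        }
    });

    // Caller side: request the three frames one by one.
    for _ in 0..3 {
        let (ret, rx) = oneshot::channel();
        if job_tx.send(ReadJob { ret }).await.is_err() {
            panic!("read task stopped early");
        }
        println!("got frame: {:?}", rx.await.unwrap());
    }

    drop(job_tx);
    read_task.await.unwrap();
}

Because the task pulls exactly one frame and then parks on the job queue, no frame is dropped halfway through a read, and the cancellation-safety question raised in the removed `tokio::select!` comment no longer comes up.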