diff --git a/examples/n2n-miniprotocols/src/main.rs b/examples/n2n-miniprotocols/src/main.rs index 44a34d03..86c2cbb6 100644 --- a/examples/n2n-miniprotocols/src/main.rs +++ b/examples/n2n-miniprotocols/src/main.rs @@ -19,7 +19,7 @@ impl chainsync::Observer for LoggingObserver { _content: chainsync::HeaderContent, tip: &chainsync::Tip, ) -> Result> { - log::debug!("asked to roll forward, tip at {:?}", tip); + log::info!("asked to roll forward, tip at {:?}", tip); Ok(chainsync::Continuation::Proceed) } @@ -96,7 +96,7 @@ fn do_chainsync(mut channel: ChannelBuffer) { fn main() { env_logger::builder() - .filter_level(log::LevelFilter::Trace) + .filter_level(log::LevelFilter::Info) .init(); // setup a TCP socket to act as data bearer between our agents and the remote diff --git a/pallas-multiplexer/src/agents.rs b/pallas-multiplexer/src/agents.rs index a00590e9..7ab7fe06 100644 --- a/pallas-multiplexer/src/agents.rs +++ b/pallas-multiplexer/src/agents.rs @@ -116,3 +116,63 @@ impl From for ChannelBuffer { ChannelBuffer::new(channel) } } + +#[cfg(test)] +mod tests { + use std::collections::VecDeque; + + use super::*; + + impl Channel for VecDeque { + fn enqueue_chunk(&mut self, chunk: Payload) -> Result<(), ChannelError> { + self.push_back(chunk); + Ok(()) + } + + fn dequeue_chunk(&mut self) -> Result { + let chunk = self.pop_front().ok_or(ChannelError::NotConnected(None))?; + Ok(chunk) + } + } + + #[test] + fn multiple_messages_in_same_payload() { + let mut input = Vec::new(); + let in_part1 = (1u8, 2u8, 3u8); + let in_part2 = (6u8, 5u8, 4u8); + + minicbor::encode(in_part1, &mut input).unwrap(); + minicbor::encode(in_part2, &mut input).unwrap(); + + let mut channel = VecDeque::::new(); + channel.push_back(input); + + let mut buf = ChannelBuffer::new(channel); + + let out_part1 = buf.recv_full_msg::<(u8, u8, u8)>().unwrap(); + let out_part2 = buf.recv_full_msg::<(u8, u8, u8)>().unwrap(); + + assert_eq!(in_part1, out_part1); + assert_eq!(in_part2, out_part2); + } + + #[test] + fn fragmented_message_in_multiple_payloads() { + let mut input = Vec::new(); + let msg = (11u8, 12u8, 13u8, 14u8, 15u8, 16u8, 17u8); + minicbor::encode(msg, &mut input).unwrap(); + + let mut channel = VecDeque::::new(); + + while !input.is_empty() { + let chunk = Vec::from(input.drain(0..2).as_slice()); + channel.push_back(chunk); + } + + let mut buf = ChannelBuffer::new(channel); + + let out_msg = buf.recv_full_msg::<(u8, u8, u8, u8, u8, u8, u8)>().unwrap(); + + assert_eq!(msg, out_msg); + } +} diff --git a/pallas-multiplexer/src/lib.rs b/pallas-multiplexer/src/lib.rs index 009fe046..7d150c5f 100644 --- a/pallas-multiplexer/src/lib.rs +++ b/pallas-multiplexer/src/lib.rs @@ -3,8 +3,6 @@ pub mod bearers; pub mod demux; pub mod mux; -use bearers::Bearer; - #[cfg(feature = "std")] mod std; @@ -13,29 +11,4 @@ pub use crate::std::*; pub type Payload = Vec; -pub struct Multiplexer -where - I: mux::Ingress, - E: demux::Egress, -{ - pub muxer: mux::Muxer, - pub demuxer: demux::Demuxer, -} - -impl Multiplexer -where - I: mux::Ingress, - E: demux::Egress, -{ - pub fn new(bearer: Bearer) -> Self { - Multiplexer { - muxer: mux::Muxer::new(bearer.clone()), - demuxer: demux::Demuxer::new(bearer), - } - } - - pub fn register_channel(&mut self, protocol: u16, ingress: I, egress: E) { - self.muxer.register(protocol, ingress); - self.demuxer.register(protocol, egress); - } -} +pub type Message = (u16, Payload); diff --git a/pallas-multiplexer/src/mux.rs b/pallas-multiplexer/src/mux.rs index 854703d8..3d805883 100644 --- a/pallas-multiplexer/src/mux.rs +++ b/pallas-multiplexer/src/mux.rs @@ -1,11 +1,8 @@ -use std::{collections::HashMap, time::Instant}; - -use rand::seq::SliceRandom; -use rand::thread_rng; +use std::time::{Duration, Instant}; use crate::{ bearers::{Bearer, Segment}, - Payload, + Message, }; pub enum IngressError { @@ -18,20 +15,19 @@ pub enum IngressError { /// To be implemented by any mechanism that allows to submit a payloads from a /// particular protocol that need to be muxed by the multiplexer. pub trait Ingress { - fn try_recv(&mut self) -> Result; + fn recv_timeout(&mut self, duration: Duration) -> Result; } -type Message = (u16, Payload); - pub enum TickOutcome { BearerError(std::io::Error), + IngressDisconnected, Idle, Busy, } pub struct Muxer { bearer: Bearer, - ingress: HashMap, + ingress: I, clock: Instant, } @@ -39,58 +35,17 @@ impl Muxer where I: Ingress, { - pub fn new(bearer: Bearer) -> Self { + pub fn new(bearer: Bearer, ingress: I) -> Self { Self { bearer, - ingress: Default::default(), + ingress, clock: Instant::now(), } } - /// Register the receiver end of an ingress channel - pub fn register(&mut self, id: u16, rx: I) { - self.ingress.insert(id, rx); - } - - /// Remove a protocol from the ingress - /// - /// Meant to be used after a receive error in a previous tick - pub fn deregister(&mut self, id: u16) { - self.ingress.remove(&id); - } - - #[inline] - fn randomize_ids(&self) -> Vec { - let mut rng = thread_rng(); - let mut keys: Vec<_> = self.ingress.keys().cloned().collect(); - keys.shuffle(&mut rng); - keys - } - - /// Select the next segment to be muxed - /// - /// This method iterates over the existing receivers checking for the first - /// available message. The order of the checks is random to ensure a fair - /// use of the multiplexer amongst all protocols. - pub fn select(&mut self) -> Option { - for id in self.randomize_ids() { - let rx = self.ingress.get_mut(&id).unwrap(); - - match rx.try_recv() { - Ok(payload) => return Some((id, payload)), - Err(IngressError::Disconnected) => { - self.deregister(id); - } - _ => (), - }; - } - - None - } - pub fn tick(&mut self) -> TickOutcome { - match self.select() { - Some((id, payload)) => { + match self.ingress.recv_timeout(Duration::from_millis(500)) { + Ok((id, payload)) => { let segment = Segment::new(self.clock, id, payload); match self.bearer.write_segment(segment) { @@ -98,7 +53,8 @@ where _ => TickOutcome::Busy, } } - None => TickOutcome::Idle, + Err(IngressError::Empty) => TickOutcome::Idle, + Err(IngressError::Disconnected) => TickOutcome::IngressDisconnected, } } } diff --git a/pallas-multiplexer/src/std.rs b/pallas-multiplexer/src/std.rs index d60a0424..f8a93a30 100644 --- a/pallas-multiplexer/src/std.rs +++ b/pallas-multiplexer/src/std.rs @@ -1,26 +1,27 @@ use crate::{ agents::{self, ChannelBuffer}, - demux, mux, Payload, + bearers::Bearer, + demux, mux, Message, Payload, }; use std::{ sync::{ atomic::{AtomicBool, Ordering}, - mpsc::{channel, Receiver, SendError, Sender, TryRecvError}, + mpsc::{channel, Receiver, RecvTimeoutError, SendError, Sender}, Arc, }, thread::{spawn, JoinHandle}, time::Duration, }; -pub type StdIngress = Receiver; +pub type StdIngress = Receiver; impl mux::Ingress for StdIngress { - fn try_recv(&mut self) -> Result { - match Receiver::try_recv(self) { + fn recv_timeout(&mut self, duration: Duration) -> Result { + match Receiver::recv_timeout(self, duration) { Ok(x) => Ok(x), - Err(TryRecvError::Disconnected) => Err(mux::IngressError::Disconnected), - Err(TryRecvError::Empty) => Err(mux::IngressError::Empty), + Err(RecvTimeoutError::Disconnected) => Err(mux::IngressError::Disconnected), + Err(RecvTimeoutError::Timeout) => Err(mux::IngressError::Empty), } } } @@ -36,16 +37,30 @@ impl demux::Egress for StdEgress { } } -pub type StdPlexer = crate::Multiplexer; +pub struct StdPlexer { + pub muxer: mux::Muxer, + pub demuxer: demux::Demuxer, + pub mux_tx: Sender, +} impl StdPlexer { + pub fn new(bearer: Bearer) -> Self { + let (mux_tx, mux_rx) = channel::(); + + Self { + muxer: mux::Muxer::new(bearer.clone(), mux_rx), + demuxer: demux::Demuxer::new(bearer), + mux_tx, + } + } + pub fn use_channel(&mut self, protocol: u16) -> StdChannel { let (demux_tx, demux_rx) = channel::(); - let (mux_tx, mux_rx) = channel::(); + self.demuxer.register(protocol, demux_tx); - self.register_channel(protocol, mux_rx, demux_tx); + let mux_tx = self.mux_tx.clone(); - (mux_tx, demux_rx) + (protocol, mux_tx, demux_rx) } } @@ -56,12 +71,10 @@ impl mux::Muxer { mux::TickOutcome::BearerError(err) => return Err(err), mux::TickOutcome::Idle => match cancel.is_set() { true => break Ok(()), - false => { - // TODO: investigate why std::thread::yield_now() hogs the thread - std::thread::sleep(Duration::from_millis(100)) - } + false => (), }, mux::TickOutcome::Busy => (), + mux::TickOutcome::IngressDisconnected => break Ok(()), } } } @@ -104,20 +117,20 @@ impl demux::Demuxer { } } -pub type StdChannel = (Sender, Receiver); +pub type StdChannel = (u16, Sender, Receiver); pub type StdChannelBuffer = ChannelBuffer; impl agents::Channel for StdChannel { fn enqueue_chunk(&mut self, payload: Payload) -> Result<(), agents::ChannelError> { - match self.0.send(payload) { + match self.1.send((self.0, payload)) { Ok(_) => Ok(()), - Err(SendError(payload)) => Err(agents::ChannelError::NotConnected(Some(payload))), + Err(SendError((_, payload))) => Err(agents::ChannelError::NotConnected(Some(payload))), } } fn dequeue_chunk(&mut self) -> Result { - match self.1.recv() { + match self.2.recv() { Ok(payload) => Ok(payload), Err(_) => Err(agents::ChannelError::NotConnected(None)), } diff --git a/pallas-multiplexer/tests/integration.rs b/pallas-multiplexer/tests/integration.rs index ecbec2de..133db91d 100644 --- a/pallas-multiplexer/tests/integration.rs +++ b/pallas-multiplexer/tests/integration.rs @@ -8,7 +8,7 @@ use pallas_codec::minicbor; use pallas_multiplexer::{ agents::{Channel, ChannelBuffer}, bearers::Bearer, - StdPlexer, + Payload, StdPlexer, }; use rand::{distributions::Uniform, Rng}; @@ -62,44 +62,3 @@ fn one_way_small_sequence_of_payloads() { assert_eq!(payload, received_payload); } } - -#[test] -fn multiple_messages_in_same_payload() { - let mut input = Vec::new(); - let in_part1 = (1u8, 2u8, 3u8); - let in_part2 = (6u8, 5u8, 4u8); - - minicbor::encode(in_part1, &mut input).unwrap(); - minicbor::encode(in_part2, &mut input).unwrap(); - - let channel = std::sync::mpsc::channel(); - channel.0.send(input).unwrap(); - - let mut buf = ChannelBuffer::new(channel); - - let out_part1 = buf.recv_full_msg::<(u8, u8, u8)>().unwrap(); - let out_part2 = buf.recv_full_msg::<(u8, u8, u8)>().unwrap(); - - assert_eq!(in_part1, out_part1); - assert_eq!(in_part2, out_part2); -} - -#[test] -fn fragmented_message_in_multiple_payloads() { - let mut input = Vec::new(); - let msg = (11u8, 12u8, 13u8, 14u8, 15u8, 16u8, 17u8); - minicbor::encode(msg, &mut input).unwrap(); - - let channel = std::sync::mpsc::channel(); - - while !input.is_empty() { - let chunk = Vec::from(input.drain(0..2).as_slice()); - channel.0.send(chunk).unwrap(); - } - - let mut buf = ChannelBuffer::new(channel); - - let out_msg = buf.recv_full_msg::<(u8, u8, u8, u8, u8, u8, u8)>().unwrap(); - - assert_eq!(msg, out_msg); -}