From 3b685e8b7fa41cade46584a52c77103c5ba13d46 Mon Sep 17 00:00:00 2001 From: Torben <81830852+Sbcdn@users.noreply.github.com> Date: Tue, 12 Jul 2022 17:41:53 +0200 Subject: [PATCH] feat(miniprotocols): Add Tx-Mempool-Monitoring mini-Protocol (#150) * implement txmonitor mini-protocol * update README * add Protocol Version V11,V12 and function V10_and_above for handshake protocol --- pallas-miniprotocols/README.md | 1 + pallas-miniprotocols/src/handshake/n2c.rs | 17 ++ pallas-miniprotocols/src/lib.rs | 1 + pallas-miniprotocols/src/txmonitor/codec.rs | 172 +++++++++++++++ pallas-miniprotocols/src/txmonitor/mod.rs | 220 ++++++++++++++++++++ 5 files changed, 411 insertions(+) create mode 100644 pallas-miniprotocols/src/txmonitor/codec.rs create mode 100644 pallas-miniprotocols/src/txmonitor/mod.rs diff --git a/pallas-miniprotocols/README.md b/pallas-miniprotocols/README.md index 4e0495a0..c3696db6 100644 --- a/pallas-miniprotocols/README.md +++ b/pallas-miniprotocols/README.md @@ -19,6 +19,7 @@ The following architectural decisions were made for this particular Rust impleme | handshake | done | planned | | local-state | done | planned | | tx-submission | planned | minimal | +| local tx monitor | done | planned | | local-tx-submission | ongoing | planned | ## Implementation Details diff --git a/pallas-miniprotocols/src/handshake/n2c.rs b/pallas-miniprotocols/src/handshake/n2c.rs index c3efc598..0c781365 100644 --- a/pallas-miniprotocols/src/handshake/n2c.rs +++ b/pallas-miniprotocols/src/handshake/n2c.rs @@ -16,6 +16,8 @@ const PROTOCOL_V7: u64 = 32775; const PROTOCOL_V8: u64 = 32776; const PROTOCOL_V9: u64 = 32777; const PROTOCOL_V10: u64 = 32778; +const PROTOCOL_V11: u64 = 32779; +const PROTOCOL_V12: u64 = 32780; impl VersionTable { pub fn v1_and_above(network_magic: u64) -> VersionTable { @@ -30,6 +32,8 @@ impl VersionTable { (PROTOCOL_V8, VersionData(network_magic)), (PROTOCOL_V9, VersionData(network_magic)), (PROTOCOL_V10, VersionData(network_magic)), + (PROTOCOL_V11, VersionData(network_magic)), + (PROTOCOL_V12, VersionData(network_magic)), ] .into_iter() .collect::>(); @@ -44,6 +48,19 @@ impl VersionTable { VersionTable { values } } + + pub fn v10_and_above(network_magic: u64) -> VersionTable { + let values = vec![ + (PROTOCOL_V10, VersionData(network_magic)), + (PROTOCOL_V11, VersionData(network_magic)), + (PROTOCOL_V12, VersionData(network_magic)), + + ] + .into_iter() + .collect::>(); + + VersionTable { values } + } } #[derive(Debug, Clone)] diff --git a/pallas-miniprotocols/src/lib.rs b/pallas-miniprotocols/src/lib.rs index 67706c3f..83cdeeea 100644 --- a/pallas-miniprotocols/src/lib.rs +++ b/pallas-miniprotocols/src/lib.rs @@ -6,6 +6,7 @@ pub mod chainsync; pub mod handshake; pub mod localstate; pub mod txsubmission; +pub mod txmonitor; pub use common::*; pub use machines::*; diff --git a/pallas-miniprotocols/src/txmonitor/codec.rs b/pallas-miniprotocols/src/txmonitor/codec.rs new file mode 100644 index 00000000..259a9c1b --- /dev/null +++ b/pallas-miniprotocols/src/txmonitor/codec.rs @@ -0,0 +1,172 @@ +use super::{Message, MsgRequest, MsgResponse, MempoolSizeAndCapacity}; +use pallas_codec::minicbor::{decode, encode, Decode, Encode, Encoder}; + +impl Encode<()> for Message + { + + fn encode( + &self, + e: &mut Encoder, + ctx: &mut (), + ) -> Result<(), encode::Error> { + match self { + Message::MsgDone => { + e.array(1)?.u16(0)?; + }, + Message::MsgAcquire => { + e.array(1)?.u16(1)?; + + }, + Message::MsgAcquired(slot) => { + e.array(2)?.u16(2)?; + e.encode(slot)?; + }, + Message::MsgQuery(query) => { + query.encode(e, ctx)?; + }, + Message::MsgResponse(response) => { + response.encode(e, ctx)?; + } + } + log::debug!("encode message: {:?}",self); + Ok(()) + } +} + +impl<'b> Decode<'b,()> for Message { +fn decode(d: &mut pallas_codec::minicbor::Decoder<'b>, _ctx: &mut ()) -> Result { + d.array()?; + let label = d.u16()?; + log::debug!("decode message: {:?}",label); + match label { + 0 => { + Ok(Message::MsgDone) + }, + 1 => { + Ok(Message::MsgAcquire) + }, + 2 => { + let slot = d.decode()?; + Ok(Message::MsgAcquired(slot)) + }, + 3 => { + Ok(Message::MsgQuery(MsgRequest::MsgRelease)) + }, + 5 => { + Ok(Message::MsgQuery(MsgRequest::MsgNextTx)) + }, + 6 => { + log::trace!("Decoding 6, 1. Array: {:?}",d); + let de : Result,pallas_codec::minicbor::decode::Error> = d.array(); + log::trace!("Decoding 6, 2. Array: {:?}",de); + let tag : Result = d.u8(); + let mut tx = None; + if let Ok(_) = tag { + log::trace!("Decoding 6, Tag: {:?}",tag); + let det = d.tag(); + log::trace!("Decoding 6, Bytes: {:?}",det); + let cbor = d.bytes()?; + tx = Some(hex::encode(cbor)); + log::trace!("Decoding 6, Tx: {:?}",tx); + + } + Ok(Message::MsgResponse(MsgResponse::MsgReplyNextTx(tx))) + }, + 7 => { + let txid = d.decode()?; + Ok(Message::MsgQuery(MsgRequest::MsgHasTx(txid))) + } + 8 => { + let has = d.decode()?; + Ok(Message::MsgResponse(MsgResponse::MsgReplyHasTx(has))) + } + 9 => { + Ok(Message::MsgQuery(MsgRequest::MsgGetSizes)) + } + 10 => { + d.array()?; + let capacity = d.decode()?; + let size_in_bytes = d.decode()?; + let number_of_tx = d.decode()?; + Ok( + Message::MsgResponse(MsgResponse::MsgReplyGetSizes(MempoolSizeAndCapacity { + capacity_in_bytes : capacity, + size_in_bytes : size_in_bytes, + number_of_txs : number_of_tx, + })) + ) + } + _ => Err(decode::Error::message( + "can't decode Message", + )) + } +} + +fn nil() -> Option { + None + } +} + +impl Encode<()> for MsgRequest { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + match self { + MsgRequest::MsgAwaitAcquire => { + e.array(1)?.u16(1)?; + }, + MsgRequest::MsgGetSizes => { + e.array(1)?.u16(9)?; + }, + MsgRequest::MsgHasTx(tx) => { + e.array(2)?.u16(7)?; + e.encode(tx)?; + }, + MsgRequest::MsgNextTx => { + e.array(1)?.u16(5)?; + }, + MsgRequest::MsgRelease => { + e.array(1)?.u16(3)?; + }, + + } + log::debug!("encode message: {:?}",self); + Ok(()) + } +} + +impl Encode<()> for MsgResponse { + fn encode( + &self, + e: &mut Encoder, + _ctx: &mut (), + ) -> Result<(), encode::Error> { + match self { + MsgResponse::MsgReplyGetSizes(sz) => { + e.array(2)?.u16(10)?; + e.array(3)?; + e.encode(sz.capacity_in_bytes)?; + e.encode(sz.size_in_bytes)?; + e.encode(sz.number_of_txs)?; + }, + MsgResponse::MsgReplyHasTx(tx) => { + e.array(2)?.u16(8)?; + e.encode(tx)?; + }, + MsgResponse::MsgReplyNextTx(None) => { + e.array(1)?.u16(6)?; + }, + MsgResponse::MsgReplyNextTx(Some(tx)) => { + e.array(2)?.u16(6)?; + e.encode(tx.to_string())?; + }, + } + Ok(()) + } + + fn is_nil(&self) -> bool { + false + } +} diff --git a/pallas-miniprotocols/src/txmonitor/mod.rs b/pallas-miniprotocols/src/txmonitor/mod.rs new file mode 100644 index 00000000..82a14505 --- /dev/null +++ b/pallas-miniprotocols/src/txmonitor/mod.rs @@ -0,0 +1,220 @@ +mod codec; + +use std::{fmt::{Debug}}; +use pallas_codec::Fragment; +use crate::machines::{Agent, MachineError, Transition}; + +type Slot = u64; +type TxId = String; +type Tx = String; + +#[derive(Debug, PartialEq, Clone)] +pub enum StBusyKind { + NextTx, + HasTx, + GetSizes, +} + +#[derive(Debug, PartialEq, Clone)] +pub enum State { + StIdle, + StAcquiring, + StAcquired, + StBusy(StBusyKind), + StDone, +} + +#[derive(Debug, PartialEq, Clone)] +pub struct MempoolSizeAndCapacity { + pub capacity_in_bytes : u32, + pub size_in_bytes : u32, + pub number_of_txs : u32, +} + +#[derive(Debug, Clone)] +pub enum Message { + MsgAcquire, + MsgAcquired(Slot), + MsgQuery(MsgRequest), + MsgResponse(MsgResponse), + MsgDone, +} + + +#[derive(Debug, Clone)] +pub enum MsgRequest { + MsgAwaitAcquire, + MsgNextTx, + MsgHasTx(TxId), + MsgGetSizes, + MsgRelease, +} +#[derive(Debug, Clone)] +pub enum MsgResponse { + MsgReplyNextTx(Option), + MsgReplyHasTx(bool), + MsgReplyGetSizes(MempoolSizeAndCapacity), +} + +#[derive(Debug, Clone)] +pub struct LocalTxMonitor { + pub state : State, + pub snapshot : Option, + pub request : Option, + pub output : Option, +} + +impl LocalTxMonitor + where + Message : Fragment, +{ + + pub fn initial(state : State) -> Self { + Self { + state : state, + snapshot : None, + request : None, + output : None, + } + } + + fn on_acquired(self, s: Slot) -> Transition { + log::debug!("acquired Slot: '{:?}' ",s); + + Ok(Self { + state : State::StAcquired, + snapshot : Some(s), + output : None, + ..self + }) + } + + fn on_reply_next_tx(self, tx: Option) -> Transition { + log::debug!("Next Transaction: {:?}", tx); + Ok(Self { + output: Some(MsgResponse::MsgReplyNextTx(tx)), + ..self + }) + + } + + fn on_reply_has_tx(self, arg: bool) -> Transition { + log::debug!("Mempool has transaction: {:?}", arg); + Ok(Self { + output: Some(MsgResponse::MsgReplyHasTx(arg)), + ..self + }) + } + + fn on_reply_get_size(self, msc: MempoolSizeAndCapacity) -> Transition { + log::debug!("Mempool Status: {:?}", msc); + + Ok(Self { + output: Some(MsgResponse::MsgReplyGetSizes(msc)), + ..self + }) + } +} + +impl Agent for LocalTxMonitor + where + Message: Fragment, { + type Message = Message; + type State = State; + + fn state(&self) -> &Self::State { + log::debug!("State: {:?}", &self.state); + &self.state + } + + fn is_done(&self) -> bool { + + let done = self.state == State::StDone; + log::debug!("is_done: {:?}",done); + done + } + + fn has_agency(&self) -> bool{ + log::trace!("Hase Agency: State: {:?}, Request: {:?}, Response: {:?}",self.state,self.request,self.output); + match &self.state { + State::StIdle => true, + State::StAcquiring => false, + State::StAcquired => true, + State::StBusy(..) => false, + State::StDone => false, + } + } + + fn build_next(&self) -> Self::Message { + log::debug!("build next; State: {:?}, request: {:?}, output: {:?}",&self.state, &self.request, &self.output); + match (&self.state, &self.request, &self.output) { + (State::StIdle, None ,None) => Message::MsgAcquire, + (State::StAcquired, None , None) => Message::MsgAcquire, + (State::StAcquired, Some(MsgRequest::MsgAwaitAcquire), None) => Message::MsgAcquire, + (State::StAcquired, Some(MsgRequest::MsgNextTx),None) => Message::MsgQuery(MsgRequest::MsgNextTx), + (State::StAcquired, Some(MsgRequest::MsgHasTx(tx)),None) => Message::MsgQuery(MsgRequest::MsgHasTx(tx.clone())), + (State::StAcquired, Some(MsgRequest::MsgGetSizes),None) => Message::MsgQuery(MsgRequest::MsgGetSizes), + (State::StAcquired, None, Some(_)) => Message::MsgAcquire, + (State::StAcquired, Some(req), Some(_)) => Message::MsgQuery(req.to_owned()), + _ => panic!("I do not have agency, don't know what to do") + } + } + + fn apply_start(self) -> Transition { + log::debug!("apply start"); + Ok(self) + } + + fn apply_outbound(self, msg: Self::Message) -> Transition { + log::debug!("apply outbound"); + match (self.state, msg) { + + (State::StIdle, Message::MsgAcquire) => { + log::debug!("apply outbound : MsgAcquire"); + Ok(Self { + state: State::StAcquiring, + ..self + })}, + (State::StAcquired, Message::MsgQuery(MsgRequest::MsgNextTx)) => Ok(Self { + state: State::StBusy(StBusyKind::NextTx), + ..self + }), + (State::StAcquired, Message::MsgQuery(MsgRequest::MsgHasTx(_))) => Ok(Self { + state: State::StBusy(StBusyKind::HasTx), + ..self + }), + + (State::StAcquired, Message::MsgQuery(MsgRequest::MsgGetSizes)) => Ok(Self { + state: State::StBusy(StBusyKind::GetSizes), + ..self + }), + (State::StAcquired, Message::MsgAcquire) => Ok(Self { + state: State::StAcquiring, + ..self + }), + (State::StAcquired, Message::MsgQuery(MsgRequest::MsgRelease)) => Ok(Self { + state: State::StIdle, + ..self + }), + (State::StIdle, Message::MsgDone) => Ok(Self { + state: State::StDone, + ..self + }), + + _ => panic!("PANIC! Cannot match outbound") + } + } + + fn apply_inbound(self, msg: Self::Message) -> Transition { + log::debug!("apply inbound"); + match (&self.state , msg) { + (State::StAcquiring, Message::MsgAcquired(s)) => self.on_acquired(s), + (State::StBusy(StBusyKind::NextTx), Message::MsgResponse(MsgResponse::MsgReplyNextTx(tx))) => self.on_reply_next_tx(tx), + (State::StBusy(StBusyKind::HasTx), Message::MsgResponse(MsgResponse::MsgReplyHasTx(arg))) => self.on_reply_has_tx(arg), + (State::StBusy(StBusyKind::GetSizes), Message::MsgResponse(MsgResponse::MsgReplyGetSizes(msc))) => self.on_reply_get_size(msc), + (state, msg) => Err(MachineError::invalid_msg::(&state, &msg)), + + } + } + +} \ No newline at end of file