From 0fb9a521d8b4b2161c282650a2749738a55df46e Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Thu, 28 Apr 2022 09:29:16 -0300 Subject: [PATCH 1/2] chore: Move miniprotocol examples to custom crate --- Cargo.toml | 2 + examples/n2c-miniprotocols/.gitignore | 4 + examples/n2c-miniprotocols/Cargo.toml | 16 +++ examples/n2c-miniprotocols/src/main.rs | 110 +++++++++++++++ examples/n2n-miniprotocols/.gitignore | 4 + examples/n2n-miniprotocols/Cargo.toml | 15 ++ examples/n2n-miniprotocols/src/main.rs | 130 ++++++++++++++++++ pallas-miniprotocols/Cargo.toml | 4 - pallas-miniprotocols/examples/blockfetch.rs | 42 ------ .../examples/chainsync-blocks.rs | 31 ----- .../examples/chainsync-headers.rs | 40 ------ .../examples/handshake-client.rs | 24 ---- .../examples/handshake-node.rs | 24 ---- .../examples/localstate-chainpoint.rs | 30 ---- .../examples/txsubmission-naive.rs | 30 ---- 15 files changed, 281 insertions(+), 225 deletions(-) create mode 100644 examples/n2c-miniprotocols/.gitignore create mode 100644 examples/n2c-miniprotocols/Cargo.toml create mode 100644 examples/n2c-miniprotocols/src/main.rs create mode 100644 examples/n2n-miniprotocols/.gitignore create mode 100644 examples/n2n-miniprotocols/Cargo.toml create mode 100644 examples/n2n-miniprotocols/src/main.rs delete mode 100644 pallas-miniprotocols/examples/blockfetch.rs delete mode 100644 pallas-miniprotocols/examples/chainsync-blocks.rs delete mode 100644 pallas-miniprotocols/examples/chainsync-headers.rs delete mode 100644 pallas-miniprotocols/examples/handshake-client.rs delete mode 100644 pallas-miniprotocols/examples/handshake-node.rs delete mode 100644 pallas-miniprotocols/examples/localstate-chainpoint.rs delete mode 100644 pallas-miniprotocols/examples/txsubmission-naive.rs diff --git a/Cargo.toml b/Cargo.toml index 9666332e..3dcc34a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,4 +9,6 @@ members = [ "pallas", "examples/block-download", "examples/block-decode", + "examples/n2n-miniprotocols", + "examples/n2c-miniprotocols", ] diff --git a/examples/n2c-miniprotocols/.gitignore b/examples/n2c-miniprotocols/.gitignore new file mode 100644 index 00000000..2e01a474 --- /dev/null +++ b/examples/n2c-miniprotocols/.gitignore @@ -0,0 +1,4 @@ +/target + +scratchpad +.DS_Store \ No newline at end of file diff --git a/examples/n2c-miniprotocols/Cargo.toml b/examples/n2c-miniprotocols/Cargo.toml new file mode 100644 index 00000000..283a6207 --- /dev/null +++ b/examples/n2c-miniprotocols/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "n2c-miniprotocols" +version = "0.1.0" +edition = "2021" +publish = false + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +pallas = { path = "../../pallas" } +net2 = "0.2.37" +env_logger = "0.9.0" +hex = "0.4.3" +log = "0.4.16" + + diff --git a/examples/n2c-miniprotocols/src/main.rs b/examples/n2c-miniprotocols/src/main.rs new file mode 100644 index 00000000..26033aad --- /dev/null +++ b/examples/n2c-miniprotocols/src/main.rs @@ -0,0 +1,110 @@ +use log; + +use pallas::network::{ + miniprotocols::{chainsync, handshake, localstate, run_agent, Point, MAINNET_MAGIC}, + multiplexer::Multiplexer, +}; + +use std::os::unix::net::UnixStream; + +#[derive(Debug)] +struct LoggingObserver; + +impl chainsync::Observer for LoggingObserver { + fn on_roll_forward( + &mut self, + _content: chainsync::HeaderContent, + tip: &chainsync::Tip, + ) -> Result> { + log::debug!("asked to roll forward, tip at {:?}", tip); + + Ok(chainsync::Continuation::Proceed) + } + + fn on_intersect_found( + &mut self, + point: &Point, + tip: &chainsync::Tip, + ) -> Result> { + log::debug!("intersect was found {:?} (tip: {:?})", point, tip); + + Ok(chainsync::Continuation::Proceed) + } + + fn on_rollback( + &mut self, + point: &Point, + ) -> Result> { + log::debug!("asked to roll back {:?}", point); + + Ok(chainsync::Continuation::Proceed) + } + + fn on_tip_reached(&mut self) -> Result> { + log::debug!("tip was reached"); + + Ok(chainsync::Continuation::Proceed) + } +} + +fn do_handshake(muxer: &mut Multiplexer) { + let mut channel = muxer.use_channel(0); + let versions = handshake::n2c::VersionTable::v1_and_above(MAINNET_MAGIC); + let _last = run_agent(handshake::Initiator::initial(versions), &mut channel).unwrap(); +} + +fn do_localstate_query(muxer: &mut Multiplexer) { + let mut channel = muxer.use_channel(7); + + let agent = run_agent( + localstate::OneShotClient::::initial( + None, + localstate::queries::RequestV10::GetChainPoint, + ), + &mut channel, + ); + + log::info!("state query result: {:?}", agent); +} + +fn do_chainsync(muxer: &mut Multiplexer) { + let mut channel = muxer.use_channel(5); + + let known_points = vec![Point::Specific( + 43847831u64, + hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(), + )]; + + let agent = run_agent( + chainsync::Consumer::::initial( + Some(known_points), + LoggingObserver {}, + ), + &mut channel, + ); + + println!("{:?}", agent); +} + +fn main() { + env_logger::builder() + .filter_level(log::LevelFilter::Trace) + .init(); + + // we connect to the unix socket of the local node. Make sure you have the right + // path for your environment + let bearer = UnixStream::connect("/tmp/node.socket").unwrap(); + + // setup the multiplexer by specifying the bearer and the IDs of the + // miniprotocols to use + let mut muxer = Multiplexer::setup(bearer, &[0, 4, 5]).unwrap(); + + // execute the required handshake against the relay + do_handshake(&mut muxer); + + // execute an arbitrary "Local State" query against the node + do_localstate_query(&mut muxer); + + // execute the chainsync flow from an arbitrary point in the chain + do_chainsync(&mut muxer); +} diff --git a/examples/n2n-miniprotocols/.gitignore b/examples/n2n-miniprotocols/.gitignore new file mode 100644 index 00000000..2e01a474 --- /dev/null +++ b/examples/n2n-miniprotocols/.gitignore @@ -0,0 +1,4 @@ +/target + +scratchpad +.DS_Store \ No newline at end of file diff --git a/examples/n2n-miniprotocols/Cargo.toml b/examples/n2n-miniprotocols/Cargo.toml new file mode 100644 index 00000000..4aebe0f6 --- /dev/null +++ b/examples/n2n-miniprotocols/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "n2n-miniprotocols" +version = "0.1.0" +edition = "2021" +publish = false + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +pallas = { path = "../../pallas" } +net2 = "0.2.37" +env_logger = "0.9.0" +hex = "0.4.3" +log = "0.4.16" + diff --git a/examples/n2n-miniprotocols/src/main.rs b/examples/n2n-miniprotocols/src/main.rs new file mode 100644 index 00000000..5b61b38f --- /dev/null +++ b/examples/n2n-miniprotocols/src/main.rs @@ -0,0 +1,130 @@ +use log; +use net2::TcpStreamExt; + +use pallas::network::{ + miniprotocols::{blockfetch, chainsync, handshake, run_agent, Point, MAINNET_MAGIC}, + multiplexer::Multiplexer, +}; + +use std::net::TcpStream; + +#[derive(Debug)] +struct LoggingObserver; + +impl blockfetch::Observer for LoggingObserver { + fn on_block_received(&mut self, body: Vec) -> Result<(), Box> { + log::trace!("block received: {}", hex::encode(&body)); + Ok(()) + } +} + +impl chainsync::Observer for LoggingObserver { + fn on_roll_forward( + &mut self, + _content: chainsync::HeaderContent, + tip: &chainsync::Tip, + ) -> Result> { + log::debug!("asked to roll forward, tip at {:?}", tip); + + Ok(chainsync::Continuation::Proceed) + } + + fn on_intersect_found( + &mut self, + point: &Point, + tip: &chainsync::Tip, + ) -> Result> { + log::debug!("intersect was found {:?} (tip: {:?})", point, tip); + + Ok(chainsync::Continuation::Proceed) + } + + fn on_rollback( + &mut self, + point: &Point, + ) -> Result> { + log::debug!("asked to roll back {:?}", point); + + Ok(chainsync::Continuation::Proceed) + } + + fn on_tip_reached(&mut self) -> Result> { + log::debug!("tip was reached"); + + Ok(chainsync::Continuation::Proceed) + } +} + +fn do_handshake(muxer: &mut Multiplexer) { + let mut channel = muxer.use_channel(0); + let versions = handshake::n2n::VersionTable::v4_and_above(MAINNET_MAGIC); + let _last = run_agent(handshake::Initiator::initial(versions), &mut channel).unwrap(); +} + +fn do_blockfetch(muxer: &mut Multiplexer) { + let mut channel = muxer.use_channel(3); + + let range = ( + Point::Specific( + 43847831, + hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45") + .unwrap(), + ), + Point::Specific( + 43847844, + hex::decode("ff8d558a3d5a0e058beb3d94d26a567f75cd7d09ff5485aa0d0ebc38b61378d4") + .unwrap(), + ), + ); + + let agent = run_agent( + blockfetch::BatchClient::initial(range, LoggingObserver {}), + &mut channel, + ); + + println!("{:?}", agent); +} + +fn do_chainsync(muxer: &mut Multiplexer) { + let mut channel = muxer.use_channel(2); + + let known_points = vec![Point::Specific( + 43847831u64, + hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(), + )]; + + let agent = run_agent( + chainsync::Consumer::::initial( + Some(known_points), + LoggingObserver {}, + ), + &mut channel, + ); + + println!("{:?}", agent); +} + +fn main() { + env_logger::builder() + .filter_level(log::LevelFilter::Trace) + .init(); + + // setup a TCP socket to act as data bearer between our agents and the remote + // relay. + let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); + bearer.set_nodelay(true).unwrap(); + bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); + + // setup the multiplexer by specifying the bearer and the IDs of the + // miniprotocols to use + let mut muxer = Multiplexer::setup(bearer, &[0, 2, 3, 4]).unwrap(); + + // execute the required handshake against the relay + do_handshake(&mut muxer); + + // fetch an arbitrary batch of block + do_blockfetch(&mut muxer); + + // execute the chainsync flow from an arbitrary point in the chain + do_chainsync(&mut muxer); +} diff --git a/pallas-miniprotocols/Cargo.toml b/pallas-miniprotocols/Cargo.toml index b8e3a65b..5862f61e 100644 --- a/pallas-miniprotocols/Cargo.toml +++ b/pallas-miniprotocols/Cargo.toml @@ -19,7 +19,3 @@ log = "0.4.14" hex = "0.4.3" itertools = "0.10.3" net2 = "0.2.37" - -[dev-dependencies] -env_logger = "0.9.0" -pallas-primitives = { version = "0.9.0-alpha.0", path = "../pallas-primitives/" } diff --git a/pallas-miniprotocols/examples/blockfetch.rs b/pallas-miniprotocols/examples/blockfetch.rs deleted file mode 100644 index 5d289916..00000000 --- a/pallas-miniprotocols/examples/blockfetch.rs +++ /dev/null @@ -1,42 +0,0 @@ -use net2::TcpStreamExt; -use std::net::TcpStream; - -use pallas_miniprotocols::blockfetch::{BatchClient, NoopObserver}; -use pallas_miniprotocols::handshake::{n2n::VersionTable, Initiator}; -use pallas_miniprotocols::{run_agent, Point, MAINNET_MAGIC}; -use pallas_multiplexer::Multiplexer; - -fn main() { - env_logger::init(); - - //let bearer = TcpStream::connect("localhost:6000").unwrap(); - let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); - - bearer.set_nodelay(true).unwrap(); - bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); - - let mut muxer = Multiplexer::setup(bearer, &vec![0, 3]).unwrap(); - - let mut hs_channel = muxer.use_channel(0); - let versions = VersionTable::v4_and_above(MAINNET_MAGIC); - let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap(); - println!("{:?}", last); - - let range = ( - Point::Specific( - 43847831u64, - hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45") - .unwrap(), - ), - Point::Specific( - 43847831u64, - hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45") - .unwrap(), - ), - ); - - let mut bf_channel = muxer.use_channel(3); - let bf = BatchClient::initial(range, NoopObserver {}); - let bf_last = run_agent(bf, &mut bf_channel); - println!("{:?}", bf_last); -} diff --git a/pallas-miniprotocols/examples/chainsync-blocks.rs b/pallas-miniprotocols/examples/chainsync-blocks.rs deleted file mode 100644 index 5dad1de0..00000000 --- a/pallas-miniprotocols/examples/chainsync-blocks.rs +++ /dev/null @@ -1,31 +0,0 @@ -use pallas_miniprotocols::chainsync::{BlockContent, Consumer, NoopObserver}; -use pallas_miniprotocols::handshake::{n2c::VersionTable, Initiator}; -use pallas_miniprotocols::{run_agent, Point, MAINNET_MAGIC}; -use pallas_multiplexer::Multiplexer; -use std::os::unix::net::UnixStream; - -fn main() { - env_logger::init(); - - // we connect to the unix socket of the local node. Make sure you have the right - // path for your environment - let bearer = UnixStream::connect("/tmp/node.socket").unwrap(); - - let mut muxer = Multiplexer::setup(bearer, &vec![0, 4, 5]).unwrap(); - - let mut hs_channel = muxer.use_channel(0); - let versions = VersionTable::v1_and_above(MAINNET_MAGIC); - let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap(); - println!("last hanshake state: {:?}", last); - - // some random known-point in the chain to use as starting point for the sync - let known_points = vec![Point::Specific( - 45147459, - hex::decode("bee16ef28ac02abb50c340a7deff085a77f3a7b84c66250b3318dcb125c19a10").unwrap(), - )]; - - let mut cs_channel = muxer.use_channel(5); - let cs = Consumer::::initial(Some(known_points), NoopObserver {}); - let cs = run_agent(cs, &mut cs_channel).unwrap(); - println!("{:?}", cs); -} diff --git a/pallas-miniprotocols/examples/chainsync-headers.rs b/pallas-miniprotocols/examples/chainsync-headers.rs deleted file mode 100644 index eb1ba41e..00000000 --- a/pallas-miniprotocols/examples/chainsync-headers.rs +++ /dev/null @@ -1,40 +0,0 @@ -use net2::TcpStreamExt; -use pallas_primitives::alonzo::Header; - -use pallas_miniprotocols::Point; -use std::net::TcpStream; - -use pallas_miniprotocols::chainsync::{Consumer, HeaderContent, NoopObserver}; -use pallas_miniprotocols::handshake::{n2n::VersionTable, Initiator}; -use pallas_miniprotocols::{run_agent, MAINNET_MAGIC}; -use pallas_multiplexer::Multiplexer; - -#[derive(Debug)] -pub struct Content(u32, Header); - -fn main() { - env_logger::init(); - - let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); - bearer.set_nodelay(true).unwrap(); - bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); - - let mut muxer = Multiplexer::setup(bearer, &vec![0, 2]).unwrap(); - let mut hs_channel = muxer.use_channel(0); - - let versions = VersionTable::v4_and_above(MAINNET_MAGIC); - let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap(); - println!("{:?}", last); - - let known_points = vec![Point::Specific( - 43847831u64, - hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(), - )]; - - let mut cs_channel = muxer.use_channel(2); - - let cs = Consumer::::initial(Some(known_points), NoopObserver {}); - let cs = run_agent(cs, &mut cs_channel).unwrap(); - - println!("{:?}", cs); -} diff --git a/pallas-miniprotocols/examples/handshake-client.rs b/pallas-miniprotocols/examples/handshake-client.rs deleted file mode 100644 index 15126ebf..00000000 --- a/pallas-miniprotocols/examples/handshake-client.rs +++ /dev/null @@ -1,24 +0,0 @@ -use net2::TcpStreamExt; -use std::net::TcpStream; - -use pallas_miniprotocols::handshake::{n2c::VersionTable, Initiator}; -use pallas_miniprotocols::{run_agent, MAINNET_MAGIC}; -use pallas_multiplexer::Multiplexer; - -fn main() { - env_logger::init(); - - //let bearer = TcpStream::connect("localhost:6000").unwrap(); - let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); - - bearer.set_nodelay(true).unwrap(); - bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); - - let mut muxer = Multiplexer::setup(bearer, &vec![0]).unwrap(); - - let mut hs_channel = muxer.use_channel(0); - let versions = VersionTable::v1_and_above(MAINNET_MAGIC); - let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap(); - - println!("{:?}", last); -} diff --git a/pallas-miniprotocols/examples/handshake-node.rs b/pallas-miniprotocols/examples/handshake-node.rs deleted file mode 100644 index 128654ca..00000000 --- a/pallas-miniprotocols/examples/handshake-node.rs +++ /dev/null @@ -1,24 +0,0 @@ -use net2::TcpStreamExt; -use std::net::TcpStream; - -use pallas_miniprotocols::handshake::{n2n::VersionTable, Initiator}; -use pallas_miniprotocols::{run_agent, MAINNET_MAGIC}; -use pallas_multiplexer::Multiplexer; - -fn main() { - env_logger::init(); - - //let bearer = TcpStream::connect("localhost:6000").unwrap(); - let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); - - bearer.set_nodelay(true).unwrap(); - bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); - - let mut muxer = Multiplexer::setup(bearer, &vec![0]).unwrap(); - let mut channel = muxer.use_channel(0); - - let versions = VersionTable::v4_and_above(MAINNET_MAGIC); - let last = run_agent(Initiator::initial(versions), &mut channel).unwrap(); - - println!("{:?}", last); -} diff --git a/pallas-miniprotocols/examples/localstate-chainpoint.rs b/pallas-miniprotocols/examples/localstate-chainpoint.rs deleted file mode 100644 index 879537db..00000000 --- a/pallas-miniprotocols/examples/localstate-chainpoint.rs +++ /dev/null @@ -1,30 +0,0 @@ -use pallas_miniprotocols::handshake::{n2c::VersionTable, Initiator}; -use pallas_miniprotocols::localstate::{ - queries::{QueryV10, RequestV10}, - OneShotClient, -}; -use pallas_miniprotocols::run_agent; -use pallas_miniprotocols::MAINNET_MAGIC; -use pallas_multiplexer::Multiplexer; -use std::os::unix::net::UnixStream; - -fn main() { - env_logger::init(); - - // we connect to the unix socket of the local node. Make sure you have the right - // path for your environment - let bearer = UnixStream::connect("/tmp/node.socket").unwrap(); - - let mut muxer = Multiplexer::setup(bearer, &vec![0, 7]).unwrap(); - - let mut hs_channel = muxer.use_channel(0); - let versions = VersionTable::only_v10(MAINNET_MAGIC); - let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap(); - println!("last hanshake state: {:?}", last); - - let mut ls_channel = muxer.use_channel(7); - - let cs = OneShotClient::::initial(None, RequestV10::GetChainPoint); - let cs = run_agent(cs, &mut ls_channel).unwrap(); - println!("{:?}", cs); -} diff --git a/pallas-miniprotocols/examples/txsubmission-naive.rs b/pallas-miniprotocols/examples/txsubmission-naive.rs deleted file mode 100644 index 68db2a44..00000000 --- a/pallas-miniprotocols/examples/txsubmission-naive.rs +++ /dev/null @@ -1,30 +0,0 @@ -use net2::TcpStreamExt; -use std::net::TcpStream; - -use pallas_miniprotocols::handshake::{n2c::VersionTable, Initiator}; -use pallas_miniprotocols::txsubmission::NaiveProvider; -use pallas_miniprotocols::{run_agent, MAINNET_MAGIC}; -use pallas_multiplexer::Multiplexer; - -fn main() { - env_logger::init(); - - //let bearer = TcpStream::connect("localhost:6000").unwrap(); - let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap(); - - bearer.set_nodelay(true).unwrap(); - bearer.set_keepalive_ms(Some(30_000u32)).unwrap(); - - let mut muxer = Multiplexer::setup(bearer, &vec![0, 4]).unwrap(); - - let mut hs_channel = muxer.use_channel(0); - let versions = VersionTable::v1_and_above(MAINNET_MAGIC); - let last = run_agent(Initiator::initial(versions), &mut hs_channel).unwrap(); - println!("{:?}", last); - - let mut ts_channel = muxer.use_channel(4); - let ts = NaiveProvider::initial(vec![]); - let ts = run_agent(ts, &mut ts_channel).unwrap(); - - println!("{:?}", ts); -} From a39254f4036c32092029e064c13dbb81bfaef442 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Thu, 28 Apr 2022 09:33:14 -0300 Subject: [PATCH 2/2] Fix lint issues --- examples/n2c-miniprotocols/src/main.rs | 2 -- examples/n2n-miniprotocols/src/main.rs | 1 - 2 files changed, 3 deletions(-) diff --git a/examples/n2c-miniprotocols/src/main.rs b/examples/n2c-miniprotocols/src/main.rs index 26033aad..c24cba1e 100644 --- a/examples/n2c-miniprotocols/src/main.rs +++ b/examples/n2c-miniprotocols/src/main.rs @@ -1,5 +1,3 @@ -use log; - use pallas::network::{ miniprotocols::{chainsync, handshake, localstate, run_agent, Point, MAINNET_MAGIC}, multiplexer::Multiplexer, diff --git a/examples/n2n-miniprotocols/src/main.rs b/examples/n2n-miniprotocols/src/main.rs index 5b61b38f..9f73dff1 100644 --- a/examples/n2n-miniprotocols/src/main.rs +++ b/examples/n2n-miniprotocols/src/main.rs @@ -1,4 +1,3 @@ -use log; use net2::TcpStreamExt; use pallas::network::{