Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Move miniprotocol examples to custom crate #97

Merged
merged 2 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@ members = [
"pallas",
"examples/block-download",
"examples/block-decode",
"examples/n2n-miniprotocols",
"examples/n2c-miniprotocols",
]
4 changes: 4 additions & 0 deletions examples/n2c-miniprotocols/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/target

scratchpad
.DS_Store
16 changes: 16 additions & 0 deletions examples/n2c-miniprotocols/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "n2c-miniprotocols"
version = "0.1.0"
edition = "2021"
publish = false

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
pallas = { path = "../../pallas" }
net2 = "0.2.37"
env_logger = "0.9.0"
hex = "0.4.3"
log = "0.4.16"


108 changes: 108 additions & 0 deletions examples/n2c-miniprotocols/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use pallas::network::{
miniprotocols::{chainsync, handshake, localstate, run_agent, Point, MAINNET_MAGIC},
multiplexer::Multiplexer,
};

use std::os::unix::net::UnixStream;

#[derive(Debug)]
struct LoggingObserver;

impl chainsync::Observer<chainsync::HeaderContent> for LoggingObserver {
fn on_roll_forward(
&mut self,
_content: chainsync::HeaderContent,
tip: &chainsync::Tip,
) -> Result<chainsync::Continuation, Box<dyn std::error::Error>> {
log::debug!("asked to roll forward, tip at {:?}", tip);

Ok(chainsync::Continuation::Proceed)
}

fn on_intersect_found(
&mut self,
point: &Point,
tip: &chainsync::Tip,
) -> Result<chainsync::Continuation, Box<dyn std::error::Error>> {
log::debug!("intersect was found {:?} (tip: {:?})", point, tip);

Ok(chainsync::Continuation::Proceed)
}

fn on_rollback(
&mut self,
point: &Point,
) -> Result<chainsync::Continuation, Box<dyn std::error::Error>> {
log::debug!("asked to roll back {:?}", point);

Ok(chainsync::Continuation::Proceed)
}

fn on_tip_reached(&mut self) -> Result<chainsync::Continuation, Box<dyn std::error::Error>> {
log::debug!("tip was reached");

Ok(chainsync::Continuation::Proceed)
}
}

fn do_handshake(muxer: &mut Multiplexer) {
let mut channel = muxer.use_channel(0);
let versions = handshake::n2c::VersionTable::v1_and_above(MAINNET_MAGIC);
let _last = run_agent(handshake::Initiator::initial(versions), &mut channel).unwrap();
}

fn do_localstate_query(muxer: &mut Multiplexer) {
let mut channel = muxer.use_channel(7);

let agent = run_agent(
localstate::OneShotClient::<localstate::queries::QueryV10>::initial(
None,
localstate::queries::RequestV10::GetChainPoint,
),
&mut channel,
);

log::info!("state query result: {:?}", agent);
}

fn do_chainsync(muxer: &mut Multiplexer) {
let mut channel = muxer.use_channel(5);

let known_points = vec![Point::Specific(
43847831u64,
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(),
)];

let agent = run_agent(
chainsync::Consumer::<chainsync::HeaderContent, _>::initial(
Some(known_points),
LoggingObserver {},
),
&mut channel,
);

println!("{:?}", agent);
}

fn main() {
env_logger::builder()
.filter_level(log::LevelFilter::Trace)
.init();

// we connect to the unix socket of the local node. Make sure you have the right
// path for your environment
let bearer = UnixStream::connect("/tmp/node.socket").unwrap();

// setup the multiplexer by specifying the bearer and the IDs of the
// miniprotocols to use
let mut muxer = Multiplexer::setup(bearer, &[0, 4, 5]).unwrap();

// execute the required handshake against the relay
do_handshake(&mut muxer);

// execute an arbitrary "Local State" query against the node
do_localstate_query(&mut muxer);

// execute the chainsync flow from an arbitrary point in the chain
do_chainsync(&mut muxer);
}
4 changes: 4 additions & 0 deletions examples/n2n-miniprotocols/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/target

scratchpad
.DS_Store
15 changes: 15 additions & 0 deletions examples/n2n-miniprotocols/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "n2n-miniprotocols"
version = "0.1.0"
edition = "2021"
publish = false

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
pallas = { path = "../../pallas" }
net2 = "0.2.37"
env_logger = "0.9.0"
hex = "0.4.3"
log = "0.4.16"

129 changes: 129 additions & 0 deletions examples/n2n-miniprotocols/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
use net2::TcpStreamExt;

use pallas::network::{
miniprotocols::{blockfetch, chainsync, handshake, run_agent, Point, MAINNET_MAGIC},
multiplexer::Multiplexer,
};

use std::net::TcpStream;

#[derive(Debug)]
struct LoggingObserver;

impl blockfetch::Observer for LoggingObserver {
fn on_block_received(&mut self, body: Vec<u8>) -> Result<(), Box<dyn std::error::Error>> {
log::trace!("block received: {}", hex::encode(&body));
Ok(())
}
}

impl chainsync::Observer<chainsync::HeaderContent> for LoggingObserver {
fn on_roll_forward(
&mut self,
_content: chainsync::HeaderContent,
tip: &chainsync::Tip,
) -> Result<chainsync::Continuation, Box<dyn std::error::Error>> {
log::debug!("asked to roll forward, tip at {:?}", tip);

Ok(chainsync::Continuation::Proceed)
}

fn on_intersect_found(
&mut self,
point: &Point,
tip: &chainsync::Tip,
) -> Result<chainsync::Continuation, Box<dyn std::error::Error>> {
log::debug!("intersect was found {:?} (tip: {:?})", point, tip);

Ok(chainsync::Continuation::Proceed)
}

fn on_rollback(
&mut self,
point: &Point,
) -> Result<chainsync::Continuation, Box<dyn std::error::Error>> {
log::debug!("asked to roll back {:?}", point);

Ok(chainsync::Continuation::Proceed)
}

fn on_tip_reached(&mut self) -> Result<chainsync::Continuation, Box<dyn std::error::Error>> {
log::debug!("tip was reached");

Ok(chainsync::Continuation::Proceed)
}
}

fn do_handshake(muxer: &mut Multiplexer) {
let mut channel = muxer.use_channel(0);
let versions = handshake::n2n::VersionTable::v4_and_above(MAINNET_MAGIC);
let _last = run_agent(handshake::Initiator::initial(versions), &mut channel).unwrap();
}

fn do_blockfetch(muxer: &mut Multiplexer) {
let mut channel = muxer.use_channel(3);

let range = (
Point::Specific(
43847831,
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45")
.unwrap(),
),
Point::Specific(
43847844,
hex::decode("ff8d558a3d5a0e058beb3d94d26a567f75cd7d09ff5485aa0d0ebc38b61378d4")
.unwrap(),
),
);

let agent = run_agent(
blockfetch::BatchClient::initial(range, LoggingObserver {}),
&mut channel,
);

println!("{:?}", agent);
}

fn do_chainsync(muxer: &mut Multiplexer) {
let mut channel = muxer.use_channel(2);

let known_points = vec![Point::Specific(
43847831u64,
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45").unwrap(),
)];

let agent = run_agent(
chainsync::Consumer::<chainsync::HeaderContent, _>::initial(
Some(known_points),
LoggingObserver {},
),
&mut channel,
);

println!("{:?}", agent);
}

fn main() {
env_logger::builder()
.filter_level(log::LevelFilter::Trace)
.init();

// setup a TCP socket to act as data bearer between our agents and the remote
// relay.
let bearer = TcpStream::connect("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
bearer.set_nodelay(true).unwrap();
bearer.set_keepalive_ms(Some(30_000u32)).unwrap();

// setup the multiplexer by specifying the bearer and the IDs of the
// miniprotocols to use
let mut muxer = Multiplexer::setup(bearer, &[0, 2, 3, 4]).unwrap();

// execute the required handshake against the relay
do_handshake(&mut muxer);

// fetch an arbitrary batch of block
do_blockfetch(&mut muxer);

// execute the chainsync flow from an arbitrary point in the chain
do_chainsync(&mut muxer);
}
4 changes: 0 additions & 4 deletions pallas-miniprotocols/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,3 @@ log = "0.4.14"
hex = "0.4.3"
itertools = "0.10.3"
net2 = "0.2.37"

[dev-dependencies]
env_logger = "0.9.0"
pallas-primitives = { version = "0.9.0-alpha.0", path = "../pallas-primitives/" }
42 changes: 0 additions & 42 deletions pallas-miniprotocols/examples/blockfetch.rs

This file was deleted.

31 changes: 0 additions & 31 deletions pallas-miniprotocols/examples/chainsync-blocks.rs

This file was deleted.

Loading