Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: chain sync server side #277

Merged
merged 1 commit into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions pallas-network/src/miniprotocols/chainsync/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tracing::debug;
use crate::miniprotocols::Point;
use crate::multiplexer;

use super::{BlockContent, HeaderContent, Message, State, Tip};
use super::{BlockContent, HeaderContent, IntersectResponse, Message, State, Tip};

#[derive(Error, Debug)]
pub enum Error {
Expand All @@ -29,8 +29,6 @@ pub enum Error {
Plexer(multiplexer::Error),
}

pub type IntersectResponse = (Option<Point>, Tip);

#[derive(Debug)]
pub enum NextResponse<CONTENT> {
RollForward(CONTENT, Tip),
Expand Down
40 changes: 34 additions & 6 deletions pallas-network/src/miniprotocols/chainsync/codec.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use pallas_codec::minicbor;
use pallas_codec::minicbor::encode::Error;
use pallas_codec::minicbor::{decode, encode, Decode, Decoder, Encode, Encoder};

use super::{BlockContent, HeaderContent, Message, SkippedContent, Tip};
Expand Down Expand Up @@ -167,10 +168,32 @@ impl<'b> Decode<'b, ()> for HeaderContent {
impl Encode<()> for HeaderContent {
fn encode<W: encode::Write>(
&self,
_e: &mut Encoder<W>,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
todo!()
e.array(2)?;
e.u8(self.variant)?;

// variant 0 is byron
if self.variant == 0 {
e.array(2)?;

if let Some((a, b)) = self.byron_prefix {
e.array(2)?;
e.u8(a)?;
e.u64(b)?;
} else {
return Err(Error::message("header variant 0 but no byron prefix"));
}

e.tag(minicbor::data::Tag::Cbor)?;
e.bytes(&self.cbor)?;
} else {
e.tag(minicbor::data::Tag::Cbor)?;
e.bytes(&self.cbor)?;
}

Ok(())
}
}

Expand All @@ -185,10 +208,13 @@ impl<'b> Decode<'b, ()> for BlockContent {
impl Encode<()> for BlockContent {
fn encode<W: encode::Write>(
&self,
_e: &mut Encoder<W>,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
todo!()
e.tag(minicbor::data::Tag::Cbor)?;
e.bytes(&self.0)?;

Ok(())
}
}

Expand All @@ -202,9 +228,11 @@ impl<'b> Decode<'b, ()> for SkippedContent {
impl Encode<()> for SkippedContent {
fn encode<W: encode::Write>(
&self,
_e: &mut Encoder<W>,
e: &mut Encoder<W>,
_ctx: &mut (),
) -> Result<(), encode::Error<W::Error>> {
todo!()
e.null()?;

Ok(())
}
}
2 changes: 2 additions & 0 deletions pallas-network/src/miniprotocols/chainsync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ mod buffer;
mod client;
mod codec;
mod protocol;
mod server;

pub use buffer::*;
pub use client::*;
pub use codec::*;
pub use protocol::*;
pub use server::*;
2 changes: 2 additions & 0 deletions pallas-network/src/miniprotocols/chainsync/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::miniprotocols::Point;
#[derive(Debug, Clone)]
pub struct Tip(pub Point, pub u64);

pub type IntersectResponse = (Option<Point>, Tip);

#[derive(Debug, PartialEq, Eq, Clone)]
pub enum State {
Idle,
Expand Down
285 changes: 285 additions & 0 deletions pallas-network/src/miniprotocols/chainsync/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
use pallas_codec::Fragment;
use std::marker::PhantomData;
use thiserror::Error;
use tracing::debug;

use crate::miniprotocols::Point;
use crate::multiplexer;

use super::{BlockContent, HeaderContent, Message, State, Tip};

#[derive(Error, Debug)]
pub enum Error {
#[error("attempted to receive message while agency is ours")]
AgencyIsOurs,

#[error("attempted to send message while agency is theirs")]
AgencyIsTheirs,

#[error("inbound message is not valid for current state")]
InvalidInbound,

#[error("outbound message is not valid for current state")]
InvalidOutbound,

#[error("error while sending or receiving data through the channel")]
Plexer(multiplexer::Error),
}

#[derive(Debug)]
pub enum ClientRequest {
Intersect(Vec<Point>),
RequestNext,
}

pub struct Server<O>(State, multiplexer::ChannelBuffer, PhantomData<O>)
where
Message<O>: Fragment;

impl<O> Server<O>
where
Message<O>: Fragment,
{
/// Constructs a new ChainSync `Server` instance.
///
/// # Arguments
///
/// * `channel` - An instance of `multiplexer::AgentChannel` to be used for
/// communication.
pub fn new(channel: multiplexer::AgentChannel) -> Self {
Self(
State::Idle,
multiplexer::ChannelBuffer::new(channel),
PhantomData {},
)
}

/// Returns the current state of the server.
pub fn state(&self) -> &State {
&self.0
}

/// Checks if the server state is done.
pub fn is_done(&self) -> bool {
self.0 == State::Done
}

/// Checks if the server has agency.
pub fn has_agency(&self) -> bool {
match self.state() {
State::Idle => false,
State::CanAwait => true,
State::MustReply => true,
State::Intersect => true,
State::Done => false,
}
}

fn assert_agency_is_ours(&self) -> Result<(), Error> {
if !self.has_agency() {
Err(Error::AgencyIsTheirs)
} else {
Ok(())
}
}

fn assert_agency_is_theirs(&self) -> Result<(), Error> {
if self.has_agency() {
Err(Error::AgencyIsOurs)
} else {
Ok(())
}
}

fn assert_outbound_state(&self, msg: &Message<O>) -> Result<(), Error> {
match (&self.0, msg) {
(State::CanAwait, Message::RollForward(_, _)) => Ok(()),
(State::CanAwait, Message::RollBackward(_, _)) => Ok(()),
(State::CanAwait, Message::AwaitReply) => Ok(()),
(State::MustReply, Message::RollForward(_, _)) => Ok(()),
(State::MustReply, Message::RollBackward(_, _)) => Ok(()),
(State::Intersect, Message::IntersectFound(_, _)) => Ok(()),
(State::Intersect, Message::IntersectNotFound(_)) => Ok(()),
_ => Err(Error::InvalidOutbound),
}
}

fn assert_inbound_state(&self, msg: &Message<O>) -> Result<(), Error> {
match (&self.0, msg) {
(State::Idle, Message::RequestNext) => Ok(()),
(State::Idle, Message::FindIntersect(_)) => Ok(()),
(State::Idle, Message::Done) => Ok(()),
_ => Err(Error::InvalidInbound),
}
}

/// Sends a message to the client
///
/// # Arguments
///
/// * `msg` - A reference to the `Message` to be sent.
///
/// # Errors
///
/// Returns an error if the agency is not ours or if the outbound state is
/// invalid.
pub async fn send_message(&mut self, msg: &Message<O>) -> Result<(), Error> {
self.assert_agency_is_ours()?;
self.assert_outbound_state(msg)?;

self.1.send_msg_chunks(msg).await.map_err(Error::Plexer)?;

Ok(())
}

/// Receives the next message from the client.
///
/// # Errors
///
/// Returns an error if the agency is not theirs or if the inbound state is
/// invalid.
async fn recv_message(&mut self) -> Result<Message<O>, Error> {
self.assert_agency_is_theirs()?;

let msg = self.1.recv_full_msg().await.map_err(Error::Plexer)?;

self.assert_inbound_state(&msg)?;

Ok(msg)
}

/// Receive a message from the client when the protocol state is Idle.
///
/// # Errors
///
/// Returns an error if the agency is not theirs or if the inbound message
/// is invalid for Idle protocol state.
pub async fn recv_while_idle(&mut self) -> Result<Option<ClientRequest>, Error> {
match self.recv_message().await? {
Message::FindIntersect(points) => {
self.0 = State::Intersect;
Ok(Some(ClientRequest::Intersect(points)))
}
Message::RequestNext => {
self.0 = State::CanAwait;
Ok(Some(ClientRequest::RequestNext))
}
Message::Done => {
self.0 = State::Done;

Ok(None)
}
_ => Err(Error::InvalidInbound),
}
}

/// Sends an IntersectNotFound message to the client.
///
/// # Arguments
///
/// * `tip` - the most recent point of the server's chain.
///
/// # Errors
///
/// Returns an error if the message cannot be sent or if it's not valid for
/// the current state of the server.
pub async fn send_intersect_not_found(&mut self, tip: Tip) -> Result<(), Error> {
debug!("send intersect not found");

let msg = Message::IntersectNotFound(tip);
self.send_message(&msg).await?;
self.0 = State::Idle;

Ok(())
}

/// Sends an IntersectFound message to the client.
///
/// # Arguments
///
/// * `point` - the first point in the client's provided list of intersect
/// points that was found in the servers's current chain.
/// * `tip` - the most recent point of the server's chain.
///
/// # Errors
///
/// Returns an error if the message cannot be sent or if it's not valid for
/// the current state of the server.
pub async fn send_intersect_found(&mut self, point: Point, tip: Tip) -> Result<(), Error> {
debug!("send intersect found ({point:?}");

let msg = Message::IntersectFound(point, tip);
self.send_message(&msg).await?;
self.0 = State::Idle;

Ok(())
}

/// Sends a RollForward message to the client.
///
/// # Arguments
///
/// * `content` - the data to send to the client: for example block headers
/// for N2N or full blocks for N2C.
/// * `tip` - the most recent point of the server's chain.
///
/// # Errors
///
/// Returns an error if the message cannot be sent or if it's not valid for
/// the current state of the server.
pub async fn send_roll_forward(&mut self, content: O, tip: Tip) -> Result<(), Error> {
debug!("send roll forward");

let msg = Message::RollForward(content, tip);
self.send_message(&msg).await?;
self.0 = State::Idle;

Ok(())
}

/// Sends a RollBackward message to the client.
///
/// # Arguments
///
/// * `point` - point at which the client should rollback their chain to.
/// * `tip` - the most recent point of the server's chain.
///
/// # Errors
///
/// Returns an error if the message cannot be sent or if it's not valid for
/// the current state of the server.
pub async fn send_roll_backward(&mut self, point: Point, tip: Tip) -> Result<(), Error> {
debug!("send roll backward {point:?}");

let msg = Message::RollBackward(point, tip);
self.send_message(&msg).await?;
self.0 = State::Idle;

Ok(())
}

/// Sends an AwaitReply message to the client.
///
/// # Arguments
///
/// * `point` - point at which the client should rollback their chain to.
/// * `tip` - the most recent point of the server's chain.
///
/// # Errors
///
/// Returns an error if the message cannot be sent or if it's not valid for
/// the current state of the server.
pub async fn send_await_reply(&mut self) -> Result<(), Error> {
debug!("send await reply");

let msg = Message::AwaitReply;
self.send_message(&msg).await?;
self.0 = State::MustReply;

Ok(())
}
}

pub type N2NServer = Server<HeaderContent>;

pub type N2CServer = Server<BlockContent>;
Loading