diff --git a/benches/benches/src/sv2/criterion_sv2_benchmark.rs b/benches/benches/src/sv2/criterion_sv2_benchmark.rs index 7aa35f158c..681a2aaa90 100644 --- a/benches/benches/src/sv2/criterion_sv2_benchmark.rs +++ b/benches/benches/src/sv2/criterion_sv2_benchmark.rs @@ -1,4 +1,4 @@ -use codec_sv2::{Frame, StandardEitherFrame, StandardSv2Frame}; +use codec_sv2::{StandardFrame, StandardSv2Frame}; use criterion::{black_box, Criterion}; use roles_logic_sv2::{ handlers::{common::ParseUpstreamCommonMessages, mining::ParseUpstreamMiningMessages}, @@ -20,7 +20,7 @@ use crate::client::{ pub type Message = MiningDeviceMessages<'static>; pub type StdFrame = StandardSv2Frame; -pub type EitherFrame = StandardEitherFrame; +pub type EitherFrame = StandardFrame; fn client_sv2_setup_connection(c: &mut Criterion) { c.bench_function("client_sv2_setup_connection", |b| { @@ -53,9 +53,11 @@ fn client_sv2_setup_connection_serialize_deserialize(c: &mut Criterion) { let mut dst = vec![0; size]; let _serialized = frame.serialize(&mut dst); b.iter(|| { - let mut frame = StdFrame::from_bytes(black_box(dst.clone().into())).unwrap(); - let type_ = frame.get_header().unwrap().msg_type().clone(); - let payload = frame.payload(); + let frame = StdFrame::from_bytes(black_box(dst.clone().into())).unwrap(); + let type_ = frame.header().msg_type(); + let payload = frame.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let _ = AnyMessage::try_from((type_, payload)).unwrap(); }); }); @@ -95,8 +97,10 @@ fn client_sv2_open_channel_serialize_deserialize(c: &mut Criterion) { frame.serialize(&mut dst); b.iter(|| { let mut frame = StdFrame::from_bytes(black_box(dst.clone().into())).unwrap(); - let type_ = frame.get_header().unwrap().msg_type().clone(); - let payload = frame.payload(); + let type_ = frame.header().msg_type(); + let payload = frame.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); black_box(AnyMessage::try_from((type_, payload)).unwrap()); }); }); @@ -151,8 +155,10 @@ fn client_sv2_mining_message_submit_standard_serialize_deserialize(c: &mut Crite |b| { b.iter(|| { let mut frame = StdFrame::from_bytes(black_box(dst.clone().into())).unwrap(); - let type_ = frame.get_header().unwrap().msg_type().clone(); - let payload = frame.payload(); + let type_ = frame.header().msg_type(); + let payload = frame.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); black_box(AnyMessage::try_from((type_, payload)).unwrap()); }); }, diff --git a/benches/benches/src/sv2/iai_sv2_benchmark.rs b/benches/benches/src/sv2/iai_sv2_benchmark.rs index 2cab39cc47..481792dd7a 100644 --- a/benches/benches/src/sv2/iai_sv2_benchmark.rs +++ b/benches/benches/src/sv2/iai_sv2_benchmark.rs @@ -1,4 +1,4 @@ -use codec_sv2::{Frame, StandardEitherFrame, StandardSv2Frame}; +use codec_sv2::{StandardFrame, StandardSv2Frame}; use iai::{black_box, main}; use roles_logic_sv2::{ handlers::{common::ParseUpstreamCommonMessages, mining::ParseUpstreamMiningMessages, SendTo_}, @@ -18,7 +18,7 @@ use crate::client::{create_client, open_channel, Device, SetupConnectionHandler} pub type Message = MiningDeviceMessages<'static>; pub type StdFrame = StandardSv2Frame; -pub type EitherFrame = StandardEitherFrame; +pub type EitherFrame = StandardFrame; fn client_sv2_setup_connection() { let address: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 34254); @@ -46,8 +46,10 @@ fn client_sv2_setup_connection_serialize_deserialize() { let mut dst = vec![0; size]; frame.serialize(&mut dst); let mut frame = StdFrame::from_bytes(black_box(dst.clone().into())).unwrap(); - let type_ = frame.get_header().unwrap().msg_type().clone(); - let payload = frame.payload(); + let type_ = frame.header().msg_type(); + let payload = frame.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); black_box(AnyMessage::try_from((type_, payload))); } @@ -77,8 +79,10 @@ fn client_sv2_open_channel_serialize_deserialize() { let mut dst = vec![0; size]; frame.serialize(&mut dst); let mut frame = StdFrame::from_bytes(black_box(dst.clone().into())).unwrap(); - let type_ = frame.get_header().unwrap().msg_type().clone(); - let payload = frame.payload(); + let type_ = frame.header().msg_type(); + let payload = frame.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); black_box(AnyMessage::try_from((type_, payload))); } @@ -127,8 +131,10 @@ fn client_sv2_mining_message_submit_standard_serialize_deserialize() { let mut dst = vec![0; size]; frame.serialize(&mut dst); let mut frame = StdFrame::from_bytes(black_box(dst.clone().into())).unwrap(); - let type_ = frame.get_header().unwrap().msg_type().clone(); - let payload = frame.payload(); + let type_ = frame.header().msg_type(); + let payload = frame.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); black_box(AnyMessage::try_from((type_, payload))); } diff --git a/benches/benches/src/sv2/lib/client.rs b/benches/benches/src/sv2/lib/client.rs index 714f237bdf..73ea4f4185 100644 --- a/benches/benches/src/sv2/lib/client.rs +++ b/benches/benches/src/sv2/lib/client.rs @@ -5,7 +5,7 @@ use bitcoin::{ use async_channel::{Receiver, Sender}; use async_std::channel::unbounded; use binary_sv2::u256_from_int; -use codec_sv2::{buffer_sv2::Slice, StandardEitherFrame, StandardSv2Frame}; +use codec_sv2::{buffer_sv2::Slice, StandardFrame, StandardSv2Frame}; use roles_logic_sv2::{ common_messages_sv2::{Protocol, SetupConnection, SetupConnectionSuccess}, common_properties::{IsMiningUpstream, IsUpstream}, @@ -23,7 +23,7 @@ use roles_logic_sv2::{ use std::{net::SocketAddr, sync::Arc}; pub type Message = MiningDeviceMessages<'static>; pub type StdFrame = StandardSv2Frame; -pub type EitherFrame = StandardEitherFrame; +pub type EitherFrame = StandardFrame; pub fn create_client() -> Device { let (sender, receiver) = unbounded(); diff --git a/examples/interop-cpp/src/main.rs b/examples/interop-cpp/src/main.rs index 34f6bef09c..348ef85b44 100644 --- a/examples/interop-cpp/src/main.rs +++ b/examples/interop-cpp/src/main.rs @@ -12,7 +12,7 @@ mod main_ { #[cfg(not(feature = "with_serde"))] mod main_ { - use codec_sv2::{Encoder, Frame, StandardDecoder, StandardSv2Frame}; + use codec_sv2::{Encoder, StandardDecoder, StandardSv2Frame}; use common_messages_sv2::{Protocol, SetupConnection, SetupConnectionError}; use const_sv2::{ CHANNEL_BIT_SETUP_CONNECTION, MESSAGE_TYPE_SETUP_CONNECTION, @@ -124,10 +124,15 @@ mod main_ { loop { let buffer = decoder.writable(); - stream.read_exact(buffer).unwrap(); - if let Ok(mut f) = decoder.next_frame() { - let msg_type = f.get_header().unwrap().msg_type(); - let payload = f.payload(); + match stream.read_exact(buffer) { + Ok(_) => {} + Err(_) => continue, + }; + if let Ok(f) = decoder.next_frame() { + let msg_type = f.header().msg_type(); + let payload = f.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let message: Sv2Message = (msg_type, payload).try_into().unwrap(); match message { Sv2Message::SetupConnection(_) => panic!(), diff --git a/examples/ping-pong-with-noise/src/node.rs b/examples/ping-pong-with-noise/src/node.rs index 912d6e8357..713efd10fa 100644 --- a/examples/ping-pong-with-noise/src/node.rs +++ b/examples/ping-pong-with-noise/src/node.rs @@ -11,7 +11,7 @@ use async_std::{ }; use core::convert::TryInto; -use codec_sv2::{Frame, HandshakeRole, StandardEitherFrame, StandardSv2Frame}; +use codec_sv2::{HandshakeRole, StandardFrame, StandardSv2Frame}; use std::time; @@ -28,8 +28,8 @@ pub struct Node { name: String, last_id: u32, expected: Expected, - receiver: Receiver>>, - sender: Sender>>, + receiver: Receiver>>, + sender: Sender>>, } impl Node { @@ -95,11 +95,14 @@ impl Node { fn handle_message( &mut self, - mut frame: StandardSv2Frame>, + frame: StandardSv2Frame>, ) -> Message<'static> { match self.expected { Expected::Ping => { - let ping: Result = from_bytes(frame.payload()); + let payload = frame.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); + let ping: Result = from_bytes(payload); match ping { Ok(ping) => { println!("Node {} received:", self.name); @@ -118,7 +121,10 @@ impl Node { } } Expected::Pong => { - let pong: Result = from_bytes(frame.payload()); + let payload = frame.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); + let pong: Result = from_bytes(payload); match pong { Ok(pong) => { println!("Node {} received:", self.name); diff --git a/examples/ping-pong-without-noise/src/node.rs b/examples/ping-pong-without-noise/src/node.rs index 97295d591f..b2418cd55e 100644 --- a/examples/ping-pong-without-noise/src/node.rs +++ b/examples/ping-pong-without-noise/src/node.rs @@ -10,7 +10,7 @@ use async_std::{ task, }; -use codec_sv2::{Frame, StandardDecoder, StandardSv2Frame}; +use codec_sv2::{StandardDecoder, StandardSv2Frame}; #[derive(Debug)] enum Expected { @@ -83,11 +83,14 @@ impl Node { fn handle_message( &mut self, - mut frame: StandardSv2Frame>, + frame: StandardSv2Frame>, ) -> Message<'static> { match self.expected { Expected::Ping => { - let ping: Result = from_bytes(frame.payload()); + let payload = frame.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); + let ping: Result = from_bytes(payload); match ping { Ok(ping) => { println!("Node {} received:", self.name); @@ -107,7 +110,10 @@ impl Node { } } Expected::Pong => { - let pong: Result = from_bytes(frame.payload()); + let payload = frame.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); + let pong: Result = from_bytes(payload); match pong { Ok(pong) => { println!("Node {} received:", self.name); diff --git a/examples/template-provider-test/src/main.rs b/examples/template-provider-test/src/main.rs index 78878227b5..4c99773400 100644 --- a/examples/template-provider-test/src/main.rs +++ b/examples/template-provider-test/src/main.rs @@ -1,6 +1,6 @@ use async_channel::{Receiver, Sender}; use async_std::net::TcpStream; -use codec_sv2::{Frame, StandardEitherFrame, StandardSv2Frame, Sv2Frame}; +use codec_sv2::{StandardFrame, StandardSv2Frame, Sv2Frame}; use network_helpers::PlainConnection; use roles_logic_sv2::{ parsers::{IsSv2Message, TemplateDistribution}, @@ -13,7 +13,7 @@ use std::{ pub type Message = TemplateDistribution<'static>; pub type StdFrame = StandardSv2Frame; -pub type EitherFrame = StandardEitherFrame; +pub type EitherFrame = StandardFrame; #[async_std::main] async fn main() { diff --git a/protocols/fuzz-tests/src/main.rs b/protocols/fuzz-tests/src/main.rs index c9623ecb51..bb2364c0fd 100644 --- a/protocols/fuzz-tests/src/main.rs +++ b/protocols/fuzz-tests/src/main.rs @@ -2,7 +2,7 @@ use libfuzzer_sys::fuzz_target; use binary_codec_sv2::{Seq064K,U256,B0255,Seq0255}; use binary_codec_sv2::from_bytes; -use codec_sv2::{StandardDecoder,Sv2Frame,Frame}; +use codec_sv2::{StandardDecoder,Sv2Frame}; use roles_logic_sv2::parsers::PoolMessages; type F = Sv2Frame,Vec>; diff --git a/protocols/v2/codec-sv2/src/decoder.rs b/protocols/v2/codec-sv2/src/decoder.rs index aebc0aee1d..8455211bb2 100644 --- a/protocols/v2/codec-sv2/src/decoder.rs +++ b/protocols/v2/codec-sv2/src/decoder.rs @@ -12,7 +12,7 @@ use framing_sv2::framing2::HandShakeFrame; #[cfg(feature = "noise_sv2")] use framing_sv2::header::{NOISE_HEADER_ENCRYPTED_SIZE, NOISE_HEADER_SIZE}; use framing_sv2::{ - framing2::{EitherFrame, Frame as F_, Sv2Frame}, + framing2::{Frame, Sv2Frame}, header::Header, }; #[cfg(feature = "noise_sv2")] @@ -36,7 +36,7 @@ use crate::State; #[cfg(feature = "noise_sv2")] pub type StandardNoiseDecoder = WithNoise; -pub type StandardEitherFrame = EitherFrame::Slice>; +pub type StandardFrame = Frame::Slice>; pub type StandardSv2Frame = Sv2Frame::Slice>; pub type StandardDecoder = WithoutNoise; @@ -51,7 +51,7 @@ pub struct WithNoise { #[cfg(feature = "noise_sv2")] impl<'a, T: Serialize + GetSize + Deserialize<'a>, B: IsBuffer + AeadBuffer> WithNoise { #[inline] - pub fn next_frame(&mut self, state: &mut State) -> Result> { + pub fn next_frame(&mut self, state: &mut State) -> Result> { match state { State::HandShake(_) => unreachable!(), State::NotInitialized(msg_len) => { @@ -97,10 +97,7 @@ impl<'a, T: Serialize + GetSize + Deserialize<'a>, B: IsBuffer + AeadBuffer> Wit } #[inline] - fn decode_noise_frame( - &mut self, - noise_codec: &mut NoiseCodec, - ) -> Result> { + fn decode_noise_frame(&mut self, noise_codec: &mut NoiseCodec) -> Result> { match ( IsBuffer::len(&self.noise_buffer), IsBuffer::len(&self.sv2_buffer), @@ -142,17 +139,17 @@ impl<'a, T: Serialize + GetSize + Deserialize<'a>, B: IsBuffer + AeadBuffer> Wit } self.sv2_buffer.danger_set_start(0); let src = self.sv2_buffer.get_data_owned(); - let frame = Sv2Frame::::from_bytes_unchecked(src); + let frame = Sv2Frame::::from_bytes(src)?; Ok(frame.into()) } } } - fn while_handshaking(&mut self) -> EitherFrame { + fn while_handshaking(&mut self) -> Frame { let src = self.noise_buffer.get_data_owned().as_mut().to_vec(); // below is inffalible as noise frame length has been already checked - let frame = HandShakeFrame::from_bytes_unchecked(src.into()); + let frame = HandShakeFrame::from_bytes(src.into()); frame.into() } @@ -203,7 +200,7 @@ impl WithoutNoise { 0 => { self.missing_b = Header::SIZE; let src = self.buffer.get_data_owned(); - let frame = Sv2Frame::::from_bytes_unchecked(src); + let frame = Sv2Frame::::from_bytes(src)?; Ok(frame) } _ => { diff --git a/protocols/v2/codec-sv2/src/encoder.rs b/protocols/v2/codec-sv2/src/encoder.rs index a123422176..d807d59c24 100644 --- a/protocols/v2/codec-sv2/src/encoder.rs +++ b/protocols/v2/codec-sv2/src/encoder.rs @@ -5,9 +5,9 @@ pub use const_sv2::{AEAD_MAC_LEN, SV2_FRAME_CHUNK_SIZE, SV2_FRAME_HEADER_SIZE}; #[cfg(feature = "noise_sv2")] use core::convert::TryInto; use core::marker::PhantomData; +use framing_sv2::framing2::Sv2Frame; #[cfg(feature = "noise_sv2")] -use framing_sv2::framing2::{EitherFrame, HandShakeFrame}; -use framing_sv2::framing2::{Frame as F_, Sv2Frame}; +use framing_sv2::framing2::{Frame, HandShakeFrame}; #[allow(unused_imports)] pub use framing_sv2::header::NOISE_HEADER_ENCRYPTED_SIZE; @@ -43,7 +43,7 @@ pub struct NoiseEncoder { } #[cfg(feature = "noise_sv2")] -type Item = EitherFrame; +type Item = Frame; #[cfg(feature = "noise_sv2")] impl NoiseEncoder { @@ -106,7 +106,7 @@ impl NoiseEncoder { error!("Error while encoding 2 frame - while_handshaking: {:?}", e); Error::FramingError(e) })?; - let payload = i.get_payload_when_handshaking(); + let payload = i.payload().to_vec(); let wrtbl = self.noise_buffer.get_writable(payload.len()); for (i, b) in payload.iter().enumerate() { wrtbl[i] = *b; diff --git a/protocols/v2/codec-sv2/src/lib.rs b/protocols/v2/codec-sv2/src/lib.rs index f5cbc013d2..291a9e3794 100644 --- a/protocols/v2/codec-sv2/src/lib.rs +++ b/protocols/v2/codec-sv2/src/lib.rs @@ -11,7 +11,7 @@ pub mod error; pub use error::{CError, Error, Result}; -pub use decoder::{StandardEitherFrame, StandardSv2Frame}; +pub use decoder::{StandardFrame, StandardSv2Frame}; pub use decoder::StandardDecoder; #[cfg(feature = "noise_sv2")] @@ -23,7 +23,7 @@ pub use encoder::NoiseEncoder; #[cfg(feature = "noise_sv2")] pub use framing_sv2::framing2::HandShakeFrame; -pub use framing_sv2::framing2::{Frame, Sv2Frame}; +pub use framing_sv2::framing2::Sv2Frame; #[cfg(feature = "noise_sv2")] pub use noise_sv2::{self, Initiator, NoiseCodec, Responder}; diff --git a/protocols/v2/framing-sv2/src/error.rs b/protocols/v2/framing-sv2/src/error.rs index 44b0c95cef..5b26cb7114 100644 --- a/protocols/v2/framing-sv2/src/error.rs +++ b/protocols/v2/framing-sv2/src/error.rs @@ -1,8 +1,5 @@ -// use crate::framing2::EitherFrame; use core::fmt; -// pub type FramingResult = core::result::Result; - #[derive(Debug, PartialEq, Eq)] pub enum Error { BinarySv2Error(binary_sv2::Error), diff --git a/protocols/v2/framing-sv2/src/framing2.rs b/protocols/v2/framing-sv2/src/framing2.rs index 692a22af4b..e1ea531f6f 100644 --- a/protocols/v2/framing-sv2/src/framing2.rs +++ b/protocols/v2/framing-sv2/src/framing2.rs @@ -6,78 +6,72 @@ use alloc::vec::Vec; use binary_sv2::{to_writer, GetSize, Serialize}; use core::convert::TryFrom; -const NOISE_MAX_LEN: usize = const_sv2::NOISE_FRAME_MAX_SIZE; - #[cfg(not(feature = "with_buffer_pool"))] type Slice = Vec; #[cfg(feature = "with_buffer_pool")] type Slice = buffer_sv2::Slice; -impl Sv2Frame { - /// Maps a `Sv2Frame` to `Sv2Frame` by applying `fun`, - /// which is assumed to be a closure that converts `A` to `C` - pub fn map(self, fun: fn(A) -> C) -> Sv2Frame { - let serialized = self.serialized; - let header = self.header; - let payload = self.payload.map(fun); - Sv2Frame { - header, - payload, - serialized, +/// Represents different types of frames that can be sent over the wire. +#[derive(Debug)] +pub enum Frame { + /// Abstraction for a Noise Handshake Frame Contains only a `Slice` payload with a fixed length + /// Only used during Noise Handshake process + HandShake(HandShakeFrame), + /// Abstraction for a SV2 Frame. + /// `T` represents the deserialized payload, `B` the serialized payload. + Sv2(Sv2Frame), +} + +impl + AsRef<[u8]>> Frame { + pub fn encoded_length(&self) -> usize { + match &self { + Self::HandShake(frame) => frame.encoded_length(), + Self::Sv2(frame) => frame.encoded_length(), } } } -pub trait Frame<'a, T: Serialize + GetSize>: Sized { - type Buffer: AsMut<[u8]>; - type Deserialized; - - /// Write the serialized `Frame` into `dst`. - fn serialize(self, dst: &mut [u8]) -> Result<(), Error>; +impl TryFrom> for HandShakeFrame { + type Error = Error; - /// Get the payload - fn payload(&'a mut self) -> &'a mut [u8]; + fn try_from(v: Frame) -> Result { + match v { + Frame::HandShake(frame) => Ok(frame), + Frame::Sv2(_) => Err(Error::ExpectedHandshakeFrame), + } + } +} - /// Returns `Some(self.header)` when the frame has a header (`Sv2Frame`), returns `None` where it doesn't (`HandShakeFrame`). - fn get_header(&self) -> Option; +impl TryFrom> for Sv2Frame { + type Error = Error; - /// Try to build a `Frame` from raw bytes. - /// Checks if the payload has the correct size (as stated in the `Header`). - /// Returns `Self` on success, or the number of the bytes needed to complete the frame - /// as an error. Nothing is assumed or checked about the correctness of the payload. - fn from_bytes(bytes: Self::Buffer) -> Result; + fn try_from(v: Frame) -> Result { + match v { + Frame::Sv2(frame) => Ok(frame), + Frame::HandShake(_) => Err(Error::ExpectedSv2Frame), + } + } +} - /// Builds a `Frame` from raw bytes. - /// Does not check if the payload has the correct size (as stated in the `Header`). - /// Nothing is assumed or checked about the correctness of the payload. - fn from_bytes_unchecked(bytes: Self::Buffer) -> Self; +impl From for Frame { + fn from(v: HandShakeFrame) -> Self { + Self::HandShake(v) + } +} - /// Helps to determine if the frame size encoded in a byte array correctly representing the size of the frame. - /// - Returns `0` if the byte slice is of the expected size according to the header. - /// - Returns a negative value if the byte slice is smaller than a Noise Frame header; this value - /// represents how many bytes are missing. - /// - Returns a positive value if the byte slice is longer than expected; this value - /// indicates the surplus of bytes beyond the expected size. - fn size_hint(bytes: &[u8]) -> isize; - - /// Returns the size of the `Frame` payload. - fn encoded_length(&self) -> usize; - - /// Try to build a `Frame` from a serializable payload. - /// Returns `Some(Self)` if the size of the payload fits in the frame, `None` otherwise. - fn from_message( - message: T, - message_type: u8, - extension_type: u16, - channel_msg: bool, - ) -> Option; +impl From> for Frame { + fn from(v: Sv2Frame) -> Self { + Self::Sv2(v) + } } /// Abstraction for a SV2 Frame. #[derive(Debug, Clone)] pub struct Sv2Frame { + /// Frame header header: Header, + /// Deserialized payload payload: Option, /// Serialized header + payload serialized: Option, @@ -96,17 +90,12 @@ impl HandShakeFrame { pub fn get_payload_when_handshaking(&self) -> Vec { self.payload[0..].to_vec() } -} - -impl<'a, T: Serialize + GetSize, B: AsMut<[u8]> + AsRef<[u8]>> Frame<'a, T> for Sv2Frame { - type Buffer = B; - type Deserialized = B; /// Write the serialized `Sv2Frame` into `dst`. /// This operation when called on an already serialized frame is very cheap. /// When called on a non serialized frame, it is not so cheap (because it serializes it). #[inline] - fn serialize(self, dst: &mut [u8]) -> Result<(), Error> { + pub fn serialize(self, dst: &mut [u8]) -> Result<(), Error> { if let Some(mut serialized) = self.serialized { dst.swap_with_slice(serialized.as_mut()); Ok(()) @@ -127,48 +116,24 @@ impl<'a, T: Serialize + GetSize, B: AsMut<[u8]> + AsRef<[u8]>> Frame<'a, T> for } } - /// `self` can be either serialized (`self.serialized` is `Some()`) or - /// deserialized (`self.serialized` is `None`, `self.payload` is `Some()`). - /// This function is only intended as a fast way to get a reference to an - /// already serialized payload. If the frame has not yet been - /// serialized, this function should never be used (it will panic). - fn payload(&'a mut self) -> &'a mut [u8] { - if let Some(serialized) = self.serialized.as_mut() { - &mut serialized.as_mut()[Header::SIZE..] - } else { - // panic here is the expected behaviour - panic!("Sv2Frame is not yet serialized.") - } - } - - /// `Sv2Frame` always returns `Some(self.header)`. - fn get_header(&self) -> Option { - Some(self.header) - } - - /// Tries to build a `Sv2Frame` from raw bytes, assuming they represent a serialized `Sv2Frame` frame (`Self.serialized`). - /// Returns a `Sv2Frame` on success, or the number of the bytes needed to complete the frame - /// as an error. `Self.serialized` is `Some`, but nothing is assumed or checked about the correctness of the payload. - #[inline] - fn from_bytes(mut bytes: Self::Buffer) -> Result { - let hint = Self::size_hint(bytes.as_mut()); - - if hint == 0 { - Ok(Self::from_bytes_unchecked(bytes)) + /// Returns the payload of the `Sv2Frame` as a byte slice. + /// + /// Will return None if the `Sv2Frame` is not yet serialized. + /// + /// You can serialize the frame by calling [`Sv2Frame::serialize`]. + /// + /// [`Sv2Frame::serialize`]: crate::framing2::Sv2Frame::serialize + pub fn payload(&self) -> Option<&[u8]> { + if let Some(serialized) = self.serialized.as_ref() { + Some(serialized.as_ref()[Header::SIZE..].as_ref()) } else { - Err(hint) + None // Sv2Frame is not yet serialized. } } - #[inline] - fn from_bytes_unchecked(mut bytes: Self::Buffer) -> Self { - // Unchecked function caller is supposed to already know that the passed bytes are valid - let header = Header::from_bytes(bytes.as_mut()).expect("Invalid header"); - Self { - header, - payload: None, - serialized: Some(bytes), - } + /// Returns the header of the `Sv2Frame`. + pub fn header(&self) -> crate::header::Header { + self.header } /// After parsing `bytes` into a `Header`, this function helps to determine if the `msg_length` @@ -179,7 +144,7 @@ impl<'a, T: Serialize + GetSize, B: AsMut<[u8]> + AsRef<[u8]>> Frame<'a, T> for /// - Returns a positive value if the byte slice is longer than expected; this value /// indicates the surplus of bytes beyond the expected size. #[inline] - fn size_hint(bytes: &[u8]) -> isize { + pub fn size_hint(bytes: &[u8]) -> isize { match Header::from_bytes(bytes) { Err(_) => { // Returns how many bytes are missing from the expected frame size @@ -200,7 +165,7 @@ impl<'a, T: Serialize + GetSize, B: AsMut<[u8]> + AsRef<[u8]>> Frame<'a, T> for /// If `Sv2Frame` is serialized, returns the length of `self.serialized`, /// otherwise, returns the length of `self.payload`. #[inline] - fn encoded_length(&self) -> usize { + pub fn encoded_length(&self) -> usize { if let Some(serialized) = self.serialized.as_ref() { serialized.as_ref().len() } else if let Some(payload) = self.payload.as_ref() { @@ -210,28 +175,32 @@ impl<'a, T: Serialize + GetSize, B: AsMut<[u8]> + AsRef<[u8]>> Frame<'a, T> for panic!("Impossible state") } } +} - /// Tries to build a `Sv2Frame` from a non-serialized payload. - /// Returns a `Sv2Frame` if the size of the payload fits in the frame, `None` otherwise. - fn from_message( - message: T, - message_type: u8, - extension_type: u16, - channel_msg: bool, - ) -> Option { - let extension_type = update_extension_type(extension_type, channel_msg); - let len = message.get_size() as u32; - Header::from_len(len, message_type, extension_type).map(|header| Self { +impl Sv2Frame { + /// Maps a `Sv2Frame` to `Sv2Frame` by applying `fun`, + /// which is assumed to be a closure that converts `A` to `C` + pub fn map(self, fun: fn(A) -> C) -> Sv2Frame { + let serialized = self.serialized; + let header = self.header; + let payload = self.payload.map(fun); + Sv2Frame { header, - payload: Some(message), - serialized: None, - }) + payload, + serialized, + } } } -impl<'a> Frame<'a, Slice> for HandShakeFrame { - type Buffer = Slice; - type Deserialized = &'a mut [u8]; +/// Abstraction for a Noise Handshake Frame. +/// +/// Contains only a `Slice` payload with a fixed length. +/// +/// Only used during Noise Handshake process. +#[derive(Debug)] +pub struct HandShakeFrame { + payload: Slice, +} /// Put the Noise Frame payload into `dst` #[inline] @@ -285,30 +254,9 @@ impl<'a> Frame<'a, Slice> for HandShakeFrame { } /// Returns the size of the `HandShakeFrame` payload. - #[inline] fn encoded_length(&self) -> usize { self.payload.len() } - - /// Tries to build a `HandShakeFrame` frame from a byte slice. - /// Returns a `HandShakeFrame` if the size of the payload fits in the frame, `None` otherwise. - /// This is quite inefficient, and should be used only to build `HandShakeFrames` - // TODO check if is used only to build `HandShakeFrames` - #[allow(clippy::useless_conversion)] - fn from_message( - message: Slice, - _message_type: u8, - _extension_type: u16, - _channel_msg: bool, - ) -> Option { - if message.len() <= NOISE_MAX_LEN { - Some(Self { - payload: message.into(), - }) - } else { - None - } - } } /// Returns a `HandShakeFrame` from a generic byte array @@ -335,66 +283,19 @@ fn update_extension_type(extension_type: u16, channel_msg: bool) -> u16 { } } -/// A wrapper to be used in a context we need a generic reference to a frame -/// but it doesn't matter which kind of frame it is (`Sv2Frame` or `HandShakeFrame`) -#[derive(Debug)] -pub enum EitherFrame { - HandShake(HandShakeFrame), - Sv2(Sv2Frame), -} - -impl + AsRef<[u8]>> EitherFrame { - pub fn encoded_length(&self) -> usize { - match &self { - Self::HandShake(frame) => frame.encoded_length(), - Self::Sv2(frame) => frame.encoded_length(), - } - } -} - -impl TryFrom> for HandShakeFrame { - type Error = Error; - - fn try_from(v: EitherFrame) -> Result { - match v { - EitherFrame::HandShake(frame) => Ok(frame), - EitherFrame::Sv2(_) => Err(Error::ExpectedHandshakeFrame), - } - } -} - -impl TryFrom> for Sv2Frame { - type Error = Error; - - fn try_from(v: EitherFrame) -> Result { - match v { - EitherFrame::Sv2(frame) => Ok(frame), - EitherFrame::HandShake(_) => Err(Error::ExpectedSv2Frame), - } - } -} - -impl From for EitherFrame { - fn from(v: HandShakeFrame) -> Self { - Self::HandShake(v) - } -} - -impl From> for EitherFrame { - fn from(v: Sv2Frame) -> Self { - Self::Sv2(v) - } -} - #[cfg(test)] -use binary_sv2::binary_codec_sv2; - -#[cfg(test)] -#[derive(Serialize)] -struct T {} - -#[test] -fn test_size_hint() { - let h = Sv2Frame::>::size_hint(&[0, 128, 30, 46, 0, 0][..]); - assert!(h == 46); +mod tests { + use crate::framing2::Sv2Frame; + use alloc::vec::Vec; + use binary_sv2::{binary_codec_sv2, Serialize}; + + #[cfg(test)] + #[derive(Serialize)] + struct T {} + + #[test] + fn test_size_hint() { + let h = Sv2Frame::>::size_hint(&[0, 128, 30, 46, 0, 0][..]); + assert!(h == 46); + } } diff --git a/protocols/v2/framing-sv2/src/lib.rs b/protocols/v2/framing-sv2/src/lib.rs index 34fe8708b6..2b540a4bf7 100644 --- a/protocols/v2/framing-sv2/src/lib.rs +++ b/protocols/v2/framing-sv2/src/lib.rs @@ -5,12 +5,15 @@ //! //! The message framing is outlined below ([according to SV2 specs](https://stratumprotocol.org/specification/03-Protocol-Overview/#32-framing)): //! -//! | Protocol Type | Byte Length | Description | -//! |----------------|-------------|-------------| -//! | `extension_type` | `U16` | Unique identifier of the extension describing this protocol message.

Most significant bit (i.e.bit `15`, `0`-indexed, aka `channel_msg`) indicates a message which is specific to a channel, whereas if the most significant bit is unset, the message is to be interpreted by the immediate receiving device.

Note that the `channel_msg` bit is ignored in the extension lookup, i.e.an `extension_type` of `0x8ABC` is for the same "extension" as `0x0ABC`.

If the `channel_msg` bit is set, the first four bytes of the payload field is a `U32` representing the `channel_id` this message is destined for (these bytes are repeated in the message framing descriptions below).

Note that for the Job Declaration and Template Distribution Protocols the `channel_msg` bit is always unset. | -//! | `msg_type` | `U8` | Unique identifier of the extension describing this protocol message. | -//! | `msg_length` | `U24` | Length of the protocol message, not including this header. | -//! | `payload` | `BYTES` | Message-specific payload of length `msg_length`. If the MSB in `extension_type` (the `channel_msg` bit) is set the first four bytes are defined as a `U32` `"channel_id"`, though this definition is repeated in the message definitions below and these 4 bytes are included in `msg_length`. | +//! | Protocol Type | Byte Length | Description | +//! |------------------|-------------|-------------| +//! | `extension_type` | `U16` | Unique identifier of the extension describing this protocol message.

Most significant bit (i.e.bit `15`, `0`-indexed, aka `channel_msg`) indicates a message which is specific to a channel, whereas if the most significant bit is unset, the message is to be interpreted by the immediate receiving device.

Note that the `channel_msg` bit is ignored in the extension lookup, i.e.an `extension_type` of `0x8ABC` is for the same "extension" as `0x0ABC`.

If the `channel_msg` bit is set, the first four bytes of the payload field is a `U32` representing the `channel_id` this message is destined for (these bytes are repeated in the message framing descriptions below).

Note that for the Job Declaration and Template Distribution Protocols the `channel_msg` bit is always unset. | +//! +//! | `msg_type` | `U8` | Unique identifier of the extension describing this protocol message. | +//! +//! | `msg_length` | `U24` | Length of the protocol message, not including this header. | +//! +//! | `payload` | `BYTES` | Message-specific payload of length `msg_length`. If the MSB in `extension_type` (the `channel_msg` bit) is set the first four bytes are defined as a `U32` `"channel_id"`, though this definition is repeated in the message definitions below and these 4 bytes are included in `msg_length`. | //! //! # Features //! This crate can be built with the following features: diff --git a/protocols/v2/roles-logic-sv2/src/parsers.rs b/protocols/v2/roles-logic-sv2/src/parsers.rs index 6eaf5e016c..4d80387c91 100644 --- a/protocols/v2/roles-logic-sv2/src/parsers.rs +++ b/protocols/v2/roles-logic-sv2/src/parsers.rs @@ -13,7 +13,7 @@ use binary_sv2::GetSize; use binary_sv2::{from_bytes, Deserialize}; -use framing_sv2::framing2::{Frame, Sv2Frame}; +use framing_sv2::framing2::Sv2Frame; use const_sv2::{ CHANNEL_BIT_ALLOCATE_MINING_JOB_TOKEN, CHANNEL_BIT_ALLOCATE_MINING_JOB_TOKEN_SUCCESS, diff --git a/protocols/v2/sv2-ffi/src/lib.rs b/protocols/v2/sv2-ffi/src/lib.rs index 346d497b91..b7993f6e9f 100644 --- a/protocols/v2/sv2-ffi/src/lib.rs +++ b/protocols/v2/sv2-ffi/src/lib.rs @@ -4,7 +4,7 @@ use std::{ fmt::{Display, Formatter}, }; -use codec_sv2::{Encoder, Frame, StandardDecoder, StandardSv2Frame}; +use codec_sv2::{Encoder, StandardDecoder, StandardSv2Frame}; use common_messages_sv2::{ CSetupConnection, CSetupConnectionError, ChannelEndpointChanged, SetupConnection, SetupConnectionError, SetupConnectionSuccess, @@ -464,17 +464,17 @@ pub extern "C" fn next_frame(decoder: *mut DecoderWrapper) -> CResult { - let msg_type = match f.get_header() { - Some(header) => header.msg_type(), + Ok(f) => { + let msg_type = f.header(); + let payload = match f.payload() { + Some(payload) => payload, None => return CResult::Err(Sv2Error::InvalidSv2Frame), }; - let payload = f.payload(); let len = payload.len(); - let ptr = payload.as_mut_ptr(); + let ptr = payload.to_owned().as_mut_ptr(); let payload = unsafe { std::slice::from_raw_parts_mut(ptr, len) }; Box::into_raw(decoder); - (msg_type, payload) + (msg_type.msg_type(), payload) .try_into() .map(|x: Sv2Message| x.into()) .map_err(|_| Sv2Error::Unknown) @@ -760,8 +760,10 @@ mod tests { let mut decoded = decoder.next_frame().unwrap(); - let msg_type = decoded.get_header().unwrap().msg_type(); - let payload = decoded.payload(); + let msg_type = decoded.header().msg_type(); + let payload = decoded.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap(); let decoded_message = match decoded_message { Sv2Message::CoinbaseOutputDataSize(m) => m, @@ -812,8 +814,10 @@ mod tests { let mut decoded = decoder.next_frame().unwrap(); // Extract payload of the frame which is the NewTemplate message - let msg_type = decoded.get_header().unwrap().msg_type(); - let payload = decoded.payload(); + let msg_type = decoded.header().msg_type(); + let payload = decoded.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap(); let decoded_message = match decoded_message { Sv2Message::NewTemplate(m) => m, @@ -860,8 +864,10 @@ mod tests { let mut decoded = decoder.next_frame().unwrap(); - let msg_type = decoded.get_header().unwrap().msg_type(); - let payload = decoded.payload(); + let msg_type = decoded.header().msg_type(); + let payload = decoded.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap(); let decoded_message = match decoded_message { Sv2Message::RequestTransactionData(m) => m, @@ -910,8 +916,10 @@ mod tests { let mut decoded = decoder.next_frame().unwrap(); - let msg_type = decoded.get_header().unwrap().msg_type(); - let payload = decoded.payload(); + let msg_type = decoded.header().msg_type(); + let payload = decoded.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap(); let decoded_message = match decoded_message { Sv2Message::RequestTransactionDataError(m) => m, @@ -960,8 +968,10 @@ mod tests { let mut decoded = decoder.next_frame().unwrap(); - let msg_type = decoded.get_header().unwrap().msg_type(); - let payload = decoded.payload(); + let msg_type = decoded.header().msg_type(); + let payload = decoded.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap(); let decoded_message = match decoded_message { Sv2Message::RequestTransactionDataSuccess(m) => m, @@ -1005,8 +1015,10 @@ mod tests { let mut decoded = decoder.next_frame().unwrap(); - let msg_type = decoded.get_header().unwrap().msg_type(); - let payload = decoded.payload(); + let msg_type = decoded.header().msg_type(); + let payload = decoded.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap(); let decoded_message = match decoded_message { Sv2Message::SetNewPrevHash(m) => m, @@ -1050,8 +1062,10 @@ mod tests { let mut decoded = decoder.next_frame().unwrap(); - let msg_type = decoded.get_header().unwrap().msg_type(); - let payload = decoded.payload(); + let msg_type = decoded.header().msg_type(); + let payload = decoded.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap(); let decoded_message = match decoded_message { Sv2Message::SubmitSolution(m) => m, @@ -1108,8 +1122,10 @@ mod tests { let mut decoded = decoder.next_frame().unwrap(); - let msg_type = decoded.get_header().unwrap().msg_type(); - let payload = decoded.payload(); + let msg_type = decoded.header().msg_type(); + let payload = decoded.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap(); let decoded_message = match decoded_message { Sv2Message::ChannelEndpointChanged(m) => m, @@ -1144,8 +1160,10 @@ mod tests { let mut decoded = decoder.next_frame().unwrap(); - let msg_type = decoded.get_header().unwrap().msg_type(); - let payload = decoded.payload(); + let msg_type = decoded.header().msg_type(); + let payload = decoded.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap(); let decoded_message = match decoded_message { Sv2Message::SetupConnection(m) => m, @@ -1193,8 +1211,10 @@ mod tests { let mut decoded = decoder.next_frame().unwrap(); - let msg_type = decoded.get_header().unwrap().msg_type(); - let payload = decoded.payload(); + let msg_type = decoded.header().msg_type(); + let payload = decoded.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap(); let decoded_message = match decoded_message { Sv2Message::SetupConnectionError(m) => m, @@ -1242,8 +1262,10 @@ mod tests { let mut decoded = decoder.next_frame().unwrap(); - let msg_type = decoded.get_header().unwrap().msg_type(); - let payload = decoded.payload(); + let msg_type = decoded.header().msg_type(); + let payload = decoded.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let decoded_message: Sv2Message = (msg_type, payload).try_into().unwrap(); let decoded_message = match decoded_message { Sv2Message::SetupConnectionSuccess(m) => m, diff --git a/roles/jd-client/src/lib/downstream.rs b/roles/jd-client/src/lib/downstream.rs index ddc6e5bd0f..3bef29352d 100644 --- a/roles/jd-client/src/lib/downstream.rs +++ b/roles/jd-client/src/lib/downstream.rs @@ -21,14 +21,13 @@ use roles_logic_sv2::{ }; use tracing::{debug, error, info, warn}; -use codec_sv2::{Frame, HandshakeRole, Responder, StandardEitherFrame, StandardSv2Frame}; +use codec_sv2::{HandshakeRole, Responder, StandardFrame, StandardSv2Frame}; use key_utils::{Secp256k1PublicKey, Secp256k1SecretKey}; use stratum_common::bitcoin::{consensus::Decodable, TxOut}; pub type Message = MiningDeviceMessages<'static>; pub type StdFrame = StandardSv2Frame; -pub type EitherFrame = StandardEitherFrame; /// 1 to 1 connection with a downstream node that implement the mining (sub)protocol can be either /// a mining device or a downstream proxy. @@ -36,8 +35,8 @@ pub type EitherFrame = StandardEitherFrame; /// downstream do no make much sense. #[derive(Debug)] pub struct DownstreamMiningNode { - receiver: Receiver, - sender: Sender, + receiver: Receiver>, + sender: Sender>, pub status: DownstreamMiningNodeStatus, pub prev_job_id: Option, solution_sender: Sender>, @@ -153,8 +152,8 @@ use std::sync::Arc; impl DownstreamMiningNode { #[allow(clippy::too_many_arguments)] pub fn new( - receiver: Receiver, - sender: Sender, + receiver: Receiver>, + sender: Sender>, upstream: Option>>, solution_sender: Sender>, withhold: bool, @@ -251,19 +250,21 @@ impl DownstreamMiningNode { } /// Parse the received message and relay it to the right upstream - pub async fn next(self_mutex: &Arc>, mut incoming: StdFrame) { - let message_type = incoming.get_header().unwrap().msg_type(); + pub async fn next(self_mutex: &Arc>, incoming: StdFrame) { + let message_type = incoming.header().msg_type(); let payload = incoming.payload(); - - let routing_logic = roles_logic_sv2::routing_logic::MiningRoutingLogic::None; - - let next_message_to_send = ParseDownstreamMiningMessages::handle_message_mining( - self_mutex.clone(), - message_type, - payload, - routing_logic, - ); - Self::match_send_to(self_mutex.clone(), next_message_to_send, Some(incoming)).await; + if let Some(payload) = payload { + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); + let routing_logic = roles_logic_sv2::routing_logic::MiningRoutingLogic::None; + let next_message_to_send = ParseDownstreamMiningMessages::handle_message_mining( + self_mutex.clone(), + message_type, + payload, + routing_logic, + ); + Self::match_send_to(self_mutex.clone(), next_message_to_send, Some(incoming)).await; + } } #[async_recursion::async_recursion] @@ -696,9 +697,11 @@ pub async fn listen_for_downstream_mining( jd, ); - let mut incoming: StdFrame = node.receiver.recv().await.unwrap().try_into().unwrap(); - let message_type = incoming.get_header().unwrap().msg_type(); - let payload = incoming.payload(); + let incoming: StdFrame = node.receiver.recv().await.unwrap().try_into().unwrap(); + let message_type = incoming.header().msg_type(); + let payload = incoming.payload().expect("No payload"); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let routing_logic = roles_logic_sv2::routing_logic::CommonRoutingLogic::None; let node = Arc::new(Mutex::new(node)); if let Some(upstream) = upstream { diff --git a/roles/jd-client/src/lib/job_declarator/mod.rs b/roles/jd-client/src/lib/job_declarator/mod.rs index abaf852ca2..12d763e0dc 100644 --- a/roles/jd-client/src/lib/job_declarator/mod.rs +++ b/roles/jd-client/src/lib/job_declarator/mod.rs @@ -1,7 +1,7 @@ pub mod message_handler; use async_channel::{Receiver, Sender}; use binary_sv2::{Seq0255, Seq064K, B016M, B064K, U256}; -use codec_sv2::{HandshakeRole, Initiator, StandardEitherFrame, StandardSv2Frame}; +use codec_sv2::{HandshakeRole, Initiator, StandardFrame, StandardSv2Frame}; use network_helpers_sv2::noise_connection_tokio::Connection; use roles_logic_sv2::{ handlers::SendTo_, @@ -17,7 +17,6 @@ use tokio::task::AbortHandle; use tracing::{error, info}; use async_recursion::async_recursion; -use codec_sv2::Frame; use nohash_hasher::BuildNoHashHasher; use roles_logic_sv2::{ handlers::job_declaration::ParseServerJobDeclarationMessages, @@ -49,8 +48,8 @@ pub struct LastDeclareJob { #[derive(Debug)] pub struct JobDeclarator { - receiver: Receiver>>, - sender: Sender>>, + receiver: Receiver>>, + sender: Sender>>, allocated_tokens: Vec>, req_ids: Id, min_extranonce_size: u16, @@ -277,9 +276,15 @@ impl JobDeclarator { tokio::task::spawn(async move { let receiver = self_mutex.safe_lock(|d| d.receiver.clone()).unwrap(); loop { - let mut incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap(); - let message_type = incoming.get_header().unwrap().msg_type(); + let incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap(); + let message_type = incoming.header().msg_type(); let payload = incoming.payload(); + let payload = match payload { + Some(p) => p, + None => return, + }; + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let next_message_to_send = ParseServerJobDeclarationMessages::handle_message_job_declaration( self_mutex.clone(), diff --git a/roles/jd-client/src/lib/job_declarator/setup_connection.rs b/roles/jd-client/src/lib/job_declarator/setup_connection.rs index 063592c403..062ae5c939 100644 --- a/roles/jd-client/src/lib/job_declarator/setup_connection.rs +++ b/roles/jd-client/src/lib/job_declarator/setup_connection.rs @@ -1,5 +1,5 @@ use async_channel::{Receiver, Sender}; -use codec_sv2::{Frame, StandardEitherFrame, StandardSv2Frame}; +use codec_sv2::{StandardFrame, StandardSv2Frame}; use roles_logic_sv2::{ common_messages_sv2::{Protocol, SetupConnection}, handlers::common::{ParseUpstreamCommonMessages, SendTo}, @@ -10,7 +10,6 @@ use roles_logic_sv2::{ use std::{convert::TryInto, net::SocketAddr, sync::Arc}; pub type Message = PoolMessages<'static>; pub type StdFrame = StandardSv2Frame; -pub type EitherFrame = StandardEitherFrame; pub struct SetupConnectionHandler {} impl SetupConnectionHandler { @@ -42,8 +41,8 @@ impl SetupConnectionHandler { } pub async fn setup( - receiver: &mut Receiver, - sender: &mut Sender, + receiver: &mut Receiver>, + sender: &mut Sender>, proxy_address: SocketAddr, ) -> Result<(), ()> { let setup_connection = Self::get_setup_connection_message(proxy_address); @@ -55,10 +54,15 @@ impl SetupConnectionHandler { sender.send(sv2_frame).await.map_err(|_| ())?; - let mut incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap(); + let incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap(); - let message_type = incoming.get_header().unwrap().msg_type(); - let payload = incoming.payload(); + let message_type = incoming.header().msg_type(); + let payload = match incoming.payload() { + Some(payload) => payload, + None => return Err(()), + }; + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); ParseUpstreamCommonMessages::handle_message_common( Arc::new(Mutex::new(SetupConnectionHandler {})), message_type, diff --git a/roles/jd-client/src/lib/template_receiver/mod.rs b/roles/jd-client/src/lib/template_receiver/mod.rs index 02d3d04972..ce486a07e0 100644 --- a/roles/jd-client/src/lib/template_receiver/mod.rs +++ b/roles/jd-client/src/lib/template_receiver/mod.rs @@ -1,6 +1,6 @@ use super::{job_declarator::JobDeclarator, status, PoolChangerTrigger}; use async_channel::{Receiver, Sender}; -use codec_sv2::{Frame, HandshakeRole, Initiator, StandardEitherFrame, StandardSv2Frame}; +use codec_sv2::{HandshakeRole, Initiator, StandardFrame, StandardSv2Frame}; use error_handling::handle_result; use key_utils::Secp256k1PublicKey; use network_helpers_sv2::noise_connection_tokio::Connection; @@ -25,11 +25,10 @@ mod setup_connection; pub type SendTo = SendTo_, ()>; pub type Message = PoolMessages<'static>; pub type StdFrame = StandardSv2Frame; -pub type EitherFrame = StandardEitherFrame; pub struct TemplateRx { - receiver: Receiver, - sender: Sender, + receiver: Receiver>, + sender: Sender>, /// Allows the tp recv to communicate back to the main thread any status updates /// that would interest the main thread for error handling tx_status: status::Sender, @@ -184,10 +183,12 @@ impl TemplateRx { .safe_lock(|s| s.receiver.clone()) .unwrap(); let received = handle_result!(tx_status.clone(), receiver.recv().await); - let mut frame: StdFrame = - handle_result!(tx_status.clone(), received.try_into()); - let message_type = frame.get_header().unwrap().msg_type(); - let payload = frame.payload(); + let frame: StdFrame = handle_result!(tx_status.clone(), received.try_into()); + let message_type = frame.header().msg_type(); + // Need to add error handling across this file to fix unwraps + let payload = frame.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let next_message_to_send = ParseServerTemplateDistributionMessages::handle_message_template_distribution( @@ -273,7 +274,7 @@ impl TemplateRx { _ => { error!("{:?}", frame); error!("{:?}", frame.payload()); - error!("{:?}", frame.get_header()); + error!("{:?}", frame.header()); std::process::exit(1); } } @@ -282,14 +283,14 @@ impl TemplateRx { error!("{:?}", m); error!("{:?}", frame); error!("{:?}", frame.payload()); - error!("{:?}", frame.get_header()); + error!("{:?}", frame.header()); std::process::exit(1); } Err(e) => { error!("{:?}", e); error!("{:?}", frame); error!("{:?}", frame.payload()); - error!("{:?}", frame.get_header()); + error!("{:?}", frame.header()); std::process::exit(1); } } diff --git a/roles/jd-client/src/lib/template_receiver/setup_connection.rs b/roles/jd-client/src/lib/template_receiver/setup_connection.rs index 45f48a2f44..13dc4468b0 100644 --- a/roles/jd-client/src/lib/template_receiver/setup_connection.rs +++ b/roles/jd-client/src/lib/template_receiver/setup_connection.rs @@ -1,5 +1,5 @@ use async_channel::{Receiver, Sender}; -use codec_sv2::{Frame, StandardEitherFrame, StandardSv2Frame}; +use codec_sv2::{StandardFrame, StandardSv2Frame}; use roles_logic_sv2::{ common_messages_sv2::{Protocol, SetupConnection}, handlers::common::{ParseUpstreamCommonMessages, SendTo}, @@ -10,7 +10,6 @@ use roles_logic_sv2::{ use std::{convert::TryInto, net::SocketAddr, sync::Arc}; pub type Message = PoolMessages<'static>; pub type StdFrame = StandardSv2Frame; -pub type EitherFrame = StandardEitherFrame; pub struct SetupConnectionHandler {} impl SetupConnectionHandler { @@ -35,8 +34,8 @@ impl SetupConnectionHandler { } pub async fn setup( - receiver: &mut Receiver, - sender: &mut Sender, + receiver: &mut Receiver>, + sender: &mut Sender>, address: SocketAddr, ) -> Result<(), ()> { let setup_connection = Self::get_setup_connection_message(address); @@ -47,14 +46,20 @@ impl SetupConnectionHandler { let sv2_frame = sv2_frame.into(); sender.send(sv2_frame).await.map_err(|_| ())?; - let mut incoming: StdFrame = receiver + let incoming: StdFrame = receiver .recv() .await .expect("Connection to TP closed!") .try_into() .expect("Failed to parse incoming SetupConnectionResponse"); - let message_type = incoming.get_header().unwrap().msg_type(); + let message_type = incoming.header().msg_type(); let payload = incoming.payload(); + let payload = match payload { + Some(p) => p, + None => return Err(()), + }; + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); ParseUpstreamCommonMessages::handle_message_common( Arc::new(Mutex::new(SetupConnectionHandler {})), message_type, diff --git a/roles/jd-client/src/lib/upstream_sv2/mod.rs b/roles/jd-client/src/lib/upstream_sv2/mod.rs index 4209e686ae..7a0f190d23 100644 --- a/roles/jd-client/src/lib/upstream_sv2/mod.rs +++ b/roles/jd-client/src/lib/upstream_sv2/mod.rs @@ -1,4 +1,4 @@ -use codec_sv2::{StandardEitherFrame, StandardSv2Frame}; +use codec_sv2::{StandardFrame, StandardSv2Frame}; use roles_logic_sv2::parsers::PoolMessages; pub mod upstream; @@ -6,7 +6,7 @@ pub use upstream::Upstream; pub type Message = PoolMessages<'static>; pub type StdFrame = StandardSv2Frame; -pub type EitherFrame = StandardEitherFrame; +pub type EitherFrame = StandardFrame; #[derive(Clone, Copy, Debug)] pub struct Sv2MiningConnection { diff --git a/roles/jd-client/src/lib/upstream_sv2/upstream.rs b/roles/jd-client/src/lib/upstream_sv2/upstream.rs index 857cbd3087..78e0314a69 100644 --- a/roles/jd-client/src/lib/upstream_sv2/upstream.rs +++ b/roles/jd-client/src/lib/upstream_sv2/upstream.rs @@ -11,7 +11,7 @@ use super::super::{ }; use async_channel::{Receiver, Sender}; use binary_sv2::{Seq0255, U256}; -use codec_sv2::{Frame, HandshakeRole, Initiator}; +use codec_sv2::{HandshakeRole, Initiator}; use error_handling::handle_result; use key_utils::Secp256k1PublicKey; use network_helpers_sv2::noise_connection_tokio::Connection; @@ -220,7 +220,7 @@ impl Upstream { // Wait for the SV2 Upstream to respond with either a `SetupConnectionSuccess` or a // `SetupConnectionError` inside a SV2 binary message frame - let mut incoming: StdFrame = match recv.recv().await { + let incoming: StdFrame = match recv.recv().await { Ok(frame) => frame.try_into()?, Err(e) => { error!("Upstream connection closed: {}", e); @@ -230,14 +230,12 @@ impl Upstream { } }; - // Gets the binary frame message type from the message header - let message_type = if let Some(header) = incoming.get_header() { - header.msg_type() - } else { - return Err(framing_sv2::Error::ExpectedHandshakeFrame.into()); - }; - // Gets the message payload - let payload = incoming.payload(); + let message_type = incoming.header().msg_type(); + let payload = incoming + .payload() + .ok_or(framing_sv2::Error::ExpectedHandshakeFrame)?; + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); // Handle the incoming message (should be either `SetupConnectionSuccess` or // `SetupConnectionError`) @@ -321,19 +319,19 @@ impl Upstream { loop { // Waiting to receive a message from the SV2 Upstream role let incoming = handle_result!(tx_status, recv.recv().await); - let mut incoming: StdFrame = handle_result!(tx_status, incoming.try_into()); + let incoming: StdFrame = handle_result!(tx_status, incoming.try_into()); // On message receive, get the message type from the message header and get the // message payload - let message_type = - incoming - .get_header() - .ok_or(super::super::error::Error::FramingSv2( - framing_sv2::Error::ExpectedSv2Frame, - )); + let message_type = incoming.header().msg_type(); - let message_type = handle_result!(tx_status, message_type).msg_type(); - - let payload = incoming.payload(); + let payload = match incoming.payload() { + Some(payload) => payload, + None => { + continue; + } + }; + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); // Since this is not communicating with an SV2 proxy, but instead a custom SV1 // proxy where the routing logic is handled via the `Upstream`'s communication diff --git a/roles/jd-server/src/lib/job_declarator/mod.rs b/roles/jd-server/src/lib/job_declarator/mod.rs index 56d56223cc..b562ed57ec 100644 --- a/roles/jd-server/src/lib/job_declarator/mod.rs +++ b/roles/jd-server/src/lib/job_declarator/mod.rs @@ -2,7 +2,7 @@ pub mod message_handler; use super::{error::JdsError, mempool::JDsMempool, status, Configuration, EitherFrame, StdFrame}; use async_channel::{Receiver, Sender}; use binary_sv2::{B0255, U256}; -use codec_sv2::{Frame, HandshakeRole, Responder}; +use codec_sv2::{HandshakeRole, Responder}; use error_handling::handle_result; use key_utils::{Secp256k1PublicKey, Secp256k1SecretKey, SignatureService}; use network_helpers_sv2::noise_connection_tokio::Connection; @@ -200,13 +200,16 @@ impl JobDeclaratorDownstream { loop { match recv.recv().await { Ok(message) => { - let mut frame: StdFrame = handle_result!(tx_status, message.try_into()); - let header = frame - .get_header() - .ok_or_else(|| JdsError::Custom(String::from("No header set"))); - let header = handle_result!(tx_status, header); - let message_type = header.msg_type(); - let payload = frame.payload(); + let frame: StdFrame = handle_result!(tx_status, message.try_into()); + let message_type = frame.header().msg_type(); + let payload = match frame.payload() { + Some(payload) => payload, + None => { + continue; + } + }; + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let next_message_to_send = ParseClientJobDeclarationMessages::handle_message_job_declaration( self_mutex.clone(), diff --git a/roles/jd-server/src/lib/mod.rs b/roles/jd-server/src/lib/mod.rs index a76c80cf1f..b30ad917f4 100644 --- a/roles/jd-server/src/lib/mod.rs +++ b/roles/jd-server/src/lib/mod.rs @@ -3,7 +3,7 @@ pub mod job_declarator; pub mod mempool; pub mod status; -use codec_sv2::{StandardEitherFrame, StandardSv2Frame}; +use codec_sv2::{StandardFrame, StandardSv2Frame}; use key_utils::{Secp256k1PublicKey, Secp256k1SecretKey}; use roles_logic_sv2::{ errors::Error, parsers::PoolMessages as JdsMessages, utils::CoinbaseOutput as CoinbaseOutput_, @@ -17,7 +17,7 @@ use stratum_common::bitcoin::{Script, TxOut}; pub type Message = JdsMessages<'static>; pub type StdFrame = StandardSv2Frame; -pub type EitherFrame = StandardEitherFrame; +pub type EitherFrame = StandardFrame; pub fn get_coinbase_output(config: &Configuration) -> Result, Error> { let mut result = Vec::new(); diff --git a/roles/mining-proxy/src/lib/downstream_mining.rs b/roles/mining-proxy/src/lib/downstream_mining.rs index d4e9bcb174..e08854924b 100644 --- a/roles/mining-proxy/src/lib/downstream_mining.rs +++ b/roles/mining-proxy/src/lib/downstream_mining.rs @@ -17,11 +17,10 @@ use roles_logic_sv2::{ }; use tracing::info; -use codec_sv2::{Frame, StandardEitherFrame, StandardSv2Frame}; +use codec_sv2::{StandardFrame, StandardSv2Frame}; pub type Message = MiningDeviceMessages<'static>; pub type StdFrame = StandardSv2Frame; -pub type EitherFrame = StandardEitherFrame; /// 1 to 1 connection with a downstream node that implement the mining (sub)protocol can be either /// a mining device or a downstream proxy. @@ -30,8 +29,8 @@ pub type EitherFrame = StandardEitherFrame; #[derive(Debug)] pub struct DownstreamMiningNode { id: u32, - receiver: Receiver, - sender: Sender, + receiver: Receiver>, + sender: Sender>, pub status: DownstreamMiningNodeStatus, pub prev_job_id: Option, upstream: Option>>, @@ -181,7 +180,11 @@ impl DownstreamMiningNode { self.status.add_extended_from_non_hom_for_up_extended(id); } - pub fn new(receiver: Receiver, sender: Sender, id: u32) -> Self { + pub fn new( + receiver: Receiver>, + sender: Sender>, + id: u32, + ) -> Self { Self { receiver, sender, @@ -227,10 +230,15 @@ impl DownstreamMiningNode { } /// Parse the received message and relay it to the right upstream - pub async fn next(self_mutex: Arc>, mut incoming: StdFrame) { - let message_type = incoming.get_header().unwrap().msg_type(); + pub async fn next(self_mutex: Arc>, incoming: StdFrame) { + let message_type = incoming.header().msg_type(); let payload = incoming.payload(); - + let payload = match payload { + Some(p) => p, + None => panic!(), + }; + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let routing_logic = super::get_routing_logic(); let next_message_to_send = ParseDownstreamMiningMessages::handle_message_mining( @@ -493,32 +501,40 @@ pub async fn listen_for_downstream_mining(address: SocketAddr) { let mut ids = roles_logic_sv2::utils::Id::new(); while let Ok((stream, _)) = listner.accept().await { - let (receiver, sender): (Receiver, Sender) = - PlainConnection::new(stream).await; + let (receiver, sender): ( + Receiver>, + Sender>, + ) = PlainConnection::new(stream).await; let node = DownstreamMiningNode::new(receiver, sender, ids.next()); task::spawn(async move { - let mut incoming: StdFrame = node.receiver.recv().await.unwrap().try_into().unwrap(); - let message_type = incoming.get_header().unwrap().msg_type(); + let incoming: StdFrame = node.receiver.recv().await.unwrap().try_into().unwrap(); + let message_type = incoming.header().msg_type(); let payload = incoming.payload(); - let routing_logic = super::get_common_routing_logic(); - let node = Arc::new(Mutex::new(node)); - - // Call handle_setup_connection or fail - match DownstreamMiningNode::handle_message_common( - node.clone(), - message_type, - payload, - routing_logic, - ) { - Ok(SendToCommon::RelayNewMessageToRemote(_, message)) => { - let message = match message { - roles_logic_sv2::parsers::CommonMessages::SetupConnectionSuccess(m) => m, - _ => panic!(), - }; - DownstreamMiningNode::start(node, message).await + if let Some(payload) = payload { + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); + let routing_logic = super::get_common_routing_logic(); + let node = Arc::new(Mutex::new(node)); + + // Call handle_setup_connection or fail + match DownstreamMiningNode::handle_message_common( + node.clone(), + message_type, + payload, + routing_logic, + ) { + Ok(SendToCommon::RelayNewMessageToRemote(_, message)) => { + let message = match message { + roles_logic_sv2::parsers::CommonMessages::SetupConnectionSuccess(m) => { + m + } + _ => panic!(), + }; + DownstreamMiningNode::start(node, message).await + } + _ => panic!(), } - _ => panic!(), } }); } diff --git a/roles/mining-proxy/src/lib/error.rs b/roles/mining-proxy/src/lib/error.rs index b5c900bfd9..1249db6664 100644 --- a/roles/mining-proxy/src/lib/error.rs +++ b/roles/mining-proxy/src/lib/error.rs @@ -1,22 +1,22 @@ use async_channel::SendError; -use codec_sv2::StandardEitherFrame; +use codec_sv2::StandardFrame; use roles_logic_sv2::parsers::PoolMessages; use std::net::SocketAddr; pub type Message = PoolMessages<'static>; -pub type EitherFrame = StandardEitherFrame; #[derive(Debug)] #[allow(clippy::large_enum_variant)] #[allow(clippy::enum_variant_names)] pub enum Error { - SendError(SendError), + SendError(SendError>), UpstreamNotAvailabe(SocketAddr), SetupConnectionError(String), + NoPayloadFound, } -impl From> for Error { - fn from(error: SendError) -> Self { +impl From>> for Error { + fn from(error: SendError>) -> Self { Error::SendError(error) } } diff --git a/roles/mining-proxy/src/lib/upstream_mining.rs b/roles/mining-proxy/src/lib/upstream_mining.rs index 61a5d0f31a..2acd447a13 100644 --- a/roles/mining-proxy/src/lib/upstream_mining.rs +++ b/roles/mining-proxy/src/lib/upstream_mining.rs @@ -6,7 +6,7 @@ use roles_logic_sv2::utils::Id; use super::downstream_mining::{Channel, DownstreamMiningNode, StdFrame as DownstreamFrame}; use async_channel::{Receiver, SendError, Sender}; use async_recursion::async_recursion; -use codec_sv2::{Frame, HandshakeRole, Initiator, StandardEitherFrame, StandardSv2Frame}; +use codec_sv2::{HandshakeRole, Initiator, StandardFrame, StandardSv2Frame}; use network_helpers_sv2::noise_connection_tokio::Connection; use nohash_hasher::BuildNoHashHasher; use roles_logic_sv2::{ @@ -36,7 +36,6 @@ use stratum_common::bitcoin::TxOut; pub type Message = PoolMessages<'static>; pub type StdFrame = StandardSv2Frame; -pub type EitherFrame = StandardEitherFrame; pub type ProxyRemoteSelector = Prs; #[derive(Debug)] @@ -119,12 +118,12 @@ impl From for ChannelKind { /// upstream proxy. #[derive(Debug, Clone)] struct UpstreamMiningConnection { - receiver: Receiver, - sender: Sender, + receiver: Receiver>, + sender: Sender>, } impl UpstreamMiningConnection { - async fn send(&mut self, sv2_frame: StdFrame) -> Result<(), SendError> { + async fn send(&mut self, sv2_frame: StdFrame) -> Result<(), SendError>> { info!("SEND"); let either_frame = sv2_frame.into(); match self.sender.send(either_frame).await { @@ -406,13 +405,21 @@ impl UpstreamMiningNode { .map_err(|e| (error!("Failed to send {:?}", e)))?; let cloned = self_mutex.clone(); - let mut response = task::spawn(async { Self::receive(cloned).await }) + let response = task::spawn(async { Self::receive(cloned).await }) .await .unwrap() .unwrap(); - let message_type = response.get_header().unwrap().msg_type(); - let payload = response.payload(); + let message_type = response.header().msg_type(); + let payload = match response.payload() { + Some(p) => p, + None => { + error!("No payload found in response"); + return Err(()); + } + }; + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); match (message_type, payload).try_into() { Ok(CommonMessages::SetupConnectionSuccess(_)) => { let receiver = self_mutex @@ -430,7 +437,7 @@ impl UpstreamMiningNode { fn relay_incoming_messages( self_: Arc>, //_downstreams: HashMap, - receiver: Receiver, + receiver: Receiver>, ) { task::spawn(async move { loop { @@ -576,9 +583,17 @@ impl UpstreamMiningNode { } } - pub async fn next(self_mutex: Arc>, mut incoming: StdFrame) { - let message_type = incoming.get_header().unwrap().msg_type(); - let payload = incoming.payload(); + pub async fn next(self_mutex: Arc>, incoming: StdFrame) { + let message_type = incoming.header().msg_type(); + let payload = match incoming.payload() { + Some(p) => p, + None => { + error!("No payload found in response"); + return; + } + }; + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let routing_logic = super::get_routing_logic(); @@ -607,16 +622,27 @@ impl UpstreamMiningNode { ) }) .unwrap(); - Self::send(self_mutex.clone(), frame).await?; + match Self::send(self_mutex.clone(), frame).await { + Ok(_) => {} + Err(e) => return Err(e), + }; let cloned = self_mutex.clone(); - let mut response = task::spawn(async { Self::receive(cloned).await }) + let response = task::spawn(async { Self::receive(cloned).await }) .await .unwrap() .unwrap(); - let message_type = response.get_header().unwrap().msg_type(); - let payload = response.payload(); + let message_type = response.header().msg_type(); + let payload = match response.payload() { + Some(p) => p, + None => { + error!("No payload found in response"); + return Err(super::error::Error::NoPayloadFound); + } + }; + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); match (message_type, payload).try_into() { Ok(CommonMessages::SetupConnectionSuccess(m)) => { let receiver = self_mutex @@ -861,7 +887,7 @@ impl UpstreamMiningNode { // #[cfg(test)] // #[allow(unused)] // pub async fn next_faster(&mut self, mut incoming: StdFrame) { - // let message_type = incoming.get_header().unwrap().msg_type(); + // let message_type = incoming.header().msg_type(); // // When a channel is opened we need to setup the channel id in order to relay next messages // // to the right Downstream diff --git a/roles/pool/src/lib/mining_pool/mod.rs b/roles/pool/src/lib/mining_pool/mod.rs index e189c5406a..af7f2945da 100644 --- a/roles/pool/src/lib/mining_pool/mod.rs +++ b/roles/pool/src/lib/mining_pool/mod.rs @@ -4,7 +4,7 @@ use super::{ }; use async_channel::{Receiver, Sender}; use binary_sv2::U256; -use codec_sv2::{Frame, HandshakeRole, Responder, StandardEitherFrame, StandardSv2Frame}; +use codec_sv2::{HandshakeRole, Responder, StandardFrame, StandardSv2Frame}; use error_handling::handle_result; use key_utils::{Secp256k1PublicKey, Secp256k1SecretKey, SignatureService}; use network_helpers_sv2::noise_connection_tokio::Connection; @@ -42,7 +42,7 @@ pub mod message_handler; pub type Message = PoolMessages<'static>; pub type StdFrame = StandardSv2Frame; -pub type EitherFrame = StandardEitherFrame; +pub type EitherFrame = StandardFrame; pub fn get_coinbase_output(config: &Configuration) -> Result, Error> { let mut result = Vec::new(); @@ -100,8 +100,8 @@ pub struct Configuration { pub struct Downstream { // Either group or channel id id: u32, - receiver: Receiver, - sender: Sender, + receiver: Receiver>, + sender: Sender>, downstream_data: CommonDownstreamData, solution_sender: Sender>, channel_factory: Arc>, @@ -120,8 +120,8 @@ pub struct Pool { impl Downstream { #[allow(clippy::too_many_arguments)] pub async fn new( - mut receiver: Receiver, - mut sender: Sender, + mut receiver: Receiver>, + mut sender: Sender>, solution_sender: Sender>, pool: Arc>, channel_factory: Arc>, @@ -199,12 +199,13 @@ impl Downstream { Ok(self_) } - pub async fn next(self_mutex: Arc>, mut incoming: StdFrame) -> PoolResult<()> { - let message_type = incoming - .get_header() - .ok_or_else(|| PoolError::Custom(String::from("No header set")))? - .msg_type(); - let payload = incoming.payload(); + pub async fn next(self_mutex: Arc>, incoming: StdFrame) -> PoolResult<()> { + let message_type = incoming.header().msg_type(); + let payload = incoming + .payload() + .ok_or(PoolError::Custom(String::from("No payload set")))?; + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); debug!( "Received downstream message type: {:?}, payload: {:?}", message_type, payload @@ -325,8 +326,10 @@ impl Pool { let address = stream.peer_addr().unwrap(); debug!("New connection from {}", address); - let (receiver, sender): (Receiver, Sender) = - network_helpers::plain_connection_tokio::PlainConnection::new(stream).await; + let (receiver, sender): ( + Receiver>, + Sender>, + ) = network_helpers::plain_connection_tokio::PlainConnection::new(stream).await; handle_result!( status_tx, @@ -385,8 +388,8 @@ impl Pool { async fn accept_incoming_connection_( self_: Arc>, - receiver: Receiver, - sender: Sender, + receiver: Receiver>, + sender: Sender>, address: SocketAddr, ) -> PoolResult<()> { let solution_sender = self_.safe_lock(|p| p.solution_sender.clone())?; diff --git a/roles/pool/src/lib/mining_pool/setup_connection.rs b/roles/pool/src/lib/mining_pool/setup_connection.rs index cf2c06022d..147ee547d2 100644 --- a/roles/pool/src/lib/mining_pool/setup_connection.rs +++ b/roles/pool/src/lib/mining_pool/setup_connection.rs @@ -3,7 +3,6 @@ use super::super::{ mining_pool::{EitherFrame, StdFrame}, }; use async_channel::{Receiver, Sender}; -use codec_sv2::Frame; use roles_logic_sv2::{ common_messages_sv2::{ has_requires_std_job, has_version_rolling, has_work_selection, SetupConnection, @@ -41,7 +40,7 @@ impl SetupConnectionHandler { ) -> PoolResult { // read stdFrame from receiver - let mut incoming: StdFrame = match receiver.recv().await { + let incoming: StdFrame = match receiver.recv().await { Ok(EitherFrame::Sv2(s)) => { debug!("Got sv2 message: {:?}", s); s @@ -59,11 +58,12 @@ impl SetupConnectionHandler { } }; - let message_type = incoming - .get_header() - .ok_or_else(|| PoolError::Custom(String::from("No header set")))? - .msg_type(); - let payload = incoming.payload(); + let message_type = incoming.header().msg_type(); + let payload = incoming + .payload() + .ok_or(PoolError::Custom(String::from("No payload set")))?; + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let response = ParseDownstreamCommonMessages::handle_message_common( self_.clone(), message_type, diff --git a/roles/pool/src/lib/template_receiver/mod.rs b/roles/pool/src/lib/template_receiver/mod.rs index 49d58e82a8..2718afd336 100644 --- a/roles/pool/src/lib/template_receiver/mod.rs +++ b/roles/pool/src/lib/template_receiver/mod.rs @@ -4,7 +4,7 @@ use super::{ status, }; use async_channel::{Receiver, Sender}; -use codec_sv2::{Frame, HandshakeRole, Initiator}; +use codec_sv2::{HandshakeRole, Initiator}; use error_handling::handle_result; use key_utils::Secp256k1PublicKey; use network_helpers_sv2::noise_connection_tokio::Connection; @@ -102,17 +102,21 @@ impl TemplateRx { .unwrap(); loop { let message_from_tp = handle_result!(status_tx, receiver.recv().await); - let mut message_from_tp: StdFrame = handle_result!( + let message_from_tp: StdFrame = handle_result!( status_tx, message_from_tp .try_into() .map_err(|e| PoolError::Codec(codec_sv2::Error::FramingSv2Error(e))) ); - let message_type_res = message_from_tp - .get_header() - .ok_or_else(|| PoolError::Custom(String::from("No header set"))); - let message_type = handle_result!(status_tx, message_type_res).msg_type(); - let payload = message_from_tp.payload(); + let message_type = message_from_tp.header().msg_type(); + let payload = match message_from_tp.payload() { + Some(payload) => payload, + None => { + continue; + } + }; + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let msg = handle_result!( status_tx, ParseServerTemplateDistributionMessages::handle_message_template_distribution( diff --git a/roles/pool/src/lib/template_receiver/setup_connection.rs b/roles/pool/src/lib/template_receiver/setup_connection.rs index 60c3cb4f84..78e73896e1 100644 --- a/roles/pool/src/lib/template_receiver/setup_connection.rs +++ b/roles/pool/src/lib/template_receiver/setup_connection.rs @@ -3,7 +3,6 @@ use super::super::{ mining_pool::{EitherFrame, StdFrame}, }; use async_channel::{Receiver, Sender}; -use codec_sv2::Frame; use roles_logic_sv2::{ common_messages_sv2::{Protocol, SetupConnection}, errors::Error, @@ -49,16 +48,17 @@ impl SetupConnectionHandler { let sv2_frame = sv2_frame.into(); sender.send(sv2_frame).await?; - let mut incoming: StdFrame = receiver + let incoming: StdFrame = receiver .recv() .await? .try_into() .map_err(|e| PoolError::Codec(codec_sv2::Error::FramingSv2Error(e)))?; - let message_type = incoming - .get_header() - .ok_or_else(|| PoolError::Custom(String::from("No header set")))? - .msg_type(); - let payload = incoming.payload(); + let message_type = incoming.header().msg_type(); + let payload = incoming + .payload() + .ok_or(PoolError::Custom(String::from("No payload set")))?; + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); ParseUpstreamCommonMessages::handle_message_common( Arc::new(Mutex::new(SetupConnectionHandler {})), diff --git a/roles/roles-utils/network-helpers/src/lib.rs b/roles/roles-utils/network-helpers/src/lib.rs index 3555c2056d..f58de5a2fa 100644 --- a/roles/roles-utils/network-helpers/src/lib.rs +++ b/roles/roles-utils/network-helpers/src/lib.rs @@ -14,7 +14,7 @@ pub mod noise_connection_tokio; pub mod plain_connection_tokio; use async_channel::{Receiver, RecvError, SendError, Sender}; -use codec_sv2::{Error as CodecError, HandShakeFrame, HandshakeRole, StandardEitherFrame}; +use codec_sv2::{Error as CodecError, HandShakeFrame, HandshakeRole, StandardFrame}; use const_sv2::{ INITIATOR_EXPECTED_HANDSHAKE_MESSAGE_SIZE, RESPONDER_EXPECTED_HANDSHAKE_MESSAGE_SIZE, }; @@ -61,8 +61,8 @@ async fn initialize_as_downstream< >( self_: Arc>, role: HandshakeRole, - sender_outgoing: Sender>, - receiver_incoming: Receiver>, + sender_outgoing: Sender>, + receiver_incoming: Receiver>, ) -> Result<(), Error> { let mut state = codec_sv2::State::initialized(role); @@ -76,7 +76,7 @@ async fn initialize_as_downstream< .try_into() .map_err(|_| Error::HandshakeRemoteInvalidMessage)?; let second_message: [u8; INITIATOR_EXPECTED_HANDSHAKE_MESSAGE_SIZE] = second_message - .get_payload_when_handshaking() + .payload() .try_into() .map_err(|_| Error::HandshakeRemoteInvalidMessage)?; @@ -93,8 +93,8 @@ async fn initialize_as_downstream< async fn initialize_as_upstream<'a, Message: Serialize + Deserialize<'a> + GetSize, T: SetState>( self_: Arc>, role: HandshakeRole, - sender_outgoing: Sender>, - receiver_incoming: Receiver>, + sender_outgoing: Sender>, + receiver_incoming: Receiver>, ) -> Result<(), Error> { let mut state = codec_sv2::State::initialized(role); @@ -105,7 +105,7 @@ async fn initialize_as_upstream<'a, Message: Serialize + Deserialize<'a> + GetSi .try_into() .map_err(|_| Error::HandshakeRemoteInvalidMessage)?; let first_message: [u8; RESPONDER_EXPECTED_HANDSHAKE_MESSAGE_SIZE] = first_message - .get_payload_when_handshaking() + .payload() .try_into() .map_err(|_| Error::HandshakeRemoteInvalidMessage)?; diff --git a/roles/roles-utils/network-helpers/src/noise_connection_async_std.rs b/roles/roles-utils/network-helpers/src/noise_connection_async_std.rs index ae40529037..f9c17bc295 100644 --- a/roles/roles-utils/network-helpers/src/noise_connection_async_std.rs +++ b/roles/roles-utils/network-helpers/src/noise_connection_async_std.rs @@ -10,7 +10,7 @@ use std::{sync::Arc, time::Duration}; use tracing::{debug, error}; use binary_sv2::GetSize; -use codec_sv2::{HandshakeRole, Initiator, Responder, StandardEitherFrame, StandardNoiseDecoder}; +use codec_sv2::{HandshakeRole, Initiator, Responder, StandardFrame, StandardNoiseDecoder}; use crate::Error; @@ -42,8 +42,8 @@ impl Connection { capacity: usize, ) -> Result< ( - Receiver>, - Sender>, + Receiver>, + Sender>, ), Error, > { @@ -51,12 +51,12 @@ impl Connection { let (mut reader, writer) = (stream.clone(), stream.clone()); let (sender_incoming, receiver_incoming): ( - Sender>, - Receiver>, + Sender>, + Receiver>, ) = bounded(capacity); let (sender_outgoing, receiver_outgoing): ( - Sender>, - Receiver>, + Sender>, + Receiver>, ) = bounded(capacity); let state = codec_sv2::State::not_initialized(&role); diff --git a/roles/roles-utils/network-helpers/src/noise_connection_tokio.rs b/roles/roles-utils/network-helpers/src/noise_connection_tokio.rs index 2e2996365f..0ab5ba6bc0 100644 --- a/roles/roles-utils/network-helpers/src/noise_connection_tokio.rs +++ b/roles/roles-utils/network-helpers/src/noise_connection_tokio.rs @@ -10,7 +10,7 @@ use tokio::{ }; use binary_sv2::GetSize; -use codec_sv2::{HandshakeRole, Initiator, Responder, StandardEitherFrame, StandardNoiseDecoder}; +use codec_sv2::{HandshakeRole, Initiator, Responder, StandardFrame, StandardNoiseDecoder}; use tracing::{debug, error}; @@ -41,8 +41,8 @@ impl Connection { role: HandshakeRole, ) -> Result< ( - Receiver>, - Sender>, + Receiver>, + Sender>, AbortHandle, AbortHandle, ), @@ -53,12 +53,12 @@ impl Connection { let (mut reader, mut writer) = stream.into_split(); let (sender_incoming, receiver_incoming): ( - Sender>, - Receiver>, + Sender>, + Receiver>, ) = bounded(10); // TODO caller should provide this param let (sender_outgoing, receiver_outgoing): ( - Sender>, - Receiver>, + Sender>, + Receiver>, ) = bounded(10); // TODO caller should provide this param let state = codec_sv2::State::not_initialized(&role); diff --git a/roles/roles-utils/network-helpers/src/plain_connection_async_std.rs b/roles/roles-utils/network-helpers/src/plain_connection_async_std.rs index e97accfe74..1477319dcf 100644 --- a/roles/roles-utils/network-helpers/src/plain_connection_async_std.rs +++ b/roles/roles-utils/network-helpers/src/plain_connection_async_std.rs @@ -9,7 +9,7 @@ use core::convert::TryInto; use tracing::error; use binary_sv2::GetSize; -use codec_sv2::{StandardDecoder, StandardEitherFrame}; +use codec_sv2::{StandardDecoder, StandardFrame}; #[derive(Debug)] pub struct PlainConnection {} @@ -20,18 +20,18 @@ impl PlainConnection { stream: TcpStream, capacity: usize, ) -> ( - Receiver>, - Sender>, + Receiver>, + Sender>, ) { let (mut reader, writer) = (stream.clone(), stream); let (sender_incoming, receiver_incoming): ( - Sender>, - Receiver>, + Sender>, + Receiver>, ) = bounded(capacity); let (sender_outgoing, receiver_outgoing): ( - Sender>, - Receiver>, + Sender>, + Receiver>, ) = bounded(capacity); // RECEIVE AND PARSE INCOMING MESSAGES FROM TCP STREAM diff --git a/roles/roles-utils/network-helpers/src/plain_connection_tokio.rs b/roles/roles-utils/network-helpers/src/plain_connection_tokio.rs index fe4eb63678..1ef75ef65b 100644 --- a/roles/roles-utils/network-helpers/src/plain_connection_tokio.rs +++ b/roles/roles-utils/network-helpers/src/plain_connection_tokio.rs @@ -8,7 +8,7 @@ use tokio::{ }; use binary_sv2::GetSize; -use codec_sv2::{Error::MissingBytes, StandardDecoder, StandardEitherFrame}; +use codec_sv2::{Error::MissingBytes, StandardDecoder, StandardFrame}; use tracing::{error, trace}; #[derive(Debug)] @@ -25,20 +25,20 @@ impl PlainConnection { pub async fn new<'a, Message: Serialize + Deserialize<'a> + GetSize + Send + 'static>( stream: TcpStream, ) -> ( - Receiver>, - Sender>, + Receiver>, + Sender>, ) { const NOISE_HANDSHAKE_SIZE_HINT: usize = 3363412; let (mut reader, mut writer) = stream.into_split(); let (sender_incoming, receiver_incoming): ( - Sender>, - Receiver>, + Sender>, + Receiver>, ) = bounded(10); // TODO caller should provide this param let (sender_outgoing, receiver_outgoing): ( - Sender>, - Receiver>, + Sender>, + Receiver>, ) = bounded(10); // TODO caller should provide this param // RECEIVE AND PARSE INCOMING MESSAGES FROM TCP STREAM diff --git a/roles/test-utils/mining-device/src/main.rs b/roles/test-utils/mining-device/src/main.rs index 1bfbf6737a..31b3085416 100644 --- a/roles/test-utils/mining-device/src/main.rs +++ b/roles/test-utils/mining-device/src/main.rs @@ -110,7 +110,7 @@ async fn main() { use async_channel::{Receiver, Sender}; use binary_sv2::u256_from_int; -use codec_sv2::{Frame, Initiator, StandardEitherFrame, StandardSv2Frame}; +use codec_sv2::{Initiator, StandardFrame, StandardSv2Frame}; use roles_logic_sv2::{ common_messages_sv2::{Protocol, SetupConnection, SetupConnectionSuccess}, common_properties::{IsMiningUpstream, IsUpstream}, @@ -128,7 +128,7 @@ use roles_logic_sv2::{ pub type Message = MiningDeviceMessages<'static>; pub type StdFrame = StandardSv2Frame; -pub type EitherFrame = StandardEitherFrame; +pub type EitherFrame = StandardFrame; struct SetupConnectionHandler {} use std::convert::TryInto; @@ -179,16 +179,20 @@ impl SetupConnectionHandler { sender.send(sv2_frame).await.unwrap(); info!("Setup connection sent to {}", address); - let mut incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap(); - let message_type = incoming.get_header().unwrap().msg_type(); + let incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap(); + let message_type = incoming.header().msg_type(); let payload = incoming.payload(); - ParseUpstreamCommonMessages::handle_message_common( - self_, - message_type, - payload, - CommonRoutingLogic::None, - ) - .unwrap(); + if let Some(payload) = payload { + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); + ParseUpstreamCommonMessages::handle_message_common( + self_, + message_type, + payload, + CommonRoutingLogic::None, + ) + .unwrap(); + } } } @@ -312,9 +316,17 @@ impl Device { }); loop { - let mut incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap(); - let message_type = incoming.get_header().unwrap().msg_type(); + let incoming: StdFrame = receiver.recv().await.unwrap().try_into().unwrap(); + let message_type = incoming.header().msg_type(); let payload = incoming.payload(); + let payload = match payload { + Some(p) => p, + None => { + continue; + } + }; + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); let next = Device::handle_message_mining( self_mutex.clone(), message_type, diff --git a/roles/translator/src/lib/upstream_sv2/mod.rs b/roles/translator/src/lib/upstream_sv2/mod.rs index ba365bae15..001f250aa5 100644 --- a/roles/translator/src/lib/upstream_sv2/mod.rs +++ b/roles/translator/src/lib/upstream_sv2/mod.rs @@ -1,4 +1,4 @@ -use codec_sv2::{StandardEitherFrame, StandardSv2Frame}; +use codec_sv2::{StandardFrame, StandardSv2Frame}; use roles_logic_sv2::parsers::PoolMessages; pub mod diff_management; @@ -9,7 +9,7 @@ pub use upstream_connection::UpstreamConnection; pub type Message = PoolMessages<'static>; pub type StdFrame = StandardSv2Frame; -pub type EitherFrame = StandardEitherFrame; +pub type EitherFrame = StandardFrame; #[derive(Clone, Copy, Debug)] pub struct Sv2MiningConnection { diff --git a/roles/translator/src/lib/upstream_sv2/upstream.rs b/roles/translator/src/lib/upstream_sv2/upstream.rs index f6d192f75e..e23666d49b 100644 --- a/roles/translator/src/lib/upstream_sv2/upstream.rs +++ b/roles/translator/src/lib/upstream_sv2/upstream.rs @@ -11,7 +11,7 @@ use crate::{ use async_channel::{Receiver, Sender}; use async_std::{net::TcpStream, task}; use binary_sv2::u256_from_int; -use codec_sv2::{Frame, HandshakeRole, Initiator}; +use codec_sv2::{HandshakeRole, Initiator}; use error_handling::handle_result; use key_utils::Secp256k1PublicKey; use network_helpers_sv2::Connection; @@ -194,7 +194,7 @@ impl Upstream { // Wait for the SV2 Upstream to respond with either a `SetupConnectionSuccess` or a // `SetupConnectionError` inside a SV2 binary message frame - let mut incoming: StdFrame = match connection.receiver.recv().await { + let incoming: StdFrame = match connection.receiver.recv().await { Ok(frame) => frame.try_into()?, Err(e) => { error!("Upstream connection closed: {}", e); @@ -205,13 +205,13 @@ impl Upstream { }; // Gets the binary frame message type from the message header - let message_type = if let Some(header) = incoming.get_header() { - header.msg_type() - } else { - return Err(framing_sv2::Error::ExpectedHandshakeFrame.into()); - }; + let message_type = incoming.header().msg_type(); // Gets the message payload - let payload = incoming.payload(); + let payload = incoming + .payload() + .ok_or(framing_sv2::Error::ExpectedHandshakeFrame)?; + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); // Handle the incoming message (should be either `SetupConnectionSuccess` or // `SetupConnectionError`) @@ -294,19 +294,27 @@ impl Upstream { loop { // Waiting to receive a message from the SV2 Upstream role let incoming = handle_result!(tx_status, recv.recv().await); - let mut incoming: StdFrame = handle_result!(tx_status, incoming.try_into()); + let incoming: StdFrame = handle_result!(tx_status, incoming.try_into()); // On message receive, get the message type from the message header and get the // message payload - let message_type = - incoming - .get_header() - .ok_or(super::super::error::Error::FramingSv2( - framing_sv2::Error::ExpectedSv2Frame, - )); - - let message_type = handle_result!(tx_status, message_type).msg_type(); + let message_type = incoming.header().msg_type(); let payload = incoming.payload(); + let payload = match payload { + Some(p) => p, + None => { + error!("Received empty payload from upstream!"); + handle_result!( + tx_status, + Err(CodecNoise( + codec_sv2::noise_sv2::Error::ExpectedIncomingHandshakeMessage + )) + ) + } + }; + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); + // let payload = handle_result!(tx_status, payload).as_mut(); // Since this is not communicating with an SV2 proxy, but instead a custom SV1 // proxy where the routing logic is handled via the `Upstream`'s communication diff --git a/test/scale/src/main.rs b/test/scale/src/main.rs index 0e9872c6b7..c93867af9e 100644 --- a/test/scale/src/main.rs +++ b/test/scale/src/main.rs @@ -7,7 +7,7 @@ use tokio::{ use async_channel::{bounded, Receiver, Sender}; use clap::{App, Arg}; -use codec_sv2::{HandshakeRole, Initiator, Responder, StandardEitherFrame, StandardSv2Frame}; +use codec_sv2::{HandshakeRole, Initiator, Responder, StandardFrame, StandardSv2Frame}; use std::time::Duration; use network_helpers::{ @@ -20,7 +20,7 @@ use roles_logic_sv2::{ parsers::{Mining, MiningDeviceMessages}, }; -pub type EitherFrame = StandardEitherFrame; +pub type EitherFrame = StandardFrame; pub const AUTHORITY_PUBLIC_K: &str = "9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72"; pub const AUTHORITY_PRIVATE_K: &str = "mkDLTBBRxdBv998612qipDYoTK3YUrqLe8uWw7gu3iXbSrn2n"; diff --git a/utils/message-generator/src/executor.rs b/utils/message-generator/src/executor.rs index b69bce0e6b..15ca460c0e 100644 --- a/utils/message-generator/src/executor.rs +++ b/utils/message-generator/src/executor.rs @@ -7,7 +7,7 @@ use crate::{ }; use async_channel::{Receiver, Sender}; use binary_sv2::Serialize; -use codec_sv2::{Frame, StandardEitherFrame as EitherFrame, Sv2Frame}; +use codec_sv2::{StandardFrame as EitherFrame, Sv2Frame}; use roles_logic_sv2::parsers::{self, AnyMessage}; use std::{collections::HashMap, convert::TryInto, sync::Arc}; @@ -222,10 +222,12 @@ impl Executor { } }; - let mut message: Sv2Frame, _> = message.try_into().unwrap(); + let message: Sv2Frame, _> = message.try_into().unwrap(); debug!("RECV {:#?}", message); - let header = message.get_header().unwrap(); - let payload = message.payload(); + let header = message.header(); + let payload = message.payload().unwrap(); + let mut payload = payload.to_owned(); + let payload = payload.as_mut(); match result { ActionResult::MatchMessageType(message_type) => { if header.msg_type() != *message_type { diff --git a/utils/message-generator/src/main.rs b/utils/message-generator/src/main.rs index 695b3919cd..77d55f9faa 100644 --- a/utils/message-generator/src/main.rs +++ b/utils/message-generator/src/main.rs @@ -10,7 +10,7 @@ extern crate load_file; use crate::parser::sv2_messages::ReplaceField; use binary_sv2::{Deserialize, Serialize}; -use codec_sv2::StandardEitherFrame as EitherFrame; +use codec_sv2::StandardFrame as EitherFrame; use external_commands::*; use key_utils::{Secp256k1PublicKey, Secp256k1SecretKey}; use rand::Rng; @@ -451,7 +451,7 @@ mod test { into_static::into_static, net::{setup_as_downstream, setup_as_upstream}, }; - use codec_sv2::{Frame, Sv2Frame}; + use codec_sv2::Sv2Frame; use roles_logic_sv2::{ mining_sv2::{ CloseChannel, NewExtendedMiningJob, OpenExtendedMiningChannel, @@ -657,10 +657,14 @@ mod test { let client_received = client_recv.recv().await.unwrap(); match (server_received, client_received) { (EitherFrame::Sv2(mut frame1), EitherFrame::Sv2(mut frame2)) => { - let mt1 = frame1.get_header().unwrap().msg_type(); - let mt2 = frame2.get_header().unwrap().msg_type(); - let p1 = frame1.payload(); - let p2 = frame2.payload(); + let mt1 = frame1.header().msg_type(); + let mt2 = frame2.header().msg_type(); + let p1 = frame1.payload().unwrap(); + let mut p1 = p1.to_owned(); + let p1 = p1.as_mut(); + let p2 = frame2.payload().unwrap(); + let mut p2 = p2.to_owned(); + let p2 = p2.as_mut(); let message1: Mining = (mt1, p1).try_into().unwrap(); let message2: Mining = (mt2, p2).try_into().unwrap(); match (message1, message2) { diff --git a/utils/message-generator/src/net.rs b/utils/message-generator/src/net.rs index 8893a9bd5b..75a68fa7dd 100644 --- a/utils/message-generator/src/net.rs +++ b/utils/message-generator/src/net.rs @@ -1,7 +1,7 @@ use crate::{os_command, Command}; use async_channel::{bounded, Receiver, Sender}; use binary_sv2::{Deserialize, GetSize, Serialize}; -use codec_sv2::{HandshakeRole, Initiator, Responder, StandardEitherFrame as EitherFrame}; +use codec_sv2::{HandshakeRole, Initiator, Responder, StandardFrame as EitherFrame}; use key_utils::{Secp256k1PublicKey, Secp256k1SecretKey}; use network_helpers_sv2::{ noise_connection_tokio::Connection, plain_connection_tokio::PlainConnection, diff --git a/utils/message-generator/src/parser/actions.rs b/utils/message-generator/src/parser/actions.rs index ce84c7adf1..559d4e7413 100644 --- a/utils/message-generator/src/parser/actions.rs +++ b/utils/message-generator/src/parser/actions.rs @@ -1,5 +1,5 @@ use crate::{Action, ActionResult, Role, SaveField, Sv1Action, Sv1ActionResult, Sv2Type}; -use codec_sv2::{buffer_sv2::Slice, StandardEitherFrame, Sv2Frame}; +use codec_sv2::{buffer_sv2::Slice, StandardFrame, Sv2Frame}; use roles_logic_sv2::parsers::AnyMessage; use serde_json::{Map, Value}; use std::collections::HashMap; @@ -35,7 +35,7 @@ impl Sv2ActionParser { panic!("Frame id not found: {} Impossible to parse action", id) }) .clone(); - let frame = StandardEitherFrame::Sv2(frame); + let frame = StandardFrame::Sv2(frame); let message = messages.get(id.as_str().unwrap()); let message = message .unwrap_or_else(|| { diff --git a/utils/message-generator/src/parser/frames.rs b/utils/message-generator/src/parser/frames.rs index 633163200b..cd4c2c5823 100644 --- a/utils/message-generator/src/parser/frames.rs +++ b/utils/message-generator/src/parser/frames.rs @@ -1,5 +1,5 @@ use super::sv2_messages::{message_from_path, ReplaceField}; -use codec_sv2::{buffer_sv2::Slice, Frame as _Frame, Sv2Frame}; +use codec_sv2::{buffer_sv2::Slice, Sv2Frame}; use roles_logic_sv2::parsers::AnyMessage; use serde_json::{Map, Value}; use std::{collections::HashMap, convert::TryInto};