Skip to content

Commit

Permalink
refactor: simplify registration serialisation in dispatcher
Browse files Browse the repository at this point in the history
docs: fix types in several docstrings
  • Loading branch information
jpcsmith committed Nov 28, 2023
1 parent a7e4f3a commit 9b2199a
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 28 deletions.
2 changes: 1 addition & 1 deletion crates/scion-proto/src/address/socket_address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ socket_address! {
}

impl SocketAddrV6 {
/// Construct a new SCION v4 socket address from an ISD-AS number and standard
/// Construct a new SCION v6 socket address from an ISD-AS number and standard
/// rust socket address.
pub const fn from_std(isd_asn: IsdAsn, socket_address: std::net::SocketAddrV6) -> Self {
Self {
Expand Down
5 changes: 5 additions & 0 deletions crates/scion-proto/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ impl From<DataplanePathErrorKind> for DecodeError {
}
}

/// Raised if there nt enough available in the capacity for encoding the SCION headers.
///
/// As the headers can a maximum of 1020 bytes in length, it is advisable to have attempted
/// least that amount of remaining space for encoding a [`ScionPacket`] (the payload is not
/// written to the buffer).
#[derive(Debug, thiserror::Error, PartialEq, Eq, Clone, Copy)]
#[error("the provided buffer did not have sufficient size")]
pub struct InadequateBufferSize;
8 changes: 7 additions & 1 deletion crates/scion-proto/src/packet/common_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ impl From<AddressInfo> for u8 {
}
}

impl From<ByEndpoint<AddressInfo>> for u8 {
fn from(value: ByEndpoint<AddressInfo>) -> Self {
value.destination.get() << 4 | value.source.get()
}
}

impl<T> WireDecode<T> for CommonHeader
where
T: Buf,
Expand Down Expand Up @@ -247,7 +253,7 @@ impl WireEncode for CommonHeader {
buffer.put_u8(self.header_length_factor.get());
buffer.put_u16(self.payload_length);
buffer.put_u8(self.path_type.into_encoded());
buffer.put_u8((self.address_info.destination.get() << 4) | self.address_info.source.get());
buffer.put_u8(self.address_info.into());
buffer.put_u16(self.reserved);

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion crates/scion-proto/src/reliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ pub struct Packet {
/// The last AS-level host the packet traversed, such as the ingress border router or the
/// sending host if it is located in the same AS.
pub last_host: Option<SocketAddr>,
/// The content of the single packet as a sequence of Bytes objects.
/// The content of the packet.
pub content: Bytes,
}
2 changes: 0 additions & 2 deletions crates/scion-proto/src/reliable/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ impl StreamParser {
}

/// Decode and store the next partial or common header available from the data.
///
/// Returns true if a full header is available.
fn try_decode_header(&mut self, data: &mut BytesMut) -> Result<(), DecodeError> {
match self.header {
None if data.remaining() >= CommonHeader::MIN_LENGTH => {
Expand Down
26 changes: 18 additions & 8 deletions crates/scion-proto/src/reliable/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ fn encode_address(buffer: &mut impl BufMut, address: &SocketAddr) {
}
}

/// Error returned when attempting to to register to a service address.
/// Error returned when attempting to register to a service address.
#[derive(Debug, Clone, Copy, thiserror::Error, PartialEq, Eq)]
#[error("cannot register to the provided address type")]
pub struct InvalidRegistrationAddressError;
Expand All @@ -179,7 +179,7 @@ pub enum RegistrationError {

/// A simple state machine for handling the registration to the dispatcher.
///
/// The methods [`RegistrationExchange::register()`] and [`RegistrationExchange::response()`]
/// The methods [`RegistrationExchange::register()`] and [`RegistrationExchange::handle_response()`]
/// are used to initiate the registration and handle registration response respectively.
#[derive(Debug, Default)]
pub struct RegistrationExchange {
Expand All @@ -195,17 +195,24 @@ impl RegistrationExchange {
/// Register to receive SCION packets destined for the given address and port.
///
/// The registration request to be sent to the dispatcher over a Unix socket is
/// written into the provided buffer.
/// written into the provided buffer and the number of bytes written are returned.
///
/// Specify a port number of zero to allow the dispatcher to assign the port number.
///
/// Returns an error if an attempt is made to register to a service address.
/// # Errors
///
/// Returns an error if an attempt is made to register to a service address, or if
/// the address has a wildcard ISD-AS number.
///
/// # Panics
///
/// Panics if called repeatedly before a call to [`Self::handle_response()`]
pub fn register<T: BufMut>(
&mut self,
address: ScionSocketAddr,
buffer: &mut T,
) -> Result<(), InvalidRegistrationAddressError> {
assert!(self.request.is_none());
) -> Result<usize, InvalidRegistrationAddressError> {
assert!(self.request.is_none(), "register called repeatedly");

if address.isd_asn().is_wildcard() {
return Err(InvalidRegistrationAddressError);
Expand All @@ -216,13 +223,16 @@ impl RegistrationExchange {
.ok_or(InvalidRegistrationAddressError)?;

let request = RegistrationRequest::new(address.isd_asn(), public_address);
let encoded_length = request.encoded_request_length();

request.encode_to(buffer);
self.request = Some(request);

Ok(())
Ok(encoded_length)
}

/// Handle the response from the dispatcher and returns the registered address.
/// Handle the response from the dispatcher for the most recent call to [`Self::register()`],
/// and returns the registered address.
///
/// Returns an error if the response cannot be decoded or if the dispatcher has deviated
/// from the expected protocol.
Expand Down
3 changes: 2 additions & 1 deletion crates/scion-proto/src/wire_encoding.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use bytes::{BufMut, Bytes, BytesMut};

/// A trait for types decodable from a wire format, without any additional information.
pub trait WireDecode<T>: Sized {
/// The error type returned on a failed decode.
Expand Down Expand Up @@ -114,4 +116,3 @@ macro_rules! bounded_uint {
};
}
pub(crate) use bounded_uint;
use bytes::{BufMut, Bytes, BytesMut};
27 changes: 13 additions & 14 deletions crates/scion/src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{io, path::Path};

use bytes::{Buf, BufMut, BytesMut};
use bytes::{Buf, BytesMut};
use scion_proto::{
address::SocketAddr,
reliable::{
Expand All @@ -18,7 +18,13 @@ use tokio::{
net::UnixStream,
};

const BUFFER_LENGTH: usize = 1024 * 1024;
// Recv buffer to 1 MiB
// TODO(jsmith): Allow the user to set this
const RECV_BUFFER_LEN: usize = 1024 * 1024; // 1 MiB;

// Set the send buffer to 1024 bytes since only single common headers (max ~32 B) are written to it.
// This means that the logic for resetting the BytesMut is triggered only once every ~30 packets.
const SEND_BUFFER_LEN: usize = 1024;

#[derive(Debug, thiserror::Error)]
pub enum RegistrationError {
Expand Down Expand Up @@ -74,8 +80,8 @@ impl DispatcherStream {

Ok(Self {
inner,
send_buffer: BytesMut::with_capacity(BUFFER_LENGTH),
recv_buffer: BytesMut::with_capacity(BUFFER_LENGTH),
send_buffer: BytesMut::with_capacity(SEND_BUFFER_LEN),
recv_buffer: BytesMut::with_capacity(RECV_BUFFER_LEN),
parser: StreamParser::new(),
})
}
Expand All @@ -88,17 +94,10 @@ impl DispatcherStream {
debug_assert!(self.send_buffer.is_empty());

// Known to hold all registration messages
let mut registration_message = [0u8; 64];

// Write the registraton message to the buffer
let mut buffer = registration_message.as_mut_slice();
exchange.register(address, &mut buffer)?;
let bytes_remaining = buffer.remaining_mut();

let message_length = registration_message.len() - bytes_remaining;
let mut buffer = &registration_message[..message_length];
let mut buffer = [0u8; 64];
let message_length = exchange.register(address, &mut buffer.as_mut())?;

if let Err(err) = self.send_via(None, &mut buffer).await {
if let Err(err) = self.send_via(None, &mut &buffer[..message_length]).await {
match err {
SendError::Io(err) => return Err(err.into()),
SendError::PayloadTooLarge(_) => unreachable!(),
Expand Down
19 changes: 19 additions & 0 deletions crates/scion/tests/udp_sanity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// use std::{error::Error, path::PathBuf};
//
// use scion::udp::UdpSocket;
//
// #[tokio::test]
// async fn binding_to_port() -> Result<(), Box<dyn Error>> {
// tracing_subscriber::fmt::init();
//
// let dispatcher_path: PathBuf = [env!("CARGO_MANIFEST_DIR"), "tests", "dispatcher.sock"]
// .iter()
// .collect();
// let bind_address = "[1-ff00:0:110,0.0.0.0]:443".parse().unwrap();
//
// let socket = UdpSocket::bind_with_dispatcher(bind_address, dispatcher_path).await?;
//
// assert_eq!(bind_address, socket.local_addr());
//
// Ok(())
// }

0 comments on commit 9b2199a

Please sign in to comment.