Skip to content

Commit

Permalink
refactor: Make DownstreamPacket sans-io (#1056)
Browse files Browse the repository at this point in the history
This moves the DownstreamPacket struct to be completely sans-IO, meaning
it is now generic over any input container and output container. This is
just a basic shell right now to enable building and testing more stuff
without IO, and to enable alternatives like XDP to integrate easily.
  • Loading branch information
XAMPPRocky authored Jan 8, 2025
1 parent f06bf5a commit c81df40
Show file tree
Hide file tree
Showing 25 changed files with 159 additions and 100 deletions.
9 changes: 1 addition & 8 deletions src/components/proxy/io_uring_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,14 +249,7 @@ fn process_packet(
source: packet.source,
};

crate::components::proxy::packet_router::DownstreamReceiveWorkerConfig::process_task(
ds_packet,
*worker_id,
config,
sessions,
error_acc,
destinations,
);
ds_packet.process(*worker_id, config, sessions, error_acc, destinations);
}
PacketProcessorCtx::SessionPool { pool, port, .. } => {
let mut last_received_at = None;
Expand Down
98 changes: 65 additions & 33 deletions src/components/proxy/packet_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
* limitations under the License.
*/

use super::{sessions::SessionKey, PipelineError, SessionPool};
use super::{
sessions::{SessionKey, SessionManager},
PipelineError, SessionPool,
};
use crate::{
filters::{Filter as _, ReadContext},
metrics,
pool::PoolBuffer,
Config,
metrics, Config,
};
use std::{net::SocketAddr, sync::Arc};
use tokio::sync::mpsc;
Expand All @@ -29,44 +30,64 @@ mod io_uring;
#[cfg(not(target_os = "linux"))]
mod reference;

/// Packet received from local port
pub(crate) struct DownstreamPacket {
pub(crate) contents: PoolBuffer,
//received_at: UtcTimestamp,
pub(crate) source: SocketAddr,
/// Representation of an immutable set of bytes pulled from the network, this trait
/// provides an abstraction over however the packet was received (epoll, io-uring, xdp)
///
/// Use [PacketMut] if you need a mutable representation.
pub trait Packet: Sized {
/// Returns the underlying slice of bytes representing the packet.
fn as_slice(&self) -> &[u8];

/// Returns the size of the packet.
fn len(&self) -> usize;

/// Returns whether the given packet is empty.
fn is_empty(&self) -> bool {
self.len() == 0
}
}

/// Represents the required arguments to run a worker task that
/// processes packets received downstream.
pub struct DownstreamReceiveWorkerConfig {
/// ID of the worker.
pub worker_id: usize,
pub port: u16,
pub config: Arc<Config>,
pub sessions: Arc<SessionPool>,
pub error_sender: super::error::ErrorSender,
pub buffer_pool: Arc<crate::pool::BufferPool>,
/// Representation of an mutable set of bytes pulled from the network, this trait
/// provides an abstraction over however the packet was received (epoll, io-uring, xdp)
pub trait PacketMut: Sized + Packet {
type FrozenPacket: Packet;
fn alloc_sized(&self, size: usize) -> Option<Self>;
fn as_mut_slice(&mut self) -> &mut [u8];
fn set_len(&mut self, len: usize);
fn remove_head(&mut self, length: usize);
fn remove_tail(&mut self, length: usize);
fn extend_head(&mut self, bytes: &[u8]);
fn extend_tail(&mut self, bytes: &[u8]);
/// Returns an immutable version of the packet, this allows certain types
/// return a type that can be more cheaply cloned and shared.
fn freeze(self) -> Self::FrozenPacket;
}

impl DownstreamReceiveWorkerConfig {
/// Packet received from local port
pub(crate) struct DownstreamPacket<P> {
pub(crate) contents: P,
pub(crate) source: SocketAddr,
}

impl<P: PacketMut> DownstreamPacket<P> {
#[inline]
pub(crate) fn process_task(
packet: DownstreamPacket,
pub(crate) fn process<S: SessionManager<Packet = P::FrozenPacket>>(
self,
worker_id: usize,
config: &Arc<Config>,
sessions: &Arc<SessionPool>,
sessions: &S,
error_acc: &mut super::error::ErrorAccumulator,
destinations: &mut Vec<crate::net::EndpointAddress>,
) {
tracing::trace!(
id = worker_id,
size = packet.contents.len(),
source = %packet.source,
size = self.contents.len(),
source = %self.source,
"received packet from downstream"
);

let timer = metrics::processing_time(metrics::READ).start_timer();
match Self::process_downstream_received_packet(packet, config, sessions, destinations) {
match self.process_inner(config, sessions, destinations) {
Ok(()) => {
error_acc.maybe_send();
}
Expand All @@ -84,10 +105,10 @@ impl DownstreamReceiveWorkerConfig {

/// Processes a packet by running it through the filter chain.
#[inline]
fn process_downstream_received_packet(
packet: DownstreamPacket,
fn process_inner<S: SessionManager<Packet = P::FrozenPacket>>(
self,
config: &Arc<Config>,
sessions: &Arc<SessionPool>,
sessions: &S,
destinations: &mut Vec<crate::net::EndpointAddress>,
) -> Result<(), PipelineError> {
if !config.clusters.read().has_endpoints() {
Expand All @@ -97,8 +118,7 @@ impl DownstreamReceiveWorkerConfig {

let cm = config.clusters.clone_value();
let filters = config.filters.load();
let mut context =
ReadContext::new(&cm, packet.source.into(), packet.contents, destinations);
let mut context = ReadContext::new(&cm, self.source.into(), self.contents, destinations);
filters.read(&mut context).map_err(PipelineError::Filter)?;

let ReadContext { contents, .. } = context;
Expand All @@ -110,17 +130,29 @@ impl DownstreamReceiveWorkerConfig {

for epa in destinations.drain(0..) {
let session_key = SessionKey {
source: packet.source,
source: self.source,
dest: epa.to_socket_addr()?,
};

sessions.send(session_key, contents.clone())?;
sessions.send(session_key, &contents)?;
}

Ok(())
}
}

/// Represents the required arguments to run a worker task that
/// processes packets received downstream.
pub struct DownstreamReceiveWorkerConfig {
/// ID of the worker.
pub worker_id: usize,
pub port: u16,
pub config: Arc<Config>,
pub sessions: Arc<SessionPool>,
pub error_sender: super::error::ErrorSender,
pub buffer_pool: Arc<crate::pool::BufferPool>,
}

/// Spawns a background task that sits in a loop, receiving packets from the passed in socket.
/// Each received packet is placed on a queue to be processed by a worker task.
/// This function also spawns the set of worker tasks responsible for consuming packets
Expand Down
3 changes: 1 addition & 2 deletions src/components/proxy/packet_router/reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ impl super::DownstreamReceiveWorkerConfig {
}
last_received_at = Some(received_at);

Self::process_task(
packet,
packet.process(
worker_id,
&config,
&sessions,
Expand Down
15 changes: 15 additions & 0 deletions src/components/proxy/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ cfg_if::cfg_if! {
}
}

/// Responsible for managing sending processed traffic to its destination and
/// tracking metrics and other information about the session.
pub trait SessionManager {
type Packet: crate::filters::Packet;
fn send(&self, key: SessionKey, contents: &Self::Packet) -> Result<(), super::PipelineError>;
}

#[derive(PartialEq, Eq, Hash)]
pub enum SessionError {
SocketAddressUnavailable,
Expand Down Expand Up @@ -456,6 +463,14 @@ impl SessionPool {
}
}

impl SessionManager for Arc<SessionPool> {
type Packet = FrozenPoolBuffer;

fn send(&self, key: SessionKey, contents: &Self::Packet) -> Result<(), super::PipelineError> {
SessionPool::send(self, key, contents.clone())
}
}

impl Drop for SessionPool {
fn drop(&mut self) {
let map = std::mem::take(&mut self.session_map);
Expand Down
4 changes: 2 additions & 2 deletions src/config/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,14 @@ impl<T: JsonSchema + Default> JsonSchema for Slot<T> {
}

impl<T: crate::filters::Filter + Default> crate::filters::Filter for Slot<T> {
fn read<P: crate::filters::Packet>(
fn read<P: crate::filters::PacketMut>(
&self,
ctx: &mut ReadContext<'_, P>,
) -> Result<(), FilterError> {
self.load().read(ctx)
}

fn write<P: crate::filters::Packet>(
fn write<P: crate::filters::PacketMut>(
&self,
ctx: &mut WriteContext<P>,
) -> Result<(), FilterError> {
Expand Down
18 changes: 4 additions & 14 deletions src/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub mod token_router;
pub mod prelude {
pub use super::{
ConvertProtoConfigError, CreateFilterArgs, CreationError, Filter, FilterError,
FilterInstance, Packet, ReadContext, StaticFilter, WriteContext,
FilterInstance, Packet, PacketMut, ReadContext, StaticFilter, WriteContext,
};
}

Expand Down Expand Up @@ -73,6 +73,7 @@ pub use self::{
use crate::test::TestFilter;

pub use self::chain::FilterChain;
pub use crate::components::proxy::packet_router::{Packet, PacketMut};

#[enum_dispatch::enum_dispatch(Filter)]
pub enum FilterKind {
Expand Down Expand Up @@ -179,17 +180,6 @@ where
}
}

pub trait Packet: Sized {
fn as_slice(&self) -> &[u8];
fn as_mut_slice(&mut self) -> &mut [u8];
fn set_len(&mut self, len: usize);
fn remove_head(&mut self, length: usize);
fn remove_tail(&mut self, length: usize);
fn extend_head(&mut self, bytes: &[u8]);
fn extend_tail(&mut self, bytes: &[u8]);
fn alloc_sized(&self, size: usize) -> Option<Self>;
}

/// Trait for routing and manipulating packets.
///
/// An implementation of [`Filter`] provides a `read` and a `write` method. Both
Expand Down Expand Up @@ -219,7 +209,7 @@ pub trait Filter: Send + Sync {
/// This function should return an `Some` if the packet processing should
/// proceed. If the packet should be rejected, it will return [`None`]
/// instead. By default, the context passes through unchanged.
fn read<P: Packet>(&self, _: &mut ReadContext<'_, P>) -> Result<(), FilterError> {
fn read<P: PacketMut>(&self, _: &mut ReadContext<'_, P>) -> Result<(), FilterError> {
Ok(())
}

Expand All @@ -229,7 +219,7 @@ pub trait Filter: Send + Sync {
///
/// This function should return an `Some` if the packet processing should
/// proceed. If the packet should be rejected, it will return [`None`]
fn write<P: Packet>(&self, _: &mut WriteContext<P>) -> Result<(), FilterError> {
fn write<P: PacketMut>(&self, _: &mut WriteContext<P>) -> Result<(), FilterError> {
Ok(())
}
}
2 changes: 1 addition & 1 deletion src/filters/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Capture {

impl Filter for Capture {
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))]
fn read<P: Packet>(&self, ctx: &mut ReadContext<'_, P>) -> Result<(), FilterError> {
fn read<P: PacketMut>(&self, ctx: &mut ReadContext<'_, P>) -> Result<(), FilterError> {
let capture = self.capture.capture(ctx.contents.as_slice());
ctx.metadata.insert(
self.is_present_key,
Expand Down
4 changes: 2 additions & 2 deletions src/filters/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl schemars::JsonSchema for FilterChain {
}

impl Filter for FilterChain {
fn read<P: Packet>(&self, ctx: &mut ReadContext<'_, P>) -> Result<(), FilterError> {
fn read<P: PacketMut>(&self, ctx: &mut ReadContext<'_, P>) -> Result<(), FilterError> {
for ((id, instance), histogram) in self
.filters
.iter()
Expand Down Expand Up @@ -303,7 +303,7 @@ impl Filter for FilterChain {
Ok(())
}

fn write<P: Packet>(&self, ctx: &mut WriteContext<P>) -> Result<(), FilterError> {
fn write<P: PacketMut>(&self, ctx: &mut WriteContext<P>) -> Result<(), FilterError> {
for ((id, instance), histogram) in self
.filters
.iter()
Expand Down
4 changes: 2 additions & 2 deletions src/filters/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Compress {

impl Filter for Compress {
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))]
fn read<P: Packet>(&self, ctx: &mut ReadContext<'_, P>) -> Result<(), FilterError> {
fn read<P: PacketMut>(&self, ctx: &mut ReadContext<'_, P>) -> Result<(), FilterError> {
let original_size = ctx.contents.as_slice().len();

match self.on_read {
Expand Down Expand Up @@ -96,7 +96,7 @@ impl Filter for Compress {
}

#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))]
fn write<P: Packet>(&self, ctx: &mut WriteContext<P>) -> Result<(), FilterError> {
fn write<P: PacketMut>(&self, ctx: &mut WriteContext<P>) -> Result<(), FilterError> {
let original_size = ctx.contents.as_slice().len();
match self.on_write {
Action::Compress => match self.compressor.encode(&ctx.contents) {
Expand Down
6 changes: 3 additions & 3 deletions src/filters/compress/compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

use crate::filters::Packet;
use crate::filters::PacketMut;
use parking_lot::Mutex;
use std::io;

Expand All @@ -30,7 +30,7 @@ pub enum Compressor {
}

impl Compressor {
pub fn encode<P: Packet>(&self, contents: &P) -> io::Result<P> {
pub fn encode<P: PacketMut>(&self, contents: &P) -> io::Result<P> {
let input = contents.as_slice();
let encoded = match self {
Self::Snappy(imp) => {
Expand Down Expand Up @@ -80,7 +80,7 @@ impl Compressor {
Ok(encoded)
}

pub fn decode<P: Packet>(&self, contents: &P) -> io::Result<P> {
pub fn decode<P: PacketMut>(&self, contents: &P) -> io::Result<P> {
let input = contents.as_slice();
let decoded = match self {
Self::Snappy(_imp) => {
Expand Down
4 changes: 2 additions & 2 deletions src/filters/concatenate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Concatenate {
}

impl Filter for Concatenate {
fn read<P: Packet>(&self, ctx: &mut ReadContext<'_, P>) -> Result<(), FilterError> {
fn read<P: PacketMut>(&self, ctx: &mut ReadContext<'_, P>) -> Result<(), FilterError> {
match self.on_read {
Strategy::Append => {
ctx.contents.extend_tail(&self.bytes);
Expand All @@ -57,7 +57,7 @@ impl Filter for Concatenate {
Ok(())
}

fn write<P: Packet>(&self, ctx: &mut WriteContext<P>) -> Result<(), FilterError> {
fn write<P: PacketMut>(&self, ctx: &mut WriteContext<P>) -> Result<(), FilterError> {
match self.on_write {
Strategy::Append => {
ctx.contents.extend_tail(&self.bytes);
Expand Down
4 changes: 2 additions & 2 deletions src/filters/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ impl Debug {

impl Filter for Debug {
#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))]
fn read<P: Packet>(&self, ctx: &mut ReadContext<'_, P>) -> Result<(), FilterError> {
fn read<P: PacketMut>(&self, ctx: &mut ReadContext<'_, P>) -> Result<(), FilterError> {
info!(id = ?self.config.id, source = ?&ctx.source, contents = ?String::from_utf8_lossy(ctx.contents.as_slice()), "Read filter event");
Ok(())
}

#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))]
fn write<P: Packet>(&self, ctx: &mut WriteContext<P>) -> Result<(), FilterError> {
fn write<P: PacketMut>(&self, ctx: &mut WriteContext<P>) -> Result<(), FilterError> {
info!(
id = ?self.config.id,
source = ?&ctx.source,
Expand Down
Loading

0 comments on commit c81df40

Please sign in to comment.