Skip to content

Commit

Permalink
intercept: rewrite the implementation draft to use TCP
Browse files Browse the repository at this point in the history
  • Loading branch information
rizsotto committed Feb 11, 2024
1 parent 6470e8d commit 5906ab2
Showing 1 changed file with 131 additions and 60 deletions.
191 changes: 131 additions & 60 deletions source/intercept_rs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
pub mod ipc {
use std::collections::HashMap;
use std::io::{Read, Write};
use std::path::PathBuf;

use chrono::Utc;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, PartialEq)]
struct SessionLocator(String);
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct SessionLocator(pub String);

// Reporter id is a unique identifier for a reporter.
//
// It is used to identify the process that sends the execution report.
// Because the OS PID is not unique across a single build (PIDs are
// recycled), we need to use a new unique identifier to identify the process.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
pub struct ReporterId(pub u64);

#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct ProcessId(u32);
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct ProcessId(pub u32);

#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct Execution {
pub executable: PathBuf,
pub arguments: Vec<String>,
Expand All @@ -32,7 +34,7 @@ pub mod ipc {
// terminate), but can be extended later with performance related
// events like monitoring the CPU usage or the memory allocation if
// this information is available.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub enum Event {
Started {
pid: ProcessId,
Expand All @@ -47,104 +49,173 @@ pub mod ipc {
},
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct Envelope {
pub rid: ReporterId,
pub timestamp: u64,
pub event: Event,
}

impl Envelope {
pub fn new(rid: &ReporterId, event: Event) -> Self {
let timestamp = Utc::now().timestamp_millis() as u64;
Envelope { rid: rid.clone(), timestamp, event }
}

pub fn read_from(mut reader: impl Read) -> Result<Self, anyhow::Error> {
let mut length_bytes = [0; 4];
reader.read_exact(&mut length_bytes)?;
let length = u32::from_be_bytes(length_bytes) as usize;

let mut buffer = vec![0; length];
reader.read_exact(&mut buffer)?;
let envelope = serde_json::from_slice(buffer.as_ref())?;

Ok(envelope)
}

pub fn write_into(&self, mut writer: impl Write) -> Result<(), anyhow::Error> {
let serialized_envelope = serde_json::to_string(&self)?;
let bytes = serialized_envelope.into_bytes();
let length = bytes.len() as u32;

writer.write_all(&length.to_be_bytes())?;
writer.write_all(&bytes)?;

Ok(())
}
}
}

mod client {
use std::net::UdpSocket;
use std::net::TcpStream;

use super::ipc::{Envelope, Event, ReporterId};
use rand::random;

use rand::Rng;
use super::ipc::{Envelope, Event, ReporterId};

impl ReporterId {
pub fn new() -> Self {
let id = rand::thread_rng().gen::<u64>();
let id = random::<u64>();
ReporterId(id)
}
}

impl Envelope {
pub fn new(rid: &ReporterId, event: super::ipc::Event) -> Self {
let timestamp = chrono::Utc::now().timestamp_millis() as u64;
Envelope { rid: rid.clone(), timestamp, event }
}
}

// Represents the remote sink of supervised process events.
//
// Events from a process execution can be sent from many actors (mostly
// supervisor processes). The events are collected in a common place
// in order to reconstruct of final report of a build process.
trait Report {
fn report(&self, event: Event);
trait Reporter {
fn report(&mut self, event: Event) -> Result<(), anyhow::Error>;
}

struct UdpReporter {
socket: UdpSocket,
struct TcpReporter {
socket: TcpStream,
destination: String,
reporter_id: ReporterId,
}

impl Report for UdpReporter {
fn report(&self, event: Event) {
impl TcpReporter {
pub fn new(destination: String) -> Result<Self, anyhow::Error> {
let socket = TcpStream::connect(destination.clone())?;
let reporter_id = ReporterId::new();
let result = TcpReporter { socket, destination, reporter_id };
Ok(result)
}
}

impl Reporter for TcpReporter {
fn report(&mut self, event: Event) -> Result<(), anyhow::Error> {
let envelope = Envelope::new(&self.reporter_id, event);
let serialized_envelope = match serde_json::to_string(&envelope) {
Ok(s) => s,
Err(e) => {
eprintln!("Failed to serialize envelope: {}", e);
return;
}
};
envelope.write_into(&mut self.socket)?;

match self.socket.send_to(serialized_envelope.as_bytes(), &self.destination) {
Ok(_) => (),
Err(e) => eprintln!("Failed to send envelope: {}", e),
};
Ok(())
}
}
}

mod server {
use std::net::UdpSocket;
use std::net::{TcpListener, TcpStream};

use crossbeam::channel::{Receiver, Sender};
use crossbeam_channel::bounded;

use crossbeam::channel::Sender;
use serde_json::Result;
use super::ipc::{Envelope, SessionLocator};

use super::ipc::Envelope;
trait EventCollector {
fn address(&self) -> Result<SessionLocator, anyhow::Error>;
fn collect(&self, destination: Sender<Envelope>) -> Result<(), anyhow::Error>;
fn stop(&self) -> Result<(), anyhow::Error>;
}

struct UdpServer {
socket: UdpSocket,
sender: Sender<Envelope>,
struct EventCollectorOnTcp {
control_input: Sender<bool>,
control_output: Receiver<bool>,
listener: TcpListener,
}

impl UdpServer {
fn listen(&self) {
let mut buf = [0; 4096];
impl EventCollectorOnTcp {
pub fn new() -> Result<Self, anyhow::Error> {
let (control_input, control_output) = bounded(0);
let listener = TcpListener::bind("127.0.0.1:0")?;

let result = EventCollectorOnTcp { control_input, control_output, listener };

Ok(result)
}

pub fn send(
&self,
mut socket: TcpStream,
destination: Sender<Envelope>,
) -> Result<(), anyhow::Error> {
let envelope = Envelope::read_from(&mut socket)?;
destination.send(envelope)?;

Ok(())
}
}

impl EventCollector for EventCollectorOnTcp {
fn address(&self) -> Result<SessionLocator, anyhow::Error> {
let local_addr = self.listener.local_addr()?;
let locator = SessionLocator(local_addr.to_string());
Ok(locator)
}

fn collect(&self, destination: Sender<Envelope>) -> Result<(), anyhow::Error> {
loop {
match self.socket.recv_from(&mut buf) {
Ok((amt, _src)) => {
let data = &mut buf[..amt];
let envelope: Result<Envelope> = serde_json::from_slice(data);

match envelope {
Ok(envelope) => {
if let Err(e) = self.sender.send(envelope) {
eprintln!("Failed to send envelope to channel: {}", e);
}
}
Err(e) => eprintln!("Failed to deserialize envelope: {}", e),
}
if let Ok(shutdown) = self.control_output.try_recv() {
if shutdown {
break;
}
}

match self.listener.accept() {
Ok((stream, _)) => {
println!("Got a connection");
// ... (process the connection in a separate thread or task)
self.send(stream, destination.clone())?;
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// No new connection available, continue checking for shutdown
continue;
}
Err(e) => {
println!("Error: {}", e);
break;
}
Err(e) => eprintln!("Failed to receive data: {}", e),
}
}

println!("Server shutting down");
Ok(())
}

fn stop(&self) -> Result<(), anyhow::Error> {
self.control_input.send(true)?;
Ok(())
}
}
}

0 comments on commit 5906ab2

Please sign in to comment.