Skip to content

Commit

Permalink
intercept: rewrite the implementation draft to use TCP
Browse files Browse the repository at this point in the history
  • Loading branch information
rizsotto committed Feb 11, 2024
1 parent 6470e8d commit d2447a1
Showing 1 changed file with 104 additions and 51 deletions.
155 changes: 104 additions & 51 deletions source/intercept_rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@ pub mod ipc {

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct SessionLocator(String);

// Reporter id is a unique identifier for a reporter.
//
// It is used to identify the process that sends the execution report.
// Because the OS PID is not unique across a single build (PIDs are
// recycled), we need to use a new unique identifier to identify the process.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
pub struct ReporterId(pub u64);

#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct ProcessId(u32);

#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct Execution {
pub executable: PathBuf,
pub arguments: Vec<String>,
Expand All @@ -32,7 +32,7 @@ pub mod ipc {
// terminate), but can be extended later with performance related
// events like monitoring the CPU usage or the memory allocation if
// this information is available.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub enum Event {
Started {
pid: ProcessId,
Expand All @@ -47,7 +47,7 @@ pub mod ipc {
},
}

#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct Envelope {
pub rid: ReporterId,
pub timestamp: u64,
Expand All @@ -56,21 +56,22 @@ pub mod ipc {
}

mod client {
use std::net::UdpSocket;
use std::io::Write;
use std::net::TcpStream;

use super::ipc::{Envelope, Event, ReporterId};
use rand::random;

use rand::Rng;
use super::ipc::{Envelope, Event, ReporterId};

impl ReporterId {
pub fn new() -> Self {
let id = rand::thread_rng().gen::<u64>();
let id = random::<u64>();
ReporterId(id)
}
}

impl Envelope {
pub fn new(rid: &ReporterId, event: super::ipc::Event) -> Self {
pub fn new(rid: &ReporterId, event: Event) -> Self {
let timestamp = chrono::Utc::now().timestamp_millis() as u64;
Envelope { rid: rid.clone(), timestamp, event }
}
Expand All @@ -81,70 +82,122 @@ mod client {
// Events from a process execution can be sent from many actors (mostly
// supervisor processes). The events are collected in a common place
// in order to reconstruct of final report of a build process.
trait Report {
fn report(&self, event: Event);
trait Reporter {
fn report(&mut self, event: Event) -> Result<(), anyhow::Error>;
}

struct UdpReporter {
socket: UdpSocket,
struct TcpReporter {
socket: TcpStream,
destination: String,
reporter_id: ReporterId,
}

impl Report for UdpReporter {
fn report(&self, event: Event) {
impl TcpReporter {
pub fn new(destination: String) -> Result<Self, anyhow::Error> {
let socket = TcpStream::connect(destination.clone())?;
let reporter_id = ReporterId::new();
let result = TcpReporter { socket, destination, reporter_id };
Ok(result)
}
}

impl Reporter for TcpReporter {
fn report(&mut self, event: Event) -> Result<(), anyhow::Error> {
let envelope = Envelope::new(&self.reporter_id, event);
let serialized_envelope = match serde_json::to_string(&envelope) {
Ok(s) => s,
Err(e) => {
eprintln!("Failed to serialize envelope: {}", e);
return;
}
};
let serialized_envelope = serde_json::to_string(&envelope)?;

self.socket.write_all(serialized_envelope.as_bytes())?;

match self.socket.send_to(serialized_envelope.as_bytes(), &self.destination) {
Ok(_) => (),
Err(e) => eprintln!("Failed to send envelope: {}", e),
};
Ok(())
}
}
}

mod server {
use std::net::UdpSocket;
use std::io::Read;
use std::net::{TcpListener, TcpStream};

use crossbeam::channel::Sender;
use serde_json::Result;
use crossbeam::channel::{Receiver, Sender};
use crossbeam_channel::bounded;

use super::ipc::Envelope;

struct UdpServer {
socket: UdpSocket,
sender: Sender<Envelope>,
trait EventCollector {
fn address(&self) -> Result<String, anyhow::Error>;
fn collect(&self, destination: Sender<Envelope>) -> Result<(), anyhow::Error>;
fn stop(&self) -> Result<(), anyhow::Error>;
}

struct EventCollectorOnTcp {
control_input: Sender<bool>,
control_output: Receiver<bool>,
listener: TcpListener,
}

impl EventCollectorOnTcp {
pub fn new() -> Result<Self, anyhow::Error> {
let (control_input, control_output) = bounded(0);
let listener = TcpListener::bind("127.0.0.1:0")?;

let result = EventCollectorOnTcp { control_input, control_output, listener };

Ok(result)
}

pub fn send(
&self,
mut socket: TcpStream,
destination: Sender<Envelope>,
) -> Result<(), anyhow::Error> {
let mut buffer = vec![];

let amt = socket.read_to_end(&mut buffer)?;
let envelope = serde_json::from_slice(buffer[..amt].as_ref())?;
destination.send(envelope)?;

Ok(())
}
}

impl UdpServer {
fn listen(&self) {
let mut buf = [0; 4096];
impl EventCollector for EventCollectorOnTcp {

fn address(&self) -> Result<String, anyhow::Error> {
let local_addr = self.listener.local_addr()?;
Ok(local_addr.to_string())
}

fn collect(&self, destination: Sender<Envelope>) -> Result<(), anyhow::Error> {
loop {
match self.socket.recv_from(&mut buf) {
Ok((amt, _src)) => {
let data = &mut buf[..amt];
let envelope: Result<Envelope> = serde_json::from_slice(data);

match envelope {
Ok(envelope) => {
if let Err(e) = self.sender.send(envelope) {
eprintln!("Failed to send envelope to channel: {}", e);
}
}
Err(e) => eprintln!("Failed to deserialize envelope: {}", e),
}
if let Ok(shutdown) = self.control_output.try_recv() {
if shutdown {
break;
}
}

match self.listener.accept() {
Ok((stream, _)) => {
println!("Got a connection");
// ... (process the connection in a separate thread or task)
self.send(stream, destination.clone())?;
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
// No new connection available, continue checking for shutdown
continue;
}
Err(e) => {
println!("Error: {}", e);
break;
}
Err(e) => eprintln!("Failed to receive data: {}", e),
}
}

println!("Server shutting down");
Ok(())
}

fn stop(&self) -> Result<(), anyhow::Error> {
self.control_input.send(true)?;
Ok(())
}
}
}

0 comments on commit d2447a1

Please sign in to comment.