Skip to content

Commit

Permalink
Avoid lagged receiver in TCP manager
Browse files Browse the repository at this point in the history
  • Loading branch information
andreafioraldi committed Nov 17, 2023
1 parent d782585 commit 96d3a20
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions libafl/src/events/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use libafl_bolts::{shmem::StdShMemProvider, staterestore::StateRestorer};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::{broadcast, mpsc},
sync::{broadcast, broadcast::error::RecvError, mpsc},
task::{spawn, JoinHandle},
};
#[cfg(feature = "std")]
Expand Down Expand Up @@ -111,8 +111,8 @@ where
#[tokio::main(flavor = "current_thread")]
#[allow(clippy::too_many_lines)]
pub async fn broker_loop(&mut self) -> Result<(), Error> {
let (tx_bc, rx) = broadcast::channel(1024);
let (tx, mut rx_mpsc) = mpsc::channel(1024);
let (tx_bc, rx) = broadcast::channel(65536);
let (tx, mut rx_mpsc) = mpsc::channel(65536);

let exit_cleanly_after = self.exit_cleanly_after;

Expand Down Expand Up @@ -224,13 +224,14 @@ where
spawn(async move {
// In a loop, read data from the socket and write the data back.
loop {
let buf: Vec<u8> = rx_inner
.lock()
.await
.recv()
.await
.expect("Could not receive");
// TODO handle full capacity, Lagged https://docs.rs/tokio/latest/tokio/sync/broadcast/error/enum.RecvError.html
let buf: Vec<u8> = match rx_inner.lock().await.recv().await {
Ok(buf) => buf,
Err(RecvError::Lagged(num)) => {
log::error!("Receiver lagged, skipping {num} messages");
continue;
}
_ => panic!("Could not receive"),
};

#[cfg(feature = "tcp_debug")]
println!("{buf:?}");
Expand Down Expand Up @@ -704,7 +705,7 @@ where
if self_id == other_client_id {
panic!("Own ID should never have been sent by the broker");
} else {
println!("{self_id:?} (from {other_client_id:?}) Received: {buf:?}");
log::info!("{self_id:?} (from {other_client_id:?}) Received: {buf:?}");

let event = postcard::from_bytes(&buf[4..])?;
self.handle_in_client(fuzzer, executor, state, other_client_id, event)?;
Expand Down

0 comments on commit 96d3a20

Please sign in to comment.