Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid lagged receiver in TCP manager #1672

Merged
merged 1 commit into from
Nov 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions libafl/src/events/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use libafl_bolts::{shmem::StdShMemProvider, staterestore::StateRestorer};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::{broadcast, mpsc},
sync::{broadcast, broadcast::error::RecvError, mpsc},
task::{spawn, JoinHandle},
};
#[cfg(feature = "std")]
Expand Down Expand Up @@ -111,8 +111,8 @@ where
#[tokio::main(flavor = "current_thread")]
#[allow(clippy::too_many_lines)]
pub async fn broker_loop(&mut self) -> Result<(), Error> {
let (tx_bc, rx) = broadcast::channel(1024);
let (tx, mut rx_mpsc) = mpsc::channel(1024);
let (tx_bc, rx) = broadcast::channel(65536);
let (tx, mut rx_mpsc) = mpsc::channel(65536);

let exit_cleanly_after = self.exit_cleanly_after;

Expand Down Expand Up @@ -224,13 +224,14 @@ where
spawn(async move {
// In a loop, read data from the socket and write the data back.
loop {
let buf: Vec<u8> = rx_inner
.lock()
.await
.recv()
.await
.expect("Could not receive");
// TODO handle full capacity, Lagged https://docs.rs/tokio/latest/tokio/sync/broadcast/error/enum.RecvError.html
let buf: Vec<u8> = match rx_inner.lock().await.recv().await {
Ok(buf) => buf,
Err(RecvError::Lagged(num)) => {
log::error!("Receiver lagged, skipping {num} messages");
continue;
}
_ => panic!("Could not receive"),
};

#[cfg(feature = "tcp_debug")]
println!("{buf:?}");
Expand Down Expand Up @@ -704,7 +705,7 @@ where
if self_id == other_client_id {
panic!("Own ID should never have been sent by the broker");
} else {
println!("{self_id:?} (from {other_client_id:?}) Received: {buf:?}");
log::info!("{self_id:?} (from {other_client_id:?}) Received: {buf:?}");

let event = postcard::from_bytes(&buf[4..])?;
self.handle_in_client(fuzzer, executor, state, other_client_id, event)?;
Expand Down
Loading