Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-machine: make sure only testcases are sent over the wire #2601

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions fuzzers/inprocess/sqlite_centralized_multi_machine/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ make -j$(nproc)
make sqlite3.c
popd

if [ "$1" = "release" ]; then
./target/release/libafl_cc --libafl -I ./sqlite3 -c ./sqlite3/test/ossfuzz.c -o ./sqlite3/test/ossfuzz.o
./target/release/libafl_cxx --libafl -o ossfuzz ./sqlite3/test/ossfuzz.o ./sqlite3/sqlite3.o -pthread -ldl -lz
else
if [ "$1" = "d" ]; then
./target/debug/libafl_cc --libafl -I ./sqlite3 -c ./sqlite3/test/ossfuzz.c -o ./sqlite3/test/ossfuzz.o
./target/debug/libafl_cxx --libafl -o ossfuzz ./sqlite3/test/ossfuzz.o ./sqlite3/sqlite3.o -pthread -ldl -lz
else
./target/release/libafl_cc --libafl -I ./sqlite3 -c ./sqlite3/test/ossfuzz.c -o ./sqlite3/test/ossfuzz.o
./target/release/libafl_cxx --libafl -o ossfuzz ./sqlite3/test/ossfuzz.o ./sqlite3/sqlite3.o -pthread -ldl -lz
fi

Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/bin/bash

./ossfuzz --cores 0-3 --input ./corpus --parent-addr 0.0.0.0:50000 --broker-port 3000
./ossfuzz --cores 2-3 --input ./corpus --parent-addr 0.0.0.0:50000 --broker-port 3000
69 changes: 39 additions & 30 deletions libafl/src/events/broker_hooks/centralized_multi_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
#[cfg(feature = "llmp_compression")]
use libafl_bolts::llmp::LLMP_FLAG_COMPRESSED;
use libafl_bolts::{
llmp::{Flags, LlmpBrokerInner, LlmpHook, LlmpMsgHookResult, Tag, LLMP_FLAG_FROM_MM},
llmp::{Flags, LlmpBrokerInner, LlmpHook, LlmpMsgHookResult, Tag, LLMP_FLAG_MM_FORWARD},
ownedref::OwnedRef,
shmem::ShMemProvider,
ClientId, Error,
Expand All @@ -30,18 +30,25 @@ use crate::{
inputs::Input,
};

/// Makes a raw pointer send + sync.
/// Makes anything send + sync.
/// It is intended to be used for raw-pointers in very specific scenarios.
/// Extremely unsafe to use in general, only use this if you know what you're doing.
///
/// Some guarantees that should be enforced to use it in general (for raw pointers):
/// - the pointee should be initialized
/// - the pointee should never be written again
/// - the pointee should never be unmapped
/// - the pointee should never be moved
#[derive(Debug, Clone, Copy)]
pub struct NullLock<T> {
struct MultiMachineNullLock<T> {
value: T,
}

unsafe impl<T> Send for NullLock<T> {}
unsafe impl<T> Sync for NullLock<T> {}
unsafe impl<T> Send for MultiMachineNullLock<T> {}
unsafe impl<T> Sync for MultiMachineNullLock<T> {}

impl<T> NullLock<T> {
/// Instantiate a [`NullLock`]
impl<T> MultiMachineNullLock<T> {
/// Instantiate a [`MultiMachineNullLock`]
///
/// # Safety
///
Expand All @@ -50,24 +57,24 @@ impl<T> NullLock<T> {
Self { value }
}

/// Get a reference to value
pub fn get(&self) -> &T {
&self.value
}
// /// Get a reference to value
// pub fn get(&self) -> &T {
// &self.value
// }

/// Get a mutable reference to value
pub fn get_mut(&mut self) -> &mut T {
&mut self.value
}
// /// Get a mutable reference to value
// pub fn get_mut(&mut self) -> &mut T {
// &mut self.value
// }

/// Get back the value
pub fn into_innter(self) -> T {
self.value
}
}

/// The Receiving side of the multi-machine architecture
/// It is responsible for receiving messages from other neighbours.
/// The sending side of the multi-machine architecture
/// It is responsible for sending messages to other neighbours.
/// Please check [`crate::events::multi_machine`] for more information.
#[derive(Debug)]
pub struct TcpMultiMachineLlmpSenderHook<A, I>
Expand All @@ -81,7 +88,7 @@ where
phantom: PhantomData<I>,
}

/// The Receiving side of the multi-machine architecture
/// The receiving side of the multi-machine architecture
/// It is responsible for receiving messages from other neighbours.
/// Please check [`crate::events::multi_machine`] for more information.
#[derive(Debug)]
Expand Down Expand Up @@ -164,23 +171,27 @@ where
SP: ShMemProvider,
I: Input + Send + Sync + 'static,
{
/// check for received messages, and forward them alongside the incoming message to inner.
/// send new interesting messages over the wire
fn on_new_message(
&mut self,
_broker_inner: &mut LlmpBrokerInner<SP>,
_client_id: ClientId,
_msg_tag: &mut Tag,
_msg_flags: &mut Flags,
msg_flags: &mut Flags,
msg: &mut [u8],
_new_msgs: &mut Vec<(Tag, Flags, Vec<u8>)>,
) -> Result<LlmpMsgHookResult, Error> {
if (*msg_flags & LLMP_FLAG_MM_FORWARD) == Flags(0) {
return Ok(LlmpMsgHookResult::ForwardToClients);
}

let shared_state = self.shared_state.clone();

// # Safety
//
// Here, we suppose msg will *never* be written again and will always be available.
// Thus, it is safe to handle this in a separate thread.
let msg_lock = unsafe { NullLock::new((msg.as_ptr(), msg.len())) };
// let flags = msg_flags.clone();
let msg_lock = unsafe { MultiMachineNullLock::new((msg.as_ptr(), msg.len())) };

let _handle: JoinHandle<Result<(), Error>> = self.rt.spawn(async move {
let mut state_wr_lock = shared_state.write().await;
Expand Down Expand Up @@ -255,22 +266,20 @@ where
let msg = msg.into_owned().unwrap().into_vec();
#[cfg(feature = "llmp_compression")]
match state_wr_lock.compressor().maybe_compress(msg.as_ref()) {
Some(comp_buf) => Ok((
_LLMP_TAG_TO_MAIN,
LLMP_FLAG_COMPRESSED | LLMP_FLAG_FROM_MM,
comp_buf,
)),
None => Ok((_LLMP_TAG_TO_MAIN, LLMP_FLAG_FROM_MM, msg)),
Some(comp_buf) => {
Ok((_LLMP_TAG_TO_MAIN, LLMP_FLAG_COMPRESSED, comp_buf))
}
None => Ok((_LLMP_TAG_TO_MAIN, Flags(0), msg)),
}
#[cfg(not(feature = "llmp_compression"))]
Ok((_LLMP_TAG_TO_MAIN, LLMP_FLAG_FROM_MM, msg))
Ok((_LLMP_TAG_TO_MAIN, Flags(0), msg))
}
MultiMachineMsg::Event(evt) => {
let evt = evt.into_owned().unwrap();
let (inner_flags, buf) =
Self::try_compress(&mut state_wr_lock, evt.as_ref())?;

Ok((_LLMP_TAG_TO_MAIN, inner_flags | LLMP_FLAG_FROM_MM, buf))
Ok((_LLMP_TAG_TO_MAIN, inner_flags, buf))
}
})
.collect();
Expand Down
3 changes: 2 additions & 1 deletion libafl/src/events/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,8 +747,9 @@ where
}

// Create this after forks, to avoid problems with tokio runtime

//
// # Safety
//
// The `multi_machine_receiver_hook` needs messages to outlive the receiver.
// The underlying memory region for incoming messages lives longer than the async thread processing them.
#[cfg(feature = "multi_machine")]
Expand Down
39 changes: 23 additions & 16 deletions libafl/src/events/llmp/mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ use core::{marker::PhantomData, time::Duration};
#[cfg(feature = "std")]
use std::net::TcpStream;

#[cfg(feature = "multi_machine")]
use libafl_bolts::llmp::LLMP_FLAG_MM_FORWARD;
#[cfg(feature = "llmp_compression")]
use libafl_bolts::{
compress::GzipCompressor,
llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED},
};
use libafl_bolts::{compress::GzipCompressor, llmp::LLMP_FLAG_COMPRESSED};
use libafl_bolts::{
current_time,
llmp::{LlmpClient, LlmpClientDescription, LLMP_FLAG_FROM_MM},
llmp::{LlmpClient, LlmpClientDescription, LLMP_FLAG_INITIALIZED},
shmem::{NopShMemProvider, ShMemProvider},
tuples::Handle,
ClientId,
Expand Down Expand Up @@ -523,7 +522,12 @@ where
event: Event<<Self::State as UsesInput>::Input>,
) -> Result<(), Error> {
let serialized = postcard::to_allocvec(&event)?;
let flags = LLMP_FLAG_INITIALIZED;
let mut flags = LLMP_FLAG_INITIALIZED;

#[cfg(feature = "multi_machine")]
if event.is_new_testcase() {
flags = flags | LLMP_FLAG_MM_FORWARD;
}

match self.compressor.maybe_compress(&serialized) {
Some(comp_buf) => {
Expand All @@ -534,7 +538,8 @@ where
)?;
}
None => {
self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?;
self.llmp
.send_buf_with_flags(LLMP_TAG_EVENT_TO_BOTH, flags, &serialized)?;
}
}
self.last_sent = current_time();
Expand All @@ -548,8 +553,16 @@ where
_state: &mut Self::State,
event: Event<<Self::State as UsesInput>::Input>,
) -> Result<(), Error> {
let mut flags = LLMP_FLAG_INITIALIZED;

#[cfg(feature = "multi_machine")]
if event.is_new_testcase() {
flags = flags | LLMP_FLAG_MM_FORWARD;
}

let serialized = postcard::to_allocvec(&event)?;
self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?;
self.llmp
.send_buf_with_flags(LLMP_TAG_EVENT_TO_BOTH, flags, &serialized)?;
Ok(())
}

Expand Down Expand Up @@ -605,7 +618,7 @@ where
// TODO: Get around local event copy by moving handle_in_client
let self_id = self.llmp.sender().id();
let mut count = 0;
while let Some((client_id, tag, flags, msg)) = self.llmp.recv_buf_with_flags()? {
while let Some((client_id, tag, _flags, msg)) = self.llmp.recv_buf_with_flags()? {
assert!(
tag != _LLMP_TAG_EVENT_TO_BROKER,
"EVENT_TO_BROKER parcel should not have arrived in the client!"
Expand All @@ -619,7 +632,7 @@ where
#[cfg(feature = "llmp_compression")]
let compressed;
#[cfg(feature = "llmp_compression")]
let event_bytes = if flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
compressed = self.compressor.decompress(msg)?;
&compressed
} else {
Expand All @@ -628,12 +641,6 @@ where
let event: Event<S::Input> = postcard::from_bytes(event_bytes)?;
log::debug!("Received event in normal llmp {}", event.name_detailed());

// If the message comes from another machine, do not
// consider other events than new testcase.
if !event.is_new_testcase() && (flags & LLMP_FLAG_FROM_MM == LLMP_FLAG_FROM_MM) {
continue;
}

self.handle_in_client(fuzzer, executor, state, client_id, event)?;
count += 1;
}
Expand Down
Loading
Loading