From e8ddcc5a17990d4b4a24daa0ef4428ce60f0327e Mon Sep 17 00:00:00 2001 From: Romain Malmain Date: Wed, 9 Oct 2024 16:10:06 +0200 Subject: [PATCH] Multi-machine: make sure only testcases are sent over the wire --- .../sqlite_centralized_multi_machine/build.sh | 8 +- .../run_child.sh | 2 +- .../broker_hooks/centralized_multi_machine.rs | 69 +++++++++------- libafl/src/events/launcher.rs | 3 +- libafl/src/events/llmp/mgr.rs | 39 ++++++---- libafl/src/events/llmp/mod.rs | 78 ++++++++++++------- libafl/src/events/mod.rs | 2 +- libafl_bolts/src/llmp.rs | 8 +- 8 files changed, 123 insertions(+), 86 deletions(-) diff --git a/fuzzers/inprocess/sqlite_centralized_multi_machine/build.sh b/fuzzers/inprocess/sqlite_centralized_multi_machine/build.sh index 7ca12f128f..828995f9e6 100755 --- a/fuzzers/inprocess/sqlite_centralized_multi_machine/build.sh +++ b/fuzzers/inprocess/sqlite_centralized_multi_machine/build.sh @@ -32,11 +32,11 @@ make -j$(nproc) make sqlite3.c popd -if [ "$1" = "release" ]; then - ./target/release/libafl_cc --libafl -I ./sqlite3 -c ./sqlite3/test/ossfuzz.c -o ./sqlite3/test/ossfuzz.o - ./target/release/libafl_cxx --libafl -o ossfuzz ./sqlite3/test/ossfuzz.o ./sqlite3/sqlite3.o -pthread -ldl -lz -else +if [ "$1" = "d" ]; then ./target/debug/libafl_cc --libafl -I ./sqlite3 -c ./sqlite3/test/ossfuzz.c -o ./sqlite3/test/ossfuzz.o ./target/debug/libafl_cxx --libafl -o ossfuzz ./sqlite3/test/ossfuzz.o ./sqlite3/sqlite3.o -pthread -ldl -lz +else + ./target/release/libafl_cc --libafl -I ./sqlite3 -c ./sqlite3/test/ossfuzz.c -o ./sqlite3/test/ossfuzz.o + ./target/release/libafl_cxx --libafl -o ossfuzz ./sqlite3/test/ossfuzz.o ./sqlite3/sqlite3.o -pthread -ldl -lz fi diff --git a/fuzzers/inprocess/sqlite_centralized_multi_machine/run_child.sh b/fuzzers/inprocess/sqlite_centralized_multi_machine/run_child.sh index 5ee94ffb0c..9001d35865 100755 --- a/fuzzers/inprocess/sqlite_centralized_multi_machine/run_child.sh +++ b/fuzzers/inprocess/sqlite_centralized_multi_machine/run_child.sh @@ -1,3 +1,3 @@ #!/bin/bash -./ossfuzz --cores 0-3 --input ./corpus --parent-addr 0.0.0.0:50000 --broker-port 3000 +./ossfuzz --cores 2-3 --input ./corpus --parent-addr 0.0.0.0:50000 --broker-port 3000 diff --git a/libafl/src/events/broker_hooks/centralized_multi_machine.rs b/libafl/src/events/broker_hooks/centralized_multi_machine.rs index 0630f13572..5678e4bb26 100644 --- a/libafl/src/events/broker_hooks/centralized_multi_machine.rs +++ b/libafl/src/events/broker_hooks/centralized_multi_machine.rs @@ -9,7 +9,7 @@ use std::{ #[cfg(feature = "llmp_compression")] use libafl_bolts::llmp::LLMP_FLAG_COMPRESSED; use libafl_bolts::{ - llmp::{Flags, LlmpBrokerInner, LlmpHook, LlmpMsgHookResult, Tag, LLMP_FLAG_FROM_MM}, + llmp::{Flags, LlmpBrokerInner, LlmpHook, LlmpMsgHookResult, Tag, LLMP_FLAG_MM_FORWARD}, ownedref::OwnedRef, shmem::ShMemProvider, ClientId, Error, @@ -30,18 +30,25 @@ use crate::{ inputs::Input, }; -/// Makes a raw pointer send + sync. +/// Makes anything send + sync. +/// It is intended to be used for raw-pointers in very specific scenarios. /// Extremely unsafe to use in general, only use this if you know what you're doing. +/// +/// Some guarantees that should be enforced to use it in general (for raw pointers): +/// - the pointee should be initialized +/// - the pointee should never be written again +/// - the pointee should never be unmapped +/// - the pointee should never be moved #[derive(Debug, Clone, Copy)] -pub struct NullLock { +struct MultiMachineNullLock { value: T, } -unsafe impl Send for NullLock {} -unsafe impl Sync for NullLock {} +unsafe impl Send for MultiMachineNullLock {} +unsafe impl Sync for MultiMachineNullLock {} -impl NullLock { - /// Instantiate a [`NullLock`] +impl MultiMachineNullLock { + /// Instantiate a [`MultiMachineNullLock`] /// /// # Safety /// @@ -50,15 +57,15 @@ impl NullLock { Self { value } } - /// Get a reference to value - pub fn get(&self) -> &T { - &self.value - } + // /// Get a reference to value + // pub fn get(&self) -> &T { + // &self.value + // } - /// Get a mutable reference to value - pub fn get_mut(&mut self) -> &mut T { - &mut self.value - } + // /// Get a mutable reference to value + // pub fn get_mut(&mut self) -> &mut T { + // &mut self.value + // } /// Get back the value pub fn into_innter(self) -> T { @@ -66,8 +73,8 @@ impl NullLock { } } -/// The Receiving side of the multi-machine architecture -/// It is responsible for receiving messages from other neighbours. +/// The sending side of the multi-machine architecture +/// It is responsible for sending messages to other neighbours. /// Please check [`crate::events::multi_machine`] for more information. #[derive(Debug)] pub struct TcpMultiMachineLlmpSenderHook @@ -81,7 +88,7 @@ where phantom: PhantomData, } -/// The Receiving side of the multi-machine architecture +/// The receiving side of the multi-machine architecture /// It is responsible for receiving messages from other neighbours. /// Please check [`crate::events::multi_machine`] for more information. #[derive(Debug)] @@ -164,23 +171,27 @@ where SP: ShMemProvider, I: Input + Send + Sync + 'static, { - /// check for received messages, and forward them alongside the incoming message to inner. + /// send new interesting messages over the wire fn on_new_message( &mut self, _broker_inner: &mut LlmpBrokerInner, _client_id: ClientId, _msg_tag: &mut Tag, - _msg_flags: &mut Flags, + msg_flags: &mut Flags, msg: &mut [u8], _new_msgs: &mut Vec<(Tag, Flags, Vec)>, ) -> Result { + if (*msg_flags & LLMP_FLAG_MM_FORWARD) == Flags(0) { + return Ok(LlmpMsgHookResult::ForwardToClients); + } + let shared_state = self.shared_state.clone(); // # Safety + // // Here, we suppose msg will *never* be written again and will always be available. // Thus, it is safe to handle this in a separate thread. - let msg_lock = unsafe { NullLock::new((msg.as_ptr(), msg.len())) }; - // let flags = msg_flags.clone(); + let msg_lock = unsafe { MultiMachineNullLock::new((msg.as_ptr(), msg.len())) }; let _handle: JoinHandle> = self.rt.spawn(async move { let mut state_wr_lock = shared_state.write().await; @@ -255,22 +266,20 @@ where let msg = msg.into_owned().unwrap().into_vec(); #[cfg(feature = "llmp_compression")] match state_wr_lock.compressor().maybe_compress(msg.as_ref()) { - Some(comp_buf) => Ok(( - _LLMP_TAG_TO_MAIN, - LLMP_FLAG_COMPRESSED | LLMP_FLAG_FROM_MM, - comp_buf, - )), - None => Ok((_LLMP_TAG_TO_MAIN, LLMP_FLAG_FROM_MM, msg)), + Some(comp_buf) => { + Ok((_LLMP_TAG_TO_MAIN, LLMP_FLAG_COMPRESSED, comp_buf)) + } + None => Ok((_LLMP_TAG_TO_MAIN, Flags(0), msg)), } #[cfg(not(feature = "llmp_compression"))] - Ok((_LLMP_TAG_TO_MAIN, LLMP_FLAG_FROM_MM, msg)) + Ok((_LLMP_TAG_TO_MAIN, Flags(0), msg)) } MultiMachineMsg::Event(evt) => { let evt = evt.into_owned().unwrap(); let (inner_flags, buf) = Self::try_compress(&mut state_wr_lock, evt.as_ref())?; - Ok((_LLMP_TAG_TO_MAIN, inner_flags | LLMP_FLAG_FROM_MM, buf)) + Ok((_LLMP_TAG_TO_MAIN, inner_flags, buf)) } }) .collect(); diff --git a/libafl/src/events/launcher.rs b/libafl/src/events/launcher.rs index 95f8abde1f..e298138423 100644 --- a/libafl/src/events/launcher.rs +++ b/libafl/src/events/launcher.rs @@ -747,8 +747,9 @@ where } // Create this after forks, to avoid problems with tokio runtime - + // // # Safety + // // The `multi_machine_receiver_hook` needs messages to outlive the receiver. // The underlying memory region for incoming messages lives longer than the async thread processing them. #[cfg(feature = "multi_machine")] diff --git a/libafl/src/events/llmp/mgr.rs b/libafl/src/events/llmp/mgr.rs index 6e232ad919..f3ff725c59 100644 --- a/libafl/src/events/llmp/mgr.rs +++ b/libafl/src/events/llmp/mgr.rs @@ -8,14 +8,13 @@ use core::{marker::PhantomData, time::Duration}; #[cfg(feature = "std")] use std::net::TcpStream; +#[cfg(feature = "multi_machine")] +use libafl_bolts::llmp::LLMP_FLAG_MM_FORWARD; #[cfg(feature = "llmp_compression")] -use libafl_bolts::{ - compress::GzipCompressor, - llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED}, -}; +use libafl_bolts::{compress::GzipCompressor, llmp::LLMP_FLAG_COMPRESSED}; use libafl_bolts::{ current_time, - llmp::{LlmpClient, LlmpClientDescription, LLMP_FLAG_FROM_MM}, + llmp::{LlmpClient, LlmpClientDescription, LLMP_FLAG_INITIALIZED}, shmem::{NopShMemProvider, ShMemProvider}, tuples::Handle, ClientId, @@ -523,7 +522,12 @@ where event: Event<::Input>, ) -> Result<(), Error> { let serialized = postcard::to_allocvec(&event)?; - let flags = LLMP_FLAG_INITIALIZED; + let mut flags = LLMP_FLAG_INITIALIZED; + + #[cfg(feature = "multi_machine")] + if event.is_new_testcase() { + flags = flags | LLMP_FLAG_MM_FORWARD; + } match self.compressor.maybe_compress(&serialized) { Some(comp_buf) => { @@ -534,7 +538,8 @@ where )?; } None => { - self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?; + self.llmp + .send_buf_with_flags(LLMP_TAG_EVENT_TO_BOTH, flags, &serialized)?; } } self.last_sent = current_time(); @@ -548,8 +553,16 @@ where _state: &mut Self::State, event: Event<::Input>, ) -> Result<(), Error> { + let mut flags = LLMP_FLAG_INITIALIZED; + + #[cfg(feature = "multi_machine")] + if event.is_new_testcase() { + flags = flags | LLMP_FLAG_MM_FORWARD; + } + let serialized = postcard::to_allocvec(&event)?; - self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?; + self.llmp + .send_buf_with_flags(LLMP_TAG_EVENT_TO_BOTH, flags, &serialized)?; Ok(()) } @@ -605,7 +618,7 @@ where // TODO: Get around local event copy by moving handle_in_client let self_id = self.llmp.sender().id(); let mut count = 0; - while let Some((client_id, tag, flags, msg)) = self.llmp.recv_buf_with_flags()? { + while let Some((client_id, tag, _flags, msg)) = self.llmp.recv_buf_with_flags()? { assert!( tag != _LLMP_TAG_EVENT_TO_BROKER, "EVENT_TO_BROKER parcel should not have arrived in the client!" @@ -619,7 +632,7 @@ where #[cfg(feature = "llmp_compression")] let compressed; #[cfg(feature = "llmp_compression")] - let event_bytes = if flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { + let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED { compressed = self.compressor.decompress(msg)?; &compressed } else { @@ -628,12 +641,6 @@ where let event: Event = postcard::from_bytes(event_bytes)?; log::debug!("Received event in normal llmp {}", event.name_detailed()); - // If the message comes from another machine, do not - // consider other events than new testcase. - if !event.is_new_testcase() && (flags & LLMP_FLAG_FROM_MM == LLMP_FLAG_FROM_MM) { - continue; - } - self.handle_in_client(fuzzer, executor, state, client_id, event)?; count += 1; } diff --git a/libafl/src/events/llmp/mod.rs b/libafl/src/events/llmp/mod.rs index bdbe32f4ba..5ba48d9fb6 100644 --- a/libafl/src/events/llmp/mod.rs +++ b/libafl/src/events/llmp/mod.rs @@ -3,13 +3,12 @@ use alloc::{boxed::Box, vec::Vec}; use core::{marker::PhantomData, time::Duration}; +#[cfg(feature = "multi_machine")] +use libafl_bolts::llmp::LLMP_FLAG_MM_FORWARD; #[cfg(feature = "llmp_compression")] +use libafl_bolts::{compress::GzipCompressor, llmp::LLMP_FLAG_COMPRESSED}; use libafl_bolts::{ - compress::GzipCompressor, - llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED}, -}; -use libafl_bolts::{ - llmp::{LlmpClient, LlmpClientDescription, Tag}, + llmp::{LlmpClient, LlmpClientDescription, Tag, LLMP_FLAG_INITIALIZED}, shmem::{NopShMemProvider, ShMemProvider}, ClientId, }; @@ -422,6 +421,8 @@ where return Ok(()); } + let mut flags = LLMP_FLAG_INITIALIZED; + // Filter out non interestign events and convert `NewTestcase` let converted_event = match event { Event::NewTestcase { @@ -434,24 +435,30 @@ where forward_id, #[cfg(all(unix, feature = "std", feature = "multi_machine"))] node_id, - } => Event::NewTestcase { - input: self.converter.as_mut().unwrap().convert(input)?, - client_config, - exit_kind, - corpus_size, - observers_buf, - time, - forward_id, - #[cfg(all(unix, feature = "std", feature = "multi_machine"))] - node_id, - }, + } => { + #[cfg(feature = "multi_machine")] + { + flags = flags | LLMP_FLAG_MM_FORWARD; + } + + Event::NewTestcase { + input: self.converter.as_mut().unwrap().convert(input)?, + client_config, + exit_kind, + corpus_size, + observers_buf, + time, + forward_id, + #[cfg(all(unix, feature = "std", feature = "multi_machine"))] + node_id, + } + } Event::CustomBuf { buf, tag } => Event::CustomBuf { buf, tag }, _ => { return Ok(()); } }; let serialized = postcard::to_allocvec(&converted_event)?; - let flags = LLMP_FLAG_INITIALIZED; match self.compressor.maybe_compress(&serialized) { Some(comp_buf) => { @@ -462,7 +469,8 @@ where )?; } None => { - self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?; + self.llmp + .send_buf_with_flags(LLMP_TAG_EVENT_TO_BOTH, flags, &serialized)?; } } self.last_sent = libafl_bolts::current_time(); @@ -479,6 +487,8 @@ where return Ok(()); } + let mut flags = LLMP_FLAG_INITIALIZED; + // Filter out non interestign events and convert `NewTestcase` let converted_event = match event { Event::NewTestcase { @@ -491,24 +501,32 @@ where forward_id, #[cfg(all(unix, feature = "std", feature = "multi_machine"))] node_id, - } => Event::NewTestcase { - input: self.converter.as_mut().unwrap().convert(input)?, - client_config, - exit_kind, - corpus_size, - observers_buf, - time, - forward_id, - #[cfg(all(unix, feature = "std", feature = "multi_machine"))] - node_id, - }, + } => { + #[cfg(feature = "multi_machine")] + { + flags = flags | LLMP_FLAG_MM_FORWARD; + } + + Event::NewTestcase { + input: self.converter.as_mut().unwrap().convert(input)?, + client_config, + exit_kind, + corpus_size, + observers_buf, + time, + forward_id, + #[cfg(all(unix, feature = "std", feature = "multi_machine"))] + node_id, + } + } Event::CustomBuf { buf, tag } => Event::CustomBuf { buf, tag }, _ => { return Ok(()); } }; let serialized = postcard::to_allocvec(&converted_event)?; - self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?; + self.llmp + .send_buf_with_flags(LLMP_TAG_EVENT_TO_BOTH, flags, &serialized)?; Ok(()) } } diff --git a/libafl/src/events/mod.rs b/libafl/src/events/mod.rs index abe1e9494a..4d640d8e11 100644 --- a/libafl/src/events/mod.rs +++ b/libafl/src/events/mod.rs @@ -488,7 +488,7 @@ where let executions = *state.executions(); let cur = current_time(); - // Default no introspection implmentation + // Default no introspection implementation #[cfg(not(feature = "introspection"))] self.fire( state, diff --git a/libafl_bolts/src/llmp.rs b/libafl_bolts/src/llmp.rs index d29763bdcd..1ce4b87fab 100644 --- a/libafl_bolts/src/llmp.rs +++ b/libafl_bolts/src/llmp.rs @@ -143,8 +143,8 @@ pub const LLMP_FLAG_INITIALIZED: Flags = Flags(0x0); pub const LLMP_FLAG_COMPRESSED: Flags = Flags(0x1); /// From another broker. pub const LLMP_FLAG_FROM_B2B: Flags = Flags(0x2); -/// From another machine (with the `multi_machine` mode) -pub const LLMP_FLAG_FROM_MM: Flags = Flags(0x4); +/// The message should be forwarded to other machines, and discarded early otherwise +pub const LLMP_FLAG_MM_FORWARD: Flags = Flags(0x4); /// Timt the broker 2 broker connection waits for incoming data, /// before checking for own data to forward again. @@ -2739,7 +2739,7 @@ where new_page.mark_safe_to_unmap(); let _new_client = self.inner.add_client(LlmpReceiver { - id: ClientId(0), // will be auto-filled + id: ClientId(0), // will be autofilled current_recv_shmem: new_page, last_msg_recvd: ptr::null_mut(), shmem_provider: self.inner.shmem_provider.clone(), @@ -2776,7 +2776,9 @@ where let msg_buf = (*msg).try_as_slice_mut(map)?; // The message is not specifically for use. Let the user handle it, then forward it to the clients, if necessary. + // TODO: use a cached vector here let mut new_msgs: Vec<(Tag, Flags, Vec)> = Vec::new(); + if let LlmpMsgHookResult::ForwardToClients = self.hooks.on_new_message_all( &mut self.inner, client_id,