From 6281e2a9b8bf6e5638946d9444cbfdf1ad0b5780 Mon Sep 17 00:00:00 2001 From: Alexandre Marcireau Date: Mon, 6 Nov 2023 14:55:57 +1100 Subject: [PATCH] Fix a missing flag clear causing a race condition error to incorrectly be raised --- drivers/Cargo.toml | 2 +- drivers/src/devices.rs | 2 +- drivers/src/devices/prophesee_evk3_hd.rs | 2 +- drivers/src/devices/prophesee_evk4.rs | 2 +- drivers/src/lib.rs | 5 ++ drivers/src/usb.rs | 72 +++++++++++++++--------- 6 files changed, 55 insertions(+), 30 deletions(-) diff --git a/drivers/Cargo.toml b/drivers/Cargo.toml index 58aa3c4..a41f468 100644 --- a/drivers/Cargo.toml +++ b/drivers/Cargo.toml @@ -5,7 +5,7 @@ authors = [ "Alexandre Marcireau", ] description = "Neuromorphic devices drivers" -version = "0.8.0" +version = "0.9.0" edition = "2021" license-file = "../LICENSE" homepage = "https://github.com/neuromorphicsystems/neuromorphic-rs/" diff --git a/drivers/src/devices.rs b/drivers/src/devices.rs index dee7475..618d76c 100644 --- a/drivers/src/devices.rs +++ b/drivers/src/devices.rs @@ -273,7 +273,7 @@ macro_rules! register { #[derive(thiserror::Error, Debug, Clone)] pub enum Error { - #[error("{0}")] + #[error(transparent)] Usb(#[from] usb::Error), #[error("{device_type} with serial \"{serial}\" not found")] diff --git a/drivers/src/devices/prophesee_evk3_hd.rs b/drivers/src/devices/prophesee_evk3_hd.rs index f8b30d3..b65bb5a 100644 --- a/drivers/src/devices/prophesee_evk3_hd.rs +++ b/drivers/src/devices/prophesee_evk3_hd.rs @@ -36,7 +36,7 @@ pub struct Configuration { #[derive(thiserror::Error, Debug, Clone)] pub enum Error { - #[error("{0}")] + #[error(transparent)] Usb(#[from] usb::Error), #[error("short write ({requested} bytes requested, {written} bytes written)")] diff --git a/drivers/src/devices/prophesee_evk4.rs b/drivers/src/devices/prophesee_evk4.rs index 3cfa98b..ffa0e2a 100644 --- a/drivers/src/devices/prophesee_evk4.rs +++ b/drivers/src/devices/prophesee_evk4.rs @@ -58,7 +58,7 @@ pub struct Device { #[derive(thiserror::Error, Debug, Clone)] pub enum Error { - #[error("{0}")] + #[error(transparent)] Usb(#[from] usb::Error), #[error("short write ({requested} bytes requested, {written} bytes written)")] diff --git a/drivers/src/lib.rs b/drivers/src/lib.rs index 5005b95..997e4e0 100644 --- a/drivers/src/lib.rs +++ b/drivers/src/lib.rs @@ -15,4 +15,9 @@ pub use crate::devices::Error; pub use crate::devices::Properties; pub use crate::devices::Type; pub use crate::usb::Configuration as UsbConfiguration; + +pub use bincode; +pub use libc; +pub use libusb1_sys; pub use neuromorphic_types as types; +pub use rusb; diff --git a/drivers/src/usb.rs b/drivers/src/usb.rs index 0afa291..5ef0f14 100644 --- a/drivers/src/usb.rs +++ b/drivers/src/usb.rs @@ -17,7 +17,7 @@ impl Configuration { #[derive(thiserror::Error, Debug, Clone)] pub enum Error { - #[error("{0}")] + #[error(transparent)] Rusb(#[from] rusb::Error), #[error("device with serial not found")] @@ -171,8 +171,7 @@ impl Drop for EventLoop { self.running .store(false, std::sync::atomic::Ordering::Release); if let Some(thread) = self.thread.take() { - // unwrap: not joining self - thread.join().unwrap(); + thread.join().expect("event loop joined self"); } } } @@ -184,16 +183,16 @@ enum TransferStatus { Deallocated, } -struct SharedRingContext { +struct RingContext { read: usize, write_range: (usize, usize), transfer_statuses: Vec, buffers: Vec, } -struct RingContext { +struct SharedRingContext { on_error: Box, - shared: std::sync::Mutex, + shared: std::sync::Mutex, shared_condvar: std::sync::Condvar, } @@ -217,7 +216,7 @@ pub struct Ring { active_buffer_view: std::sync::Arc, #[allow(dead_code)] event_loop: std::sync::Arc, - context: std::sync::Arc, + context: std::sync::Arc, } unsafe impl Send for Ring {} @@ -251,7 +250,7 @@ pub struct TransferProperties { } struct TransferContext { - ring: std::sync::Arc, + ring: std::sync::Arc, transfer_index: usize, } @@ -268,8 +267,11 @@ extern "system" fn usb_transfer_callback(transfer_pointer: *mut libusb1_sys::lib let context = unsafe { &mut *(context as *mut TransferContext) }; let mut error = None; { - // unwrap: mutex is not poisonned - let mut shared = context.ring.shared.lock().unwrap(); + let mut shared = context + .ring + .shared + .lock() + .expect("ring context's lock is poisonned"); match shared.transfer_statuses[context.transfer_index] { TransferStatus::Active => match transfer.status { libusb1_sys::constants::LIBUSB_TRANSFER_COMPLETED @@ -477,9 +479,9 @@ impl Ring { for _ in 0..configuration.transfer_queue_size { transfer_statuses.push(TransferStatus::Active); } - let context = std::sync::Arc::new(RingContext { + let context = std::sync::Arc::new(SharedRingContext { on_error: Box::new(on_error), - shared: std::sync::Mutex::new(SharedRingContext { + shared: std::sync::Mutex::new(RingContext { read: buffers.len() - 1, write_range: (0, configuration.transfer_queue_size), transfer_statuses, @@ -490,8 +492,10 @@ impl Ring { let mut transfers: Vec = Vec::new(); transfers.reserve_exact(configuration.transfer_queue_size); { - // unwrap: mutex is not poisonned - let shared = context.shared.lock().unwrap(); + let shared = context + .shared + .lock() + .expect("ring context's lock is poisonned"); for index in 0..configuration.transfer_queue_size { // unsafe: libusb1_sys wrapper let mut transfer = match std::ptr::NonNull::new(unsafe { @@ -615,8 +619,11 @@ impl Ring { 0 => (), submit_transfer_status => { { - // unwrap: mutex is not poisonned - let mut shared = result.context.shared.lock().unwrap(); + let mut shared = result + .context + .shared + .lock() + .expect("ring context's lock is poisonned"); for rest_index in index..result.transfers.len() { // dropping 'result' cancels transfers // mark unscheduled transfers as complete to prevent un-needed cancelling @@ -690,8 +697,11 @@ impl Ring { } let (system_time, slice, read, write_range, ring_length) = { let start = std::time::Instant::now(); - // unwrap: mutex is not poisonned - let mut shared = self.context.shared.lock().unwrap(); + let mut shared = self + .context + .shared + .lock() + .expect("ring context's lock is poisonned"); loop { shared.read = (shared.read + 1) % shared.buffers.len(); while (shared.write_range.1 + shared.buffers.len() - 1 - shared.read) @@ -700,14 +710,15 @@ impl Ring { { let ellapsed = std::time::Instant::now() - start; if ellapsed >= *duration { + self.active_buffer_view + .store(false, std::sync::atomic::Ordering::Release); return None; } - // unwrap: shared_condvar always used with the same mutex (shared) shared = self .context .shared_condvar .wait_timeout(shared, *duration - ellapsed) - .unwrap() + .expect("shared_condar used with two different mutexes") .0; } if shared.buffers[shared.read].length > 0 { @@ -745,8 +756,11 @@ impl Drop for Ring { let before_dealloc_transfers = std::time::Instant::now(); #[cfg(target_os = "macos")] { - // unwrap: mutex is not poisonned - let mut shared = self.context.shared.lock().unwrap(); + let mut shared = self + .context + .shared + .lock() + .expect("ring context's lock is poisonned"); // unsafe: transfer is allocated let _ = unsafe { libusb1_sys::libusb_cancel_transfer(self.transfers[0].as_ptr()) }; for index in 0..self.transfers.len() { @@ -756,8 +770,11 @@ impl Drop for Ring { loop { let mut deallocated_transfers: usize = 0; { - // unwrap: mutex is not poisonned - let mut shared = self.context.shared.lock().unwrap(); + let mut shared = self + .context + .shared + .lock() + .expect("ring context's lock is poisonned"); for index in 0..self.transfers.len() { match shared.transfer_statuses[index] { TransferStatus::Active => { @@ -805,8 +822,11 @@ impl Drop for Ring { std::thread::sleep(std::time::Duration::from_millis(100)); } if dealloc_buffers { - // unwrap: mutex is not poisonned - let shared = self.context.shared.lock().unwrap(); + let shared = self + .context + .shared + .lock() + .expect("ring context's lock is poisonned"); for buffer in shared.buffers.iter() { if buffer.dma { // unsafe: buffer was allocated by libusb with 'capacity' bytes