Skip to content

Commit

Permalink
Fix a missing flag clear causing a race condition error to incorrectl…
Browse files Browse the repository at this point in the history
…y be raised
  • Loading branch information
aMarcireau committed Nov 6, 2023
1 parent de07f84 commit 6281e2a
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 30 deletions.
2 changes: 1 addition & 1 deletion drivers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ authors = [
"Alexandre Marcireau",
]
description = "Neuromorphic devices drivers"
version = "0.8.0"
version = "0.9.0"
edition = "2021"
license-file = "../LICENSE"
homepage = "https://github.com/neuromorphicsystems/neuromorphic-rs/"
Expand Down
2 changes: 1 addition & 1 deletion drivers/src/devices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ macro_rules! register {

#[derive(thiserror::Error, Debug, Clone)]
pub enum Error {
#[error("{0}")]
#[error(transparent)]
Usb(#[from] usb::Error),

#[error("{device_type} with serial \"{serial}\" not found")]
Expand Down
2 changes: 1 addition & 1 deletion drivers/src/devices/prophesee_evk3_hd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub struct Configuration {

#[derive(thiserror::Error, Debug, Clone)]
pub enum Error {
#[error("{0}")]
#[error(transparent)]
Usb(#[from] usb::Error),

#[error("short write ({requested} bytes requested, {written} bytes written)")]
Expand Down
2 changes: 1 addition & 1 deletion drivers/src/devices/prophesee_evk4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub struct Device {

#[derive(thiserror::Error, Debug, Clone)]
pub enum Error {
#[error("{0}")]
#[error(transparent)]
Usb(#[from] usb::Error),

#[error("short write ({requested} bytes requested, {written} bytes written)")]
Expand Down
5 changes: 5 additions & 0 deletions drivers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@ pub use crate::devices::Error;
pub use crate::devices::Properties;
pub use crate::devices::Type;
pub use crate::usb::Configuration as UsbConfiguration;

pub use bincode;
pub use libc;
pub use libusb1_sys;
pub use neuromorphic_types as types;
pub use rusb;
72 changes: 46 additions & 26 deletions drivers/src/usb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl Configuration {

#[derive(thiserror::Error, Debug, Clone)]
pub enum Error {
#[error("{0}")]
#[error(transparent)]
Rusb(#[from] rusb::Error),

#[error("device with serial not found")]
Expand Down Expand Up @@ -171,8 +171,7 @@ impl Drop for EventLoop {
self.running
.store(false, std::sync::atomic::Ordering::Release);
if let Some(thread) = self.thread.take() {
// unwrap: not joining self
thread.join().unwrap();
thread.join().expect("event loop joined self");
}
}
}
Expand All @@ -184,16 +183,16 @@ enum TransferStatus {
Deallocated,
}

struct SharedRingContext {
struct RingContext {
read: usize,
write_range: (usize, usize),
transfer_statuses: Vec<TransferStatus>,
buffers: Vec<Buffer>,
}

struct RingContext {
struct SharedRingContext {
on_error: Box<dyn Fn(Error) + Send + Sync + 'static>,
shared: std::sync::Mutex<SharedRingContext>,
shared: std::sync::Mutex<RingContext>,
shared_condvar: std::sync::Condvar,
}

Expand All @@ -217,7 +216,7 @@ pub struct Ring {
active_buffer_view: std::sync::Arc<std::sync::atomic::AtomicBool>,
#[allow(dead_code)]
event_loop: std::sync::Arc<EventLoop>,
context: std::sync::Arc<RingContext>,
context: std::sync::Arc<SharedRingContext>,
}

unsafe impl Send for Ring {}
Expand Down Expand Up @@ -251,7 +250,7 @@ pub struct TransferProperties {
}

struct TransferContext {
ring: std::sync::Arc<RingContext>,
ring: std::sync::Arc<SharedRingContext>,
transfer_index: usize,
}

Expand All @@ -268,8 +267,11 @@ extern "system" fn usb_transfer_callback(transfer_pointer: *mut libusb1_sys::lib
let context = unsafe { &mut *(context as *mut TransferContext) };
let mut error = None;
{
// unwrap: mutex is not poisonned
let mut shared = context.ring.shared.lock().unwrap();
let mut shared = context
.ring
.shared
.lock()
.expect("ring context's lock is poisonned");
match shared.transfer_statuses[context.transfer_index] {
TransferStatus::Active => match transfer.status {
libusb1_sys::constants::LIBUSB_TRANSFER_COMPLETED
Expand Down Expand Up @@ -477,9 +479,9 @@ impl Ring {
for _ in 0..configuration.transfer_queue_size {
transfer_statuses.push(TransferStatus::Active);
}
let context = std::sync::Arc::new(RingContext {
let context = std::sync::Arc::new(SharedRingContext {
on_error: Box::new(on_error),
shared: std::sync::Mutex::new(SharedRingContext {
shared: std::sync::Mutex::new(RingContext {
read: buffers.len() - 1,
write_range: (0, configuration.transfer_queue_size),
transfer_statuses,
Expand All @@ -490,8 +492,10 @@ impl Ring {
let mut transfers: Vec<LibusbTransfer> = Vec::new();
transfers.reserve_exact(configuration.transfer_queue_size);
{
// unwrap: mutex is not poisonned
let shared = context.shared.lock().unwrap();
let shared = context
.shared
.lock()
.expect("ring context's lock is poisonned");
for index in 0..configuration.transfer_queue_size {
// unsafe: libusb1_sys wrapper
let mut transfer = match std::ptr::NonNull::new(unsafe {
Expand Down Expand Up @@ -615,8 +619,11 @@ impl Ring {
0 => (),
submit_transfer_status => {
{
// unwrap: mutex is not poisonned
let mut shared = result.context.shared.lock().unwrap();
let mut shared = result
.context
.shared
.lock()
.expect("ring context's lock is poisonned");
for rest_index in index..result.transfers.len() {
// dropping 'result' cancels transfers
// mark unscheduled transfers as complete to prevent un-needed cancelling
Expand Down Expand Up @@ -690,8 +697,11 @@ impl Ring {
}
let (system_time, slice, read, write_range, ring_length) = {
let start = std::time::Instant::now();
// unwrap: mutex is not poisonned
let mut shared = self.context.shared.lock().unwrap();
let mut shared = self
.context
.shared
.lock()
.expect("ring context's lock is poisonned");
loop {
shared.read = (shared.read + 1) % shared.buffers.len();
while (shared.write_range.1 + shared.buffers.len() - 1 - shared.read)
Expand All @@ -700,14 +710,15 @@ impl Ring {
{
let ellapsed = std::time::Instant::now() - start;
if ellapsed >= *duration {
self.active_buffer_view
.store(false, std::sync::atomic::Ordering::Release);
return None;
}
// unwrap: shared_condvar always used with the same mutex (shared)
shared = self
.context
.shared_condvar
.wait_timeout(shared, *duration - ellapsed)
.unwrap()
.expect("shared_condar used with two different mutexes")
.0;
}
if shared.buffers[shared.read].length > 0 {
Expand Down Expand Up @@ -745,8 +756,11 @@ impl Drop for Ring {
let before_dealloc_transfers = std::time::Instant::now();
#[cfg(target_os = "macos")]
{
// unwrap: mutex is not poisonned
let mut shared = self.context.shared.lock().unwrap();
let mut shared = self
.context
.shared
.lock()
.expect("ring context's lock is poisonned");
// unsafe: transfer is allocated
let _ = unsafe { libusb1_sys::libusb_cancel_transfer(self.transfers[0].as_ptr()) };
for index in 0..self.transfers.len() {
Expand All @@ -756,8 +770,11 @@ impl Drop for Ring {
loop {
let mut deallocated_transfers: usize = 0;
{
// unwrap: mutex is not poisonned
let mut shared = self.context.shared.lock().unwrap();
let mut shared = self
.context
.shared
.lock()
.expect("ring context's lock is poisonned");
for index in 0..self.transfers.len() {
match shared.transfer_statuses[index] {
TransferStatus::Active => {
Expand Down Expand Up @@ -805,8 +822,11 @@ impl Drop for Ring {
std::thread::sleep(std::time::Duration::from_millis(100));
}
if dealloc_buffers {
// unwrap: mutex is not poisonned
let shared = self.context.shared.lock().unwrap();
let shared = self
.context
.shared
.lock()
.expect("ring context's lock is poisonned");
for buffer in shared.buffers.iter() {
if buffer.dma {
// unsafe: buffer was allocated by libusb with 'capacity' bytes
Expand Down

0 comments on commit 6281e2a

Please sign in to comment.