From 9227b9e0868dc873ee0f564d78086d71ed7a23a1 Mon Sep 17 00:00:00 2001 From: Andreu Botella Date: Sat, 28 Jan 2023 06:13:38 +0100 Subject: [PATCH] chore: Reduce contention when using `EszipV2` from multiple threads The `EszipV2` struct guards the module map behind an `Arc`. This is needed to support streaming sources while returning a result before finishing parsing. However, this means every operation that needs access to the module map has to contend for that same lock. This contention is inevitable while the streaming parsing is going on, but reads also contend with each other. This change avoids this read contention by switching the `Mutex` for a `RwLock`. However, for pending modules, the `get_module_source` and `get_module_source_map` methods add a waker for the current async context to the module's waker list, and this would need some write locking. To avoid this, this change also guards the waker list behind a `Mutex`, which also reduces the scope of that contention by making it specific to a module. With these changes, given a fully-resolved `EszipV2` which isn't modified, all operations on it are now guaranteed to be lock-free. --- src/v2.rs | 41 +++++++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/src/v2.rs b/src/v2.rs index a83ac68..e0410be 100644 --- a/src/v2.rs +++ b/src/v2.rs @@ -4,6 +4,7 @@ use std::future::Future; use std::mem::size_of; use std::sync::Arc; use std::sync::Mutex; +use std::sync::RwLock; use std::task::Poll; use std::task::Waker; @@ -36,7 +37,7 @@ enum HeaderFrameKind { /// source maps. #[derive(Debug, Default)] pub struct EszipV2 { - modules: Arc>>, + modules: Arc>>, pub ordered_modules: Vec, } @@ -57,7 +58,7 @@ pub enum EszipV2SourceSlot { Pending { offset: usize, length: usize, - wakers: Vec, + wakers: Mutex>, }, Ready(Arc>), } @@ -155,7 +156,7 @@ impl EszipV2 { EszipV2SourceSlot::Pending { offset: source_offset as usize, length: source_len as usize, - wakers: vec![], + wakers: Mutex::new(vec![]), } }; let source_map = if source_map_offset == 0 && source_map_len == 0 { @@ -164,7 +165,7 @@ impl EszipV2 { EszipV2SourceSlot::Pending { offset: source_map_offset as usize, length: source_map_len as usize, - wakers: vec![], + wakers: Mutex::new(vec![]), } }; let module = EszipV2Module::Module { @@ -216,7 +217,7 @@ impl EszipV2 { }) .collect::>(); - let modules = Arc::new(Mutex::new(modules)); + let modules = Arc::new(RwLock::new(modules)); let modules_ = modules.clone(); let fut = async move { @@ -246,7 +247,7 @@ impl EszipV2 { read += length + 32; let wakers = { - let mut modules = modules.lock().unwrap(); + let mut modules = modules.write().unwrap(); let module = modules.get_mut(&specifier).expect("module not found"); if let EszipV2Module::Module { source, .. } = module { let slot = std::mem::replace( @@ -255,7 +256,7 @@ impl EszipV2 { ); if let EszipV2SourceSlot::Pending { wakers, .. } = slot { - wakers + wakers.into_inner().unwrap() } else { panic!("already populated source slot"); } @@ -292,7 +293,7 @@ impl EszipV2 { read += length + 32; let wakers = { - let mut modules = modules.lock().unwrap(); + let mut modules = modules.write().unwrap(); let module = modules.get_mut(&specifier).expect("module not found"); if let EszipV2Module::Module { source_map, .. } = module { let slot = std::mem::replace( @@ -301,7 +302,7 @@ impl EszipV2 { ); if let EszipV2SourceSlot::Pending { wakers, .. } = slot { - wakers + wakers.into_inner().unwrap() } else { panic!("already populated source_map slot"); } @@ -338,7 +339,7 @@ impl EszipV2 { source: EszipV2SourceSlot::Ready(source), source_map: EszipV2SourceSlot::Ready(Arc::new(vec![])), }; - let mut modules = self.modules.lock().unwrap(); + let mut modules = self.modules.write().unwrap(); if modules.get(&specifier).is_none() { modules.insert(specifier.clone(), module); } @@ -357,7 +358,7 @@ impl EszipV2 { let mut sources: Vec = Vec::new(); let mut source_maps: Vec = Vec::new(); - let modules = self.modules.lock().unwrap(); + let modules = self.modules.read().unwrap(); let mut ordered_modules = self.ordered_modules.clone(); let seen_modules: HashSet = @@ -601,7 +602,7 @@ impl EszipV2 { } Ok(Self { - modules: Arc::new(Mutex::new(modules)), + modules: Arc::new(RwLock::new(modules)), ordered_modules, }) } @@ -612,7 +613,7 @@ impl EszipV2 { pub fn get_module(&self, specifier: &str) -> Option { let mut specifier = specifier; let mut visited = HashSet::new(); - let modules = self.modules.lock().unwrap(); + let modules = self.modules.read().unwrap(); loop { visited.insert(specifier); let module = modules.get(specifier)?; @@ -642,8 +643,8 @@ impl EszipV2 { specifier: &str, ) -> Arc> { poll_fn(|cx| { - let mut modules = self.modules.lock().unwrap(); - let module = modules.get_mut(specifier).unwrap(); + let modules = self.modules.read().unwrap(); + let module = modules.get(specifier).unwrap(); let slot = match module { EszipV2Module::Module { source, .. } => source, EszipV2Module::Redirect { .. } => { @@ -652,7 +653,7 @@ impl EszipV2 { }; match slot { EszipV2SourceSlot::Pending { wakers, .. } => { - wakers.push(cx.waker().clone()); + wakers.lock().unwrap().push(cx.waker().clone()); Poll::Pending } EszipV2SourceSlot::Ready(bytes) => Poll::Ready(bytes.clone()), @@ -666,8 +667,8 @@ impl EszipV2 { specifier: &str, ) -> Arc> { poll_fn(|cx| { - let mut modules = self.modules.lock().unwrap(); - let module = modules.get_mut(specifier).unwrap(); + let modules = self.modules.read().unwrap(); + let module = modules.get(specifier).unwrap(); let slot = match module { EszipV2Module::Module { source_map, .. } => source_map, EszipV2Module::Redirect { .. } => { @@ -676,7 +677,7 @@ impl EszipV2 { }; match slot { EszipV2SourceSlot::Pending { wakers, .. } => { - wakers.push(cx.waker().clone()); + wakers.lock().unwrap().push(cx.waker().clone()); Poll::Pending } EszipV2SourceSlot::Ready(bytes) => Poll::Ready(bytes.clone()), @@ -686,7 +687,7 @@ impl EszipV2 { } pub fn specifiers(&self) -> Vec { - let modules = self.modules.lock().unwrap(); + let modules = self.modules.read().unwrap(); modules.keys().cloned().collect() } }