From 91ca09e4f011951b543cec10925c0c448df337e2 Mon Sep 17 00:00:00 2001 From: Aaryamann Challani <43716372+rymnc@users.noreply.github.com> Date: Tue, 14 Jan 2025 11:35:24 +0530 Subject: [PATCH] chore(gas_price_service): lock-free latest_l2_height MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: RafaƂ Chabowski --- .../src/v1/da_source_service.rs | 15 +++---- .../src/v1/da_source_service/service.rs | 23 ++++++----- .../gas_price_service/src/v1/service.rs | 40 ++++++++++--------- .../gas_price_service/src/v1/tests.rs | 33 ++++++++------- .../src/v1/uninitialized_task.rs | 15 ++++--- 5 files changed, 67 insertions(+), 59 deletions(-) diff --git a/crates/services/gas_price_service/src/v1/da_source_service.rs b/crates/services/gas_price_service/src/v1/da_source_service.rs index 2108f7e10e..37b3f304ce 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service.rs @@ -37,16 +37,13 @@ mod tests { use fuel_core_types::fuel_types::BlockHeight; use std::{ sync::{ + atomic::AtomicU32, Arc, Mutex, }, time::Duration, }; - fn latest_l2_height(height: u32) -> Arc> { - Arc::new(Mutex::new(BlockHeight::new(height))) - } - #[tokio::test] async fn run__when_da_block_cost_source_gives_value_shared_state_is_updated() { // given @@ -59,7 +56,7 @@ mod tests { let notifier = Arc::new(tokio::sync::Notify::new()); let da_block_costs_source = DummyDaBlockCosts::new(Ok(expected_da_cost.clone()), notifier.clone()); - let latest_l2_height = Arc::new(Mutex::new(BlockHeight::new(10u32))); + let latest_l2_height = Arc::new(AtomicU32::new(10u32)); let service = new_da_service( da_block_costs_source, Some(Duration::from_millis(1)), @@ -83,7 +80,7 @@ mod tests { let notifier = Arc::new(tokio::sync::Notify::new()); let da_block_costs_source = DummyDaBlockCosts::new(Err(anyhow::anyhow!("boo!")), notifier.clone()); - let latest_l2_height = latest_l2_height(0); + let latest_l2_height = Arc::new(AtomicU32::new(0)); let service = new_da_service( da_block_costs_source, Some(Duration::from_millis(1)), @@ -116,7 +113,7 @@ mod tests { let notifier = Arc::new(tokio::sync::Notify::new()); let da_block_costs_source = DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone()); - let latest_l2_height = latest_l2_height(l2_height); + let latest_l2_height = Arc::new(AtomicU32::new(l2_height)); let service = new_da_service( da_block_costs_source, Some(Duration::from_millis(1)), @@ -147,7 +144,7 @@ mod tests { let notifier = Arc::new(tokio::sync::Notify::new()); let da_block_costs_source = DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone()); - let latest_l2_height = latest_l2_height(l2_height); + let latest_l2_height = Arc::new(AtomicU32::new(l2_height)); let mut service = DaSourceService::new( da_block_costs_source, Some(Duration::from_millis(1)), @@ -178,7 +175,7 @@ mod tests { let notifier = Arc::new(tokio::sync::Notify::new()); let da_block_costs_source = DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone()); - let latest_l2_height = latest_l2_height(l2_height); + let latest_l2_height = Arc::new(AtomicU32::new(l2_height)); let (sender, mut receiver) = tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE); let mut service = DaSourceService::new_with_sender( diff --git a/crates/services/gas_price_service/src/v1/da_source_service/service.rs b/crates/services/gas_price_service/src/v1/da_source_service/service.rs index 6a65ec8252..cadee929fd 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service/service.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service/service.rs @@ -7,6 +7,10 @@ use fuel_core_services::{ }; use std::{ sync::{ + atomic::{ + AtomicU32, + Ordering, + }, Arc, Mutex, }, @@ -43,7 +47,7 @@ pub struct DaSourceService { poll_interval: Interval, source: Source, shared_state: SharedState, - latest_l2_height: Arc>, + latest_l2_height: Arc, recorded_height: Option, } @@ -57,7 +61,7 @@ where pub fn new( source: Source, poll_interval: Option, - latest_l2_height: Arc>, + latest_l2_height: Arc, recorded_height: Option, ) -> Self { let (sender, _) = tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE); @@ -77,7 +81,7 @@ where pub fn new_with_sender( source: Source, poll_interval: Option, - latest_l2_height: Arc>, + latest_l2_height: Arc, recorded_height: Option, sender: Sender, ) -> Self { @@ -103,7 +107,7 @@ where .filter_costs_that_have_values_greater_than_l2_block_height(da_block_costs)?; tracing::debug!( "the latest l2 height is: {:?}", - *self.latest_l2_height.lock().unwrap() + self.latest_l2_height.load(Ordering::Acquire) ); for da_block_costs in filtered_block_costs { tracing::debug!("Sending block costs: {:?}", da_block_costs); @@ -124,12 +128,9 @@ where &self, da_block_costs: Vec, ) -> Result> { - let latest_l2_height = *self - .latest_l2_height - .lock() - .map_err(|err| anyhow::anyhow!("lock error: {:?}", err))?; + let latest_l2_height = self.latest_l2_height.load(Ordering::Acquire); let iter = da_block_costs.into_iter().filter(move |da_block_costs| { - let end = BlockHeight::from(*da_block_costs.l2_blocks.end()); + let end = *da_block_costs.l2_blocks.end(); end < latest_l2_height }); Ok(iter) @@ -206,11 +207,11 @@ where } } -#[cfg(feature = "test-helpers")] +#[cfg(any(test, feature = "test-helpers"))] pub fn new_da_service( da_source: S, poll_interval: Option, - latest_l2_height: Arc>, + latest_l2_height: Arc, ) -> ServiceRunner> { ServiceRunner::new(DaSourceService::new( da_source, diff --git a/crates/services/gas_price_service/src/v1/service.rs b/crates/services/gas_price_service/src/v1/service.rs index 82a109227b..98d647a98b 100644 --- a/crates/services/gas_price_service/src/v1/service.rs +++ b/crates/services/gas_price_service/src/v1/service.rs @@ -61,6 +61,10 @@ use futures::FutureExt; use std::{ num::NonZeroU64, sync::{ + atomic::{ + AtomicU32, + Ordering, + }, Arc, Mutex, }, @@ -112,7 +116,7 @@ where /// Storage transaction provider for metadata and unrecorded blocks storage_tx_provider: AtomicStorage, /// communicates to the Da source service what the latest L2 block was - latest_l2_block: Arc>, + latest_l2_block: Arc, } impl GasPriceServiceV1 @@ -131,6 +135,11 @@ where } } } + + #[cfg(test)] + pub fn latest_l2_block(&self) -> &AtomicU32 { + &self.latest_l2_block + } } impl GasPriceServiceV1 @@ -159,11 +168,7 @@ where match block { BlockInfo::GenesisBlock => {} BlockInfo::Block { height, .. } => { - let mut latest_l2_block = self - .latest_l2_block - .lock() - .map_err(|err| anyhow!("Error locking latest L2 block: {:?}", err))?; - *latest_l2_block = BlockHeight::from(height); + self.latest_l2_block.store(height, Ordering::Release); } } Ok(()) @@ -182,7 +187,7 @@ where algorithm_updater: AlgorithmUpdaterV1, da_source_adapter_handle: ServiceRunner>, storage_tx_provider: AtomicStorage, - latest_l2_block: Arc>, + latest_l2_block: Arc, ) -> Self { let da_source_channel = da_source_adapter_handle.shared.clone().subscribe(); Self { @@ -415,6 +420,7 @@ mod tests { use std::{ num::NonZeroU64, sync::{ + atomic::AtomicU32, Arc, Mutex, }, @@ -526,9 +532,6 @@ mod tests { fn database() -> StorageTransaction> { InMemoryStorage::default().into_transaction() } - fn latest_l2_height(height: u32) -> Arc> { - Arc::new(Mutex::new(BlockHeight::new(height))) - } #[tokio::test] async fn run__updates_gas_price_with_l2_block_source() { @@ -577,20 +580,19 @@ mod tests { .unwrap(); let notifier = Arc::new(tokio::sync::Notify::new()); - let latest_l2_block = Arc::new(Mutex::new(BlockHeight::new(0))); + let latest_l2_height = Arc::new(AtomicU32::new(0)); let dummy_da_source = DaSourceService::new( DummyDaBlockCosts::new( Err(anyhow::anyhow!("unused at the moment")), notifier.clone(), ), None, - latest_l2_block, + Arc::clone(&latest_l2_height), None, ); let da_service_runner = ServiceRunner::new(dummy_da_source); da_service_runner.start_and_await().await.unwrap(); let latest_gas_price = LatestGasPrice::new(0, 0); - let latest_l2_height = latest_l2_height(0); let mut service = GasPriceServiceV1::new( l2_block_source, @@ -665,7 +667,7 @@ mod tests { algo_updater.last_profit = 10_000; algo_updater.new_scaled_da_gas_price = 10_000_000; - let latest_l2_block = latest_l2_height(block_height - 1); + let latest_l2_block = Arc::new(AtomicU32::new(block_height - 1)); let notifier = Arc::new(tokio::sync::Notify::new()); let da_source = DaSourceService::new( DummyDaBlockCosts::new( @@ -678,7 +680,7 @@ mod tests { notifier.clone(), ), Some(Duration::from_millis(1)), - latest_l2_block.clone(), + Arc::clone(&latest_l2_block), None, ); let mut watcher = StateWatcher::started(); @@ -767,7 +769,7 @@ mod tests { algo_updater.last_profit = 10_000; algo_updater.new_scaled_da_gas_price = 10_000_000; - let latest_l2_height = latest_l2_height(block_height - 1); + let latest_l2_height = Arc::new(AtomicU32::new(block_height - 1)); let notifier = Arc::new(tokio::sync::Notify::new()); let da_source = DaSourceService::new( DummyDaBlockCosts::new( @@ -780,7 +782,7 @@ mod tests { notifier.clone(), ), Some(Duration::from_millis(1)), - latest_l2_height.clone(), + Arc::clone(&latest_l2_height), None, ); let mut watcher = StateWatcher::started(); @@ -859,7 +861,7 @@ mod tests { let notifier = Arc::new(tokio::sync::Notify::new()); let blob_cost_wei = 9000; - let latest_l2_height = latest_l2_height(block_height - 1); + let latest_l2_height = Arc::new(AtomicU32::new(block_height - 1)); let da_source = DaSourceService::new( DummyDaBlockCosts::new( Ok(DaBlockCosts { @@ -871,7 +873,7 @@ mod tests { notifier.clone(), ), Some(Duration::from_millis(1)), - latest_l2_height.clone(), + Arc::clone(&latest_l2_height), None, ); let mut watcher = StateWatcher::started(); diff --git a/crates/services/gas_price_service/src/v1/tests.rs b/crates/services/gas_price_service/src/v1/tests.rs index 8f208692c7..73a6165a55 100644 --- a/crates/services/gas_price_service/src/v1/tests.rs +++ b/crates/services/gas_price_service/src/v1/tests.rs @@ -110,6 +110,10 @@ use std::{ num::NonZeroU64, ops::Deref, sync::{ + atomic::{ + AtomicU32, + Ordering, + }, Arc, Mutex, }, @@ -358,9 +362,6 @@ fn gas_price_database_with_metadata( tx.commit().unwrap(); db } -fn latest_l2_height(height: u32) -> Arc> { - Arc::new(Mutex::new(BlockHeight::new(height))) -} #[tokio::test] async fn next_gas_price__affected_by_new_l2_block() { @@ -385,8 +386,9 @@ async fn next_gas_price__affected_by_new_l2_block() { let (algo_updater, shared_algo) = initialize_algorithm(&config, height, height, &metadata_storage).unwrap(); let da_source = FakeDABlockCost::never_returns(); - let latest_l2_height = latest_l2_height(0); - let da_service_runner = new_da_service(da_source, None, latest_l2_height.clone()); + let latest_l2_height = Arc::new(AtomicU32::new(0)); + let da_service_runner = + new_da_service(da_source, None, Arc::clone(&latest_l2_height)); da_service_runner.start_and_await().await.unwrap(); let latest_gas_price = LatestGasPrice::new(0, 0); @@ -436,8 +438,9 @@ async fn run__new_l2_block_saves_old_metadata() { let algo_updater = updater_from_config(&config, 0); let shared_algo = SharedV1Algorithm::new_with_algorithm(algo_updater.algorithm()); let da_source = FakeDABlockCost::never_returns(); - let latest_l2_height = latest_l2_height(0); - let da_service_runner = new_da_service(da_source, None, latest_l2_height.clone()); + let latest_l2_height = Arc::new(AtomicU32::new(0)); + let da_service_runner = + new_da_service(da_source, None, Arc::clone(&latest_l2_height)); da_service_runner.start_and_await().await.unwrap(); let latest_gas_price = LatestGasPrice::new(0, 0); let mut service = GasPriceServiceV1::new( @@ -490,8 +493,9 @@ async fn run__new_l2_block_updates_latest_gas_price_arc() { let algo_updater = updater_from_config(&config, 0); let shared_algo = SharedV1Algorithm::new_with_algorithm(algo_updater.algorithm()); let da_source = FakeDABlockCost::never_returns(); - let latest_l2_height = latest_l2_height(0); - let da_service_runner = new_da_service(da_source, None, latest_l2_height.clone()); + let latest_l2_height = Arc::new(AtomicU32::new(0)); + let da_service_runner = + new_da_service(da_source, None, Arc::clone(&latest_l2_height)); let latest_gas_price = LatestGasPrice::new(0, 0); let mut service = GasPriceServiceV1::new( l2_block_source, @@ -540,9 +544,10 @@ async fn run__updates_da_service_latest_l2_height() { algo_updater.l2_block_height = l2_height - 1; let shared_algo = SharedV1Algorithm::new_with_algorithm(algo_updater.algorithm()); let da_source = FakeDABlockCost::never_returns(); - let latest_l2_height = latest_l2_height(0); + let latest_l2_height = Arc::new(AtomicU32::new(0)); let latest_gas_price = LatestGasPrice::new(0, 0); - let da_service_runner = new_da_service(da_source, None, latest_l2_height.clone()); + let da_service_runner = + new_da_service(da_source, None, Arc::clone(&latest_l2_height)); da_service_runner.start_and_await().await.unwrap(); let mut service = GasPriceServiceV1::new( l2_block_source, @@ -551,7 +556,7 @@ async fn run__updates_da_service_latest_l2_height() { algo_updater, da_service_runner, inner, - latest_l2_height.clone(), + latest_l2_height, ); let mut watcher = StateWatcher::started(); @@ -560,8 +565,8 @@ async fn run__updates_da_service_latest_l2_height() { let _ = service.run(&mut watcher).await; // then - let latest_value = *latest_l2_height.lock().unwrap(); - assert_eq!(*latest_value, l2_height); + let latest_value = service.latest_l2_block().load(Ordering::SeqCst); + assert_eq!(latest_value, l2_height); } #[derive(Clone)] diff --git a/crates/services/gas_price_service/src/v1/uninitialized_task.rs b/crates/services/gas_price_service/src/v1/uninitialized_task.rs index af6e1925a4..99bbaa0227 100644 --- a/crates/services/gas_price_service/src/v1/uninitialized_task.rs +++ b/crates/services/gas_price_service/src/v1/uninitialized_task.rs @@ -82,7 +82,10 @@ use fuel_gas_price_algorithm::v1::{ UnrecordedBlocks, }; use std::{ - sync::Arc, + sync::{ + atomic::AtomicU32, + Arc, + }, time::Duration, }; @@ -194,12 +197,12 @@ where let recorded_height = self.gas_price_db.get_recorded_height()?; let poll_duration = self.config.da_poll_interval; - let latest_l2_height = - Arc::new(std::sync::Mutex::new(BlockHeight::new(latest_block_height))); + let latest_l2_height = Arc::new(AtomicU32::new(latest_block_height)); + let da_service = DaSourceService::new( self.da_source, poll_duration, - latest_l2_height.clone(), + Arc::clone(&latest_l2_height), recorded_height, ); let da_service_runner = ServiceRunner::new(da_service); @@ -213,7 +216,7 @@ where self.algo_updater, da_service_runner, self.gas_price_db, - latest_l2_height, + Arc::clone(&latest_l2_height), ); Ok(service) } else { @@ -236,7 +239,7 @@ where self.algo_updater, da_service_runner, self.gas_price_db, - latest_l2_height, + Arc::clone(&latest_l2_height), ); Ok(service) }