-
Notifications
You must be signed in to change notification settings - Fork 2.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Lock-free latest_l2_height
in gas price service
#2546
base: chore/add-tests-for-v1-gas-service
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -7,6 +7,10 @@ use fuel_core_services::{ | |||||
}; | ||||||
use std::{ | ||||||
sync::{ | ||||||
atomic::{ | ||||||
AtomicU32, | ||||||
Ordering, | ||||||
}, | ||||||
Arc, | ||||||
Mutex, | ||||||
}, | ||||||
|
@@ -43,7 +47,7 @@ pub struct DaSourceService<Source> { | |||||
poll_interval: Interval, | ||||||
source: Source, | ||||||
shared_state: SharedState, | ||||||
latest_l2_height: Arc<Mutex<BlockHeight>>, | ||||||
latest_l2_height: Arc<AtomicU32>, | ||||||
recorded_height: Option<BlockHeight>, | ||||||
} | ||||||
|
||||||
|
@@ -57,7 +61,7 @@ where | |||||
pub fn new( | ||||||
source: Source, | ||||||
poll_interval: Option<Duration>, | ||||||
latest_l2_height: Arc<Mutex<BlockHeight>>, | ||||||
latest_l2_height: Arc<AtomicU32>, | ||||||
recorded_height: Option<BlockHeight>, | ||||||
) -> Self { | ||||||
let (sender, _) = tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE); | ||||||
|
@@ -77,7 +81,7 @@ where | |||||
pub fn new_with_sender( | ||||||
source: Source, | ||||||
poll_interval: Option<Duration>, | ||||||
latest_l2_height: Arc<Mutex<BlockHeight>>, | ||||||
latest_l2_height: Arc<AtomicU32>, | ||||||
recorded_height: Option<BlockHeight>, | ||||||
sender: Sender<DaBlockCosts>, | ||||||
) -> Self { | ||||||
|
@@ -103,7 +107,7 @@ where | |||||
.filter_costs_that_have_values_greater_than_l2_block_height(da_block_costs)?; | ||||||
tracing::debug!( | ||||||
"the latest l2 height is: {:?}", | ||||||
*self.latest_l2_height.lock().unwrap() | ||||||
self.latest_l2_height.load(Ordering::SeqCst) | ||||||
); | ||||||
for da_block_costs in filtered_block_costs { | ||||||
tracing::debug!("Sending block costs: {:?}", da_block_costs); | ||||||
|
@@ -124,12 +128,9 @@ where | |||||
&self, | ||||||
da_block_costs: Vec<DaBlockCosts>, | ||||||
) -> Result<impl Iterator<Item = DaBlockCosts>> { | ||||||
let latest_l2_height = *self | ||||||
.latest_l2_height | ||||||
.lock() | ||||||
.map_err(|err| anyhow::anyhow!("lock error: {:?}", err))?; | ||||||
let latest_l2_height = self.latest_l2_height.load(Ordering::SeqCst); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
SeqCst is not needed unless you have at least two atomic variables. The purpose of SeqCst is to make sure that different threads do not see updates to different variables in different order. See https://stackoverflow.com/questions/14861822/acquire-release-versus-sequentially-consistent-memory-order for an example. (Just mentioning this, overall I think we can keep SeqCst) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated in 5eeba9e |
||||||
let iter = da_block_costs.into_iter().filter(move |da_block_costs| { | ||||||
let end = BlockHeight::from(*da_block_costs.l2_blocks.end()); | ||||||
let end = *da_block_costs.l2_blocks.end(); | ||||||
end < latest_l2_height | ||||||
}); | ||||||
Ok(iter) | ||||||
|
@@ -206,10 +207,11 @@ where | |||||
} | ||||||
} | ||||||
|
||||||
#[cfg(test)] | ||||||
pub fn new_da_service<S: DaBlockCostsSource>( | ||||||
da_source: S, | ||||||
poll_interval: Option<Duration>, | ||||||
latest_l2_height: Arc<Mutex<BlockHeight>>, | ||||||
latest_l2_height: Arc<AtomicU32>, | ||||||
) -> ServiceRunner<DaSourceService<S>> { | ||||||
ServiceRunner::new(DaSourceService::new( | ||||||
da_source, | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -61,6 +61,10 @@ use futures::FutureExt; | |||||
use std::{ | ||||||
num::NonZeroU64, | ||||||
sync::{ | ||||||
atomic::{ | ||||||
AtomicU32, | ||||||
Ordering, | ||||||
}, | ||||||
Arc, | ||||||
Mutex, | ||||||
}, | ||||||
|
@@ -112,7 +116,7 @@ where | |||||
/// Storage transaction provider for metadata and unrecorded blocks | ||||||
storage_tx_provider: AtomicStorage, | ||||||
/// communicates to the Da source service what the latest L2 block was | ||||||
latest_l2_block: Arc<Mutex<BlockHeight>>, | ||||||
latest_l2_block: Arc<AtomicU32>, | ||||||
} | ||||||
|
||||||
impl<L2, DA, StorageTxProvider> GasPriceServiceV1<L2, DA, StorageTxProvider> | ||||||
|
@@ -131,6 +135,11 @@ where | |||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
#[cfg(test)] | ||||||
pub fn latest_l2_block(&self) -> &AtomicU32 { | ||||||
&self.latest_l2_block | ||||||
} | ||||||
} | ||||||
|
||||||
impl<L2, DA, AtomicStorage> GasPriceServiceV1<L2, DA, AtomicStorage> | ||||||
|
@@ -159,11 +168,7 @@ where | |||||
match block { | ||||||
BlockInfo::GenesisBlock => {} | ||||||
BlockInfo::Block { height, .. } => { | ||||||
let mut latest_l2_block = self | ||||||
.latest_l2_block | ||||||
.lock() | ||||||
.map_err(|err| anyhow!("Error locking latest L2 block: {:?}", err))?; | ||||||
*latest_l2_block = BlockHeight::from(height); | ||||||
self.latest_l2_block.store(height, Ordering::SeqCst); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
(see the corresponding comment on the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated in 5eeba9e |
||||||
} | ||||||
} | ||||||
Ok(()) | ||||||
|
@@ -182,7 +187,7 @@ where | |||||
algorithm_updater: AlgorithmUpdaterV1, | ||||||
da_source_adapter_handle: ServiceRunner<DaSourceService<DA>>, | ||||||
storage_tx_provider: AtomicStorage, | ||||||
latest_l2_block: Arc<Mutex<BlockHeight>>, | ||||||
latest_l2_block: Arc<AtomicU32>, | ||||||
) -> Self { | ||||||
let da_source_channel = da_source_adapter_handle.shared.clone().subscribe(); | ||||||
Self { | ||||||
|
@@ -415,6 +420,7 @@ mod tests { | |||||
use std::{ | ||||||
num::NonZeroU64, | ||||||
sync::{ | ||||||
atomic::AtomicU32, | ||||||
Arc, | ||||||
Mutex, | ||||||
}, | ||||||
|
@@ -525,9 +531,6 @@ mod tests { | |||||
fn database() -> StorageTransaction<InMemoryStorage<GasPriceColumn>> { | ||||||
InMemoryStorage::default().into_transaction() | ||||||
} | ||||||
fn latest_l2_height(height: u32) -> Arc<Mutex<BlockHeight>> { | ||||||
Arc::new(Mutex::new(BlockHeight::new(height))) | ||||||
} | ||||||
|
||||||
#[tokio::test] | ||||||
async fn run__updates_gas_price_with_l2_block_source() { | ||||||
|
@@ -571,20 +574,19 @@ mod tests { | |||||
initialize_algorithm(&config, l2_block_height, &metadata_storage).unwrap(); | ||||||
|
||||||
let notifier = Arc::new(tokio::sync::Notify::new()); | ||||||
let latest_l2_block = Arc::new(Mutex::new(BlockHeight::new(0))); | ||||||
let latest_l2_height = Arc::new(AtomicU32::new(0)); | ||||||
let dummy_da_source = DaSourceService::new( | ||||||
DummyDaBlockCosts::new( | ||||||
Err(anyhow::anyhow!("unused at the moment")), | ||||||
notifier.clone(), | ||||||
), | ||||||
None, | ||||||
latest_l2_block, | ||||||
Arc::clone(&latest_l2_height), | ||||||
None, | ||||||
); | ||||||
let da_service_runner = ServiceRunner::new(dummy_da_source); | ||||||
da_service_runner.start_and_await().await.unwrap(); | ||||||
let latest_gas_price = LatestGasPrice::new(0, 0); | ||||||
let latest_l2_height = latest_l2_height(0); | ||||||
|
||||||
let mut service = GasPriceServiceV1::new( | ||||||
l2_block_source, | ||||||
|
@@ -659,7 +661,7 @@ mod tests { | |||||
algo_updater.last_profit = 10_000; | ||||||
algo_updater.new_scaled_da_gas_price = 10_000_000; | ||||||
|
||||||
let latest_l2_block = latest_l2_height(block_height - 1); | ||||||
let latest_l2_block = Arc::new(AtomicU32::new(block_height - 1)); | ||||||
let notifier = Arc::new(tokio::sync::Notify::new()); | ||||||
let da_source = DaSourceService::new( | ||||||
DummyDaBlockCosts::new( | ||||||
|
@@ -672,7 +674,7 @@ mod tests { | |||||
notifier.clone(), | ||||||
), | ||||||
Some(Duration::from_millis(1)), | ||||||
latest_l2_block.clone(), | ||||||
Arc::clone(&latest_l2_block), | ||||||
None, | ||||||
); | ||||||
let mut watcher = StateWatcher::started(); | ||||||
|
@@ -763,7 +765,7 @@ mod tests { | |||||
algo_updater.last_profit = 10_000; | ||||||
algo_updater.new_scaled_da_gas_price = 10_000_000; | ||||||
|
||||||
let latest_l2_height = latest_l2_height(block_height - 1); | ||||||
let latest_l2_height = Arc::new(AtomicU32::new(block_height - 1)); | ||||||
let notifier = Arc::new(tokio::sync::Notify::new()); | ||||||
let da_source = DaSourceService::new( | ||||||
DummyDaBlockCosts::new( | ||||||
|
@@ -776,7 +778,7 @@ mod tests { | |||||
notifier.clone(), | ||||||
), | ||||||
Some(Duration::from_millis(1)), | ||||||
latest_l2_height.clone(), | ||||||
Arc::clone(&latest_l2_height), | ||||||
None, | ||||||
); | ||||||
let mut watcher = StateWatcher::started(); | ||||||
|
@@ -856,7 +858,7 @@ mod tests { | |||||
|
||||||
let notifier = Arc::new(tokio::sync::Notify::new()); | ||||||
let blob_cost_wei = 9000; | ||||||
let latest_l2_height = latest_l2_height(block_height - 1); | ||||||
let latest_l2_height = Arc::new(AtomicU32::new(block_height - 1)); | ||||||
let da_source = DaSourceService::new( | ||||||
DummyDaBlockCosts::new( | ||||||
Ok(DaBlockCosts { | ||||||
|
@@ -868,7 +870,7 @@ mod tests { | |||||
notifier.clone(), | ||||||
), | ||||||
Some(Duration::from_millis(1)), | ||||||
latest_l2_height.clone(), | ||||||
Arc::clone(&latest_l2_height), | ||||||
None, | ||||||
); | ||||||
let mut watcher = StateWatcher::started(); | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated in 5eeba9e