diff --git a/CHANGELOG.md b/CHANGELOG.md index 48b6d73162d..d012d84ef53 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] ### Added +- [2442](https://github.com/FuelLabs/fuel-core/pull/2442): Add uninitialized task for V1 gas price service - [2154](https://github.com/FuelLabs/fuel-core/pull/2154): Added `Unknown` variant to `ConsensusParameters` graphql queries - [2154](https://github.com/FuelLabs/fuel-core/pull/2154): Added `Unknown` variant to `Block` graphql queries - [2154](https://github.com/FuelLabs/fuel-core/pull/2154): Added `TransactionType` type in `fuel-client` diff --git a/crates/fuel-gas-price-algorithm/src/v1.rs b/crates/fuel-gas-price-algorithm/src/v1.rs index 09cddbb42ab..2464f196a09 100644 --- a/crates/fuel-gas-price-algorithm/src/v1.rs +++ b/crates/fuel-gas-price-algorithm/src/v1.rs @@ -16,7 +16,7 @@ pub enum Error { #[error("Could not calculate cost per byte: {bytes:?} bytes, {cost:?} cost")] CouldNotCalculateCostPerByte { bytes: u128, cost: u128 }, #[error("Failed to include L2 block data: {0}")] - FailedTooIncludeL2BlockData(String), + FailedToIncludeL2BlockData(String), #[error("L2 block expected but not found in unrecorded blocks: {height}")] L2BlockExpectedNotFound { height: u32 }, } diff --git a/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs b/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs index e81ea58b2c3..4d452421ba1 100644 --- a/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs +++ b/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs @@ -111,7 +111,7 @@ pub fn get_block_info( Ok(info) } -fn mint_values(block: &Block) -> GasPriceResult<(u64, u64)> { +pub(crate) fn mint_values(block: &Block) -> GasPriceResult<(u64, u64)> { let mint = block .transactions() .last() @@ -121,6 +121,13 @@ fn mint_values(block: &Block) -> GasPriceResult<(u64, u64)> { })?; Ok((*mint.mint_amount(), *mint.gas_price())) } + +// TODO: Don't take a direct dependency on `Postcard` as it's not guaranteed to be the encoding format +// https://github.com/FuelLabs/fuel-core/issues/2443 +pub(crate) fn block_bytes(block: &Block) -> u64 { + Postcard::encode(block).len() as u64 +} + fn block_used_gas( fee: u64, gas_price: u64, diff --git a/crates/services/gas_price_service/src/v0/service.rs b/crates/services/gas_price_service/src/v0/service.rs index 7591c8a377b..557d8cd6842 100644 --- a/crates/services/gas_price_service/src/v0/service.rs +++ b/crates/services/gas_price_service/src/v0/service.rs @@ -141,6 +141,7 @@ where Metadata: MetadataStorage, { async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { + tracing::trace!("Call of `run` function of the gas price service v0"); tokio::select! { biased; _ = watcher.while_started() => { @@ -148,6 +149,7 @@ where TaskNextAction::Stop } l2_block_res = self.l2_block_source.get_l2_block() => { + tracing::debug!("Received L2 block"); let res = self.process_l2_block_res(l2_block_res).await; TaskNextAction::always_continue(res) } diff --git a/crates/services/gas_price_service/src/v0/tests.rs b/crates/services/gas_price_service/src/v0/tests.rs index 8a224a482c6..294c5029a4d 100644 --- a/crates/services/gas_price_service/src/v0/tests.rs +++ b/crates/services/gas_price_service/src/v0/tests.rs @@ -167,14 +167,16 @@ async fn next_gas_price__affected_by_new_l2_block() { algo_updater, ); + tracing::debug!("service created"); + let read_algo = service.next_block_algorithm(); let initial = read_algo.next_gas_price(); - let mut watcher = StateWatcher::default(); + let mut watcher = StateWatcher::started(); + tokio::spawn(async move { service.run(&mut watcher).await }); // when - service.run(&mut watcher).await; l2_block_sender.send(l2_block).await.unwrap(); - service.shutdown().await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; // then let new = read_algo.next_gas_price(); @@ -212,18 +214,16 @@ async fn next__new_l2_block_saves_old_metadata() { algo_updater, ); - // when - let read_algo = service.next_block_algorithm(); - let mut watcher = StateWatcher::default(); - let start = read_algo.next_gas_price(); + let mut watcher = StateWatcher::started(); + tokio::spawn(async move { service.run(&mut watcher).await }); - service.run(&mut watcher).await; + // when l2_block_sender.send(l2_block).await.unwrap(); - service.shutdown().await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; // then - let new = read_algo.next_gas_price(); - assert_ne!(start, new); + let metadata_has_been_updated = metadata_inner.lock().unwrap().is_some(); + assert!(metadata_has_been_updated); } #[derive(Clone)] diff --git a/crates/services/gas_price_service/src/v1.rs b/crates/services/gas_price_service/src/v1.rs index fafb7245ef8..c0e83dbb6e7 100644 --- a/crates/services/gas_price_service/src/v1.rs +++ b/crates/services/gas_price_service/src/v1.rs @@ -2,3 +2,7 @@ pub mod algorithm; pub mod da_source_service; pub mod metadata; pub mod service; + +#[cfg(test)] +mod tests; +pub mod uninitialized_task; diff --git a/crates/services/gas_price_service/src/v1/metadata.rs b/crates/services/gas_price_service/src/v1/metadata.rs index 68253981eec..54000623bac 100644 --- a/crates/services/gas_price_service/src/v1/metadata.rs +++ b/crates/services/gas_price_service/src/v1/metadata.rs @@ -92,7 +92,11 @@ impl From<&V1AlgorithmConfig> for AlgorithmUpdaterV1 { .map(|size| u128::from(*size)) .sum(); Self { - new_scaled_exec_price: value.new_exec_gas_price, + // TODO:We don't need this after we implement + // https://github.com/FuelLabs/fuel-core/issues/2481 + new_scaled_exec_price: value + .new_exec_gas_price + .saturating_mul(value.gas_price_factor.get()), l2_block_height: 0, new_scaled_da_gas_price: value.min_da_gas_price, gas_price_factor: value.gas_price_factor, diff --git a/crates/services/gas_price_service/src/v1/service.rs b/crates/services/gas_price_service/src/v1/service.rs index 32baf69618b..f95649144a3 100644 --- a/crates/services/gas_price_service/src/v1/service.rs +++ b/crates/services/gas_price_service/src/v1/service.rs @@ -254,7 +254,7 @@ where async fn shutdown(mut self) -> anyhow::Result<()> { // handle all the remaining l2 blocks while let Some(Ok(block)) = self.l2_block_source.get_l2_block().now_or_never() { - tracing::debug!("Updating gas price algorithm"); + tracing::debug!("Updating gas price algorithm before shutdown"); self.apply_block_info_to_gas_algorithm(block).await?; } diff --git a/crates/services/gas_price_service/src/v1/tests.rs b/crates/services/gas_price_service/src/v1/tests.rs new file mode 100644 index 00000000000..e5188a9c869 --- /dev/null +++ b/crates/services/gas_price_service/src/v1/tests.rs @@ -0,0 +1,481 @@ +#![allow(non_snake_case)] +use crate::{ + common::{ + fuel_core_storage_adapter::{ + GasPriceSettings, + GasPriceSettingsProvider, + }, + l2_block_source::L2BlockSource, + updater_metadata::UpdaterMetadata, + utils::{ + BlockInfo, + Error as GasPriceError, + Result as GasPriceResult, + }, + }, + ports::{ + GasPriceData, + L2Data, + MetadataStorage, + }, + v1::{ + da_source_service::{ + service::{ + DaBlockCostsSource, + DaSourceService, + }, + DaBlockCosts, + }, + metadata::{ + V1AlgorithmConfig, + V1Metadata, + }, + service::{ + initialize_algorithm, + GasPriceServiceV1, + }, + uninitialized_task::UninitializedTask, + }, +}; +use anyhow::anyhow; +use fuel_core_services::{ + stream::{ + BoxStream, + IntoBoxStream, + }, + RunnableTask, + StateWatcher, +}; +use fuel_core_storage::{ + transactional::AtomicView, + Result as StorageResult, +}; +use fuel_core_types::{ + blockchain::{ + block::Block, + header::ConsensusParametersVersion, + }, + fuel_tx::Transaction, + fuel_types::BlockHeight, + services::block_importer::{ + ImportResult, + SharedImportResult, + }, +}; +use fuel_gas_price_algorithm::v1::AlgorithmUpdaterV1; +use std::{ + num::NonZeroU64, + ops::Deref, + sync::Arc, +}; +use tokio::sync::mpsc::Receiver; + +struct FakeL2BlockSource { + l2_block: Receiver, +} + +#[async_trait::async_trait] +impl L2BlockSource for FakeL2BlockSource { + async fn get_l2_block(&mut self) -> GasPriceResult { + let block = self.l2_block.recv().await.unwrap(); + Ok(block) + } +} + +struct FakeMetadata { + inner: Arc>>, +} + +impl FakeMetadata { + fn empty() -> Self { + Self { + inner: Arc::new(std::sync::Mutex::new(None)), + } + } +} + +impl MetadataStorage for FakeMetadata { + fn get_metadata(&self, _: &BlockHeight) -> GasPriceResult> { + let metadata = self.inner.lock().unwrap().clone(); + Ok(metadata) + } + + fn set_metadata(&mut self, metadata: &UpdaterMetadata) -> GasPriceResult<()> { + *self.inner.lock().unwrap() = Some(metadata.clone()); + Ok(()) + } +} + +struct ErroringMetadata; + +impl MetadataStorage for ErroringMetadata { + fn get_metadata(&self, _: &BlockHeight) -> GasPriceResult> { + Err(GasPriceError::CouldNotFetchMetadata { + source_error: anyhow!("boo!"), + }) + } + + fn set_metadata(&mut self, _: &UpdaterMetadata) -> GasPriceResult<()> { + Err(GasPriceError::CouldNotSetMetadata { + block_height: Default::default(), + source_error: anyhow!("boo!"), + }) + } +} + +struct FakeDABlockCost { + da_block_costs: Receiver, +} + +impl FakeDABlockCost { + fn never_returns() -> Self { + let (_sender, receiver) = tokio::sync::mpsc::channel(1); + Self { + da_block_costs: receiver, + } + } + + fn new(da_block_costs: Receiver) -> Self { + Self { da_block_costs } + } +} + +#[async_trait::async_trait] +impl DaBlockCostsSource for FakeDABlockCost { + async fn request_da_block_cost(&mut self) -> anyhow::Result { + let costs = self.da_block_costs.recv().await.unwrap(); + Ok(costs) + } +} + +fn zero_threshold_arbitrary_config() -> V1AlgorithmConfig { + V1AlgorithmConfig { + new_exec_gas_price: 100, + min_exec_gas_price: 0, + exec_gas_price_change_percent: 10, + l2_block_fullness_threshold_percent: 0, + gas_price_factor: NonZeroU64::new(100).unwrap(), + min_da_gas_price: 0, + max_da_gas_price_change_percent: 0, + da_p_component: 0, + da_d_component: 0, + normal_range_size: 0, + capped_range_size: 0, + decrease_range_size: 0, + block_activity_threshold: 0, + unrecorded_blocks: vec![], + } +} + +fn arbitrary_metadata() -> V1Metadata { + V1Metadata { + new_scaled_exec_price: 100, + l2_block_height: 0, + new_scaled_da_gas_price: 0, + gas_price_factor: NonZeroU64::new(100).unwrap(), + total_da_rewards_excess: 0, + latest_known_total_da_cost_excess: 0, + last_profit: 0, + second_to_last_profit: 0, + latest_da_cost_per_byte: 0, + unrecorded_blocks: vec![], + } +} + +fn different_arb_config() -> V1AlgorithmConfig { + V1AlgorithmConfig { + new_exec_gas_price: 200, + min_exec_gas_price: 0, + exec_gas_price_change_percent: 20, + l2_block_fullness_threshold_percent: 0, + gas_price_factor: NonZeroU64::new(100).unwrap(), + min_da_gas_price: 0, + max_da_gas_price_change_percent: 0, + da_p_component: 0, + da_d_component: 0, + normal_range_size: 0, + capped_range_size: 0, + decrease_range_size: 0, + block_activity_threshold: 0, + unrecorded_blocks: vec![], + } +} + +#[tokio::test] +async fn next_gas_price__affected_by_new_l2_block() { + // given + let l2_block = BlockInfo::Block { + height: 1, + gas_used: 60, + block_gas_capacity: 100, + block_bytes: 100, + block_fees: 100, + }; + let (l2_block_sender, l2_block_receiver) = tokio::sync::mpsc::channel(1); + let l2_block_source = FakeL2BlockSource { + l2_block: l2_block_receiver, + }; + let metadata_storage = FakeMetadata::empty(); + + let config = zero_threshold_arbitrary_config(); + let height = 0; + let (algo_updater, shared_algo) = + initialize_algorithm(&config, height, &metadata_storage).unwrap(); + let da_source = FakeDABlockCost::never_returns(); + let da_source_service = DaSourceService::new(da_source, None); + let mut service = GasPriceServiceV1::new( + l2_block_source, + metadata_storage, + shared_algo, + algo_updater, + da_source_service, + ); + + let read_algo = service.next_block_algorithm(); + let initial = read_algo.next_gas_price(); + let mut watcher = StateWatcher::started(); + tokio::spawn(async move { service.run(&mut watcher).await }); + + // when + l2_block_sender.send(l2_block).await.unwrap(); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + + // then + let new = read_algo.next_gas_price(); + assert_ne!(initial, new); +} + +#[tokio::test] +async fn run__new_l2_block_saves_old_metadata() { + // given + let l2_block = BlockInfo::Block { + height: 1, + gas_used: 60, + block_gas_capacity: 100, + block_bytes: 100, + block_fees: 100, + }; + let (l2_block_sender, l2_block_receiver) = tokio::sync::mpsc::channel(1); + let l2_block_source = FakeL2BlockSource { + l2_block: l2_block_receiver, + }; + let metadata_inner = Arc::new(std::sync::Mutex::new(None)); + let metadata_storage = FakeMetadata { + inner: metadata_inner.clone(), + }; + + let config = zero_threshold_arbitrary_config(); + let height = 0; + let (algo_updater, shared_algo) = + initialize_algorithm(&config, height, &metadata_storage).unwrap(); + let da_source = FakeDABlockCost::never_returns(); + let da_source_service = DaSourceService::new(da_source, None); + let mut service = GasPriceServiceV1::new( + l2_block_source, + metadata_storage, + shared_algo, + algo_updater, + da_source_service, + ); + let mut watcher = StateWatcher::default(); + + // when + service.run(&mut watcher).await; + l2_block_sender.send(l2_block).await.unwrap(); + service.shutdown().await.unwrap(); + + // then + let metadata_is_some = metadata_inner.lock().unwrap().is_some(); + assert!(metadata_is_some) +} + +#[derive(Clone)] +struct FakeSettings; + +impl GasPriceSettingsProvider for FakeSettings { + fn settings( + &self, + _param_version: &ConsensusParametersVersion, + ) -> GasPriceResult { + unimplemented!() + } +} + +#[derive(Clone)] +struct FakeGasPriceDb; + +// GasPriceData + Modifiable + KeyValueInspect +impl GasPriceData for FakeGasPriceDb { + fn latest_height(&self) -> Option { + unimplemented!() + } +} + +#[derive(Clone)] +struct FakeOnChainDb { + height: BlockHeight, +} + +impl FakeOnChainDb { + fn new(height: u32) -> Self { + Self { + height: height.into(), + } + } +} + +struct FakeL2Data { + height: BlockHeight, +} + +impl FakeL2Data { + fn new(height: BlockHeight) -> Self { + Self { height } + } +} + +impl L2Data for FakeL2Data { + fn latest_height(&self) -> StorageResult { + Ok(self.height) + } + + fn get_block( + &self, + _height: &BlockHeight, + ) -> StorageResult>> { + unimplemented!() + } +} +impl AtomicView for FakeOnChainDb { + type LatestView = FakeL2Data; + + fn latest_view(&self) -> StorageResult { + Ok(FakeL2Data::new(self.height)) + } +} + +fn empty_block_stream() -> BoxStream { + let blocks: Vec + Send + Sync>> = vec![]; + tokio_stream::iter(blocks).into_boxed() +} + +#[tokio::test] +async fn uninitialized_task__new__if_exists_already_reload_old_values_with_overrides() { + // given + let original_metadata = arbitrary_metadata(); + let original = UpdaterMetadata::V1(original_metadata.clone()); + let metadata_inner = Arc::new(std::sync::Mutex::new(Some(original.clone()))); + let metadata_storage = FakeMetadata { + inner: metadata_inner, + }; + + let different_config = different_arb_config(); + let descaleed_exec_price = + original_metadata.new_scaled_exec_price / original_metadata.gas_price_factor; + assert_ne!(different_config.new_exec_gas_price, descaleed_exec_price); + let different_l2_block = 1231; + assert_ne!(different_l2_block, original_metadata.l2_block_height); + let settings = FakeSettings; + let block_stream = empty_block_stream(); + let gas_price_db = FakeGasPriceDb; + let on_chain_db = FakeOnChainDb::new(different_l2_block); + let da_cost_source = FakeDABlockCost::never_returns(); + + // when + let service = UninitializedTask::new( + different_config, + 0.into(), + settings, + block_stream, + gas_price_db, + metadata_storage, + da_cost_source, + on_chain_db, + ) + .unwrap(); + + // then + let UninitializedTask { algo_updater, .. } = service; + algo_updater_matches_values_from_old_metadata(algo_updater, original_metadata); +} + +fn algo_updater_matches_values_from_old_metadata( + algo_updater: AlgorithmUpdaterV1, + original_metadata: V1Metadata, +) { + let V1Metadata { + new_scaled_exec_price: original_new_scaled_exec_price, + l2_block_height: original_l2_block_height, + new_scaled_da_gas_price: original_new_scaled_da_gas_price, + gas_price_factor: original_gas_price_factor, + total_da_rewards_excess: original_total_da_rewards_excess, + latest_known_total_da_cost_excess: original_latest_known_total_da_cost_excess, + last_profit: original_last_profit, + second_to_last_profit: original_second_to_last_profit, + latest_da_cost_per_byte: original_latest_da_cost_per_byte, + unrecorded_blocks: original_unrecorded_blocks, + } = original_metadata; + assert_eq!( + algo_updater.new_scaled_exec_price, + original_new_scaled_exec_price + ); + assert_eq!(algo_updater.l2_block_height, original_l2_block_height); + assert_eq!( + algo_updater.new_scaled_da_gas_price, + original_new_scaled_da_gas_price + ); + assert_eq!(algo_updater.gas_price_factor, original_gas_price_factor); + assert_eq!( + algo_updater.total_da_rewards_excess, + original_total_da_rewards_excess + ); + assert_eq!( + algo_updater.latest_known_total_da_cost_excess, + original_latest_known_total_da_cost_excess + ); + assert_eq!(algo_updater.last_profit, original_last_profit); + assert_eq!( + algo_updater.second_to_last_profit, + original_second_to_last_profit + ); + assert_eq!( + algo_updater.latest_da_cost_per_byte, + original_latest_da_cost_per_byte + ); + assert_eq!( + algo_updater + .unrecorded_blocks + .into_iter() + .collect::>(), + original_unrecorded_blocks.into_iter().collect::>() + ); +} + +#[tokio::test] +async fn uninitialized_task__new__should_fail_if_cannot_fetch_metadata() { + // given + let config = zero_threshold_arbitrary_config(); + let different_l2_block = 1231; + let metadata_storage = ErroringMetadata; + let settings = FakeSettings; + let block_stream = empty_block_stream(); + let gas_price_db = FakeGasPriceDb; + let on_chain_db = FakeOnChainDb::new(different_l2_block); + let da_cost_source = FakeDABlockCost::never_returns(); + + // when + let res = UninitializedTask::new( + config, + 0.into(), + settings, + block_stream, + gas_price_db, + metadata_storage, + da_cost_source, + on_chain_db, + ); + + // then + let is_err = res.is_err(); + assert!(is_err); +} diff --git a/crates/services/gas_price_service/src/v1/uninitialized_task.rs b/crates/services/gas_price_service/src/v1/uninitialized_task.rs new file mode 100644 index 00000000000..2f5699c41f7 --- /dev/null +++ b/crates/services/gas_price_service/src/v1/uninitialized_task.rs @@ -0,0 +1,359 @@ +use crate::{ + common::{ + fuel_core_storage_adapter::{ + block_bytes, + get_block_info, + mint_values, + GasPriceSettings, + GasPriceSettingsProvider, + }, + gas_price_algorithm::SharedGasPriceAlgo, + l2_block_source::FuelL2BlockSource, + updater_metadata::UpdaterMetadata, + utils::{ + BlockInfo, + Error as GasPriceError, + Result as GasPriceResult, + }, + }, + ports::{ + GasPriceData, + GasPriceServiceConfig, + L2Data, + MetadataStorage, + }, + v1::{ + algorithm::SharedV1Algorithm, + da_source_service::service::{ + DaBlockCostsSource, + DaSourceService, + }, + metadata::{ + v1_algorithm_from_metadata, + V1AlgorithmConfig, + V1Metadata, + }, + service::{ + initialize_algorithm, + GasPriceServiceV1, + }, + }, +}; +use anyhow::Error; +use fuel_core_services::{ + stream::BoxStream, + RunnableService, + ServiceRunner, + StateWatcher, +}; +use fuel_core_storage::{ + not_found, + transactional::AtomicView, +}; +use fuel_core_types::{ + fuel_tx::field::MintAmount, + fuel_types::BlockHeight, + services::block_importer::SharedImportResult, +}; +use fuel_gas_price_algorithm::v1::AlgorithmUpdaterV1; + +pub struct UninitializedTask< + L2DataStoreView, + GasPriceStore, + Metadata, + DA, + SettingsProvider, +> { + pub config: V1AlgorithmConfig, + pub genesis_block_height: BlockHeight, + pub settings: SettingsProvider, + pub gas_price_db: GasPriceStore, + pub on_chain_db: L2DataStoreView, + pub block_stream: BoxStream, + pub(crate) shared_algo: SharedV1Algorithm, + pub(crate) algo_updater: AlgorithmUpdaterV1, + pub(crate) metadata_storage: Metadata, + pub(crate) da_source: DA, +} + +impl + UninitializedTask +where + L2DataStore: L2Data, + L2DataStoreView: AtomicView, + GasPriceStore: GasPriceData, + Metadata: MetadataStorage, + DA: DaBlockCostsSource, + SettingsProvider: GasPriceSettingsProvider, +{ + #[allow(clippy::too_many_arguments)] + pub fn new( + config: V1AlgorithmConfig, + genesis_block_height: BlockHeight, + settings: SettingsProvider, + block_stream: BoxStream, + gas_price_db: GasPriceStore, + metadata_storage: Metadata, + da_source: DA, + on_chain_db: L2DataStoreView, + ) -> anyhow::Result { + let latest_block_height: u32 = on_chain_db + .latest_view()? + .latest_height() + .unwrap_or(genesis_block_height) + .into(); + + let (algo_updater, shared_algo) = + initialize_algorithm(&config, latest_block_height, &metadata_storage)?; + + let task = Self { + config, + genesis_block_height, + settings, + gas_price_db, + on_chain_db, + block_stream, + algo_updater, + shared_algo, + metadata_storage, + da_source, + }; + Ok(task) + } + + pub fn init( + mut self, + ) -> anyhow::Result< + GasPriceServiceV1, Metadata, DA>, + > { + let mut first_run = false; + let latest_block_height: u32 = self + .on_chain_db + .latest_view()? + .latest_height() + .unwrap_or(self.genesis_block_height) + .into(); + + let maybe_metadata_height = self.gas_price_db.latest_height(); + let metadata_height = if let Some(metadata_height) = maybe_metadata_height { + metadata_height.into() + } else { + first_run = true; + latest_block_height + }; + + let l2_block_source = FuelL2BlockSource::new( + self.genesis_block_height, + self.settings.clone(), + self.block_stream, + ); + + // TODO: Add to config + // https://github.com/FuelLabs/fuel-core/issues/2140 + let poll_interval = None; + let da_service = DaSourceService::new(self.da_source, poll_interval); + + if BlockHeight::from(latest_block_height) == self.genesis_block_height + || first_run + { + let service = GasPriceServiceV1::new( + l2_block_source, + self.metadata_storage, + self.shared_algo, + self.algo_updater, + da_service, + ); + Ok(service) + } else { + if latest_block_height > metadata_height { + sync_gas_price_db_with_on_chain_storage( + &self.settings, + &self.config, + &mut self.metadata_storage, + &self.on_chain_db, + metadata_height, + latest_block_height, + )?; + } + + let service = GasPriceServiceV1::new( + l2_block_source, + self.metadata_storage, + self.shared_algo, + self.algo_updater, + da_service, + ); + Ok(service) + } + } +} + +#[async_trait::async_trait] +impl + RunnableService + for UninitializedTask +where + L2DataStore: L2Data, + L2DataStoreView: AtomicView, + GasPriceStore: GasPriceData, + Metadata: MetadataStorage, + DA: DaBlockCostsSource, + SettingsProvider: GasPriceSettingsProvider, +{ + const NAME: &'static str = "GasPriceServiceV1"; + type SharedData = SharedV1Algorithm; + type Task = GasPriceServiceV1, Metadata, DA>; + type TaskParams = (); + + fn shared_data(&self) -> Self::SharedData { + self.shared_algo.clone() + } + + async fn into_task( + self, + _state_watcher: &StateWatcher, + _params: Self::TaskParams, + ) -> anyhow::Result { + UninitializedTask::init(self) + } +} + +fn sync_gas_price_db_with_on_chain_storage< + L2DataStore, + L2DataStoreView, + Metadata, + SettingsProvider, +>( + settings: &SettingsProvider, + config: &V1AlgorithmConfig, + metadata_storage: &mut Metadata, + on_chain_db: &L2DataStoreView, + metadata_height: u32, + latest_block_height: u32, +) -> anyhow::Result<()> +where + L2DataStore: L2Data, + L2DataStoreView: AtomicView, + Metadata: MetadataStorage, + SettingsProvider: GasPriceSettingsProvider, +{ + let metadata = metadata_storage + .get_metadata(&metadata_height.into())? + .ok_or(anyhow::anyhow!( + "Expected metadata to exist for height: {metadata_height}" + ))?; + + let metadata = match metadata { + UpdaterMetadata::V1(metadata) => metadata, + UpdaterMetadata::V0(metadata) => { + V1Metadata::construct_from_v0_metadata(metadata, config)? + } + }; + let mut algo_updater = v1_algorithm_from_metadata(metadata, config); + + sync_v1_metadata( + settings, + on_chain_db, + metadata_height, + latest_block_height, + &mut algo_updater, + metadata_storage, + )?; + + Ok(()) +} + +fn sync_v1_metadata( + settings: &SettingsProvider, + on_chain_db: &L2DataStoreView, + metadata_height: u32, + latest_block_height: u32, + updater: &mut AlgorithmUpdaterV1, + metadata_storage: &mut Metadata, +) -> anyhow::Result<()> +where + L2DataStore: L2Data, + L2DataStoreView: AtomicView, + Metadata: MetadataStorage, + SettingsProvider: GasPriceSettingsProvider, +{ + let first = metadata_height.saturating_add(1); + let view = on_chain_db.latest_view()?; + for height in first..=latest_block_height { + let block = view + .get_block(&height.into())? + .ok_or(not_found!("FullBlock"))?; + let param_version = block.header().consensus_parameters_version; + + let GasPriceSettings { + gas_price_factor, + block_gas_limit, + } = settings.settings(¶m_version)?; + let block_gas_capacity = block_gas_limit.try_into()?; + + let block_gas_used = + match get_block_info(&block, gas_price_factor, block_gas_limit)? { + BlockInfo::GenesisBlock => { + Err(anyhow::anyhow!("should not be genesis block"))? + } + BlockInfo::Block { gas_used, .. } => gas_used, + }; + + let block_bytes = block_bytes(&block); + let (fee_wei, _) = mint_values(&block)?; + updater.update_l2_block_data( + height, + block_gas_used, + block_gas_capacity, + block_bytes, + fee_wei.into(), + )?; + let metadata: UpdaterMetadata = updater.clone().into(); + metadata_storage.set_metadata(&metadata)?; + } + + Ok(()) +} + +#[allow(clippy::type_complexity)] +#[allow(clippy::too_many_arguments)] +pub fn new_gas_price_service_v1< + L2DataStore, + GasPriceStore, + Metadata, + DA, + SettingsProvider, +>( + v1_config: V1AlgorithmConfig, + genesis_block_height: BlockHeight, + settings: SettingsProvider, + block_stream: BoxStream, + gas_price_db: GasPriceStore, + metadata: Metadata, + da_source: DA, + on_chain_db: L2DataStore, +) -> anyhow::Result< + ServiceRunner< + UninitializedTask, + >, +> +where + L2DataStore: AtomicView, + L2DataStore::LatestView: L2Data, + GasPriceStore: GasPriceData, + SettingsProvider: GasPriceSettingsProvider, + Metadata: MetadataStorage, + DA: DaBlockCostsSource, +{ + let gas_price_init = UninitializedTask::new( + v1_config, + genesis_block_height, + settings, + block_stream, + gas_price_db, + metadata, + da_source, + on_chain_db, + )?; + Ok(ServiceRunner::new(gas_price_init)) +}