From 9de2c08cfdf8e4f6695ddc2c0c660e19db87b8d1 Mon Sep 17 00:00:00 2001 From: Mitchell Turner Date: Fri, 16 Aug 2024 19:19:36 +0200 Subject: [PATCH] Allow producer to produce predefined blocks (#2081) ## Linked Issues/PRs Part of https://github.com/FuelLabs/fuel-core/issues/1902 ## Description Provide mechanism for taking predefined blocks and including them in block production. Does not provide the mechanism for introducing those predefined blocks into the running system, e.g. parsing from file or something. That will come after. ## Checklist - [x] Breaking changes are clearly marked as such in the PR description and changelog - [x] New behavior is reflected in tests - [x] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [x] I have reviewed the code myself - [ ] I have created follow-up issues caused by this PR and linked them here ### After merging, notify other teams [Add or remove entries as needed] - [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/) - [ ] [Sway compiler](https://github.com/FuelLabs/sway/) - [ ] [Platform documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+) (for out-of-organization contributors, the person merging the PR will do this) - [ ] Someone else? --------- Co-authored-by: green --- CHANGELOG.md | 1 + Cargo.lock | 3 + .../service/adapters/consensus_module/poa.rs | 10 + crates/fuel-core/src/service/config.rs | 5 + crates/fuel-core/src/service/sub_services.rs | 20 +- .../services/consensus_module/poa/Cargo.toml | 3 + .../consensus_module/poa/src/config.rs | 3 + .../consensus_module/poa/src/ports.rs | 33 +++ .../consensus_module/poa/src/service.rs | 166 ++++++++++--- .../consensus_module/poa/src/service_test.rs | 219 +++++++++++++++++- crates/services/producer/Cargo.toml | 1 + .../block_producer/tests.txt | 7 + .../services/producer/src/block_producer.rs | 79 ++++++- .../producer/src/block_producer/tests.rs | 204 +++++++++++++++- crates/services/producer/src/mocks.rs | 48 +++- tests/tests/snapshot.rs | 10 +- 16 files changed, 753 insertions(+), 59 deletions(-) create mode 100644 crates/services/producer/proptest-regressions/block_producer/tests.txt diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f8b26657df..9885039c012 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] ### Added +- [2081](https://github.com/FuelLabs/fuel-core/pull/2081): Enable producer to include predefined blocks. - [2079](https://github.com/FuelLabs/fuel-core/pull/2079): Open unknown columns in the RocksDB for forward compatibility. ### Changed diff --git a/Cargo.lock b/Cargo.lock index 086b4b6e486..8901cc23ae3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3156,6 +3156,8 @@ dependencies = [ "fuel-core-types", "mockall", "rand", + "serde", + "serde_json", "test-case", "tokio", "tokio-stream", @@ -3174,6 +3176,7 @@ dependencies = [ "fuel-core-trace", "fuel-core-types", "mockall", + "proptest", "rand", "tokio", "tokio-rayon", diff --git a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs index ff2c36d5261..cf5661e5b0f 100644 --- a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs +++ b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs @@ -24,6 +24,7 @@ use fuel_core_poa::{ use fuel_core_services::stream::BoxStream; use fuel_core_storage::transactional::Changes; use fuel_core_types::{ + blockchain::block::Block, fuel_tx::TxId, fuel_types::BlockHeight, services::{ @@ -120,6 +121,15 @@ impl fuel_core_poa::ports::BlockProducer for BlockProducerAdapter { } } } + + async fn produce_predefined_block( + &self, + block: &Block, + ) -> anyhow::Result> { + self.block_producer + .produce_and_execute_predefined(block) + .await + } } #[async_trait::async_trait] diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs index 1c375a4af41..0e84142916a 100644 --- a/crates/fuel-core/src/service/config.rs +++ b/crates/fuel-core/src/service/config.rs @@ -204,6 +204,11 @@ impl From<&Config> for fuel_core_poa::Config { metrics: false, min_connected_reserved_peers: config.min_connected_reserved_peers, time_until_synced: config.time_until_synced, + chain_id: config + .snapshot_reader + .chain_config() + .consensus_parameters + .chain_id(), } } } diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index 136870874f6..d8f1bf2b27c 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -42,7 +42,10 @@ use fuel_core_gas_price_service::fuel_gas_price_updater::{ UpdaterMetadata, V0Metadata, }; -use fuel_core_poa::Trigger; +use fuel_core_poa::{ + ports::InMemoryPredefinedBlocks, + Trigger, +}; use fuel_core_services::{ RunnableService, ServiceRunner, @@ -53,13 +56,20 @@ use fuel_core_storage::{ }; #[cfg(feature = "relayer")] use fuel_core_types::blockchain::primitives::DaBlockHeight; -use std::sync::Arc; +use std::{ + collections::HashMap, + sync::Arc, +}; use tokio::sync::Mutex; mod algorithm_updater; -pub type PoAService = - fuel_core_poa::Service; +pub type PoAService = fuel_core_poa::Service< + TxPoolAdapter, + BlockProducerAdapter, + BlockImporterAdapter, + InMemoryPredefinedBlocks, +>; #[cfg(feature = "p2p")] pub type P2PService = fuel_core_p2p::service::Service; pub type TxPoolSharedState = fuel_core_txpool::service::SharedState< @@ -235,6 +245,7 @@ pub fn init_sub_services( tracing::info!("Enabled manual block production because of `debug` flag"); } + let predefined_blocks: InMemoryPredefinedBlocks = HashMap::new().into(); let poa = (production_enabled).then(|| { fuel_core_poa::new_service( &last_block_header, @@ -243,6 +254,7 @@ pub fn init_sub_services( producer_adapter.clone(), importer_adapter.clone(), p2p_adapter.clone(), + predefined_blocks, ) }); let poa_adapter = PoAAdapter::new(poa.as_ref().map(|service| service.shared.clone())); diff --git a/crates/services/consensus_module/poa/Cargo.toml b/crates/services/consensus_module/poa/Cargo.toml index 3a96a22e015..3ae865c7de2 100644 --- a/crates/services/consensus_module/poa/Cargo.toml +++ b/crates/services/consensus_module/poa/Cargo.toml @@ -16,12 +16,15 @@ fuel-core-chain-config = { workspace = true } fuel-core-services = { workspace = true } fuel-core-storage = { workspace = true } fuel-core-types = { workspace = true } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } tokio = { workspace = true, features = ["full"] } tokio-stream = { workspace = true } tracing = { workspace = true } [dev-dependencies] fuel-core-poa = { path = ".", features = ["test-helpers"] } +fuel-core-services = { workspace = true, features = ["test-helpers"] } fuel-core-storage = { path = "./../../../storage", features = ["test-helpers"] } fuel-core-types = { path = "./../../../types", features = ["test-helpers"] } mockall = { workspace = true } diff --git a/crates/services/consensus_module/poa/src/config.rs b/crates/services/consensus_module/poa/src/config.rs index 83f8596b23d..7da6cad5b80 100644 --- a/crates/services/consensus_module/poa/src/config.rs +++ b/crates/services/consensus_module/poa/src/config.rs @@ -1,5 +1,6 @@ use fuel_core_types::{ blockchain::primitives::SecretKeyWrapper, + fuel_types::ChainId, secrecy::Secret, }; use tokio::time::Duration; @@ -11,6 +12,7 @@ pub struct Config { pub metrics: bool, pub min_connected_reserved_peers: usize, pub time_until_synced: Duration, + pub chain_id: ChainId, } #[cfg(feature = "test-helpers")] @@ -22,6 +24,7 @@ impl Default for Config { metrics: false, min_connected_reserved_peers: 0, time_until_synced: Duration::ZERO, + chain_id: ChainId::default(), } } } diff --git a/crates/services/consensus_module/poa/src/ports.rs b/crates/services/consensus_module/poa/src/ports.rs index 41ed8ad4adb..e5fa26fa336 100644 --- a/crates/services/consensus_module/poa/src/ports.rs +++ b/crates/services/consensus_module/poa/src/ports.rs @@ -5,6 +5,7 @@ use fuel_core_storage::{ }; use fuel_core_types::{ blockchain::{ + block::Block, header::BlockHeader, primitives::DaBlockHeight, }, @@ -29,6 +30,7 @@ use fuel_core_types::{ }, tai64::Tai64, }; +use std::collections::HashMap; #[cfg_attr(test, mockall::automock)] pub trait TransactionPool: Send + Sync { @@ -59,6 +61,11 @@ pub trait BlockProducer: Send + Sync { block_time: Tai64, source: TransactionsSource, ) -> anyhow::Result>; + + async fn produce_predefined_block( + &self, + block: &Block, + ) -> anyhow::Result>; } #[cfg_attr(test, mockall::automock)] @@ -108,3 +115,29 @@ pub trait SyncPort: Send + Sync { /// await synchronization with the peers async fn sync_with_peers(&mut self) -> anyhow::Result<()>; } + +pub trait PredefinedBlocks: Send + Sync { + fn get_block(&self, height: &BlockHeight) -> Option; +} + +pub struct InMemoryPredefinedBlocks { + blocks: HashMap, +} + +impl From> for InMemoryPredefinedBlocks { + fn from(blocks: HashMap) -> Self { + Self::new(blocks) + } +} + +impl InMemoryPredefinedBlocks { + pub fn new(blocks: HashMap) -> Self { + Self { blocks } + } +} + +impl PredefinedBlocks for InMemoryPredefinedBlocks { + fn get_block(&self, height: &BlockHeight) -> Option { + self.blocks.get(height).cloned() + } +} diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index 8a3e01fcca0..43a9878b346 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -1,3 +1,21 @@ +use anyhow::{ + anyhow, + Context, +}; +use std::{ + ops::Deref, + sync::Arc, + time::Duration, +}; +use tokio::{ + sync::{ + mpsc, + oneshot, + }, + time::Instant, +}; +use tokio_stream::StreamExt; + use crate::{ deadline_clock::{ DeadlineClock, @@ -7,6 +25,7 @@ use crate::{ BlockImporter, BlockProducer, P2pPort, + PredefinedBlocks, TransactionPool, TransactionsSource, }, @@ -17,10 +36,6 @@ use crate::{ Config, Trigger, }; -use anyhow::{ - anyhow, - Context, -}; use fuel_core_services::{ stream::BoxStream, RunnableService, @@ -54,6 +69,7 @@ use fuel_core_types::{ services::{ block_importer::ImportResult, executor::{ + Error as ExecutorError, ExecutionResult, UncommittedResult as UncommittedExecutionResult, }, @@ -61,20 +77,9 @@ use fuel_core_types::{ }, tai64::Tai64, }; -use std::{ - ops::Deref, - time::Duration, -}; -use tokio::{ - sync::{ - mpsc, - oneshot, - }, - time::Instant, -}; -use tokio_stream::StreamExt; +use serde::Serialize; -pub type Service = ServiceRunner>; +pub type Service = ServiceRunner>; #[derive(Clone)] pub struct SharedState { request_sender: mpsc::Sender, @@ -128,7 +133,7 @@ pub(crate) enum RequestType { Trigger, } -pub struct MainTask { +pub struct MainTask { signing_key: Option>, block_producer: B, block_importer: I, @@ -139,16 +144,18 @@ pub struct MainTask { last_height: BlockHeight, last_timestamp: Tai64, last_block_created: Instant, + predefined_blocks: PB, trigger: Trigger, /// Deadline clock, used by the triggers timer: DeadlineClock, sync_task_handle: ServiceRunner, } -impl MainTask +impl MainTask where T: TransactionPool, I: BlockImporter, + PB: PredefinedBlocks, { pub fn new( last_block: &BlockHeader, @@ -157,6 +164,7 @@ where block_producer: B, block_importer: I, p2p_port: P, + predefined_blocks: PB, ) -> Self { let tx_status_update_stream = txpool.transaction_status_events(); let (request_sender, request_receiver) = mpsc::channel(1024); @@ -195,6 +203,7 @@ where last_height, last_timestamp, last_block_created, + predefined_blocks, trigger, timer: DeadlineClock::new(), sync_task_handle, @@ -203,10 +212,10 @@ where fn extract_block_info(last_block: &BlockHeader) -> (BlockHeight, Tai64, Instant) { let last_timestamp = last_block.time(); - let duration = + let duration_since_last_block = Duration::from_secs(Tai64::now().0.saturating_sub(last_timestamp.0)); let last_block_created = Instant::now() - .checked_sub(duration) + .checked_sub(duration_since_last_block) .unwrap_or(Instant::now()); let last_height = *last_block.height(); (last_height, last_timestamp, last_block_created) @@ -241,11 +250,12 @@ where } } -impl MainTask +impl MainTask where T: TransactionPool, B: BlockProducer, I: BlockImporter, + PB: PredefinedBlocks, { // Request the block producer to make a new block, and return it when ready async fn signal_produce_block( @@ -387,6 +397,65 @@ where Ok(()) } + async fn produce_predefined_block( + &mut self, + predefined_block: &Block, + ) -> anyhow::Result<()> { + tracing::info!("Producing predefined block"); + let last_block_created = Instant::now(); + // verify signing key is set + if self.signing_key.is_none() { + return Err(anyhow!("unable to produce blocks without a consensus key")) + } + + // Ask the block producer to create the block + let ( + ExecutionResult { + block, + skipped_transactions, + tx_status, + events, + }, + changes, + ) = self + .block_producer + .produce_predefined_block(predefined_block) + .await? + .into(); + + if !skipped_transactions.is_empty() { + let block_and_skipped = PredefinedBlockWithSkippedTransactions { + block: predefined_block.clone(), + skipped_transactions, + }; + let serialized = serde_json::to_string_pretty(&block_and_skipped)?; + tracing::error!( + "During block production got invalid transactions: BEGIN {} END", + serialized + ); + } + // Sign the block and seal it + let seal = seal_block(&self.signing_key, &block)?; + let sealed_block = SealedBlock { + entity: block, + consensus: seal, + }; + // Import the sealed block + self.block_importer + .commit_result(Uncommitted::new( + ImportResult::new_from_local(sealed_block.clone(), tx_status, events), + changes, + )) + .await?; + + // Update last block time + self.last_height = *sealed_block.entity.header().height(); + self.last_timestamp = sealed_block.entity.header().time(); + self.last_block_created = last_block_created; + + Ok(()) + } + pub(crate) async fn on_txpool_event(&mut self) -> anyhow::Result<()> { match self.trigger { Trigger::Instant => { @@ -413,17 +482,32 @@ where } } } + fn update_last_block_values(&mut self, block_header: &Arc) { + let (last_height, last_timestamp, last_block_created) = + Self::extract_block_info(block_header); + if last_height > self.last_height { + self.last_height = last_height; + self.last_timestamp = last_timestamp; + self.last_block_created = last_block_created; + } + } +} + +#[derive(Serialize)] +struct PredefinedBlockWithSkippedTransactions { + block: Block, + skipped_transactions: Vec<(TxId, ExecutorError)>, } #[async_trait::async_trait] -impl RunnableService for MainTask +impl RunnableService for MainTask where Self: RunnableTask, { const NAME: &'static str = "PoA"; type SharedData = SharedState; - type Task = MainTask; + type Task = MainTask; type TaskParams = (); fn shared_data(&self) -> Self::SharedData { @@ -451,24 +535,25 @@ where } #[async_trait::async_trait] -impl RunnableTask for MainTask +impl RunnableTask for MainTask where T: TransactionPool, B: BlockProducer, I: BlockImporter, + PB: PredefinedBlocks, { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { let should_continue; - let mut state = self.sync_task_handle.shared.clone(); + let mut sync_state = self.sync_task_handle.shared.clone(); // make sure we're synced first - while *state.borrow_and_update() == SyncState::NotSynced { + while *sync_state.borrow_and_update() == SyncState::NotSynced { tokio::select! { biased; result = watcher.while_started() => { should_continue = result?.started(); return Ok(should_continue); } - _ = state.changed() => { + _ = sync_state.changed() => { break; } _ = self.tx_status_update_stream.next() => { @@ -480,16 +565,17 @@ where } } - if let SyncState::Synced(block_header) = &*state.borrow_and_update() { - let (last_height, last_timestamp, last_block_created) = - Self::extract_block_info(block_header); - if last_height > self.last_height { - self.last_height = last_height; - self.last_timestamp = last_timestamp; - self.last_block_created = last_block_created; - } + if let SyncState::Synced(block_header) = &*sync_state.borrow_and_update() { + self.update_last_block_values(block_header); } + let next_height = self.next_height(); + let maybe_block = self.predefined_blocks.get_block(&next_height); + if let Some(block) = maybe_block { + self.produce_predefined_block(&block).await?; + should_continue = true; + return Ok(should_continue) + } tokio::select! { biased; _ = watcher.while_started() => { @@ -528,6 +614,7 @@ where should_continue = true; } } + Ok(should_continue) } @@ -538,18 +625,20 @@ where } } -pub fn new_service( +pub fn new_service( last_block: &BlockHeader, config: Config, txpool: T, block_producer: B, block_importer: I, p2p_port: P, -) -> Service + predefined_blocks: PB, +) -> Service where T: TransactionPool + 'static, B: BlockProducer + 'static, I: BlockImporter + 'static, + PB: PredefinedBlocks + 'static, P: P2pPort, { Service::new(MainTask::new( @@ -559,6 +648,7 @@ where block_producer, block_importer, p2p_port, + predefined_blocks, )) } diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index a9aacc3531f..4c0855d6a4e 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -1,26 +1,37 @@ #![allow(clippy::arithmetic_side_effects)] +#![allow(non_snake_case)] use crate::{ new_service, ports::{ + BlockProducer, + InMemoryPredefinedBlocks, MockBlockImporter, MockBlockProducer, MockP2pPort, MockTransactionPool, + TransactionsSource, }, service::MainTask, Config, Service, Trigger, }; +use async_trait::async_trait; use fuel_core_services::{ stream::pending, Service as StorageTrait, + ServiceRunner, State, }; +use fuel_core_storage::transactional::Changes; use fuel_core_types::{ blockchain::{ - header::BlockHeader, + block::Block, + header::{ + BlockHeader, + PartialBlockHeader, + }, primitives::SecretKeyWrapper, SealedBlock, }, @@ -47,7 +58,10 @@ use rand::{ SeedableRng, }; use std::{ - collections::HashSet, + collections::{ + HashMap, + HashSet, + }, sync::{ Arc, Mutex as StdMutex, @@ -148,6 +162,8 @@ impl TestContextBuilder { let p2p_port = generate_p2p_port(); + let predefined_blocks = HashMap::new().into(); + let service = new_service( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), config, @@ -155,6 +171,7 @@ impl TestContextBuilder { producer, importer, p2p_port, + predefined_blocks, ); service.start().unwrap(); TestContext { service } @@ -162,7 +179,12 @@ impl TestContextBuilder { } struct TestContext { - service: Service, + service: Service< + MockTransactionPool, + MockBlockProducer, + MockBlockImporter, + InMemoryPredefinedBlocks, + >, } impl TestContext { @@ -332,6 +354,8 @@ async fn remove_skipped_transactions() { let p2p_port = generate_p2p_port(); + let predefined_blocks: InMemoryPredefinedBlocks = HashMap::new().into(); + let mut task = MainTask::new( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), config, @@ -339,6 +363,7 @@ async fn remove_skipped_transactions() { block_producer, block_importer, p2p_port, + predefined_blocks, ); assert!(task.produce_next_block().await.is_ok()); @@ -379,6 +404,8 @@ async fn does_not_produce_when_txpool_empty_in_instant_mode() { let p2p_port = generate_p2p_port(); + let predefined_blocks: InMemoryPredefinedBlocks = HashMap::new().into(); + let mut task = MainTask::new( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), config, @@ -386,6 +413,7 @@ async fn does_not_produce_when_txpool_empty_in_instant_mode() { block_producer, block_importer, p2p_port, + predefined_blocks, ); // simulate some txpool event to see if any block production is erroneously triggered @@ -397,3 +425,188 @@ fn test_signing_key() -> Secret { let secret_key = SecretKey::random(&mut rng); Secret::new(secret_key.into()) } + +#[derive(Debug, PartialEq)] +enum FakeProducedBlock { + Predefined(Block), + New(BlockHeight, Tai64), +} + +struct FakeBlockProducer { + block_sender: tokio::sync::mpsc::Sender, +} + +impl FakeBlockProducer { + fn new() -> (Self, tokio::sync::mpsc::Receiver) { + let (block_sender, receiver) = tokio::sync::mpsc::channel(100); + (Self { block_sender }, receiver) + } +} + +#[async_trait] +impl BlockProducer for FakeBlockProducer { + async fn produce_and_execute_block( + &self, + height: BlockHeight, + block_time: Tai64, + _source: TransactionsSource, + ) -> anyhow::Result> { + self.block_sender + .send(FakeProducedBlock::New(height, block_time)) + .await + .unwrap(); + Ok(UncommittedResult::new( + ExecutionResult { + block: Default::default(), + skipped_transactions: Default::default(), + tx_status: Default::default(), + events: Default::default(), + }, + Default::default(), + )) + } + + async fn produce_predefined_block( + &self, + block: &Block, + ) -> anyhow::Result> { + self.block_sender + .send(FakeProducedBlock::Predefined(block.clone())) + .await + .unwrap(); + Ok(UncommittedResult::new( + ExecutionResult { + block: block.clone(), + skipped_transactions: Default::default(), + tx_status: Default::default(), + events: Default::default(), + }, + Default::default(), + )) + } +} + +fn block_for_height(height: u32) -> Block { + let mut header = PartialBlockHeader::default(); + header.consensus.height = height.into(); + let transactions = vec![]; + Block::new(header, transactions, Default::default(), Default::default()).unwrap() +} + +#[tokio::test] +async fn consensus_service__run__will_include_sequential_predefined_blocks_before_new_blocks( +) { + // given + let blocks: [(BlockHeight, Block); 3] = [ + (2u32.into(), block_for_height(2)), + (3u32.into(), block_for_height(3)), + (4u32.into(), block_for_height(4)), + ]; + let blocks_map: HashMap<_, _> = blocks.clone().into_iter().collect(); + let (block_producer, mut block_receiver) = FakeBlockProducer::new(); + let last_block = BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()); + let config = Config { + trigger: Trigger::Interval { + block_time: Duration::from_millis(100), + }, + signing_key: Some(test_signing_key()), + metrics: false, + ..Default::default() + }; + let mut block_importer = MockBlockImporter::default(); + block_importer.expect_commit_result().returning(|_| Ok(())); + block_importer + .expect_block_stream() + .returning(|| Box::pin(tokio_stream::empty())); + let mut rng = StdRng::seed_from_u64(0); + let tx = make_tx(&mut rng); + let TxPoolContext { txpool, .. } = MockTransactionPool::new_with_txs(vec![tx]); + let task = MainTask::new( + &last_block, + config, + txpool, + block_producer, + block_importer, + generate_p2p_port(), + InMemoryPredefinedBlocks::new(blocks_map), + ); + + // when + let service = ServiceRunner::new(task); + service.start().unwrap(); + + // then + for (_, block) in blocks { + let expected = FakeProducedBlock::Predefined(block); + let actual = block_receiver.recv().await.unwrap(); + assert_eq!(expected, actual); + } + let maybe_produced_block = block_receiver.recv().await.unwrap(); + assert!(matches! { + maybe_produced_block, + FakeProducedBlock::New(_, _) + }); +} + +#[tokio::test] +async fn consensus_service__run__will_insert_predefined_blocks_in_correct_order() { + // given + let predefined_blocks: &[Option<(BlockHeight, Block)>] = &[ + None, + Some((3u32.into(), block_for_height(3))), + None, + Some((5u32.into(), block_for_height(5))), + None, + Some((7u32.into(), block_for_height(7))), + None, + ]; + let predefined_blocks_map: HashMap<_, _> = predefined_blocks + .iter() + .flat_map(|x| x.to_owned()) + .collect(); + let (block_producer, mut block_receiver) = FakeBlockProducer::new(); + let last_block = BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()); + let config = Config { + trigger: Trigger::Interval { + block_time: Duration::from_millis(100), + }, + signing_key: Some(test_signing_key()), + metrics: false, + ..Default::default() + }; + let mut block_importer = MockBlockImporter::default(); + block_importer.expect_commit_result().returning(|_| Ok(())); + block_importer + .expect_block_stream() + .returning(|| Box::pin(tokio_stream::empty())); + let mut rng = StdRng::seed_from_u64(0); + let tx = make_tx(&mut rng); + let TxPoolContext { txpool, .. } = MockTransactionPool::new_with_txs(vec![tx]); + let task = MainTask::new( + &last_block, + config, + txpool, + block_producer, + block_importer, + generate_p2p_port(), + InMemoryPredefinedBlocks::new(predefined_blocks_map), + ); + + // when + let service = ServiceRunner::new(task); + service.start().unwrap(); + + // then + for maybe_predefined in predefined_blocks { + let actual = block_receiver.recv().await.unwrap(); + if let Some((_, block)) = maybe_predefined { + let expected = FakeProducedBlock::Predefined(block.clone()); + assert_eq!(expected, actual); + } else { + assert!(matches! { + actual, + FakeProducedBlock::New(_, _) + }); + } + } +} diff --git a/crates/services/producer/Cargo.toml b/crates/services/producer/Cargo.toml index 51c7b3aee53..5a6e8b91fbc 100644 --- a/crates/services/producer/Cargo.toml +++ b/crates/services/producer/Cargo.toml @@ -24,6 +24,7 @@ tracing = { workspace = true } fuel-core-producer = { path = "", features = ["test-helpers"] } fuel-core-trace = { path = "../../trace" } fuel-core-types = { path = "../../types", features = ["test-helpers"] } +proptest = { workspace = true } rand = { workspace = true } [features] diff --git a/crates/services/producer/proptest-regressions/block_producer/tests.txt b/crates/services/producer/proptest-regressions/block_producer/tests.txt new file mode 100644 index 00000000000..bd2f192bf33 --- /dev/null +++ b/crates/services/producer/proptest-regressions/block_producer/tests.txt @@ -0,0 +1,7 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc dd33a239ab8e44231c0bdfc62e5712d60afc5404211bdce6bd6404ea6ed143fc # shrinks to block = V1(BlockV1 { header: V1(BlockHeaderV1 { application: ApplicationHeader { da_height: DaBlockHeight(1), consensus_parameters_version: 0, state_transition_bytecode_version: 7, generated: GeneratedApplicationFields { transactions_count: 1, message_receipt_count: 0, transactions_root: 167ff38d512ce7cfc6a39f25bf541c65d35b05a50226ab5c43179efc9a3e92e0, message_outbox_root: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, event_inbox_root: 0000000000000000000000000000000000000000000000000000000000000000 } }, consensus: ConsensusHeader { prev_root: 0000000000000000000000000000000000000000000000000000000000000000, height: 00000001, time: Tai64(4611686018427387914), generated: GeneratedConsensusFields { application_hash: 706ec8c8b182f5cc589bda36f4a218e0e4fc5e850bd21ea48194bc5d7b21e827 } }, metadata: Some(BlockHeaderMetadata { id: BlockId(f6e071429d32c8d90c22552c05d489e89d0c4a3d17d4a753efc16210b917882e) }) }), transactions: [Mint(Mint { tx_pointer: TxPointer { block_height: 00000000, tx_index: 0 }, input_contract: Contract { utxo_id: UtxoId { tx_id: 0000000000000000000000000000000000000000000000000000000000000000, output_index: 0 }, balance_root: 0000000000000000000000000000000000000000000000000000000000000000, state_root: 0000000000000000000000000000000000000000000000000000000000000000, tx_pointer: TxPointer { block_height: 00000000, tx_index: 0 }, contract_id: 0000000000000000000000000000000000000000000000000000000000000000 }, output_contract: Contract { input_index: 0, balance_root: 0000000000000000000000000000000000000000000000000000000000000000, state_root: 0000000000000000000000000000000000000000000000000000000000000000 }, mint_amount: 0, mint_asset_id: 0000000000000000000000000000000000000000000000000000000000000000, gas_price: 0, metadata: None })] }) diff --git a/crates/services/producer/src/block_producer.rs b/crates/services/producer/src/block_producer.rs index e86120473a2..4ccb7d57d7e 100644 --- a/crates/services/producer/src/block_producer.rs +++ b/crates/services/producer/src/block_producer.rs @@ -17,6 +17,7 @@ use fuel_core_storage::transactional::{ }; use fuel_core_types::{ blockchain::{ + block::Block, header::{ ApplicationHeader, ConsensusHeader, @@ -24,7 +25,13 @@ use fuel_core_types::{ }, primitives::DaBlockHeight, }, - fuel_tx::Transaction, + fuel_tx::{ + field::{ + InputContract, + MintGasPrice, + }, + Transaction, + }, fuel_types::{ BlockHeight, Bytes32, @@ -88,6 +95,64 @@ pub struct Producer + Producer +where + ViewProvider: AtomicView + 'static, + ViewProvider::LatestView: BlockProducerDatabase, + ConsensusProvider: ConsensusParametersProvider, +{ + pub async fn produce_and_execute_predefined( + &self, + predefined_block: &Block, + ) -> anyhow::Result> + where + Executor: ports::BlockProducer> + 'static, + { + let _production_guard = self.lock.lock().await; + + let mut transactions_source = predefined_block.transactions().to_vec(); + + let height = predefined_block.header().consensus().height; + + let block_time = predefined_block.header().consensus().time; + + let da_height = predefined_block.header().application().da_height; + + let header_to_produce = self + .new_header_with_da_height(height, block_time, da_height) + .await?; + + let maybe_mint_tx = transactions_source.pop(); + let mint_tx = + maybe_mint_tx + .and_then(|tx| tx.as_mint().cloned()) + .ok_or(anyhow!( + "The last transaction in the block should be a mint transaction" + ))?; + + let gas_price = *mint_tx.gas_price(); + let coinbase_recipient = mint_tx.input_contract().contract_id; + + let component = Components { + header_to_produce, + transactions_source, + coinbase_recipient, + gas_price, + }; + + let result = self + .executor + .produce_without_commit(component) + .map_err(Into::::into) + .with_context(|| { + format!("Failed to produce block {height:?} due to execution failure") + })?; + + debug!("Produced block with result: {:?}", result.result()); + Ok(result) + } +} impl Producer where @@ -296,7 +361,17 @@ where Ok(block_header) } - + /// Create the header for a new block at the provided height + async fn new_header_with_da_height( + &self, + height: BlockHeight, + block_time: Tai64, + da_height: DaBlockHeight, + ) -> anyhow::Result { + let mut block_header = self._new_header(height, block_time)?; + block_header.application.da_height = da_height; + Ok(block_header) + } async fn select_new_da_height( &self, gas_limit: u64, diff --git a/crates/services/producer/src/block_producer/tests.rs b/crates/services/producer/src/block_producer/tests.rs index fb2c7859f24..14cfdc63f2a 100644 --- a/crates/services/producer/src/block_producer/tests.rs +++ b/crates/services/producer/src/block_producer/tests.rs @@ -6,6 +6,7 @@ use crate::{ GasPriceProvider, MockConsensusParametersProvider, }, + Bytes32, Error, }, mocks::{ @@ -23,6 +24,7 @@ use fuel_core_producer as _; use fuel_core_types::{ blockchain::{ block::{ + Block, CompressedBlock, PartialFuelBlock, }, @@ -33,7 +35,14 @@ use fuel_core_types::{ }, primitives::DaBlockHeight, }, - fuel_tx::ConsensusParameters, + fuel_tx, + fuel_tx::{ + field::InputContract, + ConsensusParameters, + Mint, + Script, + Transaction, + }, fuel_types::BlockHeight, services::executor::Error as ExecutorError, tai64::Tai64, @@ -534,6 +543,157 @@ mod produce_and_execute_block_txpool { } } +use fuel_core_types::fuel_tx::field::MintGasPrice; +use proptest::{ + prop_compose, + proptest, +}; + +prop_compose! { + fn arb_block()(height in 1..255u8, da_height in 1..255u64, gas_price: u64, coinbase_recipient: [u8; 32], num_txs in 0..100u32) -> Block { + let mut txs : Vec<_> = (0..num_txs).map(|_| Transaction::Script(Script::default())).collect(); + let mut inner_mint = Mint::default(); + *inner_mint.gas_price_mut() = gas_price; + *inner_mint.input_contract_mut() = fuel_tx::input::contract::Contract{ + contract_id: coinbase_recipient.into(), + ..Default::default() + }; + + let mint = Transaction::Mint(inner_mint); + txs.push(mint); + let header = PartialBlockHeader { + consensus: ConsensusHeader { + height: (height as u32).into(), + ..Default::default() + }, + application: ApplicationHeader { + da_height: DaBlockHeight(da_height), + ..Default::default() + }, + }; + let outbox_message_ids = vec![]; + let event_inbox_root = Bytes32::default(); + Block::new(header, txs, &outbox_message_ids, event_inbox_root).unwrap() + } +} + +#[allow(clippy::arithmetic_side_effects)] +fn ctx_for_block( + block: &Block, + executor: MockExecutorWithCapture, +) -> TestContext> { + let prev_height = block.header().height().pred().unwrap(); + let prev_da_height = block.header().da_height.as_u64() - 1; + TestContextBuilder::new() + .with_prev_height(prev_height) + .with_prev_da_height(prev_da_height.into()) + .build_with_executor(executor) +} + +// gas_price +proptest! { + #[test] + fn produce_and_execute_predefined_block__contains_expected_gas_price(block in arb_block()) { + let rt = multithreaded_runtime(); + + // given + let executor = MockExecutorWithCapture::default(); + let ctx = ctx_for_block(&block, executor.clone()); + + //when + let _ = rt.block_on(ctx.producer().produce_and_execute_predefined(&block)).unwrap(); + + // then + let expected_gas_price = *block + .transactions().last().and_then(|tx| tx.as_mint()).unwrap().gas_price(); + let captured = executor.captured.lock().unwrap(); + let actual = captured.as_ref().unwrap().gas_price; + assert_eq!(expected_gas_price, actual); + } + + // time + #[test] + fn produce_and_execute_predefined_block__contains_expected_time(block in arb_block()) { + let rt = multithreaded_runtime(); + + // given + let executor = MockExecutorWithCapture::default(); + let ctx = ctx_for_block(&block, executor.clone()); + + //when + let _ = rt.block_on(ctx.producer().produce_and_execute_predefined(&block)).unwrap(); + + // then + let expected_time = block.header().consensus().time; + let captured = executor.captured.lock().unwrap(); + let actual = captured.as_ref().unwrap().header_to_produce.consensus.time; + assert_eq!(expected_time, actual); + } + + // coinbase + #[test] + fn produce_and_execute_predefined_block__contains_expected_coinbase_recipient(block in arb_block()) { + let rt = multithreaded_runtime(); + + // given + let executor = MockExecutorWithCapture::default(); + let ctx = ctx_for_block(&block, executor.clone()); + + //when + let _ = rt.block_on(ctx.producer().produce_and_execute_predefined(&block)).unwrap(); + + // then + let expected_coinbase = block.transactions().last().and_then(|tx| tx.as_mint()).unwrap().input_contract().contract_id; + let captured = executor.captured.lock().unwrap(); + let actual = captured.as_ref().unwrap().coinbase_recipient; + assert_eq!(expected_coinbase, actual); + } + + // DA height + #[test] + fn produce_and_execute_predefined_block__contains_expected_da_height(block in arb_block()) { + let rt = multithreaded_runtime(); + + // given + let executor = MockExecutorWithCapture::default(); + let ctx = ctx_for_block(&block, executor.clone()); + + //when + let _ = rt.block_on(ctx.producer().produce_and_execute_predefined(&block)).unwrap(); + + // then + let expected_da_height = block.header().application().da_height; + let captured = executor.captured.lock().unwrap(); + let actual = captured.as_ref().unwrap().header_to_produce.application.da_height; + assert_eq!(expected_da_height, actual); + } + + #[test] + fn produce_and_execute_predefined_block__do_not_include_original_mint_in_txs_source(block in arb_block()) { + let rt = multithreaded_runtime(); + + // given + let executor = MockExecutorWithCapture::default(); + let ctx = ctx_for_block(&block, executor.clone()); + + //when + let _ = rt.block_on(ctx.producer().produce_and_execute_predefined(&block)).unwrap(); + + // then + let captured = executor.captured.lock().unwrap(); + let txs_source = &captured.as_ref().unwrap().transactions_source; + let has_a_mint = txs_source.iter().any(|tx| matches!(tx, Transaction::Mint(_))); + assert!(!has_a_mint); + } +} + +fn multithreaded_runtime() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() +} + struct TestContext { config: Config, db: MockDb, @@ -727,4 +887,46 @@ impl TestContextBuilder { ..TestContext::default_from_db(db) } } + + fn build_with_executor(&self, executor: Ex) -> TestContext { + let da_height = self.prev_da_height; + let previous_block = PartialFuelBlock { + header: PartialBlockHeader { + application: ApplicationHeader { + da_height, + ..Default::default() + }, + consensus: ConsensusHeader { + height: self.prev_height, + ..Default::default() + }, + }, + transactions: vec![], + } + .generate(&[], Default::default()) + .unwrap() + .compress(&Default::default()); + + let db = MockDb { + blocks: Arc::new(Mutex::new( + vec![(self.prev_height, previous_block)] + .into_iter() + .collect(), + )), + consensus_parameters_version: 0, + state_transition_bytecode_version: 0, + }; + + let mock_relayer = MockRelayer { + latest_block_height: self.latest_block_height, + latest_da_blocks_with_costs: self.blocks_with_gas_costs.clone(), + ..Default::default() + }; + + TestContext { + relayer: mock_relayer, + block_gas_limit: self.block_gas_limit.unwrap_or_default(), + ..TestContext::default_from_db_and_executor(db, executor) + } + } } diff --git a/crates/services/producer/src/mocks.rs b/crates/services/producer/src/mocks.rs index 95fe741c34c..191bfdb20ed 100644 --- a/crates/services/producer/src/mocks.rs +++ b/crates/services/producer/src/mocks.rs @@ -24,6 +24,7 @@ use fuel_core_types::{ }, primitives::DaBlockHeight, }, + fuel_tx::Transaction, fuel_types::{ Address, BlockHeight, @@ -50,7 +51,6 @@ use std::{ Mutex, }, }; - // TODO: Replace mocks with `mockall`. #[derive(Default, Clone)] @@ -107,7 +107,7 @@ impl AsRef for MockDb { } } -fn to_block(component: &Components>) -> Block { +fn arc_pool_tx_comp_to_block(component: &Components>) -> Block { let transactions = component .transactions_source .clone() @@ -123,12 +123,23 @@ fn to_block(component: &Components>) -> Block { .unwrap() } +fn tx_comp_to_block(component: &Components>) -> Block { + let transactions = component.transactions_source.clone(); + Block::new( + component.header_to_produce, + transactions, + &[], + Default::default(), + ) + .unwrap() +} + impl BlockProducer> for MockExecutor { fn produce_without_commit( &self, component: Components>, ) -> ExecutorResult> { - let block = to_block(&component); + let block = arc_pool_tx_comp_to_block(&component); // simulate executor inserting a block let mut block_db = self.0.blocks.lock().unwrap(); block_db.insert( @@ -159,7 +170,7 @@ impl BlockProducer> for FailingMockExecutor { if let Some(err) = err.take() { Err(err) } else { - let block = to_block(&component); + let block = arc_pool_tx_comp_to_block(&component); Ok(UncommittedResult::new( ExecutionResult { block, @@ -174,16 +185,35 @@ impl BlockProducer> for FailingMockExecutor { } #[derive(Clone)] -pub struct MockExecutorWithCapture { - pub captured: Arc>>>>, +pub struct MockExecutorWithCapture { + pub captured: Arc>>>>, } -impl BlockProducer> for MockExecutorWithCapture { +impl BlockProducer> for MockExecutorWithCapture { fn produce_without_commit( &self, component: Components>, ) -> ExecutorResult> { - let block = to_block(&component); + let block = arc_pool_tx_comp_to_block(&component); + *self.captured.lock().unwrap() = Some(component); + Ok(UncommittedResult::new( + ExecutionResult { + block, + skipped_transactions: vec![], + tx_status: vec![], + events: vec![], + }, + Default::default(), + )) + } +} + +impl BlockProducer> for MockExecutorWithCapture { + fn produce_without_commit( + &self, + component: Components>, + ) -> ExecutorResult> { + let block = tx_comp_to_block(&component); *self.captured.lock().unwrap() = Some(component); Ok(UncommittedResult::new( ExecutionResult { @@ -197,7 +227,7 @@ impl BlockProducer> for MockExecutorWithCapture { } } -impl Default for MockExecutorWithCapture { +impl Default for MockExecutorWithCapture { fn default() -> Self { Self { captured: Arc::new(Mutex::new(None)), diff --git a/tests/tests/snapshot.rs b/tests/tests/snapshot.rs index 56b6aaa74a3..33a2848dc4a 100644 --- a/tests/tests/snapshot.rs +++ b/tests/tests/snapshot.rs @@ -10,7 +10,10 @@ use fuel_core::{ FuelService, }, }; -use fuel_core_poa::ports::Database; +use fuel_core_poa::{ + ports::Database, + Trigger, +}; use fuel_core_storage::transactional::AtomicView; use fuel_core_types::blockchain::primitives::DaBlockHeight; use rand::{ @@ -36,7 +39,10 @@ async fn loads_snapshot() { }), ..StateConfig::randomize(&mut rng) }; - let config = Config::local_node_with_state_config(starting_state.clone()); + // Disable block production + let mut config = Config::local_node_with_state_config(starting_state.clone()); + config.debug = false; + config.block_production = Trigger::Never; // setup server & client let _ = FuelService::from_combined_database(db.clone(), config)