diff --git a/consensus/consensus-types/src/block.rs b/consensus/consensus-types/src/block.rs
index e6ad6c6c15a66..e80fbb7c83347 100644
--- a/consensus/consensus-types/src/block.rs
+++ b/consensus/consensus-types/src/block.rs
@@ -360,6 +360,11 @@ impl Block {
                 "Reconfiguration suffix should not carry payload"
             );
         }
+
+        if let Some(payload) = self.payload() {
+            payload.verify_epoch(self.epoch())?;
+        }
+
         if let Some(failed_authors) = self.block_data().failed_authors() {
             // when validating for being well formed,
             // allow for missing failed authors,
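
Reviewer note (illustration only, not part of the patch): the new `verify_epoch` call gates block well-formedness on the payload carrying the block's own epoch, so batch metadata produced in one epoch cannot be smuggled into a block of another epoch. A minimal self-contained sketch of that check, using hypothetical simplified stand-ins for `Payload` and `BatchInfo`:

```rust
use anyhow::{ensure, Result};

struct BatchInfo {
    epoch: u64,
}

struct Payload {
    proofs: Vec<BatchInfo>,
}

impl Payload {
    // Mirrors the shape of Payload::verify_epoch below: every proof bundled
    // in the payload must carry the block's epoch.
    fn verify_epoch(&self, epoch: u64) -> Result<()> {
        ensure!(
            self.proofs.iter().all(|p| p.epoch == epoch),
            "Payload epoch doesn't match given epoch"
        );
        Ok(())
    }
}

fn main() {
    let payload = Payload {
        proofs: vec![BatchInfo { epoch: 7 }],
    };
    assert!(payload.verify_epoch(7).is_ok());
    assert!(payload.verify_epoch(8).is_err()); // cross-epoch payload is rejected
}
```
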
diff --git a/consensus/consensus-types/src/common.rs b/consensus/consensus-types/src/common.rs
index 869a40c683cc4..f57c5c50cbf5d 100644
--- a/consensus/consensus-types/src/common.rs
+++ b/consensus/consensus-types/src/common.rs
@@ -6,6 +6,7 @@ use crate::{
     payload::{OptQuorumStorePayload, PayloadExecutionLimit},
     proof_of_store::{BatchInfo, ProofCache, ProofOfStore},
 };
+use anyhow::ensure;
 use aptos_crypto::{
     hash::{CryptoHash, CryptoHasher},
     HashValue,
@@ -487,6 +488,23 @@ impl Payload {
         Ok(())
     }
 
+    pub fn verify_inline_batches<'a>(
+        inline_batches: impl Iterator<Item = (&'a BatchInfo, &'a Vec<SignedTransaction>)>,
+    ) -> anyhow::Result<()> {
+        for (batch, payload) in inline_batches {
+            // TODO: Can cloning be avoided here?
+            let computed_digest = BatchPayload::new(batch.author(), payload.clone()).hash();
+            ensure!(
+                computed_digest == *batch.digest(),
+                "Hash of the received inline batch doesn't match the digest value for batch {}: {} != {}",
+                batch,
+                computed_digest,
+                batch.digest()
+            );
+        }
+        Ok(())
+    }
+
     pub fn verify(
         &self,
         validator: &ValidatorVerifier,
@@ -505,20 +523,20 @@ impl Payload {
             ),
             (true, Payload::QuorumStoreInlineHybrid(inline_batches, proof_with_data, _)) => {
                 Self::verify_with_cache(&proof_with_data.proofs, validator, proof_cache)?;
-                for (batch, payload) in inline_batches.iter() {
-                    // TODO: Can cloning be avoided here?
-                    if BatchPayload::new(batch.author(), payload.clone()).hash() != *batch.digest()
-                    {
-                        return Err(anyhow::anyhow!(
-                            "Hash of the received inline batch doesn't match the digest value",
-                        ));
-                    }
-                }
+                Self::verify_inline_batches(
+                    inline_batches.iter().map(|(info, txns)| (info, txns)),
+                )?;
                 Ok(())
             },
             (true, Payload::OptQuorumStore(opt_quorum_store)) => {
                 let proof_with_data = opt_quorum_store.proof_with_data();
                 Self::verify_with_cache(&proof_with_data.batch_summary, validator, proof_cache)?;
+                Self::verify_inline_batches(
+                    opt_quorum_store
+                        .inline_batches()
+                        .iter()
+                        .map(|batch| (batch.info(), batch.transactions())),
+                )?;
                 Ok(())
             },
             (_, _) => Err(anyhow::anyhow!(
@@ -528,6 +546,42 @@ impl Payload {
             )),
         }
     }
+
+    pub(crate) fn verify_epoch(&self, epoch: u64) -> anyhow::Result<()> {
+        match self {
+            Payload::DirectMempool(_) => return Ok(()),
+            Payload::InQuorumStore(proof_with_data) => {
+                ensure!(
+                    proof_with_data.proofs.iter().all(|p| p.epoch() == epoch),
+                    "Payload epoch doesn't match given epoch"
+                );
+            },
+            Payload::InQuorumStoreWithLimit(proof_with_data_with_txn_limit) => {
+                ensure!(
+                    proof_with_data_with_txn_limit
+                        .proof_with_data
+                        .proofs
+                        .iter()
+                        .all(|p| p.epoch() == epoch),
+                    "Payload epoch doesn't match given epoch"
+                );
+            },
+            Payload::QuorumStoreInlineHybrid(inline_batches, proof_with_data, _) => {
+                ensure!(
+                    proof_with_data.proofs.iter().all(|p| p.epoch() == epoch),
+                    "Payload proof epoch doesn't match given epoch"
+                );
+                ensure!(
+                    inline_batches.iter().all(|b| b.0.epoch() == epoch),
+                    "Payload inline batch epoch doesn't match given epoch"
+                )
+            },
+            Payload::OptQuorumStore(opt_quorum_store_payload) => {
+                opt_quorum_store_payload.check_epoch(epoch)?;
+            },
+        };
+        Ok(())
+    }
 }
 
 impl fmt::Display for Payload {
diff --git a/consensus/consensus-types/src/payload.rs b/consensus/consensus-types/src/payload.rs
index 9514c8d00907c..d4dd7b26db86a 100644
--- a/consensus/consensus-types/src/payload.rs
+++ b/consensus/consensus-types/src/payload.rs
@@ -2,6 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0
 
 use crate::proof_of_store::{BatchInfo, ProofOfStore};
+use anyhow::ensure;
 use aptos_executor_types::ExecutorResult;
 use aptos_infallible::Mutex;
 use aptos_types::{transaction::SignedTransaction, PeerId};
@@ -33,6 +34,7 @@ pub trait TDataInfo {
 
 pub struct DataFetchFut {
     pub iteration: u32,
+    pub responders: Vec<Arc<Mutex<Vec<PeerId>>>>,
     pub fut: Shared<BoxFuture<'static, ExecutorResult<Vec<SignedTransaction>>>>,
 }
 
@@ -186,6 +188,10 @@ impl InlineBatch {
     pub fn info(&self) -> &BatchInfo {
         &self.batch_info
     }
+
+    pub fn transactions(&self) -> &Vec<SignedTransaction> {
+        &self.transactions
+    }
 }
 
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
@@ -286,6 +292,26 @@ impl OptQuorumStorePayloadV1 {
             PayloadExecutionLimit::MaxTransactionsToExecute(max) => Some(max),
         }
     }
+
+    pub fn check_epoch(&self, epoch: u64) -> anyhow::Result<()> {
+        ensure!(
+            self.inline_batches
+                .iter()
+                .all(|b| b.info().epoch() == epoch),
+            "OptQS InlineBatch epoch doesn't match given epoch"
+        );
+        ensure!(
+            self.opt_batches.iter().all(|b| b.info().epoch() == epoch),
+            "OptQS OptBatch epoch doesn't match given epoch"
+        );
+
+        ensure!(
+            self.proofs.iter().all(|b| b.info().epoch() == epoch),
+            "OptQS Proof epoch doesn't match given epoch"
+        );
+
+        Ok(())
+    }
 }
 
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
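
Reviewer note (illustration only, not part of the patch): `DataFetchFut` now keeps clones of each batch's responder list as `Arc<Mutex<Vec<PeerId>>>`, so peers learned after the fetch was scheduled (for example, the voters of a QC that arrives later) can be appended and observed by the in-flight retry logic. A std-only sketch of the sharing pattern, with `Peer` as a hypothetical stand-in for `PeerId`:

```rust
use std::sync::{Arc, Mutex};

type Peer = &'static str;

fn main() {
    let responders: Arc<Mutex<Vec<Peer>>> = Arc::new(Mutex::new(vec!["proof-signer-1"]));
    let for_fetcher = responders.clone(); // handed to the in-flight fetch task

    // Later, once the block's QC arrives, its voters are appended.
    responders.lock().unwrap().push("block-voter-1");

    // The fetcher sees the augmented list the next time it locks the vector.
    assert_eq!(for_fetcher.lock().unwrap().len(), 2);
}
```
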
diff --git a/consensus/consensus-types/src/pipelined_block.rs b/consensus/consensus-types/src/pipelined_block.rs
index b0cc069aff768..e3a0856996576 100644
--- a/consensus/consensus-types/src/pipelined_block.rs
+++ b/consensus/consensus-types/src/pipelined_block.rs
@@ -105,6 +105,7 @@ impl PipelineFutures {
 }
 
 pub struct PipelineInputTx {
+    pub qc_tx: Option<oneshot::Sender<Arc<QuorumCert>>>,
     pub rand_tx: Option<oneshot::Sender<Option<Randomness>>>,
     pub order_vote_tx: Option<oneshot::Sender<()>>,
     pub order_proof_tx: Option<oneshot::Sender<()>>,
@@ -112,6 +113,7 @@ pub struct PipelineInputRx {
+    pub qc_rx: oneshot::Receiver<Arc<QuorumCert>>,
     pub rand_rx: oneshot::Receiver<Option<Randomness>>,
     pub order_vote_rx: oneshot::Receiver<()>,
     pub order_proof_fut: TaskFuture<()>,
@@ -145,6 +147,8 @@ pub struct PipelinedBlock {
     pipeline_tx: Arc<Mutex<Option<PipelineInputTx>>>,
     #[derivative(PartialEq = "ignore")]
     pipeline_abort_handle: Arc<Mutex<Option<Vec<AbortHandle>>>>,
+    #[derivative(PartialEq = "ignore")]
+    block_qc: Arc<Mutex<Option<Arc<QuorumCert>>>>,
 }
 
 impl Serialize for PipelinedBlock {
@@ -286,6 +290,13 @@ impl PipelinedBlock {
             .take()
             .expect("pre_commit_result_rx missing.")
     }
+
+    pub fn set_qc(&self, qc: Arc<QuorumCert>) {
+        *self.block_qc.lock() = Some(qc.clone());
+        if let Some(tx) = self.pipeline_tx().lock().as_mut() {
+            tx.qc_tx.take().map(|tx| tx.send(qc));
+        }
+    }
 }
 
 impl Debug for PipelinedBlock {
@@ -317,6 +328,7 @@ impl PipelinedBlock {
             pipeline_futs: Arc::new(Mutex::new(None)),
             pipeline_tx: Arc::new(Mutex::new(None)),
             pipeline_abort_handle: Arc::new(Mutex::new(None)),
+            block_qc: Arc::new(Mutex::new(None)),
         }
     }
 
@@ -422,6 +434,10 @@ impl PipelinedBlock {
     pub fn get_execution_summary(&self) -> Option<ExecutionSummary> {
         self.execution_summary.get().cloned()
     }
+
+    pub fn qc(&self) -> Option<Arc<QuorumCert>> {
+        self.block_qc.lock().clone()
+    }
 }
 
 /// Pipeline related functions
diff --git a/consensus/src/block_preparer.rs b/consensus/src/block_preparer.rs
index aa3d82706f32d..5db76e27ef47e 100644
--- a/consensus/src/block_preparer.rs
+++ b/consensus/src/block_preparer.rs
@@ -8,11 +8,12 @@ use crate::{
     transaction_filter::TransactionFilter,
     transaction_shuffler::TransactionShuffler,
 };
-use aptos_consensus_types::block::Block;
+use aptos_consensus_types::{block::Block, quorum_cert::QuorumCert};
 use aptos_executor_types::ExecutorResult;
 use aptos_types::transaction::SignedTransaction;
 use fail::fail_point;
-use std::{sync::Arc, time::Instant};
+use futures::future::Shared;
+use std::{future::Future, sync::Arc, time::Instant};
 
 pub struct BlockPreparer {
     payload_manager: Arc<dyn TPayloadManager>,
@@ -36,7 +37,11 @@ impl BlockPreparer {
         }
     }
 
-    pub async fn prepare_block(&self, block: &Block) -> ExecutorResult<Vec<SignedTransaction>> {
+    pub async fn prepare_block(
+        &self,
+        block: &Block,
+        block_qc_fut: Shared<impl Future<Output = Option<Arc<QuorumCert>>>>,
+    ) -> ExecutorResult<Vec<SignedTransaction>> {
         fail_point!("consensus::prepare_block", |_| {
             use aptos_executor_types::ExecutorError;
             use std::{thread, time::Duration};
@@ -44,8 +49,17 @@ impl BlockPreparer {
             Err(ExecutorError::CouldNotGetData)
         });
         let start_time = Instant::now();
-        let (txns, max_txns_from_block_to_execute) =
-            self.payload_manager.get_transactions(block).await?;
+
+        let (txns, max_txns_from_block_to_execute) = tokio::select! {
+            // Poll the block qc future until a QC is received. Ignore None outcomes.
+            Some(qc) = block_qc_fut => {
+                let block_voters = Some(qc.ledger_info().get_voters_bitvec().clone());
+                self.payload_manager.get_transactions(block, block_voters).await
+            },
+            result = self.payload_manager.get_transactions(block, None) => {
+                result
+            }
+        }?;
         let txn_filter = self.txn_filter.clone();
         let txn_deduper = self.txn_deduper.clone();
         let txn_shuffler = self.txn_shuffler.clone();
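
Reviewer note (illustration only, not part of the patch): `prepare_block` races the shared QC future against a plain fetch with `tokio::select!`. The `Some(qc) = ...` arm relies on select's refutable-pattern rule: if the future resolves to `None`, that branch is disabled rather than winning the race, and the fallback fetch proceeds without a voter hint. A small sketch of that behavior (assumes the `tokio` and `futures` crates):

```rust
use futures::FutureExt;

#[tokio::main]
async fn main() {
    // A QC future that resolves to None (e.g., the sender side went away).
    let qc_fut = async { None::<u64> }.shared();

    let picked = tokio::select! {
        // Disabled when the future yields None, so it cannot win the race.
        Some(qc) = qc_fut => format!("fetch with voters of QC {qc}"),
        // Fallback: fetch without a voter hint.
        _ = async {} => "fetch without voter hint".to_string(),
    };
    assert_eq!(picked, "fetch without voter hint");
}
```
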
diff --git a/consensus/src/block_storage/block_store.rs b/consensus/src/block_storage/block_store.rs
index b8c8146121e0a..9e223e252a63d 100644
--- a/consensus/src/block_storage/block_store.rs
+++ b/consensus/src/block_storage/block_store.rs
@@ -449,6 +449,7 @@ impl BlockStore {
                     pipelined_block.block().timestamp_usecs(),
                     BlockStage::QC_ADDED,
                 );
+                pipelined_block.set_qc(Arc::new(qc.clone()));
            },
             None => bail!("Insert {} without having the block in store first", qc),
         };
@@ -524,7 +525,8 @@ impl BlockStore {
 
     pub async fn wait_for_payload(&self, block: &Block, deadline: Duration) -> anyhow::Result<()> {
         let duration = deadline.saturating_sub(self.time_service.get_current_timestamp());
-        tokio::time::timeout(duration, self.payload_manager.get_transactions(block)).await??;
+        tokio::time::timeout(duration, self.payload_manager.get_transactions(block, None))
+            .await??;
         Ok(())
     }
 
diff --git a/consensus/src/dag/tests/helpers.rs b/consensus/src/dag/tests/helpers.rs
index dab407099c303..281ace993dc0d 100644
--- a/consensus/src/dag/tests/helpers.rs
+++ b/consensus/src/dag/tests/helpers.rs
@@ -34,6 +34,7 @@ impl TPayloadManager for MockPayloadManager {
     async fn get_transactions(
         &self,
         _block: &Block,
+        _block_signers: Option<BitVec>,
     ) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
         Ok((Vec::new(), None))
     }
diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs
index a1be0f289de64..14ed035334525 100644
--- a/consensus/src/epoch_manager.rs
+++ b/consensus/src/epoch_manager.rs
@@ -699,7 +699,6 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
             network_sender,
             epoch_state.verifier.clone(),
             self.proof_cache.clone(),
-            self.config.safety_rules.backend.clone(),
             self.quorum_store_storage.clone(),
             !consensus_config.is_dag_enabled(),
             consensus_key,
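
Reviewer note (illustration only, not part of the patch): `insert_single_quorum_cert` now calls `set_qc`, which caches the QC on the `PipelinedBlock` and forwards it through the pipeline's one-shot `qc_tx`; `take()` guarantees the sender fires at most once, and a dropped receiver is ignored. A simplified sketch with `Arc<u64>` standing in for `Arc<QuorumCert>`:

```rust
use std::sync::{Arc, Mutex};
use tokio::sync::oneshot;

struct Block {
    qc: Mutex<Option<Arc<u64>>>, // cached copy, served by a qc() accessor
    qc_tx: Mutex<Option<oneshot::Sender<Arc<u64>>>>,
}

impl Block {
    fn set_qc(&self, qc: Arc<u64>) {
        *self.qc.lock().unwrap() = Some(qc.clone());
        // take() leaves None behind, so a second call cannot send again.
        if let Some(tx) = self.qc_tx.lock().unwrap().take() {
            let _ = tx.send(qc); // receiver may be gone; ignore the error
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    let block = Block {
        qc: Mutex::new(None),
        qc_tx: Mutex::new(Some(tx)),
    };
    block.set_qc(Arc::new(7));
    assert_eq!(*rx.await.unwrap(), 7);
    assert!(block.qc.lock().unwrap().is_some()); // cached for later reads
}
```
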
diff --git a/consensus/src/execution_pipeline.rs b/consensus/src/execution_pipeline.rs
index 3a535f42b5e36..2836022fec75c 100644
--- a/consensus/src/execution_pipeline.rs
+++ b/consensus/src/execution_pipeline.rs
@@ -10,7 +10,9 @@ use crate::{
     pipeline::pipeline_phase::CountedRequest,
     state_computer::StateComputeResultFut,
 };
-use aptos_consensus_types::{block::Block, pipeline_execution_result::PipelineExecutionResult};
+use aptos_consensus_types::{
+    block::Block, pipeline_execution_result::PipelineExecutionResult, quorum_cert::QuorumCert,
+};
 use aptos_crypto::HashValue;
 use aptos_executor_types::{
     state_compute_result::StateComputeResult, BlockExecutorTrait, ExecutorError, ExecutorResult,
@@ -25,7 +27,7 @@ use aptos_types::{
     },
 };
 use fail::fail_point;
-use futures::future::BoxFuture;
+use futures::{future::BoxFuture, FutureExt};
 use once_cell::sync::Lazy;
 use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
 use std::{
@@ -92,6 +94,7 @@ impl ExecutionPipeline {
         block: Block,
         metadata: BlockMetadataExt,
         parent_block_id: HashValue,
+        block_qc: Option<Arc<QuorumCert>>,
         txn_generator: BlockPreparer,
         block_executor_onchain_config: BlockExecutorConfigFromOnchain,
         pre_commit_hook: PreCommitHook,
@@ -110,6 +113,7 @@ impl ExecutionPipeline {
                 command_creation_time: Instant::now(),
                 pre_commit_hook,
                 lifetime_guard,
+                block_qc,
             })
             .expect("Failed to send block to execution pipeline.");
 
@@ -139,10 +143,13 @@ impl ExecutionPipeline {
             result_tx,
             command_creation_time,
             lifetime_guard,
+            block_qc,
         } = command;
         counters::PREPARE_BLOCK_WAIT_TIME.observe_duration(command_creation_time.elapsed());
         debug!("prepare_block received block {}.", block.id());
-        let input_txns = block_preparer.prepare_block(&block).await;
+        let input_txns = block_preparer
+            .prepare_block(&block, async { block_qc }.shared())
+            .await;
         if let Err(e) = input_txns {
             result_tx
                 .send(Err(e))
@@ -375,6 +382,7 @@ struct PrepareBlockCommand {
     result_tx: oneshot::Sender<ExecutorResult<Vec<SignedTransaction>>>,
     command_creation_time: Instant,
     lifetime_guard: CountedRequest<()>,
+    block_qc: Option<Arc<QuorumCert>>,
 }
 
 struct ExecuteBlockCommand {
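
Reviewer note (illustration only, not part of the patch): on this path the pipeline already holds an `Option<Arc<QuorumCert>>` when the command is processed, so `async { block_qc }.shared()` wraps the known value into the same cloneable future shape that the live `qc_rx` path produces. Sketch of the adapter, with a hypothetical `u64` payload:

```rust
use futures::{future::Shared, FutureExt};
use std::future::Future;

// Accepts the same Shared<impl Future<Output = Option<_>>> shape as
// prepare_block in this patch.
async fn prepare(qc_fut: Shared<impl Future<Output = Option<u64>>>) -> Option<u64> {
    qc_fut.await
}

#[tokio::main]
async fn main() {
    let known: Option<u64> = Some(9);
    // An immediately-ready future standing in for a pending qc_rx.
    let fut = async move { known }.shared();
    assert_eq!(prepare(fut).await, Some(9));
}
```
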
diff --git a/consensus/src/payload_manager.rs b/consensus/src/payload_manager.rs
index 16f3e305ea585..86a537edb940b 100644
--- a/consensus/src/payload_manager.rs
+++ b/consensus/src/payload_manager.rs
@@ -56,6 +56,7 @@ pub trait TPayloadManager: Send + Sync {
     async fn get_transactions(
         &self,
         block: &Block,
+        block_voters: Option<BitVec>,
     ) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)>;
 }
 
@@ -81,6 +82,7 @@ impl TPayloadManager for DirectMempoolPayloadManager {
     async fn get_transactions(
         &self,
         block: &Block,
+        _block_signers: Option<BitVec>,
     ) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
         let Some(payload) = block.payload() else {
             return Ok((Vec::new(), None));
@@ -126,7 +128,7 @@ impl QuorumStorePayloadManager {
     }
 
     fn request_transactions(
-        batches: Vec<(BatchInfo, Vec<PeerId>)>,
+        batches: Vec<(BatchInfo, Arc<Mutex<Vec<PeerId>>>)>,
         block_timestamp: u64,
         batch_reader: Arc<dyn BatchReader>,
     ) -> Vec<(
@@ -146,7 +148,7 @@ impl QuorumStorePayloadManager {
                     batch_reader.get_batch(
                         *batch_info.digest(),
                         batch_info.expiration(),
-                        responders,
+                        responders.clone(),
                     ),
                 ));
             } else {
@@ -226,7 +228,7 @@ impl TPayloadManager for QuorumStorePayloadManager {
                     .map(|proof| {
                         (
                             proof.info().clone(),
-                            proof.shuffled_signers(&self.ordered_authors),
+                            Arc::new(Mutex::new(proof.shuffled_signers(&self.ordered_authors))),
                         )
                     })
                     .collect(),
@@ -250,20 +252,25 @@ impl TPayloadManager for QuorumStorePayloadManager {
                 return;
             }
 
-            let batches_and_responders = data_pointer
+            let (batches_and_responders, responders) = data_pointer
                 .batch_summary
                 .iter()
                 .map(|proof| {
                     let signers = proof.signers(ordered_authors);
+                    let responders = Arc::new(Mutex::new(signers));
                     // TODO(ibalajiarun): Add block author to signers
-                    (proof.info().clone(), signers)
+                    ((proof.info().clone(), responders.clone()), responders)
                 })
-                .collect();
+                .unzip();
             let fut =
                 request_txns_from_quorum_store(batches_and_responders, timestamp, batch_reader)
                     .boxed()
                     .shared();
-            *data_fut = Some(DataFetchFut { fut, iteration: 0 })
+            *data_fut = Some(DataFetchFut {
+                fut,
+                iteration: 0,
+                responders,
+            })
         }
 
         match payload {
@@ -381,6 +388,7 @@ impl TPayloadManager for QuorumStorePayloadManager {
     async fn get_transactions(
         &self,
         block: &Block,
+        block_signers: Option<BitVec>,
     ) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
         let Some(payload) = block.payload() else {
             return Ok((Vec::new(), None));
@@ -388,7 +396,7 @@ impl TPayloadManager for QuorumStorePayloadManager {
 
         let transaction_payload = match payload {
             Payload::InQuorumStore(proof_with_data) => {
-                let transactions = process_payload(
+                let transactions = process_qs_payload(
                     proof_with_data,
                     self.batch_reader.clone(),
                     block,
@@ -401,7 +409,7 @@ impl TPayloadManager for QuorumStorePayloadManager {
                 )
             },
             Payload::InQuorumStoreWithLimit(proof_with_data) => {
-                let transactions = process_payload(
+                let transactions = process_qs_payload(
                     &proof_with_data.proof_with_data,
                     self.batch_reader.clone(),
                     block,
@@ -420,7 +428,7 @@ impl TPayloadManager for QuorumStorePayloadManager {
                 max_txns_to_execute,
             ) => {
                 let all_transactions = {
-                    let mut all_txns = process_payload(
+                    let mut all_txns = process_qs_payload(
                         proof_with_data,
                         self.batch_reader.clone(),
                         block,
@@ -448,18 +456,20 @@ impl TPayloadManager for QuorumStorePayloadManager {
                 )
             },
             Payload::OptQuorumStore(opt_qs_payload) => {
-                let opt_batch_txns = process_payload_helper(
+                let opt_batch_txns = process_optqs_payload(
                     opt_qs_payload.opt_batches(),
                     self.batch_reader.clone(),
                     block,
                     &self.ordered_authors,
+                    block_signers.as_ref(),
                 )
                 .await?;
-                let proof_batch_txns = process_payload_helper(
+                let proof_batch_txns = process_optqs_payload(
                     opt_qs_payload.proof_with_data(),
                     self.batch_reader.clone(),
                     block,
                     &self.ordered_authors,
+                    None,
                 )
                 .await?;
                 let inline_batch_txns = opt_qs_payload.inline_batches().transactions();
@@ -549,7 +559,7 @@ async fn get_transactions_for_observer(
 }
 
 async fn request_txns_from_quorum_store(
-    batches_and_responders: Vec<(BatchInfo, Vec<PeerId>)>,
+    batches_and_responders: Vec<(BatchInfo, Arc<Mutex<Vec<PeerId>>>)>,
     timestamp: u64,
     batch_reader: Arc<dyn BatchReader>,
 ) -> ExecutorResult<Vec<SignedTransaction>> {
@@ -581,44 +591,72 @@ async fn request_txns_from_quorum_store(
     Ok(ret)
 }
 
-async fn process_payload_helper<T: TDataInfo>(
+async fn process_optqs_payload<T: TDataInfo>(
     data_ptr: &BatchPointer<T>,
     batch_reader: Arc<dyn BatchReader>,
     block: &Block,
     ordered_authors: &[PeerId],
+    additional_peers_to_request: Option<&BitVec>,
 ) -> ExecutorResult<Vec<SignedTransaction>> {
-    let (iteration, fut) = {
+    let (iteration, fut, existing_responders) = {
         let data_fut_guard = data_ptr.data_fut.lock();
         let data_fut = data_fut_guard.as_ref().expect("must be initialized");
-        (data_fut.iteration, data_fut.fut.clone())
+        (
+            data_fut.iteration,
+            data_fut.fut.clone(),
+            data_fut.responders.clone(),
+        )
     };
 
+    let mut signers = Vec::new();
+    if let Some(peers) = additional_peers_to_request {
+        for i in peers.iter_ones() {
+            if let Some(author) = ordered_authors.get(i) {
+                signers.push(*author);
+            }
+        }
+    }
+
+    // Append the additional peers to existing responders list
+    // NB: this might append the same signers multiple times, but this
+    // should be very rare and has no negative effects.
+    for responders in existing_responders {
+        responders.lock().append(&mut signers.clone());
+    }
+
     let result = fut.await;
+
+    // If error, reschedule before returning the result
     if result.is_err() {
         let mut data_fut_guard = data_ptr.data_fut.lock();
         let data_fut = data_fut_guard.as_mut().expect("must be initialized");
         // Protection against race, check the iteration number before rescheduling.
         if data_fut.iteration == iteration {
-            let batches_and_responders = data_ptr
+            let (batches_and_responders, responders) = data_ptr
                 .batch_summary
                 .iter()
-                .map(|proof| {
-                    let mut signers = proof.signers(ordered_authors);
+                .map(|summary| {
+                    let mut signers = signers.clone();
+                    signers.append(&mut summary.signers(ordered_authors));
                     if let Some(author) = block.author() {
                         signers.push(author);
                     }
-                    (proof.info().clone(), signers)
+                    let responders = Arc::new(Mutex::new(signers));
+
+                    ((summary.info().clone(), responders.clone()), responders)
                 })
-                .collect();
-            data_fut.fut = request_txns_from_quorum_store(
-                batches_and_responders,
-                block.timestamp_usecs(),
-                batch_reader,
-            )
-            .boxed()
-            .shared();
-            data_fut.iteration = iteration + 1;
+                .unzip();
+            *data_fut = DataFetchFut {
+                fut: request_txns_from_quorum_store(
+                    batches_and_responders,
+                    block.timestamp_usecs(),
+                    batch_reader,
+                )
+                .boxed()
+                .shared(),
+                iteration: iteration + 1,
+                responders,
+            };
         }
     }
     result
@@ -626,7 +664,7 @@ async fn process_payload_helper(
 
 /// This is deprecated. Use `process_payload_helper` instead after migrating to
 /// OptQuorumStore payload
-async fn process_payload(
+async fn process_qs_payload(
     proof_with_data: &ProofWithData,
     batch_reader: Arc<dyn BatchReader>,
     block: &Block,
@@ -667,7 +705,9 @@ async fn process_payload(
                             .map(|proof| {
                                 (
                                     proof.info().clone(),
-                                    proof.shuffled_signers(ordered_authors),
+                                    Arc::new(Mutex::new(
+                                        proof.shuffled_signers(ordered_authors),
+                                    )),
                                 )
                             })
                             .collect(),
@@ -692,7 +732,9 @@ async fn process_payload(
                             .map(|proof| {
                                 (
                                     proof.info().clone(),
-                                    proof.shuffled_signers(ordered_authors),
+                                    Arc::new(Mutex::new(
+                                        proof.shuffled_signers(ordered_authors),
+                                    )),
                                 )
                             })
                             .collect(),
@@ -753,6 +795,7 @@ impl TPayloadManager for ConsensusObserverPayloadManager {
     async fn get_transactions(
         &self,
         block: &Block,
+        _block_signers: Option<BitVec>,
     ) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
         return get_transactions_for_observer(block, &self.txns_pool, &self.consensus_publisher)
             .await;
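
Reviewer note (illustration only, not part of the patch): QC voters arrive as a bitmask over the ordered validator set; `process_optqs_payload` turns each set bit into an author address via `ordered_authors`, skipping out-of-range indices. A std-only sketch of that mapping, with `&[bool]` standing in for the aptos `BitVec`:

```rust
// Mirrors the iter_ones() loop: bit i set => request from ordered_authors[i].
fn voters_to_peers(bits: &[bool], ordered_authors: &[&'static str]) -> Vec<&'static str> {
    bits.iter()
        .enumerate()
        .filter(|(_, set)| **set)
        .filter_map(|(i, _)| ordered_authors.get(i).copied()) // ignore stray bits
        .collect()
}

fn main() {
    let ordered_authors = ["alice", "bob", "carol"];
    let voters = [true, false, true];
    assert_eq!(voters_to_peers(&voters, &ordered_authors), vec!["alice", "carol"]);
}
```
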
diff --git a/consensus/src/pipeline/execution_schedule_phase.rs b/consensus/src/pipeline/execution_schedule_phase.rs
index 42a673a0ea4a5..3a8d170e3151d 100644
--- a/consensus/src/pipeline/execution_schedule_phase.rs
+++ b/consensus/src/pipeline/execution_schedule_phase.rs
@@ -102,6 +102,7 @@ impl StatelessPipeline for ExecutionSchedulePhase {
                     b.block(),
                     b.parent_id(),
                     b.randomness().cloned(),
+                    b.qc(),
                     lifetime_guard.spawn(()),
                 )
                 .await;
diff --git a/consensus/src/pipeline/pipeline_builder.rs b/consensus/src/pipeline/pipeline_builder.rs
index 2b41ffc3a6f69..85ddac2f87e37 100644
--- a/consensus/src/pipeline/pipeline_builder.rs
+++ b/consensus/src/pipeline/pipeline_builder.rs
@@ -4,8 +4,7 @@
 use crate::{
     block_preparer::BlockPreparer,
     block_storage::tracing::{observe_block, BlockStage},
-    counters,
-    counters::{update_counters_for_block, update_counters_for_compute_result},
+    counters::{self, update_counters_for_block, update_counters_for_compute_result},
     execution_pipeline::SIG_VERIFY_POOL,
     monitor,
     payload_manager::TPayloadManager,
@@ -23,6 +22,7 @@ use aptos_consensus_types::{
         PipelineInputRx, PipelineInputTx, PipelinedBlock, PostCommitResult, PostLedgerUpdateResult,
         PostPreCommitResult, PreCommitResult, PrepareResult, TaskError, TaskFuture, TaskResult,
     },
+    quorum_cert::QuorumCert,
 };
 use aptos_crypto::HashValue;
 use aptos_executor_types::{state_compute_result::StateComputeResult, BlockExecutorTrait};
@@ -194,6 +194,7 @@ impl PipelineBuilder {
     }
 
     fn channel(abort_handles: &mut Vec<AbortHandle>) -> (PipelineInputTx, PipelineInputRx) {
+        let (qc_tx, qc_rx) = oneshot::channel();
         let (rand_tx, rand_rx) = oneshot::channel();
         let (order_vote_tx, order_vote_rx) = oneshot::channel();
         let (order_proof_tx, order_proof_fut) = oneshot::channel();
@@ -216,12 +217,14 @@ impl PipelineBuilder {
         );
         (
             PipelineInputTx {
+                qc_tx: Some(qc_tx),
                 rand_tx: Some(rand_tx),
                 order_vote_tx: Some(order_vote_tx),
                 order_proof_tx: Some(order_proof_tx),
                 commit_proof_tx: Some(commit_proof_tx),
             },
             PipelineInputRx {
+                qc_rx,
                 rand_rx,
                 order_vote_rx,
                 order_proof_fut,
@@ -289,6 +292,7 @@ impl PipelineBuilder {
         let mut abort_handles = vec![];
         let (tx, rx) = Self::channel(&mut abort_handles);
         let PipelineInputRx {
+            qc_rx,
             rand_rx,
             order_vote_rx,
             order_proof_fut,
@@ -296,7 +300,7 @@ impl PipelineBuilder {
         } = rx;
 
         let prepare_fut = spawn_shared_fut(
-            Self::prepare(self.block_preparer.clone(), block.clone()),
+            Self::prepare(self.block_preparer.clone(), block.clone(), qc_rx),
             &mut abort_handles,
         );
         let execute_fut = spawn_shared_fut(
@@ -406,12 +410,27 @@ impl PipelineBuilder {
 
     /// Precondition: Block is inserted into block tree (all ancestors are available)
     /// What it does: Wait for all data becomes available and verify transaction signatures
-    async fn prepare(preparer: Arc<BlockPreparer>, block: Arc<Block>) -> TaskResult<PrepareResult> {
+    async fn prepare(
+        preparer: Arc<BlockPreparer>,
+        block: Arc<Block>,
+        qc_rx: oneshot::Receiver<Arc<QuorumCert>>,
+    ) -> TaskResult<PrepareResult> {
         let mut tracker = Tracker::start_waiting("prepare", &block);
         tracker.start_working();
+
+        let qc_rx = async {
+            match qc_rx.await {
+                Ok(qc) => Some(qc),
+                Err(_) => {
+                    warn!("[BlockPreparer] qc tx cancelled for block {}", block.id());
+                    None
+                },
+            }
+        }
+        .shared();
+
         // the loop can only be abort by the caller
         let input_txns = loop {
-            match preparer.prepare_block(&block).await {
+            match preparer.prepare_block(&block, qc_rx.clone()).await {
                 Ok(input_txns) => break input_txns,
                 Err(e) => {
                     warn!(
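
Reviewer note (illustration only, not part of the patch): `prepare` converts the one-shot receiver into a future that yields `None` when the sender side is dropped, then makes it `.shared()` so every retry iteration of `prepare_block` can re-await the same result. Sketch (assumes the `tokio` and `futures` crates):

```rust
use futures::FutureExt;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<u64>();
    drop(tx); // simulate the QC never arriving (e.g., pipeline aborted)

    let qc_fut = async move {
        match rx.await {
            Ok(qc) => Some(qc),
            Err(_) => None, // sender dropped; proceed without a voter hint
        }
    }
    .shared();

    // Each retry can clone and re-await the shared future cheaply.
    assert_eq!(qc_fut.clone().await, None);
    assert_eq!(qc_fut.await, None);
}
```
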
diff --git a/consensus/src/quorum_store/batch_requester.rs b/consensus/src/quorum_store/batch_requester.rs
index c7dfe6aeff0c1..322054402e6e5 100644
--- a/consensus/src/quorum_store/batch_requester.rs
+++ b/consensus/src/quorum_store/batch_requester.rs
@@ -12,6 +12,7 @@ use crate::{
 use aptos_consensus_types::proof_of_store::BatchInfo;
 use aptos_crypto::HashValue;
 use aptos_executor_types::*;
+use aptos_infallible::Mutex;
 use aptos_logger::prelude::*;
 use aptos_types::{transaction::SignedTransaction, validator_verifier::ValidatorVerifier, PeerId};
 use futures::{stream::FuturesUnordered, StreamExt};
@@ -20,7 +21,7 @@ use std::{sync::Arc, time::Duration};
 use tokio::{sync::oneshot, time};
 
 struct BatchRequesterState {
-    signers: Vec<PeerId>,
+    signers: Arc<Mutex<Vec<PeerId>>>,
     next_index: usize,
     ret_tx: oneshot::Sender<ExecutorResult<Vec<SignedTransaction>>>,
     num_retries: usize,
@@ -29,7 +30,7 @@ struct BatchRequesterState {
 
 impl BatchRequesterState {
     fn new(
-        signers: Vec<PeerId>,
+        signers: Arc<Mutex<Vec<PeerId>>>,
         ret_tx: oneshot::Sender<ExecutorResult<Vec<SignedTransaction>>>,
         retry_limit: usize,
     ) -> Self {
@@ -43,25 +44,25 @@ impl BatchRequesterState {
     }
 
     fn next_request_peers(&mut self, num_peers: usize) -> Option<Vec<PeerId>> {
+        let signers = self.signers.lock();
         if self.num_retries == 0 {
             let mut rng = rand::thread_rng();
             // make sure nodes request from the different set of nodes
-            self.next_index = rng.gen::<usize>() % self.signers.len();
+            self.next_index = rng.gen::<usize>() % signers.len();
             counters::SENT_BATCH_REQUEST_COUNT.inc_by(num_peers as u64);
         } else {
             counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc_by(num_peers as u64);
         }
         if self.num_retries < self.retry_limit {
             self.num_retries += 1;
-            let ret = self
-                .signers
+            let ret = signers
                 .iter()
                 .cycle()
                 .skip(self.next_index)
                 .take(num_peers)
                 .cloned()
                 .collect();
-            self.next_index = (self.next_index + num_peers) % self.signers.len();
+            self.next_index = (self.next_index + num_peers) % signers.len();
             Some(ret)
         } else {
             None
@@ -132,7 +133,7 @@ impl BatchRequester {
         &self,
         digest: HashValue,
         expiration: u64,
-        responders: Vec<PeerId>,
+        responders: Arc<Mutex<Vec<PeerId>>>,
         ret_tx: oneshot::Sender<ExecutorResult<Vec<SignedTransaction>>>,
         mut subscriber_rx: oneshot::Receiver<PersistedValue>,
     ) -> Option<(BatchInfo, Vec<SignedTransaction>)> {
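
Reviewer note (illustration only, not part of the patch): with the signer list behind `Arc<Mutex<...>>`, `next_request_peers` re-reads it on every retry, so voters appended after the first request join the rotation; selection still walks the list in a wrapping window. A std-only sketch of that window, with `u32` ids standing in for `PeerId`:

```rust
use std::sync::{Arc, Mutex};

// Takes the next num_peers entries starting at next_index, wrapping around,
// and re-reads the shared list under the lock so late additions are included.
fn next_window(signers: &Arc<Mutex<Vec<u32>>>, next_index: &mut usize, num_peers: usize) -> Vec<u32> {
    let signers = signers.lock().unwrap();
    let ret: Vec<u32> = signers
        .iter()
        .cycle()
        .skip(*next_index)
        .take(num_peers)
        .cloned()
        .collect();
    *next_index = (*next_index + num_peers) % signers.len();
    ret
}

fn main() {
    let signers = Arc::new(Mutex::new(vec![1, 2, 3]));
    let mut idx = 2;
    assert_eq!(next_window(&signers, &mut idx, 2), vec![3, 1]); // wraps around
    signers.lock().unwrap().push(4); // a block voter learned later
    assert_eq!(next_window(&signers, &mut idx, 2), vec![2, 3]);
}
```
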
diff --git a/consensus/src/quorum_store/batch_store.rs b/consensus/src/quorum_store/batch_store.rs
index 523744c76749f..9c594ee228cb5 100644
--- a/consensus/src/quorum_store/batch_store.rs
+++ b/consensus/src/quorum_store/batch_store.rs
@@ -15,6 +15,7 @@ use anyhow::bail;
 use aptos_consensus_types::proof_of_store::{BatchInfo, SignedBatchInfo};
 use aptos_crypto::{CryptoMaterialError, HashValue};
 use aptos_executor_types::{ExecutorError, ExecutorResult};
+use aptos_infallible::Mutex;
 use aptos_logger::prelude::*;
 use aptos_types::{transaction::SignedTransaction, validator_signer::ValidatorSigner, PeerId};
 use dashmap::{
@@ -26,7 +27,7 @@ use once_cell::sync::OnceCell;
 use std::{
     sync::{
         atomic::{AtomicU64, Ordering},
-        Arc, Mutex,
+        Arc,
     },
     time::Duration,
 };
@@ -309,10 +310,7 @@ impl BatchStore {
         // Add expiration for the inserted entry, no need to be atomic w. insertion.
         #[allow(clippy::unwrap_used)]
         {
-            self.expirations
-                .lock()
-                .unwrap()
-                .add_item(digest, expiration_time);
+            self.expirations.lock().add_item(digest, expiration_time);
         }
         Ok(true)
     }
@@ -346,7 +344,7 @@ impl BatchStore {
         // after the expiration time. This will help remote peers fetch batches that just expired but are within their
         // execution window.
         let expiration_time = certified_time.saturating_sub(self.expiration_buffer_usecs);
-        let expired_digests = self.expirations.lock().unwrap().expire(expiration_time);
+        let expired_digests = self.expirations.lock().expire(expiration_time);
         let mut ret = Vec::new();
         for h in expired_digests {
             let removed_value = match self.db_cache.entry(h) {
@@ -497,7 +495,7 @@ pub trait BatchReader: Send + Sync {
         &self,
         digest: HashValue,
         expiration: u64,
-        signers: Vec<PeerId>,
+        signers: Arc<Mutex<Vec<PeerId>>>,
     ) -> oneshot::Receiver<ExecutorResult<Vec<SignedTransaction>>>;
 
     fn update_certified_timestamp(&self, certified_time: u64);
@@ -529,7 +527,7 @@ impl BatchReader for Batch
         &self,
         digest: HashValue,
         expiration: u64,
-        signers: Vec<PeerId>,
+        signers: Arc<Mutex<Vec<PeerId>>>,
     ) -> oneshot::Receiver<ExecutorResult<Vec<SignedTransaction>>> {
         let (tx, rx) = oneshot::channel();
         let batch_store = self.batch_store.clone();
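
Reviewer note (illustration only, not part of the patch): swapping `std::sync::Mutex` for `aptos_infallible::Mutex` is what removes the `.unwrap()` chains above: that wrapper's `lock()` returns the guard directly and treats poisoning as fatal. A hypothetical minimal re-implementation of the idea:

```rust
use std::sync::{Mutex as StdMutex, MutexGuard};

// Sketch of an "infallible" mutex in the style of aptos_infallible::Mutex.
pub struct Mutex<T>(StdMutex<T>);

impl<T> Mutex<T> {
    pub fn new(t: T) -> Self {
        Self(StdMutex::new(t))
    }

    // Panics on poisoning instead of returning a Result.
    pub fn lock(&self) -> MutexGuard<'_, T> {
        self.0.lock().expect("mutex poisoned")
    }
}

fn main() {
    let m = Mutex::new(5);
    *m.lock() += 1; // no unwrap() needed at the call site
    assert_eq!(*m.lock(), 6);
}
```
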
diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs
index 73a4d4f49880e..e7878e82ee336 100644
--- a/consensus/src/quorum_store/quorum_store_builder.rs
+++ b/consensus/src/quorum_store/quorum_store_builder.rs
@@ -25,7 +25,7 @@ use crate::{
     round_manager::VerifiedEvent,
 };
 use aptos_channels::{aptos_channel, message_queues::QueueStyle};
-use aptos_config::config::{QuorumStoreConfig, SecureBackend};
+use aptos_config::config::QuorumStoreConfig;
 use aptos_consensus_types::{
     common::Author, proof_of_store::ProofCache, request_response::GetPayloadCommand,
 };
@@ -129,7 +129,6 @@ pub struct InnerBuilder {
     network_sender: NetworkSender,
     verifier: Arc<ValidatorVerifier>,
     proof_cache: ProofCache,
-    _backend: SecureBackend,
     coordinator_tx: Sender<CoordinatorCommand>,
     coordinator_rx: Option<Receiver<CoordinatorCommand>>,
     batch_generator_cmd_tx: tokio::sync::mpsc::Sender<BatchGeneratorCommand>,
@@ -165,7 +164,6 @@ impl InnerBuilder {
         network_sender: NetworkSender,
         verifier: Arc<ValidatorVerifier>,
         proof_cache: ProofCache,
-        backend: SecureBackend,
         quorum_store_storage: Arc<dyn QuorumStoreStorage>,
         broadcast_proofs: bool,
         consensus_key: Arc<bls12381::PrivateKey>,
@@ -205,7 +203,6 @@
             network_sender,
             verifier,
             proof_cache,
-            _backend: backend,
             coordinator_tx,
             coordinator_rx: Some(coordinator_rx),
             batch_generator_cmd_tx,
diff --git a/consensus/src/quorum_store/tests/batch_requester_test.rs b/consensus/src/quorum_store/tests/batch_requester_test.rs
index 33b63849e1940..148fd2278f063 100644
--- a/consensus/src/quorum_store/tests/batch_requester_test.rs
+++ b/consensus/src/quorum_store/tests/batch_requester_test.rs
@@ -13,6 +13,7 @@ use aptos_consensus_types::{
     proof_of_store::{BatchId, ProofOfStore, SignedBatchInfo},
 };
 use aptos_crypto::HashValue;
+use aptos_infallible::Mutex;
 use aptos_types::{
     aggregate_signature::PartialSignatures,
     block_info::BlockInfo,
@@ -21,7 +22,10 @@ use aptos_types::{
     validator_verifier::{ValidatorConsensusInfo, ValidatorVerifier},
 };
 use move_core_types::account_address::AccountAddress;
-use std::time::{Duration, Instant};
+use std::{
+    sync::Arc,
+    time::{Duration, Instant},
+};
 use tokio::sync::oneshot;
 
 #[derive(Clone)]
@@ -98,7 +102,7 @@ async fn test_batch_request_exists() {
         .request_batch(
             *batch.digest(),
             batch.expiration(),
-            vec![AccountAddress::random()],
+            Arc::new(Mutex::new(vec![AccountAddress::random()])),
             tx,
             subscriber_rx,
         )
@@ -194,7 +198,7 @@ async fn test_batch_request_not_exists_not_expired() {
         .request_batch(
             *batch.digest(),
             batch.expiration(),
-            vec![AccountAddress::random()],
+            Arc::new(Mutex::new(vec![AccountAddress::random()])),
             tx,
             subscriber_rx,
         )
@@ -242,7 +246,7 @@ async fn test_batch_request_not_exists_expired() {
         .request_batch(
             *batch.digest(),
             batch.expiration(),
-            vec![AccountAddress::random()],
+            Arc::new(Mutex::new(vec![AccountAddress::random()])),
             tx,
             subscriber_rx,
         )
diff --git a/consensus/src/quorum_store/tests/proof_coordinator_test.rs b/consensus/src/quorum_store/tests/proof_coordinator_test.rs
index 2fd9d36ac57f5..8ce920aa8c84c 100644
--- a/consensus/src/quorum_store/tests/proof_coordinator_test.rs
+++ b/consensus/src/quorum_store/tests/proof_coordinator_test.rs
@@ -13,6 +13,7 @@ use crate::{
 use aptos_consensus_types::proof_of_store::{BatchId, SignedBatchInfo, SignedBatchInfoMsg};
 use aptos_crypto::HashValue;
 use aptos_executor_types::ExecutorResult;
+use aptos_infallible::Mutex;
 use aptos_types::{
     transaction::SignedTransaction, validator_verifier::random_validator_verifier, PeerId,
 };
@@ -33,7 +34,7 @@ impl BatchReader for MockBatchReader {
         &self,
         _digest: HashValue,
         _expiration: u64,
-        _signers: Vec<PeerId>,
+        _signers: Arc<Mutex<Vec<PeerId>>>,
     ) -> tokio::sync::oneshot::Receiver<ExecutorResult<Vec<SignedTransaction>>> {
         unimplemented!()
     }
diff --git a/consensus/src/round_manager.rs b/consensus/src/round_manager.rs
index 072a3a0f3c897..892c1ee7a4d27 100644
--- a/consensus/src/round_manager.rs
+++ b/consensus/src/round_manager.rs
@@ -1135,6 +1135,20 @@ impl RoundManager {
 
     pub async fn process_verified_proposal(&mut self, proposal: Block) -> anyhow::Result<()> {
         let proposal_round = proposal.round();
+        let sync_info = self.block_store.sync_info();
+
+        if proposal_round <= sync_info.highest_round() {
+            sample!(
+                SampleRate::Duration(Duration::from_secs(1)),
+                warn!(
+                    sync_info = sync_info,
+                    proposal = proposal,
+                    "Ignoring proposal. SyncInfo round is higher than proposal round."
+                )
+            );
+            return Ok(());
+        }
+
         let vote = self.create_vote(proposal).await?;
         self.round_state.record_vote(vote.clone());
         let vote_msg = VoteMsg::new(vote.clone(), self.block_store.sync_info());
diff --git a/consensus/src/round_manager_test.rs b/consensus/src/round_manager_test.rs
index 1123f33352919..292ddfe269ea8 100644
--- a/consensus/src/round_manager_test.rs
+++ b/consensus/src/round_manager_test.rs
@@ -482,6 +482,7 @@ impl NodeSetup {
         self.vote_queue.pop_front().unwrap()
     }
 
+    #[allow(unused)]
     pub async fn next_order_vote(&mut self) -> OrderVoteMsg {
         while self.order_vote_queue.is_empty() {
             self.next_network_message().await;
diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs
index faa9d2439377d..a2a0ae732e198 100644
--- a/consensus/src/state_computer.rs
+++ b/consensus/src/state_computer.rs
@@ -21,7 +21,7 @@ use anyhow::Result;
 use aptos_consensus_notifications::ConsensusNotificationSender;
 use aptos_consensus_types::{
     block::Block, common::Round, pipeline_execution_result::PipelineExecutionResult,
-    pipelined_block::PipelinedBlock,
+    pipelined_block::PipelinedBlock, quorum_cert::QuorumCert,
 };
 use aptos_crypto::HashValue;
 use aptos_executor_types::{
@@ -228,6 +228,7 @@ impl StateComputer for ExecutionProxy {
         // The parent block id.
         parent_block_id: HashValue,
         randomness: Option<Randomness>,
+        block_qc: Option<Arc<QuorumCert>>,
         lifetime_guard: CountedRequest<()>,
     ) -> StateComputeResultFut {
         let block_id = block.id();
@@ -274,6 +275,7 @@ impl StateComputer for ExecutionProxy {
                 block.clone(),
                 metadata.clone(),
                 parent_block_id,
+                block_qc,
                 transaction_generator,
                 block_executor_onchain_config,
                 self.pre_commit_hook(),
diff --git a/consensus/src/state_computer_tests.rs b/consensus/src/state_computer_tests.rs
index 3143cc796327c..aee28239087ef 100644
--- a/consensus/src/state_computer_tests.rs
+++ b/consensus/src/state_computer_tests.rs
@@ -198,7 +198,7 @@ async fn should_see_and_notify_validator_txns() {
 
     // Ensure the dummy executor has received the txns.
     let _ = execution_policy
-        .schedule_compute(&block, HashValue::zero(), None, dummy_guard())
+        .schedule_compute(&block, HashValue::zero(), None, None, dummy_guard())
         .await
         .await
         .unwrap();
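
Reviewer note (illustration only, not part of the patch): the new early return in `process_verified_proposal` drops any proposal whose round is not strictly above the highest round already known from local `SyncInfo` (the warning is rate-limited via `sample!`). A std-only sketch of the round comparison:

```rust
struct SyncInfo {
    highest_round: u64,
}

// Matches the new guard: proposal_round <= highest_round => ignore.
fn should_process(proposal_round: u64, sync_info: &SyncInfo) -> bool {
    proposal_round > sync_info.highest_round
}

fn main() {
    let sync_info = SyncInfo { highest_round: 10 };
    assert!(!should_process(10, &sync_info)); // already have a QC/TC at round 10
    assert!(should_process(11, &sync_info));
}
```
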
diff --git a/consensus/src/state_replication.rs b/consensus/src/state_replication.rs
index bbe171fc4e3e7..297a2178ee963 100644
--- a/consensus/src/state_replication.rs
+++ b/consensus/src/state_replication.rs
@@ -8,7 +8,9 @@ use crate::{
     transaction_deduper::TransactionDeduper, transaction_shuffler::TransactionShuffler,
 };
 use anyhow::Result;
-use aptos_consensus_types::{block::Block, pipelined_block::PipelinedBlock};
+use aptos_consensus_types::{
+    block::Block, pipelined_block::PipelinedBlock, quorum_cert::QuorumCert,
+};
 use aptos_crypto::HashValue;
 use aptos_executor_types::ExecutorResult;
 use aptos_types::{
@@ -32,6 +34,7 @@ pub trait StateComputer: Send + Sync {
         // The parent block root hash.
         _parent_block_id: HashValue,
         _randomness: Option<Randomness>,
+        _block_qc: Option<Arc<QuorumCert>>,
         _lifetime_guard: CountedRequest<()>,
     ) -> StateComputeResultFut {
         unimplemented!();
diff --git a/consensus/src/test_utils/mock_execution_client.rs b/consensus/src/test_utils/mock_execution_client.rs
index 24594f2460124..d957449ed883f 100644
--- a/consensus/src/test_utils/mock_execution_client.rs
+++ b/consensus/src/test_utils/mock_execution_client.rs
@@ -75,8 +75,10 @@ impl MockExecutionClient {
                 .lock()
                 .remove(&block.id())
                 .ok_or_else(|| format_err!("Cannot find block"))?;
-            let (mut payload_txns, _max_txns_from_block_to_execute) =
-                self.payload_manager.get_transactions(block.block()).await?;
+            let (mut payload_txns, _max_txns_from_block_to_execute) = self
+                .payload_manager
+                .get_transactions(block.block(), None)
+                .await?;
             txns.append(&mut payload_txns);
         }
         // they may fail during shutdown
diff --git a/consensus/src/test_utils/mock_state_computer.rs b/consensus/src/test_utils/mock_state_computer.rs
index ff620acb8a0bb..5b9c65af200a4 100644
--- a/consensus/src/test_utils/mock_state_computer.rs
+++ b/consensus/src/test_utils/mock_state_computer.rs
@@ -14,7 +14,7 @@ use crate::{
 use anyhow::{anyhow, Result};
 use aptos_consensus_types::{
     block::Block, pipeline_execution_result::PipelineExecutionResult,
-    pipelined_block::PipelinedBlock,
+    pipelined_block::PipelinedBlock, quorum_cert::QuorumCert,
 };
 use aptos_crypto::HashValue;
 use aptos_executor_types::{
@@ -125,6 +125,7 @@ impl StateComputer for RandomComputeResultStateComputer {
         _block: &Block,
         parent_block_id: HashValue,
         _randomness: Option<Randomness>,
+        _block_qc: Option<Arc<QuorumCert>>,
         _lifetime_guard: CountedRequest<()>,
     ) -> StateComputeResultFut {
         // trapdoor for Execution Error
diff --git a/testsuite/forge/src/config.rs b/testsuite/forge/src/config.rs
index 61584befb8752..980ba6c650ce2 100644
--- a/testsuite/forge/src/config.rs
+++ b/testsuite/forge/src/config.rs
@@ -250,6 +250,10 @@ impl ForgeConfig {
                 helm_values["fullnode"]["config"]["consensus_observer"]["enable_pipeline"] =
                     true.into();
             }
+
+            // enable optqs
+            helm_values["validator"]["config"]["consensus"]["quorum_store"]
+                ["enable_opt_quorum_store"] = true.into();
         }))
     }