Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[optqs] bug fixes and perf improvements #15452

Merged
merged 12 commits into from
Jan 15, 2025
5 changes: 5 additions & 0 deletions consensus/consensus-types/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,11 @@ impl Block {
"Reconfiguration suffix should not carry payload"
);
}

if let Some(payload) = self.payload() {
payload.verify_epoch(self.epoch())?;
}

if let Some(failed_authors) = self.block_data().failed_authors() {
// when validating for being well formed,
// allow for missing failed authors,
Expand Down
72 changes: 63 additions & 9 deletions consensus/consensus-types/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
payload::{OptQuorumStorePayload, PayloadExecutionLimit},
proof_of_store::{BatchInfo, ProofCache, ProofOfStore},
};
use anyhow::ensure;
use aptos_crypto::{
hash::{CryptoHash, CryptoHasher},
HashValue,
Expand Down Expand Up @@ -487,6 +488,23 @@ impl Payload {
Ok(())
}

/// Checks every inline batch against the digest recorded in its `BatchInfo`.
///
/// For each `(batch info, transactions)` pair the digest is recomputed by
/// hashing a `BatchPayload` built from the batch author and a clone of the
/// transactions, and then compared with `BatchInfo::digest`.
///
/// # Errors
/// Stops at the first pair whose recomputed digest differs from the recorded
/// one and returns an error naming the batch and both digests.
pub fn verify_inline_batches<'a>(
    mut inline_batches: impl Iterator<Item = (&'a BatchInfo, &'a Vec<SignedTransaction>)>,
) -> anyhow::Result<()> {
    inline_batches.try_for_each(|(batch_info, txns)| {
        // TODO: Can cloning be avoided here?
        let actual_digest = BatchPayload::new(batch_info.author(), txns.clone()).hash();
        let expected_digest = batch_info.digest();
        ensure!(
            actual_digest == *expected_digest,
            "Hash of the received inline batch doesn't match the digest value for batch {}: {} != {}",
            batch_info,
            actual_digest,
            expected_digest
        );
        Ok(())
    })
}

pub fn verify(
&self,
validator: &ValidatorVerifier,
Expand All @@ -505,20 +523,20 @@ impl Payload {
),
(true, Payload::QuorumStoreInlineHybrid(inline_batches, proof_with_data, _)) => {
Self::verify_with_cache(&proof_with_data.proofs, validator, proof_cache)?;
for (batch, payload) in inline_batches.iter() {
// TODO: Can cloning be avoided here?
if BatchPayload::new(batch.author(), payload.clone()).hash() != *batch.digest()
{
return Err(anyhow::anyhow!(
"Hash of the received inline batch doesn't match the digest value",
));
}
}
Self::verify_inline_batches(
inline_batches.iter().map(|(info, txns)| (info, txns)),
)?;
Ok(())
},
(true, Payload::OptQuorumStore(opt_quorum_store)) => {
let proof_with_data = opt_quorum_store.proof_with_data();
Self::verify_with_cache(&proof_with_data.batch_summary, validator, proof_cache)?;
Self::verify_inline_batches(
opt_quorum_store
.inline_batches()
.iter()
.map(|batch| (batch.info(), batch.transactions())),
)?;
Ok(())
},
(_, _) => Err(anyhow::anyhow!(
Expand All @@ -528,6 +546,42 @@ impl Payload {
)),
}
}

/// Verifies that the items referenced by this payload report the given `epoch`.
///
/// * `DirectMempool`: no epoch check is performed.
/// * `InQuorumStore` / `InQuorumStoreWithLimit`: every proof must match.
/// * `QuorumStoreInlineHybrid`: every proof, then every inline batch, must match.
/// * `OptQuorumStore`: delegated to the payload's own `check_epoch`.
///
/// # Errors
/// Returns an error for the first group that contains an item from a
/// different epoch.
pub(crate) fn verify_epoch(&self, epoch: u64) -> anyhow::Result<()> {
    match self {
        // No epoch check is performed for direct-mempool payloads.
        Payload::DirectMempool(_) => {},
        Payload::InQuorumStore(proof_with_data) => {
            let mut proofs = proof_with_data.proofs.iter();
            ensure!(
                proofs.all(|proof| proof.epoch() == epoch),
                "Payload epoch doesn't match given epoch"
            );
        },
        Payload::InQuorumStoreWithLimit(proof_with_data_with_txn_limit) => {
            let proof_with_data = &proof_with_data_with_txn_limit.proof_with_data;
            let mut proofs = proof_with_data.proofs.iter();
            ensure!(
                proofs.all(|proof| proof.epoch() == epoch),
                "Payload epoch doesn't match given epoch"
            );
        },
        Payload::QuorumStoreInlineHybrid(inline_batches, proof_with_data, _) => {
            // Proofs are checked before the inline batches, so a proof
            // mismatch is the error reported when both groups are wrong.
            let mut proofs = proof_with_data.proofs.iter();
            ensure!(
                proofs.all(|proof| proof.epoch() == epoch),
                "Payload proof epoch doesn't match given epoch"
            );
            let mut batches = inline_batches.iter();
            ensure!(
                batches.all(|(info, _)| info.epoch() == epoch),
                "Payload inline batch epoch doesn't match given epoch"
            );
        },
        Payload::OptQuorumStore(opt_quorum_store_payload) => {
            opt_quorum_store_payload.check_epoch(epoch)?;
        },
    }
    Ok(())
}
}

impl fmt::Display for Payload {
Expand Down
26 changes: 26 additions & 0 deletions consensus/consensus-types/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::proof_of_store::{BatchInfo, ProofOfStore};
use anyhow::ensure;
use aptos_executor_types::ExecutorResult;
use aptos_infallible::Mutex;
use aptos_types::{transaction::SignedTransaction, PeerId};
Expand Down Expand Up @@ -33,6 +34,7 @@ pub trait TDataInfo {

/// Bundles a shared transaction-fetch future with its bookkeeping fields.
pub struct DataFetchFut {
/// NOTE(review): presumably identifies which fetch attempt produced `fut` —
/// confirm against the code that sets it.
pub iteration: u32,
/// Lists of peer ids, each list behind its own shared mutex.
/// NOTE(review): presumably the peers that can serve the data — confirm.
pub responders: Vec<Arc<Mutex<Vec<PeerId>>>>,
/// Cloneable (`Shared`) boxed future that resolves to the fetched
/// transactions or an executor error.
pub fut: Shared<BoxFuture<'static, ExecutorResult<Vec<SignedTransaction>>>>,
}

Expand Down Expand Up @@ -186,6 +188,10 @@ impl InlineBatch {
/// Returns a reference to the `BatchInfo` stored in this inline batch.
pub fn info(&self) -> &BatchInfo {
&self.batch_info
}

/// Returns a reference to the signed transactions stored in this inline batch.
pub fn transactions(&self) -> &Vec<SignedTransaction> {
&self.transactions
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -286,6 +292,26 @@ impl OptQuorumStorePayloadV1 {
PayloadExecutionLimit::MaxTransactionsToExecute(max) => Some(max),
}
}

/// Confirms that every batch referenced by this payload reports `epoch`.
///
/// The inline batches, the opt batches and the proofs are checked in that
/// order, each via `info().epoch()`.
///
/// # Errors
/// Fails on the first group that contains an item from a different epoch,
/// with a message naming that group.
pub fn check_epoch(&self, epoch: u64) -> anyhow::Result<()> {
    let inline_mismatch = self
        .inline_batches
        .iter()
        .any(|batch| batch.info().epoch() != epoch);
    ensure!(
        !inline_mismatch,
        "OptQS InlineBatch epoch doesn't match given epoch"
    );

    let opt_mismatch = self
        .opt_batches
        .iter()
        .any(|batch| batch.info().epoch() != epoch);
    ensure!(
        !opt_mismatch,
        "OptQS OptBatch epoch doesn't match given epoch"
    );

    let proof_mismatch = self
        .proofs
        .iter()
        .any(|proof| proof.info().epoch() != epoch);
    ensure!(
        !proof_mismatch,
        "OptQS Proof epoch doesn't match given epoch"
    );

    Ok(())
}
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
Expand Down
16 changes: 16 additions & 0 deletions consensus/consensus-types/src/pipelined_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,15 @@ impl PipelineFutures {
}

/// Sending halves of the one-shot channels held by a `PipelinedBlock`.
///
/// Each sender is wrapped in `Option` so it can be taken out and consumed
/// exactly once (for example, `set_qc` takes `qc_tx` before sending).
pub struct PipelineInputTx {
/// Delivers the block's quorum certificate.
pub qc_tx: Option<oneshot::Sender<Arc<QuorumCert>>>,
/// Delivers an optional `Randomness` value.
pub rand_tx: Option<oneshot::Sender<Option<Randomness>>>,
/// Carries no data. NOTE(review): presumably signals that an order vote
/// arrived — confirm.
pub order_vote_tx: Option<oneshot::Sender<()>>,
/// Carries no data. NOTE(review): presumably signals that an order proof
/// is available — confirm.
pub order_proof_tx: Option<oneshot::Sender<()>>,
/// Delivers a `LedgerInfoWithSignatures`. NOTE(review): presumably the
/// commit proof — confirm.
pub commit_proof_tx: Option<oneshot::Sender<LedgerInfoWithSignatures>>,
}

pub struct PipelineInputRx {
pub qc_rx: oneshot::Receiver<Arc<QuorumCert>>,
pub rand_rx: oneshot::Receiver<Option<Randomness>>,
pub order_vote_rx: oneshot::Receiver<()>,
pub order_proof_fut: TaskFuture<()>,
Expand Down Expand Up @@ -145,6 +147,8 @@ pub struct PipelinedBlock {
pipeline_tx: Arc<Mutex<Option<PipelineInputTx>>>,
#[derivative(PartialEq = "ignore")]
pipeline_abort_handle: Arc<Mutex<Option<Vec<AbortHandle>>>>,
#[derivative(PartialEq = "ignore")]
block_qc: Arc<Mutex<Option<Arc<QuorumCert>>>>,
}

impl Serialize for PipelinedBlock {
Expand Down Expand Up @@ -286,6 +290,13 @@ impl PipelinedBlock {
.take()
.expect("pre_commit_result_rx missing.")
}

/// Stores `qc` on this block and forwards it over the pipeline's QC channel.
///
/// The certificate is always recorded in `block_qc`. If pipeline senders are
/// present and the QC sender has not been taken yet, the sender is consumed
/// and the certificate is sent; a failed send is ignored.
pub fn set_qc(&self, qc: Arc<QuorumCert>) {
    *self.block_qc.lock() = Some(Arc::clone(&qc));
    if let Some(inputs) = self.pipeline_tx().lock().as_mut() {
        if let Some(sender) = inputs.qc_tx.take() {
            // Best effort: the send result is deliberately discarded.
            let _ = sender.send(qc);
        }
    }
}
}

impl Debug for PipelinedBlock {
Expand Down Expand Up @@ -317,6 +328,7 @@ impl PipelinedBlock {
pipeline_futs: Arc::new(Mutex::new(None)),
pipeline_tx: Arc::new(Mutex::new(None)),
pipeline_abort_handle: Arc::new(Mutex::new(None)),
block_qc: Arc::new(Mutex::new(None)),
}
}

Expand Down Expand Up @@ -422,6 +434,10 @@ impl PipelinedBlock {
pub fn get_execution_summary(&self) -> Option<ExecutionSummary> {
self.execution_summary.get().cloned()
}

/// Returns a clone of the stored quorum-certificate handle, or `None` when
/// no certificate has been stored in `block_qc`.
pub fn qc(&self) -> Option<Arc<QuorumCert>> {
self.block_qc.lock().clone()
}
}

/// Pipeline related functions
Expand Down
24 changes: 19 additions & 5 deletions consensus/src/block_preparer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ use crate::{
transaction_filter::TransactionFilter,
transaction_shuffler::TransactionShuffler,
};
use aptos_consensus_types::block::Block;
use aptos_consensus_types::{block::Block, quorum_cert::QuorumCert};
use aptos_executor_types::ExecutorResult;
use aptos_types::transaction::SignedTransaction;
use fail::fail_point;
use std::{sync::Arc, time::Instant};
use futures::future::Shared;
use std::{future::Future, sync::Arc, time::Instant};

pub struct BlockPreparer {
payload_manager: Arc<dyn TPayloadManager>,
Expand All @@ -36,16 +37,29 @@ impl BlockPreparer {
}
}

pub async fn prepare_block(&self, block: &Block) -> ExecutorResult<Vec<SignedTransaction>> {
pub async fn prepare_block(
&self,
block: &Block,
block_qc_fut: Shared<impl Future<Output = Option<Arc<QuorumCert>>>>,
) -> ExecutorResult<Vec<SignedTransaction>> {
fail_point!("consensus::prepare_block", |_| {
use aptos_executor_types::ExecutorError;
use std::{thread, time::Duration};
thread::sleep(Duration::from_millis(10));
Err(ExecutorError::CouldNotGetData)
});
let start_time = Instant::now();
let (txns, max_txns_from_block_to_execute) =
self.payload_manager.get_transactions(block).await?;

let (txns, max_txns_from_block_to_execute) = tokio::select! {
// Poll the block qc future until a QC is received. Ignore None outcomes.
Some(qc) = block_qc_fut => {
let block_voters = Some(qc.ledger_info().get_voters_bitvec().clone());
self.payload_manager.get_transactions(block, block_voters).await
},
result = self.payload_manager.get_transactions(block, None) => {
result
}
}?;
let txn_filter = self.txn_filter.clone();
let txn_deduper = self.txn_deduper.clone();
let txn_shuffler = self.txn_shuffler.clone();
Expand Down
4 changes: 3 additions & 1 deletion consensus/src/block_storage/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ impl BlockStore {
pipelined_block.block().timestamp_usecs(),
BlockStage::QC_ADDED,
);
pipelined_block.set_qc(Arc::new(qc.clone()));
},
None => bail!("Insert {} without having the block in store first", qc),
};
Expand Down Expand Up @@ -524,7 +525,8 @@ impl BlockStore {

pub async fn wait_for_payload(&self, block: &Block, deadline: Duration) -> anyhow::Result<()> {
let duration = deadline.saturating_sub(self.time_service.get_current_timestamp());
tokio::time::timeout(duration, self.payload_manager.get_transactions(block)).await??;
tokio::time::timeout(duration, self.payload_manager.get_transactions(block, None))
.await??;
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions consensus/src/dag/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ impl TPayloadManager for MockPayloadManager {
/// Mock implementation: ignores both arguments and always succeeds with an
/// empty transaction list and no accompanying `u64` value.
async fn get_transactions(
    &self,
    _block: &Block,
    _block_signers: Option<BitVec>,
) -> ExecutorResult<(Vec<SignedTransaction>, Option<u64>)> {
    let no_transactions = Vec::new();
    let no_limit = None;
    Ok((no_transactions, no_limit))
}
Expand Down
1 change: 0 additions & 1 deletion consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,6 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
network_sender,
epoch_state.verifier.clone(),
self.proof_cache.clone(),
self.config.safety_rules.backend.clone(),
self.quorum_store_storage.clone(),
!consensus_config.is_dag_enabled(),
consensus_key,
Expand Down
14 changes: 11 additions & 3 deletions consensus/src/execution_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use crate::{
pipeline::pipeline_phase::CountedRequest,
state_computer::StateComputeResultFut,
};
use aptos_consensus_types::{block::Block, pipeline_execution_result::PipelineExecutionResult};
use aptos_consensus_types::{
block::Block, pipeline_execution_result::PipelineExecutionResult, quorum_cert::QuorumCert,
};
use aptos_crypto::HashValue;
use aptos_executor_types::{
state_compute_result::StateComputeResult, BlockExecutorTrait, ExecutorError, ExecutorResult,
Expand All @@ -25,7 +27,7 @@ use aptos_types::{
},
};
use fail::fail_point;
use futures::future::BoxFuture;
use futures::{future::BoxFuture, FutureExt};
use once_cell::sync::Lazy;
use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator};
use std::{
Expand Down Expand Up @@ -92,6 +94,7 @@ impl ExecutionPipeline {
block: Block,
metadata: BlockMetadataExt,
parent_block_id: HashValue,
block_qc: Option<Arc<QuorumCert>>,
txn_generator: BlockPreparer,
block_executor_onchain_config: BlockExecutorConfigFromOnchain,
pre_commit_hook: PreCommitHook,
Expand All @@ -110,6 +113,7 @@ impl ExecutionPipeline {
command_creation_time: Instant::now(),
pre_commit_hook,
lifetime_guard,
block_qc,
})
.expect("Failed to send block to execution pipeline.");

Expand Down Expand Up @@ -139,10 +143,13 @@ impl ExecutionPipeline {
result_tx,
command_creation_time,
lifetime_guard,
block_qc,
} = command;
counters::PREPARE_BLOCK_WAIT_TIME.observe_duration(command_creation_time.elapsed());
debug!("prepare_block received block {}.", block.id());
let input_txns = block_preparer.prepare_block(&block).await;
let input_txns = block_preparer
.prepare_block(&block, async { block_qc }.shared())
.await;
if let Err(e) = input_txns {
result_tx
.send(Err(e))
Expand Down Expand Up @@ -375,6 +382,7 @@ struct PrepareBlockCommand {
result_tx: oneshot::Sender<ExecutorResult<PipelineExecutionResult>>,
command_creation_time: Instant,
lifetime_guard: CountedRequest<()>,
block_qc: Option<Arc<QuorumCert>>,
}

struct ExecuteBlockCommand {
Expand Down
Loading
Loading