Skip to content

Commit

Permalink
Simplify chain segment processing
Browse files Browse the repository at this point in the history
  • Loading branch information
paulhauner committed Jun 28, 2022
1 parent 2e9a8c0 commit 5be6374
Showing 1 changed file with 142 additions and 147 deletions.
289 changes: 142 additions & 147 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::{
check_block_is_finalized_descendant, check_block_relevancy, get_block_root,
signature_verify_chain_segment, BlockError, ExecutionPendingBlock, GossipVerifiedBlock,
IntoExecutionPendingBlock, PayloadVerificationOutcome, SignatureVerifiedBlock,
POS_PANDA_BANNER,
IntoExecutionPendingBlock, PayloadVerificationOutcome, POS_PANDA_BANNER,
};
use crate::chain_config::ChainConfig;
use crate::early_attester_cache::EarlyAttesterCache;
Expand Down Expand Up @@ -89,7 +88,6 @@ use store::{
DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::sync::mpsc;
use tree_hash::TreeHash;
use types::beacon_state::CloneConfig;
use types::*;
Expand Down Expand Up @@ -2147,6 +2145,22 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}

/// A convenience method for spawning a blocking task. It maps an `Option` and
/// `tokio::JoinError` into a single `BeaconChainError`.
async fn spawn_blocking_handle<F, R>(&self, task: F, name: &'static str) -> Result<R, Error>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let handle = self
.task_executor
.clone()
.spawn_blocking_handle(task, name)
.ok_or(Error::RuntimeShutdown)?;

handle.await.map_err(Error::TokioJoin)
}

/// Attempt to verify and import a chain of blocks to `self`.
///
/// The provided blocks _must_ each reference the previous block via `block.parent_root` (i.e.,
Expand All @@ -2162,149 +2176,112 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
chain_segment: Vec<Arc<SignedBeaconBlock<T::EthSpec>>>,
) -> ChainSegmentResult<T::EthSpec> {
let mut imported_blocks = 0;
let (signature_verified_blocks_tx, mut signature_verified_blocks_rx) =
mpsc::unbounded_channel();

// Spawn the task that performs signature verification on the `chain_segment`. Each time it
// verifies a sub-segment of blocks, they will be sent to the
// `signature_verified_blocks_rx` so we can further verify and import them.
let chain = self.clone();
let signature_verify_handle = match self.task_executor.spawn_blocking_handle(
// The `signature_verify_chain_segment` function *must* be called from a blocking
// (non-async) context otherwise it will panic!
let filtered_chain_segment_future = self.spawn_blocking_handle(
move || {
chain.signature_verify_chain_segment(chain_segment, signature_verified_blocks_tx)
},
"sig_verify_chain_segment",
) {
Some(handle) => handle,
None => {
return ChainSegmentResult::Failed {
imported_blocks,
error: Error::RuntimeShutdown.into(),
}
}
};
let mut filtered_chain_segment = Vec::with_capacity(chain_segment.len());

// Keep waiting for more signature verified sub-segments. Once the
// `signature_verified_blocks_rx` returns `None` the sender has been dropped and we can
// continue to checking the response of the signature verification task.
while let Some(signature_verified_blocks) = signature_verified_blocks_rx.recv().await {
// Import the blocks into the chain.
for signature_verified_block in signature_verified_blocks {
match self.process_block(signature_verified_block).await {
Ok(_) => imported_blocks += 1,
Err(error) => {
// Dropping the receiver will stop the signature verification task from
// running. The code would be functionally equivalent without the explicit
// `drop` but it is here to make the developers intentions clear.
drop(signature_verified_blocks_rx);
return ChainSegmentResult::Failed {
// Produce a list of the parent root and slot of the child of each block.
//
// E.g., `children[0] == (chain_segment[1].parent_root(), chain_segment[1].slot())`
let children = chain_segment
.iter()
.skip(1)
.map(|block| (block.parent_root(), block.slot()))
.collect::<Vec<_>>();

for (i, block) in chain_segment.into_iter().enumerate() {
// Ensure the block is the correct structure for the fork at `block.slot()`.
if let Err(e) = block.fork_name(&chain.spec) {
return Err(ChainSegmentResult::Failed {
imported_blocks,
error,
};
error: BlockError::InconsistentFork(e),
});
}
}
}
}

// Check to see if the signature verification task returned an error.
match signature_verify_handle.await {
// The signature verification task completed successfully.
Ok(Ok(())) => ChainSegmentResult::Successful { imported_blocks },
// The signature verification task completed, but returned an error.
Ok(Err(error)) => ChainSegmentResult::Failed {
imported_blocks,
error,
},
// We don't know if the signature verification task completed since tokio returned an
// error.
Err(error) => ChainSegmentResult::Failed {
imported_blocks,
error: BeaconChainError::TokioJoin(error).into(),
},
}
}
let block_root = get_block_root(&block);

/// Split the given `chain_segment` into one or more sub-segment that can be verified in a
/// batch of BLS signatures. Once a sub-segment has been verified, send it down the
/// `signature_verified_blocks_tx` channel for further verification and import.
///
/// If the receiver for `signature_verified_blocks_tx` is dropped, exit early since this
/// indicates that some block failed further verification. If one block in a chain segment is
/// invalid, all following blocks must also be invalid.
fn signature_verify_chain_segment(
self: &Arc<Self>,
chain_segment: Vec<Arc<SignedBeaconBlock<T::EthSpec>>>,
signature_verified_blocks_tx: mpsc::UnboundedSender<Vec<SignatureVerifiedBlock<T>>>,
) -> Result<(), BlockError<T::EthSpec>> {
let mut filtered_chain_segment = Vec::with_capacity(chain_segment.len());
if let Some((child_parent_root, child_slot)) = children.get(i) {
// If this block has a child in this chain segment, ensure that its parent root matches
// the root of this block.
//
// Without this check it would be possible to have a block verified using the
// incorrect shuffling. That would be bad, mmkay.
if block_root != *child_parent_root {
return Err(ChainSegmentResult::Failed {
imported_blocks,
error: BlockError::NonLinearParentRoots,
});
}

// Produce a list of the parent root and slot of the child of each block.
//
// E.g., `children[0] == (chain_segment[1].parent_root(), chain_segment[1].slot())`
let children = chain_segment
.iter()
.skip(1)
.map(|block| (block.parent_root(), block.slot()))
.collect::<Vec<_>>();

for (i, block) in chain_segment.into_iter().enumerate() {
// Ensure the block is the correct structure for the fork at `block.slot()`.
block
.fork_name(&self.spec)
.map_err(BlockError::InconsistentFork)?;

let block_root = get_block_root(&block);

if let Some((child_parent_root, child_slot)) = children.get(i) {
// If this block has a child in this chain segment, ensure that its parent root matches
// the root of this block.
//
// Without this check it would be possible to have a block verified using the
// incorrect shuffling. That would be bad, mmkay.
if block_root != *child_parent_root {
return Err(BlockError::NonLinearParentRoots);
}
// Ensure that the slots are strictly increasing throughout the chain segment.
if *child_slot <= block.slot() {
return Err(ChainSegmentResult::Failed {
imported_blocks,
error: BlockError::NonLinearSlots,
});
}
}

// Ensure that the slots are strictly increasing throughout the chain segment.
if *child_slot <= block.slot() {
return Err(BlockError::NonLinearSlots);
match check_block_relevancy(&block, Some(block_root), &chain) {
// If the block is relevant, add it to the filtered chain segment.
Ok(_) => filtered_chain_segment.push((block_root, block)),
// If the block is already known, simply ignore this block.
Err(BlockError::BlockIsAlreadyKnown) => continue,
// If the block is the genesis block, simply ignore this block.
Err(BlockError::GenesisBlock) => continue,
// If the block is is for a finalized slot, simply ignore this block.
//
// The block is either:
//
// 1. In the canonical finalized chain.
// 2. In some non-canonical chain at a slot that has been finalized already.
//
// In the case of (1), there's no need to re-import and later blocks in this
// segement might be useful.
//
// In the case of (2), skipping the block is valid since we should never import it.
// However, we will potentially get a `ParentUnknown` on a later block. The sync
// protocol will need to ensure this is handled gracefully.
Err(BlockError::WouldRevertFinalizedSlot { .. }) => continue,
// The block has a known parent that does not descend from the finalized block.
// There is no need to process this block or any children.
Err(BlockError::NotFinalizedDescendant { block_parent_root }) => {
return Err(ChainSegmentResult::Failed {
imported_blocks,
error: BlockError::NotFinalizedDescendant { block_parent_root },
});
}
// If there was an error whilst determining if the block was invalid, return that
// error.
Err(BlockError::BeaconChainError(e)) => {
return Err(ChainSegmentResult::Failed {
imported_blocks,
error: BlockError::BeaconChainError(e),
});
}
// If the block was decided to be irrelevant for any other reason, don't include
// this block or any of it's children in the filtered chain segment.
_ => break,
}
}
}

match check_block_relevancy(&block, Some(block_root), self) {
// If the block is relevant, add it to the filtered chain segment.
Ok(_) => filtered_chain_segment.push((block_root, block)),
// If the block is already known, simply ignore this block.
Err(BlockError::BlockIsAlreadyKnown) => continue,
// If the block is the genesis block, simply ignore this block.
Err(BlockError::GenesisBlock) => continue,
// If the block is is for a finalized slot, simply ignore this block.
//
// The block is either:
//
// 1. In the canonical finalized chain.
// 2. In some non-canonical chain at a slot that has been finalized already.
//
// In the case of (1), there's no need to re-import and later blocks in this
// segement might be useful.
//
// In the case of (2), skipping the block is valid since we should never import it.
// However, we will potentially get a `ParentUnknown` on a later block. The sync
// protocol will need to ensure this is handled gracefully.
Err(BlockError::WouldRevertFinalizedSlot { .. }) => continue,
// The block has a known parent that does not descend from the finalized block.
// There is no need to process this block or any children.
Err(e @ BlockError::NotFinalizedDescendant { .. }) => return Err(e),
// If there was an error whilst determining if the block was invalid, return that
// error.
Err(e @ BlockError::BeaconChainError(_)) => return Err(e),
// If the block was decided to be irrelevant for any other reason, don't include
// this block or any of it's children in the filtered chain segment.
_ => break,
Ok(filtered_chain_segment)
},
"filter_chain_segement",
);

let mut filtered_chain_segment = match filtered_chain_segment_future.await {
Ok(Ok(filtered_segment)) => filtered_segment,
Ok(Err(segment_result)) => return segment_result,
Err(error) => {
return ChainSegmentResult::Failed {
imported_blocks,
error: BlockError::BeaconChainError(error),
}
}
}
};

while let Some((_root, block)) = filtered_chain_segment.first() {
// Determine the epoch of the first block in the remaining segment.
Expand All @@ -2325,26 +2302,44 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let mut blocks = filtered_chain_segment.split_off(last_index);
std::mem::swap(&mut blocks, &mut filtered_chain_segment);

// Try to exit early if the upstream function has dropped the receiver, indicating an
// invalid block import.
if signature_verified_blocks_tx.is_closed() {
return Err(BlockError::BlockImportCancelled);
}
let chain = self.clone();
let signature_verification_future = self.spawn_blocking_handle(
move || signature_verify_chain_segment(blocks, &chain),
"signature_verify_chain_segment",
);

// Verify the signature of the blocks, returning early if the signature is invalid.
let signature_verified_blocks = signature_verify_chain_segment(blocks, self)?;
let signature_verified_blocks = match signature_verification_future.await {
Ok(Ok(blocks)) => blocks,
Ok(Err(error)) => {
return ChainSegmentResult::Failed {
imported_blocks,
error,
};
}
Err(error) => {
return ChainSegmentResult::Failed {
imported_blocks,
error: BlockError::BeaconChainError(error),
};
}
};

if let Err(e) = signature_verified_blocks_tx.send(signature_verified_blocks) {
debug!(
self.log,
"Aborted chain segment verification";
"error" => %e
);
return Err(BlockError::BlockImportCancelled);
// Import the blocks into the chain.
for signature_verified_block in signature_verified_blocks {
match self.process_block(signature_verified_block).await {
Ok(_) => imported_blocks += 1,
Err(error) => {
return ChainSegmentResult::Failed {
imported_blocks,
error,
};
}
}
}
}

Ok(())
ChainSegmentResult::Successful { imported_blocks }
}

/// Returns `Ok(GossipVerifiedBlock)` if the supplied `block` should be forwarded onto the
Expand Down

0 comments on commit 5be6374

Please sign in to comment.