Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore Log on Error & Spawn Blocking in Streamer #5585

Merged
merged 5 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 57 additions & 33 deletions beacon_node/beacon_chain/src/beacon_block_streamer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes};
use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1};
use slog::{crit, debug, Logger};
use slog::{crit, debug, error, Logger};
use std::collections::HashMap;
use std::sync::Arc;
use store::{DatabaseBlock, ExecutionPayloadDeneb};
use task_executor::TaskExecutor;
use tokio::sync::{
mpsc::{self, UnboundedSender},
RwLock,
Expand Down Expand Up @@ -395,18 +394,18 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
pub fn new(
beacon_chain: &Arc<BeaconChain<T>>,
check_caches: CheckCaches,
) -> Result<Self, BeaconChainError> {
) -> Result<Arc<Self>, BeaconChainError> {
let execution_layer = beacon_chain
.execution_layer
.as_ref()
.ok_or(BeaconChainError::ExecutionLayerMissing)?
.clone();

Ok(Self {
Ok(Arc::new(Self {
execution_layer,
check_caches,
beacon_chain: beacon_chain.clone(),
})
}))
}

fn check_caches(&self, root: Hash256) -> Option<Arc<SignedBeaconBlock<T::EthSpec>>> {
Expand All @@ -425,30 +424,45 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
}
}

fn load_payloads(&self, block_roots: Vec<Hash256>) -> Vec<(Hash256, LoadResult<T::EthSpec>)> {
let mut db_blocks = Vec::new();

for root in block_roots {
if let Some(cached_block) = self.check_caches(root).map(LoadedBeaconBlock::Full) {
db_blocks.push((root, Ok(Some(cached_block))));
continue;
}

match self.beacon_chain.store.try_get_full_block(&root) {
Err(e) => db_blocks.push((root, Err(e.into()))),
Ok(opt_block) => db_blocks.push((
root,
Ok(opt_block.map(|db_block| match db_block {
DatabaseBlock::Full(block) => LoadedBeaconBlock::Full(Arc::new(block)),
DatabaseBlock::Blinded(block) => {
LoadedBeaconBlock::Blinded(Box::new(block))
async fn load_payloads(
self: &Arc<Self>,
block_roots: Vec<Hash256>,
) -> Result<Vec<(Hash256, LoadResult<T::EthSpec>)>, BeaconChainError> {
let streamer = self.clone();
let chain = streamer.beacon_chain.clone();
// Loading from the DB is slow -> spawn a blocking task
chain
.spawn_blocking_handle(
move || {
let mut db_blocks = Vec::new();
for root in block_roots {
if let Some(cached_block) =
streamer.check_caches(root).map(LoadedBeaconBlock::Full)
{
db_blocks.push((root, Ok(Some(cached_block))));
continue;
}
})),
)),
}
}

db_blocks
match streamer.beacon_chain.store.try_get_full_block(&root) {
Err(e) => db_blocks.push((root, Err(e.into()))),
Ok(opt_block) => db_blocks.push((
root,
Ok(opt_block.map(|db_block| match db_block {
DatabaseBlock::Full(block) => {
LoadedBeaconBlock::Full(Arc::new(block))
}
DatabaseBlock::Blinded(block) => {
LoadedBeaconBlock::Blinded(Box::new(block))
}
})),
)),
}
}
db_blocks
},
"load_beacon_blocks",
)
.await
}

/// Pre-process the loaded blocks into execution engine requests.
Expand Down Expand Up @@ -549,7 +563,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {

// used when the execution engine doesn't support the payload bodies methods
async fn stream_blocks_fallback(
&self,
self: Arc<Self>,
block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
) {
Expand All @@ -575,7 +589,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
}

async fn stream_blocks(
&self,
self: Arc<Self>,
block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
) {
Expand All @@ -584,7 +598,17 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
let mut n_sent = 0usize;
let mut engine_requests = 0usize;

let payloads = self.load_payloads(block_roots);
let payloads = match self.load_payloads(block_roots).await {
Ok(payloads) => payloads,
Err(e) => {
error!(
self.beacon_chain.log,
"BeaconBlockStreamer: Failed to load payloads";
"error" => ?e
);
return;
}
};
let requests = self.get_requests(payloads).await;

for (root, request) in requests {
Expand Down Expand Up @@ -624,7 +648,7 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
}

pub async fn stream(
self,
self: Arc<Self>,
block_roots: Vec<Hash256>,
sender: UnboundedSender<(Hash256, Arc<BlockResult<T::EthSpec>>)>,
) {
Expand All @@ -650,16 +674,16 @@ impl<T: BeaconChainTypes> BeaconBlockStreamer<T> {
}

pub fn launch_stream(
self,
self: Arc<Self>,
block_roots: Vec<Hash256>,
executor: &TaskExecutor,
) -> impl Stream<Item = (Hash256, Arc<BlockResult<T::EthSpec>>)> {
let (block_tx, block_rx) = mpsc::unbounded_channel();
debug!(
self.beacon_chain.log,
"Launching a BeaconBlockStreamer";
"blocks" => block_roots.len(),
);
let executor = self.beacon_chain.task_executor.clone();
executor.spawn(self.stream(block_roots, block_tx), "get_blocks_sender");
UnboundedReceiverStream::new(block_rx)
}
Expand Down
8 changes: 2 additions & 6 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn get_blocks_checking_caches(
self: &Arc<Self>,
block_roots: Vec<Hash256>,
executor: &TaskExecutor,
) -> Result<
impl Stream<
Item = (
Expand All @@ -1149,14 +1148,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
>,
Error,
> {
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::Yes)?
.launch_stream(block_roots, executor))
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::Yes)?.launch_stream(block_roots))
}

pub fn get_blocks(
self: &Arc<Self>,
block_roots: Vec<Hash256>,
executor: &TaskExecutor,
) -> Result<
impl Stream<
Item = (
Expand All @@ -1166,8 +1163,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
>,
Error,
> {
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::No)?
.launch_stream(block_roots, executor))
Ok(BeaconBlockStreamer::<T>::new(self, CheckCaches::No)?.launch_stream(block_roots))
}

pub fn get_blobs_checking_early_attester_cache(
Expand Down
6 changes: 2 additions & 4 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,9 +509,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = async move {
let executor = processor.executor.clone();
processor
.handle_blocks_by_range_request(executor, peer_id, request_id, request)
.handle_blocks_by_range_request(peer_id, request_id, request)
.await;
};

Expand All @@ -530,9 +529,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
) -> Result<(), Error<T::EthSpec>> {
let processor = self.clone();
let process_fn = async move {
let executor = processor.executor.clone();
processor
.handle_blocks_by_root_request(executor, peer_id, request_id, request)
.handle_blocks_by_root_request(peer_id, request_id, request)
.await;
};

Expand Down
Loading