From 9924232e7d2c9dd1f67caa02b668e60ad926ffef Mon Sep 17 00:00:00 2001 From: Mark Mackey Date: Fri, 12 Apr 2024 14:21:32 -0500 Subject: [PATCH 1/4] Restore Logging in Error Cases --- .../network_beacon_processor/rpc_methods.rs | 120 ++++++++++-------- 1 file changed, 67 insertions(+), 53 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 81c8f662ee9..d075298e593 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -134,6 +134,16 @@ impl NetworkBeaconProcessor { request_id: PeerRequestId, request: BlocksByRootRequest, ) { + let log_results = |peer_id, requested_blocks, send_block_count| { + debug!( + self.log, + "BlocksByRoot Outgoing Response Processed"; + "peer" => %peer_id, + "requested" => requested_blocks, + "returned" => %send_block_count + ); + }; + let requested_blocks = request.block_roots().len(); let mut block_stream = match self .chain @@ -169,6 +179,8 @@ impl NetworkBeaconProcessor { "block_root" => ?root, "reason" => "execution layer not synced", ); + + log_results(peer_id, requested_blocks, send_block_count); // send the stream terminator return self.send_error_response( peer_id, @@ -188,13 +200,7 @@ impl NetworkBeaconProcessor { } } } - debug!( - self.log, - "Received BlocksByRoot Request"; - "peer" => %peer_id, - "requested" => requested_blocks, - "returned" => %send_block_count - ); + log_results(peer_id, requested_blocks, send_block_count); // send stream termination self.send_response(peer_id, Response::BlocksByRoot(None), request_id); @@ -479,6 +485,36 @@ impl NetworkBeaconProcessor { // remove all skip slots let block_roots = block_roots.into_iter().flatten().collect::>(); + let current_slot = self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); + + let log_results = |req: BlocksByRangeRequest, peer_id, blocks_sent| { + if blocks_sent < (*req.count() as usize) { + debug!( + self.log, + "BlocksByRange Outgoing Response Processed"; + "peer" => %peer_id, + "msg" => "Failed to return all requested blocks", + "start_slot" => req.start_slot(), + "current_slot" => current_slot, + "requested" => req.count(), + "returned" => blocks_sent + ); + } else { + debug!( + self.log, + "BlocksByRange Outgoing Response Processed"; + "peer" => %peer_id, + "start_slot" => req.start_slot(), + "current_slot" => current_slot, + "requested" => req.count(), + "returned" => blocks_sent + ); + } + }; + let mut block_stream = match self.chain.get_blocks(block_roots, &executor) { Ok(block_stream) => block_stream, Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e), @@ -486,7 +522,6 @@ impl NetworkBeaconProcessor { // Fetching blocks is async because it may have to hit the execution layer for payloads. let mut blocks_sent = 0; - while let Some((root, result)) = block_stream.next().await { match result.as_ref() { Ok(Some(block)) => { @@ -511,6 +546,7 @@ impl NetworkBeaconProcessor { "peer" => %peer_id, "request_root" => ?root ); + log_results(req, peer_id, blocks_sent); return self.send_error_response( peer_id, RPCResponseErrorCode::ServerError, @@ -525,6 +561,7 @@ impl NetworkBeaconProcessor { "block_root" => ?root, "reason" => "execution layer not synced", ); + log_results(req, peer_id, blocks_sent); // send the stream terminator return self.send_error_response( peer_id, @@ -554,7 +591,7 @@ impl NetworkBeaconProcessor { "error" => ?e ); } - + log_results(req, peer_id, blocks_sent); // send the stream terminator return self.send_error_response( peer_id, @@ -566,34 +603,7 @@ impl NetworkBeaconProcessor { } } - let current_slot = self - .chain - .slot() - .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); - - if blocks_sent < (*req.count() as usize) { - debug!( - self.log, - "BlocksByRange outgoing response processed"; - "peer" => %peer_id, - "msg" => "Failed to return all requested blocks", - "start_slot" => req.start_slot(), - "current_slot" => current_slot, - "requested" => req.count(), - "returned" => blocks_sent - ); - } else { - debug!( - self.log, - "BlocksByRange outgoing response processed"; - "peer" => %peer_id, - "start_slot" => req.start_slot(), - "current_slot" => current_slot, - "requested" => req.count(), - "returned" => blocks_sent - ); - } - + log_results(req, peer_id, blocks_sent); // send the stream terminator self.send_network_message(NetworkMessage::SendResponse { peer_id, @@ -741,9 +751,25 @@ impl NetworkBeaconProcessor { } }; + let current_slot = self + .chain + .slot() + .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); + + let log_results = |peer_id, req: BlobsByRangeRequest, blobs_sent| { + debug!( + self.log, + "BlobsByRange Outgoing Response processed"; + "peer" => %peer_id, + "start_slot" => req.start_slot, + "current_slot" => current_slot, + "requested" => req.count, + "returned" => blobs_sent + ); + }; + // remove all skip slots let block_roots = block_roots.into_iter().flatten(); - let mut blobs_sent = 0; for root in block_roots { @@ -767,6 +793,8 @@ impl NetworkBeaconProcessor { "block_root" => ?root, "error" => ?e ); + log_results(peer_id, req, blobs_sent); + return Err(( RPCResponseErrorCode::ServerError, "No blobs and failed fetching corresponding block", @@ -774,21 +802,7 @@ impl NetworkBeaconProcessor { } } } - - let current_slot = self - .chain - .slot() - .unwrap_or_else(|_| self.chain.slot_clock.genesis_slot()); - - debug!( - self.log, - "BlobsByRange Response processed"; - "peer" => %peer_id, - "start_slot" => req.start_slot, - "current_slot" => current_slot, - "requested" => req.count, - "returned" => blobs_sent - ); + log_results(peer_id, req, blobs_sent); Ok(()) } From 4ca0475f8486c43a8fdf54a87aad69e6836676d2 Mon Sep 17 00:00:00 2001 From: Mark Mackey Date: Mon, 15 Apr 2024 15:31:20 -0500 Subject: [PATCH 2/4] Use Spawn Blocking for Loading Blocks in Streamer --- .../beacon_chain/src/beacon_block_streamer.rs | 90 ++++++++++++------- beacon_node/beacon_chain/src/beacon_chain.rs | 8 +- .../src/network_beacon_processor/mod.rs | 6 +- .../network_beacon_processor/rpc_methods.rs | 7 +- 4 files changed, 63 insertions(+), 48 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_block_streamer.rs b/beacon_node/beacon_chain/src/beacon_block_streamer.rs index bbd5bfcac9a..e15f009e956 100644 --- a/beacon_node/beacon_chain/src/beacon_block_streamer.rs +++ b/beacon_node/beacon_chain/src/beacon_block_streamer.rs @@ -1,10 +1,9 @@ use crate::{metrics, BeaconChain, BeaconChainError, BeaconChainTypes}; use execution_layer::{ExecutionLayer, ExecutionPayloadBodyV1}; -use slog::{crit, debug, Logger}; +use slog::{crit, debug, error, Logger}; use std::collections::HashMap; use std::sync::Arc; use store::{DatabaseBlock, ExecutionPayloadDeneb}; -use task_executor::TaskExecutor; use tokio::sync::{ mpsc::{self, UnboundedSender}, RwLock, @@ -395,18 +394,18 @@ impl BeaconBlockStreamer { pub fn new( beacon_chain: &Arc>, check_caches: CheckCaches, - ) -> Result { + ) -> Result, BeaconChainError> { let execution_layer = beacon_chain .execution_layer .as_ref() .ok_or(BeaconChainError::ExecutionLayerMissing)? .clone(); - Ok(Self { + Ok(Arc::new(Self { execution_layer, check_caches, beacon_chain: beacon_chain.clone(), - }) + })) } fn check_caches(&self, root: Hash256) -> Option>> { @@ -425,30 +424,45 @@ impl BeaconBlockStreamer { } } - fn load_payloads(&self, block_roots: Vec) -> Vec<(Hash256, LoadResult)> { - let mut db_blocks = Vec::new(); - - for root in block_roots { - if let Some(cached_block) = self.check_caches(root).map(LoadedBeaconBlock::Full) { - db_blocks.push((root, Ok(Some(cached_block)))); - continue; - } - - match self.beacon_chain.store.try_get_full_block(&root) { - Err(e) => db_blocks.push((root, Err(e.into()))), - Ok(opt_block) => db_blocks.push(( - root, - Ok(opt_block.map(|db_block| match db_block { - DatabaseBlock::Full(block) => LoadedBeaconBlock::Full(Arc::new(block)), - DatabaseBlock::Blinded(block) => { - LoadedBeaconBlock::Blinded(Box::new(block)) + async fn load_payloads( + self: &Arc, + block_roots: Vec, + ) -> Result)>, BeaconChainError> { + let streamer = self.clone(); + let chain = streamer.beacon_chain.clone(); + // Loading from the DB is slow -> spawn a blocking task + chain + .spawn_blocking_handle( + move || { + let mut db_blocks = Vec::new(); + for root in block_roots { + if let Some(cached_block) = + streamer.check_caches(root).map(LoadedBeaconBlock::Full) + { + db_blocks.push((root, Ok(Some(cached_block)))); + continue; } - })), - )), - } - } - db_blocks + match streamer.beacon_chain.store.try_get_full_block(&root) { + Err(e) => db_blocks.push((root, Err(e.into()))), + Ok(opt_block) => db_blocks.push(( + root, + Ok(opt_block.map(|db_block| match db_block { + DatabaseBlock::Full(block) => { + LoadedBeaconBlock::Full(Arc::new(block)) + } + DatabaseBlock::Blinded(block) => { + LoadedBeaconBlock::Blinded(Box::new(block)) + } + })), + )), + } + } + db_blocks + }, + "load_beacon_blocks", + ) + .await } /// Pre-process the loaded blocks into execution engine requests. @@ -549,7 +563,7 @@ impl BeaconBlockStreamer { // used when the execution engine doesn't support the payload bodies methods async fn stream_blocks_fallback( - &self, + self: Arc, block_roots: Vec, sender: UnboundedSender<(Hash256, Arc>)>, ) { @@ -575,7 +589,7 @@ impl BeaconBlockStreamer { } async fn stream_blocks( - &self, + self: Arc, block_roots: Vec, sender: UnboundedSender<(Hash256, Arc>)>, ) { @@ -584,7 +598,17 @@ impl BeaconBlockStreamer { let mut n_sent = 0usize; let mut engine_requests = 0usize; - let payloads = self.load_payloads(block_roots); + let payloads = match self.load_payloads(block_roots).await { + Ok(payloads) => payloads, + Err(e) => { + error!( + self.beacon_chain.log, + "BeaconBlockStreamer: Failed to load payloads"; + "error" => ?e + ); + return; + } + }; let requests = self.get_requests(payloads).await; for (root, request) in requests { @@ -624,7 +648,7 @@ impl BeaconBlockStreamer { } pub async fn stream( - self, + self: Arc, block_roots: Vec, sender: UnboundedSender<(Hash256, Arc>)>, ) { @@ -650,9 +674,8 @@ impl BeaconBlockStreamer { } pub fn launch_stream( - self, + self: Arc, block_roots: Vec, - executor: &TaskExecutor, ) -> impl Stream>)> { let (block_tx, block_rx) = mpsc::unbounded_channel(); debug!( @@ -660,6 +683,7 @@ impl BeaconBlockStreamer { "Launching a BeaconBlockStreamer"; "blocks" => block_roots.len(), ); + let executor = self.beacon_chain.task_executor.clone(); executor.spawn(self.stream(block_roots, block_tx), "get_blocks_sender"); UnboundedReceiverStream::new(block_rx) } diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 7c497e74584..b3790024f81 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1139,7 +1139,6 @@ impl BeaconChain { pub fn get_blocks_checking_caches( self: &Arc, block_roots: Vec, - executor: &TaskExecutor, ) -> Result< impl Stream< Item = ( @@ -1149,14 +1148,12 @@ impl BeaconChain { >, Error, > { - Ok(BeaconBlockStreamer::::new(self, CheckCaches::Yes)? - .launch_stream(block_roots, executor)) + Ok(BeaconBlockStreamer::::new(self, CheckCaches::Yes)?.launch_stream(block_roots)) } pub fn get_blocks( self: &Arc, block_roots: Vec, - executor: &TaskExecutor, ) -> Result< impl Stream< Item = ( @@ -1166,8 +1163,7 @@ impl BeaconChain { >, Error, > { - Ok(BeaconBlockStreamer::::new(self, CheckCaches::No)? - .launch_stream(block_roots, executor)) + Ok(BeaconBlockStreamer::::new(self, CheckCaches::No)?.launch_stream(block_roots)) } pub fn get_blobs_checking_early_attester_cache( diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 27b9e676da6..f10646c7414 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -509,9 +509,8 @@ impl NetworkBeaconProcessor { ) -> Result<(), Error> { let processor = self.clone(); let process_fn = async move { - let executor = processor.executor.clone(); processor - .handle_blocks_by_range_request(executor, peer_id, request_id, request) + .handle_blocks_by_range_request(peer_id, request_id, request) .await; }; @@ -530,9 +529,8 @@ impl NetworkBeaconProcessor { ) -> Result<(), Error> { let processor = self.clone(); let process_fn = async move { - let executor = processor.executor.clone(); processor - .handle_blocks_by_root_request(executor, peer_id, request_id, request) + .handle_blocks_by_root_request(peer_id, request_id, request) .await; }; diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index d075298e593..d7dab7fd1c6 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -11,7 +11,6 @@ use slog::{debug, error, warn}; use slot_clock::SlotClock; use std::collections::{hash_map::Entry, HashMap}; use std::sync::Arc; -use task_executor::TaskExecutor; use tokio_stream::StreamExt; use types::blob_sidecar::BlobIdentifier; use types::{Epoch, EthSpec, ForkName, Hash256, Slot}; @@ -129,7 +128,6 @@ impl NetworkBeaconProcessor { /// Handle a `BlocksByRoot` request from the peer. pub async fn handle_blocks_by_root_request( self: Arc, - executor: TaskExecutor, peer_id: PeerId, request_id: PeerRequestId, request: BlocksByRootRequest, @@ -147,7 +145,7 @@ impl NetworkBeaconProcessor { let requested_blocks = request.block_roots().len(); let mut block_stream = match self .chain - .get_blocks_checking_caches(request.block_roots().to_vec(), &executor) + .get_blocks_checking_caches(request.block_roots().to_vec()) { Ok(block_stream) => block_stream, Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e), @@ -381,7 +379,6 @@ impl NetworkBeaconProcessor { /// Handle a `BlocksByRange` request from the peer. pub async fn handle_blocks_by_range_request( self: Arc, - executor: TaskExecutor, peer_id: PeerId, request_id: PeerRequestId, req: BlocksByRangeRequest, @@ -515,7 +512,7 @@ impl NetworkBeaconProcessor { } }; - let mut block_stream = match self.chain.get_blocks(block_roots, &executor) { + let mut block_stream = match self.chain.get_blocks(block_roots) { Ok(block_stream) => block_stream, Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e), }; From d0f1ae5605e6d0f476d435988cf0cb489ff083e5 Mon Sep 17 00:00:00 2001 From: Mark Mackey Date: Tue, 16 Apr 2024 08:44:32 -0500 Subject: [PATCH 3/4] Address Sean's Comments --- .../src/network_beacon_processor/rpc_methods.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index f47a3d9a4a9..1e72dc42578 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -152,7 +152,7 @@ impl NetworkBeaconProcessor { let log_results = |peer_id, requested_blocks, send_block_count| { debug!( self.log, - "BlocksByRoot Outgoing Response Processed"; + "BlocksByRoot outgoing response processed"; "peer" => %peer_id, "requested" => requested_blocks, "returned" => %send_block_count @@ -304,7 +304,7 @@ impl NetworkBeaconProcessor { } debug!( self.log, - "Received BlobsByRoot Request"; + "BlobsByRoot outgoing response processed"; "peer" => %peer_id, "request_root" => %requested_root, "request_indices" => ?requested_indices, @@ -515,7 +515,7 @@ impl NetworkBeaconProcessor { if blocks_sent < (*req.count() as usize) { debug!( self.log, - "BlocksByRange Outgoing Response Processed"; + "BlocksByRange outgoing response processed"; "peer" => %peer_id, "msg" => "Failed to return all requested blocks", "start_slot" => req.start_slot(), @@ -526,7 +526,7 @@ impl NetworkBeaconProcessor { } else { debug!( self.log, - "BlocksByRange Outgoing Response Processed"; + "BlocksByRange outgoing response processed"; "peer" => %peer_id, "start_slot" => req.start_slot(), "current_slot" => current_slot, @@ -766,7 +766,7 @@ impl NetworkBeaconProcessor { let log_results = |peer_id, req: BlobsByRangeRequest, blobs_sent| { debug!( self.log, - "BlobsByRange Outgoing Response processed"; + "BlobsByRange outgoing response processed"; "peer" => %peer_id, "start_slot" => req.start_slot, "current_slot" => current_slot, From 8b44c62f01dffa56873b7b5466ba052f41bc5de5 Mon Sep 17 00:00:00 2001 From: Mark Mackey Date: Tue, 16 Apr 2024 08:47:19 -0500 Subject: [PATCH 4/4] save a clone --- beacon_node/beacon_chain/src/beacon_block_streamer.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_block_streamer.rs b/beacon_node/beacon_chain/src/beacon_block_streamer.rs index e15f009e956..4f413ce2a86 100644 --- a/beacon_node/beacon_chain/src/beacon_block_streamer.rs +++ b/beacon_node/beacon_chain/src/beacon_block_streamer.rs @@ -429,9 +429,8 @@ impl BeaconBlockStreamer { block_roots: Vec, ) -> Result)>, BeaconChainError> { let streamer = self.clone(); - let chain = streamer.beacon_chain.clone(); // Loading from the DB is slow -> spawn a blocking task - chain + self.beacon_chain .spawn_blocking_handle( move || { let mut db_blocks = Vec::new();