Skip to content

Commit

Permalink
Merge of #5582
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Apr 15, 2024
2 parents fee2ee9 + 2df8092 commit 6b4d20f
Showing 1 changed file with 66 additions and 53 deletions.
119 changes: 66 additions & 53 deletions beacon_node/network/src/network_beacon_processor/rpc_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,37 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
request_id: PeerRequestId,
request: BlocksByRootRequest,
) {
self.terminate_response_stream(
peer_id,
request_id,
self.clone()
.handle_blocks_by_root_request_inner(executor, peer_id, request_id, request)
.await,
Response::BlocksByRoot,
);
}

/// Handle a `BlocksByRoot` request from the peer.
pub async fn handle_blocks_by_root_request_inner(
self: Arc<Self>,
executor: TaskExecutor,
peer_id: PeerId,
request_id: PeerRequestId,
request: BlocksByRootRequest,
) -> Result<(), (RPCResponseErrorCode, &'static str)> {
let requested_blocks = request.block_roots().len();
let mut block_stream = match self
.chain
.get_blocks_checking_caches(request.block_roots().to_vec(), &executor)
{
Ok(block_stream) => block_stream,
Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e),
Err(e) => {
error!(self.log, "Error getting block stream"; "error" => ?e);
return Err((
RPCResponseErrorCode::ServerError,
"Error getting block stream",
));
}
};
// Fetching blocks is async because it may have to hit the execution layer for payloads.
let mut send_block_count = 0;
Expand Down Expand Up @@ -169,13 +193,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"block_root" => ?root,
"reason" => "execution layer not synced",
);
// send the stream terminator
return self.send_error_response(
peer_id,
return Err((
RPCResponseErrorCode::ResourceUnavailable,
"Execution layer not synced".into(),
request_id,
);
"Execution layer not synced",
));
}
Err(e) => {
debug!(
Expand All @@ -196,8 +217,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"returned" => %send_block_count
);

// send stream termination
self.send_response(peer_id, Response::BlocksByRoot(None), request_id);
Ok(())
}

/// Handle a `BlobsByRoot` request from the peer.
Expand Down Expand Up @@ -380,6 +400,24 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
request_id: PeerRequestId,
req: BlocksByRangeRequest,
) {
self.terminate_response_stream(
peer_id,
request_id,
self.clone()
.handle_blocks_by_range_request_inner(executor, peer_id, request_id, req)
.await,
Response::BlocksByRange,
);
}

/// Handle a `BlocksByRange` request from the peer.
pub async fn handle_blocks_by_range_request_inner(
self: Arc<Self>,
executor: TaskExecutor,
peer_id: PeerId,
request_id: PeerRequestId,
req: BlocksByRangeRequest,
) -> Result<(), (RPCResponseErrorCode, &'static str)> {
debug!(self.log, "Received BlocksByRange Request";
"peer_id" => %peer_id,
"count" => req.count(),
Expand All @@ -401,12 +439,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
});
if *req.count() > max_request_size {
return self.send_error_response(
peer_id,
return Err((
RPCResponseErrorCode::InvalidRequest,
format!("Request exceeded max size {max_request_size}"),
request_id,
);
"Request exceeded max size",
));
}

let forwards_block_root_iter = match self
Expand All @@ -424,25 +460,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"requested_slot" => slot,
"oldest_known_slot" => oldest_block_slot
);
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ResourceUnavailable,
"Backfilling".into(),
request_id,
);
return Err((RPCResponseErrorCode::ResourceUnavailable, "Backfilling"));
}
Err(e) => {
self.send_error_response(
peer_id,
RPCResponseErrorCode::ServerError,
"Database error".into(),
request_id,
);
return error!(self.log, "Unable to obtain root iter";
error!(self.log, "Unable to obtain root iter";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
);
return Err((RPCResponseErrorCode::ServerError, "Database error"));
}
};

Expand All @@ -468,11 +494,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Err(e) => {
return error!(self.log, "Error during iteration over blocks";
error!(self.log, "Error during iteration over blocks";
"request" => ?req,
"peer" => %peer_id,
"error" => ?e
)
);
return Err((RPCResponseErrorCode::ServerError, "Iteration error"));
}
};

Expand All @@ -481,7 +508,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {

let mut block_stream = match self.chain.get_blocks(block_roots, &executor) {
Ok(block_stream) => block_stream,
Err(e) => return error!(self.log, "Error getting block stream"; "error" => ?e),
Err(e) => {
error!(self.log, "Error getting block stream"; "error" => ?e);
return Err((RPCResponseErrorCode::ServerError, "Iterator error"));
}
};

// Fetching blocks is async because it may have to hit the execution layer for payloads.
Expand Down Expand Up @@ -511,12 +541,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"peer" => %peer_id,
"request_root" => ?root
);
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ServerError,
"Database inconsistency".into(),
request_id,
);
return Err((RPCResponseErrorCode::ServerError, "Database inconsistency"));
}
Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => {
debug!(
Expand All @@ -526,12 +551,10 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
"reason" => "execution layer not synced",
);
// send the stream terminator
return self.send_error_response(
peer_id,
return Err((
RPCResponseErrorCode::ResourceUnavailable,
"Execution layer not synced".into(),
request_id,
);
"Execution layer not synced",
));
}
Err(e) => {
if matches!(
Expand All @@ -556,12 +579,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}

// send the stream terminator
return self.send_error_response(
peer_id,
RPCResponseErrorCode::ServerError,
"Failed fetching blocks".into(),
request_id,
);
return Err((RPCResponseErrorCode::ServerError, "Failed fetching blocks"));
}
}
}
Expand Down Expand Up @@ -594,12 +612,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
}

// send the stream terminator
self.send_network_message(NetworkMessage::SendResponse {
peer_id,
response: Response::BlocksByRange(None),
id: request_id,
});
Ok(())
}

/// Handle a `BlobsByRange` request from the peer.
Expand Down

0 comments on commit 6b4d20f

Please sign in to comment.