Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle sync lookup request streams in network context #5583

Merged
merged 8 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 24 additions & 132 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
use crate::sync::block_lookups::parent_lookup::PARENT_FAIL_TOLERANCE;
use crate::sync::block_lookups::single_block_lookup::{
LookupRequestError, LookupVerifyError, SingleBlockLookup, SingleLookupRequestState, State,
LookupRequestError, SingleBlockLookup, SingleLookupRequestState,
};
use crate::sync::block_lookups::{
BlobRequestState, BlockLookups, BlockRequestState, PeerId, SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS,
};
use crate::sync::manager::{BlockProcessType, Id, SingleLookupReqId};
use crate::sync::network_context::SyncNetworkContext;
use crate::sync::network_context::{
BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest, SyncNetworkContext,
};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::data_availability_checker::ChildComponents;
use beacon_chain::{get_block_root, BeaconChainTypes};
use lighthouse_network::rpc::methods::BlobsByRootRequest;
use lighthouse_network::rpc::BlocksByRootRequest;
use std::ops::IndexMut;
use beacon_chain::BeaconChainTypes;
use std::sync::Arc;
use std::time::Duration;
use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList};
use types::{BlobSidecar, ChainSpec, Hash256, SignedBeaconBlock};
use types::blob_sidecar::FixedBlobSidecarList;
use types::{Hash256, SignedBeaconBlock};

#[derive(Debug, Copy, Clone)]
pub enum ResponseType {
Expand Down Expand Up @@ -73,9 +72,6 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
/// The type of the request .
type RequestType;

/// A block or blob response.
type ResponseType;

/// The type created after validation.
type VerifiedResponseType: Clone;

Expand All @@ -85,30 +81,27 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
/* Request building methods */

/// Construct a new request.
fn build_request(
&mut self,
spec: &ChainSpec,
) -> Result<(PeerId, Self::RequestType), LookupRequestError> {
fn build_request(&mut self) -> Result<(PeerId, Self::RequestType), LookupRequestError> {
// Verify and construct request.
self.too_many_attempts()?;
let peer = self.get_peer()?;
let request = self.new_request(spec);
let request = self.new_request();
Ok((peer, request))
}

/// Construct a new request and send it.
fn build_request_and_send(
&mut self,
id: Id,
cx: &SyncNetworkContext<T>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
// Check if request is necessary.
if !self.get_state().is_awaiting_download() {
return Ok(());
}

// Construct request.
let (peer_id, request) = self.build_request(&cx.chain.spec)?;
let (peer_id, request) = self.build_request()?;

// Update request state.
let req_counter = self.get_state_mut().on_download_start(peer_id);
Expand Down Expand Up @@ -144,61 +137,18 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
}

/// Initialize `Self::RequestType`.
fn new_request(&self, spec: &ChainSpec) -> Self::RequestType;
fn new_request(&self) -> Self::RequestType;

/// Send the request to the network service.
fn make_request(
id: SingleLookupReqId,
peer_id: PeerId,
request: Self::RequestType,
cx: &SyncNetworkContext<T>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError>;

/* Response handling methods */

/// Verify the response is valid based on what we requested.
fn verify_response(
&mut self,
expected_block_root: Hash256,
peer_id: PeerId,
response: Option<Self::ResponseType>,
) -> Result<Option<Self::VerifiedResponseType>, LookupVerifyError> {
let result = match *self.get_state().get_state() {
State::AwaitingDownload => Err(LookupVerifyError::ExtraBlocksReturned),
State::Downloading { peer_id: _ } => {
// TODO: We requested a download from Downloading { peer_id }, but the network
// injects a response from a different peer_id. What should we do? The peer_id to
// track for scoring is the one that actually sent the response, not the state's
self.verify_response_inner(expected_block_root, response)
}
State::Processing { .. } | State::Processed { .. } => match response {
// We sent the block for processing and received an extra block.
Some(_) => Err(LookupVerifyError::ExtraBlocksReturned),
// This is simply the stream termination and we are already processing the block
None => Ok(None),
},
};

match result {
Ok(Some(response)) => {
self.get_state_mut().on_download_success(peer_id);
Ok(Some(response))
}
Ok(None) => Ok(None),
Err(e) => {
self.get_state_mut().on_download_failure();
Err(e)
}
}
}

/// The response verification unique to block or blobs.
fn verify_response_inner(
&mut self,
expected_block_root: Hash256,
response: Option<Self::ResponseType>,
) -> Result<Option<Self::VerifiedResponseType>, LookupVerifyError>;

/// A getter for the parent root of the response. Returns an `Option` because we won't know
/// the blob parent if we don't end up getting any blobs in the response.
fn get_parent_root(verified_response: &Self::VerifiedResponseType) -> Option<Hash256>;
Expand Down Expand Up @@ -247,49 +197,24 @@ pub trait RequestState<L: Lookup, T: BeaconChainTypes> {
}

impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L> {
type RequestType = BlocksByRootRequest;
type ResponseType = Arc<SignedBeaconBlock<T::EthSpec>>;
type RequestType = BlocksByRootSingleRequest;
type VerifiedResponseType = Arc<SignedBeaconBlock<T::EthSpec>>;
type ReconstructedResponseType = RpcBlock<T::EthSpec>;

fn new_request(&self, spec: &ChainSpec) -> BlocksByRootRequest {
BlocksByRootRequest::new(vec![self.requested_block_root], spec)
fn new_request(&self) -> Self::RequestType {
BlocksByRootSingleRequest(self.requested_block_root)
}

fn make_request(
id: SingleLookupReqId,
peer_id: PeerId,
request: Self::RequestType,
cx: &SyncNetworkContext<T>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
cx.block_lookup_request(id, peer_id, request)
.map_err(LookupRequestError::SendFailed)
}

fn verify_response_inner(
&mut self,
expected_block_root: Hash256,
response: Option<Self::ResponseType>,
) -> Result<Option<Arc<SignedBeaconBlock<T::EthSpec>>>, LookupVerifyError> {
match response {
Some(block) => {
// Compute the block root using this specific function so that we can get timing
// metrics.
let block_root = get_block_root(&block);
if block_root != expected_block_root {
// return an error and drop the block
// NOTE: we take this is as a download failure to prevent counting the
// attempt as a chain failure, but simply a peer failure.
Err(LookupVerifyError::RootMismatch)
} else {
// Return the block for processing.
Ok(Some(block))
}
}
None => Err(LookupVerifyError::NoBlockReturned),
}
}

fn get_parent_root(verified_response: &Arc<SignedBeaconBlock<T::EthSpec>>) -> Option<Hash256> {
Some(verified_response.parent_root())
}
Expand Down Expand Up @@ -340,60 +265,27 @@ impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlockRequestState<L>
}

impl<L: Lookup, T: BeaconChainTypes> RequestState<L, T> for BlobRequestState<L, T::EthSpec> {
type RequestType = BlobsByRootRequest;
type ResponseType = Arc<BlobSidecar<T::EthSpec>>;
type RequestType = BlobsByRootSingleBlockRequest;
type VerifiedResponseType = FixedBlobSidecarList<T::EthSpec>;
type ReconstructedResponseType = FixedBlobSidecarList<T::EthSpec>;

fn new_request(&self, spec: &ChainSpec) -> BlobsByRootRequest {
let blob_id_vec: Vec<BlobIdentifier> = self.requested_ids.clone().into();
BlobsByRootRequest::new(blob_id_vec, spec)
fn new_request(&self) -> Self::RequestType {
BlobsByRootSingleBlockRequest {
block_root: self.block_root,
indices: self.requested_ids.indices(),
}
}

fn make_request(
id: SingleLookupReqId,
peer_id: PeerId,
request: Self::RequestType,
cx: &SyncNetworkContext<T>,
cx: &mut SyncNetworkContext<T>,
) -> Result<(), LookupRequestError> {
cx.blob_lookup_request(id, peer_id, request)
.map_err(LookupRequestError::SendFailed)
}

fn verify_response_inner(
&mut self,
expected_block_root: Hash256,
blob: Option<Self::ResponseType>,
) -> Result<Option<FixedBlobSidecarList<T::EthSpec>>, LookupVerifyError> {
match blob {
Some(blob) => {
let received_id = blob.id();

if !self.requested_ids.contains(&received_id) {
return Err(LookupVerifyError::UnrequestedBlobId(received_id));
}
if !blob.verify_blob_sidecar_inclusion_proof().unwrap_or(false) {
return Err(LookupVerifyError::InvalidInclusionProof);
}
if blob.block_root() != expected_block_root {
return Err(LookupVerifyError::UnrequestedHeader);
}

// State should remain downloading until we receive the stream terminator.
self.requested_ids.remove(&received_id);

// The inclusion proof check above ensures `blob.index` is < MAX_BLOBS_PER_BLOCK
let blob_index = blob.index;
*self.blob_download_queue.index_mut(blob_index as usize) = Some(blob);
Ok(None)
}
None => {
let blobs = std::mem::take(&mut self.blob_download_queue);
Ok(Some(blobs))
}
}
}

fn get_parent_root(verified_response: &FixedBlobSidecarList<T::EthSpec>) -> Option<Hash256> {
verified_response
.into_iter()
Expand Down
Loading
Loading