diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index f491dc7ffb0..68c33e99baf 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -119,6 +119,8 @@ pub struct BeaconProcessorQueueLengths { bbroots_queue: usize, blbroots_queue: usize, blbrange_queue: usize, + dcbroots_queue: usize, + dcbrange_queue: usize, gossip_bls_to_execution_change_queue: usize, lc_bootstrap_queue: usize, lc_optimistic_update_queue: usize, @@ -172,6 +174,9 @@ impl BeaconProcessorQueueLengths { bbroots_queue: 1024, blbroots_queue: 1024, blbrange_queue: 1024, + // TODO(das): pick proper values + dcbroots_queue: 1024, + dcbrange_queue: 1024, gossip_bls_to_execution_change_queue: 16384, lc_bootstrap_queue: 1024, lc_optimistic_update_queue: 512, @@ -230,6 +235,8 @@ pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request"; pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request"; pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request"; pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request"; +pub const DATA_COLUMNS_BY_ROOTS_REQUEST: &str = "data_columns_by_roots_request"; +pub const DATA_COLUMNS_BY_RANGE_REQUEST: &str = "data_columns_by_range_request"; pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap"; pub const LIGHT_CLIENT_FINALITY_UPDATE_REQUEST: &str = "light_client_finality_update_request"; pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE_REQUEST: &str = "light_client_optimistic_update_request"; @@ -609,6 +616,8 @@ pub enum Work { BlocksByRootsRequest(AsyncFn), BlobsByRangeRequest(BlockingFn), BlobsByRootsRequest(BlockingFn), + DataColumnsByRootsRequest(BlockingFn), + DataColumnsByRangeRequest(BlockingFn), GossipBlsToExecutionChange(BlockingFn), LightClientBootstrapRequest(BlockingFn), LightClientOptimisticUpdateRequest(BlockingFn), @@ -652,6 +661,8 @@ impl Work { Work::BlocksByRootsRequest(_) => BLOCKS_BY_ROOTS_REQUEST, Work::BlobsByRangeRequest(_) => BLOBS_BY_RANGE_REQUEST, Work::BlobsByRootsRequest(_) => BLOBS_BY_ROOTS_REQUEST, + Work::DataColumnsByRootsRequest(_) => DATA_COLUMNS_BY_ROOTS_REQUEST, + Work::DataColumnsByRangeRequest(_) => DATA_COLUMNS_BY_RANGE_REQUEST, Work::LightClientBootstrapRequest(_) => LIGHT_CLIENT_BOOTSTRAP_REQUEST, Work::LightClientOptimisticUpdateRequest(_) => LIGHT_CLIENT_OPTIMISTIC_UPDATE_REQUEST, Work::LightClientFinalityUpdateRequest(_) => LIGHT_CLIENT_FINALITY_UPDATE_REQUEST, @@ -816,6 +827,8 @@ impl BeaconProcessor { let mut bbroots_queue = FifoQueue::new(queue_lengths.bbroots_queue); let mut blbroots_queue = FifoQueue::new(queue_lengths.blbroots_queue); let mut blbrange_queue = FifoQueue::new(queue_lengths.blbrange_queue); + let mut dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue); + let mut dcbrange_queue = FifoQueue::new(queue_lengths.dcbrange_queue); let mut gossip_bls_to_execution_change_queue = FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue); @@ -1118,6 +1131,10 @@ impl BeaconProcessor { self.spawn_worker(item, idle_tx); } else if let Some(item) = blbroots_queue.pop() { self.spawn_worker(item, idle_tx); + } else if let Some(item) = dcbroots_queue.pop() { + self.spawn_worker(item, idle_tx); + } else if let Some(item) = dcbrange_queue.pop() { + self.spawn_worker(item, idle_tx); // Check slashings after all other consensus messages so we prioritize // following head. // @@ -1282,6 +1299,12 @@ impl BeaconProcessor { Work::BlobsByRootsRequest { .. } => { blbroots_queue.push(work, work_id, &self.log) } + Work::DataColumnsByRootsRequest { .. } => { + dcbroots_queue.push(work, work_id, &self.log) + } + Work::DataColumnsByRangeRequest { .. } => { + dcbrange_queue.push(work, work_id, &self.log) + } Work::UnknownLightClientOptimisticUpdate { .. } => { unknown_light_client_update_queue.push(work, work_id, &self.log) } @@ -1483,7 +1506,10 @@ impl BeaconProcessor { | Work::GossipDataColumnSidecar(work) => task_spawner.spawn_async(async move { work.await; }), - Work::BlobsByRangeRequest(process_fn) | Work::BlobsByRootsRequest(process_fn) => { + Work::BlobsByRangeRequest(process_fn) + | Work::BlobsByRootsRequest(process_fn) + | Work::DataColumnsByRootsRequest(process_fn) + | Work::DataColumnsByRangeRequest(process_fn) => { task_spawner.spawn_blocking(process_fn) } Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => { diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 6423da56fe2..4c9551507e7 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -569,6 +569,8 @@ impl PeerManager { Protocol::LightClientOptimisticUpdate => return, Protocol::LightClientFinalityUpdate => return, Protocol::BlobsByRoot => PeerAction::MidToleranceError, + Protocol::DataColumnsByRoot => PeerAction::MidToleranceError, + Protocol::DataColumnsByRange => PeerAction::MidToleranceError, Protocol::Goodbye => PeerAction::LowToleranceError, Protocol::MetaData => PeerAction::LowToleranceError, Protocol::Status => PeerAction::LowToleranceError, @@ -587,6 +589,8 @@ impl PeerManager { Protocol::BlocksByRoot => return, Protocol::BlobsByRange => return, Protocol::BlobsByRoot => return, + Protocol::DataColumnsByRoot => return, + Protocol::DataColumnsByRange => return, Protocol::Goodbye => return, Protocol::LightClientBootstrap => return, Protocol::LightClientOptimisticUpdate => return, @@ -607,6 +611,8 @@ impl PeerManager { Protocol::BlocksByRoot => PeerAction::MidToleranceError, Protocol::BlobsByRange => PeerAction::MidToleranceError, Protocol::BlobsByRoot => PeerAction::MidToleranceError, + Protocol::DataColumnsByRoot => PeerAction::MidToleranceError, + Protocol::DataColumnsByRange => PeerAction::MidToleranceError, Protocol::LightClientBootstrap => return, Protocol::LightClientOptimisticUpdate => return, Protocol::LightClientFinalityUpdate => return, diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index 482d1d96b4a..f5d8b58dcee 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -16,10 +16,11 @@ use std::marker::PhantomData; use std::sync::Arc; use tokio_util::codec::{Decoder, Encoder}; use types::{ - BlobSidecar, ChainSpec, EthSpec, ForkContext, ForkName, Hash256, LightClientBootstrap, - LightClientFinalityUpdate, LightClientOptimisticUpdate, RuntimeVariableList, SignedBeaconBlock, - SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix, - SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra, + BlobSidecar, ChainSpec, DataColumnSidecar, EthSpec, ForkContext, ForkName, Hash256, + LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate, + RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, + SignedBeaconBlockBellatrix, SignedBeaconBlockCapella, SignedBeaconBlockDeneb, + SignedBeaconBlockElectra, }; use unsigned_varint::codec::Uvi; @@ -70,6 +71,8 @@ impl Encoder> for SSZSnappyInboundCodec { RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), RPCResponse::BlobsByRange(res) => res.as_ssz_bytes(), RPCResponse::BlobsByRoot(res) => res.as_ssz_bytes(), + RPCResponse::DataColumnsByRoot(res) => res.as_ssz_bytes(), + RPCResponse::DataColumnsByRange(res) => res.as_ssz_bytes(), RPCResponse::LightClientBootstrap(res) => res.as_ssz_bytes(), RPCResponse::LightClientOptimisticUpdate(res) => res.as_ssz_bytes(), RPCResponse::LightClientFinalityUpdate(res) => res.as_ssz_bytes(), @@ -224,6 +227,8 @@ impl Encoder> for SSZSnappyOutboundCodec { }, OutboundRequest::BlobsByRange(req) => req.as_ssz_bytes(), OutboundRequest::BlobsByRoot(req) => req.blob_ids.as_ssz_bytes(), + OutboundRequest::DataColumnsByRange(req) => req.as_ssz_bytes(), + OutboundRequest::DataColumnsByRoot(req) => req.data_column_ids.as_ssz_bytes(), OutboundRequest::Ping(req) => req.as_ssz_bytes(), OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode }; @@ -414,7 +419,12 @@ fn context_bytes( } }; } - RPCResponse::BlobsByRange(_) | RPCResponse::BlobsByRoot(_) => { + RPCResponse::BlobsByRange(_) + | RPCResponse::BlobsByRoot(_) + | RPCResponse::DataColumnsByRoot(_) + | RPCResponse::DataColumnsByRange(_) => { + // TODO(das): If DataColumnSidecar is defined as an Electra type, update the + // context bytes to point to ForkName::Electra return fork_context.to_context_bytes(ForkName::Deneb); } RPCResponse::LightClientBootstrap(lc_bootstrap) => { @@ -512,6 +522,17 @@ fn handle_rpc_request( )?, }))) } + SupportedProtocol::DataColumnsByRootV1 => Ok(Some(InboundRequest::DataColumnsByRoot( + DataColumnsByRootRequest { + data_column_ids: RuntimeVariableList::from_ssz_bytes( + decoded_buffer, + spec.max_request_data_column_sidecars as usize, + )?, + }, + ))), + SupportedProtocol::DataColumnsByRangeV1 => Ok(Some(InboundRequest::DataColumnsByRange( + DataColumnsByRangeRequest::from_ssz_bytes(decoded_buffer)?, + ))), SupportedProtocol::PingV1 => Ok(Some(InboundRequest::Ping(Ping { data: u64::from_ssz_bytes(decoded_buffer)?, }))), @@ -604,6 +625,51 @@ fn handle_rpc_response( ), )), }, + SupportedProtocol::DataColumnsByRootV1 => match fork_name { + Some(fork_name) => { + // TODO(das): PeerDAS is currently supported for both deneb and electra. This check + // does not advertise the topic on deneb, simply allows it to decode it. Advertise + // logic is in `SupportedTopic::currently_supported`. + if fork_name.deneb_enabled() { + Ok(Some(RPCResponse::DataColumnsByRoot(Arc::new( + DataColumnSidecar::from_ssz_bytes(decoded_buffer)?, + )))) + } else { + Err(RPCError::ErrorResponse( + RPCResponseErrorCode::InvalidRequest, + "Invalid fork name for data columns by root".to_string(), + )) + } + } + None => Err(RPCError::ErrorResponse( + RPCResponseErrorCode::InvalidRequest, + format!( + "No context bytes provided for {:?} response", + versioned_protocol + ), + )), + }, + SupportedProtocol::DataColumnsByRangeV1 => match fork_name { + Some(fork_name) => { + if fork_name.deneb_enabled() { + Ok(Some(RPCResponse::DataColumnsByRange(Arc::new( + DataColumnSidecar::from_ssz_bytes(decoded_buffer)?, + )))) + } else { + Err(RPCError::ErrorResponse( + RPCResponseErrorCode::InvalidRequest, + "Invalid fork name for data columns by range".to_string(), + )) + } + } + None => Err(RPCError::ErrorResponse( + RPCResponseErrorCode::InvalidRequest, + format!( + "No context bytes provided for {:?} response", + versioned_protocol + ), + )), + }, SupportedProtocol::PingV1 => Ok(Some(RPCResponse::Pong(Ping { data: u64::from_ssz_bytes(decoded_buffer)?, }))), @@ -747,7 +813,8 @@ mod tests { use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield}; use types::{ blob_sidecar::BlobIdentifier, BeaconBlock, BeaconBlockAltair, BeaconBlockBase, - BeaconBlockBellatrix, EmptyBlock, Epoch, FullPayload, Signature, Slot, + BeaconBlockBellatrix, DataColumnIdentifier, EmptyBlock, Epoch, FullPayload, Signature, + Slot, }; type Spec = types::MainnetEthSpec; @@ -794,6 +861,10 @@ mod tests { Arc::new(BlobSidecar::empty()) } + fn empty_data_column_sidecar() -> Arc> { + Arc::new(DataColumnSidecar::empty()) + } + /// Bellatrix block with length < max_rpc_size. fn bellatrix_block_small( fork_context: &ForkContext, @@ -855,6 +926,27 @@ mod tests { } } + fn dcbrange_request() -> DataColumnsByRangeRequest { + DataColumnsByRangeRequest { + start_slot: 0, + count: 10, + columns: vec![1, 2, 3], + } + } + + fn dcbroot_request(spec: &ChainSpec) -> DataColumnsByRootRequest { + DataColumnsByRootRequest { + data_column_ids: RuntimeVariableList::new( + vec![DataColumnIdentifier { + block_root: Hash256::zero(), + index: 0, + }], + spec.max_request_data_column_sidecars as usize, + ) + .unwrap(), + } + } + fn bbroot_request_v1(spec: &ChainSpec) -> BlocksByRootRequest { BlocksByRootRequest::new_v1(vec![Hash256::zero()], spec) } @@ -1012,6 +1104,12 @@ mod tests { OutboundRequest::BlobsByRoot(bbroot) => { assert_eq!(decoded, InboundRequest::BlobsByRoot(bbroot)) } + OutboundRequest::DataColumnsByRoot(dcbroot) => { + assert_eq!(decoded, InboundRequest::DataColumnsByRoot(dcbroot)) + } + OutboundRequest::DataColumnsByRange(dcbrange) => { + assert_eq!(decoded, InboundRequest::DataColumnsByRange(dcbrange)) + } OutboundRequest::Ping(ping) => { assert_eq!(decoded, InboundRequest::Ping(ping)) } @@ -1138,6 +1236,34 @@ mod tests { ), Ok(Some(RPCResponse::BlobsByRoot(empty_blob_sidecar()))), ); + + assert_eq!( + encode_then_decode_response( + SupportedProtocol::DataColumnsByRangeV1, + RPCCodedResponse::Success(RPCResponse::DataColumnsByRange( + empty_data_column_sidecar() + )), + ForkName::Deneb, + &chain_spec + ), + Ok(Some(RPCResponse::DataColumnsByRange( + empty_data_column_sidecar() + ))), + ); + + assert_eq!( + encode_then_decode_response( + SupportedProtocol::DataColumnsByRootV1, + RPCCodedResponse::Success(RPCResponse::DataColumnsByRoot( + empty_data_column_sidecar() + )), + ForkName::Deneb, + &chain_spec + ), + Ok(Some(RPCResponse::DataColumnsByRoot( + empty_data_column_sidecar() + ))), + ); } // Test RPCResponse encoding/decoding for V1 messages @@ -1491,6 +1617,8 @@ mod tests { OutboundRequest::MetaData(MetadataRequest::new_v1()), OutboundRequest::BlobsByRange(blbrange_request()), OutboundRequest::BlobsByRoot(blbroot_request(&chain_spec)), + OutboundRequest::DataColumnsByRange(dcbrange_request()), + OutboundRequest::DataColumnsByRoot(dcbroot_request(&chain_spec)), OutboundRequest::MetaData(MetadataRequest::new_v2()), ]; diff --git a/beacon_node/lighthouse_network/src/rpc/config.rs b/beacon_node/lighthouse_network/src/rpc/config.rs index d17fa112a1b..7ff189b9815 100644 --- a/beacon_node/lighthouse_network/src/rpc/config.rs +++ b/beacon_node/lighthouse_network/src/rpc/config.rs @@ -91,6 +91,8 @@ pub struct RateLimiterConfig { pub(super) blocks_by_root_quota: Quota, pub(super) blobs_by_range_quota: Quota, pub(super) blobs_by_root_quota: Quota, + pub(super) data_columns_by_root_quota: Quota, + pub(super) data_columns_by_range_quota: Quota, pub(super) light_client_bootstrap_quota: Quota, pub(super) light_client_optimistic_update_quota: Quota, pub(super) light_client_finality_update_quota: Quota, @@ -110,6 +112,12 @@ impl RateLimiterConfig { // measured against the maximum request size. pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = Quota::n_every(6144, 10); pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = Quota::n_every(768, 10); + // 320 blocks worth of columns for regular node, or 40 blocks for supernode. + // Range sync load balances when requesting blocks, and each batch is 32 blocks. + pub const DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA: Quota = Quota::n_every(5120, 10); + // 512 columns per request from spec. This should be plenty as peers are unlikely to send all + // sampling requests to a single peer. + pub const DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA: Quota = Quota::n_every(512, 10); pub const DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA: Quota = Quota::one_every(10); pub const DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA: Quota = Quota::one_every(10); pub const DEFAULT_LIGHT_CLIENT_FINALITY_UPDATE_QUOTA: Quota = Quota::one_every(10); @@ -126,6 +134,8 @@ impl Default for RateLimiterConfig { blocks_by_root_quota: Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA, blobs_by_range_quota: Self::DEFAULT_BLOBS_BY_RANGE_QUOTA, blobs_by_root_quota: Self::DEFAULT_BLOBS_BY_ROOT_QUOTA, + data_columns_by_root_quota: Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA, + data_columns_by_range_quota: Self::DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA, light_client_bootstrap_quota: Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA, light_client_optimistic_update_quota: Self::DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA, @@ -175,6 +185,8 @@ impl FromStr for RateLimiterConfig { let mut blocks_by_root_quota = None; let mut blobs_by_range_quota = None; let mut blobs_by_root_quota = None; + let mut data_columns_by_root_quota = None; + let mut data_columns_by_range_quota = None; let mut light_client_bootstrap_quota = None; let mut light_client_optimistic_update_quota = None; let mut light_client_finality_update_quota = None; @@ -189,6 +201,12 @@ impl FromStr for RateLimiterConfig { Protocol::BlocksByRoot => blocks_by_root_quota = blocks_by_root_quota.or(quota), Protocol::BlobsByRange => blobs_by_range_quota = blobs_by_range_quota.or(quota), Protocol::BlobsByRoot => blobs_by_root_quota = blobs_by_root_quota.or(quota), + Protocol::DataColumnsByRoot => { + data_columns_by_root_quota = data_columns_by_root_quota.or(quota) + } + Protocol::DataColumnsByRange => { + data_columns_by_range_quota = data_columns_by_range_quota.or(quota) + } Protocol::Ping => ping_quota = ping_quota.or(quota), Protocol::MetaData => meta_data_quota = meta_data_quota.or(quota), Protocol::LightClientBootstrap => { @@ -216,6 +234,10 @@ impl FromStr for RateLimiterConfig { blobs_by_range_quota: blobs_by_range_quota .unwrap_or(Self::DEFAULT_BLOBS_BY_RANGE_QUOTA), blobs_by_root_quota: blobs_by_root_quota.unwrap_or(Self::DEFAULT_BLOBS_BY_ROOT_QUOTA), + data_columns_by_root_quota: data_columns_by_root_quota + .unwrap_or(Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA), + data_columns_by_range_quota: data_columns_by_range_quota + .unwrap_or(Self::DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA), light_client_bootstrap_quota: light_client_bootstrap_quota .unwrap_or(Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA), light_client_optimistic_update_quota: light_client_optimistic_update_quota diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 1b0486ff771..8849a5433d4 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -14,9 +14,9 @@ use strum::IntoStaticStr; use superstruct::superstruct; use types::blob_sidecar::BlobIdentifier; use types::{ - blob_sidecar::BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, LightClientBootstrap, - LightClientFinalityUpdate, LightClientOptimisticUpdate, RuntimeVariableList, SignedBeaconBlock, - Slot, + blob_sidecar::BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, + Epoch, EthSpec, Hash256, LightClientBootstrap, LightClientFinalityUpdate, + LightClientOptimisticUpdate, RuntimeVariableList, SignedBeaconBlock, Slot, }; /// Maximum length of error message. @@ -293,6 +293,43 @@ impl BlobsByRangeRequest { } } +/// Request a number of beacon data columns from a peer. +#[derive(Encode, Decode, Clone, Debug, PartialEq)] +pub struct DataColumnsByRangeRequest { + /// The starting slot to request data columns. + pub start_slot: u64, + /// The number of slots from the start slot. + pub count: u64, + /// The list column indices being requested. + pub columns: Vec, +} + +impl DataColumnsByRangeRequest { + pub fn max_requested(&self) -> u64 { + self.count.saturating_mul(self.columns.len() as u64) + } + + pub fn ssz_min_len() -> usize { + DataColumnsByRangeRequest { + start_slot: 0, + count: 0, + columns: vec![0], + } + .as_ssz_bytes() + .len() + } + + pub fn ssz_max_len(spec: &ChainSpec) -> usize { + DataColumnsByRangeRequest { + start_slot: 0, + count: 0, + columns: vec![0; spec.number_of_columns], + } + .as_ssz_bytes() + .len() + } +} + /// Request a number of beacon block roots from a peer. #[superstruct( variants(V1, V2), @@ -370,6 +407,27 @@ impl BlobsByRootRequest { } } +/// Request a number of data columns from a peer. +#[derive(Clone, Debug, PartialEq)] +pub struct DataColumnsByRootRequest { + /// The list of beacon block roots and column indices being requested. + pub data_column_ids: RuntimeVariableList, +} + +impl DataColumnsByRootRequest { + pub fn new(data_column_ids: Vec, spec: &ChainSpec) -> Self { + let data_column_ids = RuntimeVariableList::from_vec( + data_column_ids, + spec.max_request_data_column_sidecars as usize, + ); + Self { data_column_ids } + } + + pub fn new_single(block_root: Hash256, index: ColumnIndex, spec: &ChainSpec) -> Self { + Self::new(vec![DataColumnIdentifier { block_root, index }], spec) + } +} + /* RPC Handling and Grouping */ // Collection of enums and structs used by the Codecs to encode/decode RPC messages @@ -400,6 +458,12 @@ pub enum RPCResponse { /// A response to a get BLOBS_BY_ROOT request. BlobsByRoot(Arc>), + /// A response to a get DATA_COLUMN_SIDECARS_BY_ROOT request. + DataColumnsByRoot(Arc>), + + /// A response to a get DATA_COLUMN_SIDECARS_BY_RANGE request. + DataColumnsByRange(Arc>), + /// A PONG response to a PING request. Pong(Ping), @@ -421,6 +485,12 @@ pub enum ResponseTermination { /// Blobs by root stream termination. BlobsByRoot, + + /// Data column sidecars by root stream termination. + DataColumnsByRoot, + + /// Data column sidecars by range stream termination. + DataColumnsByRange, } /// The structured response containing a result/code indicating success or failure @@ -511,6 +581,8 @@ impl RPCResponse { RPCResponse::BlocksByRoot(_) => Protocol::BlocksByRoot, RPCResponse::BlobsByRange(_) => Protocol::BlobsByRange, RPCResponse::BlobsByRoot(_) => Protocol::BlobsByRoot, + RPCResponse::DataColumnsByRoot(_) => Protocol::DataColumnsByRoot, + RPCResponse::DataColumnsByRange(_) => Protocol::DataColumnsByRange, RPCResponse::Pong(_) => Protocol::Ping, RPCResponse::MetaData(_) => Protocol::MetaData, RPCResponse::LightClientBootstrap(_) => Protocol::LightClientBootstrap, @@ -556,6 +628,16 @@ impl std::fmt::Display for RPCResponse { RPCResponse::BlobsByRoot(sidecar) => { write!(f, "BlobsByRoot: Blob slot: {}", sidecar.slot()) } + RPCResponse::DataColumnsByRoot(sidecar) => { + write!(f, "DataColumnsByRoot: Data column slot: {}", sidecar.slot()) + } + RPCResponse::DataColumnsByRange(sidecar) => { + write!( + f, + "DataColumnsByRange: Data column slot: {}", + sidecar.slot() + ) + } RPCResponse::Pong(ping) => write!(f, "Pong: {}", ping.data), RPCResponse::MetaData(metadata) => write!(f, "Metadata: {}", metadata.seq_number()), RPCResponse::LightClientBootstrap(bootstrap) => { diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index 027af89edfa..666cbe6fbcc 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -471,6 +471,8 @@ where ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot, ResponseTermination::BlobsByRange => Protocol::BlobsByRange, ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot, + ResponseTermination::DataColumnsByRoot => Protocol::DataColumnsByRoot, + ResponseTermination::DataColumnsByRange => Protocol::DataColumnsByRange, }, ), }; diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index 8ea7b84bc95..7752d27e759 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -36,6 +36,8 @@ pub enum OutboundRequest { BlocksByRoot(BlocksByRootRequest), BlobsByRange(BlobsByRangeRequest), BlobsByRoot(BlobsByRootRequest), + DataColumnsByRoot(DataColumnsByRootRequest), + DataColumnsByRange(DataColumnsByRangeRequest), Ping(Ping), MetaData(MetadataRequest), } @@ -79,6 +81,14 @@ impl OutboundRequest { SupportedProtocol::BlobsByRootV1, Encoding::SSZSnappy, )], + OutboundRequest::DataColumnsByRoot(_) => vec![ProtocolId::new( + SupportedProtocol::DataColumnsByRootV1, + Encoding::SSZSnappy, + )], + OutboundRequest::DataColumnsByRange(_) => vec![ProtocolId::new( + SupportedProtocol::DataColumnsByRangeV1, + Encoding::SSZSnappy, + )], OutboundRequest::Ping(_) => vec![ProtocolId::new( SupportedProtocol::PingV1, Encoding::SSZSnappy, @@ -100,6 +110,8 @@ impl OutboundRequest { OutboundRequest::BlocksByRoot(req) => req.block_roots().len() as u64, OutboundRequest::BlobsByRange(req) => req.max_blobs_requested::(), OutboundRequest::BlobsByRoot(req) => req.blob_ids.len() as u64, + OutboundRequest::DataColumnsByRoot(req) => req.data_column_ids.len() as u64, + OutboundRequest::DataColumnsByRange(req) => req.max_requested::(), OutboundRequest::Ping(_) => 1, OutboundRequest::MetaData(_) => 1, } @@ -113,6 +125,8 @@ impl OutboundRequest { OutboundRequest::BlocksByRoot(_) => false, OutboundRequest::BlobsByRange(_) => false, OutboundRequest::BlobsByRoot(_) => false, + OutboundRequest::DataColumnsByRoot(_) => false, + OutboundRequest::DataColumnsByRange(_) => false, OutboundRequest::Ping(_) => true, OutboundRequest::MetaData(_) => true, } @@ -133,6 +147,8 @@ impl OutboundRequest { }, OutboundRequest::BlobsByRange(_) => SupportedProtocol::BlobsByRangeV1, OutboundRequest::BlobsByRoot(_) => SupportedProtocol::BlobsByRootV1, + OutboundRequest::DataColumnsByRoot(_) => SupportedProtocol::DataColumnsByRootV1, + OutboundRequest::DataColumnsByRange(_) => SupportedProtocol::DataColumnsByRangeV1, OutboundRequest::Ping(_) => SupportedProtocol::PingV1, OutboundRequest::MetaData(req) => match req { MetadataRequest::V1(_) => SupportedProtocol::MetaDataV1, @@ -151,6 +167,8 @@ impl OutboundRequest { OutboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, OutboundRequest::BlobsByRange(_) => ResponseTermination::BlobsByRange, OutboundRequest::BlobsByRoot(_) => ResponseTermination::BlobsByRoot, + OutboundRequest::DataColumnsByRoot(_) => ResponseTermination::DataColumnsByRoot, + OutboundRequest::DataColumnsByRange(_) => ResponseTermination::DataColumnsByRange, OutboundRequest::Status(_) => unreachable!(), OutboundRequest::Goodbye(_) => unreachable!(), OutboundRequest::Ping(_) => unreachable!(), @@ -208,6 +226,10 @@ impl std::fmt::Display for OutboundRequest { OutboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), OutboundRequest::BlobsByRange(req) => write!(f, "Blobs by range: {:?}", req), OutboundRequest::BlobsByRoot(req) => write!(f, "Blobs by root: {:?}", req), + OutboundRequest::DataColumnsByRoot(req) => write!(f, "Data columns by root: {:?}", req), + OutboundRequest::DataColumnsByRange(req) => { + write!(f, "Data columns by range: {:?}", req) + } OutboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), OutboundRequest::MetaData(_) => write!(f, "MetaData request"), } diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 2cdd730a2b0..6f7f0348345 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -18,10 +18,10 @@ use tokio_util::{ }; use types::{ BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockCapella, BeaconBlockElectra, - BlobSidecar, ChainSpec, EmptyBlock, EthSpec, ForkContext, ForkName, LightClientBootstrap, - LightClientBootstrapAltair, LightClientFinalityUpdate, LightClientFinalityUpdateAltair, - LightClientOptimisticUpdate, LightClientOptimisticUpdateAltair, MainnetEthSpec, Signature, - SignedBeaconBlock, + BlobSidecar, ChainSpec, DataColumnSidecar, EmptyBlock, EthSpec, ForkContext, ForkName, + LightClientBootstrap, LightClientBootstrapAltair, LightClientFinalityUpdate, + LightClientFinalityUpdateAltair, LightClientOptimisticUpdate, + LightClientOptimisticUpdateAltair, MainnetEthSpec, Signature, SignedBeaconBlock, }; // Note: Hardcoding the `EthSpec` type for `SignedBeaconBlock` as min/max values is @@ -268,6 +268,12 @@ pub enum Protocol { /// The `BlobsByRoot` protocol name. #[strum(serialize = "blob_sidecars_by_root")] BlobsByRoot, + /// The `DataColumnSidecarsByRoot` protocol name. + #[strum(serialize = "data_column_sidecars_by_root")] + DataColumnsByRoot, + /// The `DataColumnSidecarsByRange` protocol name. + #[strum(serialize = "data_column_sidecars_by_range")] + DataColumnsByRange, /// The `Ping` protocol name. Ping, /// The `MetaData` protocol name. @@ -293,6 +299,8 @@ impl Protocol { Protocol::BlocksByRoot => Some(ResponseTermination::BlocksByRoot), Protocol::BlobsByRange => Some(ResponseTermination::BlobsByRange), Protocol::BlobsByRoot => Some(ResponseTermination::BlobsByRoot), + Protocol::DataColumnsByRoot => Some(ResponseTermination::DataColumnsByRoot), + Protocol::DataColumnsByRange => Some(ResponseTermination::DataColumnsByRange), Protocol::Ping => None, Protocol::MetaData => None, Protocol::LightClientBootstrap => None, @@ -319,6 +327,8 @@ pub enum SupportedProtocol { BlocksByRootV2, BlobsByRangeV1, BlobsByRootV1, + DataColumnsByRootV1, + DataColumnsByRangeV1, PingV1, MetaDataV1, MetaDataV2, @@ -338,6 +348,8 @@ impl SupportedProtocol { SupportedProtocol::BlocksByRootV2 => "2", SupportedProtocol::BlobsByRangeV1 => "1", SupportedProtocol::BlobsByRootV1 => "1", + SupportedProtocol::DataColumnsByRootV1 => "1", + SupportedProtocol::DataColumnsByRangeV1 => "1", SupportedProtocol::PingV1 => "1", SupportedProtocol::MetaDataV1 => "1", SupportedProtocol::MetaDataV2 => "2", @@ -357,6 +369,8 @@ impl SupportedProtocol { SupportedProtocol::BlocksByRootV2 => Protocol::BlocksByRoot, SupportedProtocol::BlobsByRangeV1 => Protocol::BlobsByRange, SupportedProtocol::BlobsByRootV1 => Protocol::BlobsByRoot, + SupportedProtocol::DataColumnsByRootV1 => Protocol::DataColumnsByRoot, + SupportedProtocol::DataColumnsByRangeV1 => Protocol::DataColumnsByRange, SupportedProtocol::PingV1 => Protocol::Ping, SupportedProtocol::MetaDataV1 => Protocol::MetaData, SupportedProtocol::MetaDataV2 => Protocol::MetaData, @@ -387,6 +401,12 @@ impl SupportedProtocol { ProtocolId::new(SupportedProtocol::BlobsByRangeV1, Encoding::SSZSnappy), ]); } + if fork_context.spec.is_peer_das_scheduled() { + supported.extend_from_slice(&[ + ProtocolId::new(SupportedProtocol::DataColumnsByRootV1, Encoding::SSZSnappy), + ProtocolId::new(SupportedProtocol::DataColumnsByRangeV1, Encoding::SSZSnappy), + ]); + } supported } } @@ -495,6 +515,11 @@ impl ProtocolId { ::ssz_fixed_len(), ), Protocol::BlobsByRoot => RpcLimits::new(0, spec.max_blobs_by_root_request), + Protocol::DataColumnsByRoot => RpcLimits::new(0, spec.max_data_columns_by_root_request), + Protocol::DataColumnsByRange => RpcLimits::new( + DataColumnsByRangeRequest::ssz_min_len(), + DataColumnsByRangeRequest::ssz_max_len(spec), + ), Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -521,6 +546,8 @@ impl ProtocolId { Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()), Protocol::BlobsByRange => rpc_blob_limits::(), Protocol::BlobsByRoot => rpc_blob_limits::(), + Protocol::DataColumnsByRoot => rpc_data_column_limits::(), + Protocol::DataColumnsByRange => rpc_data_column_limits::(), Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -549,6 +576,8 @@ impl ProtocolId { | SupportedProtocol::BlocksByRootV2 | SupportedProtocol::BlobsByRangeV1 | SupportedProtocol::BlobsByRootV1 + | SupportedProtocol::DataColumnsByRootV1 + | SupportedProtocol::DataColumnsByRangeV1 | SupportedProtocol::LightClientBootstrapV1 | SupportedProtocol::LightClientOptimisticUpdateV1 | SupportedProtocol::LightClientFinalityUpdateV1 => true, @@ -589,6 +618,13 @@ pub fn rpc_blob_limits() -> RpcLimits { ) } +pub fn rpc_data_column_limits() -> RpcLimits { + RpcLimits::new( + DataColumnSidecar::::empty().as_ssz_bytes().len(), + DataColumnSidecar::::max_size(), + ) +} + /* Inbound upgrade */ // The inbound protocol reads the request, decodes it and returns the stream to the protocol @@ -668,6 +704,8 @@ pub enum InboundRequest { BlocksByRoot(BlocksByRootRequest), BlobsByRange(BlobsByRangeRequest), BlobsByRoot(BlobsByRootRequest), + DataColumnsByRoot(DataColumnsByRootRequest), + DataColumnsByRange(DataColumnsByRangeRequest), LightClientBootstrap(LightClientBootstrapRequest), LightClientOptimisticUpdate, LightClientFinalityUpdate, @@ -688,6 +726,8 @@ impl InboundRequest { InboundRequest::BlocksByRoot(req) => req.block_roots().len() as u64, InboundRequest::BlobsByRange(req) => req.max_blobs_requested::(), InboundRequest::BlobsByRoot(req) => req.blob_ids.len() as u64, + InboundRequest::DataColumnsByRoot(req) => req.data_column_ids.len() as u64, + InboundRequest::DataColumnsByRange(req) => req.max_requested::(), InboundRequest::Ping(_) => 1, InboundRequest::MetaData(_) => 1, InboundRequest::LightClientBootstrap(_) => 1, @@ -711,6 +751,8 @@ impl InboundRequest { }, InboundRequest::BlobsByRange(_) => SupportedProtocol::BlobsByRangeV1, InboundRequest::BlobsByRoot(_) => SupportedProtocol::BlobsByRootV1, + InboundRequest::DataColumnsByRoot(_) => SupportedProtocol::DataColumnsByRootV1, + InboundRequest::DataColumnsByRange(_) => SupportedProtocol::DataColumnsByRangeV1, InboundRequest::Ping(_) => SupportedProtocol::PingV1, InboundRequest::MetaData(req) => match req { MetadataRequest::V1(_) => SupportedProtocol::MetaDataV1, @@ -736,6 +778,8 @@ impl InboundRequest { InboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, InboundRequest::BlobsByRange(_) => ResponseTermination::BlobsByRange, InboundRequest::BlobsByRoot(_) => ResponseTermination::BlobsByRoot, + InboundRequest::DataColumnsByRoot(_) => ResponseTermination::DataColumnsByRoot, + InboundRequest::DataColumnsByRange(_) => ResponseTermination::DataColumnsByRange, InboundRequest::Status(_) => unreachable!(), InboundRequest::Goodbye(_) => unreachable!(), InboundRequest::Ping(_) => unreachable!(), @@ -846,6 +890,10 @@ impl std::fmt::Display for InboundRequest { InboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), InboundRequest::BlobsByRange(req) => write!(f, "Blobs by range: {:?}", req), InboundRequest::BlobsByRoot(req) => write!(f, "Blobs by root: {:?}", req), + InboundRequest::DataColumnsByRoot(req) => write!(f, "Data columns by root: {:?}", req), + InboundRequest::DataColumnsByRange(req) => { + write!(f, "Data columns by range: {:?}", req) + } InboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), InboundRequest::MetaData(_) => write!(f, "MetaData request"), InboundRequest::LightClientBootstrap(bootstrap) => { diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index b304eb546da..9fb085efd86 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -97,6 +97,10 @@ pub struct RPCRateLimiter { blbrange_rl: Limiter, /// BlobsByRoot rate limiter. blbroot_rl: Limiter, + /// DataColumnssByRoot rate limiter. + dcbroot_rl: Limiter, + /// DataColumnsByRange rate limiter. + dcbrange_rl: Limiter, /// LightClientBootstrap rate limiter. lc_bootstrap_rl: Limiter, /// LightClientOptimisticUpdate rate limiter. @@ -133,6 +137,10 @@ pub struct RPCRateLimiterBuilder { blbrange_quota: Option, /// Quota for the BlobsByRoot protocol. blbroot_quota: Option, + /// Quota for the DataColumnsByRoot protocol. + dcbroot_quota: Option, + /// Quota for the DataColumnsByRange protocol. + dcbrange_quota: Option, /// Quota for the LightClientBootstrap protocol. lcbootstrap_quota: Option, /// Quota for the LightClientOptimisticUpdate protocol. @@ -154,6 +162,8 @@ impl RPCRateLimiterBuilder { Protocol::BlocksByRoot => self.bbroots_quota = q, Protocol::BlobsByRange => self.blbrange_quota = q, Protocol::BlobsByRoot => self.blbroot_quota = q, + Protocol::DataColumnsByRoot => self.dcbroot_quota = q, + Protocol::DataColumnsByRange => self.dcbrange_quota = q, Protocol::LightClientBootstrap => self.lcbootstrap_quota = q, Protocol::LightClientOptimisticUpdate => self.lc_optimistic_update_quota = q, Protocol::LightClientFinalityUpdate => self.lc_finality_update_quota = q, @@ -191,6 +201,14 @@ impl RPCRateLimiterBuilder { .blbroot_quota .ok_or("BlobsByRoot quota not specified")?; + let dcbroot_quota = self + .dcbroot_quota + .ok_or("DataColumnsByRoot quota not specified")?; + + let dcbrange_quota = self + .dcbrange_quota + .ok_or("DataColumnsByRange quota not specified")?; + // create the rate limiters let ping_rl = Limiter::from_quota(ping_quota)?; let metadata_rl = Limiter::from_quota(metadata_quota)?; @@ -200,6 +218,8 @@ impl RPCRateLimiterBuilder { let bbrange_rl = Limiter::from_quota(bbrange_quota)?; let blbrange_rl = Limiter::from_quota(blbrange_quota)?; let blbroot_rl = Limiter::from_quota(blbroots_quota)?; + let dcbroot_rl = Limiter::from_quota(dcbroot_quota)?; + let dcbrange_rl = Limiter::from_quota(dcbrange_quota)?; let lc_bootstrap_rl = Limiter::from_quota(lc_bootstrap_quota)?; let lc_optimistic_update_rl = Limiter::from_quota(lc_optimistic_update_quota)?; let lc_finality_update_rl = Limiter::from_quota(lc_finality_update_quota)?; @@ -218,6 +238,8 @@ impl RPCRateLimiterBuilder { bbrange_rl, blbrange_rl, blbroot_rl, + dcbroot_rl, + dcbrange_rl, lc_bootstrap_rl, lc_optimistic_update_rl, lc_finality_update_rl, @@ -262,6 +284,8 @@ impl RPCRateLimiter { blocks_by_root_quota, blobs_by_range_quota, blobs_by_root_quota, + data_columns_by_root_quota, + data_columns_by_range_quota, light_client_bootstrap_quota, light_client_optimistic_update_quota, light_client_finality_update_quota, @@ -276,6 +300,8 @@ impl RPCRateLimiter { .set_quota(Protocol::BlocksByRoot, blocks_by_root_quota) .set_quota(Protocol::BlobsByRange, blobs_by_range_quota) .set_quota(Protocol::BlobsByRoot, blobs_by_root_quota) + .set_quota(Protocol::DataColumnsByRoot, data_columns_by_root_quota) + .set_quota(Protocol::DataColumnsByRange, data_columns_by_range_quota) .set_quota(Protocol::LightClientBootstrap, light_client_bootstrap_quota) .set_quota( Protocol::LightClientOptimisticUpdate, @@ -312,6 +338,8 @@ impl RPCRateLimiter { Protocol::BlocksByRoot => &mut self.bbroots_rl, Protocol::BlobsByRange => &mut self.blbrange_rl, Protocol::BlobsByRoot => &mut self.blbroot_rl, + Protocol::DataColumnsByRoot => &mut self.dcbroot_rl, + Protocol::DataColumnsByRange => &mut self.dcbrange_rl, Protocol::LightClientBootstrap => &mut self.lc_bootstrap_rl, Protocol::LightClientOptimisticUpdate => &mut self.lc_optimistic_update_rl, Protocol::LightClientFinalityUpdate => &mut self.lc_finality_update_rl, diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 376ac34dee7..11df591ae4c 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -2,11 +2,13 @@ use std::sync::Arc; use libp2p::swarm::ConnectionId; use types::{ - BlobSidecar, EthSpec, LightClientBootstrap, LightClientFinalityUpdate, + BlobSidecar, DataColumnSidecar, EthSpec, LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate, SignedBeaconBlock, }; -use crate::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; +use crate::rpc::methods::{ + BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, +}; use crate::rpc::{ methods::{ BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, @@ -27,6 +29,11 @@ pub struct SingleLookupReqId { pub req_id: Id, } +/// Request ID for data_columns_by_root requests. Block lookup do not issue this requests directly. +/// Wrapping this particular req_id, ensures not mixing this requests with a custody req_id. +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct DataColumnsByRootRequestId(pub Id); + /// Id of rpc requests sent by sync to the network. #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum SyncRequestId { @@ -34,6 +41,8 @@ pub enum SyncRequestId { SingleBlock { id: SingleLookupReqId }, /// Request searching for a set of blobs given a hash. SingleBlob { id: SingleLookupReqId }, + /// Request searching for a set of data columns given a hash and list of column indices. + DataColumnsByRoot(DataColumnsByRootRequestId, SingleLookupReqId), /// Range request that is composed by both a block range request and a blob range request. RangeBlockAndBlobs { id: Id }, } @@ -75,6 +84,10 @@ pub enum Request { LightClientFinalityUpdate, /// A request blobs root request. BlobsByRoot(BlobsByRootRequest), + /// A request data columns root request. + DataColumnsByRoot(DataColumnsByRootRequest), + /// A request data columns by range request. + DataColumnsByRange(DataColumnsByRangeRequest), } impl std::convert::From for OutboundRequest { @@ -104,6 +117,8 @@ impl std::convert::From for OutboundRequest { } Request::BlobsByRange(r) => OutboundRequest::BlobsByRange(r), Request::BlobsByRoot(r) => OutboundRequest::BlobsByRoot(r), + Request::DataColumnsByRoot(r) => OutboundRequest::DataColumnsByRoot(r), + Request::DataColumnsByRange(r) => OutboundRequest::DataColumnsByRange(r), Request::Status(s) => OutboundRequest::Status(s), } } @@ -123,10 +138,14 @@ pub enum Response { BlocksByRange(Option>>), /// A response to a get BLOBS_BY_RANGE request. A None response signals the end of the batch. BlobsByRange(Option>>), + /// A response to a get DATA_COLUMN_SIDECARS_BY_Range request. + DataColumnsByRange(Option>>), /// A response to a get BLOCKS_BY_ROOT request. BlocksByRoot(Option>>), /// A response to a get BLOBS_BY_ROOT request. BlobsByRoot(Option>>), + /// A response to a get DATA_COLUMN_SIDECARS_BY_ROOT request. + DataColumnsByRoot(Option>>), /// A response to a LightClientUpdate request. LightClientBootstrap(Arc>), /// A response to a LightClientOptimisticUpdate request. @@ -154,6 +173,16 @@ impl std::convert::From> for RPCCodedResponse { Some(b) => RPCCodedResponse::Success(RPCResponse::BlobsByRange(b)), None => RPCCodedResponse::StreamTermination(ResponseTermination::BlobsByRange), }, + Response::DataColumnsByRoot(r) => match r { + Some(d) => RPCCodedResponse::Success(RPCResponse::DataColumnsByRoot(d)), + None => RPCCodedResponse::StreamTermination(ResponseTermination::DataColumnsByRoot), + }, + Response::DataColumnsByRange(r) => match r { + Some(d) => RPCCodedResponse::Success(RPCResponse::DataColumnsByRange(d)), + None => { + RPCCodedResponse::StreamTermination(ResponseTermination::DataColumnsByRange) + } + }, Response::Status(s) => RPCCodedResponse::Success(RPCResponse::Status(s)), Response::LightClientBootstrap(b) => { RPCCodedResponse::Success(RPCResponse::LightClientBootstrap(b)) diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index fe649f4199a..4ef080619eb 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1204,6 +1204,12 @@ impl Network { Request::BlobsByRoot { .. } => { metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blobs_by_root"]) } + Request::DataColumnsByRoot { .. } => { + metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["data_columns_by_root"]) + } + Request::DataColumnsByRange { .. } => { + metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["data_columns_by_range"]) + } } NetworkEvent::RequestReceived { peer_id, @@ -1523,6 +1529,22 @@ impl Network { self.build_request(peer_request_id, peer_id, Request::BlobsByRoot(req)); Some(event) } + InboundRequest::DataColumnsByRoot(req) => { + let event = self.build_request( + peer_request_id, + peer_id, + Request::DataColumnsByRoot(req), + ); + Some(event) + } + InboundRequest::DataColumnsByRange(req) => { + let event = self.build_request( + peer_request_id, + peer_id, + Request::DataColumnsByRange(req), + ); + Some(event) + } InboundRequest::LightClientBootstrap(req) => { let event = self.build_request( peer_request_id, @@ -1580,6 +1602,12 @@ impl Network { RPCResponse::BlobsByRoot(resp) => { self.build_response(id, peer_id, Response::BlobsByRoot(Some(resp))) } + RPCResponse::DataColumnsByRoot(resp) => { + self.build_response(id, peer_id, Response::DataColumnsByRoot(Some(resp))) + } + RPCResponse::DataColumnsByRange(resp) => { + self.build_response(id, peer_id, Response::DataColumnsByRange(Some(resp))) + } // Should never be reached RPCResponse::LightClientBootstrap(bootstrap) => { self.build_response(id, peer_id, Response::LightClientBootstrap(bootstrap)) @@ -1602,6 +1630,8 @@ impl Network { ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None), ResponseTermination::BlobsByRange => Response::BlobsByRange(None), ResponseTermination::BlobsByRoot => Response::BlobsByRoot(None), + ResponseTermination::DataColumnsByRoot => Response::DataColumnsByRoot(None), + ResponseTermination::DataColumnsByRange => Response::DataColumnsByRange(None), }; self.build_response(id, peer_id, response) } diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index ffb01a99efb..9fb14fdcb8c 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -8,7 +8,9 @@ use beacon_processor::{ DuplicateCache, GossipAggregatePackage, GossipAttestationPackage, Work, WorkEvent as BeaconWorkEvent, }; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, +}; use lighthouse_network::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage}, Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, @@ -602,6 +604,40 @@ impl NetworkBeaconProcessor { }) } + /// Create a new work event to process `DataColumnsByRootRequest`s from the RPC network. + pub fn send_data_columns_by_roots_request( + self: &Arc, + peer_id: PeerId, + request_id: PeerRequestId, + request: DataColumnsByRootRequest, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = + move || processor.handle_data_columns_by_root_request(peer_id, request_id, request); + + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::DataColumnsByRootsRequest(Box::new(process_fn)), + }) + } + + /// Create a new work event to process `DataColumnsByRange`s from the RPC network. + pub fn send_data_columns_by_range_request( + self: &Arc, + peer_id: PeerId, + request_id: PeerRequestId, + request: DataColumnsByRangeRequest, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = + move || processor.handle_data_columns_by_range_request(peer_id, request_id, request); + + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::DataColumnsByRangeRequest(Box::new(process_fn)), + }) + } + /// Create a new work event to process `LightClientBootstrap`s from the RPC network. pub fn send_light_client_bootstrap_request( self: &Arc, diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 2a0c7ea089b..3f8cf14dcbe 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -4,7 +4,9 @@ use crate::status::ToStatusMessage; use crate::sync::SyncMessage; use beacon_chain::{BeaconChainError, BeaconChainTypes, HistoricalBlockError, WhenSlotSkipped}; use itertools::process_results; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRangeRequest, DataColumnsByRootRequest, +}; use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use slog::{debug, error, warn}; @@ -314,6 +316,20 @@ impl NetworkBeaconProcessor { Ok(()) } + /// Handle a `DataColumnsByRoot` request from the peer. + pub fn handle_data_columns_by_root_request( + self: Arc, + peer_id: PeerId, + _request_id: PeerRequestId, + request: DataColumnsByRootRequest, + ) { + // TODO(das): implement handler + debug!(self.log, "Received DataColumnsByRoot Request"; + "peer_id" => %peer_id, + "count" => request.data_column_ids.len() + ); + } + /// Handle a `LightClientBootstrap` request from the peer. pub fn handle_light_client_bootstrap( self: &Arc, @@ -815,6 +831,21 @@ impl NetworkBeaconProcessor { Ok(()) } + /// Handle a `DataColumnsByRange` request from the peer. + pub fn handle_data_columns_by_range_request( + self: Arc, + peer_id: PeerId, + _request_id: PeerRequestId, + req: DataColumnsByRangeRequest, + ) { + // TODO(das): implement handler + debug!(self.log, "Received DataColumnsByRange Request"; + "peer_id" => %peer_id, + "count" => req.count, + "start_slot" => req.start_slot, + ); + } + /// Helper function to ensure single item protocol always end with either a single chunk or an /// error fn terminate_response_single_item Response>( diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index c162d52d026..a5e27f582af 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -27,7 +27,7 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; -use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; +use types::{BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock}; /// Handles messages from the network and routes them to the appropriate service to be handled. pub struct Router { @@ -216,6 +216,14 @@ impl Router { self.network_beacon_processor .send_blobs_by_roots_request(peer_id, request_id, request), ), + Request::DataColumnsByRoot(request) => self.handle_beacon_processor_send_result( + self.network_beacon_processor + .send_data_columns_by_roots_request(peer_id, request_id, request), + ), + Request::DataColumnsByRange(request) => self.handle_beacon_processor_send_result( + self.network_beacon_processor + .send_data_columns_by_range_request(peer_id, request_id, request), + ), Request::LightClientBootstrap(request) => self.handle_beacon_processor_send_result( self.network_beacon_processor .send_light_client_bootstrap_request(peer_id, request_id, request), @@ -258,6 +266,12 @@ impl Router { Response::BlobsByRoot(blob) => { self.on_blobs_by_root_response(peer_id, request_id, blob); } + Response::DataColumnsByRoot(data_column) => { + self.on_data_columns_by_root_response(peer_id, request_id, data_column); + } + Response::DataColumnsByRange(data_column) => { + self.on_data_columns_by_range_response(peer_id, request_id, data_column); + } // Light client responses should not be received Response::LightClientBootstrap(_) | Response::LightClientOptimisticUpdate(_) @@ -507,11 +521,11 @@ impl Router { ) { let request_id = match request_id { AppRequestId::Sync(sync_id) => match sync_id { - SyncRequestId::SingleBlock { .. } | SyncRequestId::SingleBlob { .. } => { - crit!(self.log, "Block lookups do not request BBRange requests"; "peer_id" => %peer_id); + id @ SyncRequestId::RangeBlockAndBlobs { .. } => id, + other => { + crit!(self.log, "BlocksByRange response on incorrect request"; "request" => ?other); return; } - id @ SyncRequestId::RangeBlockAndBlobs { .. } => id, }, AppRequestId::Router => { crit!(self.log, "All BBRange requests belong to sync"; "peer_id" => %peer_id); @@ -570,12 +584,8 @@ impl Router { let request_id = match request_id { AppRequestId::Sync(sync_id) => match sync_id { id @ SyncRequestId::SingleBlock { .. } => id, - SyncRequestId::RangeBlockAndBlobs { .. } => { - crit!(self.log, "Batch syncing do not request BBRoot requests"; "peer_id" => %peer_id); - return; - } - SyncRequestId::SingleBlob { .. } => { - crit!(self.log, "Blob response to block by roots request"; "peer_id" => %peer_id); + other => { + crit!(self.log, "BlocksByRoot response on incorrect request"; "request" => ?other); return; } }, @@ -608,12 +618,8 @@ impl Router { let request_id = match request_id { AppRequestId::Sync(sync_id) => match sync_id { id @ SyncRequestId::SingleBlob { .. } => id, - SyncRequestId::SingleBlock { .. } => { - crit!(self.log, "Block response to blobs by roots request"; "peer_id" => %peer_id); - return; - } - SyncRequestId::RangeBlockAndBlobs { .. } => { - crit!(self.log, "Batch syncing does not request BBRoot requests"; "peer_id" => %peer_id); + other => { + crit!(self.log, "BlobsByRoot response on incorrect request"; "request" => ?other); return; } }, @@ -636,6 +642,67 @@ impl Router { }); } + /// Handle a `DataColumnsByRoot` response from the peer. + pub fn on_data_columns_by_root_response( + &mut self, + peer_id: PeerId, + request_id: AppRequestId, + data_column: Option>>, + ) { + let request_id = match request_id { + AppRequestId::Sync(sync_id) => match sync_id { + id @ SyncRequestId::DataColumnsByRoot { .. } => id, + other => { + crit!(self.log, "DataColumnsByRoot response on incorrect request"; "request" => ?other); + return; + } + }, + AppRequestId::Router => { + crit!(self.log, "All DataColumnsByRoot requests belong to sync"; "peer_id" => %peer_id); + return; + } + }; + + trace!( + self.log, + "Received DataColumnsByRoot Response"; + "peer" => %peer_id, + ); + self.send_to_sync(SyncMessage::RpcDataColumn { + request_id, + peer_id, + data_column, + seen_timestamp: timestamp_now(), + }); + } + + pub fn on_data_columns_by_range_response( + &mut self, + peer_id: PeerId, + request_id: AppRequestId, + data_column: Option>>, + ) { + trace!( + self.log, + "Received DataColumnsByRange Response"; + "peer" => %peer_id, + ); + + if let AppRequestId::Sync(id) = request_id { + self.send_to_sync(SyncMessage::RpcDataColumn { + peer_id, + request_id: id, + data_column, + seen_timestamp: timestamp_now(), + }); + } else { + crit!( + self.log, + "All data columns by range responses should belong to sync" + ); + } + } + fn handle_beacon_processor_send_result( &mut self, result: Result<(), crate::network_beacon_processor::Error>, diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 7149395839b..e8e6896cd69 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -101,6 +101,14 @@ pub enum SyncMessage { seen_timestamp: Duration, }, + /// A data columns has been received from the RPC + RpcDataColumn { + request_id: SyncRequestId, + peer_id: PeerId, + data_column: Option>>, + seen_timestamp: Duration, + }, + /// A block with an unknown parent has been received. UnknownParentBlock(PeerId, RpcBlock, Hash256), @@ -337,6 +345,9 @@ impl SyncManager { SyncRequestId::SingleBlob { id } => { self.on_single_blob_response(id, peer_id, RpcEvent::RPCError(error)) } + SyncRequestId::DataColumnsByRoot { .. } => { + // TODO(das) + } SyncRequestId::RangeBlockAndBlobs { id } => { if let Some(sender_id) = self.network.range_request_failed(id) { match sender_id { @@ -614,6 +625,12 @@ impl SyncManager { blob_sidecar, seen_timestamp, } => self.rpc_blob_received(request_id, peer_id, blob_sidecar, seen_timestamp), + SyncMessage::RpcDataColumn { + request_id, + peer_id, + data_column, + seen_timestamp, + } => self.rpc_data_column_received(request_id, peer_id, data_column, seen_timestamp), SyncMessage::UnknownParentBlock(peer_id, block, block_root) => { let block_slot = block.slot(); let parent_root = block.parent_root(); @@ -846,6 +863,9 @@ impl SyncManager { SyncRequestId::SingleBlob { .. } => { crit!(self.log, "Block received during blob request"; "peer_id" => %peer_id ); } + SyncRequestId::DataColumnsByRoot { .. } => { + // TODO(das) + } SyncRequestId::RangeBlockAndBlobs { id } => { self.range_block_and_blobs_response(id, peer_id, block.into()) } @@ -888,12 +908,25 @@ impl SyncManager { None => RpcEvent::StreamTermination, }, ), + SyncRequestId::DataColumnsByRoot { .. } => { + // TODO(das) + } SyncRequestId::RangeBlockAndBlobs { id } => { self.range_block_and_blobs_response(id, peer_id, blob.into()) } } } + fn rpc_data_column_received( + &mut self, + _request_id: SyncRequestId, + _peer_id: PeerId, + _data_column: Option>>, + _seen_timestamp: Duration, + ) { + // TODO(das): implement handler + } + fn on_single_blob_response( &mut self, id: SingleLookupReqId,