Skip to content

Commit

Permalink
[execution-pool] BlockRetrievalRequest deprecation
Browse files Browse the repository at this point in the history
  • Loading branch information
hariria committed Jan 25, 2025
1 parent 3e471a8 commit 5435053
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 33 deletions.
71 changes: 67 additions & 4 deletions consensus/consensus-types/src/block_retrieval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,27 @@ pub const NUM_PEERS_PER_RETRY: usize = 3;
pub const RETRY_INTERVAL_MSEC: u64 = 500;
pub const RPC_TIMEOUT_MSEC: u64 = 5000;

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
#[allow(deprecated)]
pub enum BlockRetrievalRequest {
V1(BlockRetrievalRequestV1),
V2(BlockRetrievalRequestV2),
}

/// RPC to get a chain of block of the given length starting from the given block id.
/// NOTE: The [`BlockRetrievalRequest`](BlockRetrievalRequest) struct is being renamed to
/// [`BlockRetrievalRequestV1`](BlockRetrievalRequestV1) and deprecated in favor of a
/// [`BlockRetrievalRequest`](BlockRetrievalRequest) enum
///
/// Going forward, please use the [`BlockRetrievalRequest`](BlockRetrievalRequest) enum
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct BlockRetrievalRequest {
pub struct BlockRetrievalRequestV1 {
block_id: HashValue,
num_blocks: u64,
target_block_id: Option<HashValue>,
}

impl BlockRetrievalRequest {
impl BlockRetrievalRequestV1 {
pub fn new(block_id: HashValue, num_blocks: u64) -> Self {
Self {
block_id,
Expand Down Expand Up @@ -61,7 +73,8 @@ impl BlockRetrievalRequest {
}
}

impl fmt::Display for BlockRetrievalRequest {
#[allow(deprecated)]
impl fmt::Display for BlockRetrievalRequestV1 {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
Expand All @@ -71,6 +84,55 @@ impl fmt::Display for BlockRetrievalRequest {
}
}

/// RPC to get a chain of block of the given length starting from the given block id.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct BlockRetrievalRequestV2 {
block_id: HashValue,
num_blocks: u64,
// TODO: remove the Option, if it's not too painful
target_epoch_and_round: Option<(u64, u64)>,
}

impl BlockRetrievalRequestV2 {
pub fn new(block_id: HashValue, num_blocks: u64) -> Self {
BlockRetrievalRequestV2 {
block_id,
num_blocks,
target_epoch_and_round: None,
}
}

pub fn new_with_target_round(
block_id: HashValue,
num_blocks: u64,
target_epoch: u64,
target_round: u64,
) -> Self {
BlockRetrievalRequestV2 {
block_id,
num_blocks,
target_epoch_and_round: Some((target_epoch, target_round)),
}
}

pub fn block_id(&self) -> HashValue {
self.block_id
}

pub fn num_blocks(&self) -> u64 {
self.num_blocks
}

pub fn target_epoch_and_round(&self) -> Option<(u64, u64)> {
self.target_epoch_and_round
}

pub fn match_target_round(&self, epoch: u64, round: u64) -> bool {
self.target_epoch_and_round()
.map_or(false, |target| (epoch, round) <= target)
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub enum BlockRetrievalStatus {
// Successfully fill in the request.
Expand Down Expand Up @@ -103,9 +165,10 @@ impl BlockRetrievalResponse {
&self.blocks
}

#[allow(deprecated)]
pub fn verify(
&self,
retrieval_request: BlockRetrievalRequest,
retrieval_request: BlockRetrievalRequestV1,
sig_verifier: &ValidatorVerifier,
) -> anyhow::Result<()> {
ensure!(
Expand Down
7 changes: 5 additions & 2 deletions consensus/src/block_storage/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

#[allow(deprecated)]
use crate::{
block_storage::{
pending_blocks::PendingBlocks,
Expand All @@ -27,7 +28,7 @@ use anyhow::{anyhow, bail, Context};
use aptos_consensus_types::{
block::Block,
block_retrieval::{
BlockRetrievalRequest, BlockRetrievalResponse, BlockRetrievalStatus, NUM_PEERS_PER_RETRY,
BlockRetrievalRequestV1, BlockRetrievalResponse, BlockRetrievalStatus, NUM_PEERS_PER_RETRY,
NUM_RETRIES, RETRY_INTERVAL_MSEC, RPC_TIMEOUT_MSEC,
},
common::Author,
Expand Down Expand Up @@ -468,6 +469,8 @@ impl BlockStore {
///
/// The current version of the function is not really async, but keeping it this way for
/// future possible changes.
/// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
#[allow(deprecated)]
pub async fn process_block_retrieval(
&self,
request: IncomingBlockRetrievalRequest,
Expand Down Expand Up @@ -576,7 +579,7 @@ impl BlockRetriever {
.boxed(),
)
}
let request = BlockRetrievalRequest::new_with_target_block_id(
let request = BlockRetrievalRequestV1::new_with_target_block_id(
block_id,
retrieve_batch_size,
target_block_id,
Expand Down
44 changes: 41 additions & 3 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

/// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
#[allow(deprecated)]
use crate::network::IncomingBlockRetrievalRequest;
use crate::{
block_storage::{
pending_blocks::PendingBlocks,
Expand Down Expand Up @@ -31,7 +34,7 @@ use crate::{
metrics_safety_rules::MetricsSafetyRules,
monitor,
network::{
IncomingBatchRetrievalRequest, IncomingBlockRetrievalRequest, IncomingDAGRequest,
IncomingBatchRetrievalRequest, IncomingBlockRetrievalRequestV2, IncomingDAGRequest,
IncomingRandGenRequest, IncomingRpcRequest, NetworkReceivers, NetworkSender,
},
network_interface::{ConsensusMsg, ConsensusNetworkClient},
Expand Down Expand Up @@ -59,6 +62,7 @@ use aptos_bounded_executor::BoundedExecutor;
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::config::{ConsensusConfig, DagConsensusConfig, ExecutionConfig, NodeConfig};
use aptos_consensus_types::{
block_retrieval::{BlockRetrievalRequest, BlockRetrievalRequestV1},
common::{Author, Round},
epoch_retrieval::EpochRetrievalRequest,
proof_of_store::ProofCache,
Expand Down Expand Up @@ -555,12 +559,15 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
Ok(())
}

/// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
#[allow(deprecated)]
fn spawn_block_retrieval_task(
&mut self,
epoch: u64,
block_store: Arc<BlockStore>,
max_blocks_allowed: u64,
) {
// TODO @bchocho @hariria channel will take IncomingBlockRetrievalRequestV2 in the future
let (request_tx, mut request_rx) = aptos_channel::new::<_, IncomingBlockRetrievalRequest>(
QueueStyle::KLAST,
10,
Expand Down Expand Up @@ -1666,6 +1673,8 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
}
}

/// TODO: @bchocho @hariria can change after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
#[allow(deprecated)]
fn process_rpc_request(
&mut self,
peer_id: Author,
Expand All @@ -1684,13 +1693,19 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
return Ok(());
},
None => {
ensure!(matches!(request, IncomingRpcRequest::BlockRetrieval(_)));
// TODO: @bchocho @hariria can change after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
ensure!(matches!(
request,
IncomingRpcRequest::DeprecatedBlockRetrieval(_)
| IncomingRpcRequest::BlockRetrievalV2(_)
));
},
_ => {},
}

match request {
IncomingRpcRequest::BlockRetrieval(request) => {
// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
IncomingRpcRequest::DeprecatedBlockRetrieval(request) => {
if let Some(tx) = &self.block_retrieval_tx {
tx.push(peer_id, request)
} else {
Expand Down Expand Up @@ -1722,6 +1737,29 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
bail!("Rand manager not started");
}
},
IncomingRpcRequest::BlockRetrievalV2(IncomingBlockRetrievalRequestV2 {
req,
protocol,
response_sender,
}) => {
if let Some(tx) = &self.block_retrieval_tx {
let checked_request: BlockRetrievalRequestV1 = match req {
BlockRetrievalRequest::V1(v1) => v1,
// TODO @bchocho @hariria implement after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
BlockRetrievalRequest::V2(_) => {
unimplemented!("Should not have received a BlockRetrievalRequestV2...")
},
};
tx.push(peer_id, IncomingBlockRetrievalRequest {
req: checked_request,
protocol,
response_sender,
})
} else {
error!("Round manager not started");
Ok(())
}
},
}
}

Expand Down
44 changes: 34 additions & 10 deletions consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use anyhow::{anyhow, bail, ensure};
use aptos_channels::{self, aptos_channel, message_queues::QueueStyle};
use aptos_config::network_id::NetworkId;
use aptos_consensus_types::{
block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse},
block_retrieval::{BlockRetrievalRequest, BlockRetrievalRequestV1, BlockRetrievalResponse},
common::Author,
order_vote_msg::OrderVoteMsg,
pipeline::{commit_decision::CommitDecision, commit_vote::CommitVote},
Expand Down Expand Up @@ -92,10 +92,24 @@ impl RpcResponder {
}
}

/// NOTE: The [`IncomingBlockRetrievalRequest`](IncomingBlockRetrievalRequest) struct is being
/// deprecated in favor of [`IncomingBlockRetrievalRequestV2`](IncomingBlockRetrievalRequestV2) which
/// supports the new [`BlockRetrievalRequest`](BlockRetrievalRequest) enum for the `req` field
///
/// Going forward, please use [`IncomingBlockRetrievalRequestV2`](IncomingBlockRetrievalRequestV2)
/// For more details, see comments above [`BlockRetrievalRequestV1`](BlockRetrievalRequestV1)
/// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
#[derive(Debug)]
pub struct IncomingBlockRetrievalRequest {
pub req: BlockRetrievalRequestV1,
pub protocol: ProtocolId,
pub response_sender: oneshot::Sender<Result<Bytes, RpcError>>,
}

/// The block retrieval request is used internally for implementing RPC: the callback is executed
/// for carrying the response
#[derive(Debug)]
pub struct IncomingBlockRetrievalRequest {
pub struct IncomingBlockRetrievalRequestV2 {
pub req: BlockRetrievalRequest,
pub protocol: ProtocolId,
pub response_sender: oneshot::Sender<Result<Bytes, RpcError>>,
Expand Down Expand Up @@ -132,21 +146,27 @@ pub struct IncomingRandGenRequest {

#[derive(Debug)]
pub enum IncomingRpcRequest {
BlockRetrieval(IncomingBlockRetrievalRequest),
/// NOTE: This is being phased out in two releases to accommodate `IncomingBlockRetrievalRequestV2`
/// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
DeprecatedBlockRetrieval(IncomingBlockRetrievalRequest),
BatchRetrieval(IncomingBatchRetrievalRequest),
DAGRequest(IncomingDAGRequest),
CommitRequest(IncomingCommitRequest),
RandGenRequest(IncomingRandGenRequest),
BlockRetrievalV2(IncomingBlockRetrievalRequestV2),
}

impl IncomingRpcRequest {
/// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
#[allow(deprecated)]
pub fn epoch(&self) -> Option<u64> {
match self {
IncomingRpcRequest::BatchRetrieval(req) => Some(req.req.epoch()),
IncomingRpcRequest::DAGRequest(req) => Some(req.req.epoch()),
IncomingRpcRequest::RandGenRequest(req) => Some(req.req.epoch()),
IncomingRpcRequest::CommitRequest(req) => req.req.epoch(),
IncomingRpcRequest::BlockRetrieval(_) => None,
IncomingRpcRequest::DeprecatedBlockRetrieval(_) => None,
IncomingRpcRequest::BlockRetrievalV2(_) => None,
}
}
}
Expand Down Expand Up @@ -223,7 +243,7 @@ impl NetworkSender {
/// returns a future that is fulfilled with BlockRetrievalResponse.
pub async fn request_block(
&self,
retrieval_request: BlockRetrievalRequest,
retrieval_request: BlockRetrievalRequestV1,
from: Author,
timeout: Duration,
) -> anyhow::Result<BlockRetrievalResponse> {
Expand Down Expand Up @@ -708,6 +728,8 @@ impl NetworkTask {
}
}

/// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
#[allow(deprecated)]
pub async fn start(mut self) {
while let Some(message) = self.all_events.next().await {
monitor!("network_main_loop", match message {
Expand Down Expand Up @@ -812,11 +834,13 @@ impl NetworkTask {
"{}",
request
);
IncomingRpcRequest::BlockRetrieval(IncomingBlockRetrievalRequest {
req: *request,
protocol,
response_sender: callback,
})
IncomingRpcRequest::DeprecatedBlockRetrieval(
IncomingBlockRetrievalRequest {
req: *request,
protocol,
response_sender: callback,
},
)
},
ConsensusMsg::BatchRequestMsg(request) => {
debug!(
Expand Down
12 changes: 9 additions & 3 deletions consensus/src/network_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
};
use aptos_config::network_id::{NetworkId, PeerNetworkId};
use aptos_consensus_types::{
block_retrieval::{BlockRetrievalRequest, BlockRetrievalResponse},
block_retrieval::{BlockRetrievalRequest, BlockRetrievalRequestV1, BlockRetrievalResponse},
epoch_retrieval::EpochRetrievalRequest,
order_vote_msg::OrderVoteMsg,
pipeline::{commit_decision::CommitDecision, commit_vote::CommitVote},
Expand All @@ -35,8 +35,11 @@ use std::{collections::HashMap, time::Duration};
/// Network type for consensus
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum ConsensusMsg {
/// DEPRECATED: Please use [`ConsensusMsg::BlockRetrievalRequestV2`](ConsensusMsg::BlockRetrievalRequestV2) going forward
/// RPC to get a chain of block of the given length starting from the given block id.
BlockRetrievalRequest(Box<BlockRetrievalRequest>),
/// Note: Naming here is `BlockRetrievalRequest` and not `DeprecatedBlockRetrievalRequest`
/// to be consistent with the naming implementation in [`ConsensusMsg::name`](ConsensusMsg::name)
BlockRetrievalRequest(Box<BlockRetrievalRequestV1>),
/// Carries the returned blocks and the retrieval status.
BlockRetrievalResponse(Box<BlockRetrievalResponse>),
/// Request to get a EpochChangeProof from current_epoch to target_epoch
Expand Down Expand Up @@ -83,12 +86,14 @@ pub enum ConsensusMsg {
OrderVoteMsg(Box<OrderVoteMsg>),
/// RoundTimeoutMsg is broadcasted by a validator once it decides to timeout the current round.
RoundTimeoutMsg(Box<RoundTimeoutMsg>),
/// RPC to get a chain of block of the given length starting from the given block id, using epoch and round.
BlockRetrievalRequestV2(Box<BlockRetrievalRequest>),
}

/// Network type for consensus
impl ConsensusMsg {
/// ConsensusMsg type in string
///
/// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
pub fn name(&self) -> &str {
match self {
ConsensusMsg::BlockRetrievalRequest(_) => "BlockRetrievalRequest",
Expand All @@ -111,6 +116,7 @@ impl ConsensusMsg {
ConsensusMsg::RandGenMessage(_) => "RandGenMessage",
ConsensusMsg::BatchResponseV2(_) => "BatchResponseV2",
ConsensusMsg::RoundTimeoutMsg(_) => "RoundTimeoutV2",
ConsensusMsg::BlockRetrievalRequestV2(_) => "BlockRetrievalRequestV2",
}
}
}
Expand Down
Loading

0 comments on commit 5435053

Please sign in to comment.