Skip to content

Commit

Permalink
[exec-pool] Add a TODO for process_block_retrieval_v2
Browse files Browse the repository at this point in the history
  • Loading branch information
hariria committed Jan 28, 2025
1 parent ca4499f commit 8389258
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 39 deletions.
21 changes: 20 additions & 1 deletion consensus/src/block_storage/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use crate::{
epoch_manager::LivenessStorageData,
logging::{LogEvent, LogSchema},
monitor,
network::{DeprecatedIncomingBlockRetrievalRequest, NetworkSender},
network::{
DeprecatedIncomingBlockRetrievalRequest, IncomingBlockRetrievalRequest, NetworkSender,
},
network_interface::ConsensusMsg,
payload_manager::TPayloadManager,
persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData},
Expand Down Expand Up @@ -505,6 +507,23 @@ impl BlockStore {
.send(Ok(response_bytes.into()))
.map_err(|_| anyhow::anyhow!("Failed to send block retrieval response"))
}

/// TODO @bchocho @hariria to implement in upcoming PR
/// Retrieve a n chained blocks from the block store starting from
/// an initial parent id, returning with <n (as many as possible) if
/// id or its ancestors can not be found.
///
/// The current version of the function is not really async, but keeping it this way for
/// future possible changes.
pub async fn process_block_retrieval_v2(
&self,
request: IncomingBlockRetrievalRequest,
) -> anyhow::Result<()> {
bail!(
"Unexpected request {:?} for process_block_retrieval_v2",
request.req
)
}
}

/// BlockRetriever is used internally to retrieve blocks
Expand Down
96 changes: 58 additions & 38 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ pub struct EpochManager<P: OnChainConfigProvider> {
round_manager_close_tx: Option<oneshot::Sender<oneshot::Sender<()>>>,
epoch_state: Option<Arc<EpochState>>,
block_retrieval_tx:
Option<aptos_channel::Sender<AccountAddress, DeprecatedIncomingBlockRetrievalRequest>>,
Option<aptos_channel::Sender<AccountAddress, IncomingBlockRetrievalRequest>>,
quorum_store_msg_tx: Option<aptos_channel::Sender<AccountAddress, (Author, VerifiedEvent)>>,
quorum_store_coordinator_tx: Option<Sender<CoordinatorCommand>>,
quorum_store_storage: Arc<dyn QuorumStoreStorage>,
Expand Down Expand Up @@ -563,27 +563,58 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
block_store: Arc<BlockStore>,
max_blocks_allowed: u64,
) {
let (request_tx, mut request_rx) =
aptos_channel::new::<_, DeprecatedIncomingBlockRetrievalRequest>(
QueueStyle::KLAST,
10,
Some(&counters::BLOCK_RETRIEVAL_TASK_MSGS),
);
let (request_tx, mut request_rx) = aptos_channel::new::<_, IncomingBlockRetrievalRequest>(
QueueStyle::KLAST,
10,
Some(&counters::BLOCK_RETRIEVAL_TASK_MSGS),
);
let task = async move {
info!(epoch = epoch, "Block retrieval task starts");
while let Some(request) = request_rx.next().await {
if request.req.num_blocks() > max_blocks_allowed {
warn!(
"Ignore block retrieval with too many blocks: {}",
request.req.num_blocks()
);
continue;
}
if let Err(e) = monitor!(
"process_block_retrieval",
block_store.process_block_retrieval(request).await
) {
warn!(epoch = epoch, error = ?e, kind = error_kind(&e));
match request.req {
// TODO @bchocho @hariria deprecate once BlockRetrievalRequest enum release is complete
BlockRetrievalRequest::V1(v1) => {
if v1.num_blocks() > max_blocks_allowed {
warn!(
"Ignore block retrieval with too many blocks: {}",
v1.num_blocks()
);
continue;
}
if let Err(e) = monitor!(
"process_block_retrieval",
block_store
.process_block_retrieval(DeprecatedIncomingBlockRetrievalRequest {
req: v1,
protocol: request.protocol,
response_sender: request.response_sender,
})
.await
) {
warn!(epoch = epoch, error = ?e, kind = error_kind(&e));
}
},
BlockRetrievalRequest::V2(v2) => {
if v2.num_blocks() > max_blocks_allowed {
warn!(
"Ignore block retrieval with too many blocks: {}",
v2.num_blocks()
);
continue;
}
if let Err(e) = monitor!(
"process_block_retrieval",
block_store
.process_block_retrieval_v2(IncomingBlockRetrievalRequest {
req: BlockRetrievalRequest::V2(v2),
protocol: request.protocol,
response_sender: request.response_sender,
})
.await
) {
warn!(epoch = epoch, error = ?e, kind = error_kind(&e));
}
},
}
}
info!(epoch = epoch, "Block retrieval task stops");
Expand Down Expand Up @@ -1702,7 +1733,12 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
// TODO @bchocho @hariria can remove after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
IncomingRpcRequest::DeprecatedBlockRetrieval(request) => {
if let Some(tx) = &self.block_retrieval_tx {
tx.push(peer_id, request)
let incoming_block_retrieval_request = IncomingBlockRetrievalRequest {
req: BlockRetrievalRequest::V1(request.req),
protocol: request.protocol,
response_sender: request.response_sender,
};
tx.push(peer_id, incoming_block_retrieval_request)
} else {
error!("Round manager not started");
Ok(())
Expand Down Expand Up @@ -1732,25 +1768,9 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
bail!("Rand manager not started");
}
},
IncomingRpcRequest::BlockRetrieval(IncomingBlockRetrievalRequest {
req,
protocol,
response_sender,
}) => {
IncomingRpcRequest::BlockRetrieval(request) => {
if let Some(tx) = &self.block_retrieval_tx {
match req {
BlockRetrievalRequest::V1(v1) => {
tx.push(peer_id, DeprecatedIncomingBlockRetrievalRequest {
req: v1,
protocol,
response_sender,
})
},
// TODO @bchocho @hariria implement after all nodes upgrade to release with enum BlockRetrievalRequest (not struct)
BlockRetrievalRequest::V2(v2) => {
bail!("Should not have received a BlockRetrievalRequestV2 {:?} from peer with id {}", v2, peer_id);
},
}
tx.push(peer_id, request)
} else {
error!("Round manager not started");
Ok(())
Expand Down

0 comments on commit 8389258

Please sign in to comment.