From 7c807673929b210217de1ca529cc301f60daf289 Mon Sep 17 00:00:00 2001 From: Alan Szepieniec Date: Sun, 19 Jan 2025 11:15:25 +0100 Subject: [PATCH] style(`peer_loop`): Arrange message handlers in order of sequence The sequence being: - get block notification (and: maybe start challenge phase; maybe ask peer for block) - get sync challenge (and: maybe respond) - get sync challenge response (and: inform main loop) - get block request (and: maybe respond) - get block (and: maybe try ensure path) --- src/peer_loop.rs | 636 +++++++++++++++++++++++------------------------ 1 file changed, 318 insertions(+), 318 deletions(-) diff --git a/src/peer_loop.rs b/src/peer_loop.rs index 53fbdbc2..6b17660c 100644 --- a/src/peer_loop.rs +++ b/src/peer_loop.rs @@ -541,275 +541,89 @@ impl PeerLoopHandler { .await?; Ok(KEEP_CONNECTION_ALIVE) } - PeerMessage::Block(t_block) => { - log_slow_scope!(fn_name!() + "::PeerMessage::Block"); - - info!( - "Got new block from peer {}, height {}, mined {}", - self.peer_address, - t_block.header.height, - t_block.header.timestamp.standard_format() - ); - let new_block_height = t_block.header.height; - - let block = match Block::try_from(*t_block) { - Ok(block) => Box::new(block), - Err(e) => { - warn!("Peer sent invalid block: {e:?}"); - self.punish(NegativePeerSanction::InvalidTransferBlock) - .await?; - - return Ok(KEEP_CONNECTION_ALIVE); - } - }; - - // Update the value for the highest known height that peer possesses iff - // we are not in a fork reconciliation state. - if peer_state_info.fork_reconciliation_blocks.is_empty() { - peer_state_info.highest_shared_block_height = new_block_height; - } - - self.try_ensure_path(block, peer, peer_state_info).await?; - - // Reward happens as part of `try_ensure_path` - - Ok(KEEP_CONNECTION_ALIVE) - } - PeerMessage::BlockRequestBatch(BlockRequestBatch { - known_blocks, - max_response_len, - }) => { - log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestBatch"); - debug!( - "Received BlockRequestBatch from peer {}, max_response_len: {max_response_len}", - self.peer_address - ); - - // The last block in the list of the peers known block is the - // earliest block, block with lowest height, the peer has - // requested. If it does not belong to canonical chain, none of - // the later will. So we can do an early abort in that case. - let least_preferred = match known_blocks.last() { - Some(least_preferred) => *least_preferred, - None => { - self.punish(NegativePeerSanction::BatchBlocksRequestEmpty) - .await?; - - return Ok(KEEP_CONNECTION_ALIVE); - } - }; - - if !self - .global_state_lock - .lock_guard() - .await - .chain - .archival_state() - .block_belongs_to_canonical_chain(least_preferred) - .await - { - self.punish(NegativePeerSanction::BatchBlocksUnknownRequest) - .await?; - peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?; - - return Ok(KEEP_CONNECTION_ALIVE); - } - - // Happy case: At least *one* of the blocks referenced by peer - // is known to us. - let mut first_block_in_response: Option = None; - { - let global_state = self.global_state_lock.lock_guard().await; - for block_digest in known_blocks { - if global_state - .chain - .archival_state() - .block_belongs_to_canonical_chain(block_digest) - .await - { - let height = global_state - .chain - .archival_state() - .get_block_header(block_digest) - .await - .unwrap() - .height; - first_block_in_response = Some(height); - debug!( - "Found block in canonical chain for batch response: {}", - block_digest - ); - break; - } - } - } - - let peers_preferred_canonical_block = match first_block_in_response { - Some(block) => block, - None => { - self.punish(NegativePeerSanction::BatchBlocksUnknownRequest) - .await?; - return Ok(KEEP_CONNECTION_ALIVE); - } - }; + PeerMessage::BlockNotificationRequest => { + log_slow_scope!(fn_name!() + "::PeerMessage::BlockNotificationRequest"); - debug!( - "Peer's most preferred block has height {peers_preferred_canonical_block}.\ - Now building response from that height." - ); + debug!("Got BlockNotificationRequest"); - // Get the relevant blocks, at most batch-size many, descending from the - // peer's (alleged) most canonical block. Don't exceed `max_response_len` - // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response. - let len_of_response = cmp::min( - max_response_len, + peer.send(PeerMessage::BlockNotification( self.global_state_lock - .cli() - .max_number_of_blocks_before_syncing, - ); - let len_of_response = cmp::max(len_of_response, MINIMUM_BLOCK_BATCH_SIZE); - let len_of_response = cmp::min(len_of_response, STANDARD_BLOCK_BATCH_SIZE); - - let mut digests_of_returned_blocks = Vec::with_capacity(len_of_response); - let response_start_height: u64 = peers_preferred_canonical_block.into(); - let mut i: u64 = 1; - let global_state = self.global_state_lock.lock_guard().await; - while digests_of_returned_blocks.len() < len_of_response { - match global_state - .chain - .archival_state() - .archival_block_mmr - .try_get_leaf(response_start_height + i) + .lock_guard() .await - { - Some(digest) => { - digests_of_returned_blocks.push(digest); - } - None => break, - } - i += 1; - } - - let mut returned_blocks: Vec = - Vec::with_capacity(digests_of_returned_blocks.len()); - for block_digest in digests_of_returned_blocks { - let block = global_state .chain - .archival_state() - .get_block(block_digest) - .await? - .unwrap(); - returned_blocks.push(block.try_into().unwrap()); - } - - debug!( - "Returning {} blocks in batch response", - returned_blocks.len() - ); - - let response = PeerMessage::BlockResponseBatch(returned_blocks); - peer.send(response).await?; + .light_state() + .into(), + )) + .await?; Ok(KEEP_CONNECTION_ALIVE) } - PeerMessage::BlockResponseBatch(t_blocks) => { - log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch"); + PeerMessage::BlockNotification(block_notification) => { + log_slow_scope!(fn_name!() + "::PeerMessage::BlockNotification"); debug!( - "handling block response batch with {} blocks", - t_blocks.len() + "Got BlockNotification of height {}", + block_notification.height ); - if t_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE { - warn!("Got smaller batch response than allowed"); - self.punish(NegativePeerSanction::TooShortBlockBatch) - .await?; - return Ok(KEEP_CONNECTION_ALIVE); - } + let state = self.global_state_lock.lock_guard().await; + if state.should_enter_sync_mode( + block_notification.height, + block_notification.cumulative_proof_of_work, + ) { + if peer_state_info.sync_challenge.is_some() { + warn!("Cannot launch new sync challenge because one is already on-going."); + return Ok(KEEP_CONNECTION_ALIVE); + } + + info!( + "Peer indicates block which would activate sync mode, issuing challenge." + ); + let challenge = SyncChallenge::generate( + &block_notification, + state.chain.light_state().header().height, + ); + peer_state_info.sync_challenge = Some(IssuedSyncChallenge::new( + challenge, + block_notification.cumulative_proof_of_work, + )); + + peer.send(PeerMessage::SyncChallenge(challenge)).await?; - // Verify that we are in fact in syncing mode - // TODO: Seperate peer messages into those allowed under syncing - // and those that are not - if !self.global_state_lock.lock_guard().await.net.syncing { - warn!("Received a batch of blocks without being in syncing mode"); - self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync) - .await?; return Ok(KEEP_CONNECTION_ALIVE); } - // Verify that the response matches the current state - // We get the latest block from the DB here since this message is - // only valid for archival nodes. - let first_blocks_parent_digest: Digest = t_blocks[0].header.prev_block_digest; - let most_canonical_own_block_match: Option = self + peer_state_info.highest_shared_block_height = block_notification.height; + let block_is_new = self .global_state_lock .lock_guard() .await .chain - .archival_state() - .get_block(first_blocks_parent_digest) - .await - .expect("Block lookup must succeed"); - let most_canonical_own_block_match: Block = match most_canonical_own_block_match { - Some(block) => block, - None => { - warn!("Got batch reponse with invalid start block"); - self.punish(NegativePeerSanction::BatchBlocksInvalidStartHeight) - .await?; - return Ok(KEEP_CONNECTION_ALIVE); - } - }; - - // Convert all blocks to Block objects - debug!( - "Found own block of height {} to match received batch", - most_canonical_own_block_match.kernel.header.height - ); - let mut received_blocks = vec![]; - for t_block in t_blocks { - match Block::try_from(t_block) { - Ok(block) => { - received_blocks.push(block); - } - Err(e) => { - warn!("Received invalid transfer block from peer: {e:?}"); - self.punish(NegativePeerSanction::InvalidTransferBlock) - .await?; - return Ok(KEEP_CONNECTION_ALIVE); - } - } - } - - // Get the latest block that we know of and handle all received blocks - self.handle_blocks(received_blocks, most_canonical_own_block_match) - .await?; - - // Reward happens as part of `handle_blocks`. - - Ok(KEEP_CONNECTION_ALIVE) - } - PeerMessage::UnableToSatisfyBatchRequest => { - log_slow_scope!(fn_name!() + "::PeerMessage::UnableToSatisfyBatchRequest"); - warn!( - "Peer {} reports inability to satisfy batch request.", - self.peer_address - ); - - Ok(KEEP_CONNECTION_ALIVE) - } - PeerMessage::BlockNotificationRequest => { - log_slow_scope!(fn_name!() + "::PeerMessage::BlockNotificationRequest"); + .light_state() + .kernel + .header + .cumulative_proof_of_work + < block_notification.cumulative_proof_of_work; - debug!("Got BlockNotificationRequest"); + debug!("block_is_new: {}", block_is_new); - peer.send(PeerMessage::BlockNotification( - self.global_state_lock - .lock_guard() - .await - .chain - .light_state() - .into(), - )) - .await?; + if block_is_new + && peer_state_info.fork_reconciliation_blocks.is_empty() + && !self.global_state_lock.lock_guard().await.net.syncing + { + debug!( + "sending BlockRequestByHeight to peer for block with height {}", + block_notification.height + ); + peer.send(PeerMessage::BlockRequestByHeight(block_notification.height)) + .await?; + } else { + debug!( + "ignoring peer block. height {}. new: {}, reconciling_fork: {}", + block_notification.height, + block_is_new, + !peer_state_info.fork_reconciliation_blocks.is_empty() + ); + } Ok(KEEP_CONNECTION_ALIVE) } @@ -976,76 +790,7 @@ impl PeerLoopHandler { claimed_tip_height, issued_challenge.accumulated_pow, ))) - .await?; - - Ok(KEEP_CONNECTION_ALIVE) - } - PeerMessage::BlockNotification(block_notification) => { - log_slow_scope!(fn_name!() + "::PeerMessage::BlockNotification"); - - debug!( - "Got BlockNotification of height {}", - block_notification.height - ); - let state = self.global_state_lock.lock_guard().await; - if state.should_enter_sync_mode( - block_notification.height, - block_notification.cumulative_proof_of_work, - ) { - if peer_state_info.sync_challenge.is_some() { - warn!("Cannot launch new sync challenge because one is already on-going."); - return Ok(KEEP_CONNECTION_ALIVE); - } - - info!( - "Peer indicates block which would activate sync mode, issuing challenge." - ); - let challenge = SyncChallenge::generate( - &block_notification, - state.chain.light_state().header().height, - ); - peer_state_info.sync_challenge = Some(IssuedSyncChallenge::new( - challenge, - block_notification.cumulative_proof_of_work, - )); - - peer.send(PeerMessage::SyncChallenge(challenge)).await?; - - return Ok(KEEP_CONNECTION_ALIVE); - } - - peer_state_info.highest_shared_block_height = block_notification.height; - let block_is_new = self - .global_state_lock - .lock_guard() - .await - .chain - .light_state() - .kernel - .header - .cumulative_proof_of_work - < block_notification.cumulative_proof_of_work; - - debug!("block_is_new: {}", block_is_new); - - if block_is_new - && peer_state_info.fork_reconciliation_blocks.is_empty() - && !self.global_state_lock.lock_guard().await.net.syncing - { - debug!( - "sending BlockRequestByHeight to peer for block with height {}", - block_notification.height - ); - peer.send(PeerMessage::BlockRequestByHeight(block_notification.height)) - .await?; - } else { - debug!( - "ignoring peer block. height {}. new: {}, reconciling_fork: {}", - block_notification.height, - block_is_new, - !peer_state_info.fork_reconciliation_blocks.is_empty() - ); - } + .await?; Ok(KEEP_CONNECTION_ALIVE) } @@ -1115,6 +860,261 @@ impl PeerLoopHandler { debug!("Sent block"); Ok(KEEP_CONNECTION_ALIVE) } + PeerMessage::Block(t_block) => { + log_slow_scope!(fn_name!() + "::PeerMessage::Block"); + + info!( + "Got new block from peer {}, height {}, mined {}", + self.peer_address, + t_block.header.height, + t_block.header.timestamp.standard_format() + ); + let new_block_height = t_block.header.height; + + let block = match Block::try_from(*t_block) { + Ok(block) => Box::new(block), + Err(e) => { + warn!("Peer sent invalid block: {e:?}"); + self.punish(NegativePeerSanction::InvalidTransferBlock) + .await?; + + return Ok(KEEP_CONNECTION_ALIVE); + } + }; + + // Update the value for the highest known height that peer possesses iff + // we are not in a fork reconciliation state. + if peer_state_info.fork_reconciliation_blocks.is_empty() { + peer_state_info.highest_shared_block_height = new_block_height; + } + + self.try_ensure_path(block, peer, peer_state_info).await?; + + // Reward happens as part of `try_ensure_path` + + Ok(KEEP_CONNECTION_ALIVE) + } + PeerMessage::BlockRequestBatch(BlockRequestBatch { + known_blocks, + max_response_len, + }) => { + log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestBatch"); + debug!( + "Received BlockRequestBatch from peer {}, max_response_len: {max_response_len}", + self.peer_address + ); + + // The last block in the list of the peers known block is the + // earliest block, block with lowest height, the peer has + // requested. If it does not belong to canonical chain, none of + // the later will. So we can do an early abort in that case. + let least_preferred = match known_blocks.last() { + Some(least_preferred) => *least_preferred, + None => { + self.punish(NegativePeerSanction::BatchBlocksRequestEmpty) + .await?; + + return Ok(KEEP_CONNECTION_ALIVE); + } + }; + + if !self + .global_state_lock + .lock_guard() + .await + .chain + .archival_state() + .block_belongs_to_canonical_chain(least_preferred) + .await + { + self.punish(NegativePeerSanction::BatchBlocksUnknownRequest) + .await?; + peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?; + + return Ok(KEEP_CONNECTION_ALIVE); + } + + // Happy case: At least *one* of the blocks referenced by peer + // is known to us. + let mut first_block_in_response: Option = None; + { + let global_state = self.global_state_lock.lock_guard().await; + for block_digest in known_blocks { + if global_state + .chain + .archival_state() + .block_belongs_to_canonical_chain(block_digest) + .await + { + let height = global_state + .chain + .archival_state() + .get_block_header(block_digest) + .await + .unwrap() + .height; + first_block_in_response = Some(height); + debug!( + "Found block in canonical chain for batch response: {}", + block_digest + ); + break; + } + } + } + + let peers_preferred_canonical_block = match first_block_in_response { + Some(block) => block, + None => { + self.punish(NegativePeerSanction::BatchBlocksUnknownRequest) + .await?; + return Ok(KEEP_CONNECTION_ALIVE); + } + }; + + debug!( + "Peer's most preferred block has height {peers_preferred_canonical_block}.\ + Now building response from that height." + ); + + // Get the relevant blocks, at most batch-size many, descending from the + // peer's (alleged) most canonical block. Don't exceed `max_response_len` + // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response. + let len_of_response = cmp::min( + max_response_len, + self.global_state_lock + .cli() + .max_number_of_blocks_before_syncing, + ); + let len_of_response = cmp::max(len_of_response, MINIMUM_BLOCK_BATCH_SIZE); + let len_of_response = cmp::min(len_of_response, STANDARD_BLOCK_BATCH_SIZE); + + let mut digests_of_returned_blocks = Vec::with_capacity(len_of_response); + let response_start_height: u64 = peers_preferred_canonical_block.into(); + let mut i: u64 = 1; + let global_state = self.global_state_lock.lock_guard().await; + while digests_of_returned_blocks.len() < len_of_response { + match global_state + .chain + .archival_state() + .archival_block_mmr + .try_get_leaf(response_start_height + i) + .await + { + Some(digest) => { + digests_of_returned_blocks.push(digest); + } + None => break, + } + i += 1; + } + + let mut returned_blocks: Vec = + Vec::with_capacity(digests_of_returned_blocks.len()); + for block_digest in digests_of_returned_blocks { + let block = global_state + .chain + .archival_state() + .get_block(block_digest) + .await? + .unwrap(); + returned_blocks.push(block.try_into().unwrap()); + } + + debug!( + "Returning {} blocks in batch response", + returned_blocks.len() + ); + + let response = PeerMessage::BlockResponseBatch(returned_blocks); + peer.send(response).await?; + + Ok(KEEP_CONNECTION_ALIVE) + } + PeerMessage::BlockResponseBatch(t_blocks) => { + log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch"); + + debug!( + "handling block response batch with {} blocks", + t_blocks.len() + ); + if t_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE { + warn!("Got smaller batch response than allowed"); + self.punish(NegativePeerSanction::TooShortBlockBatch) + .await?; + return Ok(KEEP_CONNECTION_ALIVE); + } + + // Verify that we are in fact in syncing mode + // TODO: Seperate peer messages into those allowed under syncing + // and those that are not + if !self.global_state_lock.lock_guard().await.net.syncing { + warn!("Received a batch of blocks without being in syncing mode"); + self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync) + .await?; + return Ok(KEEP_CONNECTION_ALIVE); + } + + // Verify that the response matches the current state + // We get the latest block from the DB here since this message is + // only valid for archival nodes. + let first_blocks_parent_digest: Digest = t_blocks[0].header.prev_block_digest; + let most_canonical_own_block_match: Option = self + .global_state_lock + .lock_guard() + .await + .chain + .archival_state() + .get_block(first_blocks_parent_digest) + .await + .expect("Block lookup must succeed"); + let most_canonical_own_block_match: Block = match most_canonical_own_block_match { + Some(block) => block, + None => { + warn!("Got batch reponse with invalid start block"); + self.punish(NegativePeerSanction::BatchBlocksInvalidStartHeight) + .await?; + return Ok(KEEP_CONNECTION_ALIVE); + } + }; + + // Convert all blocks to Block objects + debug!( + "Found own block of height {} to match received batch", + most_canonical_own_block_match.kernel.header.height + ); + let mut received_blocks = vec![]; + for t_block in t_blocks { + match Block::try_from(t_block) { + Ok(block) => { + received_blocks.push(block); + } + Err(e) => { + warn!("Received invalid transfer block from peer: {e:?}"); + self.punish(NegativePeerSanction::InvalidTransferBlock) + .await?; + return Ok(KEEP_CONNECTION_ALIVE); + } + } + } + + // Get the latest block that we know of and handle all received blocks + self.handle_blocks(received_blocks, most_canonical_own_block_match) + .await?; + + // Reward happens as part of `handle_blocks`. + + Ok(KEEP_CONNECTION_ALIVE) + } + PeerMessage::UnableToSatisfyBatchRequest => { + log_slow_scope!(fn_name!() + "::PeerMessage::UnableToSatisfyBatchRequest"); + warn!( + "Peer {} reports inability to satisfy batch request.", + self.peer_address + ); + + Ok(KEEP_CONNECTION_ALIVE) + } PeerMessage::Handshake(_) => { log_slow_scope!(fn_name!() + "::PeerMessage::Handshake");