Skip to content

Commit

Permalink
style(peer_loop): Improve try_ensure_path readability
Browse files Browse the repository at this point in the history
Also:
 - Reduce severity of for resolution errors, since we now default to
   1'000 blocks, and we don't want this to cause peer bans on 1st error.
 - Derive `Copy` for `BlockHeader`.

Co-authored-by: Alan Szepieniec <[email protected]>
  • Loading branch information
Sword-Smith and aszepieniec committed Jan 22, 2025
1 parent 05a5d8b commit 3e88e47
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 99 deletions.
2 changes: 1 addition & 1 deletion src/models/blockchain/block/block_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub(crate) const ADVANCE_DIFFICULTY_CORRECTION_FACTOR: usize = 4;

pub(crate) const BLOCK_HEADER_VERSION: BFieldElement = BFieldElement::new(0);

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, BFieldCodec, GetSize)]
#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, BFieldCodec, GetSize)]
#[cfg_attr(any(test, feature = "arbitrary-impls"), derive(Arbitrary))]
pub struct BlockHeader {
pub version: BFieldElement,
Expand Down
2 changes: 1 addition & 1 deletion src/models/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ impl Sanction for NegativePeerSanction {
NegativePeerSanction::InvalidBlock(_) => -10,
NegativePeerSanction::DifferentGenesis => i32::MIN,
NegativePeerSanction::ForkResolutionError((_height, count, _digest)) => {
i32::from(count).saturating_mul(-3)
i32::from(count).saturating_mul(-1)
}
NegativePeerSanction::SynchronizationTimeout => -5,
NegativePeerSanction::FloodPeerListResponse => -2,
Expand Down
2 changes: 1 addition & 1 deletion src/models/peer/transfer_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl TryFrom<&Block> for TransferBlock {
}
};
Ok(Self {
header: block.kernel.header.clone(),
header: block.kernel.header,
body: block.kernel.body.clone(),
proof,
appendix: block.kernel.appendix.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/models/state/archival_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ impl ArchivalState {
.try_into()
.expect("Num addition records cannot exceed u64::MAX");
let block_record_value: BlockIndexValue = BlockIndexValue::Block(Box::new(BlockRecord {
block_header: new_block.header().clone(),
block_header: *new_block.header(),
file_location: BlockFileLocation {
file_index: last_rec.last_file,
offset: file_offset,
Expand Down Expand Up @@ -771,7 +771,7 @@ impl ArchivalState {

// If no block was found, check if digest is genesis digest
if ret.is_none() && block_digest == self.genesis_block.hash() {
ret = Some(self.genesis_block.header().clone());
ret = Some(*self.genesis_block.header());
}

ret
Expand Down
2 changes: 1 addition & 1 deletion src/models/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ impl GlobalState {
pub async fn get_own_handshakedata(&self) -> HandshakeData {
let listen_port = self.cli().own_listen_port();
HandshakeData {
tip_header: self.chain.light_state().header().clone(),
tip_header: *self.chain.light_state().header(),
listen_port,
network: self.cli().network,
instance_id: self.net.instance_id,
Expand Down
161 changes: 69 additions & 92 deletions src/peer_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,15 +352,68 @@ impl PeerLoopHandler {
<S as Sink<PeerMessage>>::Error: std::error::Error + Sync + Send + 'static,
<S as TryStream>::Error: std::error::Error,
{
let parent_digest = received_block.kernel.header.prev_block_digest;
// Does the received block match the fork reconciliation list?
let received_block_matches_fork_reconciliation_list = if let Some(successor) =
peer_state.fork_reconciliation_blocks.last()
{
let valid = successor
.is_valid(received_block.as_ref(), self.now())
.await;
if !valid {
warn!(
"Fork reconciliation failed after receiving {} blocks: successor of received block is invalid",
peer_state.fork_reconciliation_blocks.len() + 1
);
}
valid
} else {
true
};

// Are we running out of RAM?
let too_many_blocks = peer_state.fork_reconciliation_blocks.len() + 1
>= self.global_state_lock.cli().sync_mode_threshold;
if too_many_blocks {
warn!(
"Fork reconciliation failed after receiving {} blocks: block count exceeds sync mode threshold",
peer_state.fork_reconciliation_blocks.len() + 1
);
}

// Block mismatch or too many blocks: abort!
if !received_block_matches_fork_reconciliation_list || too_many_blocks {
self.punish(NegativePeerSanction::ForkResolutionError((
received_block.header().height,
peer_state.fork_reconciliation_blocks.len() as u16,
received_block.hash(),
)))
.await?;
peer_state.fork_reconciliation_blocks = vec![];
return Ok(());
}

// otherwise, append
peer_state.fork_reconciliation_blocks.push(*received_block);

// Try fetch parent
let received_block_header = *peer_state
.fork_reconciliation_blocks
.last()
.unwrap()
.header();

let parent_digest = received_block_header.prev_block_digest;
let parent_height = received_block_header.height.previous()
.expect("transferred block must have previous height because genesis block cannot be transferred");
debug!("Try ensure path: fetching parent block");
let global_state = self.global_state_lock.lock_guard().await;
let parent_block = global_state
let parent_block = self
.global_state_lock
.lock_guard()
.await
.chain
.archival_state()
.get_block(parent_digest)
.await?;
drop(global_state);
debug!(
"Completed parent block fetching from DB: {}",
if parent_block.is_some() {
Expand All @@ -369,113 +422,37 @@ impl PeerLoopHandler {
"not found".to_string()
}
);
let parent_height = received_block.kernel.header.height.previous()
.expect("transferred block must have previous height because genesis block cannot be transferred");

// If parent is not known, request the parent, and add the current to
// the peer fork resolution list
if parent_block.is_none() && parent_height > BlockHeight::genesis() {
// If parent is not known (but not genesis) request it.
let Some(parent_block) = parent_block else {
if parent_height.is_genesis() {
peer_state.fork_reconciliation_blocks.clear();
self.punish(NegativePeerSanction::DifferentGenesis).await?;
return Ok(());
}
info!(
"Parent not known: Requesting previous block with height {} from peer",
parent_height
);

// If the received block matches the block reconciliation state
// push it there and request its parent
if peer_state.fork_reconciliation_blocks.is_empty()
|| peer_state
.fork_reconciliation_blocks
.last()
.unwrap()
.kernel
.header
.height
.previous()
.expect("fork reconcilliation blocks cannot contain genesis")
== received_block.kernel.header.height
&& peer_state.fork_reconciliation_blocks.len() + 1
< self.global_state_lock.cli().sync_mode_threshold
{
if let Some(child) = peer_state.fork_reconciliation_blocks.last() {
let valid = child.is_valid(&received_block, self.now()).await;
if !valid {
self.punish(NegativePeerSanction::InvalidBlock((
child.header().height,
child.hash(),
)))
.await?;
warn!(
"Received invalid block in fork-reconciliation process of length {}",
peer_state.fork_reconciliation_blocks.len() + 1
);
peer_state.fork_reconciliation_blocks.clear();
return Ok(());
}
}
peer_state.fork_reconciliation_blocks.push(*received_block);
} else {
// Blocks received out of order. Or more than allowed received without
// going into sync mode. Give up on block resolution attempt.
self.punish(NegativePeerSanction::ForkResolutionError((
received_block.kernel.header.height,
peer_state.fork_reconciliation_blocks.len() as u16,
received_block.hash(),
)))
.await?;
warn!(
"Fork reconciliation failed after receiving {} blocks",
peer_state.fork_reconciliation_blocks.len() + 1
);
peer_state.fork_reconciliation_blocks = vec![];
return Ok(());
}

peer.send(PeerMessage::BlockRequestByHash(parent_digest))
.await?;

return Ok(());
}

// We got all the way back to genesis, but disagree about genesis. Ban peer.
if parent_block.is_none() && parent_height == BlockHeight::genesis() {
self.punish(NegativePeerSanction::DifferentGenesis).await?;
return Ok(());
}
};

// We want to treat the received fork reconciliation blocks (plus the
// received block) in reverse order, from oldest to newest, because
// they were requested from high to low block height.
let mut new_blocks = peer_state.fork_reconciliation_blocks.clone();
new_blocks.push(*received_block);
new_blocks.reverse();

// Reset the fork resolution state since we got all the way back to find a block that we have
// Reset the fork resolution state since we got all the way back to a
// block that we have.
let fork_reconciliation_event = !peer_state.fork_reconciliation_blocks.is_empty();
peer_state.fork_reconciliation_blocks = vec![];

// Sanity check, that the blocks are correctly sorted (they should be)
// TODO: This has failed: Investigate!
// See: https://neptune.builders/core-team/neptune-core/issues/125
// TODO: This assert should be replaced with something to punish or disconnect
// from a peer instead. It can be used by a malevolent peer to crash peer nodes.
let mut new_blocks_sorted_check = new_blocks.clone();
new_blocks_sorted_check.sort_by(|a, b| a.kernel.header.height.cmp(&b.kernel.header.height));
assert_eq!(
new_blocks_sorted_check,
new_blocks,
"Block list in fork resolution must be sorted. Got blocks in this order: {}",
new_blocks
.iter()
.map(|b| b.kernel.header.height.to_string())
.join(", ")
);
peer_state.fork_reconciliation_blocks.clear();

// Parent block is guaranteed to be set here. Because: either it was fetched from the
// database, or it's the genesis block.
if let Some(new_block_height) = self
.handle_blocks(new_blocks, parent_block.unwrap())
.await?
{
if let Some(new_block_height) = self.handle_blocks(new_blocks, parent_block).await? {
// If `BlockNotification` was received during a block reconciliation
// event, then the peer might have one (or more (unlikely)) blocks
// that we do not have. We should thus request those blocks.
Expand Down
2 changes: 1 addition & 1 deletion src/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1421,7 +1421,7 @@ impl RPC for NeptuneRPCServer {
log_slow_scope!(fn_name!() + "::hash() tip digest");
state.chain.light_state().hash()
};
let tip_header = state.chain.light_state().header().clone();
let tip_header = *state.chain.light_state().header();
let wallet_status = {
log_slow_scope!(fn_name!() + "::get_wallet_status_for_tip()");
state.get_wallet_status_for_tip().await
Expand Down

0 comments on commit 3e88e47

Please sign in to comment.