diff --git a/src/lib.rs b/src/lib.rs index 8df052a0..152979f7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -158,8 +158,7 @@ pub async fn initialize(cli_args: cli_args::Args) -> Result { // Create handshake data which is used when connecting to outgoing peers specified in the // CLI arguments - let syncing = false; - let networking_state = NetworkingState::new(peer_map, peer_databases, syncing); + let networking_state = NetworkingState::new(peer_map, peer_databases); let light_state: LightState = LightState::from(latest_block.clone()); let blockchain_archival_state = BlockchainArchivalState { diff --git a/src/main_loop.rs b/src/main_loop.rs index 02374805..83192eee 100644 --- a/src/main_loop.rs +++ b/src/main_loop.rs @@ -50,6 +50,7 @@ use crate::models::peer::PeerSynchronizationState; use crate::models::proof_abstractions::tasm::program::TritonVmProofJobOptions; use crate::models::state::block_proposal::BlockProposal; use crate::models::state::mempool::TransactionOrigin; +use crate::models::state::networking_state::SyncAnchor; use crate::models::state::tx_proving_capability::TxProvingCapability; use crate::models::state::GlobalState; use crate::models::state::GlobalStateLock; @@ -67,8 +68,18 @@ const EXPECTED_UTXOS_PRUNE_INTERVAL_IN_SECS: u64 = 19 * 60; // 19 mins const TRANSACTION_UPGRADE_CHECK_INTERVAL_IN_SECONDS: u64 = 60; // 1 minute const SANCTION_PEER_TIMEOUT_FACTOR: u64 = 40; + +/// Number of seconds within which an individual peer is expected to respond +/// to a synchronization request. +const INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS: u64 = + SANCTION_PEER_TIMEOUT_FACTOR * SYNC_REQUEST_INTERVAL_IN_SECONDS; + +/// Number of seconds that a synchronization may run without any progress. +const GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS: u64 = + INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS * 4; + const POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS: usize = 20; -const STANDARD_BATCH_BLOCK_LOOKBEHIND_SIZE: usize = 100; +pub(crate) const MAX_NUM_DIGESTS_IN_BATCH_REQUEST: usize = 200; const TX_UPDATER_CHANNEL_CAPACITY: usize = 1; /// Wraps a transmission channel. @@ -144,19 +155,13 @@ impl MutableMainLoopState { } /// handles batch-downloading of blocks if we are more than n blocks behind +#[derive(Default, Debug)] struct SyncState { peer_sync_states: HashMap, last_sync_request: Option<(SystemTime, BlockHeight, SocketAddr)>, } impl SyncState { - fn default() -> Self { - Self { - peer_sync_states: HashMap::new(), - last_sync_request: None, - } - } - fn record_request( &mut self, requested_block_height: BlockHeight, @@ -176,9 +181,11 @@ impl SyncState { .collect() } - /// Determine if a peer should be sanctioned for failing to respond to a synchronization - /// request. Also determine if a new request should be made or the previous one should be - /// allowed to run for longer. + /// Determine if a peer should be sanctioned for failing to respond to a + /// synchronization request fast enough. Also determine if a new request + /// should be made or the previous one should be allowed to run for longer. + /// + /// Returns (peer to be sanctioned, attempt new request). fn get_status_of_last_request( &self, current_block_height: BlockHeight, @@ -196,9 +203,7 @@ impl SyncState { // The last sync request updated the state (None, true) } else if req_time - + Duration::from_secs( - SANCTION_PEER_TIMEOUT_FACTOR * SYNC_REQUEST_INTERVAL_IN_SECONDS, - ) + + Duration::from_secs(INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS) < now { // The last sync request was not answered, sanction peer @@ -359,7 +364,7 @@ fn stay_in_sync_mode( .values() .max_by_key(|x| x.claimed_max_pow); match max_claimed_pow { - None => false, // we lost all connections. Can't sync. + None => false, // No peer have passed the sync challenge phase. // Synchronization is left when the remaining number of block is half of what has // been indicated to fit into RAM @@ -592,18 +597,34 @@ impl MainLoopHandler { // The peer tasks also check this condition, if block is more canonical than current // tip, but we have to check it again since the block update might have already been applied // through a message from another peer (or from own miner). - // TODO: Is this check right? We might still want to store the blocks even though - // they are not more canonical than what we currently have, in the case of deep reorganizations - // that is. This check fails to correctly resolve deep reorganizations. Should that be fixed, - // or should deep reorganizations simply be fixed by clearing the database? let mut global_state_mut = self.global_state_lock.lock_guard_mut().await; + let new_canonical = + global_state_mut.incoming_block_is_more_canonical(&last_block); + + if !new_canonical { + // The blocks are not canonical, but: if we are in sync + // mode and these blocks beat our current champion, then + // we store them anyway, without marking them as tip. + let Some(sync_anchor) = global_state_mut.net.sync_anchor.as_mut() else { + warn!( + "Blocks were not new, and we're not syncing. Not storing blocks." + ); + return Ok(()); + }; + if sync_anchor + .champion + .is_some_and(|(height, _)| height >= last_block.header().height) + { + warn!("Repeated blocks received in sync mode, not storing"); + return Ok(()); + } + + sync_anchor.catch_up(last_block.header().height, last_block.hash()); - if !global_state_mut.incoming_block_is_more_canonical(&last_block) { - warn!("Blocks were not new. Not storing blocks."); + for block in blocks { + global_state_mut.store_block_not_tip(block).await?; + } - // TODO: Consider fixing deep reorganization problem described above. - // Alternatively set the `sync_mode_threshold` value higher - // if this problem is encountered. return Ok(()); } @@ -617,7 +638,7 @@ impl MainLoopHandler { self.main_to_miner_tx.send(MainToMiner::WaitForContinue); // Get out of sync mode if needed - if global_state_mut.net.syncing { + if global_state_mut.net.sync_anchor.is_some() { let stay_in_sync_mode = stay_in_sync_mode( &last_block.kernel.header, &main_loop_state.sync_state, @@ -625,7 +646,7 @@ impl MainLoopHandler { ); if !stay_in_sync_mode { info!("Exiting sync mode"); - global_state_mut.net.syncing = false; + global_state_mut.net.sync_anchor = None; self.main_to_miner_tx.send(MainToMiner::StopSyncing); } } @@ -669,36 +690,39 @@ impl MainLoopHandler { // Inform miner about new block. self.main_to_miner_tx.send(MainToMiner::NewBlock); } - PeerTaskToMain::AddPeerMaxBlockHeight(( - socket_addr, - claimed_max_height, - claimed_max_accumulative_pow, - )) => { + PeerTaskToMain::AddPeerMaxBlockHeight { + peer_address, + claimed_height, + claimed_cumulative_pow, + claimed_block_mmra, + } => { log_slow_scope!(fn_name!() + "::PeerTaskToMain::AddPeerMaxBlockHeight"); let claimed_state = - PeerSynchronizationState::new(claimed_max_height, claimed_max_accumulative_pow); + PeerSynchronizationState::new(claimed_height, claimed_cumulative_pow); main_loop_state .sync_state .peer_sync_states - .insert(socket_addr, claimed_state); + .insert(peer_address, claimed_state); // Check if synchronization mode should be activated. // Synchronization mode is entered if accumulated PoW exceeds // our tip and if the height difference is positive and beyond // a threshold value. - // TODO: If we are not checking the PoW claims of the tip this - // can be abused by forcing the client into synchronization - // mode. let mut global_state_mut = self.global_state_lock.lock_guard_mut().await; - if global_state_mut - .sync_mode_criterion(claimed_max_height, claimed_max_accumulative_pow) + if global_state_mut.sync_mode_criterion(claimed_height, claimed_cumulative_pow) + && global_state_mut + .net + .sync_anchor + .as_ref() + .is_none_or(|sa| sa.cumulative_proof_of_work < claimed_cumulative_pow) { info!( - "Entering synchronization mode due to peer {} indicating tip height {}; pow family: {:?}", - socket_addr, claimed_max_height, claimed_max_accumulative_pow - ); - global_state_mut.net.syncing = true; + "Entering synchronization mode due to peer {} indicating tip height {}; cumulative pow: {:?}", + peer_address, claimed_height, claimed_cumulative_pow + ); + global_state_mut.net.sync_anchor = + Some(SyncAnchor::new(claimed_cumulative_pow, claimed_block_mmra)); self.main_to_miner_tx.send(MainToMiner::StartSyncing); } } @@ -717,7 +741,7 @@ impl MainLoopHandler { // Get out of sync mode if needed. let mut global_state_mut = self.global_state_lock.lock_guard_mut().await; - if global_state_mut.net.syncing { + if global_state_mut.net.sync_anchor.is_some() { let stay_in_sync_mode = stay_in_sync_mode( global_state_mut.chain.light_state().header(), &main_loop_state.sync_state, @@ -725,7 +749,7 @@ impl MainLoopHandler { ); if !stay_in_sync_mode { info!("Exiting sync mode"); - global_state_mut.net.syncing = false; + global_state_mut.net.sync_anchor = None; } } } @@ -1023,22 +1047,56 @@ impl MainLoopHandler { Ok(()) } + /// Return a list of block heights for a block-batch request. + /// + /// Returns an ordered list of the heights of *most preferred block* + /// to build on, where current tip is always the most preferred block. + /// + /// Uses a factor to ensure that the peer will always have something to + /// build on top of by providing potential starting points all the way + /// back to genesis. + fn batch_request_uca_candidate_heights(own_tip_height: BlockHeight) -> Vec { + let mut look_behind = 0; + let mut ret = vec![]; + + // A factor of 1.07 can look back ~1m blocks in 200 digests. + const FACTOR: f64 = 1.07f64; + while ret.len() < MAX_NUM_DIGESTS_IN_BATCH_REQUEST - 1 { + let height = match own_tip_height.checked_sub(look_behind) { + None => break, + Some(height) => { + if height.is_genesis() { + break; + } else { + height + } + } + }; + + ret.push(height); + look_behind = ((look_behind as f64 + 1.0) * FACTOR).floor() as u64; + } + + ret.push(BlockHeight::genesis()); + + ret + } + /// Logic for requesting the batch-download of blocks from peers /// /// Locking: /// * acquires `global_state_lock` for read - async fn block_sync(&self, main_loop_state: &mut MutableMainLoopState) -> Result<()> { + async fn block_sync(&mut self, main_loop_state: &mut MutableMainLoopState) -> Result<()> { let global_state = self.global_state_lock.lock_guard().await; // Check if we are in sync mode - if !global_state.net.syncing { + let Some(anchor) = &global_state.net.sync_anchor else { return Ok(()); - } + }; info!("Running sync"); - // Check when latest batch of blocks was requested - let (current_block_hash, current_block_height, current_block_proof_of_work_family) = ( + let (own_tip_hash, own_tip_height, own_cumulative_pow) = ( global_state.chain.light_state().hash(), global_state.chain.light_state().kernel.header.height, global_state @@ -1049,9 +1107,38 @@ impl MainLoopHandler { .cumulative_proof_of_work, ); + // Check if sync mode has timed out entirely, in which case it should + // be abandoned. + let anchor = anchor.to_owned(); + if self.now().duration_since(anchor.updated)?.as_secs() + > GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS + { + warn!("Sync mode has timed out. Abandoning sync mode."); + + // Abandon attempt, and punish all peers claiming to serve these + // blocks. + drop(global_state); + self.global_state_lock + .lock_guard_mut() + .await + .net + .sync_anchor = None; + + let peers_to_punish = main_loop_state + .sync_state + .get_potential_peers_for_sync_request(own_cumulative_pow); + + for peer in peers_to_punish { + self.main_to_peer_broadcast_tx + .send(MainToPeerTask::PeerSynchronizationTimeout(peer))?; + } + + return Ok(()); + } + let (peer_to_sanction, try_new_request): (Option, bool) = main_loop_state .sync_state - .get_status_of_last_request(current_block_height, self.now()); + .get_status_of_last_request(own_tip_height, self.now()); // Sanction peer if they failed to respond if let Some(peer) = peer_to_sanction { @@ -1070,7 +1157,7 @@ impl MainLoopHandler { // Pick a random peer that has reported to have relevant blocks let candidate_peers = main_loop_state .sync_state - .get_potential_peers_for_sync_request(current_block_proof_of_work_family); + .get_potential_peers_for_sync_request(own_cumulative_pow); let mut rng = thread_rng(); let chosen_peer = candidate_peers.choose(&mut rng); assert!( @@ -1078,35 +1165,44 @@ impl MainLoopHandler { "A synchronization candidate must be available for a request. Otherwise the data structure is in an invalid state and syncing should not be active" ); - // Find the blocks to request - let tip_digest = current_block_hash; - let most_canonical_digests = global_state - .chain - .archival_state() - .get_ancestor_block_digests(tip_digest, STANDARD_BATCH_BLOCK_LOOKBEHIND_SIZE) - .await; - - // List of digests, ordered after which block we would like to find descendents from, - // from highest to lowest. - let most_canonical_digests = [vec![tip_digest], most_canonical_digests].concat(); + let ordered_preferred_block_digests = match anchor.champion { + Some((_height, digest)) => vec![digest], + None => { + // Find candidate-UCA digests based on a sparse distribution of + // block heights skewed towards own tip height + let request_heights = Self::batch_request_uca_candidate_heights(own_tip_height); + let mut ordered_preferred_block_digests = vec![]; + for height in request_heights { + let digest = global_state + .chain + .archival_state() + .archival_block_mmr + .get_leaf_async(height.into()) + .await; + ordered_preferred_block_digests.push(digest); + } + ordered_preferred_block_digests + } + }; // Send message to the relevant peer loop to request the blocks let chosen_peer = chosen_peer.unwrap(); info!( "Sending block batch request to {}\nrequesting blocks descending from {}\n height {}", - chosen_peer, current_block_hash, current_block_height + chosen_peer, own_tip_hash, own_tip_height ); self.main_to_peer_broadcast_tx .send(MainToPeerTask::RequestBlockBatch( MainToPeerTaskBatchBlockRequest { peer_addr_target: *chosen_peer, - known_blocks: most_canonical_digests, + known_blocks: ordered_preferred_block_digests, + anchor_mmr: anchor.block_mmr.clone(), }, )) .expect("Sending message to peers must succeed"); // Record that this request was sent to the peer - let requested_block_height = current_block_height.next(); + let requested_block_height = own_tip_height.next(); main_loop_state .sync_state .record_request(requested_block_height, *chosen_peer, self.now()); @@ -1136,7 +1232,7 @@ impl MainLoopHandler { .proof_upgrader_task .as_ref() .is_some_and(|x| !x.is_finished()); - Ok(!global_state.net.syncing + Ok(global_state.net.sync_anchor.is_none() && global_state.proving_capability() == TxProvingCapability::SingleProof && !previous_upgrade_task_is_still_running && tx_upgrade_interval @@ -1684,6 +1780,139 @@ mod test { } } + mod sync_mode { + use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator; + use test_strategy::proptest; + + use super::*; + use crate::tests::shared::get_dummy_socket_address; + + #[proptest] + fn batch_request_heights_prop(#[strategy(0u64..100_000_000_000)] own_height: u64) { + batch_request_heights_sanity(own_height); + } + + #[test] + fn batch_request_heights_unit() { + let own_height = 1_000_000u64; + batch_request_heights_sanity(own_height); + } + + fn batch_request_heights_sanity(own_height: u64) { + let heights = MainLoopHandler::batch_request_uca_candidate_heights(own_height.into()); + + let mut heights_rev = heights.clone(); + heights_rev.reverse(); + assert!( + heights_rev.is_sorted(), + "Heights must be sorted from high-to-low" + ); + + heights_rev.dedup(); + assert_eq!(heights_rev.len(), heights.len(), "duplicates"); + + assert_eq!(heights[0], own_height.into(), "starts with own tip height"); + assert!( + heights.last().unwrap().is_genesis(), + "ends with genesis block" + ); + } + + #[tokio::test] + #[traced_test] + async fn sync_mode_abandoned_on_global_timeout() { + let test_setup = setup(0).await; + let TestSetup { + task_join_handles, + mut main_loop_handler, + .. + } = test_setup; + + let mut mutable_main_loop_state = MutableMainLoopState::new(task_join_handles); + + main_loop_handler + .block_sync(&mut mutable_main_loop_state) + .await + .expect("Must return OK when no sync mode is set"); + + // Mock that we are in a valid sync state + let claimed_max_height = 1_000u64.into(); + let claimed_max_pow = ProofOfWork::new([100; 6]); + main_loop_handler + .global_state_lock + .lock_guard_mut() + .await + .net + .sync_anchor = Some(SyncAnchor::new( + claimed_max_pow, + MmrAccumulator::new_from_leafs(vec![]), + )); + mutable_main_loop_state.sync_state.peer_sync_states.insert( + get_dummy_socket_address(0), + PeerSynchronizationState::new(claimed_max_height, claimed_max_pow), + ); + + let sync_start_time = main_loop_handler + .global_state_lock + .lock_guard() + .await + .net + .sync_anchor + .as_ref() + .unwrap() + .updated; + main_loop_handler + .block_sync(&mut mutable_main_loop_state) + .await + .expect("Must return OK when sync mode has not timed out yet"); + assert!( + main_loop_handler + .global_state_lock + .lock_guard() + .await + .net + .sync_anchor + .is_some(), + "Sync mode must still be set before timeout has occurred" + ); + + assert_eq!( + sync_start_time, + main_loop_handler + .global_state_lock + .lock_guard() + .await + .net + .sync_anchor + .as_ref() + .unwrap() + .updated, + "timestamp may not be updated without state change" + ); + + // Mock that sync-mode has timed out + main_loop_handler = main_loop_handler.with_mocked_time( + SystemTime::now() + + Duration::from_secs(GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS + 1), + ); + + main_loop_handler + .block_sync(&mut mutable_main_loop_state) + .await + .expect("Must return OK when sync mode has timed out"); + assert!( + main_loop_handler + .global_state_lock + .lock_guard() + .await + .net + .sync_anchor + .is_none(), + "Sync mode must be unset on timeout" + ); + } + } + mod proof_upgrader { use super::*; use crate::job_queue::triton_vm::TritonVmJobQueue; diff --git a/src/mine_loop.rs b/src/mine_loop.rs index 1398f50b..3f69d757 100644 --- a/src/mine_loop.rs +++ b/src/mine_loop.rs @@ -620,7 +620,9 @@ pub(crate) async fn mine( let (guesser_tx, guesser_rx) = oneshot::channel::(); let (composer_tx, composer_rx) = oneshot::channel::<(Block, Vec)>(); - let is_syncing = global_state_lock.lock(|s| s.net.syncing).await; + let is_syncing = global_state_lock + .lock(|s| s.net.sync_anchor.is_some()) + .await; let maybe_proposal = global_state_lock.lock_guard().await.block_proposal.clone(); let guess = cli_args.guess; @@ -1577,7 +1579,7 @@ pub(crate) mod mine_loop_tests { guess_worker( block, - prev_block.header().clone(), + *prev_block.header(), worker_task_tx, composer_utxos, sleepy_guessing, @@ -1774,7 +1776,7 @@ pub(crate) mod mine_loop_tests { let mut rng = thread_rng(); let mut counter = 0; let mut successor_block = Block::new( - successor_header.clone(), + successor_header, successor_body.clone(), appendix, BlockProof::Invalid, diff --git a/src/models/blockchain/block/block_header.rs b/src/models/blockchain/block/block_header.rs index 115bf592..8551ce5b 100644 --- a/src/models/blockchain/block/block_header.rs +++ b/src/models/blockchain/block/block_header.rs @@ -60,7 +60,7 @@ pub(crate) const ADVANCE_DIFFICULTY_CORRECTION_FACTOR: usize = 4; pub(crate) const BLOCK_HEADER_VERSION: BFieldElement = BFieldElement::new(0); -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, BFieldCodec, GetSize)] +#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, BFieldCodec, GetSize)] #[cfg_attr(any(test, feature = "arbitrary-impls"), derive(Arbitrary))] pub struct BlockHeader { pub version: BFieldElement, diff --git a/src/models/blockchain/block/validity/block_primitive_witness.rs b/src/models/blockchain/block/validity/block_primitive_witness.rs index 1d7292dd..35808681 100644 --- a/src/models/blockchain/block/validity/block_primitive_witness.rs +++ b/src/models/blockchain/block/validity/block_primitive_witness.rs @@ -297,7 +297,7 @@ pub(crate) mod test { (parent_header, parent_body, parent_appendix).prop_flat_map( move |(header, body, appendix)| { let parent_kernel = BlockKernel { - header: header.clone(), + header, body: body.clone(), appendix: appendix.clone(), }; diff --git a/src/models/channel.rs b/src/models/channel.rs index 7e917146..7670455f 100644 --- a/src/models/channel.rs +++ b/src/models/channel.rs @@ -3,6 +3,7 @@ use std::net::SocketAddr; use serde::Deserialize; use serde::Serialize; use tasm_lib::triton_vm::prelude::Digest; +use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator; use super::blockchain::block::block_height::BlockHeight; use super::blockchain::block::difficulty_control::ProofOfWork; @@ -82,6 +83,10 @@ pub struct MainToPeerTaskBatchBlockRequest { /// that the we would prefer to build on top off, if it belongs to the /// canonical chain. pub(crate) known_blocks: Vec, + + /// The block MMR accumulator relative to which incoming blocks are + /// authenticated. + pub(crate) anchor_mmr: MmrAccumulator, } #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] @@ -136,7 +141,15 @@ impl MainToPeerTask { #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) enum PeerTaskToMain { NewBlocks(Vec), - AddPeerMaxBlockHeight((SocketAddr, BlockHeight, ProofOfWork)), + AddPeerMaxBlockHeight { + peer_address: SocketAddr, + claimed_height: BlockHeight, + claimed_cumulative_pow: ProofOfWork, + + /// The MMR *after* adding the tip hash, so not the one contained in the + /// tip, but in its child. + claimed_block_mmra: MmrAccumulator, + }, RemovePeerMaxBlockHeight(SocketAddr), PeerDiscoveryAnswer((Vec<(SocketAddr, u128)>, SocketAddr, u8)), // ([(peer_listen_address)], reported_by, distance) Transaction(Box), @@ -154,7 +167,7 @@ impl PeerTaskToMain { pub fn get_type(&self) -> String { match self { PeerTaskToMain::NewBlocks(_) => "new blocks", - PeerTaskToMain::AddPeerMaxBlockHeight(_) => "add peer max block height", + PeerTaskToMain::AddPeerMaxBlockHeight { .. } => "add peer max block height", PeerTaskToMain::RemovePeerMaxBlockHeight(_) => "remove peer max block height", PeerTaskToMain::PeerDiscoveryAnswer(_) => "peer discovery answer", PeerTaskToMain::Transaction(_) => "transaction", diff --git a/src/models/peer.rs b/src/models/peer.rs index 54739f28..481b645b 100644 --- a/src/models/peer.rs +++ b/src/models/peer.rs @@ -16,6 +16,7 @@ use serde::Deserialize; use serde::Serialize; use tasm_lib::twenty_first::prelude::Mmr; use tasm_lib::twenty_first::prelude::MmrMembershipProof; +use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator; use tracing::trace; use transaction_notification::TransactionNotification; use transfer_transaction::TransferTransaction; @@ -155,8 +156,10 @@ pub enum NegativePeerSanction { BatchBlocksInvalidStartHeight, BatchBlocksUnknownRequest, BatchBlocksRequestEmpty, + BatchBlocksRequestTooManyDigests, InvalidTransaction, UnconfirmableTransaction, + InvalidBlockMmrAuthentication, InvalidTransferBlock, @@ -221,6 +224,12 @@ impl Display for NegativePeerSanction { NegativePeerSanction::TimedOutSyncChallengeResponse => { "timed-out sync challenge response" } + NegativePeerSanction::InvalidBlockMmrAuthentication => { + "invalid block mmr authentication" + } + NegativePeerSanction::BatchBlocksRequestTooManyDigests => { + "too many digests in batch block request" + } }; write!(f, "{string}") } @@ -264,7 +273,7 @@ impl Sanction for NegativePeerSanction { NegativePeerSanction::InvalidBlock(_) => -10, NegativePeerSanction::DifferentGenesis => i32::MIN, NegativePeerSanction::ForkResolutionError((_height, count, _digest)) => { - i32::from(count).saturating_mul(-3) + i32::from(count).saturating_mul(-1) } NegativePeerSanction::SynchronizationTimeout => -5, NegativePeerSanction::FloodPeerListResponse => -2, @@ -288,6 +297,8 @@ impl Sanction for NegativePeerSanction { NegativePeerSanction::UnexpectedSyncChallengeResponse => -1, NegativePeerSanction::InvalidTransferBlock => -50, NegativePeerSanction::TimedOutSyncChallengeResponse => -50, + NegativePeerSanction::InvalidBlockMmrAuthentication => -4, + NegativePeerSanction::BatchBlocksRequestTooManyDigests => -50, } } } @@ -483,6 +494,16 @@ pub struct BlockRequestBatch { /// Indicates the maximum allowed number of blocks in the response. pub(crate) max_response_len: usize, + + /// The block MMR accumulator of the tip of the chain which the node is + /// syncing towards. Its number of leafs is the block height the node is + /// syncing towards. + /// + /// The receiver needs this value to know which MMR authentication paths to + /// attach to the blocks in the response. These paths allow the receiver of + /// a batch of blocks to verify that the received blocks are indeed + /// ancestors to a given tip. + pub(crate) anchor: MmrAccumulator, } #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] @@ -506,7 +527,7 @@ pub(crate) enum PeerMessage { BlockRequestByHash(Digest), BlockRequestBatch(BlockRequestBatch), // TODO: Consider restricting this in size - BlockResponseBatch(Vec), // TODO: Consider restricting this in size + BlockResponseBatch(Vec<(TransferBlock, MmrMembershipProof)>), // TODO: Consider restricting this in size UnableToSatisfyBatchRequest, SyncChallenge(SyncChallenge), @@ -741,7 +762,7 @@ impl SyncChallengeResponse { /// Determine whether the `SyncChallengeResponse` answers the given /// `IssuedSyncChallenge`, and not some other one. pub(crate) fn matches(&self, issued_challenge: IssuedSyncChallenge) -> bool { - let Ok(tip_predecessor) = Block::try_from(self.tip_parent.clone()) else { + let Ok(tip_parent) = Block::try_from(self.tip_parent.clone()) else { return false; }; let Ok(tip) = Block::try_from(self.tip.clone()) else { @@ -754,7 +775,7 @@ impl SyncChallengeResponse { .all(|((_, child), challenge_height)| child.header.height == *challenge_height) && issued_challenge.challenge.tip_digest == tip.hash() && issued_challenge.accumulated_pow == tip.header().cumulative_proof_of_work - && tip.has_proof_of_work(tip_predecessor.header()) + && tip.has_proof_of_work(tip_parent.header()) } /// Determine whether the proofs in `SyncChallengeResponse` are valid. Also @@ -772,6 +793,8 @@ impl SyncChallengeResponse { return false; } + let mut mmra_anchor = tip.body().block_mmr_accumulator.to_owned(); + mmra_anchor.append(tip.hash()); for ((parent, child), membership_proof) in self.blocks.iter().zip(self.membership_proofs.iter()) { @@ -781,8 +804,8 @@ impl SyncChallengeResponse { if !membership_proof.verify( child.header().height.into(), child.hash(), - &tip.body().block_mmr_accumulator.peaks(), - tip.header().height.into(), + &mmra_anchor.peaks(), + mmra_anchor.num_leafs(), ) { return false; } diff --git a/src/models/peer/transfer_block.rs b/src/models/peer/transfer_block.rs index cf4f079d..52abd31d 100644 --- a/src/models/peer/transfer_block.rs +++ b/src/models/peer/transfer_block.rs @@ -61,7 +61,7 @@ impl TryFrom<&Block> for TransferBlock { } }; Ok(Self { - header: block.kernel.header.clone(), + header: block.kernel.header, body: block.kernel.body.clone(), proof, appendix: block.kernel.appendix.clone(), diff --git a/src/models/state/archival_state.rs b/src/models/state/archival_state.rs index d7757d8e..31fa2a29 100644 --- a/src/models/state/archival_state.rs +++ b/src/models/state/archival_state.rs @@ -287,158 +287,151 @@ impl ArchivalState { &self.genesis_block } - /// Write a newly found block to database and to disk, and set it as tip. + /// Write a block to database and disk, without setting it as tip. /// - /// If block was already written to database, then it is only marked as - /// tip, and no write to disk occurs. Instead, the old block database entry - /// is assumed to be valid, and so is the block stored on disk. - pub(crate) async fn write_block_as_tip(&mut self, new_block: &Block) -> Result<()> { - async fn write_block( - archival_state: &mut ArchivalState, - new_block: &Block, - ) -> Result> { - // Fetch last file record to find disk location to store block. - // This record must exist in the DB already, unless this is the first block - // stored on disk. - let mut last_rec: LastFileRecord = archival_state - .block_index_db - .get(BlockIndexKey::LastFile) - .await - .map(|x| x.as_last_file_record()) - .unwrap_or_default(); + /// The caller should verify that the block is not already stored, otherwise + /// the block will be stored twice which will lead to inconsistencies. + async fn store_block( + self: &mut ArchivalState, + new_block: &Block, + ) -> Result> { + // Fetch last file record to find disk location to store block. + // This record must exist in the DB already, unless this is the first block + // stored on disk. + let mut last_rec: LastFileRecord = self + .block_index_db + .get(BlockIndexKey::LastFile) + .await + .map(|x| x.as_last_file_record()) + .unwrap_or_default(); - // Open the file that was last used for storing a block - let mut block_file_path = archival_state.data_dir.block_file_path(last_rec.last_file); - let serialized_block: Vec = bincode::serialize(new_block)?; - let serialized_block_size: u64 = serialized_block.len() as u64; + // Open the file that was last used for storing a block + let mut block_file_path = self.data_dir.block_file_path(last_rec.last_file); + let serialized_block: Vec = bincode::serialize(new_block)?; + let serialized_block_size: u64 = serialized_block.len() as u64; - // file operations are async. + let mut block_file = DataDirectory::open_ensure_parent_dir_exists(&block_file_path).await?; - let mut block_file = - DataDirectory::open_ensure_parent_dir_exists(&block_file_path).await?; + // Check if we should use the last file, or we need a new one. + if new_block_file_is_needed(&block_file, serialized_block_size).await { + last_rec = LastFileRecord { + last_file: last_rec.last_file + 1, + }; + block_file_path = self.data_dir.block_file_path(last_rec.last_file); + block_file = DataDirectory::open_ensure_parent_dir_exists(&block_file_path).await?; + } - // Check if we should use the last file, or we need a new one. - if new_block_file_is_needed(&block_file, serialized_block_size).await { - last_rec = LastFileRecord { - last_file: last_rec.last_file + 1, - }; - block_file_path = archival_state.data_dir.block_file_path(last_rec.last_file); - block_file = DataDirectory::open_ensure_parent_dir_exists(&block_file_path).await?; + debug!("Writing block to: {}", block_file_path.display()); + // Get associated file record from database, otherwise create it + let file_record_key: BlockIndexKey = BlockIndexKey::File(last_rec.last_file); + let file_record_value: Option = self + .block_index_db + .get(file_record_key.clone()) + .await + .map(|x| x.as_file_record()); + let file_record_value: FileRecord = match file_record_value { + Some(record) => record.add(serialized_block_size, new_block.header()), + None => { + assert!( + block_file.metadata().await.unwrap().len().is_zero(), + "If no file record exists, block file must be empty" + ); + FileRecord::new(serialized_block_size, new_block.header()) } + }; - debug!("Writing block to: {}", block_file_path.display()); - // Get associated file record from database, otherwise create it - let file_record_key: BlockIndexKey = BlockIndexKey::File(last_rec.last_file); - let file_record_value: Option = archival_state - .block_index_db - .get(file_record_key.clone()) - .await - .map(|x| x.as_file_record()); - let file_record_value: FileRecord = match file_record_value { - Some(record) => record.add(serialized_block_size, new_block.header()), - None => { - assert!( - block_file.metadata().await.unwrap().len().is_zero(), - "If no file record exists, block file must be empty" - ); - FileRecord::new(serialized_block_size, new_block.header()) - } - }; - - // Make room in file for mmapping and record where block starts - let pos = block_file.seek(SeekFrom::End(0)).await.unwrap(); - debug!("Size of file prior to block writing: {}", pos); - block_file - .seek(SeekFrom::Current(serialized_block_size as i64 - 1)) - .await - .unwrap(); - block_file.write_all(&[0]).await.unwrap(); - let file_offset: u64 = block_file - .seek(SeekFrom::Current(-(serialized_block_size as i64))) - .await - .unwrap(); - debug!( - "New file size: {} bytes", - block_file.metadata().await.unwrap().len() - ); + // Make room in file for mmapping and record where block starts + let pos = block_file.seek(SeekFrom::End(0)).await.unwrap(); + debug!("Size of file prior to block writing: {}", pos); + block_file + .seek(SeekFrom::Current(serialized_block_size as i64 - 1)) + .await + .unwrap(); + block_file.write_all(&[0]).await.unwrap(); + let file_offset: u64 = block_file + .seek(SeekFrom::Current(-(serialized_block_size as i64))) + .await + .unwrap(); + debug!( + "New file size: {} bytes", + block_file.metadata().await.unwrap().len() + ); - let height_record_key = BlockIndexKey::Height(new_block.header().height); - let mut blocks_at_same_height: Vec = match archival_state - .block_index_db - .get(height_record_key.clone()) - .await - { + let height_record_key = BlockIndexKey::Height(new_block.header().height); + let mut blocks_at_same_height: Vec = + match self.block_index_db.get(height_record_key.clone()).await { Some(rec) => rec.as_height_record(), None => vec![], }; - // Write to file with mmap, only map relevant part of file into memory - // we use spawn_blocking to make the blocking mmap async-friendly. - tokio::task::spawn_blocking(move || { - let mmap = unsafe { - MmapOptions::new() - .offset(pos) - .len(serialized_block_size as usize) - .map(&block_file) - .unwrap() - }; - let mut mmap: memmap2::MmapMut = mmap.make_mut().unwrap(); - mmap.deref_mut()[..].copy_from_slice(&serialized_block); - }) - .await?; + // Write to file with mmap, only map relevant part of file into memory + // we use spawn_blocking to make the blocking mmap async-friendly. + tokio::task::spawn_blocking(move || { + let mmap = unsafe { + MmapOptions::new() + .offset(pos) + .len(serialized_block_size as usize) + .map(&block_file) + .unwrap() + }; + let mut mmap: memmap2::MmapMut = mmap.make_mut().unwrap(); + mmap.deref_mut()[..].copy_from_slice(&serialized_block); + }) + .await?; - // Update block index database with newly stored block - let mut block_index_entries: Vec<(BlockIndexKey, BlockIndexValue)> = vec![]; - let block_record_key: BlockIndexKey = BlockIndexKey::Block(new_block.hash()); - let num_additions: u64 = new_block - .mutator_set_update() - .additions - .len() - .try_into() - .expect("Num addition records cannot exceed u64::MAX"); - let block_record_value: BlockIndexValue = - BlockIndexValue::Block(Box::new(BlockRecord { - block_header: new_block.header().clone(), - file_location: BlockFileLocation { - file_index: last_rec.last_file, - offset: file_offset, - block_length: serialized_block_size as usize, - }, - min_aocl_index: new_block.mutator_set_accumulator_after().aocl.num_leafs() - - num_additions, - num_additions, - })); - - block_index_entries.push((file_record_key, BlockIndexValue::File(file_record_value))); - block_index_entries.push((block_record_key, block_record_value)); - - block_index_entries - .push((BlockIndexKey::LastFile, BlockIndexValue::LastFile(last_rec))); - blocks_at_same_height.push(new_block.hash()); - block_index_entries.push(( - height_record_key, - BlockIndexValue::Height(blocks_at_same_height), - )); + // Update block index database with newly stored block + let mut block_index_entries: Vec<(BlockIndexKey, BlockIndexValue)> = vec![]; + let block_record_key: BlockIndexKey = BlockIndexKey::Block(new_block.hash()); + let num_additions: u64 = new_block + .mutator_set_update() + .additions + .len() + .try_into() + .expect("Num addition records cannot exceed u64::MAX"); + let block_record_value: BlockIndexValue = BlockIndexValue::Block(Box::new(BlockRecord { + block_header: *new_block.header(), + file_location: BlockFileLocation { + file_index: last_rec.last_file, + offset: file_offset, + block_length: serialized_block_size as usize, + }, + min_aocl_index: new_block.mutator_set_accumulator_after().aocl.num_leafs() + - num_additions, + num_additions, + })); + + block_index_entries.push((file_record_key, BlockIndexValue::File(file_record_value))); + block_index_entries.push((block_record_key, block_record_value)); + + block_index_entries.push((BlockIndexKey::LastFile, BlockIndexValue::LastFile(last_rec))); + blocks_at_same_height.push(new_block.hash()); + block_index_entries.push(( + height_record_key, + BlockIndexValue::Height(blocks_at_same_height), + )); - Ok(block_index_entries) - } + Ok(block_index_entries) + } - let block_is_new = self.get_block_header(new_block.hash()).await.is_none(); + async fn write_block_internal(&mut self, block: &Block, is_canonical_tip: bool) -> Result<()> { + let block_is_new = self.get_block_header(block.hash()).await.is_none(); let mut block_index_entries = if block_is_new { - write_block(self, new_block).await? + self.store_block(block).await? } else { warn!( "Attempted to store block but block was already stored.\nBlock digest: {}", - new_block.hash() + block.hash() ); vec![] }; - // Mark block as tip - block_index_entries.push(( - BlockIndexKey::BlockTipDigest, - BlockIndexValue::BlockTipDigest(new_block.hash()), - )); + // Mark block as tip, conditionally + if is_canonical_tip { + block_index_entries.push(( + BlockIndexKey::BlockTipDigest, + BlockIndexValue::BlockTipDigest(block.hash()), + )); + } let mut batch = WriteBatchAsync::new(); for (k, v) in block_index_entries.into_iter() { @@ -450,11 +443,29 @@ impl ArchivalState { Ok(()) } + /// Write a newly found block to database and to disk, without setting it as + /// tip. + /// + /// If block was already written to database, then this is a nop as the old + /// database entries and block stored on disk are considered valid. + pub(crate) async fn write_block_not_tip(&mut self, block: &Block) -> Result<()> { + self.write_block_internal(block, false).await + } + + /// Write a newly found block to database and to disk, and set it as tip. + /// + /// If block was already written to database, then it is only marked as + /// tip, and no write to disk occurs. Instead, the old block database entry + /// is assumed to be valid, and so is the block stored on disk. + pub(crate) async fn write_block_as_tip(&mut self, new_block: &Block) -> Result<()> { + self.write_block_internal(new_block, true).await + } + /// Add a new block as tip for the archival block MMR. /// /// All predecessors of this block must be known and stored in the block /// index database for this update to work. - pub(crate) async fn add_to_archival_block_mmr(&mut self, new_block: &Block) { + pub(crate) async fn append_to_archival_block_mmr(&mut self, new_block: &Block) { // Roll back to length of parent (accounting for genesis block), // then add new digest. let num_leafs_prior_to_this_block = new_block.header().height.into(); @@ -760,7 +771,7 @@ impl ArchivalState { // If no block was found, check if digest is genesis digest if ret.is_none() && block_digest == self.genesis_block.hash() { - ret = Some(self.genesis_block.header().clone()); + ret = Some(*self.genesis_block.header()); } ret diff --git a/src/models/state/mod.rs b/src/models/state/mod.rs index c2407c25..bdad007f 100644 --- a/src/models/state/mod.rs +++ b/src/models/state/mod.rs @@ -880,7 +880,7 @@ impl GlobalState { pub async fn get_own_handshakedata(&self) -> HandshakeData { let listen_port = self.cli().own_listen_port(); HandshakeData { - tip_header: self.chain.light_state().header().clone(), + tip_header: *self.chain.light_state().header(), listen_port, network: self.cli().network, instance_id: self.net.instance_id, @@ -1331,7 +1331,10 @@ impl GlobalState { /// /// Returns a list of update-jobs that should be /// performed by this client. - pub async fn set_new_tip(&mut self, new_block: Block) -> Result> { + pub(crate) async fn set_new_tip( + &mut self, + new_block: Block, + ) -> Result> { self.set_new_tip_internal(new_block, vec![]).await } @@ -1341,7 +1344,7 @@ impl GlobalState { /// /// Returns a list of update-jobs that should be /// performed by this client. - pub async fn set_new_self_mined_tip( + pub(crate) async fn set_new_self_mined_tip( &mut self, new_block: Block, miner_reward_utxo_infos: Vec, @@ -1350,6 +1353,26 @@ impl GlobalState { .await } + /// Store a block to client's state *without* marking this block as a new + /// tip. No validation of block happens, as this is the caller's + /// responsibility. + pub(crate) async fn store_block_not_tip(&mut self, block: Block) -> Result<()> { + crate::macros::log_scope_duration!(); + + self.chain + .archival_state_mut() + .write_block_not_tip(&block) + .await?; + + // Mempool is not updated, as it's only defined relative to the tip. + // Wallet is not updated, as it can be synced to tip at any point. + + // Flush databases + self.flush_databases().await?; + + Ok(()) + } + /// Update client's state with a new block. Block is assumed to be valid, also wrt. to PoW. /// The received block will be set as the new tip, regardless of its accumulated PoW. or its /// validity. @@ -1371,7 +1394,7 @@ impl GlobalState { self.chain .archival_state_mut() - .add_to_archival_block_mmr(&new_block) + .append_to_archival_block_mmr(&new_block) .await; // update the mutator set with the UTXOs from this block @@ -1450,7 +1473,7 @@ impl GlobalState { pub async fn resync_membership_proofs(&mut self) -> Result<()> { // Do not fix memberhip proofs if node is in sync mode, as we would otherwise // have to sync many times, instead of just *one* time once we have caught up. - if self.net.syncing { + if self.net.sync_anchor.is_some() { debug!("Not syncing MS membership proofs because we are syncing"); return Ok(()); } @@ -1550,6 +1573,10 @@ impl GlobalState { // does not match ours. That's a known deficiency of this function, // and can be fixed by correctly handling the construction of old // MMR-MPs from the current archival MMR state. + // Notice that the MMR membership proofs are relative to an MMR + // where the tip digest *has* been added. So it is not relative to + // the block MMR accumulator present in the tip block, as it only + // refers to its ancestors. block_mmr_mps.push( self.chain .archival_state() @@ -2795,7 +2822,6 @@ mod global_state_tests { ) { // Verifying light state integrity let expected_tip_digest = expected_tip.hash(); - let expected_parent_digest = expected_parent.hash(); assert_eq!(expected_tip_digest, global_state.chain.light_state().hash()); // Peeking into archival state @@ -2809,6 +2835,17 @@ mod global_state_tests { .await, "Archival state must have expected sync-label" ); + assert_eq!( + expected_tip.mutator_set_accumulator_after(), + global_state + .chain + .archival_state() + .archival_mutator_set + .ams() + .accumulator() + .await, + "Archival mutator set must match that in expected tip" + ); assert_eq!( expected_tip_digest, @@ -2863,6 +2900,8 @@ mod global_state_tests { .len(), "Exactly {expected_num_blocks_at_tip_height} blocks at height must be known" ); + + let expected_parent_digest = expected_parent.hash(); assert_eq!( expected_parent_digest, global_state @@ -2982,6 +3021,175 @@ mod global_state_tests { } } + #[traced_test] + #[tokio::test] + async fn can_store_block_without_marking_it_as_tip_1_block() { + // Verify that [GlobalState::store_block_not_tip] stores block + // correctly, and that [GlobalState::set_new_tip] can be used to + // build upon blocks stored through the former method. + let network = Network::Main; + let mut rng = thread_rng(); + let genesis_block = Block::genesis_block(network); + let wallet_secret = WalletSecret::new_random(); + + let mut alice = mock_genesis_global_state( + network, + 2, + wallet_secret.clone(), + cli_args::Args::default(), + ) + .await; + + let mut alice = alice.global_state_lock.lock_guard_mut().await; + assert_eq!(genesis_block.hash(), alice.chain.light_state().hash()); + + let cb_key = WalletSecret::new_random().nth_generation_spending_key(0); + let (block_1, _) = make_mock_block(&genesis_block, None, cb_key, rng.gen()).await; + + alice.store_block_not_tip(block_1.clone()).await.unwrap(); + assert_eq!( + genesis_block.hash(), + alice.chain.light_state().hash(), + "method may not update light state's tip" + ); + assert_eq!( + genesis_block.hash(), + alice.chain.archival_state().get_tip().await.hash(), + "method may not update archival state's tip" + ); + + alice.set_new_tip(block_1.clone()).await.unwrap(); + assert_correct_global_state(&alice, block_1.clone(), genesis_block, 1, 0).await; + } + + /// Return a list of (Block, parent) pairs, of length N. + async fn chain_of_blocks_and_parents( + network: Network, + length: usize, + ) -> Vec<(Block, Block)> { + let mut rng = thread_rng(); + let cb_key = WalletSecret::new_random().nth_generation_spending_key(0); + let mut parent = Block::genesis_block(network); + let mut chain = vec![]; + for _ in 0..length { + let (block, _) = make_mock_block(&parent, None, cb_key, rng.gen()).await; + chain.push((block.clone(), parent.clone())); + parent = block; + } + + chain + } + + #[traced_test] + #[tokio::test] + async fn can_jump_to_new_tip_over_blocks_that_were_never_tips() { + let network = Network::Main; + let wallet_secret = WalletSecret::new_random(); + let mut alice = mock_genesis_global_state( + network, + 2, + wallet_secret.clone(), + cli_args::Args::default(), + ) + .await; + let mut alice = alice.global_state_lock.lock_guard_mut().await; + + let a_length = 12; + let chain_a = chain_of_blocks_and_parents(network, a_length).await; + for (block, _) in chain_a.iter() { + alice.set_new_tip(block.to_owned()).await.unwrap(); + } + + let chain_a_tip = &chain_a[a_length - 1].0; + let chain_a_tip_parent = &chain_a[a_length - 1].1; + assert_correct_global_state( + &alice, + chain_a_tip.to_owned(), + chain_a_tip_parent.to_owned(), + 1, + 0, + ) + .await; + + // Store all blocks from a new chain, except the last, without + // marking any of them as tips. Verify no change in tip. + let b_length = 15; + let chain_b = chain_of_blocks_and_parents(network, b_length).await; + for (block, _) in chain_b.iter().take(b_length - 1) { + alice.store_block_not_tip(block.clone()).await.unwrap(); + } + assert_correct_global_state( + &alice, + chain_a_tip.to_owned(), + chain_a_tip_parent.to_owned(), + 2, + 0, + ) + .await; + + // Set chain B's last block to tip to verify that all the stored + // blocks from chain B can be used to connect it to LUCA, which in + // this case is genesis block. + let chain_b_tip = &chain_b[b_length - 1].0; + let chain_b_tip_parent = &chain_b[b_length - 1].1; + alice.set_new_tip(chain_b_tip.to_owned()).await.unwrap(); + assert_correct_global_state( + &alice, + chain_b_tip.to_owned(), + chain_b_tip_parent.to_owned(), + 1, + 0, + ) + .await; + } + + #[traced_test] + #[tokio::test] + async fn reorganization_with_blocks_that_were_never_tips_n_blocks_deep() { + // Verify that [GlobalState::store_block_not_tip] stores block + // correctly, and that [GlobalState::set_new_tip] can be used to + // build upon blocks stored through the former method. + let network = Network::Main; + let genesis_block = Block::genesis_block(network); + let wallet_secret = WalletSecret::new_random(); + + for depth in 1..=4 { + let mut alice = mock_genesis_global_state( + network, + 2, + wallet_secret.clone(), + cli_args::Args::default(), + ) + .await; + let mut alice = alice.global_state_lock.lock_guard_mut().await; + assert_eq!(genesis_block.hash(), alice.chain.light_state().hash()); + let chain_a = chain_of_blocks_and_parents(network, depth).await; + let chain_b = chain_of_blocks_and_parents(network, depth).await; + let blocks_and_parents = [chain_a, chain_b].concat(); + for (block, _) in blocks_and_parents.iter() { + alice.store_block_not_tip(block.clone()).await.unwrap(); + assert_eq!( + genesis_block.hash(), + alice.chain.light_state().hash(), + "method may not update light state's tip, depth = {depth}" + ); + assert_eq!( + genesis_block.hash(), + alice.chain.archival_state().get_tip().await.hash(), + "method may not update archival state's tip, depth = {depth}" + ); + } + + // Loop over all blocks and verify that all can be marked as + // tip, resulting in a consistent, correct state. + for (block, parent) in blocks_and_parents.iter() { + alice.set_new_tip(block.clone()).await.unwrap(); + assert_correct_global_state(&alice, block.clone(), parent.to_owned(), 2, 0) + .await; + } + } + } + #[traced_test] #[tokio::test] async fn set_new_tip_can_roll_back() { diff --git a/src/models/state/networking_state.rs b/src/models/state/networking_state.rs index aab27e7a..ed0d7170 100644 --- a/src/models/state/networking_state.rs +++ b/src/models/state/networking_state.rs @@ -4,11 +4,15 @@ use std::net::SocketAddr; use std::time::SystemTime; use anyhow::Result; +use tasm_lib::prelude::Digest; +use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator; use crate::config_models::data_directory::DataDirectory; use crate::database::create_db_if_missing; use crate::database::NeptuneLevelDb; use crate::database::WriteBatchAsync; +use crate::models::blockchain::block::block_height::BlockHeight; +use crate::models::blockchain::block::difficulty_control::ProofOfWork; use crate::models::database::PeerDatabases; use crate::models::peer; use crate::models::peer::PeerStanding; @@ -17,24 +21,73 @@ pub const BANNED_IPS_DB_NAME: &str = "banned_ips"; type PeerMap = HashMap; +/// Information about a foreign tip towards which the client is syncing. +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) struct SyncAnchor { + /// Cumulative proof-of-work number of the target fork that we are syncing + /// towards. This number is immutable for each `SyncAnchor`. + pub(crate) cumulative_proof_of_work: ProofOfWork, + + /// The block MMR accumulator *after* appending the claimed tip digest. This + /// value is immutable for each `SyncAnchor`. + pub(crate) block_mmr: MmrAccumulator, + + /// Indicates the block that we have currently synced to under this anchor. + pub(crate) champion: Option<(BlockHeight, Digest)>, + + /// The last time this anchor was either created or updated. + pub(crate) updated: SystemTime, +} + +impl SyncAnchor { + pub(crate) fn new( + claimed_cumulative_pow: ProofOfWork, + claimed_block_mmra: MmrAccumulator, + ) -> Self { + Self { + cumulative_proof_of_work: claimed_cumulative_pow, + block_mmr: claimed_block_mmra, + champion: None, + updated: SystemTime::now(), + } + } + + pub(crate) fn catch_up(&mut self, height: BlockHeight, block_hash: Digest) { + let new_champion = Some((height, block_hash)); + let updated = SystemTime::now(); + match self.champion { + Some((current_height, _)) => { + if current_height < height { + self.champion = new_champion; + self.updated = updated; + } + } + None => { + self.champion = new_champion; + self.updated = updated; + } + }; + } +} + /// `NetworkingState` contains in-memory and persisted data for interacting /// with network peers. #[derive(Debug, Clone)] pub struct NetworkingState { - // Stores info about the peers that the client is connected to - // Peer tasks may update their own entries into this map. + /// Stores info about the peers that the client is connected to + /// Peer tasks may update their own entries into this map. pub peer_map: PeerMap, - // `peer_databases` are used to persist IPs with their standing. - // The peer tasks may update their own entries into this map. + /// `peer_databases` are used to persist IPs with their standing. + /// The peer tasks may update their own entries into this map. pub peer_databases: PeerDatabases, - // This value is only true if instance is running an archival node - // that is currently downloading blocks to catch up. - // Only the main task may update this flag - pub syncing: bool, + /// This value is only Some if the instance is running an archival node + /// that is currently in sync mode (downloading blocks in batches). + /// Only the main task may update this flag + pub(crate) sync_anchor: Option, - // Read-only value set during startup + /// Read-only value set at random during startup pub instance_id: u128, /// Timestamp for when the last tx-proof upgrade was attempted. Does not @@ -44,11 +97,11 @@ pub struct NetworkingState { } impl NetworkingState { - pub(crate) fn new(peer_map: PeerMap, peer_databases: PeerDatabases, syncing: bool) -> Self { + pub(crate) fn new(peer_map: PeerMap, peer_databases: PeerDatabases) -> Self { Self { peer_map, peer_databases, - syncing, + sync_anchor: None, instance_id: rand::random(), // Initialize to now to prevent tx proof upgrade to run immediately diff --git a/src/peer_loop.rs b/src/peer_loop.rs index fd5fd4d8..5f417054 100644 --- a/src/peer_loop.rs +++ b/src/peer_loop.rs @@ -15,6 +15,9 @@ use rand::thread_rng; use rand::Rng; use rand::SeedableRng; use tasm_lib::triton_vm::prelude::Digest; +use tasm_lib::twenty_first::prelude::Mmr; +use tasm_lib::twenty_first::prelude::MmrMembershipProof; +use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator; use tokio::select; use tokio::sync::broadcast; use tokio::sync::mpsc; @@ -26,6 +29,7 @@ use tracing::warn; use crate::connect_to_peers::close_peer_connected_callback; use crate::macros::fn_name; use crate::macros::log_slow_scope; +use crate::main_loop::MAX_NUM_DIGESTS_IN_BATCH_REQUEST; use crate::models::blockchain::block::block_height::BlockHeight; use crate::models::blockchain::block::Block; use crate::models::blockchain::transaction::Transaction; @@ -50,6 +54,7 @@ use crate::models::proof_abstractions::mast_hash::MastHash; use crate::models::proof_abstractions::timestamp::Timestamp; use crate::models::state::mempool::MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD; use crate::models::state::mempool::MEMPOOL_TX_THRESHOLD_AGE_IN_SECS; +use crate::models::state::GlobalState; use crate::models::state::GlobalStateLock; const STANDARD_BLOCK_BATCH_SIZE: usize = 250; @@ -206,16 +211,71 @@ impl PeerLoopHandler { } } + /// Construct a batch response, with blocks and their MMR membership proofs + /// relative to a specified anchor. + /// + /// Returns `None` if the anchor has a lower leaf count than the blocks, or + /// a block height of the response exceeds own tip height. + async fn batch_response( + state: &GlobalState, + blocks: Vec, + anchor: &MmrAccumulator, + ) -> Option> { + let own_tip_height = state.chain.light_state().header().height; + let block_heights_match_anchor = blocks + .iter() + .all(|bl| bl.header().height < anchor.num_leafs().into()); + let block_heights_known = blocks.iter().all(|bl| bl.header().height <= own_tip_height); + if !block_heights_match_anchor || !block_heights_known { + let max_block_height = match blocks.iter().map(|bl| bl.header().height).max() { + Some(height) => height.to_string(), + None => "None".to_owned(), + }; + + debug!("max_block_height: {max_block_height}"); + debug!("own_tip_height: {own_tip_height}"); + debug!("anchor.num_leafs(): {}", anchor.num_leafs()); + debug!("block_heights_match_anchor: {block_heights_match_anchor}"); + debug!("block_heights_known: {block_heights_known}"); + return None; + } + + let mut ret = vec![]; + for block in blocks { + let mmr_mp = state + .chain + .archival_state() + .archival_block_mmr + .prove_membership_relative_to_smaller_mmr( + block.header().height.into(), + anchor.num_leafs(), + ) + .await; + let block: TransferBlock = block.try_into().unwrap(); + ret.push((block, mmr_mp)); + } + + Some(ret) + } + /// Handle validation and send all blocks to the main task if they're all /// valid. Use with a list of blocks or a single block. When the /// `received_blocks` is a list, the parent of the `i+1`th block in the /// list is the `i`th block. The parent of element zero in this list is /// `parent_of_first_block`. /// - /// Returns Err when the connection should be closed; returns Ok(None) if - /// some block is invalid or if the last block is not canonical; returns - /// Ok(Some(block_height)) otherwise, referring to the largest block height - /// in the batch. + /// # Return Value + /// - `Err` when the connection should be closed; + /// - `Ok(None)` if some block is invalid + /// - `Ok(None)` if the last block has insufficient cumulative PoW and we + /// are not syncing; + /// - `Ok(None)` if the last block has insufficient height and we are + /// syncing; + /// - `Ok(Some(block_height))` otherwise, referring to the block with the + /// highest height in the batch. + /// + /// A return value of Ok(Some(_)) means that the message was passed on to + /// main loop. /// /// # Locking /// * Acquires `global_state_lock` for write via `self.punish(..)` and @@ -289,16 +349,28 @@ impl PeerLoopHandler { // evaluate the fork choice rule debug!("Checking last block's canonicity ..."); + let last_block = received_blocks.last().unwrap(); let is_canonical = self .global_state_lock .lock_guard() .await - .incoming_block_is_more_canonical(received_blocks.last().unwrap()); - debug!("is canonical? {is_canonical}"); - if !is_canonical { + .incoming_block_is_more_canonical(last_block); + let last_block_height = last_block.header().height; + let sync_mode_active_and_have_new_champion = self + .global_state_lock + .lock_guard() + .await + .net + .sync_anchor + .as_ref() + .is_some_and(|x| { + x.champion + .is_none_or(|(height, _)| height < last_block_height) + }); + if !is_canonical && !sync_mode_active_and_have_new_champion { warn!( "Received {} blocks from peer but incoming blocks are less \ - canonical than current tip.", + canonical than current tip, or current sync-champion.", received_blocks.len() ); return Ok(None); @@ -306,35 +378,36 @@ impl PeerLoopHandler { // Send the new blocks to the main task which handles the state update // and storage to the database. - let new_block_height = received_blocks.last().unwrap().header().height; let number_of_received_blocks = received_blocks.len(); self.to_main_tx .send(PeerTaskToMain::NewBlocks(received_blocks)) .await?; info!( "Updated block info by block from peer. block height {}", - new_block_height + last_block_height ); // Valuable, new, hard-to-produce information. Reward peer. self.reward(PositivePeerSanction::ValidBlocks(number_of_received_blocks)) .await?; - Ok(Some(new_block_height)) + Ok(Some(last_block_height)) } - /// Takes a single block received from a peer and (attempts to) find a path - /// from some known stored block to the received block. + /// Take a single block received from a peer and (attempt to) find a path + /// between the received block and a common ancestor stored in the blocks + /// database. /// - /// This function attempts to find the parent of a new block, either by - /// searching the database or, if necessary, by requesting it from a peer. - /// - If the parent is stored in the database, block handling continues. + /// This function attempts to find the parent of the received block, either + /// by searching the database or by requesting it from a peer. /// - If the parent is not stored, it is requested from the peer and the /// received block is pushed to the fork reconciliation list for later - /// handling by this function. - /// - /// If the parent is stored, the block and any fork reconciliation blocks - /// are passed down the pipeline. + /// handling by this function. The fork reconciliation list starts out + /// empty, but grows as more parents are requested and transmitted. + /// - If the parent is found in the database, a) block handling continues: + /// the entire list of fork reconciliation blocks are passed down the + /// pipeline, potentially leading to a state update; and b) the fork + /// reconciliation list is cleared. /// /// Locking: /// * Acquires `global_state_lock` for write via `self.punish(..)` and @@ -350,15 +423,68 @@ impl PeerLoopHandler { >::Error: std::error::Error + Sync + Send + 'static, ::Error: std::error::Error, { - let parent_digest = received_block.kernel.header.prev_block_digest; + // Does the received block match the fork reconciliation list? + let received_block_matches_fork_reconciliation_list = if let Some(successor) = + peer_state.fork_reconciliation_blocks.last() + { + let valid = successor + .is_valid(received_block.as_ref(), self.now()) + .await; + if !valid { + warn!( + "Fork reconciliation failed after receiving {} blocks: successor of received block is invalid", + peer_state.fork_reconciliation_blocks.len() + 1 + ); + } + valid + } else { + true + }; + + // Are we running out of RAM? + let too_many_blocks = peer_state.fork_reconciliation_blocks.len() + 1 + >= self.global_state_lock.cli().sync_mode_threshold; + if too_many_blocks { + warn!( + "Fork reconciliation failed after receiving {} blocks: block count exceeds sync mode threshold", + peer_state.fork_reconciliation_blocks.len() + 1 + ); + } + + // Block mismatch or too many blocks: abort! + if !received_block_matches_fork_reconciliation_list || too_many_blocks { + self.punish(NegativePeerSanction::ForkResolutionError(( + received_block.header().height, + peer_state.fork_reconciliation_blocks.len() as u16, + received_block.hash(), + ))) + .await?; + peer_state.fork_reconciliation_blocks = vec![]; + return Ok(()); + } + + // otherwise, append + peer_state.fork_reconciliation_blocks.push(*received_block); + + // Try fetch parent + let received_block_header = *peer_state + .fork_reconciliation_blocks + .last() + .unwrap() + .header(); + + let parent_digest = received_block_header.prev_block_digest; + let parent_height = received_block_header.height.previous() + .expect("transferred block must have previous height because genesis block cannot be transferred"); debug!("Try ensure path: fetching parent block"); - let global_state = self.global_state_lock.lock_guard().await; - let parent_block = global_state + let parent_block = self + .global_state_lock + .lock_guard() + .await .chain .archival_state() .get_block(parent_digest) .await?; - drop(global_state); debug!( "Completed parent block fetching from DB: {}", if parent_block.is_some() { @@ -367,97 +493,37 @@ impl PeerLoopHandler { "not found".to_string() } ); - let parent_height = received_block.kernel.header.height.previous() - .expect("transferred block must have previous height because genesis block cannot be transferred"); - // If parent is not known, request the parent, and add the current to - // the peer fork resolution list - if parent_block.is_none() && parent_height > BlockHeight::genesis() { + // If parent is not known (but not genesis) request it. + let Some(parent_block) = parent_block else { + if parent_height.is_genesis() { + peer_state.fork_reconciliation_blocks.clear(); + self.punish(NegativePeerSanction::DifferentGenesis).await?; + return Ok(()); + } info!( "Parent not known: Requesting previous block with height {} from peer", parent_height ); - // If the received block matches the block reconciliation state - // push it there and request its parent - if peer_state.fork_reconciliation_blocks.is_empty() - || peer_state - .fork_reconciliation_blocks - .last() - .unwrap() - .kernel - .header - .height - .previous() - .expect("fork reconcilliation blocks cannot contain genesis") - == received_block.kernel.header.height - && peer_state.fork_reconciliation_blocks.len() + 1 - < self.global_state_lock.cli().sync_mode_threshold - { - peer_state.fork_reconciliation_blocks.push(*received_block); - } else { - // Blocks received out of order. Or more than allowed received without - // going into sync mode. Give up on block resolution attempt. - self.punish(NegativePeerSanction::ForkResolutionError(( - received_block.kernel.header.height, - peer_state.fork_reconciliation_blocks.len() as u16, - received_block.hash(), - ))) - .await?; - warn!( - "Fork reconciliation failed after receiving {} blocks", - peer_state.fork_reconciliation_blocks.len() + 1 - ); - peer_state.fork_reconciliation_blocks = vec![]; - return Ok(()); - } - peer.send(PeerMessage::BlockRequestByHash(parent_digest)) .await?; return Ok(()); - } - - // We got all the way back to genesis, but disagree about genesis. Ban peer. - if parent_block.is_none() && parent_height == BlockHeight::genesis() { - self.punish(NegativePeerSanction::DifferentGenesis).await?; - return Ok(()); - } + }; // We want to treat the received fork reconciliation blocks (plus the // received block) in reverse order, from oldest to newest, because // they were requested from high to low block height. let mut new_blocks = peer_state.fork_reconciliation_blocks.clone(); - new_blocks.push(*received_block); new_blocks.reverse(); - // Reset the fork resolution state since we got all the way back to find a block that we have + // Reset the fork resolution state since we got all the way back to a + // block that we have. let fork_reconciliation_event = !peer_state.fork_reconciliation_blocks.is_empty(); - peer_state.fork_reconciliation_blocks = vec![]; - - // Sanity check, that the blocks are correctly sorted (they should be) - // TODO: This has failed: Investigate! - // See: https://neptune.builders/core-team/neptune-core/issues/125 - // TODO: This assert should be replaced with something to punish or disconnect - // from a peer instead. It can be used by a malevolent peer to crash peer nodes. - let mut new_blocks_sorted_check = new_blocks.clone(); - new_blocks_sorted_check.sort_by(|a, b| a.kernel.header.height.cmp(&b.kernel.header.height)); - assert_eq!( - new_blocks_sorted_check, - new_blocks, - "Block list in fork resolution must be sorted. Got blocks in this order: {}", - new_blocks - .iter() - .map(|b| b.kernel.header.height.to_string()) - .join(", ") - ); + peer_state.fork_reconciliation_blocks.clear(); - // Parent block is guaranteed to be set here. Because: either it was fetched from the - // database, or it's the genesis block. - if let Some(new_block_height) = self - .handle_blocks(new_blocks, parent_block.unwrap()) - .await? - { + if let Some(new_block_height) = self.handle_blocks(new_blocks, parent_block).await? { // If `BlockNotification` was received during a block reconciliation // event, then the peer might have one (or more (unlikely)) blocks // that we do not have. We should thus request those blocks. @@ -625,7 +691,13 @@ impl PeerLoopHandler { if block_is_new && peer_state_info.fork_reconciliation_blocks.is_empty() - && !self.global_state_lock.lock_guard().await.net.syncing + && self + .global_state_lock + .lock_guard() + .await + .net + .sync_anchor + .is_none() { debug!( "sending BlockRequestByHeight to peer for block with height {}", @@ -718,13 +790,17 @@ impl PeerLoopHandler { info!("Successful sync challenge response; relaying peer tip info to main loop."); + let mut sync_mmra_anchor = challenge_response.tip.body.block_mmr_accumulator; + sync_mmra_anchor.append(issued_challenge.challenge.tip_digest); + // Inform main loop self.to_main_tx - .send(PeerTaskToMain::AddPeerMaxBlockHeight(( - self.peer_address, - claimed_tip_height, - issued_challenge.accumulated_pow, - ))) + .send(PeerTaskToMain::AddPeerMaxBlockHeight { + peer_address: self.peer_address, + claimed_height: claimed_tip_height, + claimed_cumulative_pow: issued_challenge.accumulated_pow, + claimed_block_mmra: sync_mmra_anchor, + }) .await?; Ok(KEEP_CONNECTION_ALIVE) @@ -832,6 +908,7 @@ impl PeerLoopHandler { PeerMessage::BlockRequestBatch(BlockRequestBatch { known_blocks, max_response_len, + anchor, }) => { log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestBatch"); debug!( @@ -839,6 +916,13 @@ impl PeerLoopHandler { self.peer_address ); + if known_blocks.len() > MAX_NUM_DIGESTS_IN_BATCH_REQUEST { + self.punish(NegativePeerSanction::BatchBlocksRequestTooManyDigests) + .await?; + + return Ok(KEEP_CONNECTION_ALIVE); + } + // The last block in the list of the peers known block is the // earliest block, block with lowest height, the peer has // requested. If it does not belong to canonical chain, none of @@ -853,15 +937,15 @@ impl PeerLoopHandler { } }; - if !self - .global_state_lock - .lock_guard() - .await + let state = self.global_state_lock.lock_guard().await; + let block_mmr_num_leafs = state.chain.light_state().header().height.next().into(); + let luca_is_known = state .chain .archival_state() .block_belongs_to_canonical_chain(least_preferred) - .await - { + .await; + if !luca_is_known || anchor.num_leafs() > block_mmr_num_leafs { + drop(state); self.punish(NegativePeerSanction::BatchBlocksUnknownRequest) .await?; peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?; @@ -871,17 +955,16 @@ impl PeerLoopHandler { // Happy case: At least *one* of the blocks referenced by peer // is known to us. - let mut first_block_in_response: Option = None; - { - let global_state = self.global_state_lock.lock_guard().await; + let first_block_in_response = { + let mut first_block_in_response: Option = None; for block_digest in known_blocks { - if global_state + if state .chain .archival_state() .block_belongs_to_canonical_chain(block_digest) .await { - let height = global_state + let height = state .chain .archival_state() .get_block_header(block_digest) @@ -896,42 +979,36 @@ impl PeerLoopHandler { break; } } - } - let peers_preferred_canonical_block = match first_block_in_response { - Some(block) => block, - None => { - self.punish(NegativePeerSanction::BatchBlocksUnknownRequest) - .await?; - return Ok(KEEP_CONNECTION_ALIVE); - } + first_block_in_response + .expect("existence of LUCA should have been established already.") }; debug!( - "Peer's most preferred block has height {peers_preferred_canonical_block}.\ + "Peer's most preferred block has height {first_block_in_response}.\ Now building response from that height." ); // Get the relevant blocks, at most batch-size many, descending from the // peer's (alleged) most canonical block. Don't exceed `max_response_len` // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response. - let len_of_response = cmp::min( + let max_response_len = cmp::min( max_response_len, self.global_state_lock.cli().sync_mode_threshold, ); - let len_of_response = cmp::max(len_of_response, MINIMUM_BLOCK_BATCH_SIZE); - let len_of_response = cmp::min(len_of_response, STANDARD_BLOCK_BATCH_SIZE); + let max_response_len = cmp::max(max_response_len, MINIMUM_BLOCK_BATCH_SIZE); + let max_response_len = cmp::min(max_response_len, STANDARD_BLOCK_BATCH_SIZE); - let mut digests_of_returned_blocks = Vec::with_capacity(len_of_response); - let response_start_height: u64 = peers_preferred_canonical_block.into(); + let mut digests_of_returned_blocks = Vec::with_capacity(max_response_len); + let response_start_height: u64 = first_block_in_response.into(); let mut i: u64 = 1; - let global_state = self.global_state_lock.lock_guard().await; - while digests_of_returned_blocks.len() < len_of_response { - match global_state + while digests_of_returned_blocks.len() < max_response_len { + let block_height = response_start_height + i; + match state .chain .archival_state() .archival_block_mmr - .try_get_leaf(response_start_height + i) + .try_get_leaf(block_height) .await { Some(digest) => { @@ -942,36 +1019,47 @@ impl PeerLoopHandler { i += 1; } - let mut returned_blocks: Vec = + let mut returned_blocks: Vec = Vec::with_capacity(digests_of_returned_blocks.len()); for block_digest in digests_of_returned_blocks { - let block = global_state + let block = state .chain .archival_state() .get_block(block_digest) .await? .unwrap(); - returned_blocks.push(block.try_into().unwrap()); + returned_blocks.push(block); } - debug!( - "Returning {} blocks in batch response", - returned_blocks.len() - ); + let response = Self::batch_response(&state, returned_blocks, &anchor).await; + let response = match response { + Some(response) => response, + None => { + drop(state); + warn!("Unable to satisfy batch-block request"); + self.punish(NegativePeerSanction::BatchBlocksUnknownRequest) + .await?; + return Ok(KEEP_CONNECTION_ALIVE); + } + }; - let response = PeerMessage::BlockResponseBatch(returned_blocks); + debug!("Returning {} blocks in batch response", response.len()); + + let response = PeerMessage::BlockResponseBatch(response); peer.send(response).await?; Ok(KEEP_CONNECTION_ALIVE) } - PeerMessage::BlockResponseBatch(t_blocks) => { + PeerMessage::BlockResponseBatch(authenticated_blocks) => { log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch"); debug!( "handling block response batch with {} blocks", - t_blocks.len() + authenticated_blocks.len() ); - if t_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE { + + // (Alan:) why is there even a minimum? + if authenticated_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE { warn!("Got smaller batch response than allowed"); self.punish(NegativePeerSanction::TooShortBlockBatch) .await?; @@ -981,17 +1069,25 @@ impl PeerLoopHandler { // Verify that we are in fact in syncing mode // TODO: Seperate peer messages into those allowed under syncing // and those that are not - if !self.global_state_lock.lock_guard().await.net.syncing { + let Some(sync_achor) = self + .global_state_lock + .lock_guard() + .await + .net + .sync_anchor + .clone() + else { warn!("Received a batch of blocks without being in syncing mode"); self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync) .await?; return Ok(KEEP_CONNECTION_ALIVE); - } + }; // Verify that the response matches the current state // We get the latest block from the DB here since this message is // only valid for archival nodes. - let first_blocks_parent_digest: Digest = t_blocks[0].header.prev_block_digest; + let (first_block, _) = &authenticated_blocks[0]; + let first_blocks_parent_digest: Digest = first_block.header.prev_block_digest; let most_canonical_own_block_match: Option = self .global_state_lock .lock_guard() @@ -1017,18 +1113,27 @@ impl PeerLoopHandler { most_canonical_own_block_match.kernel.header.height ); let mut received_blocks = vec![]; - for t_block in t_blocks { - match Block::try_from(t_block) { - Ok(block) => { - received_blocks.push(block); - } - Err(e) => { - warn!("Received invalid transfer block from peer: {e:?}"); - self.punish(NegativePeerSanction::InvalidTransferBlock) - .await?; - return Ok(KEEP_CONNECTION_ALIVE); - } + for (t_block, membership_proof) in authenticated_blocks { + let Ok(block) = Block::try_from(t_block) else { + warn!("Received invalid transfer block from peer"); + self.punish(NegativePeerSanction::InvalidTransferBlock) + .await?; + return Ok(KEEP_CONNECTION_ALIVE); + }; + + if !membership_proof.verify( + block.header().height.into(), + block.hash(), + &sync_achor.block_mmr.peaks(), + sync_achor.block_mmr.num_leafs(), + ) { + warn!("Authentication of received block fails relative to anchor"); + self.punish(NegativePeerSanction::InvalidBlockMmrAuthentication) + .await?; + return Ok(KEEP_CONNECTION_ALIVE); } + + received_blocks.push(block); } // Get the latest block that we know of and handle all received blocks @@ -1366,6 +1471,7 @@ impl PeerLoopHandler { peer.send(PeerMessage::BlockRequestBatch(BlockRequestBatch { known_blocks: batch_block_request.known_blocks, max_response_len, + anchor: batch_block_request.anchor_mmr, })) .await?; @@ -1461,7 +1567,7 @@ impl PeerLoopHandler { break; } Some(peer_msg) => { - let syncing = self.global_state_lock.lock(|s| s.net.syncing).await; + let syncing = self.global_state_lock.lock(|s| s.net.sync_anchor.is_some()).await; if peer_msg.ignore_during_sync() && syncing { debug!("Ignoring {} message during syncing, from {}", peer_msg.get_type(), self.peer_address); continue; @@ -1989,21 +2095,41 @@ mod peer_loop_tests { StdRng::seed_from_u64(5550001).gen(), ) .await; - let blocks = vec![genesis_block, block_1, block_2, block_3, block_4, block_5]; + let blocks = vec![ + genesis_block, + block_1, + block_2, + block_3, + block_4, + block_5.clone(), + ]; for block in blocks.iter().skip(1) { state_lock.set_new_tip(block.to_owned()).await.unwrap(); } + let mmra = state_lock + .lock_guard() + .await + .chain + .archival_state() + .archival_block_mmr + .to_accumulator_async() + .await; for i in 0..=4 { - let response = (i + 1..=5) - .map(|j| blocks[j].clone().try_into().unwrap()) - .collect_vec(); + let expected_response = { + let state = state_lock.lock_guard().await; + let blocks_for_response = blocks.iter().skip(i + 1).cloned().collect_vec(); + PeerLoopHandler::batch_response(&state, blocks_for_response, &mmra) + .await + .unwrap() + }; let mock = Mock::new(vec![ Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch { known_blocks: vec![blocks[i].hash()], max_response_len: 14, + anchor: mmra.clone(), })), - Action::Write(PeerMessage::BlockResponseBatch(response)), + Action::Write(PeerMessage::BlockResponseBatch(expected_response)), Action::Read(PeerMessage::Bye), ]); let mut peer_loop_handler = PeerLoopHandler::new( @@ -2054,16 +2180,32 @@ mod peer_loop_tests { state_lock.set_new_tip(block_3_b.clone()).await?; state_lock.set_new_tip(block_3_a.clone()).await?; + let anchor = state_lock + .lock_guard() + .await + .chain + .archival_state() + .archival_block_mmr + .to_accumulator_async() + .await; + let response_1 = { + let state_lock = state_lock.lock_guard().await; + PeerLoopHandler::batch_response( + &state_lock, + vec![block_1.clone(), block_2_a.clone(), block_3_a.clone()], + &anchor, + ) + .await + .unwrap() + }; + let mut mock = Mock::new(vec![ Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch { known_blocks: vec![genesis_block.hash()], max_response_len: 14, + anchor: anchor.clone(), })), - Action::Write(PeerMessage::BlockResponseBatch(vec![ - block_1.clone().try_into().unwrap(), - block_2_a.clone().try_into().unwrap(), - block_3_a.clone().try_into().unwrap(), - ])), + Action::Write(PeerMessage::BlockResponseBatch(response_1)), Action::Read(PeerMessage::Bye), ]); @@ -2082,15 +2224,23 @@ mod peer_loop_tests { .await?; // Peer knows block 2_b, verify that canonical chain with 2_a is returned + let response_2 = { + let state_lock = state_lock.lock_guard().await; + PeerLoopHandler::batch_response( + &state_lock, + vec![block_2_a, block_3_a.clone()], + &anchor, + ) + .await + .unwrap() + }; mock = Mock::new(vec![ Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch { known_blocks: vec![block_2_b.hash(), block_1.hash(), genesis_block.hash()], max_response_len: 14, + anchor, })), - Action::Write(PeerMessage::BlockResponseBatch(vec![ - block_2_a.try_into().unwrap(), - block_3_a.clone().try_into().unwrap(), - ])), + Action::Write(PeerMessage::BlockResponseBatch(response_2)), Action::Read(PeerMessage::Bye), ]); @@ -2146,18 +2296,40 @@ mod peer_loop_tests { state_lock.set_new_tip(block_3_a.clone()).await?; // Peer knows block 2_b, verify that canonical chain with 2_a is returned + let mut expected_anchor = block_3_a.body().block_mmr_accumulator.clone(); + expected_anchor.append(block_3_a.hash()); + let state_anchor = state_lock + .lock_guard() + .await + .chain + .archival_state() + .archival_block_mmr + .to_accumulator_async() + .await; + assert_eq!( + expected_anchor, state_anchor, + "Catching assumption about MMRA in tip and in archival state" + ); + + let response = { + let state_lock = state_lock.lock_guard().await; + PeerLoopHandler::batch_response( + &state_lock, + vec![block_1.clone(), block_2_a, block_3_a.clone()], + &expected_anchor, + ) + .await + .unwrap() + }; let mock = Mock::new(vec![ Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch { known_blocks: vec![block_2_b.hash(), genesis_block.hash(), block_1.hash()], max_response_len: 14, + anchor: expected_anchor, })), // Since genesis block is the 1st known in the list of known blocks, // it's immediate descendent, block_1, is the first one returned. - Action::Write(PeerMessage::BlockResponseBatch(vec![ - block_1.try_into().unwrap(), - block_2_a.try_into().unwrap(), - block_3_a.clone().try_into().unwrap(), - ])), + Action::Write(PeerMessage::BlockResponseBatch(response)), Action::Read(PeerMessage::Bye), ]); @@ -3277,6 +3449,7 @@ mod peer_loop_tests { mod sync_challenges { use super::*; + use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests_dyn; #[traced_test] #[tokio::test] @@ -3416,7 +3589,7 @@ mod peer_loop_tests { // criterion. Alice issues a challenge. Bob responds. Alice enters into // sync mode. - let mut rng = StdRng::seed_from_u64(5550001); + let mut rng = thread_rng(); let network = Network::Main; let genesis_block: Block = Block::genesis_block(network); @@ -3454,9 +3627,13 @@ mod peer_loop_tests { alice.set_new_tip(block_1.clone()).await?; bob.set_new_tip(block_1.clone()).await?; - let blocks: [Block; 11] = - fake_valid_sequence_of_blocks_for_tests(&block_1, TARGET_BLOCK_INTERVAL, rng.gen()) - .await; + let blocks = fake_valid_sequence_of_blocks_for_tests_dyn( + &block_1, + TARGET_BLOCK_INTERVAL, + rng.gen(), + rng.gen_range(11..20), + ) + .await; for block in &blocks { bob.set_new_tip(block.clone()).await?; } @@ -3512,11 +3689,14 @@ mod peer_loop_tests { .await?; // AddPeerMaxBlockHeight message triggered *after* sync challenge - let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight(( - bob_socket_address, - bob_tip.header().height, - bob_tip.header().cumulative_proof_of_work, - )); + let mut expected_anchor_mmra = bob_tip.body().block_mmr_accumulator.clone(); + expected_anchor_mmra.append(bob_tip.hash()); + let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight { + peer_address: bob_socket_address, + claimed_height: bob_tip.header().height, + claimed_cumulative_pow: bob_tip.header().cumulative_proof_of_work, + claimed_block_mmra: expected_anchor_mmra, + }; let observed_message_from_alice_peer_loop = alice_peer_to_main_rx.recv().await.unwrap(); assert_eq!( expected_message_from_alice_peer_loop, diff --git a/src/rpc_server.rs b/src/rpc_server.rs index ee4ab28a..8f3303b4 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -1421,12 +1421,12 @@ impl RPC for NeptuneRPCServer { log_slow_scope!(fn_name!() + "::hash() tip digest"); state.chain.light_state().hash() }; - let tip_header = state.chain.light_state().header().clone(); + let tip_header = *state.chain.light_state().header(); let wallet_status = { log_slow_scope!(fn_name!() + "::get_wallet_status_for_tip()"); state.get_wallet_status_for_tip().await }; - let syncing = state.net.syncing; + let syncing = state.net.sync_anchor.is_some(); let mempool_size = { log_slow_scope!(fn_name!() + "::mempool.get_size()"); state.mempool.get_size() diff --git a/src/tests/shared.rs b/src/tests/shared.rs index c829c953..782f7906 100644 --- a/src/tests/shared.rs +++ b/src/tests/shared.rs @@ -202,14 +202,13 @@ pub(crate) async fn mock_genesis_global_state( ) -> GlobalStateLock { let (archival_state, peer_db, _data_dir) = mock_genesis_archival_state(network).await; - let syncing = false; let mut peer_map: HashMap = get_peer_map(); for i in 0..peer_count { let peer_address = std::net::SocketAddr::from_str(&format!("123.123.123.{}:8080", i)).unwrap(); peer_map.insert(peer_address, get_dummy_peer(peer_address)); } - let networking_state = NetworkingState::new(peer_map, peer_db, syncing); + let networking_state = NetworkingState::new(peer_map, peer_db); let genesis_block = archival_state.get_tip().await; // Sanity check @@ -284,7 +283,9 @@ pub(crate) async fn add_block_to_archival_state( archival_state.update_mutator_set(&new_block).await.unwrap(); - archival_state.add_to_archival_block_mmr(&new_block).await; + archival_state + .append_to_archival_block_mmr(&new_block) + .await; Ok(()) } @@ -1002,15 +1003,34 @@ pub(crate) async fn fake_valid_block_for_tests( /// /// Sequence is N-long. Every block i with i > 0 has block i-1 as its /// predecessor; block 0 has the `predecessor` argument as predecessor. Every -/// block is valid in terms of both `is_valid` and `has_proof_of_work`. +/// block is valid in terms of both `is_valid` and `has_proof_of_work`. But +/// the STARK proofs are mocked. pub(crate) async fn fake_valid_sequence_of_blocks_for_tests( - mut predecessor: &Block, + predecessor: &Block, block_interval: Timestamp, seed: [u8; 32], ) -> [Block; N] { + fake_valid_sequence_of_blocks_for_tests_dyn(predecessor, block_interval, seed, N) + .await + .try_into() + .unwrap() +} + +/// Create a deterministic sequence of valid blocks. +/// +/// Sequence is N-long. Every block i with i > 0 has block i-1 as its +/// predecessor; block 0 has the `predecessor` argument as predecessor. Every +/// block is valid in terms of both `is_valid` and `has_proof_of_work`. But +/// the STARK proofs are mocked. +pub(crate) async fn fake_valid_sequence_of_blocks_for_tests_dyn( + mut predecessor: &Block, + block_interval: Timestamp, + seed: [u8; 32], + n: usize, +) -> Vec { let mut blocks = vec![]; let mut rng: StdRng = SeedableRng::from_seed(seed); - for _ in 0..N { + for _ in 0..n { let block = fake_valid_successor_for_tests( predecessor, predecessor.header().timestamp + block_interval, @@ -1020,7 +1040,7 @@ pub(crate) async fn fake_valid_sequence_of_blocks_for_tests( blocks.push(block); predecessor = blocks.last().unwrap(); } - blocks.try_into().unwrap() + blocks } pub(crate) async fn wallet_state_has_all_valid_mps( diff --git a/src/util_types/mutator_set/archival_mmr.rs b/src/util_types/mutator_set/archival_mmr.rs index 844510dd..0e2aa72c 100644 --- a/src/util_types/mutator_set/archival_mmr.rs +++ b/src/util_types/mutator_set/archival_mmr.rs @@ -210,6 +210,62 @@ impl> ArchivalMmr { } } + /// Return membership proof, as it looks relative to a smaller version of + /// the MMR which only has `num_leafs` leafs. `num_leafs` may not exceed + /// the actual number of leafs. + pub(crate) async fn prove_membership_relative_to_smaller_mmr( + &self, + leaf_index: u64, + num_leafs: u64, + ) -> MmrMembershipProof { + // TODO: Replace this local function with the one in `twenty_first` once + // available through never version. + fn auth_path_node_indices(num_leafs: u64, leaf_index: u64) -> Vec { + assert!( + leaf_index < num_leafs, + "Leaf index out-of-bounds: {leaf_index}/{num_leafs}" + ); + + let (mut merkle_tree_index, _) = + leaf_index_to_mt_index_and_peak_index(leaf_index, num_leafs); + let mut node_index = leaf_index_to_node_index(leaf_index); + let mut height = 0; + let tree_height = u64::BITS - merkle_tree_index.leading_zeros() - 1; + let mut ret = Vec::with_capacity(tree_height as usize); + while merkle_tree_index > 1 { + let is_left_sibling = merkle_tree_index & 1 == 0; + let height_pow = 1u64 << (height + 1); + let as_1_or_minus_1: u64 = (2 * (is_left_sibling as i64) - 1) as u64; + let signed_height_pow = height_pow.wrapping_mul(as_1_or_minus_1); + let sibling = node_index + .wrapping_add(signed_height_pow) + .wrapping_sub(as_1_or_minus_1); + + node_index += 1 << ((height + 1) * is_left_sibling as u32); + + ret.push(sibling); + merkle_tree_index >>= 1; + height += 1; + } + + debug_assert_eq!(tree_height, ret.len() as u32, "Allocation must be optimal"); + + ret + } + + assert!( + num_leafs <= self.num_leafs().await, + "Cannot find membership proofs relative to bigger MMR" + ); + + let node_indices = auth_path_node_indices(num_leafs, leaf_index); + let ap_elements = self.digests.get_many(&node_indices).await; + + MmrMembershipProof { + authentication_path: ap_elements, + } + } + /// Return membership proof pub async fn prove_membership_async(&self, leaf_index: u64) -> MmrMembershipProof { // A proof consists of an authentication path @@ -714,21 +770,50 @@ pub(crate) mod mmr_test { } } + #[proptest(async = "tokio")] + async fn prove_membership_relative_to_smaller_mmr_test( + #[strategy(1u64..200)] _num_leafs: u64, + #[strategy(vec(arb(), #_num_leafs as usize))] digests: Vec, + #[strategy(1u64..=#_num_leafs)] reduced_num_leafs: u64, + #[strategy(0u64..#reduced_num_leafs)] leaf_index: u64, + ) { + let leaf = digests[leaf_index as usize]; + let smaller_mmr = + MmrAccumulator::new_from_leafs(digests[0..reduced_num_leafs as usize].to_vec()); + let ammr = mock::get_ammr_from_digests(digests).await; + let mp = ammr + .prove_membership_relative_to_smaller_mmr(leaf_index, reduced_num_leafs) + .await; + prop_assert!(mp.verify( + leaf_index, + leaf, + &smaller_mmr.peaks(), + smaller_mmr.num_leafs() + )); + } + #[tokio::test] async fn mmr_prove_verify_leaf_mutation_test() { - for size in 1..150 { + for size in 1u64..150 { let new_leaf: Digest = random(); let bad_leaf: Digest = random(); - let leaf_hashes_tip5: Vec = random_elements(size); + let leaf_hashes_tip5: Vec = random_elements(size as usize); let mut acc = MmrAccumulator::new_from_leafs(leaf_hashes_tip5.clone()); let mut archival: ArchivalMmr = mock::get_ammr_from_digests(leaf_hashes_tip5.clone()).await; let archival_end_state: ArchivalMmr = - mock::get_ammr_from_digests(vec![new_leaf; size]).await; + mock::get_ammr_from_digests(vec![new_leaf; size as usize]).await; for i in 0..size { - let i = i as u64; let peaks_before_update = archival.peaks().await; let mp = archival.prove_membership_async(i).await; + assert_eq!( + mp, + archival + .prove_membership_relative_to_smaller_mmr(i, size) + .await, + "Two ways of getting MMRMPs must agree" + ); + assert_eq!(archival.peaks().await, peaks_before_update); // Verify the update operation using the batch verifier @@ -753,8 +838,8 @@ pub(crate) mod mmr_test { acc.mutate_leaf(LeafMutation::new(i, new_leaf, mp)); let new_archival_peaks = archival.peaks().await; assert_eq!(new_archival_peaks, acc.peaks()); - assert_eq!(size as u64, archival.num_leafs().await); - assert_eq!(size as u64, acc.num_leafs()); + assert_eq!(size, archival.num_leafs().await); + assert_eq!(size, acc.num_leafs()); } assert_eq!(archival_end_state.peaks().await, acc.peaks()); }