From 0f61d6ab2a787a6816e6a7db451a1c37d16949f5 Mon Sep 17 00:00:00 2001 From: sword_smith Date: Mon, 20 Jan 2025 20:29:01 +0100 Subject: [PATCH 01/11] feat(global_state): Allow storing of block that's not tip Add a method to global state that allows the storing of a block without marking it as the new tip. This is intended to improve the client's ability to tolerate reorganizations. --- src/models/state/archival_state.rs | 271 +++++++++++++++-------------- src/models/state/mod.rs | 150 +++++++++++++++- src/tests/shared.rs | 4 +- 3 files changed, 290 insertions(+), 135 deletions(-) diff --git a/src/models/state/archival_state.rs b/src/models/state/archival_state.rs index d7757d8e..f6821c55 100644 --- a/src/models/state/archival_state.rs +++ b/src/models/state/archival_state.rs @@ -287,158 +287,151 @@ impl ArchivalState { &self.genesis_block } - /// Write a newly found block to database and to disk, and set it as tip. + /// Write a block to database and disk, without setting it as tip. /// - /// If block was already written to database, then it is only marked as - /// tip, and no write to disk occurs. Instead, the old block database entry - /// is assumed to be valid, and so is the block stored on disk. - pub(crate) async fn write_block_as_tip(&mut self, new_block: &Block) -> Result<()> { - async fn write_block( - archival_state: &mut ArchivalState, - new_block: &Block, - ) -> Result> { - // Fetch last file record to find disk location to store block. - // This record must exist in the DB already, unless this is the first block - // stored on disk. - let mut last_rec: LastFileRecord = archival_state - .block_index_db - .get(BlockIndexKey::LastFile) - .await - .map(|x| x.as_last_file_record()) - .unwrap_or_default(); + /// The caller should verify that the block is not already stored, otherwise + /// the block will be stored twice which will lead to inconsistencies. + async fn store_block( + self: &mut ArchivalState, + new_block: &Block, + ) -> Result> { + // Fetch last file record to find disk location to store block. + // This record must exist in the DB already, unless this is the first block + // stored on disk. + let mut last_rec: LastFileRecord = self + .block_index_db + .get(BlockIndexKey::LastFile) + .await + .map(|x| x.as_last_file_record()) + .unwrap_or_default(); - // Open the file that was last used for storing a block - let mut block_file_path = archival_state.data_dir.block_file_path(last_rec.last_file); - let serialized_block: Vec = bincode::serialize(new_block)?; - let serialized_block_size: u64 = serialized_block.len() as u64; + // Open the file that was last used for storing a block + let mut block_file_path = self.data_dir.block_file_path(last_rec.last_file); + let serialized_block: Vec = bincode::serialize(new_block)?; + let serialized_block_size: u64 = serialized_block.len() as u64; - // file operations are async. + let mut block_file = DataDirectory::open_ensure_parent_dir_exists(&block_file_path).await?; - let mut block_file = - DataDirectory::open_ensure_parent_dir_exists(&block_file_path).await?; + // Check if we should use the last file, or we need a new one. + if new_block_file_is_needed(&block_file, serialized_block_size).await { + last_rec = LastFileRecord { + last_file: last_rec.last_file + 1, + }; + block_file_path = self.data_dir.block_file_path(last_rec.last_file); + block_file = DataDirectory::open_ensure_parent_dir_exists(&block_file_path).await?; + } - // Check if we should use the last file, or we need a new one. 
- if new_block_file_is_needed(&block_file, serialized_block_size).await { - last_rec = LastFileRecord { - last_file: last_rec.last_file + 1, - }; - block_file_path = archival_state.data_dir.block_file_path(last_rec.last_file); - block_file = DataDirectory::open_ensure_parent_dir_exists(&block_file_path).await?; + debug!("Writing block to: {}", block_file_path.display()); + // Get associated file record from database, otherwise create it + let file_record_key: BlockIndexKey = BlockIndexKey::File(last_rec.last_file); + let file_record_value: Option = self + .block_index_db + .get(file_record_key.clone()) + .await + .map(|x| x.as_file_record()); + let file_record_value: FileRecord = match file_record_value { + Some(record) => record.add(serialized_block_size, new_block.header()), + None => { + assert!( + block_file.metadata().await.unwrap().len().is_zero(), + "If no file record exists, block file must be empty" + ); + FileRecord::new(serialized_block_size, new_block.header()) } + }; - debug!("Writing block to: {}", block_file_path.display()); - // Get associated file record from database, otherwise create it - let file_record_key: BlockIndexKey = BlockIndexKey::File(last_rec.last_file); - let file_record_value: Option = archival_state - .block_index_db - .get(file_record_key.clone()) - .await - .map(|x| x.as_file_record()); - let file_record_value: FileRecord = match file_record_value { - Some(record) => record.add(serialized_block_size, new_block.header()), - None => { - assert!( - block_file.metadata().await.unwrap().len().is_zero(), - "If no file record exists, block file must be empty" - ); - FileRecord::new(serialized_block_size, new_block.header()) - } - }; - - // Make room in file for mmapping and record where block starts - let pos = block_file.seek(SeekFrom::End(0)).await.unwrap(); - debug!("Size of file prior to block writing: {}", pos); - block_file - .seek(SeekFrom::Current(serialized_block_size as i64 - 1)) - .await - .unwrap(); - block_file.write_all(&[0]).await.unwrap(); - let file_offset: u64 = block_file - .seek(SeekFrom::Current(-(serialized_block_size as i64))) - .await - .unwrap(); - debug!( - "New file size: {} bytes", - block_file.metadata().await.unwrap().len() - ); + // Make room in file for mmapping and record where block starts + let pos = block_file.seek(SeekFrom::End(0)).await.unwrap(); + debug!("Size of file prior to block writing: {}", pos); + block_file + .seek(SeekFrom::Current(serialized_block_size as i64 - 1)) + .await + .unwrap(); + block_file.write_all(&[0]).await.unwrap(); + let file_offset: u64 = block_file + .seek(SeekFrom::Current(-(serialized_block_size as i64))) + .await + .unwrap(); + debug!( + "New file size: {} bytes", + block_file.metadata().await.unwrap().len() + ); - let height_record_key = BlockIndexKey::Height(new_block.header().height); - let mut blocks_at_same_height: Vec = match archival_state - .block_index_db - .get(height_record_key.clone()) - .await - { + let height_record_key = BlockIndexKey::Height(new_block.header().height); + let mut blocks_at_same_height: Vec = + match self.block_index_db.get(height_record_key.clone()).await { Some(rec) => rec.as_height_record(), None => vec![], }; - // Write to file with mmap, only map relevant part of file into memory - // we use spawn_blocking to make the blocking mmap async-friendly. 
- tokio::task::spawn_blocking(move || { - let mmap = unsafe { - MmapOptions::new() - .offset(pos) - .len(serialized_block_size as usize) - .map(&block_file) - .unwrap() - }; - let mut mmap: memmap2::MmapMut = mmap.make_mut().unwrap(); - mmap.deref_mut()[..].copy_from_slice(&serialized_block); - }) - .await?; + // Write to file with mmap, only map relevant part of file into memory + // we use spawn_blocking to make the blocking mmap async-friendly. + tokio::task::spawn_blocking(move || { + let mmap = unsafe { + MmapOptions::new() + .offset(pos) + .len(serialized_block_size as usize) + .map(&block_file) + .unwrap() + }; + let mut mmap: memmap2::MmapMut = mmap.make_mut().unwrap(); + mmap.deref_mut()[..].copy_from_slice(&serialized_block); + }) + .await?; - // Update block index database with newly stored block - let mut block_index_entries: Vec<(BlockIndexKey, BlockIndexValue)> = vec![]; - let block_record_key: BlockIndexKey = BlockIndexKey::Block(new_block.hash()); - let num_additions: u64 = new_block - .mutator_set_update() - .additions - .len() - .try_into() - .expect("Num addition records cannot exceed u64::MAX"); - let block_record_value: BlockIndexValue = - BlockIndexValue::Block(Box::new(BlockRecord { - block_header: new_block.header().clone(), - file_location: BlockFileLocation { - file_index: last_rec.last_file, - offset: file_offset, - block_length: serialized_block_size as usize, - }, - min_aocl_index: new_block.mutator_set_accumulator_after().aocl.num_leafs() - - num_additions, - num_additions, - })); - - block_index_entries.push((file_record_key, BlockIndexValue::File(file_record_value))); - block_index_entries.push((block_record_key, block_record_value)); - - block_index_entries - .push((BlockIndexKey::LastFile, BlockIndexValue::LastFile(last_rec))); - blocks_at_same_height.push(new_block.hash()); - block_index_entries.push(( - height_record_key, - BlockIndexValue::Height(blocks_at_same_height), - )); + // Update block index database with newly stored block + let mut block_index_entries: Vec<(BlockIndexKey, BlockIndexValue)> = vec![]; + let block_record_key: BlockIndexKey = BlockIndexKey::Block(new_block.hash()); + let num_additions: u64 = new_block + .mutator_set_update() + .additions + .len() + .try_into() + .expect("Num addition records cannot exceed u64::MAX"); + let block_record_value: BlockIndexValue = BlockIndexValue::Block(Box::new(BlockRecord { + block_header: new_block.header().clone(), + file_location: BlockFileLocation { + file_index: last_rec.last_file, + offset: file_offset, + block_length: serialized_block_size as usize, + }, + min_aocl_index: new_block.mutator_set_accumulator_after().aocl.num_leafs() + - num_additions, + num_additions, + })); + + block_index_entries.push((file_record_key, BlockIndexValue::File(file_record_value))); + block_index_entries.push((block_record_key, block_record_value)); + + block_index_entries.push((BlockIndexKey::LastFile, BlockIndexValue::LastFile(last_rec))); + blocks_at_same_height.push(new_block.hash()); + block_index_entries.push(( + height_record_key, + BlockIndexValue::Height(blocks_at_same_height), + )); - Ok(block_index_entries) - } + Ok(block_index_entries) + } - let block_is_new = self.get_block_header(new_block.hash()).await.is_none(); + async fn write_block_internal(&mut self, block: &Block, is_new_tip: bool) -> Result<()> { + let block_is_new = self.get_block_header(block.hash()).await.is_none(); let mut block_index_entries = if block_is_new { - write_block(self, new_block).await? + self.store_block(block).await? 
} else { warn!( "Attempted to store block but block was already stored.\nBlock digest: {}", - new_block.hash() + block.hash() ); vec![] }; - // Mark block as tip - block_index_entries.push(( - BlockIndexKey::BlockTipDigest, - BlockIndexValue::BlockTipDigest(new_block.hash()), - )); + // Mark block as tip, conditionally + if is_new_tip { + block_index_entries.push(( + BlockIndexKey::BlockTipDigest, + BlockIndexValue::BlockTipDigest(block.hash()), + )); + } let mut batch = WriteBatchAsync::new(); for (k, v) in block_index_entries.into_iter() { @@ -450,11 +443,29 @@ impl ArchivalState { Ok(()) } + /// Write a newly found block to database and to disk, without setting it as + /// tip. + /// + /// If block was already written to database, then this is a nop as the old + /// database entries and block stored on disk are considered valid. + pub(crate) async fn write_block_not_tip(&mut self, block: &Block) -> Result<()> { + self.write_block_internal(block, false).await + } + + /// Write a newly found block to database and to disk, and set it as tip. + /// + /// If block was already written to database, then it is only marked as + /// tip, and no write to disk occurs. Instead, the old block database entry + /// is assumed to be valid, and so is the block stored on disk. + pub(crate) async fn write_block_as_tip(&mut self, new_block: &Block) -> Result<()> { + self.write_block_internal(new_block, true).await + } + /// Add a new block as tip for the archival block MMR. /// /// All predecessors of this block must be known and stored in the block /// index database for this update to work. - pub(crate) async fn add_to_archival_block_mmr(&mut self, new_block: &Block) { + pub(crate) async fn append_to_archival_block_mmr(&mut self, new_block: &Block) { // Roll back to length of parent (accounting for genesis block), // then add new digest. let num_leafs_prior_to_this_block = new_block.header().height.into(); diff --git a/src/models/state/mod.rs b/src/models/state/mod.rs index c2407c25..3fe7e453 100644 --- a/src/models/state/mod.rs +++ b/src/models/state/mod.rs @@ -1331,7 +1331,10 @@ impl GlobalState { /// /// Returns a list of update-jobs that should be /// performed by this client. - pub async fn set_new_tip(&mut self, new_block: Block) -> Result> { + pub(crate) async fn set_new_tip( + &mut self, + new_block: Block, + ) -> Result> { self.set_new_tip_internal(new_block, vec![]).await } @@ -1341,7 +1344,7 @@ impl GlobalState { /// /// Returns a list of update-jobs that should be /// performed by this client. - pub async fn set_new_self_mined_tip( + pub(crate) async fn set_new_self_mined_tip( &mut self, new_block: Block, miner_reward_utxo_infos: Vec, @@ -1350,6 +1353,26 @@ impl GlobalState { .await } + /// Store a block to client's state *without* marking this block as a new + /// tip. No validation of block happens, as this is the caller's + /// responsibility. + async fn store_block_not_tip(&mut self, block: Block) -> Result<()> { + crate::macros::log_scope_duration!(); + + self.chain + .archival_state_mut() + .write_block_not_tip(&block) + .await?; + + // Mempool is not updated, as it's only defined relative to the tip. + // Wallet is not updated, as it can be synced to tip at any point. + + // Flush databases + self.flush_databases().await?; + + Ok(()) + } + /// Update client's state with a new block. Block is assumed to be valid, also wrt. to PoW. /// The received block will be set as the new tip, regardless of its accumulated PoW. or its /// validity. 
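The intended call pattern for the two write paths can be sketched as follows; this is a hypothetical usage sketch, not part of the patch, where `global_state` stands in for a write-locked `GlobalState` and `side_chain_block` and `new_tip` are assumed to be valid blocks:

    // Usage sketch (hypothetical): tolerate a reorganization by first storing
    // the competing chain's blocks without touching the tip ...
    global_state.store_block_not_tip(side_chain_block).await?;
    // ... and promoting a block to tip only once it wins the fork race.
    global_state.set_new_tip(new_tip).await?;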
@@ -1371,7 +1394,7 @@ impl GlobalState { self.chain .archival_state_mut() - .add_to_archival_block_mmr(&new_block) + .append_to_archival_block_mmr(&new_block) .await; // update the mutator set with the UTXOs from this block @@ -2795,7 +2818,6 @@ mod global_state_tests { ) { // Verifying light state integrity let expected_tip_digest = expected_tip.hash(); - let expected_parent_digest = expected_parent.hash(); assert_eq!(expected_tip_digest, global_state.chain.light_state().hash()); // Peeking into archival state @@ -2809,6 +2831,17 @@ mod global_state_tests { .await, "Archival state must have expected sync-label" ); + assert_eq!( + expected_tip.mutator_set_accumulator_after(), + global_state + .chain + .archival_state() + .archival_mutator_set + .ams() + .accumulator() + .await, + "Archival mutator set must match that in expected tip" + ); assert_eq!( expected_tip_digest, @@ -2863,6 +2896,8 @@ mod global_state_tests { .len(), "Exactly {expected_num_blocks_at_tip_height} blocks at height must be known" ); + + let expected_parent_digest = expected_parent.hash(); assert_eq!( expected_parent_digest, global_state @@ -2982,6 +3017,113 @@ mod global_state_tests { } } + #[traced_test] + #[tokio::test] + async fn can_store_block_without_marking_it_as_tip_1_block() { + // Verify that [GlobalState::store_block_not_tip] stores block + // correctly, and that [GlobalState::set_new_tip] can be used to + // build upon blocks stored through the former method. + let network = Network::Main; + let mut rng = thread_rng(); + let genesis_block = Block::genesis_block(network); + let wallet_secret = WalletSecret::new_random(); + + let mut alice = mock_genesis_global_state( + network, + 2, + wallet_secret.clone(), + cli_args::Args::default(), + ) + .await; + + let mut alice = alice.global_state_lock.lock_guard_mut().await; + assert_eq!(genesis_block.hash(), alice.chain.light_state().hash()); + + let cb_key = WalletSecret::new_random().nth_generation_spending_key(0); + let (block_1, _) = make_mock_block(&genesis_block, None, cb_key, rng.gen()).await; + + alice.store_block_not_tip(block_1.clone()).await.unwrap(); + assert_eq!( + genesis_block.hash(), + alice.chain.light_state().hash(), + "method may not update light state's tip" + ); + assert_eq!( + genesis_block.hash(), + alice.chain.archival_state().get_tip().await.hash(), + "method may not update archival state's tip" + ); + + alice.set_new_tip(block_1.clone()).await.unwrap(); + assert_correct_global_state(&alice, block_1.clone(), genesis_block, 1, 0).await; + } + + #[traced_test] + #[tokio::test] + async fn reorganization_with_blocks_that_were_never_tips_n_blocks_deep() { + // Verify that [GlobalState::store_block_not_tip] stores block + // correctly, and that [GlobalState::set_new_tip] can be used to + // build upon blocks stored through the former method. + + /// Return a list of (Block, parent) pairs, of length N. 
+ async fn chain_of_blocks_and_parents( + network: Network, + length: usize, + ) -> Vec<(Block, Block)> { + let mut rng = thread_rng(); + let cb_key = WalletSecret::new_random().nth_generation_spending_key(0); + let mut parent = Block::genesis_block(network); + let mut chain = vec![]; + for _ in 0..length { + let (block, _) = make_mock_block(&parent, None, cb_key, rng.gen()).await; + chain.push((block.clone(), parent.clone())); + parent = block; + } + + chain + } + + let network = Network::Main; + let genesis_block = Block::genesis_block(network); + let wallet_secret = WalletSecret::new_random(); + + for depth in 1..=4 { + let mut alice = mock_genesis_global_state( + network, + 2, + wallet_secret.clone(), + cli_args::Args::default(), + ) + .await; + let mut alice = alice.global_state_lock.lock_guard_mut().await; + assert_eq!(genesis_block.hash(), alice.chain.light_state().hash()); + let chain_a = chain_of_blocks_and_parents(network, depth).await; + let chain_b = chain_of_blocks_and_parents(network, depth).await; + let blocks_and_parents = [chain_a, chain_b].concat(); + for (block, _) in blocks_and_parents.iter() { + alice.store_block_not_tip(block.clone()).await.unwrap(); + assert_eq!( + genesis_block.hash(), + alice.chain.light_state().hash(), + "method may not update light state's tip, depth = {depth}" + ); + assert_eq!( + genesis_block.hash(), + alice.chain.archival_state().get_tip().await.hash(), + "method may not update archival state's tip, depth = {depth}" + ); + } + + // Loop over all blocks and verify that all can be marked as + // tip, resulting in a consistent, correct state. + for (block, parent) in blocks_and_parents.iter() { + alice.set_new_tip(block.clone()).await.unwrap(); + assert_correct_global_state(&alice, block.clone(), parent.to_owned(), 2, 0) + .await; + } + } + } + #[traced_test] #[tokio::test] async fn set_new_tip_can_roll_back() { diff --git a/src/tests/shared.rs b/src/tests/shared.rs index c829c953..774e3ef2 100644 --- a/src/tests/shared.rs +++ b/src/tests/shared.rs @@ -284,7 +284,9 @@ pub(crate) async fn add_block_to_archival_state( archival_state.update_mutator_set(&new_block).await.unwrap(); - archival_state.add_to_archival_block_mmr(&new_block).await; + archival_state + .append_to_archival_block_mmr(&new_block) + .await; Ok(()) } From 2b21f26c7934defb9a6c4bd0e68519ede3f04ffd Mon Sep 17 00:00:00 2001 From: sword_smith Date: Mon, 20 Jan 2025 21:07:31 +0100 Subject: [PATCH 02/11] test(global_state): verify that never-tip blocks can bridge to new tip Add a test that a chain of blocks that were never tips can be used to bridge `GlobalState` to a tip from another chain. --- src/models/state/mod.rs | 100 ++++++++++++++++++++++++++++++++-------- 1 file changed, 81 insertions(+), 19 deletions(-) diff --git a/src/models/state/mod.rs b/src/models/state/mod.rs index 3fe7e453..e4277f15 100644 --- a/src/models/state/mod.rs +++ b/src/models/state/mod.rs @@ -3058,31 +3058,93 @@ mod global_state_tests { assert_correct_global_state(&alice, block_1.clone(), genesis_block, 1, 0).await; } + /// Return a list of (Block, parent) pairs, of length N. 
+ async fn chain_of_blocks_and_parents( + network: Network, + length: usize, + ) -> Vec<(Block, Block)> { + let mut rng = thread_rng(); + let cb_key = WalletSecret::new_random().nth_generation_spending_key(0); + let mut parent = Block::genesis_block(network); + let mut chain = vec![]; + for _ in 0..length { + let (block, _) = make_mock_block(&parent, None, cb_key, rng.gen()).await; + chain.push((block.clone(), parent.clone())); + parent = block; + } + + chain + } + #[traced_test] #[tokio::test] - async fn reorganization_with_blocks_that_were_never_tips_n_blocks_deep() { - // Verify that [GlobalState::store_block_not_tip] stores block - // correctly, and that [GlobalState::set_new_tip] can be used to - // build upon blocks stored through the former method. + async fn can_jump_to_new_tip_over_blocks_that_were_never_tips() { + let network = Network::Main; + let wallet_secret = WalletSecret::new_random(); + let mut alice = mock_genesis_global_state( + network, + 2, + wallet_secret.clone(), + cli_args::Args::default(), + ) + .await; + let mut alice = alice.global_state_lock.lock_guard_mut().await; - /// Return a list of (Block, parent) pairs, of length N. - async fn chain_of_blocks_and_parents( - network: Network, - length: usize, - ) -> Vec<(Block, Block)> { - let mut rng = thread_rng(); - let cb_key = WalletSecret::new_random().nth_generation_spending_key(0); - let mut parent = Block::genesis_block(network); - let mut chain = vec![]; - for _ in 0..length { - let (block, _) = make_mock_block(&parent, None, cb_key, rng.gen()).await; - chain.push((block.clone(), parent.clone())); - parent = block; - } + let a_length = 12; + let chain_a = chain_of_blocks_and_parents(network, a_length).await; + for (block, _) in chain_a.iter() { + alice.set_new_tip(block.to_owned()).await.unwrap(); + } + + let chain_a_tip = &chain_a[a_length - 1].0; + let chain_a_tip_parent = &chain_a[a_length - 1].1; + assert_correct_global_state( + &alice, + chain_a_tip.to_owned(), + chain_a_tip_parent.to_owned(), + 1, + 0, + ) + .await; - chain + // Store all blocks from a new chain, except the last, without + // marking any of them as tips. Verify no change in tip. + let b_length = 15; + let chain_b = chain_of_blocks_and_parents(network, b_length).await; + for (block, _) in chain_b.iter().take(b_length - 1) { + alice.store_block_not_tip(block.clone()).await.unwrap(); } + assert_correct_global_state( + &alice, + chain_a_tip.to_owned(), + chain_a_tip_parent.to_owned(), + 2, + 0, + ) + .await; + // Set chain B's last block to tip to verify that all the stored + // blocks from chain B can be used to connect it to LUCA, which in + // this case is genesis block. + let chain_b_tip = &chain_b[b_length - 1].0; + let chain_b_tip_parent = &chain_b[b_length - 1].1; + alice.set_new_tip(chain_b_tip.to_owned()).await.unwrap(); + assert_correct_global_state( + &alice, + chain_b_tip.to_owned(), + chain_b_tip_parent.to_owned(), + 1, + 0, + ) + .await; + } + + #[traced_test] + #[tokio::test] + async fn reorganization_with_blocks_that_were_never_tips_n_blocks_deep() { + // Verify that [GlobalState::store_block_not_tip] stores block + // correctly, and that [GlobalState::set_new_tip] can be used to + // build upon blocks stored through the former method. 
        let network = Network::Main;
        let genesis_block = Block::genesis_block(network);
        let wallet_secret = WalletSecret::new_random();

From 05a5d8b14ea64bfefcceb9236f5801d5c2673fc1 Mon Sep 17 00:00:00 2001
From: sword_smith
Date: Tue, 21 Jan 2025 14:13:11 +0100
Subject: [PATCH 03/11] refactor(peer_loop): Verify block before adding to fork-reconciliation list

Co-authored-by: Alan Szepieniec
---
 src/peer_loop.rs | 36 +++++++++++++++++++++++++++---------
 1 file changed, 27 insertions(+), 9 deletions(-)

diff --git a/src/peer_loop.rs b/src/peer_loop.rs
index fd5fd4d8..9ab14eb8 100644
--- a/src/peer_loop.rs
+++ b/src/peer_loop.rs
@@ -323,18 +323,20 @@ impl PeerLoopHandler {
         Ok(Some(new_block_height))
     }
 
-    /// Takes a single block received from a peer and (attempts to) find a path
-    /// from some known stored block to the received block.
+    /// Take a single block received from a peer and (attempt to) find a path
+    /// between the received block and a common ancestor stored in the blocks
+    /// database.
     ///
-    /// This function attempts to find the parent of a new block, either by
-    /// searching the database or, if necessary, by requesting it from a peer.
-    /// - If the parent is stored in the database, block handling continues.
+    /// This function attempts to find the parent of the received block, either
+    /// by searching the database or by requesting it from a peer.
     /// - If the parent is not stored, it is requested from the peer and the
     ///   received block is pushed to the fork reconciliation list for later
-    ///   handling by this function.
-    ///
-    /// If the parent is stored, the block and any fork reconciliation blocks
-    /// are passed down the pipeline.
+    ///   handling by this function. The fork reconciliation list starts out
+    ///   empty, but grows as more parents are requested and transmitted.
+    /// - If the parent is found in the database, a) block handling continues:
+    ///   the entire list of fork reconciliation blocks is passed down the
+    ///   pipeline, potentially leading to a state update; and b) the fork
+    ///   reconciliation list is cleared.
     ///
     /// Locking:
     ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
@@ -394,6 +396,22 @@
             && peer_state.fork_reconciliation_blocks.len() + 1
                 < self.global_state_lock.cli().sync_mode_threshold
         {
+            if let Some(child) = peer_state.fork_reconciliation_blocks.last() {
+                let valid = child.is_valid(&received_block, self.now()).await;
+                if !valid {
+                    self.punish(NegativePeerSanction::InvalidBlock((
+                        child.header().height,
+                        child.hash(),
+                    )))
+                    .await?;
+                    warn!(
+                        "Received invalid block in fork-reconciliation process of length {}",
+                        peer_state.fork_reconciliation_blocks.len() + 1
+                    );
+                    peer_state.fork_reconciliation_blocks.clear();
+                    return Ok(());
+                }
+            }
             peer_state.fork_reconciliation_blocks.push(*received_block);
         } else {
             // Blocks received out of order. Or more than allowed received without

From 3e88e47fd023efb64daefdd04c810ea1d7af96d3 Mon Sep 17 00:00:00 2001
From: sword_smith
Date: Tue, 21 Jan 2025 14:58:27 +0100
Subject: [PATCH 04/11] style(`peer_loop`): Improve `try_ensure_path` readability

Also:
- Reduce severity of fork resolution errors, since we now default to
  1'000 blocks, and we don't want this to cause peer bans on first error.
- Derive `Copy` for `BlockHeader`.
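For orientation, the resulting control flow of `try_ensure_path` is summarized below; this is a simplified outline of the code in this patch, not the literal implementation:

    // Outline of try_ensure_path after this refactor:
    // 1. If the received block is not the valid parent of the newest queued
    //    fork-reconciliation block, or if the queue would exceed the sync-mode
    //    threshold: punish with ForkResolutionError and clear the queue.
    // 2. Otherwise queue the received block and look up its parent:
    //    - parent in database: pass the queued blocks, oldest first, to
    //      handle_blocks and clear the queue;
    //    - parent missing at genesis height: punish with DifferentGenesis;
    //    - parent unknown otherwise: send
    //      PeerMessage::BlockRequestByHash(parent_digest) and wait.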
Co-authored-by: Alan Szepieniec --- src/models/blockchain/block/block_header.rs | 2 +- src/models/peer.rs | 2 +- src/models/peer/transfer_block.rs | 2 +- src/models/state/archival_state.rs | 4 +- src/models/state/mod.rs | 2 +- src/peer_loop.rs | 161 +++++++++----------- src/rpc_server.rs | 2 +- 7 files changed, 76 insertions(+), 99 deletions(-) diff --git a/src/models/blockchain/block/block_header.rs b/src/models/blockchain/block/block_header.rs index 115bf592..8551ce5b 100644 --- a/src/models/blockchain/block/block_header.rs +++ b/src/models/blockchain/block/block_header.rs @@ -60,7 +60,7 @@ pub(crate) const ADVANCE_DIFFICULTY_CORRECTION_FACTOR: usize = 4; pub(crate) const BLOCK_HEADER_VERSION: BFieldElement = BFieldElement::new(0); -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, BFieldCodec, GetSize)] +#[derive(Copy, Clone, Debug, Serialize, Deserialize, PartialEq, Eq, BFieldCodec, GetSize)] #[cfg_attr(any(test, feature = "arbitrary-impls"), derive(Arbitrary))] pub struct BlockHeader { pub version: BFieldElement, diff --git a/src/models/peer.rs b/src/models/peer.rs index 54739f28..49c35ff1 100644 --- a/src/models/peer.rs +++ b/src/models/peer.rs @@ -264,7 +264,7 @@ impl Sanction for NegativePeerSanction { NegativePeerSanction::InvalidBlock(_) => -10, NegativePeerSanction::DifferentGenesis => i32::MIN, NegativePeerSanction::ForkResolutionError((_height, count, _digest)) => { - i32::from(count).saturating_mul(-3) + i32::from(count).saturating_mul(-1) } NegativePeerSanction::SynchronizationTimeout => -5, NegativePeerSanction::FloodPeerListResponse => -2, diff --git a/src/models/peer/transfer_block.rs b/src/models/peer/transfer_block.rs index cf4f079d..52abd31d 100644 --- a/src/models/peer/transfer_block.rs +++ b/src/models/peer/transfer_block.rs @@ -61,7 +61,7 @@ impl TryFrom<&Block> for TransferBlock { } }; Ok(Self { - header: block.kernel.header.clone(), + header: block.kernel.header, body: block.kernel.body.clone(), proof, appendix: block.kernel.appendix.clone(), diff --git a/src/models/state/archival_state.rs b/src/models/state/archival_state.rs index f6821c55..a765ea43 100644 --- a/src/models/state/archival_state.rs +++ b/src/models/state/archival_state.rs @@ -389,7 +389,7 @@ impl ArchivalState { .try_into() .expect("Num addition records cannot exceed u64::MAX"); let block_record_value: BlockIndexValue = BlockIndexValue::Block(Box::new(BlockRecord { - block_header: new_block.header().clone(), + block_header: *new_block.header(), file_location: BlockFileLocation { file_index: last_rec.last_file, offset: file_offset, @@ -771,7 +771,7 @@ impl ArchivalState { // If no block was found, check if digest is genesis digest if ret.is_none() && block_digest == self.genesis_block.hash() { - ret = Some(self.genesis_block.header().clone()); + ret = Some(*self.genesis_block.header()); } ret diff --git a/src/models/state/mod.rs b/src/models/state/mod.rs index e4277f15..d1d15848 100644 --- a/src/models/state/mod.rs +++ b/src/models/state/mod.rs @@ -880,7 +880,7 @@ impl GlobalState { pub async fn get_own_handshakedata(&self) -> HandshakeData { let listen_port = self.cli().own_listen_port(); HandshakeData { - tip_header: self.chain.light_state().header().clone(), + tip_header: *self.chain.light_state().header(), listen_port, network: self.cli().network, instance_id: self.net.instance_id, diff --git a/src/peer_loop.rs b/src/peer_loop.rs index 9ab14eb8..fed8fdff 100644 --- a/src/peer_loop.rs +++ b/src/peer_loop.rs @@ -352,15 +352,68 @@ impl PeerLoopHandler { >::Error: std::error::Error + 
Sync + Send + 'static, ::Error: std::error::Error, { - let parent_digest = received_block.kernel.header.prev_block_digest; + // Does the received block match the fork reconciliation list? + let received_block_matches_fork_reconciliation_list = if let Some(successor) = + peer_state.fork_reconciliation_blocks.last() + { + let valid = successor + .is_valid(received_block.as_ref(), self.now()) + .await; + if !valid { + warn!( + "Fork reconciliation failed after receiving {} blocks: successor of received block is invalid", + peer_state.fork_reconciliation_blocks.len() + 1 + ); + } + valid + } else { + true + }; + + // Are we running out of RAM? + let too_many_blocks = peer_state.fork_reconciliation_blocks.len() + 1 + >= self.global_state_lock.cli().sync_mode_threshold; + if too_many_blocks { + warn!( + "Fork reconciliation failed after receiving {} blocks: block count exceeds sync mode threshold", + peer_state.fork_reconciliation_blocks.len() + 1 + ); + } + + // Block mismatch or too many blocks: abort! + if !received_block_matches_fork_reconciliation_list || too_many_blocks { + self.punish(NegativePeerSanction::ForkResolutionError(( + received_block.header().height, + peer_state.fork_reconciliation_blocks.len() as u16, + received_block.hash(), + ))) + .await?; + peer_state.fork_reconciliation_blocks = vec![]; + return Ok(()); + } + + // otherwise, append + peer_state.fork_reconciliation_blocks.push(*received_block); + + // Try fetch parent + let received_block_header = *peer_state + .fork_reconciliation_blocks + .last() + .unwrap() + .header(); + + let parent_digest = received_block_header.prev_block_digest; + let parent_height = received_block_header.height.previous() + .expect("transferred block must have previous height because genesis block cannot be transferred"); debug!("Try ensure path: fetching parent block"); - let global_state = self.global_state_lock.lock_guard().await; - let parent_block = global_state + let parent_block = self + .global_state_lock + .lock_guard() + .await .chain .archival_state() .get_block(parent_digest) .await?; - drop(global_state); debug!( "Completed parent block fetching from DB: {}", if parent_block.is_some() { @@ -369,113 +422,37 @@ impl PeerLoopHandler { "not found".to_string() } ); - let parent_height = received_block.kernel.header.height.previous() - .expect("transferred block must have previous height because genesis block cannot be transferred"); - // If parent is not known, request the parent, and add the current to - // the peer fork resolution list - if parent_block.is_none() && parent_height > BlockHeight::genesis() { + // If parent is not known (but not genesis) request it. 
+ let Some(parent_block) = parent_block else { + if parent_height.is_genesis() { + peer_state.fork_reconciliation_blocks.clear(); + self.punish(NegativePeerSanction::DifferentGenesis).await?; + return Ok(()); + } info!( "Parent not known: Requesting previous block with height {} from peer", parent_height ); - // If the received block matches the block reconciliation state - // push it there and request its parent - if peer_state.fork_reconciliation_blocks.is_empty() - || peer_state - .fork_reconciliation_blocks - .last() - .unwrap() - .kernel - .header - .height - .previous() - .expect("fork reconcilliation blocks cannot contain genesis") - == received_block.kernel.header.height - && peer_state.fork_reconciliation_blocks.len() + 1 - < self.global_state_lock.cli().sync_mode_threshold - { - if let Some(child) = peer_state.fork_reconciliation_blocks.last() { - let valid = child.is_valid(&received_block, self.now()).await; - if !valid { - self.punish(NegativePeerSanction::InvalidBlock(( - child.header().height, - child.hash(), - ))) - .await?; - warn!( - "Received invalid block in fork-reconciliation process of length {}", - peer_state.fork_reconciliation_blocks.len() + 1 - ); - peer_state.fork_reconciliation_blocks.clear(); - return Ok(()); - } - } - peer_state.fork_reconciliation_blocks.push(*received_block); - } else { - // Blocks received out of order. Or more than allowed received without - // going into sync mode. Give up on block resolution attempt. - self.punish(NegativePeerSanction::ForkResolutionError(( - received_block.kernel.header.height, - peer_state.fork_reconciliation_blocks.len() as u16, - received_block.hash(), - ))) - .await?; - warn!( - "Fork reconciliation failed after receiving {} blocks", - peer_state.fork_reconciliation_blocks.len() + 1 - ); - peer_state.fork_reconciliation_blocks = vec![]; - return Ok(()); - } - peer.send(PeerMessage::BlockRequestByHash(parent_digest)) .await?; return Ok(()); - } - - // We got all the way back to genesis, but disagree about genesis. Ban peer. - if parent_block.is_none() && parent_height == BlockHeight::genesis() { - self.punish(NegativePeerSanction::DifferentGenesis).await?; - return Ok(()); - } + }; // We want to treat the received fork reconciliation blocks (plus the // received block) in reverse order, from oldest to newest, because // they were requested from high to low block height. let mut new_blocks = peer_state.fork_reconciliation_blocks.clone(); - new_blocks.push(*received_block); new_blocks.reverse(); - // Reset the fork resolution state since we got all the way back to find a block that we have + // Reset the fork resolution state since we got all the way back to a + // block that we have. let fork_reconciliation_event = !peer_state.fork_reconciliation_blocks.is_empty(); - peer_state.fork_reconciliation_blocks = vec![]; - - // Sanity check, that the blocks are correctly sorted (they should be) - // TODO: This has failed: Investigate! - // See: https://neptune.builders/core-team/neptune-core/issues/125 - // TODO: This assert should be replaced with something to punish or disconnect - // from a peer instead. It can be used by a malevolent peer to crash peer nodes. - let mut new_blocks_sorted_check = new_blocks.clone(); - new_blocks_sorted_check.sort_by(|a, b| a.kernel.header.height.cmp(&b.kernel.header.height)); - assert_eq!( - new_blocks_sorted_check, - new_blocks, - "Block list in fork resolution must be sorted. 
Got blocks in this order: {}",
-            new_blocks
-                .iter()
-                .map(|b| b.kernel.header.height.to_string())
-                .join(", ")
-        );
+        peer_state.fork_reconciliation_blocks.clear();
 
-        // Parent block is guaranteed to be set here. Because: either it was fetched from the
-        // database, or it's the genesis block.
-        if let Some(new_block_height) = self
-            .handle_blocks(new_blocks, parent_block.unwrap())
-            .await?
-        {
+        if let Some(new_block_height) = self.handle_blocks(new_blocks, parent_block).await? {
             // If `BlockNotification` was received during a block reconciliation
             // event, then the peer might have one (or more (unlikely)) blocks
             // that we do not have. We should thus request those blocks.
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index ee4ab28a..a984ae0f 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -1421,7 +1421,7 @@ impl RPC for NeptuneRPCServer {
             log_slow_scope!(fn_name!() + "::hash() tip digest");
             state.chain.light_state().hash()
         };
-        let tip_header = state.chain.light_state().header().clone();
+        let tip_header = *state.chain.light_state().header();
         let wallet_status = {
             log_slow_scope!(fn_name!() + "::get_wallet_status_for_tip()");
             state.get_wallet_status_for_tip().await

From 8a7f5e8e516934e90b99d0fdb59b5504af48ee13 Mon Sep 17 00:00:00 2001
From: sword-smith
Date: Tue, 21 Jan 2025 01:11:22 +0100
Subject: [PATCH 05/11] feat(archival_mmr): Get MPs relative to smaller MMRs

Add a function to return MMR membership proofs relative to MMRs smaller
than the full archival MMR.
---
 src/util_types/mutator_set/archival_mmr.rs | 56 ++++++++++++++++++++++
 1 file changed, 56 insertions(+)

diff --git a/src/util_types/mutator_set/archival_mmr.rs b/src/util_types/mutator_set/archival_mmr.rs
index 844510dd..fa2f48a9 100644
--- a/src/util_types/mutator_set/archival_mmr.rs
+++ b/src/util_types/mutator_set/archival_mmr.rs
@@ -210,6 +210,62 @@ impl<Storage: StorageVec<Digest>> ArchivalMmr<Storage> {
         }
     }
 
+    /// Return membership proof, as it looks relative to a smaller version of
+    /// the MMR which only has `num_leafs` leafs. `num_leafs` may not exceed
+    /// the actual number of leafs.
+    pub(crate) async fn prove_membership_relative_to_smaller_mmr(
+        &self,
+        leaf_index: u64,
+        num_leafs: u64,
+    ) -> MmrMembershipProof {
+        // TODO: Replace this local function with the one in `twenty_first` once
+        // available through a newer version. 
+ fn auth_path_node_indices(num_leafs: u64, leaf_index: u64) -> Vec { + assert!( + leaf_index < num_leafs, + "Leaf index out-of-bounds: {leaf_index}/{num_leafs}" + ); + + let (mut merkle_tree_index, _) = + leaf_index_to_mt_index_and_peak_index(leaf_index, num_leafs); + let mut node_index = leaf_index_to_node_index(leaf_index); + let mut height = 0; + let tree_height = u64::BITS - merkle_tree_index.leading_zeros() - 1; + let mut ret = Vec::with_capacity(tree_height as usize); + while merkle_tree_index > 1 { + let is_left_sibling = merkle_tree_index & 1 == 0; + let height_pow = 1u64 << (height + 1); + let as_1_or_minus_1: u64 = (2 * (is_left_sibling as i64) - 1) as u64; + let signed_height_pow = height_pow.wrapping_mul(as_1_or_minus_1); + let sibling = node_index + .wrapping_add(signed_height_pow) + .wrapping_sub(as_1_or_minus_1); + + node_index += 1 << ((height + 1) * is_left_sibling as u32); + + ret.push(sibling); + merkle_tree_index >>= 1; + height += 1; + } + + debug_assert_eq!(tree_height, ret.len() as u32, "Allocation must be optimal"); + + ret + } + + assert!( + num_leafs <= self.num_leafs().await, + "Cannot find membership proofs relative to bigger MMR" + ); + + let node_indices = auth_path_node_indices(num_leafs, leaf_index); + let ap_elements = self.digests.get_many(&node_indices).await; + + MmrMembershipProof { + authentication_path: ap_elements, + } + } + /// Return membership proof pub async fn prove_membership_async(&self, leaf_index: u64) -> MmrMembershipProof { // A proof consists of an authentication path From 625ce19fdb9028d3e2c42a5295d99bb419d0dbf1 Mon Sep 17 00:00:00 2001 From: sword_smith Date: Tue, 21 Jan 2025 15:40:27 +0100 Subject: [PATCH 06/11] test(archival_mmr): Test `prove_membership_relative_to_smaller_mmr` Add tests of this function to generate MMR membership proofs relative to a smaller MMR than the archival MMR in question. Can be used to generate MMR membership proofs relative to blocks earlier in the canonical chain than what is current tip. 
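The core assertion of the property test can be condensed as follows; `ammr` is assumed to be an `ArchivalMmr` with at least `n` leafs, `smaller_mmr` an `MmrAccumulator` built from the first `n` of those leafs, and `leaf` the digest at `leaf_index < n`:

    // The proof is computed against the full archival MMR, but verifies
    // against the peaks of the smaller, historical MMR.
    let mp = ammr
        .prove_membership_relative_to_smaller_mmr(leaf_index, n)
        .await;
    assert!(mp.verify(leaf_index, leaf, &smaller_mmr.peaks(), smaller_mmr.num_leafs()));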
--- src/util_types/mutator_set/archival_mmr.rs | 41 ++++++++++++++++++---- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/src/util_types/mutator_set/archival_mmr.rs b/src/util_types/mutator_set/archival_mmr.rs index fa2f48a9..0e2aa72c 100644 --- a/src/util_types/mutator_set/archival_mmr.rs +++ b/src/util_types/mutator_set/archival_mmr.rs @@ -770,21 +770,50 @@ pub(crate) mod mmr_test { } } + #[proptest(async = "tokio")] + async fn prove_membership_relative_to_smaller_mmr_test( + #[strategy(1u64..200)] _num_leafs: u64, + #[strategy(vec(arb(), #_num_leafs as usize))] digests: Vec, + #[strategy(1u64..=#_num_leafs)] reduced_num_leafs: u64, + #[strategy(0u64..#reduced_num_leafs)] leaf_index: u64, + ) { + let leaf = digests[leaf_index as usize]; + let smaller_mmr = + MmrAccumulator::new_from_leafs(digests[0..reduced_num_leafs as usize].to_vec()); + let ammr = mock::get_ammr_from_digests(digests).await; + let mp = ammr + .prove_membership_relative_to_smaller_mmr(leaf_index, reduced_num_leafs) + .await; + prop_assert!(mp.verify( + leaf_index, + leaf, + &smaller_mmr.peaks(), + smaller_mmr.num_leafs() + )); + } + #[tokio::test] async fn mmr_prove_verify_leaf_mutation_test() { - for size in 1..150 { + for size in 1u64..150 { let new_leaf: Digest = random(); let bad_leaf: Digest = random(); - let leaf_hashes_tip5: Vec = random_elements(size); + let leaf_hashes_tip5: Vec = random_elements(size as usize); let mut acc = MmrAccumulator::new_from_leafs(leaf_hashes_tip5.clone()); let mut archival: ArchivalMmr = mock::get_ammr_from_digests(leaf_hashes_tip5.clone()).await; let archival_end_state: ArchivalMmr = - mock::get_ammr_from_digests(vec![new_leaf; size]).await; + mock::get_ammr_from_digests(vec![new_leaf; size as usize]).await; for i in 0..size { - let i = i as u64; let peaks_before_update = archival.peaks().await; let mp = archival.prove_membership_async(i).await; + assert_eq!( + mp, + archival + .prove_membership_relative_to_smaller_mmr(i, size) + .await, + "Two ways of getting MMRMPs must agree" + ); + assert_eq!(archival.peaks().await, peaks_before_update); // Verify the update operation using the batch verifier @@ -809,8 +838,8 @@ pub(crate) mod mmr_test { acc.mutate_leaf(LeafMutation::new(i, new_leaf, mp)); let new_archival_peaks = archival.peaks().await; assert_eq!(new_archival_peaks, acc.peaks()); - assert_eq!(size as u64, archival.num_leafs().await); - assert_eq!(size as u64, acc.num_leafs()); + assert_eq!(size, archival.num_leafs().await); + assert_eq!(size, acc.num_leafs()); } assert_eq!(archival_end_state.peaks().await, acc.peaks()); } From ea380e9d4f926db257711005ccdedfed0a21e543 Mon Sep 17 00:00:00 2001 From: sword_smith Date: Tue, 21 Jan 2025 15:42:16 +0100 Subject: [PATCH 07/11] style(archival_state): Rename variable Remove overload of the word `new`, which previously referred to a) whether block was already stored in database and on disk, and b) whether the block in question should be considered the new tip. 
--- src/models/state/archival_state.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/models/state/archival_state.rs b/src/models/state/archival_state.rs index a765ea43..31fa2a29 100644 --- a/src/models/state/archival_state.rs +++ b/src/models/state/archival_state.rs @@ -413,7 +413,7 @@ impl ArchivalState { Ok(block_index_entries) } - async fn write_block_internal(&mut self, block: &Block, is_new_tip: bool) -> Result<()> { + async fn write_block_internal(&mut self, block: &Block, is_canonical_tip: bool) -> Result<()> { let block_is_new = self.get_block_header(block.hash()).await.is_none(); let mut block_index_entries = if block_is_new { self.store_block(block).await? @@ -426,7 +426,7 @@ impl ArchivalState { }; // Mark block as tip, conditionally - if is_new_tip { + if is_canonical_tip { block_index_entries.push(( BlockIndexKey::BlockTipDigest, BlockIndexValue::BlockTipDigest(block.hash()), From bd206dd42aef259a7894a391bfdef1a225cf2572 Mon Sep 17 00:00:00 2001 From: Alan Szepieniec Date: Tue, 21 Jan 2025 17:16:31 +0100 Subject: [PATCH 08/11] refactor!(`peer_loop`): Use block MMR for syncing and sync challenge 1. Introduce new type `SyncAnchor` and upgrade field `syncing` (renamed to `sync_anchor`) living on `GlobalState::net` to an `Option` over this type. This struct has two fields, a `ProofOfWork` and `MmrAccumulator`. When this field is set, the node is in sync mode. 2. The proof of work field is used to switch sync targets mid-sync. 3. The MMR accumulator is used to authenticate incoming blocks. This MMRA represents the state *after* the peer's tip's hash has been appended, so it is consistent with the block MMR in the peer's archival state. 4. Messages `AddPeerMaxBlockHeight`, `MainToPeerTaskBatchBlockRequest`, and `BlockRequestBatch` are modified to include (a duplicate of) this MMRA. 5. Message `BlockResponseBatch` is modified to include an MMR membership proof for every transferred block. Upon receiving this message, all received blocks are validated against the MMRA. 6. Add tests. 
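Point 5 can be sketched as the following verification step, assuming `anchor` is the node's current `SyncAnchor` and `(block, mmr_mp)` is one entry of a received `BlockResponseBatch`:

    // The leaf index is the block height; the anchor MMR already includes
    // the target tip's hash, so every ancestor of that tip is a leaf.
    let block_is_authentic = mmr_mp.verify(
        block.header().height.into(),
        block.hash(),
        &anchor.block_mmr.peaks(),
        anchor.block_mmr.num_leafs(),
    );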
Co-authored-by: Alan Szepieniec --- src/lib.rs | 3 +- src/main_loop.rs | 53 +++-- src/mine_loop.rs | 4 +- src/models/channel.rs | 17 +- src/models/peer.rs | 28 ++- src/models/state/mod.rs | 6 +- src/models/state/networking_state.rs | 31 ++- src/peer_loop.rs | 336 ++++++++++++++++++++------- src/rpc_server.rs | 2 +- src/tests/shared.rs | 30 ++- 10 files changed, 369 insertions(+), 141 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 8df052a0..152979f7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -158,8 +158,7 @@ pub async fn initialize(cli_args: cli_args::Args) -> Result { // Create handshake data which is used when connecting to outgoing peers specified in the // CLI arguments - let syncing = false; - let networking_state = NetworkingState::new(peer_map, peer_databases, syncing); + let networking_state = NetworkingState::new(peer_map, peer_databases); let light_state: LightState = LightState::from(latest_block.clone()); let blockchain_archival_state = BlockchainArchivalState { diff --git a/src/main_loop.rs b/src/main_loop.rs index 02374805..ee62fb35 100644 --- a/src/main_loop.rs +++ b/src/main_loop.rs @@ -50,6 +50,7 @@ use crate::models::peer::PeerSynchronizationState; use crate::models::proof_abstractions::tasm::program::TritonVmProofJobOptions; use crate::models::state::block_proposal::BlockProposal; use crate::models::state::mempool::TransactionOrigin; +use crate::models::state::networking_state::SyncAnchor; use crate::models::state::tx_proving_capability::TxProvingCapability; use crate::models::state::GlobalState; use crate::models::state::GlobalStateLock; @@ -617,7 +618,7 @@ impl MainLoopHandler { self.main_to_miner_tx.send(MainToMiner::WaitForContinue); // Get out of sync mode if needed - if global_state_mut.net.syncing { + if global_state_mut.net.sync_anchor.is_some() { let stay_in_sync_mode = stay_in_sync_mode( &last_block.kernel.header, &main_loop_state.sync_state, @@ -625,7 +626,7 @@ impl MainLoopHandler { ); if !stay_in_sync_mode { info!("Exiting sync mode"); - global_state_mut.net.syncing = false; + global_state_mut.net.sync_anchor = None; self.main_to_miner_tx.send(MainToMiner::StopSyncing); } } @@ -669,36 +670,41 @@ impl MainLoopHandler { // Inform miner about new block. self.main_to_miner_tx.send(MainToMiner::NewBlock); } - PeerTaskToMain::AddPeerMaxBlockHeight(( - socket_addr, - claimed_max_height, - claimed_max_accumulative_pow, - )) => { + PeerTaskToMain::AddPeerMaxBlockHeight { + peer_address, + claimed_height, + claimed_cumulative_pow, + claimed_block_mmra, + } => { log_slow_scope!(fn_name!() + "::PeerTaskToMain::AddPeerMaxBlockHeight"); let claimed_state = - PeerSynchronizationState::new(claimed_max_height, claimed_max_accumulative_pow); + PeerSynchronizationState::new(claimed_height, claimed_cumulative_pow); main_loop_state .sync_state .peer_sync_states - .insert(socket_addr, claimed_state); + .insert(peer_address, claimed_state); // Check if synchronization mode should be activated. // Synchronization mode is entered if accumulated PoW exceeds // our tip and if the height difference is positive and beyond // a threshold value. - // TODO: If we are not checking the PoW claims of the tip this - // can be abused by forcing the client into synchronization - // mode. 
let mut global_state_mut = self.global_state_lock.lock_guard_mut().await; - if global_state_mut - .sync_mode_criterion(claimed_max_height, claimed_max_accumulative_pow) + if global_state_mut.sync_mode_criterion(claimed_height, claimed_cumulative_pow) + && global_state_mut + .net + .sync_anchor + .as_ref() + .is_none_or(|sa| sa.cumulative_proof_of_work < claimed_cumulative_pow) { info!( - "Entering synchronization mode due to peer {} indicating tip height {}; pow family: {:?}", - socket_addr, claimed_max_height, claimed_max_accumulative_pow - ); - global_state_mut.net.syncing = true; + "Entering synchronization mode due to peer {} indicating tip height {}; cumulative pow: {:?}", + peer_address, claimed_height, claimed_cumulative_pow + ); + global_state_mut.net.sync_anchor = Some(SyncAnchor { + cumulative_proof_of_work: claimed_cumulative_pow, + block_mmr: claimed_block_mmra, + }); self.main_to_miner_tx.send(MainToMiner::StartSyncing); } } @@ -717,7 +723,7 @@ impl MainLoopHandler { // Get out of sync mode if needed. let mut global_state_mut = self.global_state_lock.lock_guard_mut().await; - if global_state_mut.net.syncing { + if global_state_mut.net.sync_anchor.is_some() { let stay_in_sync_mode = stay_in_sync_mode( global_state_mut.chain.light_state().header(), &main_loop_state.sync_state, @@ -725,7 +731,7 @@ impl MainLoopHandler { ); if !stay_in_sync_mode { info!("Exiting sync mode"); - global_state_mut.net.syncing = false; + global_state_mut.net.sync_anchor = None; } } } @@ -1031,9 +1037,9 @@ impl MainLoopHandler { let global_state = self.global_state_lock.lock_guard().await; // Check if we are in sync mode - if !global_state.net.syncing { + let Some(anchor) = &global_state.net.sync_anchor else { return Ok(()); - } + }; info!("Running sync"); @@ -1101,6 +1107,7 @@ impl MainLoopHandler { MainToPeerTaskBatchBlockRequest { peer_addr_target: *chosen_peer, known_blocks: most_canonical_digests, + anchor_mmr: anchor.block_mmr.clone(), }, )) .expect("Sending message to peers must succeed"); @@ -1136,7 +1143,7 @@ impl MainLoopHandler { .proof_upgrader_task .as_ref() .is_some_and(|x| !x.is_finished()); - Ok(!global_state.net.syncing + Ok(global_state.net.sync_anchor.is_none() && global_state.proving_capability() == TxProvingCapability::SingleProof && !previous_upgrade_task_is_still_running && tx_upgrade_interval diff --git a/src/mine_loop.rs b/src/mine_loop.rs index 1398f50b..e862d43a 100644 --- a/src/mine_loop.rs +++ b/src/mine_loop.rs @@ -620,7 +620,9 @@ pub(crate) async fn mine( let (guesser_tx, guesser_rx) = oneshot::channel::(); let (composer_tx, composer_rx) = oneshot::channel::<(Block, Vec)>(); - let is_syncing = global_state_lock.lock(|s| s.net.syncing).await; + let is_syncing = global_state_lock + .lock(|s| s.net.sync_anchor.is_some()) + .await; let maybe_proposal = global_state_lock.lock_guard().await.block_proposal.clone(); let guess = cli_args.guess; diff --git a/src/models/channel.rs b/src/models/channel.rs index 7e917146..7670455f 100644 --- a/src/models/channel.rs +++ b/src/models/channel.rs @@ -3,6 +3,7 @@ use std::net::SocketAddr; use serde::Deserialize; use serde::Serialize; use tasm_lib::triton_vm::prelude::Digest; +use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator; use super::blockchain::block::block_height::BlockHeight; use super::blockchain::block::difficulty_control::ProofOfWork; @@ -82,6 +83,10 @@ pub struct MainToPeerTaskBatchBlockRequest { /// that the we would prefer to build on top off, if it belongs to the /// canonical chain. 
pub(crate) known_blocks: Vec, + + /// The block MMR accumulator relative to which incoming blocks are + /// authenticated. + pub(crate) anchor_mmr: MmrAccumulator, } #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] @@ -136,7 +141,15 @@ impl MainToPeerTask { #[derive(Clone, Debug, PartialEq, Eq)] pub(crate) enum PeerTaskToMain { NewBlocks(Vec), - AddPeerMaxBlockHeight((SocketAddr, BlockHeight, ProofOfWork)), + AddPeerMaxBlockHeight { + peer_address: SocketAddr, + claimed_height: BlockHeight, + claimed_cumulative_pow: ProofOfWork, + + /// The MMR *after* adding the tip hash, so not the one contained in the + /// tip, but in its child. + claimed_block_mmra: MmrAccumulator, + }, RemovePeerMaxBlockHeight(SocketAddr), PeerDiscoveryAnswer((Vec<(SocketAddr, u128)>, SocketAddr, u8)), // ([(peer_listen_address)], reported_by, distance) Transaction(Box), @@ -154,7 +167,7 @@ impl PeerTaskToMain { pub fn get_type(&self) -> String { match self { PeerTaskToMain::NewBlocks(_) => "new blocks", - PeerTaskToMain::AddPeerMaxBlockHeight(_) => "add peer max block height", + PeerTaskToMain::AddPeerMaxBlockHeight { .. } => "add peer max block height", PeerTaskToMain::RemovePeerMaxBlockHeight(_) => "remove peer max block height", PeerTaskToMain::PeerDiscoveryAnswer(_) => "peer discovery answer", PeerTaskToMain::Transaction(_) => "transaction", diff --git a/src/models/peer.rs b/src/models/peer.rs index 49c35ff1..7ebc279a 100644 --- a/src/models/peer.rs +++ b/src/models/peer.rs @@ -16,6 +16,7 @@ use serde::Deserialize; use serde::Serialize; use tasm_lib::twenty_first::prelude::Mmr; use tasm_lib::twenty_first::prelude::MmrMembershipProof; +use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator; use tracing::trace; use transaction_notification::TransactionNotification; use transfer_transaction::TransferTransaction; @@ -157,6 +158,7 @@ pub enum NegativePeerSanction { BatchBlocksRequestEmpty, InvalidTransaction, UnconfirmableTransaction, + InvalidBlockMmrAuthentication, InvalidTransferBlock, @@ -221,6 +223,9 @@ impl Display for NegativePeerSanction { NegativePeerSanction::TimedOutSyncChallengeResponse => { "timed-out sync challenge response" } + NegativePeerSanction::InvalidBlockMmrAuthentication => { + "invalid block mmr authentication" + } }; write!(f, "{string}") } @@ -288,6 +293,7 @@ impl Sanction for NegativePeerSanction { NegativePeerSanction::UnexpectedSyncChallengeResponse => -1, NegativePeerSanction::InvalidTransferBlock => -50, NegativePeerSanction::TimedOutSyncChallengeResponse => -50, + NegativePeerSanction::InvalidBlockMmrAuthentication => -4, } } } @@ -483,6 +489,16 @@ pub struct BlockRequestBatch { /// Indicates the maximum allowed number of blocks in the response. pub(crate) max_response_len: usize, + + /// The block MMR accumulator of the tip of the chain which the node is + /// syncing towards. Its number of leafs is the block height the node is + /// syncing towards. + /// + /// The receiver needs this value to know which MMR authentication paths to + /// attach to the blocks in the response. These paths allow the receiver of + /// a batch of blocks to verify that the received blocks are indeed + /// ancestors to a given tip. 
+ pub(crate) anchor: MmrAccumulator, } #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] @@ -506,7 +522,7 @@ pub(crate) enum PeerMessage { BlockRequestByHash(Digest), BlockRequestBatch(BlockRequestBatch), // TODO: Consider restricting this in size - BlockResponseBatch(Vec), // TODO: Consider restricting this in size + BlockResponseBatch(Vec<(TransferBlock, MmrMembershipProof)>), // TODO: Consider restricting this in size UnableToSatisfyBatchRequest, SyncChallenge(SyncChallenge), @@ -741,7 +757,7 @@ impl SyncChallengeResponse { /// Determine whether the `SyncChallengeResponse` answers the given /// `IssuedSyncChallenge`, and not some other one. pub(crate) fn matches(&self, issued_challenge: IssuedSyncChallenge) -> bool { - let Ok(tip_predecessor) = Block::try_from(self.tip_parent.clone()) else { + let Ok(tip_parent) = Block::try_from(self.tip_parent.clone()) else { return false; }; let Ok(tip) = Block::try_from(self.tip.clone()) else { @@ -754,7 +770,7 @@ impl SyncChallengeResponse { .all(|((_, child), challenge_height)| child.header.height == *challenge_height) && issued_challenge.challenge.tip_digest == tip.hash() && issued_challenge.accumulated_pow == tip.header().cumulative_proof_of_work - && tip.has_proof_of_work(tip_predecessor.header()) + && tip.has_proof_of_work(tip_parent.header()) } /// Determine whether the proofs in `SyncChallengeResponse` are valid. Also @@ -772,6 +788,8 @@ impl SyncChallengeResponse { return false; } + let mut mmra_anchor = tip.body().block_mmr_accumulator.to_owned(); + mmra_anchor.append(tip.hash()); for ((parent, child), membership_proof) in self.blocks.iter().zip(self.membership_proofs.iter()) { @@ -781,8 +799,8 @@ impl SyncChallengeResponse { if !membership_proof.verify( child.header().height.into(), child.hash(), - &tip.body().block_mmr_accumulator.peaks(), - tip.header().height.into(), + &mmra_anchor.peaks(), + mmra_anchor.num_leafs(), ) { return false; } diff --git a/src/models/state/mod.rs b/src/models/state/mod.rs index d1d15848..54ca251c 100644 --- a/src/models/state/mod.rs +++ b/src/models/state/mod.rs @@ -1473,7 +1473,7 @@ impl GlobalState { pub async fn resync_membership_proofs(&mut self) -> Result<()> { // Do not fix memberhip proofs if node is in sync mode, as we would otherwise // have to sync many times, instead of just *one* time once we have caught up. - if self.net.syncing { + if self.net.sync_anchor.is_some() { debug!("Not syncing MS membership proofs because we are syncing"); return Ok(()); } @@ -1573,6 +1573,10 @@ impl GlobalState { // does not match ours. That's a known deficiency of this function, // and can be fixed by correctly handling the construction of old // MMR-MPs from the current archival MMR state. + // Notice that the MMR membership proofs are relative to an MMR + // where the tip digest *has* been added. So it is not relative to + // the block MMR accumulator present in the tip block, as it only + // refers to its ancestors. 
block_mmr_mps.push( self.chain .archival_state() diff --git a/src/models/state/networking_state.rs b/src/models/state/networking_state.rs index aab27e7a..f5b97366 100644 --- a/src/models/state/networking_state.rs +++ b/src/models/state/networking_state.rs @@ -4,11 +4,13 @@ use std::net::SocketAddr; use std::time::SystemTime; use anyhow::Result; +use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator; use crate::config_models::data_directory::DataDirectory; use crate::database::create_db_if_missing; use crate::database::NeptuneLevelDb; use crate::database::WriteBatchAsync; +use crate::models::blockchain::block::difficulty_control::ProofOfWork; use crate::models::database::PeerDatabases; use crate::models::peer; use crate::models::peer::PeerStanding; @@ -17,24 +19,31 @@ pub const BANNED_IPS_DB_NAME: &str = "banned_ips"; type PeerMap = HashMap; +/// Information about a foreign tip towards which the client is syncing. +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) struct SyncAnchor { + pub(crate) cumulative_proof_of_work: ProofOfWork, + pub(crate) block_mmr: MmrAccumulator, +} + /// `NetworkingState` contains in-memory and persisted data for interacting /// with network peers. #[derive(Debug, Clone)] pub struct NetworkingState { - // Stores info about the peers that the client is connected to - // Peer tasks may update their own entries into this map. + /// Stores info about the peers that the client is connected to + /// Peer tasks may update their own entries into this map. pub peer_map: PeerMap, - // `peer_databases` are used to persist IPs with their standing. - // The peer tasks may update their own entries into this map. + /// `peer_databases` are used to persist IPs with their standing. + /// The peer tasks may update their own entries into this map. pub peer_databases: PeerDatabases, - // This value is only true if instance is running an archival node - // that is currently downloading blocks to catch up. - // Only the main task may update this flag - pub syncing: bool, + /// This value is only Some if the instance is running an archival node + /// that is currently in sync mode (downloading blocks in batches). + /// Only the main task may update this flag + pub(crate) sync_anchor: Option, - // Read-only value set during startup + /// Read-only value set at random during startup pub instance_id: u128, /// Timestamp for when the last tx-proof upgrade was attempted. 
Does not @@ -44,11 +53,11 @@ pub struct NetworkingState { } impl NetworkingState { - pub(crate) fn new(peer_map: PeerMap, peer_databases: PeerDatabases, syncing: bool) -> Self { + pub(crate) fn new(peer_map: PeerMap, peer_databases: PeerDatabases) -> Self { Self { peer_map, peer_databases, - syncing, + sync_anchor: None, instance_id: rand::random(), // Initialize to now to prevent tx proof upgrade to run immediately diff --git a/src/peer_loop.rs b/src/peer_loop.rs index fed8fdff..0f48ee0f 100644 --- a/src/peer_loop.rs +++ b/src/peer_loop.rs @@ -15,6 +15,9 @@ use rand::thread_rng; use rand::Rng; use rand::SeedableRng; use tasm_lib::triton_vm::prelude::Digest; +use tasm_lib::twenty_first::prelude::Mmr; +use tasm_lib::twenty_first::prelude::MmrMembershipProof; +use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator; use tokio::select; use tokio::sync::broadcast; use tokio::sync::mpsc; @@ -50,6 +53,7 @@ use crate::models::proof_abstractions::mast_hash::MastHash; use crate::models::proof_abstractions::timestamp::Timestamp; use crate::models::state::mempool::MEMPOOL_IGNORE_TRANSACTIONS_THIS_MANY_SECS_AHEAD; use crate::models::state::mempool::MEMPOOL_TX_THRESHOLD_AGE_IN_SECS; +use crate::models::state::GlobalState; use crate::models::state::GlobalStateLock; const STANDARD_BLOCK_BATCH_SIZE: usize = 250; @@ -206,6 +210,53 @@ impl PeerLoopHandler { } } + /// Construct a batch response, with blocks and their MMR membership proofs + /// relative to a specified anchor. + /// + /// Returns `None` if the anchor has a lower leaf count than the blocks, or + /// a block height of the response exceeds own tip height. + async fn batch_response( + state: &GlobalState, + blocks: Vec, + anchor: &MmrAccumulator, + ) -> Option> { + let own_tip_height = state.chain.light_state().header().height; + let block_heights_match_anchor = blocks + .iter() + .all(|bl| bl.header().height < anchor.num_leafs().into()); + let block_heights_known = blocks.iter().all(|bl| bl.header().height <= own_tip_height); + if !block_heights_match_anchor || !block_heights_known { + let max_block_height = match blocks.iter().map(|bl| bl.header().height).max() { + Some(height) => height.to_string(), + None => "None".to_owned(), + }; + + debug!("max_block_height: {max_block_height}"); + debug!("own_tip_height: {own_tip_height}"); + debug!("anchor.num_leafs(): {}", anchor.num_leafs()); + debug!("block_heights_match_anchor: {block_heights_match_anchor}"); + debug!("block_heights_known: {block_heights_known}"); + return None; + } + + let mut ret = vec![]; + for block in blocks { + let mmr_mp = state + .chain + .archival_state() + .archival_block_mmr + .prove_membership_relative_to_smaller_mmr( + block.header().height.into(), + anchor.num_leafs(), + ) + .await; + let block: TransferBlock = block.try_into().unwrap(); + ret.push((block, mmr_mp)); + } + + Some(ret) + } + /// Handle validation and send all blocks to the main task if they're all /// valid. Use with a list of blocks or a single block. 
When the /// `received_blocks` is a list, the parent of the `i+1`th block in the @@ -620,7 +671,13 @@ impl PeerLoopHandler { if block_is_new && peer_state_info.fork_reconciliation_blocks.is_empty() - && !self.global_state_lock.lock_guard().await.net.syncing + && self + .global_state_lock + .lock_guard() + .await + .net + .sync_anchor + .is_none() { debug!( "sending BlockRequestByHeight to peer for block with height {}", @@ -713,13 +770,17 @@ impl PeerLoopHandler { info!("Successful sync challenge response; relaying peer tip info to main loop."); + let mut sync_mmra_anchor = challenge_response.tip.body.block_mmr_accumulator; + sync_mmra_anchor.append(issued_challenge.challenge.tip_digest); + // Inform main loop self.to_main_tx - .send(PeerTaskToMain::AddPeerMaxBlockHeight(( - self.peer_address, - claimed_tip_height, - issued_challenge.accumulated_pow, - ))) + .send(PeerTaskToMain::AddPeerMaxBlockHeight { + peer_address: self.peer_address, + claimed_height: claimed_tip_height, + claimed_cumulative_pow: issued_challenge.accumulated_pow, + claimed_block_mmra: sync_mmra_anchor, + }) .await?; Ok(KEEP_CONNECTION_ALIVE) @@ -827,6 +888,7 @@ impl PeerLoopHandler { PeerMessage::BlockRequestBatch(BlockRequestBatch { known_blocks, max_response_len, + anchor, }) => { log_slow_scope!(fn_name!() + "::PeerMessage::BlockRequestBatch"); debug!( @@ -848,15 +910,15 @@ impl PeerLoopHandler { } }; - if !self - .global_state_lock - .lock_guard() - .await + let state = self.global_state_lock.lock_guard().await; + let block_mmr_num_leafs = state.chain.light_state().header().height.next().into(); + let luca_is_known = state .chain .archival_state() .block_belongs_to_canonical_chain(least_preferred) - .await - { + .await; + if !luca_is_known || anchor.num_leafs() > block_mmr_num_leafs { + drop(state); self.punish(NegativePeerSanction::BatchBlocksUnknownRequest) .await?; peer.send(PeerMessage::UnableToSatisfyBatchRequest).await?; @@ -866,17 +928,16 @@ impl PeerLoopHandler { // Happy case: At least *one* of the blocks referenced by peer // is known to us. - let mut first_block_in_response: Option = None; - { - let global_state = self.global_state_lock.lock_guard().await; + let first_block_in_response = { + let mut first_block_in_response: Option = None; for block_digest in known_blocks { - if global_state + if state .chain .archival_state() .block_belongs_to_canonical_chain(block_digest) .await { - let height = global_state + let height = state .chain .archival_state() .get_block_header(block_digest) @@ -891,42 +952,36 @@ impl PeerLoopHandler { break; } } - } - let peers_preferred_canonical_block = match first_block_in_response { - Some(block) => block, - None => { - self.punish(NegativePeerSanction::BatchBlocksUnknownRequest) - .await?; - return Ok(KEEP_CONNECTION_ALIVE); - } + first_block_in_response + .expect("existence of LUCA should have been established already.") }; debug!( - "Peer's most preferred block has height {peers_preferred_canonical_block}.\ + "Peer's most preferred block has height {first_block_in_response}.\ Now building response from that height." ); // Get the relevant blocks, at most batch-size many, descending from the // peer's (alleged) most canonical block. Don't exceed `max_response_len` // or `STANDARD_BLOCK_BATCH_SIZE` number of blocks in response. 
-                let len_of_response = cmp::min(
+                let max_response_len = cmp::min(
                     max_response_len,
                     self.global_state_lock.cli().sync_mode_threshold,
                 );
-                let len_of_response = cmp::max(len_of_response, MINIMUM_BLOCK_BATCH_SIZE);
-                let len_of_response = cmp::min(len_of_response, STANDARD_BLOCK_BATCH_SIZE);
+                let max_response_len = cmp::max(max_response_len, MINIMUM_BLOCK_BATCH_SIZE);
+                let max_response_len = cmp::min(max_response_len, STANDARD_BLOCK_BATCH_SIZE);
 
-                let mut digests_of_returned_blocks = Vec::with_capacity(len_of_response);
-                let response_start_height: u64 = peers_preferred_canonical_block.into();
+                let mut digests_of_returned_blocks = Vec::with_capacity(max_response_len);
+                let response_start_height: u64 = first_block_in_response.into();
                 let mut i: u64 = 1;
-                let global_state = self.global_state_lock.lock_guard().await;
-                while digests_of_returned_blocks.len() < len_of_response {
-                    match global_state
+                while digests_of_returned_blocks.len() < max_response_len {
+                    let block_height = response_start_height + i;
+                    match state
                         .chain
                         .archival_state()
                         .archival_block_mmr
-                        .try_get_leaf(response_start_height + i)
+                        .try_get_leaf(block_height)
                         .await
                     {
                         Some(digest) => {
@@ -937,36 +992,47 @@ impl PeerLoopHandler {
                     i += 1;
                 }
 
-                let mut returned_blocks: Vec<TransferBlock> =
+                let mut returned_blocks: Vec<Block> =
                     Vec::with_capacity(digests_of_returned_blocks.len());
                 for block_digest in digests_of_returned_blocks {
-                    let block = global_state
+                    let block = state
                         .chain
                         .archival_state()
                         .get_block(block_digest)
                         .await?
                         .unwrap();
-                    returned_blocks.push(block.try_into().unwrap());
+                    returned_blocks.push(block);
                 }
 
-                debug!(
-                    "Returning {} blocks in batch response",
-                    returned_blocks.len()
-                );
+                let response = Self::batch_response(&state, returned_blocks, &anchor).await;
+                let response = match response {
+                    Some(response) => response,
+                    None => {
+                        drop(state);
+                        warn!("Unable to satisfy batch-block request");
+                        self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
+                            .await?;
+                        return Ok(KEEP_CONNECTION_ALIVE);
+                    }
+                };
 
-                let response = PeerMessage::BlockResponseBatch(returned_blocks);
+                debug!("Returning {} blocks in batch response", response.len());
+
+                let response = PeerMessage::BlockResponseBatch(response);
                 peer.send(response).await?;
 
                 Ok(KEEP_CONNECTION_ALIVE)
             }
-            PeerMessage::BlockResponseBatch(t_blocks) => {
+            PeerMessage::BlockResponseBatch(authenticated_blocks) => {
                 log_slow_scope!(fn_name!() + "::PeerMessage::BlockResponseBatch");
 
                 debug!(
                     "handling block response batch with {} blocks",
-                    t_blocks.len()
+                    authenticated_blocks.len()
                 );
+
+                // (Alan:) why is there even a minimum?
+                if authenticated_blocks.len() < MINIMUM_BLOCK_BATCH_SIZE {
                     warn!("Got smaller batch response than allowed");
                     self.punish(NegativePeerSanction::TooShortBlockBatch)
                         .await?;
@@ -976,17 +1042,25 @@ impl PeerLoopHandler {
                 // Verify that we are in fact in syncing mode
                 // TODO: Separate peer messages into those allowed under syncing
                 // and those that are not
-                if !self.global_state_lock.lock_guard().await.net.syncing {
+                let Some(sync_anchor) = self
+                    .global_state_lock
+                    .lock_guard()
+                    .await
+                    .net
+                    .sync_anchor
+                    .clone()
+                else {
                     warn!("Received a batch of blocks without being in syncing mode");
                     self.punish(NegativePeerSanction::ReceivedBatchBlocksOutsideOfSync)
                         .await?;
                     return Ok(KEEP_CONNECTION_ALIVE);
-                }
+                };
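The `cmp` chain near the top of this hunk clamps the peer's requested batch size from three directions: the requester's wish, the node's own sync-mode threshold, and the protocol's hard floor and ceiling. Pulled out as a pure function (the floor constant's value is illustrative; `STANDARD_BLOCK_BATCH_SIZE` is the 250 defined at the top of this file):

    use std::cmp;

    const MINIMUM_BLOCK_BATCH_SIZE: usize = 2; // illustrative; defined elsewhere in peer_loop.rs
    const STANDARD_BLOCK_BATCH_SIZE: usize = 250;

    fn clamp_response_len(requested: usize, sync_mode_threshold: usize) -> usize {
        let len = cmp::min(requested, sync_mode_threshold);
        let len = cmp::max(len, MINIMUM_BLOCK_BATCH_SIZE);
        cmp::min(len, STANDARD_BLOCK_BATCH_SIZE)
    }

    fn main() {
        assert_eq!(14, clamp_response_len(14, 1000)); // modest requests pass through
        assert_eq!(250, clamp_response_len(1_000_000, 1_000_000)); // ceiling wins
        assert_eq!(2, clamp_response_len(0, 1000)); // floor wins
    }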
                 // Verify that the response matches the current state
                 // We get the latest block from the DB here since this message is
                 // only valid for archival nodes.
-                let first_blocks_parent_digest: Digest = t_blocks[0].header.prev_block_digest;
+                let (first_block, _) = &authenticated_blocks[0];
+                let first_blocks_parent_digest: Digest = first_block.header.prev_block_digest;
                 let most_canonical_own_block_match: Option<Block> = self
                     .global_state_lock
                     .lock_guard()
@@ -1012,18 +1086,27 @@ impl PeerLoopHandler {
                     most_canonical_own_block_match.kernel.header.height
                 );
                 let mut received_blocks = vec![];
-                for t_block in t_blocks {
-                    match Block::try_from(t_block) {
-                        Ok(block) => {
-                            received_blocks.push(block);
-                        }
-                        Err(e) => {
-                            warn!("Received invalid transfer block from peer: {e:?}");
-                            self.punish(NegativePeerSanction::InvalidTransferBlock)
-                                .await?;
-                            return Ok(KEEP_CONNECTION_ALIVE);
-                        }
+                for (t_block, membership_proof) in authenticated_blocks {
+                    let Ok(block) = Block::try_from(t_block) else {
+                        warn!("Received invalid transfer block from peer");
+                        self.punish(NegativePeerSanction::InvalidTransferBlock)
+                            .await?;
+                        return Ok(KEEP_CONNECTION_ALIVE);
+                    };
+
+                    if !membership_proof.verify(
+                        block.header().height.into(),
+                        block.hash(),
+                        &sync_anchor.block_mmr.peaks(),
+                        sync_anchor.block_mmr.num_leafs(),
+                    ) {
+                        warn!("Authentication of received block fails relative to anchor");
+                        self.punish(NegativePeerSanction::InvalidBlockMmrAuthentication)
+                            .await?;
+                        return Ok(KEEP_CONNECTION_ALIVE);
                     }
+
+                    received_blocks.push(block);
                 }
 
                 // Get the latest block that we know of and handle all received blocks
@@ -1361,6 +1444,7 @@ impl PeerLoopHandler {
                 peer.send(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                     known_blocks: batch_block_request.known_blocks,
                     max_response_len,
+                    anchor: batch_block_request.anchor_mmr,
                 }))
                 .await?;
 
@@ -1456,7 +1540,7 @@ impl PeerLoopHandler {
                     break;
                 }
                 Some(peer_msg) => {
-                    let syncing = self.global_state_lock.lock(|s| s.net.syncing).await;
+                    let syncing = self.global_state_lock.lock(|s| s.net.sync_anchor.is_some()).await;
                     if peer_msg.ignore_during_sync() && syncing {
                         debug!("Ignoring {} message during syncing, from {}", peer_msg.get_type(), self.peer_address);
                         continue;
                     }
@@ -1984,21 +2068,41 @@ mod peer_loop_tests {
             StdRng::seed_from_u64(5550001).gen(),
         )
         .await;
-        let blocks = vec![genesis_block, block_1, block_2, block_3, block_4, block_5];
+        let blocks = vec![
+            genesis_block,
+            block_1,
+            block_2,
+            block_3,
+            block_4,
+            block_5.clone(),
+        ];
         for block in blocks.iter().skip(1) {
             state_lock.set_new_tip(block.to_owned()).await.unwrap();
         }
+        let mmra = state_lock
+            .lock_guard()
+            .await
+            .chain
+            .archival_state()
+            .archival_block_mmr
+            .to_accumulator_async()
+            .await;
 
         for i in 0..=4 {
-            let response = (i + 1..=5)
-                .map(|j| blocks[j].clone().try_into().unwrap())
-                .collect_vec();
+            let expected_response = {
+                let state = state_lock.lock_guard().await;
+                let blocks_for_response = blocks.iter().skip(i + 1).cloned().collect_vec();
+                PeerLoopHandler::batch_response(&state, blocks_for_response, &mmra)
+                    .await
+                    .unwrap()
+            };
             let mock = Mock::new(vec![
                 Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                     known_blocks: vec![blocks[i].hash()],
                     max_response_len: 14,
+                    anchor: mmra.clone(),
                 })),
-                Action::Write(PeerMessage::BlockResponseBatch(response)),
+                Action::Write(PeerMessage::BlockResponseBatch(expected_response)),
                 Action::Read(PeerMessage::Bye),
             ]);
             let mut peer_loop_handler = PeerLoopHandler::new(
@@ -2049,16 +2153,32 @@ mod peer_loop_tests {
         state_lock.set_new_tip(block_3_b.clone()).await?;
         state_lock.set_new_tip(block_3_a.clone()).await?;
 
+        let anchor = state_lock
+            .lock_guard()
+            .await
+            .chain
+            .archival_state()
+            .archival_block_mmr
+            .to_accumulator_async()
+            .await;
+        let response_1 = {
+            let state_lock = state_lock.lock_guard().await;
+            PeerLoopHandler::batch_response(
+                &state_lock,
+                vec![block_1.clone(), block_2_a.clone(), block_3_a.clone()],
+                &anchor,
+            )
+            .await
+            .unwrap()
+        };
+
         let mut mock = Mock::new(vec![
             Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                 known_blocks: vec![genesis_block.hash()],
                 max_response_len: 14,
+                anchor: anchor.clone(),
             })),
-            Action::Write(PeerMessage::BlockResponseBatch(vec![
-                block_1.clone().try_into().unwrap(),
-                block_2_a.clone().try_into().unwrap(),
-                block_3_a.clone().try_into().unwrap(),
-            ])),
+            Action::Write(PeerMessage::BlockResponseBatch(response_1)),
             Action::Read(PeerMessage::Bye),
         ]);
 
@@ -2077,15 +2197,23 @@ mod peer_loop_tests {
             .await?;
 
         // Peer knows block 2_b, verify that canonical chain with 2_a is returned
+        let response_2 = {
+            let state_lock = state_lock.lock_guard().await;
+            PeerLoopHandler::batch_response(
+                &state_lock,
+                vec![block_2_a, block_3_a.clone()],
+                &anchor,
+            )
+            .await
+            .unwrap()
+        };
         mock = Mock::new(vec![
             Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                 known_blocks: vec![block_2_b.hash(), block_1.hash(), genesis_block.hash()],
                 max_response_len: 14,
+                anchor,
             })),
-            Action::Write(PeerMessage::BlockResponseBatch(vec![
-                block_2_a.try_into().unwrap(),
-                block_3_a.clone().try_into().unwrap(),
-            ])),
+            Action::Write(PeerMessage::BlockResponseBatch(response_2)),
             Action::Read(PeerMessage::Bye),
         ]);
 
@@ -2141,18 +2269,40 @@ mod peer_loop_tests {
         state_lock.set_new_tip(block_3_a.clone()).await?;
 
         // Peer knows block 2_b, verify that canonical chain with 2_a is returned
+        let mut expected_anchor = block_3_a.body().block_mmr_accumulator.clone();
+        expected_anchor.append(block_3_a.hash());
+        let state_anchor = state_lock
+            .lock_guard()
+            .await
+            .chain
+            .archival_state()
+            .archival_block_mmr
+            .to_accumulator_async()
+            .await;
+        assert_eq!(
+            expected_anchor, state_anchor,
+            "Catching assumption about MMRA in tip and in archival state"
+        );
+
+        let response = {
+            let state_lock = state_lock.lock_guard().await;
+            PeerLoopHandler::batch_response(
+                &state_lock,
+                vec![block_1.clone(), block_2_a, block_3_a.clone()],
+                &expected_anchor,
+            )
+            .await
+            .unwrap()
+        };
         let mock = Mock::new(vec![
             Action::Read(PeerMessage::BlockRequestBatch(BlockRequestBatch {
                 known_blocks: vec![block_2_b.hash(), genesis_block.hash(), block_1.hash()],
                 max_response_len: 14,
+                anchor: expected_anchor,
             })),
             // Since genesis block is the 1st known in the list of known blocks,
             // its immediate descendant, block_1, is the first one returned.
-            Action::Write(PeerMessage::BlockResponseBatch(vec![
-                block_1.try_into().unwrap(),
-                block_2_a.try_into().unwrap(),
-                block_3_a.clone().try_into().unwrap(),
-            ])),
+            Action::Write(PeerMessage::BlockResponseBatch(response)),
             Action::Read(PeerMessage::Bye),
         ]);
 
@@ -3272,6 +3422,7 @@ mod peer_loop_tests {
 
     mod sync_challenges {
         use super::*;
+        use crate::tests::shared::fake_valid_sequence_of_blocks_for_tests_dyn;
 
         #[traced_test]
         #[tokio::test]
@@ -3411,7 +3562,7 @@ mod peer_loop_tests {
         // criterion. Alice issues a challenge. Bob responds. Alice enters into
         // sync mode.
- let mut rng = StdRng::seed_from_u64(5550001); + let mut rng = thread_rng(); let network = Network::Main; let genesis_block: Block = Block::genesis_block(network); @@ -3449,9 +3600,13 @@ mod peer_loop_tests { alice.set_new_tip(block_1.clone()).await?; bob.set_new_tip(block_1.clone()).await?; - let blocks: [Block; 11] = - fake_valid_sequence_of_blocks_for_tests(&block_1, TARGET_BLOCK_INTERVAL, rng.gen()) - .await; + let blocks = fake_valid_sequence_of_blocks_for_tests_dyn( + &block_1, + TARGET_BLOCK_INTERVAL, + rng.gen(), + rng.gen_range(11..20), + ) + .await; for block in &blocks { bob.set_new_tip(block.clone()).await?; } @@ -3507,11 +3662,14 @@ mod peer_loop_tests { .await?; // AddPeerMaxBlockHeight message triggered *after* sync challenge - let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight(( - bob_socket_address, - bob_tip.header().height, - bob_tip.header().cumulative_proof_of_work, - )); + let mut expected_anchor_mmra = bob_tip.body().block_mmr_accumulator.clone(); + expected_anchor_mmra.append(bob_tip.hash()); + let expected_message_from_alice_peer_loop = PeerTaskToMain::AddPeerMaxBlockHeight { + peer_address: bob_socket_address, + claimed_height: bob_tip.header().height, + claimed_cumulative_pow: bob_tip.header().cumulative_proof_of_work, + claimed_block_mmra: expected_anchor_mmra, + }; let observed_message_from_alice_peer_loop = alice_peer_to_main_rx.recv().await.unwrap(); assert_eq!( expected_message_from_alice_peer_loop, diff --git a/src/rpc_server.rs b/src/rpc_server.rs index a984ae0f..8f3303b4 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -1426,7 +1426,7 @@ impl RPC for NeptuneRPCServer { log_slow_scope!(fn_name!() + "::get_wallet_status_for_tip()"); state.get_wallet_status_for_tip().await }; - let syncing = state.net.syncing; + let syncing = state.net.sync_anchor.is_some(); let mempool_size = { log_slow_scope!(fn_name!() + "::mempool.get_size()"); state.mempool.get_size() diff --git a/src/tests/shared.rs b/src/tests/shared.rs index 774e3ef2..782f7906 100644 --- a/src/tests/shared.rs +++ b/src/tests/shared.rs @@ -202,14 +202,13 @@ pub(crate) async fn mock_genesis_global_state( ) -> GlobalStateLock { let (archival_state, peer_db, _data_dir) = mock_genesis_archival_state(network).await; - let syncing = false; let mut peer_map: HashMap = get_peer_map(); for i in 0..peer_count { let peer_address = std::net::SocketAddr::from_str(&format!("123.123.123.{}:8080", i)).unwrap(); peer_map.insert(peer_address, get_dummy_peer(peer_address)); } - let networking_state = NetworkingState::new(peer_map, peer_db, syncing); + let networking_state = NetworkingState::new(peer_map, peer_db); let genesis_block = archival_state.get_tip().await; // Sanity check @@ -1004,15 +1003,34 @@ pub(crate) async fn fake_valid_block_for_tests( /// /// Sequence is N-long. Every block i with i > 0 has block i-1 as its /// predecessor; block 0 has the `predecessor` argument as predecessor. Every -/// block is valid in terms of both `is_valid` and `has_proof_of_work`. +/// block is valid in terms of both `is_valid` and `has_proof_of_work`. But +/// the STARK proofs are mocked. pub(crate) async fn fake_valid_sequence_of_blocks_for_tests( - mut predecessor: &Block, + predecessor: &Block, block_interval: Timestamp, seed: [u8; 32], ) -> [Block; N] { + fake_valid_sequence_of_blocks_for_tests_dyn(predecessor, block_interval, seed, N) + .await + .try_into() + .unwrap() +} + +/// Create a deterministic sequence of valid blocks. +/// +/// Sequence is N-long. 
Every block i with i > 0 has block i-1 as its
+/// predecessor; block 0 has the `predecessor` argument as predecessor. Every
+/// block is valid in terms of both `is_valid` and `has_proof_of_work`. But
+/// the STARK proofs are mocked.
+pub(crate) async fn fake_valid_sequence_of_blocks_for_tests_dyn(
+    mut predecessor: &Block,
+    block_interval: Timestamp,
+    seed: [u8; 32],
+    n: usize,
+) -> Vec<Block> {
     let mut blocks = vec![];
     let mut rng: StdRng = SeedableRng::from_seed(seed);
-    for _ in 0..N {
+    for _ in 0..n {
         let block = fake_valid_successor_for_tests(
             predecessor,
             predecessor.header().timestamp + block_interval,
             rng.gen(),
         )
         .await;
         blocks.push(block);
         predecessor = blocks.last().unwrap();
     }
-    blocks.try_into().unwrap()
+    blocks
 }
 
 pub(crate) async fn wallet_state_has_all_valid_mps(

From ebcd3ab0053b8af8c2719d875279448e98ca9090 Mon Sep 17 00:00:00 2001
From: sword_smith
Date: Wed, 22 Jan 2025 16:57:25 +0100
Subject: [PATCH 09/11] feat(`main_loop`): Tolerate arbitrarily deep reorganizations

Attach a new field `champion` to the `SyncAnchor` living on
`NetworkingState`, which records the block height and digest of the block
with largest height on the fork specified by the anchor. When compiling a
block batch request, use the digest given by this champion. If the champion
is not set, select block digests based on a sparse collection of heights
skewed towards own tip height. Also append the genesis block digest for
good measure.

Sending the champion digest, if it exists, in a request for a batch of
blocks ensures that these requests can work their way up arbitrarily long
forks. Supplying the genesis block hash means that the fork can go all the
way back to genesis. In the common case where forks are shallow, the
distribution of heights close to own tip means that there is little
reconciliation work.

Co-authored-by: Alan Szepieniec
---
 src/main_loop.rs                     | 169 +++++++++++++++++++++------
 src/models/peer.rs                   |   5 +
 src/models/state/mod.rs              |   2 +-
 src/models/state/networking_state.rs |  24 ++++
 src/peer_loop.rs                     |  49 ++++++--
 5 files changed, 199 insertions(+), 50 deletions(-)

diff --git a/src/main_loop.rs b/src/main_loop.rs
index ee62fb35..2d933ab9 100644
--- a/src/main_loop.rs
+++ b/src/main_loop.rs
@@ -69,7 +69,7 @@ const TRANSACTION_UPGRADE_CHECK_INTERVAL_IN_SECONDS: u64 = 60; // 1 minute
 const SANCTION_PEER_TIMEOUT_FACTOR: u64 = 40;
 const POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS: usize = 20;
-const STANDARD_BATCH_BLOCK_LOOKBEHIND_SIZE: usize = 100;
+pub(crate) const MAX_NUM_DIGESTS_IN_BATCH_REQUEST: usize = 200;
 const TX_UPDATER_CHANNEL_CAPACITY: usize = 1;
 
 /// Wraps a transmission channel.
@@ -145,19 +145,13 @@ impl MutableMainLoopState {
 }
 
 /// handles batch-downloading of blocks if we are more than n blocks behind
+#[derive(Default, Debug)]
 struct SyncState {
     peer_sync_states: HashMap<SocketAddr, PeerSynchronizationState>,
     last_sync_request: Option<(SystemTime, BlockHeight, SocketAddr)>,
 }
 
 impl SyncState {
-    fn default() -> Self {
-        Self {
-            peer_sync_states: HashMap::new(),
-            last_sync_request: None,
-        }
-    }
-
     fn record_request(
         &mut self,
         requested_block_height: BlockHeight,
@@ -177,9 +171,11 @@ impl SyncState {
             .collect()
     }
 
-    /// Determine if a peer should be sanctioned for failing to respond to a synchronization
-    /// request. Also determine if a new request should be made or the previous one should be
-    /// allowed to run for longer.
+    /// Determine if a peer should be sanctioned for failing to respond to a
+    /// synchronization request fast enough. Also determine if a new request
+    /// should be made or the previous one should be allowed to run for longer.
+    ///
+    /// Returns (peer to be sanctioned, attempt new request).
     fn get_status_of_last_request(
         &self,
         current_block_height: BlockHeight,
@@ -360,7 +356,7 @@ fn stay_in_sync_mode(
         .values()
         .max_by_key(|x| x.claimed_max_pow);
     match max_claimed_pow {
-        None => false, // we lost all connections. Can't sync.
+        None => false, // No peer has passed the sync challenge phase.
 
         // Synchronization is left when the remaining number of blocks is half of what has
        // been indicated to fit into RAM
@@ -593,18 +589,34 @@ impl MainLoopHandler {
                 // The peer tasks also check this condition, if block is more canonical than current
                 // tip, but we have to check it again since the block update might have already been applied
                 // through a message from another peer (or from own miner).
-                // TODO: Is this check right? We might still want to store the blocks even though
-                // they are not more canonical than what we currently have, in the case of deep reorganizations
-                // that is. This check fails to correctly resolve deep reorganizations. Should that be fixed,
-                // or should deep reorganizations simply be fixed by clearing the database?
                 let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;
+                let new_canonical =
+                    global_state_mut.incoming_block_is_more_canonical(&last_block);
+
+                if !new_canonical {
+                    // The blocks are not canonical, but: if we are in sync
+                    // mode and these blocks beat our current champion, then
+                    // we store them anyway, without marking them as tip.
+                    let Some(sync_anchor) = global_state_mut.net.sync_anchor.as_mut() else {
+                        warn!(
+                            "Blocks were not new, and we're not syncing. Not storing blocks."
+                        );
+                        return Ok(());
+                    };
+                    if sync_anchor
+                        .champion
+                        .is_some_and(|(height, _)| height >= last_block.header().height)
+                    {
+                        warn!("Repeated blocks received in sync mode, not storing");
+                        return Ok(());
+                    }
+
+                    sync_anchor.catch_up(last_block.header().height, last_block.hash());
 
-                if !global_state_mut.incoming_block_is_more_canonical(&last_block) {
-                    warn!("Blocks were not new. Not storing blocks.");
+                    for block in blocks {
+                        global_state_mut.store_block_not_tip(block).await?;
+                    }
 
-                    // TODO: Consider fixing deep reorganization problem described above.
-                    // Alternatively set the `sync_mode_threshold` value higher
-                    // if this problem is encountered.
                     return Ok(());
                 }
 
@@ -704,6 +716,7 @@ impl MainLoopHandler {
                     global_state_mut.net.sync_anchor = Some(SyncAnchor {
                         cumulative_proof_of_work: claimed_cumulative_pow,
                         block_mmr: claimed_block_mmra,
+                        champion: None,
                     });
                     self.main_to_miner_tx.send(MainToMiner::StartSyncing);
                 }
             }
@@ -1029,6 +1042,41 @@ impl MainLoopHandler {
         Ok(())
     }
 
+    /// Return a list of block heights for a block-batch request.
+    ///
+    /// Returns an ordered list of heights of the *most preferred blocks*
+    /// to build on, where current tip is always the most preferred block.
+    ///
+    /// Uses a factor to ensure that the peer will always have something to
+    /// build on top of by providing potential starting points all the way
+    /// back to genesis.
+    fn batch_request_uca_candidate_heights(own_tip_height: BlockHeight) -> Vec<BlockHeight> {
+        let mut look_behind = 0;
+        let mut ret = vec![];
+
+        // A factor of 1.07 can look back ~1m blocks in 200 digests.
+ const FACTOR: f64 = 1.07f64; + while ret.len() < MAX_NUM_DIGESTS_IN_BATCH_REQUEST - 1 { + let height = match own_tip_height.checked_sub(look_behind) { + None => break, + Some(height) => { + if height.is_genesis() { + break; + } else { + height + } + } + }; + + ret.push(height); + look_behind = ((look_behind as f64 + 1.0) * FACTOR).floor() as u64; + } + + ret.push(BlockHeight::genesis()); + + ret + } + /// Logic for requesting the batch-download of blocks from peers /// /// Locking: @@ -1044,7 +1092,7 @@ impl MainLoopHandler { info!("Running sync"); // Check when latest batch of blocks was requested - let (current_block_hash, current_block_height, current_block_proof_of_work_family) = ( + let (own_tip_hash, own_tip_height, own_cumulative_pow) = ( global_state.chain.light_state().hash(), global_state.chain.light_state().kernel.header.height, global_state @@ -1057,7 +1105,7 @@ impl MainLoopHandler { let (peer_to_sanction, try_new_request): (Option, bool) = main_loop_state .sync_state - .get_status_of_last_request(current_block_height, self.now()); + .get_status_of_last_request(own_tip_height, self.now()); // Sanction peer if they failed to respond if let Some(peer) = peer_to_sanction { @@ -1076,7 +1124,7 @@ impl MainLoopHandler { // Pick a random peer that has reported to have relevant blocks let candidate_peers = main_loop_state .sync_state - .get_potential_peers_for_sync_request(current_block_proof_of_work_family); + .get_potential_peers_for_sync_request(own_cumulative_pow); let mut rng = thread_rng(); let chosen_peer = candidate_peers.choose(&mut rng); assert!( @@ -1084,36 +1132,44 @@ impl MainLoopHandler { "A synchronization candidate must be available for a request. Otherwise the data structure is in an invalid state and syncing should not be active" ); - // Find the blocks to request - let tip_digest = current_block_hash; - let most_canonical_digests = global_state - .chain - .archival_state() - .get_ancestor_block_digests(tip_digest, STANDARD_BATCH_BLOCK_LOOKBEHIND_SIZE) - .await; - - // List of digests, ordered after which block we would like to find descendents from, - // from highest to lowest. 
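The geometric look-behind above is what lets at most 200 digests span roughly a million blocks: offsets start dense (0, 1, 2, 3, ...) and grow by about 7% per step, so shallow forks reconcile against a digest near the tip while arbitrarily deep forks still find the genesis digest at the end of the list. A self-contained rendering of the same loop with bare `u64` heights standing in for `BlockHeight` (genesis is height 0):

    const MAX_NUM_DIGESTS_IN_BATCH_REQUEST: usize = 200;

    fn candidate_heights(own_tip_height: u64) -> Vec<u64> {
        const FACTOR: f64 = 1.07;
        let mut look_behind = 0u64;
        let mut ret = vec![];
        while ret.len() < MAX_NUM_DIGESTS_IN_BATCH_REQUEST - 1 {
            match own_tip_height.checked_sub(look_behind) {
                None | Some(0) => break, // ran past (or hit) genesis
                Some(height) => ret.push(height),
            }
            look_behind = ((look_behind as f64 + 1.0) * FACTOR).floor() as u64;
        }
        ret.push(0); // genesis is always offered as the last resort
        ret
    }

    fn main() {
        let heights = candidate_heights(1_000_000);
        assert_eq!(1_000_000, heights[0]); // most preferred: own tip
        assert_eq!(0, *heights.last().unwrap()); // least preferred: genesis
        assert!(heights.len() <= MAX_NUM_DIGESTS_IN_BATCH_REQUEST);
        assert!(heights.windows(2).all(|w| w[0] > w[1])); // strictly descending
    }

Because the look-behind grows by at least one per iteration, the returned heights are strictly descending and duplicate-free, which is exactly what the property test added further down asserts.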
- let most_canonical_digests = [vec![tip_digest], most_canonical_digests].concat(); + let ordered_preferred_block_digests = match anchor.champion { + Some((_height, digest)) => vec![digest], + None => { + // Find candidate-UCA digests based on a sparse distribution of + // block heights skewed towards own tip height + let request_heights = Self::batch_request_uca_candidate_heights(own_tip_height); + let mut ordered_preferred_block_digests = vec![]; + for height in request_heights { + let digest = global_state + .chain + .archival_state() + .archival_block_mmr + .get_leaf_async(height.into()) + .await; + ordered_preferred_block_digests.push(digest); + } + ordered_preferred_block_digests + } + }; // Send message to the relevant peer loop to request the blocks let chosen_peer = chosen_peer.unwrap(); info!( "Sending block batch request to {}\nrequesting blocks descending from {}\n height {}", - chosen_peer, current_block_hash, current_block_height + chosen_peer, own_tip_hash, own_tip_height ); self.main_to_peer_broadcast_tx .send(MainToPeerTask::RequestBlockBatch( MainToPeerTaskBatchBlockRequest { peer_addr_target: *chosen_peer, - known_blocks: most_canonical_digests, + known_blocks: ordered_preferred_block_digests, anchor_mmr: anchor.block_mmr.clone(), }, )) .expect("Sending message to peers must succeed"); // Record that this request was sent to the peer - let requested_block_height = current_block_height.next(); + let requested_block_height = own_tip_height.next(); main_loop_state .sync_state .record_request(requested_block_height, *chosen_peer, self.now()); @@ -1691,6 +1747,43 @@ mod test { } } + mod sync_mode { + use test_strategy::proptest; + + use super::*; + + #[proptest] + fn batch_request_heights_prop(#[strategy(0u64..100_000_000_000)] own_height: u64) { + batch_request_heights_sanity(own_height); + } + + #[test] + fn batch_request_heights_unit() { + let own_height = 1_000_000u64; + batch_request_heights_sanity(own_height); + } + + fn batch_request_heights_sanity(own_height: u64) { + let heights = MainLoopHandler::batch_request_uca_candidate_heights(own_height.into()); + + let mut heights_rev = heights.clone(); + heights_rev.reverse(); + assert!( + heights_rev.is_sorted(), + "Heights must be sorted from high-to-low" + ); + + heights_rev.dedup(); + assert_eq!(heights_rev.len(), heights.len(), "duplicates"); + + assert_eq!(heights[0], own_height.into(), "starts with own tip height"); + assert!( + heights.last().unwrap().is_genesis(), + "ends with genesis block" + ); + } + } + mod proof_upgrader { use super::*; use crate::job_queue::triton_vm::TritonVmJobQueue; diff --git a/src/models/peer.rs b/src/models/peer.rs index 7ebc279a..481b645b 100644 --- a/src/models/peer.rs +++ b/src/models/peer.rs @@ -156,6 +156,7 @@ pub enum NegativePeerSanction { BatchBlocksInvalidStartHeight, BatchBlocksUnknownRequest, BatchBlocksRequestEmpty, + BatchBlocksRequestTooManyDigests, InvalidTransaction, UnconfirmableTransaction, InvalidBlockMmrAuthentication, @@ -226,6 +227,9 @@ impl Display for NegativePeerSanction { NegativePeerSanction::InvalidBlockMmrAuthentication => { "invalid block mmr authentication" } + NegativePeerSanction::BatchBlocksRequestTooManyDigests => { + "too many digests in batch block request" + } }; write!(f, "{string}") } @@ -294,6 +298,7 @@ impl Sanction for NegativePeerSanction { NegativePeerSanction::InvalidTransferBlock => -50, NegativePeerSanction::TimedOutSyncChallengeResponse => -50, NegativePeerSanction::InvalidBlockMmrAuthentication => -4, + 
NegativePeerSanction::BatchBlocksRequestTooManyDigests => -50, } } } diff --git a/src/models/state/mod.rs b/src/models/state/mod.rs index 54ca251c..bdad007f 100644 --- a/src/models/state/mod.rs +++ b/src/models/state/mod.rs @@ -1356,7 +1356,7 @@ impl GlobalState { /// Store a block to client's state *without* marking this block as a new /// tip. No validation of block happens, as this is the caller's /// responsibility. - async fn store_block_not_tip(&mut self, block: Block) -> Result<()> { + pub(crate) async fn store_block_not_tip(&mut self, block: Block) -> Result<()> { crate::macros::log_scope_duration!(); self.chain diff --git a/src/models/state/networking_state.rs b/src/models/state/networking_state.rs index f5b97366..a690d3ee 100644 --- a/src/models/state/networking_state.rs +++ b/src/models/state/networking_state.rs @@ -4,12 +4,14 @@ use std::net::SocketAddr; use std::time::SystemTime; use anyhow::Result; +use tasm_lib::prelude::Digest; use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator; use crate::config_models::data_directory::DataDirectory; use crate::database::create_db_if_missing; use crate::database::NeptuneLevelDb; use crate::database::WriteBatchAsync; +use crate::models::blockchain::block::block_height::BlockHeight; use crate::models::blockchain::block::difficulty_control::ProofOfWork; use crate::models::database::PeerDatabases; use crate::models::peer; @@ -22,8 +24,30 @@ type PeerMap = HashMap; /// Information about a foreign tip towards which the client is syncing. #[derive(Debug, Clone, Eq, PartialEq)] pub(crate) struct SyncAnchor { + /// Cumulative proof-of-work number of the target fork that we are syncing + /// towards. This number is immutable for each `SyncAnchor`. pub(crate) cumulative_proof_of_work: ProofOfWork, + + /// The block MMR accumulator *after* appending the claimed tip digest. This + /// value is immutable for each `SyncAnchor`. pub(crate) block_mmr: MmrAccumulator, + + /// Indicates the block that we have currently synced to under this anchor. + pub(crate) champion: Option<(BlockHeight, Digest)>, +} + +impl SyncAnchor { + pub(crate) fn catch_up(&mut self, height: BlockHeight, block_hash: Digest) { + let new_champion = Some((height, block_hash)); + match self.champion { + Some((current_height, _)) => { + if current_height < height { + self.champion = new_champion + } + } + None => self.champion = new_champion, + }; + } } /// `NetworkingState` contains in-memory and persisted data for interacting diff --git a/src/peer_loop.rs b/src/peer_loop.rs index 0f48ee0f..5f417054 100644 --- a/src/peer_loop.rs +++ b/src/peer_loop.rs @@ -29,6 +29,7 @@ use tracing::warn; use crate::connect_to_peers::close_peer_connected_callback; use crate::macros::fn_name; use crate::macros::log_slow_scope; +use crate::main_loop::MAX_NUM_DIGESTS_IN_BATCH_REQUEST; use crate::models::blockchain::block::block_height::BlockHeight; use crate::models::blockchain::block::Block; use crate::models::blockchain::transaction::Transaction; @@ -263,10 +264,18 @@ impl PeerLoopHandler { /// list is the `i`th block. The parent of element zero in this list is /// `parent_of_first_block`. /// - /// Returns Err when the connection should be closed; returns Ok(None) if - /// some block is invalid or if the last block is not canonical; returns - /// Ok(Some(block_height)) otherwise, referring to the largest block height - /// in the batch. 
+    /// # Return Value
+    ///  - `Err` when the connection should be closed;
+    ///  - `Ok(None)` if some block is invalid
+    ///  - `Ok(None)` if the last block has insufficient cumulative PoW and we
+    ///    are not syncing;
+    ///  - `Ok(None)` if the last block has insufficient height and we are
+    ///    syncing;
+    ///  - `Ok(Some(block_height))` otherwise, referring to the block with the
+    ///    highest height in the batch.
+    ///
+    /// A return value of Ok(Some(_)) means that the message was passed on to
+    /// the main loop.
     ///
     /// # Locking
     ///   * Acquires `global_state_lock` for write via `self.punish(..)` and
@@ -340,16 +349,28 @@ impl PeerLoopHandler {
         // evaluate the fork choice rule
         debug!("Checking last block's canonicity ...");
+        let last_block = received_blocks.last().unwrap();
         let is_canonical = self
             .global_state_lock
             .lock_guard()
             .await
-            .incoming_block_is_more_canonical(received_blocks.last().unwrap());
-        debug!("is canonical? {is_canonical}");
-        if !is_canonical {
+            .incoming_block_is_more_canonical(last_block);
+        let last_block_height = last_block.header().height;
+        let sync_mode_active_and_have_new_champion = self
+            .global_state_lock
+            .lock_guard()
+            .await
+            .net
+            .sync_anchor
+            .as_ref()
+            .is_some_and(|x| {
+                x.champion
+                    .is_none_or(|(height, _)| height < last_block_height)
+            });
+        if !is_canonical && !sync_mode_active_and_have_new_champion {
             warn!(
                 "Received {} blocks from peer but incoming blocks are less \
-                canonical than current tip.",
+                canonical than current tip, or current sync-champion.",
                 received_blocks.len()
             );
             return Ok(None);
         }
@@ -357,21 +378,20 @@ impl PeerLoopHandler {
         // Send the new blocks to the main task which handles the state update
         // and storage to the database.
-        let new_block_height = received_blocks.last().unwrap().header().height;
         let number_of_received_blocks = received_blocks.len();
         self.to_main_tx
             .send(PeerTaskToMain::NewBlocks(received_blocks))
             .await?;
         info!(
             "Updated block info by block from peer. block height {}",
-            new_block_height
+            last_block_height
         );
 
         // Valuable, new, hard-to-produce information. Reward peer.
         self.reward(PositivePeerSanction::ValidBlocks(number_of_received_blocks))
             .await?;
 
-        Ok(Some(new_block_height))
+        Ok(Some(last_block_height))
     }
 
     /// Take a single block received from a peer and (attempt to) find a path
@@ -896,6 +916,13 @@ impl PeerLoopHandler {
                     self.peer_address
                 );
 
+                if known_blocks.len() > MAX_NUM_DIGESTS_IN_BATCH_REQUEST {
+                    self.punish(NegativePeerSanction::BatchBlocksRequestTooManyDigests)
+                        .await?;
+
+                    return Ok(KEEP_CONNECTION_ALIVE);
+                }
+
                 // The last block in the list of the peer's known blocks is the
                 // earliest block, block with lowest height, the peer has
                 // requested. If it does not belong to canonical chain, none of

From 5eb8d2fa0320f5bdcdeda140608434490d21f487 Mon Sep 17 00:00:00 2001
From: sword_smith
Date: Wed, 22 Jan 2025 19:26:42 +0100
Subject: [PATCH 10/11] feat(main_loop): Add global timeout for sync mode

If in sync mode and no new blocks have been received within the last 480
seconds, then sync mode is abandoned and all the peers claiming to serve
said blocks are punished.
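The 480-second figure in this commit message is derived rather than picked directly: the hunks below define the per-peer timeout as `SANCTION_PEER_TIMEOUT_FACTOR` (40) request intervals, and the global timeout as four per-peer timeouts. The request interval constant is defined outside this patch, so the 3-second value used here is inferred from the stated total:

    // Inferred from the 480 s figure; defined elsewhere in main_loop.rs.
    const SYNC_REQUEST_INTERVAL_IN_SECONDS: u64 = 3;
    const SANCTION_PEER_TIMEOUT_FACTOR: u64 = 40;

    const INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS: u64 =
        SANCTION_PEER_TIMEOUT_FACTOR * SYNC_REQUEST_INTERVAL_IN_SECONDS;
    const GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS: u64 =
        INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS * 4;

    fn main() {
        assert_eq!(120, INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS);
        assert_eq!(480, GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS); // matches the commit message
    }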
--- src/main_loop.rs | 149 +++++++++++++++++++++++++-- src/models/state/networking_state.rs | 24 ++++- 2 files changed, 161 insertions(+), 12 deletions(-) diff --git a/src/main_loop.rs b/src/main_loop.rs index 2d933ab9..83192eee 100644 --- a/src/main_loop.rs +++ b/src/main_loop.rs @@ -68,6 +68,16 @@ const EXPECTED_UTXOS_PRUNE_INTERVAL_IN_SECS: u64 = 19 * 60; // 19 mins const TRANSACTION_UPGRADE_CHECK_INTERVAL_IN_SECONDS: u64 = 60; // 1 minute const SANCTION_PEER_TIMEOUT_FACTOR: u64 = 40; + +/// Number of seconds within which an individual peer is expected to respond +/// to a synchronization request. +const INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS: u64 = + SANCTION_PEER_TIMEOUT_FACTOR * SYNC_REQUEST_INTERVAL_IN_SECONDS; + +/// Number of seconds that a synchronization may run without any progress. +const GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS: u64 = + INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS * 4; + const POTENTIAL_PEER_MAX_COUNT_AS_A_FACTOR_OF_MAX_PEERS: usize = 20; pub(crate) const MAX_NUM_DIGESTS_IN_BATCH_REQUEST: usize = 200; const TX_UPDATER_CHANNEL_CAPACITY: usize = 1; @@ -193,9 +203,7 @@ impl SyncState { // The last sync request updated the state (None, true) } else if req_time - + Duration::from_secs( - SANCTION_PEER_TIMEOUT_FACTOR * SYNC_REQUEST_INTERVAL_IN_SECONDS, - ) + + Duration::from_secs(INDIVIDUAL_PEER_SYNCHRONIZATION_TIMEOUT_IN_SECONDS) < now { // The last sync request was not answered, sanction peer @@ -713,11 +721,8 @@ impl MainLoopHandler { "Entering synchronization mode due to peer {} indicating tip height {}; cumulative pow: {:?}", peer_address, claimed_height, claimed_cumulative_pow ); - global_state_mut.net.sync_anchor = Some(SyncAnchor { - cumulative_proof_of_work: claimed_cumulative_pow, - block_mmr: claimed_block_mmra, - champion: None, - }); + global_state_mut.net.sync_anchor = + Some(SyncAnchor::new(claimed_cumulative_pow, claimed_block_mmra)); self.main_to_miner_tx.send(MainToMiner::StartSyncing); } } @@ -1081,7 +1086,7 @@ impl MainLoopHandler { /// /// Locking: /// * acquires `global_state_lock` for read - async fn block_sync(&self, main_loop_state: &mut MutableMainLoopState) -> Result<()> { + async fn block_sync(&mut self, main_loop_state: &mut MutableMainLoopState) -> Result<()> { let global_state = self.global_state_lock.lock_guard().await; // Check if we are in sync mode @@ -1091,7 +1096,6 @@ impl MainLoopHandler { info!("Running sync"); - // Check when latest batch of blocks was requested let (own_tip_hash, own_tip_height, own_cumulative_pow) = ( global_state.chain.light_state().hash(), global_state.chain.light_state().kernel.header.height, @@ -1103,6 +1107,35 @@ impl MainLoopHandler { .cumulative_proof_of_work, ); + // Check if sync mode has timed out entirely, in which case it should + // be abandoned. + let anchor = anchor.to_owned(); + if self.now().duration_since(anchor.updated)?.as_secs() + > GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS + { + warn!("Sync mode has timed out. Abandoning sync mode."); + + // Abandon attempt, and punish all peers claiming to serve these + // blocks. 
+ drop(global_state); + self.global_state_lock + .lock_guard_mut() + .await + .net + .sync_anchor = None; + + let peers_to_punish = main_loop_state + .sync_state + .get_potential_peers_for_sync_request(own_cumulative_pow); + + for peer in peers_to_punish { + self.main_to_peer_broadcast_tx + .send(MainToPeerTask::PeerSynchronizationTimeout(peer))?; + } + + return Ok(()); + } + let (peer_to_sanction, try_new_request): (Option, bool) = main_loop_state .sync_state .get_status_of_last_request(own_tip_height, self.now()); @@ -1748,9 +1781,11 @@ mod test { } mod sync_mode { + use tasm_lib::twenty_first::util_types::mmr::mmr_accumulator::MmrAccumulator; use test_strategy::proptest; use super::*; + use crate::tests::shared::get_dummy_socket_address; #[proptest] fn batch_request_heights_prop(#[strategy(0u64..100_000_000_000)] own_height: u64) { @@ -1782,6 +1817,100 @@ mod test { "ends with genesis block" ); } + + #[tokio::test] + #[traced_test] + async fn sync_mode_abandoned_on_global_timeout() { + let test_setup = setup(0).await; + let TestSetup { + task_join_handles, + mut main_loop_handler, + .. + } = test_setup; + + let mut mutable_main_loop_state = MutableMainLoopState::new(task_join_handles); + + main_loop_handler + .block_sync(&mut mutable_main_loop_state) + .await + .expect("Must return OK when no sync mode is set"); + + // Mock that we are in a valid sync state + let claimed_max_height = 1_000u64.into(); + let claimed_max_pow = ProofOfWork::new([100; 6]); + main_loop_handler + .global_state_lock + .lock_guard_mut() + .await + .net + .sync_anchor = Some(SyncAnchor::new( + claimed_max_pow, + MmrAccumulator::new_from_leafs(vec![]), + )); + mutable_main_loop_state.sync_state.peer_sync_states.insert( + get_dummy_socket_address(0), + PeerSynchronizationState::new(claimed_max_height, claimed_max_pow), + ); + + let sync_start_time = main_loop_handler + .global_state_lock + .lock_guard() + .await + .net + .sync_anchor + .as_ref() + .unwrap() + .updated; + main_loop_handler + .block_sync(&mut mutable_main_loop_state) + .await + .expect("Must return OK when sync mode has not timed out yet"); + assert!( + main_loop_handler + .global_state_lock + .lock_guard() + .await + .net + .sync_anchor + .is_some(), + "Sync mode must still be set before timeout has occurred" + ); + + assert_eq!( + sync_start_time, + main_loop_handler + .global_state_lock + .lock_guard() + .await + .net + .sync_anchor + .as_ref() + .unwrap() + .updated, + "timestamp may not be updated without state change" + ); + + // Mock that sync-mode has timed out + main_loop_handler = main_loop_handler.with_mocked_time( + SystemTime::now() + + Duration::from_secs(GLOBAL_SYNCHRONIZATION_TIMEOUT_IN_SECONDS + 1), + ); + + main_loop_handler + .block_sync(&mut mutable_main_loop_state) + .await + .expect("Must return OK when sync mode has timed out"); + assert!( + main_loop_handler + .global_state_lock + .lock_guard() + .await + .net + .sync_anchor + .is_none(), + "Sync mode must be unset on timeout" + ); + } } mod proof_upgrader { diff --git a/src/models/state/networking_state.rs b/src/models/state/networking_state.rs index a690d3ee..ed0d7170 100644 --- a/src/models/state/networking_state.rs +++ b/src/models/state/networking_state.rs @@ -34,18 +34,38 @@ pub(crate) struct SyncAnchor { /// Indicates the block that we have currently synced to under this anchor. pub(crate) champion: Option<(BlockHeight, Digest)>, + + /// The last time this anchor was either created or updated. 
+ pub(crate) updated: SystemTime, } impl SyncAnchor { + pub(crate) fn new( + claimed_cumulative_pow: ProofOfWork, + claimed_block_mmra: MmrAccumulator, + ) -> Self { + Self { + cumulative_proof_of_work: claimed_cumulative_pow, + block_mmr: claimed_block_mmra, + champion: None, + updated: SystemTime::now(), + } + } + pub(crate) fn catch_up(&mut self, height: BlockHeight, block_hash: Digest) { let new_champion = Some((height, block_hash)); + let updated = SystemTime::now(); match self.champion { Some((current_height, _)) => { if current_height < height { - self.champion = new_champion + self.champion = new_champion; + self.updated = updated; } } - None => self.champion = new_champion, + None => { + self.champion = new_champion; + self.updated = updated; + } }; } } From 0a083d89abd272dff4750fc04298c13fdd002030 Mon Sep 17 00:00:00 2001 From: sword_smith Date: Wed, 22 Jan 2025 19:34:22 +0100 Subject: [PATCH 11/11] chore: make linter happy --- src/mine_loop.rs | 4 ++-- .../blockchain/block/validity/block_primitive_witness.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/mine_loop.rs b/src/mine_loop.rs index e862d43a..3f69d757 100644 --- a/src/mine_loop.rs +++ b/src/mine_loop.rs @@ -1579,7 +1579,7 @@ pub(crate) mod mine_loop_tests { guess_worker( block, - prev_block.header().clone(), + *prev_block.header(), worker_task_tx, composer_utxos, sleepy_guessing, @@ -1776,7 +1776,7 @@ pub(crate) mod mine_loop_tests { let mut rng = thread_rng(); let mut counter = 0; let mut successor_block = Block::new( - successor_header.clone(), + successor_header, successor_body.clone(), appendix, BlockProof::Invalid, diff --git a/src/models/blockchain/block/validity/block_primitive_witness.rs b/src/models/blockchain/block/validity/block_primitive_witness.rs index 1d7292dd..35808681 100644 --- a/src/models/blockchain/block/validity/block_primitive_witness.rs +++ b/src/models/blockchain/block/validity/block_primitive_witness.rs @@ -297,7 +297,7 @@ pub(crate) mod test { (parent_header, parent_body, parent_appendix).prop_flat_map( move |(header, body, appendix)| { let parent_kernel = BlockKernel { - header: header.clone(), + header, body: body.clone(), appendix: appendix.clone(), };
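Taken together, patches 09 and 10 give `SyncAnchor` a small lifecycle: created with an immutable sync target, advanced monotonically through `catch_up`, and abandoned once `updated` goes stale. A compact model of that lifecycle, with toy types standing in for the real `BlockHeight`, `Digest`, proof-of-work and MMR fields shown above:

    use std::time::{Duration, SystemTime};

    struct SyncAnchor {
        champion: Option<(u64, [u8; 32])>, // (height, digest) stand-ins
        updated: SystemTime,
    }

    impl SyncAnchor {
        fn new() -> Self {
            Self { champion: None, updated: SystemTime::now() }
        }

        /// The champion only ever moves to a strictly greater height, and
        /// every real improvement refreshes the progress timestamp.
        fn catch_up(&mut self, height: u64, digest: [u8; 32]) {
            match self.champion {
                Some((current, _)) if current >= height => {} // no progress; keep timestamp
                _ => {
                    self.champion = Some((height, digest));
                    self.updated = SystemTime::now();
                }
            }
        }

        fn timed_out(&self, timeout: Duration) -> bool {
            self.updated.elapsed().map(|e| e > timeout).unwrap_or(false)
        }
    }

    fn main() {
        let mut anchor = SyncAnchor::new();
        anchor.catch_up(10, [0; 32]);
        anchor.catch_up(5, [1; 32]); // ignored: below the current champion
        assert_eq!(Some(10), anchor.champion.map(|(h, _)| h));
        assert!(!anchor.timed_out(Duration::from_secs(480)));
    }

The no-progress arm deliberately leaves `updated` untouched, which is what lets the main loop's global timeout fire when a peer keeps resending blocks that do not advance the champion.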