From 99881bd73eb1375bc250a72a69c8e0bf16767fd2 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Thu, 11 Feb 2021 08:57:10 -0800 Subject: [PATCH] WIP: New interface for syncing multiple listeners --- lightning-block-sync/src/init.rs | 243 +++++++++++++++++++++++++ lightning-block-sync/src/lib.rs | 133 ++++++-------- lightning-block-sync/src/test_utils.rs | 3 +- 3 files changed, 304 insertions(+), 75 deletions(-) create mode 100644 lightning-block-sync/src/init.rs diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs new file mode 100644 index 00000000000..2c03d0ca9ea --- /dev/null +++ b/lightning-block-sync/src/init.rs @@ -0,0 +1,243 @@ +use crate::{BlockSource, BlockSourceResult, Cache, ChainListener, ChainNotifier}; +use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader}; + +use bitcoin::blockdata::block::{Block, BlockHeader}; +use bitcoin::hash_types::BlockHash; +use bitcoin::network::constants::Network; + +/// Unbounded cache of block headers keyed by block hash. +pub type UnboundedCache = std::collections::HashMap; + +impl Cache for UnboundedCache { + fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { + self.get(block_hash) + } + + fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { + self.insert(block_hash, block_header); + } + + fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option { + None + } +} + +/// Performs a one-time sync of chain listeners from a single *trusted* block source, bringing their +/// view of the latest chain tip to `new_block`. +/// +/// Useful upon startup to bring each `ChannelMonitor` and the `ChannelManager` in sync, before +/// switching to `SpvClient`. +pub async fn sync_listeners( + new_block: BlockHash, + block_source: &mut B, + network: Network, + chain_notifier: &mut ChainNotifier, + mut chain_listeners: Vec<(BlockHash, &mut dyn ChainListener)>, +) -> BlockSourceResult<()> { + let new_header = match chain_notifier.header_cache.look_up(&new_block) { + Some(header) => *header, + None => block_source + .get_header(&new_block, None).await? + .validate(new_block)?, + }; + + // Find differences and disconnect blocks for each listener individually. + let mut chain_listeners_with_common_ancestors = Vec::new(); + let mut most_common_ancestor = None; + let mut most_connected_blocks = Vec::new(); + for (old_block, chain_listener) in chain_listeners.drain(..) { + let old_header = match chain_notifier.header_cache.look_up(&old_block) { + Some(header) => *header, + None => block_source + .get_header(&old_block, None).await? + .validate(old_block)? + }; + + // Disconnect any stale blocks. + let mut chain_poller = ChainPoller::new(block_source as &mut dyn BlockSource, network); + let difference = + chain_notifier.find_difference(new_header, &old_header, &mut chain_poller).await?; + chain_notifier.disconnect_blocks( + difference.disconnected_blocks, + &mut DynamicChainListener(chain_listener), + ); + + // Keep track of the most common ancestor and all blocks connected across all listeners. + chain_listeners_with_common_ancestors.push((difference.common_ancestor, chain_listener)); + if difference.connected_blocks.len() > most_connected_blocks.len() { + most_common_ancestor = Some(difference.common_ancestor); + most_connected_blocks = difference.connected_blocks; + } + } + + // Connect new blocks for all listeners at once to avoid re-fetching blocks. + if let Some(common_ancestor) = most_common_ancestor { + let mut chain_poller = ChainPoller::new(block_source as &mut dyn BlockSource, network); + let mut chain_listener = ChainListenerSet(chain_listeners_with_common_ancestors); + chain_notifier.connect_blocks( + common_ancestor, + most_connected_blocks, + &mut chain_poller, + &mut chain_listener, + ).await.unwrap(); + } + Ok(()) +} + +struct DynamicChainListener<'a>(&'a mut dyn ChainListener); + +impl<'a> ChainListener for DynamicChainListener<'a> { + fn block_connected(&mut self, _block: &Block, _height: u32) { + unreachable!() + } + + fn block_disconnected(&mut self, header: &BlockHeader, height: u32) { + self.0.block_disconnected(header, height) + } +} + +struct ChainListenerSet<'a>(Vec<(ValidatedBlockHeader, &'a mut dyn ChainListener)>); + +impl<'a> ChainListener for ChainListenerSet<'a> { + fn block_connected(&mut self, block: &Block, height: u32) { + for (header, chain_listener) in self.0.iter_mut() { + if height > header.height { + chain_listener.block_connected(block, height); + } + } + } + + fn block_disconnected(&mut self, _header: &BlockHeader, _height: u32) { + unreachable!() + } +} + +#[cfg(test)] +mod tests { + use crate::test_utils::{Blockchain, MockChainListener}; + use super::*; + + use bitcoin::network::constants::Network; + + #[tokio::test] + async fn sync_from_same_chain() { + let mut chain = Blockchain::default().with_height(4); + let new_tip = chain.tip(); + let old_tip_1 = chain.at_height(1); + let old_tip_2 = chain.at_height(2); + let old_tip_3 = chain.at_height(3); + + let mut listener_1 = MockChainListener::new() + .expect_block_connected(*old_tip_2) + .expect_block_connected(*old_tip_3) + .expect_block_connected(*new_tip); + let mut listener_2 = MockChainListener::new() + .expect_block_connected(*old_tip_3) + .expect_block_connected(*new_tip); + let mut listener_3 = MockChainListener::new() + .expect_block_connected(*new_tip); + + let listeners = vec![ + (old_tip_1.block_hash, &mut listener_1 as &mut dyn ChainListener), + (old_tip_2.block_hash, &mut listener_2 as &mut dyn ChainListener), + (old_tip_3.block_hash, &mut listener_3 as &mut dyn ChainListener), + ]; + let mut notifier = ChainNotifier { header_cache: chain.header_cache(0..=4) }; + match sync_listeners(new_tip.block_hash, &mut chain, Network::Bitcoin, &mut notifier, listeners).await { + Ok(()) => {}, + Err(e) => panic!("Unexpected error: {:?}", e), + } + } + + #[tokio::test] + async fn sync_from_different_chains() { + let mut main_chain = Blockchain::default().with_height(4); + let fork_chain_1 = main_chain.fork_at_height(1); + let fork_chain_2 = main_chain.fork_at_height(2); + let fork_chain_3 = main_chain.fork_at_height(3); + + let new_tip = main_chain.tip(); + let old_tip_1 = fork_chain_1.tip(); + let old_tip_2 = fork_chain_2.tip(); + let old_tip_3 = fork_chain_3.tip(); + + let mut listener_1 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_1.at_height(4)) + .expect_block_disconnected(*fork_chain_1.at_height(3)) + .expect_block_disconnected(*fork_chain_1.at_height(2)) + .expect_block_connected(*main_chain.at_height(2)) + .expect_block_connected(*main_chain.at_height(3)) + .expect_block_connected(*main_chain.at_height(4)); + let mut listener_2 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_2.at_height(4)) + .expect_block_disconnected(*fork_chain_2.at_height(3)) + .expect_block_connected(*main_chain.at_height(3)) + .expect_block_connected(*main_chain.at_height(4)); + let mut listener_3 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_3.at_height(4)) + .expect_block_connected(*main_chain.at_height(4)); + + let listeners = vec![ + (old_tip_1.block_hash, &mut listener_1 as &mut dyn ChainListener), + (old_tip_2.block_hash, &mut listener_2 as &mut dyn ChainListener), + (old_tip_3.block_hash, &mut listener_3 as &mut dyn ChainListener), + ]; + let mut header_cache = fork_chain_1.header_cache(2..=4); + header_cache.extend(fork_chain_2.header_cache(3..=4)); + header_cache.extend(fork_chain_3.header_cache(4..=4)); + let mut notifier = ChainNotifier { header_cache }; + match sync_listeners(new_tip.block_hash, &mut main_chain, Network::Bitcoin, &mut notifier, listeners).await { + Ok(()) => {}, + Err(e) => panic!("Unexpected error: {:?}", e), + } + } + + #[tokio::test] + async fn sync_from_overlapping_chains() { + let mut main_chain = Blockchain::default().with_height(4); + let fork_chain_1 = main_chain.fork_at_height(1); + let fork_chain_2 = fork_chain_1.fork_at_height(2); + let fork_chain_3 = fork_chain_2.fork_at_height(3); + + let new_tip = main_chain.tip(); + let old_tip_1 = fork_chain_1.tip(); + let old_tip_2 = fork_chain_2.tip(); + let old_tip_3 = fork_chain_3.tip(); + + let mut listener_1 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_1.at_height(4)) + .expect_block_disconnected(*fork_chain_1.at_height(3)) + .expect_block_disconnected(*fork_chain_1.at_height(2)) + .expect_block_connected(*main_chain.at_height(2)) + .expect_block_connected(*main_chain.at_height(3)) + .expect_block_connected(*main_chain.at_height(4)); + let mut listener_2 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_2.at_height(4)) + .expect_block_disconnected(*fork_chain_2.at_height(3)) + .expect_block_disconnected(*fork_chain_2.at_height(2)) + .expect_block_connected(*main_chain.at_height(2)) + .expect_block_connected(*main_chain.at_height(3)) + .expect_block_connected(*main_chain.at_height(4)); + let mut listener_3 = MockChainListener::new() + .expect_block_disconnected(*fork_chain_3.at_height(4)) + .expect_block_disconnected(*fork_chain_3.at_height(3)) + .expect_block_disconnected(*fork_chain_3.at_height(2)) + .expect_block_connected(*main_chain.at_height(2)) + .expect_block_connected(*main_chain.at_height(3)) + .expect_block_connected(*main_chain.at_height(4)); + + let listeners = vec![ + (old_tip_1.block_hash, &mut listener_1 as &mut dyn ChainListener), + (old_tip_2.block_hash, &mut listener_2 as &mut dyn ChainListener), + (old_tip_3.block_hash, &mut listener_3 as &mut dyn ChainListener), + ]; + let mut header_cache = fork_chain_1.header_cache(2..=4); + header_cache.extend(fork_chain_2.header_cache(3..=4)); + header_cache.extend(fork_chain_3.header_cache(4..=4)); + let mut notifier = ChainNotifier { header_cache }; + match sync_listeners(new_tip.block_hash, &mut main_chain, Network::Bitcoin, &mut notifier, listeners).await { + Ok(()) => {}, + Err(e) => panic!("Unexpected error: {:?}", e), + } + } +} diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index f5093f19017..6b60403a24e 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -19,6 +19,7 @@ #[cfg(any(feature = "rest-client", feature = "rpc-client"))] pub mod http; +pub mod init; pub mod poll; #[cfg(feature = "rest-client")] @@ -36,11 +37,10 @@ mod test_utils; #[cfg(any(feature = "rest-client", feature = "rpc-client"))] mod utils; -use crate::poll::{ChainTip, Poll, Validate, ValidatedBlockHeader}; +use crate::poll::{ChainTip, Poll, ValidatedBlockHeader}; use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::hash_types::BlockHash; -use bitcoin::network::constants::Network; use bitcoin::util::uint::Uint256; use std::future::Future; @@ -169,32 +169,6 @@ pub trait ChainListener { fn block_disconnected(&mut self, header: &BlockHeader, height: u32); } -/// Do a one-time sync of a chain listener from a single *trusted* block source bringing its view -/// of the latest chain tip from old_block to new_block. This is useful on startup when you need -/// to bring each ChannelMonitor, as well as the overall ChannelManager, into sync with each other. -/// -/// Once you have them all at the same block, you should switch to using MicroSPVClient. -pub async fn init_sync_listener( - new_block: BlockHash, - old_block: BlockHash, - block_source: &mut B, - network: Network, - chain_listener: &mut CL, -) { - if &old_block[..] == &[0; 32] { return; } - if old_block == new_block { return; } - - let new_header = block_source - .get_header(&new_block, None).await.unwrap() - .validate(new_block).unwrap(); - let old_header = block_source - .get_header(&old_block, None).await.unwrap() - .validate(old_block).unwrap(); - let mut chain_poller = poll::ChainPoller::new(block_source as &mut dyn BlockSource, network); - let mut chain_notifier = ChainNotifier { header_cache: UnboundedCache::new() }; - chain_notifier.sync_listener(new_header, &old_header, &mut chain_poller, chain_listener).await.unwrap(); -} - /// The `Cache` trait defines behavior for managing a block header cache, where block headers are /// keyed by block hash. /// @@ -220,23 +194,6 @@ pub trait Cache { fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option; } -/// Unbounded cache of block headers keyed by block hash. -pub type UnboundedCache = std::collections::HashMap; - -impl Cache for UnboundedCache { - fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { - self.get(block_hash) - } - - fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { - self.insert(block_hash, block_header); - } - - fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option { - self.remove(block_hash) - } -} - impl SpvClient { /// Creates a new SPV client using `chain_tip` as the best known chain tip. /// @@ -301,7 +258,7 @@ impl SpvClient { /// Notifies [listeners] of blocks that have been connected or disconnected from the chain. /// /// [listeners]: trait.ChainListener.html -struct ChainNotifier { +pub struct ChainNotifier { /// Cache for looking up headers before fetching from a block source. header_cache: C, } @@ -312,6 +269,11 @@ struct ChainNotifier { /// Blocks are given in height-descending order. Therefore, blocks are first disconnected in order /// before new blocks are connected in reverse order. struct ChainDifference { + /// The most recent ancestor common between the chain tips. + /// + /// If there are any disconnected blocks, this is where the chain forked. + common_ancestor: ValidatedBlockHeader, + /// Blocks that were disconnected from the chain since the last poll. disconnected_blocks: Vec, @@ -320,8 +282,8 @@ struct ChainDifference { } impl ChainNotifier { - /// Finds the fork point between `new_header` and `old_header`, disconnecting blocks from - /// `old_header` to get to that point and then connecting blocks until `new_header`. + /// Finds the first common ancestor between `new_header` and `old_header`, disconnecting blocks + /// from `old_header` to get to that point and then connecting blocks until `new_header`. /// /// Validates headers along the transition path, but doesn't fetch blocks until the chain is /// disconnected to the fork point. Thus, this may return an `Err` that includes where the tip @@ -334,32 +296,15 @@ impl ChainNotifier { chain_poller: &mut P, chain_listener: &mut L, ) -> Result<(), (BlockSourceError, Option)> { - let mut difference = self.find_difference(new_header, old_header, chain_poller).await + let difference = self.find_difference(new_header, old_header, chain_poller).await .map_err(|e| (e, None))?; - - let mut new_tip = *old_header; - for header in difference.disconnected_blocks.drain(..) { - println!("Disconnecting block {}", header.block_hash); - if let Some(cached_header) = self.header_cache.block_disconnected(&header.block_hash) { - assert_eq!(cached_header, header); - } - chain_listener.block_disconnected(&header.header, header.height); - new_tip = header; - } - - for header in difference.connected_blocks.drain(..).rev() { - let block = chain_poller - .fetch_block(&header).await - .or_else(|e| Err((e, Some(new_tip))))?; - debug_assert_eq!(block.block_hash, header.block_hash); - - println!("Connecting block {}", header.block_hash); - self.header_cache.block_connected(header.block_hash, header); - chain_listener.block_connected(&block, header.height); - new_tip = header; - } - - Ok(()) + self.disconnect_blocks(difference.disconnected_blocks, chain_listener); + self.connect_blocks( + difference.common_ancestor, + difference.connected_blocks, + chain_poller, + chain_listener, + ).await } /// Returns the changes needed to produce the chain with `current_header` as its tip from the @@ -396,7 +341,8 @@ impl ChainNotifier { } } - Ok(ChainDifference { disconnected_blocks, connected_blocks }) + let common_ancestor = current; + Ok(ChainDifference { common_ancestor, disconnected_blocks, connected_blocks }) } /// Returns the previous header for the given header, either by looking it up in the cache or @@ -411,10 +357,49 @@ impl ChainNotifier { None => chain_poller.look_up_previous_header(header).await, } } + + /// Notifies the chain listeners of disconnected blocks. + fn disconnect_blocks( + &mut self, + mut disconnected_blocks: Vec, + chain_listener: &mut L, + ) { + for header in disconnected_blocks.drain(..) { + println!("Disconnecting block {}", header.block_hash); + if let Some(cached_header) = self.header_cache.block_disconnected(&header.block_hash) { + assert_eq!(cached_header, header); + } + chain_listener.block_disconnected(&header.header, header.height); + } + } + + /// Notifies the chain listeners of connected blocks. + async fn connect_blocks( + &mut self, + mut new_tip: ValidatedBlockHeader, + mut connected_blocks: Vec, + chain_poller: &mut P, + chain_listener: &mut L, + ) -> Result<(), (BlockSourceError, Option)> { + for header in connected_blocks.drain(..).rev() { + let block = chain_poller + .fetch_block(&header).await + .or_else(|e| Err((e, Some(new_tip))))?; + debug_assert_eq!(block.block_hash, header.block_hash); + + println!("Connecting block {}", header.block_hash); + self.header_cache.block_connected(header.block_hash, header); + chain_listener.block_connected(&block, header.height); + new_tip = header; + } + + Ok(()) + } } #[cfg(test)] mod spv_client_tests { + use crate::init::UnboundedCache; use crate::test_utils::{Blockchain, NullChainListener}; use super::*; diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs index 807a33a69de..425a52b5ba2 100644 --- a/lightning-block-sync/src/test_utils.rs +++ b/lightning-block-sync/src/test_utils.rs @@ -1,4 +1,5 @@ -use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError, ChainListener, UnboundedCache}; +use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError, ChainListener}; +use crate::init::UnboundedCache; use crate::poll::{Validate, ValidatedBlockHeader}; use bitcoin::blockdata::block::{Block, BlockHeader};