-
Notifications
You must be signed in to change notification settings - Fork 376
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Utility for syncing a set of chain listeners
Add a utility for syncing a set of chain listeners to a common chain tip. Required to use before creating an SpvClient when the chain listener used with the client is actually a set of listeners each of which may have had left off at a different block. This would occur when the listeners had been persisted individually at different frequencies (e.g., a ChainMonitor's individual ChannelMonitors).
- Loading branch information
Showing
2 changed files
with
359 additions
and
55 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,276 @@ | ||
use crate::{BlockSource, BlockSourceResult, Cache, ChainListener, ChainNotifier}; | ||
use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader}; | ||
|
||
use bitcoin::blockdata::block::{Block, BlockHeader}; | ||
use bitcoin::hash_types::BlockHash; | ||
use bitcoin::network::constants::Network; | ||
|
||
/// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each | ||
/// listener's view of the chain from its paired block hash to `block_source`'s best chain tip. | ||
/// | ||
/// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of | ||
/// failure, each listener may be left at a different block hash than the one it was originally | ||
/// paired with. | ||
/// | ||
/// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before | ||
/// switching to [`SpvClient`]. | ||
/// | ||
/// [`SpvClient`]: ../struct.SpvClient.html | ||
/// [`ChannelManager`]: ../../lightning/ln/channelmanager/struct.ChannelManager.html | ||
/// [`ChannelMonitor`]: ../../lightning/chain/channelmonitor/struct.ChannelMonitor.html | ||
pub async fn sync_listeners<B: BlockSource, C: Cache>( | ||
block_source: &mut B, | ||
network: Network, | ||
header_cache: &mut C, | ||
mut chain_listeners: Vec<(BlockHash, &mut dyn ChainListener)>, | ||
) -> BlockSourceResult<ValidatedBlockHeader> { | ||
let (best_block_hash, best_block_height) = block_source.get_best_block().await?; | ||
let new_header = block_source | ||
.get_header(&best_block_hash, best_block_height).await? | ||
.validate(best_block_hash)?; | ||
|
||
// Find differences and disconnect blocks for each listener individually. | ||
let mut chain_listeners_at_height = Vec::new(); | ||
let mut most_common_ancestor = None; | ||
let mut most_connected_blocks = Vec::new(); | ||
for (old_block, chain_listener) in chain_listeners.drain(..) { | ||
let old_header = match header_cache.look_up(&old_block) { | ||
Some(header) => *header, | ||
None => block_source | ||
.get_header(&old_block, None).await? | ||
.validate(old_block)? | ||
}; | ||
|
||
// Disconnect any stale blocks, but keep them in the cache for the next iteration. | ||
let header_cache = &mut NonDiscardingCache(header_cache); | ||
let mut chain_notifier = ChainNotifier { header_cache }; | ||
let mut chain_poller = ChainPoller::new(block_source as &mut dyn BlockSource, network); | ||
let difference = | ||
chain_notifier.find_difference(new_header, &old_header, &mut chain_poller).await?; | ||
chain_notifier.disconnect_blocks( | ||
difference.disconnected_blocks, | ||
&mut DynamicChainListener(chain_listener), | ||
); | ||
|
||
// Keep track of the most common ancestor and all blocks connected across all listeners. | ||
chain_listeners_at_height.push((difference.common_ancestor.height, chain_listener)); | ||
if difference.connected_blocks.len() > most_connected_blocks.len() { | ||
most_common_ancestor = Some(difference.common_ancestor); | ||
most_connected_blocks = difference.connected_blocks; | ||
} | ||
} | ||
|
||
// Connect new blocks for all listeners at once to avoid re-fetching blocks. | ||
if let Some(common_ancestor) = most_common_ancestor { | ||
let mut chain_notifier = ChainNotifier { header_cache }; | ||
let mut chain_poller = ChainPoller::new(block_source as &mut dyn BlockSource, network); | ||
let mut chain_listener = ChainListenerSet(chain_listeners_at_height); | ||
chain_notifier.connect_blocks( | ||
common_ancestor, | ||
most_connected_blocks, | ||
&mut chain_poller, | ||
&mut chain_listener, | ||
).await.or_else(|(e, _)| Err(e))?; | ||
} | ||
|
||
Ok(new_header) | ||
} | ||
|
||
/// A cache that won't discard any block headers. Used to prevent losing headers that are needed to | ||
/// disconnect blocks common to more than one listener. | ||
struct NonDiscardingCache<'a, C: Cache>(&'a mut C); | ||
|
||
impl<'a, C: Cache> Cache for NonDiscardingCache<'a, C> { | ||
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { | ||
self.0.look_up(block_hash) | ||
} | ||
|
||
fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) { | ||
unreachable!() | ||
} | ||
|
||
fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option<ValidatedBlockHeader> { | ||
None | ||
} | ||
} | ||
|
||
/// Wrapper for supporting dynamically sized chain listeners. | ||
struct DynamicChainListener<'a>(&'a mut dyn ChainListener); | ||
|
||
impl<'a> ChainListener for DynamicChainListener<'a> { | ||
fn block_connected(&mut self, _block: &Block, _height: u32) { | ||
unreachable!() | ||
} | ||
|
||
fn block_disconnected(&mut self, header: &BlockHeader, height: u32) { | ||
self.0.block_disconnected(header, height) | ||
} | ||
} | ||
|
||
/// A set of dynamically sized chain listeners, each paired with a starting block height. | ||
struct ChainListenerSet<'a>(Vec<(u32, &'a mut dyn ChainListener)>); | ||
|
||
impl<'a> ChainListener for ChainListenerSet<'a> { | ||
fn block_connected(&mut self, block: &Block, height: u32) { | ||
for (starting_height, chain_listener) in self.0.iter_mut() { | ||
if height > *starting_height { | ||
chain_listener.block_connected(block, height); | ||
} | ||
} | ||
} | ||
|
||
fn block_disconnected(&mut self, _header: &BlockHeader, _height: u32) { | ||
unreachable!() | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use crate::test_utils::{Blockchain, MockChainListener}; | ||
use super::*; | ||
|
||
use bitcoin::network::constants::Network; | ||
|
||
#[tokio::test] | ||
async fn sync_from_same_chain() { | ||
let mut chain = Blockchain::default().with_height(4); | ||
let new_tip = chain.tip(); | ||
let old_tip_1 = chain.at_height(1); | ||
let old_tip_2 = chain.at_height(2); | ||
let old_tip_3 = chain.at_height(3); | ||
|
||
let mut listener_1 = MockChainListener::new() | ||
.expect_block_connected(*old_tip_2) | ||
.expect_block_connected(*old_tip_3) | ||
.expect_block_connected(*new_tip); | ||
let mut listener_2 = MockChainListener::new() | ||
.expect_block_connected(*old_tip_3) | ||
.expect_block_connected(*new_tip); | ||
let mut listener_3 = MockChainListener::new() | ||
.expect_block_connected(*new_tip); | ||
|
||
let listeners = vec![ | ||
(old_tip_1.block_hash, &mut listener_1 as &mut dyn ChainListener), | ||
(old_tip_2.block_hash, &mut listener_2 as &mut dyn ChainListener), | ||
(old_tip_3.block_hash, &mut listener_3 as &mut dyn ChainListener), | ||
]; | ||
let mut cache = chain.header_cache(0..=4); | ||
match sync_listeners(&mut chain, Network::Bitcoin, &mut cache, listeners).await { | ||
Ok(header) => assert_eq!(header, new_tip), | ||
Err(e) => panic!("Unexpected error: {:?}", e), | ||
} | ||
} | ||
|
||
#[tokio::test] | ||
async fn sync_from_different_chains() { | ||
let mut main_chain = Blockchain::default().with_height(4); | ||
let fork_chain_1 = main_chain.fork_at_height(1); | ||
let fork_chain_2 = main_chain.fork_at_height(2); | ||
let fork_chain_3 = main_chain.fork_at_height(3); | ||
|
||
let new_tip = main_chain.tip(); | ||
let old_tip_1 = fork_chain_1.tip(); | ||
let old_tip_2 = fork_chain_2.tip(); | ||
let old_tip_3 = fork_chain_3.tip(); | ||
|
||
let mut listener_1 = MockChainListener::new() | ||
.expect_block_disconnected(*fork_chain_1.at_height(4)) | ||
.expect_block_disconnected(*fork_chain_1.at_height(3)) | ||
.expect_block_disconnected(*fork_chain_1.at_height(2)) | ||
.expect_block_connected(*main_chain.at_height(2)) | ||
.expect_block_connected(*main_chain.at_height(3)) | ||
.expect_block_connected(*main_chain.at_height(4)); | ||
let mut listener_2 = MockChainListener::new() | ||
.expect_block_disconnected(*fork_chain_2.at_height(4)) | ||
.expect_block_disconnected(*fork_chain_2.at_height(3)) | ||
.expect_block_connected(*main_chain.at_height(3)) | ||
.expect_block_connected(*main_chain.at_height(4)); | ||
let mut listener_3 = MockChainListener::new() | ||
.expect_block_disconnected(*fork_chain_3.at_height(4)) | ||
.expect_block_connected(*main_chain.at_height(4)); | ||
|
||
let listeners = vec![ | ||
(old_tip_1.block_hash, &mut listener_1 as &mut dyn ChainListener), | ||
(old_tip_2.block_hash, &mut listener_2 as &mut dyn ChainListener), | ||
(old_tip_3.block_hash, &mut listener_3 as &mut dyn ChainListener), | ||
]; | ||
let mut cache = fork_chain_1.header_cache(2..=4); | ||
cache.extend(fork_chain_2.header_cache(3..=4)); | ||
cache.extend(fork_chain_3.header_cache(4..=4)); | ||
match sync_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await { | ||
Ok(header) => assert_eq!(header, new_tip), | ||
Err(e) => panic!("Unexpected error: {:?}", e), | ||
} | ||
} | ||
|
||
#[tokio::test] | ||
async fn sync_from_overlapping_chains() { | ||
let mut main_chain = Blockchain::default().with_height(4); | ||
let fork_chain_1 = main_chain.fork_at_height(1); | ||
let fork_chain_2 = fork_chain_1.fork_at_height(2); | ||
let fork_chain_3 = fork_chain_2.fork_at_height(3); | ||
|
||
let new_tip = main_chain.tip(); | ||
let old_tip_1 = fork_chain_1.tip(); | ||
let old_tip_2 = fork_chain_2.tip(); | ||
let old_tip_3 = fork_chain_3.tip(); | ||
|
||
let mut listener_1 = MockChainListener::new() | ||
.expect_block_disconnected(*fork_chain_1.at_height(4)) | ||
.expect_block_disconnected(*fork_chain_1.at_height(3)) | ||
.expect_block_disconnected(*fork_chain_1.at_height(2)) | ||
.expect_block_connected(*main_chain.at_height(2)) | ||
.expect_block_connected(*main_chain.at_height(3)) | ||
.expect_block_connected(*main_chain.at_height(4)); | ||
let mut listener_2 = MockChainListener::new() | ||
.expect_block_disconnected(*fork_chain_2.at_height(4)) | ||
.expect_block_disconnected(*fork_chain_2.at_height(3)) | ||
.expect_block_disconnected(*fork_chain_2.at_height(2)) | ||
.expect_block_connected(*main_chain.at_height(2)) | ||
.expect_block_connected(*main_chain.at_height(3)) | ||
.expect_block_connected(*main_chain.at_height(4)); | ||
let mut listener_3 = MockChainListener::new() | ||
.expect_block_disconnected(*fork_chain_3.at_height(4)) | ||
.expect_block_disconnected(*fork_chain_3.at_height(3)) | ||
.expect_block_disconnected(*fork_chain_3.at_height(2)) | ||
.expect_block_connected(*main_chain.at_height(2)) | ||
.expect_block_connected(*main_chain.at_height(3)) | ||
.expect_block_connected(*main_chain.at_height(4)); | ||
|
||
let listeners = vec![ | ||
(old_tip_1.block_hash, &mut listener_1 as &mut dyn ChainListener), | ||
(old_tip_2.block_hash, &mut listener_2 as &mut dyn ChainListener), | ||
(old_tip_3.block_hash, &mut listener_3 as &mut dyn ChainListener), | ||
]; | ||
let mut cache = fork_chain_1.header_cache(2..=4); | ||
cache.extend(fork_chain_2.header_cache(3..=4)); | ||
cache.extend(fork_chain_3.header_cache(4..=4)); | ||
match sync_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await { | ||
Ok(header) => assert_eq!(header, new_tip), | ||
Err(e) => panic!("Unexpected error: {:?}", e), | ||
} | ||
} | ||
|
||
#[tokio::test] | ||
async fn cache_connected_and_keep_disconnected_blocks() { | ||
let mut main_chain = Blockchain::default().with_height(2); | ||
let fork_chain = main_chain.fork_at_height(1); | ||
|
||
let new_tip = main_chain.tip(); | ||
let old_tip = fork_chain.tip(); | ||
|
||
let mut listener = MockChainListener::new() | ||
.expect_block_disconnected(*fork_chain.at_height(2)) | ||
.expect_block_connected(*main_chain.at_height(2)); | ||
|
||
let listeners = vec![(old_tip.block_hash, &mut listener as &mut dyn ChainListener)]; | ||
let mut cache = fork_chain.header_cache(2..=2); | ||
match sync_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await { | ||
Ok(_) => { | ||
assert!(cache.contains_key(&new_tip.block_hash)); | ||
assert!(cache.contains_key(&old_tip.block_hash)); | ||
}, | ||
Err(e) => panic!("Unexpected error: {:?}", e), | ||
} | ||
} | ||
} |
Oops, something went wrong.