Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify Esplora::update_local_chain and add tests #1267

Merged
merged 6 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 24 additions & 71 deletions crates/esplora/src/async_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use async_trait::async_trait;
use bdk_chain::collections::btree_map;
use bdk_chain::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
collections::{BTreeMap, BTreeSet},
collections::BTreeMap,
local_chain::{self, CheckPoint},
BlockId, ConfirmationTimeHeightAnchor, TxGraph,
};
use esplora_client::{Error, TxStatus};
use futures::{stream::FuturesOrdered, TryStreamExt};

use crate::{anchor_from_status, ASSUME_FINAL_DEPTH};
use crate::anchor_from_status;

/// Trait to extend the functionality of [`esplora_client::AsyncClient`].
///
Expand Down Expand Up @@ -85,10 +85,11 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
local_tip: CheckPoint,
request_heights: impl IntoIterator<IntoIter = impl Iterator<Item = u32> + Send> + Send,
) -> Result<local_chain::Update, Error> {
let request_heights = request_heights.into_iter().collect::<BTreeSet<_>>();
let new_tip_height = self.get_height().await?;

// atomically fetch blocks from esplora
// Atomically fetch latest blocks from Esplora. This way, we avoid creating an update with
// an inconsistent set of blocks (assuming that a reorg depth cannot be greater than the
evanlinjin marked this conversation as resolved.
Show resolved Hide resolved
// latest blocks fetched).
evanlinjin marked this conversation as resolved.
Show resolved Hide resolved
let mut fetched_blocks = {
let heights = (0..=new_tip_height).rev();
let hashes = self
Expand All @@ -99,89 +100,41 @@ impl EsploraAsyncExt for esplora_client::AsyncClient {
heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>()
};

// fetch heights that the caller is interested in
// fetch blocks of heights that the caller is interested in, reusing latest blocks that are
evanlinjin marked this conversation as resolved.
Show resolved Hide resolved
// already fetched.
for height in request_heights {
// do not fetch blocks higher than remote tip
if height > new_tip_height {
continue;
}
// only fetch what is missing
if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
let hash = self.get_block_hash(height).await?;
entry.insert(hash);
entry.insert(self.get_block_hash(height).await?);
}
}

// find the earliest point of agreement between local chain and fetched chain
let earliest_agreement_cp = {
let mut earliest_agreement_cp = Option::<CheckPoint>::None;

let local_tip_height = local_tip.height();
for local_cp in local_tip.iter() {
let local_block = local_cp.block_id();

// the updated hash (block hash at this height after the update), can either be:
// 1. a block that already existed in `fetched_blocks`
// 2. a block that exists locally and at least has a depth of ASSUME_FINAL_DEPTH
// 3. otherwise we can freshly fetch the block from remote, which is safe as it
// is guaranteed that this would be at or below ASSUME_FINAL_DEPTH from the
// remote tip
let updated_hash = match fetched_blocks.entry(local_block.height) {
btree_map::Entry::Occupied(entry) => *entry.get(),
btree_map::Entry::Vacant(entry) => *entry.insert(
if local_tip_height - local_block.height >= ASSUME_FINAL_DEPTH {
local_block.hash
} else {
self.get_block_hash(local_block.height).await?
},
),
};

// since we may introduce blocks below the point of agreement, we cannot break
// here unconditionally - we only break if we guarantee there are no new heights
// below our current local checkpoint
if local_block.hash == updated_hash {
earliest_agreement_cp = Some(local_cp);

let first_new_height = *fetched_blocks
.keys()
.next()
.expect("must have at least one new block");
if first_new_height >= local_block.height {
break;
}
}
// Ensure `fetched_blocks` can create an update that connects with the original chain.
evanlinjin marked this conversation as resolved.
Show resolved Hide resolved
for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) {
evanlinjin marked this conversation as resolved.
Show resolved Hide resolved
if height > new_tip_height {
continue;
}

earliest_agreement_cp
};

let tip = {
// first checkpoint to use for the update chain
let first_cp = match earliest_agreement_cp {
Some(cp) => cp,
None => {
let (&height, &hash) = fetched_blocks
.iter()
.next()
.expect("must have at least one new block");
CheckPoint::new(BlockId { height, hash })
let fetched_hash = match fetched_blocks.entry(height) {
btree_map::Entry::Occupied(entry) => *entry.get(),
btree_map::Entry::Vacant(entry) => {
*entry.insert(self.get_block_hash(height).await?)
}
};
// transform fetched chain into the update chain
fetched_blocks
// we exclude anything at or below the first cp of the update chain otherwise
// building the chain will fail
.split_off(&(first_cp.height() + 1))
.into_iter()
.map(|(height, hash)| BlockId { height, hash })
.fold(first_cp, |prev_cp, block| {
prev_cp.push(block).expect("must extend checkpoint")
})
};

// We have found point of agreement so the update will connect!
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if no point of agreement is found? Is this possible?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should never happen because the genesis block must connect! However, let's say hypothetically, the genesis doesn't connect (because the wallet and esplora are on different networks), the update formed will try replace the genesis, resulting in the local_chain::MissingGenesisError which is the error that we want!

if fetched_hash == local_hash {
break;
}
}

Ok(local_chain::Update {
tip,
tip: CheckPoint::from_block_ids(fetched_blocks.into_iter().map(BlockId::from))
.expect("must be in height order"),
introduce_older_blocks: true,
})
}
Expand Down
95 changes: 23 additions & 72 deletions crates/esplora/src/blocking_ext.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::thread::JoinHandle;

use bdk_chain::collections::btree_map;
use bdk_chain::collections::{BTreeMap, BTreeSet};
use bdk_chain::collections::BTreeMap;
use bdk_chain::{
bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid},
local_chain::{self, CheckPoint},
BlockId, ConfirmationTimeHeightAnchor, TxGraph,
};
use esplora_client::{Error, TxStatus};

use crate::{anchor_from_status, ASSUME_FINAL_DEPTH};
use crate::anchor_from_status;

/// Trait to extend the functionality of [`esplora_client::BlockingClient`].
///
Expand Down Expand Up @@ -78,10 +78,11 @@ impl EsploraExt for esplora_client::BlockingClient {
local_tip: CheckPoint,
request_heights: impl IntoIterator<Item = u32>,
) -> Result<local_chain::Update, Error> {
let request_heights = request_heights.into_iter().collect::<BTreeSet<_>>();
let new_tip_height = self.get_height()?;

// atomically fetch blocks from esplora
// Atomically fetch latest blocks from Esplora. This way, we avoid creating an update with
// an inconsistent set of blocks (assuming that a reorg depth cannot be greater than the
// latest blocks fetched).
let mut fetched_blocks = {
let heights = (0..=new_tip_height).rev();
let hashes = self
Expand All @@ -91,89 +92,39 @@ impl EsploraExt for esplora_client::BlockingClient {
heights.zip(hashes).collect::<BTreeMap<u32, BlockHash>>()
};

// fetch heights that the caller is interested in
// fetch blocks of heights that the caller is interested in, reusing latest blocks that are
// already fetched.
for height in request_heights {
// do not fetch blocks higher than remote tip
if height > new_tip_height {
continue;
}
// only fetch what is missing
if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) {
let hash = self.get_block_hash(height)?;
entry.insert(hash);
entry.insert(self.get_block_hash(height)?);
}
}

// find the earliest point of agreement between local chain and fetched chain
let earliest_agreement_cp = {
let mut earliest_agreement_cp = Option::<CheckPoint>::None;

let local_tip_height = local_tip.height();
for local_cp in local_tip.iter() {
let local_block = local_cp.block_id();

// the updated hash (block hash at this height after the update), can either be:
// 1. a block that already existed in `fetched_blocks`
// 2. a block that exists locally and at least has a depth of ASSUME_FINAL_DEPTH
// 3. otherwise we can freshly fetch the block from remote, which is safe as it
// is guaranteed that this would be at or below ASSUME_FINAL_DEPTH from the
// remote tip
let updated_hash = match fetched_blocks.entry(local_block.height) {
btree_map::Entry::Occupied(entry) => *entry.get(),
btree_map::Entry::Vacant(entry) => *entry.insert(
if local_tip_height - local_block.height >= ASSUME_FINAL_DEPTH {
local_block.hash
} else {
self.get_block_hash(local_block.height)?
},
),
};

// since we may introduce blocks below the point of agreement, we cannot break
// here unconditionally - we only break if we guarantee there are no new heights
// below our current local checkpoint
if local_block.hash == updated_hash {
earliest_agreement_cp = Some(local_cp);

let first_new_height = *fetched_blocks
.keys()
.next()
.expect("must have at least one new block");
if first_new_height >= local_block.height {
break;
}
}
// Ensure `fetched_blocks` can create an update that connects with the original chain.
for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) {
if height > new_tip_height {
continue;
}

earliest_agreement_cp
};

let tip = {
// first checkpoint to use for the update chain
let first_cp = match earliest_agreement_cp {
Some(cp) => cp,
None => {
let (&height, &hash) = fetched_blocks
.iter()
.next()
.expect("must have at least one new block");
CheckPoint::new(BlockId { height, hash })
}
let fetched_hash = match fetched_blocks.entry(height) {
btree_map::Entry::Occupied(entry) => *entry.get(),
btree_map::Entry::Vacant(entry) => *entry.insert(self.get_block_hash(height)?),
};
// transform fetched chain into the update chain
fetched_blocks
// we exclude anything at or below the first cp of the update chain otherwise
// building the chain will fail
.split_off(&(first_cp.height() + 1))
.into_iter()
.map(|(height, hash)| BlockId { height, hash })
.fold(first_cp, |prev_cp, block| {
prev_cp.push(block).expect("must extend checkpoint")
})
};

// We have found point of agreement so the update will connect!
if fetched_hash == local_hash {
break;
}
}

Ok(local_chain::Update {
tip,
tip: CheckPoint::from_block_ids(fetched_blocks.into_iter().map(BlockId::from))
.expect("must be in height order"),
introduce_older_blocks: true,
})
}
Expand Down
2 changes: 0 additions & 2 deletions crates/esplora/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ mod async_ext;
#[cfg(feature = "async")]
pub use async_ext::*;

const ASSUME_FINAL_DEPTH: u32 = 15;

fn anchor_from_status(status: &TxStatus) -> Option<ConfirmationTimeHeightAnchor> {
if let TxStatus {
block_height: Some(height),
Expand Down