Skip to content

Commit

Permalink
fix(providers): collect BlockState before constructing DB provider
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected committed Sep 30, 2024
1 parent 65f7e88 commit c743be2
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
33 changes: 32 additions & 1 deletion crates/chain-state/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use reth_storage_api::StateProviderBox;
use reth_trie::{updates::TrieUpdates, HashedPostState};
use std::{collections::BTreeMap, sync::Arc, time::Instant};
use tokio::sync::{broadcast, watch};
use tracing::trace;

/// Size of the broadcast channel used to notify canonical state events.
const CANON_STATE_NOTIFICATION_CHANNEL_SIZE: usize = 256;
Expand Down Expand Up @@ -295,6 +296,7 @@ impl CanonicalInMemoryState {
/// This will update the links between blocks and remove all blocks that are [..
/// `persisted_height`].
pub fn remove_persisted_blocks(&self, persisted_num_hash: BlockNumHash) {
trace!(target: "providers::blockchain", ?persisted_num_hash, "Removing persisted blocks");
// if the persisted hash is not in the canonical in memory state, do nothing, because it
// means canonical blocks were not actually persisted.
//
Expand All @@ -310,6 +312,7 @@ impl CanonicalInMemoryState {
// acquire locks, starting with the numbers lock
let mut numbers = self.inner.in_memory_state.numbers.write();
let mut blocks = self.inner.in_memory_state.blocks.write();
trace!(target: "providers::blockchain", ?persisted_num_hash, "Clearing tree, only keeping new blocks");

let BlockNumHash { number: persisted_height, hash: _ } = persisted_num_hash;

Expand All @@ -320,7 +323,14 @@ impl CanonicalInMemoryState {
let mut old_blocks = blocks
.drain()
.map(|(_, b)| b.block.clone())
.filter(|b| b.block().number > persisted_height)
.filter(|b| {
if b.block().number > persisted_height {
true
} else {
trace!(target: "providers::blockchain", ?persisted_height, num=?b.block.number, "Removing persisted block from CanonicalInMemoryState");
false
}
})
.collect::<Vec<_>>();

// sort the blocks by number so we can insert them back in natural order (low -> high)
Expand Down Expand Up @@ -494,6 +504,23 @@ impl CanonicalInMemoryState {
self.inner.canon_state_notification_sender.send(event).ok();
}

/// Return state provider with reference to in-memory blocks that overlay database state.
///
/// This merges the state of all blocks that are part of the chain that the requested block is
/// the head of. This includes all blocks that connect back to the canonical block on disk.
pub fn state_provider_from_state(
&self,
state: &BlockState,
historical: StateProviderBox,
) -> MemoryOverlayStateProvider {
trace!(target: "providers::blockchain", state_hash=?state.hash(), "Constructing memory overlay state provider from state");
let in_memory: Vec<_> =
state.chain().into_iter().map(|block_state| block_state.block()).collect();

trace!(target: "providers::blockchain", first_mem_state=?in_memory.first().map(|block| block.block.num_hash()), last_mem_state=?in_memory.last().map(|block| block.block.num_hash()), "Constructed memory overlay state provider from state");
MemoryOverlayStateProvider::new(historical, in_memory)
}

/// Return state provider with reference to in-memory blocks that overlay database state.
///
/// This merges the state of all blocks that are part of the chain that the requested block is
Expand All @@ -503,12 +530,16 @@ impl CanonicalInMemoryState {
hash: B256,
historical: StateProviderBox,
) -> MemoryOverlayStateProvider {
trace!(target: "providers::blockchain", ?hash, "Constructing memory overlay state provider");
let in_memory = if let Some(state) = self.state_by_hash(hash) {
trace!(target: "providers::blockchain", ?hash, "Found in-memory state for overlay");
state.chain().into_iter().map(|block_state| block_state.block()).collect()
} else {
trace!(target: "providers::blockchain", ?hash, "No in-memory state for overlay");
Vec::new()
};

trace!(target: "providers::blockchain", first_mem_state=?in_memory.first().map(|block| block.block.num_hash()), last_mem_state=?in_memory.last().map(|block| block.block.num_hash()), ?hash, "Constructed memory overlay state provider");
MemoryOverlayStateProvider::new(historical, in_memory)
}

Expand Down
7 changes: 6 additions & 1 deletion crates/storage/provider/src/providers/blockchain_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,12 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
state: impl AsRef<BlockState>,
) -> ProviderResult<MemoryOverlayStateProvider> {
let state = state.as_ref();
trace!(target: "providers::blockchain", "Calling anchor for fetching historical state provider");
let anchor_hash = state.anchor().hash;
trace!(target: "providers::blockchain", ?anchor_hash, "Fetching historical provider for block hash");
let latest_historical = self.database.history_by_block_hash(anchor_hash)?;
Ok(self.canonical_in_memory_state.state_provider(state.hash(), latest_historical))
trace!(target: "providers::blockchain", ?anchor_hash, "Fetched state provider for block hash");
Ok(self.canonical_in_memory_state.state_provider_from_state(state, latest_historical))
}

/// Returns:
Expand Down Expand Up @@ -1116,8 +1119,10 @@ impl<N: ProviderNodeTypes> StateProviderFactory for BlockchainProvider2<N> {
// This could be tracked by a block in the database block
Ok(state)
} else if let Some(state) = self.canonical_in_memory_state.state_by_hash(block_hash) {
trace!(target: "providers::blockchain", ?block_hash, "Fetched state, now constructing block state provider");
// ... or this could be tracked by the in memory state
let state_provider = self.block_state_provider(state)?;
trace!(target: "providers::blockchain", ?block_hash, "Returning historical overlay state provider");
Ok(Box::new(state_provider))
} else {
// if we couldn't find it anywhere, then we should return an error
Expand Down

0 comments on commit c743be2

Please sign in to comment.