Skip to content

Commit

Permalink
[WIP] merklize into copied pages
Browse files Browse the repository at this point in the history
  • Loading branch information
rphmeier authored and pepyakin committed Jan 2, 2025
1 parent aa13633 commit a7dbd67
Show file tree
Hide file tree
Showing 10 changed files with 438 additions and 463 deletions.
4 changes: 2 additions & 2 deletions nomt/src/bitbox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ impl SyncController {
sync_seqn: u32,
page_cache: PageCache,
mut merkle_tx: MerkleTransaction,
page_diffs: merkle::PageDiffs,
updated_pages: merkle::UpdatedPages,
) {
let page_pool = self.db.shared.page_pool.clone();
let bitbox = self.db.clone();
Expand All @@ -240,7 +240,7 @@ impl SyncController {
// UNWRAP: safe because begin_sync is called only once.
let wal_result_tx = self.wal_result_tx.take().unwrap();
self.db.shared.sync_tp.execute(move || {
page_cache.prepare_transaction(page_diffs.into_iter(), &mut merkle_tx);
page_cache.absorb_and_populate_transaction(updated_pages, &mut merkle_tx);

let mut wal_blob_builder = wal_blob_builder.lock();
let ht_pages = bitbox.prepare_sync(
Expand Down
9 changes: 4 additions & 5 deletions nomt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ impl<T: HashAlgorithm> Nomt<T> {
let new_root = merkle_update.root;
self.shared.lock().root = new_root;
self.store
.commit(tx, self.page_cache.clone(), merkle_update.page_diffs)?;
.commit(tx, self.page_cache.clone(), merkle_update.updated_pages)?;

Ok((
new_root,
Expand Down Expand Up @@ -535,17 +535,16 @@ fn compute_root_node<H: NodeHasher>(page_cache: &PageCache) -> Node {
let Some(root_page) = page_cache.get(ROOT_PAGE_ID) else {
return TERMINATOR;
};
let read_pass = page_cache.new_read_pass();

// 3 cases.
// 1: root page is empty. in this case, root is the TERMINATOR.
// 2: root page has top two slots filled, but _their_ children are empty. root is a leaf.
// this is because internal nodes and leaf nodes would have items below.
// 3: root is an internal node.
let is_empty = |node_index| root_page.node(&read_pass, node_index) == TERMINATOR;
let is_empty = |node_index| root_page.node(node_index) == TERMINATOR;

let left = root_page.node(&read_pass, 0);
let right = root_page.node(&read_pass, 1);
let left = root_page.node(0);
let right = root_page.node(1);

if is_empty(0) && is_empty(1) {
TERMINATOR
Expand Down
26 changes: 13 additions & 13 deletions nomt/src/merkle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crossbeam::channel::{self, Receiver, Sender};
use parking_lot::Mutex;

use nomt_core::{
page_id::PageId,
trie::{self, KeyPath, Node, ValueHash},
trie_pos::TriePosition,
};
Expand All @@ -18,7 +17,6 @@ use std::{collections::HashMap, sync::Arc};
use crate::{
io::PagePool,
page_cache::{PageCache, ShardIndex},
page_diff::PageDiff,
rw_pass_cell::WritePassEnvelope,
store::Store,
HashAlgorithm, Witness, WitnessedOperations, WitnessedPath, WitnessedRead, WitnessedWrite,
Expand All @@ -29,11 +27,13 @@ mod page_walker;
mod seek;
mod worker;

/// Page diffs produced by update workers.
pub struct PageDiffs(Vec<Vec<(PageId, PageDiff)>>);
pub use page_walker::UpdatedPage;

impl IntoIterator for PageDiffs {
type Item = (PageId, PageDiff);
/// Updated pages produced by update workers.
pub struct UpdatedPages(Vec<Vec<UpdatedPage>>);

impl IntoIterator for UpdatedPages {
type Item = UpdatedPage;
type IntoIter = std::iter::Flatten<<Vec<Vec<Self::Item>> as IntoIterator>::IntoIter>;

fn into_iter(self) -> Self::IntoIter {
Expand Down Expand Up @@ -238,7 +238,7 @@ impl UpdateHandle {
writes: Vec::new(),
});

let mut page_diffs = Vec::new();
let mut updated_pages = Vec::new();

let mut path_proof_offset = 0;
let mut witnessed_start = 0;
Expand All @@ -254,7 +254,7 @@ impl UpdateHandle {
new_root = Some(root);
}

page_diffs.push(output.page_diffs);
updated_pages.push(output.updated_pages);

// if the Commit worker collected the witnessed paths
// then we need to aggregate them
Expand Down Expand Up @@ -310,7 +310,7 @@ impl UpdateHandle {
// UNWRAP: one thread always produces the root.
Output {
root: new_root.unwrap(),
page_diffs: PageDiffs(page_diffs),
updated_pages: UpdatedPages(updated_pages),
witness: maybe_witness,
witnessed_operations: maybe_witnessed_ops,
}
Expand All @@ -321,8 +321,8 @@ impl UpdateHandle {
pub struct Output {
/// The new root.
pub root: Node,
/// All page-diffs from all worker threads. The covered sets of pages are disjoint.
pub page_diffs: PageDiffs,
/// All updated pages from all worker threads. The covered sets of pages are disjoint.
pub updated_pages: UpdatedPages,
/// Optional witness
pub witness: Option<Witness>,
/// Optional list of all witnessed operations.
Expand Down Expand Up @@ -350,15 +350,15 @@ enum RootPagePending {
struct WorkerOutput {
root: Option<Node>,
witnessed_paths: Option<Vec<(WitnessedPath, Option<trie::LeafData>, usize)>>,
page_diffs: Vec<(PageId, PageDiff)>,
updated_pages: Vec<UpdatedPage>,
}

impl WorkerOutput {
fn new(witness: bool) -> Self {
WorkerOutput {
root: None,
witnessed_paths: if witness { Some(Vec::new()) } else { None },
page_diffs: Vec::new(),
updated_pages: Vec::new(),
}
}
}
Expand Down
Loading

0 comments on commit a7dbd67

Please sign in to comment.