Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

opt: do not copy pages before bitbox writeout #655

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions nomt/src/bitbox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ impl DB {
&self,
sync_seqn: u32,
page_pool: &PagePool,
changes: Vec<(PageId, BucketIndex, Option<(FatPage, PageDiff)>)>,
changes: Vec<(PageId, BucketIndex, Option<(Arc<FatPage>, PageDiff)>)>,
wal_blob_builder: &mut WalBlobBuilder,
) -> Vec<(u64, FatPage)> {
) -> Vec<(u64, Arc<FatPage>)> {
wal_blob_builder.reset(sync_seqn);

let mut meta_map = self.shared.meta_map.write();
Expand Down Expand Up @@ -173,7 +173,7 @@ impl DB {
let mut buf = page_pool.alloc_fat_page();
buf[..].copy_from_slice(meta_map.page_slice(changed_meta_page));
let pn = self.shared.store.meta_bytes_index(changed_meta_page as u64);
ht_pages.push((pn, buf));
ht_pages.push((pn, Arc::new(buf)));
}

if cfg!(debug_assertions) {
Expand Down Expand Up @@ -207,7 +207,7 @@ pub struct SyncController {
/// The channel to receive the result of the WAL writeout.
wal_result_rx: Receiver<anyhow::Result<()>>,
/// The pages along with their page numbers to write out to the HT file.
ht_to_write: Arc<Mutex<Option<Vec<(u64, FatPage)>>>>,
ht_to_write: Arc<Mutex<Option<Vec<(u64, Arc<FatPage>)>>>>,
}

impl SyncController {
Expand Down
5 changes: 3 additions & 2 deletions nomt/src/bitbox/writeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
fs::File,
io::{Seek as _, SeekFrom, Write},
os::fd::AsRawFd as _,
sync::Arc,
};

use crate::io::{FatPage, IoCommand, IoHandle, IoKind};
Expand All @@ -30,15 +31,15 @@ pub(super) fn truncate_wal(mut wal_fd: &File) -> anyhow::Result<()> {
pub(super) fn write_ht(
io_handle: IoHandle,
ht_fd: &File,
mut ht: Vec<(u64, FatPage)>,
mut ht: Vec<(u64, Arc<FatPage>)>,
) -> anyhow::Result<()> {
let mut sent = 0;

ht.sort_unstable_by_key(|item| item.0);
for (pn, page) in ht {
io_handle
.send(IoCommand {
kind: IoKind::Write(ht_fd.as_raw_fd(), pn, page),
kind: IoKind::WriteArc(ht_fd.as_raw_fd(), pn, page),
user_data: 0,
})
.unwrap();
Expand Down
1 change: 0 additions & 1 deletion nomt/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ pub use page_pool::{FatPage, PagePool};
pub enum IoKind {
Read(RawFd, u64, FatPage),
Write(RawFd, u64, FatPage),
#[allow(dead_code)]
WriteArc(RawFd, u64, Arc<FatPage>),
WriteRaw(RawFd, u64, Page),
}
Expand Down
8 changes: 4 additions & 4 deletions nomt/src/page_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ impl PageCache {
) {
let mut apply_page = |page_id,
bucket: &mut Option<BucketIndex>,
page_data: Option<&FatPage>,
page_data: Option<&Arc<FatPage>>,
page_diff: PageDiff| {
match (page_data, *bucket) {
(None, Some(known_bucket)) => {
Expand All @@ -410,7 +410,7 @@ impl PageCache {
*bucket = None;
}
(Some(page), maybe_bucket) if !page_diff.cleared() => {
let new_bucket = tx.write_page(page_id, maybe_bucket, page, page_diff);
let new_bucket = tx.write_page(page_id, maybe_bucket, page.clone(), page_diff);
*bucket = Some(new_bucket);
}
_ => {} // empty pages which had no known bucket. don't write or delete.
Expand Down Expand Up @@ -441,7 +441,7 @@ impl PageCache {
apply_page(
updated_page.page_id,
bucket,
page_data.as_ref().map(|x| &**x),
page_data.as_ref(),
updated_page.diff,
);
continue;
Expand All @@ -462,7 +462,7 @@ impl PageCache {
apply_page(
updated_page.page_id,
bucket,
cache_entry.page_data.as_ref().map(|x| &**x),
cache_entry.page_data.as_ref(),
updated_page.diff,
);
}
Expand Down
15 changes: 3 additions & 12 deletions nomt/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ struct Shared {
values: beatree::Tree,
pages: bitbox::DB,
rollback: Option<Rollback>,
page_pool: PagePool,
io_pool: IoPool,
meta_fd: File,
#[allow(unused)]
Expand Down Expand Up @@ -179,7 +178,6 @@ impl Store {
))),
shared: Arc::new(Shared {
rollback,
page_pool,
values,
pages,
io_pool,
Expand Down Expand Up @@ -292,9 +290,8 @@ impl ValueTransaction {
/// An atomic transaction on merkle tree pages to be applied against the store
/// with [`Store::commit`].
pub struct MerkleTransaction {
pub(crate) page_pool: PagePool,
pub(crate) bucket_allocator: bitbox::BucketAllocator,
pub(crate) new_pages: Vec<(PageId, BucketIndex, Option<(FatPage, PageDiff)>)>,
pub(crate) new_pages: Vec<(PageId, BucketIndex, Option<(Arc<FatPage>, PageDiff)>)>,
}

impl MerkleTransaction {
Expand All @@ -303,20 +300,14 @@ impl MerkleTransaction {
&mut self,
page_id: PageId,
bucket: Option<BucketIndex>,
page: &FatPage,
page: Arc<FatPage>,
page_diff: PageDiff,
) -> BucketIndex {
let bucket_index =
bucket.unwrap_or_else(|| self.bucket_allocator.allocate(page_id.clone()));

// Perform a deep clone of the page. For that allocate a new page and copy the data over.
//
// TODO: get rid of this copy.
let mut new_page = self.page_pool.alloc_fat_page();
new_page.copy_from_slice(page);

self.new_pages
.push((page_id, bucket_index, Some((new_page, page_diff))));
.push((page_id, bucket_index, Some((page, page_diff))));
bucket_index
}

Expand Down
1 change: 0 additions & 1 deletion nomt/src/store/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ impl Sync {
let mut rollback_sync = rollback.map(|rollback| rollback.sync());

let merkle_tx = MerkleTransaction {
page_pool: shared.page_pool.clone(),
bucket_allocator: bitbox.bucket_allocator(),
new_pages: Vec::new(),
};
Expand Down
Loading