Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
kaimast committed Jan 21, 2025
1 parent f1841eb commit 5cf9e23
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 88 deletions.
39 changes: 27 additions & 12 deletions src/logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::wal::{LogEntry, WriteAheadLog};
use crate::{Error, Key, Params, StartMode, WriteBatch, WriteOp, WriteOptions};

#[cfg(feature = "wisckey")]
use crate::values::{ValueLog, ValueRef};
use crate::values::{ValueFreelist, ValueLog, ValueRef};

use crate::data_blocks::DataEntry;

Expand Down Expand Up @@ -134,6 +134,9 @@ impl DbLogic {
let memtable;
let wal;

#[cfg(feature = "wisckey")]
let value_log;

if create {
cfg_if! {
if #[ cfg(feature="_async-io") ] {
Expand Down Expand Up @@ -162,6 +165,9 @@ impl DbLogic {
manifest = Arc::new(Manifest::new(params.clone()).await);
memtable = RwLock::new(MemtableRef::wrap(Memtable::new(1)));
wal = Arc::new(WriteAheadLog::new(params.clone()).await?);

value_log =
Arc::new(ValueLog::new(wal.clone(), params.clone(), manifest.clone()).await);
} else {
log::info!(
"Opening database folder at \"{}\"",
Expand All @@ -171,21 +177,26 @@ impl DbLogic {
manifest = Arc::new(Manifest::open(params.clone()).await?);

let mut mtable = Memtable::new(manifest.get_seq_number_offset().await);
wal = Arc::new(
WriteAheadLog::open(params.clone(), manifest.get_log_offset().await, &mut mtable)
.await?,
);

cfg_if! {
if #[cfg(feature="wisckey")] {
let mut freelist = ValueFreelist::open(params.clone(), manifest.clone()).await?;
wal = Arc::new(
WriteAheadLog::open(params.clone(), manifest.get_log_offset().await, &mut mtable, &mut freelist)
.await?,
);
value_log = Arc::new(ValueLog::open(wal.clone(), params.clone(), manifest.clone(), freelist).await?);
} else {
wal = Arc::new(
WriteAheadLog::open(params.clone(), manifest.get_log_offset().await, &mut mtable)
.await?,
);
}
}

memtable = RwLock::new(MemtableRef::wrap(mtable));
}

#[cfg(feature = "wisckey")]
let value_log = if create {
Arc::new(ValueLog::new(wal.clone(), params.clone(), manifest.clone()).await)
} else {
Arc::new(ValueLog::open(wal.clone(), params.clone(), manifest.clone()).await?)
};

let data_blocks = Arc::new(DataBlocks::new(params.clone(), manifest.clone()));

if params.num_levels == 0 {
Expand Down Expand Up @@ -601,6 +612,10 @@ impl DbLogic {
logger.l0_table_added();
}

// Sync all freelist changes to disk
#[cfg(feature = "wisckey")]
self.value_log.sync().await?;

// Then update manifest and flush WAL
let seq_offset = mem.get().get_next_seq_number();
self.manifest.set_seq_number_offset(seq_offset).await;
Expand Down
69 changes: 63 additions & 6 deletions src/values/freelist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,15 @@ struct FreelistPageHeader {
/// One page of the value freelist: a header, the per-batch offsets
/// into the bitmap, and the bitmap itself.
struct FreelistPage {
/// Page header; it is the first part serialized when the page
/// is written to disk
header: FreelistPageHeader,

/// True if some changes to this page might not
/// have been written to disk yet
dirty: bool,

/// Tracks the value batches covered by this page
/// and where they start in the bitmap
offsets: Vec<u16>,

/// The actual bitmap
entries: BitVec<u8>,
}

Expand All @@ -48,6 +54,7 @@ impl FreelistPage {
num_batches: 0,
num_entries: 0,
},
dirty: true,
offsets: Default::default(),
entries: Default::default(),
}
Expand Down Expand Up @@ -78,6 +85,7 @@ impl FreelistPage {
header,
offsets,
entries,
dirty: false,
})
}

Expand All @@ -88,7 +96,7 @@ impl FreelistPage {
/// Add a new value batch to this page
///
/// - Returns true if there was enough space
/// - Note, this will not update the page until you flush()
/// - Note, this will not update the page until calling sync()
pub fn expand(&mut self, num_entries: usize) -> bool {
const MAX_SIZE: usize = 4 * 1024;
assert!(num_entries < MAX_SIZE);
Expand All @@ -108,6 +116,7 @@ impl FreelistPage {

// We assume all entries are in use for a new batch
self.entries.resize(current_entries + num_entries, true);
self.dirty = true;

true
} else {
Expand All @@ -116,15 +125,21 @@ impl FreelistPage {
}

/// Write the current state of the page to the disk
pub async fn flush(&self, path: &Path) -> Result<(), Error> {
pub async fn sync(&mut self, path: &Path) -> Result<bool, Error> {
if !self.dirty {
return Ok(false);
}

let mut data = self.header.as_bytes().to_vec();
data.extend_from_slice(self.offsets.as_bytes());
data.extend_from_slice(self.entries.as_raw_slice());

disk::write(path, &data).await.map_err(|err| {
Error::from_io_error(format!("Failed to write freelist page at `{path:?}`"), err)
})?;
Ok(())

self.dirty = false;
Ok(true)
}

/// Are any of the values in this freelist still in use?
Expand Down Expand Up @@ -187,7 +202,7 @@ impl FreelistPage {
/// Mark a value as deleted
///
/// - Returns the offset within the page that got changed
/// - Changes will not be persisted until we flush()
/// - Changes will not be persisted until we sync()
pub fn mark_value_as_deleted(&mut self, vid: ValueId) -> u16 {
let (start_pos, end_pos) = self.get_batch_range(vid.0);

Expand All @@ -200,8 +215,22 @@ impl FreelistPage {
}

*marker = false;
self.dirty = true;

(start_pos as u16) + (vid.1 as u16)
}

/// Clear the bitmap entry at `offset` within this page.
///
/// - Does nothing if the entry is already cleared
/// - Marks the page as dirty only if the entry actually changed
/// - Panics if `offset` is outside of the bitmap
pub fn unset_entry(&mut self, offset: u16) {
    let idx = usize::from(offset);
    let mut bit = self.entries.get_mut(idx).expect("Offset out of range");

    // Already cleared; leave the dirty flag untouched
    if !*bit {
        return;
    }

    *bit = false;
    self.dirty = true;
}
}

/// Keeps track of which entries in the value log are still
Expand Down Expand Up @@ -255,6 +284,25 @@ impl ValueFreelist {
Ok(obj)
}

/// Write every dirty freelist page to disk.
///
/// Pages that are not dirty are skipped. Stops at, and returns,
/// the first error encountered while writing a page.
pub async fn sync(&self) -> Result<(), Error> {
    let mut pages = self.pages.write().await;
    let mut count = 0;

    // Only visit pages with unwritten changes
    for (_, page) in pages.iter_mut().filter(|entry| entry.1.dirty) {
        let path = self.get_page_file_path(&page.get_identifier());

        // A dirty page must always report that it was written
        let updated = page.sync(&path).await?;
        assert!(updated);

        count += 1;
    }

    log::trace!("Flushed {count} freelist pages to disk");
    Ok(())
}

/// Returns how many pages this freelist currently holds
pub async fn num_pages(&self) -> usize {
    let pages = self.pages.read().await;
    pages.len()
}
Expand Down Expand Up @@ -321,12 +369,12 @@ impl ValueFreelist {
let success = page.expand(num_entries);
assert!(success, "Data did not fit in new page?");
pages.push_back((batch_id, page));
&pages.back_mut().unwrap().1
&mut pages.back_mut().unwrap().1
};

// Persist changes to disk
let path = self.get_page_file_path(&p.get_identifier());
p.flush(&path).await?;
p.sync(&path).await?;
Ok(())
}

Expand Down Expand Up @@ -360,6 +408,15 @@ impl ValueFreelist {

Ok((page_id, offset))
}

/// Clear the bitmap entry at `offset` in the page identified by `page_id`.
///
/// - Looks the page up with a binary search over the page identifiers
/// - Panics if no page with that identifier exists
pub async fn unset_entry(&self, page_id: FreelistPageId, offset: u16) {
    let mut pages = self.pages.write().await;

    let idx = pages
        .binary_search_by_key(&page_id, |(_, page)| page.get_identifier())
        .unwrap_or_else(|_| panic!("No page with id={page_id}"));

    pages[idx].1.unset_entry(offset);
}
}

#[cfg(test)]
Expand Down
7 changes: 5 additions & 2 deletions src/values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,9 @@ impl ValueLog {
wal: Arc<WriteAheadLog>,
params: Arc<Params>,
manifest: Arc<Manifest>,
freelist: ValueFreelist,
) -> Result<Self, Error> {
let batch_caches = Self::init_caches(&params);
let freelist = ValueFreelist::open(params.clone(), manifest.clone()).await?;

Ok(Self {
wal,
freelist,
Expand Down Expand Up @@ -260,4 +259,8 @@ impl ValueLog {

Ok(ValueBatch::get_ref(batch, offset))
}

/// Sync the value log's freelist.
///
/// Delegates to the freelist's `sync()` and returns its result unchanged.
pub async fn sync(&self) -> Result<(), Error> {
self.freelist.sync().await
}
}
72 changes: 54 additions & 18 deletions src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio_uring::fs::{File, OpenOptions};
use monoio::fs::{File, OpenOptions};

#[cfg(feature = "wisckey")]
use crate::values::FreelistPageId;
use crate::values::{FreelistPageId, ValueFreelist};

use crate::memtable::Memtable;
use crate::{Error, Params, WriteOp};
Expand Down Expand Up @@ -45,7 +45,7 @@ impl LogEntry<'_> {
match self {
Self::Write(_) => Self::WRITE,
#[cfg(feature = "wisckey")]
Self::ValueDeletion(_,_) => Self::VALUE_DELETION,
Self::ValueDeletion(_, _) => Self::VALUE_DELETION,
}
}
}
Expand Down Expand Up @@ -87,12 +87,37 @@ struct LogStatus {
stop_flag: bool,
}

impl LogStatus {
    /// Create the initial status of a log.
    ///
    /// - The queue, write, and sync positions all start at `position`
    /// - The flush and offset positions start at `start_position`
    /// - The queue is empty and neither the sync nor the stop flag is set
    fn new(position: u64, start_position: u64) -> Self {
        Self {
            // Nothing is queued or requested yet
            queue: Vec::new(),
            sync_flag: false,
            stop_flag: false,

            flush_pos: start_position,
            offset_pos: start_position,

            queue_pos: position,
            write_pos: position,
            sync_pos: position,
        }
    }
}

struct LogInner {
status: RwLock<LogStatus>,
queue_cond: Notify,
write_cond: Notify,
}

impl LogInner {
fn new(status: LogStatus) -> Self {
Self {
status: RwLock::new(status),
queue_cond: Default::default(),
write_cond: Default::default(),
}
}
}

/// The write-ahead log keeps track of the most recent changes
/// It can be used to recover from crashes
pub struct WriteAheadLog {
Expand Down Expand Up @@ -133,34 +158,45 @@ impl WriteAheadLog {
/// Open an existing log and insert entries into memtable
///
/// This is similar to `new` but fetches state from disk first.
#[cfg(feature = "wisckey")]
pub async fn open(
params: Arc<Params>,
start_position: u64,
memtable: &mut Memtable,
freelist: &mut ValueFreelist,
) -> Result<Self, Error> {
// This reads the file(s) in the current thread
// because we cannot send stuff between threads easily

let mut reader = WalReader::new(params.clone(), start_position).await?;
let position = reader.run(memtable).await?;

let status = LogStatus {
queue_pos: position,
write_pos: position,
sync_pos: position,
flush_pos: start_position,
offset_pos: start_position,
queue: vec![],
sync_flag: false,
stop_flag: false,
};
let position = reader.run(memtable, freelist).await?;

let inner = Arc::new(LogInner {
status: RwLock::new(status),
queue_cond: Default::default(),
write_cond: Default::default(),
});
let status = LogStatus::new(position, start_position);
let inner = Arc::new(LogInner::new(status));
let finish_receiver = Self::continue_writer(inner.clone(), position, params);

Ok(Self {
inner,
finish_receiver: Mutex::new(Some(finish_receiver)),
})
}

#[cfg(not(feature = "wisckey"))]
pub async fn open(
params: Arc<Params>,
start_position: u64,
memtable: &mut Memtable,
) -> Result<Self, Error> {
// This reads the file(s) in the current thread
// because we cannot send stuff between threads easily

let mut reader = WalReader::new(params.clone(), start_position).await?;

let position = reader.run(memtable).await?;

let status = LogStatus::new(position, start_position);
let inner = Arc::new(LogInner::new(status));
let finish_receiver = Self::continue_writer(inner.clone(), position, params);

Ok(Self {
Expand Down
Loading

0 comments on commit 5cf9e23

Please sign in to comment.