Skip to content

Commit

Permalink
Add truncate_at which drops offset inclusively, fixes zowens#29.
Browse files Browse the repository at this point in the history
  • Loading branch information
davlum committed Apr 17, 2021
1 parent 22fed7c commit ac229e7
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/file_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl FileSet {
// offset = 6
// [0 5 10 15] => split key 5
//
// midpoint is then used as the active index/segment pair
// midpoint is then used as the active index/segment pair
let split_key = match self.closed.range(..=offset).next_back().map(|p| p.0).cloned() {
Some(key) => {
trace!("File set split key for truncation {}", key);
Expand Down
12 changes: 6 additions & 6 deletions src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,18 +322,18 @@ impl Index {
fs::remove_file(path)
}

/// Truncates to an offset, inclusive. The file length of the
/// Truncates to an offset, exclusive. The file length of the
/// segment for truncation is returned.
pub fn truncate(&mut self, offset: Offset) -> Option<u32> {
// find the next offset position in order to inform
// the truncation of the segment
let next_pos = match self.find_index_pos(offset + 1) {
let next_pos = match self.find_index_pos(offset) {
Some(i) => {
trace!("Found offset mem offset {}", i);
i
}
None => {
trace!("No offset {} found in index", offset + 1);
trace!("No offset {} found in index", offset);
return None;
}
};
Expand All @@ -347,7 +347,7 @@ impl Index {
// truncation. This likely occurs when the last offset is the offset
// requested for truncation OR the offset for truncation is > than the
// last offset.
if u64::from(off) + self.base_offset <= offset {
if u64::from(off) + self.base_offset < offset {
trace!("Truncated to exact segment boundary, no need to truncate segment");
return None;
}
Expand Down Expand Up @@ -973,7 +973,7 @@ mod tests {
buf.push(14, 50);
index.append(buf).unwrap();

let file_len = index.truncate(12);
let file_len = index.truncate(13);
assert_eq!(Some(40), file_len);
assert_eq!(13, index.next_offset());
assert_eq!(3 * INDEX_ENTRY_BYTES, index.next_write_pos);
Expand All @@ -998,7 +998,7 @@ mod tests {
buf.push(14, 50);
index.append(buf).unwrap();

let file_len = index.truncate(14);
let file_len = index.truncate(15);
assert_eq!(None, file_len);
assert_eq!(15, index.next_offset());
assert_eq!(5 * INDEX_ENTRY_BYTES, index.next_write_pos);
Expand Down
55 changes: 52 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,15 +486,20 @@ impl CommitLog {
}

/// Truncates a file after the offset supplied. The resulting log will
/// contain entries up to the offset.
/// contain entries up to and including the offset.
pub fn truncate(&mut self, offset: Offset) -> io::Result<()> {
self.truncate_at(offset + 1)
}

/// Truncates a file at the offset supplied. The resulting log will
/// contain entries up to but not including the offset.
pub fn truncate_at(&mut self, offset: Offset) -> io::Result<()> {
info!("Truncating log to offset {}", offset);

// remove index/segment files rolled after the offset
let to_remove = self.file_set.take_after(offset);
for p in to_remove {
trace!("Removing segment and index starting at {}", p.0.starting_offset());
assert!(p.0.starting_offset() > offset);

p.0.remove()?;
p.1.remove()?;
Expand Down Expand Up @@ -878,7 +883,7 @@ mod tests {
],
);

// truncate to offset 2 (should remove 2 messages)
// truncate to offset 7 (should remove 0 messages)
log.truncate(7).expect("Unable to truncate file");

assert_eq!(Some(6), log.last_offset());
Expand All @@ -899,6 +904,50 @@ mod tests {
);
}

#[test]
pub fn truncate_at_zero_drops_whole_log() {
env_logger::try_init().unwrap_or(());
let dir = TestDir::new();

let mut opts = LogOptions::new(&dir);
opts.index_max_items(20);
opts.segment_max_bytes(52);
let mut log = CommitLog::new(opts).unwrap();

// append 6 messages (4 segments)
{
for _ in 0..7 {
log.append_msg(b"12345").unwrap();
}
}

// ensure we have the expected index/logs
expect_files(
&dir,
vec![
"00000000000000000000.index",
"00000000000000000000.log",
"00000000000000000002.log",
"00000000000000000002.index",
"00000000000000000004.log",
"00000000000000000004.index",
"00000000000000000006.log",
"00000000000000000006.index",
],
);

// truncate_at to offset 0 (should remove 7 messages)
log.truncate_at(0).expect("Unable to truncate file");

assert_eq!(None, log.last_offset());

// ensure we have the expected index/logs
expect_files(&dir, vec![
"00000000000000000000.index",
"00000000000000000000.log",
]);
}

fn expect_files<P: AsRef<Path>, I>(dir: P, files: I)
where
I: IntoIterator<Item = &'static str>,
Expand Down

0 comments on commit ac229e7

Please sign in to comment.