Writing block-level dedup to sharded files (RocksDB was too slow)
gschoeni committed Jul 22, 2024
1 parent 1f48e0c commit efeb3a4
Showing 16 changed files with 624 additions and 62 deletions.
9 changes: 6 additions & 3 deletions TODO.md
@@ -4,10 +4,12 @@
* Contribute back to Polars the fix for
* /Users/gregschoeninger/.cargo/registry/src/index.crates.io-6f17d22bba15001f/polars-io-0.41.2/src/mmap.rs
* Biggest question is whether we can reconstruct and render the large data frames efficiently?
* Can optionally unpack to full file on specific commits
* yes.
* can update the download api to use ChunkReader
* Can optionally "unpack" to full file or index on specific commits
* Always use chunks
* This helps us write to the merkle tree in parallel on upload
* If !chunking turned on in branch, we unpack chunks into full files or write to duckdb or s3 or w/e
* If unpacking is turned on in branch, we unpack chunks into full files or write to duckdb or s3 or w/e
* File Chunks (see the chunking sketch after this diff)
* Can get rid of FileChunkNode type since it's just a file we store on disk now
* TODO L8R - Can you smartly chunk parquet files on the boundaries of columns?
@@ -32,4 +34,5 @@

* Storage Backends
* Local Backend
* S3 Backend
* S3 Backend
* This would be a ridiculous # of files if chunking is turned on...
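Context for the chunking TODOs above: the commit message describes writing block-level dedup to sharded files instead of RocksDB. Below is a minimal, hypothetical sketch of that idea — fixed-size chunks, a content hash per chunk, and a shard directory keyed by hash prefix. The chunk size, the hash function (std's `DefaultHasher` as a stand-in), and the `chunk_and_dedup` / `shard_root` names are assumptions for illustration, not the actual `FileChunker` / `ChunkShardManager` API.

```rust
// Hypothetical sketch of block-level dedup into sharded chunk files.
// Chunk size, shard layout, and DefaultHasher are stand-ins for whatever
// FileChunker / ChunkShardManager actually use.
use std::collections::hash_map::DefaultHasher;
use std::fs;
use std::hash::Hasher;
use std::io::{self, Read};
use std::path::{Path, PathBuf};

const CHUNK_SIZE: usize = 16 * 1024; // assumed fixed chunk size

/// Split a file into fixed-size chunks, hash each one, and write any chunk
/// we have not seen before into a shard directory keyed by the top byte of
/// its hash. Returns the ordered chunk hashes needed to rebuild the file.
fn chunk_and_dedup(file: &Path, shard_root: &Path) -> io::Result<Vec<u64>> {
    let mut reader = fs::File::open(file)?;
    let mut buf = vec![0u8; CHUNK_SIZE];
    let mut hashes = Vec::new();

    loop {
        // A real implementation would fill the buffer fully before hashing.
        let n = reader.read(&mut buf)?;
        if n == 0 {
            break;
        }
        let chunk = &buf[..n];

        // Content hash of the chunk (stand-in for the real hash function).
        let mut hasher = DefaultHasher::new();
        hasher.write(chunk);
        let hash = hasher.finish();
        hashes.push(hash);

        // Shard by hash prefix so no single directory (or db) gets huge.
        let shard: PathBuf = shard_root.join(format!("{:02x}", (hash >> 56) as u8));
        fs::create_dir_all(&shard)?;
        let chunk_path = shard.join(format!("{:016x}", hash));
        if !chunk_path.exists() {
            // Dedup: identical chunks are only ever written once.
            fs::write(&chunk_path, chunk)?;
        }
    }
    Ok(hashes)
}
```

Sharding by hash prefix keeps any single directory from holding an unbounded number of chunk files, which is the same concern the S3-backend TODO raises about a "ridiculous # of files".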
9 changes: 8 additions & 1 deletion src/cli/src/cmd/db.rs
@@ -8,6 +8,9 @@ use crate::cmd::RunCmd;

pub const NAME: &str = "db";

pub mod count;
pub use count::DbCountCmd;

pub mod get;
pub use get::DbGetCmd;

@@ -60,7 +63,11 @@ impl RunCmd for DbCmd {

impl DbCmd {
fn get_subcommands(&self) -> HashMap<String, Box<dyn RunCmd>> {
let commands: Vec<Box<dyn RunCmd>> = vec![Box::new(DbListCmd), Box::new(DbGetCmd)];
let commands: Vec<Box<dyn RunCmd>> = vec![
Box::new(DbListCmd),
Box::new(DbGetCmd),
Box::new(DbCountCmd),
];
let mut runners: HashMap<String, Box<dyn RunCmd>> = HashMap::new();
for cmd in commands {
runners.insert(cmd.name().to_string(), cmd);
39 changes: 39 additions & 0 deletions src/cli/src/cmd/db/count.rs
@@ -0,0 +1,39 @@
use std::path::PathBuf;

use async_trait::async_trait;
use clap::{Arg, Command};

use liboxen::command;
use liboxen::error::OxenError;

use crate::cmd::RunCmd;
pub const NAME: &str = "count";

pub struct DbCountCmd;

#[async_trait]
impl RunCmd for DbCountCmd {
fn name(&self) -> &str {
NAME
}

fn args(&self) -> Command {
// Sets up the CLI args for the command
Command::new(NAME)
.about("Count the number of entries in a key-value database.")
.arg(Arg::new("PATH").help("The path of the database."))
}

async fn run(&self, args: &clap::ArgMatches) -> Result<(), OxenError> {
// Parse Args
let Some(path) = args.get_one::<String>("PATH") else {
return Err(OxenError::basic_str("Must supply path"));
};

let count = command::db::count(PathBuf::from(path))?;

println!("There are {} entries in the database", count);

Ok(())
}
}
10 changes: 9 additions & 1 deletion src/cli/src/cmd/db/get.rs
@@ -18,6 +18,13 @@ impl RunCmd for DbGetCmd {
.about("Get a value from the database given a key.")
.arg(Arg::new("PATH").help("The path of the database."))
.arg(Arg::new("KEY").help("The key to get the value for."))
.arg(
Arg::new("dtype")
.short('d')
.long("dtype")
.help("The data type of the key.")
.default_value("str"),
)
}

async fn run(&self, args: &clap::ArgMatches) -> Result<(), OxenError> {
@@ -30,7 +37,8 @@ impl RunCmd for DbGetCmd {
return Err(OxenError::basic_str(error));
};

let value = command::db::get(path, key)?;
let dtype = args.get_one::<String>("dtype").map(|x| x.as_str());
let value = command::db::get(path, key, dtype)?;
println!("{}", value);

Ok(())
14 changes: 10 additions & 4 deletions src/cli/src/cmd/db/list.rs
@@ -22,6 +22,12 @@ impl RunCmd for DbListCmd {
Command::new(NAME)
.about("List the full key value database.")
.arg(Arg::new("PATH").help("The path of the database."))
.arg(
Arg::new("limit")
.short('l')
.long("limit")
.help("The maximum number of entries to list"),
)
}

async fn run(&self, args: &clap::ArgMatches) -> Result<(), OxenError> {
@@ -30,11 +36,11 @@
return Err(OxenError::basic_str("Must supply path"));
};

let result = command::db::list(PathBuf::from(path))?;
let limit = args
.get_one::<String>("limit")
.map(|x| x.parse::<usize>().expect("limit must be valid size"));

for (key, value) in result {
println!("{key}\t{value}");
}
command::db::list(PathBuf::from(path), limit)?;

Ok(())
}
4 changes: 2 additions & 2 deletions src/cli/src/main.rs
@@ -2,14 +2,14 @@ use std::collections::HashMap;
use std::process::ExitCode;

use clap::Command;
use env_logger::Env;
// use env_logger::Env;

pub mod cmd;
pub mod helpers;

#[tokio::main]
async fn main() -> ExitCode {
env_logger::init_from_env(Env::default());
env_logger::builder().format_timestamp_micros().init();

let cmds: Vec<Box<dyn cmd::RunCmd>> = vec![
Box::new(cmd::AddCmd),
91 changes: 78 additions & 13 deletions src/lib/src/command/db.rs
@@ -4,32 +4,51 @@
//!
use crate::error::OxenError;
use crate::util::progress_bar::spinner_with_msg;

use rocksdb::{IteratorMode, LogLevel, Options, DB};
use std::path::Path;
use std::str;

/// List the key -> value pairs in a database
pub fn list(path: impl AsRef<Path>) -> Result<Vec<(String, String)>, OxenError> {
pub fn list(path: impl AsRef<Path>, limit: Option<usize>) -> Result<(), OxenError> {
let path = path.as_ref();
let mut opts = Options::default();
opts.set_log_level(LogLevel::Fatal);

let mut result: Vec<(String, String)> = Vec::new();

let db = DB::open_for_read_only(&opts, dunce::simplified(path), false)?;
let iter = db.iterator(IteratorMode::Start);
let mut count = 0;
for item in iter {
if let Some(limit) = limit {
if count >= limit {
break;
}
}

match item {
Ok((key, value)) => {
let Ok(key) = str::from_utf8(&key) else {
continue;
let key = if let Ok(key) = str::from_utf8(&key) {
key.to_string()
} else {
// deserialize as u128
if key.len() == 16 {
let key: [u8; 16] = (*key).try_into().map_err(|_| {
OxenError::basic_str("Could not convert key to [u8; 16]")
})?;
let key = u128::from_le_bytes(key);
format!("{}", key)
} else {
return Err(OxenError::basic_str(
"Could not read iterate over db values",
));
}
};

if let Ok(val) = str::from_utf8(&value) {
result.push((key.to_string(), val.to_string()));
println!("{key}\t{val}");
} else {
result.push((key.to_string(), "<binary data>".to_string()));
println!("{key}\t<binary data>");
}
}
_ => {
@@ -38,22 +57,68 @@ pub fn list(path: impl AsRef<Path>) -> Result<Vec<(String, String)>, OxenError>
));
}
}
count += 1;
}

Ok(result)
println!("{} total entries", count);

Ok(())
}

// Get a value from a database
pub fn get(path: impl AsRef<Path>, key: impl AsRef<str>) -> Result<String, OxenError> {
/// Count the values in a database
pub fn count(path: impl AsRef<Path>) -> Result<usize, OxenError> {
let path = path.as_ref();
let key = key.as_ref();
let opts = Options::default();
log::debug!("Opening db at {:?}", path);
let db = DB::open_for_read_only(&opts, dunce::simplified(path), false)?;
log::debug!("Opened db at {:?}", path);
let iter = db.iterator(IteratorMode::Start);
log::debug!("Iterating over db at {:?}", path);
let progress = spinner_with_msg(format!("Counting db at {:?}", path));
let mut count = 0;
for _ in iter {
count += 1;
progress.inc(1);
progress.set_message(format!("{} entries", count));
}
progress.finish_and_clear();
Ok(count)
}

/// Get a value from a database
pub fn get(
path: impl AsRef<Path>,
key: impl AsRef<str>,
dtype: Option<&str>,
) -> Result<String, OxenError> {
let path = path.as_ref();
let str_key = key.as_ref();
let mut opts = Options::default();
opts.set_log_level(LogLevel::Fatal);

let key = if let Some(dtype) = dtype {
if dtype == "u128" {
let key = str_key.parse::<u128>()?;
key.to_le_bytes().to_vec()
} else {
str_key.as_bytes().to_vec()
}
} else {
str_key.as_bytes().to_vec()
};

log::debug!("Opening db at {:?}", path);
let db = DB::open_for_read_only(&opts, dunce::simplified(path), false)?;
log::debug!("Opened db at {:?}", path);

if let Some(value) = db.get(key)? {
Ok(String::from_utf8(value)?)
log::debug!("Got value from db at {:?}", path);
if let Ok(value) = str::from_utf8(&value) {
Ok(value.to_string())
} else {
Ok(format!("<{} bytes>", value.len()))
}
} else {
Err(OxenError::basic_str(format!("Key {} not found", key)))
Err(OxenError::basic_str(format!("Key {} not found", str_key)))
}
}
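The updated `list` and `get` above treat a 16-byte key as a little-endian `u128` (matching the merkle hashes written elsewhere in this commit) and fall back to UTF-8 strings otherwise. A small sketch of that key round trip, using only the conversions visible in the diff — the helper names are illustrative, and the CLI invocation (`oxen db get <path> <key> -d u128`) assumes the binary is named `oxen`:

```rust
// Round-trip sketch for the u128 key convention used by `db get -d u128`.
// Mirrors the conversions in command::db::{list, get}; not a public API.
fn encode_u128_key(key: u128) -> Vec<u8> {
    key.to_le_bytes().to_vec() // 16 bytes, little-endian, as in `get`
}

fn decode_key(raw: &[u8]) -> String {
    if let Ok(s) = std::str::from_utf8(raw) {
        return s.to_string(); // plain string key
    }
    if raw.len() == 16 {
        // 16 non-UTF-8 bytes: interpret as a little-endian u128, as in `list`
        let bytes: [u8; 16] = raw.try_into().expect("length checked above");
        return u128::from_le_bytes(bytes).to_string();
    }
    "<binary key>".to_string()
}

fn main() {
    let hash: u128 = 0xdeadbeef_cafef00d_0123_4567_89ab_cdef;
    let raw = encode_u128_key(hash);
    assert_eq!(decode_key(&raw), hash.to_string());
    println!("{} -> {:02x?} -> {}", hash, raw, decode_key(&raw));
}
```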
9 changes: 5 additions & 4 deletions src/lib/src/command/migrate/m05_optimize_merkle_tree.rs
@@ -6,7 +6,7 @@ use super::Migrate;
use crate::core::db::merkle_node_db::MerkleNodeDB;
use crate::core::db::tree_db::{TreeObject, TreeObjectChild};
use crate::core::db::{self, str_val_db};
use crate::core::index::file_chunker::FileChunker;
use crate::core::index::file_chunker::{ChunkShardManager, FileChunker};
use crate::core::index::merkle_tree::node::*;
use crate::core::index::{CommitEntryReader, CommitReader};
use crate::error::OxenError;
@@ -317,7 +317,7 @@ fn migrate_dir(
path: file_name.to_owned(),
};
let uhash = u128::from_str_radix(hash, 16).expect("Failed to parse hex string");
println!("Bucket [{}] Val [{}] {} for {:?}", i, j, hash, val);
// println!("Bucket [{}] Val [{}] {} for {:?}", i, j, hash, val);
tree_db.write_one(uhash, MerkleTreeNodeType::Dir, &val)?;
migrate_dir(repo, reader, file_name, hash)?;
}
@@ -328,7 +328,7 @@ fn migrate_dir(
path: file_name.to_owned(),
};
let uhash = u128::from_str_radix(hash, 16).expect("Failed to parse hex string");
println!("Bucket [{}] Val [{}] {} for {:?}", i, j, hash, val);
// println!("Bucket [{}] Val [{}] {} for {:?}", i, j, hash, val);
tree_db.write_one(uhash, MerkleTreeNodeType::Schema, &val)?;
}
}
@@ -369,7 +369,8 @@ fn migrate_file(
path.display()
)))?;
let chunker = FileChunker::new(repo);
let chunks = chunker.save_chunks(&commit_entry)?;
let mut csm = ChunkShardManager::new(repo)?;
let chunks = chunker.save_chunks(&commit_entry, &mut csm)?;

// Then start refactoring the commands into a "legacy" module so we can still make the old
// dbs but start implementing them with the new merkle object
10 changes: 5 additions & 5 deletions src/lib/src/core/db/merkle_node_db.rs
@@ -328,11 +328,11 @@ impl MerkleNodeDB {
let mut buf = Vec::new();
item.serialize(&mut Serializer::new(&mut buf)).unwrap();
let data_len = buf.len() as u64;
log::debug!("--write_one-- dtype {:?}", dtype);
log::debug!("--write_one-- hash {:x}", hash);
log::debug!("--write_one-- data_offset {}", self.data_offset);
log::debug!("--write_one-- data_len {}", data_len);
log::debug!("--write_one-- item {:?}", item);
// log::debug!("--write_one-- dtype {:?}", dtype);
// log::debug!("--write_one-- hash {:x}", hash);
// log::debug!("--write_one-- data_offset {}", self.data_offset);
// log::debug!("--write_one-- data_len {}", data_len);
// log::debug!("--write_one-- item {:?}", item);

lookup_file.write_all(&dtype.to_u8().to_le_bytes())?;
lookup_file.write_all(&hash.to_le_bytes())?;
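For context on `write_one` above: the now-commented debug lines list a dtype, a `u128` hash, a data offset, and a data length per entry, and the visible writes use little-endian byte order. A hedged sketch of what one fixed-width lookup record might look like under those assumptions — the struct, field order, and widths are inferred for illustration, not taken from `MerkleNodeDB` itself:

```rust
// Hypothetical on-disk layout for one MerkleNodeDB lookup record, inferred
// from the write_one debug fields (dtype, hash, data_offset, data_len).
// Field order and widths are assumptions, not the actual format.
struct LookupRecord {
    dtype: u8,        // node type tag
    hash: u128,       // merkle hash, little-endian on disk
    data_offset: u64, // where the serialized node starts in the data file
    data_len: u64,    // length of the serialized node
}

impl LookupRecord {
    const SIZE: usize = 1 + 16 + 8 + 8; // 33 bytes per record

    fn to_bytes(&self) -> Vec<u8> {
        let mut buf = Vec::with_capacity(Self::SIZE);
        buf.extend_from_slice(&self.dtype.to_le_bytes());
        buf.extend_from_slice(&self.hash.to_le_bytes());
        buf.extend_from_slice(&self.data_offset.to_le_bytes());
        buf.extend_from_slice(&self.data_len.to_le_bytes());
        buf
    }

    fn from_bytes(raw: &[u8]) -> Self {
        assert_eq!(raw.len(), Self::SIZE);
        LookupRecord {
            dtype: raw[0],
            hash: u128::from_le_bytes(raw[1..17].try_into().unwrap()),
            data_offset: u64::from_le_bytes(raw[17..25].try_into().unwrap()),
            data_len: u64::from_le_bytes(raw[25..33].try_into().unwrap()),
        }
    }
}
```

Fixed-width records like this allow the lookup file to be scanned or binary-searched without a key-value store, which is the direction the commit message points at.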
28 changes: 27 additions & 1 deletion src/lib/src/core/db/opts.rs
@@ -1,4 +1,7 @@
use rocksdb::{LogLevel, Options};
use rocksdb::{
BlockBasedIndexType, BlockBasedOptions, Cache, DBCompactionStyle, DBCompressionType, LogLevel,
Options, SliceTransform,
};

(GitHub Actions Check and Clippy both flag unused imports on line 2: `Cache`, `SliceTransform`.)

pub fn default() -> Options {
let mut opts = Options::default();
@@ -12,5 +15,28 @@ pub fn default() -> Options {
let max_open_files = std::env::var("MAX_OPEN_FILES")
.map_or(128, |v| v.parse().expect("MAX_OPEN_FILES must be a number"));
opts.set_max_open_files(max_open_files);

// From Claude Anthropic
// opts.set_bloom_locality(10);
// opts.set_max_open_files(-1); // Use as many as the OS allows
// opts.set_compaction_style(DBCompactionStyle::Level);
// opts.increase_parallelism(num_cpus::get() as i32);
// opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(5));
// opts.set_level_compaction_dynamic_level_bytes(true);
// opts.set_disable_auto_compactions(true);
// opts.set_compression_type(DBCompressionType::Lz4);
// let mut block_opts = BlockBasedOptions::default();
// block_opts.set_index_type(BlockBasedIndexType::TwoLevelIndexSearch);
// opts.set_block_based_table_factory(&block_opts);

opts.set_compression_type(DBCompressionType::Zstd);
opts.set_compaction_style(DBCompactionStyle::Level);
opts.set_target_file_size_base(16 * 1024 * 1024); // 16MB
opts.set_write_buffer_size(16 * 1024 * 1024); // 16MB
opts.set_max_bytes_for_level_base(16 * 1024 * 1024); // 16MB
let mut block_opts = BlockBasedOptions::default();
block_opts.set_index_type(BlockBasedIndexType::TwoLevelIndexSearch);
opts.set_block_based_table_factory(&block_opts);

opts
}
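The new defaults above swap RocksDB's out-of-the-box settings for smaller, more predictable files: Zstd compression, level compaction, 16 MB targets for SST file size, the write buffer, and the level base, plus a two-level block index. A minimal sketch of opening a database with the same tuning, using only rocksdb crate calls that already appear in this diff (the `open_tuned_db` helper and its path argument are placeholders, not part of liboxen):

```rust
// Minimal sketch: open a RocksDB instance with the same tuning the new
// opts::default() applies. Uses only APIs visible in the diff above.
use rocksdb::{
    BlockBasedIndexType, BlockBasedOptions, DBCompactionStyle, DBCompressionType, LogLevel,
    Options, DB,
};

fn open_tuned_db(path: &str) -> Result<DB, rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.set_log_level(LogLevel::Fatal); // keep LOG output quiet, as elsewhere in the codebase

    // Compression + compaction tuned for many small-ish values.
    opts.set_compression_type(DBCompressionType::Zstd);
    opts.set_compaction_style(DBCompactionStyle::Level);
    opts.set_target_file_size_base(16 * 1024 * 1024); // 16MB SST files
    opts.set_write_buffer_size(16 * 1024 * 1024); // 16MB memtable
    opts.set_max_bytes_for_level_base(16 * 1024 * 1024); // 16MB L1

    // Two-level index keeps index blocks small when key counts get large.
    let mut block_opts = BlockBasedOptions::default();
    block_opts.set_index_type(BlockBasedIndexType::TwoLevelIndexSearch);
    opts.set_block_based_table_factory(&block_opts);

    DB::open(&opts, path)
}
```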
10 changes: 10 additions & 0 deletions src/lib/src/core/db/u128_kv_db.rs
@@ -71,3 +71,13 @@
}
}
}

pub fn put_buf<T: ThreadMode>(
db: &DBWithThreadMode<T>,
key: u128,
entry: &[u8],
) -> Result<(), OxenError> {
let key = key.to_be_bytes().to_vec();
db.put(key, entry)?;
Ok(())
}