From efeb3a4e857d38f0669a4b7f9ab8f005a3926a1c Mon Sep 17 00:00:00 2001 From: Greg Schoeninger Date: Mon, 22 Jul 2024 10:03:12 -0700 Subject: [PATCH] writing block level dedup to sharded files (rocksdb was too slow) --- TODO.md | 9 +- src/cli/src/cmd/db.rs | 9 +- src/cli/src/cmd/db/count.rs | 39 ++ src/cli/src/cmd/db/get.rs | 10 +- src/cli/src/cmd/db/list.rs | 14 +- src/cli/src/main.rs | 4 +- src/lib/src/command/db.rs | 91 +++- .../migrate/m05_optimize_merkle_tree.rs | 9 +- src/lib/src/core/db/merkle_node_db.rs | 10 +- src/lib/src/core/db/opts.rs | 28 +- src/lib/src/core/db/u128_kv_db.rs | 10 + src/lib/src/core/df/tabular.rs | 6 +- src/lib/src/core/index/file_chunker.rs | 389 +++++++++++++++++- .../core/index/merkle_tree/node/file_node.rs | 40 +- src/lib/src/io/chunk_reader.rs | 6 +- src/lib/src/util/hasher.rs | 12 + 16 files changed, 624 insertions(+), 62 deletions(-) create mode 100644 src/cli/src/cmd/db/count.rs diff --git a/TODO.md b/TODO.md index 554c275c6..b021fbb37 100644 --- a/TODO.md +++ b/TODO.md @@ -4,10 +4,12 @@ * Contribute back to Polars the fix for * /Users/gregschoeninger/.cargo/registry/src/index.crates.io-6f17d22bba15001f/polars-io-0.41.2/src/mmap.rs * Biggest question is if we can reconstruct and render the large data frames efficiently...? - * Can optionally unpack to full file on specific commits + * yes. + * can update the download api to use ChunkReader + * Can optionally "unpack" to full file or index on specific commits * Always use chunks * This helps us write to the merkle tree in parallel on upload - * If !chunking turned on in branch, we unpack chunks into full files or write to duckdb or s3 or w/e + * If unpacking is turned on in branch, we unpack chunks into full files or write to duckdb or s3 or w/e * File Chunks * Can get rid of FileChunkNode type since it's just a file we store on disk now * TODO L8R - Can you smartly chunk parquet files on the boundaries of columns? @@ -32,4 +34,5 @@ * Storage Backends * Local Backend - * S3 Backend \ No newline at end of file + * S3 Backend + * This would be ridiculous # of files if chunking is turned on... 
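For the "unpack chunks into full files" idea in the TODO above, the shape of the code is roughly the sketch below. It is illustrative only: `unpack_to_full_file` and `read_chunk` are hypothetical names standing in for whichever backend ends up holding the chunk bytes (local shard files, S3, etc.); neither is an API from this patch.

```rust
use std::fs::File;
use std::io::{self, Write};
use std::path::Path;

/// Hypothetical sketch: rebuild a full file by concatenating its chunks in order.
/// `read_chunk` stands in for whatever backend stores the chunk bytes.
fn unpack_to_full_file<F>(
    chunk_hashes: &[u128],
    mut read_chunk: F,
    out_path: &Path,
) -> io::Result<()>
where
    F: FnMut(u128) -> io::Result<Vec<u8>>,
{
    let mut out = File::create(out_path)?;
    for hash in chunk_hashes {
        // Chunks are fixed-size (16 KB) except possibly the last one,
        // so writing them back in order reproduces the original file.
        out.write_all(&read_chunk(*hash)?)?;
    }
    out.sync_all()
}
```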
\ No newline at end of file diff --git a/src/cli/src/cmd/db.rs b/src/cli/src/cmd/db.rs index 7fd24d9ff..0872cb502 100644 --- a/src/cli/src/cmd/db.rs +++ b/src/cli/src/cmd/db.rs @@ -8,6 +8,9 @@ use crate::cmd::RunCmd; pub const NAME: &str = "db"; +pub mod count; +pub use count::DbCountCmd; + pub mod get; pub use get::DbGetCmd; @@ -60,7 +63,11 @@ impl RunCmd for DbCmd { impl DbCmd { fn get_subcommands(&self) -> HashMap> { - let commands: Vec> = vec![Box::new(DbListCmd), Box::new(DbGetCmd)]; + let commands: Vec> = vec![ + Box::new(DbListCmd), + Box::new(DbGetCmd), + Box::new(DbCountCmd), + ]; let mut runners: HashMap> = HashMap::new(); for cmd in commands { runners.insert(cmd.name().to_string(), cmd); diff --git a/src/cli/src/cmd/db/count.rs b/src/cli/src/cmd/db/count.rs new file mode 100644 index 000000000..0a7ebd9b7 --- /dev/null +++ b/src/cli/src/cmd/db/count.rs @@ -0,0 +1,39 @@ +use std::path::PathBuf; + +use async_trait::async_trait; +use clap::{Arg, Command}; + +use liboxen::command; +use liboxen::error::OxenError; + +use crate::cmd::RunCmd; +pub const NAME: &str = "count"; + +pub struct DbCountCmd; + +#[async_trait] +impl RunCmd for DbCountCmd { + fn name(&self) -> &str { + NAME + } + + fn args(&self) -> Command { + // Setups the CLI args for the command + Command::new(NAME) + .about("List the full key value database.") + .arg(Arg::new("PATH").help("The path of the database.")) + } + + async fn run(&self, args: &clap::ArgMatches) -> Result<(), OxenError> { + // Parse Args + let Some(path) = args.get_one::("PATH") else { + return Err(OxenError::basic_str("Must supply path")); + }; + + let count = command::db::count(PathBuf::from(path))?; + + println!("There are {} entries in the database", count); + + Ok(()) + } +} diff --git a/src/cli/src/cmd/db/get.rs b/src/cli/src/cmd/db/get.rs index afda1aa8c..7f9416d69 100644 --- a/src/cli/src/cmd/db/get.rs +++ b/src/cli/src/cmd/db/get.rs @@ -18,6 +18,13 @@ impl RunCmd for DbGetCmd { .about("Get a value from the database given a key.") .arg(Arg::new("PATH").help("The path of the database.")) .arg(Arg::new("KEY").help("The key to get the value for.")) + .arg( + Arg::new("dtype") + .short('d') + .long("dtype") + .help("The data type of the key.") + .default_value("str"), + ) } async fn run(&self, args: &clap::ArgMatches) -> Result<(), OxenError> { @@ -30,7 +37,8 @@ impl RunCmd for DbGetCmd { return Err(OxenError::basic_str(error)); }; - let value = command::db::get(path, key)?; + let dtype = args.get_one::("dtype").map(|x| x.as_str()); + let value = command::db::get(path, key, dtype)?; println!("{}", value); Ok(()) diff --git a/src/cli/src/cmd/db/list.rs b/src/cli/src/cmd/db/list.rs index e30cedd11..ecce851e0 100644 --- a/src/cli/src/cmd/db/list.rs +++ b/src/cli/src/cmd/db/list.rs @@ -22,6 +22,12 @@ impl RunCmd for DbListCmd { Command::new(NAME) .about("List the full key value database.") .arg(Arg::new("PATH").help("The path of the database.")) + .arg( + Arg::new("limit") + .short('l') + .long("limit") + .help("The maximum number of entries to list"), + ) } async fn run(&self, args: &clap::ArgMatches) -> Result<(), OxenError> { @@ -30,11 +36,11 @@ impl RunCmd for DbListCmd { return Err(OxenError::basic_str("Must supply path")); }; - let result = command::db::list(PathBuf::from(path))?; + let limit = args + .get_one::("limit") + .map(|x| x.parse::().expect("limit must be valid size")); - for (key, value) in result { - println!("{key}\t{value}"); - } + command::db::list(PathBuf::from(path), limit)?; Ok(()) } diff --git a/src/cli/src/main.rs 
b/src/cli/src/main.rs index c61628f47..9dc213c78 100644 --- a/src/cli/src/main.rs +++ b/src/cli/src/main.rs @@ -2,14 +2,14 @@ use std::collections::HashMap; use std::process::ExitCode; use clap::Command; -use env_logger::Env; +// use env_logger::Env; pub mod cmd; pub mod helpers; #[tokio::main] async fn main() -> ExitCode { - env_logger::init_from_env(Env::default()); + env_logger::builder().format_timestamp_micros().init(); let cmds: Vec> = vec![ Box::new(cmd::AddCmd), diff --git a/src/lib/src/command/db.rs b/src/lib/src/command/db.rs index b4b884b7f..24eaf7ca5 100644 --- a/src/lib/src/command/db.rs +++ b/src/lib/src/command/db.rs @@ -4,32 +4,51 @@ //! use crate::error::OxenError; +use crate::util::progress_bar::spinner_with_msg; use rocksdb::{IteratorMode, LogLevel, Options, DB}; use std::path::Path; use std::str; /// List the key -> value pairs in a database -pub fn list(path: impl AsRef) -> Result, OxenError> { +pub fn list(path: impl AsRef, limit: Option) -> Result<(), OxenError> { let path = path.as_ref(); let mut opts = Options::default(); opts.set_log_level(LogLevel::Fatal); - let mut result: Vec<(String, String)> = Vec::new(); - let db = DB::open_for_read_only(&opts, dunce::simplified(path), false)?; let iter = db.iterator(IteratorMode::Start); + let mut count = 0; for item in iter { + if let Some(limit) = limit { + if count >= limit { + break; + } + } + match item { Ok((key, value)) => { - let Ok(key) = str::from_utf8(&key) else { - continue; + let key = if let Ok(key) = str::from_utf8(&key) { + key.to_string() + } else { + // deserialize as u128 + if key.len() == 16 { + let key: [u8; 16] = (*key).try_into().map_err(|_| { + OxenError::basic_str("Could not convert key to [u8; 16]") + })?; + let key = u128::from_le_bytes(key); + format!("{}", key) + } else { + return Err(OxenError::basic_str( + "Could not read iterate over db values", + )); + } }; if let Ok(val) = str::from_utf8(&value) { - result.push((key.to_string(), val.to_string())); + println!("{key}\t{val}"); } else { - result.push((key.to_string(), "".to_string())); + println!("{key}\t"); } } _ => { @@ -38,22 +57,68 @@ pub fn list(path: impl AsRef) -> Result, OxenError> )); } } + count += 1; } - Ok(result) + println!("{} total entries", count); + + Ok(()) } -// Get a value from a database -pub fn get(path: impl AsRef, key: impl AsRef) -> Result { +/// Count the values in a database +pub fn count(path: impl AsRef) -> Result { let path = path.as_ref(); - let key = key.as_ref(); + let opts = Options::default(); + log::debug!("Opening db at {:?}", path); + let db = DB::open_for_read_only(&opts, dunce::simplified(path), false)?; + log::debug!("Opened db at {:?}", path); + let iter = db.iterator(IteratorMode::Start); + log::debug!("Iterating over db at {:?}", path); + let progress = spinner_with_msg(format!("Counting db at {:?}", path)); + let mut count = 0; + for _ in iter { + count += 1; + progress.inc(1); + progress.set_message(format!("{} entries", count)); + } + progress.finish_and_clear(); + Ok(count) +} + +/// Get a value from a database +pub fn get( + path: impl AsRef, + key: impl AsRef, + dtype: Option<&str>, +) -> Result { + let path = path.as_ref(); + let str_key = key.as_ref(); let mut opts = Options::default(); opts.set_log_level(LogLevel::Fatal); + let key = if let Some(dtype) = dtype { + if dtype == "u128" { + let key = str_key.parse::()?; + key.to_le_bytes().to_vec() + } else { + str_key.as_bytes().to_vec() + } + } else { + str_key.as_bytes().to_vec() + }; + + log::debug!("Opening db at {:?}", path); let db = 
DB::open_for_read_only(&opts, dunce::simplified(path), false)?; + log::debug!("Opened db at {:?}", path); + if let Some(value) = db.get(key)? { - Ok(String::from_utf8(value)?) + log::debug!("Got value from db at {:?}", path); + if let Ok(value) = str::from_utf8(&value) { + Ok(value.to_string()) + } else { + Ok(format!("<{} bytes>", value.len())) + } } else { - Err(OxenError::basic_str(format!("Key {} not found", key))) + Err(OxenError::basic_str(format!("Key {} not found", str_key))) } } diff --git a/src/lib/src/command/migrate/m05_optimize_merkle_tree.rs b/src/lib/src/command/migrate/m05_optimize_merkle_tree.rs index 058667b1e..6865d55d8 100644 --- a/src/lib/src/command/migrate/m05_optimize_merkle_tree.rs +++ b/src/lib/src/command/migrate/m05_optimize_merkle_tree.rs @@ -6,7 +6,7 @@ use super::Migrate; use crate::core::db::merkle_node_db::MerkleNodeDB; use crate::core::db::tree_db::{TreeObject, TreeObjectChild}; use crate::core::db::{self, str_val_db}; -use crate::core::index::file_chunker::FileChunker; +use crate::core::index::file_chunker::{ChunkShardManager, FileChunker}; use crate::core::index::merkle_tree::node::*; use crate::core::index::{CommitEntryReader, CommitReader}; use crate::error::OxenError; @@ -317,7 +317,7 @@ fn migrate_dir( path: file_name.to_owned(), }; let uhash = u128::from_str_radix(hash, 16).expect("Failed to parse hex string"); - println!("Bucket [{}] Val [{}] {} for {:?}", i, j, hash, val); + // println!("Bucket [{}] Val [{}] {} for {:?}", i, j, hash, val); tree_db.write_one(uhash, MerkleTreeNodeType::Dir, &val)?; migrate_dir(repo, reader, file_name, hash)?; } @@ -328,7 +328,7 @@ fn migrate_dir( path: file_name.to_owned(), }; let uhash = u128::from_str_radix(hash, 16).expect("Failed to parse hex string"); - println!("Bucket [{}] Val [{}] {} for {:?}", i, j, hash, val); + // println!("Bucket [{}] Val [{}] {} for {:?}", i, j, hash, val); tree_db.write_one(uhash, MerkleTreeNodeType::Schema, &val)?; } } @@ -369,7 +369,8 @@ fn migrate_file( path.display() )))?; let chunker = FileChunker::new(repo); - let chunks = chunker.save_chunks(&commit_entry)?; + let mut csm = ChunkShardManager::new(repo)?; + let chunks = chunker.save_chunks(&commit_entry, &mut csm)?; // Then start refactoring the commands into a "legacy" module so we can still make the old // dbs but start implementing them with the new merkle object diff --git a/src/lib/src/core/db/merkle_node_db.rs b/src/lib/src/core/db/merkle_node_db.rs index 385847138..1209078ca 100644 --- a/src/lib/src/core/db/merkle_node_db.rs +++ b/src/lib/src/core/db/merkle_node_db.rs @@ -328,11 +328,11 @@ impl MerkleNodeDB { let mut buf = Vec::new(); item.serialize(&mut Serializer::new(&mut buf)).unwrap(); let data_len = buf.len() as u64; - log::debug!("--write_one-- dtype {:?}", dtype); - log::debug!("--write_one-- hash {:x}", hash); - log::debug!("--write_one-- data_offset {}", self.data_offset); - log::debug!("--write_one-- data_len {}", data_len); - log::debug!("--write_one-- item {:?}", item); + // log::debug!("--write_one-- dtype {:?}", dtype); + // log::debug!("--write_one-- hash {:x}", hash); + // log::debug!("--write_one-- data_offset {}", self.data_offset); + // log::debug!("--write_one-- data_len {}", data_len); + // log::debug!("--write_one-- item {:?}", item); lookup_file.write_all(&dtype.to_u8().to_le_bytes())?; lookup_file.write_all(&hash.to_le_bytes())?; diff --git a/src/lib/src/core/db/opts.rs b/src/lib/src/core/db/opts.rs index 7af3af5ae..8704ba703 100644 --- a/src/lib/src/core/db/opts.rs +++ 
b/src/lib/src/core/db/opts.rs @@ -1,4 +1,7 @@ -use rocksdb::{LogLevel, Options}; +use rocksdb::{ + BlockBasedIndexType, BlockBasedOptions, Cache, DBCompactionStyle, DBCompressionType, LogLevel, + Options, SliceTransform, +}; pub fn default() -> Options { let mut opts = Options::default(); @@ -12,5 +15,28 @@ pub fn default() -> Options { let max_open_files = std::env::var("MAX_OPEN_FILES") .map_or(128, |v| v.parse().expect("MAX_OPEN_FILES must be a number")); opts.set_max_open_files(max_open_files); + + // From Claude Anthropic + // opts.set_bloom_locality(10); + // opts.set_max_open_files(-1); // Use as many as the OS allows + // opts.set_compaction_style(DBCompactionStyle::Level); + // opts.increase_parallelism(num_cpus::get() as i32); + // opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(5)); + // opts.set_level_compaction_dynamic_level_bytes(true); + // opts.set_disable_auto_compactions(true); + // opts.set_compression_type(DBCompressionType::Lz4); + // let mut block_opts = BlockBasedOptions::default(); + // block_opts.set_index_type(BlockBasedIndexType::TwoLevelIndexSearch); + // opts.set_block_based_table_factory(&block_opts); + + opts.set_compression_type(DBCompressionType::Zstd); + opts.set_compaction_style(DBCompactionStyle::Level); + opts.set_target_file_size_base(16 * 1024 * 1024); // 16MB + opts.set_write_buffer_size(16 * 1024 * 1024); // 16MB + opts.set_max_bytes_for_level_base(16 * 1024 * 1024); // 16MB + let mut block_opts = BlockBasedOptions::default(); + block_opts.set_index_type(BlockBasedIndexType::TwoLevelIndexSearch); + opts.set_block_based_table_factory(&block_opts); + opts } diff --git a/src/lib/src/core/db/u128_kv_db.rs b/src/lib/src/core/db/u128_kv_db.rs index 9b5c44101..3abab4505 100644 --- a/src/lib/src/core/db/u128_kv_db.rs +++ b/src/lib/src/core/db/u128_kv_db.rs @@ -71,3 +71,13 @@ where } } } + +pub fn put_buf( + db: &DBWithThreadMode, + key: u128, + entry: &[u8], +) -> Result<(), OxenError> { + let key = key.to_be_bytes().to_vec(); + db.put(key, entry)?; + Ok(()) +} diff --git a/src/lib/src/core/df/tabular.rs b/src/lib/src/core/df/tabular.rs index 4d2db9809..3c51e7c8d 100644 --- a/src/lib/src/core/df/tabular.rs +++ b/src/lib/src/core/df/tabular.rs @@ -945,7 +945,7 @@ pub fn show_node( Ok(df) => { log::debug!("Finished reading chunked parquet"); Ok(df) - }, + } err => Err(OxenError::basic_str(format!( "Could not read chunked parquet: {:?}", err @@ -961,7 +961,7 @@ pub fn show_node( Ok(df) => { log::debug!("Finished reading chunked arrow"); Ok(df) - }, + } err => Err(OxenError::basic_str(format!( "Could not read chunked arrow: {:?}", err @@ -975,7 +975,7 @@ pub fn show_node( Ok(df) => { log::debug!("Finished reading line delimited json"); Ok(df) - }, + } err => Err(OxenError::basic_str(format!( "Could not read chunked json: {:?}", err diff --git a/src/lib/src/core/index/file_chunker.rs b/src/lib/src/core/index/file_chunker.rs index c745915ba..00d04ec6e 100644 --- a/src/lib/src/core/index/file_chunker.rs +++ b/src/lib/src/core/index/file_chunker.rs @@ -12,18 +12,345 @@ //! * Time to query the file //! 
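The ChunkShardDB introduced just below keys RocksDB on the raw bytes of the u128 chunk hash (via the new `put_buf` helper), and the new `oxen db get --dtype u128` / `db list` paths convert between the decimal string form and those bytes. A minimal sketch of that round trip follows, with hypothetical helper names; whichever byte order is chosen has to match on both sides (in the patch, `put_buf` writes big-endian while the CLI get/list paths decode little-endian, so one convention should be standardized).

```rust
use std::convert::TryInto;

// Hypothetical sketch of the u128 <-> RocksDB key round trip; not part of the patch.
// Writer and readers must agree on a single byte order.
fn hash_to_key(hash: u128) -> [u8; 16] {
    hash.to_le_bytes()
}

fn key_to_hash(key: &[u8]) -> Option<u128> {
    let bytes: [u8; 16] = key.try_into().ok()?;
    Some(u128::from_le_bytes(bytes))
}

fn main() {
    let hash: u128 = 0x0123_4567_89ab_cdef_0123_4567_89ab_cdef;
    let key = hash_to_key(hash);
    assert_eq!(key_to_hash(&key), Some(hash));
    println!("{} -> {:02x?}", hash, key);
}
```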
+use rand::Rng; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::fs::File; use std::io::Read; use std::io::Write; +use std::path::{Path, PathBuf}; +use rocksdb::BlockBasedOptions; +use rocksdb::DBCompactionStyle; +use rocksdb::DBCompressionType; +use rocksdb::DBWithThreadMode; +use rocksdb::MultiThreaded; +use rocksdb::SingleThreaded; +use rocksdb::WaitForCompactOptions; + +use crate::constants::CHUNKS_DIR; +use crate::constants::TREE_DIR; +use crate::core::db; +use crate::core::db::u128_kv_db; use crate::error::OxenError; use crate::model::CommitEntry; use crate::model::LocalRepository; use crate::util; use crate::util::hasher; +use crate::util::progress_bar::oxen_progress_bar; +use crate::util::progress_bar::ProgressBarType; // static chunk size of 16kb pub const CHUNK_SIZE: usize = 16 * 1024; +const SHARD_CAPACITY: usize = 1000 * CHUNK_SIZE; + +/// Chunk Shard DB keeps track of which hash belongs in which shard file +/// Is a simple kv pair from u128 hash to a u32 shard file number +/// Each shard file contains ~1000 hashes and their associated chunk data. +/// When a shard gets too big we close it and start a new one. +pub struct ChunkShardDB { + db: DBWithThreadMode, +} + +impl ChunkShardDB { + fn db_path(repo: &LocalRepository) -> PathBuf { + util::fs::oxen_hidden_dir(&repo.path) + .join(TREE_DIR) + .join(Path::new(CHUNKS_DIR)) + } + + pub fn new(repo: &LocalRepository) -> Result { + let path = Self::db_path(repo); + let mut opts = db::opts::default(); + + // let mut block_opts = BlockBasedOptions::default(); + // block_opts.set_index_type(BlockBasedIndexType::HashSearch); + // opts.set_block_based_table_factory(&block_opts); + let db = DBWithThreadMode::open(&opts, dunce::simplified(&path))?; + Ok(Self { db }) + } + + pub fn has_key(&self, hash: u128) -> bool { + u128_kv_db::has_key(&self.db, hash) + } + + pub fn put(&self, hash: u128, shard_idx: u32) -> Result<(), OxenError> { + let value = shard_idx.to_le_bytes(); + u128_kv_db::put_buf(&self.db, hash, &value)?; + Ok(()) + } +} + +/// ChunkShardIndex is the index at the top of the shard file +#[derive(Serialize, Deserialize)] +pub struct ChunkShardIndex { + pub hash_offsets: HashMap, +} + +impl ChunkShardIndex { + pub fn new() -> Self { + Self { + hash_offsets: HashMap::new(), + } + } +} + +pub struct ChunkShardFile { + pub path: PathBuf, + pub file: File, + pub index: ChunkShardIndex, + pub buffer_offset: usize, + pub buffer: Vec, +} + +impl ChunkShardFile { + pub fn db_path(repo: &LocalRepository) -> PathBuf { + let path = util::fs::oxen_hidden_dir(&repo.path) + .join(TREE_DIR) + .join("shards"); + path + } + + pub fn shard_path(repo: &LocalRepository, file_idx: u32) -> PathBuf { + let path = Self::db_path(repo); + path.join(format!("shard_{}", file_idx)) + } + + pub fn open_existing( + repo: &LocalRepository, + file_idx: u32, + ) -> Result { + log::debug!("Opening shard file: {:?}", Self::shard_path(repo, file_idx)); + let path = Self::shard_path(repo, file_idx); + let file = File::open(&path)?; + // allocate the data buffer + let mut shard_file = ChunkShardFile { + path, + file, + index: ChunkShardIndex::new(), + buffer_offset: 0, + buffer: Vec::new(), + }; + shard_file.read_index()?; + shard_file.read_data()?; + Ok(shard_file) + } + + pub fn create(repo: &LocalRepository, file_idx: u32) -> Result { + log::debug!( + "Creating shard file: {:?}", + Self::shard_path(repo, file_idx) + ); + let path = Self::shard_path(repo, file_idx); + let file = File::create(&path)?; + let index = 
ChunkShardIndex::new(); + Ok(ChunkShardFile { + path, + file, + index, + buffer_offset: 0, + buffer: vec![0; SHARD_CAPACITY], + }) + } + + pub fn has_capacity(&self, buffer_len: usize) -> bool { + (self.buffer_offset + buffer_len) < SHARD_CAPACITY + } + + pub fn add_buffer(&mut self, hash: u128, buffer: &[u8]) -> Result<(), OxenError> { + if !self.has_capacity(buffer.len()) { + return Err(OxenError::basic_str("Shard is full")); + } + + self.index + .hash_offsets + .insert(hash, self.buffer_offset as u16); + self.buffer[self.buffer_offset..self.buffer_offset + buffer.len()].copy_from_slice(buffer); + self.buffer_offset += buffer.len(); + Ok(()) + } + + pub fn read_index(&mut self) -> Result<(), OxenError> { + // read the index size + let mut buffer = [0u8; 4]; // u32 is 4 bytes + self.file.read(&mut buffer)?; + let index_size = u32::from_le_bytes(buffer) as usize; + + let mut index_bytes = vec![0u8; index_size]; + self.file.read(&mut index_bytes)?; + self.index = bincode::deserialize(&index_bytes)?; + + log::debug!( + "Read index of size {} with {:?} hashes", + bytesize::ByteSize::b(index_size as u64), + self.index.hash_offsets.len() + ); + + Ok(()) + } + + pub fn read_data(&mut self) -> Result<(), OxenError> { + // read the buffer size + let mut buffer = [0u8; 4]; // u32 is 4 bytes + self.file.read(&mut buffer)?; + self.buffer_offset = u32::from_le_bytes(buffer) as usize; + + log::debug!("read data with {:?} bytes", self.buffer_offset); + + // read the buffer + let mut buffer = vec![0u8; self.buffer_offset]; + self.file.read(&mut buffer)?; + + // Allocate the full size for the buffer + self.buffer = vec![0u8; SHARD_CAPACITY]; + // Copy the data into the buffer + self.buffer[..self.buffer_offset].copy_from_slice(&buffer); + Ok(()) + } + + pub fn save(&mut self) -> Result<(), OxenError> { + log::debug!("Saving shard file: {:?}", self.path); + // Overwrite existing file + self.file = File::create(&self.path)?; + // write the index to the file + let index_bytes = bincode::serialize(&self.index)?; + log::debug!("Saving shard index: {:?}", index_bytes.len()); + // write index size to the file + self.file + .write_all(&(index_bytes.len() as u32).to_le_bytes())?; + // write index to the file + self.file.write_all(&index_bytes)?; + // write the data size + self.file + .write_all(&(self.buffer_offset as u32).to_le_bytes())?; + // write only the data that has been written + let data = &self.buffer[..self.buffer_offset]; + log::debug!("Saving shard data: {:?}", data.len()); + self.file.write_all(data)?; + self.file.sync_all()?; + log::debug!("Saved shard file: {:?}", self.path); + Ok(()) + } +} + +/// ChunkShardManager reads how many shards we have and moves to the next one if the current one is full +pub struct ChunkShardManager { + repo: LocalRepository, + db: ChunkShardDB, + current_idx: u32, + current_file: ChunkShardFile, +} + +impl ChunkShardManager { + pub fn new(repo: &LocalRepository) -> Result { + log::debug!("Opening chunk shard manager"); + // find all the current shard files + let shard_dir = ChunkShardFile::db_path(repo); + if !shard_dir.exists() { + util::fs::create_dir_all(&shard_dir)?; + } + let mut shard_paths: Vec = std::fs::read_dir(&shard_dir)? 
+ .map(|x| x.unwrap().path()) + .collect::>(); + + // sort the shard paths by the file index + shard_paths.sort_by(|a, b| { + let a_idx = a + .file_stem() + .unwrap() + .to_str() + .unwrap() + .split('_') + .nth(1) + .unwrap() + .parse::() + .unwrap(); + let b_idx = b + .file_stem() + .unwrap() + .to_str() + .unwrap() + .split('_') + .nth(1) + .unwrap() + .parse::() + .unwrap(); + a_idx.cmp(&b_idx) + }); + + let mut current_idx = 0; + let mut current_file: Option = None; + for path in shard_paths { + log::debug!("Opening shard file: {:?}", path); + let file_idx = path + .file_stem() + .unwrap() + .to_str() + .unwrap() + .split('_') + .nth(1) + .unwrap() + .parse::()?; + if let Ok(shard_file) = ChunkShardFile::open_existing(repo, file_idx) { + log::debug!("Opened shard file: {:?}", path); + if shard_file.has_capacity(CHUNK_SIZE) { + log::debug!("Shard [{}] has capacity, using it", file_idx); + current_idx = file_idx; + current_file = Some(shard_file); + } + } + // else { + // log::debug!("Failed to open shard file: {:?}", path); + // current_idx = file_idx; + // current_file = Some(ChunkShardFile::create(repo, file_idx)?); + // } + } + + if current_file.is_none() { + log::debug!( + "Creating new shard file: {:?}", + ChunkShardFile::shard_path(repo, current_idx) + ); + current_file = Some(ChunkShardFile::create(repo, current_idx)?); + } + + log::debug!("Current shard index: {:?}", current_idx); + + let chunk_db = ChunkShardDB::new(repo)?; + Ok(Self { + repo: repo.clone(), + current_idx, + current_file: current_file.unwrap(), + db: chunk_db, + }) + } + + pub fn has_chunk(&self, hash: u128) -> bool { + self.db.has_key(hash) + } + + pub fn write_chunk(&mut self, hash: u128, chunk: &[u8]) -> Result { + // log::debug!("Writing chunk {} -> {} to shard: [{}]", hash, chunk.len(), self.current_idx); + // Save the lookup from hash to shard_idx + self.db.put(hash, self.current_idx)?; + // Add the chunk to the current file + self.current_file.add_buffer(hash, chunk)?; + // If the file is full, save it and start a new one + if !self.current_file.has_capacity(chunk.len()) { + log::debug!("Shard file is full, saving {}", self.current_idx); + self.current_file.save()?; + self.current_idx += 1; + log::debug!("Shard file is full, starting new one {}", self.current_idx); + self.current_file = ChunkShardFile::create(&self.repo, self.current_idx)?; + } + Ok(self.current_idx) + } + + pub fn save_all(&mut self) -> Result<(), OxenError> { + self.current_file.save()?; + Ok(()) + } +} pub struct FileChunker { repo: LocalRepository, @@ -34,13 +361,21 @@ impl FileChunker { Self { repo: repo.clone() } } - pub fn save_chunks(&self, entry: &CommitEntry) -> Result, OxenError> { + pub fn save_chunks( + &self, + entry: &CommitEntry, + csm: &mut ChunkShardManager, + ) -> Result, OxenError> { let version_file = util::fs::version_path(&self.repo, entry); - let mut read_file = File::open(version_file)?; + let mut read_file = File::open(&version_file)?; + + // Create a progress bar + println!("Saving chunks for {:?}", entry.path); + let progress_bar = oxen_progress_bar(entry.num_bytes, ProgressBarType::Bytes); // Read/Write chunks let mut buffer = vec![0; CHUNK_SIZE]; // 16KB buffer - let mut hashes: Vec = Vec::new(); + let mut hashes: Vec<(u128, u32)> = Vec::new(); while let Ok(bytes_read) = read_file.read(&mut buffer) { if bytes_read == 0 { break; // End of file @@ -49,28 +384,40 @@ impl FileChunker { buffer.truncate(bytes_read); // Process the buffer here - // println!("Read {} bytes from {:?}", bytes_read, version_file); - 
let hash = hasher::hash_buffer_128bit(&buffer); - let shash = format!("{:x}", hash); + // log::debug!("Read {} bytes from {:?}", bytes_read, version_file); + // let hash = hasher::hash_buffer_128bit(&buffer); + // let shash = format!("{:x}", hash); - // Write the chunk to disk if it doesn't exist - let output_path = util::fs::chunk_path(&self.repo, &shash); - if let Some(parent) = output_path.parent() { - if !parent.exists() { - util::fs::create_dir_all(parent)?; - } - } + // // Write the chunk to disk if it doesn't exist + // let output_path = util::fs::chunk_path(&self.repo, &shash); + // if let Some(parent) = output_path.parent() { + // if !parent.exists() { + // util::fs::create_dir_all(parent)?; + // } + // } - if !output_path.exists() { - let mut output_file = File::create(output_path)?; - let bytes_written = output_file.write(&buffer)?; - if bytes_written != bytes_read { - return Err(OxenError::basic_str("Failed to write all bytes to chunk")); - } - } + // if !output_path.exists() { + // let mut output_file = File::create(output_path)?; + // let bytes_written = output_file.write(&buffer)?; + // if bytes_written != bytes_read { + // return Err(OxenError::basic_str("Failed to write all bytes to chunk")); + // } + // } + + // TODO: Where to check if the chunk already exists? - hashes.push(hash); + // Save the chunk to the database + let hash = hasher::hash_buffer_128bit(&buffer); + if !csm.has_chunk(hash) { + let shard_idx = csm.write_chunk(hash, &buffer)?; + hashes.push((hash, shard_idx)); + } + progress_bar.inc(bytes_read as u64); } + println!("Saved {} chunks for {:?}", hashes.len(), entry.path); + + // Flush the progress to disk + csm.save_all()?; Ok(hashes) } diff --git a/src/lib/src/core/index/merkle_tree/node/file_node.rs b/src/lib/src/core/index/merkle_tree/node/file_node.rs index d9e54099b..4fef239ef 100644 --- a/src/lib/src/core/index/merkle_tree/node/file_node.rs +++ b/src/lib/src/core/index/merkle_tree/node/file_node.rs @@ -17,11 +17,49 @@ pub struct FileNode { pub last_modified_nanoseconds: u32, // File chunks - pub chunk_hashes: Vec, + pub chunk_hashes: Vec<(u128, u32)>, // TODO: We should look at the stat for other data to have here. Such as file permissions, etc. // https://man7.org/linux/man-pages/man1/stat.1.html // FUTURE IDEAS: + + // CONFIGURE THE REPO FOR BLOB STORAGE (S3, etc) or LOCAL CHUNKS + // * In order to support FineWeb scale data, we should just have the raw files + // offloaded to S3, then be able to piped back through the server + + // Another cool thing about chunks...is we can push them without saving them locally + // We should always unpack to chunk dir instead of the tmp file as well + + /* + + Rocksdb is too slow with a single db after inserting over 100k nodes. + + Our goal is to have a quick way to look up where a u128 hash is stored + in the chunks db. + + Maybe store the chunks in sharded files 16MB that they can quickly look up into. + + If we maintain one file that is a map from u128 hash to the chunk file and offset + we can very quickly look up the chunk file and offset for a given hash. + + Say we had 1TB of unique data, we would have 6,2500,000 hashes which would be + 6,2500,000 * 16 bytes = 1GB of hashes in memory + or if we have keys and values more like 2GB in memory. The values would be the files they live in. + + HashMap where key is the hash and the value is file it lives in. + TODO: Test if RocksDB can handle 6,250,000 entries of 16 bytes keys to 4 bytes values. 
+ + Then we can iterate over the chunks, check if the current shard is too large, + if it is, we close it and generate a new shard. The next unique chunks get + written there. + + So now we have one index file at the top. That knows the mapping from hash -> shard. + + Then individual shard files that know the mapping from hash -> offset in the file. + Should these have index and data files or just all be in one struct? I like the idea of separate. + These individual shard files can be 16MB and live in memory until we flush them. + + */ // The data is always going to be chunked and stored locally // On the server it might be unpacked into a full file, duckdb database, or s3 // On the server we want to ability to update how the file is stored in order diff --git a/src/lib/src/io/chunk_reader.rs b/src/lib/src/io/chunk_reader.rs index dca460f68..ca878a118 100644 --- a/src/lib/src/io/chunk_reader.rs +++ b/src/lib/src/io/chunk_reader.rs @@ -58,7 +58,7 @@ impl Read for ChunkReader { ); if self.offset >= self.node.num_bytes { self.offset = 0; - return Ok(0) + return Ok(0); } // FileNode has a vector of chunks @@ -85,7 +85,8 @@ impl Read for ChunkReader { // Find the hashed chunk file let chunk_hash = self.node.chunk_hashes[chunk_index as usize]; - let hash_str = format!("{:x}", chunk_hash); + let hash_str = format!("{:x}", chunk_hash.0); + todo!("use chunk_hash.1 to find the shard file"); let chunk_path = util::fs::chunk_path(&self.repo, &hash_str); log::debug!( @@ -125,7 +126,6 @@ impl Read for ChunkReader { self.offset += bytes_to_copy; log::debug!("Total read {:?}/{}", total_read, buf.len()); log::debug!("-end- Offset {:?} / {}", self.offset, self.node.num_bytes); - } log::debug!("--END-- Total read {:?}", total_read); diff --git a/src/lib/src/util/hasher.rs b/src/lib/src/util/hasher.rs index db8e0d12d..b7e073a2f 100644 --- a/src/lib/src/util/hasher.rs +++ b/src/lib/src/util/hasher.rs @@ -8,6 +8,8 @@ use std::io::BufReader; use std::path::Path; use xxhash_rust::xxh3::{xxh3_128, Xxh3}; +use super::progress_bar::spinner_with_msg; + pub fn hash_buffer(buffer: &[u8]) -> String { let val = xxh3_128(buffer); format!("{val:x}") @@ -133,9 +135,12 @@ fn hash_large_file_contents(path: &Path) -> Result { OxenError::basic_str(format!("Could not open file {:?} due to {:?}", path, err)) })?; + let progress = spinner_with_msg(format!("Hashing large file...")); + let mut reader = BufReader::new(file); let mut hasher = Xxh3::new(); let mut buffer = [0; 4096]; + let mut total_bytes: u64 = 0; loop { let count = reader.read(&mut buffer).map_err(|_| { @@ -148,6 +153,13 @@ fn hash_large_file_contents(path: &Path) -> Result { } hasher.update(&buffer[..count]); + progress.inc(count as u64); + total_bytes += count as u64; + progress.set_message(format!( + "Hashing {:?} bytes {:?}", + bytesize::ByteSize::b(total_bytes), + path + )); } let result = hasher.digest128();
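To make the sharding design above concrete: ChunkShardDB (RocksDB) maps each u128 chunk hash to a shard number, and each shard file written by `ChunkShardFile::save` is laid out as [u32 index length][bincode index][u32 data length][data bytes], where the index maps hash to offset into the data section. The `ChunkReader` still has a `todo!()` for shard-based reads, so the following is only a hypothetical sketch of the read-back path, under two stated assumptions: the caller knows the chunk length (the index stores offsets only, and all chunks are 16 KB except possibly a file's last one), and offsets are widened to u32, since the `as u16` cast in `add_buffer` caps representable offsets at 64 KB, well below the ~16 MB shard capacity.

```rust
use std::collections::HashMap;
use std::error::Error;
use std::fs::File;
use std::io::Read;
use std::path::Path;

use serde::{Deserialize, Serialize};

const CHUNK_SIZE: usize = 16 * 1024;

// Mirrors ChunkShardIndex, but with u32 offsets so a full ~16 MB shard is addressable.
#[derive(Serialize, Deserialize)]
struct ShardIndex {
    hash_offsets: HashMap<u128, u32>,
}

/// Hypothetical read-back of one chunk from a shard file written as
/// [u32 index length][bincode index][u32 data length][data bytes].
fn read_chunk_from_shard(
    shard_path: &Path,
    hash: u128,
    chunk_len: usize,
) -> Result<Option<Vec<u8>>, Box<dyn Error>> {
    let mut file = File::open(shard_path)?;

    // Index header: length prefix, then the bincode-encoded hash -> offset map.
    let mut len_buf = [0u8; 4];
    file.read_exact(&mut len_buf)?;
    let index_len = u32::from_le_bytes(len_buf) as usize;
    let mut index_bytes = vec![0u8; index_len];
    file.read_exact(&mut index_bytes)?;
    let index: ShardIndex = bincode::deserialize(&index_bytes)?;

    // Data section: length prefix, then the packed chunk bytes.
    file.read_exact(&mut len_buf)?;
    let data_len = u32::from_le_bytes(len_buf) as usize;
    let mut data = vec![0u8; data_len];
    file.read_exact(&mut data)?;

    Ok(index.hash_offsets.get(&hash).map(|offset| {
        let start = *offset as usize;
        let end = (start + chunk_len.min(CHUNK_SIZE)).min(data_len);
        data[start..end].to_vec()
    }))
}
```

A follow-up for the `ChunkReader` would be to cache the deserialized index per shard rather than re-reading the header for every chunk, since many consecutive chunks of a file tend to land in the same shard.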