From c88176048b29e766fac74216967acbd9f3946327 Mon Sep 17 00:00:00 2001 From: Ben Artuso Date: Thu, 4 Jan 2024 19:27:10 -0500 Subject: [PATCH] wip - tweaking migrations --- src/cli/src/cmd_setup.rs | 43 ++++- src/cli/src/dispatch.rs | 20 +- src/cli/src/parse_and_run.rs | 10 +- src/lib/src/api/local/commits.rs | 2 +- src/lib/src/api/local/repositories.rs | 4 +- src/lib/src/command/migrate.rs | 3 +- .../command/migrate/cache_dataframe_size.rs | 5 + .../command/migrate/create_merkle_trees.rs | 48 ++++- .../src/command/migrate/propagate_schemas.rs | 5 + .../command/migrate/update_version_files.rs | 31 +++ src/lib/src/core/index.rs | 4 + .../src/core/index/commit_dir_entry_reader.rs | 180 +++++++++++++----- src/lib/src/core/index/commit_entry_reader.rs | 9 +- src/lib/src/core/index/commit_entry_writer.rs | 44 +++-- src/lib/src/core/index/commit_validator.rs | 6 +- .../index/legacy_commit_dir_entry_reader.rs | 83 ++++++++ .../core/index/legacy_commit_entry_reader.rs | 91 +++++++++ src/lib/src/model/commit.rs | 10 +- src/server/src/controllers/repositories.rs | 2 +- 19 files changed, 505 insertions(+), 95 deletions(-) create mode 100644 src/lib/src/core/index/legacy_commit_dir_entry_reader.rs create mode 100644 src/lib/src/core/index/legacy_commit_entry_reader.rs diff --git a/src/cli/src/cmd_setup.rs b/src/cli/src/cmd_setup.rs index 2bca84b6e..dcd01ae81 100644 --- a/src/cli/src/cmd_setup.rs +++ b/src/cli/src/cmd_setup.rs @@ -1,6 +1,7 @@ use clap::{arg, Arg, Command}; use liboxen::command::migrate::{ - CacheDataFrameSizeMigration, Migrate, PropagateSchemasMigration, UpdateVersionFilesMigration, + CacheDataFrameSizeMigration, CreateMerkleTreesMigration, Migrate, PropagateSchemasMigration, + UpdateVersionFilesMigration, }; use liboxen::constants::{DEFAULT_BRANCH_NAME, DEFAULT_REMOTE_NAME}; @@ -918,7 +919,25 @@ pub fn migrate() -> Command { ) .action(clap::ArgAction::SetTrue), ), - ), + ) + .subcommand( + Command::new(CreateMerkleTreesMigration.name()) + .about("Reformats the underlying data model into merkle trees for storage and lookup efficiency") + .arg( + Arg::new("PATH") + .help("Directory in which to apply the migration") + .required(true), + ) + .arg( + Arg::new("all") + .long("all") + .short('a') + .help( + "Run the migration for all oxen repositories in this directory", + ) + .action(clap::ArgAction::SetTrue), + ), + ) ) .subcommand( Command::new("down") @@ -977,7 +996,25 @@ pub fn migrate() -> Command { ) .action(clap::ArgAction::SetTrue), ), - ), + ) + .subcommand( + Command::new(CreateMerkleTreesMigration.name()) + .about("Reformats the underlying data model into merkle trees for storage and lookup efficiency") + .arg( + Arg::new("PATH") + .help("Directory in which to apply the migration") + .required(true), + ) + .arg( + Arg::new("all") + .long("all") + .short('a') + .help( + "Run the migration for all oxen repositories in this directory", + ) + .action(clap::ArgAction::SetTrue), + ), + ) ) } diff --git a/src/cli/src/dispatch.rs b/src/cli/src/dispatch.rs index cfae3ae45..773a1372f 100644 --- a/src/cli/src/dispatch.rs +++ b/src/cli/src/dispatch.rs @@ -1,5 +1,6 @@ use liboxen::api; use liboxen::command; +use liboxen::command::migrate::CreateMerkleTreesMigration; use liboxen::command::migrate::Migrate; use liboxen::command::migrate::UpdateVersionFilesMigration; use liboxen::config::{AuthConfig, UserConfig}; @@ -95,16 +96,19 @@ pub async fn check_remote_version_blocking(host: impl AsRef) -> Result<(), Ok(()) } -pub fn check_repo_migration_needed(_repo: &LocalRepository) -> Result<(), 
OxenError> {
-    let _migrations: Vec<Box<dyn Migrate>> = vec![Box::new(UpdateVersionFilesMigration)];
+pub fn check_repo_migration_needed(repo: &LocalRepository) -> Result<(), OxenError> {
+    let migrations: Vec<Box<dyn Migrate>> = vec![
+        Box::new(UpdateVersionFilesMigration),
+        Box::new(CreateMerkleTreesMigration),
+    ];
 
-    let migrations_needed: Vec<Box<dyn Migrate>> = Vec::new();
+    let mut migrations_needed: Vec<Box<dyn Migrate>> = Vec::new();
 
-    // for migration in migrations {
-    //     if migration.is_needed(repo)? {
-    //         migrations_needed.push(migration);
-    //     }
-    // }
+    for migration in migrations {
+        if migration.is_needed(repo)? {
+            migrations_needed.push(migration);
+        }
+    }
 
     if migrations_needed.is_empty() {
         return Ok(());
diff --git a/src/cli/src/parse_and_run.rs b/src/cli/src/parse_and_run.rs
index 4fb94a4d5..6aae862cf 100644
--- a/src/cli/src/parse_and_run.rs
+++ b/src/cli/src/parse_and_run.rs
@@ -8,7 +8,8 @@ use crate::cmd_setup::{ADD, COMMIT, DF, DIFF, DOWNLOAD, LOG, LS, METADATA, RESTO
 use crate::dispatch;
 use clap::ArgMatches;
 use liboxen::command::migrate::{
-    CacheDataFrameSizeMigration, Migrate, PropagateSchemasMigration, UpdateVersionFilesMigration,
+    CacheDataFrameSizeMigration, CreateMerkleTreesMigration, Migrate, PropagateSchemasMigration,
+    UpdateVersionFilesMigration,
 };
 use liboxen::constants::{DEFAULT_BRANCH_NAME, DEFAULT_HOST, DEFAULT_REMOTE_NAME};
 use liboxen::error::OxenError;
@@ -1083,6 +1084,13 @@ pub async fn migrate(sub_matches: &ArgMatches) {
             eprintln!("Error running migration: {}", err);
             std::process::exit(1);
         }
+    } else if migration == CreateMerkleTreesMigration.name() {
+        if let Err(err) =
+            run_migration(&CreateMerkleTreesMigration, direction, sub_matches)
+        {
+            eprintln!("Error running migration: {}", err);
+            std::process::exit(1);
+        }
     } else {
         eprintln!("Invalid migration: {}", migration);
     }
diff --git a/src/lib/src/api/local/commits.rs b/src/lib/src/api/local/commits.rs
index f2ad8a76a..178d428dc 100644
--- a/src/lib/src/api/local/commits.rs
+++ b/src/lib/src/api/local/commits.rs
@@ -428,7 +428,7 @@ pub fn construct_commit_merkle_tree(
     commit: &Commit,
 ) -> Result<(), OxenError> {
     let commit_writer = CommitEntryWriter::new(repo, commit)?;
-    commit_writer.construct_merkle_tree_new(&repo.path)?;
+    commit_writer.construct_merkle_tree_from_legacy_commit(&repo.path)?;
     Ok(())
 }
diff --git a/src/lib/src/api/local/repositories.rs b/src/lib/src/api/local/repositories.rs
index a8f524381..8fa567705 100644
--- a/src/lib/src/api/local/repositories.rs
+++ b/src/lib/src/api/local/repositories.rs
@@ -380,7 +380,7 @@ mod tests {
             message: String::from(constants::INITIAL_COMMIT_MSG),
             author: String::from("Ox"),
             email: String::from("ox@oxen.ai"),
-            root_hash: "".to_string(),
+            root_hash: None,
             timestamp,
         };
         let repo_new = RepoNew::from_root_commit(namespace, name, root_commit);
@@ -544,7 +544,7 @@ mod tests {
             author: String::from("Ox"),
             email: String::from("ox@oxen.ai"),
             timestamp,
-            root_hash: "".to_string(),
+            root_hash: None,
         };
         let repo_new = RepoNew::from_root_commit(old_namespace, name, root_commit);
         let _repo = api::local::repositories::create(sync_dir, repo_new)?;
diff --git a/src/lib/src/command/migrate.rs b/src/lib/src/command/migrate.rs
index a45c2d74a..58dc0dc85 100644
--- a/src/lib/src/command/migrate.rs
+++ b/src/lib/src/command/migrate.rs
@@ -1,6 +1,6 @@
 use std::path::Path;
 
-use crate::error::OxenError;
+use crate::{error::OxenError, model::LocalRepository};
 
 pub mod create_merkle_trees;
 pub use create_merkle_trees::CreateMerkleTreesMigration;
@@ -17,5 +17,6 @@ pub use cache_dataframe_size::CacheDataFrameSizeMigration;
 pub trait Migrate {
     fn up(&self, path: &Path, all: bool) -> Result<(), OxenError>;
     fn down(&self, path: &Path, all: bool) -> Result<(), OxenError>;
+    fn is_needed(&self, repo: &LocalRepository) -> Result<bool, OxenError>;
     fn name(&self) -> &'static str;
 }
diff --git a/src/lib/src/command/migrate/cache_dataframe_size.rs b/src/lib/src/command/migrate/cache_dataframe_size.rs
index 765847089..692b78860 100644
--- a/src/lib/src/command/migrate/cache_dataframe_size.rs
+++ b/src/lib/src/command/migrate/cache_dataframe_size.rs
@@ -37,6 +37,11 @@ impl Migrate for CacheDataFrameSizeMigration {
         }
         Ok(())
     }
+
+    fn is_needed(&self, repo: &LocalRepository) -> Result<bool, OxenError> {
+        // Server-side migration, not necessary for autodetection on client
+        Ok(false)
+    }
 }
 
 pub fn cache_data_frame_size_for_all_repos_up(path: &Path) -> Result<(), OxenError> {
diff --git a/src/lib/src/command/migrate/create_merkle_trees.rs b/src/lib/src/command/migrate/create_merkle_trees.rs
index 9b1170b92..4b1c41d02 100644
--- a/src/lib/src/command/migrate/create_merkle_trees.rs
+++ b/src/lib/src/command/migrate/create_merkle_trees.rs
@@ -1,16 +1,20 @@
+use rocksdb::{DBWithThreadMode, MultiThreaded};
+
 use super::Migrate;
-use std::path::Path;
+use std::path::{Path, PathBuf};
 
-use crate::api;
-use crate::core::index::CommitReader;
+use crate::core::db::{self, path_db};
+use crate::core::index::{CommitEntryWriter, CommitReader, CommitWriter};
 use crate::error::OxenError;
 use crate::model::LocalRepository;
 use crate::util::progress_bar::{oxen_progress_bar, ProgressBarType};
 
-pub struct CreateMerkleTreesMigration {}
+use crate::{api, constants};
+
+pub struct CreateMerkleTreesMigration;
 impl Migrate for CreateMerkleTreesMigration {
     fn name(&self) -> &'static str {
-        "cache_data_frame_size"
+        "create_merkle_trees"
     }
     fn up(&self, path: &Path, all: bool) -> Result<(), OxenError> {
         if all {
@@ -32,6 +36,18 @@ impl Migrate for CreateMerkleTreesMigration {
         }
         Ok(())
     }
+    fn is_needed(&self, repo: &LocalRepository) -> Result<bool, OxenError> {
+        let objects_dir = repo
+            .path
+            .join(constants::OXEN_HIDDEN_DIR)
+            .join(constants::OBJECTS_DIR);
+        if !objects_dir.exists() {
+            return Ok(true);
+        }
+        // This may need a more elaborate check for migrations that are aborted with a single repo...
+        // but it's too computationally expensive to parse through all the trees.
+ Ok(false) + } } pub fn create_merkle_trees_for_all_repos_up(path: &Path) -> Result<(), OxenError> { @@ -73,8 +89,14 @@ pub fn create_merkle_trees_up(repo: &LocalRepository) -> Result<(), OxenError> { // Get all commits in repo, then construct merkle tree for each commit let reader = CommitReader::new(repo)?; let all_commits = reader.list_all()?; + // sort these by timestamp from oldest to newest + let mut all_commits = all_commits.clone(); + all_commits.sort_by(|a, b| a.timestamp.cmp(&b.timestamp)); + let bar = oxen_progress_bar(all_commits.len() as u64, ProgressBarType::Counter); + let commit_writer = CommitWriter::new(repo)?; for commit in all_commits { + // Create the merkle tree for each commit match api::local::commits::construct_commit_merkle_tree(repo, &commit) { Ok(_) => {} Err(err) => { @@ -85,6 +107,18 @@ pub fn create_merkle_trees_up(repo: &LocalRepository) -> Result<(), OxenError> { ) } } + // Then we need to associate the root hash of the merkle tree with the commit + let mut commit_to_update = commit.clone(); + let dir_hashes_db_dir = CommitEntryWriter::commit_dir_hash_db(&repo.path, &commit.id); + let dir_hashes_db: DBWithThreadMode = + DBWithThreadMode::open_for_read_only(&db::opts::default(), &dir_hashes_db_dir, false)?; + + let root_hash: String = path_db::get_entry(&dir_hashes_db, &PathBuf::from(""))?.unwrap(); + + commit_to_update.update_root_hash(root_hash); + + commit_writer.add_commit_to_db(&commit_to_update)?; + bar.inc(1); } Ok(()) @@ -94,3 +128,7 @@ pub fn create_merkle_trees_down(_repo: &LocalRepository) -> Result<(), OxenError println!("There are no operations to be run"); Ok(()) } + +// This logic mirrors the CommitEntryReader pre 0.10.0. It is only used here to get existing commit data for migration, +// and will not work on repos created with 0.10.0 or later. 
+// fn legacy_list_entries() diff --git a/src/lib/src/command/migrate/propagate_schemas.rs b/src/lib/src/command/migrate/propagate_schemas.rs index d58bcbaf4..d38cfe021 100644 --- a/src/lib/src/command/migrate/propagate_schemas.rs +++ b/src/lib/src/command/migrate/propagate_schemas.rs @@ -34,6 +34,11 @@ impl Migrate for PropagateSchemasMigration { } Ok(()) } + + fn is_needed(&self, repo: &LocalRepository) -> Result { + // Server-side migration, not necessary for autodetection on client + Ok(false) + } } pub fn propagate_schemas_for_all_repos_up(path: &Path) -> Result<(), OxenError> { diff --git a/src/lib/src/command/migrate/update_version_files.rs b/src/lib/src/command/migrate/update_version_files.rs index 67adb5821..bffb3e6a1 100644 --- a/src/lib/src/command/migrate/update_version_files.rs +++ b/src/lib/src/command/migrate/update_version_files.rs @@ -4,6 +4,7 @@ use std::path::{Path, PathBuf}; use jwalk::WalkDir; +use crate::constants; use crate::constants::{HASH_FILE, VERSIONS_DIR, VERSION_FILE_NAME}; use crate::core::index::{CommitEntryReader, CommitReader}; @@ -42,6 +43,36 @@ impl Migrate for UpdateVersionFilesMigration { } Ok(()) } + + fn is_needed(&self, repo: &LocalRepository) -> Result { + let versions_dir = repo + .path + .join(constants::OXEN_HIDDEN_DIR) + .join(constants::VERSIONS_DIR); + if !versions_dir.exists() { + return Ok(false); + } + for entry in WalkDir::new(&versions_dir) { + let entry = entry?; + if entry.file_type().is_file() { + let path = entry.path(); + let filename = match path.file_name() { + Some(filename) => filename.to_string_lossy().to_string(), + None => continue, + }; + + if filename.starts_with(constants::HASH_FILE) { + continue; + } + + if filename.starts_with(constants::VERSION_FILE_NAME) { + return Ok(false); + } + return Ok(true); + } + } + Ok(false) + } } pub fn update_version_files_up(repo: &LocalRepository) -> Result<(), OxenError> { diff --git a/src/lib/src/core/index.rs b/src/lib/src/core/index.rs index f00dc09e6..1d6b483f1 100644 --- a/src/lib/src/core/index.rs +++ b/src/lib/src/core/index.rs @@ -9,6 +9,8 @@ pub mod commit_sync_status; pub mod commit_validator; pub mod commit_writer; pub mod entry_indexer; +pub mod legacy_commit_dir_entry_reader; +pub mod legacy_commit_entry_reader; pub mod merge_conflict_db_reader; pub mod merge_conflict_reader; pub mod merge_conflict_writer; @@ -43,6 +45,8 @@ pub use crate::core::index::entry_indexer::EntryIndexer; pub use crate::core::index::commit_dir_entry_reader::CommitDirEntryReader; pub use crate::core::index::commit_entry_reader::CommitEntryReader; +pub use crate::core::index::legacy_commit_dir_entry_reader::LegacyCommitDirEntryReader; +pub use crate::core::index::legacy_commit_entry_reader::LegacyCommitEntryReader; pub use crate::core::index::merge_conflict_db_reader::MergeConflictDBReader; pub use crate::core::index::merge_conflict_reader::MergeConflictReader; pub use crate::core::index::merger::Merger; diff --git a/src/lib/src/core/index/commit_dir_entry_reader.rs b/src/lib/src/core/index/commit_dir_entry_reader.rs index 23abf2797..54a467f18 100644 --- a/src/lib/src/core/index/commit_dir_entry_reader.rs +++ b/src/lib/src/core/index/commit_dir_entry_reader.rs @@ -266,6 +266,7 @@ impl CommitDirEntryReader { page: usize, page_size: usize, ) -> Result, OxenError> { + log::debug!("deleteme calling list_entry_page"); // Don't have a skip here.... 
let mut entries: Vec = Vec::new(); let mut entry_i = 0; @@ -273,35 +274,74 @@ impl CommitDirEntryReader { let start_page = if page == 0 { 0 } else { page - 1 }; let start_idx = start_page * page_size; + // For every vnode, get the vnode and add its children to a list + // TODO: possible optimization - these will all be sorted by path coming out, + // so if this is slow we can treat it as merging n sorted lists instead of concatenating and then sorting + + let mut file_children: Vec = Vec::new(); + for vnode_child in self.dir_object.children() { - // Get vnode entry - TODONOW: method here to get the object given a ChildObject and repo? let vnode = self.object_reader.get_vnode(vnode_child.hash())?.unwrap(); + for entry in vnode.children() { - if let TreeObjectChild::File { path, .. } = entry { - if entries.len() >= page_size { - break; - } - - log::debug!( - "considering entry {:?} with entry_i {:?} and start_idx {:?}", - entry, - entry_i, - start_idx - ); - - if entry_i >= start_idx { - // Get file object by hash - let file_object = self.object_reader.get_file(entry.hash())?.unwrap(); - // Get commit entry from file object - let entry = file_object.to_commit_entry(path, &self.commit_id); - log::debug!("adding entry to results"); - entries.push(entry); - } - entry_i += 1; + if let TreeObjectChild::File { .. } = entry { + file_children.push(entry.to_owned()); } } } + // Now sort these all by path + file_children.sort_by(|a, b| { + let a_path = a.path(); + let b_path = b.path(); + a_path.cmp(b_path) + }); + + // Apply pagination logic to the file_children list + for entry in file_children { + if entries.len() >= page_size { + break; + } + + if entry_i >= start_idx { + // Get file object by hash + let file_object = self.object_reader.get_file(entry.hash())?.unwrap(); + // Get commit entry from file object + let entry = file_object.to_commit_entry(entry.path(), &self.commit_id); + entries.push(entry); + } + entry_i += 1; + } + + // for vnode_child in self.dir_object.children() { + // // Get vnode entry - TODONOW: method here to get the object given a ChildObject and repo? + // let vnode = self.object_reader.get_vnode(vnode_child.hash())?.unwrap(); + // for entry in vnode.children() { + // if let TreeObjectChild::File { path, .. 
} = entry { + // if entries.len() >= page_size { + // break; + // } + + // log::debug!( + // "considering entry {:?} with entry_i {:?} and start_idx {:?}", + // entry, + // entry_i, + // start_idx + // ); + + // if entry_i >= start_idx { + // // Get file object by hash + // let file_object = self.object_reader.get_file(entry.hash())?.unwrap(); + // // Get commit entry from file object + // let entry = file_object.to_commit_entry(path, &self.commit_id); + // log::debug!("adding entry to results"); + // entries.push(entry); + // } + // entry_i += 1; + // } + // } + // } + Ok(entries) } @@ -311,45 +351,93 @@ impl CommitDirEntryReader { page_size: usize, offset: usize, ) -> Result, OxenError> { + log::debug!("deleteme calling list_entry_page_with_offset"); + + // Apply logic from above here let mut entries: Vec = Vec::new(); let start_page = if page == 0 { 0 } else { page - 1 }; let mut start_idx = start_page * page_size; let mut entry_i = 0; - log::debug!("list_entry_page_with_offset(1) page: {page}, page_size: {page_size}, offset: {offset} start_idx: {start_idx} start_page: {start_page}"); if start_idx >= offset { start_idx -= offset; } - log::debug!("list_entry_page_with_offset(2) page: {page}, page_size: {page_size}, offset: {offset} start_idx: {start_idx} start_page: {start_page}"); + + // Get all vnode chidlren + let mut file_children: Vec = Vec::new(); for vnode_child in self.dir_object.children() { - // Get vnode entry - TODONOW: method here to get the object given a ChildObject and repo? let vnode = self.object_reader.get_vnode(vnode_child.hash())?.unwrap(); + for entry in vnode.children() { - if let TreeObjectChild::File { path, .. } = entry { - if entries.len() >= page_size { - break; - } - - log::debug!( - "considering entry {:?} with entry_i {:?} and start_idx {:?}", - entry, - entry_i, - start_idx - ); - - if entry_i >= start_idx { - // Get file object by hash - let file_object = self.object_reader.get_file(entry.hash())?.unwrap(); - // Get commit entry from file object - let entry = file_object.to_commit_entry(path, &self.commit_id); - log::debug!("adding entry to results"); - entries.push(entry); - } - entry_i += 1; + if let TreeObjectChild::File { .. } = entry { + file_children.push(entry.to_owned()); } } } + + // Now sort these all by path + file_children.sort_by(|a, b| { + let a_path = a.path(); + let b_path = b.path(); + a_path.cmp(b_path) + }); + + // Apply pagination logic to the file_children list + for entry in file_children { + if entries.len() >= page_size { + break; + } + + if entry_i >= start_idx { + // Get file object by hash + let file_object = self.object_reader.get_file(entry.hash())?.unwrap(); + // Get commit entry from file object + let entry = file_object.to_commit_entry(entry.path(), &self.commit_id); + entries.push(entry); + } + entry_i += 1; + } + + // let mut entries: Vec = Vec::new(); + // let start_page = if page == 0 { 0 } else { page - 1 }; + // let mut start_idx = start_page * page_size; + // let mut entry_i = 0; + // log::debug!("list_entry_page_with_offset(1) page: {page}, page_size: {page_size}, offset: {offset} start_idx: {start_idx} start_page: {start_page}"); + + // if start_idx >= offset { + // start_idx -= offset; + // } + // log::debug!("list_entry_page_with_offset(2) page: {page}, page_size: {page_size}, offset: {offset} start_idx: {start_idx} start_page: {start_page}"); + + // for vnode_child in self.dir_object.children() { + // // Get vnode entry - TODONOW: method here to get the object given a ChildObject and repo? 
+ // let vnode = self.object_reader.get_vnode(vnode_child.hash())?.unwrap(); + // for entry in vnode.children() { + // if let TreeObjectChild::File { path, .. } = entry { + // if entries.len() >= page_size { + // break; + // } + + // log::debug!( + // "considering entry {:?} with entry_i {:?} and start_idx {:?}", + // entry, + // entry_i, + // start_idx + // ); + + // if entry_i >= start_idx { + // // Get file object by hash + // let file_object = self.object_reader.get_file(entry.hash())?.unwrap(); + // // Get commit entry from file object + // let entry = file_object.to_commit_entry(path, &self.commit_id); + // log::debug!("adding entry to results"); + // entries.push(entry); + // } + // entry_i += 1; + // } + // } + // } Ok(entries) } } diff --git a/src/lib/src/core/index/commit_entry_reader.rs b/src/lib/src/core/index/commit_entry_reader.rs index 36a76f5ea..20b82fe84 100644 --- a/src/lib/src/core/index/commit_entry_reader.rs +++ b/src/lib/src/core/index/commit_entry_reader.rs @@ -192,7 +192,10 @@ impl CommitEntryReader { page: usize, page_size: usize, ) -> Result, OxenError> { - let entries = self.list_entries()?; + let mut entries = self.list_entries()?; + + // Entries not automatically path-sorted due to tree structure + entries.sort_by(|a, b| a.path.cmp(&b.path)); let start_page = if page == 0 { 0 } else { page - 1 }; let start_idx = start_page * page_size; @@ -212,7 +215,9 @@ impl CommitEntryReader { log::debug!("CommitEntryReader::list_directory() dir: {:?}", dir); let mut entries = vec![]; // This lists all the committed dirs - let dirs = self.list_dirs()?; + let mut dirs = self.list_dirs()?; + dirs.sort(); + for committed_dir in dirs { // Have to make sure we are in a subset of the dir (not really a tree structure) // log::debug!("CommitEntryReader::list_directory() checking committed_dir: {:?}", committed_dir); diff --git a/src/lib/src/core/index/commit_entry_writer.rs b/src/lib/src/core/index/commit_entry_writer.rs index 6f57835a8..76540f5ce 100644 --- a/src/lib/src/core/index/commit_entry_writer.rs +++ b/src/lib/src/core/index/commit_entry_writer.rs @@ -5,7 +5,9 @@ use crate::constants::{ use crate::core::db; use crate::core::db::path_db; use crate::core::db::tree_db::{TreeObject, TreeObjectChild, TreeObjectChildWithStatus}; -use crate::core::index::{CommitDirEntryWriter, RefWriter, SchemaReader, SchemaWriter}; +use crate::core::index::{ + CommitDirEntryWriter, LegacyCommitDirEntryReader, RefWriter, SchemaReader, SchemaWriter, +}; use crate::error::OxenError; use crate::model::{ @@ -336,7 +338,7 @@ impl CommitEntryWriter { origin_path: &Path, ) -> Result<(), OxenError> { if self.commit.parent_ids.is_empty() { - self.construct_merkle_tree_new(origin_path) + self.construct_merkle_tree_empty(origin_path) } else { self.construct_merkle_tree_from_parent(staged_data, origin_path) } @@ -451,7 +453,10 @@ impl CommitEntryWriter { Ok(()) } - pub fn construct_merkle_tree_new(&self, _origin_path: &Path) -> Result<(), OxenError> { + pub fn construct_merkle_tree_from_legacy_commit( + &self, + _origin_path: &Path, + ) -> Result<(), OxenError> { log::debug!("constructing new merkle tree"); // Operate on all dirs to make the tree from scratch... let mut dir_paths = path_db::list_paths(&self.dir_db, &PathBuf::from(""))?; @@ -466,14 +471,8 @@ impl CommitEntryWriter { children: Vec::new(), hash: util::hasher::compute_children_hash(&Vec::new()), }; - // Do we need all three of these really? 
path_db::put(&self.dirs_db, root_node.hash(), &root_node)?; path_db::put(&self.dir_hashes_db, PathBuf::from(""), &root_node.hash())?; - // path_db::put( - // &self.temp_commit_hashes_db, - // &self.commit.id, - // &root_node.hash(), - // )?; return Ok(()); } @@ -494,8 +493,19 @@ impl CommitEntryWriter { let _root_hash: String = path_db::get_entry(&self.dir_hashes_db, PathBuf::from(""))?.unwrap(); - // Insert into the commit hashes db - // path_db::put(&self.temp_commit_hashes_db, &self.commit.id, &root_hash)?; + Ok(()) + } + + pub fn construct_merkle_tree_empty(&self, _origin_path: &Path) -> Result<(), OxenError> { + // Initial commits will never have entries - just need to populate the root node + + let empty_root = TreeObject::Dir { + children: Vec::new(), + hash: util::hasher::compute_children_hash(&Vec::new()), + }; + + path_db::put(&self.dirs_db, empty_root.hash(), &empty_root)?; + path_db::put(&self.dir_hashes_db, PathBuf::from(""), &empty_root.hash())?; Ok(()) } @@ -794,9 +804,12 @@ impl CommitEntryWriter { for dir in dirs { log::debug!("new merkle constructor processing dir {:?}", dir); let file_child_objs = self.write_file_objects_for_dir(dir.to_path_buf())?; + log::debug!("got file_child_objs {:?}", file_child_objs); let schema_child_objs = self.write_schema_objects_for_dir(dir.to_path_buf(), &schema_map)?; + log::debug!("got schema_child_objs {:?}", schema_child_objs); let dir_child_objs = self.gather_dir_children_for_dir(dir.to_path_buf(), &dir_map)?; + log::debug!("got dir_child_objs {:?}", dir_child_objs); let mut all_children: Vec = file_child_objs; all_children.extend(schema_child_objs); @@ -899,9 +912,8 @@ impl CommitEntryWriter { fn write_file_objects_for_dir(&self, dir: PathBuf) -> Result, OxenError> { log::debug!("in write file objects from dir for dir {:?}", dir); - let object_reader = ObjectDBReader::new(&self.repository)?; let dir_entry_reader = - CommitDirEntryReader::new(&self.repository, &self.commit.id, &dir, object_reader)?; + LegacyCommitDirEntryReader::new(&self.repository, &self.commit.id, &dir)?; // Get all file children let files = dir_entry_reader.list_entries()?; @@ -1003,12 +1015,10 @@ impl CommitEntryWriter { std::fs::metadata(&temp_db_path).is_ok() ); - log::debug!("about to open the db"); let opts = db::opts::default(); let temp_tree_db: DBWithThreadMode = DBWithThreadMode::open(&opts, dunce::simplified(&temp_db_path))?; - log::debug!("successfully opened the db"); - let commit_hash: &String = &self.commit.root_hash; + let commit_hash: &String = &self.commit.root_hash.clone().unwrap(); let root_dir_node: TreeObject = path_db::get_entry(&self.dirs_db, commit_hash)?.unwrap(); // TODONOW: error handling here @@ -1074,7 +1084,7 @@ impl CommitEntryWriter { pub fn new_temp_print_tree_db(&self) -> Result<(), OxenError> { // Get the hash of this commit - let commit_hash = &self.commit.root_hash; + let commit_hash = &self.commit.root_hash.clone().unwrap(); // let commit_hash: String = path_db::get_entry(&self.dirs_, &self.commit.id)?.unwrap(); // Get the root dir node (this hash) diff --git a/src/lib/src/core/index/commit_validator.rs b/src/lib/src/core/index/commit_validator.rs index 74e1db547..e8698f869 100644 --- a/src/lib/src/core/index/commit_validator.rs +++ b/src/lib/src/core/index/commit_validator.rs @@ -92,7 +92,7 @@ fn validate_complete_merkle_tree( commit: &Commit, ) -> Result { let object_reader = ObjectDBReader::new(repository)?; - let root_hash = commit.root_hash.clone(); + let root_hash = commit.root_hash.clone().unwrap(); log::debug!("got 
root_hash {:?}", root_hash);
     let root_node = object_reader.get_dir(&root_hash)?.unwrap();
     for child in root_node.children() {
@@ -211,8 +211,8 @@ fn validate_changed_parts_of_merkle_tree(
     parent: &Commit,
 ) -> Result<bool, OxenError> {
     let object_reader = ObjectDBReader::new(repository)?;
-    let root_hash = commit.root_hash.clone();
-    let parent_root_hash = parent.root_hash.clone();
+    let root_hash = commit.root_hash.clone().unwrap();
+    let parent_root_hash = parent.root_hash.clone().unwrap();
     let root_node = object_reader.get_dir(&root_hash)?.unwrap();
     let parent_root_node = object_reader.get_dir(&parent_root_hash)?.unwrap();
diff --git a/src/lib/src/core/index/legacy_commit_dir_entry_reader.rs b/src/lib/src/core/index/legacy_commit_dir_entry_reader.rs
new file mode 100644
index 000000000..d803c1186
--- /dev/null
+++ b/src/lib/src/core/index/legacy_commit_dir_entry_reader.rs
@@ -0,0 +1,83 @@
+use crate::constants::{FILES_DIR, HISTORY_DIR};
+use crate::core::db;
+use crate::core::db::path_db;
+use crate::error::OxenError;
+use crate::model::{CommitEntry, LocalRepository};
+use crate::util;
+
+use rocksdb::{DBWithThreadMode, MultiThreaded};
+use std::path::{Path, PathBuf};
+use std::str;
+/// # CommitDirEntryReader
+/// We keep a list of all the committed files in a subdirectory for fast lookup
+pub struct LegacyCommitDirEntryReader {
+    db: DBWithThreadMode<MultiThreaded>,
+    dir: PathBuf,
+}
+
+impl LegacyCommitDirEntryReader {
+    /// .oxen/history/commit_id/files/path/to/dir
+    pub fn db_dir(base_path: &Path, commit_id: &str, dir: &Path) -> PathBuf {
+        if dir == Path::new("") {
+            return util::fs::oxen_hidden_dir(base_path)
+                .join(HISTORY_DIR)
+                .join(commit_id)
+                .join(FILES_DIR);
+        }
+
+        util::fs::oxen_hidden_dir(base_path)
+            .join(HISTORY_DIR)
+            .join(commit_id)
+            .join(FILES_DIR)
+            .join(dir)
+    }
+
+    pub fn db_exists(base_path: &Path, commit_id: &str, dir: &Path) -> bool {
+        // Must check the CURRENT file since the .oxen/history/COMMIT_ID/files/ path
+        // may have already been created by a deeper object
+        let db_path = LegacyCommitDirEntryReader::db_dir(base_path, commit_id, dir);
+        db_path.join("CURRENT").exists()
+    }
+
+    /// # Create new commit dir
+    /// Contains all the committed files within that dir, for faster filtering per dir
+    pub fn new(
+        repository: &LocalRepository,
+        commit_id: &str,
+        dir: &Path,
+    ) -> Result<LegacyCommitDirEntryReader, OxenError> {
+        LegacyCommitDirEntryReader::new_from_path(&repository.path, commit_id, dir)
+    }
+
+    pub fn new_from_path(
+        base_path: &Path,
+        commit_id: &str,
+        dir: &Path,
+    ) -> Result<LegacyCommitDirEntryReader, OxenError> {
+        let db_path = LegacyCommitDirEntryReader::db_dir(base_path, commit_id, dir);
+        log::debug!(
+            "CommitDirEntryReader::new() dir {:?} db_path {:?}",
+            dir,
+            db_path
+        );
+
+        let opts = db::opts::default();
+        if !LegacyCommitDirEntryReader::db_exists(base_path, commit_id, dir) {
+            if let Err(err) = std::fs::create_dir_all(&db_path) {
+                log::error!("CommitDirEntryReader could not create dir {db_path:?}\nErr: {err:?}");
+            }
+            // open it then lose scope to close it
+            let _db: DBWithThreadMode<MultiThreaded> =
+                DBWithThreadMode::open(&opts, dunce::simplified(&db_path))?;
+        }
+
+        Ok(LegacyCommitDirEntryReader {
+            db: DBWithThreadMode::open_for_read_only(&opts, &db_path, true)?,
+            dir: dir.to_owned(),
+        })
+    }
+
+    pub fn list_entries(&self) -> Result<Vec<CommitEntry>, OxenError> {
+        path_db::list_entries(&self.db)
+    }
+}
diff --git a/src/lib/src/core/index/legacy_commit_entry_reader.rs b/src/lib/src/core/index/legacy_commit_entry_reader.rs
new file mode 100644
index 000000000..8a74e55e2
--- /dev/null
+++ b/src/lib/src/core/index/legacy_commit_entry_reader.rs
@@ -0,0 +1,91 @@
+// Only necessary for facilitating migrations from old commit storage formats to new ones
+use crate::constants::{DIRS_DIR, HISTORY_DIR};
+use crate::core::db;
+use crate::core::index::LegacyCommitDirEntryReader;
+use crate::error::OxenError;
+use crate::model::{Commit, CommitEntry};
+use crate::util;
+
+use rocksdb::{DBWithThreadMode, MultiThreaded};
+use std::path::{Path, PathBuf};
+
+use crate::core::db::path_db;
+use crate::model::LocalRepository;
+
+pub struct LegacyCommitEntryReader {
+    base_path: PathBuf,
+    dir_db: DBWithThreadMode<MultiThreaded>,
+    pub commit_id: String,
+}
+
+impl LegacyCommitEntryReader {
+    pub fn db_path(base_path: impl AsRef<Path>, commit_id: &str) -> PathBuf {
+        util::fs::oxen_hidden_dir(&base_path)
+            .join(HISTORY_DIR)
+            .join(commit_id)
+            .join(DIRS_DIR)
+    }
+
+    pub fn new(
+        repository: &LocalRepository,
+        commit: &Commit,
+    ) -> Result<LegacyCommitEntryReader, OxenError> {
+        log::debug!("CommitEntryReader::new() commit_id: {}", commit.id);
+        LegacyCommitEntryReader::new_from_commit_id(repository, &commit.id)
+    }
+
+    pub fn new_from_commit_id(
+        repository: &LocalRepository,
+        commit_id: &str,
+    ) -> Result<LegacyCommitEntryReader, OxenError> {
+        LegacyCommitEntryReader::new_from_path(&repository.path, commit_id)
+    }
+
+    pub fn new_from_path(
+        base_path: impl AsRef<Path>,
+        commit_id: &str,
+    ) -> Result<LegacyCommitEntryReader, OxenError> {
+        let path = Self::db_path(&base_path, commit_id);
+        let opts = db::opts::default();
+        log::debug!(
+            "CommitEntryReader::new_from_commit_id() commit_id: {} path: {:?}",
+            commit_id,
+            path
+        );
+
+        if !path.exists() {
+            std::fs::create_dir_all(&path)?;
+            // open it then lose scope to close it
+            let _db: DBWithThreadMode<MultiThreaded> =
+                DBWithThreadMode::open(&opts, dunce::simplified(&path))?;
+        }
+
+        Ok(LegacyCommitEntryReader {
+            base_path: base_path.as_ref().to_owned(),
+            dir_db: DBWithThreadMode::open_for_read_only(&opts, &path, true)?,
+            commit_id: commit_id.to_owned(),
+        })
+    }
+
+    /// Lists all the directories in the commit
+    pub fn list_dirs(&self) -> Result<Vec<PathBuf>, OxenError> {
+        let root = PathBuf::from("");
+        let mut paths = path_db::list_paths(&self.dir_db, &root)?;
+        if !paths.contains(&root) {
+            paths.push(root);
+        }
+        paths.sort();
+        Ok(paths)
+    }
+
+    pub fn list_entries(&self) -> Result<Vec<CommitEntry>, OxenError> {
+        let mut paths: Vec<CommitEntry> = vec![];
+        for dir in self.list_dirs()? {
+            let commit_dir =
+                LegacyCommitDirEntryReader::new_from_path(&self.base_path, &self.commit_id, &dir)?;
+            let mut files = commit_dir.list_entries()?;
+            paths.append(&mut files);
+        }
+        Ok(paths)
+    }
+}
diff --git a/src/lib/src/model/commit.rs b/src/lib/src/model/commit.rs
index e92f856e5..92e2a0f27 100644
--- a/src/lib/src/model/commit.rs
+++ b/src/lib/src/model/commit.rs
@@ -46,7 +46,7 @@ pub struct Commit {
     pub message: String,
     pub author: String,
     pub email: String,
-    pub root_hash: String,
+    pub root_hash: Option<String>, // Option for now to facilitate migration from older stored commits
     #[serde(with = "time::serde::rfc3339")]
     pub timestamp: OffsetDateTime,
 }
@@ -65,7 +65,7 @@ pub struct CommitWithSize {
     pub message: String,
     pub author: String,
     pub email: String,
-    pub root_hash: String,
+    pub root_hash: Option<String>,
     #[serde(with = "time::serde::rfc3339")]
     pub timestamp: OffsetDateTime,
     pub size: u64,
@@ -79,7 +79,7 @@ pub struct CommitWithBranchName {
     pub message: String,
     pub author: String,
     pub email: String,
-    pub root_hash: String,
+    pub root_hash: Option<String>,
     #[serde(with = "time::serde::rfc3339")]
     pub timestamp: OffsetDateTime,
     pub size: u64,
@@ -110,14 +110,14 @@ impl Commit {
             author: new_commit.author.to_owned(),
             email: new_commit.email.to_owned(),
             timestamp: new_commit.timestamp.to_owned(),
-            root_hash: "".to_string(),
+            root_hash: None,
         }
     }
 
     // TODONOW - change the above initialization - should be a param - etc.
 
     pub fn update_root_hash(&mut self, root_hash: String) {
-        self.root_hash = root_hash;
+        self.root_hash = Some(root_hash);
     }
 
     pub fn from_with_size(commit: &CommitWithSize) -> Commit {
diff --git a/src/server/src/controllers/repositories.rs b/src/server/src/controllers/repositories.rs
index 36d74eec8..0da15c1b3 100644
--- a/src/server/src/controllers/repositories.rs
+++ b/src/server/src/controllers/repositories.rs
@@ -371,7 +371,7 @@ mod tests {
             author: String::from("Ox"),
             email: String::from("ox@oxen.ai"),
             timestamp,
-            root_hash: String::from(""),
+            root_hash: None,
         };
         let repo_new = RepoNew::from_root_commit("Testing-Name", "Testing-Namespace", root_commit);
         let data = serde_json::to_string(&repo_new)?;