Skip to content

Commit

Permalink
wip - tweaking migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
benartuso committed Jan 5, 2024
1 parent 752ab31 commit c881760
Show file tree
Hide file tree
Showing 19 changed files with 505 additions and 95 deletions.
43 changes: 40 additions & 3 deletions src/cli/src/cmd_setup.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use clap::{arg, Arg, Command};
use liboxen::command::migrate::{
CacheDataFrameSizeMigration, Migrate, PropagateSchemasMigration, UpdateVersionFilesMigration,
CacheDataFrameSizeMigration, CreateMerkleTreesMigration, Migrate, PropagateSchemasMigration,
UpdateVersionFilesMigration,
};
use liboxen::constants::{DEFAULT_BRANCH_NAME, DEFAULT_REMOTE_NAME};

Expand Down Expand Up @@ -918,7 +919,25 @@ pub fn migrate() -> Command {
)
.action(clap::ArgAction::SetTrue),
),
),
)
.subcommand(
Command::new(CreateMerkleTreesMigration.name())
.about("Reformats the underlying data model into merkle trees for storage and lookup efficiency")
.arg(
Arg::new("PATH")
.help("Directory in which to apply the migration")
.required(true),
)
.arg(
Arg::new("all")
.long("all")
.short('a')
.help(
"Run the migration for all oxen repositories in this directory",
)
.action(clap::ArgAction::SetTrue),
),
)
)
.subcommand(
Command::new("down")
Expand Down Expand Up @@ -977,7 +996,25 @@ pub fn migrate() -> Command {
)
.action(clap::ArgAction::SetTrue),
),
),
)
.subcommand(
Command::new(CreateMerkleTreesMigration.name())
.about("Reformats the underlying data model into merkle trees for storage and lookup efficiency")
.arg(
Arg::new("PATH")
.help("Directory in which to apply the migration")
.required(true),
)
.arg(
Arg::new("all")
.long("all")
.short('a')
.help(
"Run the migration for all oxen repositories in this directory",
)
.action(clap::ArgAction::SetTrue),
),
)
)
}

Expand Down
20 changes: 12 additions & 8 deletions src/cli/src/dispatch.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use liboxen::api;
use liboxen::command;
use liboxen::command::migrate::CreateMerkleTreesMigration;
use liboxen::command::migrate::Migrate;
use liboxen::command::migrate::UpdateVersionFilesMigration;
use liboxen::config::{AuthConfig, UserConfig};
Expand Down Expand Up @@ -95,16 +96,19 @@ pub async fn check_remote_version_blocking(host: impl AsRef<str>) -> Result<(),
Ok(())
}

pub fn check_repo_migration_needed(_repo: &LocalRepository) -> Result<(), OxenError> {
let _migrations: Vec<Box<dyn Migrate>> = vec![Box::new(UpdateVersionFilesMigration)];
pub fn check_repo_migration_needed(repo: &LocalRepository) -> Result<(), OxenError> {
let migrations: Vec<Box<dyn Migrate>> = vec![
Box::new(UpdateVersionFilesMigration),
Box::new(CreateMerkleTreesMigration),
];

let migrations_needed: Vec<Box<dyn Migrate>> = Vec::new();
let mut migrations_needed: Vec<Box<dyn Migrate>> = Vec::new();

// for migration in migrations {
// if migration.is_needed(repo)? {
// migrations_needed.push(migration);
// }
// }
for migration in migrations {
if migration.is_needed(repo)? {
migrations_needed.push(migration);
}
}

if migrations_needed.is_empty() {
return Ok(());
Expand Down
10 changes: 9 additions & 1 deletion src/cli/src/parse_and_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use crate::cmd_setup::{ADD, COMMIT, DF, DIFF, DOWNLOAD, LOG, LS, METADATA, RESTO
use crate::dispatch;
use clap::ArgMatches;
use liboxen::command::migrate::{
CacheDataFrameSizeMigration, Migrate, PropagateSchemasMigration, UpdateVersionFilesMigration,
CacheDataFrameSizeMigration, CreateMerkleTreesMigration, Migrate, PropagateSchemasMigration,
UpdateVersionFilesMigration,
};
use liboxen::constants::{DEFAULT_BRANCH_NAME, DEFAULT_HOST, DEFAULT_REMOTE_NAME};
use liboxen::error::OxenError;
Expand Down Expand Up @@ -1083,6 +1084,13 @@ pub async fn migrate(sub_matches: &ArgMatches) {
eprintln!("Error running migration: {}", err);
std::process::exit(1);
}
} else if migration == CreateMerkleTreesMigration.name() {
if let Err(err) =
run_migration(&CreateMerkleTreesMigration, direction, sub_matches)
{
eprintln!("Error running migration: {}", err);
std::process::exit(1);
}
} else {
eprintln!("Invalid migration: {}", migration);
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib/src/api/local/commits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ pub fn construct_commit_merkle_tree(
commit: &Commit,
) -> Result<(), OxenError> {
let commit_writer = CommitEntryWriter::new(repo, commit)?;
commit_writer.construct_merkle_tree_new(&repo.path)?;
commit_writer.construct_merkle_tree_from_legacy_commit(&repo.path)?;
Ok(())
}

Expand Down
4 changes: 2 additions & 2 deletions src/lib/src/api/local/repositories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ mod tests {
message: String::from(constants::INITIAL_COMMIT_MSG),
author: String::from("Ox"),
email: String::from("[email protected]"),
root_hash: "".to_string(),
root_hash: None,
timestamp,
};
let repo_new = RepoNew::from_root_commit(namespace, name, root_commit);
Expand Down Expand Up @@ -544,7 +544,7 @@ mod tests {
author: String::from("Ox"),
email: String::from("[email protected]"),
timestamp,
root_hash: "".to_string(),
root_hash: None,
};
let repo_new = RepoNew::from_root_commit(old_namespace, name, root_commit);
let _repo = api::local::repositories::create(sync_dir, repo_new)?;
Expand Down
3 changes: 2 additions & 1 deletion src/lib/src/command/migrate.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::path::Path;

use crate::error::OxenError;
use crate::{error::OxenError, model::LocalRepository};

pub mod create_merkle_trees;
pub use create_merkle_trees::CreateMerkleTreesMigration;
Expand All @@ -17,5 +17,6 @@ pub use cache_dataframe_size::CacheDataFrameSizeMigration;
/// Common interface implemented by every repository migration.
///
/// Implementors are used as trait objects (`Box<dyn Migrate>`) when checking
/// which migrations a repository needs, and are selected by `name()` when a
/// migration is requested from the command line.
pub trait Migrate {
/// Runs the migration in the `up` direction against `path`.
///
/// `all` corresponds to the CLI `--all` flag, which is described there as
/// "Run the migration for all oxen repositories in this directory".
fn up(&self, path: &Path, all: bool) -> Result<(), OxenError>;
/// Runs the migration in the `down` direction against `path`.
///
/// Takes the same `path` / `all` arguments as `up`.
fn down(&self, path: &Path, all: bool) -> Result<(), OxenError>;
/// Reports whether this migration should be flagged as needed for `repo`.
///
/// Returning `Ok(false)` opts the migration out of automatic detection.
fn is_needed(&self, repo: &LocalRepository) -> Result<bool, OxenError>;
/// Static identifier of the migration; used as the CLI subcommand name.
fn name(&self) -> &'static str;
}
5 changes: 5 additions & 0 deletions src/lib/src/command/migrate/cache_dataframe_size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ impl Migrate for CacheDataFrameSizeMigration {
}
Ok(())
}

/// Reports whether this migration must be run for the given repository.
///
/// Always returns `Ok(false)`, so this migration is never picked up by
/// automatic detection. The repository argument is intentionally unused
/// and is underscore-prefixed so the `unused_variables` lint (which fails
/// the Clippy CI job) does not fire.
fn is_needed(&self, _repo: &LocalRepository) -> Result<bool, OxenError> {
    // Server-side migration, not necessary for autodetection on client
    Ok(false)
}
}

pub fn cache_data_frame_size_for_all_repos_up(path: &Path) -> Result<(), OxenError> {
Expand Down
48 changes: 43 additions & 5 deletions src/lib/src/command/migrate/create_merkle_trees.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
use rocksdb::{DBWithThreadMode, MultiThreaded};

use super::Migrate;

use std::path::Path;
use std::path::{Path, PathBuf};

use crate::api;
use crate::core::index::CommitReader;
use crate::core::db::{self, path_db};
use crate::core::index::{CommitEntryWriter, CommitReader, CommitWriter};
use crate::error::OxenError;
use crate::model::LocalRepository;
use crate::util::progress_bar::{oxen_progress_bar, ProgressBarType};
pub struct CreateMerkleTreesMigration {}
use crate::{api, constants};

pub struct CreateMerkleTreesMigration;
impl Migrate for CreateMerkleTreesMigration {
fn name(&self) -> &'static str {
"cache_data_frame_size"
"create_merkle_trees"
}
fn up(&self, path: &Path, all: bool) -> Result<(), OxenError> {
if all {
Expand All @@ -32,6 +36,18 @@ impl Migrate for CreateMerkleTreesMigration {
}
Ok(())
}
/// Reports whether the merkle-tree migration still has to be run on `repo`.
///
/// The migration is considered needed exactly when the objects directory
/// under the repository's hidden oxen directory does not exist.
fn is_needed(&self, repo: &LocalRepository) -> Result<bool, OxenError> {
    let hidden_dir = repo.path.join(constants::OXEN_HIDDEN_DIR);
    let objects_path = hidden_dir.join(constants::OBJECTS_DIR);

    // This may need a more elaborate check for migrations that are aborted with a single repo...
    // but it's too computationally expensive to parse through all the trees.
    let objects_missing = !objects_path.exists();
    Ok(objects_missing)
}
}

pub fn create_merkle_trees_for_all_repos_up(path: &Path) -> Result<(), OxenError> {
Expand Down Expand Up @@ -73,8 +89,14 @@ pub fn create_merkle_trees_up(repo: &LocalRepository) -> Result<(), OxenError> {
// Get all commits in repo, then construct merkle tree for each commit
let reader = CommitReader::new(repo)?;
let all_commits = reader.list_all()?;
// sort these by timestamp from oldest to newest
let mut all_commits = all_commits.clone();
all_commits.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));

let bar = oxen_progress_bar(all_commits.len() as u64, ProgressBarType::Counter);
let commit_writer = CommitWriter::new(repo)?;
for commit in all_commits {
// Create the merkle tree for each commit
match api::local::commits::construct_commit_merkle_tree(repo, &commit) {
Ok(_) => {}
Err(err) => {
Expand All @@ -85,6 +107,18 @@ pub fn create_merkle_trees_up(repo: &LocalRepository) -> Result<(), OxenError> {
)
}
}
// Then we need to associate the root hash of the merkle tree with the commit
let mut commit_to_update = commit.clone();
let dir_hashes_db_dir = CommitEntryWriter::commit_dir_hash_db(&repo.path, &commit.id);
let dir_hashes_db: DBWithThreadMode<MultiThreaded> =
DBWithThreadMode::open_for_read_only(&db::opts::default(), &dir_hashes_db_dir, false)?;

let root_hash: String = path_db::get_entry(&dir_hashes_db, &PathBuf::from(""))?.unwrap();

commit_to_update.update_root_hash(root_hash);

commit_writer.add_commit_to_db(&commit_to_update)?;

bar.inc(1);
}
Ok(())
Expand All @@ -94,3 +128,7 @@ pub fn create_merkle_trees_down(_repo: &LocalRepository) -> Result<(), OxenError
println!("There are no operations to be run");
Ok(())
}

// This logic mirrors the CommitEntryReader pre 0.10.0. It is only used here to get existing commit data for migration,
// and will not work on repos created with 0.10.0 or later.
// fn legacy_list_entries()
5 changes: 5 additions & 0 deletions src/lib/src/command/migrate/propagate_schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ impl Migrate for PropagateSchemasMigration {
}
Ok(())
}

/// Reports whether this migration must be run for the given repository.
///
/// Always returns `Ok(false)`, so this migration is never picked up by
/// automatic detection. The repository argument is intentionally unused
/// and is underscore-prefixed so the `unused_variables` lint (which fails
/// the Clippy CI job) does not fire.
fn is_needed(&self, _repo: &LocalRepository) -> Result<bool, OxenError> {
    // Server-side migration, not necessary for autodetection on client
    Ok(false)
}
}

pub fn propagate_schemas_for_all_repos_up(path: &Path) -> Result<(), OxenError> {
Expand Down
31 changes: 31 additions & 0 deletions src/lib/src/command/migrate/update_version_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::path::{Path, PathBuf};

use jwalk::WalkDir;

use crate::constants;
use crate::constants::{HASH_FILE, VERSIONS_DIR, VERSION_FILE_NAME};

use crate::core::index::{CommitEntryReader, CommitReader};
Expand Down Expand Up @@ -42,6 +43,36 @@ impl Migrate for UpdateVersionFilesMigration {
}
Ok(())
}

/// Reports whether the version-files migration still has to be run on `repo`.
///
/// Walks the versions directory and decides based on the first regular file
/// that is not a hash file: if its name starts with `VERSION_FILE_NAME` the
/// migration is not needed, otherwise it is. A missing versions directory,
/// or one containing no such file, yields `Ok(false)`.
fn is_needed(&self, repo: &LocalRepository) -> Result<bool, OxenError> {
    let hidden_dir = repo.path.join(constants::OXEN_HIDDEN_DIR);
    let versions_root = hidden_dir.join(constants::VERSIONS_DIR);
    if !versions_root.exists() {
        return Ok(false);
    }

    for walked in WalkDir::new(&versions_root) {
        let walked = walked?;
        // Only regular files are inspected; directories are skipped.
        if !walked.file_type().is_file() {
            continue;
        }

        let file_path = walked.path();
        let name = match file_path.file_name() {
            Some(os_name) => os_name.to_string_lossy().to_string(),
            None => continue,
        };

        // Hash files say nothing about the layout; keep looking.
        if name.starts_with(constants::HASH_FILE) {
            continue;
        }

        // The first non-hash file decides the outcome for the whole repo.
        let already_migrated = name.starts_with(constants::VERSION_FILE_NAME);
        return Ok(!already_migrated);
    }

    Ok(false)
}
}

pub fn update_version_files_up(repo: &LocalRepository) -> Result<(), OxenError> {
Expand Down
4 changes: 4 additions & 0 deletions src/lib/src/core/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ pub mod commit_sync_status;
pub mod commit_validator;
pub mod commit_writer;
pub mod entry_indexer;
pub mod legacy_commit_dir_entry_reader;
pub mod legacy_commit_entry_reader;
pub mod merge_conflict_db_reader;
pub mod merge_conflict_reader;
pub mod merge_conflict_writer;
Expand Down Expand Up @@ -43,6 +45,8 @@ pub use crate::core::index::entry_indexer::EntryIndexer;

pub use crate::core::index::commit_dir_entry_reader::CommitDirEntryReader;
pub use crate::core::index::commit_entry_reader::CommitEntryReader;
pub use crate::core::index::legacy_commit_dir_entry_reader::LegacyCommitDirEntryReader;
pub use crate::core::index::legacy_commit_entry_reader::LegacyCommitEntryReader;
pub use crate::core::index::merge_conflict_db_reader::MergeConflictDBReader;
pub use crate::core::index::merge_conflict_reader::MergeConflictReader;
pub use crate::core::index::merger::Merger;
Expand Down
Loading

0 comments on commit c881760

Please sign in to comment.