Skip to content

Commit

Permalink
fix schema reader constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
EloyMartinez committed Aug 28, 2024
1 parent 0018afe commit 8631676
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 55 deletions.
6 changes: 3 additions & 3 deletions src/lib/src/api/local/entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,8 +645,8 @@ pub fn read_unsynced_schemas(
last_commit: &Commit,
this_commit: &Commit,
) -> Result<Vec<SchemaEntry>, OxenError> {
let this_schema_reader = SchemaReader::new(local_repo, &this_commit.id, None)?;
let last_schema_reader = SchemaReader::new(local_repo, &last_commit.id, None)?;
let this_schema_reader = SchemaReader::new(local_repo, &this_commit.id)?;
let last_schema_reader = SchemaReader::new(local_repo, &last_commit.id)?;

let this_schemas = this_schema_reader.list_schema_entries()?;
let last_schemas = last_schema_reader.list_schema_entries()?;
Expand Down Expand Up @@ -676,7 +676,7 @@ pub fn list_tabular_files_in_repo(
local_repo: &LocalRepository,
commit: &Commit,
) -> Result<Vec<MetadataEntry>, OxenError> {
let schema_reader = core::index::SchemaReader::new(local_repo, &commit.id, None)?;
let schema_reader = core::index::SchemaReader::new(local_repo, &commit.id)?;
let schemas = schema_reader.list_schemas()?;

let mut meta_entries: Vec<MetadataEntry> = vec![];
Expand Down
10 changes: 5 additions & 5 deletions src/lib/src/api/local/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ pub fn list(
if let Some(commit_id) = commit_id {
if let Some(commit) = api::local::commits::commit_from_branch_or_commit_id(repo, commit_id)?
{
let schema_reader = SchemaReader::new(repo, &commit.id, None)?;
let schema_reader = SchemaReader::new(repo, &commit.id)?;
schema_reader.list_schemas()
} else {
Err(OxenError::commit_id_does_not_exist(commit_id))
}
} else {
let head_commit = api::local::commits::head_commit(repo)?;
let schema_reader = SchemaReader::new(repo, &head_commit.id, None)?;
let schema_reader = SchemaReader::new(repo, &head_commit.id)?;
schema_reader.list_schemas()
}
}
Expand All @@ -34,7 +34,7 @@ pub fn list_from_ref(
let revision = revision.as_ref();
log::debug!("list_from_ref() listing for ref: {:?}", schema_ref.as_ref());
if let Some(commit) = api::local::revisions::get(repo, revision)? {
let schema_reader = SchemaReader::new(repo, &commit.id, None)?;
let schema_reader = SchemaReader::new(repo, &commit.id)?;
schema_reader.list_schemas_for_ref(schema_ref)
} else {
Err(OxenError::revision_not_found(revision.into()))
Expand All @@ -48,7 +48,7 @@ pub fn get_by_path(
let path = path.as_ref();
log::debug!("here's our repo path: {:?}", repo.path);
let commit = api::local::commits::head_commit(repo)?;
let schema_reader = SchemaReader::new(repo, &commit.id, None)?;
let schema_reader = SchemaReader::new(repo, &commit.id)?;
schema_reader.get_schema_for_file(path)
}

Expand All @@ -63,7 +63,7 @@ pub fn get_by_path_from_ref(
log::debug!("Getting schema for {:?} at revision {}", path, revision);
if let Some(commit) = api::local::revisions::get(repo, revision)? {
log::debug!("Got commit {:?} at revision {}", commit.id, revision);
let schema_reader = SchemaReader::new(repo, &commit.id, None)?;
let schema_reader = SchemaReader::new(repo, &commit.id)?;
schema_reader.get_schema_for_file(path)
} else {
Err(OxenError::revision_not_found(revision.into()))
Expand Down
2 changes: 1 addition & 1 deletion src/lib/src/command/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub fn add<P: AsRef<Path>>(repo: &LocalRepository, path: P) -> Result<(), OxenEr
let stager = Stager::new_with_merge(repo)?;
let commit = api::local::commits::head_commit(repo)?;
let reader = CommitEntryReader::new(repo, &commit)?;
let schema_reader = SchemaReader::new(repo, &commit.id, None)?;
let schema_reader = SchemaReader::new(repo, &commit.id)?;
let ignore = oxenignore::create(repo);
log::debug!("---START--- oxen add: {:?}", path.as_ref());

Expand Down
2 changes: 1 addition & 1 deletion src/lib/src/core/index/commit_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ mod tests {
// Create committer with no commits
let repo_path = &repo.path;
let entry_reader = CommitEntryReader::new_from_head(&repo)?;
let schema_reader = SchemaReader::new_from_head(&repo, None)?;
let schema_reader = SchemaReader::new_from_head(&repo)?;
let commit_writer = CommitWriter::new(&repo)?;

let train_dir = repo_path.join("training_data");
Expand Down
2 changes: 1 addition & 1 deletion src/lib/src/core/index/entry_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ impl EntryIndexer {
commit: &Commit,
mut limit: usize,
) -> Result<Vec<SchemaEntry>, OxenError> {
let schema_reader = SchemaReader::new(&self.repository, &commit.id, None)?;
let schema_reader = SchemaReader::new(&self.repository, &commit.id)?;
let schemas = schema_reader.list_schema_entries()?;

if limit == 0 {
Expand Down
4 changes: 2 additions & 2 deletions src/lib/src/core/index/merger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ impl Merger {
let stager = Stager::new(repo)?;
let commit = api::local::commits::head_commit(repo)?;
let reader = CommitEntryReader::new(repo, &commit)?;
let schema_reader = SchemaReader::new(repo, &commit.id, None)?;
let schema_reader = SchemaReader::new(repo, &commit.id)?;
let ignore = oxenignore::create(repo);
stager.add(&repo.path, &reader, &schema_reader, &ignore)?;

Expand Down Expand Up @@ -432,7 +432,7 @@ impl Merger {
let stager = Stager::new(repo)?;
let commit = api::local::commits::head_commit(repo)?;
let reader = CommitEntryReader::new(repo, &commit)?;
let schema_reader = SchemaReader::new(repo, &commit.id, None)?;
let schema_reader = SchemaReader::new(repo, &commit.id)?;
let ignore = oxenignore::create(repo);
stager.add(&repo.path, &reader, &schema_reader, &ignore)?;

Expand Down
48 changes: 25 additions & 23 deletions src/lib/src/core/index/schema_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,38 @@ pub enum SchemaReader {
}

impl SchemaReader {
pub fn new(
pub fn new(repository: &LocalRepository, commit_id: &str) -> Result<Self, OxenError> {
Ok(SchemaReader::Objects(ObjectsSchemaReader::new(
repository, commit_id,
)?))
}

pub fn new_from_workspace(
repository: &LocalRepository,
commit_id: &str,
workspace: Option<&Workspace>,
workspace: &Workspace,
) -> Result<Self, OxenError> {
match workspace {
Some(workspace) => Ok(SchemaReader::DuckDB(DuckDBSchemaReader::new(
repository,
commit_id,
workspace.clone(),
)?)),
None => Ok(SchemaReader::Objects(ObjectsSchemaReader::new(
repository, commit_id,
)?)),
}
Ok(SchemaReader::DuckDB(DuckDBSchemaReader::new(
repository,
commit_id,
workspace.clone(),
)?))
}

pub fn new_from_head(
pub fn new_from_head(repository: &LocalRepository) -> Result<Self, OxenError> {
Ok(SchemaReader::Objects(ObjectsSchemaReader::new_from_head(
repository,
)?))
}

pub fn new_from_head_with_workspace(
repository: &LocalRepository,
workspace: Option<&Workspace>,
workspace: &Workspace,
) -> Result<Self, OxenError> {
match workspace {
Some(workspace) => Ok(SchemaReader::DuckDB(DuckDBSchemaReader::new_from_head(
repository,
workspace.clone(),
)?)),
None => Ok(SchemaReader::Objects(ObjectsSchemaReader::new_from_head(
repository,
)?)),
}
Ok(SchemaReader::DuckDB(DuckDBSchemaReader::new_from_head(
repository,
workspace.clone(),
)?))
}

pub fn get_schema_for_file<P: AsRef<Path>>(
Expand Down
32 changes: 16 additions & 16 deletions src/lib/src/core/index/stager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ impl Stager {
// Also add as removed to staged schema db
if util::fs::is_tabular(path) {
// Get schema for this file
let schema_reader = SchemaReader::new(&self.repository, &entry.commit_id, None)?;
let schema_reader = SchemaReader::new(&self.repository, &entry.commit_id)?;
let schema = schema_reader.get_schema_for_file(path)?;
if let Some(schema) = schema {
let staged_schema = StagedSchema {
Expand Down Expand Up @@ -1015,7 +1015,7 @@ impl Stager {
// Add all untracked files and modified files
let (dir_paths, total) = self.list_unstaged_files_in_dir(dir);
// log::debug!("Stager.add_dir {:?} -> {}", dir, total);
let schema_reader = SchemaReader::new(&self.repository, &entry_reader.commit_id, None)?;
let schema_reader = SchemaReader::new(&self.repository, &entry_reader.commit_id)?;
// println!("Adding files in directory: {short_path:?}");
let size: u64 = unsafe { std::mem::transmute(total) };
let msg = format!("Adding directory {short_path:?}");
Expand Down Expand Up @@ -1674,7 +1674,7 @@ mod tests {
let commit_reader = CommitReader::new(&repo)?;
let commit = commit_reader.head_commit()?;
let entry_reader = CommitEntryReader::new(&stager.repository, &commit)?;
let schema_reader = SchemaReader::new(&stager.repository, &commit.id, None)?;
let schema_reader = SchemaReader::new(&stager.repository, &commit.id)?;

let repo_path = &stager.repository.path;
let hello_file = test::add_txt_file_to_dir(repo_path, "Hello World")?;
Expand Down Expand Up @@ -1712,7 +1712,7 @@ mod tests {
// Create entry_reader with no commits
let head_commit = CommitReader::new(&stager.repository)?.head_commit()?;
let entry_reader = CommitEntryReader::new_from_head(&stager.repository)?;
let schema_reader = SchemaReader::new(&stager.repository, &head_commit.id, None)?;
let schema_reader = SchemaReader::new(&stager.repository, &head_commit.id)?;
// Make sure we have a valid file
let repo_path = &stager.repository.path;
let hello_file = test::add_txt_file_to_dir(repo_path, "Hello World")?;
Expand All @@ -1735,7 +1735,7 @@ mod tests {
// Create entry_reader with no commits
let head_commit = CommitReader::new(&stager.repository)?.head_commit()?;
let entry_reader = CommitEntryReader::new_from_head(&stager.repository)?;
let schema_reader = SchemaReader::new(&stager.repository, &head_commit.id, None)?;
let schema_reader = SchemaReader::new(&stager.repository, &head_commit.id)?;
// Make sure we have a valid file
let repo_path = &stager.repository.path;
let hello_file = test::add_txt_file_to_dir(repo_path, "Hello World")?;
Expand All @@ -1750,7 +1750,7 @@ mod tests {
stager.unstage()?;

// try to add it again
let schema_reader = SchemaReader::new(&stager.repository, &commit.id, None)?;
let schema_reader = SchemaReader::new(&stager.repository, &commit.id)?;
let entry_reader = CommitEntryReader::new(&repo, &commit)?;
stager.add_file(&hello_file, &entry_reader, &schema_reader)?;

Expand All @@ -1767,7 +1767,7 @@ mod tests {
test::run_empty_stager_test(|stager, _repo| {
// Create entry_reader with no commits
let entry_reader = CommitEntryReader::new_from_head(&stager.repository)?;
let schema_reader = SchemaReader::new_from_head(&stager.repository, None)?;
let schema_reader = SchemaReader::new_from_head(&stager.repository)?;
let hello_file = PathBuf::from("non-existant.txt");
if stager
.add_file(&hello_file, &entry_reader, &schema_reader)
Expand All @@ -1786,7 +1786,7 @@ mod tests {
test::run_empty_stager_test(|stager, _repo| {
// Create entry_reader with no commits
let entry_reader = CommitEntryReader::new_from_head(&stager.repository)?;
let schema_reader = SchemaReader::new_from_head(&stager.repository, None)?;
let schema_reader = SchemaReader::new_from_head(&stager.repository)?;
let hello_file = test::add_txt_file_to_dir(&stager.repository.path, "Hello 1")?;
stager.add_file(&hello_file, &entry_reader, &schema_reader)?;

Expand Down Expand Up @@ -1829,7 +1829,7 @@ mod tests {
test::run_empty_stager_test(|stager, _repo| {
// Create entry_reader with no commits
let entry_reader = CommitEntryReader::new_from_head(&stager.repository)?;
let schema_reader = SchemaReader::new_from_head(&stager.repository, None)?;
let schema_reader = SchemaReader::new_from_head(&stager.repository)?;
// Write two files to directories
let repo_path = &stager.repository.path;
let sub_dir = repo_path.join("training_data").join("deeper");
Expand Down Expand Up @@ -1876,7 +1876,7 @@ mod tests {
test::run_empty_stager_test(|stager, _repo| {
// Create entry_reader with no commits
let entry_reader = CommitEntryReader::new_from_head(&stager.repository)?;
let schema_reader = SchemaReader::new_from_head(&stager.repository, None)?;
let schema_reader = SchemaReader::new_from_head(&stager.repository)?;

let repo_path = &stager.repository.path;
let hello_file = test::add_txt_file_to_dir(repo_path, "Hello World")?;
Expand All @@ -1899,7 +1899,7 @@ mod tests {
test::run_empty_stager_test(|stager, _repo| {
// Create entry_reader with no commits
let entry_reader = CommitEntryReader::new_from_head(&stager.repository)?;
let schema_reader = SchemaReader::new_from_head(&stager.repository, None)?;
let schema_reader = SchemaReader::new_from_head(&stager.repository)?;
let repo_path = &stager.repository.path;
let hello_file = test::add_txt_file_to_dir(repo_path, "Hello World")?;
let relative_path = util::fs::path_relative_to_dir(&hello_file, repo_path)?;
Expand All @@ -1922,7 +1922,7 @@ mod tests {
test::run_empty_stager_test(|stager, _repo| {
// Create entry_reader with no commits
let entry_reader = CommitEntryReader::new_from_head(&stager.repository)?;
let schema_reader = SchemaReader::new_from_head(&stager.repository, None)?;
let schema_reader = SchemaReader::new_from_head(&stager.repository)?;
// Write two files to a sub directory
let repo_path = &stager.repository.path;
let sub_dir = repo_path.join("training_data");
Expand Down Expand Up @@ -1951,7 +1951,7 @@ mod tests {
test::run_empty_stager_test(|stager, _repo| {
// Create entry_reader with no commits
let entry_reader = CommitEntryReader::new_from_head(&stager.repository)?;
let schema_reader = SchemaReader::new_from_head(&stager.repository, None)?;
let schema_reader = SchemaReader::new_from_head(&stager.repository)?;
// Write two files to a sub directory
let repo_path = &stager.repository.path;
let training_data_dir = PathBuf::from("training_data");
Expand Down Expand Up @@ -2045,7 +2045,7 @@ mod tests {
test::run_empty_stager_test(|stager, repo| {
// Create entry_reader with no commits
let entry_reader = CommitEntryReader::new_from_head(&stager.repository)?;
let schema_reader = SchemaReader::new_from_head(&stager.repository, None)?;
let schema_reader = SchemaReader::new_from_head(&stager.repository)?;
let repo_path = &stager.repository.path;
let hello_file = test::add_txt_file_to_dir(repo_path, "Hello 1")?;

Expand Down Expand Up @@ -2136,7 +2136,7 @@ mod tests {
let commit_reader = CommitReader::new(&repo)?;
let commit = commit_reader.head_commit()?;
let entry_reader = CommitEntryReader::new(&repo, &commit)?;
let schema_reader = SchemaReader::new(&repo, &commit.id, None)?;
let schema_reader = SchemaReader::new(&repo, &commit.id)?;
// Write two files to a sub directory
let repo_path = &stager.repository.path;
let annotations_dir = PathBuf::from("annotations");
Expand Down Expand Up @@ -2290,7 +2290,7 @@ mod tests {
let commit_reader = CommitReader::new(&repo)?;
let commit = commit_reader.head_commit()?;
let entry_reader = CommitEntryReader::new(&stager.repository, &commit)?;
let schema_reader = SchemaReader::new(&stager.repository, &commit.id, None)?;
let schema_reader = SchemaReader::new(&stager.repository, &commit.id)?;

// Create 2 sub directories, one with Write two files to a sub directory
let repo_path = &stager.repository.path;
Expand Down
2 changes: 1 addition & 1 deletion src/lib/src/core/index/workspaces/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub fn add(workspace: &Workspace, filepath: &Path) -> Result<PathBuf, OxenError>
log::debug!("core::index::workspaces::files::add adding file {filepath:?}");
// Add a schema_reader to stager.add_file for?

let schema_reader = SchemaReader::new(workspace_repo, &commit.id, Some(workspace))?;
let schema_reader = SchemaReader::new_from_workspace(workspace_repo, &commit.id, workspace)?;

stager.add_file(filepath.as_ref(), &reader, &schema_reader)?;
log::debug!("done adding file in the stager");
Expand Down
2 changes: 1 addition & 1 deletion src/lib/src/core/index/workspaces/stager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ fn list_staged_files(

pub fn list_schemas(workspace: &Workspace) -> Result<HashMap<PathBuf, StagedSchema>, OxenError> {
// schema reader to see if we already have schema metadata for the file
let schema_reader = SchemaReader::new(&workspace.base_repo, &workspace.commit.id, None)?;
let schema_reader = SchemaReader::new(&workspace.base_repo, &workspace.commit.id)?;
let db_path = schemas_db_path(workspace);
log::debug!("list_schemas from files_db_path {db_path:?}");
let opts = db::key_val::opts::default();
Expand Down
2 changes: 1 addition & 1 deletion src/server/src/controllers/branches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ pub async fn list_entry_versions(
for (commit, entry) in commits_with_versions {
// For each version, get the schema hash if one exists.
let maybe_schema_hash = if util::fs::is_tabular(&entry.path) {
let schema_reader = SchemaReader::new(&repo, &commit.id, None)?;
let schema_reader = SchemaReader::new(&repo, &commit.id)?;
let maybe_schema = schema_reader.get_schema_for_file(&entry.path)?;
match maybe_schema {
Some(schema) => Some(schema.hash),
Expand Down

0 comments on commit 8631676

Please sign in to comment.