Skip to content

Commit

Permalink
repo: move resolving of concurrent operations to repo level (#111)
Browse files Browse the repository at this point in the history
It's useful for the UI layer to know that there's been concurrent
operations, so it can inform the user that that happened. It'll be
even more useful when we soon start making the resolution involve
rebasing commits, since that's even more important for the UI layer to
present to the user. This patch gets us a bit closer to that by moving
the resolution to the repo level.
  • Loading branch information
martinvonz committed Mar 24, 2022
1 parent 350d8b4 commit d7b60c9
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 44 deletions.
67 changes: 29 additions & 38 deletions lib/src/op_heads_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use crate::lock::FileLock;
use crate::op_store::{OpStore, OperationId, OperationMetadata};
use crate::operation::Operation;
use crate::repo::RepoLoader;
use crate::transaction::UnpublishedOperation;
use crate::{dag_walk, op_store};

/// Manages the very set of current heads of the operation log. The store is
Expand All @@ -34,6 +33,17 @@ pub struct OpHeadsStore {
dir: PathBuf,
}

pub enum OpHeads {
/// There's a single latest operation. This is the normal case.
Single(Operation),
/// There are multiple latest operations, which means there has been
/// concurrent operations. These need to be resolved.
Unresolved {
locked_op_heads: LockedOpHeads,
op_heads: Vec<Operation>,
},
}

#[derive(Debug, Error, PartialEq, Eq)]
pub enum OpHeadResolutionError {
#[error("Operation log has no heads")]
Expand Down Expand Up @@ -112,10 +122,10 @@ impl OpHeadsStore {
}
}

pub fn get_single_op_head(
pub fn get_heads(
self: &Arc<Self>,
repo_loader: &RepoLoader,
) -> Result<Operation, OpHeadResolutionError> {
) -> Result<OpHeads, OpHeadResolutionError> {
let mut op_heads = self.get_op_heads();

if op_heads.is_empty() {
Expand All @@ -127,7 +137,11 @@ impl OpHeadsStore {
if op_heads.len() == 1 {
let operation_id = op_heads.pop().unwrap();
let operation = op_store.read_operation(&operation_id).unwrap();
return Ok(Operation::new(op_store.clone(), operation_id, operation));
return Ok(OpHeads::Single(Operation::new(
op_store.clone(),
operation_id,
operation,
)));
}

// There are multiple heads. We take a lock, then check if there are still
Expand All @@ -136,7 +150,7 @@ impl OpHeadsStore {
// merge all the views into one. We then write that view and a corresponding
// operation to the op-store.
// Note that the locking isn't necessary for correctness; we take the lock
// only to avoid other concurrent processes from doing the same work (and
// only to prevent other concurrent processes from doing the same work (and
// producing another set of divergent heads).
let locked_op_heads = self.lock();
let op_head_ids = self.get_op_heads();
Expand All @@ -149,7 +163,11 @@ impl OpHeadsStore {
let op_head_id = op_head_ids[0].clone();
let op_head = op_store.read_operation(&op_head_id).unwrap();
// Return early so we don't write a merge operation with a single parent
return Ok(Operation::new(op_store.clone(), op_head_id, op_head));
return Ok(OpHeads::Single(Operation::new(
op_store.clone(),
op_head_id,
op_head,
)));
}

let op_heads = op_head_ids
Expand All @@ -163,14 +181,13 @@ impl OpHeadsStore {

// Return without creating a merge operation
if op_heads.len() == 1 {
return Ok(op_heads.pop().unwrap());
return Ok(OpHeads::Single(op_heads.pop().unwrap()));
}

let merged_repo = merge_op_heads(repo_loader, op_heads).leave_unpublished();
let merge_operation = merged_repo.operation().clone();
locked_op_heads.finish(&merge_operation);
// TODO: Change the return type include the repo if we have it (as we do here)
Ok(merge_operation)
Ok(OpHeads::Unresolved {
locked_op_heads,
op_heads,
})
}

/// Removes operations in the input that are ancestors of other operations
Expand All @@ -189,29 +206,3 @@ impl OpHeadsStore {
op_heads.into_iter().collect()
}
}

fn merge_op_heads(repo_loader: &RepoLoader, mut op_heads: Vec<Operation>) -> UnpublishedOperation {
op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone());
let base_repo = repo_loader.load_at(&op_heads[0]);
let mut tx = base_repo.start_transaction("resolve concurrent operations");
let merged_repo = tx.mut_repo();
let neighbors_fn = |op: &Operation| op.parents();
for (i, other_op_head) in op_heads.iter().enumerate().skip(1) {
let ancestor_op = dag_walk::closest_common_node(
op_heads[0..i].to_vec(),
vec![other_op_head.clone()],
&neighbors_fn,
&|op: &Operation| op.id().clone(),
)
.unwrap();
let base_repo = repo_loader.load_at(&ancestor_op);
let other_repo = repo_loader.load_at(other_op_head);
merged_repo.merge(&base_repo, &other_repo);
}
let op_parent_ids = op_heads.iter().map(|op| op.id().clone()).collect();
tx.set_parents(op_parent_ids);
// TODO: We already have the resulting View in this case but Operation cannot
// keep it. Teach Operation to have a cached View so the caller won't have
// to re-read it from the store (by calling Operation::view())?
tx.write()
}
47 changes: 41 additions & 6 deletions lib/src/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ use thiserror::Error;
use crate::backend::{BackendError, CommitId};
use crate::commit::Commit;
use crate::commit_builder::CommitBuilder;
use crate::dag_walk::topo_order_reverse;
use crate::dag_walk::{closest_common_node, topo_order_reverse};
use crate::index::{IndexRef, MutableIndex, ReadonlyIndex};
use crate::index_store::IndexStore;
use crate::op_heads_store::OpHeadsStore;
use crate::op_heads_store::{OpHeads, OpHeadsStore};
use crate::op_store::{BranchTarget, OpStore, OperationId, RefTarget, WorkspaceId};
use crate::operation::Operation;
use crate::rewrite::DescendantRebaser;
use crate::settings::{RepoSettings, UserSettings};
use crate::simple_op_store::SimpleOpStore;
use crate::store::Store;
use crate::transaction::Transaction;
use crate::transaction::{Transaction, UnpublishedOperation};
use crate::view::{RefName, View};
use crate::{backend, op_store};

Expand Down Expand Up @@ -332,9 +332,44 @@ impl RepoLoader {
}

pub fn load_at_head(&self) -> Arc<ReadonlyRepo> {
let op = self.op_heads_store.get_single_op_head(self).unwrap();
let view = View::new(op.view().take_store_view());
self._finish_load(op, view)
let op_heads = self.op_heads_store.get_heads(self).unwrap();
match op_heads {
OpHeads::Single(op) => {
let view = View::new(op.view().take_store_view());
self._finish_load(op, view)
}
OpHeads::Unresolved {
locked_op_heads,
op_heads,
} => {
let merged_repo = self.merge_op_heads(op_heads).leave_unpublished();
locked_op_heads.finish(merged_repo.operation());
merged_repo
}
}
}

fn merge_op_heads(&self, mut op_heads: Vec<Operation>) -> UnpublishedOperation {
op_heads.sort_by_key(|op| op.store_operation().metadata.end_time.timestamp.clone());
let base_repo = self.load_at(&op_heads[0]);
let mut tx = base_repo.start_transaction("resolve concurrent operations");
let merged_repo = tx.mut_repo();
let neighbors_fn = |op: &Operation| op.parents();
for (i, other_op_head) in op_heads.iter().enumerate().skip(1) {
let ancestor_op = closest_common_node(
op_heads[0..i].to_vec(),
vec![other_op_head.clone()],
&neighbors_fn,
&|op: &Operation| op.id().clone(),
)
.unwrap();
let base_repo = self.load_at(&ancestor_op);
let other_repo = self.load_at(other_op_head);
merged_repo.merge(&base_repo, &other_repo);
}
let op_parent_ids = op_heads.iter().map(|op| op.id().clone()).collect();
tx.set_parents(op_parent_ids);
tx.write()
}

pub fn load_at(&self, op: &Operation) -> Arc<ReadonlyRepo> {
Expand Down

0 comments on commit d7b60c9

Please sign in to comment.