Skip to content

Commit

Permalink
Add batch support to editManager (#16920)
Browse files Browse the repository at this point in the history
With the new feature allowing ops come in batch the sequence number is
share between those ops, so for disambiguate I added an indexInBatch
field to SequenceCommit.
  • Loading branch information
robertobe-ms authored Aug 22, 2023
1 parent 6ed344e commit 1359103
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 56 deletions.
188 changes: 133 additions & 55 deletions experimental/dds/tree2/src/shared-tree-core/editManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,21 @@ import {
} from "../core";
import { createEmitter, ISubscribable } from "../events";
import { getChangeReplaceType, onForkTransitive, SharedTreeBranch } from "./branch";
import { Commit, SeqNumber, SequencedCommit, SummarySessionBranch } from "./editManagerFormat";
import {
Commit,
SeqNumber,
SequenceId,
sequenceIdComparator,
equalSequenceIds,
minSequenceId,
SequencedCommit,
SummarySessionBranch,
} from "./editManagerFormat";

export const minimumPossibleSequenceNumber: SeqNumber = brand(Number.MIN_SAFE_INTEGER);
const minimumPossibleSequenceId: SequenceId = {
sequenceNumber: minimumPossibleSequenceNumber,
};

export interface EditManagerEvents<TChangeset> {
/**
Expand Down Expand Up @@ -58,13 +70,16 @@ export class EditManager<
*/
private readonly trunkMetadata = new Map<
RevisionTag,
{ sequenceNumber: SeqNumber; sessionId: SessionId }
{ sequenceId: SequenceId; sessionId: SessionId }
>();
/**
* A map from a sequence number to the commit in the {@link trunk} which has that sequence number.
* A map from a sequence id to the commit in the {@link trunk} which has that sequence id.
* This also includes an entry for the {@link trunkBase} which always has the lowest key in the map.
*/
private readonly sequenceMap = new BTree<SeqNumber, GraphCommit<TChangeset>>();
private readonly sequenceMap = new BTree<SequenceId, GraphCommit<TChangeset>>(
undefined,
sequenceIdComparator,
);

/** The {@link UndoRedoManager} associated with the trunk */
private readonly trunkUndoRedoManager: UndoRedoManager<TChangeset, TEditor>;
Expand All @@ -87,17 +102,19 @@ export class EditManager<
private readonly localBranchUndoRedoManager: UndoRedoManager<TChangeset, TEditor>;

/**
* Tracks where on the trunk all registered branches are based. Each key is the sequence number of a commit on
* Tracks where on the trunk all registered branches are based. Each key is the sequence id of a commit on
* the trunk, and the value is the set of all branches who have that commit as their common ancestor with the trunk.
*/
private readonly trunkBranches = new BTree<
SeqNumber,
SequenceId,
Set<SharedTreeBranch<TEditor, TChangeset>>
>();
>(undefined, sequenceIdComparator);

/**
* The sequence number of the newest commit on the trunk that has been received by all peers.
* Defaults to {@link minimumPossibleSequenceNumber} if no commits have been received.
*
* @remarks If there are more than one commit with the same sequence number we assume this refers to the last commit in the batch.
*/
private minimumSequenceNumber = minimumPossibleSequenceNumber;

Expand Down Expand Up @@ -136,7 +153,7 @@ export class EditManager<
revision: assertIsRevisionTag("00000000-0000-4000-8000-000000000000"),
change: changeFamily.rebaser.compose([]),
};
this.sequenceMap.set(minimumPossibleSequenceNumber, this.trunkBase);
this.sequenceMap.set(minimumPossibleSequenceId, this.trunkBase);
this.localBranchUndoRedoManager = UndoRedoManager.create(changeFamily);
this.trunkUndoRedoManager = this.localBranchUndoRedoManager.clone();
this.trunk = new SharedTreeBranch(
Expand Down Expand Up @@ -164,46 +181,46 @@ export class EditManager<
* branches remain usable even as the minimum sequence number advances.
*/
private registerBranch(branch: SharedTreeBranch<TEditor, TChangeset>): void {
const trackBranch = (b: SharedTreeBranch<TEditor, TChangeset>): SeqNumber => {
const trackBranch = (b: SharedTreeBranch<TEditor, TChangeset>): SequenceId => {
const trunkCommit =
findCommonAncestor(this.trunk.getHead(), b.getHead()) ??
fail("Expected branch to be related to trunk");
const sequenceNumber =
this.trunkMetadata.get(trunkCommit.revision)?.sequenceNumber ??
minimumPossibleSequenceNumber;
const branches = getOrCreate(this.trunkBranches, sequenceNumber, () => new Set());
const sequenceId =
this.trunkMetadata.get(trunkCommit.revision)?.sequenceId ??
minimumPossibleSequenceId;
const branches = getOrCreate(this.trunkBranches, sequenceId, () => new Set());

assert(!branches.has(b), 0x670 /* Branch was registered more than once */);
branches.add(b);
return sequenceNumber;
return sequenceId;
};

const untrackBranch = (
b: SharedTreeBranch<TEditor, TChangeset>,
sequenceNumber: SeqNumber,
sequenceId: SequenceId,
): void => {
const branches =
this.trunkBranches.get(sequenceNumber) ?? fail("Expected branch to be tracked");
this.trunkBranches.get(sequenceId) ?? fail("Expected branch to be tracked");

assert(branches.delete(b), 0x671 /* Expected branch to be tracked */);
if (branches.size === 0) {
this.trunkBranches.delete(sequenceNumber);
this.trunkBranches.delete(sequenceId);
}
};

// Record the sequence number of the branch's base commit on the trunk
const trunkBase = { sequenceNumber: trackBranch(branch) };
// Record the sequence id of the branch's base commit on the trunk
const trunkBase = { sequenceId: trackBranch(branch) };
// Whenever the branch is rebased, update our record of its base trunk commit
const offRebase = branch.on("change", (args) => {
if (args.type === "replace" && getChangeReplaceType(args) === "rebase") {
untrackBranch(branch, trunkBase.sequenceNumber);
trunkBase.sequenceNumber = trackBranch(branch);
untrackBranch(branch, trunkBase.sequenceId);
trunkBase.sequenceId = trackBranch(branch);
this.trimTrunk();
}
});
// When the branch is disposed, update our branch set and trim the trunk
const offDispose = branch.on("dispose", () => {
untrackBranch(branch, trunkBase.sequenceNumber);
untrackBranch(branch, trunkBase.sequenceId);
this.trimTrunk();
offRebase();
offDispose();
Expand All @@ -212,7 +229,9 @@ export class EditManager<

/**
* Advances the minimum sequence number, and removes all commits from the trunk which lie outside the collaboration window.
* @param minimumSequenceNumber - the sequence number of the newest commit that all peers (including this one) have received and applied to their trunks
* @param minimumSequenceNumber - the sequence number of the newest commit that all peers (including this one) have received and applied to their trunks.
*
* @remarks If there are more than one commit with the same sequence number we assume this refers to the last commit in the batch.
*/
public advanceMinimumSequenceNumber(minimumSequenceNumber: SeqNumber): void {
if (minimumSequenceNumber === this.minimumSequenceNumber) {
Expand All @@ -234,19 +253,20 @@ export class EditManager<
* @returns the number of commits that were removed from the trunk
*/
private trimTrunk(): void {
/** The sequence number of the oldest commit on the trunk that will be retained */
let trunkTailSearchKey = this.minimumSequenceNumber;
/** The sequence id of the oldest commit on the trunk that will be retained */
let trunkTailSequenceId: SequenceId = {
sequenceNumber: this.minimumSequenceNumber,
indexInBatch: Number.POSITIVE_INFINITY,
};
// If there are any outstanding registered branches, get the one that is the oldest (has the "most behind" trunk base)
const minimumBranchBaseSequenceNumber = this.trunkBranches.minKey();
if (minimumBranchBaseSequenceNumber !== undefined) {
// If that branch is behind the minimum sequence number, we only want to evict commits older than it,
// even if those commits are behind the minimum sequence number
trunkTailSearchKey = brand(
Math.min(trunkTailSearchKey, minimumBranchBaseSequenceNumber),
);
const minimumBranchBaseSequenceId = this.trunkBranches.minKey();
if (minimumBranchBaseSequenceId !== undefined) {
// If that branch is behind the minimum sequence id, we only want to evict commits older than it,
// even if those commits are behind the minimum sequence id
trunkTailSequenceId = minSequenceId(trunkTailSequenceId, minimumBranchBaseSequenceId);
}

const [sequenceNumber, latestEvicted] = this.getClosestTrunkCommit(trunkTailSearchKey);
const [sequenceId, latestEvicted] = this.getClosestTrunkCommit(trunkTailSequenceId);
// Don't do any work if the commit found by the search is already the tail of the trunk
if (latestEvicted !== this.trunkBase) {
// The minimum sequence number informs us that all peer branches are at least caught up to the tail commit,
Expand All @@ -264,6 +284,8 @@ export class EditManager<
// We mutate the most recent of the evicted commits to become the new trunk base. That way, any other commits that
// have parent pointers to the latest evicted commit will stay linked, even though that it is no longer part of the trunk.
const newTrunkBase = latestEvicted as Mutable<typeof latestEvicted>;
// The metadata for new trunk base revision needs to be deleted before modifying it.
this.trunkMetadata.delete(newTrunkBase.revision);
// Copying the revision of the old trunk base into the new trunk base means we don't need to write out the original
// revision to summaries. All clients agree that the trunk base always has the same hardcoded revision.
newTrunkBase.revision = this.trunkBase.revision;
Expand All @@ -275,15 +297,15 @@ export class EditManager<

// Update any state that is derived from trunk commits
this.sequenceMap.editRange(
minimumPossibleSequenceNumber,
sequenceNumber,
minimumPossibleSequenceId,
sequenceId,
true,
(s, { revision }) => {
// Cleanup look-aside data for each evicted commit
this.trunkMetadata.delete(revision);
this.localBranchUndoRedoManager.untrackCommitType(revision);
// Delete all evicted commits from `sequenceMap` except for the latest one, which is the new `trunkBase`
if (s === sequenceNumber) {
if (equalSequenceIds(s, sequenceId)) {
assert(
revision === newTrunkBase.revision,
0x729 /* Expected last evicted commit to be new trunk base */,
Expand All @@ -293,6 +315,16 @@ export class EditManager<
}
},
);

const trunkSize = getPathFromBase(this.trunk.getHead(), this.trunkBase).length;
assert(
this.sequenceMap.size === trunkSize + 1,
"The size of the sequenceMap must have one element more than the trunk",
);
assert(
this.trunkMetadata.size === trunkSize,
"The size of the trunkMetadata must be the same as the trunk",
);
}
}

Expand Down Expand Up @@ -325,9 +357,12 @@ export class EditManager<
const commit: SequencedCommit<TChangeset> = {
change: c.change,
revision: c.revision,
sequenceNumber: metadata.sequenceNumber,
sequenceNumber: metadata.sequenceId.sequenceNumber,
sessionId: metadata.sessionId,
};
if (metadata.sequenceId.indexInBatch !== undefined) {
commit.indexInBatch = metadata.sequenceId.indexInBatch;
}
return commit;
});

Expand Down Expand Up @@ -369,10 +404,19 @@ export class EditManager<
trunkRevisionCache.set(this.trunkBase.revision, this.trunkBase);
this.trunk.setHead(
data.trunk.reduce((base, c) => {
const sequenceId: SequenceId =
c.indexInBatch === undefined
? {
sequenceNumber: c.sequenceNumber,
}
: {
sequenceNumber: c.sequenceNumber,
indexInBatch: c.indexInBatch,
};
const commit = mintCommit(base, c);
this.sequenceMap.set(c.sequenceNumber, commit);
this.sequenceMap.set(sequenceId, commit);
this.trunkMetadata.set(c.revision, {
sequenceNumber: c.sequenceNumber,
sequenceId,
sessionId: c.sessionId,
});
trunkRevisionCache.set(c.revision, commit);
Expand Down Expand Up @@ -449,6 +493,18 @@ export class EditManager<
sequenceNumber > this.minimumSequenceNumber,
0x713 /* Expected change sequence number to exceed the last known minimum sequence number */,
);

const commitsSequenceNumber = this.getBatch(sequenceNumber);
const sequenceId: SequenceId =
commitsSequenceNumber.length === 0
? {
sequenceNumber,
}
: {
sequenceNumber,
indexInBatch: commitsSequenceNumber.length,
};

if (newCommit.sessionId === this.localSessionId) {
const [firstLocalCommit] = getPathFromBase(
this.localBranch.getHead(),
Expand All @@ -461,7 +517,7 @@ export class EditManager<

// The first local branch commit is already rebased over the trunk, so we can push it directly to the trunk.
this.pushToTrunk(
sequenceNumber,
sequenceId,
{ ...firstLocalCommit, sessionId: this.localSessionId },
true,
);
Expand All @@ -482,7 +538,7 @@ export class EditManager<

if (peerLocalBranch.getHead() === this.trunk.getHead()) {
// If the branch is fully caught up and empty after being rebased, then push to the trunk directly
this.pushToTrunk(sequenceNumber, newCommit);
this.pushToTrunk(sequenceId, newCommit);
peerLocalBranch.setHead(this.trunk.getHead());
} else {
// Otherwise, rebase the change over the trunk and append it, and append the original change to the peer branch.
Expand All @@ -494,7 +550,7 @@ export class EditManager<
);

peerLocalBranch.apply(newCommit.change, newCommit.revision);
this.pushToTrunk(sequenceNumber, {
this.pushToTrunk(sequenceId, {
...newCommit,
change: newChangeFullyRebased,
});
Expand All @@ -515,11 +571,7 @@ export class EditManager<
return [commit, commits];
}

private pushToTrunk(
sequenceNumber: SeqNumber,
commit: Commit<TChangeset>,
local = false,
): void {
private pushToTrunk(sequenceId: SequenceId, commit: Commit<TChangeset>, local = false): void {
this.trunk.setHead(mintCommit(this.trunk.getHead(), commit));
const trunkHead = this.trunk.getHead();
if (local) {
Expand All @@ -530,22 +582,48 @@ export class EditManager<
this.trunkUndoRedoManager.trackCommit(trunkHead, type);
}
this.trunk.repairDataStoreProvider?.applyChange(commit.change);
this.sequenceMap.set(sequenceNumber, trunkHead);
this.trunkMetadata.set(trunkHead.revision, { sequenceNumber, sessionId: commit.sessionId });
this.sequenceMap.set(sequenceId, trunkHead);
this.trunkMetadata.set(trunkHead.revision, { sequenceId, sessionId: commit.sessionId });
this.events.emit("newTrunkHead", trunkHead);
}

/**
* Finds the most recent trunk commit that was sequenced at or before the given sequence number.
* @param sequenceNumber - the sequence number to search for
* Finds the most recent trunk commit that was sequenced at or before the given point.
* @param searchBy - the sequence number or the sequence id to search for
* @remarks Fails if there is no eligible commit.
* @returns the closest commit and its sequence number
* @returns the closest commit and its sequence id
*/
private getClosestTrunkCommit(sequenceNumber: SeqNumber): [SeqNumber, GraphCommit<TChangeset>] {
const commit = this.sequenceMap.getPairOrNextLower(sequenceNumber);
assert(commit !== undefined, 0x72a /* Sequence number has been evicted */);
private getClosestTrunkCommit(searchBy: SeqNumber): [SequenceId, GraphCommit<TChangeset>];
private getClosestTrunkCommit(searchBy: SequenceId): [SequenceId, GraphCommit<TChangeset>];
private getClosestTrunkCommit(
searchBy: SeqNumber | SequenceId,
): [SequenceId, GraphCommit<TChangeset>] {
const sequenceId: SequenceId =
typeof searchBy === "number"
? {
// This is to make sure that the correct commit is selected in this 2 scenarios:
// 1) The commit is unique for that sequence number
// 2) There are more than one commit for the same sequence number, in this case we need to select the last one.
sequenceNumber: searchBy,
indexInBatch: Number.POSITIVE_INFINITY,
}
: searchBy;

const commit = this.sequenceMap.getPairOrNextLower(sequenceId);
assert(commit !== undefined, "sequence id has been evicted");
return commit;
}

private getBatch(sequenceNumber: SeqNumber): [SequenceId, GraphCommit<TChangeset>][] {
const startSequenceId: SequenceId = {
sequenceNumber,
};
const endSequenceId: SequenceId = {
sequenceNumber: brand((sequenceNumber as number) + 1),
};

return this.sequenceMap.getRange(startSequenceId, endSequenceId, false);
}
}

/**
Expand Down
Loading

0 comments on commit 1359103

Please sign in to comment.