Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert 2 #561

Merged
merged 4 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
"arg": "^5.0.2",
"hyperdyperid": "^1.2.0",
"multibase": "^4.0.6",
"thingies": "^1.18.0"
"thingies": "^1.20.0"
},
"devDependencies": {
"@automerge/automerge": "2.1.7",
Expand Down Expand Up @@ -136,6 +136,7 @@
"lodash": "^4.17.21",
"loro-crdt": "^0.4.1",
"markdown-it": "^13.0.1",
"memfs": "^4.8.1",
"messagepack": "^1.1.12",
"msgpack-lite": "^0.1.26",
"msgpack5": "^6.0.2",
Expand Down
85 changes: 85 additions & 0 deletions src/json-crdt/history/LocalHistoryCrud.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import {File, FileOptions} from '../file/File';
import {CborEncoder} from '../../json-pack/cbor/CborEncoder';
import type {CrudApi} from 'memfs/lib/crud/types';
import type {Locks} from 'thingies/es2020/Locks';
import type {Patch} from '../../json-crdt-patch';
import type {PatchLog} from './PatchLog';
import type {LocalHistory} from './types';

export const genId = (octets: number = 8): string => {
const uint8 = crypto.getRandomValues(new Uint8Array(octets));
let hex = '';
for (let i = 0; i < octets; i++) hex += uint8[i].toString(16).padStart(2, '0');
return hex;
};

const STATE_FILE_NAME = 'state.seq.cbor';

export class LocalHistoryCrud implements LocalHistory {
protected fileOpts: FileOptions = {
cborEncoder: new CborEncoder(),
};

constructor(
protected readonly crud: CrudApi,
protected readonly locks: Locks,
) {}

public async create(collection: string[], log: PatchLog): Promise<{id: string}> {
// TODO: Remove `log.end`, just `log` should be enough.
const file = new File(log.end, log, this.fileOpts);
const blob = file.toBinary({
format: 'seq.cbor',
model: 'binary',
});
const id = genId();
await this.lock(collection, id, async () => {
await this.crud.put([...collection, id], STATE_FILE_NAME, blob, {throwIf: 'exists'});
});
return {id};
}

public async read(collection: string[], id: string): Promise<{log: PatchLog; cursor: string}> {
const blob = await this.crud.get([...collection, id], STATE_FILE_NAME);
const {log} = File.fromSeqCbor(blob);
return {
log,
cursor: '',
};
}

public readHistory(collection: string[], id: string, cursor: string): Promise<{log: PatchLog; cursor: string}> {
throw new Error('Method not implemented.');
}

public async update(collection: string[], id: string, patches: Patch[]): Promise<void> {
await this.lock(collection, id, async () => {
const blob = await this.crud.get([...collection, id], STATE_FILE_NAME);
const {log} = File.fromSeqCbor(blob);
log.end.applyBatch(patches);
const file = new File(log.end, log, this.fileOpts);
const blob2 = file.toBinary({
format: 'seq.cbor',
model: 'binary',
});
await this.crud.put([...collection, id], STATE_FILE_NAME, blob2, {throwIf: 'missing'});
});
}

public async delete(collection: string[], id: string): Promise<void> {
await this.lock(collection, id, async () => {
await this.crud.drop(collection, true);
});
}

protected async lock(collection: string[], id: string, fn: () => Promise<void>): Promise<void> {
const key = collection.join('/') + '/' + id;
await this.locks.lock(
key,
250,
500,
)(async () => {
await fn();
});
}
}
7 changes: 4 additions & 3 deletions src/json-crdt/history/PatchLog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ export class PatchLog implements Printable {
public readonly end: Model;

/**
* The patches in the log, stored in an AVL tree for efficient replaying. The
* collection of patches which are applied to the `start()` model to reach
* the `end` model.
* The collection of patches which are applied to the `start()` model to reach
* the `end` model. The patches in the log, stored in an AVL tree for
* efficient replaying. The patches are sorted by their logical timestamps
* and applied in causal order.
*
* @readonly
*/
Expand Down
72 changes: 72 additions & 0 deletions src/json-crdt/history/SessionHistory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import {createRace} from 'thingies/es2020/createRace';
import {FanOutUnsubscribe} from 'thingies/es2020/fanout';
import {InsValOp, Patch} from '../../json-crdt-patch';
import {ValNode} from '../nodes';
import {toSchema} from '../schema/toSchema';
import {PatchLog} from './PatchLog';
import {RedoItem, UndoItem, UndoRedoStack} from './UndoRedoStack';

class Undo implements UndoItem {
constructor(public readonly undo: () => Redo) {}
}

class Redo implements RedoItem {
constructor(public readonly redo: () => Undo) {}
}

export class SessionHistory {
constructor(public readonly log: PatchLog) {}

private readonly __onPatchRace = createRace();

public attachUndoRedo(stack: UndoRedoStack): FanOutUnsubscribe {
const onBeforePatch = (patch: Patch) => {
this.__onPatchRace(() => {
const undo = this.createUndo(patch);
stack.push(undo);
});
};
const unsubscribe = this.log.end.api.onBeforePatch.listen(onBeforePatch);
return unsubscribe;
}

public createUndo(patch: Patch): Undo {
const undoTasks: Array<() => void> = [];
const ops = patch.ops;
const length = ops.length;
for (let i = length - 1; i >= 0; i--) {
const op = ops[i];
switch (op.name()) {
case 'ins_val': {
const insOp = op as InsValOp;
const valNode = this.log.end.index.get(insOp.obj);
if (!(valNode instanceof ValNode)) throw new Error('INVALID_NODE');
const copy = toSchema(valNode.node());
const valNodeId = valNode.id;
const task = () => {
const end = this.log.end;
const valNode = end.index.get(valNodeId);
if (!valNode) return;
end.api.wrap(valNode).asVal().set(copy);
};
undoTasks.push(task);
}
}
}
const undo = new Undo(() => {
this.__onPatchRace(() => {
for (const task of undoTasks) task();
});
return new Redo(() => {
const undo = this.__onPatchRace(() => {
// TODO: This line needs to be changed:
const redoPatch = Patch.fromBinary(patch.toBinary());
this.log.end.api.builder.patch = redoPatch;
return this.createUndo(redoPatch);
});
return undo!;
});
});
return undo;
}
}
62 changes: 62 additions & 0 deletions src/json-crdt/history/__tests__/LocalHistoryCrud.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import {memfs} from 'memfs';
import {NodeCrud} from 'memfs/lib/node-to-crud';
import {Locks} from 'thingies/es2020/Locks';
import {LocalHistoryCrud} from '../LocalHistoryCrud';
import {PatchLog} from '../PatchLog';
import {Model} from '../../model';

const setup = async () => {
const {fs, vol} = memfs();
const crud = new NodeCrud({fs: fs.promises, dir: '/'});
const locks = new Locks();
const local = new LocalHistoryCrud(crud, locks);
return {
fs,
vol,
crud,
locks,
local,
};
};

test('can create a new document', async () => {
const {local} = await setup();
const model = Model.withLogicalClock();
model.api.root({
foo: 'spam',
});
const log = PatchLog.fromNewModel(model);
const {id} = await local.create(['test'], log);
expect(typeof id).toBe('string');
expect(id.length > 6).toBe(true);
const {log: log2} = await local.read(['test'], id);
expect(log2.end.view()).toStrictEqual({foo: 'spam'});
});

test('throws on non-existing document', async () => {
const {local} = await setup();
try {
await local.read(['test'], 'asdfasdf');
throw new Error('FAIL');
} catch (err) {
expect((err as Error).message).toBe('Collection /test/asdfasdf does not exist');
}
});

test('can delete a document', async () => {
const {local} = await setup();
const model = Model.withLogicalClock();
model.api.root({
foo: 'spam',
});
const log = PatchLog.fromNewModel(model);
const {id} = await local.create(['test'], log);
await local.read(['test'], id);
await local.delete(['test'], id);
try {
await local.read(['test'], id);
throw new Error('FAIL');
} catch (err) {
expect((err as Error).message).toBe(`Collection /test/${id} does not exist`);
}
});
14 changes: 8 additions & 6 deletions src/json-crdt/history/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {Patch} from '../../json-crdt-patch';
import {PatchLog} from '../history/PatchLog';
import {Model} from '../model';
import type {Patch} from '../../json-crdt-patch';
import type {PatchLog} from '../history/PatchLog';
import type {Model} from '../model';

/**
* A history of patches that have been applied to a model, stored on the
Expand Down Expand Up @@ -37,9 +37,11 @@ export interface RemoteHistory<Cursor> {
}

export interface LocalHistory {
load(id: string): Promise<EditingSessionHistory>;
// loadHistory(id: string): Promise<PatchLog>;
apply(id: string, patches: Patch[]): Promise<void>;
create(collection: string[], log: PatchLog): Promise<{id: string}>;
read(collection: string[], id: string): Promise<{log: PatchLog; cursor: string}>;
readHistory(collection: string[], id: string, cursor: string): Promise<{log: PatchLog; cursor: string}>;
update(collection: string[], id: string, patches: Patch[]): Promise<void>;
delete(collection: string[], id: string): Promise<void>;
}

export interface EditingSessionHistory {
Expand Down
17 changes: 12 additions & 5 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3997,6 +3997,13 @@ memfs@^3.4.1, memfs@^3.4.3:
dependencies:
fs-monkey "^1.0.4"

memfs@^4.8.1:
version "4.8.1"
resolved "https://registry.yarnpkg.com/memfs/-/memfs-4.8.1.tgz#1e02c15c4397212a9a1b037fa4324c6f7dd45b47"
integrity sha512-7q/AdPzf2WpwPlPL4v1kE2KsJsHl7EF4+hAeVzlyanr2+YnR21NVn9mDqo+7DEaKDRsQy8nvxPlKH4WqMtiO0w==
dependencies:
tslib "^2.0.0"

[email protected]:
version "1.0.1"
resolved "https://registry.yarnpkg.com/merge-descriptors/-/merge-descriptors-1.0.1.tgz#b00aaa556dd8b44568150ec9d1b953f3f90cbb61"
Expand Down Expand Up @@ -5348,10 +5355,10 @@ test-exclude@^6.0.0:
glob "^7.1.4"
minimatch "^3.0.4"

thingies@^1.18.0:
version "1.18.0"
resolved "https://registry.yarnpkg.com/thingies/-/thingies-1.18.0.tgz#827141872d96f3c3c2c0b432ab0dfdb581b4b4ac"
integrity sha512-WiB26BQP0MF47Bbvbq0P19KpyfrvdTK07L8xnltobpZ/aJPmu52CBGhYjLsnFgjyawmusJ0gVkTplnnoz2hBkQ==
thingies@^1.20.0:
version "1.20.0"
resolved "https://registry.yarnpkg.com/thingies/-/thingies-1.20.0.tgz#27bf93397c39c3ff36601197e8cf78f43b7b2319"
integrity sha512-WvXY4CjHp/Uim2Ri0daqu6jkNTHJTk1H8NvuMQiOL0mgtdkqoSH5fkENy2M6XnvsLOp5iwyPcbmokoBjVb4lnQ==

thunky@^1.0.2:
version "1.1.0"
Expand Down Expand Up @@ -5453,7 +5460,7 @@ tslib@^1.13.0, tslib@^1.8.1:
resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00"
integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==

tslib@^2.0.1, tslib@^2.0.3, tslib@^2.1.0, tslib@^2.6.2:
tslib@^2.0.0, tslib@^2.0.1, tslib@^2.0.3, tslib@^2.1.0, tslib@^2.6.2:
version "2.6.2"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.6.2.tgz#703ac29425e7b37cd6fd456e92404d46d1f3e4ae"
integrity sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==
Expand Down
Loading