Skip to content

Commit

Permalink
Add detached support to Quorum DDS and integrate into data migration …
Browse files Browse the repository at this point in the history
…demo (#10028)
  • Loading branch information
ChumpChief authored Apr 29, 2022
1 parent b530262 commit 1156196
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 28 deletions.
1 change: 1 addition & 0 deletions examples/hosts/app-integration/schema-upgrade/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"@fluid-experimental/get-container": "^0.59.3000",
"@fluid-experimental/react-inputs": "^0.59.3000",
"@fluid-experimental/task-manager": "^0.59.3000",
"@fluid-internal/quorum": "^0.59.2000",
"@fluidframework/aqueduct": "^0.59.3000",
"@fluidframework/cell": "^0.59.3000",
"@fluidframework/common-definitions": "^0.20.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
*/

import { TaskManager } from "@fluid-experimental/task-manager";
import { Quorum } from "@fluid-internal/quorum";
import { DataObject, DataObjectFactory } from "@fluidframework/aqueduct";
import { IEvent, IEventProvider } from "@fluidframework/common-definitions";
import { ConsensusRegisterCollection } from "@fluidframework/register-collection";
// import { IFluidHandle } from "@fluidframework/core-interfaces";

const quorumKey = "quorum";
const crcKey = "crc";
const taskManagerKey = "task-manager";
const markedForDestructionKey = "marked";
Expand All @@ -29,9 +31,17 @@ export interface IContainerKillBit extends IEventProvider<IContainerKillBitEvent
}

export class ContainerKillBit extends DataObject implements IContainerKillBit {
private _quorum: Quorum | undefined;
private _crc: ConsensusRegisterCollection<boolean> | undefined;
private _taskManager: TaskManager | undefined;

private get quorum() {
if (this._quorum === undefined) {
throw new Error("Couldn't retrieve the Quorum");
}
return this._quorum;
}

private get crc() {
if (this._crc === undefined) {
throw new Error("Couldn't retrieve the ConsensusRegisterCollection");
Expand All @@ -57,14 +67,31 @@ export class ContainerKillBit extends DataObject implements IContainerKillBit {
}

public get markedForDestruction() {
return this.crc.read(markedForDestructionKey) as boolean;
return this.quorum.get(markedForDestructionKey) as boolean;
}

public async markForDestruction() {
// This should probably use a quorum-type data structure here.
// Then, when everyone sees the quorum proposal get approved they can choose to either volunteer
// or close themselves
await this.crc.write(markedForDestructionKey, true);
// Early exit/resolve if already marked.
if (this.markedForDestruction) {
return;
}

// Note that the marking could come from another client (e.g. two clients try to mark simultaneously).
// Watching via the event listener will work regardless of whether our marking or a remote client's
// marking was the one that actually wrote the flag.
return new Promise<void>((resolve, reject) => {
const acceptedListener = (key: string) => {
if (key === markedForDestructionKey) {
resolve();
this.quorum.off("accepted", acceptedListener);
}
};
this.quorum.on("accepted", acceptedListener);
// Even if quorum.set() becomes a promise, this will remain fire-and-forget since we don't care
// whether our marking or a remote client's marking writes the flag (though maybe we'd do retry
// logic if a remote client rejects the local client's mark).
this.quorum.set(markedForDestructionKey, true);
});
}

public async volunteerForDestruction(): Promise<void> {
Expand All @@ -76,24 +103,42 @@ export class ContainerKillBit extends DataObject implements IContainerKillBit {
}

protected async initializingFirstTime() {
const quorum = Quorum.create(this.runtime);
const crc = ConsensusRegisterCollection.create(this.runtime);
const taskManager = TaskManager.create(this.runtime);
this.root.set(quorumKey, quorum.handle);
this.root.set(crcKey, crc.handle);
this.root.set(taskManagerKey, taskManager.handle);
await Promise.all([
crc.write(markedForDestructionKey, false),
crc.write(deadKey, false),
]);
// TODO: Update if/when .set() returns a promise.
const initialSetP = new Promise<void>((resolve) => {
const watchForInitialSet = (key: string) => {
if (key === markedForDestructionKey) {
resolve();
quorum.off("accepted", watchForInitialSet);
}
};
quorum.on("accepted", watchForInitialSet);
});
quorum.set(markedForDestructionKey, false);
await initialSetP;
await crc.write(deadKey, false);
}

protected async hasInitialized() {
const quorumHandle = this.root.get(quorumKey);
this._quorum = await quorumHandle.get();

const crcHandle = this.root.get(crcKey);
this._crc = await crcHandle.get();

this.crc.on("atomicChanged", (key) => {
this.quorum.on("accepted", (key: string) => {
if (key === markedForDestructionKey) {
this.emit("markedForDestruction");
} else if (key === deadKey) {
}
});

this.crc.on("atomicChanged", (key) => {
if (key === deadKey) {
this.emit("dead");
}
});
Expand All @@ -114,6 +159,7 @@ export const ContainerKillBitInstantiationFactory =
ContainerKillBit,
[
ConsensusRegisterCollection.getFactory(),
Quorum.getFactory(),
TaskManager.getFactory(),
],
{},
Expand Down
88 changes: 74 additions & 14 deletions packages/dds/quorum/src/quorum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ export class Quorum extends SharedObject<IQuorumEvents> implements IQuorum {

this.incomingOp.on("set", this.handleIncomingSet);
this.incomingOp.on("delete", this.handleIncomingDelete);
this.incomingOp.on("accept", this.handleIncomingAcceptOp);
this.incomingOp.on("accept", this.handleIncomingAccept);

this.runtime.getQuorum().on("removeMember", this.handleQuorumRemoveMember);

Expand Down Expand Up @@ -229,7 +229,29 @@ export class Quorum extends SharedObject<IQuorumEvents> implements IQuorum {
*/
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
public set(key: string, value: any): void {
// TODO: handle detached scenario, just auto accept basically
const currentValue = this.values.get(key);
// Early-exit if we can't submit a valid proposal (there's already a pending proposal)
if (currentValue !== undefined && currentValue.pending !== undefined) {
return;
}

// If not attached, we basically pretend we got an ack immediately.
// TODO: Should we just directly store the value rather than the full simulation?
if (!this.isAttached()) {
// Queueing as a microtask to permit callers to complete their callstacks before the result of the set
// takes effect. This more closely resembles the pattern in the attached state, where the ack will not
// be received synchronously.
queueMicrotask(() => {
this.handleIncomingSet(
key,
value,
0 /* refSeq */,
0 /* setSequenceNumber */,
"detachedClient" /* clientId */,
);
});
return;
}

const setOp: IQuorumSetOperation = {
type: "set",
Expand All @@ -246,7 +268,28 @@ export class Quorum extends SharedObject<IQuorumEvents> implements IQuorum {
* {@inheritDoc IQuorum.delete}
*/
public delete(key: string): void {
// TODO: handle detached scenario, just auto accept basically
const currentValue = this.values.get(key);
// Early-exit if we can't submit a valid proposal (there's nothing to delete or already a pending proposal).
if (currentValue === undefined || currentValue.pending !== undefined) {
return;
}

// If not attached, we basically pretend we got an ack immediately.
// TODO: Should we just directly store the value rather than the full simulation?
if (!this.isAttached()) {
// Queueing as a microtask to permit callers to complete their callstacks before the result of the delete
// takes effect. This more closely resembles the pattern in the attached state, where the ack will not
// be received synchronously.
queueMicrotask(() => {
this.handleIncomingDelete(
key,
0 /* refSeq */,
0 /* setSequenceNumber */,
"detachedClient" /* clientId */,
);
});
return;
}

const deleteOp: IQuorumDeleteOperation = {
type: "delete",
Expand All @@ -257,6 +300,19 @@ export class Quorum extends SharedObject<IQuorumEvents> implements IQuorum {
this.submitLocalMessage(deleteOp);
}

/**
* Get a point-in-time list of clients who must sign off on values coming in for them to move from "pending" to
* "accepted" state. This list is finalized for a value at the moment it goes pending (i.e. if more clients
* join later, they are not added to the list of signoffs).
* @returns The list of clientIds for clients who must sign off to accept the incoming pending value
*/
private getSignoffClients(): string[] {
// If detached, we don't need anyone to sign off. Otherwise, we need all currently connected clients.
return this.isAttached()
? [...this.runtime.getQuorum().getMembers().keys()]
: [];
}

private readonly handleIncomingSet = (
key: string,
value: any,
Expand All @@ -265,8 +321,9 @@ export class Quorum extends SharedObject<IQuorumEvents> implements IQuorum {
clientId: string,
): void => {
const currentValue = this.values.get(key);
// A proposal is valid if the value is unknown or if it was made with knowledge of the most recently accepted
// value. We'll drop invalid proposals on the ground.
// We use a consensus-like approach here, so a proposal is valid if the value is unset or if there is no
// pending change and it was made with knowledge of the most recently accepted value. We'll drop invalid
// proposals on the ground.
const proposalValid =
currentValue === undefined
|| (currentValue.pending === undefined && currentValue.accepted.sequenceNumber <= refSeq);
Expand All @@ -279,8 +336,7 @@ export class Quorum extends SharedObject<IQuorumEvents> implements IQuorum {

// We expect signoffs from all connected clients at the time the set was sequenced, except for the client
// who issued the set (that client implicitly signs off).
const connectedClientIds = [...this.runtime.getQuorum().getMembers().keys()];
const expectedSignoffs = connectedClientIds.filter((quorumMemberId) => quorumMemberId !== clientId);
const expectedSignoffs = this.getSignoffClients().filter((quorumMemberId) => quorumMemberId !== clientId);

const newQuorumValue: QuorumValue = {
accepted,
Expand Down Expand Up @@ -323,11 +379,14 @@ export class Quorum extends SharedObject<IQuorumEvents> implements IQuorum {
clientId: string,
): void => {
const currentValue = this.values.get(key);
// A proposal is valid if the value is unknown or if it was made with knowledge of the most recently accepted
// value. We'll drop invalid proposals on the ground.
// We use a consensus-like approach here, so a proposal is valid if there's a value to delete, there's no
// other pending change, and the delete was made with knowledge of the most recently accepted value. Note
// this differs slightly from set because delete of an unset value is a no-op - these no-ops should also
// be prevented on the sending side but we'll guard here too. We'll drop invalid proposals on the ground.
const proposalValid =
currentValue === undefined
|| (currentValue.pending === undefined && currentValue.accepted.sequenceNumber <= refSeq);
currentValue !== undefined
&& currentValue.pending === undefined
&& currentValue.accepted.sequenceNumber <= refSeq;
if (!proposalValid) {
// TODO: If delete() returns a promise we will need to resolve it false for invalid proposals.
return;
Expand All @@ -337,8 +396,7 @@ export class Quorum extends SharedObject<IQuorumEvents> implements IQuorum {

// We expect signoffs from all connected clients at the time the delete was sequenced, except for the client
// who issued the delete (that client implicitly signs off).
const connectedClientIds = [...this.runtime.getQuorum().getMembers().keys()];
const expectedSignoffs = connectedClientIds.filter((quorumMemberId) => quorumMemberId !== clientId);
const expectedSignoffs = this.getSignoffClients().filter((quorumMemberId) => quorumMemberId !== clientId);

const newQuorumValue: QuorumValue = {
accepted,
Expand Down Expand Up @@ -368,7 +426,7 @@ export class Quorum extends SharedObject<IQuorumEvents> implements IQuorum {
}
};

private readonly handleIncomingAcceptOp = (
private readonly handleIncomingAccept = (
key: string,
pendingSeq: number,
clientId: string,
Expand All @@ -380,6 +438,8 @@ export class Quorum extends SharedObject<IQuorumEvents> implements IQuorum {
|| !pending.expectedSignoffs.includes(clientId)) {
// Drop unexpected accepts on the ground. This can happen normally in resubmit on reconnect cases, and
// is benign since the client implicitly accepts on disconnect.
// TODO: We could filter out just the accept ops when resubmitting on reconnect to avoid this - the
// proposals could still be resubmitted.
return;
}

Expand Down
64 changes: 61 additions & 3 deletions packages/dds/quorum/src/test/quorum.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,9 +214,67 @@ describe("Quorum", () => {
});
});

describe.skip("Detached/Attach", () => {
describe("Behavior before attach", () => { });
describe("Behavior after attaching", () => { });
describe("Detached/Attach", () => {
let containerRuntimeFactory: MockContainerRuntimeFactory;

beforeEach(() => {
containerRuntimeFactory = new MockContainerRuntimeFactory();
});

it("Can set and delete values before attaching and functions normally after attaching", async () => {
// Create a detached Quorum.
const dataStoreRuntime = new MockFluidDataStoreRuntime();
const containerRuntime = containerRuntimeFactory.createContainerRuntime(dataStoreRuntime);

const quorum = new Quorum("quorum", dataStoreRuntime, QuorumFactory.Attributes);
assert.strict(!quorum.isAttached(), "Quorum is attached earlier than expected");

const accept1P = new Promise<void>((resolve) => {
quorum.on("accepted", (key) => {
if (key === "baz") {
resolve();
}
});
});
quorum.set("foo", "bar");
quorum.set("baz", "boop");
await accept1P;
assert.strictEqual(quorum.get("baz"), "boop", "Couldn't set value in detached state");

const accept2P = new Promise<void>((resolve) => {
quorum.on("accepted", (key) => {
if (key === "foo") {
resolve();
}
});
});
quorum.delete("foo");
await accept2P;
assert.strictEqual(quorum.get("foo"), undefined, "Couldn't delete value in detached state");

// Attach the Quorum
const services = {
deltaConnection: containerRuntime.createDeltaConnection(),
objectStorage: new MockStorage(),
};
quorum.connect(services);

assert.strict(quorum.isAttached(), "Quorum is not attached when expected");
assert.strictEqual(quorum.get("foo"), undefined, "Wrong value in foo after attach");
assert.strictEqual(quorum.get("baz"), "boop", "Wrong value in baz after attach");

const accept3P = new Promise<void>((resolve) => {
quorum.on("accepted", (key) => {
if (key === "woz") {
resolve();
}
});
});
quorum.set("woz", "wiz");
containerRuntimeFactory.processAllMessages();
await accept3P;
assert.strictEqual(quorum.get("woz"), "wiz", "Wrong value in woz after post-attach set");
});
});

describe("Disconnect/Reconnect", () => {
Expand Down

0 comments on commit 1156196

Please sign in to comment.