diff --git a/packages/loader/container-loader/src/catchUpMonitor.ts b/packages/loader/container-loader/src/catchUpMonitor.ts new file mode 100644 index 000000000000..39e45f1cf6cc --- /dev/null +++ b/packages/loader/container-loader/src/catchUpMonitor.ts @@ -0,0 +1,99 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { IDisposable, IEvent } from "@fluidframework/common-definitions"; +import { assert, TypedEventEmitter } from "@fluidframework/common-utils"; +import { IDeltaManager } from "@fluidframework/container-definitions"; +import { ISequencedDocumentMessage } from "@fluidframework/protocol-definitions"; + +/** @see ICatchUpMonitor for usage */ +type CaughtUpListener = () => void; + +/** @see ICatchUpMonitor for usage */ +export interface ICatchUpMonitorEvents extends IEvent { + (event: "caughtUp", listener: CaughtUpListener): void; +} + +/** Monitor that emits an event when a Container has caught up to a given point in the op stream */ +export interface ICatchUpMonitor extends TypedEventEmitter, IDisposable { } + +/** + * Monitors a Container's DeltaManager, notifying listeners when all ops have been processed + * that were known at the time the monitor was created. + */ +export class CatchUpMonitor extends TypedEventEmitter implements ICatchUpMonitor { + private readonly targetSeqNumber: number; + private caughtUp: boolean = false; + + private readonly opHandler = (message: Pick) => { + if (!this.caughtUp && message.sequenceNumber >= this.targetSeqNumber) { + this.caughtUp = true; + this.emit("caughtUp"); + } + }; + + /** + * Create the CatchUpMonitor, setting the target sequence number to wait for based on DeltaManager's current state. + */ + constructor( + private readonly deltaManager: IDeltaManager, + ) { + super(); + + this.targetSeqNumber = this.deltaManager.lastKnownSeqNumber; + + assert(this.targetSeqNumber >= this.deltaManager.lastSequenceNumber, + "Cannot wait for seqNumber below last processed sequence number"); + + this.deltaManager.on("op", this.opHandler); + + // Simulate the last processed op to set caughtUp in case we already are + this.opHandler({ sequenceNumber: this.deltaManager.lastSequenceNumber }); + + // If a listener is added after we are already caught up, notify that new listener immediately + this.on("newListener", (event: string, listener) => { + if (event === "caughtUp") { + const caughtUpListener = listener as CaughtUpListener; + if (this.caughtUp) { + caughtUpListener(); + } + } + }); + } + + public disposed: boolean = false; + public dispose() { + if (this.disposed) { + return; + } + this.disposed = true; + + this.removeAllListeners(); + this.deltaManager.off("op", this.opHandler); + } +} + +/** Monitor that always notifies listeners immediately */ +export class ImmediateCatchUpMonitor extends TypedEventEmitter implements ICatchUpMonitor { + constructor() { + super(); + this.on("newListener", (event: string, listener) => { + if (event === "caughtUp") { + const caughtUpListener = listener as CaughtUpListener; + caughtUpListener(); + } + }); + } + + public disposed: boolean = false; + public dispose() { + if (this.disposed) { + return; + } + this.disposed = true; + + this.removeAllListeners(); + } +} diff --git a/packages/loader/container-loader/src/connectionStateHandler.ts b/packages/loader/container-loader/src/connectionStateHandler.ts index 34e9840739f0..55bbeca40943 100644 --- a/packages/loader/container-loader/src/connectionStateHandler.ts +++ b/packages/loader/container-loader/src/connectionStateHandler.ts @@ -5,11 +5,12 @@ import { ITelemetryLogger, ITelemetryProperties } from "@fluidframework/common-definitions"; import { assert, Timer } from "@fluidframework/common-utils"; -import { IConnectionDetails } from "@fluidframework/container-definitions"; +import { IConnectionDetails, IDeltaManager } from "@fluidframework/container-definitions"; import { ILocalSequencedClient, IProtocolHandler } from "@fluidframework/protocol-base"; import { ConnectionMode, IQuorumClients } from "@fluidframework/protocol-definitions"; import { PerformanceEvent } from "@fluidframework/telemetry-utils"; import { ConnectionState } from "./connectionState"; +import { CatchUpMonitor, ICatchUpMonitor, ImmediateCatchUpMonitor } from "./catchUpMonitor"; /** Constructor parameter type for passing in dependencies needed by the ConnectionStateHandler */ export interface IConnectionStateHandlerInputs { @@ -36,7 +37,7 @@ const JoinOpTimeoutMs = 45000; * sequenced or blocked by the server before emitting the new "connected" event and allowing runtime to resubmit ops. * * Each connection is assigned a clientId by the service, and the connection is book-ended by a Join and a Leave op - * generated by the service. Due to the distributed nature of the ordering service, in the case of reconnect we cannot + * generated by the service. Due to the distributed nature of the Relay Service, in the case of reconnect we cannot * make any assumptions about ordering of operations between the old and new connections - i.e. new Join op could * be sequenced before old Leave op (and some acks from pending ops that were in flight when we disconnected). * @@ -47,14 +48,17 @@ const JoinOpTimeoutMs = 45000; * pending ops can safely be submitted with the new clientId without fear of duplication in the sequenced op stream. * (B) We process the Join op for the new clientId (identified when the underlying connection was first established), * indicating the service is ready to sequence ops sent with the new clientId. + * (C) We process all ops known at the time the underlying connection was established (so we are "caught up") * * For (A) we give up waiting after some time (same timeout as server uses), and go ahead and transition to Connected. * For (B) we log telemetry if it takes too long, but still only transition to Connected when the Join op is processed * and we are added to the Quorum. + * For (C) this is optional behavior, controlled by the parameters of receivedConnectEvent */ export class ConnectionStateHandler { private _connectionState = ConnectionState.Disconnected; private _pendingClientId: string | undefined; + private catchUpMonitor: ICatchUpMonitor | undefined; private readonly prevClientLeftTimer: Timer; private readonly joinOpTimer: Timer; @@ -106,7 +110,7 @@ export class ConnectionStateHandler { const quorumClients = this.handler.quorumClients(); const details = { quorumInitialized: quorumClients !== undefined, - hasPendingClientId: this.pendingClientId !== undefined, + pendingClientId: this.pendingClientId, inQuorum: quorumClients?.getMember(this.pendingClientId ?? "") !== undefined, waitingForLeaveOp: this.waitingForLeaveOp, }; @@ -184,7 +188,10 @@ export class ConnectionStateHandler { && !this.waitingForLeaveOp ) { this.waitEvent?.end({ source }); - this.setConnectionState(ConnectionState.Connected); + + assert(this.catchUpMonitor !== undefined, + "catchUpMonitor should always be set if pendingClientId is set"); + this.catchUpMonitor.on("caughtUp", this.transitionToConnectedState); } else { // Adding this event temporarily so that we can get help debugging if something goes wrong. this.logger.sendTelemetryEvent({ @@ -210,32 +217,47 @@ export class ConnectionStateHandler { } public receivedDisconnectEvent(reason: string) { - if (this.joinOpTimer.hasTimer) { - this.stopJoinOpTimer(); - } this.setConnectionState(ConnectionState.Disconnected, reason); } + private readonly transitionToConnectedState = () => { + // Defensive measure, we should always be in CatchingUp state when this is called. + if (this._connectionState === ConnectionState.CatchingUp) { + this.setConnectionState(ConnectionState.Connected); + } else { + this.logger.sendTelemetryEvent({ + eventName: "cannotTransitionToConnectedState", + connectionState: ConnectionState[this._connectionState], + }); + } + }; + /** * The "connect" event indicates the connection to the Relay Service is live. * However, some additional conditions must be met before we can fully transition to * "Connected" state. This function handles that interim period, known as "Connecting" state. * @param connectionMode - Read or Write connection - * @param details - Connection details returned from the ordering service + * @param details - Connection details returned from the Relay Service + * @param deltaManager - DeltaManager to be used for delaying Connected transition until caught up. + * If it's undefined, then don't delay and transition to Connected as soon as Leave/Join op are accounted for */ public receivedConnectEvent( connectionMode: ConnectionMode, details: IConnectionDetails, + deltaManager?: IDeltaManager, ) { const oldState = this._connectionState; this._connectionState = ConnectionState.CatchingUp; const writeConnection = connectionMode === "write"; - assert(writeConnection || !this.handler.shouldClientJoinWrite(), + assert(!this.handler.shouldClientJoinWrite() || writeConnection, 0x30a /* shouldClientJoinWrite should imply this is a writeConnection */); - assert(writeConnection || !this.waitingForLeaveOp, + assert(!this.waitingForLeaveOp || writeConnection, 0x2a6 /* "waitingForLeaveOp should imply writeConnection (we need to be ready to flush pending ops)" */); + // Defensive measure in case catchUpMonitor from previous connection attempt wasn't already cleared + this.catchUpMonitor?.dispose(); + // Note that this may be undefined since the connection is established proactively on load // and the quorum may still be under initialization. const quorumClients: IQuorumClients | undefined = this.handler.quorumClients(); @@ -248,6 +270,11 @@ export class ConnectionStateHandler { // we know there can no longer be outstanding ops that we sent with the previous client id. this._pendingClientId = details.clientId; + // We may want to catch up to known ops as of now before transitioning to Connected state + this.catchUpMonitor = deltaManager !== undefined + ? new CatchUpMonitor(deltaManager) + : new ImmediateCatchUpMonitor(); + // IMPORTANT: Report telemetry after we set _pendingClientId, but before transitioning to Connected state this.handler.logConnectionStateChangeTelemetry(ConnectionState.CatchingUp, oldState); @@ -264,9 +291,21 @@ export class ConnectionStateHandler { this.startJoinOpTimer(); } else if (!this.waitingForLeaveOp) { // We're not waiting for Join or Leave op (if read-only connection those don't even apply), - // go ahead and declare the state to be Connected! - // If we are waiting for Leave op still, do nothing for now, we will transition to Connected later. - this.setConnectionState(ConnectionState.Connected); + // but we do need to wait until we are caught up (to now-known ops) before transitioning to Connected state. + this.catchUpMonitor.on("caughtUp", this.transitionToConnectedState); + } + // else - We are waiting for Leave op still, do nothing for now, we will transition to Connected later + } + + /** Clear all the state used during the Connecting phase (set in receivedConnectEvent) */ + private clearPendingConnectionState() { + this._pendingClientId = undefined; + + this.catchUpMonitor?.dispose(); + this.catchUpMonitor = undefined; + + if (this.joinOpTimer.hasTimer) { + this.stopJoinOpTimer(); } } @@ -295,8 +334,9 @@ export class ConnectionStateHandler { } this._clientId = this.pendingClientId; } else if (value === ConnectionState.Disconnected) { - // Important as we process our own joinSession message through delta request - this._pendingClientId = undefined; + // Clear pending state immediately to prepare for reconnect + this.clearPendingConnectionState(); + // Only wait for "leave" message if the connected client exists in the quorum because only the write // client will exist in the quorum and only for those clients we will receive "removeMember" event and // the client has some unacked ops. diff --git a/packages/loader/container-loader/src/container.ts b/packages/loader/container-loader/src/container.ts index 9af41ccbb004..970c521cf7eb 100644 --- a/packages/loader/container-loader/src/container.ts +++ b/packages/loader/container-loader/src/container.ts @@ -181,6 +181,10 @@ export async function waitContainerToCatchUp(container: IContainer) { }; container.on("closed", closedCallback); + // Depending on config, transition to "connected" state may include the guarantee + // that all known ops have been processed. If so, we may introduce additional wait here. + // Waiting for "connected" state in either case gets us at least to our own Join op + // which is a reasonable approximation of "caught up" const waitForOps = () => { assert(container.connectionState === ConnectionState.CatchingUp || container.connectionState === ConnectionState.Connected, @@ -543,7 +547,7 @@ export class Container extends EventEmitterWithErrorHandling i /** * Returns true if container is dirty. * Which means data loss if container is closed at that same moment - * Most likely that happens when there is no network connection to ordering service + * Most likely that happens when there is no network connection to Relay Service */ public get isDirty() { return this._dirtyContainer; @@ -1551,9 +1555,15 @@ export class Container extends EventEmitterWithErrorHandling i } } + const deltaManagerForCatchingUp = + this.mc.config.getBoolean("Fluid.Container.CatchUpBeforeDeclaringConnected") === true ? + this.deltaManager + : undefined; + this.connectionStateHandler.receivedConnectEvent( this.connectionMode, details, + deltaManagerForCatchingUp, ); }); diff --git a/packages/loader/container-loader/src/test/catchUpMonitor.spec.ts b/packages/loader/container-loader/src/test/catchUpMonitor.spec.ts new file mode 100644 index 000000000000..0f48ef93702e --- /dev/null +++ b/packages/loader/container-loader/src/test/catchUpMonitor.spec.ts @@ -0,0 +1,164 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +/* eslint-disable max-len */ + +import { strict as assert } from "assert"; +import { IDeltaManager, IDeltaManagerEvents } from "@fluidframework/container-definitions"; +import { TypedEventEmitter } from "@fluidframework/common-utils"; +import { CatchUpMonitor, ImmediateCatchUpMonitor } from "../catchUpMonitor"; + +class MockDeltaManagerForCatchingUp + extends TypedEventEmitter + implements Pick, "lastSequenceNumber" | "lastKnownSeqNumber"> +{ // eslint-disable-line @typescript-eslint/brace-style + constructor( + public lastSequenceNumber: number = 5, + public lastKnownSeqNumber: number = 10, + ) { + super(); + } + + /** Simulate processing op with the given sequence number, to trigger CatchUpMonitor */ + emitOpWithSequenceNumber(sequenceNumber: number) { + this.emit("op", { sequenceNumber }); + } + + /** Trigger the CatchUpMonitor by emitting op with the target sequence number */ + emitOpToCatchUp() { + this.emitOpWithSequenceNumber(this.lastKnownSeqNumber); + } + + static create(sequenceNumbers: { + lastSequenceNumber?: number; + lastKnownSeqNumber?: number; + } = {}): MockDeltaManagerForCatchingUp & IDeltaManager { + // eslint-disable-next-line @typescript-eslint/no-unsafe-return + return new MockDeltaManagerForCatchingUp( + sequenceNumbers.lastSequenceNumber, + sequenceNumbers.lastKnownSeqNumber, + ) as any; + } +} + +describe("CatchUpMonitor", () => { + let monitor: CatchUpMonitor; + + afterEach(() => { + monitor?.dispose(); + }); + + it("constructor validates DeltaManager sequence number coherency", async () => { + const mockDeltaManager = MockDeltaManagerForCatchingUp.create({ + lastSequenceNumber: 20, + lastKnownSeqNumber: 15, // Should be impossible in real world + }); + + assert.throws(() => new CatchUpMonitor(mockDeltaManager), "Expect assert when DeltaManager in invalid state"); + }); + + it("Emits caughtUp event when caught up to the point it was created", () => { + const mockDeltaManager = MockDeltaManagerForCatchingUp.create({ + lastSequenceNumber: 10, + lastKnownSeqNumber: 15, + }); + let caughtUp = false; + + mockDeltaManager.lastKnownSeqNumber = 20; + monitor = new CatchUpMonitor(mockDeltaManager); + mockDeltaManager.lastKnownSeqNumber = 25; // Shouldn't change anything about the monitor + monitor.on("caughtUp", () => { caughtUp = true; }); + + mockDeltaManager.emitOpWithSequenceNumber(19); // Less than 20 + assert(!caughtUp, "Shouldn't be considered caught up yet"); + mockDeltaManager.emitOpWithSequenceNumber(21); // Greater than 20 + assert(caughtUp, "Should be considered caught up now"); + }); + + it("Adding a listener after already caught up invokes the listener immediately", () => { + const mockDeltaManager = MockDeltaManagerForCatchingUp.create({ + lastSequenceNumber: 10, + lastKnownSeqNumber: 15, + }); + let caughtUp = false; + + monitor = new CatchUpMonitor(mockDeltaManager); + mockDeltaManager.emitOpToCatchUp(); + + monitor.on("caughtUp", () => { caughtUp = true; }); + assert(caughtUp, "caughtUp should have fired immediately"); + }); + + it("Emits caught up immediately if last known/processed sequence numbers match", () => { + const mockDeltaManager = MockDeltaManagerForCatchingUp.create({ + lastSequenceNumber: 10, + lastKnownSeqNumber: 10, + }); + let caughtUp = false; + + monitor = new CatchUpMonitor(mockDeltaManager); + + monitor.on("caughtUp", () => { caughtUp = true; }); + assert(caughtUp, "caughtUp should have fired immediately"); + }); + + it("Only emits caughtUp once", () => { + const mockDeltaManager = MockDeltaManagerForCatchingUp.create({ + lastSequenceNumber: 10, + lastKnownSeqNumber: 15, + }); + let caughtUpCount = 0; + + monitor = new CatchUpMonitor(mockDeltaManager); + monitor.on("caughtUp", () => { ++caughtUpCount; }); + + mockDeltaManager.emitOpWithSequenceNumber(15); + assert.equal(caughtUpCount, 1, "caughtUp should have fired once"); + mockDeltaManager.emitOpWithSequenceNumber(16); + assert.equal(caughtUpCount, 1, "caughtUp should have fired only once"); + + let secondCaughtUpCount = 0; + monitor.on("caughtUp", () => { secondCaughtUpCount = 1; }); + assert.equal(secondCaughtUpCount, 1, "New listener should still get invoked once caught up"); + mockDeltaManager.emitOpWithSequenceNumber(17); + assert.equal(secondCaughtUpCount, 1, "Subsequent ops will not cause caughtUp again on second listener"); + }); + + it("Dispose removes all listeners", () => { + const mockDeltaManager = MockDeltaManagerForCatchingUp.create(); + monitor = new CatchUpMonitor(mockDeltaManager); + + monitor.on("caughtUp", () => {}); + monitor.on("caughtUp", () => {}); + monitor.on("caughtUp", () => {}); + monitor.dispose(); + + assert(monitor.disposed, "dispose() should set disposed"); + assert.equal(monitor.listenerCount("caughtUp"), 0, "dispose() should clear all listeners"); + assert.equal(mockDeltaManager.listenerCount("op"), 0, "CatchUpMonitor.dispose should remove listener on DeltaManager"); + }); +}); + +describe("ImmediateCatchUpMonitor", () => { + it("caughtUp event fires immediately upon adding a listener", () => { + const monitor = new ImmediateCatchUpMonitor(); + let caughtUp = false; + monitor.on("caughtUp", () => { + caughtUp = true; + }); + assert(caughtUp, "callback should be invoked immediately"); + }); + + it("Dispose removes all listeners", () => { + const monitor = new ImmediateCatchUpMonitor(); + monitor.on("caughtUp", () => {}); + monitor.on("caughtUp", () => {}); + monitor.on("caughtUp", () => {}); + monitor.dispose(); + + assert(monitor.disposed, "dispose() should set disposed"); + assert.equal(monitor.listenerCount("caughtUp"), 0, "dispose() should clear all listeners"); + }); +}); diff --git a/packages/loader/container-loader/src/test/connectionStateHandler.spec.ts b/packages/loader/container-loader/src/test/connectionStateHandler.spec.ts index 98c33f2c834e..97fcd8170a3f 100644 --- a/packages/loader/container-loader/src/test/connectionStateHandler.spec.ts +++ b/packages/loader/container-loader/src/test/connectionStateHandler.spec.ts @@ -6,14 +6,26 @@ /* eslint-disable max-len */ import { strict as assert } from "assert"; -import { TelemetryNullLogger } from "@fluidframework/common-utils"; +import { TelemetryNullLogger, Timer, TypedEventEmitter } from "@fluidframework/common-utils"; import { ProtocolOpHandler } from "@fluidframework/protocol-base"; import { IClient, IClientConfiguration, ITokenClaims } from "@fluidframework/protocol-definitions"; -import { IConnectionDetails } from "@fluidframework/container-definitions"; +import { IConnectionDetails, IDeltaManager, IDeltaManagerEvents } from "@fluidframework/container-definitions"; import { SinonFakeTimers, useFakeTimers } from "sinon"; import { ITelemetryProperties } from "@fluidframework/common-definitions"; import { ConnectionState } from "../connectionState"; import { ConnectionStateHandler, IConnectionStateHandlerInputs } from "../connectionStateHandler"; +import { ICatchUpMonitor } from "../catchUpMonitor"; + +class MockDeltaManagerForCatchingUp + extends TypedEventEmitter + implements Pick, "lastSequenceNumber" | "lastKnownSeqNumber"> +{ // eslint-disable-line @typescript-eslint/brace-style + lastSequenceNumber: number = 5; + lastKnownSeqNumber: number = 10; + catchUp() { + this.emit("op", { sequenceNumber: this.lastKnownSeqNumber }); + } +} describe("ConnectionStateHandler Tests", () => { let clock: SinonFakeTimers; @@ -25,8 +37,11 @@ describe("ConnectionStateHandler Tests", () => { let client: IClient; const expectedTimeout = 90000; const pendingClientId = "pendingClientId"; + let deltaManagerForCatchingUp: MockDeltaManagerForCatchingUp; let connectionStateHandler_receivedAddMemberEvent: (id: string) => void; let connectionStateHandler_receivedRemoveMemberEvent: (id: string) => void; + let connectionStateHandler_joinOpTimer: () => Timer; + let connectionStateHandler_catchUpMonitor: () => ICatchUpMonitor; // Stash the real setTimeout because sinon fake timers will hijack it. const realSetTimeout = setTimeout; @@ -91,6 +106,9 @@ describe("ConnectionStateHandler Tests", () => { (id: string) => { (connectionStateHandler as any).receivedAddMemberEvent(id); }; connectionStateHandler_receivedRemoveMemberEvent = (id: string) => { (connectionStateHandler as any).receivedRemoveMemberEvent(id); }; + connectionStateHandler_joinOpTimer = () => (connectionStateHandler as any).joinOpTimer as Timer; + connectionStateHandler_catchUpMonitor = () => (connectionStateHandler as any).catchUpMonitor as ICatchUpMonitor; + deltaManagerForCatchingUp = new MockDeltaManagerForCatchingUp(); }); it("Should move to connected state on normal flow for read client", async () => { @@ -101,6 +119,17 @@ describe("ConnectionStateHandler Tests", () => { "Read Client should be in connected state"); }); + it("Should move to connected after catching up for read client", async () => { + assert.strictEqual(connectionStateHandler.connectionState, ConnectionState.Disconnected, + "Client should be in Disconnected state"); + connectionStateHandler.receivedConnectEvent(client.mode, connectionDetails, deltaManagerForCatchingUp as any); + assert.strictEqual(connectionStateHandler.connectionState, ConnectionState.CatchingUp, + "Client should be in CatchingUp state"); + deltaManagerForCatchingUp.catchUp(); + assert.strictEqual(connectionStateHandler.connectionState, ConnectionState.Connected, + "Read Client should be in Connected state"); + }); + it("Should move to connected state on normal flow for write client", async () => { client.mode = "write"; assert.strictEqual(connectionStateHandler.connectionState, ConnectionState.Disconnected, @@ -118,6 +147,26 @@ describe("ConnectionStateHandler Tests", () => { "Client should be in connected state"); }); + it("Should move to connected state after catching up for write client", async () => { + client.mode = "write"; + assert.strictEqual(connectionStateHandler.connectionState, ConnectionState.Disconnected, + "Client should be in Disconnected state"); + connectionStateHandler.receivedConnectEvent(client.mode, connectionDetails, deltaManagerForCatchingUp as any); + assert.strictEqual(connectionStateHandler.connectionState, ConnectionState.CatchingUp, + "Client should be in CatchingUp state"); + protocolHandler.quorum.addMember("anotherClientId", { client, sequenceNumber: 0 }); + connectionStateHandler_receivedAddMemberEvent("anotherClientId"); + assert.strictEqual(connectionStateHandler.connectionState, ConnectionState.CatchingUp, + "Some other client joined."); + protocolHandler.quorum.addMember(pendingClientId, { client, sequenceNumber: 0 }); + connectionStateHandler_receivedAddMemberEvent(pendingClientId); + assert.strictEqual(connectionStateHandler.connectionState, ConnectionState.CatchingUp, + "Client should be in CatchingUp state until caught up"); + deltaManagerForCatchingUp.catchUp(); + assert.strictEqual(connectionStateHandler.connectionState, ConnectionState.Connected, + "Client should be in Connected state"); + }); + it("Should move to connected state on normal flow for write client, even if quorum isn't initialized at first", async () => { // swap out quorumClients fn for one that returns undefined at first handlerInputs.quorumClients = () => undefined; @@ -286,6 +335,23 @@ describe("ConnectionStateHandler Tests", () => { "Client 2 should now be in connected state"); }); + it("All pending state should be cleared after disconnect", async () => { + client.mode = "write"; + connectionStateHandler.receivedConnectEvent(client.mode, connectionDetails); + assert(connectionStateHandler.pendingClientId !== undefined, "pendingClientId should be set after receiving 'connect' event"); + assert(connectionStateHandler_catchUpMonitor !== undefined, "catchUpMonitor should be set after receiving 'connect' event"); + assert(connectionStateHandler_joinOpTimer().hasTimer, "joinOpTimer should be set after receiving 'connect' event"); + + let catchUpMonitorDisposed = false; + connectionStateHandler_catchUpMonitor().dispose = () => { catchUpMonitorDisposed = true; }; + + connectionStateHandler.receivedDisconnectEvent("test"); + assert(connectionStateHandler.pendingClientId === undefined, "pendingClientId should not be set after receiving 'disconnect' event"); + assert(connectionStateHandler_catchUpMonitor() === undefined, "catchUpMonitor should not be set after receiving 'disconnect' event"); + assert(catchUpMonitorDisposed, "Original catchUpMonitor should have been disposed after receiving 'disconnect' event"); + assert(!connectionStateHandler_joinOpTimer().hasTimer, "joinOpTimer should not be set after receiving 'disconnect' event"); + }); + it("Should wait for client 1 to leave before moving to connected state(Client 3) when client 2 got disconnected from connecting state", async () => { client.mode = "write"; connectionStateHandler.receivedConnectEvent(client.mode, connectionDetails); diff --git a/packages/loader/container-loader/src/test/container.spec.ts b/packages/loader/container-loader/src/test/container.spec.ts new file mode 100644 index 000000000000..12a4c1691133 --- /dev/null +++ b/packages/loader/container-loader/src/test/container.spec.ts @@ -0,0 +1,135 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +/* eslint-disable max-len */ +/* eslint-disable @typescript-eslint/consistent-type-assertions */ + +import assert from "assert"; +import { EventEmitter } from "events"; +import { AttachState, IAudience, IContainer, IContainerEvents, IDeltaManager, IDeltaManagerEvents, ReadOnlyInfo } from "@fluidframework/container-definitions"; +import { sessionStorageConfigProvider } from "@fluidframework/telemetry-utils"; +import { TypedEventEmitter } from "@fluidframework/common-utils"; +import { IFluidRouter } from "@fluidframework/core-interfaces"; +import { IResolvedUrl } from "@fluidframework/driver-definitions"; +import { ISequencedDocumentMessage, IDocumentMessage } from "@fluidframework/protocol-definitions"; +import { Container, waitContainerToCatchUp } from "../container"; +import { Loader } from "../loader"; +import { CatchUpMonitor, ImmediateCatchUpMonitor } from "../catchUpMonitor"; +import { ConnectionState } from "../connectionState"; + +class MockDeltaManager + extends TypedEventEmitter + implements Partial, "on" | "off" | "once">> +{ // eslint-disable-line @typescript-eslint/brace-style + hasCheckpointSequenceNumber = true; + lastKnownSeqNumber = 2; + lastSequenceNumber = 1; +} + +class MockContainer extends TypedEventEmitter implements Partial> { + deltaManager: IDeltaManager = new MockDeltaManager() as any; + resolvedUrl?: IResolvedUrl | undefined; + attachState?: AttachState | undefined; + closed?: boolean | undefined = false; + isDirty?: boolean | undefined; + connectionState?: ConnectionState | undefined; + connected?: boolean | undefined; + audience?: IAudience | undefined; + clientId?: string | undefined; + readOnlyInfo?: ReadOnlyInfo | undefined; + IFluidRouter?: IFluidRouter | undefined; + + get mockDeltaManager() { return this.deltaManager as any as MockDeltaManager; } + + connect() { + this.connectionState = ConnectionState.Connected; + this.emit("connected"); + } +} + +describe("Container", () => { + describe("constructor", () => { + const oldRawConfig = sessionStorageConfigProvider.value.getRawConfig; + let injectedSettings = {}; + + before(() => { + // eslint-disable-next-line @typescript-eslint/no-unsafe-return + sessionStorageConfigProvider.value.getRawConfig = (name) => injectedSettings[name]; + }); + + afterEach(() => { + injectedSettings = {}; + }); + + after(() => { + sessionStorageConfigProvider.value.getRawConfig = oldRawConfig; + }); + + it("Fluid.Container.CatchUpBeforeDeclaringConnected = true, use CatchUpMonitor", () => { + injectedSettings["Fluid.Container.CatchUpBeforeDeclaringConnected"] = true; + + const container = new Container({ services: { options: {} } } as Loader, {}); + const deltaManager: any = container.deltaManager; + deltaManager.connectionManager.connection = {}; // Avoid assert 0x0df + (deltaManager as EventEmitter).emit("connect", { clientId: "someClientId" }); + + const catchUpMonitor = (container as any).connectionStateHandler.catchUpMonitor; + assert(catchUpMonitor instanceof CatchUpMonitor); + }); + + it("Fluid.Container.CatchUpBeforeDeclaringConnected undefined, use ImmediateCatchUpMonitor", () => { + const container = new Container({ services: { options: {} } } as Loader, {}); + const deltaManager: any = container.deltaManager; + deltaManager.connectionManager.connection = {}; // Avoid assert 0x0df + (deltaManager as EventEmitter).emit("connect", { clientId: "someClientId" }); + + const catchUpMonitor = (container as any).connectionStateHandler.catchUpMonitor; + assert(catchUpMonitor instanceof ImmediateCatchUpMonitor); + }); + }); + + describe("waitContainerToCatchUp", () => { + it("Closed Container fails", async () => { + const mockContainer = new MockContainer(); + mockContainer.closed = true; + + await assert.rejects(async () => + waitContainerToCatchUp(mockContainer as any as IContainer), "Passing a closed container should throw"); + }); + + it("Connected Container waits for catching up", async () => { + const mockContainer = new MockContainer(); + mockContainer.connectionState = ConnectionState.Connected; + + const waitP = waitContainerToCatchUp(mockContainer as any as IContainer); + mockContainer.mockDeltaManager.emit("op", { sequenceNumber: 2 }); + + // Should resolve immediately, otherwise test will time out + await waitP; + }); + + it("Connected and caught up Container resolves immediately", async () => { + const mockContainer = new MockContainer(); + mockContainer.mockDeltaManager.lastSequenceNumber = 2; // to match lastKnownSeqNumber + mockContainer.connectionState = ConnectionState.Connected; + + const waitP = waitContainerToCatchUp(mockContainer as any as IContainer); + + // Should resolve immediately, otherwise test will time out + await waitP; + }); + + it("Disconnected Container gets Connected then waits for catching up", async () => { + const mockContainer = new MockContainer(); + mockContainer.connectionState = ConnectionState.Disconnected; + + const waitP = waitContainerToCatchUp(mockContainer as any as IContainer); + mockContainer.mockDeltaManager.emit("op", { sequenceNumber: 2 }); + + // Should resolve immediately, otherwise test will time out + await waitP; + }); + }); +});