diff --git a/packages/drivers/odsp-socket-storage/src/OdspDocumentDeltaConnection.ts b/packages/drivers/odsp-socket-storage/src/OdspDocumentDeltaConnection.ts index aac1d5a2eee1..849e613af7ff 100644 --- a/packages/drivers/odsp-socket-storage/src/OdspDocumentDeltaConnection.ts +++ b/packages/drivers/odsp-socket-storage/src/OdspDocumentDeltaConnection.ts @@ -11,7 +11,6 @@ import { IClient, IDocumentDeltaConnection, } from "@microsoft/fluid-protocol-definitions"; -import * as assert from "assert"; import { IOdspSocketError } from "./contracts"; import { debug } from "./debug"; import { errorObjectFromOdspError } from "./OdspUtils"; @@ -111,6 +110,14 @@ export class OdspDocumentDeltaConnection extends DocumentDeltaConnection impleme documentId: string, telemetryLogger: ITelemetryLogger): ISocketReference { let socketReference = OdspDocumentDeltaConnection.socketIoSockets.get(key); + + // verify the socket is healthy before reusing it + if (socketReference && (!socketReference.socket || !socketReference.socket.connected)) { + // the socket is in a bad state. fully remove the reference + socketReference = undefined; + OdspDocumentDeltaConnection.removeSocketIoReference(key, true); + } + if (socketReference) { telemetryLogger.sendTelemetryEvent({ references: socketReference.references, @@ -145,6 +152,10 @@ export class OdspDocumentDeltaConnection extends DocumentDeltaConnection impleme }); socket.on("server_disconnect", (socketError: IOdspSocketError) => { + // the server always closes the socket after sending this message + // fully remove the socket reference now + OdspDocumentDeltaConnection.removeSocketIoReference(key, true); + // Raise it as disconnect. // That produces cleaner telemetry (no errors) and keeps protocol simpler (and not driver-specific). socket.emit("disconnect", errorObjectFromOdspError(socketError)); @@ -171,12 +182,11 @@ export class OdspDocumentDeltaConnection extends DocumentDeltaConnection impleme private static removeSocketIoReference(key: string, isFatalError?: boolean) { const socketReference = OdspDocumentDeltaConnection.socketIoSockets.get(key); if (!socketReference) { - // this is expected to happens if we removed the reference due the socket not being connected + // this is expected to happen if we removed the reference due the socket not being connected return; } socketReference.references--; - assert(socketReference.delayDeleteTimeout === undefined); debug(`Removed socketio reference for ${key}. Remaining references: ${socketReference.references}.`); @@ -187,12 +197,19 @@ export class OdspDocumentDeltaConnection extends DocumentDeltaConnection impleme socketReference.socket = undefined; } + // clear the pending deletion if there is one + if (socketReference.delayDeleteTimeout !== undefined) { + clearTimeout(socketReference.delayDeleteTimeout); + socketReference.delayDeleteTimeout = undefined; + socketReference.delayDeleteTimeoutSetTime = undefined; + } + OdspDocumentDeltaConnection.socketIoSockets.delete(key); debug(`Deleted socketio reference for ${key}. Is fatal error: ${isFatalError}.`); return; } - if (socketReference.references === 0) { + if (socketReference.references === 0 && socketReference.delayDeleteTimeout === undefined) { socketReference.delayDeleteTimeout = setTimeout(() => { OdspDocumentDeltaConnection.socketIoSockets.delete(key); @@ -214,9 +231,9 @@ export class OdspDocumentDeltaConnection extends DocumentDeltaConnection impleme * @param socketReferenceKey - socket reference key */ constructor( - socket: SocketIOClient.Socket, - documentId: string, - private socketReferenceKey: string | undefined) { + socket: SocketIOClient.Socket, + documentId: string, + private socketReferenceKey: string | undefined) { super(socket, documentId); } @@ -228,7 +245,7 @@ export class OdspDocumentDeltaConnection extends DocumentDeltaConnection impleme throw new Error("Invalid socket reference key"); } - OdspDocumentDeltaConnection.removeSocketIoReference(this.socketReferenceKey); + OdspDocumentDeltaConnection.removeSocketIoReference(this.socketReferenceKey, socketProtocolError); this.socketReferenceKey = undefined; this.emit("disconnect", "client closing connection"); diff --git a/packages/drivers/socket-storage-shared/src/documentDeltaConnection.ts b/packages/drivers/socket-storage-shared/src/documentDeltaConnection.ts index 29c4658ecdbd..110c34115353 100644 --- a/packages/drivers/socket-storage-shared/src/documentDeltaConnection.ts +++ b/packages/drivers/socket-storage-shared/src/documentDeltaConnection.ts @@ -382,6 +382,12 @@ export class DocumentDeltaConnection extends EventEmitter implements IDocumentDe reject(createErrorObject("connect_timeout", "Socket connection timed out")); }); + // Listen for disconnects + this.addConnectionListener("disconnect", () => { + this.disconnect(true); + reject(createErrorObject("disconnect", "Socket disconnectd")); + }); + this.addConnectionListener("connect_document_success", (response: IConnected) => { this.removeTrackedListeners(true); resolve(response);