Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

r11s: enable socket.io long-polling as a fallback #8820

Merged
merged 6 commits into from
Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api-report/driver-base.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { TypedEventEmitter } from '@fluidframework/common-utils';

// @public
export class DocumentDeltaConnection extends TypedEventEmitter<IDocumentDeltaConnectionEvents> implements IDocumentDeltaConnection, IDisposable {
protected constructor(socket: SocketIOClient.Socket, documentId: string, logger: ITelemetryLogger);
protected constructor(socket: SocketIOClient.Socket, documentId: string, logger: ITelemetryLogger, enableLongPollingDowngrades?: boolean);
znewton marked this conversation as resolved.
Show resolved Hide resolved
// (undocumented)
protected addTrackedListener(event: string, listener: (...args: any[]) => void): void;
checkpointSequenceNumber: number | undefined;
Expand Down
34 changes: 34 additions & 0 deletions packages/drivers/driver-base/src/documentDeltaConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ export class DocumentDeltaConnection

private _details: IConnected | undefined;

private reconnectAttempts: number = 0;

// Listeners only needed while the connection is in progress
private readonly connectionListeners: Map<string, (...args: any[]) => void> = new Map();
// Listeners used throughout the lifetime of the DocumentDeltaConnection
Expand Down Expand Up @@ -109,11 +111,13 @@ export class DocumentDeltaConnection
* @param socket - websocket to be used
* @param documentId - ID of the document
* @param logger - for reporting telemetry events
* @param enableLongPollingDowngrades - allow connection to be downgraded to long-polling on websocket failure
*/
protected constructor(
protected readonly socket: SocketIOClient.Socket,
public documentId: string,
logger: ITelemetryLogger,
private readonly enableLongPollingDowngrades: boolean = false,
) {
super();

Expand Down Expand Up @@ -362,16 +366,46 @@ export class DocumentDeltaConnection

// Listen for connection issues
this.addConnectionListener("connect_error", (error) => {
let isWebSocketTransportError = false;
try {
const description = error?.description;
if (description && typeof description === "object") {
if (error.type === "TransportError") {
isWebSocketTransportError = true;
}
// That's a WebSocket. Clear it as we can't log it.
description.target = undefined;
}
} catch(_e) {}

// Handle socket transport downgrading.
if (isWebSocketTransportError &&
this.enableLongPollingDowngrades &&
this.socket.io.opts.transports?.[0] !== "polling") {
// Downgrade transports to polling upgrade mechanism.
this.socket.io.opts.transports = ["polling", "websocket"];
// Don't alter reconnection behavior if already enabled.
if (!this.socket.io.reconnection()) {
// Allow single reconnection attempt using polling upgrade mechanism.
this.socket.io.reconnection(true);
this.socket.io.reconnectionAttempts(1);
}
}

// Allow built-in socket.io reconnection handling.
if (this.socket.io.reconnection() &&
this.reconnectAttempts < this.socket.io.reconnectionAttempts()) {
// Reconnection is enabled and maximum reconnect attempts have not been reached.
return;
}

fail(true, this.createErrorObject("connectError", error));
});

this.addConnectionListener("reconnect_attempt", () => {
this.reconnectAttempts++;
});

// Listen for timeouts
this.addConnectionListener("connect_timeout", () => {
fail(true, this.createErrorObject("connectTimeout"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export class R11sDocumentDeltaConnection extends DocumentDeltaConnection
tenantId,
},
reconnection: false,
// Default to websocket connection, with long-polling disabled
transports: ["websocket"],
timeout: timeoutMs,
});
Expand All @@ -46,7 +47,9 @@ export class R11sDocumentDeltaConnection extends DocumentDeltaConnection
versions: protocolVersions,
};

const deltaConnection = new R11sDocumentDeltaConnection(socket, id, logger);
// TODO: expose to host at factory level
const enableLongPollingDowngrades = true;
const deltaConnection = new R11sDocumentDeltaConnection(socket, id, logger, enableLongPollingDowngrades);

await deltaConnection.initialize(connectMessage, timeoutMs);
return deltaConnection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ export function create(

// Create and register a socket.io connection on the server
const io = new Server(server, {
// enable compatibility with socket.io v2 clients
// Enable compatibility with socket.io v2 clients
allowEIO3: true,
// Indicates whether a connection should use compression
perMessageDeflate: true,
// ensure long polling is never used
transports: [ "websocket" ],
// Enable long-polling as a fallback
transports: ["websocket", "polling"],
cors: {
// Explicitly allow all origins by reflecting request origin.
// As a service that has potential to host countless different client apps,
Expand Down