diff --git a/src/vs/base/parts/ipc/node/ipc.net.ts b/src/vs/base/parts/ipc/node/ipc.net.ts index 39ef87b846446..74425689a10d0 100644 --- a/src/vs/base/parts/ipc/node/ipc.net.ts +++ b/src/vs/base/parts/ipc/node/ipc.net.ts @@ -5,6 +5,7 @@ import { createHash } from 'crypto'; import { Socket, Server as NetServer, createConnection, createServer } from 'net'; +import * as zlib from 'zlib'; import { Event, Emitter } from 'vs/base/common/event'; import { ClientConnectionEvent, IPCServer } from 'vs/base/parts/ipc/common/ipc'; import { join } from 'vs/base/common/path'; @@ -120,20 +121,114 @@ const enum ReadState { export class WebSocketNodeSocket extends Disposable implements ISocket { public readonly socket: NodeSocket; + public readonly permessageDeflate: boolean; + private _totalIncomingWireBytes: number; + private _totalIncomingDataBytes: number; + private _totalOutgoingWireBytes: number; + private _totalOutgoingDataBytes: number; + private readonly _zlibInflate: zlib.InflateRaw | null; + private readonly _zlibDeflate: zlib.DeflateRaw | null; + private readonly _recordInflateBytes: boolean; + private readonly _recordedInflateBytes: Buffer[] = []; + private readonly _pendingInflateData: Buffer[] = []; + private readonly _pendingDeflateData: Buffer[] = []; private readonly _incomingData: ChunkStream; private readonly _onData = this._register(new Emitter()); + private readonly _onClose = this._register(new Emitter()); private readonly _state = { state: ReadState.PeekHeader, readLen: Constants.MinHeaderByteSize, + fin: 0, mask: 0 }; - constructor(socket: NodeSocket) { + public get totalIncomingWireBytes(): number { + return this._totalIncomingWireBytes; + } + + public get totalIncomingDataBytes(): number { + return this._totalIncomingDataBytes; + } + + public get totalOutgoingWireBytes(): number { + return this._totalOutgoingWireBytes; + } + + public get totalOutgoingDataBytes(): number { + return this._totalOutgoingDataBytes; + } + + public get recordedInflateBytes(): VSBuffer { + if (this._recordInflateBytes) { + return VSBuffer.wrap(Buffer.concat(this._recordedInflateBytes)); + } + return VSBuffer.alloc(0); + } + + /** + * Create a socket which can communicate using WebSocket frames. + * + * **NOTE**: When using the permessage-deflate WebSocket extension, if parts of inflating was done + * in a different zlib instance, we need to pass all those bytes into zlib, otherwise the inflate + * might hit an inflated portion referencing a distance too far back. + * + * @param socket The underlying socket + * @param permessageDeflate Use the permessage-deflate WebSocket extension + * @param inflateBytes "Seed" zlib inflate with these bytes. + * @param recordInflateBytes Record all bytes sent to inflate + */ + constructor(socket: NodeSocket, permessageDeflate: boolean, inflateBytes: VSBuffer | null, recordInflateBytes: boolean) { super(); this.socket = socket; + this._totalIncomingWireBytes = 0; + this._totalIncomingDataBytes = 0; + this._totalOutgoingWireBytes = 0; + this._totalOutgoingDataBytes = 0; + this.permessageDeflate = permessageDeflate; + this._recordInflateBytes = recordInflateBytes; + if (permessageDeflate) { + // See https://tools.ietf.org/html/rfc7692#page-16 + // To simplify our logic, we don't negociate the window size + // and simply dedicate (2^15) / 32kb per web socket + this._zlibInflate = zlib.createInflateRaw({ + windowBits: 15 + }); + this._zlibInflate.on('error', (err) => { + // zlib errors are fatal, since we have no idea how to recover + console.error(err); + onUnexpectedError(err); + this._onClose.fire(); + }); + this._zlibInflate.on('data', (data: Buffer) => { + this._pendingInflateData.push(data); + }); + if (inflateBytes) { + this._zlibInflate.write(inflateBytes.buffer); + this._zlibInflate.flush(() => { + this._pendingInflateData.length = 0; + }); + } + + this._zlibDeflate = zlib.createDeflateRaw({ + windowBits: 15 + }); + this._zlibDeflate.on('error', (err) => { + // zlib errors are fatal, since we have no idea how to recover + console.error(err); + onUnexpectedError(err); + this._onClose.fire(); + }); + this._zlibDeflate.on('data', (data: Buffer) => { + this._pendingDeflateData.push(data); + }); + } else { + this._zlibInflate = null; + this._zlibDeflate = null; + } this._incomingData = new ChunkStream(); this._register(this.socket.onData(data => this._acceptChunk(data))); + this._register(this.socket.onClose(() => this._onClose.fire())); } public dispose(): void { @@ -145,7 +240,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { } public onClose(listener: () => void): IDisposable { - return this.socket.onClose(listener); + return this._onClose.event(listener); } public onEnd(listener: () => void): IDisposable { @@ -153,6 +248,27 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { } public write(buffer: VSBuffer): void { + this._totalOutgoingDataBytes += buffer.byteLength; + + if (this._zlibDeflate) { + this._zlibDeflate.write(buffer.buffer); + + // See https://zlib.net/manual.html#Constants + this._zlibDeflate.flush(/*Z_SYNC_FLUSH*/2, () => { + let data = Buffer.concat(this._pendingDeflateData); + this._pendingDeflateData.length = 0; + + // See https://tools.ietf.org/html/rfc7692#section-7.2.1 + data = data.slice(0, data.length - 4); + + this._write(VSBuffer.wrap(data), true); + }); + } else { + this._write(buffer, false); + } + } + + private _write(buffer: VSBuffer, compressed: boolean): void { let headerLen = Constants.MinHeaderByteSize; if (buffer.byteLength < 126) { headerLen += 0; @@ -163,7 +279,12 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { } const header = VSBuffer.alloc(headerLen); - header.writeUInt8(0b10000010, 0); + if (compressed) { + // The RSV1 bit indicates a compressed frame + header.writeUInt8(0b11000010, 0); + } else { + header.writeUInt8(0b10000010, 0); + } if (buffer.byteLength < 126) { header.writeUInt8(buffer.byteLength, 1); } else if (buffer.byteLength < 2 ** 16) { @@ -184,6 +305,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { header.writeUInt8((buffer.byteLength >>> 0) & 0b11111111, ++offset); } + this._totalOutgoingWireBytes += header.byteLength + buffer.byteLength; this.socket.write(VSBuffer.concat([header, buffer])); } @@ -195,6 +317,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { if (data.byteLength === 0) { return; } + this._totalIncomingWireBytes += data.byteLength; this._incomingData.acceptChunk(data); @@ -203,14 +326,15 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { if (this._state.state === ReadState.PeekHeader) { // peek to see if we can read the entire header const peekHeader = this._incomingData.peek(this._state.readLen); - // const firstByte = peekHeader.readUInt8(0); - // const finBit = (firstByte & 0b10000000) >>> 7; + const firstByte = peekHeader.readUInt8(0); + const finBit = (firstByte & 0b10000000) >>> 7; const secondByte = peekHeader.readUInt8(1); const hasMask = (secondByte & 0b10000000) >>> 7; const len = (secondByte & 0b01111111); this._state.state = ReadState.ReadHeader; this._state.readLen = Constants.MinHeaderByteSize + (hasMask ? 4 : 0) + (len === 126 ? 2 : 0) + (len === 127 ? 8 : 0); + this._state.fin = finBit; this._state.mask = 0; } else if (this._state.state === ReadState.ReadHeader) { @@ -263,7 +387,28 @@ export class WebSocketNodeSocket extends Disposable implements ISocket { this._state.readLen = Constants.MinHeaderByteSize; this._state.mask = 0; - this._onData.fire(body); + if (this._zlibInflate) { + // See https://tools.ietf.org/html/rfc7692#section-7.2.2 + if (this._recordInflateBytes) { + this._recordedInflateBytes.push(Buffer.from(body.buffer)); + } + this._zlibInflate.write(body.buffer); + if (this._state.fin) { + if (this._recordInflateBytes) { + this._recordedInflateBytes.push(Buffer.from([0x00, 0x00, 0xff, 0xff])); + } + this._zlibInflate.write(Buffer.from([0x00, 0x00, 0xff, 0xff])); + } + this._zlibInflate.flush(() => { + const data = Buffer.concat(this._pendingInflateData); + this._pendingInflateData.length = 0; + this._totalIncomingDataBytes += data.length; + this._onData.fire(VSBuffer.wrap(data)); + }); + } else { + this._totalIncomingDataBytes += body.byteLength; + this._onData.fire(body); + } } } } diff --git a/src/vs/workbench/services/extensions/common/extensionHostProtocol.ts b/src/vs/workbench/services/extensions/common/extensionHostProtocol.ts index df53b4709d399..46a8e5f459b0d 100644 --- a/src/vs/workbench/services/extensions/common/extensionHostProtocol.ts +++ b/src/vs/workbench/services/extensions/common/extensionHostProtocol.ts @@ -20,6 +20,8 @@ export interface IExtHostSocketMessage { type: 'VSCODE_EXTHOST_IPC_SOCKET'; initialDataChunk: string; skipWebSocketFrames: boolean; + permessageDeflate: boolean; + inflateBytes: string; } export interface IExtHostReduceGraceTimeMessage { diff --git a/src/vs/workbench/services/extensions/node/extensionHostProcessSetup.ts b/src/vs/workbench/services/extensions/node/extensionHostProcessSetup.ts index 6ff6b8fc3a376..c7bad89095f17 100644 --- a/src/vs/workbench/services/extensions/node/extensionHostProcessSetup.ts +++ b/src/vs/workbench/services/extensions/node/extensionHostProcessSetup.ts @@ -120,7 +120,8 @@ function _createExtHostProtocol(): Promise { if (msg.skipWebSocketFrames) { socket = new NodeSocket(handle); } else { - socket = new WebSocketNodeSocket(new NodeSocket(handle)); + const inflateBytes = VSBuffer.wrap(Buffer.from(msg.inflateBytes, 'base64')); + socket = new WebSocketNodeSocket(new NodeSocket(handle), msg.permessageDeflate, inflateBytes, false); } if (protocol) { // reconnection case