Skip to content

Commit

Permalink
Add permessage-deflate support
Browse files Browse the repository at this point in the history
  • Loading branch information
alexdima committed Dec 21, 2020
1 parent 7cc71d6 commit f0376ac
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 7 deletions.
157 changes: 151 additions & 6 deletions src/vs/base/parts/ipc/node/ipc.net.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import { createHash } from 'crypto';
import { Socket, Server as NetServer, createConnection, createServer } from 'net';
import * as zlib from 'zlib';
import { Event, Emitter } from 'vs/base/common/event';
import { ClientConnectionEvent, IPCServer } from 'vs/base/parts/ipc/common/ipc';
import { join } from 'vs/base/common/path';
Expand Down Expand Up @@ -120,20 +121,114 @@ const enum ReadState {
export class WebSocketNodeSocket extends Disposable implements ISocket {

public readonly socket: NodeSocket;
public readonly permessageDeflate: boolean;
private _totalIncomingWireBytes: number;
private _totalIncomingDataBytes: number;
private _totalOutgoingWireBytes: number;
private _totalOutgoingDataBytes: number;
private readonly _zlibInflate: zlib.InflateRaw | null;
private readonly _zlibDeflate: zlib.DeflateRaw | null;
private readonly _recordInflateBytes: boolean;
private readonly _recordedInflateBytes: Buffer[] = [];
private readonly _pendingInflateData: Buffer[] = [];
private readonly _pendingDeflateData: Buffer[] = [];
private readonly _incomingData: ChunkStream;
private readonly _onData = this._register(new Emitter<VSBuffer>());
private readonly _onClose = this._register(new Emitter<void>());

private readonly _state = {
state: ReadState.PeekHeader,
readLen: Constants.MinHeaderByteSize,
fin: 0,
mask: 0
};

constructor(socket: NodeSocket) {
public get totalIncomingWireBytes(): number {
return this._totalIncomingWireBytes;
}

public get totalIncomingDataBytes(): number {
return this._totalIncomingDataBytes;
}

public get totalOutgoingWireBytes(): number {
return this._totalOutgoingWireBytes;
}

public get totalOutgoingDataBytes(): number {
return this._totalOutgoingDataBytes;
}

public get recordedInflateBytes(): VSBuffer {
if (this._recordInflateBytes) {
return VSBuffer.wrap(Buffer.concat(this._recordedInflateBytes));
}
return VSBuffer.alloc(0);
}

/**
* Create a socket which can communicate using WebSocket frames.
*
* **NOTE**: When using the permessage-deflate WebSocket extension, if parts of inflating was done
* in a different zlib instance, we need to pass all those bytes into zlib, otherwise the inflate
* might hit an inflated portion referencing a distance too far back.
*
* @param socket The underlying socket
* @param permessageDeflate Use the permessage-deflate WebSocket extension
* @param inflateBytes "Seed" zlib inflate with these bytes.
* @param recordInflateBytes Record all bytes sent to inflate
*/
constructor(socket: NodeSocket, permessageDeflate: boolean, inflateBytes: VSBuffer | null, recordInflateBytes: boolean) {
super();
this.socket = socket;
this._totalIncomingWireBytes = 0;
this._totalIncomingDataBytes = 0;
this._totalOutgoingWireBytes = 0;
this._totalOutgoingDataBytes = 0;
this.permessageDeflate = permessageDeflate;
this._recordInflateBytes = recordInflateBytes;
if (permessageDeflate) {
// See https://tools.ietf.org/html/rfc7692#page-16
// To simplify our logic, we don't negociate the window size
// and simply dedicate (2^15) / 32kb per web socket
this._zlibInflate = zlib.createInflateRaw({
windowBits: 15
});
this._zlibInflate.on('error', (err) => {
// zlib errors are fatal, since we have no idea how to recover
console.error(err);
onUnexpectedError(err);
this._onClose.fire();
});
this._zlibInflate.on('data', (data: Buffer) => {
this._pendingInflateData.push(data);
});
if (inflateBytes) {
this._zlibInflate.write(inflateBytes.buffer);
this._zlibInflate.flush(() => {
this._pendingInflateData.length = 0;
});
}

this._zlibDeflate = zlib.createDeflateRaw({
windowBits: 15
});
this._zlibDeflate.on('error', (err) => {
// zlib errors are fatal, since we have no idea how to recover
console.error(err);
onUnexpectedError(err);
this._onClose.fire();
});
this._zlibDeflate.on('data', (data: Buffer) => {
this._pendingDeflateData.push(data);
});
} else {
this._zlibInflate = null;
this._zlibDeflate = null;
}
this._incomingData = new ChunkStream();
this._register(this.socket.onData(data => this._acceptChunk(data)));
this._register(this.socket.onClose(() => this._onClose.fire()));
}

public dispose(): void {
Expand All @@ -145,14 +240,35 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
}

public onClose(listener: () => void): IDisposable {
return this.socket.onClose(listener);
return this._onClose.event(listener);
}

public onEnd(listener: () => void): IDisposable {
return this.socket.onEnd(listener);
}

public write(buffer: VSBuffer): void {
this._totalOutgoingDataBytes += buffer.byteLength;

if (this._zlibDeflate) {
this._zlibDeflate.write(<Buffer>buffer.buffer);

// See https://zlib.net/manual.html#Constants
this._zlibDeflate.flush(/*Z_SYNC_FLUSH*/2, () => {
let data = Buffer.concat(this._pendingDeflateData);
this._pendingDeflateData.length = 0;

// See https://tools.ietf.org/html/rfc7692#section-7.2.1
data = data.slice(0, data.length - 4);

this._write(VSBuffer.wrap(data), true);
});
} else {
this._write(buffer, false);
}
}

private _write(buffer: VSBuffer, compressed: boolean): void {
let headerLen = Constants.MinHeaderByteSize;
if (buffer.byteLength < 126) {
headerLen += 0;
Expand All @@ -163,7 +279,12 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
}
const header = VSBuffer.alloc(headerLen);

header.writeUInt8(0b10000010, 0);
if (compressed) {
// The RSV1 bit indicates a compressed frame
header.writeUInt8(0b11000010, 0);
} else {
header.writeUInt8(0b10000010, 0);
}
if (buffer.byteLength < 126) {
header.writeUInt8(buffer.byteLength, 1);
} else if (buffer.byteLength < 2 ** 16) {
Expand All @@ -184,6 +305,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
header.writeUInt8((buffer.byteLength >>> 0) & 0b11111111, ++offset);
}

this._totalOutgoingWireBytes += header.byteLength + buffer.byteLength;
this.socket.write(VSBuffer.concat([header, buffer]));
}

Expand All @@ -195,6 +317,7 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
if (data.byteLength === 0) {
return;
}
this._totalIncomingWireBytes += data.byteLength;

this._incomingData.acceptChunk(data);

Expand All @@ -203,14 +326,15 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
if (this._state.state === ReadState.PeekHeader) {
// peek to see if we can read the entire header
const peekHeader = this._incomingData.peek(this._state.readLen);
// const firstByte = peekHeader.readUInt8(0);
// const finBit = (firstByte & 0b10000000) >>> 7;
const firstByte = peekHeader.readUInt8(0);
const finBit = (firstByte & 0b10000000) >>> 7;
const secondByte = peekHeader.readUInt8(1);
const hasMask = (secondByte & 0b10000000) >>> 7;
const len = (secondByte & 0b01111111);

this._state.state = ReadState.ReadHeader;
this._state.readLen = Constants.MinHeaderByteSize + (hasMask ? 4 : 0) + (len === 126 ? 2 : 0) + (len === 127 ? 8 : 0);
this._state.fin = finBit;
this._state.mask = 0;

} else if (this._state.state === ReadState.ReadHeader) {
Expand Down Expand Up @@ -263,7 +387,28 @@ export class WebSocketNodeSocket extends Disposable implements ISocket {
this._state.readLen = Constants.MinHeaderByteSize;
this._state.mask = 0;

this._onData.fire(body);
if (this._zlibInflate) {
// See https://tools.ietf.org/html/rfc7692#section-7.2.2
if (this._recordInflateBytes) {
this._recordedInflateBytes.push(Buffer.from(<Buffer>body.buffer));
}
this._zlibInflate.write(<Buffer>body.buffer);
if (this._state.fin) {
if (this._recordInflateBytes) {
this._recordedInflateBytes.push(Buffer.from([0x00, 0x00, 0xff, 0xff]));
}
this._zlibInflate.write(Buffer.from([0x00, 0x00, 0xff, 0xff]));
}
this._zlibInflate.flush(() => {
const data = Buffer.concat(this._pendingInflateData);
this._pendingInflateData.length = 0;
this._totalIncomingDataBytes += data.length;
this._onData.fire(VSBuffer.wrap(data));
});
} else {
this._totalIncomingDataBytes += body.byteLength;
this._onData.fire(body);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ export interface IExtHostSocketMessage {
type: 'VSCODE_EXTHOST_IPC_SOCKET';
initialDataChunk: string;
skipWebSocketFrames: boolean;
permessageDeflate: boolean;
inflateBytes: string;
}

export interface IExtHostReduceGraceTimeMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ function _createExtHostProtocol(): Promise<PersistentProtocol> {
if (msg.skipWebSocketFrames) {
socket = new NodeSocket(handle);
} else {
socket = new WebSocketNodeSocket(new NodeSocket(handle));
const inflateBytes = VSBuffer.wrap(Buffer.from(msg.inflateBytes, 'base64'));
socket = new WebSocketNodeSocket(new NodeSocket(handle), msg.permessageDeflate, inflateBytes, false);
}
if (protocol) {
// reconnection case
Expand Down

0 comments on commit f0376ac

Please sign in to comment.