Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flow control and discard watermark #2295

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions demo/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ function initOptions(term: TerminalType): void {
'handler',
'screenKeys',
'termName',
'useFlowControl',
// 'useFlowControl',
jerch marked this conversation as resolved.
Show resolved Hide resolved
jerch marked this conversation as resolved.
Show resolved Hide resolved
// Complex option
'theme'
];
Expand All @@ -236,7 +236,8 @@ function initOptions(term: TerminalType): void {
fontWeight: ['normal', 'bold', '100', '200', '300', '400', '500', '600', '700', '800', '900'],
fontWeightBold: ['normal', 'bold', '100', '200', '300', '400', '500', '600', '700', '800', '900'],
rendererType: ['dom', 'canvas'],
wordSeparator: null
wordSeparator: null,
answerbackString: null
jerch marked this conversation as resolved.
Show resolved Hide resolved
};
const options = Object.keys((<any>term)._core.options);
const booleanOptions = [];
Expand Down
94 changes: 73 additions & 21 deletions demo/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,31 @@ var pty = require('node-pty');
*/
const USE_BINARY_UTF8 = false;

/**
* Whether to use flow control.
* This must be in sync with frontend option useFlowControl!
*/
const USE_FLOW_CONTROL = false;

// send ENQ as ACK request (hardcoded in xterm.js)
const FLOW_CONTROL_ACK_REQUEST = '\x05';
// ACK response (answerbackString in xterm.js)
const FLOW_CONTROL_ACK_RESPONSE = '\x06\x06\x06\x06';
// send ACK request every n-th bytes
const ACK_WATERMARK = 131072;
// max allowed pending ACK requests before pausing pty
const MAX_PENDING_ACK = 4;

// settings for prebuffering
const MAX_SEND_INTERVAL = 5;
const MAX_CHUNK_SIZE = 65536;


function startServer() {
var app = express();
expressWs(app);

var terminals = {},
logs = {};
var terminals = {};

app.use('/xterm.css', express.static(__dirname + '/../css/xterm.css'));
app.get('/logo.png', (req, res) => res.sendFile(__dirname + '/logo.png'));
Expand Down Expand Up @@ -52,10 +70,6 @@ function startServer() {

console.log('Created terminal with PID: ' + term.pid);
terminals[term.pid] = term;
logs[term.pid] = '';
term.on('data', function(data) {
logs[term.pid] += data;
});
res.send(term.pid.toString());
res.end();
});
Expand All @@ -74,59 +88,97 @@ function startServer() {
app.ws('/terminals/:pid', function (ws, req) {
var term = terminals[parseInt(req.params.pid)];
console.log('Connected to terminal ' + term.pid);
ws.send(logs[term.pid]);

// string message buffering
function buffer(socket, timeout) {
const _send = data => {
// handle only 'open' websocket state
if (ws.readyState === 1) {
// test high latency
// setTimeout(() => ws.send(data), 250);
ws.send(data);
}
}

/**
* message buffering - limits are MAX_SEND_INTERVAL and MAX_CHUNK_SIZE
*/
// string message
function buffer(timeout, limit) {
let s = '';
let sender = null;
return (data) => {
s += data;
if (!sender) {
if (s.length > limit) {
clearTimeout(sender);
_send(s);
s = '';
sender = null;
} else if (!sender) {
sender = setTimeout(() => {
socket.send(s);
_send(s);
s = '';
sender = null;
}, timeout);
}
};
}
// binary message buffering
function bufferUtf8(socket, timeout) {
// binary message
function bufferUtf8(timeout, limit) {
let buffer = [];
let sender = null;
let length = 0;
return (data) => {
buffer.push(data);
length += data.length;
if (!sender) {
if (length > limit) {
clearTimeout(sender);
_send(Buffer.concat(buffer, length));
buffer = [];
sender = null;
length = 0;
} else if (!sender) {
sender = setTimeout(() => {
socket.send(Buffer.concat(buffer, length));
_send(Buffer.concat(buffer, length));
buffer = [];
sender = null;
length = 0;
}, timeout);
}
};
}
const send = USE_BINARY_UTF8 ? bufferUtf8(ws, 5) : buffer(ws, 5);
const send = (USE_BINARY_UTF8 ? bufferUtf8 : buffer)(MAX_SEND_INTERVAL, MAX_CHUNK_SIZE);

let ackPending = 0;
let bytesSent = 0;

term.on('data', function(data) {
try {
send(data);
} catch (ex) {
// The WebSocket is not open, ignore
send(data);
if (USE_FLOW_CONTROL) {
bytesSent += data.length;
if (bytesSent > ACK_WATERMARK) {
send(FLOW_CONTROL_ACK_REQUEST);
ackPending++;
bytesSent = 0;
if (ackPending > MAX_PENDING_ACK) {
term.pause();
}
}
}
});
ws.on('message', function(msg) {
if (USE_FLOW_CONTROL && msg === FLOW_CONTROL_ACK_RESPONSE) {
ackPending--;
jerch marked this conversation as resolved.
Show resolved Hide resolved
if (ackPending <= MAX_PENDING_ACK) {
term.resume();
}
return;
}
term.write(msg);
});
ws.on('close', function () {
term.kill();
console.log('Closed terminal ' + term.pid);
// Clean things up
delete terminals[term.pid];
delete logs[term.pid];
});
});

Expand Down
11 changes: 11 additions & 0 deletions src/InputHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ export class InputHandler extends Disposable implements IInputHandler {
this._parser.setExecuteHandler(C1.NEL, () => this.nextLine());
this._parser.setExecuteHandler(C1.HTS, () => this.tabSet());

// install ENQ handler - disabled by default, set useFlowControl to enable
// this._parser.setExecuteHandler(C0.ENQ, () => this.enquiry());

/**
* OSC handler
*/
Expand Down Expand Up @@ -495,6 +498,14 @@ export class InputHandler extends Disposable implements IInputHandler {
return this._parser.addOscHandler(ident, callback);
}

/**
* ENQ
* Enquiry (Ctrl-E).
*/
public enquiry(): void {
this._coreService.triggerDataEvent(this._terminal.options.answerbackString);
jerch marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* BEL
* Bell (Ctrl-G).
Expand Down
98 changes: 38 additions & 60 deletions src/Terminal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,15 @@ import { CoreService } from 'common/services/CoreService';
const document = (typeof window !== 'undefined') ? window.document : null;

/**
* The amount of write requests to queue before sending an XOFF signal to the
* pty process. This number must be small in order for ^C and similar sequences
* to be responsive.
* Safety watermark to avoid memory exhaustion and browser engine crash on fast data input.
* Once hit the terminal will stop working. Enable flow control to avoid this limit
* and make sure that your backend correctly propagates this to the underlying pty.
* (see docs for further instructions)
* Since this limit is meant as a safety parachute to prevent browser crashs,
* it is set to a very high number. Typically xterm.js gets unresponsive with
* a much lower number (>500 kB).
*/
const WRITE_BUFFER_PAUSE_THRESHOLD = 5;
const DISCARD_WATERMARK = 50000000; // ~50 MB

/**
* The max number of ms to spend on writes before allowing the renderer to
Expand Down Expand Up @@ -160,14 +164,12 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp
public writeBuffer: string[];
public writeBufferUtf8: Uint8Array[];
private _writeInProgress: boolean;

/**
* Whether _xterm.js_ sent XOFF in order to catch up with the pty process.
* This is a distinct state from writeStopped so that if the user requested
* XOFF via ^S that it will not automatically resume when the writeBuffer goes
* below threshold.
* Sum of length of pending chunks in all write buffers.
* Note: For the string chunks the actual memory usage is
* doubled (JSString char takes 2 bytes).
*/
private _xoffSentToCatchUp: boolean;
private _writeBuffersPendingSize: number = 0;

/** Whether writing has been stopped as a result of XOFF */
// private _writeStopped: boolean;
Expand Down Expand Up @@ -292,8 +294,6 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp
this.writeBufferUtf8 = [];
this._writeInProgress = false;

this._xoffSentToCatchUp = false;
// this._writeStopped = false;
this._userScrolling = false;

// Register input handler and refire/handle events
Expand Down Expand Up @@ -421,6 +421,12 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp
}
}
break;
case 'useFlowControl':
if (this.optionsService.options.useFlowControl) {
(this._inputHandler as any)._parser.setExecuteHandler(C0.ENQ, () => (this._inputHandler as any).enquiry());
} else {
(this._inputHandler as any)._parser.clearExecuteHandler(C0.ENQ);
}
}
});
}
Expand Down Expand Up @@ -1211,17 +1217,16 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp
return;
}

this.writeBufferUtf8.push(data);

// Send XOFF to pause the pty process if the write buffer becomes too large so
// xterm.js can catch up before more data is sent. This is necessary in order
// to keep signals such as ^C responsive.
if (this.options.useFlowControl && !this._xoffSentToCatchUp && this.writeBufferUtf8.length >= WRITE_BUFFER_PAUSE_THRESHOLD) {
// XOFF - stop pty pipe
// XON will be triggered by emulator before processing data chunk
this._coreService.triggerDataEvent(C0.DC3);
this._xoffSentToCatchUp = true;
// safety measure: dont allow the backend to crash
// the terminal by writing to much data to fast.
// If we hit this, the terminal cant keep up with data written
// and will start to degenerate.
if (this._writeBuffersPendingSize > DISCARD_WATERMARK) {
throw new Error('write data discarded, use flow control to avoid losing data');
jerch marked this conversation as resolved.
Show resolved Hide resolved
}
this._writeBuffersPendingSize += data.length;

this.writeBufferUtf8.push(data);

if (!this._writeInProgress && this.writeBufferUtf8.length > 0) {
// Kick off a write which will write all data in sequence recursively
Expand All @@ -1244,23 +1249,11 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp
const data = this.writeBufferUtf8[bufferOffset];
bufferOffset++;

// If XOFF was sent in order to catch up with the pty process, resume it if
// we reached the end of the writeBuffer to allow more data to come in.
if (this._xoffSentToCatchUp && this.writeBufferUtf8.length === bufferOffset) {
this._coreService.triggerDataEvent(C0.DC1);
this._xoffSentToCatchUp = false;
}

this._refreshStart = this.buffer.y;
this._refreshEnd = this.buffer.y;

// HACK: Set the parser state based on it's state at the time of return.
// This works around the bug #662 which saw the parser state reset in the
// middle of parsing escape sequence in two chunks. For some reason the
// state of the parser resets to 0 after exiting parser.parse. This change
// just sets the state back based on the correct return statement.

this._inputHandler.parseUtf8(data);
this._writeBuffersPendingSize -= data.length;

this.updateRange(this.buffer.y);
this.refresh(this._refreshStart, this._refreshEnd);
Expand Down Expand Up @@ -1298,17 +1291,16 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp
return;
}

this.writeBuffer.push(data);

// Send XOFF to pause the pty process if the write buffer becomes too large so
// xterm.js can catch up before more data is sent. This is necessary in order
// to keep signals such as ^C responsive.
if (this.options.useFlowControl && !this._xoffSentToCatchUp && this.writeBuffer.length >= WRITE_BUFFER_PAUSE_THRESHOLD) {
// XOFF - stop pty pipe
// XON will be triggered by emulator before processing data chunk
this._coreService.triggerDataEvent(C0.DC3);
this._xoffSentToCatchUp = true;
// safety measure: dont allow the backend to crash
// the terminal by writing to much data to fast.
// If we hit this, the terminal cant keep up with data written
// and will start to degenerate.
if (this._writeBuffersPendingSize > DISCARD_WATERMARK) {
throw new Error('write data discarded, use flow control to avoid losing data');
}
this._writeBuffersPendingSize += data.length;

this.writeBuffer.push(data);

if (!this._writeInProgress && this.writeBuffer.length > 0) {
// Kick off a write which will write all data in sequence recursively
Expand All @@ -1331,23 +1323,11 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp
const data = this.writeBuffer[bufferOffset];
bufferOffset++;

// If XOFF was sent in order to catch up with the pty process, resume it if
// we reached the end of the writeBuffer to allow more data to come in.
if (this._xoffSentToCatchUp && this.writeBuffer.length === bufferOffset) {
this._coreService.triggerDataEvent(C0.DC1);
this._xoffSentToCatchUp = false;
}

this._refreshStart = this.buffer.y;
this._refreshEnd = this.buffer.y;

// HACK: Set the parser state based on it's state at the time of return.
// This works around the bug #662 which saw the parser state reset in the
// middle of parsing escape sequence in two chunks. For some reason the
// state of the parser resets to 0 after exiting parser.parse. This change
// just sets the state back based on the correct return statement.

this._inputHandler.parse(data);
this._writeBuffersPendingSize -= data.length;

this.updateRange(this.buffer.y);
this.refresh(this._refreshStart, this._refreshEnd);
Expand Down Expand Up @@ -1864,7 +1844,6 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp
const writeBuffer = this.writeBuffer;
const writeBufferUtf8 = this.writeBufferUtf8;
const writeInProgress = this._writeInProgress;
const xoffSentToCatchUp = this._xoffSentToCatchUp;
const userScrolling = this._userScrolling;

this._setup();
Expand All @@ -1881,7 +1860,6 @@ export class Terminal extends Disposable implements ITerminal, IDisposable, IInp
this.writeBuffer = writeBuffer;
this.writeBufferUtf8 = writeBufferUtf8;
this._writeInProgress = writeInProgress;
this._xoffSentToCatchUp = xoffSentToCatchUp;
this._userScrolling = userScrolling;

// do a full screen refresh
Expand Down
3 changes: 2 additions & 1 deletion src/common/services/OptionsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ export const DEFAULT_OPTIONS: ITerminalOptions = Object.freeze({
debug: false,
cancelEvents: false,
useFlowControl: false,
wordSeparator: ' ()[]{}\'"'
wordSeparator: ' ()[]{}\'"',
answerbackString: '\x06\x06\x06\x06'
jerch marked this conversation as resolved.
Show resolved Hide resolved
});

/**
Expand Down