Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Stream JSON data over push channels to reduce memory usage #10273

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/cli/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
"ioredis": "5.3.2",
"isbot": "3.6.13",
"json-diff": "1.0.6",
"json-stream-stringify": "3.1.4",
"jsonschema": "1.4.1",
"jsonwebtoken": "9.0.2",
"ldapts": "4.2.6",
Expand Down Expand Up @@ -159,7 +160,6 @@
"simple-git": "3.17.0",
"source-map-support": "0.5.21",
"sqlite3": "5.1.7",
"sse-channel": "4.0.0",
"sshpk": "1.17.0",
"swagger-ui-express": "5.0.0",
"syslog-client": "1.1.1",
Expand Down
67 changes: 38 additions & 29 deletions packages/cli/src/push/abstract.push.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
import { assert, jsonStringify } from 'n8n-workflow';
import { Service } from 'typedi';
import { type Readable } from 'stream';
import { JsonStreamStringify } from 'json-stream-stringify';

import type { IPushDataType } from '@/Interfaces';
import type { Logger } from '@/Logger';

/**
* Abstract class for two-way push communication.
* Keeps track of user sessions and enables sending messages.
*
* @emits message when a message is received from a client
*/
import { Logger } from '@/Logger';

@Service()
export abstract class AbstractPush<T> {
protected connections: Record<string, T> = {};

protected abstract close(connection: T): void;
protected abstract sendToOneConnection(connection: T, data: string): void;
protected abstract sendTo(clients: T[], stream: Readable): Promise<void>;
protected abstract ping(connection: T): void;

constructor(protected readonly logger: Logger) {}
private messageQueue: Array<[T[], Readable]> = [];

constructor(protected readonly logger: Logger) {
// Ping all connected clients every 60 seconds
setInterval(() => this.pingAll(), 60 * 1000);
}

protected add(pushRef: string, connection: T) {
const { connections } = this;
Expand All @@ -38,32 +42,37 @@
delete this.connections[pushRef];
}

private sendTo(type: IPushDataType, data: unknown, pushRefs: string[]) {
this.logger.debug(`Send data of type "${type}" to editor-UI`, {
dataType: type,
pushRefs: pushRefs.join(', '),
});

const stringifiedPayload = jsonStringify({ type, data }, { replaceCircularRefs: true });

for (const pushRef of pushRefs) {
const connection = this.connections[pushRef];
assert(connection);
this.sendToOneConnection(connection, stringifiedPayload);
sendToOne(type: IPushDataType, data: unknown, pushRef: string) {
const client = this.connections[pushRef];
if (client === undefined) {
this.logger.error(`The session "${pushRef}" is not registered.`, { pushRef });
return;
}
this.enqueue([client], type, data);
}

sendToAll(type: IPushDataType, data?: unknown) {
this.sendTo(type, data, Object.keys(this.connections));
const clients = Object.values(this.connections);
this.enqueue(clients, type, data);
}

sendToOne(type: IPushDataType, data: unknown, pushRef: string) {
if (this.connections[pushRef] === undefined) {
this.logger.error(`The session "${pushRef}" is not registered.`, { pushRef });
return;
private pingAll() {
for (const pushRef in this.connections) {
this.ping(this.connections[pushRef]);
}
}

private enqueue<D>(clients: T[], type: IPushDataType, data?: D) {
const stream = new JsonStreamStringify({ type, data }, undefined, undefined, true);
this.messageQueue.push([clients, stream]);
setImmediate(async () => this.processQueue());

Check failure on line 68 in packages/cli/src/push/abstract.push.ts

View workflow job for this annotation

GitHub Actions / Lint / Lint

Returning an awaited promise is required in this context
}

this.sendTo(type, data, [pushRef]);
private async processQueue() {
while (this.messageQueue.length) {
const [clients, stream] = this.messageQueue.shift()!;
await this.sendTo(clients, stream);
}
}

closeAllConnections() {
Expand Down
4 changes: 2 additions & 2 deletions packages/cli/src/push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ export class Push extends TypedEmitter<PushEvents> {
this.emit('editorUiConnected', pushRef);
}

broadcast(type: IPushDataType, data?: unknown) {
broadcast<D>(type: IPushDataType, data?: D) {
this.backend.sendToAll(type, data);
}

send(type: IPushDataType, data: unknown, pushRef: string) {
send<D>(type: IPushDataType, data: D, pushRef: string) {
/**
* Multi-main setup: In a manual webhook execution, the main process that
* handles a webhook might not be the same as the main process that created
Expand Down
57 changes: 40 additions & 17 deletions packages/cli/src/push/sse.push.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import SSEChannel from 'sse-channel';
import { Service } from 'typedi';

import { Logger } from '@/Logger';
import { type Readable } from 'stream';

import { AbstractPush } from './abstract.push';
import type { PushRequest, PushResponse } from './types';
Expand All @@ -10,29 +8,54 @@ type Connection = { req: PushRequest; res: PushResponse };

@Service()
export class SSEPush extends AbstractPush<Connection> {
readonly channel = new SSEChannel();

readonly connections: Record<string, Connection> = {};
add(pushRef: string, connection: Connection) {
const { req, res } = connection;

constructor(logger: Logger) {
super(logger);
// Initialize the connection
req.socket.setTimeout(0);
req.socket.setNoDelay(true);
req.socket.setKeepAlive(true);

this.channel.on('disconnect', (_, { req }) => {
this.remove(req?.query?.pushRef);
});
}
res.setHeader('Content-Type', 'text/event-stream; charset=UTF-8');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.writeHead(200);
res.write(':ok\n\n');
res.flush();

add(pushRef: string, connection: Connection) {
super.add(pushRef, connection);
this.channel.addClient(connection.req, connection.res);

// When the client disconnects, remove the client
const removeClient = () => this.remove(pushRef);
req.once('end', removeClient);
req.once('close', removeClient);
res.once('finish', removeClient);
}

protected close({ res }: Connection) {
res.end();
this.channel.removeClient(res);
}

protected sendToOneConnection(connection: Connection, data: string) {
this.channel.send(data, [connection.res]);
protected async sendTo(connections: Connection[], stream: Readable) {
connections.forEach(({ res }) => res.write('data: '));
await new Promise<void>((resolve, reject) => {
stream
.once('error', reject)
.on('data', (chunk: Buffer) => {
connections.forEach(({ res }) => res.write(chunk));
})
.once('end', resolve);
});

connections.forEach(({ res }) => res.write('\n\n'));
// `flush()` is defined in the compression middleware.
// This is necessary because the compression middleware sometimes waits
// for a certain amount of data before sending the data to the client
connections.forEach(({ res }) => res.flush());
}

protected ping({ res }: Connection) {
res.write(':ping\n\n');
res.flush();
}
}
11 changes: 10 additions & 1 deletion packages/cli/src/push/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,13 @@ export type PushRequest = AuthenticatedRequest<{}, {}, {}, { pushRef: string }>;
export type SSEPushRequest = PushRequest & { ws: undefined };
export type WebSocketPushRequest = PushRequest & { ws: WebSocket };

export type PushResponse = Response & { req: PushRequest };
export type PushResponse = Response & {
req: PushRequest;

/**
* `flush()` is defined in the compression middleware.
* This is necessary because the compression middleware sometimes waits
* for a certain amount of data before sending the data to the client
*/
flush: () => void;
};
46 changes: 24 additions & 22 deletions packages/cli/src/push/websocket.push.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
import type WebSocket from 'ws';
import { Service } from 'typedi';
import { Logger } from '@/Logger';
import type { Readable } from 'stream';
import type WebSocket from 'ws';

import { AbstractPush } from './abstract.push';

function heartbeat(this: WebSocket) {
this.isAlive = true;
}

export const EMPTY_BUFFER = Buffer.alloc(0);

@Service()
export class WebSocketPush extends AbstractPush<WebSocket> {
constructor(logger: Logger) {
super(logger);

// Ping all connected clients every 60 seconds
setInterval(() => this.pingAll(), 60 * 1000);
}

add(pushRef: string, connection: WebSocket) {
connection.isAlive = true;
connection.on('pong', heartbeat);
Expand All @@ -33,21 +29,27 @@ export class WebSocketPush extends AbstractPush<WebSocket> {
connection.close();
}

protected sendToOneConnection(connection: WebSocket, data: string): void {
connection.send(data);
protected async sendTo(connections: WebSocket[], stream: Readable) {
await new Promise<void>((resolve, reject) => {
stream
.once('error', reject)
.on('data', (chunk: Buffer) => {
connections.forEach((connection) => connection.send(chunk, { fin: false }));
})
.once('end', () => {
connections.forEach((connection) => connection.send(EMPTY_BUFFER));
resolve();
});
});
}

private pingAll() {
for (const pushRef in this.connections) {
const connection = this.connections[pushRef];
// If a connection did not respond with a `PONG` in the last 60 seconds, disconnect
if (!connection.isAlive) {
delete this.connections[pushRef];
return connection.terminate();
}

connection.isAlive = false;
connection.ping();
protected ping(connection: WebSocket): void {
// If a connection did not respond with a `PONG` in the last 60 seconds, disconnect
if (!connection.isAlive) {
return connection.terminate();
}

connection.isAlive = false;
connection.ping();
}
}
17 changes: 0 additions & 17 deletions packages/cli/src/sse-channel.d.ts

This file was deleted.

31 changes: 23 additions & 8 deletions packages/cli/test/unit/push/websocket.push.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Container } from 'typedi';
import { EventEmitter } from 'events';
import type WebSocket from 'ws';
import { sleep } from 'n8n-workflow';
import { mock } from 'jest-mock-extended';

import { WebSocketPush } from '@/push/websocket.push';
import { EMPTY_BUFFER, WebSocketPush } from '@/push/websocket.push';
import { Logger } from '@/Logger';
import type { PushDataExecutionRecovered } from '@/Interfaces';

Expand All @@ -29,12 +30,13 @@ describe('WebSocketPush', () => {
const pushRef2 = 'test-session2';

mockInstance(Logger);
const webSocketPush = Container.get(WebSocketPush);
let webSocketPush: WebSocketPush;
const mockWebSocket1 = createMockWebSocket();
const mockWebSocket2 = createMockWebSocket();

beforeEach(() => {
jest.resetAllMocks();
webSocketPush = new WebSocketPush(mock());
});

it('can add a connection', () => {
Expand All @@ -53,7 +55,7 @@ describe('WebSocketPush', () => {
expect(mockWebSocket1.listenerCount('pong')).toBe(0);
});

it('sends data to one connection', () => {
it('sends data to one connection', async () => {
webSocketPush.add(pushRef1, mockWebSocket1);
webSocketPush.add(pushRef2, mockWebSocket2);
const data: PushDataExecutionRecovered = {
Expand All @@ -63,9 +65,14 @@ describe('WebSocketPush', () => {
},
};

// Use real timers to yield back to the event-loop, to finish streaming the data
jest.useRealTimers();
webSocketPush.sendToOne('executionRecovered', data, pushRef1);
await sleep(100);
jest.useFakeTimers();

expect(mockWebSocket1.send).toHaveBeenCalledWith(
expect(mockWebSocket1.send).toHaveBeenNthCalledWith(
1,
JSON.stringify({
type: 'executionRecovered',
data: {
Expand All @@ -75,11 +82,14 @@ describe('WebSocketPush', () => {
},
},
}),
{ fin: false },
);
expect(mockWebSocket1.send).toHaveBeenNthCalledWith(2, EMPTY_BUFFER);

expect(mockWebSocket2.send).not.toHaveBeenCalled();
});

it('sends data to all connections', () => {
it('sends data to all connections', async () => {
webSocketPush.add(pushRef1, mockWebSocket1);
webSocketPush.add(pushRef2, mockWebSocket2);
const data: PushDataExecutionRecovered = {
Expand All @@ -89,7 +99,10 @@ describe('WebSocketPush', () => {
},
};

jest.useRealTimers();
webSocketPush.sendToAll('executionRecovered', data);
await sleep(100);
jest.useFakeTimers();

const expectedMsg = JSON.stringify({
type: 'executionRecovered',
Expand All @@ -100,8 +113,10 @@ describe('WebSocketPush', () => {
},
},
});
expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg);
expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg);
expect(mockWebSocket1.send).toHaveBeenCalledWith(expectedMsg, { fin: false });
expect(mockWebSocket1.send).toHaveBeenCalledWith(EMPTY_BUFFER);
expect(mockWebSocket2.send).toHaveBeenCalledWith(expectedMsg, { fin: false });
expect(mockWebSocket2.send).toHaveBeenCalledWith(EMPTY_BUFFER);
});

it('pings all connections', () => {
Expand Down
Loading
Loading