Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(terser): Update WorkerPool to reuse Workers #1409

Merged
merged 6 commits into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions packages/terser/src/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,22 @@ import { WorkerPool } from './worker-pool';
export default function terser(input: Options = {}) {
const { maxWorkers, ...options } = input;

const workerPool = new WorkerPool({
filePath: fileURLToPath(import.meta.url),
maxWorkers
});
let workerPool: WorkerPool | null;
dasa marked this conversation as resolved.
Show resolved Hide resolved
let numOfChunks = 0;

return {
name: 'terser',

async renderChunk(code: string, chunk: RenderedChunk, outputOptions: NormalizedOutputOptions) {
if (!workerPool) {
workerPool = new WorkerPool({
filePath: fileURLToPath(import.meta.url),
maxWorkers
});
}

numOfChunks += 1;

const defaultOptions: Options = {
sourceMap: outputOptions.sourcemap === true || typeof outputOptions.sourcemap === 'string'
};
Expand Down Expand Up @@ -80,6 +87,12 @@ export default function terser(input: Options = {}) {
return result;
} catch (e) {
return Promise.reject(e);
} finally {
numOfChunks -= 1;
if (numOfChunks === 0) {
workerPool.close();
workerPool = null;
dasa marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
};
Expand Down
114 changes: 59 additions & 55 deletions packages/terser/src/worker-pool.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { AsyncResource } from 'async_hooks';
import { Worker } from 'worker_threads';
import { cpus } from 'os';
import { EventEmitter } from 'events';
Expand All @@ -12,7 +13,21 @@ import type {
WorkerPoolTask
} from './type';

const symbol = Symbol.for('FreeWoker');
const taskInfo = Symbol('taskInfo');
const freeWorker = Symbol('freeWorker');

type WorkerWithTaskInfo = Worker & { [taskInfo]?: null | WorkerPoolTaskInfo };
dasa marked this conversation as resolved.
Show resolved Hide resolved

class WorkerPoolTaskInfo extends AsyncResource {
constructor(private callback: WorkerCallback) {
super('WorkerPoolTaskInfo');
}

done(err: Error | null, result: any) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy();
}
}

export class WorkerPool extends EventEmitter {
protected maxInstances: number;
Expand All @@ -21,37 +36,26 @@ export class WorkerPool extends EventEmitter {

protected tasks: WorkerPoolTask[] = [];

protected workers = 0;
protected workers: WorkerWithTaskInfo[] = [];
protected freeWorkers: WorkerWithTaskInfo[] = [];

constructor(options: WorkerPoolOptions) {
super();

this.maxInstances = options.maxWorkers || cpus().length;
this.filePath = options.filePath;

this.on(symbol, () => {
this.on(freeWorker, () => {
if (this.tasks.length > 0) {
this.run();
const { context, cb } = this.tasks.shift()!;
this.runTask(context, cb);
}
});
}

add(context: WorkerContext, cb: WorkerCallback) {
this.tasks.push({
context,
cb
});

if (this.workers >= this.maxInstances) {
return;
}

this.run();
}

async addAsync(context: WorkerContext): Promise<WorkerOutput> {
addAsync(context: WorkerContext): Promise<WorkerOutput> {
return new Promise((resolve, reject) => {
this.add(context, (err, output) => {
this.runTask(context, (err, output) => {
if (err) {
reject(err);
return;
Expand All @@ -67,51 +71,51 @@ export class WorkerPool extends EventEmitter {
});
}

private run() {
if (this.tasks.length === 0) {
return;
}

const task = this.tasks.shift();

if (typeof task === 'undefined') {
return;
close() {
for (const worker of this.workers) {
dasa marked this conversation as resolved.
Show resolved Hide resolved
worker.terminate();
}
}

this.workers += 1;

let called = false;
const callCallback = (err: Error | null, output?: WorkerOutput) => {
if (called) {
return;
}
called = true;

this.workers -= 1;

task.cb(err, output);
this.emit(symbol);
};

const worker = new Worker(this.filePath, {
workerData: {
code: task.context.code,
options: serializeJavascript(task.context.options)
}
});
private addNewWorker() {
const worker: WorkerWithTaskInfo = new Worker(this.filePath);

worker.on('message', (data) => {
callCallback(null, data);
worker.on('message', (result) => {
worker[taskInfo]!.done(null, result);
dasa marked this conversation as resolved.
Show resolved Hide resolved
worker[taskInfo] = null;
this.freeWorkers.push(worker);
this.emit(freeWorker);
});

worker.on('error', (err) => {
callCallback(err);
if (worker[taskInfo]) {
worker[taskInfo].done(err, null);
} else {
this.emit('error', err);
}
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});

worker.on('exit', (code) => {
if (code !== 0) {
callCallback(new Error(`Minify worker stopped with exit code ${code}`));
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(freeWorker);
}

private runTask(context: WorkerContext, cb: WorkerCallback) {
if (this.freeWorkers.length === 0) {
this.tasks.push({ context, cb });
if (this.workers.length < this.maxInstances) {
this.addNewWorker();
}
return;
}

const worker = this.freeWorkers.pop()!;
dasa marked this conversation as resolved.
Show resolved Hide resolved
worker[taskInfo] = new WorkerPoolTaskInfo(cb);
worker.postMessage({
code: context.code,
options: serializeJavascript(context.options)
});
}
}
29 changes: 15 additions & 14 deletions packages/terser/src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import process from 'process';
import { isMainThread, parentPort, workerData } from 'worker_threads';
import { isMainThread, parentPort } from 'worker_threads';

import { hasOwnProperty, isObject } from 'smob';

Expand All @@ -22,21 +21,25 @@ function isWorkerContextSerialized(input: unknown): input is WorkerContextSerial
);
}

export async function runWorker() {
if (isMainThread || !parentPort || !isWorkerContextSerialized(workerData)) {
export function runWorker() {
if (isMainThread || !parentPort) {
return;
}

try {
// eslint-disable-next-line no-eval
const eval2 = eval;
// eslint-disable-next-line no-eval
const eval2 = eval;

const options = eval2(`(${workerData.options})`);
parentPort.on('message', async (data: WorkerContextSerialized) => {
if (!isWorkerContextSerialized(data)) {
return;
}

const options = eval2(`(${data.options})`);

const result = await minify(workerData.code, options);
const result = await minify(data.code, options);

const output: WorkerOutput = {
code: result.code || workerData.code,
code: result.code || data.code,
nameCache: options.nameCache
};

Expand All @@ -48,8 +51,6 @@ export async function runWorker() {
output.sourceMap = result.map;
}

parentPort.postMessage(output);
} catch (e) {
process.exit(1);
}
parentPort!.postMessage(output);
dasa marked this conversation as resolved.
Show resolved Hide resolved
});
}
4 changes: 2 additions & 2 deletions packages/terser/test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ test.serial('throw error on terser fail', async (t) => {
await bundle.generate({ format: 'esm' });
t.falsy(true);
} catch (error) {
t.is(error.toString(), 'Error: Minify worker stopped with exit code 1');
t.is(error.toString(), 'SyntaxError: Name expected');
}
});

Expand All @@ -142,7 +142,7 @@ test.serial('throw error on terser fail with multiple outputs', async (t) => {
await Promise.all([bundle.generate({ format: 'cjs' }), bundle.generate({ format: 'esm' })]);
t.falsy(true);
} catch (error) {
t.is(error.toString(), 'Error: Minify worker stopped with exit code 1');
t.is(error.toString(), 'SyntaxError: Name expected');
}
});

Expand Down