From 74dbb42d39ed475db6e42eeb391b89cc705af656 Mon Sep 17 00:00:00 2001 From: Dasa Paddock Date: Mon, 23 Jan 2023 13:11:40 -0800 Subject: [PATCH] feat(terser): Update WorkerPool to reuse Workers (#1409) * Update WorkerPool to reuse Workers * test number of workers used * Address feedback * Fix ESLint warnings * Use regular `for` loop * Address feedback --- packages/terser/src/constants.ts | 2 + packages/terser/src/module.ts | 27 ++++++- packages/terser/src/type.ts | 11 +++ packages/terser/src/worker-pool.ts | 119 ++++++++++++++++------------- packages/terser/src/worker.ts | 29 +++---- packages/terser/test/test.js | 22 +++++- 6 files changed, 134 insertions(+), 76 deletions(-) create mode 100644 packages/terser/src/constants.ts diff --git a/packages/terser/src/constants.ts b/packages/terser/src/constants.ts new file mode 100644 index 000000000..36a95c890 --- /dev/null +++ b/packages/terser/src/constants.ts @@ -0,0 +1,2 @@ +export const taskInfo = Symbol('taskInfo'); +export const freeWorker = Symbol('freeWorker'); diff --git a/packages/terser/src/module.ts b/packages/terser/src/module.ts index a74cc7f3c..2877a6e49 100644 --- a/packages/terser/src/module.ts +++ b/packages/terser/src/module.ts @@ -9,15 +9,23 @@ import { WorkerPool } from './worker-pool'; export default function terser(input: Options = {}) { const { maxWorkers, ...options } = input; - const workerPool = new WorkerPool({ - filePath: fileURLToPath(import.meta.url), - maxWorkers - }); + let workerPool: WorkerPool | null | undefined; + let numOfChunks = 0; + let numOfWorkersUsed = 0; return { name: 'terser', async renderChunk(code: string, chunk: RenderedChunk, outputOptions: NormalizedOutputOptions) { + if (!workerPool) { + workerPool = new WorkerPool({ + filePath: fileURLToPath(import.meta.url), + maxWorkers + }); + } + + numOfChunks += 1; + const defaultOptions: Options = { sourceMap: outputOptions.sourcemap === true || typeof outputOptions.sourcemap === 'string' }; @@ -80,7 +88,18 @@ export default function terser(input: Options = {}) { return result; } catch (e) { return Promise.reject(e); + } finally { + numOfChunks -= 1; + if (numOfChunks === 0) { + numOfWorkersUsed = workerPool.numWorkers; + workerPool.close(); + workerPool = null; + } } + }, + + get numOfWorkersUsed() { + return numOfWorkersUsed; } }; } diff --git a/packages/terser/src/type.ts b/packages/terser/src/type.ts index daaf66651..0866af55f 100644 --- a/packages/terser/src/type.ts +++ b/packages/terser/src/type.ts @@ -1,5 +1,10 @@ +import type { AsyncResource } from 'async_hooks'; +import type { Worker } from 'worker_threads'; + import type { MinifyOptions } from 'terser'; +import type { taskInfo } from './constants'; + export interface Options extends MinifyOptions { nameCache?: Record; maxWorkers?: number; @@ -12,6 +17,12 @@ export interface WorkerContext { export type WorkerCallback = (err: Error | null, output?: WorkerOutput) => void; +interface WorkerPoolTaskInfo extends AsyncResource { + done(err: Error | null, result: any): void; +} + +export type WorkerWithTaskInfo = Worker & { [taskInfo]?: WorkerPoolTaskInfo | null }; + export interface WorkerContextSerialized { code: string; options: string; diff --git a/packages/terser/src/worker-pool.ts b/packages/terser/src/worker-pool.ts index 5154d4510..5630fd963 100644 --- a/packages/terser/src/worker-pool.ts +++ b/packages/terser/src/worker-pool.ts @@ -1,18 +1,31 @@ +import { AsyncResource } from 'async_hooks'; import { Worker } from 'worker_threads'; import { cpus } from 'os'; import { EventEmitter } from 'events'; import serializeJavascript from 'serialize-javascript'; +import { freeWorker, taskInfo } from './constants'; + import type { WorkerCallback, WorkerContext, WorkerOutput, WorkerPoolOptions, - WorkerPoolTask + WorkerPoolTask, + WorkerWithTaskInfo } from './type'; -const symbol = Symbol.for('FreeWoker'); +class WorkerPoolTaskInfo extends AsyncResource { + constructor(private callback: WorkerCallback) { + super('WorkerPoolTaskInfo'); + } + + done(err: Error | null, result: any) { + this.runInAsyncScope(this.callback, null, err, result); + this.emitDestroy(); + } +} export class WorkerPool extends EventEmitter { protected maxInstances: number; @@ -21,7 +34,8 @@ export class WorkerPool extends EventEmitter { protected tasks: WorkerPoolTask[] = []; - protected workers = 0; + protected workers: WorkerWithTaskInfo[] = []; + protected freeWorkers: WorkerWithTaskInfo[] = []; constructor(options: WorkerPoolOptions) { super(); @@ -29,29 +43,21 @@ export class WorkerPool extends EventEmitter { this.maxInstances = options.maxWorkers || cpus().length; this.filePath = options.filePath; - this.on(symbol, () => { + this.on(freeWorker, () => { if (this.tasks.length > 0) { - this.run(); + const { context, cb } = this.tasks.shift()!; + this.runTask(context, cb); } }); } - add(context: WorkerContext, cb: WorkerCallback) { - this.tasks.push({ - context, - cb - }); - - if (this.workers >= this.maxInstances) { - return; - } - - this.run(); + get numWorkers(): number { + return this.workers.length; } - async addAsync(context: WorkerContext): Promise { + addAsync(context: WorkerContext): Promise { return new Promise((resolve, reject) => { - this.add(context, (err, output) => { + this.runTask(context, (err, output) => { if (err) { reject(err); return; @@ -67,51 +73,54 @@ export class WorkerPool extends EventEmitter { }); } - private run() { - if (this.tasks.length === 0) { - return; - } - - const task = this.tasks.shift(); - - if (typeof task === 'undefined') { - return; + close() { + for (let i = 0; i < this.workers.length; i++) { + const worker = this.workers[i]; + worker.terminate(); } + } - this.workers += 1; - - let called = false; - const callCallback = (err: Error | null, output?: WorkerOutput) => { - if (called) { - return; - } - called = true; - - this.workers -= 1; - - task.cb(err, output); - this.emit(symbol); - }; - - const worker = new Worker(this.filePath, { - workerData: { - code: task.context.code, - options: serializeJavascript(task.context.options) - } - }); + private addNewWorker() { + const worker: WorkerWithTaskInfo = new Worker(this.filePath); - worker.on('message', (data) => { - callCallback(null, data); + worker.on('message', (result) => { + worker[taskInfo]?.done(null, result); + worker[taskInfo] = null; + this.freeWorkers.push(worker); + this.emit(freeWorker); }); worker.on('error', (err) => { - callCallback(err); + if (worker[taskInfo]) { + worker[taskInfo].done(err, null); + } else { + this.emit('error', err); + } + this.workers.splice(this.workers.indexOf(worker), 1); + this.addNewWorker(); }); - worker.on('exit', (code) => { - if (code !== 0) { - callCallback(new Error(`Minify worker stopped with exit code ${code}`)); + this.workers.push(worker); + this.freeWorkers.push(worker); + this.emit(freeWorker); + } + + private runTask(context: WorkerContext, cb: WorkerCallback) { + if (this.freeWorkers.length === 0) { + this.tasks.push({ context, cb }); + if (this.numWorkers < this.maxInstances) { + this.addNewWorker(); } - }); + return; + } + + const worker = this.freeWorkers.pop(); + if (worker) { + worker[taskInfo] = new WorkerPoolTaskInfo(cb); + worker.postMessage({ + code: context.code, + options: serializeJavascript(context.options) + }); + } } } diff --git a/packages/terser/src/worker.ts b/packages/terser/src/worker.ts index 1c3ea03b0..d2c528c1e 100644 --- a/packages/terser/src/worker.ts +++ b/packages/terser/src/worker.ts @@ -1,5 +1,4 @@ -import process from 'process'; -import { isMainThread, parentPort, workerData } from 'worker_threads'; +import { isMainThread, parentPort } from 'worker_threads'; import { hasOwnProperty, isObject } from 'smob'; @@ -22,21 +21,25 @@ function isWorkerContextSerialized(input: unknown): input is WorkerContextSerial ); } -export async function runWorker() { - if (isMainThread || !parentPort || !isWorkerContextSerialized(workerData)) { +export function runWorker() { + if (isMainThread || !parentPort) { return; } - try { - // eslint-disable-next-line no-eval - const eval2 = eval; + // eslint-disable-next-line no-eval + const eval2 = eval; - const options = eval2(`(${workerData.options})`); + parentPort.on('message', async (data: WorkerContextSerialized) => { + if (!isWorkerContextSerialized(data)) { + return; + } + + const options = eval2(`(${data.options})`); - const result = await minify(workerData.code, options); + const result = await minify(data.code, options); const output: WorkerOutput = { - code: result.code || workerData.code, + code: result.code || data.code, nameCache: options.nameCache }; @@ -48,8 +51,6 @@ export async function runWorker() { output.sourceMap = result.map; } - parentPort.postMessage(output); - } catch (e) { - process.exit(1); - } + parentPort?.postMessage(output); + }); } diff --git a/packages/terser/test/test.js b/packages/terser/test/test.js index 67d1eb6df..316fb2288 100644 --- a/packages/terser/test/test.js +++ b/packages/terser/test/test.js @@ -46,9 +46,11 @@ test.serial('minify via terser options', async (t) => { }); test.serial('minify multiple outputs', async (t) => { + let plugin; + const bundle = await rollup({ input: 'test/fixtures/unminified.js', - plugins: [terser()] + plugins: [(plugin = terser({ maxWorkers: 2 }))] }); const [bundle1, bundle2] = await Promise.all([ @@ -60,6 +62,20 @@ test.serial('minify multiple outputs', async (t) => { t.is(output1.code, '"use strict";window.a=5,window.a<3&&console.log(4);\n'); t.is(output2.code, 'window.a=5,window.a<3&&console.log(4);\n'); + t.is(plugin.numOfWorkersUsed, 2, 'used 2 workers'); +}); + +test.serial('minify multiple outputs with only 1 worker', async (t) => { + let plugin; + + const bundle = await rollup({ + input: 'test/fixtures/unminified.js', + plugins: [(plugin = terser({ maxWorkers: 1 }))] + }); + + await Promise.all([bundle.generate({ format: 'cjs' }), bundle.generate({ format: 'es' })]); + + t.is(plugin.numOfWorkersUsed, 1, 'used 1 worker'); }); test.serial('minify esm module', async (t) => { @@ -122,7 +138,7 @@ test.serial('throw error on terser fail', async (t) => { await bundle.generate({ format: 'esm' }); t.falsy(true); } catch (error) { - t.is(error.toString(), 'Error: Minify worker stopped with exit code 1'); + t.is(error.toString(), 'SyntaxError: Name expected'); } }); @@ -142,7 +158,7 @@ test.serial('throw error on terser fail with multiple outputs', async (t) => { await Promise.all([bundle.generate({ format: 'cjs' }), bundle.generate({ format: 'esm' })]); t.falsy(true); } catch (error) { - t.is(error.toString(), 'Error: Minify worker stopped with exit code 1'); + t.is(error.toString(), 'SyntaxError: Name expected'); } });