From 041f9458801aff5b07037257cbf921ea96e648ce Mon Sep 17 00:00:00 2001 From: wilk Date: Tue, 13 Nov 2018 12:30:36 +0100 Subject: [PATCH] #16 add test for broken workers and failure procedure --- __tests__/broken.ts | 69 ++++++++++++++++++++++++++++++++ __tests__/fail.ts | 95 ++++++++++++++++++++++++++++++++++++++++++++ __tests__/healing.ts | 20 ++++++---- __tests__/job.ts | 27 +++++++++++++ package.json | 8 ++-- 5 files changed, 207 insertions(+), 12 deletions(-) create mode 100644 __tests__/broken.ts create mode 100644 __tests__/fail.ts diff --git a/__tests__/broken.ts b/__tests__/broken.ts new file mode 100644 index 0000000..d84fdae --- /dev/null +++ b/__tests__/broken.ts @@ -0,0 +1,69 @@ +import { EventEmitter } from 'events' + +const FAKE_ERROR_MESSAGE = 'fake error' +let emittedMessages = 0 + +// mock of parentPort, so it can be used inside worker.js +class ParentPortMock extends EventEmitter { + postMessage(message: string): void { + // don't emit another error when the "error catch message" is sent again from postMessage + if (emittedMessages === 0) { + emittedMessages++ + this.emit('fake message', message) + } + } +} + +const parentPort = new ParentPortMock() + +// mock of Worker thread +export class WorkerMock extends EventEmitter { + constructor(private file: string) { + super() + + // interpret worker.js + require(file) + + // emit an error when something is sent from the worker + parentPort.on('fake message', () => + this.emit('error', new Error(FAKE_ERROR_MESSAGE)) + ) + + setTimeout(() => this.emit('error', new Error(FAKE_ERROR_MESSAGE)), 250) + } + + // parrot the worker + postMessage(message: string): void { + parentPort.emit('message', message) + } + + terminate(cb: Function): void { + cb() + } +} + +// mock worker_threads +const mock = jest.mock('worker_threads', () => ({ + Worker: WorkerMock, + parentPort +})) + +import { start } from '../src/job' + +// restore original worker_threads module +afterAll(() => mock.restoreAllMocks()) + +describe('Broken Job Testing', () => { + it('should not start the worker pool with broken workers', async () => { + let error + + try { + await start() + } catch (err) { + error = err + } + + expect(error).toBeDefined() + expect(error.message).toBe(FAKE_ERROR_MESSAGE) + }) +}) diff --git a/__tests__/fail.ts b/__tests__/fail.ts new file mode 100644 index 0000000..97bd798 --- /dev/null +++ b/__tests__/fail.ts @@ -0,0 +1,95 @@ +import { EventEmitter } from 'events' +import os from 'os' + +const MAX_THREADS = os.cpus().length +const FAKE_ERROR_MESSAGE = 'fake error' +let emittedMessages = 0 + +// mock of parentPort, so it can be used inside worker.js +class ParentPortMock extends EventEmitter { + postMessage(message: string): void { + // don't emit another error when the "error catch message" is sent again from postMessage + if (emittedMessages === 0) { + emittedMessages++ + this.emit('fake message', message) + } + } +} + +const parentPort = new ParentPortMock() +let workersCounter = 0 +const mockCallback = jest.fn() + +// mock of Worker thread +export class WorkerMock extends EventEmitter { + constructor(private file: string) { + super() + workersCounter++ + + // interpret worker.js + require(file) + + // emit an error when something is sent from the worker + parentPort.on('fake message', () => + this.emit('error', new Error(FAKE_ERROR_MESSAGE)) + ) + + setTimeout(() => { + if (workersCounter > MAX_THREADS) { + this.emit('error', new Error(FAKE_ERROR_MESSAGE)) + mockCallback() + } else this.emit('online') + }, 250) + } + + // parrot the worker + postMessage(message: string): void { + parentPort.emit('message', message) + } + + terminate(cb: Function): void { + cb() + } +} + +// mock worker_threads +const mock = jest.mock('worker_threads', () => ({ + Worker: WorkerMock, + parentPort +})) + +import { job, stop, start } from '../src/job' + +afterAll(async () => await stop()) +// restore original worker_threads module +afterAll(() => mock.restoreAllMocks()) + +describe('Fail Testing', () => { + it('should not resurrect a broken dead worker', async done => { + let error + let res + + try { + await start() + res = await job(() => { + let i = 0 + for (i = 0; i < 1000000; i++) {} + + return i + }) + } catch (err) { + error = err + } + + setTimeout(() => { + expect(res).toBeUndefined() + expect(error).toBeDefined() + expect(error.message).toBe(FAKE_ERROR_MESSAGE) + // if a dead worker has been resurrected, then the workersCounter must be greater + // than the MAX_THREADS const + expect(mockCallback).toBeCalled() + + done() + }, 500) + }) +}) diff --git a/__tests__/healing.ts b/__tests__/healing.ts index a30a015..4eb415a 100644 --- a/__tests__/healing.ts +++ b/__tests__/healing.ts @@ -59,8 +59,8 @@ afterAll(async () => await stop()) // restore original worker_threads module afterAll(() => mock.restoreAllMocks()) -describe('Job Testing', () => { - it('should resurrect a dead worker', async () => { +describe('Self-Healing Testing', () => { + it('should resurrect a dead worker', async done => { let error let res @@ -75,11 +75,15 @@ describe('Job Testing', () => { error = err } - expect(res).toBeUndefined() - expect(error).toBeDefined() - expect(error.message).toBe(FAKE_ERROR_MESSAGE) - // if a dead worker has been resurrected, then the workersCounter must be greater - // than the MAX_THREADS const - expect(workersCounter).toBe(MAX_THREADS + 1) + setTimeout(() => { + expect(res).toBeUndefined() + expect(error).toBeDefined() + expect(error.message).toBe(FAKE_ERROR_MESSAGE) + // if a dead worker has been resurrected, then the workersCounter must be greater + // than the MAX_THREADS const + expect(workersCounter).toBe(MAX_THREADS + 1) + + done() + }, 300) }) }) diff --git a/__tests__/job.ts b/__tests__/job.ts index 8c470d5..2be3ada 100644 --- a/__tests__/job.ts +++ b/__tests__/job.ts @@ -123,6 +123,33 @@ describe('Job Testing', () => { expect(res).toBeUndefined() }) + it('should throw an error if the config is not an object', async () => { + let error + let res + + try { + // @ts-ignore + res = await job( + () => { + let i = 0 + for (i = 0; i < 1000000; i++) {} + + return i + }, + { ctx: 'context' } + ) + } catch (err) { + error = err + } + + expect(error).toBeDefined() + expect(error.message).toEqual( + `job needs an object as ctx.\nTry with:\n> job(() => {...}, {ctx: {...}})` + ) + expect(typeof error.stack).toBe('string') + expect(res).toBeUndefined() + }) + it('should throw a serialization error when a class is given back to main thread', async () => { let error let res diff --git a/package.json b/package.json index 5a3eb57..feb35c2 100644 --- a/package.json +++ b/package.json @@ -51,10 +51,10 @@ ], "coverageThreshold": { "global": { - "branches": 90, - "functions": 80, - "lines": 95, - "statements": 90 + "branches": 95, + "functions": 100, + "lines": 100, + "statements": 95 } }, "transform": {