Skip to content

Commit

Permalink
add execWithMsg. test, build passed
Browse files Browse the repository at this point in the history
  • Loading branch information
dimitriylol authored and dlyman committed Feb 26, 2019
1 parent da0c602 commit f11f6cb
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 131 deletions.
4 changes: 2 additions & 2 deletions dist/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@
worker.on('message', function (request) {
try {
var method = worker.methods[request.method];

if (method) {
// execute the function
var result = method.apply(method, request.params);
var result = method.apply(method, (request.params || []).concat(function (msg) { worker.send({ id: request.id, msg: msg }) }))

if (isPromise(result)) {
// promise returned, resolve this and then return
Expand Down
159 changes: 89 additions & 70 deletions dist/workerpool.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Offload tasks to a pool of workers on node.js and in the browser.
*
* @version 3.1.1
* @date 2019-02-25
* @date 2019-02-26
*
* @license
* Copyright (C) 2014-2016 Jos de Jong <[email protected]>
Expand All @@ -25,14 +25,14 @@

(function webpackUniversalModuleDefinition(root, factory) {
if(typeof exports === 'object' && typeof module === 'object')
module.exports = factory(require("os"), (function webpackLoadOptionalExternalModule() { try { return require("worker_threads"); } catch(e) {} }()), require("child_process"));
module.exports = factory(require("os"), require("child_process"));
else if(typeof define === 'function' && define.amd)
define(["os", "worker_threads", "child_process"], factory);
define(["os", "child_process"], factory);
else if(typeof exports === 'object')
exports["workerpool"] = factory(require("os"), (function webpackLoadOptionalExternalModule() { try { return require("worker_threads"); } catch(e) {} }()), require("child_process"));
exports["workerpool"] = factory(require("os"), require("child_process"));
else
root["workerpool"] = factory(root["os"], root["worker_threads"], root["child_process"]);
})(this, function(__WEBPACK_EXTERNAL_MODULE_2__, __WEBPACK_EXTERNAL_MODULE_11__, __WEBPACK_EXTERNAL_MODULE_12__) {
root["workerpool"] = factory(root["os"], root["child_process"]);
})(this, function(__WEBPACK_EXTERNAL_MODULE_2__, __WEBPACK_EXTERNAL_MODULE_11__) {
return /******/ (function(modules) { // webpackBootstrap
/******/ // The module cache
/******/ var installedModules = {};
Expand Down Expand Up @@ -194,40 +194,8 @@ return /******/ (function(modules) { // webpackBootstrap
}
}


/**
* Execute a function on a worker.
*
* Example usage:
*
* var pool = new Pool()
*
* // call a function available on the worker
* pool.exec('fibonacci', [6])
*
* // offload a function
* function add(a, b) {
* return a + b
* };
* pool.exec(add, [2, 4])
* .then(function (result) {
* console.log(result); // outputs 6
* })
* .catch(function(error) {
* console.log(error);
* });
*
* @param {String | Function} method Function name or function.
* If `method` is a string, the corresponding
* method on the worker will be executed
* If `method` is a Function, the function
* will be stringified and executed via the
* workers built-in function `run(fn, args)`.
* @param {Array} [params] Function arguments applied when calling the function
* @return {Promise.<*, Error>} result
*/
Pool.prototype.exec = function (method, params) {
// validate type of arguments
Pool.prototype._exec = function (method, params, msgCallback) {
// validate type of arguments
if (params && !Array.isArray(params)) {
throw new TypeError('Array expected as argument "params"');
}
Expand All @@ -240,6 +208,7 @@ return /******/ (function(modules) { // webpackBootstrap
var task = {
method: method,
params: params,
msgCallback: msgCallback,
resolver: resolver,
timeout: null
};
Expand Down Expand Up @@ -267,11 +236,56 @@ return /******/ (function(modules) { // webpackBootstrap
}
else if (typeof method === 'function') {
// send stringified function and function arguments to worker
return this.exec('run', [String(method), params]);
return this._exec('run', [String(method), params], msgCallback);
}
else {
throw new TypeError('Function or string expected as argument "method"');
}
}

Pool.prototype.execWithMsg = function (method, params, msgCallback) {
if (arguments.length !== 3) {
throw new TypeError('Expected 3 arguments: method, params and msgCallback')
}
return this._exec(method, params, msgCallback)
}

/**
* Execute a function on a worker.
*
* Example usage:
*
* var pool = new Pool()
*
* // call a function available on the worker
* pool.exec('fibonacci', [6])
*
* // offload a function
* function add(a, b) {
* return a + b
* };
* pool.exec(add, [2, 4])
* .then(function (result) {
* console.log(result); // outputs 6
* })
* .catch(function(error) {
* console.log(error);
* });
*
* @param {String | Function} method Function name or function.
* If `method` is a string, the corresponding
* method on the worker will be executed
* If `method` is a Function, the function
* will be stringified and executed via the
* workers built-in function `run(fn, args)`.
* @param {Array} [params] Function arguments applied when calling the function
* @return {Promise.<*, Error>} result
*/
Pool.prototype.exec = function (method, params) {
if (arguments.length >= 3) {
throw new TypeError('Expected max 2 arguments: method and params')
}
return this._exec(method, params)
};

/**
Expand All @@ -289,7 +303,6 @@ return /******/ (function(modules) { // webpackBootstrap
return this.exec('methods')
.then(function (methods) {
var proxy = {};

methods.forEach(function (method) {
proxy[method] = function () {
return pool.exec(method, Array.prototype.slice.call(arguments));
Expand Down Expand Up @@ -334,7 +347,7 @@ return /******/ (function(modules) { // webpackBootstrap
// check if the task is still pending (and not cancelled -> promise rejected)
if (task.resolver.promise.pending) {
// send the request to the worker
var promise = worker.exec(task.method, task.params, task.resolver)
var promise = worker.exec(task.method, task.params, task.resolver, task.msgCallback)
.then(me._boundNext)
.catch(function () {
// if the worker crashed and terminated, remove it from the pool
Expand Down Expand Up @@ -994,10 +1007,10 @@ return /******/ (function(modules) { // webpackBootstrap
if (WorkerThreads) {
this.worker = setupWorkerThreadWorker(this.script, WorkerThreads);
} else {
this.worker = setupProcessWorker(this.script, resolveForkOptions(options), __webpack_require__(12));
this.worker = setupProcessWorker(this.script, resolveForkOptions(options), __webpack_require__(11));
}
} else {
this.worker = setupProcessWorker(this.script, resolveForkOptions(options), __webpack_require__(12));
this.worker = setupProcessWorker(this.script, resolveForkOptions(options), __webpack_require__(11));
}
}

Expand All @@ -1019,21 +1032,31 @@ return /******/ (function(modules) { // webpackBootstrap
var id = response.id;
var task = me.processing[id];
if (task !== undefined) {
// remove the task from the queue
delete me.processing[id];

// test if we need to terminate
if (me.terminating === true) {
// complete worker termination if all tasks are finished
me.terminate();
}

// resolve the task's promise
if (response.error) {
task.resolver.reject(objectToError(response.error));
if (response.msg) {
if (task.msgCallback) {
task.msgCallback(response.msg)
}
else {
throw new TypeError("worker send an object with 'msg' field but msgCallback wasn't passed")
}
}
else {
task.resolver.resolve(response.result);
// remove the task from the queue
delete me.processing[id];

// test if we need to terminate
if (me.terminating === true) {
// complete worker termination if all tasks are finished
me.terminate();
}

// resolve the task's promise
if (response.error) {
task.resolver.reject(objectToError(response.error));
}
else {
task.resolver.resolve(response.result);
}
}
}
}
Expand Down Expand Up @@ -1090,9 +1113,10 @@ return /******/ (function(modules) { // webpackBootstrap
* @param {String} method
* @param {Array} [params]
* @param {{resolve: Function, reject: Function}} [resolver]
* @param {Function} [msgCallback] - listen messages from worker
* @return {Promise.<*, Error>} result
*/
WorkerHandler.prototype.exec = function(method, params, resolver) {
WorkerHandler.prototype.exec = function(method, params, resolver, msgCallback) {
if (!resolver) {
resolver = Promise.defer();
}
Expand All @@ -1103,7 +1127,8 @@ return /******/ (function(modules) { // webpackBootstrap
// register a new task as being in progress
this.processing[id] = {
id: id,
resolver: resolver
resolver: resolver,
msgCallback: msgCallback
};

// build a JSON-RPC request
Expand Down Expand Up @@ -1365,7 +1390,7 @@ return /******/ (function(modules) { // webpackBootstrap
* This file is automatically generated,
* changes made in this file will be overwritten.
*/
module.exports = "!function(r){function e(n){if(o[n])return o[n].exports;var t=o[n]={exports:{},id:n,loaded:!1};return r[n].call(t.exports,t,t.exports,e),t.loaded=!0,t.exports}var o={};e.m=r,e.c=o,e.p=\"\",e(0)}([function(module,exports,__webpack_require__){function convertError(r){return Object.getOwnPropertyNames(r).reduce(function(e,o){return Object.defineProperty(e,o,{value:r[o],enumerable:!0})},{})}function isPromise(r){return r&&\"function\"==typeof r.then&&\"function\"==typeof r.catch}var worker={};if(\"undefined\"!=typeof self&&\"function\"==typeof postMessage&&\"function\"==typeof addEventListener)worker.on=function(r,e){addEventListener(r,function(r){e(r.data)})},worker.send=function(r){postMessage(r)};else{if(\"undefined\"==typeof process)throw new Error(\"Script must be executed as a worker\");var WorkerThreads;try{WorkerThreads=__webpack_require__(!function(){var r=new Error('Cannot find module \"worker_threads\"');throw r.code=\"MODULE_NOT_FOUND\",r}())}catch(r){if(\"object\"!=typeof r||null===r||\"MODULE_NOT_FOUND\"!=r.code)throw r}if(WorkerThreads&&null!==WorkerThreads.parentPort){var parentPort=WorkerThreads.parentPort;worker.send=parentPort.postMessage.bind(parentPort),worker.on=parentPort.on.bind(parentPort)}else worker.on=process.on.bind(process),worker.send=process.send.bind(process)}worker.methods={},worker.methods.run=function run(fn,args){var f=eval(\"(\"+fn+\")\");return f.apply(f,args)},worker.methods.methods=function(){return Object.keys(worker.methods)},worker.on(\"message\",function(r){try{var e=worker.methods[r.method];if(!e)throw new Error('Unknown method \"'+r.method+'\"');var o=e.apply(e,r.params);isPromise(o)?o.then(function(e){worker.send({id:r.id,result:e,error:null})}).catch(function(e){worker.send({id:r.id,result:null,error:convertError(e)})}):worker.send({id:r.id,result:o,error:null})}catch(e){worker.send({id:r.id,result:null,error:convertError(e)})}}),worker.register=function(r){if(r)for(var e in r)r.hasOwnProperty(e)&&(worker.methods[e]=r[e]);worker.send(\"ready\")},exports.add=worker.register}]);";
module.exports = "!function(r){function e(n){if(o[n])return o[n].exports;var t=o[n]={exports:{},id:n,loaded:!1};return r[n].call(t.exports,t,t.exports,e),t.loaded=!0,t.exports}var o={};e.m=r,e.c=o,e.p=\"\",e(0)}([function(module,exports,__webpack_require__){function convertError(r){return Object.getOwnPropertyNames(r).reduce(function(e,o){return Object.defineProperty(e,o,{value:r[o],enumerable:!0})},{})}function isPromise(r){return r&&\"function\"==typeof r.then&&\"function\"==typeof r.catch}var worker={};if(\"undefined\"!=typeof self&&\"function\"==typeof postMessage&&\"function\"==typeof addEventListener)worker.on=function(r,e){addEventListener(r,function(r){e(r.data)})},worker.send=function(r){postMessage(r)};else{if(\"undefined\"==typeof process)throw new Error(\"Script must be executed as a worker\");var WorkerThreads;try{WorkerThreads=__webpack_require__(!function(){var r=new Error('Cannot find module \"worker_threads\"');throw r.code=\"MODULE_NOT_FOUND\",r}())}catch(r){if(\"object\"!=typeof r||null===r||\"MODULE_NOT_FOUND\"!=r.code)throw r}if(WorkerThreads&&null!==WorkerThreads.parentPort){var parentPort=WorkerThreads.parentPort;worker.send=parentPort.postMessage.bind(parentPort),worker.on=parentPort.on.bind(parentPort)}else worker.on=process.on.bind(process),worker.send=process.send.bind(process)}worker.methods={},worker.methods.run=function run(fn,args){var f=eval(\"(\"+fn+\")\");return f.apply(f,args)},worker.methods.methods=function(){return Object.keys(worker.methods)},worker.on(\"message\",function(r){try{var e=worker.methods[r.method];if(!e)throw new Error('Unknown method \"'+r.method+'\"');var o=e.apply(e,(r.params||[]).concat(function(e){worker.send({id:r.id,msg:e})}));isPromise(o)?o.then(function(e){worker.send({id:r.id,result:e,error:null})}).catch(function(e){worker.send({id:r.id,result:null,error:convertError(e)})}):worker.send({id:r.id,result:o,error:null})}catch(e){worker.send({id:r.id,result:null,error:convertError(e)})}}),worker.register=function(r){if(r)for(var e in r)r.hasOwnProperty(e)&&(worker.methods[e]=r[e]);worker.send(\"ready\")},exports.add=worker.register}]);";


/***/ }),
Expand Down Expand Up @@ -1426,7 +1451,7 @@ return /******/ (function(modules) { // webpackBootstrap

var WorkerThreads;
try {
WorkerThreads = __webpack_require__(11);
WorkerThreads = __webpack_require__(!(function webpackMissingModule() { var e = new Error("Cannot find module \"worker_threads\""); e.code = 'MODULE_NOT_FOUND'; throw e; }()));
} catch(error) {
if (typeof error === 'object' && error !== null && error.code == 'MODULE_NOT_FOUND') {
// no worker_threads, fallback to sub-process based workers
Expand Down Expand Up @@ -1494,10 +1519,10 @@ return /******/ (function(modules) { // webpackBootstrap
worker.on('message', function (request) {
try {
var method = worker.methods[request.method];

if (method) {
// execute the function
var result = method.apply(method, request.params);
var result = method.apply(method, (request.params || []).concat(function (msg) { worker.send({ id: request.id, msg: msg }) }))

if (isPromise(result)) {
// promise returned, resolve this and then return
Expand Down Expand Up @@ -1564,12 +1589,6 @@ return /******/ (function(modules) { // webpackBootstrap

/***/ }),
/* 11 */
/***/ (function(module, exports) {

module.exports = require("worker_threads");

/***/ }),
/* 12 */
/***/ (function(module, exports) {

module.exports = require("child_process");
Expand Down
2 changes: 1 addition & 1 deletion dist/workerpool.map

Large diffs are not rendered by default.

Loading

0 comments on commit f11f6cb

Please sign in to comment.