-
Notifications
You must be signed in to change notification settings - Fork 152
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat/abort listener #448
Feat/abort listener #448
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot, this looks good!
I made some inline comments, can you have a look at those?
@josdejong var workerpool = require('../');
var pool = workerpool.pool();
function asycTimeout() {
return new Promise(function (resolve) {
let timeout = setTimeout(function () {
resolve();
}, 5000);
workerpool.addAbortListener(async function () {
await new Promise((res, rej) => {
setTimeout(res, 1000);
});
clearTimeout(timeout);
resolve();
});
});
};
pool.exec(asycTimeout, []) The above will error with UPDATE: After playing around with scopes on the function wrapper from in worker.methods.run = function run(fn, args) {
var f = new Function('return (' + fn + ').apply(this, arguments);');
f.addAbortListener = function(listener) {
worker.abortListeners.push(listener);
}
return f.apply(f, args);
}; If we modify the global value to var workerpool = require('../');
var pool = workerpool.pool();
function asycTimeout() {
var me = this;
return new Promise(function (resolve) {
let timeout = setTimeout(function () {
resolve();
}, 5000);
console.log(me.addAbortListener, globalThis);
me.addAbortListener(async function () {
console.log("adasd", clearTimeout);
clearTimeout(timeout);
resolve();
});
});
};
pool.exec(asycTimeout, []) |
It sounds like a good idea to attach const addAbortListener = this.addAbortListener
// ...
addAbortListener(...) EDIT: and then it makes sense to me to offer this as the only way to add an abort listener, for both offloaded and dedicated workers, right? |
Yes this makes sense to me. can make the updates and implement the tests/examples now that we have this worked out. Since we now can extend the function created for the worker task we can create a
So for example this.worker.addEventListener |
Question on how errors are processed. Since I think for this new feature to work as intended the message protocol does need extending to account for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates! I added a few inline comments.
About the WorkerHandler terminating the worker when receiving a cancel
or timeout
error: that is a very good point. Do you have ideas on how to solve this? Indeed both the worker side and the WorkerHandler side both need to know whether there is an abort listener in place or not, and adjust their behavior accordingly.
@josdejong Sorry for the delayed response. After giving it some thought I see a possible flow for communication which will allow for proper tracking of handlers based on if a worker can be reused after the sequenceDiagram
participant Pool
participant WorkerHandler
participant Worker
Pool->>WorkerHandler: TimeoutError/CamcelationError occures, move task with rosolver to `tracking` queue. Send a message to the worker to run cleanup with the task id
WorkerHandler ->> Worker: Worker recieves message, execute abort handlers.
Worker ->> WorkerHandler: Send the result of abort handler execution to the worker handler with the task id sent
WorkerHandler ->> Pool: Check the task id for a tracking and if present either resolve or reject the resolver promise based on the data sent in the message from the worker. Cleanup the task context
With the above model, the The other idea, although much more involved is to rewrite how items are processed on the producer. Instead of items only being processed in a single promise chain with a recursive call. We could use something like p-queue to handle assigning tasks to workers and managing |
Ow nice I didn't know that you can draw a sequenceDiagram straight in GitHub issues 😎. I'm not sure whether a queue is really needed since a WorkerHandler and a Worker only process a single task at a time, but at least both need to get the information on whether to abort or not. I think we could do something like this (it's close to your diagram I think):
What do you think? |
Your outline makes sense and aligns with what the diagram outlines but with better definitions of possible execution results. I think you have mapped out most of the remaining details. However, I think we might want an explicit case for when there are |
Thanks. That makes sense indeed, the abort handler can throw an error too. |
Hey @josdejong I think I have made good progress and was able to implement the feature set we have discussed above where
I have added tests for both I still have to update the docs / examples but I think it is ready for review. One thing I am noticing is that when I run tests the
|
Ok I've now merge #470. Can you update the PR to the latest version of the |
Merged in,I was thinking the return in the initially |
When changing // file: terminatest.js
const { Promise } = require('./src/Promise');
const Pool = require('./src/Pool');
run()
async function run() {
console.log('start')
const pool1 = new Pool();
await pool1.exec(sleep, [100])
.timeout(50)
.catch(err => { console.error(String(err)) })
const pool2 = new Pool();
const result = await pool2.exec(add, [3, 4])
console.log({ result })
await sleep(1000)
console.log('pool1 workers:', pool1.workers.length) // <-- is 1 but should be 0
await pool1.terminate() // <-- never finishes
await pool2.terminate()
console.log('done')
}
function sleep(delay) {
return new Promise((resolve) => setTimeout(resolve, delay))
}
function add(a, b) {
return a + b
} I did some debugging but didn't yet found the cause of this. Some observations:
But (3) is not the right solution, we should probably look into why the terminate callback is not invoked in |
Awesome! I'm really happy that this last vague bug is resolved now 😅 ! Let's try to improve |
I think we're there, clicking the Merge button now 🎉 . Thanks a lot for your patience and perseverance Josh, this was a long journey with quite some bumps along the road! |
Published now in |
No problem, was great working on this :) |
@joshLong145 Hello, It's a great feature for me. thanks for your working. const main = async () => {
const cleanedUpTask = pool.exec('asyncTimeout', [], {
on: function (payload) {
if (payload.stdout) {
console.log(`[Worker] Out: ${payload.stdout.trim()}`)
}
}
}).timeout(1_000).catch((err) => {
console.log("task timeout");
console.log("timeout occured: ", err.message);
console.log("worker count ", workerCount);
return pool.exec(add, [1, 2]).then((sum) => {
console.log('add result', sum);
console.log("worker count: ", workerCount);
});
});
await cleanedUpTask;
...
}
main() worker.js function asyncTimeout() {
var me = this;
console.log('trigger AsyncTimeout')
return new Promise(function (resolve) {
let timeout = setTimeout(() => {
resolve();
}, 5000);
me.worker.addAbortListener(async function () {
console.log('clearTimeout')
clearTimeout(timeout);
resolve();
});
});
} output is
|
@joshLong145 While testing
|
@GwiYeong |
@joshLong145 I created #479. Thanks! |
Adds an
abortListener
to workers which allow for cleanup of async tasks which can be run as a cleanup operation to allow workers to be reused if a task timeout or cancellation occurs.connects PR #441