Useful requestor factories #17
Replies: 9 comments 13 replies
-
There is also the 'widget' from https://www.crockford.com/parseq_demo.html and 'delay' from https://www.crockford.com/pp/eventuality.pptx
-
Well, I wrote a bunch of helper functions which basically enrich the parseq library with the patterns I employ most frequently. Let me provide a couple of examples.
wrap_reason: a factory which takes a requestor and returns a requestor that never fails. I find it useful when I want to start many downloads in parallel and I don't want to block the overall task if one or more of them fail.
function wrap_reason(requestor) {
    return function (cb, value) {
        return requestor(function (value, reason) {
            return cb({value, reason});
        }, value);
    };
}
factory_maker (make_factory? factory_of_factories? Phil Karlton said that naming things is hard in computer science; I would add, especially when English is not your first language). OK, this is a bit complicated to describe. When "requestorizing" an eventual task (fs.readFile for instance), you need to pass its parameters as the factory arguments (if they are known in advance) or, if they only become known in a later turn, as the requestor's second argument.
function factory_maker(requestor, factory_name = "factory") {
    //the adapter combines the online value passed to the requestor with the
    // closure/context in which the factory is executed
    // its return value is passed to the requestor
    return function factory(adapter) {
        //a default adapter is provided in order to manage the most common cases
        function default_adapter(precomputed) {
            return function (value) {
                //default: both values are object, so we give the requestor their merge
                if (
                    typeof precomputed === "object"
                    && !Array.isArray(precomputed)
                ) {
                    return Object.assign(
                        {},
                        precomputed,
                        value
                    );
                }
                //otherwise, default behavior is to provide only the precomputed value
                //unless it's nullish
                return precomputed ?? value;
            };
        }
        if (typeof adapter !== "function") {
            adapter = default_adapter(adapter);
        }
        return function req(cb, value) {
            check_callback(cb, factory_name);
            return parseq.sequence([
                requestorize(adapter),
                requestor
            ])(cb, value);
        };
    };
}
A simple example:
function readFileRequestor(callback, {path, options}) {
    fs.readFile(path, options, function (err, data) {
        if (err) {
            return callback(undefined, err);
        }
        return callback(data);
    });
}
const readFile = factory_maker(readFileRequestor);
//generic adapter
readFile(function (value_from_previous_requestor) {
    return {
        path: value_from_previous_requestor.default_path,
        options: {}
    };
});
//default adapter
//is merged with the value arriving from the previous requestor if the adapter argument is an object
readFile({path: "./file.ext", options: {}});
//default adapter
readFile(); //the value arriving from the previous requestor is passed to the requestor
In this way I can write more readable code, and more testable code too (I can write tests for the adapter functions if they're not straightforward). 😄
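Returning to wrap_reason from the start of this comment, here is a minimal sketch of how it behaves with one succeeding and one failing requestor. It runs without parseq. The two download requestors are hypothetical stand-ins that call back synchronously, and the inner parameter is renamed to result only to avoid shadowing.

```javascript
// Same idea as wrap_reason above, repeated so the sketch runs on its own.
function wrap_reason(requestor) {
    return function (callback, value) {
        return requestor(function (result, reason) {
            // Always report success: the outcome travels inside the value.
            return callback({value: result, reason});
        }, value);
    };
}

// Hypothetical stand-ins for real download requestors.
function succeeding_download(callback, url) {
    return callback("contents of " + url);
}
function failing_download(callback, url) {
    return callback(undefined, "could not fetch " + url);
}

const outcomes = [];
function collect(outcome) {
    outcomes.push(outcome);
}
wrap_reason(succeeding_download)(collect, "a.txt");
wrap_reason(failing_download)(collect, "b.txt");

console.log(outcomes);
```

Because the wrapped requestor always produces an object, a parallel run over wrapped requestors would presumably never be cut short by a single failure; the caller inspects each reason afterwards.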
-
I call a function that makes factories a 'factory factory'. Is that too meta? Too repetitive?
-
pool(throttle)
Runs a dynamic collection of requestors in parallel. It is like ... Unlike a regular requestor factory, it returns an object with two methods:
add(requestor, initial_value)
Adds the requestor to the pool. If the pool is not being throttled, it will be run immediately. Otherwise it will be put on a wait list. The requestor is given the initial_value when it is run.
requestor(callback) -> cancel(reason)
A requestor that completes wunce the pool has no more running requestors. If every requestor succeeds, the callback is called with the array of values produced by the requestors, in the order they were added to the pool. If any requestor fails, its reason is passed to the callback and all other requestors are cancelled. Any attempt to run this requestor whilst it is already running will fail.
Example
Here, a pool is used to traverse the filesystem, running an operation on each file as it is discovered. No more than 10 file_requestor's are run simultaneously.
On success, value is an array containing file_requestor's value for each file, in the order that the file was discovered. An implementation is available here.
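To make the description concrete, here is a minimal sketch of how such a pool might work. It is not the linked implementation. It assumes that a missing or zero throttle means "unthrottled", that requestors added after a failure are ignored, and that cancelling the pool leaves it unusable. The doubling requestor and the releases array are hypothetical test scaffolding.

```javascript
// Sketch only: a hypothetical pool(throttle), not the linked implementation.
function pool(throttle) {
    const waiting = [];   // entries added while the pool was at its limit
    const cancels = [];   // cancel functions of running requestors, by index
    const values = [];    // results, in the order the requestors were added
    let nr_added = 0;
    let nr_running = 0;
    let failure;          // becomes {reason} after a failure or a cancel
    let pool_callback;    // defined only while the pool's requestor runs

    function stop(reason) {
        // Remember the failure, drop the wait list, cancel what is running.
        failure = {reason};
        waiting.length = 0;
        cancels.forEach(function (cancel) {
            if (typeof cancel === "function") {
                cancel(reason);
            }
        });
        cancels.length = 0;
    }
    function settle() {
        // Call the pool's callback once the pool is idle or has failed.
        if (
            pool_callback === undefined
            || (failure === undefined && nr_running > 0)
        ) {
            return;
        }
        const callback = pool_callback;
        pool_callback = undefined;
        return (
            failure === undefined
            ? callback(values.slice())
            : callback(undefined, failure.reason)
        );
    }
    function start(entry) {
        let finished = false;
        nr_running += 1;
        const cancel = entry.requestor(function (value, reason) {
            if (finished || failure !== undefined) {
                return;
            }
            finished = true;
            nr_running -= 1;
            delete cancels[entry.index];
            if (value === undefined) {
                stop(reason);
            } else {
                values[entry.index] = value;
                if (waiting.length > 0) {
                    start(waiting.shift());
                }
            }
            return settle();
        }, entry.initial_value);
        if (!finished) {
            cancels[entry.index] = cancel;
        }
    }
    function add(requestor, initial_value) {
        const entry = {requestor, initial_value, index: nr_added};
        nr_added += 1;
        if (failure !== undefined) {
            return;
        }
        if (throttle > 0 && nr_running >= throttle) {
            waiting.push(entry);
        } else {
            start(entry);
        }
    }
    function requestor(callback) {
        if (pool_callback !== undefined) {
            return callback(undefined, "pool: already running");
        }
        pool_callback = callback;
        settle();
        return function cancel(reason) {
            pool_callback = undefined;
            stop(reason);
        };
    }
    return Object.freeze({add, requestor});
}

// Usage: hypothetical requestors that finish only when released by hand.
const releases = [];
function doubling(callback, value) {
    releases.push(() => callback(value * 2));
}
const my_pool = pool(2);
my_pool.add(doubling, 1);
my_pool.add(doubling, 2);
my_pool.add(doubling, 3);   // waits: two requestors are already running
let pool_result;
my_pool.requestor(function (value, reason) {
    pool_result = value ?? reason;
});
releases[1]();              // a slot frees up, so the third one starts
releases[0]();
releases[2]();
console.log(pool_result);
```

The values come back in the order the requestors were added, not the order in which they finished, which matches the description of the pool's requestor.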
-
I've recently faced an issue with parseq concerning the amount of memory occupied by the values the requestors return during a parallel run. In this case the throttle parameter does not help, since the resource bottleneck is in the local process and, at the end of the parallel task, you still have all the returned values in an array.
function reduce(
    reducer,
    initial_value,
    requestor_array,
    throttle
) {
    throttle = throttle || requestor_array.length;
    return parseq.sequence([
        parseq.parallel(requestor_array.slice(0, throttle)),
        requestorize(function (array) {
            return array.reduce(reducer, initial_value);
        }),
        if_else(
            () => throttle >= requestor_array.length,
            do_nothing,
            (callback, value) => reduce(
                reducer,
                value,
                requestor_array.slice(throttle),
                throttle
            )(callback, value)
        )
    ]);
}
The reducer is a two-argument function, not a requestor, since I don't see why I should perform an eventual task in the reducer step (but it can easily be extended). It's still pretty raw, though: I've discarded the time parameters and the optional requestors array. I was also wondering if I could extend it in order to apply it to an infinite stream (pagination, UI events, etc.), as RxJS does. I still think reactive programming (the observer pattern) is the more natural way to deal with events/streams, but I would like to give it a try.
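The same idea can be sketched without parseq, which may make the memory behaviour easier to see: only one batch of results exists at a time, and each batch is folded into the accumulator before the next batch starts. The name reduce_in_batches and the constant helper are hypothetical. The sketch has no cancel function or time limit, and, as in parseq, a requestor that produces undefined is treated as a failure.

```javascript
// Sketch only: run requestors in batches of `throttle`, folding each batch
// into an accumulator so that the full array of results is never kept.
function reduce_in_batches(reducer, initial_value, requestor_array, throttle) {
    const batch_size = throttle || requestor_array.length;
    return function requestor(callback, value) {
        let accumulator = initial_value;
        let next = 0;
        function run_batch() {
            if (next >= requestor_array.length) {
                return callback(accumulator);
            }
            const batch = requestor_array.slice(next, next + batch_size);
            const results = new Array(batch.length);
            let pending = batch.length;
            let failed = false;
            next += batch.length;
            batch.forEach(function (batch_requestor, index) {
                batch_requestor(function (result, reason) {
                    if (failed) {
                        return;
                    }
                    if (result === undefined) {
                        failed = true;
                        return callback(undefined, reason);
                    }
                    results[index] = result;
                    pending -= 1;
                    if (pending === 0) {
                        // The batch is complete: fold it and move on.
                        accumulator = results.reduce(reducer, accumulator);
                        return run_batch();
                    }
                }, value);
            });
        }
        run_batch();
    };
}

// Usage with hypothetical requestors that each produce a constant number.
function constant(number) {
    return function (callback) {
        return callback(number);
    };
}
let total;
reduce_in_batches(
    (sum, number) => sum + number,
    0,
    [1, 2, 3, 4, 5].map(constant),
    2
)(function (value, reason) {
    total = value ?? reason;
});
console.log(total);
```

With synchronous requestors the batches nest on the call stack, so a real version would probably want to defer run_batch to a later turn.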
-
Log the value of a sequence. You can put it anywhere in a sequence, and it will put the current value in the log, without disrupting the sequence progress.
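A minimal sketch of what such a logging requestor factory might look like. The name log, the label parameter and the injectable logger are assumptions made for illustration. Note that a parseq callback treats an undefined value as a failure, so this sketch only passes defined values through cleanly.

```javascript
// Sketch only: a hypothetical log(label, logger) requestor factory.
function log(label = "value:", logger = console.log) {
    return function log_requestor(callback, value) {
        // Record the current value, then hand it on unchanged.
        logger(label, value);
        return callback(value);
    };
}

// Usage: run the requestor by hand, as a sequence would.
let passed_on;
log("after step 1:")(function (value) {
    passed_on = value;
}, {id: 42});
```

In a sequence it would presumably be placed between two steps, for example `parseq.sequence([first, log("after first:"), second])`, where first and second are whatever requestors the sequence already contains.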
-
My recipe for requestorizing a promise. It is quite convoluted, but that is because of how promises work:
const is_thunk = (f) => typeof f === "function" && !f.length;
const is_promise = (p) => typeof p?.then === "function";//a thenable
function promise_requestorize(
    promise_thunk,
    action = "executing promise",
    cancel = undefined
) {
    if (!is_thunk(promise_thunk)) {
        throw parseq.make_reason(
            action,
            `Not a thunk when ${action}`,
            promise_thunk
        );
    }
    return function (callback) {
        parseq.check_callback(callback, action);
        let is_called = false;
        function promise_callback(value, reason) {
            if (!is_called) {
                is_called = true;
                if (value === undefined) {
                    return callback(
                        undefined,
                        //first callback call: promise has thrown
                        parseq.make_reason(
                            "promise_requestorize",
                            `Failed when ${action}`,
                            reason
                        )
                    );
                }
                return callback(value);
            }
            //second callback call: callback has thrown
            const e = new Error(`Callback failed when ${action}`);
            e.evidence = reason;
            throw e;
        }
        const promise = promise_thunk();
        if (!is_promise(promise)) {
            const ee = new Error(`Not a promise when ${action}`);
            return promise_callback(
                undefined,
                ee
            );
        }
        promise.then(promise_callback).catch(function (e) {
            //at this point we still don't know if the promise or the callback has thrown
            promise_callback(
                undefined,
                e
            );
        });
        if (typeof cancel === "function") {
            return cancel;
        }
    };
}
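For comparison, here is a smaller sketch of the same idea, not a replacement for the recipe above. It relies on the two-argument form of then, where the rejection handler only sees rejections of the promise itself, so an exception thrown by the callback is never mistaken for a failed promise. The name requestorize_promise is hypothetical, there is no cancel support, and the parseq helpers are deliberately left out so the snippet runs alone.

```javascript
// Sketch only: a hypothetical requestorize_promise(promise_thunk).
function requestorize_promise(promise_thunk) {
    return function promise_requestor(callback) {
        let promise;
        try {
            promise = promise_thunk();
        } catch (exception) {
            // The thunk itself threw before producing a promise.
            return callback(undefined, exception);
        }
        // Promise.resolve also accepts thenables and plain values.
        Promise.resolve(promise).then(
            function on_fulfilled(result) {
                return (
                    result === undefined
                    ? callback(
                        undefined,
                        new Error("Promise resolved to undefined")
                    )
                    : callback(result)
                );
            },
            function on_rejected(reason) {
                // Reached only when the promise rejects, not when the
                // callback throws inside on_fulfilled.
                return callback(undefined, reason);
            }
        );
    };
}

// Usage with two hypothetical thunks.
const promise_outcomes = [];
requestorize_promise(() => Promise.resolve(42))(function (value, reason) {
    promise_outcomes.push({value, reason});
});
requestorize_promise(() => Promise.reject("boom"))(function (value, reason) {
    promise_outcomes.push({value, reason});
});
```

If the callback throws in this version, the exception surfaces as an unhandled rejection rather than as a second call to the callback.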
-
I should verify both...
-
Actually, the run function already discards multiple executions of a callback* (see Line 150 and Line 165 in 04aa2ee): number is used as a flag.
*However, there's the loophole described in #13.
-
As far as I am aware, only a single non-parseq requestor factory has ever been published: requestorize, from How JavaScript Works. As a result, parseq users have been left to write their own.
This is a place where we can share and learn about requestor factories that have proven useful.
Please provide an example of your requestor factory in use. You can include a link to its implementation if it is non-trivial. For example:
thru(requestor)
Returns a requestor that produces the same value it receives. The requestor parameter is optional.
In the preceding example, requestors b() and c() receive the value produced by a(). The value produced by b() is discarded.
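A minimal sketch of what thru might look like, with a usage that matches the description of a, b and c. Here tiny_sequence is a hypothetical stand-in for parseq.sequence so the snippet runs alone, a, b and c are plain requestors rather than factory calls, and passing on a failure of the wrapped requestor is an assumption.

```javascript
// Sketch only: a hypothetical implementation of thru(requestor).
function thru(requestor) {
    return function thru_requestor(callback, value) {
        if (requestor === undefined) {
            return callback(value);
        }
        return requestor(function (ignored, reason) {
            return (
                ignored === undefined
                ? callback(undefined, reason)   // assumption: failures pass on
                : callback(value)               // discard the produced value
            );
        }, value);
    };
}

// Hypothetical stand-in for parseq.sequence, without time limits or cancel.
function tiny_sequence(requestors) {
    return function (callback, initial_value) {
        (function next(index, value) {
            if (index >= requestors.length) {
                return callback(value);
            }
            return requestors[index](function (result, reason) {
                return (
                    result === undefined
                    ? callback(undefined, reason)
                    : next(index + 1, result)
                );
            }, value);
        }(0, initial_value));
    };
}

const received = [];
function a(callback) {
    return callback("from a");
}
function b(callback, value) {
    received.push(["b", value]);
    return callback("from b");
}
function c(callback, value) {
    received.push(["c", value]);
    return callback(value + ", then c");
}
let final_value;
tiny_sequence([a, thru(b), c])(function (value) {
    final_value = value;
});
console.log(received, final_value);
```

Both b and c receive "from a", and the value produced by b never reaches c.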