Skip to content

Commit

Permalink
IOx: adding onError/offError error-listener capability (still needs t…
Browse files Browse the repository at this point in the history
…ests and docs)
  • Loading branch information
getify committed Apr 14, 2022
1 parent 08e48f6 commit 939fedb
Showing 1 changed file with 144 additions and 24 deletions.
168 changes: 144 additions & 24 deletions src/io/iox.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ onceEvent = curry(onceEvent,2);

of.empty = ofEmpty;

var registeredPromiseCatches = new WeakSet();
var registerHooks = new WeakMap();
var ioxErrorCallbacks = new WeakMap();
var ioxConnections = new Map();
var ioxReverseConnections = new Map();
const BRAND = {};
const EMPTY = Symbol("empty");
const UNSET = Symbol("unset");
Expand Down Expand Up @@ -102,8 +106,9 @@ function IOx(iof,deps = []) {
var publicAPI = Object.assign(IOx$,{
map, chain, flatMap: chain, bind: chain,
concat, run, stop, close, isClosed, never,
isNever, toString, _chain_with_IO, _inspect,
_bind: origBind, _is, [Symbol.toStringTag]: TAG,
isNever, toString, onError, offError,
_chain_with_IO, _inspect, _bind: origBind, _is,
[Symbol.toStringTag]: TAG,
});
// decorate API methods with `.pipe(..)` helper
definePipeWithAsyncFunctionComposition(publicAPI,"map");
Expand All @@ -129,16 +134,48 @@ function IOx(iof,deps = []) {
);
}

function onError(errorCB) {
if (!ioxErrorCallbacks.has(publicAPI)) {
ioxErrorCallbacks.set(publicAPI,new Set());
}
ioxErrorCallbacks.get(publicAPI).add(errorCB);
return publicAPI;
}

function offError(errorCB) {
if (ioxErrorCallbacks.has(publicAPI)) {
ioxErrorCallbacks.get(publicAPI).delete(errorCB);
}
return publicAPI;
}

function run(env) {
if (currentVal === NEVER) {
return;
}
else if (!closing) {
return (isRunSignal(env) ?
io.run(env) :
trampoline(io.run(runSignal(env)))
);
try {
return checkRunRes(
isRunSignal(env) ?

continuation(() => io.run(env),checkRunRes) :

trampoline(io.run(runSignal(env)))
);
}
catch (err) {
notifyIOxError(publicAPI,err);
throw err;
}
}
}

function checkRunRes(runRes) {
if (isPromise(runRes) && !registeredPromiseCatches.has(runRes)) {
registeredPromiseCatches.add(runRes);
runRes.catch(err => notifyIOxError(publicAPI,err));
}
return runRes;
}

function stop() {
Expand All @@ -155,6 +192,7 @@ function IOx(iof,deps = []) {
else if (!closing) {
closing = true;
stop();
removeIOxConnections(publicAPI);

let cont = continuation(
() => {
Expand Down Expand Up @@ -193,8 +231,11 @@ function IOx(iof,deps = []) {
unregisterWithDeps();
deps = currentEnv = iof = null;
if (isPromise(v)) {
updateCurrentVal(v,/*drainQueueIfAsync=*/false)
.catch(logUnhandledError);
let res = updateCurrentVal(v,/*drainQueueIfAsync=*/false);
if (!registeredPromiseCatches.has(res)) {
registeredPromiseCatches.add(res);
res.catch(err => notifyIOxError(publicAPI,err));
}
}
else {
trampoline(
Expand Down Expand Up @@ -273,6 +314,8 @@ function IOx(iof,deps = []) {
// note: also, *parent* IOx must still be open!
chainReturnedIOxs && !chainReturnedIOxs.has(returnedIOx)
) {
addIOxConnection(returnedIOx,publicAPI);

// *parent* IOx should remember this specific
// *returned* IOx (to avoid unnecessary listening)
chainReturnedIOxs.add(returnedIOx);
Expand Down Expand Up @@ -807,7 +850,7 @@ function IOx(iof,deps = []) {
ioDepsPending.add(dep);
ret.push(UNSET);
// wait for the IO result
depRes.then(v => {
let pr = depRes.then(v => {
// still actually waiting on this IO result?
if (ioDepsPending && ioDepsPending.has(dep)) {
ioDepsPending.delete(dep);
Expand All @@ -817,8 +860,11 @@ function IOx(iof,deps = []) {
// async microtask from the promise
trampoline(onDepUpdate(dep,v));
}
})
.catch(logUnhandledError);
});
if (!registeredPromiseCatches.has(pr)) {
registeredPromiseCatches.add(pr);
pr.catch(err => notifyIOxError(publicAPI,err));
}
}
else {
ioDepsPending.delete(dep);
Expand Down Expand Up @@ -869,8 +915,10 @@ function IOx(iof,deps = []) {
// async microtask from the promise
trampoline(handleValue(v2))
));
// silence unhandled rejection warnings
pr.catch(EMPTY_FUNC);
if (!registeredPromiseCatches.has(pr)) {
registeredPromiseCatches.add(pr);
pr.catch(err => notifyIOxError(publicAPI,err));
}
return pr;
}
else {
Expand Down Expand Up @@ -993,6 +1041,14 @@ function IOx(iof,deps = []) {
// need to subscribe to any deps?
if (Array.isArray(deps) && deps.length > 0) {
registering = true;

// register connections between IOx instances
for (let dep of deps) {
if (is(dep)) {
addIOxConnection(dep,publicAPI);
}
}

return registerDep(deps);
}
else {
Expand Down Expand Up @@ -1562,6 +1618,8 @@ function zip(ioxs = []) {
}

function subscribeToIOxs([ nextIOx, ...remainingIOxs ],env) {
addIOxConnection(nextIOx,iox);

return continuation(
() => {
// need a queue to hold stream's values?
Expand Down Expand Up @@ -1765,6 +1823,8 @@ function merge(ioxs = []) {
}

function subscribeToIOxs([ nextIOx, ...remainingIOxs ],env) {
addIOxConnection(nextIOx,iox);

return continuation(
() => {
// register a listener for the stream?
Expand Down Expand Up @@ -1864,11 +1924,9 @@ function merge(ioxs = []) {
}

function fromIO(io) {
return IOx(skipFirstIdentity,[ io, ]);
return IOx((env,v) => v,[ io, ]);
}

function skipFirstIdentity(env,v) { return v; }

function fromIter($V,closeOnComplete = true) {
// note: the internals of `fromIter(..)` are
// all async, so no need for the trampolining
Expand Down Expand Up @@ -1965,7 +2023,11 @@ function fromIter($V,closeOnComplete = true) {
subscribed = true;
it = getIter($V);
({ pr: hasPaused, next: signalPaused, } = getDeferred());
drainIterator().catch(logUnhandledError);
let pr = drainIterator();
if (!registeredPromiseCatches.has(pr)) {
registeredPromiseCatches.add(pr);
pr.catch(err => notifyIOxError(iox,err));
}
}
}

Expand Down Expand Up @@ -2016,10 +2078,11 @@ function fromIter($V,closeOnComplete = true) {

}

// note: the internals of `toIter(..)` are
// all async, so no need for the trampolining
// here
function toIter(iox,env) {
// note: the internals of `toIter(..)` are
// all async, so no need for the trampolining
// here

var finalPr;
var prQueue = [];
var nextQueue = [];
Expand Down Expand Up @@ -2194,7 +2257,7 @@ function fromObservable(obsv) {
// (lazily) setup observable subscription
subscription = obsv.subscribe({
next: v => iox(v),
error: logUnhandledError,
error: err => notifyIOxError(iox,err),
complete: close,
});
}
Expand Down Expand Up @@ -2248,6 +2311,63 @@ function fromObservable(obsv) {

}

function addIOxConnection(fromIO,toIO) {
if (!ioxConnections.has(fromIO)) {
ioxConnections.set(fromIO,new Set());
}
var conns = ioxConnections.get(fromIO);
conns.add(toIO);
if (!ioxReverseConnections.has(toIO)) {
ioxReverseConnections.set(toIO,new Set());
}
ioxReverseConnections.get(toIO).add(conns);
}

function removeIOxConnections(io) {
ioxConnections.delete(io);
if (ioxReverseConnections.has(io)) {
let connsLists = ioxReverseConnections.get(io);
connsLists.forEach(conns => conns.delete(io));
ioxReverseConnections.delete(io);
}
}

function notifyIOxError(iox,err) {
var errReported = false;

// basically, do a breadth-first traversal of the
// registered IOx connections looking for the
// first error handler we can successfully report
// the error to
var ioxs = [ iox, ];
while (ioxs.length > 0) {
let nextIOx = ioxs.shift();

if (ioxErrorCallbacks.has(nextIOx)) {
for (let ecb of ioxErrorCallbacks.get(nextIOx)) {
try {
ecb(err);
errReported = true;
}
catch (err2) {
logUnhandledError(err2);
}
}
}

// error not "successfully" reported, but there's
// other IOx connections we could attempt to notify?
if (!errReported && ioxConnections.has(nextIOx)) {
ioxs.push(...ioxConnections.get(nextIOx));
}
}

// never found any onError handler(s)?
if (!errReported) {
logUnhandledError(err);
}
}

// attempts to use `global.reportError(..)`; falls back to
// `console.error(..)` or `console.log(..)`
//
Expand Down Expand Up @@ -2287,9 +2407,9 @@ function defineNeverIOx() {
map: NeverIOx$, chain: NeverIOx$, flatMap: NeverIOx$,
bind: NeverIOx$, concat: NeverIOx$, run: EMPTY_FUNC,
stop: EMPTY_FUNC, close: EMPTY_FUNC, isClosed,
never: NeverIOx$, isNever, toString,
_chain_with_IO, _inspect, _bind: origBind,
_is: IOx.is, [Symbol.toStringTag]: TAG,
never: NeverIOx$, isNever, toString, onError: EMPTY_FUNC,
offError: EMPTY_FUNC, _chain_with_IO, _inspect,
_bind: origBind, _is: IOx.is, [Symbol.toStringTag]: TAG,
});
registerHooks.set(publicAPI,[ EMPTY_FUNC, EMPTY_FUNC ]);
NeverIOx$.pipe = NeverIOx$;
Expand Down

0 comments on commit 939fedb

Please sign in to comment.