Permit unsubscribe to return a promise #211
Comments
I guess the question is: what's the point? Is it timing, so that other operations are sequenced after underlying resources are released? Is it error handling? Will the subscribing code know how to handle a cleanup failure in a meaningful way?
For fun I implemented an async iterator of
Implementation
/**
 * @template {EventTarget} Target
 * @param {string} type
 * @param {Target} target
 * @returns {AsyncGenerator<Event, void, void>}
 */
export default function fromEvent(type, target) {
  /** @type {Event[]} */
  const buffer = [];
  let isListening = true;

  /** @type {EventListener} */
  function handler(event) {
    buffer.push(event);
  }

  function cleanUp() {
    isListening = false;
    target.removeEventListener(type, handler);
  }

  target.addEventListener(type, handler);

  return {
    return() {
      cleanUp();
      return Promise.resolve({ value: undefined, done: true });
    },
    throw(error) {
      cleanUp();
      return Promise.resolve({ value: error, done: true });
    },
    async next() {
      // eslint-disable-next-line no-unmodified-loop-condition
      while (buffer.length === 0 && isListening) {
        await new Promise((resolve) => globalThis.setTimeout(resolve));
      }
      if (!isListening) {
        return { value: undefined, done: true };
      }
      const value = /** @type {Event} */ (buffer.shift());
      return { value, done: false };
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}
Test Cases
import test from "ava";
import fromEvent from "../from-event.js";

test("from event", async (t) => {
  const target = new EventTarget();
  const listener = fromEvent("foo", target);
  const iter = listener[Symbol.asyncIterator]();
  const fooEvents = [new Event("foo"), new Event("foo")];
  target.dispatchEvent(fooEvents[0]);
  target.dispatchEvent(fooEvents[1]);
  t.deepEqual(await iter.next(), { value: fooEvents[0], done: false });
  t.deepEqual(await iter.next(), { value: fooEvents[1], done: false });
});

test("ok on empty buffer", async (t) => {
  const target = new EventTarget();
  const listener = fromEvent("foo", target);
  const iter = listener[Symbol.asyncIterator]();
  const fooEvents = [new Event("foo"), new Event("foo")];
  globalThis.setTimeout(() => {
    target.dispatchEvent(fooEvents[0]);
    target.dispatchEvent(fooEvents[1]);
  }, 50);
  t.deepEqual(await iter.next(), { value: fooEvents[0], done: false });
  t.deepEqual(await iter.next(), { value: fooEvents[1], done: false });
});

test("`.return` removes the listener", async (t) => {
  const target = new EventTarget();
  const listener = fromEvent("foo", target);
  const iter = listener[Symbol.asyncIterator]();
  const fooEvents = [new Event("foo"), new Event("foo"), new Event("foo")];
  target.dispatchEvent(fooEvents[0]);
  target.dispatchEvent(fooEvents[1]);
  t.deepEqual(await iter.next(), { value: fooEvents[0], done: false });
  t.deepEqual(await iter.next(), { value: fooEvents[1], done: false });
  listener.return();
  target.dispatchEvent(fooEvents[2]);
  t.is((await iter.next()).done, true);
});

test("`.throw` removes the listener", async (t) => {
  const target = new EventTarget();
  const listener = fromEvent("foo", target);
  const iter = listener[Symbol.asyncIterator]();
  const fooEvents = [new Event("foo"), new Event("foo"), new Event("foo")];
  target.dispatchEvent(fooEvents[0]);
  target.dispatchEvent(fooEvents[1]);
  t.deepEqual(await iter.next(), { value: fooEvents[0], done: false });
  t.deepEqual(await iter.next(), { value: fooEvents[1], done: false });
  listener.throw(new Error("error"));
  target.dispatchEvent(fooEvents[2]);
  t.is((await iter.next()).done, true);
});
I think
You will also notice that there is a subtle difference in behavior if more than one event is dispatched in the same tick. An observable will run its handler in the same tick, but an async iterator will always wait until the next tick to run the handler. This subtle difference might be important in user code where a triggered event alters how subsequent events are fired. Thirdly, notice that I have to create my own process loop that runs once per tick. I'm not an engine expert, so this might be minor, but I do wonder about the performance implications, especially if the work required in this main loop is non-trivial, or if an inexperienced (or tired) programmer fails to find an optimized way to check whether an action is needed.
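The timing difference can be seen in a small sketch. It assumes a runtime with global EventTarget and Event (browsers, recent Node.js). The name dispatchOrderDemo is made up for illustration, and the promise-based consumer is only a stand-in for any code that awaits iter.next().

```javascript
// Hypothetical demo: compares when a plain listener runs versus when
// code waiting on a promise (as an async iterator consumer does) resumes.
async function dispatchOrderDemo() {
  const target = new EventTarget();
  const log = [];

  // A plain listener is invoked synchronously, inside dispatchEvent().
  target.addEventListener("foo", () => log.push("sync listener"));

  // The async consumer only continues once the promise callback is run,
  // which happens after the currently running synchronous code finishes.
  const seen = new Promise((resolve) => {
    target.addEventListener("foo", resolve, { once: true });
  }).then(() => log.push("async consumer"));

  target.dispatchEvent(new Event("foo"));
  log.push("after dispatch");

  await seen;
  return log;
}
```

Calling dispatchOrderDemo() resolves to ["sync listener", "after dispatch", "async consumer"]: the plain listener has already run by the time dispatchEvent() returns, while the awaiting code has not.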
@runarberg I don't think we're talking about the same thing. You're talking about making Observables into async iterables. I'm talking about making async iterables into observables. In general I agree that async iteration is not well suited to the general pubsub use case, particularly when there are multiple subscribers. For one thing, there's the problem you mention of sync things becoming async, which costs performance (and can cause flickering in UIs). Another problem is that stack traces are lost, so it becomes hard to tell why things happened. Also, the order in which subscribers are notified about an event is then determined by some very hidden engine internals and may (or may not) be essentially random.
Since async iterables have these problems, if they are the data source it may make sense to convert them to observables when multiple subscribers are interested in the "events" (resolutions of iterator step promises) they produce. My wish is that this be possible, and that an observable created from a generic async iterator be able to ensure that the underlying async iterator is cleaned up correctly.
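As a rough sketch of that conversion, one async iterable can be pumped once and each value pushed to every subscriber synchronously, in subscription order. The share helper and its subscribe/whenDone methods are hypothetical names for illustration, not part of any Observable proposal or library, and cleanup of the source on unsubscribe is deliberately left out here.

```javascript
// Hypothetical helper for illustration only.
function share(source) {
  const subscribers = new Set(); // a Set iterates in insertion order
  let pumping = null;

  async function pump() {
    for await (const value of source) {
      // Every subscriber is called synchronously, in subscription order,
      // for each value the source produces.
      for (const notify of subscribers) notify(value);
    }
  }

  return {
    subscribe(notify) {
      subscribers.add(notify);
      if (pumping === null) pumping = pump(); // start on first subscriber
      return () => subscribers.delete(notify);
    },
    // Resolves when the source is exhausted; rejects if the source or a
    // subscriber throws.
    whenDone() {
      return pumping ?? Promise.resolve();
    },
  };
}
```

With two subscribers a and b attached to a source yielding 1 then 2, the calls arrive as a(1), b(1), a(2), b(2), so notification order is decided by the helper rather than by how the engine schedules independent consumers.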
I find it is not a well-known feature, but the spec allows sync and async iterators to specify a return() method. This in turn enables the creation of iterables that hold handles to underlying resources, like files or connections. Since resources referred to by handles are usually accessed (in modern styles) with async APIs, I'm particularly interested in the AsyncIterator.return method, which returns a promise. If such a method exists, there is no way to know that it would be safe not to call it, and it would seem to me unwise not to await the promise it returns. Failing to await it would cause errors to escape their context, instead bubbling up to the global promise rejection handler. Thus, in order to be able to create Observables from async iterators which hold underlying resources, unsubscribe should be able to return a promise.
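A minimal sketch of what this could look like, under stated assumptions: subscribeToAsyncIterable is a hypothetical helper (not an existing Observable API), the observer shape with optional next/error/complete callbacks is assumed, and unsubscribe simply returns the promise produced by the iterator's return() method.

```javascript
// Hypothetical helper for illustration only.
function subscribeToAsyncIterable(iterable, observer) {
  const iterator = iterable[Symbol.asyncIterator]();
  let closed = false;

  // Pull values from the iterator and push them to the observer.
  (async () => {
    try {
      while (!closed) {
        const { value, done } = await iterator.next();
        if (closed) return;
        if (done) {
          closed = true;
          observer.complete?.();
          return;
        }
        observer.next?.(value);
      }
    } catch (error) {
      if (!closed) {
        closed = true;
        observer.error?.(error);
      }
    }
  })();

  // unsubscribe returns a promise: it settles once the iterator's own
  // cleanup has finished, and rejects if that cleanup throws.
  return async function unsubscribe() {
    if (closed) return;
    closed = true;
    if (typeof iterator.return === "function") {
      await iterator.return();
    }
  };
}
```

A caller can then write await unsubscribe() and wrap it in try/catch, so a failure while releasing a file or connection is handled where the subscription was created instead of surfacing as an unhandled rejection. One caveat of this sketch: for async generators, return() is queued behind any next() call that is still pending, so the promise cannot settle before that step does.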