-
-
Notifications
You must be signed in to change notification settings - Fork 21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(concatEagerMap): add operator #81
Conversation
a339d0e
to
6fb86cb
Compare
I'll have a look at this later. FWIW, I wrote a version of function concatMapEager<T, O extends ObservableInput<any>>(
project: (value: T, index: number) => O,
concurrency?: number
): OperatorFunction<T, ObservedValueOf<O>> {
type R = ObservedValueOf<O>;
type Inner = {
complete: boolean;
index: number;
values: R[];
};
return (source) =>
defer(() => {
let activeIndex = 0;
const innersByIndex = new Map<number, Inner>();
function flush() {
const values: R[] = [];
let activeInner = innersByIndex.get(activeIndex);
while (activeInner) {
values.push(...activeInner.values);
activeInner.values.length = 0;
if (activeInner.complete) {
innersByIndex.delete(activeIndex);
activeInner = innersByIndex.get(++activeIndex);
} else {
break;
}
}
return values;
}
return source.pipe(
mergeMap(
(value, index) =>
from(project(value, index)).pipe(
materialize<R>(),
map((notification) => ({
index,
notification,
}))
),
concurrency
),
mergeMap(({ index, notification }) => {
let inner = innersByIndex.get(index);
if (!inner) {
inner = { complete: false, index, values: [] };
innersByIndex.set(index, inner);
}
switch (notification.kind) {
case "N":
inner.values.push(notification.value!);
break;
case "C":
inner.complete = true;
break;
case "E":
return notification.toObservable();
default:
return throwError("Unexpected notification kind.");
}
if (inner.index !== activeIndex) {
return EMPTY;
}
return flush();
})
);
});
} |
nice! Thanks for sharing that. I like the name of your operator a lot better 😅 That's cool! TIL about I don't mind closing this PR in favor of your implementation, of course. Your implementation is lot more declarative and clearer, which makes it less prone to bugs, and a lot easier to understand and maintain. My implementation, on the other hand, is a lot more imperative/complicated. However, mine uses less operators. Do you think that can make a difference in terms of performance? Lastly, I can't help myself from insisting (for the last time, I promise) on the idea that if we take into account what's the behavior that's expected from concatMapEager() => concatMapEager(Infinity)
concatMapEager(1) => concatMap() Wouldn't it make sense that concatMap() => concatMapEager(1)
concatMap(Infinity) => concatMapEager() ? I mean, the only thing that makes it |
It's the name used in RxJava.
IMO, there are only two that could impact performance in any meaningful way, but, even then, I would be pretty surprised if the cost were to be a significant bottleneck. I'm talking about the The reality is that the operator is most likely to be used with situations like the one in the issue in the RxJS repo: each source value effects some expensive async request to obtain some resource. And the caller wants some of those to happen concurrently. There is no chance that there will be performance issues in those scenarios. And, IMO, any scenario in which notifications occur with sufficient frequency to effect perf problems in the time domain will likely have major problems in the memory domain, as this operator is inherently an unbounded buffer. The main reason for doing it the way I did it is - as you've mentioned - that it's more declarative and builds on
Given that passing a concurrency parameter would completely change the widely accepted behaviour of |
Thanks a lot for the detailed answer. I'm closing this in favor of your implementation. The main reason why I wrote it using Also, about whether Thanks a lot for taking the time to give me such a detailed answer. |
As discussed in the "Allow concurrency in
concatMap
" issue of RxJS, this could be a nice operator to have.I'm not happy with the name that I came up with, so I'm more than open to suggestions.* Also, the tests are not very complete... I'm more than willing to add more. Just let me know what other things should be tested and I will add the tests.(* I just renamed it from
sortedMergeMap
toconcatEagerMap
based on this. I'm still open to suggestions, though)