Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(concatEagerMap): add operator #81

Closed
wants to merge 1 commit into from

Conversation

josepot
Copy link
Contributor

@josepot josepot commented Jun 26, 2020

As discussed in the "Allow concurrency in concatMap" issue of RxJS, this could be a nice operator to have. I'm not happy with the name that I came up with, so I'm more than open to suggestions.* Also, the tests are not very complete... I'm more than willing to add more. Just let me know what other things should be tested and I will add the tests.

(* I just renamed it from sortedMergeMap to concatEagerMap based on this. I'm still open to suggestions, though)

@josepot josepot force-pushed the feat/sortedMergeMap branch from a339d0e to 6fb86cb Compare June 26, 2020 11:52
@josepot josepot changed the title feat(sortedMergeMap): add operator feat(concatEagerMap): add operator Jun 26, 2020
@cartant
Copy link
Owner

cartant commented Jun 26, 2020

I'll have a look at this later. FWIW, I wrote a version of concatMapEager, this morning:

function concatMapEager<T, O extends ObservableInput<any>>(
  project: (value: T, index: number) => O,
  concurrency?: number
): OperatorFunction<T, ObservedValueOf<O>> {
  type R = ObservedValueOf<O>;
  type Inner = {
    complete: boolean;
    index: number;
    values: R[];
  };

  return (source) =>
    defer(() => {
      let activeIndex = 0;
      const innersByIndex = new Map<number, Inner>();

      function flush() {
        const values: R[] = [];
        let activeInner = innersByIndex.get(activeIndex);
        while (activeInner) {
          values.push(...activeInner.values);
          activeInner.values.length = 0;
          if (activeInner.complete) {
            innersByIndex.delete(activeIndex);
            activeInner = innersByIndex.get(++activeIndex);
          } else {
            break;
          }
        }
        return values;
      }

      return source.pipe(
        mergeMap(
          (value, index) =>
            from(project(value, index)).pipe(
              materialize<R>(),
              map((notification) => ({
                index,
                notification,
              }))
            ),
          concurrency
        ),
        mergeMap(({ index, notification }) => {
          let inner = innersByIndex.get(index);
          if (!inner) {
            inner = { complete: false, index, values: [] };
            innersByIndex.set(index, inner);
          }
          switch (notification.kind) {
            case "N":
              inner.values.push(notification.value!);
              break;
            case "C":
              inner.complete = true;
              break;
            case "E":
              return notification.toObservable();
            default:
              return throwError("Unexpected notification kind.");
          }
          if (inner.index !== activeIndex) {
            return EMPTY;
          }
          return flush();
        })
      );
    });
}

@josepot
Copy link
Contributor Author

josepot commented Jun 26, 2020

nice! Thanks for sharing that. I like the name of your operator a lot better 😅

That's cool! TIL about materialize 😍 !!

I don't mind closing this PR in favor of your implementation, of course.

Your implementation is lot more declarative and clearer, which makes it less prone to bugs, and a lot easier to understand and maintain. My implementation, on the other hand, is a lot more imperative/complicated. However, mine uses less operators. Do you think that can make a difference in terms of performance?

Lastly, I can't help myself from insisting (for the last time, I promise) on the idea that if we take into account what's the behavior that's expected from concat -rather than thinking about how it's currently implemented-, then I think that it would make sense to allow concatMap to receive the concurrent parameter. I mean, considering that:

concatMapEager() => concatMapEager(Infinity)
concatMapEager(1) => concatMap()

Wouldn't it make sense that

concatMap() => concatMapEager(1)
concatMap(Infinity) => concatMapEager()

?

I mean, the only thing that makes it eager is the fact that the concurrent parameter has the default value of Infinity, if it had the default value of one, then it would be the same as concatMap... Anyways, I also realize that this is a weird hill to die on

@cartant
Copy link
Owner

cartant commented Jun 26, 2020

I like the name of your operator a lot better

It's the name used in RxJava.

Do you think that can make a difference in terms of performance?

IMO, there are only two that could impact performance in any meaningful way, but, even then, I would be pretty surprised if the cost were to be a significant bottleneck. I'm talking about the materialize and the map that effect subscriptions for each value from the source. (Also, this cost will be further reduced in version 8.)

The reality is that the operator is most likely to be used with situations like the one in the issue in the RxJS repo: each source value effects some expensive async request to obtain some resource. And the caller wants some of those to happen concurrently. There is no chance that there will be performance issues in those scenarios. And, IMO, any scenario in which notifications occur with sufficient frequency to effect perf problems in the time domain will likely have major problems in the memory domain, as this operator is inherently an unbounded buffer.

The main reason for doing it the way I did it is - as you've mentioned - that it's more declarative and builds on mergeMap - which is a fundamental, extensively-tested core operator. If this were to be implemented in the core, it would be different and InnerSubscriber and OuterSubscriber would be used. They cannot be used in user land, as they're not exported. Generally, when building a user-land operator, I try pretty hard to compose it from core operators and reach for new Observable only when the composition becomes complicated. You can see the process in this blog post, in which a user-land operator becomes increasingly complicated as bugs are found and fixed.

... it would make sense to allow concatMap to receive the concurrent parameter

Given that passing a concurrency parameter would completely change the widely accepted behaviour of concatMap - in terms of its subscribing to sources in a serial fashion - I'm convinced that it needs a different operator name. The precedent is there in RxJava.

@josepot
Copy link
Contributor Author

josepot commented Jun 27, 2020

Thanks a lot for the detailed answer.

I'm closing this in favor of your implementation. The main reason why I wrote it using new Observable is because I thought that could make the transition to core a bit easier, but after reading your answer I do realize that this was an unnecessary premature optimization.

Also, about whether concatMap should accept the concurrent parameter: another reason not to change its current signature is that -generally speaking- operators should do "one thing and do it well". So, yep, I can see how it would be a lot less confusing to have the normal concatMap behave in the way that it currently does, and then have its "eager" version for when this behavior is needed.

Thanks a lot for taking the time to give me such a detailed answer.

@josepot josepot closed this Jun 27, 2020
@josepot josepot deleted the feat/sortedMergeMap branch June 28, 2020 12:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants