Skip to content

Commit

Permalink
fix(expand): now works properly with asynchronous schedulers
Browse files Browse the repository at this point in the history
* test(expand-spec): add broken expand with scheduler unit test

* fix(expand): fix expand with async/asap schedulers
  • Loading branch information
trxcllnt authored Jun 15, 2020
1 parent 8f43e71 commit 294b27e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
28 changes: 26 additions & 2 deletions spec/operators/expand-spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { expect } from 'chai';
import { expand, mergeMap, map } from 'rxjs/operators';
import { expand, mergeMap, map, take, toArray } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { Subscribable, EMPTY, Observable, of, Observer } from 'rxjs';
import { Subscribable, EMPTY, Observable, of, Observer, asapScheduler, asyncScheduler } from 'rxjs';

declare const type: Function;

Expand Down Expand Up @@ -414,4 +414,28 @@ describe('expand operator', () => {
expectObservable(result).toBe(expected, values);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should work with the AsapScheduler', (done) => {
const expected = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
of(0).pipe(
expand((x) => of(x + 1), Infinity, asapScheduler),
take(10),
toArray()
).subscribe(
(actual) => expect(actual).to.deep.equal(expected),
done, done
);
});

it('should work with the AsyncScheduler', (done) => {
const expected = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
of(0).pipe(
expand((x) => of(x + 1), Infinity, asyncScheduler),
take(10),
toArray()
).subscribe(
(actual) => expect(actual).to.deep.equal(expected),
done, done
);
});
});
2 changes: 1 addition & 1 deletion src/internal/operators/expand.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {
if (this.active < this.concurrent) {
destination.next(value);
try {
this.active++;
const { project } = this;
const result = project(value, index);
if (!this.scheduler) {
Expand All @@ -149,7 +150,6 @@ export class ExpandSubscriber<T, R> extends OuterSubscriber<T, R> {
}

private subscribeToProjection(result: any, value: T, index: number): void {
this.active++;
const destination = this.destination as Subscription;
destination.add(subscribeToResult<T, R>(this, result, value, index));
}
Expand Down

0 comments on commit 294b27e

Please sign in to comment.