diff --git a/spec/operators/publishReplay-spec.ts b/spec/operators/publishReplay-spec.ts index c9fdf744b8..dc6a0a937d 100644 --- a/spec/operators/publishReplay-spec.ts +++ b/spec/operators/publishReplay-spec.ts @@ -409,4 +409,75 @@ describe('Observable.prototype.publishReplay', () => { published.connect(); }); + + it('should mirror a simple source Observable with selector', () => { + const values = {a: 2, b: 4, c: 6, d: 8}; + const selector = observable => observable.map(v => 2 * v); + const source = cold('--1-2---3-4---|'); + const sourceSubs = '^ !'; + const published = source.publishReplay(1, Number.POSITIVE_INFINITY, selector); + const expected = '--a-b---c-d---|'; + + expectObservable(published).toBe(expected, values); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should emit an error when the selector throws an exception', () => { + const error = "It's broken"; + const selector = () => { + throw error; + }; + const source = cold('--1-2---3-4---|'); + const published = source.publishReplay(1, Number.POSITIVE_INFINITY, selector); + + // The exception is thrown outside Rx chain (not as an error notification). + expect(() => published.subscribe()).to.throw(error); + }); + + it('should emit an error when the selector returns an Observable that emits an error', () => { + const error = "It's broken"; + const innerObservable = cold('--5-6----#', undefined, error); + const selector = observable => observable.mergeMapTo(innerObservable); + const source = cold('--1--2---3---|'); + const sourceSubs = '^ !'; + const published = source.publishReplay(1, Number.POSITIVE_INFINITY, selector); + const expected = '----5-65-6-#'; + + expectObservable(published).toBe(expected, undefined, error); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should terminate immediately when the selector returns an empty Observable', () => { + const selector = () => Observable.empty(); + const source = cold('--1--2---3---|'); + const sourceSubs = '(^!)'; + const published = source.publishReplay(1, Number.POSITIVE_INFINITY, selector); + const expected = '|'; + + expectObservable(published).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should not emit and should not complete/error when the selector returns never', () => { + const selector = () => Observable.never(); + const source = cold('-'); + const sourceSubs = '^'; + const published = source.publishReplay(1, Number.POSITIVE_INFINITY, selector); + const expected = '-'; + + expectObservable(published).toBe(expected); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + + it('should emit error when the selector returns Observable.throw', () => { + const error = "It's broken"; + const selector = () => Observable.throw(error); + const source = cold('--1--2---3---|'); + const sourceSubs = '(^!)'; + const published = source.publishReplay(1, Number.POSITIVE_INFINITY, selector); + const expected = '#'; + + expectObservable(published).toBe(expected, undefined, error); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); }); diff --git a/src/operator/publishReplay.ts b/src/operator/publishReplay.ts index c8cd566a58..735af72340 100644 --- a/src/operator/publishReplay.ts +++ b/src/operator/publishReplay.ts @@ -2,17 +2,27 @@ import { Observable } from '../Observable'; import { IScheduler } from '../Scheduler'; import { ConnectableObservable } from '../observable/ConnectableObservable'; import { publishReplay as higherOrder } from '../operators/publishReplay'; +import { OperatorFunction, MonoTypeOperatorFunction } from '../interfaces'; + +/* tslint:disable:max-line-length */ +export function publishReplay(this: Observable, bufferSize?: number, windowTime?: number, scheduler?: IScheduler): ConnectableObservable; +export function publishReplay(this: Observable, bufferSize?: number, windowTime?: number, selector?: MonoTypeOperatorFunction, scheduler?: IScheduler): Observable; +export function publishReplay(this: Observable, bufferSize?: number, windowTime?: number, selector?: OperatorFunction): Observable; +/* tslint:enable:max-line-length */ /** * @param bufferSize * @param windowTime + * @param selectorOrScheduler * @param scheduler - * @return {ConnectableObservable} + * @return {Observable | ConnectableObservable} * @method publishReplay * @owner Observable */ -export function publishReplay(this: Observable, bufferSize: number = Number.POSITIVE_INFINITY, - windowTime: number = Number.POSITIVE_INFINITY, - scheduler?: IScheduler): ConnectableObservable { - return higherOrder(bufferSize, windowTime, scheduler)(this) as ConnectableObservable; +export function publishReplay(this: Observable, bufferSize?: number, + windowTime?: number, + selectorOrScheduler?: IScheduler | OperatorFunction, + scheduler?: IScheduler): Observable | ConnectableObservable { + + return higherOrder(bufferSize, windowTime, selectorOrScheduler as any, scheduler)(this); } diff --git a/src/operators/publishReplay.ts b/src/operators/publishReplay.ts index 5943db4acb..dc3879bac5 100644 --- a/src/operators/publishReplay.ts +++ b/src/operators/publishReplay.ts @@ -3,10 +3,25 @@ import { ReplaySubject } from '../ReplaySubject'; import { IScheduler } from '../Scheduler'; import { multicast } from './multicast'; import { ConnectableObservable } from '../observable/ConnectableObservable'; -import { UnaryFunction } from '../interfaces'; +import { UnaryFunction, MonoTypeOperatorFunction, OperatorFunction } from '../interfaces'; -export function publishReplay(bufferSize: number = Number.POSITIVE_INFINITY, - windowTime: number = Number.POSITIVE_INFINITY, - scheduler?: IScheduler): UnaryFunction, ConnectableObservable> { - return (source: Observable) => multicast(new ReplaySubject(bufferSize, windowTime, scheduler))(source) as ConnectableObservable; +/* tslint:disable:max-line-length */ +export function publishReplay(bufferSize?: number, windowTime?: number, scheduler?: IScheduler): UnaryFunction, ConnectableObservable>; +export function publishReplay(bufferSize?: number, windowTime?: number, selector?: MonoTypeOperatorFunction, scheduler?: IScheduler): MonoTypeOperatorFunction; +export function publishReplay(bufferSize?: number, windowTime?: number, selector?: OperatorFunction, scheduler?: IScheduler): OperatorFunction; +/* tslint:enable:max-line-length */ + +export function publishReplay(bufferSize?: number, + windowTime?: number, + selectorOrScheduler?: IScheduler | OperatorFunction, + scheduler?: IScheduler): UnaryFunction, ConnectableObservable | Observable> { + + if (selectorOrScheduler && typeof selectorOrScheduler !== 'function') { + scheduler = selectorOrScheduler; + } + + const selector = typeof selectorOrScheduler === 'function' ? selectorOrScheduler : undefined; + const subject = new ReplaySubject(bufferSize, windowTime, scheduler); + + return (source: Observable) => multicast(() => subject, selector)(source) as Observable | ConnectableObservable; }