diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index 7e86b66d01..3627c37f15 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -901,5 +901,20 @@ describe('Observable.lift', () => { ]); done(); }); - }); + }); + + it('should not swallow internal errors', () => { + const consoleStub = sinon.stub(console, 'warn'); + try { + let source = new Observable(observer => observer.next(42)); + for (let i = 0; i < 10000; ++i) { + let base = source; + source = new Observable(observer => base.subscribe(observer)); + } + source.subscribe(); + expect(consoleStub).to.have.property('called', true); + } finally { + consoleStub.restore(); + } + }); }); diff --git a/spec/observables/bindCallback-spec.ts b/spec/observables/bindCallback-spec.ts index bfe915caec..e2caf8866f 100644 --- a/spec/observables/bindCallback-spec.ts +++ b/spec/observables/bindCallback-spec.ts @@ -280,4 +280,18 @@ describe('bindCallback', () => { expect(calls).to.equal(0); }); }); + + it('should not swallow post-callback errors', () => { + function badFunction(callback: (answer: number) => void): void { + callback(42); + throw new Error('kaboom'); + } + const consoleStub = sinon.stub(console, 'warn'); + try { + bindCallback(badFunction)().subscribe(); + expect(consoleStub).to.have.property('called', true); + } finally { + consoleStub.restore(); + } + }); }); diff --git a/spec/observables/bindNodeCallback-spec.ts b/spec/observables/bindNodeCallback-spec.ts index 19b019b4f2..874385f57e 100644 --- a/spec/observables/bindNodeCallback-spec.ts +++ b/spec/observables/bindNodeCallback-spec.ts @@ -278,4 +278,18 @@ describe('bindNodeCallback', () => { expect(results1).to.deep.equal([42, 'done']); expect(results2).to.deep.equal([42, 'done']); }); + + it('should not swallow post-callback errors', () => { + function badFunction(callback: (error: Error, answer: number) => void): void { + callback(null, 42); + throw new Error('kaboom'); + } + const consoleStub = sinon.stub(console, 'warn'); + try { + bindNodeCallback(badFunction)().subscribe(); + expect(consoleStub).to.have.property('called', true); + } finally { + consoleStub.restore(); + } + }); }); diff --git a/spec/util/canReportError-spec.ts b/spec/util/canReportError-spec.ts new file mode 100644 index 0000000000..77df00d2ae --- /dev/null +++ b/spec/util/canReportError-spec.ts @@ -0,0 +1,29 @@ +import { expect } from 'chai'; +import { noop, Subject, Subscriber } from 'rxjs'; +import { canReportError } from 'rxjs/internal/util/canReportError'; + +describe('canReportError', () => { + it('should report errors to an observer if possible', () => { + const subscriber = new Subscriber<{}>(noop, noop); + expect(canReportError(subscriber)).to.be.true; + }); + + it('should not report errors to a stopped observer', () => { + const subscriber = new Subscriber<{}>(noop, noop); + subscriber.error(new Error('kaboom')); + expect(canReportError(subscriber)).to.be.false; + }); + + it('should not report errors to a closed subject', () => { + const subject = new Subject<{}>(); + subject.unsubscribe(); + expect(canReportError(subject)).to.be.false; + }); + + it('should not report errors an observer with a stopped destination', () => { + const destination = new Subscriber<{}>(noop, noop); + const subscriber = new Subscriber<{}>(destination); + destination.error(new Error('kaboom')); + expect(canReportError(subscriber)).to.be.false; + }); +}); diff --git a/src/internal/Observable.ts b/src/internal/Observable.ts index d9e8f0c0d5..78b7506190 100644 --- a/src/internal/Observable.ts +++ b/src/internal/Observable.ts @@ -2,6 +2,7 @@ import { Operator } from './Operator'; import { Subscriber } from './Subscriber'; import { Subscription } from './Subscription'; import { TeardownLogic, OperatorFunction, PartialObserver, Subscribable } from './types'; +import { canReportError } from './util/canReportError'; import { toSubscriber } from './util/toSubscriber'; import { iif } from './observable/iif'; import { throwError } from './observable/throwError'; @@ -226,7 +227,11 @@ export class Observable implements Subscribable { sink.syncErrorThrown = true; sink.syncErrorValue = err; } - sink.error(err); + if (canReportError(sink)) { + sink.error(err); + } else { + console.warn(err); + } } } diff --git a/src/internal/Subscriber.ts b/src/internal/Subscriber.ts index d7dd3c84de..7f81c5814b 100644 --- a/src/internal/Subscriber.ts +++ b/src/internal/Subscriber.ts @@ -325,6 +325,6 @@ export class SafeSubscriber extends Subscriber { } } -function isTrustedSubscriber(obj: any) { +export function isTrustedSubscriber(obj: any) { return obj instanceof Subscriber || ('_addParentTeardownLogic' in obj && obj[rxSubscriberSymbol]); } diff --git a/src/internal/observable/bindCallback.ts b/src/internal/observable/bindCallback.ts index 1ec55260ad..3166a41fa2 100644 --- a/src/internal/observable/bindCallback.ts +++ b/src/internal/observable/bindCallback.ts @@ -3,6 +3,7 @@ import { Observable } from '../Observable'; import { AsyncSubject } from '../AsyncSubject'; import { Subscriber } from '../Subscriber'; import { map } from '../operators/map'; +import { canReportError } from '../util/canReportError'; import { isArray } from '../util/isArray'; import { isScheduler } from '../util/isScheduler'; @@ -204,7 +205,11 @@ export function bindCallback( try { callbackFunc.apply(context, [...args, handler]); } catch (err) { - subject.error(err); + if (canReportError(subject)) { + subject.error(err); + } else { + console.warn(err); + } } } return subject.subscribe(subscriber); diff --git a/src/internal/observable/bindNodeCallback.ts b/src/internal/observable/bindNodeCallback.ts index 4aefc2f755..d40e4fb6a4 100644 --- a/src/internal/observable/bindNodeCallback.ts +++ b/src/internal/observable/bindNodeCallback.ts @@ -3,6 +3,7 @@ import { AsyncSubject } from '../AsyncSubject'; import { Subscriber } from '../Subscriber'; import { SchedulerAction, SchedulerLike } from '../types'; import { map } from '../operators/map'; +import { canReportError } from '../util/canReportError'; import { isScheduler } from '../util/isScheduler'; import { isArray } from '../util/isArray'; @@ -198,7 +199,11 @@ export function bindNodeCallback( try { callbackFunc.apply(context, [...args, handler]); } catch (err) { - subject.error(err); + if (canReportError(subject)) { + subject.error(err); + } else { + console.warn(err); + } } } return subject.subscribe(subscriber); diff --git a/src/internal/util/canReportError.ts b/src/internal/util/canReportError.ts new file mode 100644 index 0000000000..caa98f7ea6 --- /dev/null +++ b/src/internal/util/canReportError.ts @@ -0,0 +1,22 @@ +import { isTrustedSubscriber, Subscriber } from '../Subscriber'; +import { Subject } from '../Subject'; + +/** + * Determines whether the ErrorObserver is closed or stopped or has a + * destination that is closed or stopped - in which case errors will + * need to be reported via a different mechanism. + * @param observer the observer + */ +export function canReportError(observer: Subscriber | Subject): boolean { + while (observer) { + const { closed, destination, isStopped } = observer as any; + if (closed || isStopped) { + return false; + } else if (destination && isTrustedSubscriber(destination)) { + observer = destination; + } else { + observer = null; + } + } + return true; +}