Skip to content

Commit

Permalink
feat(onUnhandledError): configuration point added for unhandled errors
Browse files Browse the repository at this point in the history
- Adds new configuration setting `onUnhandledError`, which defaults to using "hostReportError" behavior.
- Adds tests that ensure it is called appropriately, and that it is always asynchronous.
- Updates internal name of empty observer to be `EMPTY_OBSERVER` throughout and types it to prevent mutations. Reduces overhead by using the `noop` function for its callbacks.
- Errors that occur during subscription setup _after_ the subscription was already closed will no longer log to `console.warn`

BREAKING CHANGE: Errors that occur during setup of an observable subscription after the subscription has emitted an error or completed will now throw in their own call stack. Before it would call `console.warn`. This is potentially breaking in edge cases for node applications as a node app may be configured to crash for an unhandled exception. In the unlikely event this affects you, you can configure the behavior to `console.warn` in the new configuration setting like so: `import { config } from 'rxjs'; config.onUnhandledError = (err) => console.warn(err);`
  • Loading branch information
benlesh committed Aug 26, 2020
1 parent c9ea2b0 commit 19ed152
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 48 deletions.
1 change: 1 addition & 0 deletions api_guard/dist/types/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ export declare function concat<O1 extends ObservableInput<any>, O2 extends Obser
export declare function concat<A extends ObservableInput<any>[]>(...observables: A): Observable<ObservedValueUnionFromArray<A>>;

export declare const config: {
onUnhandledError: ((err: any) => void) | null;
quietBadConfig: boolean;
Promise: PromiseConstructorLike;
useDeprecatedSynchronousErrorHandling: boolean;
Expand Down
26 changes: 14 additions & 12 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -991,19 +991,21 @@ describe('Observable.lift', () => {
);
});

it('should not swallow internal errors', () => {
const consoleStub = sinon.stub(console, 'warn');
try {
let source = new Observable<number>((observer) => observer.next(42));
for (let i = 0; i < 10000; ++i) {
let base = source;
source = new Observable<number>((observer) => base.subscribe(observer));
it('should not swallow internal errors', (done) => {
config.onUnhandledError = (err) => {
expect(err).to.equal('bad');
config.onUnhandledError = null;
done();
};

new Observable(subscriber => {
subscriber.error('test');
throw 'bad';
}).subscribe({
error: err => {
expect(err).to.equal('test');
}
source.subscribe();
expect(consoleStub).to.have.property('called', true);
} finally {
consoleStub.restore();
}
});
});

// TODO: Stop skipping this until a later refactor (probably in version 8)
Expand Down
159 changes: 159 additions & 0 deletions spec/config-spec.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,168 @@
/** @prettier */

import { config } from '../src/internal/config';
import { expect } from 'chai';
import { Observable } from 'rxjs';

describe('config', () => {
it('should have a Promise property that defaults to nothing', () => {
expect(config).to.have.property('Promise');
expect(config.Promise).to.be.undefined;
});

describe('onUnhandledError', () => {
afterEach(() => {
config.onUnhandledError = null;
});

it('should default to null', () => {
expect(config.onUnhandledError).to.be.null;
});

it('should call asynchronously if an error is emitted and not handled by the consumer observer', (done) => {
let called = false;
const results: any[] = [];

config.onUnhandledError = err => {
called = true;
expect(err).to.equal('bad');
done()
};

const source = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.error('bad');
});

source.subscribe({
next: value => results.push(value),
});
expect(called).to.be.false;
expect(results).to.deep.equal([1]);
});

it('should call asynchronously if an error is emitted and not handled by the consumer next callback', (done) => {
let called = false;
const results: any[] = [];

config.onUnhandledError = err => {
called = true;
expect(err).to.equal('bad');
done()
};

const source = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.error('bad');
});

source.subscribe(value => results.push(value));
expect(called).to.be.false;
expect(results).to.deep.equal([1]);
});

it('should call asynchronously if an error is emitted and not handled by the consumer in the empty case', (done) => {
let called = false;
config.onUnhandledError = err => {
called = true;
expect(err).to.equal('bad');
done()
};

const source = new Observable(subscriber => {
subscriber.error('bad');
});

source.subscribe();
expect(called).to.be.false;
});

it('should call asynchronously if a subscription setup errors after the subscription is closed by an error', (done) => {
let called = false;
config.onUnhandledError = err => {
called = true;
expect(err).to.equal('bad');
done()
};

const source = new Observable(subscriber => {
subscriber.error('handled');
throw 'bad';
});

let syncSentError: any;
source.subscribe({
error: err => {
syncSentError = err;
}
});

expect(syncSentError).to.equal('handled');
expect(called).to.be.false;
});

it('should call asynchronously if a subscription setup errors after the subscription is closed by a completion', (done) => {
let called = false;
let completed = false;

config.onUnhandledError = err => {
called = true;
expect(err).to.equal('bad');
done()
};

const source = new Observable(subscriber => {
subscriber.complete();
throw 'bad';
});

source.subscribe({
error: () => {
throw 'should not be called';
},
complete: () => {
completed = true;
}
});

expect(completed).to.be.true;
expect(called).to.be.false;
});

/**
* Thie test is added so people know this behavior is _intentional_. It's part of the contract of observables
* and, while I'm not sure I like it, it might start surfacing untold numbers of errors, and break
* node applications if we suddenly changed this to start throwing errors on other jobs for instances
* where users accidentally called `subscriber.error` twice. Likewise, would we report an error
* for two calls of `complete`? This is really something a build-time tool like a linter should
* capture. Not a run time error reporting event.
*/
it('should not be called if two errors are sent to the subscriber', (done) => {
let called = false;
config.onUnhandledError = () => {
called = true;
};

const source = new Observable(subscriber => {
subscriber.error('handled');
subscriber.error('swallowed');
});

let syncSentError: any;
source.subscribe({
error: err => {
syncSentError = err;
}
});

expect(syncSentError).to.equal('handled');
// This timeout would be scheduled _after_ any error timeout that might be scheduled
// (But we're not scheduling that), so this is just an artificial delay to make sure the
// behavior sticks.
setTimeout(() => {
expect(called).to.be.false;
done();
});
});
});
});
22 changes: 22 additions & 0 deletions src/internal/EMPTY_OBSERVER.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Observer } from './types';
import { config } from './config';
import { reportUnhandledError } from './util/reportUnhandledError';
import { noop } from './util/noop';

/**
* The observer used as a stub for subscriptions where the user did not
* pass any arguments to `subscribe`. Comes with the default error handling
* behavior.
*/
export const EMPTY_OBSERVER: Readonly<Observer<any>> = {
closed: true,
next: noop,
error(err: any): void {
if (config.useDeprecatedSynchronousErrorHandling) {
throw err;
} else {
reportUnhandledError(err);
}
},
complete: noop
};
5 changes: 3 additions & 2 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { toSubscriber } from './util/toSubscriber';
import { observable as Symbol_observable } from './symbol/observable';
import { pipeFromArray } from './util/pipe';
import { config } from './config';
import { reportUnhandledError } from './util/reportUnhandledError';

/**
* A representation of any set of values over any amount of time. This is the most basic building block
Expand Down Expand Up @@ -253,8 +254,8 @@ export class Observable<T> implements Subscribable<T> {
sink.error(err);
} else {
// If an error is thrown during subscribe, but our subscriber is closed, so we cannot notify via the
// subscription "error" channel, we are warning the developer of the problem here, via the console.
console.warn(err);
// subscription "error" channel, it is an unhandled error and we need to report it appropriately.
reportUnhandledError(err);
}
}
}
Expand Down
16 changes: 0 additions & 16 deletions src/internal/Observer.ts

This file was deleted.

12 changes: 6 additions & 6 deletions src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { isFunction } from './util/isFunction';
import { empty as emptyObserver } from './Observer';
import { EMPTY_OBSERVER } from './EMPTY_OBSERVER';
import { Observer, PartialObserver } from './types';
import { Subscription, isSubscription } from './Subscription';
import { config } from './config';
import { hostReportError } from './util/hostReportError';
import { reportUnhandledError } from './util/reportUnhandledError';

/**
* Implements the {@link Observer} interface and extends the
Expand Down Expand Up @@ -58,11 +58,11 @@ export class Subscriber<T> extends Subscription implements Observer<T> {

switch (arguments.length) {
case 0:
this.destination = emptyObserver;
this.destination = EMPTY_OBSERVER;
break;
case 1:
if (!destinationOrNext) {
this.destination = emptyObserver;
this.destination = EMPTY_OBSERVER;
break;
}
if (typeof destinationOrNext === 'object') {
Expand Down Expand Up @@ -165,7 +165,7 @@ export class SafeSubscriber<T> extends Subscriber<T> {
next = observerOrNext.next;
error = observerOrNext.error;
complete = observerOrNext.complete;
if (observerOrNext !== emptyObserver) {
if (observerOrNext !== EMPTY_OBSERVER) {
let context: any;
if (config.useDeprecatedNextContext) {
context = Object.create(observerOrNext);
Expand Down Expand Up @@ -224,7 +224,7 @@ export class SafeSubscriber<T> extends Subscriber<T> {
throw err;
}
} else {
hostReportError(err);
reportUnhandledError(err);
}
}

Expand Down
11 changes: 11 additions & 0 deletions src/internal/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ let _enable_deoptimized_subscriber_creation = false;
* like what Promise contructor should used to create Promises
*/
export const config = {
/**
* A registration point for unhandled errors from RxJS. These are errors that
* cannot were not handled by consuming code in the usual subscription path. For
* example, if you have this configured, and you subscribe to an observable without
* providing an error handler, errors from that subscription will end up here. This
* will _always_ be called asynchronously on another job in the runtime. This is because
* we do not want errors thrown in this user-configured handler to interfere with the
* behavior of the library.
*/
onUnhandledError: null as ((err: any) => void) | null,

/**
* If true, console logs for deprecation warnings will not be emitted.
* @deprecated this will be removed in v8 when all deprecated settings are removed.
Expand Down
8 changes: 0 additions & 8 deletions src/internal/util/hostReportError.ts

This file was deleted.

24 changes: 24 additions & 0 deletions src/internal/util/reportUnhandledError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/** @prettier */
import { config } from '../config';

/**
* Handles an error on another job either with the user-configured {@link onUnhandledError},
* or by throwing it on that new job so it can be picked up by `window.onerror`, `process.on('error')`, etc.
*
* This should be called whenever there is an error that is out-of-band with the subscription
* or when an error hits a terminal boundary of the subscription and no error handler was provided.
*
* @param err the error to report
*/
export function reportUnhandledError(err: any) {
setTimeout(() => {
const { onUnhandledError } = config;
if (onUnhandledError) {
// Execute the user-configured error handler.
onUnhandledError(err);
} else {
// Throw so it is picked up by the runtime's uncaught error mechanism.
throw err;
}
});
}
4 changes: 2 additions & 2 deletions src/internal/util/subscribeToPromise.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Subscriber } from '../Subscriber';
import { hostReportError } from './hostReportError';
import { reportUnhandledError } from './reportUnhandledError';

export const subscribeToPromise = <T>(promise: PromiseLike<T>) => (subscriber: Subscriber<T>) => {
promise.then(
Expand All @@ -11,6 +11,6 @@ export const subscribeToPromise = <T>(promise: PromiseLike<T>) => (subscriber: S
},
(err: any) => subscriber.error(err)
)
.then(null, hostReportError);
.then(null, reportUnhandledError);
return subscriber;
};
4 changes: 2 additions & 2 deletions src/internal/util/toSubscriber.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/** @prettier */
import { Subscriber } from '../Subscriber';
import { empty as emptyObserver } from '../Observer';
import { EMPTY_OBSERVER } from '../EMPTY_OBSERVER';
import { PartialObserver, Observer } from '../types';
import { isSubscription } from '../Subscription';

Expand All @@ -20,7 +20,7 @@ export function toSubscriber<T>(
}

if (!nextOrObserver && !error && !complete) {
return new Subscriber(emptyObserver);
return new Subscriber(EMPTY_OBSERVER);
}

return new Subscriber(nextOrObserver, error, complete);
Expand Down

0 comments on commit 19ed152

Please sign in to comment.