Skip to content
Zac Sweers edited this page Apr 9, 2019 · 7 revisions

Welcome to the RxDogTag wiki!

Examples

Simple

Consider the following trivial case of emitting an error with no error handling.

Observable.error(new RuntimeException("Unhandled error!"))
    .subscribe();

This results in a stacktrace like this:

io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | Unhandled error!

	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77)
	at io.reactivex.internal.disposables.EmptyDisposable.error(EmptyDisposable.java:63)
	at io.reactivex.internal.operators.observable.ObservableError.subscribeActual(ObservableError.java:38)
	at io.reactivex.Observable.subscribe(Observable.java:12090)
	at io.reactivex.Observable.subscribe(Observable.java:12076)
	at io.reactivex.Observable.subscribe(Observable.java:11954)
	at com.uber.anotherpackage.ReadMeExample.simpleSubscribe(ReadMeExample.java:26)
	<collapsed internal calls>
Caused by: java.lang.RuntimeException: Unhandled error!
	at com.uber.anotherpackage.ReadMeExample.simpleSubscribe(ReadMeExample.java:25)
	... 25 more

Now let's look at the same example with tagging enabled:

io.reactivex.exceptions.OnErrorNotImplementedException: Unhandled error!

Caused by: java.lang.RuntimeException: Unhandled error!
	at [[ Inferred subscribe point ]].(:0)
	at com.uber.anotherpackage.ReadMeExample.simpleSubscribe(ReadMeExample.java:26)
	at [[ Original trace ]].(:0)
	at com.uber.anotherpackage.ReadMeExample.simpleSubscribe(ReadMeExample.java:25)
	... 25 more

In the simple case, there's not much added benefit since the execution was synchronous and single threaded, but you can see original trace has now been updated to indicate the exact line that subscribe() was called. In this case: ReadMeExample.java:26.

To reduce noise - RxDogTag will wrap the original cause in a synthetic OnErrorNotImplementedException. This uses the original cause's message and doesn't fill in the stacktrace as it's irrelevant to the trace.

Threading

Consider a more complex example with threading:

CountDownLatch latch = new CountDownLatch(1);
Observable.range(0, 10)
    .subscribeOn(Schedulers.io())
    .<Integer>map(i -> null)
    .subscribe(i -> latch.countDown());
latch.await();

This is a fairly common case in RxJava concurrency. Without tagging, this yields the following trace:

io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | The mapper function returned a null value.

	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77)
	at io.reactivex.internal.observers.BasicFuseableObserver.onError(BasicFuseableObserver.java:100)
	at io.reactivex.internal.observers.BasicFuseableObserver.fail(BasicFuseableObserver.java:110)
	at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:59)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:58)
	at io.reactivex.internal.operators.observable.ObservableRange$RangeDisposable.run(ObservableRange.java:64)
	at io.reactivex.internal.operators.observable.ObservableRange.subscribeActual(ObservableRange.java:35)
	at io.reactivex.Observable.subscribe(Observable.java:12090)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
	at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException: The mapper function returned a null value.
	at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
	at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:57)
	... 14 more

Yikes! This is basically impossible to investigate if you're looking at a crash report from the wild.

Now the same trace with tagging enabled:

io.reactivex.exceptions.OnErrorNotImplementedException: The mapper function returned a null value.

Caused by: java.lang.NullPointerException: The mapper function returned a null value.
	at [[ Inferred subscribe point ]].(:0)
	at com.uber.anotherpackage.ReadMeExample.complex(ReadMeExample.java:55)
	at [[ Original trace ]].(:0)
	at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
	at io.reactivex.internal.operators.observable.ObservableMap$MapObserver.onNext(ObservableMap.java:57)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:58)
	at io.reactivex.internal.operators.observable.ObservableRange$RangeDisposable.run(ObservableRange.java:64)
	at io.reactivex.internal.operators.observable.ObservableRange.subscribeActual(ObservableRange.java:35)
	at io.reactivex.Observable.subscribe(Observable.java:12090)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
	at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Now we have the example subscribe line at ReadMeExample.java:43. It may not be a silver bullet to root-causing why the exception occurred, but at least you know where it's emanating from.

Delegate callbacks

Let's look at one more example. This is similar to the previous, but instead of the exception occurring upstream in the chain, the exception occurs in one of the observer callbacks; in this case - onNext.

CountDownLatch latch = new CountDownLatch(1);
Observable.range(0, 10)
    .subscribeOn(Schedulers.io())
    .subscribe(i -> throwSomething());
latch.await();

private void throwSomething() {
  throw new RuntimeException("Unhandled error!");
}

Without tagging, this yields the following trace:

io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | Unhandled error!
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77)
	at io.reactivex.internal.observers.LambdaObserver.onNext(LambdaObserver.java:67)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:58)
	at io.reactivex.internal.operators.observable.ObservableRange$RangeDisposable.run(ObservableRange.java:64)
	at io.reactivex.internal.operators.observable.ObservableRange.subscribeActual(ObservableRange.java:35)
	at io.reactivex.Observable.subscribe(Observable.java:12090)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
	at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Unhandled error!
	at com.uber.anotherpackage.ReadMeExample.throwSomething(ReadMeExample.java:70)
	at com.uber.anotherpackage.ReadMeExample.lambda$complexDelegate$2(ReadMeExample.java:65)
	at io.reactivex.internal.observers.LambdaObserver.onNext(LambdaObserver.java:63)
	... 14 more

Similar to the first example, this isn't terrible to root-cause. onNext is throwing a traceable exception in this trivial case.

With tagging enabled:

io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | Unhandled error!
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.observers.LambdaObserver.onError(LambdaObserver.java:77)
	at io.reactivex.internal.observers.LambdaObserver.onNext(LambdaObserver.java:67)
	at com.uber.rxdogtag.DogTagObserver.lambda$onNext$3(DogTagObserver.java:53)
	at com.uber.rxdogtag.RxDogTag.guardedDelegateCall(RxDogTag.java:262)
	at com.uber.rxdogtag.DogTagObserver.onNext(DogTagObserver.java:53)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:58)
	at io.reactivex.internal.operators.observable.ObservableRange$RangeDisposable.run(ObservableRange.java:64)
	at io.reactivex.internal.operators.observable.ObservableRange.subscribeActual(ObservableRange.java:35)
	at io.reactivex.Observable.subscribe(Observable.java:12090)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
	at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Unhandled error!
	at [[ Originating callback: onNext ]].(:0)
	at [[ Inferred subscribe point ]].(:0)
	at com.uber.anotherpackage.ReadMeExample.complexDelegate(ReadMeExample.java:65)
	at [[ Original trace ]].(:0)
	at com.uber.anotherpackage.ReadMeExample.throwSomething(ReadMeExample.java:70)
	at com.uber.anotherpackage.ReadMeExample.lambda$complexDelegate$2(ReadMeExample.java:65)
	at io.reactivex.internal.observers.LambdaObserver.onNext(LambdaObserver.java:63)
	... 17 more

Now we're given added context that this occurred in the onNext callback according to [[ Originating callback: onNext ]] at the subscribe() call at ReadMeExample.java:60. This callback handling is supported for all the standard callbacks and will report the correct name for each (onSuccess, onComplete, etc).

Under the hood

This mechanism relies on a combination of RxJavaPlugins's on*Subscribe hooks and LambdaConsumerIntrospection.

The onSubscribe hooks allow RxDogTag to inspect incoming Observers and, if no error handling is implemented according to LambdaConsumerIntrospection, will wrap them in a custom decorating observer that records the necessary tagging information via Throwable created at subscribe-time. This gives us all the information needed later to report any errors. In the event of an error from upstream, the decorating observer will synthesize a new stacktrace with the deduced subscribe() line.

The Throwable created at subscribe-time does incur a non-zero cost to create, but in practice we haven't found this to be an issue. This mechanism is also lighter weight than assembly tracking, and more targeted since it tells you exactly where to look. The idea is that usually hardest part is finding where the error is happening, and not necessarily fixing it once you've found it.

More information on LambdaConsumerIntrospection can be found in its implementation PR: https://github.com/ReactiveX/RxJava/pull/5590

Decision tree:

-> Is the observer an instance of `LambdaConsumerIntrospection`?
  -> Does `hasCustomOnError()` return `false`?
    -> Decorate with a DogTag observer.
Clone this wiki locally