Skip to content

Commit

Permalink
Change the workers to capture the stack trace for all subsequent sche…
Browse files Browse the repository at this point in the history
…duled actions to increase the readability of uncaught and fatal exceptions that bubble up to the schedulers.
  • Loading branch information
abersnaze committed Jun 1, 2016
1 parent 271c83b commit c6ada18
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ ThreadWorker get() {
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
threadWorker.resetContext();
return threadWorker;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private static class EventLoopWorker extends Scheduler.Worker {

EventLoopWorker(PoolWorker poolWorker) {
this.poolWorker = poolWorker;

poolWorker.resetContext();
}

@Override
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/rx/internal/schedulers/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public Worker createWorker() {

/** Worker that schedules tasks on the executor indirectly through a trampoline mechanism. */
static final class ExecutorSchedulerWorker extends Scheduler.Worker implements Runnable {
private final Throwable creationContext = SchedulerContextException.create();
final Executor executor;
// TODO: use a better performing structure for task tracking
final CompositeSubscription tasks;
Expand All @@ -64,7 +65,7 @@ public Subscription schedule(Action0 action) {
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
ScheduledAction ea = new ScheduledAction(action, tasks);
ScheduledAction ea = new ScheduledAction(action, tasks, creationContext);
tasks.add(ea);
queue.offer(ea);
if (wip.getAndIncrement() == 0) {
Expand Down Expand Up @@ -146,7 +147,7 @@ public void call() {
((ScheduledAction)s2).add(removeMas);
}
}
});
}, this.creationContext);
// This will make sure if ea.call() gets executed before this line
// we don't override the current task in mas.
first.set(ea);
Expand Down
11 changes: 8 additions & 3 deletions src/main/java/rx/internal/schedulers/NewThreadWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
* @warn class description missing
*/
public class NewThreadWorker extends Scheduler.Worker implements Subscription {
private Throwable creationContext = SchedulerContextException.create();
private final ScheduledExecutorService executor;
private final RxJavaSchedulersHook schedulersHook;
volatile boolean isUnsubscribed;
Expand Down Expand Up @@ -234,7 +235,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
*/
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
ScheduledAction run = new ScheduledAction(decoratedAction, creationContext);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
Expand All @@ -247,7 +248,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
ScheduledAction run = new ScheduledAction(decoratedAction, parent, creationContext);
parent.add(run);

Future<?> f;
Expand All @@ -263,7 +264,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
ScheduledAction run = new ScheduledAction(decoratedAction, parent, creationContext);
parent.add(run);

Future<?> f;
Expand All @@ -288,4 +289,8 @@ public void unsubscribe() {
public boolean isUnsubscribed() {
return isUnsubscribed;
}

public void resetContext() {
creationContext = SchedulerContextException.create();
}
}
12 changes: 9 additions & 3 deletions src/main/java/rx/internal/schedulers/ScheduledAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.atomic.*;

import rx.Subscription;
import rx.exceptions.Exceptions;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action0;
import rx.internal.util.SubscriptionList;
Expand All @@ -34,18 +35,22 @@ public final class ScheduledAction extends AtomicReference<Thread> implements Ru
private static final long serialVersionUID = -3962399486978279857L;
final SubscriptionList cancel;
final Action0 action;
final Throwable creationContext;

public ScheduledAction(Action0 action) {
public ScheduledAction(Action0 action, Throwable creationContext) {
this.action = action;
this.cancel = new SubscriptionList();
this.creationContext = creationContext;
}
public ScheduledAction(Action0 action, CompositeSubscription parent) {
public ScheduledAction(Action0 action, CompositeSubscription parent, Throwable creationContext) {
this.action = action;
this.cancel = new SubscriptionList(new Remover(this, parent));
this.creationContext = creationContext;
}
public ScheduledAction(Action0 action, SubscriptionList parent) {
public ScheduledAction(Action0 action, SubscriptionList parent, Throwable creationContext) {
this.action = action;
this.cancel = new SubscriptionList(new Remover2(this, parent));
this.creationContext = creationContext;
}

@Override
Expand All @@ -61,6 +66,7 @@ public void run() {
} else {
ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
}
Exceptions.addCause(ie, creationContext);
RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.schedulers;

/**
* Used only for providing context around where work was scheduled should an error occur in a different thread.
*/
public class SchedulerContextException extends Exception {
/**
* Constant to use when disabled
*/
private static final Throwable CONTEXT_MISSING = new SchedulerContextException("Missing context. Enable by setting the system property \"rxjava.captureSchedulerContext=true\"");

static {
CONTEXT_MISSING.setStackTrace(new StackTraceElement[0]);
}

/**
* @return a {@link Throwable} that captures the stack trace or a {@link Throwable} that documents how to enable the feature if needed.
*/
public static Throwable create() {
String def = "false";
String setTo = System.getProperty("rxjava.captureSchedulerContext", def);
return setTo != def && "true".equals(setTo) ? new SchedulerContextException("Asynchronous work scheduled at") : CONTEXT_MISSING;
}

private SchedulerContextException(String message) {
super(message);
}

private static final long serialVersionUID = 1L;
}
52 changes: 52 additions & 0 deletions src/test/java/rx/schedulers/AbstractSchedulerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaPlugins;

/**
* Base tests for all schedulers including Immediate/Current.
Expand Down Expand Up @@ -502,4 +504,54 @@ public void onNext(T args) {

}

@Test
public final void testStackTraceAcrossThreads() throws Throwable {
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
final CountDownLatch done = new CountDownLatch(1);
System.setProperty("rxjava.captureSchedulerContext", "true");

try {

RxJavaPlugins.getInstance().reset();
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
@Override
public void handleError(Throwable e) {
exceptionRef.set(e);
done.countDown();
}
});

try {
getScheduler().createWorker().schedule(new Action0() {
@Override
public void call() {
throw new RuntimeException();
}
});
} catch (Exception e) {
exceptionRef.set(e);
done.countDown();
}

done.await();

Throwable exception = exceptionRef.get();
Throwable e = exception;
while (e.getCause() != null) {
e = e.getCause();
}

StackTraceElement[] st = e.getStackTrace();
for (StackTraceElement stackTraceElement : st) {
if (stackTraceElement.getMethodName().equals("testStackTraceAcrossThreads")) {
// pass we found this class in the stack trace.
return;
}
}

throw exception;
} finally {
System.setProperty("rxjava.captureSchedulerContext", "false");
}
}
}

0 comments on commit c6ada18

Please sign in to comment.