Skip to content

Commit

Permalink
Merge pull request #922 from abersnaze/debug-updates
Browse files Browse the repository at this point in the history
Changes made while integrating it with our internal system
  • Loading branch information
benjchristensen committed Feb 24, 2014
2 parents 747f97e + 4258d77 commit dd2d410
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,72 @@
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Func1;
import rx.plugins.DebugNotification;

public final class DebugSubscriber<T> extends Subscriber<T> {
public final class DebugSubscriber<T, C> extends Subscriber<T> {
private final Func1<T, T> onNextHook;
final Action1<DebugNotification> events;
final Observer<? super T> o;
Operator<? extends T, ?> from = null;
Operator<?, ? super T> to = null;
private final Func1<DebugNotification, C> start;
private final Action1<C> complete;
private final Action2<C, Throwable> error;
private final Observer<? super T> o;
private Operator<? extends T, ?> from = null;
private Operator<?, ? super T> to = null;

public DebugSubscriber(
Func1<T, T> onNextHook,
Action1<DebugNotification> _events,
Func1<DebugNotification, C> start,
Action1<C> complete,
Action2<C, Throwable> error,
Subscriber<? super T> _o,
Operator<? extends T, ?> _out,
Operator<?, ? super T> _in) {
super(_o);
this.events = _events;
this.start = start;
this.complete = complete;
this.error = error;
this.o = _o;
this.onNextHook = onNextHook;
this.from = _out;
this.to = _in;
this.add(new DebugSubscription<T>(this));
this.add(new DebugSubscription<T, C>(this, start, complete, error));
}

@Override
public void onCompleted() {
events.call(DebugNotification.createOnCompleted(o, from, to));
o.onCompleted();
final DebugNotification<T, C> n = DebugNotification.createOnCompleted(o, from, to);
C context = start.call(n);
try {
o.onCompleted();
complete.call(context);
} catch (Throwable e) {
error.call(context, e);
}
}

@Override
public void onError(Throwable e) {
events.call(DebugNotification.createOnError(o, from, e, to));
o.onError(e);
final DebugNotification<T, C> n = DebugNotification.createOnError(o, from, e, to);
C context = start.call(n);
try {
o.onError(e);
complete.call(context);
} catch (Throwable e2) {
error.call(context, e2);
}
}

@Override
public void onNext(T t) {
events.call(DebugNotification.createOnNext(o, from, t, to));
o.onNext(onNextHook.call(t));
final DebugNotification<T, C> n = DebugNotification.createOnNext(o, from, t, to);
C context = start.call(n);
try {
o.onNext(onNextHook.call(t));
complete.call(context);
} catch (Throwable e) {
error.call(context, e);
}
}

public Operator<? extends T, ?> getFrom() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,34 @@
package rx.operators;

import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Func1;
import rx.plugins.DebugNotification;

final class DebugSubscription<T> implements Subscription {
private final DebugSubscriber<T> debugObserver;
final class DebugSubscription<T, C> implements Subscription {
private final DebugSubscriber<T, C> debugObserver;
private final Func1<DebugNotification, C> start;
private final Action1<C> complete;
private final Action2<C, Throwable> error;

DebugSubscription(DebugSubscriber<T> debugObserver) {
DebugSubscription(DebugSubscriber<T, C> debugObserver, Func1<DebugNotification, C> start, Action1<C> complete, Action2<C, Throwable> error) {
this.debugObserver = debugObserver;
this.start = start;
this.complete = complete;
this.error = error;
}

@Override
public void unsubscribe() {
debugObserver.events.call(DebugNotification.<T> createUnsubscribe(debugObserver.o, debugObserver.from, debugObserver.to));
debugObserver.unsubscribe();
final DebugNotification<T, C> n = DebugNotification.<T, C> createUnsubscribe(debugObserver.getActual(), debugObserver.getFrom(), debugObserver.getTo());
C context = start.call(n);
try {
debugObserver.unsubscribe();
complete.call(context);
} catch (Throwable e) {
error.call(context, e);
}
}

@Override
Expand Down
57 changes: 40 additions & 17 deletions rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Action2;
import rx.functions.Actions;
import rx.functions.Func1;
import rx.functions.Functions;
Expand All @@ -17,9 +18,11 @@
*
* @author gscampbell
*/
public class DebugHook extends RxJavaObservableExecutionHook {
public class DebugHook<C> extends RxJavaObservableExecutionHook {
private final Func1 onNextHook;
private final Action1<DebugNotification> events;
private final Func1<DebugNotification, C> start;
private final Action1<C> complete;
private final Action2<C, Throwable> error;

/**
* Creates a new instance of the DebugHook RxJava plug-in that can be passed into
Expand All @@ -31,18 +34,26 @@ public class DebugHook extends RxJavaObservableExecutionHook {
* @param events
* This action is invoked as each notification is generated
*/
public DebugHook(Func1 onNextDataHook, Action1<DebugNotification> events) {
public DebugHook(Func1 onNextDataHook, Func1<DebugNotification, C> start, Action1<C> complete, Action2<C, Throwable> error) {
this.complete = complete;
this.error = error;
this.onNextHook = onNextDataHook == null ? Functions.identity() : onNextDataHook;
this.events = events == null ? Actions.empty() : events;
this.start = (Func1<DebugNotification, C>) (start == null ? Actions.empty() : start);
}

@Override
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> f) {
public <T> OnSubscribe<T> onSubscribeStart(final Observable<? extends T> observableInstance, final OnSubscribe<T> f) {
return new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> o) {
events.call(DebugNotification.createSubscribe(o, f));
f.call(wrapOutbound(null, o));
C context = start.call(DebugNotification.createSubscribe(o, observableInstance, f));
try {
f.call(wrapOutbound(null, o));
complete.call(context);
}
catch(Throwable e) {
error.call(context, e);
}
}
};
}
Expand All @@ -54,12 +65,7 @@ public <T> Subscription onSubscribeReturn(Observable<? extends T> observableInst

@Override
public <T> OnSubscribe<T> onCreate(final OnSubscribe<T> f) {
return new OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> o) {
f.call(wrapInbound(null, o));
}
};
return new OnCreateWrapper<T>(f);
}

@Override
Expand All @@ -81,19 +87,36 @@ public <T> Subscription onAdd(Subscriber<T> subscriber, Subscription s) {
private <R> Subscriber<? super R> wrapOutbound(Operator<? extends R, ?> bind, Subscriber<? super R> o) {
if (o instanceof DebugSubscriber) {
if (bind != null)
((DebugSubscriber<R>) o).setFrom(bind);
((DebugSubscriber<R, C>) o).setFrom(bind);
return o;
}
return new DebugSubscriber<R>(onNextHook, events, o, bind, null);
return new DebugSubscriber<R, C>(onNextHook, start, complete, error, o, bind, null);
}

@SuppressWarnings("unchecked")
private <T> Subscriber<? super T> wrapInbound(Operator<?, ? super T> bind, Subscriber<? super T> o) {
if (o instanceof DebugSubscriber) {
if (bind != null)
((DebugSubscriber<T>) o).setTo(bind);
((DebugSubscriber<T, C>) o).setTo(bind);
return o;
}
return new DebugSubscriber<T>(onNextHook, events, o, null, bind);
return new DebugSubscriber<T, C>(onNextHook, start, complete, error, o, null, bind);
}

public final class OnCreateWrapper<T> implements OnSubscribe<T> {
private final OnSubscribe<T> f;

private OnCreateWrapper(OnSubscribe<T> f) {
this.f = f;
}

@Override
public void call(Subscriber<? super T> o) {
f.call(wrapInbound(null, o));
}

public OnSubscribe<T> getActual() {
return f;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,103 +1,119 @@
package rx.plugins;

import rx.Notification;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Observer;
import rx.observers.SafeSubscriber;
import rx.operators.DebugSubscriber;

public class DebugNotification<T> {
public class DebugNotification<T, C> {
public static enum Kind {
OnNext, OnError, OnCompleted, Subscribe, Unsubscribe
}

private final OnSubscribe<T> source;
private final Observable<? extends T> source;
private final OnSubscribe<T> sourceFunc;
private final Operator<? extends T, ?> from;
private final Kind kind;
private final Notification<T> notification;
private final Operator<?, ? super T> to;
private final long nanoTime;
private final long threadId;
private Observer o;
private final Throwable throwable;
private final T value;
private final Observer observer;

public static <T> DebugNotification<T> createSubscribe(Observer<? super T> o, OnSubscribe<T> source) {
public static <T, C> DebugNotification<T, C> createSubscribe(Observer<? super T> o, Observable<? extends T> source, OnSubscribe<T> sourceFunc) {
Operator<?, ? super T> to = null;
Operator<? extends T, ?> from = null;
if (o instanceof SafeSubscriber) {
o = ((SafeSubscriber) o).getActual();
}
if (o instanceof DebugSubscriber) {
to = ((DebugSubscriber<T>) o).getTo();
from = ((DebugSubscriber<T>) o).getFrom();
to = ((DebugSubscriber<T, C>) o).getTo();
from = ((DebugSubscriber<T, C>) o).getFrom();
o = ((DebugSubscriber) o).getActual();
}
return new DebugNotification<T>(o, from, Kind.Subscribe, null, to, source);
if (sourceFunc instanceof DebugHook.OnCreateWrapper) {
sourceFunc = ((DebugHook.OnCreateWrapper) sourceFunc).getActual();
}
return new DebugNotification<T, C>(o, from, Kind.Subscribe, null, null, to, source, sourceFunc);
}

public static <T> DebugNotification<T> createOnNext(Observer<? super T> o, Operator<? extends T, ?> from, T t, Operator<?, ? super T> to) {
return new DebugNotification<T>(o, from, Kind.OnNext, Notification.createOnNext(t), to, null);
public static <T, C> DebugNotification<T, C> createOnNext(Observer<? super T> o, Operator<? extends T, ?> from, T t, Operator<?, ? super T> to) {
return new DebugNotification<T, C>(o, from, Kind.OnNext, t, null, to, null, null);
}

public static <T> DebugNotification<T> createOnError(Observer<? super T> o, Operator<? extends T, ?> from, Throwable e, Operator<?, ? super T> to) {
return new DebugNotification<T>(o, from, Kind.OnError, Notification.<T> createOnError(e), to, null);
public static <T, C> DebugNotification<T, C> createOnError(Observer<? super T> o, Operator<? extends T, ?> from, Throwable e, Operator<?, ? super T> to) {
return new DebugNotification<T, C>(o, from, Kind.OnError, null, e, to, null, null);
}

public static <T> DebugNotification<T> createOnCompleted(Observer<? super T> o, Operator<? extends T, ?> from, Operator<?, ? super T> to) {
return new DebugNotification<T>(o, from, Kind.OnCompleted, Notification.<T> createOnCompleted(), to, null);
public static <T, C> DebugNotification<T, C> createOnCompleted(Observer<? super T> o, Operator<? extends T, ?> from, Operator<?, ? super T> to) {
return new DebugNotification<T, C>(o, from, Kind.OnCompleted, null, null, to, null, null);
}

public static <T> DebugNotification<T> createUnsubscribe(Observer<? super T> o, Operator<? extends T, ?> from, Operator<?, ? super T> to) {
return new DebugNotification<T>(o, from, Kind.Unsubscribe, null, to, null);
public static <T, C> DebugNotification<T, C> createUnsubscribe(Observer<? super T> o, Operator<? extends T, ?> from, Operator<?, ? super T> to) {
return new DebugNotification<T, C>(o, from, Kind.Unsubscribe, null, null, to, null, null);
}

private DebugNotification(Observer o, Operator<? extends T, ?> from, Kind kind, Notification<T> notification, Operator<?, ? super T> to, OnSubscribe<T> source) {
this.o = (o instanceof SafeSubscriber) ? ((SafeSubscriber) o).getActual() : o;
private DebugNotification(Observer o, Operator<? extends T, ?> from, Kind kind, T value, Throwable throwable, Operator<?, ? super T> to, Observable<? extends T> source, OnSubscribe<T> sourceFunc) {
this.observer = (o instanceof SafeSubscriber) ? ((SafeSubscriber) o).getActual() : o;
this.from = from;
this.kind = kind;
this.notification = notification;
this.value = value;
this.throwable = throwable;
this.to = to;
this.source = source;
this.nanoTime = System.nanoTime();
this.threadId = Thread.currentThread().getId();
this.sourceFunc = sourceFunc;
}

public Observer getObserver() {
return observer;
}

public Operator<? extends T, ?> getFrom() {
return from;
}

public Notification<T> getNotification() {
return notification;
public T getValue() {
return value;
}

public Operator<?, ? super T> getTo() {
return to;
public Throwable getThrowable() {
return throwable;
}

public long getNanoTime() {
return nanoTime;
}

public long getThreadId() {
return threadId;
public Operator<?, ? super T> getTo() {
return to;
}

public Kind getKind() {
return kind;
}

public Observable<? extends T> getSource() {
return source;
}

public OnSubscribe<T> getSourceFunc() {
return sourceFunc;
}

@Override
/**
* Does a very bad job of making JSON like string.
*/
public String toString() {
final StringBuilder s = new StringBuilder("{");
s.append(" \"nano\": ").append(nanoTime);
s.append(", \"thread\": ").append(threadId);
s.append(", \"observer\": \"").append(o.getClass().getName()).append("@").append(Integer.toHexString(o.hashCode())).append("\"");
s.append("\"observer\": \"").append(observer.getClass().getName()).append("@").append(Integer.toHexString(observer.hashCode())).append("\"");
s.append(", \"type\": \"").append(kind).append("\"");
if (notification != null) {
if (notification.hasValue())
s.append(", \"value\": \"").append(notification.getValue()).append("\"");
if (notification.hasThrowable())
s.append(", \"exception\": \"").append(notification.getThrowable().getMessage().replace("\\", "\\\\").replace("\"", "\\\"")).append("\"");
}
if (kind == Kind.OnNext)
// not json safe
s.append(", \"value\": \"").append(value).append("\"");
if (kind == Kind.OnError)
s.append(", \"exception\": \"").append(throwable.getMessage().replace("\\", "\\\\").replace("\"", "\\\"")).append("\"");
if (source != null)
s.append(", \"source\": \"").append(source.getClass().getName()).append("@").append(Integer.toHexString(source.hashCode())).append("\"");
if (sourceFunc != null)
s.append(", \"sourceFunc\": \"").append(sourceFunc.getClass().getName()).append("@").append(Integer.toHexString(sourceFunc.hashCode())).append("\"");
if (from != null)
s.append(", \"from\": \"").append(from.getClass().getName()).append("@").append(Integer.toHexString(from.hashCode())).append("\"");
if (to != null)
Expand Down
Loading

0 comments on commit dd2d410

Please sign in to comment.