From 3d9842ff25cf5df85eb82a41e52e78b0c1f26053 Mon Sep 17 00:00:00 2001 From: George Campbell Date: Mon, 27 Jan 2014 15:53:43 -0800 Subject: [PATCH 1/2] Setting up the new subproject for debugging observable chains. --- rxjava-contrib/rxjava-debug/build.gradle | 30 ++++ .../java/rx/operators/DebugSubscriber.java | 68 +++++++++ .../java/rx/operators/DebugSubscription.java | 23 ++++ .../src/main/java/rx/plugins/DebugHook.java | 99 +++++++++++++ .../java/rx/plugins/DebugNotification.java | 109 +++++++++++++++ .../java/rx/plugins/NotificationEvent.java | 104 ++++++++++++++ .../src/test/java/rx/debug/DebugHookTest.java | 130 ++++++++++++++++++ .../src/test/java/rx/plugins/PlugReset.java | 7 + rxjava-core/src/main/java/rx/Observable.java | 6 +- .../java/rx/observers/SafeSubscriber.java | 3 + .../RxJavaObservableExecutionHook.java | 13 +- .../main/java/rx/util/functions/Actions.java | 53 ++++++- settings.gradle | 1 + 13 files changed, 642 insertions(+), 4 deletions(-) create mode 100644 rxjava-contrib/rxjava-debug/build.gradle create mode 100644 rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java create mode 100644 rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscription.java create mode 100644 rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java create mode 100644 rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotification.java create mode 100644 rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/NotificationEvent.java create mode 100644 rxjava-contrib/rxjava-debug/src/test/java/rx/debug/DebugHookTest.java create mode 100644 rxjava-contrib/rxjava-debug/src/test/java/rx/plugins/PlugReset.java diff --git a/rxjava-contrib/rxjava-debug/build.gradle b/rxjava-contrib/rxjava-debug/build.gradle new file mode 100644 index 0000000000..13775cd88e --- /dev/null +++ b/rxjava-contrib/rxjava-debug/build.gradle @@ -0,0 +1,30 @@ +apply plugin: 'osgi' + +sourceCompatibility = JavaVersion.VERSION_1_6 +targetCompatibility = JavaVersion.VERSION_1_6 + +dependencies { + compile project(':rxjava-core') + testCompile project(":rxjava-core").sourceSets.test.output + provided 'junit:junit-dep:4.10' + provided 'org.mockito:mockito-core:1.8.5' +} + +javadoc { + options { + doclet = "org.benjchristensen.doclet.DocletExclude" + docletpath = [rootProject.file('./gradle/doclet-exclude.jar')] + stylesheetFile = rootProject.file('./gradle/javadocStyleSheet.css') + windowTitle = "RxJava Javadoc ${project.version}" + } + options.addStringOption('top').value = '

RxJava

' +} + +jar { + manifest { + name = 'rxjava-debug' + instruction 'Bundle-Vendor', 'Netflix' + instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava' + instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*' + } +} diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java new file mode 100644 index 0000000000..5e17cb9151 --- /dev/null +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscriber.java @@ -0,0 +1,68 @@ +package rx.operators; + +import rx.Observer; +import rx.Subscriber; +import rx.plugins.DebugNotification; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +public final class DebugSubscriber extends Subscriber { + private final Func1 onNextHook; + final Action1 events; + final Observer o; + Operator from = null; + Operator to = null; + + public DebugSubscriber( + Func1 onNextHook, + Action1 _events, + Subscriber _o, + Operator _out, + Operator _in) { + super(_o); + this.events = _events; + this.o = _o; + this.onNextHook = onNextHook; + this.from = _out; + this.to = _in; + this.add(new DebugSubscription(this)); + } + + @Override + public void onCompleted() { + events.call(DebugNotification.createOnCompleted(o, from, to)); + o.onCompleted(); + } + + @Override + public void onError(Throwable e) { + events.call(DebugNotification.createOnError(o, from, e, to)); + o.onError(e); + } + + @Override + public void onNext(T t) { + events.call(DebugNotification.createOnNext(o, from, t, to)); + o.onNext(onNextHook.call(t)); + } + + public Operator getFrom() { + return from; + } + + public void setFrom(Operator op) { + this.from = op; + } + + public Operator getTo() { + return to; + } + + public void setTo(Operator op) { + this.to = op; + } + + public Observer getActual() { + return o; + } +} \ No newline at end of file diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscription.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscription.java new file mode 100644 index 0000000000..e82960d5fe --- /dev/null +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/operators/DebugSubscription.java @@ -0,0 +1,23 @@ +package rx.operators; + +import rx.Subscription; +import rx.plugins.DebugNotification; + +final class DebugSubscription implements Subscription { + private final DebugSubscriber debugObserver; + + DebugSubscription(DebugSubscriber debugObserver) { + this.debugObserver = debugObserver; + } + + @Override + public void unsubscribe() { + debugObserver.events.call(DebugNotification. createUnsubscribe(debugObserver.o, debugObserver.from, debugObserver.to)); + debugObserver.unsubscribe(); + } + + @Override + public boolean isUnsubscribed() { + return debugObserver.isUnsubscribed(); + } +} \ No newline at end of file diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java new file mode 100644 index 0000000000..f530de6e79 --- /dev/null +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugHook.java @@ -0,0 +1,99 @@ +package rx.plugins; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Subscriber; +import rx.Subscription; +import rx.operators.DebugSubscriber; +import rx.operators.Operator; +import rx.util.functions.Action1; +import rx.util.functions.Actions; +import rx.util.functions.Func1; +import rx.util.functions.Functions; + +/** + * Implements hooks into the {@link Observable} chain to emit a detailed account of all the events + * that happened. + * + * @author gscampbell + */ +public class DebugHook extends RxJavaObservableExecutionHook { + private final Func1 onNextHook; + private final Action1 events; + + /** + * Creates a new instance of the DebugHook RxJava plug-in that can be passed into + * {@link RxJavaPlugins} registerObservableExecutionHook(hook) method. + * + * @param onNextDataHook + * all of the onNext values are passed through this function to allow for + * manipulation of the values + * @param events + * This action is invoked as each notification is generated + */ + public DebugHook(Func1 onNextDataHook, Action1 events) { + this.onNextHook = onNextDataHook == null ? Functions.identity() : onNextDataHook; + this.events = events == null ? Actions.empty() : events; + } + + @Override + public OnSubscribe onSubscribeStart(Observable observableInstance, final OnSubscribe f) { + return new OnSubscribe() { + @Override + public void call(Subscriber o) { + events.call(DebugNotification.createSubscribe(o, f)); + f.call(wrapOutbound(null, o)); + } + }; + } + + @Override + public Subscription onSubscribeReturn(Observable observableInstance, Subscription subscription) { + return subscription; + } + + @Override + public OnSubscribe onCreate(final OnSubscribe f) { + return new OnSubscribe() { + @Override + public void call(Subscriber o) { + f.call(wrapInbound(null, o)); + } + }; + } + + @Override + public Operator onLift(final Operator bind) { + return new Operator() { + @Override + public Subscriber call(final Subscriber o) { + return wrapInbound(bind, bind.call(wrapOutbound(bind, o))); + } + }; + } + + @Override + public Subscription onAdd(Subscriber subscriber, Subscription s) { + return s; + } + + @SuppressWarnings("unchecked") + private Subscriber wrapOutbound(Operator bind, Subscriber o) { + if (o instanceof DebugSubscriber) { + if (bind != null) + ((DebugSubscriber) o).setFrom(bind); + return o; + } + return new DebugSubscriber(onNextHook, events, o, bind, null); + } + + @SuppressWarnings("unchecked") + private Subscriber wrapInbound(Operator bind, Subscriber o) { + if (o instanceof DebugSubscriber) { + if (bind != null) + ((DebugSubscriber) o).setTo(bind); + return o; + } + return new DebugSubscriber(onNextHook, events, o, null, bind); + } +} diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotification.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotification.java new file mode 100644 index 0000000000..af66b94217 --- /dev/null +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/DebugNotification.java @@ -0,0 +1,109 @@ +package rx.plugins; + +import rx.Notification; +import rx.Observable.OnSubscribe; +import rx.Observer; +import rx.observers.SafeSubscriber; +import rx.operators.DebugSubscriber; +import rx.operators.Operator; +import rx.plugins.DebugNotification.Kind; + +public class DebugNotification { + public static enum Kind { + OnNext, OnError, OnCompleted, Subscribe, Unsubscribe + } + + private final OnSubscribe source; + private final Operator from; + private final Kind kind; + private final Notification notification; + private final Operator to; + private final long nanoTime; + private final long threadId; + private Observer o; + + public static DebugNotification createSubscribe(Observer o, OnSubscribe source) { + Operator to = null; + Operator from = null; + if (o instanceof DebugSubscriber) { + to = ((DebugSubscriber) o).getTo(); + from = ((DebugSubscriber) o).getFrom(); + o = ((DebugSubscriber) o).getActual(); + } + return new DebugNotification(o, from, Kind.Subscribe, null, to, source); + } + + public static DebugNotification createOnNext(Observer o, Operator from, T t, Operator to) { + return new DebugNotification(o, from, Kind.OnNext, Notification.createOnNext(t), to, null); + } + + public static DebugNotification createOnError(Observer o, Operator from, Throwable e, Operator to) { + return new DebugNotification(o, from, Kind.OnError, Notification. createOnError(e), to, null); + } + + public static DebugNotification createOnCompleted(Observer o, Operator from, Operator to) { + return new DebugNotification(o, from, Kind.OnCompleted, Notification. createOnCompleted(), to, null); + } + + public static DebugNotification createUnsubscribe(Observer o, Operator from, Operator to) { + return new DebugNotification(o, from, Kind.Unsubscribe, null, to, null); + } + + private DebugNotification(Observer o, Operator from, Kind kind, Notification notification, Operator to, OnSubscribe source) { + this.o = (o instanceof SafeSubscriber) ? ((SafeSubscriber) o).getActual() : o; + this.from = from; + this.kind = kind; + this.notification = notification; + this.to = to; + this.source = source; + this.nanoTime = System.nanoTime(); + this.threadId = Thread.currentThread().getId(); + } + + public Operator getFrom() { + return from; + } + + public Notification getNotification() { + return notification; + } + + public Operator getTo() { + return to; + } + + public long getNanoTime() { + return nanoTime; + } + + public long getThreadId() { + return threadId; + } + + public Kind getKind() { + return kind; + } + + @Override + public String toString() { + final StringBuilder s = new StringBuilder("{"); + s.append(" \"nano\": ").append(nanoTime); + s.append(", \"thread\": ").append(threadId); + s.append(", \"observer\": \"").append(o.getClass().getName()).append("@").append(Integer.toHexString(o.hashCode())).append("\""); + s.append(", \"type\": \"").append(kind).append("\""); + if (notification != null) { + if (notification.hasValue()) + s.append(", \"value\": \"").append(notification.getValue()).append("\""); + if (notification.hasThrowable()) + s.append(", \"exception\": \"").append(notification.getThrowable().getMessage().replace("\\", "\\\\").replace("\"", "\\\"")).append("\""); + } + if (source != null) + s.append(", \"source\": \"").append(source.getClass().getName()).append("@").append(Integer.toHexString(source.hashCode())).append("\""); + if (from != null) + s.append(", \"from\": \"").append(from.getClass().getName()).append("@").append(Integer.toHexString(from.hashCode())).append("\""); + if (to != null) + s.append(", \"to\": \"").append(to.getClass().getName()).append("@").append(Integer.toHexString(to.hashCode())).append("\""); + s.append("}"); + return s.toString(); + } +} diff --git a/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/NotificationEvent.java b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/NotificationEvent.java new file mode 100644 index 0000000000..998c562529 --- /dev/null +++ b/rxjava-contrib/rxjava-debug/src/main/java/rx/plugins/NotificationEvent.java @@ -0,0 +1,104 @@ +package rx.plugins; + +import rx.Notification; +import rx.Observable.OnSubscribe; +import rx.Observer; +import rx.observers.SafeSubscriber; +import rx.operators.DebugSubscriber; +import rx.operators.Operator; + +public class NotificationEvent { + public static enum Kind { + OnNext, OnError, OnCompleted, Subscribe, Unsubscribe + } + + private final OnSubscribe source; + private final Operator from; + private final Kind kind; + private final Notification notification; + private final Operator to; + private final long nanoTime; + private final long threadId; + private Observer o; + + public static NotificationEvent createSubscribe(Observer o, OnSubscribe source) { + Operator to = null; + Operator from = null; + if (o instanceof DebugSubscriber) { + to = ((DebugSubscriber) o).getTo(); + from = ((DebugSubscriber) o).getFrom(); + o = ((DebugSubscriber) o).getActual(); + } + return new NotificationEvent(o, from, Kind.Subscribe, null, to, source); + } + + public static NotificationEvent createOnNext(Observer o, Operator from, T t, Operator to) { + return new NotificationEvent(o, from, Kind.OnNext, Notification.createOnNext(t), to, null); + } + + public static NotificationEvent createOnError(Observer o, Operator from, Throwable e, Operator to) { + return new NotificationEvent(o, from, Kind.OnError, Notification. createOnError(e), to, null); + } + + public static NotificationEvent createOnCompleted(Observer o, Operator from, Operator to) { + return new NotificationEvent(o, from, Kind.OnCompleted, Notification. createOnCompleted(), to, null); + } + + public static NotificationEvent createUnsubscribe(Observer o, Operator from, Operator to) { + return new NotificationEvent(o, from, Kind.Unsubscribe, null, to, null); + } + + private NotificationEvent(Observer o, Operator from, Kind kind, Notification notification, Operator to, OnSubscribe source) { + this.o = (o instanceof SafeSubscriber) ? ((SafeSubscriber) o).getActual() : o; + this.from = from; + this.kind = kind; + this.notification = notification; + this.to = to; + this.source = source; + this.nanoTime = System.nanoTime(); + this.threadId = Thread.currentThread().getId(); + } + + public Operator getFrom() { + return from; + } + + public Notification getNotification() { + return notification; + } + + public Operator getTo() { + return to; + } + + public long getNanoTime() { + return nanoTime; + } + + public long getThreadId() { + return threadId; + } + + @Override + public String toString() { + final StringBuilder s = new StringBuilder("{"); + s.append(" \"nano\": ").append(nanoTime); + s.append(", \"thread\": ").append(threadId); + s.append(", \"observer\": \"").append(o.getClass().getName()).append("@").append(Integer.toHexString(o.hashCode())).append("\""); + s.append(", \"type\": \"").append(kind).append("\""); + if (notification != null) { + if (notification.hasValue()) + s.append(", \"value\": \"").append(notification.getValue()).append("\""); + if (notification.hasThrowable()) + s.append(", \"exception\": \"").append(notification.getThrowable().getMessage().replace("\\", "\\\\").replace("\"", "\\\"")).append("\""); + } + if (source != null) + s.append(", \"source\": \"").append(source.getClass().getName()).append("@").append(Integer.toHexString(source.hashCode())).append("\""); + if (from != null) + s.append(", \"from\": \"").append(from.getClass().getName()).append("@").append(Integer.toHexString(from.hashCode())).append("\""); + if (to != null) + s.append(", \"to\": \"").append(to.getClass().getName()).append("@").append(Integer.toHexString(to.hashCode())).append("\""); + s.append("}"); + return s.toString(); + } +} diff --git a/rxjava-contrib/rxjava-debug/src/test/java/rx/debug/DebugHookTest.java b/rxjava-contrib/rxjava-debug/src/test/java/rx/debug/DebugHookTest.java new file mode 100644 index 0000000000..e90de3c11f --- /dev/null +++ b/rxjava-contrib/rxjava-debug/src/test/java/rx/debug/DebugHookTest.java @@ -0,0 +1,130 @@ +package rx.debug; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.Arrays; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import rx.Notification; +import rx.Observable; +import rx.Observer; +import rx.plugins.DebugHook; +import rx.plugins.DebugNotification; +import rx.plugins.PlugReset; +import rx.plugins.RxJavaPlugins; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +public class DebugHookTest { + @Before + @After + public void reset() { + PlugReset.reset(); + } + + @Test + @Ignore + public void testSimple() { + Action1 events = mock(Action1.class); + final DebugHook hook = new DebugHook(null, events); + RxJavaPlugins.getInstance().registerObservableExecutionHook(hook); + Observable.empty().subscribe(); + verify(events, times(1)).call(subscribe()); + verify(events, times(1)).call(onCompleted()); + } + + @Test + public void testOneOp() { + Action1 events = mock(Action1.class); + final DebugHook hook = new DebugHook(null, events); + RxJavaPlugins.getInstance().registerObservableExecutionHook(hook); + Observable.from(Arrays.asList(1, 3)).flatMap(new Func1>() { + @Override + public Observable call(Integer it) { + return Observable.from(Arrays.asList(it, it + 1)); + } + }).take(3).subscribe(new Observer() { + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Integer t) { + } + }); + verify(events, times(6)).call(subscribe()); + verify(events, times(4)).call(onNext(1)); + // one less because it originates from the inner observable sent to merge + verify(events, times(3)).call(onNext(2)); + verify(events, times(4)).call(onNext(3)); + // because the take unsubscribes + verify(events, never()).call(onNext(4)); + } + + private static DebugNotification onNext(final T value) { + return argThat(new BaseMatcher>() { + @Override + public boolean matches(Object item) { + if (item instanceof DebugNotification) { + DebugNotification dn = (DebugNotification) item; + Notification n = dn.getNotification(); + return n != null && n.hasValue() && n.getValue().equals(value); + } + return false; + } + + @Override + public void describeTo(Description description) { + description.appendText("OnNext " + value); + } + }); + } + + private static DebugNotification subscribe() { + return argThat(new BaseMatcher() { + @Override + public boolean matches(Object item) { + if (item instanceof DebugNotification) { + DebugNotification dn = (DebugNotification) item; + return dn.getKind() == DebugNotification.Kind.Subscribe; + } + return false; + } + + @Override + public void describeTo(Description description) { + description.appendText("Subscribe"); + } + }); + } + + private static DebugNotification onCompleted() { + return argThat(new BaseMatcher() { + @Override + public boolean matches(Object item) { + if (item instanceof DebugNotification) { + DebugNotification dn = (DebugNotification) item; + Notification n = dn.getNotification(); + return n != null && n.isOnCompleted(); + } + return false; + } + + @Override + public void describeTo(Description description) { + description.appendText("onCompleted"); + } + }); + } +} diff --git a/rxjava-contrib/rxjava-debug/src/test/java/rx/plugins/PlugReset.java b/rxjava-contrib/rxjava-debug/src/test/java/rx/plugins/PlugReset.java new file mode 100644 index 0000000000..0292d1b5e3 --- /dev/null +++ b/rxjava-contrib/rxjava-debug/src/test/java/rx/plugins/PlugReset.java @@ -0,0 +1,7 @@ +package rx.plugins; + +public class PlugReset { + public static void reset() { + RxJavaPlugins.getInstance().reset(); + } +} diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 542bedbff2..3794f33476 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -27,6 +27,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import rx.Observable.OnSubscribe; import rx.joins.Pattern2; import rx.joins.Plan0; import rx.observables.BlockingObservable; @@ -49,6 +50,7 @@ import rx.operators.OperationDematerialize; import rx.operators.OperationDistinct; import rx.operators.OperationDistinctUntilChanged; +import rx.operators.Operator; import rx.operators.OperatorDoOnEach; import rx.operators.OperationElementAt; import rx.operators.OperationFilter; @@ -98,6 +100,7 @@ import rx.operators.OperationWindow; import rx.operators.OperatorSubscribeOn; import rx.operators.OperatorZip; +import rx.operators.Operator; import rx.operators.OperatorCast; import rx.operators.OperatorFromIterable; import rx.operators.OperatorGroupBy; @@ -251,10 +254,9 @@ public static interface OnSubscribeFunc extends Function { */ public Observable lift(final Func1, Subscriber> bind) { return new Observable(new OnSubscribe() { - @Override public void call(Subscriber o) { - subscribe(bind.call(o)); + subscribe(hook.onLift((Operator) bind).call(o)); } }); } diff --git a/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java b/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java index a7ce075ef5..23255df38d 100644 --- a/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java +++ b/rxjava-core/src/main/java/rx/observers/SafeSubscriber.java @@ -157,4 +157,7 @@ protected void _onError(Throwable e) { } } + public Subscriber getActual() { + return actual; + } } diff --git a/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java b/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java index c3aa2b44ad..da4500c494 100644 --- a/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java +++ b/rxjava-core/src/main/java/rx/plugins/RxJavaObservableExecutionHook.java @@ -20,6 +20,7 @@ import rx.Observable.OnSubscribeFunc; import rx.Subscriber; import rx.Subscription; +import rx.operators.Operator; import rx.util.functions.Func1; /** @@ -37,7 +38,6 @@ * * */ public abstract class RxJavaObservableExecutionHook { - /** * Invoked before {@link Observable#subscribe(rx.Subscriber)} is about to be executed. *

@@ -93,4 +93,15 @@ public Throwable onSubscribeError(Observable observableInstance return e; } + public OnSubscribe onCreate(OnSubscribe f) { + return f; + } + + public Operator onLift(final Operator bind) { + return bind; + } + + public Subscription onAdd(Subscriber subscriber, Subscription s) { + return s; + } } diff --git a/rxjava-core/src/main/java/rx/util/functions/Actions.java b/rxjava-core/src/main/java/rx/util/functions/Actions.java index 5d4d3656dc..7b233bb7fb 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Actions.java +++ b/rxjava-core/src/main/java/rx/util/functions/Actions.java @@ -16,7 +16,6 @@ package rx.util.functions; import rx.Observer; -import rx.Subscriber; /** * Utility class for the Action interfaces. @@ -26,6 +25,58 @@ private Actions() { throw new IllegalStateException("No instances!"); } + public static final EmptyAction empty() { + return EMPTY_ACTION; + } + + private static final EmptyAction EMPTY_ACTION = new EmptyAction(); + + private static final class EmptyAction implements Action0, Action1, Action2, Action3, Action4, Action5, Action6, Action7, Action8, Action9, ActionN { + @Override + public void call() { + } + + @Override + public void call(Object t1) { + } + + @Override + public void call(Object t1, Object t2) { + } + + @Override + public void call(Object t1, Object t2, Object t3) { + } + + @Override + public void call(Object t1, Object t2, Object t3, Object t4) { + } + + @Override + public void call(Object t1, Object t2, Object t3, Object t4, Object t5) { + } + + @Override + public void call(Object t1, Object t2, Object t3, Object t4, Object t5, Object t6) { + } + + @Override + public void call(Object t1, Object t2, Object t3, Object t4, Object t5, Object t6, Object t7) { + } + + @Override + public void call(Object t1, Object t2, Object t3, Object t4, Object t5, Object t6, Object t7, Object t8) { + } + + @Override + public void call(Object t1, Object t2, Object t3, Object t4, Object t5, Object t6, Object t7, Object t8, Object t9) { + } + + @Override + public void call(Object... args) { + } + } + /** * Extracts a method reference to the observer's onNext method * in the form of an Action1. diff --git a/settings.gradle b/settings.gradle index 2122bb8ff6..a657ce5ba9 100644 --- a/settings.gradle +++ b/settings.gradle @@ -9,5 +9,6 @@ include 'rxjava-core', \ 'rxjava-contrib:rxjava-android', \ 'rxjava-contrib:rxjava-apache-http', \ 'rxjava-contrib:rxjava-string', \ +'rxjava-contrib:rxjava-debug', \ 'rxjava-contrib:rxjava-async-util', \ 'rxjava-contrib:rxjava-computation-expressions' From 80aa7c618eebc66bf156f96f602b84098ca1342e Mon Sep 17 00:00:00 2001 From: George Campbell Date: Mon, 10 Feb 2014 14:08:40 -0800 Subject: [PATCH 2/2] Updating StringObservable to use lift And added from(InputStream) and from(Reader) --- .../java/rx/observables/StringObservable.java | 177 +++++++++++++----- .../rx/observables/StringObservableTest.java | 73 ++++---- 2 files changed, 167 insertions(+), 83 deletions(-) diff --git a/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java b/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java index 1c7e1e2b1a..4d5ba0474d 100644 --- a/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java +++ b/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,6 +15,9 @@ */ package rx.observables; +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; @@ -27,15 +30,72 @@ import java.util.regex.Pattern; import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscription; +import rx.Observable.OnSubscribe; +import rx.Subscriber; +import rx.operators.Operator; import rx.util.functions.Func1; import rx.util.functions.Func2; public class StringObservable { + public static Observable from(final InputStream i) { + return from(i, 8 * 1024); + } + + public static Observable from(final InputStream i, final int size) { + return Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber o) { + byte[] buffer = new byte[size]; + try { + if (o.isUnsubscribed()) + return; + int n = 0; + n = i.read(buffer); + while (n != -1 && !o.isUnsubscribed()) { + o.onNext(Arrays.copyOf(buffer, n)); + n = i.read(buffer); + } + } catch (IOException e) { + o.onError(e); + } + if (o.isUnsubscribed()) + return; + o.onCompleted(); + } + }); + } + + public static Observable from(final Reader i) { + return from(i, 8 * 1024); + } + + public static Observable from(final Reader i, final int size) { + return Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber o) { + char[] buffer = new char[size]; + try { + if (o.isUnsubscribed()) + return; + int n = 0; + n = i.read(buffer); + while (n != -1 && !o.isUnsubscribed()) { + o.onNext(new String(buffer)); + n = i.read(buffer); + } + } catch (IOException e) { + o.onError(e); + } + if (o.isUnsubscribed()) + return; + o.onCompleted(); + } + }); + } + /** - * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks. + * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams + * and where handles when a multibyte character spans two chunks. * * @param src * @param charsetName @@ -46,7 +106,8 @@ public static Observable decode(Observable src, String charsetNa } /** - * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks. + * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams + * and where handles when a multibyte character spans two chunks. * * @param src * @param charset @@ -57,7 +118,8 @@ public static Observable decode(Observable src, Charset charset) } /** - * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks. + * Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams + * and where handles when a multibyte character spans two chunks. * This method allows for more control over how malformed and unmappable characters are handled. * * @param src @@ -65,22 +127,22 @@ public static Observable decode(Observable src, Charset charset) * @return */ public static Observable decode(final Observable src, final CharsetDecoder charsetDecoder) { - return Observable.create(new OnSubscribeFunc() { + return src.lift(new Operator() { @Override - public Subscription onSubscribe(final Observer observer) { - return src.subscribe(new Observer() { + public Subscriber call(final Subscriber o) { + return new Subscriber(o) { private ByteBuffer leftOver = null; @Override public void onCompleted() { if (process(null, leftOver, true)) - observer.onCompleted(); + o.onCompleted(); } @Override public void onError(Throwable e) { if (process(null, leftOver, true)) - observer.onError(e); + o.onError(e); } @Override @@ -120,7 +182,7 @@ public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) { cr.throwException(); } catch (CharacterCodingException e) { - observer.onError(e); + o.onError(e); return false; } } @@ -134,11 +196,11 @@ public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) { String string = cb.toString(); if (!string.isEmpty()) - observer.onNext(string); + o.onNext(string); return true; } - }); + }; } }); } @@ -190,13 +252,14 @@ public byte[] call(String str) { } /** - * Gather up all of the strings in to one string to be able to use it as one message. Don't use this on infinite streams. + * Gather up all of the strings in to one string to be able to use it as one message. Don't use + * this on infinite streams. * * @param src * @return */ public static Observable stringConcat(Observable src) { - return src.aggregate(new Func2() { + return src.reduce(new Func2() { @Override public String call(String a, String b) { return a + b; @@ -218,22 +281,25 @@ public String call(String a, String b) { */ public static Observable split(final Observable src, String regex) { final Pattern pattern = Pattern.compile(regex); - return Observable.create(new OnSubscribeFunc() { + + return src.lift(new Operator() { @Override - public Subscription onSubscribe(final Observer observer) { - return src.subscribe(new Observer() { + public Subscriber call(final Subscriber o) { + return new Subscriber(o) { private String leftOver = null; @Override public void onCompleted() { output(leftOver); - observer.onCompleted(); + if (!o.isUnsubscribed()) + o.onCompleted(); } @Override public void onError(Throwable e) { output(leftOver); - observer.onError(e); + if (!o.isUnsubscribed()) + o.onError(e); } @Override @@ -250,8 +316,10 @@ public void onNext(String segment) { } private int emptyPartCount = 0; + /** * when limit == 0 trailing empty parts are not emitted. + * * @param part */ private void output(String part) { @@ -259,15 +327,18 @@ private void output(String part) { emptyPartCount++; } else { - for(; emptyPartCount>0; emptyPartCount--) - observer.onNext(""); - observer.onNext(part); + for (; emptyPartCount > 0; emptyPartCount--) + if (!o.isUnsubscribed()) + o.onNext(""); + if (!o.isUnsubscribed()) + o.onNext(part); } } - }); + }; } }); } + /** * Concatenates the sequence of values by adding a separator * between them and emitting the result once the source completes. @@ -276,49 +347,55 @@ private void output(String part) { * {@link java.lang.String#valueOf(java.lang.Object)} calls. *

* For example: + * *

-     * Observable<Object> source = Observable.from("a", 1, "c");
-     * Observable<String> result = join(source, ", ");
+     * Observable<Object> source = Observable.from("a", 1, "c");
+     * Observable<String> result = join(source, ", ");
      * 
* * will yield a single element equal to "a, 1, c". * - * @param source the source sequence of CharSequence values - * @param separator the separator to a + * @param source + * the source sequence of CharSequence values + * @param separator + * the separator to a * @return an Observable which emits a single String value having the concatenated * values of the source observable with the separator between elements */ public static Observable join(final Observable source, final CharSequence separator) { - return Observable.create(new OnSubscribeFunc() { - + return source.lift(new Operator() { @Override - public Subscription onSubscribe(final Observer t1) { - return source.subscribe(new Observer() { + public Subscriber call(final Subscriber o) { + return new Subscriber(o) { boolean mayAddSeparator; StringBuilder b = new StringBuilder(); + @Override - public void onNext(T args) { - if (mayAddSeparator) { - b.append(separator); - } - mayAddSeparator = true; - b.append(String.valueOf(args)); + public void onCompleted() { + String str = b.toString(); + b = null; + if (!o.isUnsubscribed()) + o.onNext(str); + if (!o.isUnsubscribed()) + o.onCompleted(); } @Override public void onError(Throwable e) { b = null; - t1.onError(e); + if (!o.isUnsubscribed()) + o.onError(e); } @Override - public void onCompleted() { - String str = b.toString(); - b = null; - t1.onNext(str); - t1.onCompleted(); + public void onNext(Object t) { + if (mayAddSeparator) { + b.append(separator); + } + mayAddSeparator = true; + b.append(String.valueOf(t)); } - }); + }; } }); } diff --git a/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java b/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java index 8b45a44f9e..8b3577b601 100644 --- a/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java +++ b/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -23,6 +23,7 @@ import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.nio.charset.MalformedInputException; +import java.util.Arrays; import org.junit.Test; @@ -110,6 +111,7 @@ public void testEncode() { public void testSplitOnCollon() { testSplit("boo:and:foo", ":", 0, "boo", "and", "foo"); } + @Test public void testSplitOnOh() { testSplit("boo:and:foo", "o", 0, "b", "", ":and:f"); @@ -120,96 +122,101 @@ public void testSplit(String str, String regex, int limit, String... parts) { for (int i = 0; i < str.length(); i++) { String a = str.substring(0, i); String b = str.substring(i, str.length()); - testSplit(a+"|"+b, regex, limit, Observable.from(a, b), parts); + testSplit(a + "|" + b, regex, limit, Observable.from(a, b), parts); } } public void testSplit(String message, String regex, int limit, Observable src, String... parts) { Observable act = StringObservable.split(src, regex); Observable exp = Observable.from(parts); - AssertObservable.assertObservableEqualsBlocking("when input is "+message+" and limit = "+ limit, exp, act); + AssertObservable.assertObservableEqualsBlocking("when input is " + message + " and limit = " + limit, exp, act); } - + @Test public void testJoinMixed() { - Observable source = Observable.from("a", 1, "c"); - + Observable source = Observable. from(Arrays.asList("a", 1, "c")); + Observable result = StringObservable.join(source, ", "); - + Observer observer = mock(Observer.class); - + result.subscribe(new TestObserver(observer)); - + verify(observer, times(1)).onNext("a, 1, c"); verify(observer, times(1)).onCompleted(); verify(observer, never()).onError(any(Throwable.class)); } + @Test public void testJoinWithEmptyString() { Observable source = Observable.from("", "b", "c"); - + Observable result = StringObservable.join(source, ", "); - + Observer observer = mock(Observer.class); - + result.subscribe(new TestObserver(observer)); - + verify(observer, times(1)).onNext(", b, c"); verify(observer, times(1)).onCompleted(); verify(observer, never()).onError(any(Throwable.class)); } + @Test public void testJoinWithNull() { Observable source = Observable.from("a", null, "c"); - + Observable result = StringObservable.join(source, ", "); - + Observer observer = mock(Observer.class); - + result.subscribe(new TestObserver(observer)); - + verify(observer, times(1)).onNext("a, null, c"); verify(observer, times(1)).onCompleted(); verify(observer, never()).onError(any(Throwable.class)); } + @Test public void testJoinSingle() { Observable source = Observable.from("a"); - + Observable result = StringObservable.join(source, ", "); - + Observer observer = mock(Observer.class); - + result.subscribe(new TestObserver(observer)); - + verify(observer, times(1)).onNext("a"); verify(observer, times(1)).onCompleted(); verify(observer, never()).onError(any(Throwable.class)); } + @Test public void testJoinEmpty() { Observable source = Observable.empty(); - + Observable result = StringObservable.join(source, ", "); - + Observer observer = mock(Observer.class); - + result.subscribe(new TestObserver(observer)); - + verify(observer, times(1)).onNext(""); verify(observer, times(1)).onCompleted(); verify(observer, never()).onError(any(Throwable.class)); } + @Test public void testJoinThrows() { - Observable source = Observable.concat(Observable.just("a"), Observable.error(new RuntimeException("Forced failure"))); - + Observable source = Observable.concat(Observable.just("a"), Observable. error(new RuntimeException("Forced failure"))); + Observable result = StringObservable.join(source, ", "); - + Observer observer = mock(Observer.class); - + result.subscribe(new TestObserver(observer)); - + verify(observer, never()).onNext("a"); verify(observer, never()).onCompleted(); verify(observer, times(1)).onError(any(Throwable.class));