From dd5544b024df4e569f7cae1f030e5ac2c1d42c47 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 6 Dec 2013 09:51:25 +0100 Subject: [PATCH] ObserveOn fix for observing the same source on the same scheduler by two different observers. --- .../java/rx/operators/OperationObserveOn.java | 97 +++++++++---------- .../rx/operators/OperationObserveOnTest.java | 74 ++++++++++++++ 2 files changed, 119 insertions(+), 52 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index fbfda6ef1b..22876a4ced 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -27,6 +27,7 @@ import rx.concurrency.CurrentThreadScheduler; import rx.concurrency.ImmediateScheduler; import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.SerialSubscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Action1; @@ -46,10 +47,6 @@ public static OnSubscribeFunc observeOn(Observable source, S private static class ObserveOn implements OnSubscribeFunc { private final Observable source; private final Scheduler scheduler; - private volatile Scheduler recursiveScheduler; - - final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue>(); - final AtomicInteger counter = new AtomicInteger(0); public ObserveOn(Observable source, Scheduler scheduler) { this.source = source; @@ -65,71 +62,67 @@ public Subscription onSubscribe(final Observer observer) { // do nothing if we request CurrentThreadScheduler so we don't invoke overhead return source.subscribe(observer); } else { - return observeOn(observer, scheduler); + return new Observation(observer).init(); } } + /** Observe through individual queue per observer. */ + private class Observation implements Action1> { + final Observer observer; + final CompositeSubscription s; + final ConcurrentLinkedQueue> queue; + final AtomicInteger counter; + private volatile Scheduler recursiveScheduler; + public Observation(Observer observer) { + this.observer = observer; + this.queue = new ConcurrentLinkedQueue>(); + this.counter = new AtomicInteger(0); + this.s = new CompositeSubscription(); + } + public Subscription init() { + s.add(source.materialize().subscribe(this)); + return s; + } - public Subscription observeOn(final Observer observer, final Scheduler scheduler) { - final CompositeSubscription s = new CompositeSubscription(); - - s.add(source.materialize().subscribe(new Action1>() { - - @Override - public void call(Notification e) { - // this must happen before 'counter' is used to provide synchronization between threads - queue.offer(e); - - // we now use counter to atomically determine if we need to start processing or not - // it will be 0 if it's the first notification or the scheduler has finished processing work - // and we need to start doing it again - if (counter.getAndIncrement() == 0) { - if (recursiveScheduler == null) { - s.add(scheduler.schedule(null, new Func2() { - + @Override + public void call(Notification e) { + queue.offer(e); + if (counter.getAndIncrement() == 0) { + if (recursiveScheduler == null) { + s.add(scheduler.schedule(null, new Func2() { @Override public Subscription call(Scheduler innerScheduler, T state) { // record innerScheduler so 'processQueue' can use it for all subsequent executions recursiveScheduler = innerScheduler; - processQueue(s, observer); + processQueue(); return Subscriptions.empty(); } })); - } else { - processQueue(s, observer); - } + } else { + processQueue(); } - } - })); - - return s; - } - - /** - * This uses 'recursiveScheduler' NOT 'scheduler' as it should reuse the same scheduler each time it processes. - * This means it must first get the recursiveScheduler when it first executes. - */ - private void processQueue(final CompositeSubscription s, final Observer observer) { + } + void processQueue() { + s.add(recursiveScheduler.schedule(new Action1() { + @Override + public void call(Action0 self) { + Notification not = queue.poll(); + if (not != null) { + not.accept(observer); + } - s.add(recursiveScheduler.schedule(new Action1() { - @Override - public void call(Action0 self) { - Notification not = queue.poll(); - if (not != null) { - not.accept(observer); - } + // decrement count and if we still have work to do + // recursively schedule ourselves to process again + if (counter.decrementAndGet() > 0) { + self.call(); + } - // decrement count and if we still have work to do - // recursively schedule ourselves to process again - if (counter.decrementAndGet() > 0) { - self.call(); } - - } - })); + })); + } } } -} +} \ No newline at end of file diff --git a/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java b/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java index 15f1a44fb3..63fa0e4f27 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationObserveOnTest.java @@ -31,6 +31,7 @@ import rx.Observable; import rx.Observer; import rx.concurrency.Schedulers; +import rx.concurrency.TestScheduler; import rx.util.functions.Action1; public class OperationObserveOnTest { @@ -132,4 +133,77 @@ public void call(String t1) { inOrder.verify(observer, times(1)).onCompleted(); } + @Test + public void observeOnTheSameSchedulerTwice() { + TestScheduler scheduler = new TestScheduler(); + + Observable o = Observable.from(1, 2, 3); + Observable o2 = o.observeOn(scheduler); + + @SuppressWarnings("unchecked") + Observer observer1 = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer observer2 = mock(Observer.class); + + InOrder inOrder1 = inOrder(observer1); + InOrder inOrder2 = inOrder(observer2); + + o2.subscribe(observer1); + o2.subscribe(observer2); + + scheduler.advanceTimeBy(1, TimeUnit.SECONDS); + + inOrder1.verify(observer1, times(1)).onNext(1); + inOrder1.verify(observer1, times(1)).onNext(2); + inOrder1.verify(observer1, times(1)).onNext(3); + inOrder1.verify(observer1, times(1)).onCompleted(); + verify(observer1, never()).onError(any(Throwable.class)); + inOrder1.verifyNoMoreInteractions(); + + inOrder2.verify(observer2, times(1)).onNext(1); + inOrder2.verify(observer2, times(1)).onNext(2); + inOrder2.verify(observer2, times(1)).onNext(3); + inOrder2.verify(observer2, times(1)).onCompleted(); + verify(observer2, never()).onError(any(Throwable.class)); + inOrder2.verifyNoMoreInteractions(); + + } + @Test + public void observeSameOnMultipleSchedulers() { + TestScheduler scheduler1 = new TestScheduler(); + TestScheduler scheduler2 = new TestScheduler(); + + Observable o = Observable.from(1, 2, 3); + Observable o1 = o.observeOn(scheduler1); + Observable o2 = o.observeOn(scheduler2); + + @SuppressWarnings("unchecked") + Observer observer1 = mock(Observer.class); + @SuppressWarnings("unchecked") + Observer observer2 = mock(Observer.class); + + InOrder inOrder1 = inOrder(observer1); + InOrder inOrder2 = inOrder(observer2); + + o1.subscribe(observer1); + o2.subscribe(observer2); + + scheduler1.advanceTimeBy(1, TimeUnit.SECONDS); + scheduler2.advanceTimeBy(1, TimeUnit.SECONDS); + + inOrder1.verify(observer1, times(1)).onNext(1); + inOrder1.verify(observer1, times(1)).onNext(2); + inOrder1.verify(observer1, times(1)).onNext(3); + inOrder1.verify(observer1, times(1)).onCompleted(); + verify(observer1, never()).onError(any(Throwable.class)); + inOrder1.verifyNoMoreInteractions(); + + inOrder2.verify(observer2, times(1)).onNext(1); + inOrder2.verify(observer2, times(1)).onNext(2); + inOrder2.verify(observer2, times(1)).onNext(3); + inOrder2.verify(observer2, times(1)).onCompleted(); + verify(observer2, never()).onError(any(Throwable.class)); + inOrder2.verifyNoMoreInteractions(); + + } }