diff --git a/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java index c491760a1b..86c1fecb2e 100644 --- a/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java +++ b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java @@ -15,16 +15,21 @@ */ package rx.operators; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + import rx.Notification; import rx.Observer; import rx.Scheduler; -import rx.concurrency.Schedulers; import rx.util.functions.Action0; /* package */class ScheduledObserver implements Observer { private final Observer underlying; private final Scheduler scheduler; + private final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue>(); + private final AtomicInteger counter = new AtomicInteger(0); + public ScheduledObserver(Observer underlying, Scheduler scheduler) { this.underlying = underlying; this.scheduler = scheduler; @@ -41,38 +46,50 @@ public void onError(final Exception e) { } @Override - public void onNext(final T v) { - enqueue(new Notification(v)); + public void onNext(final T args) { + enqueue(new Notification(args)); } - private void enqueue(final Notification notification) { + private void enqueue(Notification notification) { + // this must happen before 'counter' is used to provide synchronization between threads + queue.offer(notification); - Schedulers.currentThread().schedule(new Action0() { + // we now use counter to atomically determine if we need to start processing or not + // it will be 0 if it's the first notification or the scheduler has finished processing work + // and we need to start doing it again + if (counter.getAndIncrement() == 0) { + processQueue(); + } + } + + private void processQueue() { + scheduler.schedule(new Action0() { @Override public void call() { + Notification not = queue.poll(); - scheduler.schedule(new Action0() { - @Override - public void call() { - switch (notification.getKind()) { - case OnNext: - underlying.onNext(notification.getValue()); - break; - case OnError: - underlying.onError(notification.getException()); - break; - case OnCompleted: - underlying.onCompleted(); - break; - default: - throw new IllegalStateException("Unknown kind of notification " + notification); + switch (not.getKind()) { + case OnNext: + underlying.onNext(not.getValue()); + break; + case OnError: + underlying.onError(not.getException()); + break; + case OnCompleted: + underlying.onCompleted(); + break; + default: + throw new IllegalStateException("Unknown kind of notification " + not); - } - } - }); - } + } - }); - }; + // decrement count and if we still have work to do + // recursively schedule ourselves to process again + if (counter.decrementAndGet() > 0) { + scheduler.schedule(this); + } + } + }); + } }