Skip to content

Commit

Permalink
Window operators now support backpressure in the inner observable.
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored and akarnokd committed Aug 12, 2015
1 parent 2e44d56 commit d4bc4ef
Show file tree
Hide file tree
Showing 11 changed files with 1,058 additions and 21 deletions.
348 changes: 348 additions & 0 deletions src/main/java/rx/internal/operators/BufferUntilSubscriberV2.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,348 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;

import java.util.Queue;
import java.util.concurrent.atomic.*;

import rx.*;
import rx.exceptions.*;
import rx.functions.*;
import rx.internal.util.atomic.*;
import rx.internal.util.unsafe.*;
import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;

/**
* A Subject variant which buffers events until a single Subscriber arrives and replays them to it
* and potentially switches to direct delivery once the Subscriber caught up and requested an unlimited
* amount. In this case, the buffered values are no longer retained. If the Subscriber
* requests a limited amount, queueing is involved and only those values are retained which
* weren't requested by the Subscriber at that time.
*/
public final class BufferUntilSubscriberV2<T> extends Subject<T, T> {

/**
* Constructs an empty BufferUntilSubscriber instance with the default capacity hint of 16 elements.
*
* @return the created BufferUntilSubscriber instance
*/
public static <T> BufferUntilSubscriberV2<T> create() {
return create(16);
}
/**
* Constructs an empty BufferUntilSubscriber instance with a capacity hint.
* <p>The capacity hint determines the internal queue's island size: the larger
* it is the less frequent allocation will happen if there is no subscriber
* or the subscriber hasn't caught up.
* @param capacityHint the capacity hint for the internal queue
* @return the created BufferUntilSubscriber instance
*/
public static <T> BufferUntilSubscriberV2<T> create(int capacityHint) {
State<T> state = new State<T>(capacityHint);
OnSubscribeBUS<T> onSubscribe = new OnSubscribeBUS<T>(state);
return new BufferUntilSubscriberV2<T>(onSubscribe, state);
}

final State<T> state;

private BufferUntilSubscriberV2(OnSubscribe<T> onSubscribe, State<T> state) {
super(onSubscribe);
this.state = state;
}

@Override
public void onNext(T t) {
state.onNext(t);
}

@Override
public void onError(Throwable e) {
state.onError(e);
}

@Override
public void onCompleted() {
state.onCompleted();
}

@Override
public boolean hasObservers() {
return state.subscriber.get() != null;
}

/**
* The single-consumption replaying state.
*
* @param <T> the value type
*/
static final class State<T> extends AtomicLong implements Producer, Observer<T>, Action0 {
/** */
private static final long serialVersionUID = -9044104859202255786L;
/** The single subscriber. */
final AtomicReference<Subscriber<? super T>> subscriber;
/** The queue holding values until the subscriber arrives and catches up. */
final Queue<Object> queue;
/** JCTools queues don't accept nulls. */
final NotificationLite<T> nl;
/** In case the source emitted an error. */
Throwable error;
/** Indicates the source has terminated. */
volatile boolean done;
/** Emitter loop: emitting indicator. Guarded by this. */
boolean emitting;
/** Emitter loop: missed emission indicator. Guarded by this. */
boolean missed;
/** Indicates the queue can be bypassed because the child has caught up with the replay. */
volatile boolean caughtUp;
/**
* Constructor.
* @param capacityHint indicates how large each island in the Spsc queue should be to
* reduce allocation frequency
*/
public State(int capacityHint) {
this.nl = NotificationLite.instance();
this.subscriber = new AtomicReference<Subscriber<? super T>>();
Queue<Object> q;
if (capacityHint > 1) {
q = UnsafeAccess.isUnsafeAvailable()
? new SpscUnboundedArrayQueue<Object>(capacityHint)
: new SpscUnboundedAtomicArrayQueue<Object>(capacityHint);
} else {
q = UnsafeAccess.isUnsafeAvailable()
? new SpscLinkedQueue<Object>()
: new SpscLinkedAtomicQueue<Object>();
}
this.queue = q;
}

@Override
public void onNext(T t) {
if (!done) {
if (!caughtUp) {
boolean stillReplay = false;
/*
* We need to offer while holding the lock because
* we have to atomically switch caughtUp to true
* that can only happen if there isn't any concurrent
* offer() happening while the emission is in replayLoop().
*/
synchronized (this) {
if (!caughtUp) {
queue.offer(nl.next(t));
stillReplay = true;
}
}
if (stillReplay) {
replay();
return;
}
}
Subscriber<? super T> s = subscriber.get();
try {
s.onNext(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.onError(OnErrorThrowable.addValueAsLastCause(ex, t));
}
}
}
@Override
public void onError(Throwable e) {
if (!done) {
error = e;
done = true;
if (!caughtUp) {
boolean stillReplay = false;
synchronized (this) {
stillReplay = !caughtUp;
}
if (stillReplay) {
replay();
return;
}
}
subscriber.get().onError(e);
}
}
@Override
public void onCompleted() {
if (!done) {
done = true;
if (!caughtUp) {
boolean stillReplay = false;
synchronized (this) {
stillReplay = !caughtUp;
}
if (stillReplay) {
replay();
return;
}
}
subscriber.get().onCompleted();
}
}

@Override
public void request(long n) {
if (n < 0L) {
throw new IllegalArgumentException("n >= 0 required");
} else
if (n > 0L) {
BackpressureUtils.getAndAddRequest(this, n);
replay();
} else
if (done) { // terminal events can be delivered for zero requests
replay();
}
}
/**
* Tries to set the given subscriber if not already set, sending an
* IllegalStateException to the subscriber otherwise.
* @param subscriber
*/
public void setSubscriber(Subscriber<? super T> subscriber) {
if (this.subscriber.compareAndSet(null, subscriber)) {
subscriber.add(Subscriptions.create(this));
subscriber.setProducer(this);
} else {
subscriber.onError(new IllegalStateException("Only a single subscriber is allowed"));
}
}
/**
* Tries to replay the contents of the queue.
*/
void replay() {
synchronized (this) {
if (emitting) {
missed = true;
return;
}
emitting = true;
}
Queue<Object> q = queue;
for (;;) {
Subscriber<? super T> s = subscriber.get();
boolean unlimited = false;
if (s != null) {
boolean d = done;
boolean empty = q.isEmpty();

if (checkTerminated(d, empty, s)) {
return;
}
long r = get();
unlimited = r == Long.MAX_VALUE;
long e = 0L;

while (r != 0) {
d = done;
Object v = q.poll();
empty = v == null;
if (checkTerminated(d, empty, s)) {
return;
}
if (empty) {
break;
}
T value = nl.getValue(v);
try {
s.onNext(value);
} catch (Throwable ex) {
q.clear();
Exceptions.throwIfFatal(ex);
s.onError(OnErrorThrowable.addValueAsLastCause(ex, value));
return;
}
r--;
e++;
}
if (!unlimited && e != 0L) {
addAndGet(-e);
}
}

synchronized (this) {
if (!missed) {
if (unlimited && q.isEmpty()) {
caughtUp = true;
}
emitting = false;
return;
}
missed = false;
}
}
}
/**
* Terminates the state by setting the done flag and tries to clear the queue.
* Should be called only when the child unsubscribes
*/
@Override
public void call() {
done = true;
synchronized (this) {
if (emitting) {
return;
}
emitting = true;
}
queue.clear();
}
/**
* Checks if one of the terminal conditions have been met: child unsubscribed,
* an error happened or the source terminated and the queue is empty
* @param done
* @param empty
* @param s
* @return
*/
boolean checkTerminated(boolean done, boolean empty, Subscriber<? super T> s) {
if (s.isUnsubscribed()) {
queue.clear();
return true;
}
if (done) {
Throwable e = error;
if (e != null) {
queue.clear();
s.onError(e);
return true;
} else
if (empty) {
s.onCompleted();
return true;
}
}
return false;
}
}
/**
* The OnSubscribe implementation of the BufferUntilSubscriber.
*
* @param <T> the value type
*/
static final class OnSubscribeBUS<T> implements OnSubscribe<T> {
final State<T> state;
public OnSubscribeBUS(State<T> state) {
this.state = state;
}
@Override
public void call(Subscriber<? super T> child) {
state.setSubscriber(child);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ void replaceSubject() {
child.onNext(producer);
}
void createNewWindow() {
BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
BufferUntilSubscriberV2<T> bus = BufferUntilSubscriberV2.create();
consumer = bus;
producer = bus;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ void replaceSubject() {
child.onNext(producer);
}
void createNewWindow() {
BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
BufferUntilSubscriberV2<T> bus = BufferUntilSubscriberV2.create();
consumer = bus;
producer = bus;
Observable<? extends U> other;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public Subscriber<? super T> call(Subscriber<? super Observable<T>> child) {
final class ExactSubscriber extends Subscriber<T> {
final Subscriber<? super Observable<T>> child;
int count;
BufferUntilSubscriber<T> window;
BufferUntilSubscriberV2<T> window;
volatile boolean noWindow = true;
public ExactSubscriber(Subscriber<? super Observable<T>> child) {
/**
Expand Down Expand Up @@ -107,7 +107,7 @@ void requestMore(long n) {
public void onNext(T t) {
if (window == null) {
noWindow = false;
window = BufferUntilSubscriber.create();
window = BufferUntilSubscriberV2.create();
child.onNext(window);
}
window.onNext(t);
Expand Down Expand Up @@ -242,7 +242,7 @@ public void onCompleted() {
}

CountedSubject<T> createCountedSubject() {
final BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
final BufferUntilSubscriberV2<T> bus = BufferUntilSubscriberV2.create();
return new CountedSubject<T>(bus, bus);
}
}
Expand Down
Loading

0 comments on commit d4bc4ef

Please sign in to comment.