Skip to content

Commit

Permalink
1.x: make just() support backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Nov 5, 2015
1 parent e802bb7 commit d6579d2
Show file tree
Hide file tree
Showing 10 changed files with 635 additions and 196 deletions.
2 changes: 1 addition & 1 deletion src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8304,7 +8304,7 @@ public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
return create(new OperatorSubscribeOn<T>(this, scheduler));
}

/**
Expand Down
39 changes: 37 additions & 2 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1679,8 +1679,43 @@ public void onNext(T t) {
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #observeOn
*/
public final Single<T> subscribeOn(Scheduler scheduler) {
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
public final Single<T> subscribeOn(final Scheduler scheduler) {
return create(new OnSubscribe<T>() {
@Override
public void call(final SingleSubscriber<? super T> t) {
final Scheduler.Worker w = scheduler.createWorker();
t.add(w);

w.schedule(new Action0() {
@Override
public void call() {
SingleSubscriber<T> ssub = new SingleSubscriber<T>() {
@Override
public void onSuccess(T value) {
try {
t.onSuccess(value);
} finally {
w.unsubscribe();
}
}

@Override
public void onError(Throwable error) {
try {
t.onError(error);
} finally {
w.unsubscribe();
}
}
};

t.add(ssub);

Single.this.subscribe(ssub);
}
});
}
});
}

/**
Expand Down
122 changes: 55 additions & 67 deletions src/main/java/rx/internal/operators/OperatorSubscribeOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,96 +15,84 @@
*/
package rx.internal.operators;

import rx.Observable;
import rx.Observable.Operator;
import rx.Producer;
import rx.Scheduler;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.Scheduler.Worker;
import rx.Subscriber;
import rx.functions.Action0;

/**
* Subscribes Observers on the specified {@code Scheduler}.
* <p>
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/subscribeOn.png" alt="">
*
* @param <T> the value type of the actual source
*/
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
public class OperatorSubscribeOn<T> implements OnSubscribe<T> {

private final Scheduler scheduler;
final Scheduler scheduler;
final Observable<T> source;

public OperatorSubscribeOn(Scheduler scheduler) {
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}

@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
return new Subscriber<Observable<T>>(subscriber) {

@Override
public void onCompleted() {
// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}


inner.schedule(new Action0() {
@Override
public void onNext(final Observable<T> o) {
inner.schedule(new Action0() {

public void call() {
final Thread t = Thread.currentThread();

Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void call() {
final Thread t = Thread.currentThread();
o.unsafeSubscribe(new Subscriber<T>(subscriber) {

@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(T t) {
subscriber.onNext(t);
}

public void onNext(T t) {
subscriber.onNext(t);
}

@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}

@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}

@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void setProducer(final Producer producer) {
subscriber.setProducer(new Producer() {

@Override
public void request(final long n) {
if (Thread.currentThread() == t) {
// don't schedule if we're already on the thread (primarily for first setProducer call)
// see unit test 'testSetProducerSynchronousRequest' for more context on this
producer.request(n);
} else {
inner.schedule(new Action0() {

@Override
public void call() {
producer.request(n);
}
});
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
}

});
});
}
}

});
}
});
};

source.unsafeSubscribe(s);
}

};
});
}
}
96 changes: 63 additions & 33 deletions src/main/java/rx/internal/operators/OperatorTimeoutBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,11 @@
package rx.internal.operators;

import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import rx.Observable;
import rx.*;
import rx.Observable.Operator;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func3;
import rx.functions.Func4;
import rx.functions.*;
import rx.internal.producers.ProducerArbiter;
import rx.observers.SerializedSubscriber;
import rx.subscriptions.SerialSubscription;

Expand Down Expand Up @@ -65,74 +60,86 @@ class OperatorTimeoutBase<T> implements Operator<T, T> {
public Subscriber<? super T> call(Subscriber<? super T> subscriber) {
Scheduler.Worker inner = scheduler.createWorker();
subscriber.add(inner);
final SerialSubscription serial = new SerialSubscription();
subscriber.add(serial);
// Use SynchronizedSubscriber for safe memory access
// as the subscriber will be accessed in the current thread or the
// scheduler or other Observables.
final SerializedSubscriber<T> synchronizedSubscriber = new SerializedSubscriber<T>(subscriber);

final SerialSubscription serial = new SerialSubscription();
synchronizedSubscriber.add(serial);

TimeoutSubscriber<T> timeoutSubscriber = new TimeoutSubscriber<T>(synchronizedSubscriber, timeoutStub, serial, other, inner);

synchronizedSubscriber.add(timeoutSubscriber);
synchronizedSubscriber.setProducer(timeoutSubscriber.arbiter);

serial.set(firstTimeoutStub.call(timeoutSubscriber, 0L, inner));

return timeoutSubscriber;
}

/* package-private */static final class TimeoutSubscriber<T> extends
Subscriber<T> {

private final SerialSubscription serial;
private final Object gate = new Object();

private final SerializedSubscriber<T> serializedSubscriber;

private final TimeoutStub<T> timeoutStub;

private final Observable<? extends T> other;

private final Scheduler.Worker inner;

volatile int terminated;
volatile long actual;
final ProducerArbiter arbiter;

/** Guarded by this. */
boolean terminated;
/** Guarded by this. */
long actual;

@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<TimeoutSubscriber> TERMINATED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(TimeoutSubscriber.class, "terminated");
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<TimeoutSubscriber> ACTUAL_UPDATER
= AtomicLongFieldUpdater.newUpdater(TimeoutSubscriber.class, "actual");

private TimeoutSubscriber(
SerializedSubscriber<T> serializedSubscriber,
TimeoutStub<T> timeoutStub, SerialSubscription serial,
Observable<? extends T> other,
Scheduler.Worker inner) {
super(serializedSubscriber);
this.serializedSubscriber = serializedSubscriber;
this.timeoutStub = timeoutStub;
this.serial = serial;
this.other = other;
this.inner = inner;
this.arbiter = new ProducerArbiter();
}

@Override
public void setProducer(Producer p) {
arbiter.setProducer(p);
}

@Override
public void onNext(T value) {
boolean onNextWins = false;
synchronized (gate) {
if (terminated == 0) {
ACTUAL_UPDATER.incrementAndGet(this);
long a;
synchronized (this) {
if (!terminated) {
a = ++actual;
onNextWins = true;
} else {
a = actual;
}
}
if (onNextWins) {
serializedSubscriber.onNext(value);
serial.set(timeoutStub.call(this, actual, value, inner));
serial.set(timeoutStub.call(this, a, value, inner));
}
}

@Override
public void onError(Throwable error) {
boolean onErrorWins = false;
synchronized (gate) {
if (TERMINATED_UPDATER.getAndSet(this, 1) == 0) {
synchronized (this) {
if (!terminated) {
terminated = true;
onErrorWins = true;
}
}
Expand All @@ -145,8 +152,9 @@ public void onError(Throwable error) {
@Override
public void onCompleted() {
boolean onCompletedWins = false;
synchronized (gate) {
if (TERMINATED_UPDATER.getAndSet(this, 1) == 0) {
synchronized (this) {
if (!terminated) {
terminated = true;
onCompletedWins = true;
}
}
Expand All @@ -159,17 +167,39 @@ public void onCompleted() {
public void onTimeout(long seqId) {
long expected = seqId;
boolean timeoutWins = false;
synchronized (gate) {
if (expected == actual && TERMINATED_UPDATER.getAndSet(this, 1) == 0) {
synchronized (this) {
if (expected == actual && !terminated) {
terminated = true;
timeoutWins = true;
}
}
if (timeoutWins) {
if (other == null) {
serializedSubscriber.onError(new TimeoutException());
} else {
other.unsafeSubscribe(serializedSubscriber);
serial.set(serializedSubscriber);
Subscriber<T> second = new Subscriber<T>() {
@Override
public void onNext(T t) {
serializedSubscriber.onNext(t);
}

@Override
public void onError(Throwable e) {
serializedSubscriber.onError(e);
}

@Override
public void onCompleted() {
serializedSubscriber.onCompleted();
}

@Override
public void setProducer(Producer p) {
arbiter.setProducer(p);
}
};
other.unsafeSubscribe(second);
serial.set(second);
}
}
}
Expand Down
Loading

0 comments on commit d6579d2

Please sign in to comment.