Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Behavior subject time gap fix 2 #1185

Merged
merged 6 commits into from
May 20, 2014
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
PublishSubject to match BehaviorSubject
akarnokd authored and akarnokd committed May 9, 2014

Verified

This commit was signed with the committer’s verified signature. The key has expired.
zalegrala Zach Leslie
commit d27b8c7791b6d76474b8d9cda42a7e1ecc7f03f6
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
@@ -274,7 +274,7 @@ public void onNext(T v) {
return state.observers.get().observers.length;
}

private static final class BehaviorObserver<T> {
static final class BehaviorObserver<T> {
final Observer<? super T> actual;
final NotificationLite<T> nl = NotificationLite.instance();
/** Guarded by this. */
90 changes: 23 additions & 67 deletions rxjava-core/src/main/java/rx/subjects/PublishSubject.java
Original file line number Diff line number Diff line change
@@ -15,14 +15,10 @@
*/
package rx.subjects;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;

import rx.Notification;
import rx.Observer;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
import rx.operators.NotificationLite;
import rx.subjects.BehaviorSubject.BehaviorOnSubscribe;
import rx.subjects.BehaviorSubject.State;

/**
* Subject that, once and {@link Observer} has subscribed, publishes all subsequent events to the subscriber.
@@ -50,85 +46,45 @@
public final class PublishSubject<T> extends Subject<T, T> {

public static <T> PublishSubject<T> create() {
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
// set a default value so subscriptions will immediately receive this until a new notification is received
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>();

OnSubscribe<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
/**
* This function executes at beginning of subscription.
*
* This will always run, even if Subject is in terminal state.
*/
new Action1<SubjectObserver<? super T>>() {

@Override
public void call(SubjectObserver<? super T> o) {
// nothing onSubscribe unless in terminal state which is the next function
}
},
/**
* This function executes if the Subject is terminated before subscription occurs.
*/
new Action1<SubjectObserver<? super T>>() {

@Override
public void call(SubjectObserver<? super T> o) {
/*
* If we are already terminated, or termination happens while trying to subscribe
* this will be invoked and we emit whatever the last terminal value was.
*/
lastNotification.get().accept(o);
}
}, null);

return new PublishSubject<T>(onSubscribe, subscriptionManager, lastNotification);
State<T> state = new State<T>();
return new PublishSubject<T>(new BehaviorOnSubscribe<T>(state), state);
}

private final SubjectSubscriptionManager<T> subscriptionManager;
final AtomicReference<Notification<T>> lastNotification;

protected PublishSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
final State<T> state;
private final NotificationLite<T> nl = NotificationLite.instance();
protected PublishSubject(OnSubscribe<T> onSubscribe, State<T> state) {
super(onSubscribe);
this.subscriptionManager = subscriptionManager;
this.lastNotification = lastNotification;
this.state = state;
}

@Override
public void onCompleted() {
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
@Override
public void call() {
lastNotification.set(Notification.<T> createOnCompleted());
}
});
if (observers != null) {
for (Observer<? super T> o : observers) {
o.onCompleted();
Object last = state.get();
if (last == null || state.active) {
Object n = nl.completed();
for (BehaviorSubject.BehaviorObserver<T> bo : state.terminate(n)) {
bo.emitNext(n);
}
}

}

@Override
public void onError(final Throwable e) {
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {

@Override
public void call() {
lastNotification.set(Notification.<T> createOnError(e));
}
});
if (observers != null) {
for (Observer<? super T> o : observers) {
o.onError(e);
Object last = state.get();
if (last == null || state.active) {
Object n = nl.error(e);
for (BehaviorSubject.BehaviorObserver<T> bo : state.terminate(n)) {
bo.emitNext(n);
}
}
}

@Override
public void onNext(T v) {
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
o.onNext(v);
for (BehaviorSubject.BehaviorObserver<T> bo : state.observers()) {
bo.emitNext(nl.next(v));
}
}
}