Skip to content

Commit

Permalink
Merge pull request ReactiveX#1967 from zsxwing/fix-groupby-unsubscribe
Browse files Browse the repository at this point in the history
Fix the issue that GroupBy may not call 'unsubscribe'
  • Loading branch information
benjchristensen committed Dec 17, 2014
2 parents 5c0be52 + 31281b6 commit 7c408f4
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 16 deletions.
84 changes: 68 additions & 16 deletions src/main/java/rx/internal/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
*/
package rx.internal.operators;

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand All @@ -34,6 +36,7 @@
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;

/**
* Groups the items emitted by an Observable according to a specified criterion, and emits these
Expand Down Expand Up @@ -76,6 +79,13 @@ static final class GroupBySubscriber<K, T, R> extends Subscriber<T> {
final Func1<? super T, ? extends R> elementSelector;
final Subscriber<? super GroupedObservable<K, R>> child;

// We should not call `unsubscribe()` until `groups.isEmpty() && child.isUnsubscribed()` is true.
// Use `WIP_FOR_UNSUBSCRIBE_UPDATER` to monitor these statuses and call `unsubscribe()` properly.
// Should check both when `child.unsubscribe` is called and any group is removed.
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<GroupBySubscriber> WIP_FOR_UNSUBSCRIBE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "wipForUnsubscribe");
volatile int wipForUnsubscribe = 1;

public GroupBySubscriber(
Func1<? super T, ? extends K> keySelector,
Func1<? super T, ? extends R> elementSelector,
Expand All @@ -84,6 +94,16 @@ public GroupBySubscriber(
this.keySelector = keySelector;
this.elementSelector = elementSelector;
this.child = child;
child.add(Subscriptions.create(new Action0() {

@Override
public void call() {
if (WIP_FOR_UNSUBSCRIBE_UPDATER.decrementAndGet(self) == 0) {
self.unsubscribe();
}
}

}));
}

private static class GroupState<K, T> {
Expand All @@ -107,7 +127,13 @@ public Observer<T> getObserver() {
private static final NotificationLite<Object> nl = NotificationLite.instance();

volatile int completionEmitted;
volatile int terminated;

private static final int UNTERMINATED = 0;
private static final int TERMINATED_WITH_COMPLETED = 1;
private static final int TERMINATED_WITH_ERROR = 2;

// Must be one of `UNTERMINATED`, `TERMINATED_WITH_COMPLETED`, `TERMINATED_WITH_ERROR`
volatile int terminated = UNTERMINATED;

@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<GroupBySubscriber> COMPLETION_EMITTED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "completionEmitted");
Expand All @@ -130,15 +156,15 @@ public void onStart() {

@Override
public void onCompleted() {
if (TERMINATED_UPDATER.compareAndSet(this, 0, 1)) {
if (TERMINATED_UPDATER.compareAndSet(this, UNTERMINATED, TERMINATED_WITH_COMPLETED)) {
// if we receive onCompleted from our parent we onComplete children
// for each group check if it is ready to accept more events if so pass the oncomplete through else buffer it.
for (GroupState<K, T> group : groups.values()) {
emitItem(group, nl.completed());
}

// special case (no groups emitted ... or all unsubscribed)
if (groups.size() == 0) {
if (groups.isEmpty()) {
// we must track 'completionEmitted' seperately from 'completed' since `completeInner` can result in childObserver.onCompleted() being emitted
if (COMPLETION_EMITTED_UPDATER.compareAndSet(this, 0, 1)) {
child.onCompleted();
Expand All @@ -149,9 +175,19 @@ public void onCompleted() {

@Override
public void onError(Throwable e) {
if (TERMINATED_UPDATER.compareAndSet(this, 0, 1)) {
// we immediately tear everything down if we receive an error
child.onError(e);
if (TERMINATED_UPDATER.compareAndSet(this, UNTERMINATED, TERMINATED_WITH_ERROR)) {
// It's safe to access all groups and emit the error.
// onNext and onError are in sequence so no group will be created in the loop.
for (GroupState<K, T> group : groups.values()) {
emitItem(group, nl.error(e));
}
try {
// we immediately tear everything down if we receive an error
child.onError(e);
} finally {
// We have not chained the subscribers, so need to call it explicitly.
unsubscribe();
}
}
}

Expand Down Expand Up @@ -187,7 +223,9 @@ public void onNext(T t) {
}
group = createNewGroup(key);
}
emitItem(group, nl.next(t));
if (group != null) {
emitItem(group, nl.next(t));
}
} catch (Throwable e) {
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
Expand Down Expand Up @@ -236,6 +274,11 @@ public void onCompleted() {
@Override
public void onError(Throwable e) {
o.onError(e);
// eagerly cleanup instead of waiting for unsubscribe
if (once.compareAndSet(false, true)) {
// done once per instance, either onComplete or onUnSubscribe
cleanupGroup(key);
}
}

@Override
Expand All @@ -250,7 +293,17 @@ public void onNext(T t) {
}
});

GroupState<K, T> putIfAbsent = groups.putIfAbsent(key, groupState);
GroupState<K, T> putIfAbsent;
for (;;) {
int wip = wipForUnsubscribe;
if (wip <= 0) {
return null;
}
if (WIP_FOR_UNSUBSCRIBE_UPDATER.compareAndSet(this, wip, wip + 1)) {
putIfAbsent = groups.putIfAbsent(key, groupState);
break;
}
}
if (putIfAbsent != null) {
// this shouldn't happen (because we receive onNext sequentially) and would mean we have a bug
throw new IllegalStateException("Group already existed while creating a new one");
Expand All @@ -264,7 +317,7 @@ private void cleanupGroup(Object key) {
GroupState<K, T> removed;
removed = groups.remove(key);
if (removed != null) {
if (removed.buffer.size() > 0) {
if (!removed.buffer.isEmpty()) {
BUFFERED_COUNT.addAndGet(self, -removed.buffer.size());
}
completeInner();
Expand Down Expand Up @@ -342,15 +395,14 @@ private void drainIfPossible(GroupState<K, T> groupState) {
}

private void completeInner() {
// if we have no outstanding groups (all completed or unsubscribe) and terminated/unsubscribed on outer
if (groups.size() == 0 && (terminated == 1 || child.isUnsubscribed())) {
// A group is removed, so check if we need to call `unsubscribe`
if (WIP_FOR_UNSUBSCRIBE_UPDATER.decrementAndGet(this) == 0) {
// It means `groups.isEmpty() && child.isUnsubscribed()` is true
unsubscribe();
} else if (groups.isEmpty() && terminated == TERMINATED_WITH_COMPLETED) {
// if we have no outstanding groups (all completed or unsubscribe) and terminated on outer
// completionEmitted ensures we only emit onCompleted once
if (COMPLETION_EMITTED_UPDATER.compareAndSet(this, 0, 1)) {

if (child.isUnsubscribed()) {
// if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up.
unsubscribe();
}
child.onCompleted();
}
}
Expand Down
69 changes: 69 additions & 0 deletions src/test/java/rx/internal/operators/OperatorGroupByTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.TestException;
import rx.functions.Action0;
import rx.functions.Action1;
Expand Down Expand Up @@ -1385,4 +1386,72 @@ public void call(String s) {
assertEquals(null, key[0]);
assertEquals(Arrays.asList("a", "b", "c"), values);
}

@Test
public void testGroupByUnsubscribe() {
final Subscription s = mock(Subscription.class);
Observable<Integer> o = Observable.create(
new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.add(s);
}
}
);
o.groupBy(new Func1<Integer, Integer>() {

@Override
public Integer call(Integer integer) {
return null;
}
}).subscribe().unsubscribe();
verify(s).unsubscribe();
}

@Test
public void testGroupByShouldPropagateError() {
final Throwable e = new RuntimeException("Oops");
final TestSubscriber<Integer> inner1 = new TestSubscriber<Integer>();
final TestSubscriber<Integer> inner2 = new TestSubscriber<Integer>();

final TestSubscriber<GroupedObservable<Integer, Integer>> outer
= new TestSubscriber<GroupedObservable<Integer, Integer>>(new Subscriber<GroupedObservable<Integer, Integer>>() {

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(GroupedObservable<Integer, Integer> o) {
if (o.getKey() == 0) {
o.subscribe(inner1);
} else {
o.subscribe(inner2);
}
}
});
Observable.create(
new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(0);
subscriber.onNext(1);
subscriber.onError(e);
}
}
).groupBy(new Func1<Integer, Integer>() {

@Override
public Integer call(Integer i) {
return i % 2;
}
}).subscribe(outer);
assertEquals(Arrays.asList(e), outer.getOnErrorEvents());
assertEquals(Arrays.asList(e), inner1.getOnErrorEvents());
assertEquals(Arrays.asList(e), inner2.getOnErrorEvents());
}
}

0 comments on commit 7c408f4

Please sign in to comment.