Skip to content

Commit

Permalink
Fix the issue that GroupBy may not call 'unsubscribe'
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed Dec 14, 2014
1 parent 29e764a commit 8026b41
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 8 deletions.
53 changes: 45 additions & 8 deletions src/main/java/rx/internal/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand All @@ -34,6 +35,7 @@
import rx.functions.Func1;
import rx.observables.GroupedObservable;
import rx.subjects.Subject;
import rx.subscriptions.Subscriptions;

/**
* Groups the items emitted by an Observable according to a specified criterion, and emits these
Expand Down Expand Up @@ -76,6 +78,10 @@ static final class GroupBySubscriber<K, T, R> extends Subscriber<T> {
final Func1<? super T, ? extends R> elementSelector;
final Subscriber<? super GroupedObservable<K, R>> child;

@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<GroupBySubscriber> WIP_FOR_UNSUBSCRIBE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "wipForUnsubscribe");
volatile int wipForUnsubscribe = 1;

public GroupBySubscriber(
Func1<? super T, ? extends K> keySelector,
Func1<? super T, ? extends R> elementSelector,
Expand All @@ -84,6 +90,16 @@ public GroupBySubscriber(
this.keySelector = keySelector;
this.elementSelector = elementSelector;
this.child = child;
child.add(Subscriptions.create(new Action0() {

@Override
public void call() {
if (WIP_FOR_UNSUBSCRIBE_UPDATER.decrementAndGet(self) == 0) {
self.unsubscribe();
}
}

}));
}

private static class GroupState<K, T> {
Expand Down Expand Up @@ -138,7 +154,7 @@ public void onCompleted() {
}

// special case (no groups emitted ... or all unsubscribed)
if (groups.size() == 0) {
if (groups.isEmpty()) {
// we must track 'completionEmitted' seperately from 'completed' since `completeInner` can result in childObserver.onCompleted() being emitted
if (COMPLETION_EMITTED_UPDATER.compareAndSet(this, 0, 1)) {
child.onCompleted();
Expand All @@ -150,8 +166,13 @@ public void onCompleted() {
@Override
public void onError(Throwable e) {
if (TERMINATED_UPDATER.compareAndSet(this, 0, 1)) {
// we immediately tear everything down if we receive an error
child.onError(e);
try {
// we immediately tear everything down if we receive an error
child.onError(e);
} finally {
// We have not chained the subscribers, so need to call it explicitly.
unsubscribe();
}
}
}

Expand Down Expand Up @@ -187,7 +208,9 @@ public void onNext(T t) {
}
group = createNewGroup(key);
}
emitItem(group, nl.next(t));
if (group != null) {
emitItem(group, nl.next(t));
}
} catch (Throwable e) {
onError(OnErrorThrowable.addValueAsLastCause(e, t));
}
Expand Down Expand Up @@ -250,7 +273,17 @@ public void onNext(T t) {
}
});

GroupState<K, T> putIfAbsent = groups.putIfAbsent(key, groupState);
GroupState<K, T> putIfAbsent;
for (;;) {
int wip = wipForUnsubscribe;
if (wip <= 0) {
return null;
}
if (WIP_FOR_UNSUBSCRIBE_UPDATER.compareAndSet(this, wip, wip + 1)) {
putIfAbsent = groups.putIfAbsent(key, groupState);
break;
}
}
if (putIfAbsent != null) {
// this shouldn't happen (because we receive onNext sequentially) and would mean we have a bug
throw new IllegalStateException("Group already existed while creating a new one");
Expand All @@ -264,7 +297,7 @@ private void cleanupGroup(Object key) {
GroupState<K, T> removed;
removed = groups.remove(key);
if (removed != null) {
if (removed.buffer.size() > 0) {
if (!removed.buffer.isEmpty()) {
BUFFERED_COUNT.addAndGet(self, -removed.buffer.size());
}
completeInner();
Expand Down Expand Up @@ -342,16 +375,20 @@ private void drainIfPossible(GroupState<K, T> groupState) {
}

private void completeInner() {
if (WIP_FOR_UNSUBSCRIBE_UPDATER.decrementAndGet(this) == 0) {
unsubscribe();
}
// if we have no outstanding groups (all completed or unsubscribe) and terminated/unsubscribed on outer
if (groups.size() == 0 && (terminated == 1 || child.isUnsubscribed())) {
if (groups.isEmpty() && (terminated == 1 || child.isUnsubscribed())) {
// completionEmitted ensures we only emit onCompleted once
if (COMPLETION_EMITTED_UPDATER.compareAndSet(this, 0, 1)) {

if (child.isUnsubscribed()) {
// if the entire groupBy has been unsubscribed and children are completed we will propagate the unsubscribe up.
unsubscribe();
} else {
child.onCompleted();
}
child.onCompleted();
}
}
}
Expand Down
22 changes: 22 additions & 0 deletions src/test/java/rx/internal/operators/OperatorGroupByTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.TestException;
import rx.functions.Action0;
import rx.functions.Action1;
Expand Down Expand Up @@ -1385,4 +1386,25 @@ public void call(String s) {
assertEquals(null, key[0]);
assertEquals(Arrays.asList("a", "b", "c"), values);
}

@Test
public void testGroupByUnsubscribe() {
final Subscription s = mock(Subscription.class);
Observable<Integer> o = Observable.create(
new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.add(s);
}
}
);
o.groupBy(new Func1<Integer, Integer>() {

@Override
public Integer call(Integer integer) {
return null;
}
}).subscribe().unsubscribe();
verify(s).unsubscribe();
}
}

0 comments on commit 8026b41

Please sign in to comment.