Skip to content

Commit

Permalink
Unsubscribe when thread is interrupted
Browse files Browse the repository at this point in the history
  • Loading branch information
Roman Mazur committed Jan 20, 2015
1 parent c49704b commit 7020bcc
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public boolean hasNext() {
try {
notify.acquire();
} catch (InterruptedException ex) {
unsubscribe();
Thread.currentThread().interrupt();
iNotif = Notification.createOnError(ex);
throw Exceptions.propagate(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ private boolean moveToNext() {
}
throw new IllegalStateException("Should not reach here");
} catch (InterruptedException e) {
observer.unsubscribe();
Thread.currentThread().interrupt();
error = e;
throw Exceptions.propagate(error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import rx.Notification;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.Exceptions;

/**
Expand All @@ -49,7 +50,7 @@ public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
final BlockingQueue<Notification<? extends T>> notifications = new LinkedBlockingQueue<Notification<? extends T>>();

// using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
final Subscription subscription = source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
@Override
public void onCompleted() {
// ignore
Expand Down Expand Up @@ -94,6 +95,7 @@ private Notification<? extends T> take() {
try {
return notifications.take();
} catch (InterruptedException e) {
subscription.unsubscribe();
throw Exceptions.propagate(e);
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/rx/observables/BlockingObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Functions;
Expand Down Expand Up @@ -98,7 +99,7 @@ public void forEach(final Action1<? super T> onNext) {
* Use 'subscribe' instead of 'unsafeSubscribe' for Rx contract behavior
* as this is the final subscribe in the chain.
*/
o.subscribe(new Subscriber<T>() {
Subscription subscription = o.subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
latch.countDown();
Expand Down Expand Up @@ -127,6 +128,7 @@ public void onNext(T args) {
try {
latch.await();
} catch (InterruptedException e) {
subscription.unsubscribe();
// set the interrupted flag again so callers can still get it
// for more information see https://github.com/ReactiveX/RxJava/pull/147#issuecomment-13624780
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -438,7 +440,7 @@ private T blockForSingle(final Observable<? extends T> observable) {
final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
final CountDownLatch latch = new CountDownLatch(1);

observable.subscribe(new Subscriber<T>() {
Subscription subscription = observable.subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
latch.countDown();
Expand All @@ -459,6 +461,7 @@ public void onNext(final T item) {
try {
latch.await();
} catch (InterruptedException e) {
subscription.unsubscribe();
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
}
Expand Down
132 changes: 132 additions & 0 deletions src/test/java/rx/observables/BlockingObservableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -410,4 +410,136 @@ public void call() {
}
assertTrue("Timeout means `unsubscribe` is not called", unsubscribe.await(30, TimeUnit.SECONDS));
}

@Test
public void testUnsubscribeFromSingleWhenInterrupted() throws InterruptedException {
new InterruptionTests().assertUnsubscribeIsInvoked("single()", new Action1<BlockingObservable<Void>>() {
@Override
public void call(final BlockingObservable<Void> o) {
o.single();
}
});
}

@Test
public void testUnsubscribeFromForEachWhenInterrupted() throws InterruptedException {
new InterruptionTests().assertUnsubscribeIsInvoked("forEach()", new Action1<BlockingObservable<Void>>() {
@Override
public void call(final BlockingObservable<Void> o) {
o.forEach(new Action1<Void>() {
@Override
public void call(final Void aVoid) {
// nothing
}
});
}
});
}

@Test
public void testUnsubscribeFromFirstWhenInterrupted() throws InterruptedException {
new InterruptionTests().assertUnsubscribeIsInvoked("first()", new Action1<BlockingObservable<Void>>() {
@Override
public void call(final BlockingObservable<Void> o) {
o.first();
}
});
}

@Test
public void testUnsubscribeFromLastWhenInterrupted() throws InterruptedException {
new InterruptionTests().assertUnsubscribeIsInvoked("last()", new Action1<BlockingObservable<Void>>() {
@Override
public void call(final BlockingObservable<Void> o) {
o.last();
}
});
}

@Test
public void testUnsubscribeFromLatestWhenInterrupted() throws InterruptedException {
new InterruptionTests().assertUnsubscribeIsInvoked("latest()", new Action1<BlockingObservable<Void>>() {
@Override
public void call(final BlockingObservable<Void> o) {
o.latest().iterator().next();
}
});
}

@Test
public void testUnsubscribeFromNextWhenInterrupted() throws InterruptedException {
new InterruptionTests().assertUnsubscribeIsInvoked("next()", new Action1<BlockingObservable<Void>>() {
@Override
public void call(final BlockingObservable<Void> o) {
o.next().iterator().next();
}
});
}

@Test
public void testUnsubscribeFromGetIteratorWhenInterrupted() throws InterruptedException {
new InterruptionTests().assertUnsubscribeIsInvoked("getIterator()", new Action1<BlockingObservable<Void>>() {
@Override
public void call(final BlockingObservable<Void> o) {
o.getIterator().next();
}
});
}

@Test
public void testUnsubscribeFromToIterableWhenInterrupted() throws InterruptedException {
new InterruptionTests().assertUnsubscribeIsInvoked("toIterable()", new Action1<BlockingObservable<Void>>() {
@Override
public void call(final BlockingObservable<Void> o) {
o.toIterable().iterator().next();
}
});
}

/** Utilities set for interruption behaviour tests. */
private static class InterruptionTests {

private boolean isUnSubscribed;
private RuntimeException error;
private CountDownLatch latch = new CountDownLatch(1);

private Observable<Void> createObservable() {
return Observable.<Void>never().doOnUnsubscribe(new Action0() {
@Override
public void call() {
isUnSubscribed = true;
}
});
}

private void startBlockingAndInterrupt(final Action1<BlockingObservable<Void>> blockingAction) {
Thread subscriptionThread = new Thread() {
@Override
public void run() {
try {
blockingAction.call(createObservable().toBlocking());
} catch (RuntimeException e) {
if (!(e.getCause() instanceof InterruptedException)) {
error = e;
}
}
latch.countDown();
}
};
subscriptionThread.start();
subscriptionThread.interrupt();
}

void assertUnsubscribeIsInvoked(final String method, final Action1<BlockingObservable<Void>> blockingAction)
throws InterruptedException {
startBlockingAndInterrupt(blockingAction);
assertTrue("Timeout means interruption is not performed", latch.await(30, TimeUnit.SECONDS));
if (error != null) {
throw error;
}
assertTrue("'unsubscribe' is not invoked when thread is interrupted for " + method, isUnSubscribed);
}

}

}

0 comments on commit 7020bcc

Please sign in to comment.