Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No InterruptedException with synchronous BlockingObservable #3120

Merged
merged 1 commit into from
Jul 30, 2015

Conversation

ypresto
Copy link
Contributor

@ypresto ypresto commented Jul 29, 2015

I'm using Observable for backport of Java 8's java.util.stream (i.e. no more loops) for Android project.

List<Foo> list2 = Observable.from(list)
        .map(...)
        .filter(...)
        .toList().toBlocking().single()

But it sometimes emits InterruptedException at BlockingObservable.
#1804 (comment)

As the BlocingObservable is placed in map() of another observable with subscribeOn(Schedulers.io()), and it is unsubscribed from main thread, perhaps Future.cancel(true) is called on unsubscribing. (#1914)

This PR allows BlockingObsevable not to be interrupted when source observable emits synchronously, by checking current latch or queue state before awaiting for them.

@ypresto ypresto force-pushed the no-interrupt-for-sync branch from a9be33b to 8ad2260 Compare July 29, 2015 17:36
@akarnokd
Copy link
Member

So what happens is that you use some blocking calls inside an observable sequence and it gets interrupted because of the underlying task is cancelled?

I believe it is an indication that the flow can be assembled non-blockingly instead. I've been fiddling with the idea that once RxJava scheduler threads become identifiable, toBlocking() would throw/onError if it is attempted to use it inside such schedulers.

@ypresto
Copy link
Contributor Author

ypresto commented Jul 30, 2015

So what happens is that you use some blocking calls inside an observable sequence and it gets interrupted because of the underlying task is cancelled?

Yes. But in my case, blocking call is inside of another (synchronous) method and I don't want to make it non-blocking, for testability and convenience.

For example, in instagram-like app, there are entities to represent photo/movie and comment.
Both entities has entity translation logic and they can be written in for-loop or toList().toBlocking().single().

// MediaFileModel
public MediaFileEntity entityFromDao(MediaFileDao dao, List<CommentDao> commentDaoList) {
    List<CommentEntity> comments = Observable.from(commentDaoList)
            .map(mCommentModel::entityFromDao) // No more loops!
            .filter(not(mCommentModel::isDeleted))
            .toList().toBlocking().single(); // called from where..?
    MediaFileEntity mediaFile = new MediaFileEntity();
    mediaFile.setUuid(dao.getUuid());
    mediaFile.setComments(comments);
    ...
    return mediaFile;
}
// MediaFileStore
public Observable<List<MediaFileEntity>> allEntities() {
    return queryAll() // async operation
            .map(daoList -> {
                    Observable.from(daoList)
                            .map(mMediaFileModel::entityFromDao) // is calling toBlocking() or not...?
                            .toList()
            );
}

I think so too that asynchronous (with blocking wait) operation should be assembled non-blockingly.
But I think it is just convenient if nesting of synchronous (without blocking wait) operation is allowed. :)

@akarnokd
Copy link
Member

The changes look okay to me.

akarnokd added a commit that referenced this pull request Jul 30, 2015
No InterruptedException with synchronous BlockingObservable
@akarnokd akarnokd merged commit 072ffad into ReactiveX:1.x Jul 30, 2015
@ypresto
Copy link
Contributor Author

ypresto commented Jul 30, 2015

Thanks! 🎉 🎉

@ypresto ypresto deleted the no-interrupt-for-sync branch July 31, 2015 01:48
@ypresto
Copy link
Contributor Author

ypresto commented Apr 14, 2016

For your (who found this PR by searching) information:

RxJava is quite slow for synchronous operation.
Especially for zip(), it has significant overhead of waiting for async (and even if it is sync actually) observables.

So it'll better to use Java 8 Stream API or its backport for sync stream operations.
Lightweight-Stream-API has only <500 methods and also suitable for Android.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants