Unexpected Observable#cache behavior #2913

Closed
NachoSoto opened this issue Apr 23, 2015 · 6 comments

@NachoSoto

I recently started noticing that cache was not behaving as I expected. I might have misunderstood its semantics, but I thought cache was equivalent to replay, except it would only connect to the underlying observable on the first subscription.

I wrote this simple test that validates the semantics that I was expecting, and it's not passing:

final PublishSubject<Integer> subject = PublishSubject.create();

final Observable<Integer> cachedObservable = subject.cache(1);
TestObserver<Integer> observer;

// Subscribe for the first time, without prior events
observer = new TestObserver<>();
cachedObservable.subscribe(observer);

observer.assertReceivedOnNext(ImmutableList.<Integer>of());

subject.onNext(1);
observer.assertReceivedOnNext(ImmutableList.of(1));

subject.onNext(2);
observer.assertReceivedOnNext(ImmutableList.of(1, 2));

// Resubscribe
observer = new TestObserver<>();
cachedObservable.subscribe(observer);

// Should receive the last value because capacity is 1.
observer.assertReceivedOnNext(ImmutableList.of(2));

Was my assumption about its behavior wrong, or does this expose a bug?

If I change subject.cache(1) to subject.replay(1) and I connect the returned observable, the test passes.

@akarnokd
Member

Hi, from the Javadoc:

@param capacity hint for number of items to cache (for optimizing underlying data structure)

The parameter is just a hint for the underlying buffer (an ArrayList), so that it can avoid unnecessary resizing when the expected number of items is known upfront. It does not limit how many items are cached.
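
The difference between a capacity hint and a size bound can be seen with plain JDK collections. The following is a minimal sketch, not RxJava code: the class CapacityHintDemo and its two helper methods are hypothetical names introduced only for illustration. The first helper presizes an ArrayList, which keeps every item; the second evicts the oldest item once the bound is exceeded, which is roughly what the test in the question expected from cache(1).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
final class CapacityHintDemo {

    // A capacity hint only presizes the backing array; every item is retained.
    static List<Integer> withCapacityHint(int hint, int... items) {
        List<Integer> buffer = new ArrayList<>(hint);
        for (int item : items) {
            buffer.add(item);
        }
        return buffer;
    }

    // A size bound evicts the oldest item once the limit is exceeded.
    static List<Integer> withSizeBound(int bound, int... items) {
        Deque<Integer> buffer = new ArrayDeque<>();
        for (int item : items) {
            buffer.addLast(item);
            if (buffer.size() > bound) {
                buffer.removeFirst();
            }
        }
        return new ArrayList<>(buffer);
    }

    public static void main(String[] args) {
        System.out.println(withCapacityHint(1, 1, 2)); // prints [1, 2]
        System.out.println(withSizeBound(1, 1, 2));    // prints [2]
    }
}
```

With a hint of 1 both items survive, while a bound of 1 keeps only the most recent item.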

@NachoSoto
Author

I'm confused by this, though, given the first line of Observable#cache's documentation:

This method has similar behavior to replay() except that this
auto-subscribes to the source Observable rather than returning
a ConnectableObservable for which you must call connect to activate the subscription.

I didn't notice that the capacity parameters of the two operators actually have completely different meanings. Why would the documentation say that they have similar behavior?

@akarnokd
Member

It doesn't refer to replay(int bufferSize) but to the unbounded replay().

@NachoSoto
Author

Is there a way to get cache semantics (i.e., auto-connecting on first subscription) while being able to specify a capacity, like replay?

@akarnokd
Member

You need to roll your own operator: just take the source of OnSubscribeCache and replace the default ReplaySubject with a size-bounded one:

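import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;

public final class OnSubscribeBoundedCache<T> implements OnSubscribe<T> {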
    protected final Observable<? extends T> source;
    protected final Subject<? super T, ? extends T> cache;
    volatile int sourceSubscribed;
    @SuppressWarnings("rawtypes")
    static final AtomicIntegerFieldUpdater<OnSubscribeBoundedCache> SRC_SUBSCRIBED_UPDATER
            = AtomicIntegerFieldUpdater.newUpdater(OnSubscribeBoundedCache.class, "sourceSubscribed");

    public OnSubscribeBoundedCache(Observable<? extends T> source, int bufferSize) {
        this.source = source;
        this.cache = ReplaySubject.<T> createWithSize(bufferSize);
    }
    @Override
    public void call(Subscriber<? super T> s) {
        if (SRC_SUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) {
            source.subscribe(cache);
        }
        cache.unsafeSubscribe(s);
    }
}
// Usage, where T stands for the element type of source:
Observable<T> cached = Observable.create(new OnSubscribeBoundedCache<T>(source, 256));
cached.subscribe(...);
cached.subscribe(...);
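
To see the intended semantics without an RxJava dependency, here is a simplified, JDK-only model of the same idea: connect to the source once, on the first subscription, and replay at most bufferSize recent items to later subscribers. This is a sketch under stated assumptions, not the operator above: BoundedCacheModel, its onNext method, and the demo helper are hypothetical names, the source is modelled as a callback that receives a sink, and completion, errors and unsubscription are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical, simplified model; not RxJava API.
final class BoundedCacheModel<T> {
    private final Consumer<Consumer<T>> source; // invoked once with the sink to push items into
    private final int bufferSize;
    private final Deque<T> buffer = new ArrayDeque<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private final AtomicBoolean sourceSubscribed = new AtomicBoolean(false);

    BoundedCacheModel(Consumer<Consumer<T>> source, int bufferSize) {
        this.source = source;
        this.bufferSize = bufferSize;
    }

    void subscribe(Consumer<T> subscriber) {
        // Connect to the source exactly once, on the first subscription.
        if (sourceSubscribed.compareAndSet(false, true)) {
            source.accept(this::onNext);
        }
        synchronized (this) {
            // Replay the retained items, then register for live ones.
            for (T item : buffer) {
                subscriber.accept(item);
            }
            subscribers.add(subscriber);
        }
    }

    private synchronized void onNext(T item) {
        buffer.addLast(item);
        if (buffer.size() > bufferSize) {
            buffer.removeFirst(); // keep only the most recent bufferSize items
        }
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(item);
        }
    }

    // Mirrors the test from the question with a hand-rolled hot source.
    static List<List<Integer>> demo() {
        List<Consumer<Integer>> sinks = new ArrayList<>();
        BoundedCacheModel<Integer> cached = new BoundedCacheModel<Integer>(sinks::add, 1);
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        cached.subscribe(first::add);   // triggers the single connection
        sinks.get(0).accept(1);
        sinks.get(0).accept(2);
        cached.subscribe(second::add);  // late subscriber sees only the last item
        return Arrays.asList(first, second);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[1, 2], [2]]
    }
}
```

As in the operator above, the connection happens before the subscriber is registered, so with a source that emits synchronously on connection even the first subscriber would receive only the last bufferSize items.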

@NachoSoto
Author

Thanks @akarnokd! That has the exact semantics I needed :)
