-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Unexpected Observable#cache behavior #2913
Comments
Hi, from javadoc:
The parameter is just the hint for the underlying buffer to avoid unnecessary resizing of the buffer (an ArrayList) if the expected number of items are known upfront. |
I'm confused by this, in the first line of
I didn't notice that the |
It doesn't refer to |
Is there a way to get |
You need to roll your own operator, just take the source of OnSubscribeCache and change the default ReplaySubject: public final class OnSubscribeBoundedCache<T> implements OnSubscribe<T> {
protected final Observable<? extends T> source;
protected final Subject<? super T, ? extends T> cache;
volatile int sourceSubscribed;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<OnSubscribeBoundedCache> SRC_SUBSCRIBED_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(OnSubscribeBoundedCache.class, "sourceSubscribed");
public OnSubscribeBoundedCache(Observable<? extends T> source, int bufferSize) {
this.source = source;
this.cache = ReplaySubject.<T> createWithSize(bufferSize);
}
@Override
public void call(Subscriber<? super T> s) {
if (SRC_SUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) {
source.subscribe(cache);
}
cache.unsafeSubscribe(s);
}
}
Observable cached = Observable.create(new OnSubscribeBoundedCache(source, 256))
cached.subscribe(...);
cached.subscribe(...); |
Thanks @akarnokd! that has the exact semantics I needed :) |
I recently started noticing that
cache
was not behaving as I expected. I might have misunderstood its semantics, but I thoughtcache
was equivalent toreplay
, except it would onlyconnect
to the underlying observable on the first subscription.I wrote this simple test that validates the semantics that I was expecting, and it's not passing:
Was my assumption about its behavior wrong, or does this expose a bug?
If I change
subject.cache(1)
tosubject.replay(1)
and Iconnect
the returned observable, the test passes.The text was updated successfully, but these errors were encountered: