-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
toMap - prevent multiple terminal events, support backpressure #4251
toMap - prevent multiple terminal events, support backpressure #4251
Conversation
I thought it supported backpressure. Could you rewrite it by using |
unrelated test failure:
|
Sure. |
What I might do is fix multiple terminal emissions in |
assertEquals(Arrays.asList(e2), list); | ||
ts.assertNotCompleted(); | ||
} finally { | ||
RxJavaHooks.setOnError(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs fixing as well.
In terms of releasing a value so can be gc'd I thought to modify protected final void complete(R value) {
Subscriber<? super R> a = actual;
for (;;) {
int s = state.get();
if (s == NO_REQUEST_HAS_VALUE || s == HAS_REQUEST_HAS_VALUE || a.isUnsubscribed()) {
return;
}
if (s == HAS_REQUEST_NO_VALUE) {
R v = value; // <--------------------------------
value = null; // <--------------------------------
a.onNext(v);
if (!a.isUnsubscribed()) {
a.onCompleted();
}
state.lazySet(HAS_REQUEST_HAS_VALUE);
return;
}
this.value = value;
if (state.compareAndSet(NO_REQUEST_NO_VALUE, NO_REQUEST_HAS_VALUE)) {
return;
}
}
} Is that ok? |
don't review latest change, just committed so could work on it from another location |
I'm not sure if it's worth it. Many operators don't really do that because it is also likely the whole chain gets forgotten and GC claims all of them on its own. If you want to make sure there is no leak, use |
Could you rebase the whole thing? Somehow, you picked up changes to master and they show up in the diff. |
Current coverage is 84.39% (diff: 100%)@@ 1.x #4251 diff @@
==========================================
Files 267 267
Lines 17460 17460
Methods 0 0
Messages 0 0
Branches 2660 2662 +2
==========================================
+ Hits 14732 14735 +3
- Misses 1865 1869 +4
+ Partials 863 856 -7
|
2e13ecf
to
c705109
Compare
|
👍 |
Func1<? super T, ? extends K> keySelector, | ||
Func1<? super T, ? extends V> valueSelector) { | ||
this(keySelector, valueSelector, new DefaultToMapFactory<K, V>()); | ||
this(source, keySelector, valueSelector, DefaultMapFactory.<K, V>instance()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A small trick: make the OnSubscribeToMap also implement Func0<Map<K, V>>
and assign this
to the map factory function - of course you can't use this()
constructor delegation but assign to the final fields explicitly. This way, there is no need for an extra class.
c705109
to
b2007cc
Compare
Good idea, I've updated the PR. |
👍 |
As per discussion in #4242, if an operator maps an
onNext
emission to anonError
emission downstream then it needs be defensive about anonCompleted
being sent from upstream even if upstream has been unsubscribed.Includes three unit tests that failed on the original code. The fix also has the side effect of enabling gc of
map
when the factory fails (onError
was called on the child, notthis
somap
was not set to null).