Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

toMap - prevent multiple terminal events, support backpressure #4251

Merged

Conversation

davidmoten
Copy link
Collaborator

As per discussion in #4242, if an operator maps an onNext emission to an onError emission downstream then it needs be defensive about an onCompleted being sent from upstream even if upstream has been unsubscribed.

Includes three unit tests that failed on the original code. The fix also has the side effect of enabling gc of map when the factory fails (onError was called on the child, not this so map was not set to null).

@akarnokd
Copy link
Member

I thought it supported backpressure. Could you rewrite it by using DeferredScalarSubscription?

@akarnokd akarnokd added the Bug label Jul 28, 2016
@akarnokd akarnokd added this to the 1.2 milestone Jul 28, 2016
@davidmoten
Copy link
Collaborator Author

unrelated test failure:

rx.schedulers.ComputationSchedulerTests > testHandledErrorIsNotDeliveredToThreadHandler FAILED
    java.lang.AssertionError: Handler should not have received anything expected:<0> but was:<1>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:645)
        at rx.schedulers.SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(SchedulerTests.java:102)
        at rx.schedulers.ComputationSchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(ComputationSchedulerTests.java:153)

@davidmoten
Copy link
Collaborator Author

I thought it supported backpressure. Could you rewrite it by using DeferredScalarSubscription?

Sure.

@davidmoten
Copy link
Collaborator Author

What I might do is fix multiple terminal emissions in OnSubscribeCollect first and make necessary done changes to DeferredScalarSubscriber in that PR. I'll follow that with the backpressure fix for OperatorToMap.

assertEquals(Arrays.asList(e2), list);
ts.assertNotCompleted();
} finally {
RxJavaHooks.setOnError(null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs fixing as well.

@davidmoten
Copy link
Collaborator Author

@akarnokd

In terms of releasing a value so can be gc'd I thought to modify DeferredScalarSubscriber in this method so that the field this.value is set to null just before the call to a.onNext():

protected final void complete(R value) {
        Subscriber<? super R> a = actual;
        for (;;) {
            int s = state.get();

            if (s == NO_REQUEST_HAS_VALUE || s == HAS_REQUEST_HAS_VALUE || a.isUnsubscribed()) {
                return;
            }
            if (s == HAS_REQUEST_NO_VALUE) {
                R v = value; // <--------------------------------
                value = null; // <--------------------------------
                a.onNext(v);
                if (!a.isUnsubscribed()) {
                    a.onCompleted();
                }
                state.lazySet(HAS_REQUEST_HAS_VALUE);
                return;
            }
            this.value = value;
            if (state.compareAndSet(NO_REQUEST_NO_VALUE, NO_REQUEST_HAS_VALUE)) {
                return;
            }
        }
    }

Is that ok?

@davidmoten
Copy link
Collaborator Author

don't review latest change, just committed so could work on it from another location

@akarnokd
Copy link
Member

I'm not sure if it's worth it. Many operators don't really do that because it is also likely the whole chain gets forgotten and GC claims all of them on its own. If you want to make sure there is no leak, use onTerminateDetach.

@akarnokd
Copy link
Member

Could you rebase the whole thing? Somehow, you picked up changes to master and they show up in the diff.

@codecov-io
Copy link

codecov-io commented Jul 29, 2016

Current coverage is 84.39% (diff: 100%)

Merging #4251 into 1.x will increase coverage by 0.01%

@@                1.x      #4251   diff @@
==========================================
  Files           267        267          
  Lines         17460      17460          
  Methods           0          0          
  Messages          0          0          
  Branches       2660       2662     +2   
==========================================
+ Hits          14732      14735     +3   
- Misses         1865       1869     +4   
+ Partials        863        856     -7   

Powered by Codecov. Last update 0577b4c...37da430

@davidmoten davidmoten force-pushed the to-map-prevent-multiple-terminal-events branch 2 times, most recently from 2e13ecf to c705109 Compare July 29, 2016 11:13
@davidmoten
Copy link
Collaborator Author

  • Rebased
  • Rewrote to use DeferredScalarSubscriberSafe
  • use singleton of DefaultMapFactory
  • moved to OnSubscribe to save allocations
  • added backpressure test

@akarnokd
Copy link
Member

👍

Func1<? super T, ? extends K> keySelector,
Func1<? super T, ? extends V> valueSelector) {
this(keySelector, valueSelector, new DefaultToMapFactory<K, V>());
this(source, keySelector, valueSelector, DefaultMapFactory.<K, V>instance());
Copy link
Member

@akarnokd akarnokd Jul 29, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A small trick: make the OnSubscribeToMap also implement Func0<Map<K, V>> and assign this to the map factory function - of course you can't use this() constructor delegation but assign to the final fields explicitly. This way, there is no need for an extra class.

@davidmoten davidmoten changed the title toMap - prevent multiple terminal events toMap - prevent multiple terminal events, support backpressure Jul 29, 2016
@davidmoten davidmoten force-pushed the to-map-prevent-multiple-terminal-events branch from c705109 to b2007cc Compare July 29, 2016 22:25
@davidmoten
Copy link
Collaborator Author

Good idea, I've updated the PR.

@akarnokd
Copy link
Member

👍

@akarnokd akarnokd merged commit 969d94c into ReactiveX:1.x Jul 30, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants