Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reentrant scheduling2 #648

Closed
wants to merge 4 commits into from

Conversation

akarnokd
Copy link
Member

Revised version of PR #643.

  • Added scheduleRunnable() overloads to Scheduler directly to avoid constant wrapping between Runnable and Action0.
  • Removed the helper interface.
  • Reworked ReentrantScheduler to work with a parent scheduler directly.
  • Replaced ForwardSubscription with IncrementalSubscription as the first one didn't correctly managed the orderly nature of swapping subscriptions: an unfortunate thread scheduling could have swapped in an older subscription before a new subscription.

In my opinion, the Scheduler and its implementations should use Runnable as the internal unit of work instead of Action0. Since the Executors require Runnable anyway, less wrapping means less memory and better performance. The Scheduler interface can retain the Action0 overloads but no other implementation should need to deal with them.

@cloudbees-pull-request-builder

RxJava-pull-requests #582 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

This seems like it's a bug inside current Schedulers as recursion should work without memory leaks. Here is the histogram showing the leak:

JVM version is 24.45-b08
Iterating over heap. This may take a while...
Object Histogram:

num       #instances    #bytes  Class description
--------------------------------------------------------------------------
1:      1488649 83360752    java.lang.Object[]
2:      2976652 47626432    java.util.concurrent.atomic.AtomicReference
3:      1488325 47626400    rx.schedulers.DiscardableAction
4:      1488324 47626368    java.util.concurrent.FutureTask
5:      1488330 35719920    java.util.ArrayList
6:      1488324 35719776    rx.subscriptions.CompositeSubscription$State
7:      1488456 23815296    java.lang.Integer
8:      1488326 23813216    java.util.concurrent.atomic.AtomicBoolean
9:      1488326 23813216    rx.operators.SafeObservableSubscription
10:     1488325 23813200    rx.subscriptions.CompositeSubscription
11:     1488324 23813184    rx.subscriptions.Subscriptions$3
12:     7036    905920  * MethodKlass
13:     7036    813896  * ConstMethodKlass
14:     499 567648  * ConstantPoolKlass
15:     499 348832  * InstanceKlassKlass
16:     447 339168  * ConstantPoolCacheKlass
17:     2051    148592  char[]
18:     702 106856  byte[]
19:     569 68872   java.lang.Class
20:     830 52568   * System ObjArray
21:     2027    48648   java.lang.String
22:     769 44128   short[]
23:     124 40800   * MethodDataKlass
24:     785 31400   java.util.TreeMap$Entry
25:     53  28408   * ObjArrayKlassKlass
26:     138 9936    java.lang.reflect.Field
27:     218 6976    java.util.concurrent.ConcurrentHashMap$HashEntry
28:     192 6568    java.lang.String[]
29:     138 4416    java.util.HashMap$Entry
30:     8   4288    * TypeArrayKlassKlass
31:     178 4272    java.util.LinkedList$Node
32:     116 3712    java.util.Hashtable$Entry
33:     97  3104    java.util.LinkedList
34:     193 3088    java.lang.Object
35:     46  2944    java.net.URL
36:     30  2848    java.util.HashMap$Entry[]
37:     66  2704    java.util.concurrent.ConcurrentHashMap$HashEntry[]
38:     66  2640    java.util.concurrent.ConcurrentHashMap$Segment
39:     72  2304    java.util.concurrent.locks.ReentrantLock$NonfairSync
40:     11  2288    * KlassKlass
41:     38  1824    sun.util.locale.LocaleObjectCache$CacheEntry
42:     36  1728    java.util.HashMap
43:     5   1696    int[]
44:     36  1440    java.util.LinkedHashMap$Entry
45:     18  1296    java.lang.reflect.Constructor
46:     16  1280    java.util.WeakHashMap$Entry[]
47:     1   1040    java.lang.Integer[]
48:     26  1040    java.lang.ref.SoftReference
49:     6   992 java.util.Hashtable$Entry[]
50:     16  896 java.util.WeakHashMap
51:     21  840 java.lang.ref.Finalizer

I will dig in to where this leak is occurring. We should not need new subscription or scheduler types to solve this, otherwise anything using schedulers is broken.

Code in Java 6 for proving the leak as modified from the original Java 8 bug report:

/**
     * Generates an observable sequence by iterating a state from an initial
     * state until the condition returns false.
     */
    public static <TState, R> OnSubscribeFunc<R> generate(
            final TState initialState,
            final Func1<TState, Boolean> condition,
            final Func1<TState, TState> iterate,
            final Func1<TState, R> resultSelector,
            final Scheduler scheduler) {
        return new OnSubscribeFunc<R>() {
            @Override
            public Subscription onSubscribe(final Observer<? super R> observer) {
                return scheduler.schedule(initialState, new Func2<Scheduler, TState, Subscription>() {
                    @Override
                    public Subscription call(Scheduler s, TState state) {
                        boolean hasNext;
                        try {
                            hasNext = condition.call(state);
                        } catch (Throwable t) {
                            observer.onError(t);
                            return Subscriptions.empty();
                        }
                        if (hasNext) {
                            R result;
                            try {
                                result = resultSelector.call(state);
                            } catch (Throwable t) {
                                observer.onError(t);
                                return Subscriptions.empty();
                            }
                            observer.onNext(result);

                            TState nextState;
                            try {
                                nextState = iterate.call(state);
                            } catch (Throwable t) {
                                observer.onError(t);
                                return Subscriptions.empty();
                            }

                            return s.schedule(nextState, this);
                        }
                        observer.onCompleted();
                        return Subscriptions.empty();
                    }
                });
            }
        };
    }

    public static void main(String[] args) throws Exception {

//        Thread.sleep(10000);

        Observable<Integer> source = Observable.create(generate(
                0, new Func1<Integer, Boolean>() {

                    @Override
                    public Boolean call(Integer t1) {
                        return true;
                    }
                },
                new Func1<Integer, Integer>() {

                    @Override
                    public Integer call(Integer t) {
                        return t + 1;
                    }
                },
                new Func1<Integer, Integer>() {

                    @Override
                    public Integer call(Integer t) {
                        return t;
                    }
                }, Schedulers.newThread()));

        final CountDownLatch latch = new CountDownLatch(1);
        Subscription s = source.subscribe(new Observer<Integer>() {

            @Override
            public void onCompleted() {
                latch.countDown();
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
                latch.countDown();
            }

            @Override
            public void onNext(Integer v) {
                if (v % 100000 == 0) {
                    System.out.println(v);
                }
                if (v >= 10000000) {
                    latch.countDown();
                }
            }
        });

        latch.await();

        System.out.println("Wait done.");

        s.unsubscribe();

        System.out.println("Unsubscribe done.");
    }

@headinthebox will try in .Net

@benjchristensen
Copy link
Member

This memory leak is fixed in #712

@akarnokd akarnokd deleted the ReentrantScheduling2 branch January 13, 2014 09:58
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants