-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reentrant scheduling2 #648
Conversation
RxJava-pull-requests #582 SUCCESS |
This seems like it's a bug inside current Schedulers as recursion should work without memory leaks. Here is the histogram showing the leak:
I will dig in to where this leak is occurring. We should not need new subscription or scheduler types to solve this, otherwise anything using schedulers is broken. Code in Java 6 for proving the leak as modified from the original Java 8 bug report: /**
* Generates an observable sequence by iterating a state from an initial
* state until the condition returns false.
*/
public static <TState, R> OnSubscribeFunc<R> generate(
final TState initialState,
final Func1<TState, Boolean> condition,
final Func1<TState, TState> iterate,
final Func1<TState, R> resultSelector,
final Scheduler scheduler) {
return new OnSubscribeFunc<R>() {
@Override
public Subscription onSubscribe(final Observer<? super R> observer) {
return scheduler.schedule(initialState, new Func2<Scheduler, TState, Subscription>() {
@Override
public Subscription call(Scheduler s, TState state) {
boolean hasNext;
try {
hasNext = condition.call(state);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}
if (hasNext) {
R result;
try {
result = resultSelector.call(state);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}
observer.onNext(result);
TState nextState;
try {
nextState = iterate.call(state);
} catch (Throwable t) {
observer.onError(t);
return Subscriptions.empty();
}
return s.schedule(nextState, this);
}
observer.onCompleted();
return Subscriptions.empty();
}
});
}
};
}
public static void main(String[] args) throws Exception {
// Thread.sleep(10000);
Observable<Integer> source = Observable.create(generate(
0, new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer t1) {
return true;
}
},
new Func1<Integer, Integer>() {
@Override
public Integer call(Integer t) {
return t + 1;
}
},
new Func1<Integer, Integer>() {
@Override
public Integer call(Integer t) {
return t;
}
}, Schedulers.newThread()));
final CountDownLatch latch = new CountDownLatch(1);
Subscription s = source.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
latch.countDown();
}
@Override
public void onNext(Integer v) {
if (v % 100000 == 0) {
System.out.println(v);
}
if (v >= 10000000) {
latch.countDown();
}
}
});
latch.await();
System.out.println("Wait done.");
s.unsubscribe();
System.out.println("Unsubscribe done.");
} @headinthebox will try in .Net |
This memory leak is fixed in #712 |
Revised version of PR #643.
scheduleRunnable()
overloads toScheduler
directly to avoid constant wrapping betweenRunnable
andAction0
.ReentrantScheduler
to work with a parent scheduler directly.ForwardSubscription
withIncrementalSubscription
as the first one didn't correctly managed the orderly nature of swapping subscriptions: an unfortunate thread scheduling could have swapped in an older subscription before a new subscription.In my opinion, the
Scheduler
and its implementations should useRunnable
as the internal unit of work instead ofAction0
. Since theExecutor
s requireRunnable
anyway, less wrapping means less memory and better performance. TheScheduler
interface can retain theAction0
overloads but no other implementation should need to deal with them.