Skip to content

Commit

Permalink
2.x: Fix Observable.flatMap with maxConcurrency hangs (ReactiveX#6947)
Browse files Browse the repository at this point in the history
  • Loading branch information
vjgarciag96 committed Apr 14, 2020
1 parent 7ca43c7 commit fe1089b
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ void drainLoop() {
if (checkTerminate()) {
return;
}
int innerCompleted = 0;
SimplePlainQueue<U> svq = queue;

if (svq != null) {
Expand All @@ -349,9 +350,18 @@ void drainLoop() {
}

child.onNext(o);
innerCompleted++;
}
}

if (innerCompleted != 0) {
if (maxConcurrency != Integer.MAX_VALUE) {
subscribeMore(innerCompleted);
innerCompleted = 0;
}
continue;
}

boolean d = done;
svq = queue;
InnerObserver<?, ?>[] inner = observers.get();
Expand All @@ -376,7 +386,6 @@ void drainLoop() {
return;
}

int innerCompleted = 0;
if (n != 0) {
long startId = lastId;
int index = lastIndex;
Expand Down Expand Up @@ -463,27 +472,33 @@ void drainLoop() {

if (innerCompleted != 0) {
if (maxConcurrency != Integer.MAX_VALUE) {
while (innerCompleted-- != 0) {
ObservableSource<? extends U> p;
synchronized (this) {
p = sources.poll();
if (p == null) {
wip--;
continue;
}
}
subscribeInner(p);
}
subscribeMore(innerCompleted);
innerCompleted = 0;
}
continue;
}

missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}

void subscribeMore(int innerCompleted) {
while (innerCompleted-- != 0) {
ObservableSource<? extends U> p;
synchronized (this) {
p = sources.poll();
if (p == null) {
wip--;
continue;
}
}
subscribeInner(p);
}
}

boolean checkTerminate() {
if (cancelled) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1157,4 +1157,27 @@ public void innerErrorsMainCancelled() {

assertFalse("Has subscribers?", pp1.hasSubscribers());
}

@Test(timeout = 5000)
public void mixedScalarAsync() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
Flowable
.range(0, 20)
.flatMap(new Function<Integer, Publisher<?>>() {
@Override
public Publisher<?> apply(Integer integer) throws Exception {
if (integer % 5 != 0) {
return Flowable
.just(integer);
}

return Flowable
.just(-integer)
.observeOn(Schedulers.computation());
}
}, false, 1)
.ignoreElements()
.blockingAwait();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1118,4 +1118,27 @@ public void innerErrorsMainCancelled() {

assertFalse("Has subscribers?", ps1.hasObservers());
}

@Test(timeout = 5000)
public void mixedScalarAsync() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
Observable
.range(0, 20)
.flatMap(new Function<Integer, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Integer integer) throws Exception {
if (integer % 5 != 0) {
return Observable
.just(integer);
}

return Observable
.just(-integer)
.observeOn(Schedulers.computation());
}
}, false, 1)
.ignoreElements()
.blockingAwait();
}
}
}

0 comments on commit fe1089b

Please sign in to comment.