Skip to content

Commit

Permalink
1.x: fix from(Iterable) error handling of Iterable/Iterator (#3862)
Browse files Browse the repository at this point in the history
* 1.x: fix from(Iterable) error handling of Iterable/Iterator

* Check dead-on-arrival Subscribers

* Use n again to avoid a potential cache-miss with get()
  • Loading branch information
akarnokd committed Apr 21, 2016
1 parent 95389c2 commit 3f6c4fd
Show file tree
Hide file tree
Showing 2 changed files with 325 additions and 49 deletions.
130 changes: 94 additions & 36 deletions src/main/java/rx/internal/operators/OnSubscribeFromIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import rx.*;
import rx.Observable.OnSubscribe;
import rx.exceptions.Exceptions;

/**
* Converts an {@code Iterable} sequence into an {@code Observable}.
Expand All @@ -42,11 +43,25 @@ public OnSubscribeFromIterable(Iterable<? extends T> iterable) {

@Override
public void call(final Subscriber<? super T> o) {
final Iterator<? extends T> it = is.iterator();
if (!it.hasNext() && !o.isUnsubscribed())
o.onCompleted();
else
o.setProducer(new IterableProducer<T>(o, it));
final Iterator<? extends T> it;
boolean b;

try {
it = is.iterator();

b = it.hasNext();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
}

if (!o.isUnsubscribed()) {
if (!b) {
o.onCompleted();
} else {
o.setProducer(new IterableProducer<T>(o, it));
}
}
}

private static final class IterableProducer<T> extends AtomicLong implements Producer {
Expand Down Expand Up @@ -81,55 +96,98 @@ void slowpath(long n) {
final Iterator<? extends T> it = this.it;

long r = n;
while (true) {
/*
* This complicated logic is done to avoid touching the
* volatile `requested` value during the loop itself. If
* it is touched during the loop the performance is
* impacted significantly.
*/
long numToEmit = r;
while (true) {
long e = 0;

for (;;) {
while (e != r) {
if (o.isUnsubscribed()) {
return;
} else if (it.hasNext()) {
if (--numToEmit >= 0) {
o.onNext(it.next());
} else
break;
} else if (!o.isUnsubscribed()) {
o.onCompleted();
}

T value;

try {
value = it.next();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
} else {
// is unsubscribed
}

o.onNext(value);

if (o.isUnsubscribed()) {
return;
}

boolean b;

try {
b = it.hasNext();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
}

if (!b) {
if (!o.isUnsubscribed()) {
o.onCompleted();
}
return;
}

e++;
}
r = addAndGet(-r);
if (r == 0L) {
// we're done emitting the number requested so
// return
return;

r = get();
if (e == r) {
r = BackpressureUtils.produced(this, e);
if (r == 0L) {
break;
}
e = 0L;
}

}

}

void fastpath() {
// fast-path without backpressure
final Subscriber<? super T> o = this.o;
final Iterator<? extends T> it = this.it;

while (true) {
for (;;) {
if (o.isUnsubscribed()) {
return;
} else if (it.hasNext()) {
o.onNext(it.next());
} else if (!o.isUnsubscribed()) {
o.onCompleted();
}

T value;

try {
value = it.next();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
}

o.onNext(value);

if (o.isUnsubscribed()) {
return;
} else {
// is unsubscribed
}

boolean b;

try {
b = it.hasNext();
} catch (Throwable ex) {
Exceptions.throwOrReport(ex, o);
return;
}

if (!b) {
if (!o.isUnsubscribed()) {
o.onCompleted();
}
return;
}
}
Expand Down
Loading

0 comments on commit 3f6c4fd

Please sign in to comment.