-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #3245 from akarnokd/BlockingObservablePart2x
BlockingObservable, next, latest, mostRecent, first, last, single,
- Loading branch information
Showing
8 changed files
with
1,029 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
130 changes: 130 additions & 0 deletions
130
src/main/java/io/reactivex/internal/operators/BlockingOperatorLatest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
/** | ||
* Copyright 2015 Netflix, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in | ||
* compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is | ||
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See | ||
* the License for the specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.reactivex.internal.operators; | ||
|
||
import java.util.*; | ||
import java.util.concurrent.Semaphore; | ||
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; | ||
|
||
import org.reactivestreams.Publisher; | ||
|
||
import io.reactivex.*; | ||
import io.reactivex.Observable; | ||
import io.reactivex.internal.subscribers.DisposableSubscriber; | ||
import io.reactivex.internal.util.Exceptions; | ||
|
||
/** | ||
* Wait for and iterate over the latest values of the source observable. If the source works faster than the | ||
* iterator, values may be skipped, but not the {@code onError} or {@code onCompleted} events. | ||
*/ | ||
public enum BlockingOperatorLatest { | ||
; | ||
|
||
/** | ||
* Returns an {@code Iterable} that blocks until or unless the {@code Observable} emits an item that has not | ||
* been returned by the {@code Iterable}, then returns that item | ||
* | ||
* @param source | ||
* the source {@code Observable} | ||
* @return an {@code Iterable} that blocks until or unless the {@code Observable} emits an item that has not | ||
* been returned by the {@code Iterable}, then returns that item | ||
*/ | ||
public static <T> Iterable<T> latest(final Publisher<? extends T> source) { | ||
return new Iterable<T>() { | ||
@Override | ||
public Iterator<T> iterator() { | ||
LatestObserverIterator<T> lio = new LatestObserverIterator<>(); | ||
Observable.<T>fromPublisher(source).materialize().subscribe(lio); | ||
return lio; | ||
} | ||
}; | ||
} | ||
|
||
/** Observer of source, iterator for output. */ | ||
static final class LatestObserverIterator<T> extends DisposableSubscriber<Try<Optional<T>>> implements Iterator<T> { | ||
final Semaphore notify = new Semaphore(0); | ||
// observer's notification | ||
volatile Try<Optional<T>> value; | ||
/** Updater for the value field. */ | ||
@SuppressWarnings("rawtypes") | ||
static final AtomicReferenceFieldUpdater<LatestObserverIterator, Try> REFERENCE_UPDATER | ||
= AtomicReferenceFieldUpdater.newUpdater(LatestObserverIterator.class, Try.class, "value"); | ||
|
||
@Override | ||
public void onNext(Try<Optional<T>> args) { | ||
boolean wasntAvailable = REFERENCE_UPDATER.getAndSet(this, args) == null; | ||
if (wasntAvailable) { | ||
notify.release(); | ||
} | ||
} | ||
|
||
@Override | ||
public void onError(Throwable e) { | ||
// not expected | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
// not expected | ||
} | ||
|
||
// iterator's notification | ||
Try<Optional<T>> iNotif; | ||
|
||
@Override | ||
public boolean hasNext() { | ||
if (iNotif != null && iNotif.hasError()) { | ||
throw Exceptions.propagate(iNotif.error()); | ||
} | ||
if (iNotif == null || iNotif.value().isPresent()) { | ||
if (iNotif == null) { | ||
try { | ||
notify.acquire(); | ||
} catch (InterruptedException ex) { | ||
dispose(); | ||
Thread.currentThread().interrupt(); | ||
iNotif = Notification.error(ex); | ||
throw Exceptions.propagate(ex); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
Try<Optional<T>> n = REFERENCE_UPDATER.getAndSet(this, null); | ||
iNotif = n; | ||
if (iNotif.hasError()) { | ||
throw Exceptions.propagate(iNotif.error()); | ||
} | ||
} | ||
} | ||
return iNotif.value().isPresent(); | ||
} | ||
|
||
@Override | ||
public T next() { | ||
if (hasNext()) { | ||
if (iNotif.value().isPresent()) { | ||
T v = iNotif.value().get(); | ||
iNotif = null; | ||
return v; | ||
} | ||
} | ||
throw new NoSuchElementException(); | ||
} | ||
|
||
@Override | ||
public void remove() { | ||
throw new UnsupportedOperationException("Read-only iterator."); | ||
} | ||
|
||
} | ||
} |
124 changes: 124 additions & 0 deletions
124
src/main/java/io/reactivex/internal/operators/BlockingOperatorMostRecent.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
/** | ||
* Copyright 2015 Netflix, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in | ||
* compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is | ||
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See | ||
* the License for the specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.reactivex.internal.operators; | ||
|
||
import java.util.*; | ||
|
||
import org.reactivestreams.Publisher; | ||
|
||
import io.reactivex.Observer; | ||
import io.reactivex.internal.util.*; | ||
|
||
/** | ||
* Returns an Iterable that always returns the item most recently emitted by an Observable, or a | ||
* seed value if no item has yet been emitted. | ||
* <p> | ||
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.mostRecent.png" alt=""> | ||
*/ | ||
public enum BlockingOperatorMostRecent { | ||
; | ||
/** | ||
* Returns an {@code Iterable} that always returns the item most recently emitted by the {@code Observable}. | ||
* | ||
* @param source | ||
* the source {@code Observable} | ||
* @param initialValue | ||
* a default item to return from the {@code Iterable} if {@code source} has not yet emitted any | ||
* items | ||
* @return an {@code Iterable} that always returns the item most recently emitted by {@code source}, or | ||
* {@code initialValue} if {@code source} has not yet emitted any items | ||
*/ | ||
public static <T> Iterable<T> mostRecent(final Publisher<? extends T> source, final T initialValue) { | ||
return new Iterable<T>() { | ||
@Override | ||
public Iterator<T> iterator() { | ||
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<>(initialValue); | ||
|
||
/** | ||
* Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain | ||
* since it is for BlockingObservable. | ||
*/ | ||
source.subscribe(mostRecentObserver); | ||
|
||
return mostRecentObserver.getIterable(); | ||
} | ||
}; | ||
} | ||
|
||
private static final class MostRecentObserver<T> extends Observer<T> { | ||
volatile Object value; | ||
|
||
private MostRecentObserver(T value) { | ||
this.value = NotificationLite.next(value); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
value = NotificationLite.complete(); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable e) { | ||
value = NotificationLite.error(e); | ||
} | ||
|
||
@Override | ||
public void onNext(T args) { | ||
value = NotificationLite.next(args); | ||
} | ||
|
||
/** | ||
* The {@link Iterator} return is not thread safe. In other words don't call {@link Iterator#hasNext()} in one | ||
* thread expect {@link Iterator#next()} called from a different thread to work. | ||
* @return | ||
*/ | ||
public Iterator<T> getIterable() { | ||
return new Iterator<T>() { | ||
/** | ||
* buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next(). | ||
*/ | ||
private Object buf = null; | ||
|
||
@Override | ||
public boolean hasNext() { | ||
buf = value; | ||
return !NotificationLite.isComplete(buf); | ||
} | ||
|
||
@Override | ||
public T next() { | ||
try { | ||
// if hasNext wasn't called before calling next. | ||
if (buf == null) | ||
buf = value; | ||
if (NotificationLite.isComplete(buf)) | ||
throw new NoSuchElementException(); | ||
if (NotificationLite.isError(buf)) { | ||
throw Exceptions.propagate(NotificationLite.getError(buf)); | ||
} | ||
return NotificationLite.getValue(buf); | ||
} | ||
finally { | ||
buf = null; | ||
} | ||
} | ||
|
||
@Override | ||
public void remove() { | ||
throw new UnsupportedOperationException("Read only iterator"); | ||
} | ||
}; | ||
} | ||
} | ||
} |
Oops, something went wrong.