Skip to content

Commit

Permalink
Operator buffer with boundary and open-close, fixes to the timed
Browse files Browse the repository at this point in the history
variants. 

Added QueueDrain and QueueDrainSubscriber for common queue-drain
operations. Not applied outside the buffer()s as of now.
  • Loading branch information
akarnokd committed Aug 30, 2015
1 parent 993acd7 commit 9f39481
Show file tree
Hide file tree
Showing 9 changed files with 1,054 additions and 133 deletions.
14 changes: 9 additions & 5 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1878,19 +1878,23 @@ public final <TOpening, TClosing, U extends Collection<? super T>> Observable<U>
Observable<? extends TOpening> bufferOpenings,
Function<? super TOpening, ? extends Publisher<? extends TClosing>> bufferClosingSelector,
Supplier<U> bufferSupplier) {
// TODO
throw new UnsupportedOperationException();
Objects.requireNonNull(bufferOpenings);
Objects.requireNonNull(bufferClosingSelector);
Objects.requireNonNull(bufferSupplier);
return lift(new OperatorBufferBoundary<>(bufferOpenings, bufferClosingSelector, bufferSupplier));
}

public final <B> Observable<List<T>> buffer(Observable<B> boundary) {
/*
* XXX: javac complains if this is not manually cast, Eclipse is fine
*/
return buffer(boundary, (Supplier<List<T>>)ArrayList::new);
}

public final <B, U extends Collection<? super T>> Observable<List<T>> buffer(Observable<B> boundary, Supplier<U> bufferSupplier) {
// TODO
throw new UnsupportedOperationException();
public final <B, U extends Collection<? super T>> Observable<U> buffer(Observable<B> boundary, Supplier<U> bufferSupplier) {
Objects.requireNonNull(boundary);
Objects.requireNonNull(bufferSupplier);
return lift(new OperatorBufferExactBoundary<>(boundary, bufferSupplier));
}

public final <B> Observable<List<T>> buffer(Observable<B> boundary, int initialCapacity) {
Expand Down
17 changes: 8 additions & 9 deletions src/main/java/io/reactivex/Try.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ private Try(T value, Throwable error) {
/**
* Constructs a Try instance by wrapping the given value.
*
* @param value
* @return
* @param value the value to wrap
* @return the created Try instance
*/
public static <T> Try<T> ofValue(T value) {
// TODO ? Objects.requireNonNull(value);
Expand All @@ -47,17 +47,16 @@ public static <T> Try<T> ofValue(T value) {
*
* <p>Null Throwables are replaced by NullPointerException instance in this Try.
*
* @param e
* @return
* @param e the exception to wrap
* @return the new Try instance holding the exception
*/
public static <T> Try<T> ofError(Throwable e) {
// TODO ? Objects.requireNonNull(e);
return new Try<>(null, e != null ? e : new NullPointerException());
}

/**
* Returns the value or null if the value is actually null or if this Try holds an error instead.
* @return
* @return the value contained
* @see #hasValue()
*/
public T value() {
Expand All @@ -67,7 +66,7 @@ public T value() {
/**
* Returns the error or null if this Try holds a value instead.
*
* @return
* @return the Throwable contained or null
*
*/
public Throwable error() {
Expand All @@ -76,15 +75,15 @@ public Throwable error() {

/**
* Returns true if this Try holds an error.
* @return
* @return true if this Try holds an error
*/
public boolean hasError() {
return error != null;
}

/**
* Returns true if this Try holds a value.
* @return
* @return true if this Try holds a value
*/
public boolean hasValue() {
return error == null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public boolean add(T newResource) {
* Removes the given resource from this composite and calls the disposer if the resource
* was indeed in the composite.
* @param resource the resource to remove, not-null (not verified)
* @return
* @return true if the resource was removed, false otherwise
*/
@Override
public boolean remove(T resource) {
Expand Down
Loading

0 comments on commit 9f39481

Please sign in to comment.