Skip to content

Commit

Permalink
Merge pull request #733 from akarnokd/BufferWithObservableBoundary
Browse files Browse the repository at this point in the history
Buffer with Observable boundary.
  • Loading branch information
benjchristensen committed Jan 14, 2014
2 parents dadf17b + 93c9fcc commit 8a29a64
Show file tree
Hide file tree
Showing 3 changed files with 296 additions and 2 deletions.
29 changes: 29 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3200,6 +3200,35 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> combineLates
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, o8, o9, combineFunction));
}

/**
* Create an Observable that emits non-overlapping buffered items once the boundary observable emits an item.
* <p>
* Completion of either this or the boundary observable causes the returned observable
* to emit the latest buffer and complete.
* @param <B> the boundary value type (ignored)
* @param boundary the boundary observable
* @return an Observable that emits buffered items once the boundary observable emits an item.
* @see #buffer(rx.Observable, int)
*/
public <B> Observable<List<T>> buffer(Observable<B> boundary) {
return create(OperationBuffer.bufferWithBoundaryObservable(this, boundary));
}

/**
* Create an Observable that emits non-overlapping buffered items once the boundary observable emits an item.
* <p>
* Completion of either this or the boundary observable causes the returned observable
* to emit the latest buffer and complete.
* @param <B> the boundary value type (ignored)
* @param boundary the boundary observable
* @param initialCapacity the initial capacity of each buffer chunk
* @return an Observable that emits buffered items once the boundary observable emits an item.
* @see #buffer(rx.Observable, int)
*/
public <B> Observable<List<T>> buffer(Observable<B> boundary, int initialCapacity) {
return create(OperationBuffer.bufferWithBoundaryObservable(this, boundary, initialCapacity));
}

/**
* Creates an Observable that emits buffers of items it collects from the
* source Observable. The resulting Observable emits connected,
Expand Down
136 changes: 136 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.operators;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -397,4 +398,139 @@ public void unsubscribe() {
}
}
}

/**
* Create a buffer operator with the given observable sequence as the buffer boundary.
*/
public static <T, B> OnSubscribeFunc<List<T>> bufferWithBoundaryObservable(Observable<? extends T> source, Observable<B> boundary) {
return new BufferWithObservableBoundary<T, B>(source, boundary, 16);
}
/**
* Create a buffer operator with the given observable sequence as the buffer boundary and
* with the given initial capacity for buffers.
*/
public static <T, B> OnSubscribeFunc<List<T>> bufferWithBoundaryObservable(Observable<? extends T> source, Observable<B> boundary, int initialCapacity) {
if (initialCapacity <= 0) {
throw new IllegalArgumentException("initialCapacity > 0 required");
}
return new BufferWithObservableBoundary<T, B>(source, boundary, initialCapacity);
}

/**
* Buffer until an element is emitted from a helper observable.
* @param <T> the buffered value type
*/
private static final class BufferWithObservableBoundary<T, B> implements OnSubscribeFunc<List<T>> {
final Observable<? extends T> source;
final Observable<B> boundary;
final int initialCapacity;

public BufferWithObservableBoundary(Observable<? extends T> source, Observable<B> boundary, int initialCapacity) {
this.source = source;
this.boundary = boundary;
this.initialCapacity = initialCapacity;
}

@Override
public Subscription onSubscribe(Observer<? super List<T>> t1) {
CompositeSubscription csub = new CompositeSubscription();

SourceObserver<T> so = new SourceObserver<T>(t1, initialCapacity, csub);
csub.add(source.subscribe(so));
csub.add(boundary.subscribe(new BoundaryObserver<B>(so)));

return csub;
}
/**
* Observes the source.
*/
private static final class SourceObserver<T> implements Observer<T> {
final Observer<? super List<T>> observer;
/** The buffer, if null, that indicates a terminal state. */
List<T> buffer;
final int initialCapacity;
final Object guard;
final Subscription cancel;
public SourceObserver(Observer<? super List<T>> observer, int initialCapacity, Subscription cancel) {
this.observer = observer;
this.initialCapacity = initialCapacity;
this.guard = new Object();
this.cancel = cancel;
buffer = new ArrayList<T>(initialCapacity);
}

@Override
public void onNext(T args) {
synchronized (guard) {
buffer.add(args);
}
}

@Override
public void onError(Throwable e) {
synchronized (guard) {
if (buffer == null) {
return;
}
buffer = null;
}
observer.onError(e);
cancel.unsubscribe();
}

@Override
public void onCompleted() {
emitAndComplete();
cancel.unsubscribe();
}
void emitAndReplace() {
List<T> buf;
synchronized (guard) {
if (buffer == null) {
return;
}
buf = buffer;
buffer = new ArrayList<T>(initialCapacity);
}
observer.onNext(buf);
}
void emitAndComplete() {
List<T> buf;
synchronized (guard) {
if (buffer == null) {
return;
}
buf = buffer;
buffer = null;
}
observer.onNext(buf);
observer.onCompleted();
}
}
/**
* Observes the boundary.
*/
private static final class BoundaryObserver<T> implements Observer<T> {
final SourceObserver so;

public BoundaryObserver(SourceObserver so) {
this.so = so;
}

@Override
public void onNext(T args) {
so.emitAndReplace();
}

@Override
public void onError(Throwable e) {
so.onError(e);
}

@Override
public void onCompleted() {
so.onCompleted();
}
}
}
}
133 changes: 131 additions & 2 deletions rxjava-core/src/test/java/rx/operators/OperationBufferTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.*;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.schedulers.TestScheduler;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
Expand Down Expand Up @@ -383,4 +383,133 @@ public void testBufferStopsWhenUnsubscribed1() {

inOrder.verifyNoMoreInteractions();
}

@Test
public void bufferWithBONormal1() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> boundary = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);
InOrder inOrder = Mockito.inOrder(o);

source.buffer(boundary).subscribe(o);

source.onNext(1);
source.onNext(2);
source.onNext(3);

boundary.onNext(1);

inOrder.verify(o, times(1)).onNext(Arrays.asList(1, 2, 3));

source.onNext(4);
source.onNext(5);

boundary.onNext(2);

inOrder.verify(o, times(1)).onNext(Arrays.asList(4, 5));

source.onNext(6);
boundary.onCompleted();

inOrder.verify(o, times(1)).onNext(Arrays.asList(6));

inOrder.verify(o).onCompleted();

verify(o, never()).onError(any(Throwable.class));
}
@Test
public void bufferWithBOEmptyLastViaBoundary() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> boundary = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);
InOrder inOrder = Mockito.inOrder(o);

source.buffer(boundary).subscribe(o);

boundary.onCompleted();

inOrder.verify(o, times(1)).onNext(Arrays.asList());

inOrder.verify(o).onCompleted();

verify(o, never()).onError(any(Throwable.class));
}
@Test
public void bufferWithBOEmptyLastViaSource() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> boundary = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);
InOrder inOrder = Mockito.inOrder(o);

source.buffer(boundary).subscribe(o);

source.onCompleted();

inOrder.verify(o, times(1)).onNext(Arrays.asList());

inOrder.verify(o).onCompleted();

verify(o, never()).onError(any(Throwable.class));
}
@Test
public void bufferWithBOEmptyLastViaBoth() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> boundary = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);
InOrder inOrder = Mockito.inOrder(o);

source.buffer(boundary).subscribe(o);

source.onCompleted();
boundary.onCompleted();

inOrder.verify(o, times(1)).onNext(Arrays.asList());

inOrder.verify(o).onCompleted();

verify(o, never()).onError(any(Throwable.class));
}

@Test
public void bufferWithBOSourceThrows() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> boundary = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);

source.buffer(boundary).subscribe(o);
source.onNext(1);
source.onError(new OperationReduceTest.CustomException());

verify(o).onError(any(OperationReduceTest.CustomException.class));
verify(o, never()).onCompleted();
verify(o, never()).onNext(any());
}

@Test
public void bufferWithBOBoundaryThrows() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> boundary = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);

source.buffer(boundary).subscribe(o);

source.onNext(1);
boundary.onError(new OperationReduceTest.CustomException());

verify(o).onError(any(OperationReduceTest.CustomException.class));
verify(o, never()).onCompleted();
verify(o, never()).onNext(any());
}
}

0 comments on commit 8a29a64

Please sign in to comment.