Skip to content

Commit

Permalink
Zip NULL and COMPLETE Sentinels
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Feb 5, 2014
1 parent cf28bce commit 3d5474f
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 10 deletions.
24 changes: 16 additions & 8 deletions rxjava-core/src/main/java/rx/operators/OperatorZip.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
Expand Down Expand Up @@ -131,6 +130,9 @@ private static class Zip<R> {
final FuncN<? extends R> zipFunction;
final CompositeSubscription childSubscription = new CompositeSubscription();

static Object NULL_SENTINEL = new Object();
static Object COMPLETE_SENTINEL = new Object();

@SuppressWarnings("rawtypes")
public Zip(Observable[] os, final Subscriber<? super R> observer, FuncN<? extends R> zipFunction) {
this.os = os;
Expand Down Expand Up @@ -170,13 +172,16 @@ void tick() {
boolean allHaveValues = true;
for (int i = 0; i < observers.length; i++) {
vs[i] = ((InnerObserver) observers[i]).items.peek();
if (vs[i] instanceof Notification) {
if (vs[i] == NULL_SENTINEL) {
// special handling for null
vs[i] = null;
} else if (vs[i] == COMPLETE_SENTINEL) {
// special handling for onComplete
observer.onCompleted();
// we need to unsubscribe from all children since children are independently subscribed
childSubscription.unsubscribe();
return;
}
if (vs[i] == null) {
} else if (vs[i] == null) {
allHaveValues = false;
// we continue as there may be an onCompleted on one of the others
continue;
Expand All @@ -189,7 +194,7 @@ void tick() {
for (int i = 0; i < observers.length; i++) {
((InnerObserver) observers[i]).items.poll();
// eagerly check if the next item on this queue is an onComplete
if (((InnerObserver) observers[i]).items.peek() instanceof Notification) {
if (((InnerObserver) observers[i]).items.peek() == COMPLETE_SENTINEL) {
// it is an onComplete so shut down
observer.onCompleted();
// we need to unsubscribe from all children since children are independently subscribed
Expand All @@ -213,7 +218,7 @@ final class InnerObserver extends Subscriber {
@SuppressWarnings("unchecked")
@Override
public void onCompleted() {
items.add(Notification.createOnCompleted());
items.add(COMPLETE_SENTINEL);
tick();
}

Expand All @@ -226,8 +231,11 @@ public void onError(Throwable e) {
@SuppressWarnings("unchecked")
@Override
public void onNext(Object t) {
// TODO use a placeholder for NULL, such as Notification<T>(null)
items.add(t);
if (t == null) {
items.add(NULL_SENTINEL);
} else {
items.add(t);
}
tick();
}
};
Expand Down
62 changes: 60 additions & 2 deletions rxjava-core/src/test/java/rx/operators/OperatorZipTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,12 +31,12 @@
import org.junit.Test;
import org.mockito.InOrder;

import rx.Notification;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.operators.OperationReduceTest.CustomException;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action1;
Expand Down Expand Up @@ -928,6 +927,65 @@ public void onNext(String s) {
assertEquals("5-5", list.get(4));
}

@Test
public void testEmitNull() {
Observable<Integer> oi = Observable.from(1, null, 3);
Observable<String> os = Observable.from("a", "b", null);
Observable<String> o = Observable.zip(oi, os, new Func2<Integer, String, String>() {

@Override
public String call(Integer t1, String t2) {
return t1 + "-" + t2;
}

});

final ArrayList<String> list = new ArrayList<String>();
o.subscribe(new Action1<String>() {

@Override
public void call(String s) {
System.out.println(s);
list.add(s);
}
});

assertEquals(3, list.size());
assertEquals("1-a", list.get(0));
assertEquals("null-b", list.get(1));
assertEquals("3-null", list.get(2));
}

@Test
public void testEmitMaterializedNotifications() {
Observable<Notification<Integer>> oi = Observable.from(1, 2, 3).materialize();
Observable<Notification<String>> os = Observable.from("a", "b", "c").materialize();
Observable<String> o = Observable.zip(oi, os, new Func2<Notification<Integer>, Notification<String>, String>() {

@Override
public String call(Notification<Integer> t1, Notification<String> t2) {
return t1.getKind() + "_" + t1.getValue() + "-" + t2.getKind() + "_" + t2.getValue();
}

});

final ArrayList<String> list = new ArrayList<String>();
o.subscribe(new Action1<String>() {

@Override
public void call(String s) {
System.out.println(s);
list.add(s);
}
});

assertEquals(4, list.size());
assertEquals("OnNext_1-OnNext_a", list.get(0));
assertEquals("OnNext_2-OnNext_b", list.get(1));
assertEquals("OnNext_3-OnNext_c", list.get(2));
assertEquals("OnCompleted_null-OnCompleted_null", list.get(3));
}

Observable<Integer> OBSERVABLE_OF_5_INTEGERS = OBSERVABLE_OF_5_INTEGERS(new AtomicInteger());

Observable<Integer> OBSERVABLE_OF_5_INTEGERS(final AtomicInteger numEmitted) {
Expand Down

0 comments on commit 3d5474f

Please sign in to comment.