Skip to content

Commit

Permalink
Merge pull request #403 from zsxwing/cast
Browse files Browse the repository at this point in the history
Implemented the 'cast' and 'ofType' operators
  • Loading branch information
benjchristensen committed Sep 25, 2013
2 parents 440aa4d + da870a5 commit 6670080
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 0 deletions.
39 changes: 39 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import rx.operators.OperationAverage;
import rx.operators.OperationBuffer;
import rx.operators.OperationCache;
import rx.operators.OperationCast;
import rx.operators.OperationCombineLatest;
import rx.operators.OperationConcat;
import rx.operators.OperationDebounce;
Expand Down Expand Up @@ -4369,6 +4370,44 @@ public BlockingObservable<T> toBlockingObservable() {
return BlockingObservable.from(this);
}

/**
* Converts the elements of an observable sequence to the specified type.
*
* @param klass
* The target class type which the elements will be converted to.
*
* @return An observable sequence that contains each element of the source
* sequence converted to the specified type.
*
* @see <a
* href="http://msdn.microsoft.com/en-us/library/hh211842(v=vs.103).aspx">MSDN:
* Observable.Cast</a>
*/
public <R> Observable<R> cast(final Class<R> klass) {
return create(OperationCast.cast(this, klass));
}

/**
* Filters the elements of an observable sequence based on the specified
* type.
*
* @param klass
* The class type to filter the elements in the source sequence
* on.
*
* @return An observable sequence that contains elements from the input
* sequence of type klass.
*
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229380(v=vs.103).aspx">MSDN: Observable.OfType</a>
*/
public <R> Observable<R> ofType(final Class<R> klass) {
return filter(new Func1<T, Boolean>() {
public Boolean call(T t) {
return klass.isInstance(t);
}
}).cast(klass);
}

/**
* Whether a given {@link Function} is an internal implementation inside rx.* packages or not.
* <p>
Expand Down
61 changes: 61 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationCast.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package rx.operators;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.junit.Test;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.util.functions.Func1;

/**
* Converts the elements of an observable sequence to the specified type.
*/
public class OperationCast {

public static <T, R> OnSubscribeFunc<R> cast(
Observable<? extends T> source, final Class<R> klass) {
return OperationMap.map(source, new Func1<T, R>() {
public R call(T t) {
return klass.cast(t);
}
});
}

public static class UnitTest {

@Test
public void testCast() {
Observable<?> source = Observable.from(1, 2);
Observable<Integer> observable = Observable.create(cast(source,
Integer.class));

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, times(1)).onNext(1);
verify(aObserver, times(1)).onNext(1);
verify(aObserver, never()).onError(
org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testCastWithWrongType() {
Observable<?> source = Observable.from(1, 2);
Observable<Boolean> observable = Observable.create(cast(source,
Boolean.class));

@SuppressWarnings("unchecked")
Observer<Boolean> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, times(1)).onError(
org.mockito.Matchers.any(ClassCastException.class));
}
}

}
40 changes: 40 additions & 0 deletions rxjava-core/src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.mockito.Mockito.*;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -701,4 +703,42 @@ public void onNext(String v) {
fail("It should be a NumberFormatException");
}
}

@Test
public void testOfType() {
Observable<String> observable = Observable.from(1, "abc", false, 2L).ofType(String.class);

@SuppressWarnings("unchecked")
Observer<Object> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, never()).onNext(1);
verify(aObserver, times(1)).onNext("abc");
verify(aObserver, never()).onNext(false);
verify(aObserver, never()).onNext(2L);
verify(aObserver, never()).onError(
org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testOfTypeWithPolymorphism() {
ArrayList<Integer> l1 = new ArrayList<Integer>();
l1.add(1);
LinkedList<Integer> l2 = new LinkedList<Integer>();
l2.add(2);

@SuppressWarnings("rawtypes")
Observable<List> observable = Observable.<Object>from(l1, l2, "123").ofType(List.class);

@SuppressWarnings("unchecked")
Observer<Object> aObserver = mock(Observer.class);
observable.subscribe(aObserver);
verify(aObserver, times(1)).onNext(l1);
verify(aObserver, times(1)).onNext(l2);
verify(aObserver, never()).onNext("123");
verify(aObserver, never()).onError(
org.mockito.Matchers.any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

}

0 comments on commit 6670080

Please sign in to comment.