diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index c0ecf6226f..872e3deb16 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -3031,11 +3031,29 @@ public Observable where(Func1 predicate) { * a function to apply to each item emitted by the Observable * @return an Observable that emits the items from the source Observable, transformed by the * given function + * @see MSDN: Observable.Select */ public Observable map(Func1 func) { return create(OperationMap.map(this, func)); } + /** + * Returns an Observable that applies the given function to each item emitted by an + * Observable and emits the result. + *

+ * + * + * @param func + * a function to apply to each item emitted by the Observable. The function takes the + * index of the emitted item as additional parameter. + * @return an Observable that emits the items from the source Observable, transformed by the + * given function + * @see MSDN: Observable.Select + */ + public Observable mapWithIndex(Func2 func) { + return create(OperationMap.mapWithIndex(this, func)); + } + /** * Creates a new Observable by applying a function that you supply to each item emitted by * the source Observable, where that function returns an Observable, and then merging those diff --git a/rxjava-core/src/main/java/rx/operators/OperationMap.java b/rxjava-core/src/main/java/rx/operators/OperationMap.java index fa7fca2758..9eb2520420 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMap.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMap.java @@ -25,6 +25,7 @@ import org.junit.Before; import org.junit.Test; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -33,6 +34,7 @@ import rx.Observer; import rx.Subscription; import rx.util.functions.Func1; +import rx.util.functions.Func2; /** * Applies a function of your choosing to every item emitted by an Observable, and returns this @@ -56,8 +58,42 @@ public final class OperationMap { * the type of the output sequence. * @return a sequence that is the result of applying the transformation function to each item in the input sequence. */ - public static OnSubscribeFunc map(Observable sequence, Func1 func) { - return new MapObservable(sequence, func); + public static OnSubscribeFunc map(final Observable sequence, final Func1 func) { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + return new MapObservable(sequence, new Func2() { + @Override + public R call(T value, @SuppressWarnings("unused") Integer unused) { + return func.call(value); + } + }).onSubscribe(observer); + } + }; + } + + /** + * Accepts a sequence and a transformation function. Returns a sequence that is the result of + * applying the transformation function to each item in the sequence. + * + * @param sequence + * the input sequence. + * @param func + * a function to apply to each item in the sequence. The function gets the index of the emitted item + * as additional parameter. + * @param + * the type of the input sequence. + * @param + * the type of the output sequence. + * @return a sequence that is the result of applying the transformation function to each item in the input sequence. + */ + public static OnSubscribeFunc mapWithIndex(final Observable sequence, final Func2 func) { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + return new MapObservable(sequence, func).onSubscribe(observer); + } + }; } /** @@ -89,56 +125,50 @@ public static OnSubscribeFunc mapMany(Observable sequence * the type of the output sequence. */ private static class MapObservable implements OnSubscribeFunc { - public MapObservable(Observable sequence, Func1 func) { + public MapObservable(Observable sequence, Func2 func) { this.sequence = sequence; this.func = func; } - private Observable sequence; - - private Func1 func; - - public Subscription onSubscribe(Observer observer) { - return sequence.subscribe(new MapObserver(observer, func)); - } - } - - /** - * An observer that applies a transformation function to each item and forwards the result to an inner observer. - * - * @param - * the type of the observer items. - * @param - * the type of the inner observer items. - */ - private static class MapObserver implements Observer { - public MapObserver(Observer observer, Func1 func) { - this.observer = observer; - this.func = func; - } - - Observer observer; - - Func1 func; + private final Observable sequence; + private final Func2 func; + private int index; - public void onNext(T value) { - // let the exception be thrown if func fails as a SafeObserver wrapping this will handle it - observer.onNext(func.call(value)); - } + @Override + public Subscription onSubscribe(final Observer observer) { + return sequence.subscribe(new Observer() { + @Override + public void onNext(T value) { + observer.onNext(func.call(value, index)); + index++; + } - public void onError(Throwable ex) { - observer.onError(ex); - } + @Override + public void onError(Throwable ex) { + observer.onError(ex); + } - public void onCompleted() { - observer.onCompleted(); + @Override + public void onCompleted() { + observer.onCompleted(); + } + }); } } public static class UnitTest { @Mock Observer stringObserver; - + @Mock + Observer stringObserver2; + + final static Func2 APPEND_INDEX = new Func2() { + @Override + public String call(String value, Integer index) { + return value + index; + } + }; + @Before public void before() { MockitoAnnotations.initMocks(this); @@ -164,9 +194,42 @@ public String call(Map map) { verify(stringObserver, times(1)).onNext("OneFirst"); verify(stringObserver, times(1)).onNext("TwoFirst"); verify(stringObserver, times(1)).onCompleted(); + } + @Test + public void testMapWithIndex() { + Observable w = Observable.from("a", "b", "c"); + Observable m = Observable.create(mapWithIndex(w, APPEND_INDEX)); + m.subscribe(stringObserver); + InOrder inOrder = inOrder(stringObserver); + inOrder.verify(stringObserver, times(1)).onNext("a0"); + inOrder.verify(stringObserver, times(1)).onNext("b1"); + inOrder.verify(stringObserver, times(1)).onNext("c2"); + inOrder.verify(stringObserver, times(1)).onCompleted(); + verify(stringObserver, never()).onError(any(Throwable.class)); } + + @Test + public void testMapWithIndexAndMultipleSubscribers() { + Observable w = Observable.from("a", "b", "c"); + Observable m = Observable.create(mapWithIndex(w, APPEND_INDEX)); + m.subscribe(stringObserver); + m.subscribe(stringObserver2); + InOrder inOrder = inOrder(stringObserver); + inOrder.verify(stringObserver, times(1)).onNext("a0"); + inOrder.verify(stringObserver, times(1)).onNext("b1"); + inOrder.verify(stringObserver, times(1)).onNext("c2"); + inOrder.verify(stringObserver, times(1)).onCompleted(); + verify(stringObserver, never()).onError(any(Throwable.class)); + InOrder inOrder2 = inOrder(stringObserver2); + inOrder2.verify(stringObserver2, times(1)).onNext("a0"); + inOrder2.verify(stringObserver2, times(1)).onNext("b1"); + inOrder2.verify(stringObserver2, times(1)).onNext("c2"); + inOrder2.verify(stringObserver2, times(1)).onCompleted(); + verify(stringObserver2, never()).onError(any(Throwable.class)); + } + @Test public void testMapMany() { /* simulate a top-level async call which returns IDs */ @@ -246,12 +309,34 @@ public String call(Map map) { } + @Test + public void testMapWithError() { + Observable w = Observable.from("one", "fail", "two", "three", "fail"); + Observable m = Observable.create(map(w, new Func1() { + @Override + public String call(String s) { + if ("fail".equals(s)) { + throw new RuntimeException("Forced Failure"); + } + return s; + } + })); + + m.subscribe(stringObserver); + verify(stringObserver, times(1)).onNext("one"); + verify(stringObserver, never()).onNext("two"); + verify(stringObserver, never()).onNext("three"); + verify(stringObserver, never()).onCompleted(); + verify(stringObserver, times(1)).onError(any(Throwable.class)); + } + @Test public void testMapWithSynchronousObservableContainingError() { Observable w = Observable.from("one", "fail", "two", "three", "fail"); final AtomicInteger c1 = new AtomicInteger(); final AtomicInteger c2 = new AtomicInteger(); Observable m = Observable.create(map(w, new Func1() { + @Override public String call(String s) { if ("fail".equals(s)) throw new RuntimeException("Forced Failure"); @@ -260,6 +345,7 @@ public String call(String s) { return s; } })).map(new Func1() { + @Override public String call(String s) { System.out.println("SecondMapper:" + s); c2.incrementAndGet(); @@ -280,7 +366,7 @@ public String call(String s) { assertEquals(1, c2.get()); } - private Map getMap(String prefix) { + private static Map getMap(String prefix) { Map m = new HashMap(); m.put("firstName", prefix + "First"); m.put("lastName", prefix + "Last");