diff --git a/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java b/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java index 3bdb58a877..8a22ef8cb7 100644 --- a/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java +++ b/rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java @@ -197,6 +197,7 @@ public byte[] call(String str) { */ public static Observable stringConcat(Observable src) { return src.aggregate(new Func2() { + @Override public String call(String a, String b) { return a + b; } @@ -267,4 +268,58 @@ private void output(String part) { } }); } + /** + * Concatenates the sequence of values by adding a separator + * between them and emitting the result once the source completes. + *

+ * The conversion from the value type to String is performed via + * {@link java.lang.String#valueOf(java.lang.Object)} calls. + *

+ * For example: + *

+     * Observable<Object> source = Observable.from("a", 1, "c");
+     * Observable<String> result = join(source, ", ");
+     * 
+ * + * will yield a single element equal to "a, 1, c". + * + * @param source the source sequence of CharSequence values + * @param separator the separator to a + * @return an Observable which emits a single String value having the concatenated + * values of the source observable with the separator between elements + */ + public static Observable join(final Observable source, final CharSequence separator) { + return Observable.create(new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer t1) { + return source.subscribe(new Observer() { + boolean mayAddSeparator; + StringBuilder b = new StringBuilder(); + @Override + public void onNext(T args) { + if (mayAddSeparator) { + b.append(separator); + } + mayAddSeparator = true; + b.append(String.valueOf(args)); + } + + @Override + public void onError(Throwable e) { + b = null; + t1.onError(e); + } + + @Override + public void onCompleted() { + String str = b.toString(); + b = null; + t1.onNext(str); + t1.onCompleted(); + } + }); + } + }); + } } diff --git a/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java b/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java index 86a686958c..6b135894f9 100644 --- a/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java +++ b/rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java @@ -23,10 +23,10 @@ import java.nio.charset.MalformedInputException; import org.junit.Test; +import static org.mockito.Mockito.*; import rx.Observable; -import rx.observables.BlockingObservable; -import rx.observables.StringObservable; +import rx.Observer; import rx.util.AssertObservable; public class StringObservableTest { @@ -127,4 +127,89 @@ public void testSplit(String message, String regex, int limit, Observable exp = Observable.from(parts); AssertObservable.assertObservableEqualsBlocking("when input is "+message+" and limit = "+ limit, exp, act); } + + @Test + public void testJoinMixed() { + Observable source = Observable.from("a", 1, "c"); + + Observable result = StringObservable.join(source, ", "); + + Observer observer = mock(Observer.class); + + result.subscribe(observer); + + verify(observer, times(1)).onNext("a, 1, c"); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + @Test + public void testJoinWithEmptyString() { + Observable source = Observable.from("", "b", "c"); + + Observable result = StringObservable.join(source, ", "); + + Observer observer = mock(Observer.class); + + result.subscribe(observer); + + verify(observer, times(1)).onNext(", b, c"); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + @Test + public void testJoinWithNull() { + Observable source = Observable.from("a", null, "c"); + + Observable result = StringObservable.join(source, ", "); + + Observer observer = mock(Observer.class); + + result.subscribe(observer); + + verify(observer, times(1)).onNext("a, null, c"); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + @Test + public void testJoinSingle() { + Observable source = Observable.from("a"); + + Observable result = StringObservable.join(source, ", "); + + Observer observer = mock(Observer.class); + + result.subscribe(observer); + + verify(observer, times(1)).onNext("a"); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + @Test + public void testJoinEmpty() { + Observable source = Observable.empty(); + + Observable result = StringObservable.join(source, ", "); + + Observer observer = mock(Observer.class); + + result.subscribe(observer); + + verify(observer, times(1)).onNext(""); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Throwable.class)); + } + @Test + public void testJoinThrows() { + Observable source = Observable.concat(Observable.just("a"), Observable.error(new RuntimeException("Forced failure"))); + + Observable result = StringObservable.join(source, ", "); + + Observer observer = mock(Observer.class); + + result.subscribe(observer); + + verify(observer, never()).onNext("a"); + verify(observer, never()).onCompleted(); + verify(observer, times(1)).onError(any(Throwable.class)); + } }