Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added op:join to concat objects with separator between elements. #604

Merged
merged 1 commit into from
Dec 12, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public byte[] call(String str) {
*/
public static Observable<String> stringConcat(Observable<String> src) {
return src.aggregate(new Func2<String, String, String>() {
@Override
public String call(String a, String b) {
return a + b;
}
Expand Down Expand Up @@ -267,4 +268,58 @@ private void output(String part) {
}
});
}
/**
* Concatenates the sequence of values by adding a separator
* between them and emitting the result once the source completes.
* <p>
* The conversion from the value type to String is performed via
* {@link java.lang.String#valueOf(java.lang.Object)} calls.
* <p>
* For example:
* <pre>
* Observable&lt;Object> source = Observable.from("a", 1, "c");
* Observable&lt;String> result = join(source, ", ");
* </pre>
*
* will yield a single element equal to "a, 1, c".
*
* @param source the source sequence of CharSequence values
* @param separator the separator to a
* @return an Observable which emits a single String value having the concatenated
* values of the source observable with the separator between elements
*/
public static <T> Observable<String> join(final Observable<T> source, final CharSequence separator) {
return Observable.create(new OnSubscribeFunc<String>() {

@Override
public Subscription onSubscribe(final Observer<? super String> t1) {
return source.subscribe(new Observer<T>() {
boolean mayAddSeparator;
StringBuilder b = new StringBuilder();
@Override
public void onNext(T args) {
if (mayAddSeparator) {
b.append(separator);
}
mayAddSeparator = true;
b.append(String.valueOf(args));
}

@Override
public void onError(Throwable e) {
b = null;
t1.onError(e);
}

@Override
public void onCompleted() {
String str = b.toString();
b = null;
t1.onNext(str);
t1.onCompleted();
}
});
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import java.nio.charset.MalformedInputException;

import org.junit.Test;
import static org.mockito.Mockito.*;

import rx.Observable;
import rx.observables.BlockingObservable;
import rx.observables.StringObservable;
import rx.Observer;
import rx.util.AssertObservable;

public class StringObservableTest {
Expand Down Expand Up @@ -127,4 +127,89 @@ public void testSplit(String message, String regex, int limit, Observable<String
Observable<String> exp = Observable.from(parts);
AssertObservable.assertObservableEqualsBlocking("when input is "+message+" and limit = "+ limit, exp, act);
}

@Test
public void testJoinMixed() {
Observable<Object> source = Observable.<Object>from("a", 1, "c");

Observable<String> result = StringObservable.join(source, ", ");

Observer<Object> observer = mock(Observer.class);

result.subscribe(observer);

verify(observer, times(1)).onNext("a, 1, c");
verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void testJoinWithEmptyString() {
Observable<String> source = Observable.from("", "b", "c");

Observable<String> result = StringObservable.join(source, ", ");

Observer<Object> observer = mock(Observer.class);

result.subscribe(observer);

verify(observer, times(1)).onNext(", b, c");
verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void testJoinWithNull() {
Observable<String> source = Observable.from("a", null, "c");

Observable<String> result = StringObservable.join(source, ", ");

Observer<Object> observer = mock(Observer.class);

result.subscribe(observer);

verify(observer, times(1)).onNext("a, null, c");
verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void testJoinSingle() {
Observable<String> source = Observable.from("a");

Observable<String> result = StringObservable.join(source, ", ");

Observer<Object> observer = mock(Observer.class);

result.subscribe(observer);

verify(observer, times(1)).onNext("a");
verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void testJoinEmpty() {
Observable<String> source = Observable.empty();

Observable<String> result = StringObservable.join(source, ", ");

Observer<Object> observer = mock(Observer.class);

result.subscribe(observer);

verify(observer, times(1)).onNext("");
verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void testJoinThrows() {
Observable<String> source = Observable.concat(Observable.just("a"), Observable.<String>error(new RuntimeException("Forced failure")));

Observable<String> result = StringObservable.join(source, ", ");

Observer<Object> observer = mock(Observer.class);

result.subscribe(observer);

verify(observer, never()).onNext("a");
verify(observer, never()).onCompleted();
verify(observer, times(1)).onError(any(Throwable.class));
}
}