Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GroupBy fixes as reported in Issue 282 #284

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add Object overloads for dynamic language support.
Need these until we finish work at #204
benjchristensen committed Jun 5, 2013
commit eb932d8a9eacb6517cf18c68edbb722fe98ec505
Original file line number Diff line number Diff line change
@@ -19,17 +19,21 @@ import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;

import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.observables.GroupedObservable;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

@@ -280,6 +284,29 @@ def class ObservableTests {
Observable.from(1, 2, 3).all({ x -> x > 0 }).subscribe({ result -> a.received(result) });
verify(a, times(1)).received(true);
}

@Test
public void testGroupBy() {
int count=0;

Observable.from("one", "two", "three", "four", "five", "six")
.groupBy({String s -> s.length()})
.mapMany({
groupObservable ->

return groupObservable.map({
s ->
return "Value: " + s + " Group: " + groupObservable.getKey();
});
}).toBlockingObservable().forEach({
s ->
println(s);
count++;
})

assertEquals(6, count);
}


def class AsyncObservable implements Func1<Observer<Integer>, Subscription> {

44 changes: 44 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
@@ -1294,6 +1294,28 @@ public static <K, T, R> Observable<GroupedObservable<K, R>> groupBy(Observable<T
return create(OperationGroupBy.groupBy(source, keySelector, elementSelector));
}

@SuppressWarnings("rawtypes")
public static <K, T, R> Observable<GroupedObservable<K, R>> groupBy(Observable<T> source, final Object keySelector, final Object elementSelector) {
final FuncN _k = Functions.from(keySelector);
final FuncN _e = Functions.from(elementSelector);

return groupBy(source, new Func1<T, K>() {

@SuppressWarnings("unchecked")
@Override
public K call(T t1) {
return (K) _k.call(t1);
}
}, new Func1<T, R>() {

@SuppressWarnings("unchecked")
@Override
public R call(T t1) {
return (R) _e.call(t1);
}
});
}

/**
* Groups the items emitted by an Observable according to a specified criteria, and emits these
* grouped items as Observables, one Observable per group.
@@ -1314,6 +1336,20 @@ public static <K, T, R> Observable<GroupedObservable<K, R>> groupBy(Observable<T
public static <K, T> Observable<GroupedObservable<K, T>> groupBy(Observable<T> source, final Func1<T, K> keySelector) {
return create(OperationGroupBy.groupBy(source, keySelector));
}

@SuppressWarnings("rawtypes")
public static <K, T> Observable<GroupedObservable<K, T>> groupBy(Observable<T> source, final Object keySelector) {
final FuncN _k = Functions.from(keySelector);

return groupBy(source, new Func1<T, K>() {

@SuppressWarnings("unchecked")
@Override
public K call(T t1) {
return (K) _k.call(t1);
}
});
}

/**
* This behaves like <code>merge</code> except that if any of the merged Observables notify
@@ -3560,6 +3596,10 @@ public Observable<T> startWith(T... values) {
public <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<T, K> keySelector, final Func1<T, R> elementSelector) {
return groupBy(this, keySelector, elementSelector);
}

public <K, R> Observable<GroupedObservable<K, R>> groupBy(final Object keySelector, final Object elementSelector) {
return groupBy(this, keySelector, elementSelector);
}

/**
* Groups the items emitted by an Observable according to a specified criteria, and emits these
@@ -3578,6 +3618,10 @@ public <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<T, K> keyS
public <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<T, K> keySelector) {
return groupBy(this, keySelector);
}

public <K> Observable<GroupedObservable<K, T>> groupBy(final Object keySelector) {
return groupBy(this, keySelector);
}

/**
* Converts an Observable into a BlockingObservable (an Observable with blocking operators).