Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1.x: fix and deprecate evicting groupBy and add new overload #5917

Merged
merged 2 commits into from
Mar 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dependencies {

testCompile 'junit:junit:4.12'
testCompile 'org.mockito:mockito-core:1.10.19'
testCompile 'com.google.guava:guava:19.0'
testCompile 'com.google.guava:guava:24.0-jre'
testCompile 'com.pushtorefresh.java-private-constructor-checker:checker:1.2.0'

perfCompile 'org.openjdk.jmh:jmh-core:1.11.3'
Expand Down
75 changes: 73 additions & 2 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7275,7 +7275,7 @@ public final void forEach(final Action1<? super T> onNext, final Action1<Throwab
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
*/
public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector, final Func1<? super T, ? extends R> elementSelector) {
return lift(new OperatorGroupBy<T, K, R>(keySelector, elementSelector));
return lift(new OperatorGroupByEvicting<T, K, R>(keySelector, elementSelector));
}

/**
Expand Down Expand Up @@ -7334,7 +7334,12 @@ public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? su
* if {@code evictingMapFactory} is null
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
* @since 1.3
* @deprecated since 1.3.7, use {@link #groupBy(Func1, Func1, int, boolean, Func1)}
* instead which uses much less memory. Please take note of the
* usage difference involving the evicting action which now expects
* the value from the map instead of the key.
*/
@Deprecated
public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector,
final Func1<? super T, ? extends R> elementSelector, final Func1<Action1<K>, Map<K, Object>> evictingMapFactory) {
if (evictingMapFactory == null) {
Expand Down Expand Up @@ -7369,6 +7374,72 @@ public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? su
*
* @param keySelector
* a function that extracts the key for each item
* @param elementSelector
* a function that extracts the return element for each item
* @param bufferSize
* the size of the buffer ({@link RxRingBuffer.SIZE} may be suitable).
* @param delayError
* if and only if false then onError emissions can shortcut onNext emissions (emissions may be buffered)
* @param evictingMapFactory
* a function that given an eviction action returns a {@link Map} instance that will be used to assign
* items to the appropriate {@code GroupedObservable}s. The {@code Map} instance must be thread-safe
* and any eviction must trigger a call to the supplied action (synchronously or asynchronously).
* This can be used to limit the size of the map by evicting entries by map maximum size or access time for
* instance. Here's an example using Guava's {@code CacheBuilder} from v24.0:
* <pre>
* {@code
* Func1<Action1<Object>, Map<K, Object>> mapFactory
* = action -> CacheBuilder.newBuilder()
* .maximumSize(1000)
* .expireAfterAccess(12, TimeUnit.HOURS)
* .removalListener(entry -> action.call(entry.getValue()))
* .<K, Object> build().asMap();
* }
* </pre>
*
* @param <K>
* the key type
* @param <R>
* the element type
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a
* unique key value and each of which emits those items from the source Observable that share that
* key value
* @throws NullPointerException
* if {@code evictingMapFactory} is null
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
* @since 1.3.7
*/
@Experimental
public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? super T, ? extends K> keySelector,
final Func1<? super T, ? extends R> elementSelector, int bufferSize, boolean delayError,
final Func1<Action1<Object>, Map<K, Object>> evictingMapFactory) {
if (evictingMapFactory == null) {
throw new NullPointerException("evictingMapFactory cannot be null");
}
return lift(new OperatorGroupByEvicting<T, K, R>(
keySelector, elementSelector, bufferSize, delayError, evictingMapFactory));
}

/**
* Groups the items emitted by an {@code Observable} according to a specified criterion, and emits these
* grouped items as {@link GroupedObservable}s. The emitted {@code GroupedObservable} allows only a single
* {@link Subscriber} during its lifetime and if this {@code Subscriber} unsubscribes before the
* source terminates, the next emission by the source having the same key will trigger a new
* {@code GroupedObservable} emission.
* <p>
* <img width="640" height="360" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/groupBy.png" alt="">
* <p>
* <em>Note:</em> A {@link GroupedObservable} will cache the items it is to emit until such time as it
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
* {@code GroupedObservable}s that do not concern you. Instead, you can signal to them that they may
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param keySelector
* a function that extracts the key for each item
* @param <K>
* the key type
* @return an {@code Observable} that emits {@link GroupedObservable}s, each of which corresponds to a
Expand All @@ -7377,7 +7448,7 @@ public final <K, R> Observable<GroupedObservable<K, R>> groupBy(final Func1<? su
* @see <a href="http://reactivex.io/documentation/operators/groupby.html">ReactiveX operators documentation: GroupBy</a>
*/
public final <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ? extends K> keySelector) {
return lift(new OperatorGroupBy<T, K, T>(keySelector));
return lift(new OperatorGroupByEvicting<T, K, T>(keySelector));
}

/**
Expand Down
24 changes: 21 additions & 3 deletions src/main/java/rx/internal/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@
* the source and group value type
* @param <V>
* the value type of the groups
* @deprecated
* since 1.3.7, use {@link OperatorGroupByEvicting} instead
*/
@Deprecated
public final class OperatorGroupBy<T, K, V> implements Operator<GroupedObservable<K, V>, T> {
final Func1<? super T, ? extends K> keySelector;
final Func1<? super T, ? extends V> valueSelector;
final int bufferSize;
final boolean delayError;
final Func1<Action1<K>, Map<K, Object>> mapFactory;
final Func1<Action1<K>, Map<K, Object>> mapFactory; //nullable

@SuppressWarnings({ "unchecked", "rawtypes" })
public OperatorGroupBy(Func1<? super T, ? extends K> keySelector) {
Expand Down Expand Up @@ -116,6 +119,10 @@ public static final class GroupBySubscriber<T, K, V>
final int bufferSize;
final boolean delayError;
final Map<Object, GroupedUnicast<K, V>> groups;

// double store the groups to workaround the bug in the
// signature of groupBy with evicting map factory
final Map<Object, GroupedUnicast<K, V>> groupsCopy;
final Queue<GroupedObservable<K, V>> queue;
final GroupByProducer producer;
final Queue<K> evictedKeys;
Expand All @@ -134,7 +141,7 @@ public static final class GroupBySubscriber<T, K, V>
volatile boolean done;

final AtomicInteger wip;

public GroupBySubscriber(Subscriber<? super GroupedObservable<K, V>> actual, Func1<? super T, ? extends K> keySelector,
Func1<? super T, ? extends V> valueSelector, int bufferSize, boolean delayError,
Func1<Action1<K>, Map<K, Object>> mapFactory) {
Expand All @@ -158,6 +165,7 @@ public GroupBySubscriber(Subscriber<? super GroupedObservable<K, V>> actual, Fun
this.evictedKeys = new ConcurrentLinkedQueue<K>();
this.groups = createMap(mapFactory, new EvictionAction<K>(evictedKeys));
}
this.groupsCopy = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
}

static class EvictionAction<K> implements Action1<K> {
Expand Down Expand Up @@ -211,6 +219,9 @@ public void onNext(T t) {
if (!cancelled.get()) {
group = GroupedUnicast.createWith(key, bufferSize, this, delayError);
groups.put(mapKey, group);
if (evictedKeys != null) {
groupsCopy.put(mapKey, group);
}

groupCount.getAndIncrement();

Expand All @@ -234,7 +245,9 @@ public void onNext(T t) {
if (evictedKeys != null) {
K evictedKey;
while ((evictedKey = evictedKeys.poll()) != null) {
GroupedUnicast<K, V> g = groups.get(evictedKey);
GroupedUnicast<K, V> g = groupsCopy.remove(evictedKey);
// do a null check on g because cancel(K) could have cleared
// the map
if (g != null) {
g.onComplete();
}
Expand Down Expand Up @@ -270,6 +283,7 @@ public void onCompleted() {
}
groups.clear();
if (evictedKeys != null) {
groupsCopy.clear();
evictedKeys.clear();
}

Expand Down Expand Up @@ -304,6 +318,9 @@ public void cancel(K key) {
unsubscribe();
}
}
if (evictedKeys != null) {
groupsCopy.remove(mapKey);
}
}

void drain() {
Expand Down Expand Up @@ -364,6 +381,7 @@ void errorAll(Subscriber<? super GroupedObservable<K, V>> a, Queue<?> q, Throwab
List<GroupedUnicast<K, V>> list = new ArrayList<GroupedUnicast<K, V>>(groups.values());
groups.clear();
if (evictedKeys != null) {
groupsCopy.clear();
evictedKeys.clear();
}

Expand Down
Loading