Skip to content

Commit

Permalink
Merge pull request #3224 from akarnokd/OperatorDistinct2x
Browse files Browse the repository at this point in the history
Operator distinct, timeInterval, common Timed container.
  • Loading branch information
akarnokd committed Aug 28, 2015
2 parents 35150b0 + 12266f3 commit b2b252c
Show file tree
Hide file tree
Showing 6 changed files with 354 additions and 27 deletions.
61 changes: 54 additions & 7 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.reactivex.observables.*;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.*;
import io.reactivex.subscribers.SafeSubscriber;
import io.reactivex.subscribers.*;

public class Observable<T> implements Publisher<T> {
final Publisher<T> onSubscribe;
Expand Down Expand Up @@ -364,7 +364,7 @@ public final Observable<T> skipWhile(Predicate<? super T> predicate) {
return lift(new OperatorSkipWhile<>(predicate));
}

public final Observable<T> skipUntil(Publisher<? extends T> other) {
public final <U> Observable<T> skipUntil(Publisher<? extends U> other) {
Objects.requireNonNull(other);
return lift(new OperatorSkipUntil<>(other));
}
Expand Down Expand Up @@ -1111,20 +1111,20 @@ public final <U> Observable<U> ofType(Class<U> clazz) {
return filter(clazz::isInstance).cast(clazz);
}

public final Observable<Timestamped<T>> timestamp() {
public final Observable<Timed<T>> timestamp() {
return timestamp(TimeUnit.MILLISECONDS, Schedulers.trampoline());
}

public final Observable<Timestamped<T>> timestamp(Scheduler scheduler) {
public final Observable<Timed<T>> timestamp(Scheduler scheduler) {
return timestamp(TimeUnit.MILLISECONDS, scheduler);
}

public final Observable<Timestamped<T>> timestamp(TimeUnit unit) {
public final Observable<Timed<T>> timestamp(TimeUnit unit) {
return timestamp(unit, Schedulers.trampoline());
}

public final Observable<Timestamped<T>> timestamp(TimeUnit unit, Scheduler scheduler) {
return map(v -> new Timestamped<>(v, scheduler.now(unit), unit));
public final Observable<Timed<T>> timestamp(TimeUnit unit, Scheduler scheduler) {
return map(v -> new Timed<>(v, scheduler.now(unit), unit));
}

public final Observable<Try<Optional<T>>> materialize() {
Expand All @@ -1146,4 +1146,51 @@ public final Observable<T> dematerialize() {
public final Observable<T> limit(long n) {
return take(n);
}

public final Observable<T> distinct() {
return distinct(HashSet::new);
}

public final Observable<T> distinct(Supplier<? extends Collection<? super T>> collectionSupplier) {
return lift(OperatorDistinct.withCollection(collectionSupplier));
}

public final Observable<T> distinctUntilChanged() {
return lift(OperatorDistinct.untilChanged());
}

@Deprecated
public final Observable<Observable<T>> nest() {
return just(this);
}

public final Observable<T> serialize() {
return lift(s -> new SerializedSubscriber<>(s));
}

public final Observable<T> take(long time, TimeUnit unit, Scheduler scheduler) {
// TODO consider inlining this behavior
return takeUntil(timer(time, unit, scheduler));
}

public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler) {
// TODO consider inlining this behavior
return skipUntil(timer(time, unit, scheduler));
}

public final Observable<Timed<T>> timeInterval() {
return timeInterval(TimeUnit.MILLISECONDS, Schedulers.trampoline());
}

public final Observable<Timed<T>> timeInterval(Scheduler scheduler) {
return timeInterval(TimeUnit.MILLISECONDS, scheduler);
}

public final Observable<Timed<T>> timeInterval(TimeUnit unit) {
return timeInterval(unit, Schedulers.trampoline());
}

public final Observable<Timed<T>> timeInterval(TimeUnit unit, Scheduler scheduler) {
return lift(new OperatorTimeInterval<>(unit, scheduler));
}
}
153 changes: 153 additions & 0 deletions src/main/java/io/reactivex/internal/operators/OperatorDistinct.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators;

import java.util.*;
import java.util.function.*;

import org.reactivestreams.*;

import io.reactivex.Observable.Operator;
import io.reactivex.internal.subscribers.CancellingSubscriber;
import io.reactivex.internal.subscriptions.*;

public final class OperatorDistinct<T> implements Operator<T, T> {

final Supplier<? extends Predicate<? super T>> predicateSupplier;

public OperatorDistinct(Supplier<? extends Predicate<? super T>> predicateSupplier) {
this.predicateSupplier = predicateSupplier;
}

public static <T> OperatorDistinct<T> withCollection(Supplier<? extends Collection<? super T>> collectionSupplier) {
Supplier<? extends Predicate<? super T>> p = () -> {
Collection<? super T> coll = collectionSupplier.get();

return t -> {
if (t == null) {
coll.clear();
return true;
}
return coll.add(t);
};
};

return new OperatorDistinct<>(p);
}

static final OperatorDistinct<Object> UNTIL_CHANGED;
static {
Supplier<? extends Predicate<? super Object>> p = () -> {
Object[] last = { null };

return t -> {
if (t == null) {
last[0] = null;
return true;
}
Object o = last[0];
last[0] = t;
return !Objects.equals(o, t);
};
};
UNTIL_CHANGED = new OperatorDistinct<>(p);
}

@SuppressWarnings("unchecked")
public static <T> OperatorDistinct<T> untilChanged() {
return (OperatorDistinct<T>)UNTIL_CHANGED;
}

@Override
public Subscriber<? super T> apply(Subscriber<? super T> t) {
Predicate<? super T> coll;
try {
coll = predicateSupplier.get();
} catch (Throwable e) {
t.onSubscribe(EmptySubscription.INSTANCE);
t.onError(e);
return CancellingSubscriber.INSTANCE;
}

if (coll == null) {
t.onSubscribe(EmptySubscription.INSTANCE);
t.onError(new NullPointerException("predicateSupplier returned null"));
return CancellingSubscriber.INSTANCE;
}

return null;
}

static final class DistinctSubscriber<T> implements Subscriber<T> {
final Subscriber<? super T> actual;
final Predicate<? super T> predicate;

Subscription s;

public DistinctSubscriber(Subscriber<? super T> actual, Predicate<? super T> predicate) {
this.actual = actual;
this.predicate = predicate;
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validateSubscription(this.s, s)) {
return;
}
this.s = s;
actual.onSubscribe(s);
}

@Override
public void onNext(T t) {
boolean b;
try {
b = predicate.test(t);
} catch (Throwable e) {
s.cancel();
actual.onError(e);
return;
}

if (b) {
actual.onNext(t);
} else {
s.request(1);
}
}

@Override
public void onError(Throwable t) {
try {
predicate.test(null); // special case: poison pill
} catch (Throwable e) {
t.addSuppressed(e);
actual.onError(t);
return;
}
actual.onError(t);
}

@Override
public void onComplete() {
try {
predicate.test(null); // special case: poison pill
} catch (Throwable e) {
actual.onError(e);
return;
}
actual.onComplete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.reactivex.internal.util.NotificationLite;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Timestamped;
import io.reactivex.schedulers.Timed;

public final class OperatorReplay<T> extends ConnectableObservable<T> {
/** The source observable. */
Expand Down Expand Up @@ -1078,12 +1078,12 @@ public SizeAndTimeBoundReplayBuffer(int limit, long maxAge, TimeUnit unit, Sched

@Override
Object enterTransform(Object value) {
return new Timestamped<>(value, scheduler.now(unit), unit);
return new Timed<>(value, scheduler.now(unit), unit);
}

@Override
Object leaveTransform(Object value) {
return ((Timestamped<?>)value).value();
return ((Timed<?>)value).value();
}

@Override
Expand All @@ -1102,8 +1102,8 @@ void truncate() {
prev = next;
next = next.get();
} else {
Timestamped<?> v = (Timestamped<?>)next.value;
if (v.timestamp() <= timeLimit) {
Timed<?> v = (Timed<?>)next.value;
if (v.time() <= timeLimit) {
e++;
size--;
prev = next;
Expand All @@ -1130,8 +1130,8 @@ void truncateFinal() {
int e = 0;
for (;;) {
if (next != null && size > 1) {
Timestamped<?> v = (Timestamped<?>)next.value;
if (v.timestamp() <= timeLimit) {
Timed<?> v = (Timed<?>)next.value;
if (v.time() <= timeLimit) {
e++;
size--;
prev = next;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators;

import java.util.concurrent.TimeUnit;

import org.reactivestreams.*;

import io.reactivex.Observable.Operator;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Timed;

public final class OperatorTimeInterval<T> implements Operator<Timed<T>, T> {
final Scheduler scheduler;
final TimeUnit unit;

public OperatorTimeInterval(TimeUnit unit, Scheduler scheduler) {
this.scheduler = scheduler;
this.unit = unit;
}

@Override
public Subscriber<? super T> apply(Subscriber<? super Timed<T>> t) {
return new TimeIntervalSubscriber<>(t, unit, scheduler);
}

static final class TimeIntervalSubscriber<T> implements Subscriber<T> {
final Subscriber<? super Timed<T>> actual;
final TimeUnit unit;
final Scheduler scheduler;

long lastTime;

public TimeIntervalSubscriber(Subscriber<? super Timed<T>> actual, TimeUnit unit, Scheduler scheduler) {
this.actual = actual;
this.scheduler = scheduler;
this.unit = unit;
}

@Override
public void onSubscribe(Subscription s) {
lastTime = scheduler.now(unit);
actual.onSubscribe(s);
}

@Override
public void onNext(T t) {
long now = scheduler.now(unit);
long last = lastTime;
lastTime = now;
long delta = now - last;
actual.onNext(new Timed<>(t, delta, unit));
}

@Override
public void onError(Throwable t) {
actual.onError(t);
}

@Override
public void onComplete() {
actual.onComplete();
}
}
}
Loading

0 comments on commit b2b252c

Please sign in to comment.