Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip fixed #936

Merged
merged 2 commits into from
Mar 4, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 2 additions & 82 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,87 +49,7 @@
import rx.observables.ConnectableObservable;
import rx.observables.GroupedObservable;
import rx.observers.SafeSubscriber;
import rx.operators.OnSubscribeFromIterable;
import rx.operators.OnSubscribeRange;
import rx.operators.OperationAll;
import rx.operators.OperationAmb;
import rx.operators.OperationAny;
import rx.operators.OperationAsObservable;
import rx.operators.OperationAverage;
import rx.operators.OperationBuffer;
import rx.operators.OperationCache;
import rx.operators.OperationCombineLatest;
import rx.operators.OperationConcat;
import rx.operators.OperationDebounce;
import rx.operators.OperationDefaultIfEmpty;
import rx.operators.OperationDefer;
import rx.operators.OperationDelay;
import rx.operators.OperationDematerialize;
import rx.operators.OperationDistinct;
import rx.operators.OperationDistinctUntilChanged;
import rx.operators.OperationElementAt;
import rx.operators.OperationFinally;
import rx.operators.OperationFlatMap;
import rx.operators.OperationGroupByUntil;
import rx.operators.OperationGroupJoin;
import rx.operators.OperationInterval;
import rx.operators.OperationJoin;
import rx.operators.OperationJoinPatterns;
import rx.operators.OperationMaterialize;
import rx.operators.OperationMergeDelayError;
import rx.operators.OperationMergeMaxConcurrent;
import rx.operators.OperationMinMax;
import rx.operators.OperationMulticast;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationOnExceptionResumeNextViaObservable;
import rx.operators.OperationParallelMerge;
import rx.operators.OperationReplay;
import rx.operators.OperationRetry;
import rx.operators.OperationSample;
import rx.operators.OperationSequenceEqual;
import rx.operators.OperationSingle;
import rx.operators.OperationSkip;
import rx.operators.OperationSkipLast;
import rx.operators.OperationSkipUntil;
import rx.operators.OperationSkipWhile;
import rx.operators.OperationSum;
import rx.operators.OperationSwitch;
import rx.operators.OperationSynchronize;
import rx.operators.OperationTakeLast;
import rx.operators.OperationTakeTimed;
import rx.operators.OperationTakeUntil;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationThrottleFirst;
import rx.operators.OperationTimeInterval;
import rx.operators.OperationTimer;
import rx.operators.OperationToMap;
import rx.operators.OperationToMultimap;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationUsing;
import rx.operators.OperationWindow;
import rx.operators.OperatorCast;
import rx.operators.OperatorDoOnEach;
import rx.operators.OperatorFilter;
import rx.operators.OperatorGroupBy;
import rx.operators.OperatorMap;
import rx.operators.OperatorMerge;
import rx.operators.OperatorObserveOn;
import rx.operators.OperatorOnErrorResumeNextViaFunction;
import rx.operators.OperatorOnErrorFlatMap;
import rx.operators.OperatorParallel;
import rx.operators.OperatorRepeat;
import rx.operators.OperatorScan;
import rx.operators.OperatorSubscribeOn;
import rx.operators.OperatorTake;
import rx.operators.OperatorTimeout;
import rx.operators.OperatorTimeoutWithSelector;
import rx.operators.OperatorTimestamp;
import rx.operators.OperatorToObservableList;
import rx.operators.OperatorToObservableSortedList;
import rx.operators.OperatorUnsubscribeOn;
import rx.operators.OperatorZip;
import rx.operators.OperatorZipIterable;
import rx.operators.*;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.schedulers.Schedulers;
Expand Down Expand Up @@ -6274,7 +6194,7 @@ public final Observable<T> singleOrDefault(T defaultValue, Func1<? super T, Bool
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-skip">RxJava Wiki: skip()</a>
*/
public final Observable<T> skip(int num) {
return create(OperationSkip.skip(this, num));
return lift(new OperatorSkip<T>(num));
}

/**
Expand Down
75 changes: 0 additions & 75 deletions rxjava-core/src/main/java/rx/operators/OperationSkip.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,81 +39,6 @@
*/
public final class OperationSkip {

/**
* Skips a specified number of contiguous values from the start of a Observable sequence and then returns the remaining values.
*
* @param items
* @param num
* @return the observable sequence starting after a number of skipped values
*
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229847(v=vs.103).aspx">Observable.Skip(TSource) Method</a>
*/
public static <T> OnSubscribeFunc<T> skip(final Observable<? extends T> items, final int num) {
// wrap in a Observable so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take<T> rather than 1 handing both, which is not thread-safe.
return new OnSubscribeFunc<T>() {

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
return new Skip<T>(items, num).onSubscribe(observer);
}

};
}

/**
* This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads.
* <p>
* It IS thread-safe from within it while receiving onNext events from multiple threads.
*
* @param <T>
*/
private static class Skip<T> implements OnSubscribeFunc<T> {
private final int num;
private final Observable<? extends T> items;

Skip(final Observable<? extends T> items, final int num) {
this.num = num;
this.items = items;
}

public Subscription onSubscribe(Observer<? super T> observer) {
return items.subscribe(new ItemObserver(observer));
}

/**
* Used to subscribe to the 'items' Observable sequence and forward to the actualObserver up to 'num' count.
*/
private class ItemObserver implements Observer<T> {

private AtomicInteger counter = new AtomicInteger();
private final Observer<? super T> observer;

public ItemObserver(Observer<? super T> observer) {
this.observer = observer;
}

@Override
public void onCompleted() {
observer.onCompleted();
}

@Override
public void onError(Throwable e) {
observer.onError(e);
}

@Override
public void onNext(T args) {
// skip them until we reach the 'num' value
if (counter.incrementAndGet() > num) {
observer.onNext(args);
}
}

}

}

/**
* Skip the items after subscription for the given duration.
*
Expand Down
1 change: 1 addition & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,4 @@ public void onNext(T t) {
}

}

51 changes: 51 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorSkip.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package rx.operators;

import rx.Observable;
import rx.Subscriber;

/**
* Returns an Observable that skips the first <code>num</code> items emitted by the source
* Observable.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/skip.png">
* <p>
* You can ignore the first <code>num</code> items emitted by an Observable and attend only to
* those items that come after, by modifying the Observable with the skip operation.
*/
public final class OperatorSkip<T> implements Observable.Operator<T, T> {

final int n;

public OperatorSkip(int n) {
this.n = n;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
return new Subscriber<T>(child) {

int skipped = 0;

@Override
public void onCompleted() {
child.onCompleted();
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(T t) {
if(skipped >= n) {
child.onNext(t);
} else {
skipped += 1;
}
}

};
}

}
31 changes: 0 additions & 31 deletions rxjava-core/src/test/java/rx/operators/OperationSkipTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static rx.operators.OperationSkip.*;

import java.util.concurrent.TimeUnit;

Expand All @@ -31,36 +30,6 @@

public class OperationSkipTest {

@Test
public void testSkip1() {
Observable<String> w = Observable.from("one", "two", "three");
Observable<String> skip = Observable.create(skip(w, 2));

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
skip.subscribe(observer);
verify(observer, never()).onNext("one");
verify(observer, never()).onNext("two");
verify(observer, times(1)).onNext("three");
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
}

@Test
public void testSkip2() {
Observable<String> w = Observable.from("one", "two", "three");
Observable<String> skip = Observable.create(skip(w, 1));

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
skip.subscribe(observer);
verify(observer, never()).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, times(1)).onNext("three");
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
}

@Test
public void testSkipTimed() {
TestScheduler scheduler = new TestScheduler();
Expand Down
Loading