-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implemented SerialSubscription and Timeout operator #434
Changes from all commits
669c7b6
22eaa5e
10da2ae
b58bc45
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,128 @@ | ||
/** | ||
* Copyright 2013 Netflix, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package rx.operators; | ||
|
||
import rx.Observable; | ||
import rx.Observer; | ||
import rx.Scheduler; | ||
import rx.Subscription; | ||
import rx.subscriptions.CompositeSubscription; | ||
import rx.subscriptions.SerialSubscription; | ||
import rx.util.functions.Action0; | ||
import rx.util.functions.Func0; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
public final class OperationTimeout { | ||
public static <T> Observable.OnSubscribeFunc<T> timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) { | ||
return new Timeout<T>(source, timeout, timeUnit, scheduler); | ||
} | ||
|
||
private static class Timeout<T> implements Observable.OnSubscribeFunc<T> { | ||
private final Observable<? extends T> source; | ||
private final long timeout; | ||
private final TimeUnit timeUnit; | ||
private final Scheduler scheduler; | ||
|
||
private Timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) { | ||
this.source = source; | ||
this.timeout = timeout; | ||
this.timeUnit = timeUnit; | ||
this.scheduler = scheduler; | ||
} | ||
|
||
@Override | ||
public Subscription onSubscribe(final Observer<? super T> observer) { | ||
final AtomicBoolean terminated = new AtomicBoolean(false); | ||
final AtomicLong actual = new AtomicLong(0L); // Required to handle race between onNext and timeout | ||
final SerialSubscription serial = new SerialSubscription(); | ||
final Object gate = new Object(); | ||
CompositeSubscription composite = new CompositeSubscription(); | ||
final Func0<Subscription> schedule = new Func0<Subscription>() { | ||
@Override | ||
public Subscription call() { | ||
final long expected = actual.get(); | ||
return scheduler.schedule(new Action0() { | ||
@Override | ||
public void call() { | ||
boolean timeoutWins = false; | ||
synchronized (gate) { | ||
if (expected == actual.get() && !terminated.getAndSet(true)) { | ||
timeoutWins = true; | ||
} | ||
} | ||
if (timeoutWins) { | ||
observer.onError(new TimeoutException()); | ||
} | ||
|
||
} | ||
}, timeout, timeUnit); | ||
} | ||
}; | ||
SafeObservableSubscription subscription = new SafeObservableSubscription(); | ||
composite.add(subscription.wrap(source.subscribe(new Observer<T>() { | ||
@Override | ||
public void onNext(T value) { | ||
boolean onNextWins = false; | ||
synchronized (gate) { | ||
if (!terminated.get()) { | ||
actual.incrementAndGet(); | ||
onNextWins = true; | ||
} | ||
} | ||
if (onNextWins) { | ||
serial.setSubscription(schedule.call()); | ||
observer.onNext(value); | ||
} | ||
} | ||
|
||
@Override | ||
public void onError(Throwable error) { | ||
boolean onErrorWins = false; | ||
synchronized (gate) { | ||
if (!terminated.getAndSet(true)) { | ||
onErrorWins = true; | ||
} | ||
} | ||
if (onErrorWins) { | ||
serial.unsubscribe(); | ||
observer.onError(error); | ||
} | ||
} | ||
|
||
@Override | ||
public void onCompleted() { | ||
boolean onCompletedWins = false; | ||
synchronized (gate) { | ||
if (!terminated.getAndSet(true)) { | ||
onCompletedWins = true; | ||
} | ||
} | ||
if (onCompletedWins) { | ||
serial.unsubscribe(); | ||
observer.onCompleted(); | ||
} | ||
} | ||
}))); | ||
composite.add(serial); | ||
serial.setSubscription(schedule.call()); | ||
return composite; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
/** | ||
* Copyright 2013 Netflix, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package rx.subscriptions; | ||
|
||
import rx.Subscription; | ||
|
||
/** | ||
* Represents a subscription whose underlying subscription can be swapped for another subscription | ||
* which causes the previous underlying subscription to be unsubscribed. | ||
* | ||
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx">Rx.Net equivalent SerialDisposable</a> | ||
*/ | ||
public class SerialSubscription implements Subscription { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this class is achieving the same as No idea why I wrote such a horrible Javadoc on it that doesn't explain it well. From the MSDN doc it states: "Represents a disposable whose underlying disposable can be swapped for another disposable." Your Javadoc description should replace the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi Ben, SerialSubscription and MultipleAssignmentSubscription are different things. MultipleAssignment simply enables you to swap out the underlying subscription in a thread-safe manner. Whereas, Serial unsubscribes from the previous underlying as you replace. See: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah okay, cool. |
||
private boolean unsubscribed; | ||
private Subscription subscription; | ||
private final Object gate = new Object(); | ||
|
||
@Override | ||
public void unsubscribe() { | ||
Subscription toUnsubscribe = null; | ||
synchronized (gate) { | ||
if (!unsubscribed) { | ||
if (subscription != null) { | ||
toUnsubscribe = subscription; | ||
subscription = null; | ||
} | ||
unsubscribed = true; | ||
} | ||
} | ||
if (toUnsubscribe != null) { | ||
toUnsubscribe.unsubscribe(); | ||
} | ||
} | ||
|
||
public Subscription getSubscription() { | ||
synchronized (gate) { | ||
return subscription; | ||
} | ||
} | ||
|
||
public void setSubscription(Subscription subscription) { | ||
Subscription toUnsubscribe = null; | ||
synchronized (gate) { | ||
if (!unsubscribed) { | ||
if (this.subscription != null) { | ||
toUnsubscribe = this.subscription; | ||
} | ||
this.subscription = subscription; | ||
} else { | ||
toUnsubscribe = subscription; | ||
} | ||
} | ||
if (toUnsubscribe != null) { | ||
toUnsubscribe.unsubscribe(); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
/** | ||
* Copyright 2013 Netflix, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package rx; | ||
|
||
import org.junit.Before; | ||
import org.junit.Test; | ||
import org.mockito.MockitoAnnotations; | ||
import rx.concurrency.TestScheduler; | ||
import rx.subjects.PublishSubject; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
|
||
import static org.mockito.Matchers.any; | ||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.never; | ||
import static org.mockito.Mockito.verify; | ||
|
||
public class TimeoutTests { | ||
private PublishSubject<String> underlyingSubject; | ||
private TestScheduler testScheduler; | ||
private Observable<String> withTimeout; | ||
private static final long TIMEOUT = 3; | ||
private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; | ||
|
||
@Before | ||
public void setUp() { | ||
MockitoAnnotations.initMocks(this); | ||
|
||
underlyingSubject = PublishSubject.create(); | ||
testScheduler = new TestScheduler(); | ||
withTimeout = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, testScheduler); | ||
} | ||
|
||
@Test | ||
public void shouldNotTimeoutIfOnNextWithinTimeout() { | ||
Observer<String> observer = mock(Observer.class); | ||
Subscription subscription = withTimeout.subscribe(observer); | ||
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); | ||
underlyingSubject.onNext("One"); | ||
verify(observer).onNext("One"); | ||
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); | ||
verify(observer, never()).onError(any(Throwable.class)); | ||
subscription.unsubscribe(); | ||
} | ||
|
||
@Test | ||
public void shouldNotTimeoutIfSecondOnNextWithinTimeout() { | ||
Observer<String> observer = mock(Observer.class); | ||
Subscription subscription = withTimeout.subscribe(observer); | ||
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); | ||
underlyingSubject.onNext("One"); | ||
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); | ||
underlyingSubject.onNext("Two"); | ||
verify(observer).onNext("Two"); | ||
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); | ||
verify(observer, never()).onError(any(Throwable.class)); | ||
subscription.unsubscribe(); | ||
} | ||
|
||
@Test | ||
public void shouldTimeoutIfOnNextNotWithinTimeout() { | ||
Observer<String> observer = mock(Observer.class); | ||
Subscription subscription = withTimeout.subscribe(observer); | ||
testScheduler.advanceTimeBy(TIMEOUT + 1, TimeUnit.SECONDS); | ||
verify(observer).onError(any(TimeoutException.class)); | ||
subscription.unsubscribe(); | ||
} | ||
|
||
@Test | ||
public void shouldTimeoutIfSecondOnNextNotWithinTimeout() { | ||
Observer<String> observer = mock(Observer.class); | ||
Subscription subscription = withTimeout.subscribe(observer); | ||
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); | ||
underlyingSubject.onNext("One"); | ||
verify(observer).onNext("One"); | ||
testScheduler.advanceTimeBy(TIMEOUT + 1, TimeUnit.SECONDS); | ||
verify(observer).onError(any(TimeoutException.class)); | ||
subscription.unsubscribe(); | ||
} | ||
|
||
@Test | ||
public void shouldCompleteIfUnderlyingComletes() { | ||
Observer<String> observer = mock(Observer.class); | ||
Subscription subscription = withTimeout.subscribe(observer); | ||
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); | ||
underlyingSubject.onCompleted(); | ||
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); | ||
verify(observer).onCompleted(); | ||
verify(observer, never()).onError(any(Throwable.class)); | ||
subscription.unsubscribe(); | ||
} | ||
|
||
@Test | ||
public void shouldErrorIfUnderlyingErrors() { | ||
Observer<String> observer = mock(Observer.class); | ||
Subscription subscription = withTimeout.subscribe(observer); | ||
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); | ||
underlyingSubject.onError(new UnsupportedOperationException()); | ||
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS); | ||
verify(observer).onError(any(UnsupportedOperationException.class)); | ||
subscription.unsubscribe(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would the use of
compareAndSet
patterns simplify this code to not need these win/loss checks?The nice thing about
compareAndSet
is it gives that in a very idiomatic manner.Reason I suggest it is that this code flow is non-trivial to read and understand with the synchronization gates and conditional flows that all need to be just right in order to work.
Here is an example where I use
compareAndSet
for state changes to achieve timeout in Hystrix: https://github.com/Netflix/Hystrix/blob/master/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java#L1015An
AtomicReference
is used to atomically switch between the 3 possible states:The successful flow would attempt this when receiving
onNext
:The timeout flow would attempt this if the scheduler triggers it:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is that you have to handle the case where the timeout timer fires but onNext beats it into the synchronized section. As onNext does not mutate the timeout status, there would be nothing to stop the timeout thread from onErroring. The effect of this is you would get an onNext immediately followed by an onError.
The way to mitigate this is to have onNext increment a counter. As you can't cover two condition checks in a single, atomic compareAndSet you have to fall back to explicit synchronization.
The Rx team use a similar mechanism to handle this case.