-
Notifications
You must be signed in to change notification settings - Fork 59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Observable.window operator #411
Comments
I want to implement this. fun <T> Observable<T>.window(
count: Long,
skip: Long = count,
bufferSize: Int = Int.MAX_VALUE
): Observable<Observable<T>> = TODO()
fun <T> Observable<T>.window(
spanMillis: Long,
scheduler: Scheduler,
skipMillis: Long = spanMillis,
maxSize: Long = Long.MAX_VALUE,
restartOnMaxSize: Boolean = false,
bufferSize: Int = Int.MAX_VALUE
): Observable<Observable<T>> = TODO()
fun <T> Observable<T>.window(
boundary: Observable<*>,
bufferSize: Int = Int.MAX_VALUE
): Observable<Observable<T>> = TODO()
fun <T, U> Observable<T>.window(
openingIndicator: Observable<U>,
closingIndicator: (U) -> Maybe<*>,
bufferSize: Int = Int.MAX_VALUE
): Observable<Observable<T>> = TODO()
fun <T> Observable<T>.window(
boundarySupplier: () -> Maybe<*>,
bufferSize: Int = Int.MAX_VALUE
): Observable<Observable<T>> = TODO() I will start with the first variant.
|
@amihusb I need to think about the API a bit, I will try to respond by 31/07. Thanks for contribution! |
Hey @amihusb, I have found a draft stash of the |
@arkivanov yes. Which overload will you do? |
@amihusb I still need to think about the API, because fun <T> Observable<T>.window(count: Int, skip: Int = count): Observable<Observable<T>> = TODO()
fun <T> Observable<T>.window(boundary: Single<WindowBoundary>): Observable<Observable<T>> = TODO()
fun <T> Observable<T>.window(boundary: Observable<WindowBoundary>): Observable<Observable<T>> = TODO()
class WindowBoundary(
val closingSignal: Completable,
val limit: Int = Int.MAX_VALUE,
val restartIfLimitReached: Boolean = false
) Looks like this set of operators covers all RxJava's cases. E.g.: fun <T> Observable<T>.window(timeSpan: Long, limit: Int, scheduler: Scheduler, restart: Boolean): Observable<Observable<T>> =
window(
WindowBoundary(
closingSignal = completableOfEmpty().delay(timeSpan, scheduler),
limit = limit,
restartIfLimitReached = restart
).toSingle()
)
fun <T> Observable<T>.window(boundary: Observable<*>): Observable<Observable<T>> =
window(boundary.map { WindowBoundary(closingSignal = completableOfNever()) })
fun <T> Observable<T>.window(timespan: Long, timeskip: Long, scheduler: Scheduler): Observable<Observable<T>> =
window(
observableOf(WindowBoundary(closingSignal = completableOfEmpty().delay(timespan, scheduler)))
.repeatWhen { maybeOf(Unit).delay(timeskip, scheduler) }
) But again it's not final. Let's wait for my draft to be finalized by Monday 03/08. |
@arkivanov does this code covers RxJava behaviour? fun <T> Observable<T>.window(timeSpan: Long, limit: Int, scheduler: Scheduler, restart: Boolean): Observable<Observable<T>> =
window(
WindowBoundary(
closingSignal = completableOfEmpty().delay(timeSpan, scheduler),
limit = limit,
restartIfLimitReached = restart
).toSingle()
) If I understand correctly what If so, this implementation don't cover this case, because in RxJava implementation it opens window at But this will work correctly: fun <T> Observable<T>.window(timeSpan: Long, limit: Int, scheduler: Scheduler, restart: Boolean): Observable<Observable<T>> =
window(
WindowBoundary(
closingSignal = completableOfEmpty().delay(timespan, scheduler),
limit = limit,
restartIfLimitReached = restart
)
.toObservable()
.repeatWhen { maybeOf(Unit).delay(timespan, scheduler) }
)
|
@arkivanov also this implementation will not work correctly: fun <T> Observable<T>.window(boundary: Observable<*>): Observable<Observable<T>> =
window(boundary.map { WindowBoundary(closingSignal = completableOfNever()) }) Implementation above will open window on every In RxJava ObservableWindowBoundary works like that: Correct me if i'm wrong. |
Yes this is correct. And I believe window will be opened on subscribe as well. Since the
Currently I want it public, as well as all these three operators.
I will add if on demand, if necessary. Just need to check. |
The window is reopened once a new |
Please allow some time for me to finish everything. The API is not final yet. |
Ok, seems like i don't understand some implementations, waiting. |
I discovered some odd behaviour in RxJava |
This operator is very challenging. Please allow me some time to implement it. We are happy to accept contribution for |
It was a challenge for me too. I will start working on |
@amihusb Here are some tips for you:
PS: I'm on vacation currently, so probably won't be able to review promptly :-) Thanks for your contributions! |
@arkivanov thanks. Have a nice vacation. |
@arkivanov Hi. Sorry for long wait, there was no time. Today will post my implementation. |
@arkivanov could you help me? If i modify this test in this fix: @Test
public void cancelAfterAbandonmentSize() {
PublishSubject<Integer> ps = PublishSubject.create();
AtomicReference<Observable<Integer>> firstWindow = new AtomicReference<>();
TestObserver<Observable<Integer>> to = ps.window(3, 1)
.doOnNext((window) -> {
if (!firstWindow.compareAndSet(null, window)) {
window.subscribe();
}
})
.test();
assertTrue(ps.hasObservers());
ps.onNext(1);
ps.onNext(2);
to.dispose();
firstWindow.get()
.test()
.assertValues(1, 2);
} This test will fail with following description: This is correct behavior or this is a bug? |
Hey @amihusb to me the behaviour looks correct. This operator emits |
@arkivanov this operator emits Also i updated link to test in previous post, it was FlowableWindowWithSizeTest |
@arkivanov here is another test: @Test
public void cancelAfterAbandonmentSize() {
PublishSubject<Integer> ps = PublishSubject.create();
TestObserver<Integer> to = ps.window(3)
.flatMap((window) -> window.delaySubscription(1, TimeUnit.SECONDS))
.test();
ps.onNext(1);
ps.onNext(2);
ps.onNext(3);
to.dispose();
to.assertValues(1, 2, 3);
}
|
Yep, you are right. So both tests look like a bug currently, but we need to clarify this. Would you mind to raise an issue in their repo? |
@arkivanov yes, i will raise. |
Created an issue. Waiting for result. |
So in RxJava windows are explicitly closed (completed) if not subscribed right after emission (abandoned). There is a notice in JavaDocs:
I'm not sure what is the race condition in subject, so I asked a question in that issue. Personally I find this behaviour error prone. And Reaktive's current implementation of the window(boundaries, ...) operator does not close abandoned windows. They, being I will wait for an answer and fix my implementation if there is a reasonable concern. |
So the answer was pretty quick and is clear. We need the same behaviour. I will fix the existing In general, emitted windows may be unintentionally lost during downstream cancellation. Such windows will never reach subscribers, which potentially can prevent the upstream's cancellation. |
No description provided.
The text was updated successfully, but these errors were encountered: