Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Observable.window operator #411

Closed
arkivanov opened this issue Feb 11, 2020 · 26 comments · Fixed by #540
Closed

Add Observable.window operator #411

arkivanov opened this issue Feb 11, 2020 · 26 comments · Fixed by #540
Assignees
Labels
enhancement New feature or request

Comments

@arkivanov
Copy link
Contributor

No description provided.

@arkivanov arkivanov added the enhancement New feature or request label Feb 11, 2020
@arkivanov arkivanov self-assigned this Feb 11, 2020
@amihusb
Copy link
Contributor

amihusb commented Jul 29, 2020

I want to implement this.

fun <T> Observable<T>.window(
    count: Long,
    skip: Long = count,
    bufferSize: Int = Int.MAX_VALUE
): Observable<Observable<T>> = TODO()

fun <T> Observable<T>.window(
    spanMillis: Long,
    scheduler: Scheduler,
    skipMillis: Long = spanMillis,
    maxSize: Long = Long.MAX_VALUE,
    restartOnMaxSize: Boolean = false,
    bufferSize: Int = Int.MAX_VALUE
): Observable<Observable<T>> = TODO()

fun <T> Observable<T>.window(
    boundary: Observable<*>,
    bufferSize: Int = Int.MAX_VALUE
): Observable<Observable<T>> = TODO()

fun <T, U> Observable<T>.window(
    openingIndicator: Observable<U>,
    closingIndicator: (U) -> Maybe<*>,
    bufferSize: Int = Int.MAX_VALUE
): Observable<Observable<T>> = TODO()

fun <T> Observable<T>.window(
    boundarySupplier: () -> Maybe<*>,
    bufferSize: Int = Int.MAX_VALUE
): Observable<Observable<T>> = TODO()

I will start with the first variant.

  • Are there all overloads of window operator?
  • Do you want to change something in signature or names?
  • Can we reduce the number of implementations? How? I didn't find any way
  • Should we rename the second overload to windowTimed?

@arkivanov
Copy link
Contributor Author

arkivanov commented Jul 29, 2020

@amihusb I need to think about the API a bit, I will try to respond by 31/07. Thanks for contribution!

@arkivanov
Copy link
Contributor Author

Hey @amihusb, I have found a draft stash of the window operator on my machine. I could try to finish it tomorrow, at least one of the overloads. Would it work for you?

@amihusb
Copy link
Contributor

amihusb commented Jul 30, 2020

@arkivanov yes. Which overload will you do?

@arkivanov
Copy link
Contributor Author

@amihusb I still need to think about the API, because window is not a straightforward operator. But when I was working on the draft I had the following in mind:

fun <T> Observable<T>.window(count: Int, skip: Int = count): Observable<Observable<T>> = TODO()

fun <T> Observable<T>.window(boundary: Single<WindowBoundary>): Observable<Observable<T>> = TODO()

fun <T> Observable<T>.window(boundary: Observable<WindowBoundary>): Observable<Observable<T>> = TODO()

class WindowBoundary(
    val closingSignal: Completable,
    val limit: Int = Int.MAX_VALUE,
    val restartIfLimitReached: Boolean = false
)

Looks like this set of operators covers all RxJava's cases. E.g.:

fun <T> Observable<T>.window(timeSpan: Long, limit: Int, scheduler: Scheduler, restart: Boolean): Observable<Observable<T>> =
    window(
        WindowBoundary(
            closingSignal = completableOfEmpty().delay(timeSpan, scheduler),
            limit = limit,
            restartIfLimitReached = restart
        ).toSingle()
    )

fun <T> Observable<T>.window(boundary: Observable<*>): Observable<Observable<T>> =
    window(boundary.map { WindowBoundary(closingSignal = completableOfNever()) })

fun <T> Observable<T>.window(timespan: Long, timeskip: Long, scheduler: Scheduler): Observable<Observable<T>> =
    window(
        observableOf(WindowBoundary(closingSignal = completableOfEmpty().delay(timespan, scheduler)))
            .repeatWhen { maybeOf(Unit).delay(timeskip, scheduler) }
    )

But again it's not final. Let's wait for my draft to be finalized by Monday 03/08.

@amihusb
Copy link
Contributor

amihusb commented Jul 30, 2020

@arkivanov does this code covers RxJava behaviour?

fun <T> Observable<T>.window(timeSpan: Long, limit: Int, scheduler: Scheduler, restart: Boolean): Observable<Observable<T>> =
    window(
        WindowBoundary(
            closingSignal = completableOfEmpty().delay(timeSpan, scheduler),
            limit = limit,
            restartIfLimitReached = restart
        ).toSingle()
    )

If I understand correctly what fun <T> Observable<T>.window(boundary: Single<WindowBoundary>): Observable<Observable<T>> = TODO() does:
reciever.onNext -> open window -> closingSignal.subscribe -> closingSignal.onComplete -> close window
Right?

If so, this implementation don't cover this case, because in RxJava implementation it opens window at onSubscribe.

But this will work correctly:

fun <T> Observable<T>.window(timeSpan: Long, limit: Int, scheduler: Scheduler, restart: Boolean): Observable<Observable<T>> =
    window(
        WindowBoundary(
            closingSignal = completableOfEmpty().delay(timespan, scheduler),
            limit = limit,
            restartIfLimitReached = restart
       )
            .toObservable()
            .repeatWhen { maybeOf(Unit).delay(timespan, scheduler) }
    )

WindowBoundary will be in public use? I think it should be internal
bufferSize is just ommited?

@amihusb
Copy link
Contributor

amihusb commented Jul 30, 2020

@arkivanov also this implementation will not work correctly:

fun <T> Observable<T>.window(boundary: Observable<*>): Observable<Observable<T>> =
    window(boundary.map { WindowBoundary(closingSignal = completableOfNever()) })

Implementation above will open window on every boundary.onNext and never closes.

In RxJava ObservableWindowBoundary works like that:
boundary.onNext -> close active window and open new window

Correct me if i'm wrong.

@arkivanov
Copy link
Contributor Author

If I understand correctly what fun <T> Observable<T>.window(boundary: Single<WindowBoundary>): Observable<Observable<T>> = TODO() does:
reciever.onNext -> open window -> closingSignal.subscribe -> closingSignal.onComplete -> close window
Right?

Yes this is correct. And I believe window will be opened on subscribe as well. Since the WindowBoundary will be emitted synchronously on subscribe. Correct me If I'm wrong.

WindowBoundary will be in public use? I think it should be internal

Currently I want it public, as well as all these three operators.

bufferSize is just ommited?

I will add if on demand, if necessary. Just need to check.

@arkivanov
Copy link
Contributor Author

@arkivanov also this implementation will not work correctly:

The window is reopened once a new WindowBoundary is emitted.

@arkivanov
Copy link
Contributor Author

arkivanov commented Jul 30, 2020

Please allow some time for me to finish everything. The API is not final yet.

@amihusb
Copy link
Contributor

amihusb commented Jul 30, 2020

Ok, seems like i don't understand some implementations, waiting.

@arkivanov
Copy link
Contributor Author

I discovered some odd behaviour in RxJava window implementations and raised the following issue: ReactiveX/RxJava#7048. Waiting for clarification.

@arkivanov
Copy link
Contributor Author

This operator is very challenging. Please allow me some time to implement it. We are happy to accept contribution for window(count, skip) version. But this may be challenging as well.

@amihusb
Copy link
Contributor

amihusb commented Aug 4, 2020

It was a challenge for me too. I will start working on window (count, skip) in the evening, thanks.

@arkivanov
Copy link
Contributor Author

arkivanov commented Aug 5, 2020

@amihusb Here are some tips for you:

  • Please check the issue I raised for RxJava, there is a bug in window(count, skip): 3.x: Observable.window() operators do not dispose upstream while there is an active window ReactiveX/RxJava#7048
  • Write granular meaningful tests - each test should check just one specific thing (please refer to existing tests)
  • Please use testing utils like TestObservable, window(...).test(), TestScheduler, etc.
  • Please try to avoid using existing operators and builders in tests (e.g. observable {}, observableOfEmpty, etc.) - health of other operators should not affect unrelated tests
  • Don't forget to run detekt and apiDump tasks at the end
  • Please try to keep the same code style for readability across the project

PS: I'm on vacation currently, so probably won't be able to review promptly :-)

Thanks for your contributions!

@amihusb
Copy link
Contributor

amihusb commented Aug 6, 2020

@arkivanov thanks.

Have a nice vacation.

@amihusb
Copy link
Contributor

amihusb commented Aug 16, 2020

@arkivanov Hi. Sorry for long wait, there was no time. Today will post my implementation.

@amihusb
Copy link
Contributor

amihusb commented Aug 16, 2020

@arkivanov could you help me?

If i modify this test in this fix:

@Test
public void cancelAfterAbandonmentSize() {
    PublishSubject<Integer> ps = PublishSubject.create();
    AtomicReference<Observable<Integer>> firstWindow = new AtomicReference<>();
    TestObserver<Observable<Integer>> to = ps.window(3, 1)
            .doOnNext((window) -> {
                if (!firstWindow.compareAndSet(null, window)) {
                    window.subscribe();
                }
            })
            .test();

    assertTrue(ps.hasObservers());

    ps.onNext(1);
    ps.onNext(2);

    to.dispose();

    firstWindow.get()
            .test()
            .assertValues(1, 2);
}

This test will fail with following description:
Value count differs; expected: 2 [1, 2] but was: 1 [1] (latch = 0, values = 1, errors = 0, completions = 1)

This is correct behavior or this is a bug?

@arkivanov
Copy link
Contributor Author

Hey @amihusb to me the behaviour looks correct. This operator emits non-overlapping windows, previous window completes once a new one is emitted. The skip parameter is 1 so there will be a new window after every item. The first window should receive 1 and complete, after that the second window will be emitted and will receive 2.

@amihusb
Copy link
Contributor

amihusb commented Aug 16, 2020

@arkivanov this operator emits overlapping windows. Here is the test.

Also i updated link to test in previous post, it was FlowableWindowWithSizeTest

@amihusb
Copy link
Contributor

amihusb commented Aug 16, 2020

@arkivanov here is another test:

@Test
public void cancelAfterAbandonmentSize() {
    PublishSubject<Integer> ps = PublishSubject.create();

    TestObserver<Integer> to = ps.window(3)
            .flatMap((window) -> window.delaySubscription(1, TimeUnit.SECONDS))
            .test();

    ps.onNext(1);
    ps.onNext(2);
    ps.onNext(3);

    to.dispose();

    to.assertValues(1, 2, 3);
}

Value count differs; expected: 3 [1, 2, 3] but was: 0 [] (latch = 1, values = 0, errors = 0, completions = 0, disposed!)

@arkivanov
Copy link
Contributor Author

@amihusb

this operator emits overlapping windows

Yep, you are right. So both tests look like a bug currently, but we need to clarify this. Would you mind to raise an issue in their repo?

@amihusb
Copy link
Contributor

amihusb commented Aug 16, 2020

@arkivanov yes, i will raise.

@amihusb
Copy link
Contributor

amihusb commented Aug 16, 2020

Created an issue. Waiting for result.

@arkivanov
Copy link
Contributor Author

So in RxJava windows are explicitly closed (completed) if not subscribed right after emission (abandoned). There is a notice in JavaDocs:

Note that ignoring windows or subscribing later (i.e., on another thread) will result in
so-called window abandonment where a window may not contain any elements. In this case, subsequent
elements will be dropped until the condition for the next window boundary is satisfied. The behavior is
a trade-off for ensuring upstream cancellation can happen under some race conditions.

I'm not sure what is the race condition in subject, so I asked a question in that issue.

Personally I find this behaviour error prone. And Reaktive's current implementation of the window(boundaries, ...) operator does not close abandoned windows. They, being UnicastSubjects (same as in RxJava), accumulate values until subscribed. Each window can be subscribed only once, again same as in RxJava. And windows considered closed when disposed.

I will wait for an answer and fix my implementation if there is a reasonable concern.

@arkivanov
Copy link
Contributor Author

So the answer was pretty quick and is clear. We need the same behaviour. I will fix the existing window(boundaries, ...) implementation, and all following ones should comply to this requirement as well.

In general, emitted windows may be unintentionally lost during downstream cancellation. Such windows will never reach subscribers, which potentially can prevent the upstream's cancellation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants