Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Observable.window(count, skip) operator #534

Merged
merged 8 commits into from
Sep 13, 2020
5 changes: 5 additions & 0 deletions reaktive/api/reaktive.api
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,11 @@ public final class com/badoo/reaktive/observable/WindowByBoundaryKt {
public static synthetic fun window$default (Lcom/badoo/reaktive/observable/Observable;Lcom/badoo/reaktive/observable/Observable;JZILjava/lang/Object;)Lcom/badoo/reaktive/observable/Observable;
}

public final class com/badoo/reaktive/observable/WindowSizedKt {
public static final fun window (Lcom/badoo/reaktive/observable/Observable;JJ)Lcom/badoo/reaktive/observable/Observable;
public static synthetic fun window$default (Lcom/badoo/reaktive/observable/Observable;JJILjava/lang/Object;)Lcom/badoo/reaktive/observable/Observable;
}

public final class com/badoo/reaktive/observable/WithLatestFromKt {
public static final fun withLatestFrom (Lcom/badoo/reaktive/observable/Observable;Lcom/badoo/reaktive/observable/Observable;Lcom/badoo/reaktive/observable/Observable;Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function4;)Lcom/badoo/reaktive/observable/Observable;
public static final fun withLatestFrom (Lcom/badoo/reaktive/observable/Observable;Lcom/badoo/reaktive/observable/Observable;Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function3;)Lcom/badoo/reaktive/observable/Observable;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package com.badoo.reaktive.observable

import com.badoo.reaktive.base.setCancellable
import com.badoo.reaktive.disposable.Disposable
import com.badoo.reaktive.disposable.DisposableWrapper
import com.badoo.reaktive.subject.unicast.UnicastSubject
import com.badoo.reaktive.utils.atomic.AtomicBoolean
import com.badoo.reaktive.utils.atomic.AtomicInt
import com.badoo.reaktive.utils.atomic.AtomicLong
import com.badoo.reaktive.utils.queue.SharedQueue

/**
* Please refer to the corresponding RxJava
* [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#window-long-long-).
*/
fun <T> Observable<T>.window(
count: Long,
skip: Long = count
): Observable<Observable<T>> {
require(count > 0) { "count > 0 required but it was $count" }
require(skip > 0) { "skip > 0 required but it was $skip" }

return observable { emitter ->
val activeWindowsCount = AtomicInt(1)
val upstreamObserver = UpstreamObserver(
count = count,
skip = skip,
activeWindowsCount = activeWindowsCount,
downstream = emitter
)

emitter.setCancellable {
if (activeWindowsCount.addAndGet(-1) == 0) {
upstreamObserver.dispose()
}
}

subscribe(upstreamObserver)
}
}

private class UpstreamObserver<T>(
private val count: Long,
private val skip: Long,
private val activeWindowsCount: AtomicInt,
private val downstream: ObservableCallbacks<Observable<T>>
) : DisposableWrapper(), ObservableObserver<T> {
private val windows = SharedQueue<UnicastSubject<T>>()
private val skippedCount = AtomicLong()
private val tailWindowValuesCount = AtomicLong()
private val onWindowTerminate: () -> Unit = {
if (activeWindowsCount.addAndGet(-1) == 0) {
dispose()
}
}

override fun onSubscribe(disposable: Disposable) {
set(disposable)
}

override fun onNext(value: T) {
val skipped = skippedCount.value
val windowWrapper: WindowWrapper<T>?

if (skipped == 0L) {
activeWindowsCount.addAndGet(1)
val window = UnicastSubject<T>(onTerminate = onWindowTerminate)
windowWrapper = WindowWrapper(window)
windows.offer(window)
downstream.onNext(windowWrapper)
} else {
windowWrapper = null
}

windows.forEach { it.onNext(value) }

skippedCount.value = (skipped + 1) % skip

if (tailWindowValuesCount.value + 1 == count) {
requireNotNull(windows.poll()).onComplete()
tailWindowValuesCount.addAndGet(1 - skip)
} else {
tailWindowValuesCount.addAndGet(1)
}

if (windowWrapper?.isSubscribed?.value == false) {
windowWrapper.window.onComplete()
}
}

override fun onComplete() {
windows.forEach { it.onComplete() }
downstream.onComplete()
dispose()
}

override fun onError(error: Throwable) {
windows.forEach { it.onError(error) }
downstream.onError(error)
dispose()
}

private class WindowWrapper<T>(
val window: UnicastSubject<T>
) : Observable<T> {
val isSubscribed = AtomicBoolean()

override fun subscribe(observer: ObservableObserver<T>) {
isSubscribed.value = true
window.subscribe(observer)
}
}
}
Loading