diff --git a/reaktive/api/reaktive.api b/reaktive/api/reaktive.api index 7a07a7338..f0accfa43 100644 --- a/reaktive/api/reaktive.api +++ b/reaktive/api/reaktive.api @@ -951,6 +951,11 @@ public final class com/badoo/reaktive/observable/WindowByBoundaryKt { public static synthetic fun window$default (Lcom/badoo/reaktive/observable/Observable;Lcom/badoo/reaktive/observable/Observable;JZILjava/lang/Object;)Lcom/badoo/reaktive/observable/Observable; } +public final class com/badoo/reaktive/observable/WindowSizedKt { + public static final fun window (Lcom/badoo/reaktive/observable/Observable;JJ)Lcom/badoo/reaktive/observable/Observable; + public static synthetic fun window$default (Lcom/badoo/reaktive/observable/Observable;JJILjava/lang/Object;)Lcom/badoo/reaktive/observable/Observable; +} + public final class com/badoo/reaktive/observable/WithLatestFromKt { public static final fun withLatestFrom (Lcom/badoo/reaktive/observable/Observable;Lcom/badoo/reaktive/observable/Observable;Lcom/badoo/reaktive/observable/Observable;Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function4;)Lcom/badoo/reaktive/observable/Observable; public static final fun withLatestFrom (Lcom/badoo/reaktive/observable/Observable;Lcom/badoo/reaktive/observable/Observable;Lcom/badoo/reaktive/observable/Observable;Lkotlin/jvm/functions/Function3;)Lcom/badoo/reaktive/observable/Observable; diff --git a/reaktive/src/commonMain/kotlin/com/badoo/reaktive/observable/WindowSized.kt b/reaktive/src/commonMain/kotlin/com/badoo/reaktive/observable/WindowSized.kt new file mode 100644 index 000000000..6d6e7268d --- /dev/null +++ b/reaktive/src/commonMain/kotlin/com/badoo/reaktive/observable/WindowSized.kt @@ -0,0 +1,113 @@ +package com.badoo.reaktive.observable + +import com.badoo.reaktive.base.setCancellable +import com.badoo.reaktive.disposable.Disposable +import com.badoo.reaktive.disposable.DisposableWrapper +import com.badoo.reaktive.subject.unicast.UnicastSubject +import com.badoo.reaktive.utils.atomic.AtomicBoolean +import com.badoo.reaktive.utils.atomic.AtomicInt +import com.badoo.reaktive.utils.atomic.AtomicLong +import com.badoo.reaktive.utils.queue.SharedQueue + +/** + * Please refer to the corresponding RxJava + * [document](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#window-long-long-). + */ +fun Observable.window( + count: Long, + skip: Long = count +): Observable> { + require(count > 0) { "count > 0 required but it was $count" } + require(skip > 0) { "skip > 0 required but it was $skip" } + + return observable { emitter -> + val activeWindowsCount = AtomicInt(1) + val upstreamObserver = UpstreamObserver( + count = count, + skip = skip, + activeWindowsCount = activeWindowsCount, + downstream = emitter + ) + + emitter.setCancellable { + if (activeWindowsCount.addAndGet(-1) == 0) { + upstreamObserver.dispose() + } + } + + subscribe(upstreamObserver) + } +} + +private class UpstreamObserver( + private val count: Long, + private val skip: Long, + private val activeWindowsCount: AtomicInt, + private val downstream: ObservableCallbacks> +) : DisposableWrapper(), ObservableObserver { + private val windows = SharedQueue>() + private val skippedCount = AtomicLong() + private val tailWindowValuesCount = AtomicLong() + private val onWindowTerminate: () -> Unit = { + if (activeWindowsCount.addAndGet(-1) == 0) { + dispose() + } + } + + override fun onSubscribe(disposable: Disposable) { + set(disposable) + } + + override fun onNext(value: T) { + val skipped = skippedCount.value + val windowWrapper: WindowWrapper? + + if (skipped == 0L) { + activeWindowsCount.addAndGet(1) + val window = UnicastSubject(onTerminate = onWindowTerminate) + windowWrapper = WindowWrapper(window) + windows.offer(window) + downstream.onNext(windowWrapper) + } else { + windowWrapper = null + } + + windows.forEach { it.onNext(value) } + + skippedCount.value = (skipped + 1) % skip + + if (tailWindowValuesCount.value + 1 == count) { + requireNotNull(windows.poll()).onComplete() + tailWindowValuesCount.addAndGet(1 - skip) + } else { + tailWindowValuesCount.addAndGet(1) + } + + if (windowWrapper?.isSubscribed?.value == false) { + windowWrapper.window.onComplete() + } + } + + override fun onComplete() { + windows.forEach { it.onComplete() } + downstream.onComplete() + dispose() + } + + override fun onError(error: Throwable) { + windows.forEach { it.onError(error) } + downstream.onError(error) + dispose() + } + + private class WindowWrapper( + val window: UnicastSubject + ) : Observable { + val isSubscribed = AtomicBoolean() + + override fun subscribe(observer: ObservableObserver) { + isSubscribed.value = true + window.subscribe(observer) + } + } +} diff --git a/reaktive/src/commonTest/kotlin/com/badoo/reaktive/observable/WindowSizedTest.kt b/reaktive/src/commonTest/kotlin/com/badoo/reaktive/observable/WindowSizedTest.kt new file mode 100644 index 000000000..748337c6c --- /dev/null +++ b/reaktive/src/commonTest/kotlin/com/badoo/reaktive/observable/WindowSizedTest.kt @@ -0,0 +1,443 @@ +package com.badoo.reaktive.observable + +import com.badoo.reaktive.base.Observer +import com.badoo.reaktive.completable.CompletableCallbacks +import com.badoo.reaktive.test.base.assertError +import com.badoo.reaktive.test.base.hasSubscribers +import com.badoo.reaktive.test.observable.* +import kotlin.test.Test +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class WindowSizedTest { + + private val upstream = TestObservable() + + private fun window(count: Long = 1, skip: Long = count) = upstream.window(count = count, skip = skip).testWindows().test() + + private fun Observable>.testWindows(): Observable> = observableUnsafe { observer -> + subscribe( + object : ObservableObserver>, CompletableCallbacks by observer, Observer by observer { + override fun onNext(value: Observable) { + observer.onNext(value.test()) + } + } + ) + } + + @Test + fun first_window_receives_values_WHEN_windows_is_gapless_and_upstream_produced_values() { + val observer = window(count = 3) + + upstream.onNext(1, 2, 3, 4, 5) + + val window = observer.values.first() + window.assertValues(1, 2, 3) + } + + @Test + fun second_window_receives_values_WHEN_windows_is_gapless_and_upstream_produced_values() { + val observer = window(count = 3) + + upstream.onNext(1, 2, 3, 4, 5) + + val window = observer.values[1] + window.assertValues(4, 5) + } + + @Test + fun first_window_receives_values_WHEN_windows_is_overlapping_and_upstream_produced_values() { + val observer = window(count = 3) + + upstream.onNext(1, 2, 3, 4, 5) + + val window = observer.values.first() + window.assertValues(1, 2, 3) + } + + @Test + fun second_window_receives_values_WHEN_windows_is_overlapping_and_upstream_produced_values() { + val observer = window(count = 3, skip = 2) + + upstream.onNext(1, 2, 3, 4, 5) + + val window = observer.values[1] + window.assertValues(3, 4, 5) + } + + @Test + fun third_window_receives_value_WHEN_windows_is_overlapping_and_upstream_produced_values() { + val observer = window(count = 3, skip = 2) + + upstream.onNext(1, 2, 3, 4, 5) + + val window = observer.values[2] + window.assertValues(5) + } + + @Test + fun first_window_receives_values_WHEN_windows_is_non_overlapping_and_upstream_produced_values() { + val observer = window(count = 2, skip = 3) + + upstream.onNext(1, 2, 3, 4, 5) + + val window = observer.values.first() + window.assertValues(1, 2) + } + + @Test + fun second_window_receives_values_WHEN_windows_is_non_overlapping_and_upstream_produced_values() { + val observer = window(count = 2, skip = 3) + + upstream.onNext(1, 2, 3, 4, 5) + + val window = observer.values[1] + window.assertValues(4, 5) + } + + @Test + fun second_window_receives_values_WHEN_windows_is_gapless_and_upstream_produced_values_and_downstream_disposed() { + val observer = window(count = 3, skip = 3) + + upstream.onNext(1, 2, 3, 4) + val windowObserver = observer.values[1] + observer.dispose() + upstream.onNext(5, 6) + + windowObserver.assertValues(4, 5, 6) + } + + @Test + fun second_window_receives_values_WHEN_windows_is_overlapping_and_upstream_produced_values_and_downstream_disposed() { + val observer = window(count = 3, skip = 2) + + upstream.onNext(1, 2, 3) + val windowObserver = observer.values.last() + observer.dispose() + upstream.onNext(4) + + windowObserver.assertValues(3, 4) + } + + @Test + fun second_window_receives_values_WHEN_windows_id_non_overlapping_and_upstream_produced_values_and_downstream_disposed() { + val observer = window(count = 2, skip = 3) + + upstream.onNext(1, 2, 3, 4) + val windowObserver = observer.values.last() + observer.dispose() + upstream.onNext(5) + + windowObserver.assertValues(4, 5) + } + + @Test + fun disposed_WHEN_windows_is_gapless_and_downstream_disposed() { + val observer = upstream.window(count = 1).test() + + observer.dispose() + + assertTrue(observer.isDisposed) + } + + @Test + fun disposed_WHEN_windows_is_overlapping_and_downstream_disposed() { + val observer = upstream.window(count = 2, skip = 1).test() + + observer.dispose() + + assertTrue(observer.isDisposed) + } + + @Test + fun disposed_WHEN_windows_is_non_overlapping_and_downstream_disposed() { + val observer = upstream.window(count = 1, skip = 2).test() + + observer.dispose() + + assertTrue(observer.isDisposed) + } + + @Test + fun first_window_produces_error_WHEN_windows_is_gapless_and_upstream_produces_values_and_window_subscribed_second_time() { + val observer = upstream.window(count = 1).test() + + upstream.onNext(0) + observer.values.first().test() + val windowObserver = observer.values.first().test() + + windowObserver.assertError() + } + + @Test + fun first_window_produces_error_WHEN_windows_is_overlapping_and_upstream_produces_values_and_window_subscribed_second_time() { + val observer = upstream.window(count = 2, skip = 1).test() + + upstream.onNext(0) + observer.values.first().test() + val windowObserver = observer.values.first().test() + + windowObserver.assertError() + } + + @Test + fun first_window_produces_error_WHEN_windows_is_non_overlapping_and_upstream_produces_values_and_window_subscribed_second_time() { + val observer = upstream.window(count = 1, skip = 2).test() + + upstream.onNext(0) + observer.values.first().test() + val windowObserver = observer.values.first().test() + + windowObserver.assertError() + } + + @Test + fun produces_error_WHEN_windows_is_gapless_and_upstream_produces_error() { + val observer = window(count = 2) + val error = Error() + + upstream.onError(error) + + observer.assertError(error) + } + + @Test + fun produces_error_WHEN_windows_is_overlapping_and_upstream_produces_error() { + val observer = window(count = 2, skip = 1) + val error = Error() + + upstream.onError(error) + + observer.assertError(error) + } + + @Test + fun produces_error_WHEN_windows_is_non_overlapping_and_upstream_produces_error() { + val observer = window(count = 1, skip = 2) + val error = Error() + + upstream.onError(error) + + observer.assertError(error) + } + + @Test + fun first_window_produces_error_WHEN_windows_is_gapless_and_upstream_produces_values_and_error() { + val observer = window(count = 2, skip = 2) + val error = Throwable() + + upstream.onNext(0) + upstream.onError(error) + + val window = observer.values.first() + + window.assertError(error) + } + + @Test + fun first_window_produces_error_WHEN_windows_is_overlapping_and_upstream_produces_values_and_error() { + val observer = window(count = 4, skip = 2) + val error = Throwable() + + upstream.onNext(1, 2, 3) + upstream.onError(error) + + val window = observer.values.first() + + window.assertError(error) + } + + @Test + fun second_window_produces_error_WHEN_windows_is_overlapping_and_upstream_produces_values_and_error() { + val observer = window(count = 4, skip = 2) + val error = Throwable() + + upstream.onNext(1, 2, 3) + upstream.onError(error) + + val window = observer.values[1] + + window.assertError(error) + } + + @Test + fun first_window_produces_error_WHEN_windows_is_non_overlapping_and_upstream_produces_values_and_error() { + val observer = window(count = 2, skip = 3) + val error = Throwable() + + upstream.onNext(1) + upstream.onError(error) + + val window = observer.values.first() + + window.assertError(error) + } + + @Test + fun first_window_completes_WHEN_windows_is_gapless_and_upstream_produces_values_and_completes() { + val observer = window(count = 2, skip = 2) + + upstream.onNext(1) + upstream.onComplete() + + val window = observer.values.first() + + window.assertComplete() + } + + @Test + fun first_window_completes_WHEN_windows_is_overlapping_and_upstream_produces_values_and_completes() { + val observer = window(count = 4, skip = 2) + + upstream.onNext(1, 2, 3) + upstream.onComplete() + + val window = observer.values.first() + + window.assertComplete() + } + + @Test + fun second_window_completes_WHEN_windows_is_overlapping_and_upstream_produces_values_and_completes() { + val observer = window(count = 4, skip = 2) + + upstream.onNext(1, 2, 3) + upstream.onComplete() + + val window = observer.values[1] + + window.assertComplete() + } + + @Test + fun first_window_completes_WHEN_windows_is_non_overlapping_and_upstream_produces_values_and_completes() { + val observer = window(count = 2, skip = 3) + + upstream.onNext(1) + upstream.onComplete() + + val window = observer.values.first() + + window.assertComplete() + } + + @Test + fun unsubscribe_from_upstream_WHEN_windows_is_gapless_and_upstream_produced_value_and_downstream_disposed_and_window_disposed() { + val observer = window(count = 2) + + upstream.onNext(1) + observer.dispose() + observer.values[0].dispose() + + assertFalse(upstream.hasSubscribers) + } + + @Test + fun unsubscribe_from_upstream_WHEN_windows_is_overlapping_and_upstream_produced_value_and_downstream_disposed_and_window_disposed() { + val observer = window(count = 3, skip = 2) + + upstream.onNext(1) + observer.dispose() + observer.values[0].dispose() + + assertFalse(upstream.hasSubscribers) + } + + @Test + fun unsubscribe_from_upstream_WHEN_windows_is_non_overlapping_and_upstream_produced_value_and_downstream_disposed_and_window_disposed() { + val observer = window(count = 2, skip = 3) + + upstream.onNext(1) + observer.dispose() + observer.values[0].dispose() + + assertFalse(upstream.hasSubscribers) + } + + @Test + fun unsubscribe_from_upstream_WHEN_windows_is_gapless_and_downstream_disposed() { + val observer = window() + + observer.dispose() + + assertFalse(upstream.hasSubscribers) + } + + @Test + fun unsubscribe_from_upstream_WHEN_windows_is_overlapping_and_downstream_disposed() { + val observer = window(count = 3, skip = 2) + + observer.dispose() + + assertFalse(upstream.hasSubscribers) + } + + @Test + fun unsubscribe_from_upstream_WHEN_windows_is_non_overlapping_and_downstream_disposed() { + val observer = window(count = 2, skip = 3) + + observer.dispose() + + assertFalse(upstream.hasSubscribers) + } + + @Test + fun unsubscribe_from_upstream_WHEN_windows_is_gapless_and_upstream_produces_value_and_downstream_not_subscribed_to_window_and_downstream_disposed() { + val observer = upstream.window(count = 2).test() + + upstream.onNext(1) + observer.dispose() + + assertFalse(upstream.hasSubscribers) + } + + @Test + fun does_not_unsubscribe_from_upstream_WHEN_windows_is_gapless_upstream_produces_value_and_downstream_subscribed_to_window_and_downstream_disposed() { + val observer = window(count = 2) + + upstream.onNext(0) + observer.dispose() + + assertTrue(upstream.hasSubscribers) + } + + @Test + fun unsubscribe_from_upstream_WHEN_windows_is_overlapping_and_upstream_produces_value_and_downstream_not_subscribed_to_window_and_downstream_disposed() { + val observer = upstream.window(count = 2, skip = 1).test() + + upstream.onNext(1) + observer.dispose() + + assertFalse(upstream.hasSubscribers) + } + + @Test + fun does_not_unsubscribe_from_upstream_WHEN_windows_is_overlapping_upstream_produces_value_and_downstream_subscribed_to_window_and_downstream_disposed() { + val observer = window(count = 2, skip = 1) + + upstream.onNext(0) + observer.dispose() + + assertTrue(upstream.hasSubscribers) + } + + @Test + fun unsubscribe_from_upstream_WHEN_windows_is_non_overlapping_and_upstream_produces_value_and_downstream_not_subscribed_to_window_and_downstream_disposed() { + val observer = upstream.window(count = 2, skip = 3).test() + + upstream.onNext(1) + observer.dispose() + + assertFalse(upstream.hasSubscribers) + } + + @Test + fun does_not_unsubscribe_from_upstream_WHEN_windows_is_non_overlapping_upstream_produces_value_and_downstream_subscribed_to_window_and_downstream_disposed() { + val observer = window(count = 2, skip = 3) + + upstream.onNext(0) + observer.dispose() + + assertTrue(upstream.hasSubscribers) + } +}