diff --git a/.idea/kotlinc.xml b/.idea/kotlinc.xml index 8d81632f8..fe63bb677 100644 --- a/.idea/kotlinc.xml +++ b/.idea/kotlinc.xml @@ -1,6 +1,6 @@ - \ No newline at end of file diff --git a/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/ConferenceEvent.kt b/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/ConferenceEvent.kt index 159d1a65d..2e5814eb1 100644 --- a/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/ConferenceEvent.kt +++ b/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/ConferenceEvent.kt @@ -23,7 +23,7 @@ public sealed interface ConferenceEvent { } @Deprecated( - message = "Use Roster.present instead to observe presentation state.", + message = "Use Roster.presenter instead to observe presentation state.", level = DeprecationLevel.WARNING, ) public data class PresentationStartConferenceEvent( @@ -33,7 +33,7 @@ public data class PresentationStartConferenceEvent( ) : ConferenceEvent @Deprecated( - message = "Use Roster.present instead to observe presentation state.", + message = "Use Roster.presenter instead to observe presentation state.", level = DeprecationLevel.WARNING, ) public data class PresentationStopConferenceEvent(override val at: Long) : ConferenceEvent diff --git a/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/infinity/InfinityConference.kt b/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/infinity/InfinityConference.kt index b164dff44..e124454cf 100644 --- a/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/infinity/InfinityConference.kt +++ b/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/infinity/InfinityConference.kt @@ -35,16 +35,17 @@ import com.pexip.sdk.conference.infinity.internal.MessengerImpl import com.pexip.sdk.conference.infinity.internal.RefererImpl import com.pexip.sdk.conference.infinity.internal.RosterImpl import com.pexip.sdk.conference.infinity.internal.ThemeImpl -import com.pexip.sdk.conference.infinity.internal.WhileSubscribedWithDebounce import com.pexip.sdk.conference.infinity.internal.events +import com.pexip.sdk.core.WhileSubscribedWithDebounce import com.pexip.sdk.core.retry import com.pexip.sdk.infinity.UnsupportedInfinityException import com.pexip.sdk.media.IceServer import com.pexip.sdk.media.MediaConnectionSignaling import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharingStarted @@ -53,24 +54,24 @@ import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.shareIn import kotlinx.coroutines.launch +import kotlinx.coroutines.newSingleThreadContext import kotlinx.coroutines.withContext import java.net.URL import java.util.concurrent.CopyOnWriteArraySet -import java.util.concurrent.Executors -import kotlin.time.Duration.Companion.milliseconds public class InfinityConference private constructor( private val step: InfinityService.ConferenceStep, response: RequestTokenResponse, ) : Conference { - private val executor = Executors.newSingleThreadScheduledExecutor() - private val scope = CoroutineScope(SupervisorJob() + executor.asCoroutineDispatcher()) + @OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class) + private val context = newSingleThreadContext("InfinityConference") + private val scope = CoroutineScope(SupervisorJob() + context) private val store = TokenStore.create(response) private val event = step.events(store).shareIn( scope = scope, - started = SharingStarted.WhileSubscribedWithDebounce(100.milliseconds), + started = SharingStarted.WhileSubscribedWithDebounce(), ) private val listeners = CopyOnWriteArraySet() private val mutableConferenceEvent = MutableSharedFlow() @@ -161,7 +162,7 @@ public class InfinityConference private constructor( override fun leave() { scope.cancel() - executor.shutdown() + context.close() listeners.clear() } @@ -177,6 +178,10 @@ public class InfinityConference private constructor( * @return an instance of [InfinityConference] * @throws UnsupportedInfinityException if the version of Infinity is not supported */ + @Deprecated( + message = "Use a version of this method that accepts ConferenceStep", + replaceWith = ReplaceWith("create(service.newRequest(node).conference(conferenceAlias)), response)"), + ) @JvmStatic public fun create( service: InfinityService, diff --git a/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/infinity/internal/Util.kt b/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/infinity/internal/Util.kt deleted file mode 100644 index da69c0faa..000000000 --- a/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/infinity/internal/Util.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2024 Pexip AS - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.pexip.sdk.conference.infinity.internal - -import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.flow.SharingCommand -import kotlinx.coroutines.flow.SharingStarted -import kotlinx.coroutines.flow.debounce -import kotlinx.coroutines.flow.distinctUntilChanged -import kotlinx.coroutines.flow.map -import kotlin.time.Duration - -@OptIn(FlowPreview::class) -@Suppress("ktlint:standard:function-naming") -internal fun SharingStarted.Companion.WhileSubscribedWithDebounce(timeout: Duration) = - SharingStarted { subscriptionCount -> - subscriptionCount - .map { it > 0 } - .distinctUntilChanged() - .debounce(timeout) - .map { if (it) SharingCommand.START else SharingCommand.STOP_AND_RESET_REPLAY_CACHE } - } diff --git a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/ConferenceEventTest.kt b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/ConferenceEventTest.kt index 030710c1f..5597ccad3 100644 --- a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/ConferenceEventTest.kt +++ b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/ConferenceEventTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 Pexip AS + * Copyright 2022-2024 Pexip AS * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,34 +20,19 @@ import com.pexip.sdk.api.infinity.DisconnectEvent import com.pexip.sdk.api.infinity.PresentationStartEvent import com.pexip.sdk.api.infinity.PresentationStopEvent import com.pexip.sdk.api.infinity.ReferEvent -import com.pexip.sdk.api.infinity.TokenStore import com.pexip.sdk.conference.DisconnectConferenceEvent import com.pexip.sdk.conference.FailureConferenceEvent import com.pexip.sdk.conference.PresentationStartConferenceEvent import com.pexip.sdk.conference.PresentationStopConferenceEvent import com.pexip.sdk.conference.ReferConferenceEvent -import kotlinx.coroutines.flow.MutableSharedFlow import java.util.UUID -import kotlin.properties.Delegates import kotlin.random.Random -import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.fail class ConferenceEventTest { - private var at by Delegates.notNull() - private lateinit var event: MutableSharedFlow> - private lateinit var store: TokenStore - - @BeforeTest - fun setUp() { - at = Random.nextLong(Long.MAX_VALUE) - event = MutableSharedFlow() - store = TokenStore.create(Random.nextToken()) - } - @Test fun `returns ConferenceEvent if type is registered`() { val at = Random.nextLong(Long.MAX_VALUE) diff --git a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/DataChannelMessengerImplTest.kt b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/DataChannelMessengerImplTest.kt index 2065ff117..3d93abc09 100644 --- a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/DataChannelMessengerImplTest.kt +++ b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/DataChannelMessengerImplTest.kt @@ -28,6 +28,7 @@ import assertk.fail import com.pexip.sdk.api.infinity.DataChannelMessage import com.pexip.sdk.conference.Message import com.pexip.sdk.conference.MessageNotSentException +import com.pexip.sdk.core.awaitSubscriptionCountAtLeast import com.pexip.sdk.media.Data import com.pexip.sdk.media.DataChannel import kotlinx.coroutines.flow.Flow diff --git a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/EventsTest.kt b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/EventsTest.kt index 014ced59e..8eae36010 100644 --- a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/EventsTest.kt +++ b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/EventsTest.kt @@ -25,6 +25,7 @@ import com.pexip.sdk.api.EventSourceListener import com.pexip.sdk.api.infinity.InfinityService import com.pexip.sdk.api.infinity.Token import com.pexip.sdk.api.infinity.TokenStore +import com.pexip.sdk.core.awaitSubscriptionCountAtLeast import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow diff --git a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/MediaConnectionSignalingImplTest.kt b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/MediaConnectionSignalingImplTest.kt index 75cbbe86a..471115c2f 100644 --- a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/MediaConnectionSignalingImplTest.kt +++ b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/MediaConnectionSignalingImplTest.kt @@ -42,6 +42,7 @@ import com.pexip.sdk.api.infinity.TokenStore import com.pexip.sdk.api.infinity.UpdateRequest import com.pexip.sdk.api.infinity.UpdateResponse import com.pexip.sdk.api.infinity.UpdateSdpEvent +import com.pexip.sdk.core.awaitSubscriptionCountAtLeast import com.pexip.sdk.media.CandidateSignalingEvent import com.pexip.sdk.media.IceServer import com.pexip.sdk.media.OfferSignalingEvent diff --git a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/ThemeImplTest.kt b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/ThemeImplTest.kt index 594947edd..a0f9e081b 100644 --- a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/ThemeImplTest.kt +++ b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/ThemeImplTest.kt @@ -46,6 +46,7 @@ import com.pexip.sdk.conference.Layout import com.pexip.sdk.conference.LayoutId import com.pexip.sdk.conference.SplashScreen import com.pexip.sdk.conference.TransformLayoutException +import com.pexip.sdk.core.awaitSubscriptionCountAtLeast import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.test.runTest import kotlin.random.Random diff --git a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/Util.kt b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/Util.kt index ea56ae8e8..e69cfcb68 100644 --- a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/Util.kt +++ b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/Util.kt @@ -18,8 +18,6 @@ package com.pexip.sdk.conference.infinity.internal import com.pexip.sdk.api.infinity.DtmfRequest import com.pexip.sdk.api.infinity.RefreshTokenResponse import com.pexip.sdk.conference.Message -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.first import java.util.UUID import kotlin.random.Random @@ -45,8 +43,3 @@ internal fun Random.nextMessage(at: Long = System.currentTimeMillis(), direct: B payload = nextString(64), direct = direct, ) - -internal suspend fun MutableSharedFlow.awaitSubscriptionCountAtLeast(threshold: Int): Int { - require(threshold > 0) { "threshold must be a positive number." } - return subscriptionCount.first { it >= threshold } -} diff --git a/sdk-core/api/sdk-core.api b/sdk-core/api/sdk-core.api index 67ae9abbb..908a3f469 100644 --- a/sdk-core/api/sdk-core.api +++ b/sdk-core/api/sdk-core.api @@ -1,8 +1,17 @@ public abstract interface annotation class com/pexip/sdk/core/InternalSdkApi : java/lang/annotation/Annotation { } +public final class com/pexip/sdk/core/MutableSharedFlowKt { + public static final fun awaitSubscriptionCountAtLeast (Lkotlinx/coroutines/flow/MutableSharedFlow;ILkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + public final class com/pexip/sdk/core/RetryKt { public static final fun retry-FbhrOv8 (IJJDLkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static synthetic fun retry-FbhrOv8$default (IJJDLkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; } +public final class com/pexip/sdk/core/SharingStartedKt { + public static final fun WhileSubscribedWithDebounce-HG0u8IE (Lkotlinx/coroutines/flow/SharingStarted$Companion;J)Lkotlinx/coroutines/flow/SharingStarted; + public static synthetic fun WhileSubscribedWithDebounce-HG0u8IE$default (Lkotlinx/coroutines/flow/SharingStarted$Companion;JILjava/lang/Object;)Lkotlinx/coroutines/flow/SharingStarted; +} + diff --git a/sdk-core/src/main/kotlin/com/pexip/sdk/core/MutableSharedFlow.kt b/sdk-core/src/main/kotlin/com/pexip/sdk/core/MutableSharedFlow.kt new file mode 100644 index 000000000..9cbda4ba2 --- /dev/null +++ b/sdk-core/src/main/kotlin/com/pexip/sdk/core/MutableSharedFlow.kt @@ -0,0 +1,30 @@ +/* + * Copyright 2024 Pexip AS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pexip.sdk.core + +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.first + +/** + * Awaits until this [MutableSharedFlow] has at least [threshold] subscribes. + * + * @param threshold a number of subscriptions to await + */ +@InternalSdkApi +public suspend fun MutableSharedFlow.awaitSubscriptionCountAtLeast(threshold: Int) { + require(threshold > 0) { "threshold must be a positive number." } + subscriptionCount.first { it >= threshold } +} diff --git a/sdk-core/src/main/kotlin/com/pexip/sdk/core/SharingStarted.kt b/sdk-core/src/main/kotlin/com/pexip/sdk/core/SharingStarted.kt new file mode 100644 index 000000000..f02bf0731 --- /dev/null +++ b/sdk-core/src/main/kotlin/com/pexip/sdk/core/SharingStarted.kt @@ -0,0 +1,48 @@ +/* + * Copyright 2024 Pexip AS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pexip.sdk.core + +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.SharingCommand +import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.debounce +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.map +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds + +/** + * Sharing is started when the first subscriber appears after a given [timeout] has passed since the + * most recent subscription and stops if there are no subscribers. + */ +@Suppress("FunctionName") +@InternalSdkApi +public fun SharingStarted.Companion.WhileSubscribedWithDebounce(timeout: Duration = 100.milliseconds): SharingStarted = + StartedWhileSubscribedWithDebounce(timeout) + +@OptIn(FlowPreview::class) +private class StartedWhileSubscribedWithDebounce(private val timeout: Duration) : SharingStarted { + + override fun command(subscriptionCount: StateFlow): Flow = + subscriptionCount + .map { if (it > 0) SharingCommand.START else SharingCommand.STOP_AND_RESET_REPLAY_CACHE } + .debounce { if (it == SharingCommand.START) timeout else Duration.ZERO } + .distinctUntilChanged() + + override fun toString(): String = "SharingStarted.WhileSubscribedWithDebounce($timeout)" +} diff --git a/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/InfinityRegistration.kt b/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/InfinityRegistration.kt index b44e1c418..be38bb831 100644 --- a/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/InfinityRegistration.kt +++ b/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/InfinityRegistration.kt @@ -20,6 +20,7 @@ import com.pexip.sdk.api.infinity.InfinityService import com.pexip.sdk.api.infinity.RequestRegistrationTokenResponse import com.pexip.sdk.api.infinity.TokenStore import com.pexip.sdk.api.infinity.TokenStore.Companion.refreshTokenIn +import com.pexip.sdk.core.WhileSubscribedWithDebounce import com.pexip.sdk.core.retry import com.pexip.sdk.infinity.UnsupportedInfinityException import com.pexip.sdk.registration.RegisteredDevicesCallback @@ -28,36 +29,39 @@ import com.pexip.sdk.registration.RegistrationEvent import com.pexip.sdk.registration.RegistrationEventListener import com.pexip.sdk.registration.infinity.internal.RegisteredDevicesFetcher import com.pexip.sdk.registration.infinity.internal.RegistrationEvent -import com.pexip.sdk.registration.infinity.internal.registrationEvent +import com.pexip.sdk.registration.infinity.internal.events import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.SharingStarted -import kotlinx.coroutines.flow.flowOn -import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.buffer +import kotlinx.coroutines.flow.mapNotNull import kotlinx.coroutines.flow.merge -import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.shareIn import kotlinx.coroutines.launch +import kotlinx.coroutines.newSingleThreadContext +import kotlinx.coroutines.withContext import java.net.URL import java.util.concurrent.CopyOnWriteArraySet -import java.util.concurrent.Executors public class InfinityRegistration private constructor( step: InfinityService.RegistrationStep, response: RequestRegistrationTokenResponse, ) : Registration { - private val executor = Executors.newSingleThreadScheduledExecutor() - private val scope = CoroutineScope(SupervisorJob() + executor.asCoroutineDispatcher()) + @OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class) + private val context = newSingleThreadContext("InfinityRegistration") + private val scope = CoroutineScope(SupervisorJob() + context) private val store = TokenStore.create(response) private val fetcher = RegisteredDevicesFetcher(step, store) - private val registrationEvent = step - .registrationEvent(store) - .shareIn(scope, SharingStarted.Lazily) + private val event = step.events(store).shareIn( + scope = scope, + started = SharingStarted.WhileSubscribedWithDebounce(), + ) private val listeners = CopyOnWriteArraySet() private val mutableRegistrationEvent = MutableSharedFlow() @@ -72,10 +76,15 @@ public class InfinityRegistration private constructor( releaseToken = { retry { step.releaseToken(it).await() } }, onFailure = { mutableRegistrationEvent.emit(RegistrationEvent(it)) }, ) - merge(registrationEvent, mutableRegistrationEvent) - .onEach { event -> listeners.forEach { it.onRegistrationEvent(event) } } - .flowOn(Dispatchers.Main.immediate) - .launchIn(scope) + scope.launch { + merge(event.mapNotNull(::RegistrationEvent), mutableRegistrationEvent) + .buffer() + .collect { event -> + withContext(Dispatchers.Main.immediate) { + listeners.forEach { it.onRegistrationEvent(event) } + } + } + } } override fun getRegisteredDevices(query: String, callback: RegisteredDevicesCallback) { @@ -98,7 +107,7 @@ public class InfinityRegistration private constructor( override fun dispose() { scope.cancel() - executor.shutdown() + context.close() listeners.clear() } @@ -114,6 +123,10 @@ public class InfinityRegistration private constructor( * @return an instance of [InfinityRegistration] * @throws UnsupportedInfinityException if the version of Infinity is not supported */ + @Deprecated( + message = "Use a version of this method that accepts RegistrationStep", + replaceWith = ReplaceWith("create(service.newRequest(node).registration(deviceAlias)), response)"), + ) @JvmStatic public fun create( service: InfinityService, diff --git a/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/internal/Events.kt b/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/internal/Events.kt new file mode 100644 index 000000000..8a3806980 --- /dev/null +++ b/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/internal/Events.kt @@ -0,0 +1,34 @@ +/* + * Copyright 2024 Pexip AS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pexip.sdk.registration.infinity.internal + +import com.pexip.sdk.api.coroutines.asFlow +import com.pexip.sdk.api.infinity.InfinityService +import com.pexip.sdk.api.infinity.TokenStore +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.retryWhen +import kotlin.time.Duration.Companion.seconds + +@OptIn(ExperimentalCoroutinesApi::class) +internal fun InfinityService.RegistrationStep.events(store: TokenStore) = flow { emit(store.get()) } + .flatMapLatest { events(it).asFlow() } + .retryWhen { _, attempt -> + delay(attempt.seconds.coerceAtMost(5.seconds)) + true + } diff --git a/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/internal/RegistrationEvent.kt b/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/internal/RegistrationEvent.kt index c1593a7ff..6dfcc4a3f 100644 --- a/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/internal/RegistrationEvent.kt +++ b/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/internal/RegistrationEvent.kt @@ -1,5 +1,5 @@ /* - * Copyright 2022-2023 Pexip AS + * Copyright 2022-2024 Pexip AS * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,21 +16,11 @@ package com.pexip.sdk.registration.infinity.internal import com.pexip.sdk.api.Event -import com.pexip.sdk.api.coroutines.asFlow import com.pexip.sdk.api.infinity.IncomingCancelledEvent import com.pexip.sdk.api.infinity.IncomingEvent -import com.pexip.sdk.api.infinity.InfinityService -import com.pexip.sdk.api.infinity.TokenStore import com.pexip.sdk.registration.FailureRegistrationEvent import com.pexip.sdk.registration.IncomingCancelledRegistrationEvent import com.pexip.sdk.registration.IncomingRegistrationEvent -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.flatMapLatest -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.mapNotNull -import kotlinx.coroutines.flow.retryWhen -import kotlin.time.Duration.Companion.seconds @Suppress("ktlint:standard:function-naming") internal inline fun RegistrationEvent( @@ -53,15 +43,3 @@ internal inline fun RegistrationEvent( @Suppress("ktlint:standard:function-naming") internal inline fun RegistrationEvent(t: Throwable, at: () -> Long = System::currentTimeMillis) = FailureRegistrationEvent(at(), t) - -@OptIn(ExperimentalCoroutinesApi::class) -internal fun InfinityService.RegistrationStep.registrationEvent( - store: TokenStore, - at: () -> Long = System::currentTimeMillis, -) = flow { emit(store.get()) } - .flatMapLatest { events(it).asFlow() } - .mapNotNull { RegistrationEvent(it, at) } - .retryWhen { _, attempt -> - delay(attempt.seconds.coerceAtMost(5.seconds)) - true - } diff --git a/sdk-registration/src/test/kotlin/com/pexip/sdk/registration/infinity/internal/EventsTest.kt b/sdk-registration/src/test/kotlin/com/pexip/sdk/registration/infinity/internal/EventsTest.kt new file mode 100644 index 000000000..9838ce280 --- /dev/null +++ b/sdk-registration/src/test/kotlin/com/pexip/sdk/registration/infinity/internal/EventsTest.kt @@ -0,0 +1,120 @@ +/* + * Copyright 2024 Pexip AS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pexip.sdk.registration.infinity.internal + +import app.cash.turbine.test +import assertk.assertThat +import assertk.assertions.isEqualTo +import com.pexip.sdk.api.Event +import com.pexip.sdk.api.EventSource +import com.pexip.sdk.api.EventSourceFactory +import com.pexip.sdk.api.EventSourceListener +import com.pexip.sdk.api.infinity.InfinityService +import com.pexip.sdk.api.infinity.Token +import com.pexip.sdk.api.infinity.TokenStore +import com.pexip.sdk.core.awaitSubscriptionCountAtLeast +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.test.TestScope +import kotlinx.coroutines.test.runTest +import kotlin.random.Random +import kotlin.test.BeforeTest +import kotlin.test.Test + +class EventsTest { + + private lateinit var event: MutableSharedFlow> + private lateinit var store: TokenStore + + @BeforeTest + fun setUp() { + event = MutableSharedFlow() + store = TokenStore.create(Random.nextToken()) + } + + @Test + fun `maps Event to ConferenceEvent`() = runTest { + val step = testRegistrationStep() + step.events(store).test { + event.awaitSubscriptionCountAtLeast(1) + val events = List(10) { TestEvent() } + events.forEach { + event.emit(Result.success(it)) + assertThat(awaitItem(), "event").isEqualTo(it) + } + expectNoEvents() + cancelAndIgnoreRemainingEvents() + } + } + + @Test + fun `failure restarts the flow`() = runTest { + val step = testRegistrationStep() + step.events(store).test { + event.awaitSubscriptionCountAtLeast(1) + val event1 = TestEvent() + event.emit(Result.success(event1)) + assertThat(awaitItem(), "event").isEqualTo(event1) + event.emit(Result.failure(Throwable())) + event.awaitSubscriptionCountAtLeast(1) + val event2 = TestEvent() + event.emit(Result.success(event2)) + assertThat(awaitItem(), "event").isEqualTo(event2) + expectNoEvents() + cancelAndIgnoreRemainingEvents() + } + } + + private fun TestScope.testRegistrationStep() = object : InfinityService.RegistrationStep { + + override fun events(token: Token): EventSourceFactory { + assertThat(token, "token").isEqualTo(store.get()) + return TestEventSourceFactory(backgroundScope, event) + } + } + + private class TestEventSourceFactory( + private val scope: CoroutineScope, + private val event: Flow>, + ) : EventSourceFactory { + + override fun create(listener: EventSourceListener): EventSource = + TestEventSource(scope, event, listener) + } + + private class TestEventSource( + scope: CoroutineScope, + event: Flow>, + listener: EventSourceListener, + ) : EventSource { + + private val job = event + .onEach { + it.fold( + onSuccess = { event -> listener.onEvent(this, event) }, + onFailure = { t -> listener.onClosed(this, t) }, + ) + } + .launchIn(scope) + + override fun cancel(): Unit = job.cancel() + } + + private class TestEvent : Event +} diff --git a/sdk-registration/src/test/kotlin/com/pexip/sdk/registration/infinity/internal/RegistrationEventTest.kt b/sdk-registration/src/test/kotlin/com/pexip/sdk/registration/infinity/internal/RegistrationEventTest.kt index 1a511b275..6f56b28fd 100644 --- a/sdk-registration/src/test/kotlin/com/pexip/sdk/registration/infinity/internal/RegistrationEventTest.kt +++ b/sdk-registration/src/test/kotlin/com/pexip/sdk/registration/infinity/internal/RegistrationEventTest.kt @@ -15,124 +15,22 @@ */ package com.pexip.sdk.registration.infinity.internal -import app.cash.turbine.test -import assertk.Assert -import assertk.all -import assertk.assertThat -import assertk.assertions.isEqualTo -import assertk.assertions.isInstanceOf -import assertk.assertions.prop import com.pexip.sdk.api.Event -import com.pexip.sdk.api.EventSource -import com.pexip.sdk.api.EventSourceFactory -import com.pexip.sdk.api.EventSourceListener -import com.pexip.sdk.api.infinity.ByeEvent import com.pexip.sdk.api.infinity.IncomingCancelledEvent import com.pexip.sdk.api.infinity.IncomingEvent -import com.pexip.sdk.api.infinity.InfinityService -import com.pexip.sdk.api.infinity.Token -import com.pexip.sdk.api.infinity.TokenStore import com.pexip.sdk.registration.FailureRegistrationEvent import com.pexip.sdk.registration.IncomingCancelledRegistrationEvent import com.pexip.sdk.registration.IncomingRegistrationEvent -import com.pexip.sdk.registration.RegistrationEvent -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.test.TestScope -import kotlinx.coroutines.test.runTest -import kotlin.properties.Delegates import kotlin.random.Random -import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.fail internal class RegistrationEventTest { - private var at by Delegates.notNull() - private lateinit var event: MutableSharedFlow> - private lateinit var store: TokenStore - - @BeforeTest - fun setUp() { - at = Random.nextLong(Long.MAX_VALUE) - event = MutableSharedFlow() - store = TokenStore.create(Random.nextToken()) - } - - @Test - fun `maps Event to RegistrationEvent`() = runTest { - val step = testRegistrationStep() - step.registrationEvent(store) { at }.test { - event.awaitSubscribers() - val incomingEvent = IncomingEvent( - conferenceAlias = Random.nextString(8), - remoteDisplayName = Random.nextString(8), - token = Random.nextString(8), - ) - event.emit(Result.success(incomingEvent)) - assertThat(awaitItem(), "incomingEvent") - .isInstanceOf() - .all { - at(at) - prop(IncomingRegistrationEvent::conferenceAlias) - .isEqualTo(incomingEvent.conferenceAlias) - prop(IncomingRegistrationEvent::remoteDisplayName) - .isEqualTo(incomingEvent.remoteDisplayName) - prop(IncomingRegistrationEvent::token) - .isEqualTo(incomingEvent.token) - } - val incomingCancelledEvent = IncomingCancelledEvent(Random.nextString(8)) - event.emit(Result.success(incomingCancelledEvent)) - assertThat(awaitItem(), "incomingCancelledEvent") - .isInstanceOf() - .all { - at(at) - prop(IncomingCancelledRegistrationEvent::token) - .isEqualTo(incomingCancelledEvent.token) - } - val byeEvent = ByeEvent - event.emit(Result.success(byeEvent)) - expectNoEvents() - cancelAndIgnoreRemainingEvents() - } - } - - @Test - fun `failure restarts the flow`() = runTest { - val step = testRegistrationStep() - step.registrationEvent(store) { at }.test { - event.awaitSubscribers() - val incomingCancelledEvent = IncomingCancelledEvent(Random.nextString(8)) - event.emit(Result.success(incomingCancelledEvent)) - assertThat(awaitItem(), "incomingCancelledEvent") - .isInstanceOf() - .all { - at(at) - prop(IncomingCancelledRegistrationEvent::token) - .isEqualTo(incomingCancelledEvent.token) - } - event.emit(Result.failure(Throwable())) - event.awaitSubscribers() - event.emit(Result.success(incomingCancelledEvent)) - assertThat(awaitItem(), "presentationStopEvent") - .isInstanceOf() - .all { - at(at) - prop(IncomingCancelledRegistrationEvent::token) - .isEqualTo(incomingCancelledEvent.token) - } - expectNoEvents() - cancelAndIgnoreRemainingEvents() - } - } - @Test fun `returns RegistrationEvent if type is registered`() { + val at = Random.nextLong(Long.MAX_VALUE) val testCases = buildMap { val incomingEvent = IncomingEvent( conferenceAlias = Random.nextString(8), @@ -167,45 +65,4 @@ internal class RegistrationEventTest { } private data object TestEvent : Event - - private suspend fun MutableSharedFlow.awaitSubscribers() { - subscriptionCount.first { it > 0 } - } - - private fun TestScope.testRegistrationStep() = object : InfinityService.RegistrationStep { - - override fun events(token: Token): EventSourceFactory { - assertThat(token, "token").isEqualTo(store.get()) - return TestEventSourceFactory(backgroundScope, event) - } - } - - private class TestEventSourceFactory( - private val scope: CoroutineScope, - private val event: Flow>, - ) : EventSourceFactory { - - override fun create(listener: EventSourceListener): EventSource = - TestEventSource(scope, event, listener) - } - - private class TestEventSource( - scope: CoroutineScope, - event: Flow>, - listener: EventSourceListener, - ) : EventSource { - - private val job = event - .onEach { - it.fold( - onSuccess = { event -> listener.onEvent(this, event) }, - onFailure = { t -> listener.onClosed(this, t) }, - ) - } - .launchIn(scope) - - override fun cancel(): Unit = job.cancel() - } - - private fun Assert.at(at: Long) = prop(RegistrationEvent::at).isEqualTo(at) }