diff --git a/.idea/kotlinc.xml b/.idea/kotlinc.xml
index 8d81632f8..fe63bb677 100644
--- a/.idea/kotlinc.xml
+++ b/.idea/kotlinc.xml
@@ -1,6 +1,6 @@
-
+
\ No newline at end of file
diff --git a/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/ConferenceEvent.kt b/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/ConferenceEvent.kt
index 159d1a65d..2e5814eb1 100644
--- a/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/ConferenceEvent.kt
+++ b/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/ConferenceEvent.kt
@@ -23,7 +23,7 @@ public sealed interface ConferenceEvent {
}
@Deprecated(
- message = "Use Roster.present instead to observe presentation state.",
+ message = "Use Roster.presenter instead to observe presentation state.",
level = DeprecationLevel.WARNING,
)
public data class PresentationStartConferenceEvent(
@@ -33,7 +33,7 @@ public data class PresentationStartConferenceEvent(
) : ConferenceEvent
@Deprecated(
- message = "Use Roster.present instead to observe presentation state.",
+ message = "Use Roster.presenter instead to observe presentation state.",
level = DeprecationLevel.WARNING,
)
public data class PresentationStopConferenceEvent(override val at: Long) : ConferenceEvent
diff --git a/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/infinity/InfinityConference.kt b/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/infinity/InfinityConference.kt
index b164dff44..e124454cf 100644
--- a/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/infinity/InfinityConference.kt
+++ b/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/infinity/InfinityConference.kt
@@ -35,16 +35,17 @@ import com.pexip.sdk.conference.infinity.internal.MessengerImpl
import com.pexip.sdk.conference.infinity.internal.RefererImpl
import com.pexip.sdk.conference.infinity.internal.RosterImpl
import com.pexip.sdk.conference.infinity.internal.ThemeImpl
-import com.pexip.sdk.conference.infinity.internal.WhileSubscribedWithDebounce
import com.pexip.sdk.conference.infinity.internal.events
+import com.pexip.sdk.core.WhileSubscribedWithDebounce
import com.pexip.sdk.core.retry
import com.pexip.sdk.infinity.UnsupportedInfinityException
import com.pexip.sdk.media.IceServer
import com.pexip.sdk.media.MediaConnectionSignaling
import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.SupervisorJob
-import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
@@ -53,24 +54,24 @@ import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch
+import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import java.net.URL
import java.util.concurrent.CopyOnWriteArraySet
-import java.util.concurrent.Executors
-import kotlin.time.Duration.Companion.milliseconds
public class InfinityConference private constructor(
private val step: InfinityService.ConferenceStep,
response: RequestTokenResponse,
) : Conference {
- private val executor = Executors.newSingleThreadScheduledExecutor()
- private val scope = CoroutineScope(SupervisorJob() + executor.asCoroutineDispatcher())
+ @OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
+ private val context = newSingleThreadContext("InfinityConference")
+ private val scope = CoroutineScope(SupervisorJob() + context)
private val store = TokenStore.create(response)
private val event = step.events(store).shareIn(
scope = scope,
- started = SharingStarted.WhileSubscribedWithDebounce(100.milliseconds),
+ started = SharingStarted.WhileSubscribedWithDebounce(),
)
private val listeners = CopyOnWriteArraySet()
private val mutableConferenceEvent = MutableSharedFlow()
@@ -161,7 +162,7 @@ public class InfinityConference private constructor(
override fun leave() {
scope.cancel()
- executor.shutdown()
+ context.close()
listeners.clear()
}
@@ -177,6 +178,10 @@ public class InfinityConference private constructor(
* @return an instance of [InfinityConference]
* @throws UnsupportedInfinityException if the version of Infinity is not supported
*/
+ @Deprecated(
+ message = "Use a version of this method that accepts ConferenceStep",
+ replaceWith = ReplaceWith("create(service.newRequest(node).conference(conferenceAlias)), response)"),
+ )
@JvmStatic
public fun create(
service: InfinityService,
diff --git a/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/infinity/internal/Util.kt b/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/infinity/internal/Util.kt
deleted file mode 100644
index da69c0faa..000000000
--- a/sdk-conference/src/main/kotlin/com/pexip/sdk/conference/infinity/internal/Util.kt
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2024 Pexip AS
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.pexip.sdk.conference.infinity.internal
-
-import kotlinx.coroutines.FlowPreview
-import kotlinx.coroutines.flow.SharingCommand
-import kotlinx.coroutines.flow.SharingStarted
-import kotlinx.coroutines.flow.debounce
-import kotlinx.coroutines.flow.distinctUntilChanged
-import kotlinx.coroutines.flow.map
-import kotlin.time.Duration
-
-@OptIn(FlowPreview::class)
-@Suppress("ktlint:standard:function-naming")
-internal fun SharingStarted.Companion.WhileSubscribedWithDebounce(timeout: Duration) =
- SharingStarted { subscriptionCount ->
- subscriptionCount
- .map { it > 0 }
- .distinctUntilChanged()
- .debounce(timeout)
- .map { if (it) SharingCommand.START else SharingCommand.STOP_AND_RESET_REPLAY_CACHE }
- }
diff --git a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/ConferenceEventTest.kt b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/ConferenceEventTest.kt
index 030710c1f..5597ccad3 100644
--- a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/ConferenceEventTest.kt
+++ b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/ConferenceEventTest.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2022-2023 Pexip AS
+ * Copyright 2022-2024 Pexip AS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,34 +20,19 @@ import com.pexip.sdk.api.infinity.DisconnectEvent
import com.pexip.sdk.api.infinity.PresentationStartEvent
import com.pexip.sdk.api.infinity.PresentationStopEvent
import com.pexip.sdk.api.infinity.ReferEvent
-import com.pexip.sdk.api.infinity.TokenStore
import com.pexip.sdk.conference.DisconnectConferenceEvent
import com.pexip.sdk.conference.FailureConferenceEvent
import com.pexip.sdk.conference.PresentationStartConferenceEvent
import com.pexip.sdk.conference.PresentationStopConferenceEvent
import com.pexip.sdk.conference.ReferConferenceEvent
-import kotlinx.coroutines.flow.MutableSharedFlow
import java.util.UUID
-import kotlin.properties.Delegates
import kotlin.random.Random
-import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.fail
class ConferenceEventTest {
- private var at by Delegates.notNull()
- private lateinit var event: MutableSharedFlow>
- private lateinit var store: TokenStore
-
- @BeforeTest
- fun setUp() {
- at = Random.nextLong(Long.MAX_VALUE)
- event = MutableSharedFlow()
- store = TokenStore.create(Random.nextToken())
- }
-
@Test
fun `returns ConferenceEvent if type is registered`() {
val at = Random.nextLong(Long.MAX_VALUE)
diff --git a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/DataChannelMessengerImplTest.kt b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/DataChannelMessengerImplTest.kt
index 2065ff117..3d93abc09 100644
--- a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/DataChannelMessengerImplTest.kt
+++ b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/DataChannelMessengerImplTest.kt
@@ -28,6 +28,7 @@ import assertk.fail
import com.pexip.sdk.api.infinity.DataChannelMessage
import com.pexip.sdk.conference.Message
import com.pexip.sdk.conference.MessageNotSentException
+import com.pexip.sdk.core.awaitSubscriptionCountAtLeast
import com.pexip.sdk.media.Data
import com.pexip.sdk.media.DataChannel
import kotlinx.coroutines.flow.Flow
diff --git a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/EventsTest.kt b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/EventsTest.kt
index 014ced59e..8eae36010 100644
--- a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/EventsTest.kt
+++ b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/EventsTest.kt
@@ -25,6 +25,7 @@ import com.pexip.sdk.api.EventSourceListener
import com.pexip.sdk.api.infinity.InfinityService
import com.pexip.sdk.api.infinity.Token
import com.pexip.sdk.api.infinity.TokenStore
+import com.pexip.sdk.core.awaitSubscriptionCountAtLeast
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
diff --git a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/MediaConnectionSignalingImplTest.kt b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/MediaConnectionSignalingImplTest.kt
index 75cbbe86a..471115c2f 100644
--- a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/MediaConnectionSignalingImplTest.kt
+++ b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/MediaConnectionSignalingImplTest.kt
@@ -42,6 +42,7 @@ import com.pexip.sdk.api.infinity.TokenStore
import com.pexip.sdk.api.infinity.UpdateRequest
import com.pexip.sdk.api.infinity.UpdateResponse
import com.pexip.sdk.api.infinity.UpdateSdpEvent
+import com.pexip.sdk.core.awaitSubscriptionCountAtLeast
import com.pexip.sdk.media.CandidateSignalingEvent
import com.pexip.sdk.media.IceServer
import com.pexip.sdk.media.OfferSignalingEvent
diff --git a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/ThemeImplTest.kt b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/ThemeImplTest.kt
index 594947edd..a0f9e081b 100644
--- a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/ThemeImplTest.kt
+++ b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/ThemeImplTest.kt
@@ -46,6 +46,7 @@ import com.pexip.sdk.conference.Layout
import com.pexip.sdk.conference.LayoutId
import com.pexip.sdk.conference.SplashScreen
import com.pexip.sdk.conference.TransformLayoutException
+import com.pexip.sdk.core.awaitSubscriptionCountAtLeast
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.test.runTest
import kotlin.random.Random
diff --git a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/Util.kt b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/Util.kt
index ea56ae8e8..e69cfcb68 100644
--- a/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/Util.kt
+++ b/sdk-conference/src/test/kotlin/com/pexip/sdk/conference/infinity/internal/Util.kt
@@ -18,8 +18,6 @@ package com.pexip.sdk.conference.infinity.internal
import com.pexip.sdk.api.infinity.DtmfRequest
import com.pexip.sdk.api.infinity.RefreshTokenResponse
import com.pexip.sdk.conference.Message
-import kotlinx.coroutines.flow.MutableSharedFlow
-import kotlinx.coroutines.flow.first
import java.util.UUID
import kotlin.random.Random
@@ -45,8 +43,3 @@ internal fun Random.nextMessage(at: Long = System.currentTimeMillis(), direct: B
payload = nextString(64),
direct = direct,
)
-
-internal suspend fun MutableSharedFlow.awaitSubscriptionCountAtLeast(threshold: Int): Int {
- require(threshold > 0) { "threshold must be a positive number." }
- return subscriptionCount.first { it >= threshold }
-}
diff --git a/sdk-core/api/sdk-core.api b/sdk-core/api/sdk-core.api
index 67ae9abbb..908a3f469 100644
--- a/sdk-core/api/sdk-core.api
+++ b/sdk-core/api/sdk-core.api
@@ -1,8 +1,17 @@
public abstract interface annotation class com/pexip/sdk/core/InternalSdkApi : java/lang/annotation/Annotation {
}
+public final class com/pexip/sdk/core/MutableSharedFlowKt {
+ public static final fun awaitSubscriptionCountAtLeast (Lkotlinx/coroutines/flow/MutableSharedFlow;ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
+}
+
public final class com/pexip/sdk/core/RetryKt {
public static final fun retry-FbhrOv8 (IJJDLkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun retry-FbhrOv8$default (IJJDLkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}
+public final class com/pexip/sdk/core/SharingStartedKt {
+ public static final fun WhileSubscribedWithDebounce-HG0u8IE (Lkotlinx/coroutines/flow/SharingStarted$Companion;J)Lkotlinx/coroutines/flow/SharingStarted;
+ public static synthetic fun WhileSubscribedWithDebounce-HG0u8IE$default (Lkotlinx/coroutines/flow/SharingStarted$Companion;JILjava/lang/Object;)Lkotlinx/coroutines/flow/SharingStarted;
+}
+
diff --git a/sdk-core/src/main/kotlin/com/pexip/sdk/core/MutableSharedFlow.kt b/sdk-core/src/main/kotlin/com/pexip/sdk/core/MutableSharedFlow.kt
new file mode 100644
index 000000000..9cbda4ba2
--- /dev/null
+++ b/sdk-core/src/main/kotlin/com/pexip/sdk/core/MutableSharedFlow.kt
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2024 Pexip AS
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.pexip.sdk.core
+
+import kotlinx.coroutines.flow.MutableSharedFlow
+import kotlinx.coroutines.flow.first
+
+/**
+ * Awaits until this [MutableSharedFlow] has at least [threshold] subscribes.
+ *
+ * @param threshold a number of subscriptions to await
+ */
+@InternalSdkApi
+public suspend fun MutableSharedFlow.awaitSubscriptionCountAtLeast(threshold: Int) {
+ require(threshold > 0) { "threshold must be a positive number." }
+ subscriptionCount.first { it >= threshold }
+}
diff --git a/sdk-core/src/main/kotlin/com/pexip/sdk/core/SharingStarted.kt b/sdk-core/src/main/kotlin/com/pexip/sdk/core/SharingStarted.kt
new file mode 100644
index 000000000..f02bf0731
--- /dev/null
+++ b/sdk-core/src/main/kotlin/com/pexip/sdk/core/SharingStarted.kt
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2024 Pexip AS
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.pexip.sdk.core
+
+import kotlinx.coroutines.FlowPreview
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.SharingCommand
+import kotlinx.coroutines.flow.SharingStarted
+import kotlinx.coroutines.flow.StateFlow
+import kotlinx.coroutines.flow.debounce
+import kotlinx.coroutines.flow.distinctUntilChanged
+import kotlinx.coroutines.flow.map
+import kotlin.time.Duration
+import kotlin.time.Duration.Companion.milliseconds
+
+/**
+ * Sharing is started when the first subscriber appears after a given [timeout] has passed since the
+ * most recent subscription and stops if there are no subscribers.
+ */
+@Suppress("FunctionName")
+@InternalSdkApi
+public fun SharingStarted.Companion.WhileSubscribedWithDebounce(timeout: Duration = 100.milliseconds): SharingStarted =
+ StartedWhileSubscribedWithDebounce(timeout)
+
+@OptIn(FlowPreview::class)
+private class StartedWhileSubscribedWithDebounce(private val timeout: Duration) : SharingStarted {
+
+ override fun command(subscriptionCount: StateFlow): Flow =
+ subscriptionCount
+ .map { if (it > 0) SharingCommand.START else SharingCommand.STOP_AND_RESET_REPLAY_CACHE }
+ .debounce { if (it == SharingCommand.START) timeout else Duration.ZERO }
+ .distinctUntilChanged()
+
+ override fun toString(): String = "SharingStarted.WhileSubscribedWithDebounce($timeout)"
+}
diff --git a/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/InfinityRegistration.kt b/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/InfinityRegistration.kt
index b44e1c418..be38bb831 100644
--- a/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/InfinityRegistration.kt
+++ b/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/InfinityRegistration.kt
@@ -20,6 +20,7 @@ import com.pexip.sdk.api.infinity.InfinityService
import com.pexip.sdk.api.infinity.RequestRegistrationTokenResponse
import com.pexip.sdk.api.infinity.TokenStore
import com.pexip.sdk.api.infinity.TokenStore.Companion.refreshTokenIn
+import com.pexip.sdk.core.WhileSubscribedWithDebounce
import com.pexip.sdk.core.retry
import com.pexip.sdk.infinity.UnsupportedInfinityException
import com.pexip.sdk.registration.RegisteredDevicesCallback
@@ -28,36 +29,39 @@ import com.pexip.sdk.registration.RegistrationEvent
import com.pexip.sdk.registration.RegistrationEventListener
import com.pexip.sdk.registration.infinity.internal.RegisteredDevicesFetcher
import com.pexip.sdk.registration.infinity.internal.RegistrationEvent
-import com.pexip.sdk.registration.infinity.internal.registrationEvent
+import com.pexip.sdk.registration.infinity.internal.events
import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.SupervisorJob
-import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
-import kotlinx.coroutines.flow.flowOn
-import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.buffer
+import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.merge
-import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch
+import kotlinx.coroutines.newSingleThreadContext
+import kotlinx.coroutines.withContext
import java.net.URL
import java.util.concurrent.CopyOnWriteArraySet
-import java.util.concurrent.Executors
public class InfinityRegistration private constructor(
step: InfinityService.RegistrationStep,
response: RequestRegistrationTokenResponse,
) : Registration {
- private val executor = Executors.newSingleThreadScheduledExecutor()
- private val scope = CoroutineScope(SupervisorJob() + executor.asCoroutineDispatcher())
+ @OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
+ private val context = newSingleThreadContext("InfinityRegistration")
+ private val scope = CoroutineScope(SupervisorJob() + context)
private val store = TokenStore.create(response)
private val fetcher = RegisteredDevicesFetcher(step, store)
- private val registrationEvent = step
- .registrationEvent(store)
- .shareIn(scope, SharingStarted.Lazily)
+ private val event = step.events(store).shareIn(
+ scope = scope,
+ started = SharingStarted.WhileSubscribedWithDebounce(),
+ )
private val listeners = CopyOnWriteArraySet()
private val mutableRegistrationEvent = MutableSharedFlow()
@@ -72,10 +76,15 @@ public class InfinityRegistration private constructor(
releaseToken = { retry { step.releaseToken(it).await() } },
onFailure = { mutableRegistrationEvent.emit(RegistrationEvent(it)) },
)
- merge(registrationEvent, mutableRegistrationEvent)
- .onEach { event -> listeners.forEach { it.onRegistrationEvent(event) } }
- .flowOn(Dispatchers.Main.immediate)
- .launchIn(scope)
+ scope.launch {
+ merge(event.mapNotNull(::RegistrationEvent), mutableRegistrationEvent)
+ .buffer()
+ .collect { event ->
+ withContext(Dispatchers.Main.immediate) {
+ listeners.forEach { it.onRegistrationEvent(event) }
+ }
+ }
+ }
}
override fun getRegisteredDevices(query: String, callback: RegisteredDevicesCallback) {
@@ -98,7 +107,7 @@ public class InfinityRegistration private constructor(
override fun dispose() {
scope.cancel()
- executor.shutdown()
+ context.close()
listeners.clear()
}
@@ -114,6 +123,10 @@ public class InfinityRegistration private constructor(
* @return an instance of [InfinityRegistration]
* @throws UnsupportedInfinityException if the version of Infinity is not supported
*/
+ @Deprecated(
+ message = "Use a version of this method that accepts RegistrationStep",
+ replaceWith = ReplaceWith("create(service.newRequest(node).registration(deviceAlias)), response)"),
+ )
@JvmStatic
public fun create(
service: InfinityService,
diff --git a/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/internal/Events.kt b/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/internal/Events.kt
new file mode 100644
index 000000000..8a3806980
--- /dev/null
+++ b/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/internal/Events.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2024 Pexip AS
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.pexip.sdk.registration.infinity.internal
+
+import com.pexip.sdk.api.coroutines.asFlow
+import com.pexip.sdk.api.infinity.InfinityService
+import com.pexip.sdk.api.infinity.TokenStore
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.flow.flatMapLatest
+import kotlinx.coroutines.flow.flow
+import kotlinx.coroutines.flow.retryWhen
+import kotlin.time.Duration.Companion.seconds
+
+@OptIn(ExperimentalCoroutinesApi::class)
+internal fun InfinityService.RegistrationStep.events(store: TokenStore) = flow { emit(store.get()) }
+ .flatMapLatest { events(it).asFlow() }
+ .retryWhen { _, attempt ->
+ delay(attempt.seconds.coerceAtMost(5.seconds))
+ true
+ }
diff --git a/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/internal/RegistrationEvent.kt b/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/internal/RegistrationEvent.kt
index c1593a7ff..6dfcc4a3f 100644
--- a/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/internal/RegistrationEvent.kt
+++ b/sdk-registration/src/main/kotlin/com/pexip/sdk/registration/infinity/internal/RegistrationEvent.kt
@@ -1,5 +1,5 @@
/*
- * Copyright 2022-2023 Pexip AS
+ * Copyright 2022-2024 Pexip AS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,21 +16,11 @@
package com.pexip.sdk.registration.infinity.internal
import com.pexip.sdk.api.Event
-import com.pexip.sdk.api.coroutines.asFlow
import com.pexip.sdk.api.infinity.IncomingCancelledEvent
import com.pexip.sdk.api.infinity.IncomingEvent
-import com.pexip.sdk.api.infinity.InfinityService
-import com.pexip.sdk.api.infinity.TokenStore
import com.pexip.sdk.registration.FailureRegistrationEvent
import com.pexip.sdk.registration.IncomingCancelledRegistrationEvent
import com.pexip.sdk.registration.IncomingRegistrationEvent
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.flatMapLatest
-import kotlinx.coroutines.flow.flow
-import kotlinx.coroutines.flow.mapNotNull
-import kotlinx.coroutines.flow.retryWhen
-import kotlin.time.Duration.Companion.seconds
@Suppress("ktlint:standard:function-naming")
internal inline fun RegistrationEvent(
@@ -53,15 +43,3 @@ internal inline fun RegistrationEvent(
@Suppress("ktlint:standard:function-naming")
internal inline fun RegistrationEvent(t: Throwable, at: () -> Long = System::currentTimeMillis) =
FailureRegistrationEvent(at(), t)
-
-@OptIn(ExperimentalCoroutinesApi::class)
-internal fun InfinityService.RegistrationStep.registrationEvent(
- store: TokenStore,
- at: () -> Long = System::currentTimeMillis,
-) = flow { emit(store.get()) }
- .flatMapLatest { events(it).asFlow() }
- .mapNotNull { RegistrationEvent(it, at) }
- .retryWhen { _, attempt ->
- delay(attempt.seconds.coerceAtMost(5.seconds))
- true
- }
diff --git a/sdk-registration/src/test/kotlin/com/pexip/sdk/registration/infinity/internal/EventsTest.kt b/sdk-registration/src/test/kotlin/com/pexip/sdk/registration/infinity/internal/EventsTest.kt
new file mode 100644
index 000000000..9838ce280
--- /dev/null
+++ b/sdk-registration/src/test/kotlin/com/pexip/sdk/registration/infinity/internal/EventsTest.kt
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2024 Pexip AS
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.pexip.sdk.registration.infinity.internal
+
+import app.cash.turbine.test
+import assertk.assertThat
+import assertk.assertions.isEqualTo
+import com.pexip.sdk.api.Event
+import com.pexip.sdk.api.EventSource
+import com.pexip.sdk.api.EventSourceFactory
+import com.pexip.sdk.api.EventSourceListener
+import com.pexip.sdk.api.infinity.InfinityService
+import com.pexip.sdk.api.infinity.Token
+import com.pexip.sdk.api.infinity.TokenStore
+import com.pexip.sdk.core.awaitSubscriptionCountAtLeast
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.MutableSharedFlow
+import kotlinx.coroutines.flow.launchIn
+import kotlinx.coroutines.flow.onEach
+import kotlinx.coroutines.test.TestScope
+import kotlinx.coroutines.test.runTest
+import kotlin.random.Random
+import kotlin.test.BeforeTest
+import kotlin.test.Test
+
+class EventsTest {
+
+ private lateinit var event: MutableSharedFlow>
+ private lateinit var store: TokenStore
+
+ @BeforeTest
+ fun setUp() {
+ event = MutableSharedFlow()
+ store = TokenStore.create(Random.nextToken())
+ }
+
+ @Test
+ fun `maps Event to ConferenceEvent`() = runTest {
+ val step = testRegistrationStep()
+ step.events(store).test {
+ event.awaitSubscriptionCountAtLeast(1)
+ val events = List(10) { TestEvent() }
+ events.forEach {
+ event.emit(Result.success(it))
+ assertThat(awaitItem(), "event").isEqualTo(it)
+ }
+ expectNoEvents()
+ cancelAndIgnoreRemainingEvents()
+ }
+ }
+
+ @Test
+ fun `failure restarts the flow`() = runTest {
+ val step = testRegistrationStep()
+ step.events(store).test {
+ event.awaitSubscriptionCountAtLeast(1)
+ val event1 = TestEvent()
+ event.emit(Result.success(event1))
+ assertThat(awaitItem(), "event").isEqualTo(event1)
+ event.emit(Result.failure(Throwable()))
+ event.awaitSubscriptionCountAtLeast(1)
+ val event2 = TestEvent()
+ event.emit(Result.success(event2))
+ assertThat(awaitItem(), "event").isEqualTo(event2)
+ expectNoEvents()
+ cancelAndIgnoreRemainingEvents()
+ }
+ }
+
+ private fun TestScope.testRegistrationStep() = object : InfinityService.RegistrationStep {
+
+ override fun events(token: Token): EventSourceFactory {
+ assertThat(token, "token").isEqualTo(store.get())
+ return TestEventSourceFactory(backgroundScope, event)
+ }
+ }
+
+ private class TestEventSourceFactory(
+ private val scope: CoroutineScope,
+ private val event: Flow>,
+ ) : EventSourceFactory {
+
+ override fun create(listener: EventSourceListener): EventSource =
+ TestEventSource(scope, event, listener)
+ }
+
+ private class TestEventSource(
+ scope: CoroutineScope,
+ event: Flow>,
+ listener: EventSourceListener,
+ ) : EventSource {
+
+ private val job = event
+ .onEach {
+ it.fold(
+ onSuccess = { event -> listener.onEvent(this, event) },
+ onFailure = { t -> listener.onClosed(this, t) },
+ )
+ }
+ .launchIn(scope)
+
+ override fun cancel(): Unit = job.cancel()
+ }
+
+ private class TestEvent : Event
+}
diff --git a/sdk-registration/src/test/kotlin/com/pexip/sdk/registration/infinity/internal/RegistrationEventTest.kt b/sdk-registration/src/test/kotlin/com/pexip/sdk/registration/infinity/internal/RegistrationEventTest.kt
index 1a511b275..6f56b28fd 100644
--- a/sdk-registration/src/test/kotlin/com/pexip/sdk/registration/infinity/internal/RegistrationEventTest.kt
+++ b/sdk-registration/src/test/kotlin/com/pexip/sdk/registration/infinity/internal/RegistrationEventTest.kt
@@ -15,124 +15,22 @@
*/
package com.pexip.sdk.registration.infinity.internal
-import app.cash.turbine.test
-import assertk.Assert
-import assertk.all
-import assertk.assertThat
-import assertk.assertions.isEqualTo
-import assertk.assertions.isInstanceOf
-import assertk.assertions.prop
import com.pexip.sdk.api.Event
-import com.pexip.sdk.api.EventSource
-import com.pexip.sdk.api.EventSourceFactory
-import com.pexip.sdk.api.EventSourceListener
-import com.pexip.sdk.api.infinity.ByeEvent
import com.pexip.sdk.api.infinity.IncomingCancelledEvent
import com.pexip.sdk.api.infinity.IncomingEvent
-import com.pexip.sdk.api.infinity.InfinityService
-import com.pexip.sdk.api.infinity.Token
-import com.pexip.sdk.api.infinity.TokenStore
import com.pexip.sdk.registration.FailureRegistrationEvent
import com.pexip.sdk.registration.IncomingCancelledRegistrationEvent
import com.pexip.sdk.registration.IncomingRegistrationEvent
-import com.pexip.sdk.registration.RegistrationEvent
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.MutableSharedFlow
-import kotlinx.coroutines.flow.first
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
-import kotlinx.coroutines.test.TestScope
-import kotlinx.coroutines.test.runTest
-import kotlin.properties.Delegates
import kotlin.random.Random
-import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.fail
internal class RegistrationEventTest {
- private var at by Delegates.notNull()
- private lateinit var event: MutableSharedFlow>
- private lateinit var store: TokenStore
-
- @BeforeTest
- fun setUp() {
- at = Random.nextLong(Long.MAX_VALUE)
- event = MutableSharedFlow()
- store = TokenStore.create(Random.nextToken())
- }
-
- @Test
- fun `maps Event to RegistrationEvent`() = runTest {
- val step = testRegistrationStep()
- step.registrationEvent(store) { at }.test {
- event.awaitSubscribers()
- val incomingEvent = IncomingEvent(
- conferenceAlias = Random.nextString(8),
- remoteDisplayName = Random.nextString(8),
- token = Random.nextString(8),
- )
- event.emit(Result.success(incomingEvent))
- assertThat(awaitItem(), "incomingEvent")
- .isInstanceOf()
- .all {
- at(at)
- prop(IncomingRegistrationEvent::conferenceAlias)
- .isEqualTo(incomingEvent.conferenceAlias)
- prop(IncomingRegistrationEvent::remoteDisplayName)
- .isEqualTo(incomingEvent.remoteDisplayName)
- prop(IncomingRegistrationEvent::token)
- .isEqualTo(incomingEvent.token)
- }
- val incomingCancelledEvent = IncomingCancelledEvent(Random.nextString(8))
- event.emit(Result.success(incomingCancelledEvent))
- assertThat(awaitItem(), "incomingCancelledEvent")
- .isInstanceOf()
- .all {
- at(at)
- prop(IncomingCancelledRegistrationEvent::token)
- .isEqualTo(incomingCancelledEvent.token)
- }
- val byeEvent = ByeEvent
- event.emit(Result.success(byeEvent))
- expectNoEvents()
- cancelAndIgnoreRemainingEvents()
- }
- }
-
- @Test
- fun `failure restarts the flow`() = runTest {
- val step = testRegistrationStep()
- step.registrationEvent(store) { at }.test {
- event.awaitSubscribers()
- val incomingCancelledEvent = IncomingCancelledEvent(Random.nextString(8))
- event.emit(Result.success(incomingCancelledEvent))
- assertThat(awaitItem(), "incomingCancelledEvent")
- .isInstanceOf()
- .all {
- at(at)
- prop(IncomingCancelledRegistrationEvent::token)
- .isEqualTo(incomingCancelledEvent.token)
- }
- event.emit(Result.failure(Throwable()))
- event.awaitSubscribers()
- event.emit(Result.success(incomingCancelledEvent))
- assertThat(awaitItem(), "presentationStopEvent")
- .isInstanceOf()
- .all {
- at(at)
- prop(IncomingCancelledRegistrationEvent::token)
- .isEqualTo(incomingCancelledEvent.token)
- }
- expectNoEvents()
- cancelAndIgnoreRemainingEvents()
- }
- }
-
@Test
fun `returns RegistrationEvent if type is registered`() {
+ val at = Random.nextLong(Long.MAX_VALUE)
val testCases = buildMap {
val incomingEvent = IncomingEvent(
conferenceAlias = Random.nextString(8),
@@ -167,45 +65,4 @@ internal class RegistrationEventTest {
}
private data object TestEvent : Event
-
- private suspend fun MutableSharedFlow.awaitSubscribers() {
- subscriptionCount.first { it > 0 }
- }
-
- private fun TestScope.testRegistrationStep() = object : InfinityService.RegistrationStep {
-
- override fun events(token: Token): EventSourceFactory {
- assertThat(token, "token").isEqualTo(store.get())
- return TestEventSourceFactory(backgroundScope, event)
- }
- }
-
- private class TestEventSourceFactory(
- private val scope: CoroutineScope,
- private val event: Flow>,
- ) : EventSourceFactory {
-
- override fun create(listener: EventSourceListener): EventSource =
- TestEventSource(scope, event, listener)
- }
-
- private class TestEventSource(
- scope: CoroutineScope,
- event: Flow>,
- listener: EventSourceListener,
- ) : EventSource {
-
- private val job = event
- .onEach {
- it.fold(
- onSuccess = { event -> listener.onEvent(this, event) },
- onFailure = { t -> listener.onClosed(this, t) },
- )
- }
- .launchIn(scope)
-
- override fun cancel(): Unit = job.cancel()
- }
-
- private fun Assert.at(at: Long) = prop(RegistrationEvent::at).isEqualTo(at)
}