Skip to content

Commit

Permalink
refactor(conference,registration): align internal events handling, us…
Browse files Browse the repository at this point in the history
…e newSingleThreadContext (#374)
  • Loading branch information
drymarau authored Mar 25, 2024
1 parent 45e46fa commit 321c477
Show file tree
Hide file tree
Showing 18 changed files with 285 additions and 252 deletions.
2 changes: 1 addition & 1 deletion .idea/kotlinc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public sealed interface ConferenceEvent {
}

@Deprecated(
message = "Use Roster.present instead to observe presentation state.",
message = "Use Roster.presenter instead to observe presentation state.",
level = DeprecationLevel.WARNING,
)
public data class PresentationStartConferenceEvent(
Expand All @@ -33,7 +33,7 @@ public data class PresentationStartConferenceEvent(
) : ConferenceEvent

@Deprecated(
message = "Use Roster.present instead to observe presentation state.",
message = "Use Roster.presenter instead to observe presentation state.",
level = DeprecationLevel.WARNING,
)
public data class PresentationStopConferenceEvent(override val at: Long) : ConferenceEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,17 @@ import com.pexip.sdk.conference.infinity.internal.MessengerImpl
import com.pexip.sdk.conference.infinity.internal.RefererImpl
import com.pexip.sdk.conference.infinity.internal.RosterImpl
import com.pexip.sdk.conference.infinity.internal.ThemeImpl
import com.pexip.sdk.conference.infinity.internal.WhileSubscribedWithDebounce
import com.pexip.sdk.conference.infinity.internal.events
import com.pexip.sdk.core.WhileSubscribedWithDebounce
import com.pexip.sdk.core.retry
import com.pexip.sdk.infinity.UnsupportedInfinityException
import com.pexip.sdk.media.IceServer
import com.pexip.sdk.media.MediaConnectionSignaling
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
Expand All @@ -53,24 +54,24 @@ import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import java.net.URL
import java.util.concurrent.CopyOnWriteArraySet
import java.util.concurrent.Executors
import kotlin.time.Duration.Companion.milliseconds

public class InfinityConference private constructor(
private val step: InfinityService.ConferenceStep,
response: RequestTokenResponse,
) : Conference {

private val executor = Executors.newSingleThreadScheduledExecutor()
private val scope = CoroutineScope(SupervisorJob() + executor.asCoroutineDispatcher())
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
private val context = newSingleThreadContext("InfinityConference")
private val scope = CoroutineScope(SupervisorJob() + context)
private val store = TokenStore.create(response)

private val event = step.events(store).shareIn(
scope = scope,
started = SharingStarted.WhileSubscribedWithDebounce(100.milliseconds),
started = SharingStarted.WhileSubscribedWithDebounce(),
)
private val listeners = CopyOnWriteArraySet<ConferenceEventListener>()
private val mutableConferenceEvent = MutableSharedFlow<ConferenceEvent>()
Expand Down Expand Up @@ -161,7 +162,7 @@ public class InfinityConference private constructor(

override fun leave() {
scope.cancel()
executor.shutdown()
context.close()
listeners.clear()
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 Pexip AS
* Copyright 2022-2024 Pexip AS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,34 +20,19 @@ import com.pexip.sdk.api.infinity.DisconnectEvent
import com.pexip.sdk.api.infinity.PresentationStartEvent
import com.pexip.sdk.api.infinity.PresentationStopEvent
import com.pexip.sdk.api.infinity.ReferEvent
import com.pexip.sdk.api.infinity.TokenStore
import com.pexip.sdk.conference.DisconnectConferenceEvent
import com.pexip.sdk.conference.FailureConferenceEvent
import com.pexip.sdk.conference.PresentationStartConferenceEvent
import com.pexip.sdk.conference.PresentationStopConferenceEvent
import com.pexip.sdk.conference.ReferConferenceEvent
import kotlinx.coroutines.flow.MutableSharedFlow
import java.util.UUID
import kotlin.properties.Delegates
import kotlin.random.Random
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.fail

class ConferenceEventTest {

private var at by Delegates.notNull<Long>()
private lateinit var event: MutableSharedFlow<Result<Event>>
private lateinit var store: TokenStore

@BeforeTest
fun setUp() {
at = Random.nextLong(Long.MAX_VALUE)
event = MutableSharedFlow()
store = TokenStore.create(Random.nextToken())
}

@Test
fun `returns ConferenceEvent if type is registered`() {
val at = Random.nextLong(Long.MAX_VALUE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import assertk.fail
import com.pexip.sdk.api.infinity.DataChannelMessage
import com.pexip.sdk.conference.Message
import com.pexip.sdk.conference.MessageNotSentException
import com.pexip.sdk.core.awaitSubscriptionCountAtLeast
import com.pexip.sdk.media.Data
import com.pexip.sdk.media.DataChannel
import kotlinx.coroutines.flow.Flow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.pexip.sdk.api.EventSourceListener
import com.pexip.sdk.api.infinity.InfinityService
import com.pexip.sdk.api.infinity.Token
import com.pexip.sdk.api.infinity.TokenStore
import com.pexip.sdk.core.awaitSubscriptionCountAtLeast
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import com.pexip.sdk.api.infinity.TokenStore
import com.pexip.sdk.api.infinity.UpdateRequest
import com.pexip.sdk.api.infinity.UpdateResponse
import com.pexip.sdk.api.infinity.UpdateSdpEvent
import com.pexip.sdk.core.awaitSubscriptionCountAtLeast
import com.pexip.sdk.media.CandidateSignalingEvent
import com.pexip.sdk.media.IceServer
import com.pexip.sdk.media.OfferSignalingEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import com.pexip.sdk.conference.Layout
import com.pexip.sdk.conference.LayoutId
import com.pexip.sdk.conference.SplashScreen
import com.pexip.sdk.conference.TransformLayoutException
import com.pexip.sdk.core.awaitSubscriptionCountAtLeast
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.test.runTest
import kotlin.random.Random
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package com.pexip.sdk.conference.infinity.internal
import com.pexip.sdk.api.infinity.DtmfRequest
import com.pexip.sdk.api.infinity.RefreshTokenResponse
import com.pexip.sdk.conference.Message
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import java.util.UUID
import kotlin.random.Random

Expand All @@ -45,8 +43,3 @@ internal fun Random.nextMessage(at: Long = System.currentTimeMillis(), direct: B
payload = nextString(64),
direct = direct,
)

internal suspend fun <T> MutableSharedFlow<T>.awaitSubscriptionCountAtLeast(threshold: Int): Int {
require(threshold > 0) { "threshold must be a positive number." }
return subscriptionCount.first { it >= threshold }
}
9 changes: 9 additions & 0 deletions sdk-core/api/sdk-core.api
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
public abstract interface annotation class com/pexip/sdk/core/InternalSdkApi : java/lang/annotation/Annotation {
}

public final class com/pexip/sdk/core/MutableSharedFlowKt {
public static final fun awaitSubscriptionCountAtLeast (Lkotlinx/coroutines/flow/MutableSharedFlow;ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class com/pexip/sdk/core/RetryKt {
public static final fun retry-FbhrOv8 (IJJDLkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun retry-FbhrOv8$default (IJJDLkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public final class com/pexip/sdk/core/SharingStartedKt {
public static final fun WhileSubscribedWithDebounce-HG0u8IE (Lkotlinx/coroutines/flow/SharingStarted$Companion;J)Lkotlinx/coroutines/flow/SharingStarted;
public static synthetic fun WhileSubscribedWithDebounce-HG0u8IE$default (Lkotlinx/coroutines/flow/SharingStarted$Companion;JILjava/lang/Object;)Lkotlinx/coroutines/flow/SharingStarted;
}

30 changes: 30 additions & 0 deletions sdk-core/src/main/kotlin/com/pexip/sdk/core/MutableSharedFlow.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2024 Pexip AS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pexip.sdk.core

import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first

/**
* Awaits until this [MutableSharedFlow] has at least [threshold] subscribes.
*
* @param threshold a number of subscriptions to await
*/
@InternalSdkApi
public suspend fun <T> MutableSharedFlow<T>.awaitSubscriptionCountAtLeast(threshold: Int) {
require(threshold > 0) { "threshold must be a positive number." }
subscriptionCount.first { it >= threshold }
}
48 changes: 48 additions & 0 deletions sdk-core/src/main/kotlin/com/pexip/sdk/core/SharingStarted.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2024 Pexip AS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pexip.sdk.core

import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.SharingCommand
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.map
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

/**
* Sharing is started when the first subscriber appears after a given [timeout] has passed since the
* most recent subscription and stops if there are no subscribers.
*/
@Suppress("FunctionName")
@InternalSdkApi
public fun SharingStarted.Companion.WhileSubscribedWithDebounce(timeout: Duration = 100.milliseconds): SharingStarted =
StartedWhileSubscribedWithDebounce(timeout)

@OptIn(FlowPreview::class)
private class StartedWhileSubscribedWithDebounce(private val timeout: Duration) : SharingStarted {

override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
subscriptionCount
.map { if (it > 0) SharingCommand.START else SharingCommand.STOP_AND_RESET_REPLAY_CACHE }
.debounce { if (it == SharingCommand.START) timeout else Duration.ZERO }
.distinctUntilChanged()

override fun toString(): String = "SharingStarted.WhileSubscribedWithDebounce($timeout)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.pexip.sdk.api.infinity.InfinityService
import com.pexip.sdk.api.infinity.RequestRegistrationTokenResponse
import com.pexip.sdk.api.infinity.TokenStore
import com.pexip.sdk.api.infinity.TokenStore.Companion.refreshTokenIn
import com.pexip.sdk.core.WhileSubscribedWithDebounce
import com.pexip.sdk.core.retry
import com.pexip.sdk.infinity.UnsupportedInfinityException
import com.pexip.sdk.registration.RegisteredDevicesCallback
Expand All @@ -28,36 +29,39 @@ import com.pexip.sdk.registration.RegistrationEvent
import com.pexip.sdk.registration.RegistrationEventListener
import com.pexip.sdk.registration.infinity.internal.RegisteredDevicesFetcher
import com.pexip.sdk.registration.infinity.internal.RegistrationEvent
import com.pexip.sdk.registration.infinity.internal.registrationEvent
import com.pexip.sdk.registration.infinity.internal.events
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import java.net.URL
import java.util.concurrent.CopyOnWriteArraySet
import java.util.concurrent.Executors

public class InfinityRegistration private constructor(
step: InfinityService.RegistrationStep,
response: RequestRegistrationTokenResponse,
) : Registration {

private val executor = Executors.newSingleThreadScheduledExecutor()
private val scope = CoroutineScope(SupervisorJob() + executor.asCoroutineDispatcher())
@OptIn(ExperimentalCoroutinesApi::class, DelicateCoroutinesApi::class)
private val context = newSingleThreadContext("InfinityRegistration")
private val scope = CoroutineScope(SupervisorJob() + context)
private val store = TokenStore.create(response)
private val fetcher = RegisteredDevicesFetcher(step, store)
private val registrationEvent = step
.registrationEvent(store)
.shareIn(scope, SharingStarted.Lazily)
private val event = step.events(store).shareIn(
scope = scope,
started = SharingStarted.WhileSubscribedWithDebounce(),
)
private val listeners = CopyOnWriteArraySet<RegistrationEventListener>()
private val mutableRegistrationEvent = MutableSharedFlow<RegistrationEvent>()

Expand All @@ -72,10 +76,15 @@ public class InfinityRegistration private constructor(
releaseToken = { retry { step.releaseToken(it).await() } },
onFailure = { mutableRegistrationEvent.emit(RegistrationEvent(it)) },
)
merge(registrationEvent, mutableRegistrationEvent)
.onEach { event -> listeners.forEach { it.onRegistrationEvent(event) } }
.flowOn(Dispatchers.Main.immediate)
.launchIn(scope)
scope.launch {
merge(event.mapNotNull(::RegistrationEvent), mutableRegistrationEvent)
.buffer()
.collect { event ->
withContext(Dispatchers.Main.immediate) {
listeners.forEach { it.onRegistrationEvent(event) }
}
}
}
}

override fun getRegisteredDevices(query: String, callback: RegisteredDevicesCallback) {
Expand All @@ -98,7 +107,7 @@ public class InfinityRegistration private constructor(

override fun dispose() {
scope.cancel()
executor.shutdown()
context.close()
listeners.clear()
}

Expand Down
Loading

0 comments on commit 321c477

Please sign in to comment.