Skip to content
This repository has been archived by the owner on Jul 6, 2024. It is now read-only.

Commit

Permalink
Add PoC of typing indicator status
Browse files Browse the repository at this point in the history
  • Loading branch information
mklkj committed Feb 10, 2024
1 parent 5748f89 commit d3bbd4e
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 17 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,13 @@ przy wywołaniu serializera, tj. np. `Json.encodeToString()` zserializuje obiekt
nadrzędny nam chodzi, więc powinniśmy zapisać `Json.encodeToString<Typ>()` i tego mi właśnie
na początku zabrakło.

Idziemy dalej i implementujemy statusy o pisaniu wiadomości. Na razie zrobiłem to tak, że user
piszący wiadomość wysyła pusty obiekt websocketem do serwera, który rozgłasza info o tym, że ten
user coś pisze w obiekcie z jego participant id wszystkim zainteresowanym. Następnie ci wszyscy
zainteresowani trzymają listę wszystkich osób, które coś pisały i okresowo usuwają z niej te wpisy,
które są starsze niż 1 sekunda. Dzięki temu w UI pokazujemy tych, którzy faktycznie coś w danym
momencie piszą i ukrywamy, kiedy pisać przestają.

## Materiały

- biblioteki KMM 1 - https://github.com/terrakok/kmm-awesome
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import io.github.mklkj.kommunicator.data.models.MessageBroadcast
import io.github.mklkj.kommunicator.data.models.MessageEvent
import io.github.mklkj.kommunicator.data.models.MessagePush
import io.github.mklkj.kommunicator.data.models.MessageRequest
import io.github.mklkj.kommunicator.data.models.TypingBroadcast
import io.github.mklkj.kommunicator.data.models.TypingPush
import io.github.mklkj.kommunicator.data.repository.MessagesRepository
import io.github.mklkj.kommunicator.getDeserialized
import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession
Expand All @@ -16,9 +18,19 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.IO
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.datetime.Clock
import kotlinx.datetime.Instant
import kotlinx.uuid.UUID
import org.koin.core.annotation.Factory
import kotlin.time.Duration.Companion.seconds

private const val TAG = "ConversationClient"

Expand All @@ -32,13 +44,17 @@ class ConversationClient(

private var chatSession: DefaultClientWebSocketSession? = null
private var chatId: UUID? = null
private val typingChannel = Channel<Unit>()
val typingParticipants = MutableStateFlow<Map<UUID, Instant>>(emptyMap())

fun connect(chatId: UUID, onFailure: (Throwable) -> Unit) {
Logger.withTag(TAG).i("Connecting to websocket on $chatId chat")
this.chatId = chatId
scope.launch {
runCatching {
chatSession = messagesRepository.getChatSession(chatId)
initializeTypingObserver()
initializeTypingStaleTimer()
observeIncomingMessages()
}.onFailure(onFailure)
}
Expand All @@ -54,13 +70,24 @@ class ConversationClient(
messagesRepository.handleReceivedMessage(chatId ?: return, messageEvent)
}

is TypingBroadcast -> {
Logger.withTag(TAG).i("Receive typing from: ${messageEvent.participantId}")

typingParticipants.update {
it + mapOf(messageEvent.participantId to Clock.System.now())
}
}

// not implemented on server
is MessagePush -> Unit
TypingPush -> Unit
}
}
}

fun sendMessage(chatId: UUID, message: MessageRequest) {
fun sendMessage(message: MessageRequest) {
val chatId = chatId ?: return

scope.launch {
when (chatSession) {
null -> {
Expand All @@ -82,6 +109,34 @@ class ConversationClient(
}
}

private fun initializeTypingObserver() {
scope.launch {
typingChannel.consumeAsFlow()
// .debounce(0.5.seconds)
.onEach {
chatSession?.sendSerialized<MessageEvent>(TypingPush)
}
.collect()
}
}

private fun initializeTypingStaleTimer() {
scope.launch {
while (true) {
typingParticipants.update { participants ->
participants.filterValues { lastUpdate ->
Clock.System.now().minus(lastUpdate) <= 1.seconds
}
}
delay(1.seconds)
}
}
}

fun onTyping() {
typingChannel.trySend(Unit)
}

fun onDispose() {
chatSession?.cancel()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ import cafe.adriel.voyager.koin.getScreenModel
import cafe.adriel.voyager.navigator.LocalNavigator
import cafe.adriel.voyager.navigator.currentOrThrow
import io.github.mklkj.kommunicator.data.db.entity.LocalMessage
import io.github.mklkj.kommunicator.data.models.Message
import io.github.mklkj.kommunicator.data.models.MessageRequest
import io.github.mklkj.kommunicator.ui.utils.collectAsStateWithLifecycle
import io.github.mklkj.kommunicator.ui.utils.scaffoldPadding
import kotlinx.datetime.Instant
import kotlinx.uuid.UUID

class ConversationScreen(private val chatId: UUID) : Screen {
Expand Down Expand Up @@ -74,6 +74,10 @@ class ConversationScreen(private val chatId: UUID) : Screen {
chatListState.animateScrollToItem(0)
}

LaunchedEffect(state.typingParticipants) {
chatListState.animateScrollToItem(0)
}

Scaffold(
topBar = {
TopAppBar(
Expand All @@ -95,19 +99,51 @@ class ConversationScreen(private val chatId: UUID) : Screen {
reverseLayout = true,
modifier = Modifier.weight(1f)
) {
items(state.typingParticipants.toList(), key = { it.first.toString() }) {
ChatTyping(it.second)
}
items(state.messages, key = { it.id.toString() }) {
ChatMessage(it)
}
}
ChatInput(
isLoading = state.isLoading,
onSendClick = { viewModel.sendMessage(chatId, it) },
onTyping = viewModel::onTyping,
onSendClick = viewModel::sendMessage,
modifier = Modifier.fillMaxWidth().imePadding()
)
}
}
}

@Composable
private fun ChatTyping(time: Instant, modifier: Modifier = Modifier) {
val bubbleColor = MaterialTheme.colorScheme.surface

Box(
contentAlignment = Alignment.CenterStart,
modifier = modifier
.padding(end = 50.dp)
.fillMaxWidth()
) {
Text(
text = "...\n$time",
modifier = Modifier
.padding(8.dp)
.clip(
RoundedCornerShape(
bottomStart = 20.dp,
bottomEnd = 20.dp,
topEnd = 20.dp,
topStart = 2.dp
)
)
.background(bubbleColor)
.padding(16.dp)
)
}
}

@Composable
private fun ChatMessage(message: LocalMessage, modifier: Modifier = Modifier) {
val bubbleColor = when {
Expand Down Expand Up @@ -148,6 +184,7 @@ class ConversationScreen(private val chatId: UUID) : Screen {
@Composable
private fun ChatInput(
isLoading: Boolean,
onTyping: () -> Unit,
onSendClick: (MessageRequest) -> Unit,
modifier: Modifier = Modifier,
) {
Expand All @@ -156,7 +193,10 @@ class ConversationScreen(private val chatId: UUID) : Screen {

TextField(
value = content,
onValueChange = { content = it },
onValueChange = {
onTyping()
content = it
},
maxLines = 3,
placeholder = { Text(text = "Type a message...") },
trailingIcon = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package io.github.mklkj.kommunicator.ui.modules.conversation

import io.github.mklkj.kommunicator.Chats
import io.github.mklkj.kommunicator.data.db.entity.LocalMessage
import kotlinx.datetime.Instant
import kotlinx.uuid.UUID

data class ConversationState(
val isLoading: Boolean = true,
val errorMessage: String? = null,
val chat: Chats? = null,
val messages: List<LocalMessage> = emptyList(),
val typingParticipants: Map<UUID, Instant> = emptyMap(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import io.github.mklkj.kommunicator.data.repository.MessagesRepository
import io.github.mklkj.kommunicator.data.ws.ConversationClient
import io.github.mklkj.kommunicator.ui.base.BaseViewModel
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.update
import kotlinx.uuid.UUID
import org.koin.core.annotation.Factory
Expand Down Expand Up @@ -38,8 +38,12 @@ class ConversationViewModel(
)
}

fun sendMessage(chatId: UUID, message: MessageRequest) {
conversationClient.sendMessage(chatId, message)
fun sendMessage(message: MessageRequest) {
conversationClient.sendMessage(message)
}

fun onTyping() {
conversationClient.onTyping()
}

override fun onDispose() {
Expand Down Expand Up @@ -72,13 +76,17 @@ class ConversationViewModel(

private fun observeMessages(chatId: UUID) {
launch("observe_messages", isFlowObserver = true) {
messagesRepository.observeMessages(chatId)
.onEach { messages ->
mutableState.update {
it.copy(messages = messages)
}
combine(
flow = messagesRepository.observeMessages(chatId),
flow2 = conversationClient.typingParticipants,
) { messages, typingParticipants ->
mutableState.update {
it.copy(
messages = messages,
typingParticipants = typingParticipants,
)
}
.collect()
}.collect()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import io.github.mklkj.kommunicator.data.models.MessageBroadcast
import io.github.mklkj.kommunicator.data.models.MessageEntity
import io.github.mklkj.kommunicator.data.models.MessageEvent
import io.github.mklkj.kommunicator.data.models.MessagePush
import io.github.mklkj.kommunicator.data.models.TypingBroadcast
import io.github.mklkj.kommunicator.data.models.TypingPush
import io.github.mklkj.kommunicator.data.service.ChatService
import io.github.mklkj.kommunicator.data.service.MessageService
import io.github.mklkj.kommunicator.data.service.NotificationService
Expand Down Expand Up @@ -136,6 +138,9 @@ fun Route.chatWebsockets() {
} else null
}
.onEach { message ->
val connections = chatConnections.getConnections(chatId)
val participantId = participants.single { userId == it.userId }.id

when (message) {
is MessagePush -> {
val entity = MessageEntity(
Expand All @@ -147,16 +152,13 @@ fun Route.chatWebsockets() {
)
messageService.saveMessage(entity)

val connections = chatConnections.getConnections(chatId)
connections
.filterNot { it.userId == userId }
.forEach { connection ->
println("Notify user: ${connection.userId}")
val event = MessageBroadcast(
id = entity.id,
participantId = participants
.single { userId == it.userId }
.id,
participantId = participantId,
content = entity.content,
createdAt = entity.timestamp
)
Expand All @@ -170,8 +172,20 @@ fun Route.chatWebsockets() {
)
}

TypingPush -> {
connections
.filterNot { it.userId == userId }
.forEach { connection ->
val event = TypingBroadcast(
participantId = participantId,
)
connection.session.sendSerialized<MessageEvent>(event)
}
}

// not handled on server
is MessageBroadcast -> Unit
is TypingBroadcast -> TODO()
}
}
.collect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,11 @@ data class MessagePush(
val id: UUID,
val content: String,
) : MessageEvent()

@Serializable
data object TypingPush : MessageEvent()

@Serializable
data class TypingBroadcast(
val participantId: UUID,
) : MessageEvent()

0 comments on commit d3bbd4e

Please sign in to comment.