diff --git a/CHANGES.md b/CHANGES.md index c5ad0fd9dff..b5e5c54669f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,7 +5,7 @@ Features ✨: - Improvements 🙌: - - + - Rework sending Event management (#154) Bugfix 🐛: - diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/room/model/relation/ReactionInfo.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/room/model/relation/ReactionInfo.kt index 5b5c9e68864..733d6c37e86 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/room/model/relation/ReactionInfo.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/room/model/relation/ReactionInfo.kt @@ -23,7 +23,7 @@ import com.squareup.moshi.JsonClass data class ReactionInfo( @Json(name = "rel_type") override val type: String?, @Json(name = "event_id") override val eventId: String, - val key: String, + @Json(name = "key") val key: String, // always null for reaction @Json(name = "m.in_reply_to") override val inReplyTo: ReplyToContent? = null, @Json(name = "option") override val option: Int? = null diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/room/send/SendService.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/room/send/SendService.kt index 36f6e538a91..152a018e78a 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/room/send/SendService.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/room/send/SendService.kt @@ -123,11 +123,6 @@ interface SendService { */ fun deleteFailedEcho(localEcho: TimelineEvent) - /** - * Delete all the events in one of the sending states - */ - fun clearSendingQueue() - /** * Cancel sending a specific event. It has to be in one of the sending states */ diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/algorithms/megolm/MXMegolmEncryption.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/algorithms/megolm/MXMegolmEncryption.kt index 1185ea7962f..466722788d5 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/algorithms/megolm/MXMegolmEncryption.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/algorithms/megolm/MXMegolmEncryption.kt @@ -420,7 +420,7 @@ internal class MXMegolmEncryption( sendToDeviceTask.execute(sendToDeviceParams) true } catch (failure: Throwable) { - Timber.v("## CRYPTO | CRYPTO | reshareKey() : fail to send <$sessionId> to $userId:$deviceId") + Timber.e(failure, "## CRYPTO | CRYPTO | reshareKey() : fail to send <$sessionId> to $userId:$deviceId") false } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/tasks/EncryptEventTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/tasks/EncryptEventTask.kt index 75f4c1730f9..1b88fbe9cc7 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/tasks/EncryptEventTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/tasks/EncryptEventTask.kt @@ -17,8 +17,13 @@ package org.matrix.android.sdk.internal.crypto.tasks import org.matrix.android.sdk.api.session.crypto.CryptoService import org.matrix.android.sdk.api.session.events.model.Event +import org.matrix.android.sdk.api.session.events.model.EventType +import org.matrix.android.sdk.api.session.events.model.toContent import org.matrix.android.sdk.api.session.room.send.SendState +import org.matrix.android.sdk.internal.crypto.MXCRYPTO_ALGORITHM_MEGOLM +import org.matrix.android.sdk.internal.crypto.MXEventDecryptionResult import org.matrix.android.sdk.internal.crypto.model.MXEncryptEventContentResult +import org.matrix.android.sdk.internal.database.mapper.ContentMapper import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository import org.matrix.android.sdk.internal.task.Task import org.matrix.android.sdk.internal.util.awaitCallback @@ -38,13 +43,14 @@ internal class DefaultEncryptEventTask @Inject constructor( private val localEchoRepository: LocalEchoRepository ) : EncryptEventTask { override suspend fun execute(params: EncryptEventTask.Params): Event { - if (!params.crypto.isRoomEncrypted(params.roomId)) return params.event + // don't want to wait for any query + // if (!params.crypto.isRoomEncrypted(params.roomId)) return params.event val localEvent = params.event if (localEvent.eventId == null) { throw IllegalArgumentException() } - localEchoRepository.updateSendState(localEvent.eventId, SendState.ENCRYPTING) + localEchoRepository.updateSendState(localEvent.eventId, localEvent.roomId, SendState.ENCRYPTING) val localMutableContent = localEvent.content?.toMutableMap() ?: mutableMapOf() params.keepKeys?.forEach { @@ -52,6 +58,7 @@ internal class DefaultEncryptEventTask @Inject constructor( } // try { + // let it throws awaitCallback { params.crypto.encryptEventContent(localMutableContent, localEvent.type, params.roomId, it) }.let { result -> @@ -63,18 +70,34 @@ internal class DefaultEncryptEventTask @Inject constructor( } } val safeResult = result.copy(eventContent = modifiedContent) + // Better handling of local echo, to avoid decrypting transition on remote echo + // Should I only do it for text messages? + val decryptionLocalEcho = if (result.eventContent["algorithm"] == MXCRYPTO_ALGORITHM_MEGOLM) { + MXEventDecryptionResult( + clearEvent = Event( + type = localEvent.type, + content = localEvent.content, + roomId = localEvent.roomId + ).toContent(), + forwardingCurve25519KeyChain = emptyList(), + senderCurve25519Key = result.eventContent["sender_key"] as? String, + claimedEd25519Key = params.crypto.getMyDevice().fingerprint() + ) + } else { + null + } + + localEchoRepository.updateEcho(localEvent.eventId) { _, localEcho -> + localEcho.type = EventType.ENCRYPTED + localEcho.content = ContentMapper.map(modifiedContent) + decryptionLocalEcho?.also { + localEcho.setDecryptionResult(it) + } + } return localEvent.copy( type = safeResult.eventType, content = safeResult.eventContent ) } -// } catch (throwable: Throwable) { -// val sendState = when (throwable) { -// is Failure.CryptoError -> SendState.FAILED_UNKNOWN_DEVICES -// else -> SendState.UNDELIVERED -// } -// localEchoUpdater.updateSendState(localEvent.eventId, sendState) -// throw throwable -// } } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/tasks/RedactEventTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/tasks/RedactEventTask.kt new file mode 100644 index 00000000000..f35d1b63e8f --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/tasks/RedactEventTask.kt @@ -0,0 +1,49 @@ +/* + * Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.matrix.android.sdk.internal.crypto.tasks + +import org.greenrobot.eventbus.EventBus +import org.matrix.android.sdk.internal.network.executeRequest +import org.matrix.android.sdk.internal.session.room.RoomAPI +import org.matrix.android.sdk.internal.session.room.send.SendResponse +import org.matrix.android.sdk.internal.task.Task +import javax.inject.Inject + +internal interface RedactEventTask : Task { + data class Params( + val txID: String, + val roomId: String, + val eventId: String, + val reason: String? + ) +} + +internal class DefaultRedactEventTask @Inject constructor( + private val roomAPI: RoomAPI, + private val eventBus: EventBus) : RedactEventTask { + + override suspend fun execute(params: RedactEventTask.Params): String { + val executeRequest = executeRequest(eventBus) { + apiCall = roomAPI.redactEvent( + txId = params.txID, + roomId = params.roomId, + eventId = params.eventId, + reason = if (params.reason == null) emptyMap() else mapOf("reason" to params.reason) + ) + } + return executeRequest.eventId + } +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/tasks/SendEventTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/tasks/SendEventTask.kt index 10b0823c65e..1a712036c8f 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/tasks/SendEventTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/tasks/SendEventTask.kt @@ -15,6 +15,7 @@ */ package org.matrix.android.sdk.internal.crypto.tasks +import org.greenrobot.eventbus.EventBus import org.matrix.android.sdk.api.session.crypto.CryptoService import org.matrix.android.sdk.api.session.events.model.Event import org.matrix.android.sdk.api.session.room.send.SendState @@ -23,12 +24,12 @@ import org.matrix.android.sdk.internal.session.room.RoomAPI import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository import org.matrix.android.sdk.internal.session.room.send.SendResponse import org.matrix.android.sdk.internal.task.Task -import org.greenrobot.eventbus.EventBus import javax.inject.Inject internal interface SendEventTask : Task { data class Params( val event: Event, + val encrypt: Boolean, val cryptoService: CryptoService? ) } @@ -40,11 +41,11 @@ internal class DefaultSendEventTask @Inject constructor( private val eventBus: EventBus) : SendEventTask { override suspend fun execute(params: SendEventTask.Params): String { - val event = handleEncryption(params) - val localId = event.eventId!! - try { - localEchoRepository.updateSendState(localId, SendState.SENDING) + val event = handleEncryption(params) + val localId = event.eventId!! + + localEchoRepository.updateSendState(localId, params.event.roomId, SendState.SENDING) val executeRequest = executeRequest(eventBus) { apiCall = roomAPI.send( localId, @@ -53,26 +54,23 @@ internal class DefaultSendEventTask @Inject constructor( eventType = event.type ) } - localEchoRepository.updateSendState(localId, SendState.SENT) + localEchoRepository.updateSendState(localId, params.event.roomId, SendState.SENT) return executeRequest.eventId } catch (e: Throwable) { - localEchoRepository.updateSendState(localId, SendState.UNDELIVERED) +// localEchoRepository.updateSendState(params.event.eventId!!, SendState.UNDELIVERED) throw e } } + @Throws private suspend fun handleEncryption(params: SendEventTask.Params): Event { - if (params.cryptoService?.isRoomEncrypted(params.event.roomId ?: "") == true) { - try { - return encryptEventTask.execute(EncryptEventTask.Params( - params.event.roomId ?: "", - params.event, - listOf("m.relates_to"), - params.cryptoService - )) - } catch (throwable: Throwable) { - // We said it's ok to send verification request in clear - } + if (params.encrypt && !params.event.isEncrypted()) { + return encryptEventTask.execute(EncryptEventTask.Params( + params.event.roomId ?: "", + params.event, + listOf("m.relates_to"), + params.cryptoService!! + )) } return params.event } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/tasks/SendVerificationMessageTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/tasks/SendVerificationMessageTask.kt index b48f84ac910..782300c7b00 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/tasks/SendVerificationMessageTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/crypto/tasks/SendVerificationMessageTask.kt @@ -44,7 +44,7 @@ internal class DefaultSendVerificationMessageTask @Inject constructor( val localId = event.eventId!! try { - localEchoRepository.updateSendState(localId, SendState.SENDING) + localEchoRepository.updateSendState(localId, event.roomId, SendState.SENDING) val executeRequest = executeRequest(eventBus) { apiCall = roomAPI.send( localId, @@ -53,10 +53,10 @@ internal class DefaultSendVerificationMessageTask @Inject constructor( eventType = event.type ) } - localEchoRepository.updateSendState(localId, SendState.SENT) + localEchoRepository.updateSendState(localId, event.roomId, SendState.SENT) return executeRequest.eventId } catch (e: Throwable) { - localEchoRepository.updateSendState(localId, SendState.UNDELIVERED) + localEchoRepository.updateSendState(localId, event.roomId, SendState.UNDELIVERED) throw e } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/DefaultSession.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/DefaultSession.kt index 679a24be0cc..359dd265d85 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/DefaultSession.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/DefaultSession.kt @@ -64,6 +64,7 @@ import org.matrix.android.sdk.internal.di.SessionId import org.matrix.android.sdk.internal.di.UnauthenticatedWithCertificate import org.matrix.android.sdk.internal.di.WorkManagerProvider import org.matrix.android.sdk.internal.session.identity.DefaultIdentityService +import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor import org.matrix.android.sdk.internal.session.room.timeline.TimelineEventDecryptor import org.matrix.android.sdk.internal.session.sync.SyncTokenStore import org.matrix.android.sdk.internal.session.sync.job.SyncThread @@ -121,7 +122,8 @@ internal class DefaultSession @Inject constructor( private val taskExecutor: TaskExecutor, private val callSignalingService: Lazy, @UnauthenticatedWithCertificate - private val unauthenticatedWithCertificateOkHttpClient: Lazy + private val unauthenticatedWithCertificateOkHttpClient: Lazy, + private val eventSenderProcessor: EventSenderProcessor ) : Session, RoomService by roomService.get(), RoomDirectoryService by roomDirectoryService.get(), @@ -161,6 +163,7 @@ internal class DefaultSession @Inject constructor( } eventBus.register(this) timelineEventDecryptor.start() + eventSenderProcessor.start() } override fun requireBackgroundSync() { @@ -204,6 +207,7 @@ internal class DefaultSession @Inject constructor( cryptoService.get().close() isOpen = false eventBus.unregister(this) + eventSenderProcessor.interrupt() } override fun getSyncStateLive() = getSyncThread().liveState() diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionComponent.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionComponent.kt index ffa7c841cfe..ed586d35f89 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionComponent.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionComponent.kt @@ -45,7 +45,6 @@ import org.matrix.android.sdk.internal.session.pushers.AddHttpPusherWorker import org.matrix.android.sdk.internal.session.pushers.PushersModule import org.matrix.android.sdk.internal.session.room.RoomModule import org.matrix.android.sdk.internal.session.room.relation.SendRelationWorker -import org.matrix.android.sdk.internal.session.room.send.EncryptEventWorker import org.matrix.android.sdk.internal.session.room.send.MultipleEventSendingDispatcherWorker import org.matrix.android.sdk.internal.session.room.send.RedactEventWorker import org.matrix.android.sdk.internal.session.room.send.SendEventWorker @@ -109,8 +108,6 @@ internal interface SessionComponent { fun inject(worker: SendRelationWorker) - fun inject(worker: EncryptEventWorker) - fun inject(worker: MultipleEventSendingDispatcherWorker) fun inject(worker: RedactEventWorker) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionModule.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionModule.kt index 355a152c828..102a34d9de7 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionModule.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionModule.kt @@ -43,6 +43,8 @@ import org.matrix.android.sdk.api.session.securestorage.SharedSecretStorageServi import org.matrix.android.sdk.api.session.typing.TypingUsersTracker import org.matrix.android.sdk.internal.crypto.crosssigning.ShieldTrustUpdater import org.matrix.android.sdk.internal.crypto.secrets.DefaultSharedSecretStorageService +import org.matrix.android.sdk.internal.crypto.tasks.DefaultRedactEventTask +import org.matrix.android.sdk.internal.crypto.tasks.RedactEventTask import org.matrix.android.sdk.internal.crypto.verification.VerificationMessageProcessor import org.matrix.android.sdk.internal.database.DatabaseCleaner import org.matrix.android.sdk.internal.database.EventInsertLiveObserver @@ -367,4 +369,7 @@ internal abstract class SessionModule { @Binds abstract fun bindTypingUsersTracker(tracker: DefaultTypingUsersTracker): TypingUsersTracker + + @Binds + abstract fun bindRedactEventTask(task: DefaultRedactEventTask): RedactEventTask } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/DefaultCallSignalingService.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/DefaultCallSignalingService.kt index 519beaf2ac2..019da27d27e 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/DefaultCallSignalingService.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/DefaultCallSignalingService.kt @@ -36,8 +36,8 @@ import org.matrix.android.sdk.api.util.NoOpCancellable import org.matrix.android.sdk.internal.di.UserId import org.matrix.android.sdk.internal.session.SessionScope import org.matrix.android.sdk.internal.session.call.model.MxCallImpl +import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor import org.matrix.android.sdk.internal.session.room.send.LocalEchoEventFactory -import org.matrix.android.sdk.internal.session.room.send.RoomEventSender import org.matrix.android.sdk.internal.task.TaskExecutor import org.matrix.android.sdk.internal.task.configureWith import timber.log.Timber @@ -50,7 +50,7 @@ internal class DefaultCallSignalingService @Inject constructor( private val userId: String, private val activeCallHandler: ActiveCallHandler, private val localEchoEventFactory: LocalEchoEventFactory, - private val roomEventSender: RoomEventSender, + private val eventSenderProcessor: EventSenderProcessor, private val taskExecutor: TaskExecutor, private val turnServerTask: GetTurnServerTask ) : CallSignalingService { @@ -103,7 +103,7 @@ internal class DefaultCallSignalingService @Inject constructor( otherUserId = otherUserId, isVideoCall = isVideoCall, localEchoEventFactory = localEchoEventFactory, - roomEventSender = roomEventSender + eventSenderProcessor = eventSenderProcessor ) activeCallHandler.addCall(call).also { return call @@ -165,7 +165,7 @@ internal class DefaultCallSignalingService @Inject constructor( otherUserId = event.senderId ?: return@let, isVideoCall = content.isVideo(), localEchoEventFactory = localEchoEventFactory, - roomEventSender = roomEventSender + eventSenderProcessor = eventSenderProcessor ) activeCallHandler.addCall(incomingCall) onCallInvite(incomingCall, content) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/model/MxCallImpl.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/model/MxCallImpl.kt index 7edb375d8ef..6c0d437a601 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/model/MxCallImpl.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/call/model/MxCallImpl.kt @@ -29,8 +29,8 @@ import org.matrix.android.sdk.api.session.room.model.call.CallCandidatesContent import org.matrix.android.sdk.api.session.room.model.call.CallHangupContent import org.matrix.android.sdk.api.session.room.model.call.CallInviteContent import org.matrix.android.sdk.internal.session.call.DefaultCallSignalingService +import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor import org.matrix.android.sdk.internal.session.room.send.LocalEchoEventFactory -import org.matrix.android.sdk.internal.session.room.send.RoomEventSender import org.webrtc.IceCandidate import org.webrtc.SessionDescription import timber.log.Timber @@ -43,7 +43,7 @@ internal class MxCallImpl( override val otherUserId: String, override val isVideoCall: Boolean, private val localEchoEventFactory: LocalEchoEventFactory, - private val roomEventSender: RoomEventSender + private val eventSenderProcessor: EventSenderProcessor ) : MxCall { override var state: CallState = CallState.Idle @@ -91,7 +91,7 @@ internal class MxCallImpl( offer = CallInviteContent.Offer(sdp = sdp.description) ) .let { createEventAndLocalEcho(type = EventType.CALL_INVITE, roomId = roomId, content = it.toContent()) } - .also { roomEventSender.sendEvent(it) } + .also { eventSenderProcessor.postEvent(it) } } override fun sendLocalIceCandidates(candidates: List) { @@ -106,7 +106,7 @@ internal class MxCallImpl( } ) .let { createEventAndLocalEcho(type = EventType.CALL_CANDIDATES, roomId = roomId, content = it.toContent()) } - .also { roomEventSender.sendEvent(it) } + .also { eventSenderProcessor.postEvent(it) } } override fun sendLocalIceCandidateRemovals(candidates: List) { @@ -119,7 +119,7 @@ internal class MxCallImpl( callId = callId ) .let { createEventAndLocalEcho(type = EventType.CALL_HANGUP, roomId = roomId, content = it.toContent()) } - .also { roomEventSender.sendEvent(it) } + .also { eventSenderProcessor.postEvent(it) } state = CallState.Terminated } @@ -132,7 +132,7 @@ internal class MxCallImpl( answer = CallAnswerContent.Answer(sdp = sdp.description) ) .let { createEventAndLocalEcho(type = EventType.CALL_ANSWER, roomId = roomId, content = it.toContent()) } - .also { roomEventSender.sendEvent(it) } + .also { eventSenderProcessor.postEvent(it) } } private fun createEventAndLocalEcho(localId: String = LocalEcho.createLocalEchoId(), type: String, roomId: String, content: Content): Event { diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/relation/DefaultRelationService.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/relation/DefaultRelationService.kt index a151a16383c..38c542e07ea 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/relation/DefaultRelationService.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/relation/DefaultRelationService.kt @@ -17,7 +17,6 @@ package org.matrix.android.sdk.internal.session.room.relation import androidx.lifecycle.LiveData import androidx.lifecycle.Transformations -import androidx.work.OneTimeWorkRequest import com.squareup.inject.assisted.Assisted import com.squareup.inject.assisted.AssistedInject import com.zhuinden.monarchy.Monarchy @@ -39,21 +38,18 @@ import org.matrix.android.sdk.internal.database.model.TimelineEventEntity import org.matrix.android.sdk.internal.database.query.where import org.matrix.android.sdk.internal.di.SessionDatabase import org.matrix.android.sdk.internal.di.SessionId -import org.matrix.android.sdk.internal.session.room.send.EncryptEventWorker import org.matrix.android.sdk.internal.session.room.send.LocalEchoEventFactory -import org.matrix.android.sdk.internal.session.room.send.RedactEventWorker -import org.matrix.android.sdk.internal.session.room.send.SendEventWorker -import org.matrix.android.sdk.internal.session.room.timeline.TimelineSendEventWorkCommon +import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor import org.matrix.android.sdk.internal.task.TaskExecutor import org.matrix.android.sdk.internal.task.configureWith import org.matrix.android.sdk.internal.util.fetchCopyMap -import org.matrix.android.sdk.internal.worker.WorkerParamsFactory import timber.log.Timber internal class DefaultRelationService @AssistedInject constructor( @Assisted private val roomId: String, @SessionId private val sessionId: String, - private val timeLineSendEventWorkCommon: TimelineSendEventWorkCommon, +// private val timeLineSendEventWorkCommon: TimelineSendEventWorkCommon, + private val eventSenderProcessor: EventSenderProcessor, private val eventFactory: LocalEchoEventFactory, private val cryptoService: CryptoService, private val findReactionEventForUndoTask: FindReactionEventForUndoTask, @@ -83,8 +79,7 @@ internal class DefaultRelationService @AssistedInject constructor( .none { it.addedByMe && it.key == reaction }) { val event = eventFactory.createReactionEvent(roomId, targetEventId, reaction) .also { saveLocalEcho(it) } - val sendRelationWork = createSendEventWork(event, true) - timeLineSendEventWorkCommon.postWork(roomId, sendRelationWork) + return eventSenderProcessor.postEvent(event, false /* reaction are not encrypted*/) } else { Timber.w("Reaction already added") NoOpCancellable @@ -107,9 +102,7 @@ internal class DefaultRelationService @AssistedInject constructor( data.redactEventId?.let { toRedact -> val redactEvent = eventFactory.createRedactEvent(roomId, toRedact, null) .also { saveLocalEcho(it) } - val redactWork = createRedactEventWork(redactEvent, toRedact, null) - - timeLineSendEventWorkCommon.postWork(roomId, redactWork) + eventSenderProcessor.postRedaction(redactEvent, null) } } } @@ -121,18 +114,6 @@ internal class DefaultRelationService @AssistedInject constructor( .executeBy(taskExecutor) } - // TODO duplicate with send service? - private fun createRedactEventWork(localEvent: Event, eventId: String, reason: String?): OneTimeWorkRequest { - val sendContentWorkerParams = RedactEventWorker.Params( - sessionId, - localEvent.eventId!!, - roomId, - eventId, - reason) - val redactWorkData = WorkerParamsFactory.toData(sendContentWorkerParams) - return timeLineSendEventWorkCommon.createWork(redactWorkData, true) - } - override fun editTextMessage(targetEventId: String, msgType: String, newBodyText: CharSequence, @@ -141,14 +122,7 @@ internal class DefaultRelationService @AssistedInject constructor( val event = eventFactory .createReplaceTextEvent(roomId, targetEventId, newBodyText, newBodyAutoMarkdown, msgType, compatibilityBodyText) .also { saveLocalEcho(it) } - return if (cryptoService.isRoomEncrypted(roomId)) { - val encryptWork = createEncryptEventWork(event, listOf("m.relates_to")) - val workRequest = createSendEventWork(event, false) - timeLineSendEventWorkCommon.postSequentialWorks(roomId, encryptWork, workRequest) - } else { - val workRequest = createSendEventWork(event, true) - timeLineSendEventWorkCommon.postWork(roomId, workRequest) - } + return eventSenderProcessor.postEvent(event, cryptoService.isRoomEncrypted(roomId)) } override fun editReply(replyToEdit: TimelineEvent, @@ -165,14 +139,7 @@ internal class DefaultRelationService @AssistedInject constructor( compatibilityBodyText ) .also { saveLocalEcho(it) } - return if (cryptoService.isRoomEncrypted(roomId)) { - val encryptWork = createEncryptEventWork(event, listOf("m.relates_to")) - val workRequest = createSendEventWork(event, false) - timeLineSendEventWorkCommon.postSequentialWorks(roomId, encryptWork, workRequest) - } else { - val workRequest = createSendEventWork(event, true) - timeLineSendEventWorkCommon.postWork(roomId, workRequest) - } + return eventSenderProcessor.postEvent(event, cryptoService.isRoomEncrypted(roomId)) } override fun fetchEditHistory(eventId: String, callback: MatrixCallback>) { @@ -189,27 +156,7 @@ internal class DefaultRelationService @AssistedInject constructor( ?.also { saveLocalEcho(it) } ?: return null - return if (cryptoService.isRoomEncrypted(roomId)) { - val encryptWork = createEncryptEventWork(event, listOf("m.relates_to")) - val workRequest = createSendEventWork(event, false) - timeLineSendEventWorkCommon.postSequentialWorks(roomId, encryptWork, workRequest) - } else { - val workRequest = createSendEventWork(event, true) - timeLineSendEventWorkCommon.postWork(roomId, workRequest) - } - } - - private fun createEncryptEventWork(event: Event, keepKeys: List?): OneTimeWorkRequest { - // Same parameter - val params = EncryptEventWorker.Params(sessionId, event.eventId!!, keepKeys) - val sendWorkData = WorkerParamsFactory.toData(params) - return timeLineSendEventWorkCommon.createWork(sendWorkData, true) - } - - private fun createSendEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest { - val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, eventId = event.eventId!!) - val sendWorkData = WorkerParamsFactory.toData(sendContentWorkerParams) - return timeLineSendEventWorkCommon.createWork(sendWorkData, startChain) + return eventSenderProcessor.postEvent(event, cryptoService.isRoomEncrypted(roomId)) } override fun getEventAnnotationsSummary(eventId: String): EventAnnotationsSummary? { diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/DefaultSendService.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/DefaultSendService.kt index ec366cb6aaf..5c395c19070 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/DefaultSendService.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/DefaultSendService.kt @@ -48,10 +48,9 @@ import org.matrix.android.sdk.api.util.NoOpCancellable import org.matrix.android.sdk.internal.di.SessionId import org.matrix.android.sdk.internal.di.WorkManagerProvider import org.matrix.android.sdk.internal.session.content.UploadContentWorker -import org.matrix.android.sdk.internal.session.room.timeline.TimelineSendEventWorkCommon +import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor import org.matrix.android.sdk.internal.task.TaskExecutor import org.matrix.android.sdk.internal.util.CancelableWork -import org.matrix.android.sdk.internal.worker.AlwaysSuccessfulWorker import org.matrix.android.sdk.internal.worker.WorkerParamsFactory import org.matrix.android.sdk.internal.worker.startChain import timber.log.Timber @@ -63,13 +62,12 @@ private const val UPLOAD_WORK = "UPLOAD_WORK" internal class DefaultSendService @AssistedInject constructor( @Assisted private val roomId: String, private val workManagerProvider: WorkManagerProvider, - private val timelineSendEventWorkCommon: TimelineSendEventWorkCommon, @SessionId private val sessionId: String, private val localEchoEventFactory: LocalEchoEventFactory, private val cryptoService: CryptoService, private val taskExecutor: TaskExecutor, private val localEchoRepository: LocalEchoRepository, - private val roomEventSender: RoomEventSender, + private val eventSenderProcessor: EventSenderProcessor, private val cancelSendTracker: CancelSendTracker ) : SendService { @@ -92,19 +90,6 @@ internal class DefaultSendService @AssistedInject constructor( .let { sendEvent(it) } } - // For test only - private fun sendTextMessages(text: CharSequence, msgType: String, autoMarkdown: Boolean, times: Int): Cancelable { - return CancelableBag().apply { - // Send the event several times - repeat(times) { i -> - localEchoEventFactory.createTextEvent(roomId, msgType, "$text - $i", autoMarkdown) - .also { createLocalEcho(it) } - .let { sendEvent(it) } - .also { add(it) } - } - } - } - override fun sendFormattedTextMessage(text: String, formattedText: String, msgType: String): Cancelable { return localEchoEventFactory.createFormattedTextEvent(roomId, TextContent(text, formattedText), msgType) .also { createLocalEcho(it) } @@ -133,13 +118,14 @@ internal class DefaultSendService @AssistedInject constructor( override fun redactEvent(event: Event, reason: String?): Cancelable { // TODO manage media/attachements? - return createRedactEventWork(event, reason) - .let { timelineSendEventWorkCommon.postWork(roomId, it) } + val redactionEcho = localEchoEventFactory.createRedactEvent(roomId, event.eventId!!, reason) + .also { createLocalEcho(it) } + return eventSenderProcessor.postRedaction(redactionEcho, reason) } override fun resendTextMessage(localEcho: TimelineEvent): Cancelable { if (localEcho.root.isTextMessage() && localEcho.root.sendState.hasFailed()) { - localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT) + localEchoRepository.updateSendState(localEcho.eventId, roomId, SendState.UNSENT) return sendEvent(localEcho.root) } return NoOpCancellable @@ -153,7 +139,7 @@ internal class DefaultSendService @AssistedInject constructor( val url = messageContent.getFileUrl() ?: return NoOpCancellable if (url.startsWith("mxc://")) { // We need to resend only the message as the attachment is ok - localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT) + localEchoRepository.updateSendState(localEcho.eventId, roomId, SendState.UNSENT) return sendEvent(localEcho.root) } @@ -170,7 +156,7 @@ internal class DefaultSendService @AssistedInject constructor( queryUri = Uri.parse(messageContent.url), type = ContentAttachmentData.Type.IMAGE ) - localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT) + localEchoRepository.updateSendState(localEcho.eventId, roomId, SendState.UNSENT) internalSendMedia(listOf(localEcho.root), attachmentData, true) } is MessageVideoContent -> { @@ -184,7 +170,7 @@ internal class DefaultSendService @AssistedInject constructor( queryUri = Uri.parse(messageContent.url), type = ContentAttachmentData.Type.VIDEO ) - localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT) + localEchoRepository.updateSendState(localEcho.eventId, roomId, SendState.UNSENT) internalSendMedia(listOf(localEcho.root), attachmentData, true) } is MessageFileContent -> { @@ -195,7 +181,7 @@ internal class DefaultSendService @AssistedInject constructor( queryUri = Uri.parse(messageContent.url), type = ContentAttachmentData.Type.FILE ) - localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT) + localEchoRepository.updateSendState(localEcho.eventId, roomId, SendState.UNSENT) internalSendMedia(listOf(localEcho.root), attachmentData, true) } is MessageAudioContent -> { @@ -207,7 +193,7 @@ internal class DefaultSendService @AssistedInject constructor( queryUri = Uri.parse(messageContent.url), type = ContentAttachmentData.Type.AUDIO ) - localEchoRepository.updateSendState(localEcho.eventId, SendState.UNSENT) + localEchoRepository.updateSendState(localEcho.eventId, roomId, SendState.UNSENT) internalSendMedia(listOf(localEcho.root), attachmentData, true) } else -> NoOpCancellable @@ -222,25 +208,6 @@ internal class DefaultSendService @AssistedInject constructor( } } - override fun clearSendingQueue() { - timelineSendEventWorkCommon.cancelAllWorks(roomId) - workManagerProvider.workManager.cancelUniqueWork(buildWorkName(UPLOAD_WORK)) - - // Replace the worker chains with a AlwaysSuccessfulWorker, to ensure the queues are well emptied - workManagerProvider.matrixOneTimeWorkRequestBuilder() - .build().let { - timelineSendEventWorkCommon.postWork(roomId, it, ExistingWorkPolicy.REPLACE) - - // need to clear also image sending queue - workManagerProvider.workManager - .beginUniqueWork(buildWorkName(UPLOAD_WORK), ExistingWorkPolicy.REPLACE, it) - .enqueue() - } - taskExecutor.executorScope.launch { - localEchoRepository.clearSendingQueue(roomId) - } - } - override fun cancelSend(eventId: String) { cancelSendTracker.markLocalEchoForCancel(eventId, roomId) taskExecutor.executorScope.launch { @@ -262,13 +229,6 @@ internal class DefaultSendService @AssistedInject constructor( } } -// override fun failAllPendingMessages() { -// taskExecutor.executorScope.launch { -// val eventsToResend = localEchoRepository.getAllEventsWithStates(roomId, SendState.PENDING_STATES) -// localEchoRepository.updateSendState(roomId, eventsToResend.map { it.eventId }, SendState.UNDELIVERED) -// } -// } - override fun sendMedia(attachment: ContentAttachmentData, compressBeforeSending: Boolean, roomIds: Set): Cancelable { @@ -301,7 +261,7 @@ internal class DefaultSendService @AssistedInject constructor( val dispatcherWork = createMultipleEventDispatcherWork(isRoomEncrypted) workManagerProvider.workManager - .beginUniqueWork(buildWorkName(UPLOAD_WORK), ExistingWorkPolicy.APPEND, uploadWork) + .beginUniqueWork(buildWorkName(UPLOAD_WORK), ExistingWorkPolicy.APPEND_OR_REPLACE, uploadWork) .then(dispatcherWork) .enqueue() .also { operation -> @@ -322,7 +282,7 @@ internal class DefaultSendService @AssistedInject constructor( } private fun sendEvent(event: Event): Cancelable { - return roomEventSender.sendEvent(event) + return eventSenderProcessor.postEvent(event, cryptoService.isRoomEncrypted(event.roomId!!)) } private fun createLocalEcho(event: Event) { @@ -333,28 +293,6 @@ internal class DefaultSendService @AssistedInject constructor( return "${roomId}_$identifier" } - private fun createEncryptEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest { - // Same parameter - return EncryptEventWorker.Params(sessionId, event.eventId ?: "") - .let { WorkerParamsFactory.toData(it) } - .let { - workManagerProvider.matrixOneTimeWorkRequestBuilder() - .setConstraints(WorkManagerProvider.workConstraints) - .setInputData(it) - .startChain(startChain) - .setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY, TimeUnit.MILLISECONDS) - .build() - } - } - - private fun createRedactEventWork(event: Event, reason: String?): OneTimeWorkRequest { - return localEchoEventFactory.createRedactEvent(roomId, event.eventId!!, reason) - .also { createLocalEcho(it) } - .let { RedactEventWorker.Params(sessionId, it.eventId!!, roomId, event.eventId, reason) } - .let { WorkerParamsFactory.toData(it) } - .let { timelineSendEventWorkCommon.createWork(it, true) } - } - private fun createUploadMediaWork(allLocalEchos: List, attachment: ContentAttachmentData, isRoomEncrypted: Boolean, diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/EncryptEventWorker.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/EncryptEventWorker.kt deleted file mode 100644 index 73b4c48e146..00000000000 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/EncryptEventWorker.kt +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Copyright 2020 The Matrix.org Foundation C.I.C. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.matrix.android.sdk.internal.session.room.send - -import android.content.Context -import androidx.work.WorkerParameters -import com.squareup.moshi.JsonClass -import org.matrix.android.sdk.api.failure.Failure -import org.matrix.android.sdk.api.session.crypto.CryptoService -import org.matrix.android.sdk.api.session.events.model.Event -import org.matrix.android.sdk.api.session.events.model.EventType -import org.matrix.android.sdk.api.session.events.model.toContent -import org.matrix.android.sdk.api.session.room.send.SendState -import org.matrix.android.sdk.internal.crypto.MXCRYPTO_ALGORITHM_MEGOLM -import org.matrix.android.sdk.internal.crypto.MXEventDecryptionResult -import org.matrix.android.sdk.internal.crypto.model.MXEncryptEventContentResult -import org.matrix.android.sdk.internal.database.mapper.ContentMapper -import org.matrix.android.sdk.internal.session.SessionComponent -import org.matrix.android.sdk.internal.util.awaitCallback -import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker -import org.matrix.android.sdk.internal.worker.SessionWorkerParams -import org.matrix.android.sdk.internal.worker.WorkerParamsFactory -import timber.log.Timber -import javax.inject.Inject - -/** - * Possible previous worker: None - * Possible next worker : Always [SendEventWorker] - */ -internal class EncryptEventWorker(context: Context, params: WorkerParameters) - : SessionSafeCoroutineWorker(context, params, Params::class.java) { - - @JsonClass(generateAdapter = true) - internal data class Params( - override val sessionId: String, - val eventId: String, - /** Do not encrypt these keys, keep them as is in encrypted content (e.g. m.relates_to) */ - val keepKeys: List? = null, - override val lastFailureMessage: String? = null - ) : SessionWorkerParams - - @Inject lateinit var crypto: CryptoService - @Inject lateinit var localEchoRepository: LocalEchoRepository - @Inject lateinit var cancelSendTracker: CancelSendTracker - - override fun injectWith(injector: SessionComponent) { - injector.inject(this) - } - - override suspend fun doSafeWork(params: Params): Result { - Timber.v("## SendEvent: Start Encrypt work for event ${params.eventId}") - - val localEvent = localEchoRepository.getUpToDateEcho(params.eventId) - if (localEvent?.eventId == null) { - return Result.success() - } - - if (cancelSendTracker.isCancelRequestedFor(localEvent.eventId, localEvent.roomId)) { - return Result.success() - .also { Timber.e("## SendEvent: Event sending has been cancelled ${localEvent.eventId}") } - } - - localEchoRepository.updateSendState(localEvent.eventId, SendState.ENCRYPTING) - - val localMutableContent = localEvent.content?.toMutableMap() ?: mutableMapOf() - params.keepKeys?.forEach { - localMutableContent.remove(it) - } - - var error: Throwable? = null - var result: MXEncryptEventContentResult? = null - try { - result = awaitCallback { - crypto.encryptEventContent(localMutableContent, localEvent.type, localEvent.roomId!!, it) - } - } catch (throwable: Throwable) { - error = throwable - } - if (result != null) { - val modifiedContent = HashMap(result.eventContent) - params.keepKeys?.forEach { toKeep -> - localEvent.content?.get(toKeep)?.let { - // put it back in the encrypted thing - modifiedContent[toKeep] = it - } - } - // Better handling of local echo, to avoid decrypting transition on remote echo - // Should I only do it for text messages? - val decryptionLocalEcho = if (result.eventContent["algorithm"] == MXCRYPTO_ALGORITHM_MEGOLM) { - MXEventDecryptionResult( - clearEvent = Event( - type = localEvent.type, - content = localEvent.content, - roomId = localEvent.roomId - ).toContent(), - forwardingCurve25519KeyChain = emptyList(), - senderCurve25519Key = result.eventContent["sender_key"] as? String, - claimedEd25519Key = crypto.getMyDevice().fingerprint() - ) - } else { - null - } - localEchoRepository.updateEcho(localEvent.eventId) { _, localEcho -> - localEcho.type = EventType.ENCRYPTED - localEcho.content = ContentMapper.map(modifiedContent) - decryptionLocalEcho?.also { - localEcho.setDecryptionResult(it) - } - } - - val nextWorkerParams = SendEventWorker.Params(sessionId = params.sessionId, eventId = params.eventId) - return Result.success(WorkerParamsFactory.toData(nextWorkerParams)) - } else { - val sendState = when (error) { - is Failure.CryptoError -> SendState.FAILED_UNKNOWN_DEVICES - else -> SendState.UNDELIVERED - } - localEchoRepository.updateSendState(localEvent.eventId, sendState) - // always return success, or the chain will be stuck for ever! - val nextWorkerParams = SendEventWorker.Params( - sessionId = params.sessionId, - eventId = localEvent.eventId, - lastFailureMessage = error?.localizedMessage ?: "Error" - ) - return Result.success(WorkerParamsFactory.toData(nextWorkerParams)) - } - } - - override fun buildErrorParams(params: Params, message: String): Params { - return params.copy(lastFailureMessage = params.lastFailureMessage ?: message) - } -} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/LocalEchoRepository.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/LocalEchoRepository.kt index 9e1de291c4e..f4871ab35df 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/LocalEchoRepository.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/LocalEchoRepository.kt @@ -88,8 +88,9 @@ internal class LocalEchoRepository @Inject constructor(@SessionDatabase private } } - fun updateSendState(eventId: String, sendState: SendState) { + fun updateSendState(eventId: String, roomId: String?, sendState: SendState) { Timber.v("## SendEvent: [${System.currentTimeMillis()}] Update local state of $eventId to ${sendState.name}") + eventBus.post(DefaultTimeline.OnLocalEchoUpdated(roomId ?: "", eventId, sendState)) updateEchoAsync(eventId) { realm, sendingEventEntity -> if (sendState == SendState.SENT && sendingEventEntity.sendState == SendState.SYNCED) { // If already synced, do not put as sent @@ -137,6 +138,14 @@ internal class LocalEchoRepository @Inject constructor(@SessionDatabase private } } + fun deleteFailedEchoAsync(roomId: String, eventId: String?) { + monarchy.runTransactionSync { realm -> + TimelineEventEntity.where(realm, roomId = roomId, eventId = eventId ?: "").findFirst()?.deleteFromRealm() + EventEntity.where(realm, eventId = eventId ?: "").findFirst()?.deleteFromRealm() + roomSummaryUpdater.updateSendingInformation(realm, roomId) + } + } + suspend fun clearSendingQueue(roomId: String) { monarchy.awaitTransaction { realm -> TimelineEventEntity diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/MultipleEventSendingDispatcherWorker.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/MultipleEventSendingDispatcherWorker.kt index ba69a8751bd..bc307bc74f7 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/MultipleEventSendingDispatcherWorker.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/MultipleEventSendingDispatcherWorker.kt @@ -17,7 +17,6 @@ package org.matrix.android.sdk.internal.session.room.send import android.content.Context -import androidx.work.BackoffPolicy import androidx.work.OneTimeWorkRequest import androidx.work.WorkerParameters import com.squareup.moshi.JsonClass @@ -31,7 +30,6 @@ import org.matrix.android.sdk.internal.worker.SessionWorkerParams import org.matrix.android.sdk.internal.worker.WorkerParamsFactory import org.matrix.android.sdk.internal.worker.startChain import timber.log.Timber -import java.util.concurrent.TimeUnit import javax.inject.Inject /** @@ -57,7 +55,7 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo override fun doOnError(params: Params): Result { params.localEchoIds.forEach { localEchoIds -> - localEchoRepository.updateSendState(localEchoIds.eventId, SendState.UNDELIVERED) + localEchoRepository.updateSendState(localEchoIds.eventId, localEchoIds.roomId, SendState.UNDELIVERED) } return super.doOnError(params) @@ -73,19 +71,10 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo params.localEchoIds.forEach { localEchoIds -> val roomId = localEchoIds.roomId val eventId = localEchoIds.eventId - if (params.isEncrypted) { - localEchoRepository.updateSendState(eventId, SendState.ENCRYPTING) - Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule encrypt and send event $eventId") - val encryptWork = createEncryptEventWork(params.sessionId, eventId, true) - // Note that event will be replaced by the result of the previous work - val sendWork = createSendEventWork(params.sessionId, eventId, false) - timelineSendEventWorkCommon.postSequentialWorks(roomId, encryptWork, sendWork) - } else { - localEchoRepository.updateSendState(eventId, SendState.SENDING) + localEchoRepository.updateSendState(eventId, roomId, SendState.SENDING) Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule send event $eventId") val sendWork = createSendEventWork(params.sessionId, eventId, true) timelineSendEventWorkCommon.postWork(roomId, sendWork) - } } return Result.success() @@ -95,18 +84,6 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo return params.copy(lastFailureMessage = params.lastFailureMessage ?: message) } - private fun createEncryptEventWork(sessionId: String, eventId: String, startChain: Boolean): OneTimeWorkRequest { - val params = EncryptEventWorker.Params(sessionId, eventId) - val sendWorkData = WorkerParamsFactory.toData(params) - - return workManagerProvider.matrixOneTimeWorkRequestBuilder() - .setConstraints(WorkManagerProvider.workConstraints) - .setInputData(sendWorkData) - .startChain(startChain) - .setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY, TimeUnit.MILLISECONDS) - .build() - } - private fun createSendEventWork(sessionId: String, eventId: String, startChain: Boolean): OneTimeWorkRequest { val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, eventId = eventId) val sendWorkData = WorkerParamsFactory.toData(sendContentWorkerParams) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/RoomEventSender.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/RoomEventSender.kt deleted file mode 100644 index 8f783d74786..00000000000 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/RoomEventSender.kt +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 2020 The Matrix.org Foundation C.I.C. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.matrix.android.sdk.internal.session.room.send - -import androidx.work.BackoffPolicy -import androidx.work.OneTimeWorkRequest -import org.matrix.android.sdk.api.session.crypto.CryptoService -import org.matrix.android.sdk.api.session.events.model.Event -import org.matrix.android.sdk.api.util.Cancelable -import org.matrix.android.sdk.internal.di.SessionId -import org.matrix.android.sdk.internal.di.WorkManagerProvider -import org.matrix.android.sdk.internal.session.room.timeline.TimelineSendEventWorkCommon -import org.matrix.android.sdk.internal.worker.WorkerParamsFactory -import org.matrix.android.sdk.internal.worker.startChain -import timber.log.Timber -import java.util.concurrent.TimeUnit -import javax.inject.Inject - -internal class RoomEventSender @Inject constructor( - private val workManagerProvider: WorkManagerProvider, - private val timelineSendEventWorkCommon: TimelineSendEventWorkCommon, - @SessionId private val sessionId: String, - private val cryptoService: CryptoService -) { - fun sendEvent(event: Event): Cancelable { - // Encrypted room handling - return if (cryptoService.isRoomEncrypted(event.roomId ?: "") - && !event.isEncrypted() // In case of resend where it's already encrypted so skip to send - ) { - Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule encrypt and send event ${event.eventId}") - val encryptWork = createEncryptEventWork(event, true) - // Note that event will be replaced by the result of the previous work - val sendWork = createSendEventWork(event, false) - timelineSendEventWorkCommon.postSequentialWorks(event.roomId ?: "", encryptWork, sendWork) - } else { - Timber.v("## SendEvent: [${System.currentTimeMillis()}] Schedule send event ${event.eventId}") - val sendWork = createSendEventWork(event, true) - timelineSendEventWorkCommon.postWork(event.roomId ?: "", sendWork) - } - } - - private fun createEncryptEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest { - // Same parameter - val params = EncryptEventWorker.Params(sessionId, event.eventId!!) - val sendWorkData = WorkerParamsFactory.toData(params) - - return workManagerProvider.matrixOneTimeWorkRequestBuilder() - .setConstraints(WorkManagerProvider.workConstraints) - .setInputData(sendWorkData) - .startChain(startChain) - .setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY, TimeUnit.MILLISECONDS) - .build() - } - - private fun createSendEventWork(event: Event, startChain: Boolean): OneTimeWorkRequest { - val sendContentWorkerParams = SendEventWorker.Params(sessionId = sessionId, eventId = event.eventId!!) - val sendWorkData = WorkerParamsFactory.toData(sendContentWorkerParams) - - return timelineSendEventWorkCommon.createWork(sendWorkData, startChain) - } -} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/SendEventWorker.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/SendEventWorker.kt index 0014213b3f6..2c835ff56c8 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/SendEventWorker.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/SendEventWorker.kt @@ -22,12 +22,11 @@ import com.squareup.moshi.JsonClass import io.realm.RealmConfiguration import org.greenrobot.eventbus.EventBus import org.matrix.android.sdk.api.failure.shouldBeRetried -import org.matrix.android.sdk.api.session.events.model.Content +import org.matrix.android.sdk.api.session.crypto.CryptoService import org.matrix.android.sdk.api.session.room.send.SendState +import org.matrix.android.sdk.internal.crypto.tasks.SendEventTask import org.matrix.android.sdk.internal.di.SessionDatabase -import org.matrix.android.sdk.internal.network.executeRequest import org.matrix.android.sdk.internal.session.SessionComponent -import org.matrix.android.sdk.internal.session.room.RoomAPI import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker import org.matrix.android.sdk.internal.worker.SessionWorkerParams import timber.log.Timber @@ -47,11 +46,14 @@ internal class SendEventWorker(context: Context, internal data class Params( override val sessionId: String, override val lastFailureMessage: String? = null, - val eventId: String + val eventId: String, + // use this as an override if you want to send in clear in encrypted room + val isEncrypted: Boolean? = null ) : SessionWorkerParams @Inject lateinit var localEchoRepository: LocalEchoRepository - @Inject lateinit var roomAPI: RoomAPI + @Inject lateinit var sendEventTask: SendEventTask + @Inject lateinit var cryptoService: CryptoService @Inject lateinit var eventBus: EventBus @Inject lateinit var cancelSendTracker: CancelSendTracker @SessionDatabase @Inject lateinit var realmConfiguration: RealmConfiguration @@ -63,7 +65,7 @@ internal class SendEventWorker(context: Context, override suspend fun doSafeWork(params: Params): Result { val event = localEchoRepository.getUpToDateEcho(params.eventId) if (event?.eventId == null || event.roomId == null) { - localEchoRepository.updateSendState(params.eventId, SendState.UNDELIVERED) + localEchoRepository.updateSendState(params.eventId, event?.roomId, SendState.UNDELIVERED) return Result.success() .also { Timber.e("Work cancelled due to bad input data") } } @@ -77,7 +79,7 @@ internal class SendEventWorker(context: Context, } if (params.lastFailureMessage != null) { - localEchoRepository.updateSendState(event.eventId, SendState.UNDELIVERED) + localEchoRepository.updateSendState(event.eventId, event.roomId, SendState.UNDELIVERED) // Transmit the error return Result.success(inputData) .also { Timber.e("Work cancelled due to input error from parent") } @@ -85,12 +87,12 @@ internal class SendEventWorker(context: Context, Timber.v("## SendEvent: [${System.currentTimeMillis()}] Send event ${params.eventId}") return try { - sendEvent(event.eventId, event.roomId, event.type, event.content) + sendEventTask.execute(SendEventTask.Params(event, params.isEncrypted ?: cryptoService.isRoomEncrypted(event.roomId), cryptoService)) Result.success() } catch (exception: Throwable) { if (/*currentAttemptCount >= MAX_NUMBER_OF_RETRY_BEFORE_FAILING ||**/ !exception.shouldBeRetried()) { Timber.e("## SendEvent: [${System.currentTimeMillis()}] Send event Failed cannot retry ${params.eventId} > ${exception.localizedMessage}") - localEchoRepository.updateSendState(event.eventId, SendState.UNDELIVERED) + localEchoRepository.updateSendState(event.eventId, event.roomId, SendState.UNDELIVERED) return Result.success() } else { Timber.e("## SendEvent: [${System.currentTimeMillis()}] Send event Failed schedule retry ${params.eventId} > ${exception.localizedMessage}") @@ -102,12 +104,4 @@ internal class SendEventWorker(context: Context, override fun buildErrorParams(params: Params, message: String): Params { return params.copy(lastFailureMessage = params.lastFailureMessage ?: message) } - - private suspend fun sendEvent(eventId: String, roomId: String, type: String, content: Content?) { - localEchoRepository.updateSendState(eventId, SendState.SENDING) - executeRequest(eventBus) { - apiCall = roomAPI.send(eventId, roomId, type, content) - } - localEchoRepository.updateSendState(eventId, SendState.SENT) - } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessor.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessor.kt new file mode 100644 index 00000000000..62e225c624d --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessor.kt @@ -0,0 +1,239 @@ +/* + * Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.room.send.queue + +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.matrix.android.sdk.api.auth.data.SessionParams +import org.matrix.android.sdk.api.auth.data.sessionId +import org.matrix.android.sdk.api.extensions.tryOrNull +import org.matrix.android.sdk.api.failure.Failure +import org.matrix.android.sdk.api.failure.MatrixError +import org.matrix.android.sdk.api.failure.isTokenError +import org.matrix.android.sdk.api.session.crypto.CryptoService +import org.matrix.android.sdk.api.session.events.model.Event +import org.matrix.android.sdk.api.session.sync.SyncState +import org.matrix.android.sdk.api.util.Cancelable +import org.matrix.android.sdk.internal.session.SessionScope +import org.matrix.android.sdk.internal.task.TaskExecutor +import timber.log.Timber +import java.io.IOException +import java.net.InetAddress +import java.net.InetSocketAddress +import java.net.Socket +import java.util.Timer +import java.util.TimerTask +import java.util.concurrent.LinkedBlockingQueue +import javax.inject.Inject +import kotlin.concurrent.schedule + +/** + * A simple ever running thread unique for that session responsible of sending events in order. + * Each send is retried 3 times, if there is no network (e.g if cannot ping home server) it will wait and + * periodically test reachability before resume (does not count as a retry) + * + * If the app is killed before all event were sent, on next wakeup the scheduled events will be re posted + */ +@SessionScope +internal class EventSenderProcessor @Inject constructor( + private val cryptoService: CryptoService, + private val sessionParams: SessionParams, + private val queuedTaskFactory: QueuedTaskFactory, + private val taskExecutor: TaskExecutor, + private val memento: QueueMemento +) : Thread("SENDER_THREAD_SID_${sessionParams.credentials.sessionId()}") { + + private fun markAsManaged(task: QueuedTask) { + memento.track(task) + } + + private fun markAsFinished(task: QueuedTask) { + memento.unTrack(task) + } + + // API + fun postEvent(event: Event): Cancelable { + return postEvent(event, event.roomId?.let { cryptoService.isRoomEncrypted(it) } ?: false) + } + + override fun start() { + super.start() + // We should check for sending events not handled because app was killed + // But we should be careful of only took those that was submitted to us, because if it's + // for example it's a media event it is handled by some worker and he will handle it + // This is a bit fragile :/ + // also some events cannot be retried manually by users, e.g reactions + // they were previously relying on workers to do the work :/ and was expected to always finally succeed + // Also some echos are not to be resent like redaction echos (fake event created for aggregation) + + tryOrNull { + taskExecutor.executorScope.launch { + Timber.d("## Send relaunched pending events on restart") + memento.restoreTasks(this@EventSenderProcessor) + } + } + } + + fun postEvent(event: Event, encrypt: Boolean): Cancelable { + val task = queuedTaskFactory.createSendTask(event, encrypt) + return postTask(task) + } + + fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable { + return postRedaction(redactionLocalEcho.eventId!!, redactionLocalEcho.redacts!!, redactionLocalEcho.roomId!!, reason) + } + + fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable { + val task = queuedTaskFactory.createRedactTask(redactionLocalEchoId, eventToRedactId, roomId, reason) + return postTask(task) + } + + fun postTask(task: QueuedTask): Cancelable { + // non blocking add to queue + sendingQueue.add(task) + markAsManaged(task) + return object : Cancelable { + override fun cancel() { + task.cancel() + } + } + } + + companion object { + private const val RETRY_WAIT_TIME_MS = 10_000L + } + + private var sendingQueue = LinkedBlockingQueue() + + private var networkAvailableLock = Object() + private var canReachServer = true + private var retryNoNetworkTask: TimerTask? = null + + override fun run() { + Timber.v("## SendThread started ts:${System.currentTimeMillis()}") + try { + while (!isInterrupted) { + Timber.v("## SendThread wait for task to process") + val task = sendingQueue.take() + Timber.v("## SendThread Found task to process $task") + + if (task.isCancelled()) { + Timber.v("## SendThread send cancelled for $task") + // we do not execute this one + continue + } + // we check for network connectivity + while (!canReachServer) { + Timber.v("## SendThread cannot reach server, wait ts:${System.currentTimeMillis()}") + // schedule to retry + waitForNetwork() + // if thread as been killed meanwhile +// if (state == State.KILLING) break + } + Timber.v("## Server is Reachable") + // so network is available + + runBlocking { + retryLoop@ while (task.retryCount < 3) { + try { + // SendPerformanceProfiler.startStage(task.event.eventId!!, SendPerformanceProfiler.Stages.SEND_WORKER) + Timber.v("## SendThread retryLoop for $task retryCount ${task.retryCount}") + task.execute() + // sendEventTask.execute(SendEventTask.Params(task.event, task.encrypt, cryptoService)) + // SendPerformanceProfiler.stopStage(task.event.eventId, SendPerformanceProfiler.Stages.SEND_WORKER) + break@retryLoop + } catch (exception: Throwable) { + when { + exception is IOException || exception is Failure.NetworkConnection -> { + canReachServer = false + task.retryCount++ + if (task.retryCount >= 3) task.onTaskFailed() + while (!canReachServer) { + Timber.v("## SendThread retryLoop cannot reach server, wait ts:${System.currentTimeMillis()}") + // schedule to retry + waitForNetwork() + } + } + (exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> { + task.retryCount++ + if (task.retryCount >= 3) task.onTaskFailed() + Timber.v("## SendThread retryLoop retryable error for $task reason: ${exception.localizedMessage}") + // wait a bit + // Todo if its a quota exception can we get timout? + sleep(3_000) + continue@retryLoop + } + exception.isTokenError() -> { + Timber.v("## SendThread retryLoop retryable TOKEN error, interrupt") + // we can exit the loop + task.onTaskFailed() + throw InterruptedException() + } + else -> { + Timber.v("## SendThread retryLoop Un-Retryable error, try next task") + // this task is in error, check next one? + break@retryLoop + } + } + } + } + } + markAsFinished(task) + } + } catch (interruptionException: InterruptedException) { + // will be thrown is thread is interrupted while seeping + interrupt() + Timber.v("## InterruptedException!! ${interruptionException.localizedMessage}") + } +// state = State.KILLED + // is this needed? + retryNoNetworkTask?.cancel() + Timber.w("## SendThread finished ${System.currentTimeMillis()}") + } + + private fun waitForNetwork() { + retryNoNetworkTask = Timer(SyncState.NoNetwork.toString(), false).schedule(RETRY_WAIT_TIME_MS) { + synchronized(networkAvailableLock) { + canReachServer = checkHostAvailable().also { + Timber.v("## SendThread checkHostAvailable $it") + } + networkAvailableLock.notify() + } + } + synchronized(networkAvailableLock) { networkAvailableLock.wait() } + } + + /** + * Check if homeserver is reachable. + */ + private fun checkHostAvailable(): Boolean { + val host = sessionParams.homeServerConnectionConfig.homeServerUri.host ?: return false + val port = sessionParams.homeServerConnectionConfig.homeServerUri.port.takeIf { it != -1 } ?: 80 + val timeout = 30_000 + try { + Socket().use { socket -> + val inetAddress: InetAddress = InetAddress.getByName(host) + val inetSocketAddress = InetSocketAddress(inetAddress, port) + socket.connect(inetSocketAddress, timeout) + return true + } + } catch (e: IOException) { + Timber.v("## EventSender isHostAvailable failure ${e.localizedMessage}") + return false + } + } +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueueMemento.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueueMemento.kt new file mode 100644 index 00000000000..e69c65ec4cb --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueueMemento.kt @@ -0,0 +1,121 @@ +/* + * Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.room.send.queue + +import android.content.Context +import org.matrix.android.sdk.api.auth.data.sessionId +import org.matrix.android.sdk.api.extensions.tryOrNull +import org.matrix.android.sdk.api.session.crypto.CryptoService +import org.matrix.android.sdk.api.session.room.send.SendState +import org.matrix.android.sdk.internal.di.SessionId +import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository +import timber.log.Timber +import javax.inject.Inject + +/** + * Simple lightweight persistence + * Don't want to go in DB due to current issues + * Will never manage lots of events, it simply uses sharedPreferences. + * It is just used to remember what events/localEchos was managed by the event sender in order to + * reschedule them (and only them) on next restart + */ +internal class QueueMemento @Inject constructor(context: Context, + @SessionId sessionId: String, + private val queuedTaskFactory: QueuedTaskFactory, + private val localEchoRepository: LocalEchoRepository, + private val cryptoService: CryptoService) { + + private val storage = context.getSharedPreferences("QueueMemento_$sessionId", Context.MODE_PRIVATE) + private val managedTaskInfos = mutableListOf() + + fun track(task: QueuedTask) { + synchronized(managedTaskInfos) { + managedTaskInfos.add(task) + persist() + } + } + + fun unTrack(task: QueuedTask) { + managedTaskInfos.remove(task) + persist() + } + + private fun persist() { + managedTaskInfos.mapIndexedNotNull { index, queuedTask -> + toTaskInfo(queuedTask, index)?.let { TaskInfo.map(it) } + }.toSet().let { set -> + storage.edit() + .putStringSet("ManagedBySender", set) + .apply() + } + } + + private fun toTaskInfo(task: QueuedTask, order: Int): TaskInfo? { + synchronized(managedTaskInfos) { + return when (task) { + is SendEventQueuedTask -> SendEventTaskInfo( + localEchoId = task.event.eventId ?: "", + encrypt = task.encrypt, + order = order + ) + is RedactQueuedTask -> RedactEventTaskInfo( + redactionLocalEcho = task.redactionLocalEchoId, + order = order + ) + else -> null + } + } + } + + suspend fun restoreTasks(eventProcessor: EventSenderProcessor) { + // events should be restarted in correct order + storage.getStringSet("ManagedBySender", null)?.let { pending -> + Timber.d("## Send - Recovering unsent events $pending") + pending.mapNotNull { tryOrNull { TaskInfo.map(it) } } + } + ?.sortedBy { it.order } + ?.forEach { info -> + try { + when (info) { + is SendEventTaskInfo -> { + localEchoRepository.getUpToDateEcho(info.localEchoId)?.let { + if (it.sendState.isSending() && it.eventId != null && it.roomId != null) { + localEchoRepository.updateSendState(it.eventId, it.roomId, SendState.UNSENT) + Timber.d("## Send -Reschedule send $info") + eventProcessor.postTask(queuedTaskFactory.createSendTask(it, info.encrypt ?: cryptoService.isRoomEncrypted(it.roomId))) + } + } + } + is RedactEventTaskInfo -> { + info.redactionLocalEcho?.let { localEchoRepository.getUpToDateEcho(it) }?.let { + localEchoRepository.updateSendState(it.eventId!!, it.roomId, SendState.UNSENT) + // try to get reason + val reason = it.content?.get("reason") as? String + if (it.redacts != null && it.roomId != null) { + Timber.d("## Send -Reschedule redact $info") + eventProcessor.postTask(queuedTaskFactory.createRedactTask(it.eventId, it.redacts, it.roomId, reason)) + } + } + // postTask(queuedTaskFactory.createRedactTask(info.eventToRedactId, info.) + } + } + } catch (failure: Throwable) { + Timber.e("failed to restore task $info") + } + } + } +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt new file mode 100644 index 00000000000..bccbc97ff43 --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt @@ -0,0 +1,29 @@ +/* + * Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.room.send.queue + +abstract class QueuedTask { + var retryCount = 0 + + abstract suspend fun execute() + + abstract fun onTaskFailed() + + abstract fun isCancelled() : Boolean + + abstract fun cancel() +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTaskFactory.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTaskFactory.kt new file mode 100644 index 00000000000..90bb47c4350 --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTaskFactory.kt @@ -0,0 +1,57 @@ +/* + * Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.room.send.queue + +import org.matrix.android.sdk.api.session.crypto.CryptoService +import org.matrix.android.sdk.api.session.events.model.Event +import org.matrix.android.sdk.internal.crypto.tasks.RedactEventTask +import org.matrix.android.sdk.internal.crypto.tasks.SendEventTask +import org.matrix.android.sdk.internal.session.room.send.CancelSendTracker +import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository +import javax.inject.Inject + +internal class QueuedTaskFactory @Inject constructor( + private val sendEventTask: SendEventTask, + private val cryptoService: CryptoService, + private val localEchoRepository: LocalEchoRepository, + private val redactEventTask: RedactEventTask, + private val cancelSendTracker: CancelSendTracker +) { + + fun createSendTask(event: Event, encrypt: Boolean): QueuedTask { + return SendEventQueuedTask( + event = event, + encrypt = encrypt, + cryptoService = cryptoService, + localEchoRepository = localEchoRepository, + sendEventTask = sendEventTask, + cancelSendTracker = cancelSendTracker + ) + } + + fun createRedactTask(redactionLocalEcho: String, eventId: String, roomId: String, reason: String?): QueuedTask { + return RedactQueuedTask( + redactionLocalEchoId = redactionLocalEcho, + toRedactEventId = eventId, + roomId = roomId, + reason = reason, + redactEventTask = redactEventTask, + localEchoRepository = localEchoRepository, + cancelSendTracker = cancelSendTracker + ) + } +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/RedactQueuedTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/RedactQueuedTask.kt new file mode 100644 index 00000000000..a3c19a1f7ca --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/RedactQueuedTask.kt @@ -0,0 +1,53 @@ +/* + * Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.room.send.queue + +import org.matrix.android.sdk.api.session.room.send.SendState +import org.matrix.android.sdk.internal.crypto.tasks.RedactEventTask +import org.matrix.android.sdk.internal.session.room.send.CancelSendTracker +import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository + +internal class RedactQueuedTask( + val toRedactEventId: String, + val redactionLocalEchoId: String, + val roomId: String, + val reason: String?, + val redactEventTask: RedactEventTask, + val localEchoRepository: LocalEchoRepository, + val cancelSendTracker: CancelSendTracker +) : QueuedTask() { + + private var _isCancelled: Boolean = false + + override fun toString() = "[RedactEventRunnableTask $redactionLocalEchoId]" + + override suspend fun execute() { + redactEventTask.execute(RedactEventTask.Params(redactionLocalEchoId, roomId, toRedactEventId, reason)) + } + + override fun onTaskFailed() { + localEchoRepository.updateSendState(redactionLocalEchoId, roomId, SendState.UNDELIVERED) + } + + override fun isCancelled(): Boolean { + return _isCancelled || cancelSendTracker.isCancelRequestedFor(redactionLocalEchoId, roomId) + } + + override fun cancel() { + _isCancelled = true + } +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/SendEventQueuedTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/SendEventQueuedTask.kt new file mode 100644 index 00000000000..09da0908f93 --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/SendEventQueuedTask.kt @@ -0,0 +1,65 @@ +/* + * Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.room.send.queue + +import org.matrix.android.sdk.api.session.crypto.CryptoService +import org.matrix.android.sdk.api.session.events.model.Event +import org.matrix.android.sdk.api.session.events.model.EventType +import org.matrix.android.sdk.api.session.room.send.SendState +import org.matrix.android.sdk.internal.crypto.tasks.SendEventTask +import org.matrix.android.sdk.internal.session.room.send.CancelSendTracker +import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository + +internal class SendEventQueuedTask( + val event: Event, + val encrypt: Boolean, + val sendEventTask: SendEventTask, + val cryptoService: CryptoService, + val localEchoRepository: LocalEchoRepository, + val cancelSendTracker: CancelSendTracker +) : QueuedTask() { + + private var _isCancelled: Boolean = false + + override fun toString() = "[SendEventRunnableTask ${event.eventId}]" + + override suspend fun execute() { + sendEventTask.execute(SendEventTask.Params(event, encrypt, cryptoService)) + } + + override fun onTaskFailed() { + when (event.getClearType()) { + EventType.REDACTION, + EventType.REACTION -> { + // we just delete? it will not be present on timeline and no ux to retry + localEchoRepository.deleteFailedEchoAsync(eventId = event.eventId, roomId = event.roomId ?: "") + // TODO update aggregation :/ or it will stay locally + } + else -> { + localEchoRepository.updateSendState(event.eventId!!, event.roomId, SendState.UNDELIVERED) + } + } + } + + override fun isCancelled(): Boolean { + return _isCancelled || cancelSendTracker.isCancelRequestedFor(event.eventId, event.roomId) + } + + override fun cancel() { + _isCancelled = true + } +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/TaskInfo.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/TaskInfo.kt new file mode 100644 index 00000000000..87c6299c4d8 --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/TaskInfo.kt @@ -0,0 +1,76 @@ +/* + * Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.room.send.queue + +import com.squareup.moshi.Json +import com.squareup.moshi.JsonClass +import com.squareup.moshi.Moshi +import org.matrix.android.sdk.api.extensions.tryOrNull +import org.matrix.android.sdk.internal.di.SerializeNulls +import org.matrix.android.sdk.internal.network.parsing.RuntimeJsonAdapterFactory + +/** + * Info that need to be persisted by the sender thread + * With polymorphic moshi parsing + */ +internal interface TaskInfo { + val type: String + val order: Int + + companion object { + const val TYPE_UNKNOWN = "TYPE_UNKNOWN" + const val TYPE_SEND = "TYPE_SEND" + const val TYPE_REDACT = "TYPE_REDACT" + + val moshi = Moshi.Builder() + .add(RuntimeJsonAdapterFactory.of(TaskInfo::class.java, "type", FallbackTaskInfo::class.java) + .registerSubtype(SendEventTaskInfo::class.java, TYPE_SEND) + .registerSubtype(RedactEventTaskInfo::class.java, TYPE_REDACT) + ) + .add(SerializeNulls.JSON_ADAPTER_FACTORY) + .build() + + fun map(info: TaskInfo): String { + return moshi.adapter(TaskInfo::class.java).toJson(info) + } + + fun map(string: String): TaskInfo? { + return tryOrNull { moshi.adapter(TaskInfo::class.java).fromJson(string) } + } + } +} + +@JsonClass(generateAdapter = true) +internal data class SendEventTaskInfo( + @Json(name = "type") override val type: String = TaskInfo.TYPE_SEND, + @Json(name = "localEchoId") val localEchoId: String, + @Json(name = "encrypt") val encrypt: Boolean?, + @Json(name = "order") override val order: Int +) : TaskInfo + +@JsonClass(generateAdapter = true) +internal data class RedactEventTaskInfo( + @Json(name = "type") override val type: String = TaskInfo.TYPE_REDACT, + @Json(name = "redactionLocalEcho") val redactionLocalEcho: String?, + @Json(name = "order") override val order: Int +) : TaskInfo + +@JsonClass(generateAdapter = true) +internal data class FallbackTaskInfo( + @Json(name = "type") override val type: String = TaskInfo.TYPE_REDACT, + @Json(name = "order") override val order: Int +) : TaskInfo diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt index 9178759bcc1..1e5c3600d33 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt @@ -32,8 +32,11 @@ import org.matrix.android.sdk.api.extensions.tryOrNull import org.matrix.android.sdk.api.session.events.model.EventType import org.matrix.android.sdk.api.session.events.model.RelationType import org.matrix.android.sdk.api.session.events.model.toModel +import org.matrix.android.sdk.api.session.room.model.EventAnnotationsSummary +import org.matrix.android.sdk.api.session.room.model.ReactionAggregatedSummary import org.matrix.android.sdk.api.session.room.model.ReadReceipt import org.matrix.android.sdk.api.session.room.model.message.MessageContent +import org.matrix.android.sdk.api.session.room.model.relation.ReactionContent import org.matrix.android.sdk.api.session.room.send.SendState import org.matrix.android.sdk.api.session.room.timeline.Timeline import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent @@ -83,6 +86,7 @@ internal class DefaultTimeline( data class OnNewTimelineEvents(val roomId: String, val eventIds: List) data class OnLocalEchoCreated(val roomId: String, val timelineEvent: TimelineEvent) + data class OnLocalEchoUpdated(val roomId: String, val eventId: String, val sendState: SendState) companion object { val BACKGROUND_HANDLER = createBackgroundHandler("TIMELINE_DB_THREAD") @@ -102,7 +106,9 @@ internal class DefaultTimeline( private var prevDisplayIndex: Int? = null private var nextDisplayIndex: Int? = null - private val inMemorySendingEvents = Collections.synchronizedList(ArrayList()) + + private val uiEchoManager = UIEchoManager() + private val builtEvents = Collections.synchronizedList(ArrayList()) private val builtEventsIdMap = Collections.synchronizedMap(HashMap()) private val backwardsState = AtomicReference(State()) @@ -163,10 +169,7 @@ internal class DefaultTimeline( sendingEvents = roomEntity.sendingTimelineEvents.where().filterEventsWithSettings().findAll() sendingEvents.addChangeListener { events -> - // Remove in memory as soon as they are known by database - events.forEach { te -> - inMemorySendingEvents.removeAll { te.eventId == it.eventId } - } + uiEchoManager.sentEventsUpdated(events) postSnapshot() } nonFilteredEvents = buildEventQuery(realm).sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.DESCENDING).findAll() @@ -318,16 +321,15 @@ internal class DefaultTimeline( @Subscribe(threadMode = ThreadMode.MAIN) fun onLocalEchoCreated(onLocalEchoCreated: OnLocalEchoCreated) { - if (isLive && onLocalEchoCreated.roomId == roomId) { - // do not add events that would have been filtered - if (listOf(onLocalEchoCreated.timelineEvent).filterEventsWithSettings().isNotEmpty()) { - listeners.forEach { - it.onNewTimelineEvents(listOf(onLocalEchoCreated.timelineEvent.eventId)) - } - Timber.v("On local echo created: ${onLocalEchoCreated.timelineEvent.eventId}") - inMemorySendingEvents.add(0, onLocalEchoCreated.timelineEvent) - postSnapshot() - } + if (uiEchoManager.onLocalEchoCreated(onLocalEchoCreated)) { + postSnapshot() + } + } + + @Subscribe(threadMode = ThreadMode.MAIN) + fun onLocalEchoUpdated(onLocalEchoUpdated: OnLocalEchoUpdated) { + if (uiEchoManager.onLocalEchoUpdated(onLocalEchoUpdated)) { + postSnapshot() } } @@ -407,10 +409,12 @@ internal class DefaultTimeline( private fun buildSendingEvents(): List { val builtSendingEvents = ArrayList() if (hasReachedEnd(Timeline.Direction.FORWARDS) && !hasMoreInCache(Timeline.Direction.FORWARDS)) { - builtSendingEvents.addAll(inMemorySendingEvents.filterEventsWithSettings()) + builtSendingEvents.addAll(uiEchoManager.getInMemorySendingEvents().filterEventsWithSettings()) sendingEvents.forEach { timelineEventEntity -> if (builtSendingEvents.find { it.eventId == timelineEventEntity.eventId } == null) { - builtSendingEvents.add(timelineEventMapper.map(timelineEventEntity)) + val element = timelineEventMapper.map(timelineEventEntity) + uiEchoManager.updateSentStateWithUiEcho(element) + builtSendingEvents.add(element) } } } @@ -622,10 +626,7 @@ internal class DefaultTimeline( val timelineEvent = buildTimelineEvent(eventEntity) val transactionId = timelineEvent.root.unsignedData?.transactionId - val sendingEvent = inMemorySendingEvents.find { - it.eventId == transactionId - } - inMemorySendingEvents.remove(sendingEvent) + uiEchoManager.onSyncedEvent(transactionId) if (timelineEvent.isEncrypted() && timelineEvent.root.mxDecryptionResult == null) { @@ -649,7 +650,10 @@ internal class DefaultTimeline( timelineEventEntity = eventEntity, buildReadReceipts = settings.buildReadReceipts, correctedReadReceipts = hiddenReadReceipts.correctedReadReceipts(eventEntity.eventId) - ) + ).let { + // eventually enhance with ui echo? + (uiEchoManager.decorateEventWithReactionUiEcho(it) ?: it) + } /** * This has to be called on TimelineThread as it accesses realm live results @@ -797,4 +801,155 @@ internal class DefaultTimeline( val isPaginating: Boolean = false, val requestedPaginationCount: Int = 0 ) + + private data class ReactionUiEchoData( + val localEchoId: String, + val reactedOnEventId: String, + val reaction: String + ) + + inner class UIEchoManager { + + private val inMemorySendingEvents = Collections.synchronizedList(ArrayList()) + + fun getInMemorySendingEvents(): List { + return inMemorySendingEvents + } + + /** + * Due to lag of DB updates, we keep some UI echo of some properties to update timeline faster + */ + private val inMemorySendingStates = Collections.synchronizedMap(HashMap()) + + private val inMemoryReactions = Collections.synchronizedMap>(HashMap()) + + fun sentEventsUpdated(events: RealmResults) { + // Remove in memory as soon as they are known by database + events.forEach { te -> + inMemorySendingEvents.removeAll { te.eventId == it.eventId } + } + inMemorySendingStates.keys.removeAll { key -> + events.find { it.eventId == key } == null + } + inMemoryReactions.keys.removeAll { key -> + events.find { it.eventId == key } == null + } + } + + fun onLocalEchoUpdated(onLocalEchoUpdated: OnLocalEchoUpdated): Boolean { + if (isLive && onLocalEchoUpdated.roomId == roomId) { + val existingState = inMemorySendingStates[onLocalEchoUpdated.eventId] + inMemorySendingStates[onLocalEchoUpdated.eventId] = onLocalEchoUpdated.sendState + if (existingState != onLocalEchoUpdated.sendState) { + return true + } + } + return false + } + + // return true if should update + fun onLocalEchoCreated(onLocalEchoCreated: OnLocalEchoCreated): Boolean { + var postSnapshot = false + if (isLive && onLocalEchoCreated.roomId == roomId) { + // Manage some ui echos (do it before filter because actual event could be filtered out) + when (onLocalEchoCreated.timelineEvent.root.getClearType()) { + EventType.REDACTION -> { + } + EventType.REACTION -> { + val content = onLocalEchoCreated.timelineEvent.root.content?.toModel() + if (RelationType.ANNOTATION == content?.relatesTo?.type) { + val reaction = content.relatesTo.key + val relatedEventID = content.relatesTo.eventId + inMemoryReactions.getOrPut(relatedEventID) { mutableListOf() } + .add( + ReactionUiEchoData( + localEchoId = onLocalEchoCreated.timelineEvent.eventId, + reactedOnEventId = relatedEventID, + reaction = reaction + ) + ) + postSnapshot = rebuildEvent(relatedEventID) { + decorateEventWithReactionUiEcho(it) + } || postSnapshot + } + } + } + + // do not add events that would have been filtered + if (listOf(onLocalEchoCreated.timelineEvent).filterEventsWithSettings().isNotEmpty()) { + listeners.forEach { + it.onNewTimelineEvents(listOf(onLocalEchoCreated.timelineEvent.eventId)) + } + Timber.v("On local echo created: ${onLocalEchoCreated.timelineEvent.eventId}") + inMemorySendingEvents.add(0, onLocalEchoCreated.timelineEvent) + postSnapshot = true + } + } + return postSnapshot + } + + fun decorateEventWithReactionUiEcho(timelineEvent: TimelineEvent): TimelineEvent? { + val relatedEventID = timelineEvent.eventId + val contents = inMemoryReactions[relatedEventID] ?: return null + + var existingAnnotationSummary = timelineEvent.annotations ?: EventAnnotationsSummary( + relatedEventID + ) + val updateReactions = existingAnnotationSummary.reactionsSummary.toMutableList() + contents.forEach { uiEchoReaction -> + val existing = updateReactions.firstOrNull { it.key == uiEchoReaction.reaction } + if (existing == null) { + // just add the new key + ReactionAggregatedSummary( + key = uiEchoReaction.reaction, + count = 1, + addedByMe = true, + firstTimestamp = System.currentTimeMillis(), + sourceEvents = emptyList(), + localEchoEvents = listOf(uiEchoReaction.localEchoId) + ).let { updateReactions.add(it) } + } else { + // update Existing Key + if (!existing.localEchoEvents.contains(uiEchoReaction.localEchoId)) { + updateReactions.remove(existing) + // only update if echo is not yet there + ReactionAggregatedSummary( + key = existing.key, + count = existing.count + 1, + addedByMe = true, + firstTimestamp = existing.firstTimestamp, + sourceEvents = existing.sourceEvents, + localEchoEvents = existing.localEchoEvents + uiEchoReaction.localEchoId + + ).let { updateReactions.add(it) } + } + } + } + + existingAnnotationSummary = existingAnnotationSummary.copy( + reactionsSummary = updateReactions + ) + return timelineEvent.copy( + annotations = existingAnnotationSummary + ) + } + + fun updateSentStateWithUiEcho(element: TimelineEvent) { + inMemorySendingStates[element.eventId]?.let { + // Timber.v("## ${System.currentTimeMillis()} Send event refresh echo with live state ${it} from state ${element.root.sendState}") + element.root.sendState = element.root.sendState.takeIf { it == SendState.SENT } ?: it + } + } + + fun onSyncedEvent(transactionId: String?) { + val sendingEvent = inMemorySendingEvents.find { + it.eventId == transactionId + } + inMemorySendingEvents.remove(sendingEvent) + // Is it too early to clear it? will be done when removed from sending anyway? + inMemoryReactions.forEach { (_, u) -> + u.filterNot { it.localEchoId == transactionId } + } + } + } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/TimelineSendEventWorkCommon.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/TimelineSendEventWorkCommon.kt index e30d1b5b440..bfd4e22cc22 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/TimelineSendEventWorkCommon.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/TimelineSendEventWorkCommon.kt @@ -21,7 +21,6 @@ import androidx.work.ExistingWorkPolicy import androidx.work.ListenableWorker import androidx.work.OneTimeWorkRequest import org.matrix.android.sdk.api.util.Cancelable -import org.matrix.android.sdk.api.util.NoOpCancellable import org.matrix.android.sdk.internal.di.WorkManagerProvider import org.matrix.android.sdk.internal.util.CancelableWork import org.matrix.android.sdk.internal.worker.startChain @@ -38,24 +37,6 @@ internal class TimelineSendEventWorkCommon @Inject constructor( private val workManagerProvider: WorkManagerProvider ) { - fun postSequentialWorks(roomId: String, vararg workRequests: OneTimeWorkRequest): Cancelable { - return when { - workRequests.isEmpty() -> NoOpCancellable - workRequests.size == 1 -> postWork(roomId, workRequests.first()) - else -> { - val firstWork = workRequests.first() - var continuation = workManagerProvider.workManager - .beginUniqueWork(buildWorkName(roomId), ExistingWorkPolicy.APPEND, firstWork) - for (i in 1 until workRequests.size) { - val workRequest = workRequests[i] - continuation = continuation.then(workRequest) - } - continuation.enqueue() - CancelableWork(workManagerProvider.workManager, firstWork.id) - } - } - } - fun postWork(roomId: String, workRequest: OneTimeWorkRequest, policy: ExistingWorkPolicy = ExistingWorkPolicy.APPEND_OR_REPLACE): Cancelable { workManagerProvider.workManager .beginUniqueWork(buildWorkName(roomId), policy, workRequest) @@ -77,11 +58,6 @@ internal class TimelineSendEventWorkCommon @Inject constructor( return "${roomId}_$SEND_WORK" } - fun cancelAllWorks(roomId: String) { - workManagerProvider.workManager - .cancelUniqueWork(buildWorkName(roomId)) - } - companion object { private const val SEND_WORK = "SEND_WORK" private const val BACKOFF_DELAY = 10_000L diff --git a/vector/src/main/java/im/vector/app/features/home/room/detail/RoomDetailAction.kt b/vector/src/main/java/im/vector/app/features/home/room/detail/RoomDetailAction.kt index 88eb1b51099..912967138ba 100644 --- a/vector/src/main/java/im/vector/app/features/home/room/detail/RoomDetailAction.kt +++ b/vector/src/main/java/im/vector/app/features/home/room/detail/RoomDetailAction.kt @@ -68,7 +68,6 @@ sealed class RoomDetailAction : VectorViewModelAction { data class IgnoreUser(val userId: String?) : RoomDetailAction() - object ClearSendQueue : RoomDetailAction() object ResendAll : RoomDetailAction() data class StartCall(val isVideo: Boolean) : RoomDetailAction() object EndCall : RoomDetailAction() diff --git a/vector/src/main/java/im/vector/app/features/home/room/detail/RoomDetailFragment.kt b/vector/src/main/java/im/vector/app/features/home/room/detail/RoomDetailFragment.kt index 92c3499e5ec..13ffb7b9bb4 100644 --- a/vector/src/main/java/im/vector/app/features/home/room/detail/RoomDetailFragment.kt +++ b/vector/src/main/java/im/vector/app/features/home/room/detail/RoomDetailFragment.kt @@ -645,13 +645,6 @@ class RoomDetailFragment @Inject constructor( override fun onOptionsItemSelected(item: MenuItem): Boolean { return when (item.itemId) { - R.id.clear_message_queue -> { - // This a temporary option during dev as it is not super stable - // Cancel all pending actions in room queue and post a dummy - // Then mark all sending events as undelivered - roomDetailViewModel.handle(RoomDetailAction.ClearSendQueue) - true - } R.id.invite -> { navigator.openInviteUsersToRoom(requireActivity(), roomDetailArgs.roomId) true diff --git a/vector/src/main/java/im/vector/app/features/home/room/detail/RoomDetailViewModel.kt b/vector/src/main/java/im/vector/app/features/home/room/detail/RoomDetailViewModel.kt index 19cc27a5107..e721e0948d1 100644 --- a/vector/src/main/java/im/vector/app/features/home/room/detail/RoomDetailViewModel.kt +++ b/vector/src/main/java/im/vector/app/features/home/room/detail/RoomDetailViewModel.kt @@ -251,7 +251,6 @@ class RoomDetailViewModel @AssistedInject constructor( is RoomDetailAction.HandleTombstoneEvent -> handleTombstoneEvent(action) is RoomDetailAction.ResendMessage -> handleResendEvent(action) is RoomDetailAction.RemoveFailedEcho -> handleRemove(action) - is RoomDetailAction.ClearSendQueue -> handleClearSendQueue() is RoomDetailAction.ResendAll -> handleResendAll() is RoomDetailAction.MarkAllAsRead -> handleMarkAllAsRead() is RoomDetailAction.ReportContent -> handleReportContent(action) @@ -542,9 +541,6 @@ class RoomDetailViewModel @AssistedInject constructor( return@withState false } when (itemId) { - R.id.clear_message_queue -> - // For now always disable when not in developer mode, worker cancellation is not working properly - timeline.pendingEventCount() > 0 && vectorPreferences.developerMode() R.id.resend_all -> state.asyncRoomSummary()?.hasFailedSending == true R.id.timeline_setting -> true R.id.invite -> state.canInvite @@ -1065,10 +1061,6 @@ class RoomDetailViewModel @AssistedInject constructor( } } - private fun handleClearSendQueue() { - room.clearSendingQueue() - } - private fun handleResendAll() { room.resendAllFailedMessages() } diff --git a/vector/src/main/java/im/vector/app/features/home/room/detail/timeline/item/MessageTextItem.kt b/vector/src/main/java/im/vector/app/features/home/room/detail/timeline/item/MessageTextItem.kt index 791a3c388ff..feba62dea3d 100644 --- a/vector/src/main/java/im/vector/app/features/home/room/detail/timeline/item/MessageTextItem.kt +++ b/vector/src/main/java/im/vector/app/features/home/room/detail/timeline/item/MessageTextItem.kt @@ -41,23 +41,27 @@ abstract class MessageTextItem : AbsMessageItem() { var movementMethod: MovementMethod? = null override fun bind(holder: Holder) { - super.bind(holder) - holder.messageView.movementMethod = movementMethod if (useBigFont) { holder.messageView.textSize = 44F } else { holder.messageView.textSize = 14F } - renderSendState(holder.messageView, holder.messageView) - holder.messageView.setOnClickListener(attributes.itemClickListener) - holder.messageView.setOnLongClickListener(attributes.itemLongClickListener) if (searchForPills) { - message?.findPillsAndProcess(coroutineScope) { it.bind(holder.messageView) } + message?.findPillsAndProcess(coroutineScope) { + // mmm.. not sure this is so safe in regards to cell reuse + it.bind(holder.messageView) + } } val textFuture = PrecomputedTextCompat.getTextFuture( message ?: "", TextViewCompat.getTextMetricsParams(holder.messageView), null) + super.bind(holder) + holder.messageView.movementMethod = movementMethod + + renderSendState(holder.messageView, holder.messageView) + holder.messageView.setOnClickListener(attributes.itemClickListener) + holder.messageView.setOnLongClickListener(attributes.itemLongClickListener) holder.messageView.setTextFuture(textFuture) } diff --git a/vector/src/main/res/menu/menu_timeline.xml b/vector/src/main/res/menu/menu_timeline.xml index 86ffe1f7248..72b4a00682e 100644 --- a/vector/src/main/res/menu/menu_timeline.xml +++ b/vector/src/main/res/menu/menu_timeline.xml @@ -68,11 +68,4 @@ app:showAsAction="never" tools:visible="true" /> - - \ No newline at end of file