Skip to content
This repository has been archived by the owner on Aug 12, 2024. It is now read-only.

Commit

Permalink
Shamrock: Reusable and restrictive coroutine context
Browse files Browse the repository at this point in the history
Signed-off-by: 白池 <[email protected]>
  • Loading branch information
whitechi73 committed Feb 21, 2024
1 parent 18126b1 commit c940aea
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 63 deletions.
17 changes: 12 additions & 5 deletions xposed/src/main/java/moe/fuqiuluo/qqinterface/servlet/BaseSvc.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import io.ktor.utils.io.core.BytePacketBuilder
import io.ktor.utils.io.core.readBytes
import io.ktor.utils.io.core.writeFully
import io.ktor.utils.io.core.writeInt
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeoutOrNull
Expand All @@ -28,10 +29,11 @@ import moe.fuqiuluo.shamrock.xposed.helper.internal.IPCRequest
import protobuf.oidb.TrpcOidb
import mqq.app.MobileQQ
import tencent.im.oidb.oidb_sso
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume

internal abstract class BaseSvc {
companion object {
companion object Default: CoroutineScope {
val currentUin: String
get() = app.currentAccountUin

Expand All @@ -46,7 +48,7 @@ internal abstract class BaseSvc {
val seq = MsfCore.getNextSeq()
val buffer = withTimeoutOrNull(timeout) {
suspendCancellableCoroutine { continuation ->
GlobalScope.launch(Dispatchers.Default) {
launch(Dispatchers.Default) {
DynamicReceiver.register(IPCRequest(cmd, seq) {
val buffer = it.getByteArrayExtra("buffer")!!
continuation.resume(buffer)
Expand Down Expand Up @@ -75,7 +77,7 @@ internal abstract class BaseSvc {
val seq = MsfCore.getNextSeq()
val buffer = withTimeoutOrNull<ByteArray?>(timeout) {
suspendCancellableCoroutine { continuation ->
GlobalScope.launch(Dispatchers.Default) {
launch(Dispatchers.Default) {
DynamicReceiver.register(IPCRequest(cmd, seq) {
val buffer = it.getByteArrayExtra("buffer")!!
continuation.resume(buffer)
Expand Down Expand Up @@ -143,6 +145,11 @@ internal abstract class BaseSvc {
toServiceMsg.addAttribute("shamrock_seq", seq)
app.sendToService(toServiceMsg)
}

@OptIn(ExperimentalCoroutinesApi::class)
override val coroutineContext: CoroutineContext by lazy {
Dispatchers.IO.limitedParallelism(12)
}
}

protected fun send(toServiceMsg: ToServiceMsg) {
Expand All @@ -153,7 +160,7 @@ internal abstract class BaseSvc {
val seq = MsfCore.getNextSeq()
val buffer = withTimeoutOrNull<ByteArray?>(timeout) {
suspendCancellableCoroutine { continuation ->
GlobalScope.launch(Dispatchers.Default) {
launch(Dispatchers.Default) {
DynamicReceiver.register(IPCRequest(toServiceMsg.serviceCmd, seq) {
val buffer = it.getByteArrayExtra("buffer")!!
continuation.resume(buffer)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
@file:OptIn(DelicateCoroutinesApi::class)

package moe.fuqiuluo.shamrock.remote.service

import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import moe.fuqiuluo.shamrock.remote.service.api.WebSocketClientServlet
import moe.fuqiuluo.shamrock.helper.Level
import moe.fuqiuluo.shamrock.helper.LogCenter
import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter
import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter.onMessageEvent
import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter.onNoticeEvent
import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter.onRequestEvent

internal class WebSocketClientService(
override val address: String,
Expand All @@ -27,18 +26,18 @@ internal class WebSocketClientService(
}

override fun init() {
subscribe(GlobalScope.launch {
GlobalEventTransmitter.onMessageEvent { (_, event) ->
subscribe(launch {
onMessageEvent { (_, event) ->
pushTo(event)
}
})
subscribe(GlobalScope.launch {
GlobalEventTransmitter.onNoticeEvent { event ->
subscribe(launch {
onNoticeEvent { event ->
pushTo(event)
}
})
subscribe(GlobalScope.launch {
GlobalEventTransmitter.onRequestEvent { event ->
subscribe(launch {
onRequestEvent { event ->
pushTo(event)
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package moe.fuqiuluo.shamrock.remote.service

import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import moe.fuqiuluo.shamrock.helper.ErrorTokenException
Expand All @@ -15,7 +14,9 @@ import moe.fuqiuluo.shamrock.remote.service.data.push.*
import moe.fuqiuluo.shamrock.tools.ifNullOrEmpty
import moe.fuqiuluo.shamrock.helper.Level
import moe.fuqiuluo.shamrock.helper.LogCenter
import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter
import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter.onMessageEvent
import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter.onNoticeEvent
import moe.fuqiuluo.shamrock.remote.service.api.GlobalEventTransmitter.onRequestEvent
import moe.fuqiuluo.shamrock.xposed.helper.AppRuntimeFetcher
import org.java_websocket.WebSocket
import org.java_websocket.handshake.ClientHandshake
Expand All @@ -33,20 +34,14 @@ internal class WebSocketService(
}

override fun init() {
subscribe(GlobalScope.launch {
GlobalEventTransmitter.onMessageEvent { (_, event) ->
pushTo(event)
}
subscribe(launch {
onMessageEvent { (_, event) -> pushTo(event) }
})
subscribe(GlobalScope.launch {
GlobalEventTransmitter.onNoticeEvent { event ->
pushTo(event)
}
subscribe(launch {
onNoticeEvent { event -> pushTo(event) }
})
subscribe(GlobalScope.launch {
GlobalEventTransmitter.onRequestEvent { event ->
pushTo(event)
}
subscribe(launch {
onRequestEvent { event -> pushTo(event) }
})
LogCenter.log("WebSocketService: 初始化服务", Level.WARN)
}
Expand Down Expand Up @@ -86,7 +81,7 @@ internal class WebSocketService(
}

private fun pushMetaLifecycle() {
GlobalScope.launch {
launch {
val runtime = AppRuntimeFetcher.appRuntime
pushTo(PushMetaEvent(
time = System.currentTimeMillis() / 1000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

package moe.fuqiuluo.shamrock.remote.service.api

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import moe.fuqiuluo.shamrock.remote.action.ActionManager
Expand All @@ -30,27 +31,27 @@ import org.java_websocket.handshake.ServerHandshake
import java.lang.Exception
import java.net.URI
import kotlin.concurrent.timer
import kotlin.coroutines.CoroutineContext

internal abstract class WebSocketClientServlet(
private val url: String,
private val heartbeatInterval: Long,
private val wsHeaders: Map<String, String>
) : BaseTransmitServlet, WebSocketClient(URI(url), wsHeaders) {
) : BaseTransmitServlet, WebSocketClient(URI(url), wsHeaders), CoroutineScope {
init {
if (connectedClients.containsKey(url)) {
throw RuntimeException("WebSocketClient已存在: $url")
}
}

private var firstOpen = true
private val sendLock = Mutex()

override fun transmitAccess(): Boolean {
return ShamrockConfig.openWebSocketClient()
}

override fun onMessage(message: String) {
GlobalScope.launch {
launch {
handleMessage(message)
}
}
Expand Down Expand Up @@ -85,7 +86,6 @@ internal abstract class WebSocketClientServlet(

connectedClients[url] = this

//startHeartbeatTimer()
pushMetaLifecycle()
if (firstOpen) {
firstOpen = false
Expand All @@ -106,21 +106,21 @@ internal abstract class WebSocketClientServlet(
}
LogCenter.log("WebSocketClient onClose: $code, $reason, $remote")
unsubscribe()
coroutineContext.cancel()
connectedClients.remove(url)
}

override fun onError(ex: Exception?) {
LogCenter.log("WebSocketClient onError: ${ex?.message}")
unsubscribe()
coroutineContext.cancel()
connectedClients.remove(url)
}

protected suspend inline fun <reified T> pushTo(body: T) {
if (!transmitAccess() || isClosed || isClosing) return
try {
sendLock.withLock {
send(GlobalJson.encodeToString(body))
}
send(GlobalJson.encodeToString(body))
} catch (e: Throwable) {
LogCenter.log("被动WS推送失败: ${e.stackTraceToString()}", Level.ERROR)
}
Expand All @@ -142,8 +142,7 @@ internal abstract class WebSocketClientServlet(
}
val runtime = AppRuntimeFetcher.appRuntime
LogCenter.log("WebSocketClient心跳: ${app.longAccountUin}", Level.DEBUG)
send(
GlobalJson.encodeToString(
send(GlobalJson.encodeToString(
PushMetaEvent(
time = System.currentTimeMillis() / 1000,
selfId = app.longAccountUin,
Expand All @@ -164,7 +163,7 @@ internal abstract class WebSocketClientServlet(
}

private fun pushMetaLifecycle() {
GlobalScope.launch {
launch {
val runtime = AppRuntimeFetcher.appRuntime
val curUin = runtime.currentAccountUin
pushTo(
Expand All @@ -183,6 +182,10 @@ internal abstract class WebSocketClientServlet(
}
}

@OptIn(ExperimentalCoroutinesApi::class)
override val coroutineContext: CoroutineContext =
Dispatchers.IO.limitedParallelism(20)

companion object {
private val connectedClients = mutableMapOf<String, WebSocketClientServlet>()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
@file:OptIn(DelicateCoroutinesApi::class)
@file:OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)

package moe.fuqiuluo.shamrock.remote.service.api

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import moe.fuqiuluo.shamrock.remote.action.ActionManager
Expand All @@ -31,22 +32,23 @@ import org.java_websocket.server.WebSocketServer
import java.net.InetSocketAddress
import java.net.URI
import java.util.Collections
import java.util.Timer
import kotlin.concurrent.timer
import kotlin.coroutines.CoroutineContext

internal abstract class WebSocketTransmitServlet(
host:String,
port: Int,
protected val heartbeatInterval: Long,
) : BaseTransmitServlet, WebSocketServer(InetSocketAddress(host, port)) {
private val sendLock = Mutex()
) : BaseTransmitServlet, WebSocketServer(InetSocketAddress(host, port)), CoroutineScope {
private lateinit var heartbeatTask: Timer
protected val eventReceivers: MutableList<WebSocket> = Collections.synchronizedList(mutableListOf<WebSocket>())

init {
connectionLostTimeout = 0
}

override val address: String
get() = "-"
override val address: String = "-"

override fun transmitAccess(): Boolean {
return ShamrockConfig.openWebSocket()
Expand All @@ -62,7 +64,7 @@ internal abstract class WebSocketTransmitServlet(

init {
if (heartbeatInterval > 0) {
timer("heartbeat", true, 0, heartbeatInterval) {
heartbeatTask = timer("heartbeat", true, 0, heartbeatInterval) {
val runtime = AppRuntimeFetcher.appRuntime
val curUin = runtime.currentAccountUin
LogCenter.log("WebSocket心跳: $curUin", Level.DEBUG)
Expand Down Expand Up @@ -104,7 +106,7 @@ internal abstract class WebSocketTransmitServlet(

override fun onMessage(conn: WebSocket, message: String) {
val path = URI.create(conn.resourceDescriptor).path
GlobalScope.launch {
launch {
onHandleAction(conn, message, path)
}
}
Expand All @@ -130,21 +132,26 @@ internal abstract class WebSocketTransmitServlet(
override fun onError(conn: WebSocket, ex: Exception?) {
LogCenter.log("WSServer Error: " + ex?.stackTraceToString(), Level.ERROR)
unsubscribe()
coroutineContext.cancel()
if (::heartbeatTask.isInitialized) {
heartbeatTask.cancel()
}
}

override fun onStart() {
LogCenter.log("WSServer start running on ws://${getAddress()}!")
init()
}

protected suspend inline fun <reified T> pushTo(body: T) {
protected inline fun <reified T> pushTo(body: T) {
if(!transmitAccess()) return
try {
sendLock.withLock {
broadcastTextEvent(GlobalJson.encodeToString(body))
}
broadcastTextEvent(GlobalJson.encodeToString(body))
} catch (e: Throwable) {
LogCenter.log("WS推送失败: ${e.stackTraceToString()}", Level.ERROR)
}
}

override val coroutineContext: CoroutineContext =
Dispatchers.IO.limitedParallelism(40)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.io.RandomAccessFile
import java.net.HttpURLConnection
import java.net.URL
import kotlin.math.roundToInt
import kotlin.time.Duration.Companion.minutes

object DownloadUtils {
private const val MAX_THREAD = 4
Expand Down Expand Up @@ -71,7 +72,7 @@ object DownloadUtils {
}
processed += blockSize
}
withTimeoutOrNull(60000L) {
withTimeoutOrNull(1.minutes) {
while (progress.value < contentLength) {
if(progress.addAndGet(channel.receive()) >= contentLength) {
break
Expand Down
Loading

0 comments on commit c940aea

Please sign in to comment.