Skip to content

Commit

Permalink
fix(model-server): do not execute long-running repository operation o…
Browse files Browse the repository at this point in the history
…n request threads

Execute them on Dispatchers.IO because they are mostly blocked by waiting for data from store.
Long-running operations are `computeDelta` and `mergeChanges`.
  • Loading branch information
odzhychko committed Mar 7, 2024
1 parent 2c8c792 commit 85297b9
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import io.ktor.server.resources.get
import io.ktor.server.resources.post
import io.ktor.server.response.respondRedirect
import io.ktor.server.routing.routing
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlinx.datetime.Instant
import kotlinx.datetime.TimeZone
import kotlinx.datetime.toJavaLocalDateTime
Expand Down Expand Up @@ -92,7 +94,9 @@ class HistoryHandler(val client: IModelClient, private val repositoriesManager:
val fromVersion = params["from"]!!
val toVersion = params["to"]!!
val user = getUserName()
revert(branch, fromVersion, toVersion, user)
withContext(Dispatchers.IO) {
revert(branch, fromVersion, toVersion, user)
}
call.respondRedirect(".")
}
// post("/history/{repoId}/{branch}/delete") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import io.ktor.server.resources.put
import io.ktor.server.response.respondText
import io.ktor.server.routing.routing
import io.ktor.util.pipeline.PipelineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlinx.html.br
import kotlinx.html.div
import kotlinx.html.h1
Expand Down Expand Up @@ -164,7 +166,9 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
val key = call.parameters["key"]!!
val value = call.receiveText()
try {
putEntries(mapOf(key to value))
withContext(Dispatchers.IO) {
putEntries(mapOf(key to value))
}
call.respondText("OK")
} catch (e: NotFoundException) {
call.respondText(e.message ?: "Not found", status = HttpStatusCode.NotFound)
Expand Down Expand Up @@ -283,7 +287,7 @@ class KeyValueLikeModelServer(val repositoriesManager: RepositoriesManager) {
return result
}

protected fun CallContext.putEntries(newEntries: Map<String, String?>) {
protected suspend fun CallContext.putEntries(newEntries: Map<String, String?>) = withContext(Dispatchers.IO) {

Check warning

Code scanning / detekt

Prefer splitting up complex methods into smaller, easier to test methods. Warning

The function putEntries appears to be too complex based on Cyclomatic Complexity (complexity: 18). Defined complexity threshold for methods is set to '15'
val referencedKeys: MutableSet<String> = HashSet()
for ((key, value) in newEntries) {
checkKeyPermission(key, EPermissionType.WRITE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
val deltaFromClient = call.receive<VersionDelta>()
deltaFromClient.checkObjectHashes()
storeClient.putAll(deltaFromClient.getAllObjects())
val mergedHash = repositoriesManager.mergeChanges(branchRef(), deltaFromClient.versionHash)
val mergedHash = withContext(Dispatchers.IO) {
repositoriesManager.mergeChanges(branchRef(), deltaFromClient.versionHash)
}
call.respondDelta(mergedHash, deltaFromClient.versionHash)
}

Expand Down Expand Up @@ -226,14 +228,15 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
webSocket("listen") {
var lastVersionHash = call.request.queryParameters["lastKnown"]
while (coroutineContext[Job]?.isCancelled == false) {
val newVersionHash =
repositoriesManager.pollVersionHash(call.branchRef(), lastVersionHash)
val newVersionHash = repositoriesManager.pollVersionHash(call.branchRef(), lastVersionHash)
val objectMap = withContext(Dispatchers.IO) {
repositoriesManager.computeDelta(newVersionHash, lastVersionHash).checkObjectHashes().toMap()
}
val delta = VersionDelta(
newVersionHash,
lastVersionHash,
objectsMap = repositoriesManager.computeDelta(newVersionHash, lastVersionHash).toMap(),
objectsMap = objectMap,
)
delta.checkObjectHashes()
send(Json.encodeToString(delta))
lastVersionHash = newVersionHash
}
Expand Down Expand Up @@ -295,7 +298,9 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
baseVersion = version,
operations = ops.map { it.getOriginalOp() }.toTypedArray(),
)
repositoriesManager.mergeChanges(branchRef, newVersion.getContentHash())
withContext(Dispatchers.IO) {
repositoriesManager.mergeChanges(branchRef, newVersion.getContentHash())
}
}
})
}
Expand Down Expand Up @@ -367,16 +372,22 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
}

private suspend fun ApplicationCall.respondDeltaAsJson(versionHash: String, baseVersionHash: String?) {
val objectsMap = withContext(Dispatchers.IO) {
repositoriesManager.computeDelta(versionHash, baseVersionHash).checkObjectHashes().toMap()
}
val delta = VersionDelta(
versionHash,
baseVersionHash,
objectsMap = repositoriesManager.computeDelta(versionHash, baseVersionHash).toMap(),
objectsMap = objectsMap,
)
delta.checkObjectHashes()
respond(delta)
}

private suspend fun ApplicationCall.respondDeltaAsObjectStream(versionHash: String, baseVersionHash: String?, plainText: Boolean) {
private suspend fun ApplicationCall.respondDeltaAsObjectStream(
versionHash: String,
baseVersionHash: String?,
plainText: Boolean,
) {
respondTextWriter(contentType = if (plainText) ContentType.Text.Plain else VersionDeltaStream.CONTENT_TYPE) {
repositoriesManager.computeDelta(versionHash, baseVersionHash)
.checkObjectHashes()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ class ModelQLServer private constructor(val rootNodeProvider: () -> INode?, val
handleCall(call, { rootNode to area }, {})
}

suspend fun handleCall(call: ApplicationCall, input: (write: Boolean) -> Pair<INode, IArea>, afterQueryExecution: () -> Unit = {}) {
suspend fun handleCall(
call: ApplicationCall,
input: (write: Boolean) -> Pair<INode, IArea>,
afterQueryExecution: suspend () -> Unit = {},
) {
try {
val serializedQuery = call.receiveText()
val json = UntypedModelQL.json
Expand Down

0 comments on commit 85297b9

Please sign in to comment.