Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(model-server): do not execute long-running repository operation on request threads #563

Merged
merged 4 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class HistoryHandler(val client: IModelClient, private val repositoriesManager:
val params = call.request.queryParameters
val limit = toInt(params["limit"], 500)
val skip = toInt(params["skip"], 0)
val latestVersion = repositoriesManager.getVersion(branch)
checkNotNull(latestVersion) { "Branch not found: $branch" }
call.respondHtmlTemplate(PageWithMenuBar("repos/", "../../..")) {
headContent {
style {
Expand All @@ -80,7 +82,7 @@ class HistoryHandler(val client: IModelClient, private val repositoriesManager:
repositoryPageStyle()
}
bodyContent {
buildRepositoryPage(branch, params["head"], skip, limit)
buildRepositoryPage(branch, latestVersion, params["head"], skip, limit)
}
}
}
Expand All @@ -105,7 +107,7 @@ class HistoryHandler(val client: IModelClient, private val repositoriesManager:
}
}

fun revert(repositoryAndBranch: BranchReference, from: String?, to: String?, author: String?) {
suspend fun revert(repositoryAndBranch: BranchReference, from: String?, to: String?, author: String?) {
val version = repositoriesManager.getVersion(repositoryAndBranch) ?: throw RuntimeException("Branch doesn't exist: $repositoryAndBranch")
val branch = OTBranch(PBranch(version.tree, client.idGenerator), client.idGenerator, client.storeCache!!)
branch.runWriteT { t ->
Expand Down Expand Up @@ -160,8 +162,13 @@ class HistoryHandler(val client: IModelClient, private val repositoriesManager:
}
}

private fun FlowContent.buildRepositoryPage(repositoryAndBranch: BranchReference, headHash: String?, skip: Int, limit: Int) {
val latestVersion = repositoriesManager.getVersion(repositoryAndBranch) ?: throw RuntimeException("Branch not found: $repositoryAndBranch")
private fun FlowContent.buildRepositoryPage(
repositoryAndBranch: BranchReference,
latestVersion: CLVersion,
headHash: String?,
skip: Int,
limit: Int,
) {
val headVersion = if (headHash == null || headHash.length == 0) latestVersion else CLVersion(headHash, client.storeCache!!)
var rowIndex = 0
h1 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.ktor.server.response.respondText
import io.ktor.server.routing.routing
import io.ktor.util.pipeline.PipelineContext
import kotlinx.coroutines.runBlocking
import kotlinx.html.br
import kotlinx.html.div
import kotlinx.html.h1
Expand All @@ -48,12 +49,12 @@
import org.modelix.model.persistent.HashUtil
import org.modelix.model.server.store.IStoreClient
import org.modelix.model.server.store.pollEntry
import org.modelix.model.server.store.runTransactionSuspendable
import org.modelix.model.server.templates.PageWithMenuBar
import org.slf4j.LoggerFactory
import java.io.IOException
import java.util.*
import java.util.regex.Pattern
import kotlin.collections.LinkedHashMap

val PERMISSION_MODEL_SERVER = "model-server".asResource()
val MODEL_SERVER_ENTRY = KeycloakResourceType("model-server-entry", KeycloakScope.READ_WRITE_DELETE)
Expand Down Expand Up @@ -85,7 +86,7 @@
// request to initialize it lazily, would make the code less robust.
// Each change in the logic of RepositoriesManager#maybeInitAndGetSeverId would need
// the special conditions in the affected requests to be updated.
repositoriesManager.maybeInitAndGetSeverId()
runBlocking { repositoriesManager.maybeInitAndGetSeverId() }
application.apply {
modelServerModule()
}
Expand All @@ -111,7 +112,11 @@
if (isHealthy()) {
call.respondText(text = "healthy", contentType = ContentType.Text.Plain, status = HttpStatusCode.OK)
} else {
call.respondText(text = "not healthy", contentType = ContentType.Text.Plain, status = HttpStatusCode.InternalServerError)
call.respondText(
text = "not healthy",
contentType = ContentType.Text.Plain,
status = HttpStatusCode.InternalServerError,
)
}
}
get<Paths.getHeaders> {
Expand Down Expand Up @@ -283,71 +288,76 @@
return result
}

protected fun CallContext.putEntries(newEntries: Map<String, String?>) {
protected suspend fun CallContext.putEntries(newEntries: Map<String, String?>) {

Check warning

Code scanning / detekt

Prefer splitting up complex methods into smaller, easier to test methods. Warning

The function putEntries appears to be too complex based on Cyclomatic Complexity (complexity: 18). Defined complexity threshold for methods is set to '15'

Check warning

Code scanning / detekt

Excessive nesting leads to hidden complexity. Prefer extracting code to make it easier to understand. Warning

Function putEntries is nested too deeply.
val referencedKeys: MutableSet<String> = HashSet()
for ((key, value) in newEntries) {
checkKeyPermission(key, EPermissionType.WRITE)
if (value != null) {
val matcher = HASH_PATTERN.matcher(value)
while (matcher.find()) {
val foundKey = matcher.group()
if (!newEntries.containsKey(foundKey)) {
referencedKeys.add(foundKey)
}
}
}
}
val referencedEntries = storeClient.getAll(referencedKeys)
for (key in referencedKeys) {
if (referencedEntries[key] == null) {
throw NotFoundException("Referenced key $key not found")
}
}

// Entries were previously written directly to the store.
// Now we use the RepositoriesManager to merge changes instead of just overwriting a branch.

val hashedObjects = LinkedHashMap<String, String>()
val branchChanges = LinkedHashMap<BranchReference, String?>()
val userDefinedEntries = LinkedHashMap<String, String?>()

for ((key, value) in newEntries) {
when {
HashUtil.isSha256(key) -> {
hashedObjects[key] = value ?: throw IllegalArgumentException("No value provided for $key")
}

BranchReference.tryParseBranch(key) != null -> {
branchChanges[BranchReference.tryParseBranch(key)!!] = value
}

key.startsWith(PROTECTED_PREFIX) -> {
throw NoPermissionException("Access to keys starting with '$PROTECTED_PREFIX' is only permitted to the model server itself.")
}

key.startsWith(RepositoriesManager.KEY_PREFIX) -> {
throw NoPermissionException("Access to keys starting with '${RepositoriesManager.KEY_PREFIX}' is only permitted to the model server itself.")
}

key == RepositoriesManager.LEGACY_SERVER_ID_KEY || key == RepositoriesManager.LEGACY_SERVER_ID_KEY2 -> {
throw NoPermissionException("'$key' is read-only.")
}

else -> {
userDefinedEntries[key] = value
}
}
}

HashUtil.checkObjectHashes(hashedObjects)

repositoriesManager.client.store.runTransaction {
repositoriesManager.client.store.runTransactionSuspendable {
storeClient.putAll(hashedObjects)
storeClient.putAll(userDefinedEntries)
for ((branch, value) in branchChanges) {
if (value == null) {
repositoriesManager.removeBranches(branch.repositoryId, setOf(branch.branchName))
repositoriesManager.removeBranchesBlocking(branch.repositoryId, setOf(branch.branchName))
} else {
repositoriesManager.mergeChanges(branch, value)
repositoriesManager.mergeChangesBlocking(branch, value)
}
}
}
}

Check warning

Code scanning / detekt

Member with protected visibility in final class is private. Consider using private or internal as modifier. Warning

Member with protected visibility in final class is private. Consider using private or internal as modifier.

private suspend fun CallContext.respondValue(key: String, value: String?) {
if (value == null) {
Expand All @@ -365,7 +375,10 @@
if (key.startsWith(RepositoriesManager.KEY_PREFIX)) {
throw NoPermissionException("Access to keys starting with '${RepositoriesManager.KEY_PREFIX}' is only permitted to the model server itself.")
}
if ((key == RepositoriesManager.LEGACY_SERVER_ID_KEY || key == RepositoriesManager.LEGACY_SERVER_ID_KEY2) && type.includes(EPermissionType.WRITE)) {
if ((key == RepositoriesManager.LEGACY_SERVER_ID_KEY || key == RepositoriesManager.LEGACY_SERVER_ID_KEY2) && type.includes(
EPermissionType.WRITE,
)
) {
throw NoPermissionException("'$key' is read-only.")
}
if (HashUtil.isSha256(key)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onEmpty
import kotlinx.coroutines.flow.withIndex
import kotlinx.coroutines.withContext
Expand All @@ -61,7 +60,6 @@ import org.modelix.model.operations.OTBranch
import org.modelix.model.persistent.HashUtil
import org.modelix.model.server.api.v2.VersionDelta
import org.modelix.model.server.api.v2.VersionDeltaStream
import org.modelix.model.server.api.v2.toMap
import org.modelix.model.server.store.IStoreClient
import org.modelix.model.server.store.LocalModelClient
import org.modelix.modelql.server.ModelQLServer
Expand Down Expand Up @@ -231,9 +229,8 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
val delta = VersionDelta(
newVersionHash,
lastVersionHash,
objectsMap = repositoriesManager.computeDelta(newVersionHash, lastVersionHash).toMap(),
objectsMap = repositoriesManager.computeDelta(newVersionHash, lastVersionHash).asMap(),
)
delta.checkObjectHashes()
send(Json.encodeToString(delta))
lastVersionHash = newVersionHash
}
Expand Down Expand Up @@ -370,16 +367,14 @@ class ModelReplicationServer(val repositoriesManager: RepositoriesManager) {
val delta = VersionDelta(
versionHash,
baseVersionHash,
objectsMap = repositoriesManager.computeDelta(versionHash, baseVersionHash).toMap(),
objectsMap = repositoriesManager.computeDelta(versionHash, baseVersionHash).asMap(),
)
delta.checkObjectHashes()
respond(delta)
}

private suspend fun ApplicationCall.respondDeltaAsObjectStream(versionHash: String, baseVersionHash: String?, plainText: Boolean) {
respondTextWriter(contentType = if (plainText) ContentType.Text.Plain else VersionDeltaStream.CONTENT_TYPE) {
repositoriesManager.computeDelta(versionHash, baseVersionHash)
.checkObjectHashes()
repositoriesManager.computeDelta(versionHash, baseVersionHash).asFlow()
.flatten()
.withSeparator("\n")
.onEmpty { emit(versionHash) }
Expand Down Expand Up @@ -410,7 +405,3 @@ private fun Flow<String>.withSeparator(separator: String) = flow {
emit(it)
}
}

private fun <V : String?> Flow<Pair<String, V>>.checkObjectHashes(): Flow<Pair<String, V>> {
return onEach { HashUtil.checkObjectHash(it.first, it.second) }
}
Loading
Loading