Skip to content

Commit

Permalink
perf(model-server): make version delta computation suspendable
Browse files Browse the repository at this point in the history
Execute non suspendable part outside request threads.
  • Loading branch information
odzhychko committed Mar 8, 2024
1 parent 4800396 commit f2f6c1b
Showing 1 changed file with 35 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,18 @@
*/
package org.modelix.model.server.handlers

import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import kotlinx.datetime.Clock
import org.apache.commons.collections4.map.LRUMap
import org.modelix.model.IKeyValueStore
Expand All @@ -29,8 +35,6 @@ import org.modelix.model.api.IReadTransaction
import org.modelix.model.api.ITree
import org.modelix.model.api.IdGeneratorDummy
import org.modelix.model.api.PBranch
import org.modelix.model.api.runSynchronized
import org.modelix.model.client2.checkObjectHashes
import org.modelix.model.lazy.BranchReference
import org.modelix.model.lazy.CLTree
import org.modelix.model.lazy.CLVersion
Expand Down Expand Up @@ -258,25 +262,15 @@ class RepositoriesManager(val client: LocalModelClient) {
?: throw IllegalStateException("No version found for branch '${branch.branchName}' in repository '${branch.repositoryId}'")
}

private val deltaCache = LRUMap<Pair<String, String?>, SoftReference<Lazy<ObjectDataMap>>>(10)
fun computeDelta(versionHash: String, baseVersionHash: String?): ObjectData {
private val versionDeltaCache = VersionDeltaCache(client.storeCache)
suspend fun computeDelta(versionHash: String, baseVersionHash: String?): ObjectData {
if (versionHash == baseVersionHash) return ObjectData.empty
if (baseVersionHash == null) {
// no need to cache anything if there is no delta computation happening
return allObjectDataAsFlow(versionHash)
}

return runSynchronized(deltaCache) {
val key = versionHash to baseVersionHash
deltaCache.get(key)?.get() ?: lazy {
// lazy { ... } allows to run the computation without locking deltaCache
// SoftReference because deltas can be very large
val version = CLVersion(versionHash, client.storeCache)
val baseVersion = CLVersion(baseVersionHash, client.storeCache)
val objectsMap = version.computeDelta(baseVersion)
ObjectDataMap(objectsMap)
}.also { deltaCache[key] = SoftReference(it) }
}.value
return versionDeltaCache.getOrComputeDelta(versionHash, baseVersionHash)
}

private fun allObjectDataAsFlow(versionHash: String): ObjectDataFlow {
Expand Down Expand Up @@ -382,3 +376,29 @@ class ObjectDataFlow(private val hashObjectFlow: Flow<Pair<String, String>>) : O
private fun Flow<Pair<String, String>>.checkObjectHashes(): Flow<Pair<String, String>> {
return onEach { HashUtil.checkObjectHash(it.first, it.second) }
}

class VersionDeltaCache(val store: IDeserializingKeyValueStore) {
private val cacheMap = LRUMap<Pair<String, String?>, SoftReference<Deferred<ObjectDataMap>>>(10)

Check warning

Code scanning / detekt

Report magic numbers. Magic number is a numeric literal that is not defined as a constant and hence it's unclear what the purpose of this number is. It's better to declare such numbers as constants and give them a proper name. By default, -1, 0, 1, and 2 are not considered to be magic numbers. Warning

This expression contains a magic number. Consider defining it to a well named constant.
private val cacheMapMutex = Mutex()

suspend fun getOrComputeDelta(versionHash: String, baseVersionHash: String): ObjectDataMap {
val deferredDelta = cacheMapMutex.withLock {
val key = versionHash to baseVersionHash
val existingDeferredDelta = cacheMap[key]?.get()
if (existingDeferredDelta != null) {
existingDeferredDelta
} else {
val version = CLVersion(versionHash, store)
val baseVersion = CLVersion(baseVersionHash, store)
val newDeferredDelta = withContext(Dispatchers.IO) {
async {
ObjectDataMap(version.computeDelta(baseVersion))
}
}
cacheMap[key] = SoftReference(newDeferredDelta)
newDeferredDelta
}
}
return deferredDelta.await()
}
}

0 comments on commit f2f6c1b

Please sign in to comment.