diff --git a/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/ifds/Producer.kt b/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/ifds/Producer.kt
new file mode 100644
index 000000000..0dab3c883
--- /dev/null
+++ b/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/ifds/Producer.kt
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2022 UnitTestBot contributors (utbot.org)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.jacodb.analysis.ifds
+
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.AtomicReference
+
+interface Producer {
+ fun produce(event: T)
+ fun subscribe(consumer: Consumer)
+}
+
+fun interface Consumer {
+ fun consume(event: T)
+}
+
+class SyncProducer : Producer {
+ private val consumers: MutableList> = mutableListOf()
+ private val events: MutableList = mutableListOf()
+
+ @Synchronized
+ override fun produce(event: T) {
+ for (consumer in consumers) {
+ consumer.consume(event)
+ }
+ events.add(event)
+ }
+
+ @Synchronized
+ override fun subscribe(consumer: Consumer) {
+ for (event in events) {
+ consumer.consume(event)
+ }
+ consumers.add(consumer)
+ }
+}
+
+sealed interface ConsList : Iterable
+
+object Nil : ConsList {
+ override fun iterator(): Iterator = object : Iterator {
+ override fun hasNext(): Boolean = false
+ override fun next(): Nothing {
+ throw NoSuchElementException()
+ }
+ }
+
+ override fun toString(): String = javaClass.simpleName
+}
+
+data class Cons(
+ val value: T,
+ val tail: ConsList,
+) : ConsList {
+ override fun iterator(): Iterator = Iter(this)
+
+ private class Iter(private var list: ConsList) : Iterator {
+ override fun hasNext(): Boolean = list !is Nil
+
+ override fun next(): T = when (val list = list) {
+ is Nil -> throw NoSuchElementException()
+ is Cons -> {
+ val value = list.value
+ this.list = list.tail
+ value
+ }
+ }
+ }
+}
+
+class NonBlockingQueue {
+ data class Node(
+ val value: T,
+ @Volatile var next: Node? = null,
+ )
+
+ var head: Node? = null
+ private set
+ val tail: AtomicReference> = AtomicReference(head)
+ val size: AtomicInteger = AtomicInteger(0)
+
+ fun add(element: T) {
+ val node = Node(element)
+ var currentTail: Node?
+ while (true) {
+ currentTail = tail.get()
+ if (tail.compareAndSet(currentTail, node)) break
+ }
+ if (currentTail != null) {
+ currentTail.next = node
+ } else {
+ head = node
+ }
+ size.incrementAndGet()
+ }
+}
+
+class ConcurrentProducer : Producer {
+ private var consumers: AtomicReference>> = AtomicReference(Nil)
+ private val events: NonBlockingQueue = NonBlockingQueue()
+
+ override fun produce(event: T) {
+ var currentConsumers: ConsList>
+ while (true) {
+ currentConsumers = consumers.get() ?: continue
+ if (consumers.compareAndSet(currentConsumers, null)) break
+ }
+
+ events.add(event)
+
+ try {
+ for (consumer in currentConsumers) {
+ consumer.consume(event)
+ }
+ } finally {
+ check(consumers.compareAndSet(null, currentConsumers))
+ }
+ }
+
+ override fun subscribe(consumer: Consumer) {
+ var last: NonBlockingQueue.Node? = null
+ while (true) {
+ val start = if (last != null) last.next else events.head
+ var current = start
+ while (current != null) {
+ last = current
+ consumer.consume(current.value)
+ current = current.next
+ }
+
+ val currentConsumers = consumers.get() ?: continue
+ if (!consumers.compareAndSet(currentConsumers, null)) continue
+ if (events.tail.get() === last) {
+ val newConsumers = Cons(consumer, currentConsumers)
+ check(consumers.compareAndSet(null, newConsumers))
+ break
+ } else {
+ check(consumers.compareAndSet(null, currentConsumers))
+ }
+ }
+ }
+}
diff --git a/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/ifds/Runner.kt b/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/ifds/Runner.kt
index a8fc4f916..2214ab5ca 100644
--- a/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/ifds/Runner.kt
+++ b/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/ifds/Runner.kt
@@ -110,6 +110,7 @@ class UniRunner(
// Add edge to worklist:
workList.trySend(edge).getOrThrow()
+ manager.handleControlEvent(queueIsNotEmpty)
return true
}
@@ -122,7 +123,6 @@ class UniRunner(
val edge = workList.tryReceive().getOrElse {
manager.handleControlEvent(queueIsEmpty)
val edge = workList.receive()
- manager.handleControlEvent(queueIsNotEmpty)
edge
}
tabulationAlgorithmStep(edge, this@coroutineScope)
diff --git a/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/ifds/Summary.kt b/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/ifds/Summary.kt
index 3f0c90206..79935ac74 100644
--- a/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/ifds/Summary.kt
+++ b/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/ifds/Summary.kt
@@ -16,7 +16,6 @@
package org.jacodb.analysis.ifds
-import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import org.jacodb.api.JcMethod
@@ -48,56 +47,76 @@ interface Vulnerability : Summary {
/**
* Contains summaries for many methods and allows to update them and subscribe for them.
*/
-interface SummaryStorage {
+class SummaryStorageWithFlows {
+ private val summaries = ConcurrentHashMap>()
+ private val outFlows = ConcurrentHashMap>()
+
/**
- * A list of all methods for which summaries are not empty.
+ * @return a list with all methods for which there are some summaries.
*/
- val knownMethods: List
+ val knownMethods: List
+ get() = summaries.keys.toList()
+
+ private fun getFlow(method: JcMethod): MutableSharedFlow {
+ return outFlows.computeIfAbsent(method) {
+ MutableSharedFlow(replay = Int.MAX_VALUE)
+ }
+ }
/**
- * Adds [fact] to summary of its method.
+ * Adds a new [fact] to the storage.
*/
- fun add(fact: T)
+ fun add(fact: T) {
+ val isNew = summaries.computeIfAbsent(fact.method) { ConcurrentHashMap.newKeySet() }.add(fact)
+ if (isNew) {
+ val flow = getFlow(fact.method)
+ check(flow.tryEmit(fact))
+ }
+ }
/**
* @return a flow with all facts summarized for the given [method].
* Already received facts, along with the facts that will be sent to this storage later,
* will be emitted to the returned flow.
*/
- fun getFacts(method: JcMethod): Flow
+ fun getFacts(method: JcMethod): SharedFlow {
+ return getFlow(method)
+ }
/**
* @return a list will all facts summarized for the given [method] so far.
*/
- fun getCurrentFacts(method: JcMethod): List
+ fun getCurrentFacts(method: JcMethod): List {
+ return getFacts(method).replayCache
+ }
}
-class SummaryStorageImpl : SummaryStorage {
+class SummaryStorageWithProducers(
+ private val useConcurrentProducer: Boolean = false,
+) {
private val summaries = ConcurrentHashMap>()
- private val outFlows = ConcurrentHashMap>()
-
- override val knownMethods: List
- get() = summaries.keys.toList()
-
- private fun getFlow(method: JcMethod): MutableSharedFlow {
- return outFlows.computeIfAbsent(method) {
- MutableSharedFlow(replay = Int.MAX_VALUE)
+ private val producers = ConcurrentHashMap>()
+
+ private fun getProducer(method: JcMethod): Producer {
+ return producers.computeIfAbsent(method) {
+ if (useConcurrentProducer) {
+ ConcurrentProducer()
+ } else {
+ SyncProducer()
+ }
}
}
- override fun add(fact: T) {
+ fun add(fact: T) {
val isNew = summaries.computeIfAbsent(fact.method) { ConcurrentHashMap.newKeySet() }.add(fact)
if (isNew) {
- val flow = getFlow(fact.method)
- check(flow.tryEmit(fact))
+ val producer = getProducer(fact.method)
+ producer.produce(fact)
}
}
- override fun getFacts(method: JcMethod): SharedFlow {
- return getFlow(method)
- }
-
- override fun getCurrentFacts(method: JcMethod): List {
- return getFacts(method).replayCache
+ fun subscribe(method: JcMethod, handler: (T) -> Unit) {
+ val producer = getProducer(method)
+ producer.subscribe(handler)
}
}
diff --git a/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/taint/TaintManager.kt b/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/taint/TaintManager.kt
index f81a1599c..e27cd228d 100644
--- a/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/taint/TaintManager.kt
+++ b/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/taint/TaintManager.kt
@@ -22,8 +22,6 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
-import kotlinx.coroutines.flow.launchIn
-import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.isActive
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
@@ -34,7 +32,9 @@ import org.jacodb.analysis.ifds.ControlEvent
import org.jacodb.analysis.ifds.IfdsResult
import org.jacodb.analysis.ifds.Manager
import org.jacodb.analysis.ifds.QueueEmptinessChanged
-import org.jacodb.analysis.ifds.SummaryStorageImpl
+import org.jacodb.analysis.ifds.SummaryEdge
+import org.jacodb.analysis.ifds.SummaryStorageWithFlows
+import org.jacodb.analysis.ifds.SummaryStorageWithProducers
import org.jacodb.analysis.ifds.TraceGraph
import org.jacodb.analysis.ifds.UniRunner
import org.jacodb.analysis.ifds.UnitResolver
@@ -63,8 +63,8 @@ open class TaintManager(
protected val runnerForUnit: MutableMap = hashMapOf()
private val queueIsEmpty = ConcurrentHashMap()
- private val summaryEdgesStorage = SummaryStorageImpl()
- private val vulnerabilitiesStorage = SummaryStorageImpl()
+ private val summaryEdgesStorage = SummaryStorageWithProducers>()
+ private val vulnerabilitiesStorage = SummaryStorageWithFlows()
private val stopRendezvous = Channel(Channel.RENDEZVOUS)
@@ -277,10 +277,7 @@ open class TaintManager(
scope: CoroutineScope,
handler: (TaintEdge) -> Unit,
) {
- summaryEdgesStorage
- .getFacts(method)
- .onEach { handler(it.edge) }
- .launchIn(scope)
+ summaryEdgesStorage.subscribe(method) { handler(it.edge) }
}
fun vulnerabilityTraceGraph(vulnerability: TaintVulnerability): TraceGraph {
diff --git a/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/unused/UnusedVariableManager.kt b/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/unused/UnusedVariableManager.kt
index ea3a99d01..6779653f2 100644
--- a/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/unused/UnusedVariableManager.kt
+++ b/jacodb-analysis/src/main/kotlin/org/jacodb/analysis/unused/UnusedVariableManager.kt
@@ -34,7 +34,7 @@ import org.jacodb.analysis.ifds.Edge
import org.jacodb.analysis.ifds.Manager
import org.jacodb.analysis.ifds.QueueEmptinessChanged
import org.jacodb.analysis.ifds.Runner
-import org.jacodb.analysis.ifds.SummaryStorageImpl
+import org.jacodb.analysis.ifds.SummaryStorageWithFlows
import org.jacodb.analysis.ifds.UniRunner
import org.jacodb.analysis.ifds.UnitResolver
import org.jacodb.analysis.ifds.UnitType
@@ -62,8 +62,7 @@ class UnusedVariableManager(
private val runnerForUnit: MutableMap> = hashMapOf()
private val queueIsEmpty = ConcurrentHashMap()
- private val summaryEdgesStorage = SummaryStorageImpl()
- private val vulnerabilitiesStorage = SummaryStorageImpl()
+ private val summaryEdgesStorage = SummaryStorageWithFlows()
private val stopRendezvous = Channel(Channel.RENDEZVOUS)
diff --git a/jacodb-analysis/src/test/resources/additional.json b/jacodb-analysis/src/test/resources/additional.json
index 4c883436e..aac9b7655 100644
--- a/jacodb-analysis/src/test/resources/additional.json
+++ b/jacodb-analysis/src/test/resources/additional.json
@@ -603,5 +603,46 @@
}
}
]
+ },
+ {
+ "_": "MethodSource",
+ "methodInfo": {
+ "cls": {
+ "classNameMatcher": {
+ "_": "NameIsEqualTo",
+ "name": "Properties"
+ },
+ "packageMatcher": {
+ "_": "NameIsEqualTo",
+ "name": "java.util"
+ }
+ },
+ "functionName": {
+ "_": "NameIsEqualTo",
+ "name": "getProperty"
+ },
+ "applyToOverrides": true,
+ "exclude": [],
+ "functionLabel": null,
+ "modifier": -1,
+ "parametersMatchers": [],
+ "returnTypeMatcher": {
+ "_": "AnyTypeMatches"
+ }
+ },
+ "condition": {
+ "_": "ConstantTrue"
+ },
+ "actionsAfter": [
+ {
+ "_": "AssignMark",
+ "position": {
+ "_": "Result"
+ },
+ "mark": {
+ "name": "PROPERTY"
+ }
+ }
+ ]
}
]