Skip to content

Commit

Permalink
Improved plugin start in case of an high number of databases
Browse files Browse the repository at this point in the history
  • Loading branch information
conker84 committed Feb 28, 2022
1 parent 21070d7 commit f11efa0
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 75 deletions.
58 changes: 25 additions & 33 deletions common/src/main/kotlin/streams/config/StreamsConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,20 @@ package streams.config
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.neo4j.dbms.api.DatabaseManagementService
import org.neo4j.kernel.internal.GraphDatabaseAPI
import org.neo4j.logging.Log
import org.neo4j.logging.internal.LogService
import org.neo4j.plugin.configuration.ConfigurationLifecycle
import org.neo4j.plugin.configuration.ConfigurationLifecycleUtils
import org.neo4j.plugin.configuration.EventType
import org.neo4j.plugin.configuration.listners.ConfigurationLifecycleListener
import streams.extensions.databaseManagementService
import streams.extensions.getDefaultDbName
import streams.extensions.isAvailable
import streams.utils.Neo4jUtils
import streams.utils.ProcedureUtils
import streams.utils.StreamsUtils
import java.io.File
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference

class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementService) {
class StreamsConfig(private val log: Log) {

companion object {
private const val SUN_JAVA_COMMAND = "sun.java.command"
Expand All @@ -41,6 +36,8 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe
const val POLL_INTERVAL = "streams.sink.poll.interval"
const val INSTANCE_WAIT_TIMEOUT = "streams.wait.timeout"
const val INSTANCE_WAIT_TIMEOUT_VALUE = 120000L
const val CONFIG_WAIT_FOR_AVAILABLE = "streams.wait.for.available"
const val CONFIG_WAIT_FOR_AVAILABLE_VALUE = true

private const val DEFAULT_TRIGGER_PERIOD: Int = 10000

Expand All @@ -59,8 +56,8 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe

fun getInstance(db: GraphDatabaseAPI): StreamsConfig = cache.computeIfAbsent(StreamsUtils.getName(db)) {
StreamsConfig(log = db.dependencyResolver
.resolveDependency(LogService::class.java)
.getUserLog(StreamsConfig::class.java), db.databaseManagementService())
.resolveDependency(LogService::class.java)
.getUserLog(StreamsConfig::class.java))
}

fun removeInstance(db: GraphDatabaseAPI) {
Expand All @@ -83,11 +80,13 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe
fun getSystemDbWaitTimeout(config: Map<String, Any?>) = config.getOrDefault(SYSTEM_DB_WAIT_TIMEOUT, SYSTEM_DB_WAIT_TIMEOUT_VALUE).toString().toLong()

fun getInstanceWaitTimeout(config: Map<String, Any?>) = config.getOrDefault(INSTANCE_WAIT_TIMEOUT, INSTANCE_WAIT_TIMEOUT_VALUE).toString().toLong()

fun isWaitForAvailable(config: Map<String, Any?>) = config.getOrDefault(CONFIG_WAIT_FOR_AVAILABLE, CONFIG_WAIT_FOR_AVAILABLE_VALUE).toString().toBoolean()
}

private val configLifecycle: ConfigurationLifecycle

private enum class Status {RUNNING, STOPPED, CLOSED, UNKNOWN}
enum class Status {RUNNING, STARTING, STOPPED, CLOSED, UNKNOWN}

private val status = AtomicReference(Status.UNKNOWN)

Expand All @@ -100,37 +99,30 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe
true, log, true, "streams.", "kafka.")
}

fun start() = runBlocking {
fun start(db: GraphDatabaseAPI) = runBlocking {
if (log.isDebugEnabled) {
log.debug("Starting StreamsConfig")
}
mutex.withLock {
if (status.get() == Status.RUNNING) return@runBlocking
if (setOf(Status.RUNNING, Status.STARTING).contains(status.get())) return@runBlocking
try {
// wait for all database to be ready
val isInstanceReady = StreamsUtils.blockUntilFalseOrTimeout(getInstanceWaitTimeout()) {
if (log.isDebugEnabled) {
log.debug("Waiting for the Neo4j instance to be ready...")
}
dbms.isAvailable(100)
}
if (!isInstanceReady) {
log.warn("${getInstanceWaitTimeout()} ms have passed and the instance is not online, the Streams plugin will not started")
return@runBlocking
}
if (ProcedureUtils.isCluster(dbms)) {
log.info("We're in cluster instance waiting for the ${StreamsUtils.LEADER}s to be elected in each database")
// in case is a cluster we wait for the correct cluster formation => LEADER elected
Neo4jUtils.waitForTheLeaders(dbms, log) { configStart() }
if (isWaitForAvailable()) {
status.set(Status.STARTING)
Neo4jUtils.waitForAvailable(db, log, getInstanceWaitTimeout(), { status.set(Status.UNKNOWN) }) { configStart() }
} else {
configStart()
}
} catch (e: Exception) {
log.warn("Cannot start StreamsConfig because of the following exception:", e)
status.set(Status.UNKNOWN)
}
}
}

fun startEager() = runBlocking {
configStart()
}

private fun configStart() = try {
configLifecycle.start()
status.set(Status.RUNNING)
Expand All @@ -139,14 +131,16 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe
log.error("Cannot start the StreamsConfig because of the following exception", e)
}

fun status(): Status = status.get()

fun stop(shutdown: Boolean = false) = runBlocking {
if (log.isDebugEnabled) {
log.debug("Stopping StreamsConfig")
}
mutex.withLock {
val status = getStopStatus(shutdown)
if (this@StreamsConfig.status.get() == status) return@runBlocking
configStop(shutdown, status)
val stopStatus = getStopStatus(shutdown)
if (status.get() == stopStatus) return@runBlocking
configStop(shutdown, stopStatus)
}
}

Expand Down Expand Up @@ -201,10 +195,6 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe

fun getConfiguration(): Map<String, Any> = ConfigurationLifecycleUtils.toMap(configLifecycle.configuration)

fun defaultDbName() = this.dbms.getDefaultDbName()

fun isDefaultDb(dbName: String) = this.defaultDbName() == dbName

fun isSourceGloballyEnabled() = Companion.isSourceGloballyEnabled(getConfiguration())

fun isSourceEnabled(dbName: String) = Companion.isSourceEnabled(getConfiguration(), dbName)
Expand All @@ -221,4 +211,6 @@ class StreamsConfig(private val log: Log, private val dbms: DatabaseManagementSe

fun getInstanceWaitTimeout() = Companion.getInstanceWaitTimeout(getConfiguration())

fun isWaitForAvailable() = Companion.isWaitForAvailable(getConfiguration())

}
42 changes: 36 additions & 6 deletions common/src/main/kotlin/streams/config/StreamsConfigProcedures.kt
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package streams.configuration
package streams.config

import org.neo4j.graphdb.GraphDatabaseService
import org.neo4j.kernel.internal.GraphDatabaseAPI
import org.neo4j.logging.Log
import org.neo4j.procedure.*
import streams.config.StreamsConfig
import streams.events.KeyValueResult
import java.util.stream.Stream

Expand All @@ -24,7 +23,7 @@ class StreamsConfigProcedures {
var db: GraphDatabaseService? = null

@Admin
@Procedure
@Procedure("streams.configuration.set")
@Description("""
streams.configuration.set(<properties_map>, <config_map>) YIELD name, value
""")
Expand All @@ -35,14 +34,13 @@ class StreamsConfigProcedures {
}
val map = properties.mapValues { it.value.toString() }
val instance = StreamsConfig.getInstance(db!! as GraphDatabaseAPI)
println("Instance hash: ${instance.hashCode()}")
val cfg = StreamsConfigProceduresConfiguration(config)
instance.setProperties(map, cfg.save)
return get()
}

@Admin
@Procedure
@Procedure("streams.configuration.remove")
@Description("""
streams.configuration.remove(<properties_list>, <config_map>) YIELD name, value
""")
Expand All @@ -58,7 +56,7 @@ class StreamsConfigProcedures {
}

@Admin
@Procedure
@Procedure("streams.configuration.get")
@Description("""
streams.configuration.get() YIELD name, value
""")
Expand All @@ -67,4 +65,36 @@ class StreamsConfigProcedures {
.entries
.map { KeyValueResult(it.key, it.value) }
.stream()

@Admin
@Procedure("streams.configuration.status")
@Description("""
streams.configuration.status() YIELD status
""")
fun status(): Stream<KeyValueResult> = Stream.of(KeyValueResult("status", StreamsConfig.getInstance(db!! as GraphDatabaseAPI).status().toString()))

@Admin
@Procedure("streams.configuration.start")
@Description("""
streams.configuration.start() YIELD status
""")
fun start(): Stream<KeyValueResult> {
val streamsConfig = StreamsConfig.getInstance(db!! as GraphDatabaseAPI)
if (!setOf(StreamsConfig.Status.RUNNING, StreamsConfig.Status.STARTING)
.contains(streamsConfig.status())) {
streamsConfig.startEager()
}
return Stream.of(KeyValueResult("status", streamsConfig.status().toString()))
}

@Admin
@Procedure("streams.configuration.stop")
@Description("""
streams.configuration.stop() YIELD status
""")
fun stop(): Stream<KeyValueResult> {
val streamsConfig = StreamsConfig.getInstance(db!! as GraphDatabaseAPI)
streamsConfig.stop()
return Stream.of(KeyValueResult("status", streamsConfig.status().toString()))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package streams.extensions
import org.neo4j.dbms.api.DatabaseManagementService
import org.neo4j.kernel.internal.GraphDatabaseAPI
import streams.utils.StreamsUtils
import java.util.concurrent.TimeUnit

fun DatabaseManagementService.getSystemDb() = this.database(StreamsUtils.SYSTEM_DATABASE_NAME) as GraphDatabaseAPI

Expand Down
74 changes: 43 additions & 31 deletions common/src/main/kotlin/streams/utils/Neo4jUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,23 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import org.neo4j.configuration.Config
import org.neo4j.configuration.GraphDatabaseSettings
import kotlinx.coroutines.runBlocking
import org.neo4j.dbms.api.DatabaseManagementService
import org.neo4j.graphdb.DatabaseShutdownException
import org.neo4j.graphdb.QueryExecutionException
import org.neo4j.kernel.internal.GraphDatabaseAPI
import org.neo4j.logging.Log
import streams.extensions.execute
import streams.extensions.isAvailable
import java.lang.reflect.InvocationTargetException
import kotlin.streams.toList

object Neo4jUtils {
fun isWriteableInstance(db: GraphDatabaseAPI, availableAction: () -> Boolean = { true }): Boolean {
try {
val isSlave = StreamsUtils.ignoreExceptions(
{
val hadb = Class.forName("org.neo4j.kernel.ha.HighlyAvailableGraphDatabase")
hadb.isInstance(db) && !(hadb.getMethod("isMaster").invoke(db) as Boolean)
}, ClassNotFoundException::class.java, IllegalAccessException::class.java,
InvocationTargetException::class.java, NoSuchMethodException::class.java)
if (isSlave != null && isSlave) {
return false
}

return availableAction() && ProcedureUtils.clusterMemberRole(db).equals(StreamsUtils.LEADER, ignoreCase = true)
return availableAction() && ProcedureUtils
.clusterMemberRole(db)
.equals(StreamsUtils.LEADER, ignoreCase = true)
} catch (e: QueryExecutionException) {
if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
return availableAction()
Expand Down Expand Up @@ -55,11 +48,8 @@ object Neo4jUtils {
.toList()
.contains(StreamsUtils.LEADER)
}
} catch (e: QueryExecutionException) {
if (e.statusCode.equals("Neo.ClientError.Procedure.ProcedureNotFound", ignoreCase = true)) {
false
}
throw e
} catch (e: Exception) {
false
}

fun <T> executeInWriteableInstance(db: GraphDatabaseAPI,
Expand All @@ -70,24 +60,46 @@ object Neo4jUtils {
null
}

fun isClusterCorrectlyFormed(dbms: DatabaseManagementService) = dbms.listDatabases()
.filterNot { it == StreamsUtils.SYSTEM_DATABASE_NAME }
.map { dbms.database(it) as GraphDatabaseAPI }
.all { clusterHasLeader(it) }

fun waitForTheLeaders(dbms: DatabaseManagementService, log: Log, timeout: Long = 120000, action: () -> Unit) {
fun waitForTheLeaders(db: GraphDatabaseAPI, log: Log, timeout: Long = 240000, onFailure: () -> Unit = {}, action: () -> Unit) {
GlobalScope.launch(Dispatchers.IO) {
val start = System.currentTimeMillis()
val delay: Long = 2000
while (!isClusterCorrectlyFormed(dbms) && System.currentTimeMillis() - start < timeout) {
log.info("${StreamsUtils.LEADER} not found, new check comes in $delay milliseconds...")
delay(delay)
var isClusterCorrectlyFormed: Boolean
do {
isClusterCorrectlyFormed = clusterHasLeader(db)
if (!isClusterCorrectlyFormed) {
log.info("${StreamsUtils.LEADER} not found, new check comes in $delay milliseconds...")
delay(delay)
}
} while (!isClusterCorrectlyFormed && System.currentTimeMillis() - start < timeout)
if (isClusterCorrectlyFormed) {
log.debug("${StreamsUtils.LEADER} has been found")
action()
} else {
log.warn("$timeout ms have passed and the ${StreamsUtils.LEADER} has not been elected, the Streams plugin will not started")
onFailure()
}
action()
}
}

fun isReadReplica(db: GraphDatabaseAPI): Boolean = db.dependencyResolver
.resolveDependency(Config::class.java)
.let { it.get(GraphDatabaseSettings.mode).name == "READ_REPLICA" }
fun waitForAvailable(db: GraphDatabaseAPI, log: Log, timeout: Long = 240000, onFailure: () -> Unit = {}, action: () -> Unit) {
GlobalScope.launch(Dispatchers.IO) {
val start = System.currentTimeMillis()
val delay: Long = 2000
var isAvailable: Boolean
do {
isAvailable = db.isAvailable(delay)
if (!isAvailable) {
log.debug("Waiting for Neo4j to be ready...")
}
} while (!isAvailable && System.currentTimeMillis() - start < timeout)
if (isAvailable) {
log.debug("Neo4j is ready")
action()
} else {
log.warn("$timeout ms have passed and Neo4j is not online, the Streams plugin will not started")
onFailure()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class StreamsEventSinkAvailabilityListener(dependencies: StreamsEventSinkExtensi
override fun available() = runBlocking {
mutex.withLock {
setAvailable(db, true)
streamsConfig.start()
streamsConfig.start(db)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1307,8 +1307,8 @@ class Neo4jSinkTaskTest {
}
}

@Test()
@Ignore("Ignore, flaky")
@Test
@Ignore("flaky")
fun `should stop the query and fails with small timeout and vice versa`() {
val myTopic = "foo"
val props = mutableMapOf<String, String>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class StreamsEventRouterAvailabilityListener(private val db: GraphDatabaseAPI,
override fun available() = runBlocking {
mutex.withLock {
setAvailable(db, true)
streamsConfig.start()
streamsConfig.start(db)
}
}

Expand Down

0 comments on commit f11efa0

Please sign in to comment.