
Commit

fixes neo4j-contrib#181: Validate configuration before attempting to start
conker84 committed Jul 25, 2019
1 parent 580aa1e commit 4a6729b
Showing 9 changed files with 200 additions and 42 deletions.
41 changes: 25 additions & 16 deletions common/src/main/kotlin/streams/utils/ValidationUtils.kt
@@ -1,22 +1,31 @@
package streams.utils

import java.io.IOException
import java.net.Socket
import java.net.URI

object ValidationUtils {

fun validateTopics(cdcMergeTopics: Set<String>, cdcSchemaTopics: Set<String>,
cypherTopics: Set<String>, nodePatternTopics: Set<String>, relPatternTopics: Set<String>) {
val allTopicsLists = mutableListOf<String>()
allTopicsLists.addAll(cdcMergeTopics)
allTopicsLists.addAll(cdcSchemaTopics)
allTopicsLists.addAll(cypherTopics)
allTopicsLists.addAll(nodePatternTopics)
allTopicsLists.addAll(relPatternTopics)
val crossDefinedTopics = allTopicsLists.map { it to 1 }
.groupBy({ it.first }, { it.second })
.mapValues { it.value.reduce { acc, i -> acc + i } }
.filterValues { it > 1 }
.keys
if (crossDefinedTopics.isNotEmpty()) {
throw RuntimeException("The following topics are cross defined: $crossDefinedTopics")
}
}

fun isServerReachable(url: String, port: Int): Boolean = try {
Socket(url, port).use { true }
} catch (e: IOException) {
false
}

fun checkServersUnreachable(urls: String, separator: String = ","): List<String> = urls
.split(separator)
.map {
val uri = URI.create(it)
when (uri.host.isNullOrBlank()) {
true -> {
val splitted = it.split(":")
URI("fake-scheme", "", splitted.first(), splitted.last().toInt(),
"", "", "")
}
else -> uri
}
}
.filter { uri -> !isServerReachable(uri.host, uri.port) }
.map { if (it.scheme == "fake-scheme") "${it.host}:${it.port}" else it.toString() }

}
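
For reference, a minimal usage sketch of the new helpers (not part of the commit; the broker names are invented). Entries without a scheme, as in Kafka's plain host:port bootstrap format, are wrapped in a fake-scheme URI so that host and port can still be extracted before the socket probe:

import streams.utils.ValidationUtils

fun main() {
    // "broker-1:9092" has no scheme, so checkServersUnreachable rewrites it with the internal
    // fake scheme before probing; "PLAINTEXT://broker-2:9092" is parsed as a regular URI.
    val unreachable = ValidationUtils.checkServersUnreachable("broker-1:9092,PLAINTEXT://broker-2:9092")
    if (unreachable.isNotEmpty()) {
        println("Servers not reachable: $unreachable")
    }
}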
36 changes: 36 additions & 0 deletions common/src/test/kotlin/streams/utils/ValidationUtilsTest.kt
@@ -0,0 +1,36 @@
package streams.utils

import org.junit.Test
import org.testcontainers.containers.GenericContainer
import kotlin.test.assertEquals
import kotlin.test.assertTrue

class FakeWebServer: GenericContainer<FakeWebServer>("alpine") {
override fun start() {
this.withCommand("/bin/sh", "-c", "while true; do { echo -e 'HTTP/1.1 200 OK'; echo ; } | nc -l -p 8000; done")
.withExposedPorts(8000)
super.start()
}

fun getUrl() = "http://localhost:${getMappedPort(8000)}"
}

class ValidationUtilsTest {

@Test
fun `should reach the server`() {
val httpServer = FakeWebServer()
httpServer.start()
assertTrue { ValidationUtils.checkServersUnreachable(httpServer.getUrl()).isEmpty() }
httpServer.stop()
}

@Test
fun `should not reach the server`() {
val urls = "http://my.fake.host:1234,PLAINTEXT://my.fake.host1:1234,my.fake.host2:1234"
val checkServersUnreachable = ValidationUtils
.checkServersUnreachable(urls)
assertTrue { checkServersUnreachable.isNotEmpty() }
assertEquals(urls.split(",").toList(), checkServersUnreachable)
}
}
@@ -84,7 +84,6 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory<StreamsEventSink
streamsTopicService.clearAll()
streamsTopicService.setAll(streamsSinkConfiguration.topics)
eventSink.start()
streamsLog.info("Streams Sink module initialised")
}

override fun stop() {
28 changes: 16 additions & 12 deletions consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt
@@ -68,20 +68,24 @@ class KafkaEventSink(private val config: Config,
return
}
log.info("Starting the Kafka Sink")
this.eventConsumer = getEventConsumerFactory()
.createStreamsEventConsumer(config.raw, log)
.withTopics(topics)
this.eventConsumer.start()
this.job = createJob()
if (isWriteableInstance) {
if (log.isDebugEnabled) {
log.debug("Subscribed topics with Cypher queries: ${streamsTopicService.getAllCypherTemplates()}")
log.debug("Subscribed topics with CDC configuration: ${streamsTopicService.getAllCDCTopics()}")
} else {
log.info("Subscribed topics: $topics")
try {
this.eventConsumer = getEventConsumerFactory()
.createStreamsEventConsumer(config.raw, log)
.withTopics(topics)
this.eventConsumer.start()
this.job = createJob()
if (isWriteableInstance) {
if (log.isDebugEnabled) {
log.debug("Subscribed topics with Cypher queries: ${streamsTopicService.getAllCypherTemplates()}")
log.debug("Subscribed topics with CDC configuration: ${streamsTopicService.getAllCDCTopics()}")
} else {
log.info("Subscribed topics: $topics")
}
}
log.info("Kafka Sink started")
} catch (e: Exception) {
log.warn("The Kafka Sink cannot be started because of this exception:", e)
}
log.info("Kafka Sink started")
}

override fun stop() = runBlocking { // TODO move to the abstract class
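
The start() change above makes the sink fail-soft: any exception raised while creating or starting the consumer is caught and logged as a warning, so a misconfigured sink no longer prevents the database from coming up. A minimal sketch of the same pattern, using placeholder names rather than the module's real types:

fun startSinkSafely(createConsumer: () -> Any, log: (String, Throwable?) -> Unit) {
    try {
        createConsumer()                  // may throw, e.g. when the kafka.* validation fails
        log("Kafka Sink started", null)
    } catch (e: Exception) {
        // The sink stays disabled, but the database keeps starting.
        log("The Kafka Sink cannot be started because of this exception:", e)
    }
}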
43 changes: 33 additions & 10 deletions consumer/src/main/kotlin/streams/kafka/KafkaSinkConfiguration.kt
@@ -8,25 +8,35 @@ import org.neo4j.kernel.configuration.Config
import streams.StreamsSinkConfiguration
import streams.extensions.toPointCase
import streams.serialization.JSONUtils
import streams.utils.ValidationUtils
import java.util.*


private const val kafkaConfigPrefix = "kafka."

private val SUPPORTED_DESERIALIZER = listOf(ByteArrayDeserializer::class.java.name, KafkaAvroDeserializer::class.java.name)

private fun validateDeserializers(config: Map<String, String>) {
val kafkaKeyConfig = config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG].orEmpty()
val kafkaValueConfig = config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG].orEmpty()
val key = if (kafkaKeyConfig.isNotBlank() && !SUPPORTED_DESERIALIZER.contains(kafkaKeyConfig)) {
private fun validateDeserializers(config: KafkaSinkConfiguration) {
val key = if (!SUPPORTED_DESERIALIZER.contains(config.keyDeserializer)) {
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
} else if (kafkaValueConfig.isNotBlank() && !SUPPORTED_DESERIALIZER.contains(kafkaValueConfig)) {
} else if (!SUPPORTED_DESERIALIZER.contains(config.valueDeserializer)) {
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
} else {
""
}
if (key.isNotBlank()) {
throw RuntimeException("The property kafka.$key contains an invalid deserializer. Supported deserializers are $SUPPORTED_DESERIALIZER")
throw RuntimeException("The property `kafka.$key` contains an invalid deserializer. Supported deserializers are $SUPPORTED_DESERIALIZER")
}
}

private fun validateConnection(url: String, kafkaPropertyKey: String, checkReachable: Boolean = true) {
if (url.isBlank()) {
throw RuntimeException("The `kafka.$kafkaPropertyKey` property is empty")
} else if (checkReachable) {
val unreachableServers = ValidationUtils.checkServersUnreachable(url)
if (unreachableServers.isNotEmpty()) {
throw RuntimeException("The servers defined into the property `kafka.$kafkaPropertyKey` are not reachable: $unreachableServers")
}
}
}

@@ -42,19 +52,25 @@ data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181

companion object {

fun from(cfg: Config) : KafkaSinkConfiguration {
fun from(cfg: Config): KafkaSinkConfiguration {
return from(cfg.raw)
}

fun from(cfg: Map<String, String>) : KafkaSinkConfiguration {
fun from(cfg: Map<String, String>): KafkaSinkConfiguration {
val kafkaCfg = create(cfg)
validate(kafkaCfg)
return kafkaCfg
}

// Visible for testing
fun create(cfg: Map<String, String>): KafkaSinkConfiguration {
val config = cfg
.filterKeys { it.startsWith(kafkaConfigPrefix) }
.mapKeys { it.key.substring(kafkaConfigPrefix.length) }
val default = KafkaSinkConfiguration()


val keys = JSONUtils.asMap(default).keys.map { it.toPointCase() }
validate(config)
val extraProperties = config.filterKeys { !keys.contains(it) }

val streamsSinkConfiguration = StreamsSinkConfiguration.from(cfg)
@@ -71,7 +87,14 @@ data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181
)
}

private fun validate(config: Map<String, String>) {
private fun validate(config: KafkaSinkConfiguration) {
validateConnection(config.zookeeperConnect, "zookeeper.connect", false)
validateConnection(config.bootstrapServers, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
val schemaRegistryUrlKey = "schema.registry.url"
if (config.extraProperties.containsKey(schemaRegistryUrlKey)) {
val schemaRegistryUrl = config.extraProperties.getOrDefault(schemaRegistryUrlKey, "")
validateConnection(schemaRegistryUrl, schemaRegistryUrlKey, false)
}
validateDeserializers(config)
}
}
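
To see how the pieces above fit together: from() builds the configuration with create() and then runs validate(), which requires kafka.zookeeper.connect to be non-blank, kafka.bootstrap.servers to be non-blank and reachable, a schema.registry.url (if supplied) to be non-blank, and the deserializers to be supported. A hedged sketch of the failure path, with a made-up broker address:

import streams.kafka.KafkaSinkConfiguration

fun main() {
    val settings = mapOf(
            "kafka.zookeeper.connect" to "zookeeper:2181",
            "kafka.bootstrap.servers" to "unreachable-broker:9092",   // made-up, not resolvable
            "streams.sink.topic.cypher.my-topic" to "MERGE (n:Label { id: event.id })")
    try {
        // from() = create() + validate(); create() alone skips validation (as used in the unit tests).
        KafkaSinkConfiguration.from(settings)
    } catch (e: RuntimeException) {
        // e.g. "The servers defined into the property `kafka.bootstrap.servers` are not reachable: [unreachable-broker:9092]"
        println(e.message)
    }
}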
@@ -60,6 +60,7 @@ open class KafkaEventSinkBase {
graphDatabaseBuilder = org.neo4j.test.TestGraphDatabaseFactory()
.newImpermanentDatabaseBuilder()
.setConfig("kafka.bootstrap.servers", KafkaEventSinkSuiteIT.kafka.bootstrapServers)
.setConfig("kafka.zookeeper.connect", KafkaEventSinkSuiteIT.kafka.envMap["KAFKA_ZOOKEEPER_CONNECT"])
.setConfig("streams.sink.enabled", "true")
kafkaProducer = createProducer()
kafkaAvroProducer = createProducer(valueSerializer = KafkaAvroSerializer::class.java.name,
@@ -0,0 +1,55 @@
package integrations.kafka

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.junit.Test
import org.neo4j.kernel.internal.GraphDatabaseAPI
import org.neo4j.test.TestGraphDatabaseFactory
import org.testcontainers.containers.GenericContainer
import kotlin.test.assertEquals


class FakeWebServer: GenericContainer<FakeWebServer>("alpine") {
override fun start() {
this.withCommand("/bin/sh", "-c", "while true; do { echo -e 'HTTP/1.1 200 OK'; echo ; } | nc -l -p 8000; done")
.withExposedPorts(8000)
super.start()
}

fun getUrl() = "http://localhost:${getMappedPort(8000)}"
}

class KafkaEventSinkNoConfigurationIT {

private val topic = "no-config"

@Test
fun `the db should start even with no bootstrap servers provided`() {
val db = TestGraphDatabaseFactory()
.newImpermanentDatabaseBuilder()
.setConfig("kafka.bootstrap.servers", "")
.setConfig("streams.sink.enabled", "true")
.setConfig("streams.sink.topic.cypher.$topic", "CREATE (p:Place{name: event.name, coordinates: event.coordinates, citizens: event.citizens})")
.newGraphDatabase() as GraphDatabaseAPI
val count = db.execute("MATCH (n) RETURN COUNT(n) AS count").columnAs<Long>("count").next()
assertEquals(0L, count)
}

@Test
fun `the db should start even with AVRO serializers and no schema registry url provided`() {
val fakeWebServer = FakeWebServer()
fakeWebServer.start()
val url = fakeWebServer.getUrl().replace("http://", "")
val db = TestGraphDatabaseFactory()
.newImpermanentDatabaseBuilder()
.setConfig("kafka.bootstrap.servers", url)
.setConfig("kafka.zookeeper.connect", url)
.setConfig("streams.sink.enabled", "true")
.setConfig("streams.sink.topic.cypher.$topic", "CREATE (p:Place{name: event.name, coordinates: event.coordinates, citizens: event.citizens})")
.setConfig("kafka.key.deserializer", KafkaAvroDeserializer::class.java.name)
.setConfig("kafka.value.deserializer", KafkaAvroDeserializer::class.java.name)
.newGraphDatabase() as GraphDatabaseAPI
val count = db.execute("MATCH (n) RETURN COUNT(n) AS count").columnAs<Long>("count").next()
assertEquals(0L, count)
fakeWebServer.stop()
}
}
@@ -58,6 +58,7 @@ class KafkaEventSinkSuiteIT {
fun tearDownContainer() {
StreamsUtils.ignoreExceptions({
kafka.stop()
schemaRegistry.stop()
}, kotlin.UninitializedPropertyAccessException::class.java)
}
}
@@ -14,7 +14,7 @@ import kotlin.test.assertTrue
class KafkaSinkConfigurationTest {

@Test
fun shouldReturnDefaultConfiguration() {
fun `should return default configuration`() {
val default = KafkaSinkConfiguration()
StreamsSinkConfigurationTest.testDefaultConf(default.streamsSinkConfiguration)

@@ -28,7 +28,7 @@ class KafkaSinkConfigurationTest {
}

@Test
fun shouldReturnConfigurationFromMap() {
fun `should return configuration from map`() {
val pollingInterval = "10"
val topic = "topic-neo"
val topicKey = "streams.sink.topic.cypher.$topic"
@@ -56,7 +56,7 @@
"key.deserializer" to ByteArrayDeserializer::class.java.name,
"value.deserializer" to KafkaAvroDeserializer::class.java.name)

val kafkaSinkConfiguration = KafkaSinkConfiguration.from(config)
val kafkaSinkConfiguration = KafkaSinkConfiguration.create(config.raw)
StreamsSinkConfigurationTest.testFromConf(kafkaSinkConfiguration.streamsSinkConfiguration, pollingInterval, topic, topicValue)
assertEquals(emptyMap(), kafkaSinkConfiguration.extraProperties)
assertEquals(zookeeper, kafkaSinkConfiguration.zookeeperConnect)
@@ -75,4 +75,34 @@
assertEquals(topicValue, streamsConfig.topics.cypherTopics[topic])
}

@Test(expected = RuntimeException::class)
fun `should not validate the configuration`() {
val zookeeper = "zookeeper:2181"
val bootstrap = "bootstrap:9092"
try {
val pollingInterval = "10"
val topic = "topic-neo"
val topicKey = "streams.sink.topic.cypher.$topic"
val topicValue = "MERGE (n:Label{ id: event.id }) "
val group = "foo"
val autoOffsetReset = "latest"
val autoCommit = "false"
val config = Config.builder()
.withSetting("streams.sink.polling.interval", pollingInterval)
.withSetting(topicKey, topicValue)
.withSetting("kafka.zookeeper.connect", zookeeper)
.withSetting("kafka.bootstrap.servers", bootstrap)
.withSetting("kafka.auto.offset.reset", autoOffsetReset)
.withSetting("kafka.enable.auto.commit", autoCommit)
.withSetting("kafka.group.id", group)
.withSetting("kafka.key.deserializer", ByteArrayDeserializer::class.java.name)
.withSetting("kafka.value.deserializer", KafkaAvroDeserializer::class.java.name)
.build()
KafkaSinkConfiguration.from(config)
} catch (e: RuntimeException) {
assertEquals("The servers defined into the property `kafka.bootstrap.servers` are not reachable: [$bootstrap]", e.message)
throw e
}
}

}
