From 4a6729b229e58df2e43ecff819c21c8873336618 Mon Sep 17 00:00:00 2001 From: Andrea Santurbano Date: Thu, 25 Jul 2019 15:32:48 +0200 Subject: [PATCH] fixes #181: Validate configuration before attempting to start --- .../kotlin/streams/utils/ValidationUtils.kt | 41 ++++++++------ .../streams/utils/ValidationUtilsTest.kt | 36 ++++++++++++ .../StreamsEventSinkExtensionFactory.kt | 1 - .../kotlin/streams/kafka/KafkaEventSink.kt | 28 ++++++---- .../streams/kafka/KafkaSinkConfiguration.kt | 43 +++++++++++---- .../integrations/kafka/KafkaEventSinkBase.kt | 1 + .../kafka/KafkaEventSinkNoConfigurationIT.kt | 55 +++++++++++++++++++ .../kafka/KafkaEventSinkSuiteIT.kt | 1 + .../kafka/KafkaSinkConfigurationTest.kt | 36 +++++++++++- 9 files changed, 200 insertions(+), 42 deletions(-) create mode 100644 common/src/test/kotlin/streams/utils/ValidationUtilsTest.kt create mode 100644 consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkNoConfigurationIT.kt diff --git a/common/src/main/kotlin/streams/utils/ValidationUtils.kt b/common/src/main/kotlin/streams/utils/ValidationUtils.kt index a12e5043..2d44c7a7 100644 --- a/common/src/main/kotlin/streams/utils/ValidationUtils.kt +++ b/common/src/main/kotlin/streams/utils/ValidationUtils.kt @@ -1,22 +1,31 @@ package streams.utils +import java.io.IOException +import java.net.Socket +import java.net.URI + object ValidationUtils { - fun validateTopics(cdcMergeTopics: Set, cdcSchemaTopics: Set, - cypherTopics: Set, nodePatternTopics: Set, relPatternTopics: Set) { - val allTopicsLists = mutableListOf() - allTopicsLists.addAll(cdcMergeTopics) - allTopicsLists.addAll(cdcSchemaTopics) - allTopicsLists.addAll(cypherTopics) - allTopicsLists.addAll(nodePatternTopics) - allTopicsLists.addAll(relPatternTopics) - val crossDefinedTopics = allTopicsLists.map { it to 1 } - .groupBy({ it.first }, { it.second }) - .mapValues { it.value.reduce { acc, i -> acc + i } } - .filterValues { it > 1 } - .keys - if (crossDefinedTopics.isNotEmpty()) { - throw RuntimeException("The following topics are cross defined: $crossDefinedTopics") - } + fun isServerReachable(url: String, port: Int): Boolean = try { + Socket(url, port).use { true } + } catch (e: IOException) { + false } + + fun checkServersUnreachable(urls: String, separator: String = ","): List = urls + .split(separator) + .map { + val uri = URI.create(it) + when (uri.host.isNullOrBlank()) { + true -> { + val splitted = it.split(":") + URI("fake-scheme", "", splitted.first(), splitted.last().toInt(), + "", "", "") + } + else -> uri + } + } + .filter { uri -> !isServerReachable(uri.host, uri.port) } + .map { if (it.scheme == "fake-scheme") "${it.host}:${it.port}" else it.toString() } + } \ No newline at end of file diff --git a/common/src/test/kotlin/streams/utils/ValidationUtilsTest.kt b/common/src/test/kotlin/streams/utils/ValidationUtilsTest.kt new file mode 100644 index 00000000..46646b59 --- /dev/null +++ b/common/src/test/kotlin/streams/utils/ValidationUtilsTest.kt @@ -0,0 +1,36 @@ +package streams.utils + +import org.junit.Test +import org.testcontainers.containers.GenericContainer +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class FakeWebServer: GenericContainer("alpine") { + override fun start() { + this.withCommand("/bin/sh", "-c", "while true; do { echo -e 'HTTP/1.1 200 OK'; echo ; } | nc -l -p 8000; done") + .withExposedPorts(8000) + super.start() + } + + fun getUrl() = "http://localhost:${getMappedPort(8000)}" +} + +class ValidationUtilsTest { + + @Test + fun `should reach the server`() { + val httpServer = FakeWebServer() + httpServer.start() + assertTrue { ValidationUtils.checkServersUnreachable(httpServer.getUrl()).isEmpty() } + httpServer.stop() + } + + @Test + fun `should not reach the server`() { + val urls = "http://my.fake.host:1234,PLAINTEXT://my.fake.host1:1234,my.fake.host2:1234" + val checkServersUnreachable = ValidationUtils + .checkServersUnreachable(urls) + assertTrue { checkServersUnreachable.isNotEmpty() } + assertEquals(urls.split(",").toList(), checkServersUnreachable) + } +} \ No newline at end of file diff --git a/consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt b/consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt index 8dd31fe2..bc91f126 100644 --- a/consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt +++ b/consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt @@ -84,7 +84,6 @@ class StreamsEventSinkExtensionFactory : KernelExtensionFactory) { - val kafkaKeyConfig = config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG].orEmpty() - val kafkaValueConfig = config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG].orEmpty() - val key = if (kafkaKeyConfig.isNotBlank() && !SUPPORTED_DESERIALIZER.contains(kafkaKeyConfig)) { +private fun validateDeserializers(config: KafkaSinkConfiguration) { + val key = if (!SUPPORTED_DESERIALIZER.contains(config.keyDeserializer)) { ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG - } else if (kafkaValueConfig.isNotBlank() && !SUPPORTED_DESERIALIZER.contains(kafkaValueConfig)) { + } else if (!SUPPORTED_DESERIALIZER.contains(config.valueDeserializer)) { ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG } else { "" } if (key.isNotBlank()) { - throw RuntimeException("The property kafka.$key contains an invalid deserializer. Supported deserializers are $SUPPORTED_DESERIALIZER") + throw RuntimeException("The property `kafka.$key` contains an invalid deserializer. Supported deserializers are $SUPPORTED_DESERIALIZER") + } +} + +private fun validateConnection(url: String, kafkaPropertyKey: String, checkReachable: Boolean = true) { + if (url.isBlank()) { + throw RuntimeException("The `kafka.$kafkaPropertyKey` property is empty") + } else if (checkReachable) { + val unreachableServers = ValidationUtils.checkServersUnreachable(url) + if (unreachableServers.isNotEmpty()) { + throw RuntimeException("The servers defined into the property `kafka.$kafkaPropertyKey` are not reachable: $unreachableServers") + } } } @@ -42,11 +52,18 @@ data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181 companion object { - fun from(cfg: Config) : KafkaSinkConfiguration { + fun from(cfg: Config): KafkaSinkConfiguration { return from(cfg.raw) } - fun from(cfg: Map) : KafkaSinkConfiguration { + fun from(cfg: Map): KafkaSinkConfiguration { + val kafkaCfg = create(cfg) + validate(kafkaCfg) + return kafkaCfg + } + + // Visible for testing + fun create(cfg: Map): KafkaSinkConfiguration { val config = cfg .filterKeys { it.startsWith(kafkaConfigPrefix) } .mapKeys { it.key.substring(kafkaConfigPrefix.length) } @@ -54,7 +71,6 @@ data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181 val keys = JSONUtils.asMap(default).keys.map { it.toPointCase() } - validate(config) val extraProperties = config.filterKeys { !keys.contains(it) } val streamsSinkConfiguration = StreamsSinkConfiguration.from(cfg) @@ -71,7 +87,14 @@ data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181 ) } - private fun validate(config: Map) { + private fun validate(config: KafkaSinkConfiguration) { + validateConnection(config.zookeeperConnect, "zookeeper.connect", false) + validateConnection(config.bootstrapServers, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) + val schemaRegistryUrlKey = "schema.registry.url" + if (config.extraProperties.containsKey(schemaRegistryUrlKey)) { + val schemaRegistryUrl = config.extraProperties.getOrDefault(schemaRegistryUrlKey, "") + validateConnection(schemaRegistryUrl, schemaRegistryUrlKey, false) + } validateDeserializers(config) } } diff --git a/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkBase.kt b/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkBase.kt index 2776a866..9114d659 100644 --- a/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkBase.kt +++ b/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkBase.kt @@ -60,6 +60,7 @@ open class KafkaEventSinkBase { graphDatabaseBuilder = org.neo4j.test.TestGraphDatabaseFactory() .newImpermanentDatabaseBuilder() .setConfig("kafka.bootstrap.servers", KafkaEventSinkSuiteIT.kafka.bootstrapServers) + .setConfig("kafka.zookeeper.connect", KafkaEventSinkSuiteIT.kafka.envMap["KAFKA_ZOOKEEPER_CONNECT"]) .setConfig("streams.sink.enabled", "true") kafkaProducer = createProducer() kafkaAvroProducer = createProducer(valueSerializer = KafkaAvroSerializer::class.java.name, diff --git a/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkNoConfigurationIT.kt b/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkNoConfigurationIT.kt new file mode 100644 index 00000000..505ff2b4 --- /dev/null +++ b/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkNoConfigurationIT.kt @@ -0,0 +1,55 @@ +package integrations.kafka + +import io.confluent.kafka.serializers.KafkaAvroDeserializer +import org.junit.Test +import org.neo4j.kernel.internal.GraphDatabaseAPI +import org.neo4j.test.TestGraphDatabaseFactory +import org.testcontainers.containers.GenericContainer +import kotlin.test.assertEquals + + +class FakeWebServer: GenericContainer("alpine") { + override fun start() { + this.withCommand("/bin/sh", "-c", "while true; do { echo -e 'HTTP/1.1 200 OK'; echo ; } | nc -l -p 8000; done") + .withExposedPorts(8000) + super.start() + } + + fun getUrl() = "http://localhost:${getMappedPort(8000)}" +} + +class KafkaEventSinkNoConfigurationIT { + + private val topic = "no-config" + + @Test + fun `the db should start even with no bootstrap servers provided()`() { + val db = TestGraphDatabaseFactory() + .newImpermanentDatabaseBuilder() + .setConfig("kafka.bootstrap.servers", "") + .setConfig("streams.sink.enabled", "true") + .setConfig("streams.sink.topic.cypher.$topic", "CREATE (p:Place{name: event.name, coordinates: event.coordinates, citizens: event.citizens})") + .newGraphDatabase() as GraphDatabaseAPI + val count = db.execute("MATCH (n) RETURN COUNT(n) AS count").columnAs("count").next() + assertEquals(0L, count) + } + + @Test + fun `the db should start even with AVRO serializers and no schema registry url provided()`() { + val fakeWebServer = FakeWebServer() + fakeWebServer.start() + val url = fakeWebServer.getUrl().replace("http://", "") + val db = TestGraphDatabaseFactory() + .newImpermanentDatabaseBuilder() + .setConfig("kafka.bootstrap.servers", url) + .setConfig("kafka.zookeeper.connect", url) + .setConfig("streams.sink.enabled", "true") + .setConfig("streams.sink.topic.cypher.$topic", "CREATE (p:Place{name: event.name, coordinates: event.coordinates, citizens: event.citizens})") + .setConfig("kafka.key.deserializer", KafkaAvroDeserializer::class.java.name) + .setConfig("kafka.value.deserializer", KafkaAvroDeserializer::class.java.name) + .newGraphDatabase() as GraphDatabaseAPI + val count = db.execute("MATCH (n) RETURN COUNT(n) AS count").columnAs("count").next() + assertEquals(0L, count) + fakeWebServer.stop() + } +} \ No newline at end of file diff --git a/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkSuiteIT.kt b/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkSuiteIT.kt index 113cfbfa..028c238d 100644 --- a/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkSuiteIT.kt +++ b/consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkSuiteIT.kt @@ -58,6 +58,7 @@ class KafkaEventSinkSuiteIT { fun tearDownContainer() { StreamsUtils.ignoreExceptions({ kafka.stop() + schemaRegistry.start() }, kotlin.UninitializedPropertyAccessException::class.java) } } diff --git a/consumer/src/test/kotlin/streams/kafka/KafkaSinkConfigurationTest.kt b/consumer/src/test/kotlin/streams/kafka/KafkaSinkConfigurationTest.kt index b8e09648..9644aae5 100644 --- a/consumer/src/test/kotlin/streams/kafka/KafkaSinkConfigurationTest.kt +++ b/consumer/src/test/kotlin/streams/kafka/KafkaSinkConfigurationTest.kt @@ -14,7 +14,7 @@ import kotlin.test.assertTrue class KafkaSinkConfigurationTest { @Test - fun shouldReturnDefaultConfiguration() { + fun `should return default configuration`() { val default = KafkaSinkConfiguration() StreamsSinkConfigurationTest.testDefaultConf(default.streamsSinkConfiguration) @@ -28,7 +28,7 @@ class KafkaSinkConfigurationTest { } @Test - fun shouldReturnConfigurationFromMap() { + fun `should return configuration from map`() { val pollingInterval = "10" val topic = "topic-neo" val topicKey = "streams.sink.topic.cypher.$topic" @@ -56,7 +56,7 @@ class KafkaSinkConfigurationTest { "key.deserializer" to ByteArrayDeserializer::class.java.name, "value.deserializer" to KafkaAvroDeserializer::class.java.name) - val kafkaSinkConfiguration = KafkaSinkConfiguration.from(config) + val kafkaSinkConfiguration = KafkaSinkConfiguration.create(config.raw) StreamsSinkConfigurationTest.testFromConf(kafkaSinkConfiguration.streamsSinkConfiguration, pollingInterval, topic, topicValue) assertEquals(emptyMap(), kafkaSinkConfiguration.extraProperties) assertEquals(zookeeper, kafkaSinkConfiguration.zookeeperConnect) @@ -75,4 +75,34 @@ class KafkaSinkConfigurationTest { assertEquals(topicValue, streamsConfig.topics.cypherTopics[topic]) } + @Test(expected = RuntimeException::class) + fun `should not validate the configuration`() { + val zookeeper = "zookeeper:2181" + val bootstrap = "bootstrap:9092" + try { + val pollingInterval = "10" + val topic = "topic-neo" + val topicKey = "streams.sink.topic.cypher.$topic" + val topicValue = "MERGE (n:Label{ id: event.id }) " + val group = "foo" + val autoOffsetReset = "latest" + val autoCommit = "false" + val config = Config.builder() + .withSetting("streams.sink.polling.interval", pollingInterval) + .withSetting(topicKey, topicValue) + .withSetting("kafka.zookeeper.connect", zookeeper) + .withSetting("kafka.bootstrap.servers", bootstrap) + .withSetting("kafka.auto.offset.reset", autoOffsetReset) + .withSetting("kafka.enable.auto.commit", autoCommit) + .withSetting("kafka.group.id", group) + .withSetting("kafka.key.deserializer", ByteArrayDeserializer::class.java.name) + .withSetting("kafka.value.deserializer", KafkaAvroDeserializer::class.java.name) + .build() + KafkaSinkConfiguration.from(config) + } catch (e: RuntimeException) { + assertEquals("The servers defined into the property `kafka.bootstrap.servers` are not reachable: [$bootstrap]", e.message) + throw e + } + } + } \ No newline at end of file