Skip to content

Commit

Permalink
fixes #147
Browse files Browse the repository at this point in the history
  • Loading branch information
conker84 committed Jan 31, 2019
1 parent 0bc6341 commit e0e12d1
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 16 deletions.
17 changes: 17 additions & 0 deletions common/src/main/kotlin/streams/utils/CoroutineUtils.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package streams.utils

import kotlinx.coroutines.delay

suspend fun <T> retryForException(exceptions: Array<Class<out Throwable>>, retries: Int, delayTime: Long, action: () -> T): T {
return try {
action()
} catch (e: Exception) {
val isInstance = exceptions.any { it.isInstance(e) }
if (isInstance && retries > 0) {
delay(delayTime)
retryForException(exceptions = exceptions, retries = retries - 1, delayTime = delayTime, action = action)
} else {
throw e
}
}
}
63 changes: 63 additions & 0 deletions common/src/test/kotlin/streams/utils/CoroutineUtilsTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package streams.utils

import kotlinx.coroutines.runBlocking
import org.junit.Test
import java.io.IOException
import java.lang.ClassCastException
import kotlin.test.assertEquals
import kotlin.test.assertTrue

class CoroutineUtilsTest {

@Test
fun `should success after retry for known exception`() = runBlocking {
var count = 0
var excuted = false
retryForException<Unit>(exceptions = arrayOf(RuntimeException::class.java),
retries = 4, delayTime = 100) {
if (count < 2) {
++count
throw RuntimeException()
}
excuted = true
}

assertEquals(2, count)
assertTrue { excuted }
}

@Test(expected = RuntimeException::class)
fun `should fail after retry for known exception`() {
var retries = 3
runBlocking {
retryForException<Unit>(exceptions = arrayOf(RuntimeException::class.java),
retries = 3, delayTime = 100) {
if (retries >= 0) {
--retries
throw RuntimeException()
}
}
}
}

@Test
fun `should fail fast unknown exception`() {
var iteration = 0
var isIOException = false
try {
runBlocking {
retryForException<Unit>(exceptions = arrayOf(RuntimeException::class.java),
retries = 3, delayTime = 100) {
if (iteration >= 0) {
++iteration
throw IOException()
}
}
}
} catch (e: Exception) {
isIOException = e is IOException
}
assertTrue { isIOException }
assertEquals(1, iteration)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@ package streams.kafka.connect.sink
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.ticker
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.whileSelect
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.connect.sink.SinkRecord
import org.neo4j.driver.v1.AuthTokens
import org.neo4j.driver.v1.Config
import org.neo4j.driver.v1.Driver
import org.neo4j.driver.v1.GraphDatabase
import org.neo4j.driver.v1.exceptions.ClientException
import org.neo4j.driver.v1.exceptions.TransientException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import streams.utils.StreamsUtils
import streams.utils.retryForException
import org.apache.kafka.connect.errors.ConnectException
import java.util.concurrent.TimeUnit


Expand Down Expand Up @@ -55,6 +60,7 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig) {
configBuilder.withMaxConnectionLifetime(this.config.connectionMaxConnectionLifetime, TimeUnit.MILLISECONDS)
configBuilder.withConnectionAcquisitionTimeout(this.config.connectionAcquisitionTimeout, TimeUnit.MILLISECONDS)
configBuilder.withLoadBalancingStrategy(this.config.loadBalancingStrategy)
configBuilder.withMaxTransactionRetryTime(config.retryBackoff, TimeUnit.MILLISECONDS)
val neo4jConfig = configBuilder.toConfig()
this.driver = GraphDatabase.driver(this.config.serverUri, authToken, neo4jConfig)
}
Expand All @@ -64,25 +70,28 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig) {
}

private fun write(topic: String, records: List<SinkRecord>) {
val session = driver.session()
val query = "${StreamsUtils.UNWIND} ${config.topicMap[topic]}"
val data = mapOf<String, Any>("events" to records.map { converter.convert(it.value()) })
session.writeTransaction {
driver.session().use { session ->
try {
it.run(query, data)
it.success()
if (log.isDebugEnabled) {
log.debug("Successfully executed query: `$query`, with data: `$data`")
runBlocking {
retryForException<Unit>(exceptions = arrayOf(ClientException::class.java, TransientException::class.java),
retries = config.retryMaxAttempts, delayTime = 0) { // we use the delayTime = 0, because we delegate the retryBackoff to the Neo4j Java Driver
session.writeTransaction {
val result = it.run(query, data)
if (log.isDebugEnabled) {
log.debug("Successfully executed query: `$query`. Summary: ${result.summary()}")
}
}
}
}
} catch (e: Exception) {
if (log.isDebugEnabled) {
log.debug("Exception `${e.message}` while executing query: `$query`, with data: `$data`")
}
it.failure()
throw e
}
it.close()
}
session.close()
}

suspend fun writeData(data: Map<String, List<List<SinkRecord>>>) = coroutineScope {
Expand All @@ -105,8 +114,12 @@ class Neo4jService(private val config: Neo4jSinkConnectorConfig) {
it.onAwait { !isAllCompleted } // Stops the whileSelect
}
}
val exceptionMessages = deferredList
.mapNotNull { it.getCompletionExceptionOrNull() }
.map { it.message }
.joinToString("\n")
if (exceptionMessages.isNotBlank()) {
throw ConnectException(exceptionMessages)
}
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.connect.connector.Task
import org.apache.kafka.connect.sink.SinkConnector
import org.slf4j.LoggerFactory
import streams.kafka.connect.utils.PropertiesUtil

@Title("Neo4j Sink Connector")
@Description("The Neo4j Sink connector reads data from Kafka and and writes the data to Neo4j using a Cypher Template")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.connect.sink.SinkTask
import org.neo4j.driver.internal.async.pool.PoolSettings
import org.neo4j.driver.v1.Config
import streams.kafka.connect.utils.PropertiesUtil
import java.io.File
import java.net.URI
import java.util.concurrent.TimeUnit
Expand All @@ -26,6 +27,7 @@ object ConfigGroup {
const val AUTHENTICATION = "Authentication"
const val TOPIC_CYPHER_MAPPING = "Topic Cypher Mapping"
const val BATCH = "Batch Management"
const val RETRY = "Retry Strategy"
const val DEPRECATED = "Deprecated Properties (please check the documentation)"
}

Expand All @@ -47,6 +49,9 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
val connectionAcquisitionTimeout: Long
val loadBalancingStrategy: Config.LoadBalancingStrategy

val retryBackoff: Long
val retryMaxAttempts: Int

val batchTimeout: Long
val batchSize: Int

Expand Down Expand Up @@ -76,6 +81,9 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
loadBalancingStrategy = ConfigUtils
.getEnum(Config.LoadBalancingStrategy::class.java, this, CONNECTION_LOAD_BALANCE_STRATEGY)

retryBackoff = getLong(RETRY_BACKOFF_MSECS)
retryMaxAttempts = getInt(RETRY_MAX_ATTEMPTS)

batchTimeout = getLong(BATCH_TIMEOUT_MSECS)
batchSize = getInt(BATCH_SIZE)

Expand Down Expand Up @@ -121,11 +129,16 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
const val BATCH_SIZE = "neo4j.batch.size"
const val BATCH_TIMEOUT_MSECS = "neo4j.batch.timeout.msecs"

const val RETRY_BACKOFF_MSECS = "neo4j.retry.backoff.msecs"
const val RETRY_MAX_ATTEMPTS = "neo4j.retry.max.attemps"

const val TOPIC_CYPHER_PREFIX = "neo4j.topic.cypher."

const val CONNECTION_POOL_MAX_SIZE_DEFAULT = 100
val BATCH_TIMEOUT_DEFAULT = TimeUnit.SECONDS.toMillis(30L)
const val BATCH_SIZE_DEFAULT = 1000
val RETRY_BACKOFF_DEFAULT = TimeUnit.SECONDS.toMillis(30L)
const val RETRY_MAX_ATTEMPTS_DEFAULT = 5

fun config(): ConfigDef {
return ConfigDef()
Expand Down Expand Up @@ -258,6 +271,21 @@ class Neo4jSinkConnectorConfig(originals: Map<*, *>): AbstractConfig(config(), o
.defaultValue(BATCH_TIMEOUT_DEFAULT)
.group(ConfigGroup.BATCH)
.validator(ConfigDef.Range.atLeast(1)).build())
.define(ConfigKeyBuilder
.of(RETRY_BACKOFF_MSECS, ConfigDef.Type.LONG)
.documentation(PropertiesUtil.getProperty(RETRY_BACKOFF_MSECS))
.importance(ConfigDef.Importance.MEDIUM)
.defaultValue(RETRY_BACKOFF_DEFAULT)
.group(ConfigGroup.RETRY)
.validator(ConfigDef.Range.atLeast(1))
.build())
.define(ConfigKeyBuilder
.of(RETRY_MAX_ATTEMPTS, ConfigDef.Type.INT)
.documentation(PropertiesUtil.getProperty(RETRY_MAX_ATTEMPTS))
.importance(ConfigDef.Importance.MEDIUM)
.defaultValue(RETRY_MAX_ATTEMPTS_DEFAULT)
.group(ConfigGroup.RETRY)
.validator(ConfigDef.Range.atLeast(1)).build())
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,12 @@ class Neo4jSinkTask : SinkTask() {
return@runBlocking
}

// TODO define a retry policy in that case we must throw `RetriableException`
val data = EventBuilder()
.withBatchSize(config.batchSize)
.withTopics(config.topicMap.keys)
.withSinkRecords(collection)
.build()
neo4jService.writeData(data)

}

override fun stop() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package streams.kafka.connect.sink
package streams.kafka.connect.utils

import org.slf4j.LoggerFactory
import java.util.*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ neo4j.connection.max.lifetime.msecs=Type: Long;\nDescription: The max Neo4j conn
neo4j.connection.acquisition.timeout.msecs=Type: Long;\nDescription: The max Neo4j acquisition timeout (default 1 hour)
neo4j.connection.liveness.check.timeout.msecs=Type: Long;\nDescription: The max Neo4j liveness check timeout (default 1 hour)
neo4j.connection.max.pool.size=Type: Int;\nDescription: The max pool size (default 100)
neo4j.load.balance.strategy=Type: enum[ROUND_ROBIN, LEAST_CONNECTED];\nDescription: The Neo4j load balance strategy (default LEAST_CONNECTED)
neo4j.load.balance.strategy=Type: enum[ROUND_ROBIN, LEAST_CONNECTED];\nDescription: The Neo4j load balance strategy (default LEAST_CONNECTED)
neo4j.retry.backoff.msecs=Type: Long;\nDescription: The time in milliseconds to wait following a transient error before a retry attempt is made (default 30000).
neo4j.retry.max.attemps=Type: Int;\nDescription: The maximum number of times to retry on transient errors before failing the task (default 5).

0 comments on commit e0e12d1

Please sign in to comment.